Tasks (internal API)#
Collection of tasks.
The debusine.tasks module hierarchy hosts a collection of Task subclasses that are used by workers to fulfill WorkRequests sent by the debusine scheduler.
Creating a new task requires adding a new file containing a class inheriting from the Task base class. The name of the class must be unique among all child classes.
A child class must, at the very least, override the Task.execute() method.
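The pattern can be sketched as follows. Note that the Task base class below is a simplified stand-in mimicking the documented interface, so the example is self-contained; in a real task you would inherit from debusine.tasks.Task instead, and the Noop task is purely hypothetical.

```python
# Minimal sketch of a new task. The Task base class here is a simplified
# stand-in for debusine.tasks.Task so the example is self-contained.
class Task:
    """Stand-in mimicking the documented interface of debusine.tasks.Task."""

    TASK_VERSION = None

    def __init__(self):
        # The real __init__() also derives the name from the class name.
        self.name = self.__class__.__name__.lower()
        self.data: dict = {}

    def configure(self, task_data):
        # The real method first validates task_data against TASK_DATA_SCHEMA.
        self.data = task_data

    def execute(self) -> bool:
        raise NotImplementedError


class Noop(Task):
    """A hypothetical task that does nothing and always succeeds."""

    TASK_VERSION = 1

    def execute(self) -> bool:
        # A real task would do its work on the worker here.
        return True


task = Noop()
task.configure({})
print(task.name, task.execute())  # noop True
```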
- class debusine.tasks.Task[source]#
Bases:
object
Base class for tasks.
A Task object serves two purposes: encapsulating the logic of what needs to be done to execute the task (cf. configure() and execute(), which are run on a worker), and supporting the scheduler by determining whether a task is suitable for a given worker. That is done in a two-step process: collating metadata from each worker (with the analyze_worker() method, which is run on a worker) and then, based on this metadata, checking whether a task is suitable (with can_run_on(), which is executed on the scheduler).
- TASK_DATA_SCHEMA: dict[str, Any] = {}#
Can be overridden to enable jsonschema validation of the task_data parameter passed to configure().
- TASK_VERSION: Optional[int] = None#
Must be overridden by child classes to document the current version of the task’s code. A task will only be scheduled on a worker if its task version is the same as the one running on the scheduler.
- property aborted: bool#
Return whether the task has been aborted.
Tasks cannot transition from aborted to not-aborted.
- analyze_worker() dict [source]#
Return dynamic metadata about the current worker.
This method is called on the worker to collect information about the worker. The information is stored as a set of key-value pairs in a dictionary.
That information is then reused on the scheduler, where it is fed to can_run_on() to determine whether a task is suitable to be executed on the worker.
Derived objects can extend the behaviour by overriding the method, calling metadata = super().analyze_worker(), and then adding supplementary data to the dictionary.
To avoid conflicts between the key names used by different tasks, you should use key names obtained with self.prefix_with_task_name(...).
- Returns:
a dictionary describing the worker.
- Return type:
dict.
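As a sketch of the override pattern just described: both the base class and the key format produced by prefix_with_task_name() are simplified assumptions standing in for the real debusine.tasks.Task implementation.

```python
# Sketch of extending analyze_worker() in a subclass. The base class and the
# "<task name>:<key>" format assumed for prefix_with_task_name() are
# stand-ins for the real debusine.tasks.Task implementation.
class Task:
    def __init__(self):
        self.name = self.__class__.__name__.lower()

    def analyze_worker(self) -> dict:
        # The real method collects worker metadata; empty here.
        return {}

    def prefix_with_task_name(self, text: str) -> str:
        # Assumed key format: "<task name>:<text>".
        return f"{self.name}:{text}"


class Sbuild(Task):
    def analyze_worker(self) -> dict:
        metadata = super().analyze_worker()
        # Prefix the key so it cannot clash with other tasks' metadata.
        metadata[self.prefix_with_task_name("available")] = True
        return metadata
```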
- classmethod analyze_worker_all_tasks()[source]#
Return a dictionary with metadata for each task in Task._sub_tasks.
Subclasses of Task are registered in Task._sub_tasks; this returns the metadata of each of those subtasks.
This method is executed on the worker when submitting the dynamic metadata.
- append_to_log_file(filename: str, lines: list[str]) None [source]#
Open the log file and append the given lines to it.
- Parameters:
filename – name of the log file, opened via self.open_debug_log_file(filename)
lines – lines to write to the log file
- can_run_on(worker_metadata: dict) bool [source]#
Check if the specified worker can run the task.
This method shall take its decision solely based on the supplied worker_metadata and on the configured task data (self.data).
The default implementation always returns True, except when there is a mismatch between TASK_VERSION on the scheduler side and on the worker side.
Derived objects can implement further checks by overriding the method in the following way:
if not super().can_run_on(worker_metadata):
    return False
if ...:
    return False
return True
- Parameters:
worker_metadata (dict) – The metadata collected from the worker by running analyze_worker() on all the tasks on the worker under consideration.
- Returns:
the boolean result of the check.
- Return type:
bool.
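A concrete version of the override pattern might look like the following; the "sbuild:available" metadata key is hypothetical, and the base-class version comparison is simplified relative to the real implementation.

```python
# Sketch of a derived can_run_on() check. The "sbuild:available" key and the
# simplified base-class version comparison are illustrative assumptions, not
# the real debusine implementation.
class Task:
    TASK_VERSION = 1

    def can_run_on(self, worker_metadata: dict) -> bool:
        # Simplified: the real default compares TASK_VERSION between the
        # scheduler and the worker.
        return worker_metadata.get("task_version") == self.TASK_VERSION


class Sbuild(Task):
    def can_run_on(self, worker_metadata: dict) -> bool:
        if not super().can_run_on(worker_metadata):
            return False
        # Hypothetical extra check: the worker must advertise sbuild support
        # (as collected by analyze_worker()).
        return bool(worker_metadata.get("sbuild:available", False))
```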
- static class_from_name(sub_task_class_name: str) Type[Task] [source]#
Return the class corresponding to sub_task_class_name (case-insensitive).
__init_subclass__() registers Task subclasses into Task._sub_tasks.
- configure(task_data)[source]#
Configure the task with the supplied task_data.
The supplied data is first validated against the JSON schema defined in the TASK_DATA_SCHEMA class attribute. If validation fails, a TaskConfigError is raised. Otherwise, the supplied task_data is stored in the data attribute.
Derived objects can extend the behaviour by overriding the method and calling super().configure(task_data); however, the extra checks must not access any resource of the worker, as the method can also be executed on the server when it tries to schedule work requests.
- Parameters:
task_data (dict) – The supplied data describing the task.
- Raises:
TaskConfigError – if the JSON schema is not respected.
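The validate-then-store flow can be sketched as follows. The real configure() uses jsonschema validation; this dependency-free stand-in only checks the schema's required keys, and the Example task and its schema are hypothetical.

```python
# Sketch of configure()'s validate-then-store behaviour. The real method runs
# full jsonschema validation; this stand-in only checks required keys so the
# example has no third-party dependency.
class TaskConfigError(Exception):
    """Halt the task due to invalid configuration."""


class Task:
    TASK_DATA_SCHEMA: dict = {}

    def configure(self, task_data: dict) -> None:
        for key in self.TASK_DATA_SCHEMA.get("required", []):
            if key not in task_data:
                raise TaskConfigError(f"missing required key: {key}")
        self.data = task_data


class Example(Task):
    # Hypothetical schema for illustration.
    TASK_DATA_SCHEMA = {
        "type": "object",
        "required": ["distribution"],
        "properties": {"distribution": {"type": "string"}},
    }


task = Example()
task.configure({"distribution": "bookworm"})
print(task.data)  # {'distribution': 'bookworm'}
```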
- execute() bool [source]#
Call the _execute() method, upload debug artifacts.
See _execute() for more information.
- Returns:
result of the _execute() method.
- find_file_by_suffix(directory: Path, suffix: str) Optional[Path] [source]#
Find file in directory with the specified suffix.
If there is no file ending with the suffix, or there is more than one such file, return None and write a log file in the directory.
- Parameters:
directory – directory in which to search for the file. Not recursive.
suffix – suffix to find.
- Returns:
file path or None
- static is_valid_task_name(task_name) bool [source]#
Return True if task_name is registered (its class is imported).
- logger#
A logging.Logger instance that can be used in child classes when you override methods to implement the task.
- name#
The name of the task. It is computed by __init__() by converting the class name to lowercase.
- open_debug_log_file(filename: str, *, mode: OpenTextModeWriting = 'a') TextIO [source]#
- open_debug_log_file(filename: str, *, mode: OpenBinaryModeWriting) BinaryIO
Open a temporary file and return it.
The files are always created in the same temporary directory; calling this method twice with the same file name will open the same file.
The caller must call .close() when finished writing.
- exception debusine.tasks.TaskConfigError[source]#
Bases:
Exception
Halt the task due to invalid configuration.
Task to build Debian packages with sbuild.
This task implements the PackageBuild generic task for its task_data: https://freexian-team.pages.debian.net/debusine/reference/tasks.html#task-packagebuild
- class debusine.tasks.sbuild.Sbuild[source]#
Bases:
SbuildValidatorMixin, TaskRunCommandMixin, FetchExecUploadMixin, Task
Task implementing a Debian package build with sbuild.
- TASK_DATA_SCHEMA: dict[str, Any] = {
    'additionalProperties': False,
    'properties': {
        'build_components': {
            'items': {'enum': ['any', 'all', 'source']},
            'type': 'array',
            'uniqueItems': True,
        },
        'distribution': {'type': 'string'},
        'host_architecture': {'type': 'string'},
        'input': {
            'additionalProperties': False,
            'properties': {'source_artifact_id': {'type': 'integer'}},
            'required': ['source_artifact_id'],
            'type': 'object',
        },
        'sbuild_options': {'items': {'type': 'string'}, 'type': 'array'},
    },
    'required': ['input', 'distribution', 'host_architecture'],
    'type': 'object',
}#
Can be overridden to enable jsonschema validation of the task_data parameter passed to configure().
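For reference, a task_data document accepted by this schema might look like the following; the source_artifact_id value is purely illustrative.

```python
# Example task_data satisfying the Sbuild schema above: "input",
# "distribution" and "host_architecture" are required; "build_components"
# entries must be unique and drawn from {"any", "all", "source"}. The
# source_artifact_id value is purely illustrative.
task_data = {
    "input": {"source_artifact_id": 42},
    "distribution": "bookworm",
    "host_architecture": "amd64",
    "build_components": ["any", "all"],
}
```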
- TASK_VERSION: Optional[int] = 1#
Must be overridden by child classes to document the current version of the task’s code. A task will only be scheduled on a worker if its task version is the same as the one running on the scheduler.
- can_run_on(worker_metadata: dict) bool [source]#
Check whether the specified worker can run the requested task.
- configure_for_execution(download_directory: Path) bool [source]#
Configure the task: set variables needed for the build() step.
Return True if configuration worked, False if there was a problem.
Note: self.find_file_by_suffix() writes to a log file to be uploaded as an artifact.