
# Copyright 2021-2022 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""
Collection of tasks.

The debusine.tasks module hierarchy hosts a collection of :class:`Task`
subclasses that are used by workers to fulfill the WorkRequests sent by the
debusine scheduler.

Creating a new task requires adding a new file containing a class inheriting
from the :class:`Task` base class. The name of the class must be unique among
all child classes.

A child class must, at the very least, override the :py:meth:`Task._execute`
method.
"""
import abc
import copy
import logging
import shlex
import tempfile
from pathlib import Path
from typing import (
    Any,
    BinaryIO,
    IO,
    Optional,
    TYPE_CHECKING,
    TextIO,
    Type,
    Union,
    overload,
)

if TYPE_CHECKING:
    from _typeshed import OpenBinaryModeWriting, OpenTextModeWriting

import jsonschema

from debusine import utils
from debusine.artifacts import WorkRequestDebugLogs
from debusine.client.debusine import Debusine


class TaskConfigError(Exception):
    """Halt the task due to invalid configuration."""


class Task:
    """
    Base class for tasks.

    A Task object serves two purposes: encapsulating the logic of what needs
    to be done to execute the task (cf. :py:meth:`configure` and
    :py:meth:`execute` that are run on a worker), and supporting the scheduler
    by determining if a task is suitable for a given worker. That is done in a
    two-step process: collating metadata from each worker (with the
    :py:meth:`analyze_worker` method that is run on a worker) and then, based
    on this metadata, deciding if a task is suitable (with
    :py:meth:`can_run_on` that is executed on the scheduler).
    """

    #: Must be overridden by child classes to document the current version of
    #: the task's code. A task will only be scheduled on a worker if its task
    #: version is the same as the one running on the scheduler.
    TASK_VERSION: Optional[int] = None

    #: Can be overridden to enable jsonschema validation of the ``task_data``
    #: parameter passed to :py:meth:`configure`.
    TASK_DATA_SCHEMA: dict[str, Any] = {}

    _TASK_DATA_NOTIFICATIONS_SCHEMA = {
        "type": "object",
        "properties": {
            "on_failure": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "channel": {"type": "string"},
                        "data": {"type": "object"},
                    },
                },
                "required": ["channel"],
            }
        },
        "required": ["on_failure"],
    }

    _sub_tasks: dict[str, Type["Task"]] = {}

    def __init_subclass__(cls, **kwargs):
        """
        Register the subclass into Task._sub_tasks.

        Used by Task.class_from_name() to return the class given the name.
        """
        super().__init_subclass__(**kwargs)
        if abc.ABC in cls.__bases__:
            # Ideally we would check cls.__abstractmethods__: classes
            # with abstract methods should not be listed as tasks.
            # But, at the point that __init_subclass__ is called, the
            # methods decorated with @abc.abstractmethod are not yet
            # added to cls.__abstractmethods__. For now: if the class
            # is a direct descendant of abc.ABC it is not added here,
            # because it is probably an ontology and not a task.
            return
        sub_task_name_lowercase = cls.__name__.lower()

        # The same sub-task could register twice
        # (but assert that it is the *same* class, not a different
        # subtask whose name differs only in capitalisation)
        if (
            sub_task_name_lowercase in cls._sub_tasks
            and cls._sub_tasks[sub_task_name_lowercase] != cls
        ):
            raise AssertionError

        cls._sub_tasks[sub_task_name_lowercase] = cls
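
    # Illustrative only (an assumed channel name, not data shipped with
    # debusine): a "notifications" entry accepted by
    # _TASK_DATA_NOTIFICATIONS_SCHEMA looks roughly like:
    #
    #     {
    #         "on_failure": [
    #             {"channel": "admin-email", "data": {"to": "admin@example.org"}},
    #         ],
    #     }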

    def __init__(self):
        """Initialize the task."""
        #: Validated task data submitted through :py:meth:`configure`, without
        #: the task generic data (e.g. "notifications")
        self.data = None

        #: The name of the task. It is computed by :py:meth:`__init__` by
        #: converting the class name to lowercase.
        self.name = self.__class__.__name__.lower()

        #: A :class:`logging.Logger` instance that can be used in child
        #: classes when you override methods to implement the task.
        self.logger = logging.getLogger("debusine.tasks")

        # Task is aborted: the task does not need to be executed, and can be
        # stopped if it is already running
        self._aborted = False

        self.work_request: Optional[int] = None

        # Workspace is used when uploading artifacts.
        # If it's None: the artifacts are created in the default workspace.
        # When the worker instantiates the Task it should set self.workspace
        # (see issue #186).
        self.workspace: Optional[str] = None

        # fetch_input() adds the downloaded artifacts. Used by
        # Task._upload_work_request_debug_logs() and possibly by the
        # required method upload_artifacts().
        self._source_artifacts_ids: list[int] = []

        self.debusine: Optional[Debusine] = None

        self._debug_log_files_directory: Optional[
            tempfile.TemporaryDirectory
        ] = None

    def configure_server_access(self, debusine: Debusine):
        """Set the object to access the server."""
        self.debusine = debusine

    def append_to_log_file(self, filename: str, lines: list[str]) -> None:
        """
        Open a log file and write the lines into it.

        :param filename: name of the log file, opened via
          self.open_debug_log_file(filename)
        :param lines: lines to append to the log file (a newline is added
          after each one)
        """
        with self.open_debug_log_file(filename) as file:
            file.writelines([line + "\n" for line in lines])

    @overload
    def open_debug_log_file(
        self, filename: str, *, mode: "OpenTextModeWriting" = "a"
    ) -> TextIO:
        ...

    @overload
    def open_debug_log_file(
        self, filename: str, *, mode: "OpenBinaryModeWriting"
    ) -> BinaryIO:
        ...

    def open_debug_log_file(
        self,
        filename: str,
        *,
        mode: Union["OpenTextModeWriting", "OpenBinaryModeWriting"] = "a",
    ) -> IO[Any]:
        """
        Open a temporary file and return it.

        The files are always in the same temporary directory, so calling this
        method twice with the same file name will open the same file.

        The caller must call .close() when finished writing.
        """
        if self._debug_log_files_directory is None:
            self._debug_log_files_directory = tempfile.TemporaryDirectory(
                prefix="debusine-task-debug-log-files-"
            )

        debug_file = Path(self._debug_log_files_directory.name) / filename

        return debug_file.open(mode)
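
    # Typical usage from a subclass (illustrative file names, not part of the
    # module): every file ends up in the same per-task temporary directory and
    # is uploaded later by _upload_work_request_debug_logs().
    #
    #     self.append_to_log_file("cmd-output.log", ["stdout line", "stderr line"])
    #     with self.open_debug_log_file("build.log") as log:
    #         log.write("more output\n")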

    def prefix_with_task_name(self, text: str) -> str:
        """:return: the ``text`` prefixed with the task name and a colon."""
        return f"{self.name}:{text}"

    def analyze_worker(self) -> dict:
        """
        Return dynamic metadata about the current worker.

        This method is called on the worker to collect information about the
        worker. The information is stored as a set of key-value pairs in a
        dictionary.

        That information is then reused on the scheduler to be fed to
        :py:meth:`can_run_on` and determine if a task is suitable to be
        executed on the worker.

        Derived objects can extend the behaviour by overriding the method,
        calling ``metadata = super().analyze_worker()``, and then adding
        supplementary data in the dictionary.

        To avoid conflicts on the names of the keys used by different tasks
        you should use key names obtained with
        ``self.prefix_with_task_name(...)``.

        :return: a dictionary describing the worker.
        :rtype: dict.
        """
        version_key_name = self.prefix_with_task_name("version")
        return {
            version_key_name: self.TASK_VERSION,
        }
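
    # A sketch of how a derived task could extend analyze_worker(); the tool
    # name and the use of shutil.which() are illustrative assumptions (and
    # would require importing shutil), not part of this module:
    #
    #     def analyze_worker(self) -> dict:
    #         metadata = super().analyze_worker()
    #         key = self.prefix_with_task_name("available")
    #         metadata[key] = shutil.which("sbuild") is not None
    #         return metadata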

    @classmethod
    def analyze_worker_all_tasks(cls):
        """
        Return dictionary with metadata for each task in Task._sub_tasks.

        Subclasses of Task get registered in Task._sub_tasks. Return a
        dictionary with the metadata of each of the subtasks.

        This method is executed in the worker when submitting the dynamic
        metadata.
        """
        metadata = {}

        for task_class in cls._sub_tasks.values():
            task = task_class()
            metadata.update(task.analyze_worker())

        return metadata

    def can_run_on(self, worker_metadata: dict) -> bool:
        """
        Check if the specified worker can run the task.

        This method shall take its decision solely based on the supplied
        ``worker_metadata`` and on the configured task data (``self.data``).

        The default implementation always returns True, except if there is a
        mismatch between the :py:attr:`TASK_VERSION` on the scheduler side and
        on the worker side.

        Derived objects can implement further checks by overriding the method
        in the following way::

            if not super().can_run_on(worker_metadata):
                return False

            if ...:
                return False

            return True

        :param dict worker_metadata: The metadata collected from the worker by
            running :py:meth:`analyze_worker` on all the tasks on the worker
            under consideration.
        :return: the boolean result of the check.
        :rtype: bool.
        """
        version_key_name = self.prefix_with_task_name("version")
        if worker_metadata.get(version_key_name) != self.TASK_VERSION:
            return False

        return True

    def configure(self, task_data):
        """
        Configure the task with the supplied ``task_data``.

        The supplied data is first validated against the JSON schema defined
        in the TASK_DATA_SCHEMA class attribute. If validation fails, a
        TaskConfigError is raised. Otherwise, the supplied ``task_data`` is
        stored in the ``data`` attribute.

        Derived objects can extend the behaviour by overriding the method and
        calling ``super().configure(task_data)``; however, the extra checks
        must not access any resource of the worker, as the method can also be
        executed on the server when it tries to schedule work requests.

        :param dict task_data: The supplied data describing the task.
        :raises TaskConfigError: if the JSON schema is not respected.
        """
        task_data = copy.deepcopy(task_data)
        data_notifications = task_data.pop("notifications", None)

        if data_notifications is not None:
            try:
                jsonschema.validate(
                    data_notifications, self._TASK_DATA_NOTIFICATIONS_SCHEMA
                )
            except jsonschema.ValidationError as exc:
                raise TaskConfigError(exc.message)

        try:
            jsonschema.validate(task_data, self.TASK_DATA_SCHEMA)
        except jsonschema.ValidationError as exc:
            raise TaskConfigError(exc.message)

        self.data = task_data
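
    # A sketch (assumed schema and file name, not shipped with debusine) of a
    # TASK_DATA_SCHEMA and the data that configure() would accept or reject:
    #
    #     TASK_DATA_SCHEMA = {
    #         "type": "object",
    #         "properties": {"source": {"type": "string"}},
    #         "required": ["source"],
    #     }
    #
    #     task.configure({"source": "hello_2.10-2.dsc"})  # stored in task.data
    #     task.configure({})  # raises TaskConfigError: "source" is required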

    def execute_logging_exceptions(self) -> bool:
        """Execute self.execute() logging any raised exceptions."""
        try:
            return self.execute()
        except Exception as exc:
            self.logger.exception("Exception in Task %s", self.name)
            raise exc

    def execute(self) -> bool:
        """
        Call the _execute() method, upload debug artifacts.

        See _execute() for more information.

        :return: result of the _execute() method.
        """  # noqa: D402
        result = self._execute()

        self._upload_work_request_debug_logs()

        return result

    def _execute(self) -> bool:
        """
        Execute the requested task.

        The task must first have been configured. It is allowed to take as
        much time as required.

        This method will only be run on a worker. It is thus allowed to access
        resources local to the worker.

        It is recommended to fail early by raising a
        :py:exc:`TaskConfigError` if the parameters of the task let you
        anticipate that it has no chance of completing successfully.

        :return: True to indicate success, False for a failure.
        :rtype: bool.
        :raises TaskConfigError: if the parameters of the work request are
            incompatible with the worker.
        """
        raise NotImplementedError()

    def abort(self):
        """Task does not need to be executed. Once aborted, it cannot be changed."""
        self._aborted = True

    @property
    def aborted(self) -> bool:
        """
        Return whether the task is aborted.

        Tasks cannot transition from aborted to not-aborted.
        """
        return self._aborted

    @staticmethod
    def class_from_name(sub_task_class_name: str) -> Type["Task"]:
        """
        Return the class for ``sub_task_class_name`` (case-insensitive).

        __init_subclass__() registers Task subclasses into Task._sub_tasks.
        """
        sub_task_class_name_lowercase = sub_task_class_name.lower()

        if sub_task_class_name_lowercase in Task._sub_tasks:
            return Task._sub_tasks[sub_task_class_name_lowercase]

        raise ValueError(
            f"'{sub_task_class_name_lowercase}' is not a registered task_name"
        )

    @staticmethod
    def is_valid_task_name(task_name) -> bool:
        """Return True if task_name is registered (its class is imported)."""
        return task_name.lower() in Task._sub_tasks

    @staticmethod
    def task_names() -> list[str]:
        """Return list of sub-task names."""
        return sorted(Task._sub_tasks)
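
    # Example use of the registry helpers, assuming a registered task class
    # named "Noop" (illustrative, not part of debusine):
    #
    #     Task.is_valid_task_name("noop")  # True once the class is imported
    #     Task.class_from_name("Noop")     # lookup is case-insensitive
    #     Task.task_names()                # sorted list of registered names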

    def _upload_work_request_debug_logs(self):
        """
        Create a WorkRequestDebugLogs artifact and upload the logs.

        The logs, if any, are in self._debug_log_files_directory and were
        added via self.open_debug_log_file() or self.create_debug_log_file().

        For each id in self._source_artifacts_ids: create a relation from the
        WorkRequestDebugLogs artifact to the source artifact.
        """
        if self._debug_log_files_directory is None:
            return

        work_request_debug_logs_artifact = WorkRequestDebugLogs.create(
            files=Path(self._debug_log_files_directory.name).glob("*")
        )

        remote_artifact = self.debusine.upload_artifact(
            work_request_debug_logs_artifact,
            workspace=self.workspace,
            work_request=self.work_request,
        )

        for source_artifact_id in self._source_artifacts_ids:
            self.debusine.relation_create(
                remote_artifact.id,
                source_artifact_id,
                "relates-to",
            )

        self._debug_log_files_directory.cleanup()
        self._debug_log_files_directory = None

    def find_file_by_suffix(
        self, directory: Path, suffix: str
    ) -> Optional[Path]:
        """
        Find a file in directory with the specified suffix.

        If there is no file ending with suffix, or there is more than one such
        file: return None and write a log in the directory.

        :param directory: directory in which to search. Not recursive.
        :param suffix: suffix to find.
        :return: file path or None.
        """
        dsc_files = utils.find_files_suffixes(directory, [suffix])
        if len(dsc_files) == 1:
            return dsc_files[0]
        else:
            with self.open_debug_log_file(
                "configure_for_execution.log"
            ) as config_for_build_file:
                list_of_files = sorted(map(str, directory.iterdir()))
                config_for_build_file.write(
                    f"There must be one *{suffix} file. "
                    f"Current files: {list_of_files}"
                )
            return None

    @staticmethod
    def _quote_cmd(cmd: list[Any]) -> str:
        """Return user readable cmd, passing each item to shlex.quote()."""
        return " ".join(shlex.quote(str(c)) for c in cmd)
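
# A rough end-to-end sketch of how a worker could drive a task, assuming a
# registered "Noop" task and a configured Debusine client named
# `debusine_client` (both illustrative; the real worker glue lives elsewhere):
#
#     task = Task.class_from_name("noop")()
#     task.configure({"result": True})
#     task.configure_server_access(debusine_client)
#     task.work_request = 42
#     if not task.aborted:
#         success = task.execute_logging_exceptions()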