From d6eb2be702bcc8f6a9d27a4c0e8ec5cdac79c982 Mon Sep 17 00:00:00 2001 From: Ed Younis Date: Fri, 12 Apr 2024 13:05:05 -0400 Subject: [PATCH] Add log context capabilities to runtime --- bqskit/runtime/__init__.py | 116 ++++++++++++++++++++++++++++++++++++- bqskit/runtime/task.py | 9 +-- bqskit/runtime/worker.py | 56 ++++++++++++++++++ 3 files changed, 175 insertions(+), 6 deletions(-) diff --git a/bqskit/runtime/__init__.py b/bqskit/runtime/__init__.py index 2c01c4ddd..371427ffd 100644 --- a/bqskit/runtime/__init__.py +++ b/bqskit/runtime/__init__.py @@ -98,6 +98,7 @@ from typing import Any from typing import Callable from typing import Protocol +from typing import Sequence from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -143,18 +144,129 @@ def submit( self, fn: Callable[..., Any], *args: Any, + task_name: str | None = None, + log_context: dict[str, str] = {}, **kwargs: Any, ) -> RuntimeFuture: - """Submit a `fn` to the runtime.""" + """ + Submit a function to the runtime for execution. + + This method schedules the function `fn` to be executed by the + runtime with the provided arguments `args` and keyword arguments + `kwargs`. The execution may happen asynchronously. + + Args: + fn (Callable[..., Any]): The function to be executed. + + *args (Any): Variable length argument list to be passed to + the function `fn`. + + task_name (str | None): An optional name for the task, which + can be used for logging or tracking purposes. Defaults to + None, which will use the function name as the task name. + + log_context (dict[str, str]): A dictionary containing logging + context information. All log messages produced by the fn + and any children tasks will contain this context if the + appropriate level (logging.DEBUG) is set on the logger. + Defaults to an empty dictionary for no added context. + + **kwargs (Any): Arbitrary keyword arguments to be passed to + the function `fn`. + + Returns: + RuntimeFuture: An object representing the future result of + the function execution. This can be used to retrieve the + result by `await`ing it. + + Example: + >>> from bqskit.runtime import get_runtime + >>> + >>> def add(x, y): + ... return x + y + >>> + >>> future = get_runtime().submit(add, 1, 2) + >>> result = await future + >>> print(result) + 3 + + See Also: + - :func:`map` for submitting multiple tasks in parallel. + - :func:`cancel` for cancelling tasks. + - :class:`~bqskit.runtime.future.RuntimeFuture` for more + information on how to interact with the future object. + """ ... def map( self, fn: Callable[..., Any], *args: Any, + task_name: Sequence[str | None] | str | None = None, + log_context: Sequence[dict[str, str]] | dict[str, str] = {}, **kwargs: Any, ) -> RuntimeFuture: - """Map `fn` over the input arguments distributed across the runtime.""" + """ + Map a function over a sequence of arguments and execute in parallel. + + This method schedules the function `fn` to be executed by the runtime + for each set of arguments provided in `args`. Each invocation of `fn` + will be executed potentially in parallel, depending on the runtime's + capabilities and current load. + + Args: + fn (Callable[..., Any]): The function to be executed. + + *args (Any): Variable length argument list to be passed to + the function `fn`. Each argument is expected to be a + sequence of arguments to be passed to a separate + invocation. The sequences should be of equal length. + + task_name (Sequence[str | None] | str | None): An optional + name for the task group, which can be used for logging + or tracking purposes. Defaults to None, which will use + the function name as the task name. If a string is + provided, it will be used as the prefix for all task + names. If a sequence of strings is provided, each task + will be named with the corresponding string in the + sequence. + + log_context (Sequence[dict[str, str]]) | dict[str, str]): A + dictionary containing logging context information. All + log messages produced by the `fn` and any children tasks + will contain this context if the appropriate level + (logging.DEBUG) is set on the logger. Defaults to an + empty dictionary for no added context. Can be a sequence + of contexts, one for each task, or a single context to be + used for all tasks. + + **kwargs (Any): Arbitrary keyword arguments to be passed to + each invocation of the function `fn`. + + Returns: + RuntimeFuture: An object representing the future result of + the function executions. This can be used to retrieve the + results by `await`ing it, which will return a list. + + Example: + >>> from bqskit.runtime import get_runtime + >>> + >>> def add(x, y): + ... return x + y + >>> + >>> args_list = [(1, 2, 3), (4, 5, 6)] + >>> future = get_runtime().map(add, *args_list) + >>> results = await future + >>> print(results) + [5, 7, 9] + + See Also: + - :func:`submit` for submitting a single task. + - :func:`cancel` for cancelling tasks. + - :func:`next` for retrieving results incrementally. + - :class:`~bqskit.runtime.future.RuntimeFuture` for more + information on how to interact with the future object. + """ ... def cancel(self, future: RuntimeFuture) -> None: diff --git a/bqskit/runtime/task.py b/bqskit/runtime/task.py index 962ca1ff6..d74d79ec4 100644 --- a/bqskit/runtime/task.py +++ b/bqskit/runtime/task.py @@ -36,6 +36,8 @@ def __init__( breadcrumbs: tuple[RuntimeAddress, ...], logging_level: int | None = None, max_logging_depth: int = -1, + task_name: str | None = None, + log_context: dict[str, str] = {}, ) -> None: """Create the task with a new id and return address.""" RuntimeTask.task_counter += 1 @@ -43,7 +45,7 @@ def __init__( self.serialized_fnargs = dill.dumps(fnargs) self._fnargs: tuple[Any, Any, Any] | None = None - self._name = fnargs[0].__name__ + self._name = fnargs[0].__name__ if task_name is None else task_name """Tuple of function pointer, arguments, and keyword arguments.""" self.return_address = return_address @@ -68,9 +70,6 @@ def __init__( self.coro: Coroutine[Any, Any, Any] | None = None """The coroutine containing this tasks code.""" - # self.send: Any = None - # """A register that both the coroutine and task have access to.""" - self.desired_box_id: int | None = None """When waiting on a mailbox, this stores that mailbox's id.""" @@ -80,6 +79,8 @@ def __init__( self.wake_on_next: bool = False """Set to true if this task should wake immediately on a result.""" + self.log_context: dict[str, str] = log_context + @property def fnargs(self) -> tuple[Any, Any, Any]: """Return the function pointer, arguments, and keyword arguments.""" diff --git a/bqskit/runtime/worker.py b/bqskit/runtime/worker.py index 8aa0b4ce7..dee3bdca7 100644 --- a/bqskit/runtime/worker.py +++ b/bqskit/runtime/worker.py @@ -20,6 +20,7 @@ from typing import Callable from typing import cast from typing import List +from typing import Sequence from bqskit.runtime import default_worker_port from bqskit.runtime import set_blas_thread_counts @@ -202,6 +203,14 @@ def record_factory(*args: Any, **kwargs: Any) -> logging.LogRecord: if active_task is not None: lvl = active_task.logging_level if lvl is None or lvl <= record.levelno: + if lvl <= logging.DEBUG: + record.msg += f' [wid={self._id}' + items = active_task.log_context.items() + if len(items) > 0: + record.msg += ', ' + con_str = ', '.join(f'{k}={v}' for k, v in items) + record.msg += con_str + record.msg += ']' tid = active_task.comp_task_id self._conn.send((RuntimeMessage.LOG, (tid, record))) return record @@ -503,10 +512,25 @@ def submit( self, fn: Callable[..., Any], *args: Any, + task_name: str | None = None, + log_context: dict[str, str] = {}, **kwargs: Any, ) -> RuntimeFuture: """Submit `fn` as a task to the runtime.""" assert self._active_task is not None + + if task_name is not None and not isinstance(task_name, str): + raise RuntimeError('task_name must be a string.') + + if not isinstance(log_context, dict): + raise RuntimeError('log_context must be a dictionary.') + + for k, v in log_context.items(): + if not isinstance(k, str) or not isinstance(v, str): + raise RuntimeError( + 'log_context must be a map from strings to strings.', + ) + # Group fnargs together fnarg = (fn, args, kwargs) @@ -523,6 +547,8 @@ def submit( self._active_task.breadcrumbs + (self._active_task.return_address,), self._active_task.logging_level, self._active_task.max_logging_depth, + task_name, + self._active_task.log_context | log_context, ) # Submit the task (on the next cycle) @@ -535,10 +561,38 @@ def map( self, fn: Callable[..., Any], *args: Any, + task_name: Sequence[str | None] | str | None = None, + log_context: Sequence[dict[str, str]] | dict[str, str] = {}, **kwargs: Any, ) -> RuntimeFuture: """Map `fn` over the input arguments distributed across the runtime.""" assert self._active_task is not None + + if task_name is None or isinstance(task_name, str): + task_name = [task_name] * len(args[0]) + + if len(task_name) != len(args[0]): + raise RuntimeError( + 'task_name must be a string or a list of strings equal' + 'in length to the number of tasks.', + ) + + if isinstance(log_context, dict): + log_context = [log_context] * len(args[0]) + + if len(log_context) != len(args[0]): + raise RuntimeError( + 'log_context must be a dictionary or a list of dictionaries' + ' equal in length to the number of tasks.', + ) + + for context in log_context: + for k, v in context.items(): + if not isinstance(k, str) or not isinstance(v, str): + raise RuntimeError( + 'log_context must be a map from strings to strings.', + ) + # Group fnargs together fnargs = [] if len(args) == 1: @@ -568,6 +622,8 @@ def map( breadcrumbs, self._active_task.logging_level, self._active_task.max_logging_depth, + task_name[i], + self._active_task.log_context | log_context[i], ) for i, fnarg in enumerate(fnargs) ]