Skip to content

Commit

Permalink
Add log context capabilities to runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
edyounis committed Apr 12, 2024
1 parent 4189b23 commit d6eb2be
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 6 deletions.
116 changes: 114 additions & 2 deletions bqskit/runtime/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
from typing import Any
from typing import Callable
from typing import Protocol
from typing import Sequence
from typing import TYPE_CHECKING

if TYPE_CHECKING:
Expand Down Expand Up @@ -143,18 +144,129 @@ def submit(
self,
fn: Callable[..., Any],
*args: Any,
task_name: str | None = None,
log_context: dict[str, str] = {},
**kwargs: Any,
) -> RuntimeFuture:
"""Submit a `fn` to the runtime."""
"""
Submit a function to the runtime for execution.
This method schedules the function `fn` to be executed by the
runtime with the provided arguments `args` and keyword arguments
`kwargs`. The execution may happen asynchronously.
Args:
fn (Callable[..., Any]): The function to be executed.
*args (Any): Variable length argument list to be passed to
the function `fn`.
task_name (str | None): An optional name for the task, which
can be used for logging or tracking purposes. Defaults to
None, which will use the function name as the task name.
log_context (dict[str, str]): A dictionary containing logging
context information. All log messages produced by the fn
and any children tasks will contain this context if the
appropriate level (logging.DEBUG) is set on the logger.
Defaults to an empty dictionary for no added context.
**kwargs (Any): Arbitrary keyword arguments to be passed to
the function `fn`.
Returns:
RuntimeFuture: An object representing the future result of
the function execution. This can be used to retrieve the
result by `await`ing it.
Example:
>>> from bqskit.runtime import get_runtime
>>>
>>> def add(x, y):
... return x + y
>>>
>>> future = get_runtime().submit(add, 1, 2)
>>> result = await future
>>> print(result)
3
See Also:
- :func:`map` for submitting multiple tasks in parallel.
- :func:`cancel` for cancelling tasks.
- :class:`~bqskit.runtime.future.RuntimeFuture` for more
information on how to interact with the future object.
"""
...

def map(
self,
fn: Callable[..., Any],
*args: Any,
task_name: Sequence[str | None] | str | None = None,
log_context: Sequence[dict[str, str]] | dict[str, str] = {},
**kwargs: Any,
) -> RuntimeFuture:
"""Map `fn` over the input arguments distributed across the runtime."""
"""
Map a function over a sequence of arguments and execute in parallel.
This method schedules the function `fn` to be executed by the runtime
for each set of arguments provided in `args`. Each invocation of `fn`
will be executed potentially in parallel, depending on the runtime's
capabilities and current load.
Args:
fn (Callable[..., Any]): The function to be executed.
*args (Any): Variable length argument list to be passed to
the function `fn`. Each argument is expected to be a
sequence of arguments to be passed to a separate
invocation. The sequences should be of equal length.
task_name (Sequence[str | None] | str | None): An optional
name for the task group, which can be used for logging
or tracking purposes. Defaults to None, which will use
the function name as the task name. If a string is
provided, it will be used as the prefix for all task
names. If a sequence of strings is provided, each task
will be named with the corresponding string in the
sequence.
log_context (Sequence[dict[str, str]]) | dict[str, str]): A
dictionary containing logging context information. All
log messages produced by the `fn` and any children tasks
will contain this context if the appropriate level
(logging.DEBUG) is set on the logger. Defaults to an
empty dictionary for no added context. Can be a sequence
of contexts, one for each task, or a single context to be
used for all tasks.
**kwargs (Any): Arbitrary keyword arguments to be passed to
each invocation of the function `fn`.
Returns:
RuntimeFuture: An object representing the future result of
the function executions. This can be used to retrieve the
results by `await`ing it, which will return a list.
Example:
>>> from bqskit.runtime import get_runtime
>>>
>>> def add(x, y):
... return x + y
>>>
>>> args_list = [(1, 2, 3), (4, 5, 6)]
>>> future = get_runtime().map(add, *args_list)
>>> results = await future
>>> print(results)
[5, 7, 9]
See Also:
- :func:`submit` for submitting a single task.
- :func:`cancel` for cancelling tasks.
- :func:`next` for retrieving results incrementally.
- :class:`~bqskit.runtime.future.RuntimeFuture` for more
information on how to interact with the future object.
"""
...

def cancel(self, future: RuntimeFuture) -> None:
Expand Down
9 changes: 5 additions & 4 deletions bqskit/runtime/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,16 @@ def __init__(
breadcrumbs: tuple[RuntimeAddress, ...],
logging_level: int | None = None,
max_logging_depth: int = -1,
task_name: str | None = None,
log_context: dict[str, str] = {},
) -> None:
"""Create the task with a new id and return address."""
RuntimeTask.task_counter += 1
self.task_id = RuntimeTask.task_counter

self.serialized_fnargs = dill.dumps(fnargs)
self._fnargs: tuple[Any, Any, Any] | None = None
self._name = fnargs[0].__name__
self._name = fnargs[0].__name__ if task_name is None else task_name
"""Tuple of function pointer, arguments, and keyword arguments."""

self.return_address = return_address
Expand All @@ -68,9 +70,6 @@ def __init__(
self.coro: Coroutine[Any, Any, Any] | None = None
"""The coroutine containing this tasks code."""

# self.send: Any = None
# """A register that both the coroutine and task have access to."""

self.desired_box_id: int | None = None
"""When waiting on a mailbox, this stores that mailbox's id."""

Expand All @@ -80,6 +79,8 @@ def __init__(
self.wake_on_next: bool = False
"""Set to true if this task should wake immediately on a result."""

self.log_context: dict[str, str] = log_context

@property
def fnargs(self) -> tuple[Any, Any, Any]:
"""Return the function pointer, arguments, and keyword arguments."""
Expand Down
56 changes: 56 additions & 0 deletions bqskit/runtime/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from typing import Callable
from typing import cast
from typing import List
from typing import Sequence

from bqskit.runtime import default_worker_port
from bqskit.runtime import set_blas_thread_counts
Expand Down Expand Up @@ -202,6 +203,14 @@ def record_factory(*args: Any, **kwargs: Any) -> logging.LogRecord:
if active_task is not None:
lvl = active_task.logging_level
if lvl is None or lvl <= record.levelno:
if lvl <= logging.DEBUG:
record.msg += f' [wid={self._id}'
items = active_task.log_context.items()
if len(items) > 0:
record.msg += ', '
con_str = ', '.join(f'{k}={v}' for k, v in items)
record.msg += con_str
record.msg += ']'
tid = active_task.comp_task_id
self._conn.send((RuntimeMessage.LOG, (tid, record)))
return record
Expand Down Expand Up @@ -503,10 +512,25 @@ def submit(
self,
fn: Callable[..., Any],
*args: Any,
task_name: str | None = None,
log_context: dict[str, str] = {},
**kwargs: Any,
) -> RuntimeFuture:
"""Submit `fn` as a task to the runtime."""
assert self._active_task is not None

if task_name is not None and not isinstance(task_name, str):
raise RuntimeError('task_name must be a string.')

if not isinstance(log_context, dict):
raise RuntimeError('log_context must be a dictionary.')

for k, v in log_context.items():
if not isinstance(k, str) or not isinstance(v, str):
raise RuntimeError(
'log_context must be a map from strings to strings.',
)

# Group fnargs together
fnarg = (fn, args, kwargs)

Expand All @@ -523,6 +547,8 @@ def submit(
self._active_task.breadcrumbs + (self._active_task.return_address,),
self._active_task.logging_level,
self._active_task.max_logging_depth,
task_name,
self._active_task.log_context | log_context,
)

# Submit the task (on the next cycle)
Expand All @@ -535,10 +561,38 @@ def map(
self,
fn: Callable[..., Any],
*args: Any,
task_name: Sequence[str | None] | str | None = None,
log_context: Sequence[dict[str, str]] | dict[str, str] = {},
**kwargs: Any,
) -> RuntimeFuture:
"""Map `fn` over the input arguments distributed across the runtime."""
assert self._active_task is not None

if task_name is None or isinstance(task_name, str):
task_name = [task_name] * len(args[0])

if len(task_name) != len(args[0]):
raise RuntimeError(
'task_name must be a string or a list of strings equal'
'in length to the number of tasks.',
)

if isinstance(log_context, dict):
log_context = [log_context] * len(args[0])

if len(log_context) != len(args[0]):
raise RuntimeError(
'log_context must be a dictionary or a list of dictionaries'
' equal in length to the number of tasks.',
)

for context in log_context:
for k, v in context.items():
if not isinstance(k, str) or not isinstance(v, str):
raise RuntimeError(
'log_context must be a map from strings to strings.',
)

# Group fnargs together
fnargs = []
if len(args) == 1:
Expand Down Expand Up @@ -568,6 +622,8 @@ def map(
breadcrumbs,
self._active_task.logging_level,
self._active_task.max_logging_depth,
task_name[i],
self._active_task.log_context | log_context[i],
)
for i, fnarg in enumerate(fnargs)
]
Expand Down

0 comments on commit d6eb2be

Please sign in to comment.