Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include update info in logging output #664

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions temporalio/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,14 @@ class UpdateInfo:
name: str
"""Update type name."""

@property
def _logger_details(self) -> Mapping[str, Any]:
"""Data to be included in string appended to default logging output."""
return {
"update_id": self.id,
"update_name": self.name,
}


class _Runtime(ABC):
@staticmethod
Expand Down Expand Up @@ -1211,6 +1219,10 @@ class LoggerAdapter(logging.LoggerAdapter):
use by others. Default is False.
log_during_replay: Boolean for whether logs should occur during replay.
Default is False.

Values added to ``extra`` are merged with the ``extra`` dictionary from a
logging call, with values from the logging call taking precedence. I.e. the
behavior is that of `merge_extra=True` in Python >= 3.13.
"""

def __init__(
Expand All @@ -1232,20 +1244,28 @@ def process(
or self.workflow_info_on_extra
or self.full_workflow_info_on_extra
):
extra: Dict[str, Any] = {}
msg_extra: Dict[str, Any] = {}
runtime = _Runtime.maybe_current()
if runtime:
workflow_details = runtime.logger_details
if self.workflow_info_on_message:
msg = f"{msg} ({runtime.logger_details})"
msg_extra.update(workflow_details)
if self.workflow_info_on_extra:
# Extra can be absent or None, this handles both
extra = kwargs.get("extra", None) or {}
extra["temporal_workflow"] = runtime.logger_details
kwargs["extra"] = extra
extra["temporal_workflow"] = workflow_details
if self.full_workflow_info_on_extra:
# Extra can be absent or None, this handles both
extra = kwargs.get("extra", None) or {}
extra["workflow_info"] = runtime.workflow_info()
kwargs["extra"] = extra
update_info = current_update_info()
if update_info:
update_details = update_info._logger_details
if self.workflow_info_on_message:
msg_extra.update(update_details)
if self.workflow_info_on_extra:
extra.setdefault("temporal_workflow", {}).update(update_details)

kwargs["extra"] = {**extra, **(kwargs.get("extra") or {})}
if msg_extra:
msg = f"{msg} ({msg_extra})"
return (msg, kwargs)

def isEnabledFor(self, level: int) -> bool:
Expand Down
27 changes: 24 additions & 3 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1908,6 +1908,10 @@ def my_signal(self, value: str) -> None:
self._last_signal = value
workflow.logger.info(f"Signal: {value}")

@workflow.update
def my_update(self, value: str) -> None:
workflow.logger.info(f"Update: {value}")

@workflow.query
def last_signal(self) -> str:
return self._last_signal
Expand Down Expand Up @@ -1955,14 +1959,22 @@ async def test_workflow_logging(client: Client, env: WorkflowEnvironment):
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
# Send a couple signals
# Send some signals and updates
await handle.signal(LoggingWorkflow.my_signal, "signal 1")
await handle.signal(LoggingWorkflow.my_signal, "signal 2")
await handle.execute_update(
LoggingWorkflow.my_update, "update 1", id="update-1"
)
await handle.execute_update(
LoggingWorkflow.my_update, "update 2", id="update-2"
)
assert "signal 2" == await handle.query(LoggingWorkflow.last_signal)

# Confirm two logs happened
# Confirm logs were produced
assert capturer.find_log("Signal: signal 1 ({'attempt':")
assert capturer.find_log("Signal: signal 2")
assert capturer.find_log("Update: update 1")
assert capturer.find_log("Update: update 2")
assert not capturer.find_log("Signal: signal 3")
# Also make sure it has some workflow info and correct funcName
record = capturer.find_log("Signal: signal 1")
Expand All @@ -1974,6 +1986,15 @@ async def test_workflow_logging(client: Client, env: WorkflowEnvironment):
)
# Since we enabled full info, make sure it's there
assert isinstance(record.__dict__["workflow_info"], workflow.Info)
# Check the log emitted by the update execution.
record = capturer.find_log("Update: update 1")
assert (
record
and record.__dict__["temporal_workflow"]["update_id"] == "update-1"
and record.__dict__["temporal_workflow"]["update_name"] == "my_update"
and "'update_id': 'update-1'" in record.message
and "'update_name': 'my_update'" in record.message
)

# Clear queue and start a new one with more signals
capturer.log_queue.queue.clear()
Expand All @@ -1983,7 +2004,7 @@ async def test_workflow_logging(client: Client, env: WorkflowEnvironment):
task_queue=worker.task_queue,
max_cached_workflows=0,
) as worker:
# Send a couple signals
# Send signals and updates
await handle.signal(LoggingWorkflow.my_signal, "signal 3")
await handle.signal(LoggingWorkflow.my_signal, "finish")
await handle.result()
Expand Down
Loading