From b43b9800c2b9eeaf52570a5380cd0323bf0b1b8d Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 9 Oct 2024 10:12:58 -0400 Subject: [PATCH 1/7] Make LoggerAdapter behavior identical to merge_extra=True --- temporalio/workflow.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/temporalio/workflow.py b/temporalio/workflow.py index cb1464c4..5409df5d 100644 --- a/temporalio/workflow.py +++ b/temporalio/workflow.py @@ -1211,6 +1211,10 @@ class LoggerAdapter(logging.LoggerAdapter): use by others. Default is False. log_during_replay: Boolean for whether logs should occur during replay. Default is False. + + Values added to ``extra`` are merged with the ``extra`` map from a logging + call, with values from the logging call taking precedence. I.e. the + behavior is that of `merge_extra=True` in Python >= 3.13. """ def __init__( @@ -1232,20 +1236,17 @@ def process( or self.workflow_info_on_extra or self.full_workflow_info_on_extra ): + extra = {} runtime = _Runtime.maybe_current() if runtime: if self.workflow_info_on_message: msg = f"{msg} ({runtime.logger_details})" if self.workflow_info_on_extra: - # Extra can be absent or None, this handles both - extra = kwargs.get("extra", None) or {} extra["temporal_workflow"] = runtime.logger_details - kwargs["extra"] = extra if self.full_workflow_info_on_extra: - # Extra can be absent or None, this handles both - extra = kwargs.get("extra", None) or {} extra["workflow_info"] = runtime.workflow_info() - kwargs["extra"] = extra + + kwargs["extra"] = {**extra, **(kwargs.get("extra") or {})} return (msg, kwargs) def isEnabledFor(self, level: int) -> bool: From d7f2e9fb2a6e6db27d6c35f1b40acb06ac927a9b Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 9 Oct 2024 14:27:21 -0400 Subject: [PATCH 2/7] Refactor --- temporalio/workflow.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/temporalio/workflow.py b/temporalio/workflow.py index 5409df5d..b5467d7e 100644 --- a/temporalio/workflow.py +++ b/temporalio/workflow.py @@ -1237,16 +1237,19 @@ def process( or self.full_workflow_info_on_extra ): extra = {} + msg_extra = {} runtime = _Runtime.maybe_current() if runtime: if self.workflow_info_on_message: - msg = f"{msg} ({runtime.logger_details})" + msg_extra.update(runtime.logger_details) if self.workflow_info_on_extra: extra["temporal_workflow"] = runtime.logger_details if self.full_workflow_info_on_extra: extra["workflow_info"] = runtime.workflow_info() kwargs["extra"] = {**extra, **(kwargs.get("extra") or {})} + if msg_extra: + msg = f"{msg} ({msg_extra})" return (msg, kwargs) def isEnabledFor(self, level: int) -> bool: From f452f03ed64573df66d826adddb674f25ee22838 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 9 Oct 2024 10:17:21 -0400 Subject: [PATCH 3/7] Add update info to logging `extra` --- temporalio/workflow.py | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/temporalio/workflow.py b/temporalio/workflow.py index b5467d7e..5c7031a9 100644 --- a/temporalio/workflow.py +++ b/temporalio/workflow.py @@ -11,7 +11,7 @@ import warnings from abc import ABC, abstractmethod from contextlib import contextmanager -from dataclasses import dataclass +from dataclasses import asdict, dataclass from datetime import datetime, timedelta, timezone from enum import Enum, IntEnum from functools import partial @@ -500,6 +500,14 @@ class UpdateInfo: name: str """Update type name.""" + @property + def logger_details(self) -> Mapping[str, Any]: + """Data to be included in string appended to default logging output.""" + return { + "update_id": self.id, + "update_name": self.name, + } + class _Runtime(ABC): @staticmethod @@ -1205,6 +1213,10 @@ class LoggerAdapter(logging.LoggerAdapter): dictionary value will be added to the ``extra`` dictionary with some workflow info, making it present on the ``LogRecord.__dict__`` for use by others. Default is True. + update_info_on_extra: Boolean for whether a ``temporal_update`` + dictionary value will be added to the ``extra`` dictionary with some + update info, making it present on the ``LogRecord.__dict__`` for use + by others. Default is True. full_workflow_info_on_extra: Boolean for whether a ``workflow_info`` value will be added to the ``extra`` dictionary with the entire workflow info, making it present on the ``LogRecord.__dict__`` for @@ -1224,6 +1236,7 @@ def __init__( super().__init__(logger, extra or {}) self.workflow_info_on_message = True self.workflow_info_on_extra = True + self.update_info_on_extra = True self.full_workflow_info_on_extra = False self.log_during_replay = False @@ -1234,10 +1247,11 @@ def process( if ( self.workflow_info_on_message or self.workflow_info_on_extra + or self.update_info_on_extra or self.full_workflow_info_on_extra ): - extra = {} - msg_extra = {} + extra: Dict[str, Any] = {} + msg_extra: Dict[str, Any] = {} runtime = _Runtime.maybe_current() if runtime: if self.workflow_info_on_message: @@ -1246,6 +1260,12 @@ def process( extra["temporal_workflow"] = runtime.logger_details if self.full_workflow_info_on_extra: extra["workflow_info"] = runtime.workflow_info() + update_info = current_update_info() + if update_info: + if self.update_info_on_extra: + extra["temporal_update"] = asdict(update_info) + if self.workflow_info_on_message: + msg_extra.update(update_info.logger_details) kwargs["extra"] = {**extra, **(kwargs.get("extra") or {})} if msg_extra: From f94249e7d25619c7c36e64d0c14ee68247e1157f Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 9 Oct 2024 17:37:38 -0400 Subject: [PATCH 4/7] Test --- tests/worker/test_workflow.py | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 15afe8c4..7c8dff51 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -1908,6 +1908,10 @@ def my_signal(self, value: str) -> None: self._last_signal = value workflow.logger.info(f"Signal: {value}") + @workflow.update + def my_update(self, value: str) -> None: + workflow.logger.info(f"Update: {value}") + @workflow.query def last_signal(self) -> str: return self._last_signal @@ -1955,14 +1959,22 @@ async def test_workflow_logging(client: Client, env: WorkflowEnvironment): id=f"workflow-{uuid.uuid4()}", task_queue=worker.task_queue, ) - # Send a couple signals + # Send some signals and updates await handle.signal(LoggingWorkflow.my_signal, "signal 1") await handle.signal(LoggingWorkflow.my_signal, "signal 2") + await handle.execute_update( + LoggingWorkflow.my_update, "update 1", id="update-1" + ) + await handle.execute_update( + LoggingWorkflow.my_update, "update 2", id="update-2" + ) assert "signal 2" == await handle.query(LoggingWorkflow.last_signal) - # Confirm two logs happened + # Confirm logs were produced assert capturer.find_log("Signal: signal 1 ({'attempt':") assert capturer.find_log("Signal: signal 2") + assert capturer.find_log("Update: update 1") + assert capturer.find_log("Update: update 2") assert not capturer.find_log("Signal: signal 3") # Also make sure it has some workflow info and correct funcName record = capturer.find_log("Signal: signal 1") @@ -1974,6 +1986,15 @@ async def test_workflow_logging(client: Client, env: WorkflowEnvironment): ) # Since we enabled full info, make sure it's there assert isinstance(record.__dict__["workflow_info"], workflow.Info) + # Check the log emitted by the update execution. + record = capturer.find_log("Update: update 1") + assert ( + record + and record.__dict__["temporal_update"]["id"] == "update-1" + and record.__dict__["temporal_update"]["name"] == "my_update" + and "'update_id': 'update-1'" in record.message + and "'update_name': 'my_update'" in record.message + ) # Clear queue and start a new one with more signals capturer.log_queue.queue.clear() @@ -1983,7 +2004,7 @@ async def test_workflow_logging(client: Client, env: WorkflowEnvironment): task_queue=worker.task_queue, max_cached_workflows=0, ) as worker: - # Send a couple signals + # Send signals and updates await handle.signal(LoggingWorkflow.my_signal, "signal 3") await handle.signal(LoggingWorkflow.my_signal, "finish") await handle.result() From 28fcd5aeda3d5544d6b3a00e75ce31a2fe3f1232 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Thu, 10 Oct 2024 09:29:32 -0400 Subject: [PATCH 5/7] Tweak docstring --- temporalio/workflow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/temporalio/workflow.py b/temporalio/workflow.py index 5c7031a9..db61883a 100644 --- a/temporalio/workflow.py +++ b/temporalio/workflow.py @@ -1224,8 +1224,8 @@ class LoggerAdapter(logging.LoggerAdapter): log_during_replay: Boolean for whether logs should occur during replay. Default is False. - Values added to ``extra`` are merged with the ``extra`` map from a logging - call, with values from the logging call taking precedence. I.e. the + Values added to ``extra`` are merged with the ``extra`` dictionary from a + logging call, with values from the logging call taking precedence. I.e. the behavior is that of `merge_extra=True` in Python >= 3.13. """ From 09fcea6c0f10278755427eced1898797b6470ed7 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Thu, 10 Oct 2024 10:40:33 -0400 Subject: [PATCH 6/7] Simplify as per @cretz suggestion --- temporalio/workflow.py | 20 ++++++++------------ tests/worker/test_workflow.py | 4 ++-- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/temporalio/workflow.py b/temporalio/workflow.py index db61883a..903cd5c5 100644 --- a/temporalio/workflow.py +++ b/temporalio/workflow.py @@ -11,7 +11,7 @@ import warnings from abc import ABC, abstractmethod from contextlib import contextmanager -from dataclasses import asdict, dataclass +from dataclasses import dataclass from datetime import datetime, timedelta, timezone from enum import Enum, IntEnum from functools import partial @@ -1213,10 +1213,6 @@ class LoggerAdapter(logging.LoggerAdapter): dictionary value will be added to the ``extra`` dictionary with some workflow info, making it present on the ``LogRecord.__dict__`` for use by others. Default is True. - update_info_on_extra: Boolean for whether a ``temporal_update`` - dictionary value will be added to the ``extra`` dictionary with some - update info, making it present on the ``LogRecord.__dict__`` for use - by others. Default is True. full_workflow_info_on_extra: Boolean for whether a ``workflow_info`` value will be added to the ``extra`` dictionary with the entire workflow info, making it present on the ``LogRecord.__dict__`` for @@ -1236,7 +1232,6 @@ def __init__( super().__init__(logger, extra or {}) self.workflow_info_on_message = True self.workflow_info_on_extra = True - self.update_info_on_extra = True self.full_workflow_info_on_extra = False self.log_during_replay = False @@ -1247,25 +1242,26 @@ def process( if ( self.workflow_info_on_message or self.workflow_info_on_extra - or self.update_info_on_extra or self.full_workflow_info_on_extra ): extra: Dict[str, Any] = {} msg_extra: Dict[str, Any] = {} runtime = _Runtime.maybe_current() if runtime: + workflow_details = runtime.logger_details if self.workflow_info_on_message: - msg_extra.update(runtime.logger_details) + msg_extra.update(workflow_details) if self.workflow_info_on_extra: - extra["temporal_workflow"] = runtime.logger_details + extra["temporal_workflow"] = workflow_details if self.full_workflow_info_on_extra: extra["workflow_info"] = runtime.workflow_info() update_info = current_update_info() if update_info: - if self.update_info_on_extra: - extra["temporal_update"] = asdict(update_info) + update_details = update_info.logger_details if self.workflow_info_on_message: - msg_extra.update(update_info.logger_details) + msg_extra.update(update_details) + if self.workflow_info_on_extra: + extra.setdefault("temporal_workflow", {}).update(update_details) kwargs["extra"] = {**extra, **(kwargs.get("extra") or {})} if msg_extra: diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 7c8dff51..35ad3e9a 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -1990,8 +1990,8 @@ async def test_workflow_logging(client: Client, env: WorkflowEnvironment): record = capturer.find_log("Update: update 1") assert ( record - and record.__dict__["temporal_update"]["id"] == "update-1" - and record.__dict__["temporal_update"]["name"] == "my_update" + and record.__dict__["temporal_workflow"]["update_id"] == "update-1" + and record.__dict__["temporal_workflow"]["update_name"] == "my_update" and "'update_id': 'update-1'" in record.message and "'update_name': 'my_update'" in record.message ) From 747db44423d38337a11d0c627ad422dbe70cbcbf Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Thu, 10 Oct 2024 10:45:28 -0400 Subject: [PATCH 7/7] Make method private --- temporalio/workflow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/temporalio/workflow.py b/temporalio/workflow.py index 903cd5c5..521d3c10 100644 --- a/temporalio/workflow.py +++ b/temporalio/workflow.py @@ -501,7 +501,7 @@ class UpdateInfo: """Update type name.""" @property - def logger_details(self) -> Mapping[str, Any]: + def _logger_details(self) -> Mapping[str, Any]: """Data to be included in string appended to default logging output.""" return { "update_id": self.id, @@ -1257,7 +1257,7 @@ def process( extra["workflow_info"] = runtime.workflow_info() update_info = current_update_info() if update_info: - update_details = update_info.logger_details + update_details = update_info._logger_details if self.workflow_info_on_message: msg_extra.update(update_details) if self.workflow_info_on_extra: