Add log_prints option to redirect print to logs (#7580)
Co-authored-by: Will Raphaelson <[email protected]>
Co-authored-by: Will Raphaelson <[email protected]>
Co-authored-by: Nathan Nowack <[email protected]>
Co-authored-by: Terrence Dorsey <[email protected]>
5 people authored and github-actions[bot] committed Dec 1, 2022
1 parent e37653d commit 36d0543
Showing 11 changed files with 490 additions and 2 deletions.
62 changes: 62 additions & 0 deletions docs/concepts/logs.md
@@ -125,6 +125,68 @@ Prefect automatically uses the task run logger based on the task context. The de

The underlying log model for task runs captures the task name, task run ID, and parent flow run ID, which are persisted to the database for reporting and may also be used in custom message formatting.

### Logging print statements

Prefect provides the `log_prints` option to enable logging of `print` statements at the task or flow level. By default, tasks and subflows inherit the `print` logging behavior from their parent flow unless they opt out with their own `log_prints=False`.

```python
from prefect import task, flow

@task
def my_task():
    print("we're logging print statements from a task")

@flow(log_prints=True)
def my_flow():
    print("we're logging print statements from a flow")
    my_task()
```

Running this flow will output:

<div class='terminal'>
```bash
15:52:11.244 | INFO | prefect.engine - Created flow run 'emerald-gharial' for flow 'my-flow'
15:52:11.812 | INFO | Flow run 'emerald-gharial' - we're logging print statements from a flow
15:52:11.926 | INFO | Flow run 'emerald-gharial' - Created task run 'my_task-20c6ece6-0' for task 'my_task'
15:52:11.927 | INFO | Flow run 'emerald-gharial' - Executing 'my_task-20c6ece6-0' immediately...
15:52:12.217 | INFO | Task run 'my_task-20c6ece6-0' - we're logging print statements from a task
```
</div>

```python
from prefect import task, flow

@task(log_prints=False)
def my_task():
    print("not logging print statements in this task")

@flow(log_prints=True)
def my_flow():
    print("we're logging print statements from a flow")
    my_task()
```

Using `log_prints=False` at the task level will output:

<div class='terminal'>
```bash
15:52:11.244 | INFO | prefect.engine - Created flow run 'emerald-gharial' for flow 'my-flow'
15:52:11.812 | INFO | Flow run 'emerald-gharial' - we're logging print statements from a flow
15:52:11.926 | INFO | Flow run 'emerald-gharial' - Created task run 'my_task-20c6ece6-0' for task 'my_task'
15:52:11.927 | INFO | Flow run 'emerald-gharial' - Executing 'my_task-20c6ece6-0' immediately...
not logging print statements in this task
```
</div>

You can also configure this behavior globally for all Prefect flows, tasks, and subflows with the `PREFECT_LOGGING_LOG_PRINTS` setting:
<div class='terminal'>
```bash
prefect config set PREFECT_LOGGING_LOG_PRINTS=True
```
</div>
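
Because subflows inherit the parent flow's setting, enabling `log_prints` on a parent flow also captures `print` statements in its subflows. As a minimal sketch of that inheritance (the flow names here are illustrative, not part of the examples above):

```python
from prefect import flow

@flow  # no log_prints set here, so the value is inherited from the parent flow run
def my_subflow():
    print("this print statement is logged because the parent flow enabled log_prints")

@flow(log_prints=True)
def my_parent_flow():
    my_subflow()
```
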
## Formatters

Prefect log formatters specify the format of log messages. You can see details of message formatting for different loggers in [`logging.yml`](https://github.com/PrefectHQ/prefect/blob/orion/src/prefect/logging/logging.yml). For example, the default formatting for task run log records is:
2 changes: 2 additions & 0 deletions src/prefect/context.py
@@ -221,6 +221,7 @@ class FlowRunContext(RunContext):
flow: "Flow"
flow_run: FlowRun
task_runner: BaseTaskRunner
log_prints: bool = False

# Result handling
result_factory: ResultFactory
@@ -259,6 +260,7 @@ class TaskRunContext(RunContext):
task: "Task"
task_run: TaskRun
timeout_scope: Optional[anyio.abc.CancelScope] = None
log_prints: bool = False

# Result handling
result_factory: ResultFactory
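
The new `log_prints` fields on these run contexts are what the print patch consults at call time. A minimal sketch of reading the flag from the active context, mirroring what `print_as_log` does later in this diff (the helper name `prints_enabled` is illustrative):

```python
from prefect.context import FlowRunContext, TaskRunContext


def prints_enabled() -> bool:
    # Prefer the task run context; fall back to the flow run context
    context = TaskRunContext.get() or FlowRunContext.get()
    return bool(context and context.log_prints)
```
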
36 changes: 34 additions & 2 deletions src/prefect/engine.py
@@ -55,6 +55,7 @@
flow_run_logger,
get_logger,
get_run_logger,
patch_print,
task_run_logger,
)
from prefect.orion.schemas.core import TaskRunInput, TaskRunResult
@@ -63,7 +64,7 @@
from prefect.orion.schemas.sorting import FlowRunSort
from prefect.orion.schemas.states import StateDetails, StateType
from prefect.results import BaseResult, ResultFactory
from prefect.settings import PREFECT_DEBUG_MODE
from prefect.settings import PREFECT_DEBUG_MODE, PREFECT_LOGGING_LOG_PRINTS
from prefect.states import (
Paused,
Pending,
@@ -332,7 +333,8 @@ async def begin_flow_run(
"""
logger = flow_run_logger(flow_run, flow)

flow_run_context = PartialModel(FlowRunContext)
log_prints = should_log_prints(flow)
flow_run_context = PartialModel(FlowRunContext, log_prints=log_prints)

async with AsyncExitStack() as stack:

@@ -362,6 +364,9 @@
flow, client=client
)

if log_prints:
stack.enter_context(patch_print())

terminal_state = await orchestrate_flow_run(
flow,
flow_run=flow_run,
@@ -409,6 +414,7 @@ async def create_and_begin_subflow_run(
"""
parent_flow_run_context = FlowRunContext.get()
parent_logger = get_run_logger(parent_flow_run_context)
log_prints = should_log_prints(flow)

parent_logger.debug(f"Resolving inputs to {flow.name!r}")
task_inputs = {k: await collect_task_run_inputs(v) for k, v in parameters.items()}
@@ -488,6 +494,10 @@
report_flow_run_crashes(flow_run=flow_run, client=client)
)
task_runner = await stack.enter_async_context(flow.task_runner.start())

if log_prints:
stack.enter_context(patch_print())

terminal_state = await orchestrate_flow_run(
flow,
flow_run=flow_run,
@@ -503,6 +513,7 @@
task_runner=task_runner,
background_tasks=parent_flow_run_context.background_tasks,
result_factory=result_factory,
log_prints=log_prints,
),
)

@@ -587,6 +598,7 @@ async def orchestrate_flow_run(
client=client,
timeout_scope=timeout_scope,
) as flow_run_context:

args, kwargs = parameters_to_args_kwargs(flow.fn, parameters)
logger.debug(
f"Executing flow {flow.name!r} for flow run {flow_run.name!r}..."
@@ -1095,6 +1107,7 @@ async def submit_task_run(
result_factory=await ResultFactory.from_task(
task, client=flow_run_context.client
),
log_prints=should_log_prints(task),
settings=prefect.context.SettingsContext.get().copy(),
),
)
@@ -1111,6 +1124,7 @@ async def begin_task_run(
parameters: Dict[str, Any],
wait_for: Optional[Iterable[PrefectFuture]],
result_factory: ResultFactory,
log_prints: bool,
settings: prefect.context.SettingsContext,
):
"""
@@ -1170,6 +1184,9 @@

# TODO: Use the background tasks group to manage logging for this task

if log_prints:
stack.enter_context(patch_print())

connect_error = await client.api_healthcheck()
if connect_error:
raise RuntimeError(
@@ -1184,6 +1201,7 @@
parameters=parameters,
wait_for=wait_for,
result_factory=result_factory,
log_prints=log_prints,
interruptible=interruptible,
client=client,
)
@@ -1210,6 +1228,7 @@ async def orchestrate_task_run(
parameters: Dict[str, Any],
wait_for: Optional[Iterable[PrefectFuture]],
result_factory: ResultFactory,
log_prints: bool,
interruptible: bool,
client: OrionClient,
) -> State:
@@ -1246,6 +1265,7 @@
task=task,
client=client,
result_factory=result_factory,
log_prints=log_prints,
)

try:
@@ -1729,6 +1749,18 @@ def link_if_trackable(obj: Any) -> None:
visit_collection(expr=result, visit_fn=link_if_trackable, max_depth=1)


def should_log_prints(flow_or_task: Union[Flow, Task]) -> bool:
    flow_run_context = FlowRunContext.get()

    if flow_or_task.log_prints is None:
        if flow_run_context:
            return flow_run_context.log_prints
        else:
            return PREFECT_LOGGING_LOG_PRINTS.value()

    return flow_or_task.log_prints


if __name__ == "__main__":
import os
import sys
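
Read together with the task- and flow-level option, `should_log_prints` gives an explicit `log_prints` value priority over both the parent flow run and the global setting. A small sketch of that precedence (not from this commit):

```python
from prefect import flow, task


@task(log_prints=True)  # an explicit value wins over the parent flow's setting
def always_logged():
    print("logged even though the parent flow disabled log_prints")


@flow(log_prints=False)  # tasks without their own value would inherit False from here
def quiet_flow():
    always_logged()
```
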
13 changes: 13 additions & 0 deletions src/prefect/flows.py
@@ -130,6 +130,7 @@ def __init__(
result_storage: Optional[ResultStorage] = None,
result_serializer: Optional[ResultSerializer] = None,
cache_result_in_memory: bool = True,
log_prints: Optional[bool] = None,
):
if not callable(fn):
raise TypeError("'fn' must be callable")
@@ -144,6 +145,8 @@ def __init__(
task_runner() if isinstance(task_runner, type) else task_runner
)

self.log_prints = log_prints

self.description = description or inspect.getdoc(fn)
update_wrapper(self, fn)
self.fn = fn
@@ -221,6 +224,7 @@ def with_options(
result_storage: Optional[ResultStorage] = NotSet,
result_serializer: Optional[ResultSerializer] = NotSet,
cache_result_in_memory: bool = None,
log_prints: Optional[bool] = NotSet,
):
"""
Create a new flow from the current object, updating provided options.
@@ -301,6 +305,7 @@
if cache_result_in_memory is not None
else self.cache_result_in_memory
),
log_prints=log_prints if log_prints is not NotSet else self.log_prints,
)

def validate_parameters(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
@@ -512,6 +517,7 @@ def flow(
result_storage: Optional[ResultStorage] = None,
result_serializer: Optional[ResultSerializer] = None,
cache_result_in_memory: bool = True,
log_prints: Optional[bool] = None,
) -> Callable[[Callable[P, R]], Flow[P, R]]:
...

@@ -531,6 +537,7 @@ def flow(
result_storage: Optional[ResultStorage] = None,
result_serializer: Optional[ResultSerializer] = None,
cache_result_in_memory: bool = True,
log_prints: Optional[bool] = None,
):
"""
Decorator to designate a function as a Prefect workflow.
@@ -574,6 +581,10 @@ def flow(
in this flow. If not provided, the value of `PREFECT_RESULTS_DEFAULT_SERIALIZER`
will be used unless called as a subflow, at which point the default will be
loaded from the parent flow.
log_prints: If set, `print` statements in the flow will be redirected to the
Prefect logger for the flow run. Defaults to `None`, which indicates that
the value from the parent flow should be used. If this is a parent flow,
the default is pulled from the `PREFECT_LOGGING_LOG_PRINTS` setting.
Returns:
A callable `Flow` object which, when called, will run the flow and return its
@@ -630,6 +641,7 @@ def flow(
result_storage=result_storage,
result_serializer=result_serializer,
cache_result_in_memory=cache_result_in_memory,
log_prints=log_prints,
),
)
else:
@@ -649,6 +661,7 @@ def flow(
result_storage=result_storage,
result_serializer=result_serializer,
cache_result_in_memory=cache_result_in_memory,
log_prints=log_prints,
),
)

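
Because `with_options` now accepts `log_prints`, the flag can also be toggled without editing a flow's decorator. A sketch (the flow below is illustrative, not from the diff):

```python
from prefect import flow


@flow
def my_flow():
    print("hello")


# Copy the flow with print logging enabled; the original flow is left unchanged
noisy_flow = my_flow.with_options(log_prints=True)
noisy_flow()
```
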
41 changes: 41 additions & 0 deletions src/prefect/logging/loggers.py
@@ -1,4 +1,6 @@
import io
import logging
from builtins import print
from contextlib import contextmanager
from functools import lru_cache
from typing import TYPE_CHECKING
@@ -194,3 +196,42 @@ def disable_run_logger():
"""
with disable_logger("prefect.flow_run"), disable_logger("prefect.task_run"):
yield


def print_as_log(*args, **kwargs):
    """
    A patch for `print` to send printed messages to the Prefect run logger.
    If no run is active, `print` will behave as if it were not patched.
    """
    from prefect.context import FlowRunContext, TaskRunContext

    context = TaskRunContext.get() or FlowRunContext.get()
    if not context or not context.log_prints:
        return print(*args, **kwargs)

    logger = get_run_logger()

    # Print to an in-memory buffer; so we do not need to implement `print`
    buffer = io.StringIO()
    kwargs["file"] = buffer
    print(*args, **kwargs)

    # Remove trailing whitespace to prevent duplicates
    logger.info(buffer.getvalue().rstrip())


@contextmanager
def patch_print():
    """
    Patches the Python builtin `print` method to use `print_as_log`
    """
    import builtins

    original = builtins.print

    try:
        builtins.print = print_as_log
        yield
    finally:
        builtins.print = original
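
For orientation, the engine enters this context manager around flow and task execution (see the `stack.enter_context(patch_print())` calls in `engine.py` above). A standalone sketch of the same idea, outside the engine:

```python
from prefect.logging.loggers import patch_print

# While the context manager is active, the builtin print() is replaced with
# print_as_log; outside a run context (or with log_prints disabled) it falls
# back to the real print.
with patch_print():
    print("routed to the run logger when an active run has log_prints enabled")
```
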
9 changes: 9 additions & 0 deletions src/prefect/settings.py
@@ -511,6 +511,15 @@ def default_cloud_ui_url(settings, value):
be set to the same level as the 'prefect' logger.
"""

PREFECT_LOGGING_LOG_PRINTS = Setting(
    bool,
    default=False,
)
"""
If set, `print` statements in flows and tasks will be redirected to the Prefect logger
for the given run. This setting can be overridden by individual tasks and flows.
"""

PREFECT_LOGGING_ORION_ENABLED = Setting(
bool,
default=True,