Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC - Flow builder function #3927

Closed
wants to merge 1 commit into from
Closed

POC - Flow builder function #3927

wants to merge 1 commit into from

Conversation

zanieb
Copy link
Contributor

@zanieb zanieb commented Jan 6, 2021

Note: Holding off on tests and documentation until discussion

Summary

I'd like to improve the flow development experience in a project format which benefits from some additional utilities.

This introduces a flow_builder decorator for functions that will initialize a flow named after the wrapped function and enter the context.

Tasks called from within the function are added to the flow

from prefect import task
from prefect.utilities.flows import flow_builder

@task(log_stdout=True)
def foo():
    print("Foobar!")

@flow_builder
def my_flow(flow):
    foo()

Flow initializer arguments can be passed

import prefect
from prefect import task
from prefect.utilities.flows import flow_builder

def say_name():
    prefect.context.logger.info(f"Hi, my name is {prefect.context.flow_id!r}")

@flow_builder(name="my-custom-name")
def my_flow(flow):
    say_name()

It also introduces a run_or_register utility which takes the place of

if __name__ == "__main__":
     flow.run(...)
     flow.register(...)

where users frequently have to comment out one of the calls. This allows you to do

run_or_register(flow, run_kwargs=dict(for_example=True))

then

python my-flow-file.py <run/register> to run or register it.

It also supports callables that produce flows so flows can be functional e.g.

def my_flow() -> Flow:
    with Flow("test") as flow:
        pass

run_or_register(my_flow)

The flow_builder decorator calls this automatically so it doesn't need to be included in every file and args can be passed to it still, e.g.

from prefect.utilities.flows import flow_builder
from prefect.executors import DaskExecutor

@flow_builder(
    register_kwargs=dict(project_name="default"),
    run_kwargs=dict(executor=DaskExecutor()),
)
def my_flow(flow):
    pass

Flow builder functions can be imported and treated normally as well, this doesn't make use of run/register_kwargs though -- it may be nice to have these defaults be stored in the Flow object?

from my_project import fn_that_uses_flow_builder

flow = fn_that_uses_flow_builder()
flow.register()

Changes

  • Adds new flows utilities

Importance

  • Defining flows in functions keeps module import times fast
  • Improves flow development experience without introducing new CLI
  • Provides a future pathway to tracking flows
  • Reduces user confusion around Python __main__ usage

Checklist

This PR:

  • adds new tests (if appropriate)
  • adds a change file in the changes/ directory (if appropriate)
  • updates docstrings for any new functions or function arguments, including docs/outline.toml for API reference docs (if appropriate)

@codecov
Copy link

codecov bot commented Jan 6, 2021

Codecov Report

Merging #3927 (499713b) into master (79ea2b6) will decrease coverage by 0.37%.
The diff coverage is 15.90%.

@zanieb
Copy link
Contributor Author

zanieb commented Jan 8, 2021

Also considered having the decorator be named flow and having the first arg be self but I'm hesitant to make it so people can't have flow in their global namespace because it'd collide with this utility. That said, having @flow and @task makes sense...

@task
def foo():
    pass

@task
def bar():
    pass

@flow
def foobar(self):
     foo()
     bar()

@zanieb
Copy link
Contributor Author

zanieb commented Jan 13, 2021

Another possible name that does not collide and does not introduce another "named thing" is @with_flow. We could also consider making the Flow object work as a decorator

class Flow:
    def __init__(self, name_or_fn, *, name: str = None):
        if callable(name_or_fn):
            self._function = name_or_fn
            self.name = name or self._function.__name__.replace("_", "-")
        else:
            if name_or_fn and name:
                raise ValueError("Multiple values received for Flow name")
            self._function = None
            self.name = name_or_fn

    def __call__(self, *args, **kwargs):
        if not self._function:
            raise ValueError("Flow cannot be called unless used as a decorator")
        with self:
            return self._function(self)

    def __enter__(self):
        print("Enter context...")

    def __exit__(self, exc_type, exc_val, exc_tb):
        print("Exit context.")


@Flow
def test(self):
    print(self)


test()

@zanieb
Copy link
Contributor Author

zanieb commented Feb 18, 2021

Closing for now, perhaps a use-case in the future.

@zanieb zanieb closed this Feb 18, 2021
@zanieb zanieb deleted the flow-function branch January 3, 2023 21:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant