File-based flow storage #2840

Merged
merged 24 commits into from
Jun 24, 2020
e74c424
Working flow save and load for Local storage as file
joshmeek Jun 17, 2020
2941ade
Checkpoint comments
joshmeek Jun 17, 2020
3581430
Merge branch 'master' into file_based_storage
joshmeek Jun 18, 2020
7683e06
Implement base GitHub storage POC
joshmeek Jun 18, 2020
fed569b
First roundtrip file based flow storage execution
joshmeek Jun 19, 2020
3517a44
Merge branch 'master' into file_based_storage
joshmeek Jun 19, 2020
a748afe
Fix up register CLI command
joshmeek Jun 19, 2020
da786fc
Clean up GitHub storage
joshmeek Jun 22, 2020
3223a3c
Merge branch 'master' into file_based_storage
joshmeek Jun 22, 2020
82553ff
Revert local storage, smooth registration process, add github client …
joshmeek Jun 22, 2020
129a522
Add unit tests for file based storage pattern and new functions
joshmeek Jun 22, 2020
0fd55f9
Fix labeling in LocalAgent tests
joshmeek Jun 22, 2020
861b553
Update github client mock
joshmeek Jun 22, 2020
598bcc1
Add function gate when using file based execution
joshmeek Jun 23, 2020
3fc228e
Update docs and add file based storage idiom
joshmeek Jun 23, 2020
408ffc1
Add changelog entry
joshmeek Jun 23, 2020
8df8fcb
Merge branch 'master' into file_based_storage
joshmeek Jun 23, 2020
5606431
Update changelog entries
joshmeek Jun 23, 2020
7da4daa
Add function gate to register CLI command
joshmeek Jun 23, 2020
b6a8c3d
Remove leading slash in file based idiom
joshmeek Jun 23, 2020
95199ae
Update function_gate flag to loading_flow
joshmeek Jun 24, 2020
8c5d383
Address comments and fix tests
joshmeek Jun 24, 2020
5ec815a
Remove doc section about storage agent limitations
joshmeek Jun 24, 2020
bebf596
Update docs/core/idioms/file-based.md
joshmeek Jun 24, 2020
9 changes: 4 additions & 5 deletions changes/pr2839.yaml
@@ -18,8 +18,7 @@
# Here's an example of a PR that adds an enhancement

enhancement:
- "Forward state change status back to core - [#2839](https://github.com/PrefectHQ/prefect/pull/2839)"

contributor:
- "[Alex Cano](https://github.com/alexisprince1994)"
7 changes: 7 additions & 0 deletions changes/pr2840.yaml
@@ -0,0 +1,7 @@
feature:
- "Flows can now be stored and executed using file-based storage - [#2840](https://github.com/PrefectHQ/prefect/pull/2840)"

enhancement:
- "Add GitHub storage for storing flows as files in a GitHub repo - [#2840](https://github.com/PrefectHQ/prefect/pull/2840)"
- "Add `prefect register flow` CLI command for registering flows from files - [#2840](https://github.com/PrefectHQ/prefect/pull/2840)"
- "Add default `GITHUB_ACCESS_TOKEN` secret - [#2840](https://github.com/PrefectHQ/prefect/pull/2840)"
11 changes: 6 additions & 5 deletions docs/core/concepts/secrets.md
@@ -27,16 +27,16 @@ For example, given an environment with the environment variable `PREFECT__CONTEX
::: tab PrefectSecret
```python
>>> from prefect.tasks.secrets import PrefectSecret
>>> p = PrefectSecret('foo')
>>> p.run()
'mypassword'
```
:::
::: tab Secret API
```python
>>> from prefect.client.secrets import Secret
>>> s = Secret("FOO")
>>> s.get()
'mypassword'
```
:::
@@ -74,6 +74,7 @@ The following is a list of the default names and contents of Prefect Secrets tha

- `GCP_CREDENTIALS`: a dictionary containing a valid [Service Account Key](https://cloud.google.com/docs/authentication/getting-started)
- `AWS_CREDENTIALS`: a dictionary containing two required keys: `ACCESS_KEY` and `SECRET_ACCESS_KEY`, and an optional `SESSION_TOKEN`, which are passed directly to [the `boto3` client](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html)
- `GITHUB_ACCESS_TOKEN`: a string value of a GitHub [personal access token](https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line)
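
For illustration, these default secret names map onto environment variables via the `PREFECT__CONTEXT__SECRETS__` prefix mentioned above. A minimal self-contained sketch of that naming convention (not Prefect's actual config loader, which is richer):

```python
# Sketch: derive {secret_name: value} pairs from environment variables that
# follow the PREFECT__CONTEXT__SECRETS__<NAME> convention. Illustrative only.
PREFIX = "PREFECT__CONTEXT__SECRETS__"

def secrets_from_env(environ):
    """Return a dict of secret names to values for prefixed variables."""
    return {
        key[len(PREFIX):]: value
        for key, value in environ.items()
        if key.startswith(PREFIX)
    }

env = {
    "PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN": "ghp_example",
    "PATH": "/usr/bin",
}
print(secrets_from_env(env))  # {'GITHUB_ACCESS_TOKEN': 'ghp_example'}
```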

For example, when using local secrets, your Prefect installation can be configured to authenticate to AWS automatically by adding that specific `AWS_CREDENTIALS` key-value pair to your secrets context.

69 changes: 69 additions & 0 deletions docs/core/idioms/file-based.md
@@ -0,0 +1,69 @@
# Using file-based flow storage

Prefect `0.12.1` introduced support for storing flows as references to files. This means flow code can change between (or even during) runs without needing to be reregistered: as long as the structure of the flow itself does not change, only the task contents, a Prefect API backend can still execute the flow. This storage mechanism is especially useful for testing, debugging, and CI/CD processes.

### Example file based workflow

In this example we will walk through a potential workflow you may use when registering flows with [GitHub](/api/latest/environments/storage.html#github) storage. This example takes place in a GitHub repository with the following structure:

```
repo
README.md
flows/
my_flow.py
```

First, compose your flow file and give the flow `GitHub` storage:

```python
# flows/my_flow.py

from prefect import task, Flow
from prefect.environments.storage import GitHub

@task
def get_data():
    return [1, 2, 3, 4, 5]

@task
def print_data(data):
    print(data)

with Flow("file-based-flow") as flow:
    data = get_data()
    print_data(data)

flow.storage = GitHub(
    repo="org/repo",                 # name of repo
    path="flows/my_flow.py",         # location of flow file in repo
    secrets=["GITHUB_ACCESS_TOKEN"]  # name of personal access token secret
)
```

Here's a breakdown of the three kwargs set on the `GitHub` storage:

- `repo`: the name of the repo that this code will live in
- `path`: the location of the flow file in the repo. This must be an exact match to the path of the file.
- `secrets`: the name of a [default Prefect secret](/core/concepts/secrets.html#default-secrets) which is a GitHub [personal access token](https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line). This is set so that when the flow is executed it has the proper permissions to pull the file from the repo.
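
Conceptually, at run time this storage pulls the file at `path` from `repo` and rebuilds the flow from its source. A rough, self-contained sketch of the extraction step (hypothetical stand-in classes, not Prefect's actual `extract_flow_from_file`):

```python
# Sketch: exec flow source and pull out a Flow object by name, roughly what
# a file-based storage backend does after downloading the file contents.
class Flow:
    """Stand-in for prefect.Flow, for illustration only."""
    def __init__(self, name):
        self.name = name

def extract_flow_from_source(source, flow_name=None):
    namespace = {"Flow": Flow}
    exec(source, namespace)  # run the flow file's code
    flows = {o.name: o for o in namespace.values() if isinstance(o, Flow)}
    if not flows:
        raise ValueError("No flow found in source")
    if flow_name is None:
        return next(iter(flows.values()))  # first flow found wins
    return flows[flow_name]

source = 'flow = Flow("file-based-flow")'
print(extract_flow_from_source(source).name)  # file-based-flow
```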

Push this code to the repository:

```bash
git add .
git commit -m 'Add my flow'
git push
```

Now that the file exists in the repo, the flow needs to be registered with a Prefect API backend (either Core's server or Prefect Cloud).

```bash
$ prefect register flow -f flows/my_flow.py
Result check: OK
Flow: http://localhost:8080/flow/9f5f7bea-186e-44d1-a746-417239663614
```

The flow is ready to run! Whenever you change the code inside your flow's tasks, simply commit it to the same location in the repository and each subsequent run will use the updated code.

::: warning Flow Structure
If you change the structure of your flow, such as renaming tasks or rearranging task order, you will need to reregister the flow.
:::
1 change: 1 addition & 0 deletions docs/core/idioms/idioms.md
@@ -8,3 +8,4 @@
- [Testing Prefect flows and tasks](testing-flows.html)
- [Using Result targets for efficient caching](targets.html)
- [Configuring notifications](notifications.html)
- [Using file-based flow storage](file-based.html)
20 changes: 17 additions & 3 deletions docs/orchestration/execution/storage_options.md
@@ -29,7 +29,7 @@ Additionally, in more recent releases of Core your flow will default to using a

## Azure Blob Storage

[Azure Storage](/api/latest/environments/storage.html#azure) is a storage option that uploads flows to an Azure Blob container.

```python
from prefect import Flow
@@ -54,7 +54,7 @@ Azure Storage uses an Azure [connection string](https://docs.microsoft.com/en-us

## AWS S3

[S3 Storage](/api/latest/environments/storage.html#s3) is a storage option that uploads flows to an AWS S3 bucket.

```python
from prefect import Flow
@@ -79,7 +79,7 @@ S3 Storage uses AWS credentials the same way as [boto3](https://boto3.amazonaws.

## Google Cloud Storage

[GCS Storage](/api/latest/environments/storage.html#gcs) is a storage option that uploads flows to a Google Cloud Storage bucket.

```python
from prefect import Flow
@@ -102,6 +102,20 @@ Additionally, in more recent releases of Core your flow will default to using a
GCS Storage uses Google Cloud credentials the same way as the standard [google.cloud library](https://cloud.google.com/docs/authentication/production#auth-cloud-implicit-python) which means both upload (build) and download (local agent) times need to have the proper Google Application Credentials configuration.
:::

## GitHub

[GitHub Storage](/api/latest/environments/storage.html#github) is a storage option that uploads flows to a GitHub repository as `.py` files.

For a detailed look at how to use GitHub storage, see the [Using file based storage](/core/idioms/file-based.html) idiom.

::: tip Sensible Defaults
Flows registered with this storage option will automatically be labeled with `"github-flow-storage"`; this helps prevent agents not explicitly authenticated with your GitHub repo from attempting to run this flow.
:::
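
The label check itself is simple set logic: an agent should only pick up a flow run whose labels are a subset of the agent's own labels. A hedged sketch (simplified relative to the real agent implementation):

```python
# Sketch: an agent deploys a flow run only if it carries every label the
# flow requires, e.g. the "github-flow-storage" label added by GitHub storage.
def agent_can_run(agent_labels, flow_labels):
    """True when the flow's labels are a subset of the agent's labels."""
    return set(flow_labels).issubset(set(agent_labels))

labeled_agent = {"my-hostname", "github-flow-storage"}
plain_agent = {"my-hostname"}

print(agent_can_run(labeled_agent, ["github-flow-storage"]))  # True
print(agent_can_run(plain_agent, ["github-flow-storage"]))    # False
```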

::: tip GitHub Credentials
GitHub storage uses a [personal access token](https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line) for authenticating with repositories.
:::

## Docker

[Docker Storage](/api/latest/environments/storage.html#docker) is a storage option that puts flows inside of a Docker image and pushes them to a container registry. This method of Storage has deployment compatibility with the [Docker Agent](/orchestration/agents/docker.html), [Kubernetes Agent](/orchestration/agents/kubernetes.html), and [Fargate Agent](/orchestration/agents/fargate.html).
2 changes: 1 addition & 1 deletion docs/outline.toml
@@ -179,7 +179,7 @@ classes = ["CloudFlowRunner", "CloudTaskRunner"]
[pages.environments.storage]
title = "Storage"
module = "prefect.environments.storage"
classes = ["Docker", "Local", "S3", "GCS", "Azure", "GitHub"]

[pages.environments.execution]
title = "Execution Environments"
1 change: 1 addition & 0 deletions setup.py
@@ -44,6 +44,7 @@ def run(self):
"google-cloud-bigquery >= 1.6.0, < 2.0",
"google-cloud-storage >= 1.13, < 2.0",
],
"github": ["PyGithub >= 1.51, < 2.0"],
"google": [
"google-cloud-bigquery >= 1.6.0, < 2.0",
"google-cloud-storage >= 1.13, < 2.0",
11 changes: 8 additions & 3 deletions src/prefect/agent/local/agent.py
@@ -6,7 +6,7 @@

from prefect import config
from prefect.agent import Agent
from prefect.environments.storage import GCS, S3, Azure, Local, GitHub
from prefect.serialization.storage import StorageSchema
from prefect.utilities.graphql import GraphQLResult

@@ -83,7 +83,12 @@ def __init__(
        assert isinstance(self.labels, list)
        self.labels.append(hostname)
        self.labels.extend(
            [
                "azure-flow-storage",
                "gcs-flow-storage",
                "s3-flow-storage",
                "github-flow-storage",
            ]
        )

self.logger.debug(f"Import paths: {self.import_paths}")
Expand Down Expand Up @@ -117,7 +122,7 @@ def deploy_flow(self, flow_run: GraphQLResult) -> str:
)

        if not isinstance(
            StorageSchema().load(flow_run.flow.storage), (Local, Azure, GCS, S3, GitHub)
        ):
            self.logger.error(
                "Storage for flow run {} is not a supported type.".format(flow_run.id)
2 changes: 2 additions & 0 deletions src/prefect/cli/__init__.py
@@ -15,6 +15,7 @@
from .run import run as _run
from .server import server as _server
from .heartbeat import heartbeat as _heartbeat
from .register import register as _register


CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
@@ -66,6 +67,7 @@ def cli():
cli.add_command(_run)
cli.add_command(_server)
cli.add_command(_heartbeat)
cli.add_command(_register)


# Miscellaneous Commands
2 changes: 1 addition & 1 deletion src/prefect/cli/execute.py
@@ -65,7 +65,7 @@ def cloud_flow():
    for secret in storage.secrets:
        secrets[secret] = PrefectSecret(name=secret).run()

    with prefect.context(secrets=secrets, loading_flow=True):
        flow = storage.get_flow(storage.flows[flow_data.name])
        environment = flow.environment

67 changes: 67 additions & 0 deletions src/prefect/cli/register.py
@@ -0,0 +1,67 @@
import os

import click

import prefect
from prefect.utilities.storage import extract_flow_from_file


@click.group(hidden=True)
def register():
    """
    Register flows
    """


@register.command(
    hidden=True,
    context_settings=dict(ignore_unknown_options=True, allow_extra_args=True),
)
@click.option(
    "--file",
    "-f",
    required=True,
    help="A file that contains a flow",
    hidden=True,
    default=None,
    type=click.Path(exists=True),
)
@click.option(
    "--name",
    "-n",
    required=False,
    help="The `flow.name` to pull out of the file provided.",
    hidden=True,
    default=None,
)
@click.option(
    "--project",
    "-p",
    required=False,
    help="The name of a Prefect Cloud project to register this flow.",
    hidden=True,
    default=None,
)
def flow(file, name, project):
    """
    Register a flow from a file. This call will pull a Flow object out of a `.py` file
    and call `flow.register` on it.

    \b
    Options:
        --file, -f  TEXT  The path to a local file which contains a flow [required]
        --name, -n  TEXT  The `flow.name` to pull out of the file provided. If a name
                          is not provided then the first flow object found will be registered.
        --project   TEXT  The name of a Prefect Cloud project to register this flow

    \b
    Examples:
        $ prefect register flow --file my_flow.py --name My-Flow
    """

    # Don't run extra `run` and `register` functions inside file
    with prefect.context({"loading_flow": True}):
        file_path = os.path.abspath(file)
        flow_obj = extract_flow_from_file(file_path=file_path, flow_name=name)

    flow_obj.register(project_name=project)
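
The `loading_flow` context flag used above makes `flow.run()` and `flow.register()` fail fast while the flow file is being exec'd. A self-contained sketch of that pattern (hypothetical names, not Prefect's internals):

```python
# Sketch: a context flag that makes flow.run() fail fast while a flow file
# is being loaded for registration, mirroring prefect.context(loading_flow=True).
import contextlib

_context = {}

@contextlib.contextmanager
def loading_flow():
    _context["loading_flow"] = True
    try:
        yield
    finally:
        _context.pop("loading_flow", None)

class Flow:
    def run(self):
        if _context.get("loading_flow", False):
            raise RuntimeError(
                "Attempting to call `flow.run` during execution of flow file"
            )
        return "ran"

flow = Flow()
with loading_flow():
    try:
        flow.run()
    except RuntimeError as exc:
        print(f"blocked: {exc}")
print(flow.run())  # ran
```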
10 changes: 10 additions & 0 deletions src/prefect/core/flow.py
@@ -1068,6 +1068,11 @@ def run(
        Returns:
            - State: the state of the flow after its final run
        """
        if prefect.context.get("loading_flow", False):
            raise RuntimeError(
                "Attempting to call `flow.run` during execution of flow file will lead to unexpected results."
            )

        # protect against old behavior
        if "return_tasks" in kwargs:
            raise ValueError(
@@ -1452,6 +1457,11 @@ def register(
        Returns:
            - str: the ID of the flow that was registered
        """
        if prefect.context.get("loading_flow", False):
            raise RuntimeError(
                "Attempting to call `flow.register` during execution of flow file will lead to unexpected results."
            )

        if self.storage is None:
            self.storage = get_default_storage_class()(**kwargs)

1 change: 1 addition & 0 deletions src/prefect/environments/storage/__init__.py
@@ -21,6 +21,7 @@
from prefect.environments.storage.azure import Azure
from prefect.environments.storage.gcs import GCS
from prefect.environments.storage.s3 import S3
from prefect.environments.storage.github import GitHub


def get_default_storage_class() -> type:
Expand Down