diff --git a/changes/pr2839.yaml b/changes/pr2839.yaml index 320c2bc64892..8d8a21e28007 100644 --- a/changes/pr2839.yaml +++ b/changes/pr2839.yaml @@ -18,8 +18,7 @@ # Here's an example of a PR that adds an enhancement enhancement: - - "Forward state change status back to core - [#2839](https://github.com/PrefectHQ/prefect/pull/2839)" - - contributor: - - "[Alex Cano](https://github.com/alexisprince1994)" - \ No newline at end of file + - "Forward state change status back to core - [#2839](https://github.com/PrefectHQ/prefect/pull/2839)" + +contributor: + - "[Alex Cano](https://github.com/alexisprince1994)" diff --git a/changes/pr2840.yaml b/changes/pr2840.yaml new file mode 100644 index 000000000000..f51b9aa071cc --- /dev/null +++ b/changes/pr2840.yaml @@ -0,0 +1,7 @@ +feature: + - "Flows can now be stored and executed using file-based storage - [#2840](https://github.com/PrefectHQ/prefect/pull/2840)" + +enhancement: + - "Add GitHub storage for storing flows as files in a GitHub repo - [#2840](https://github.com/PrefectHQ/prefect/pull/2840)" + - "Add `prefect register flow` CLI command for registering flows from files - [#2840](https://github.com/PrefectHQ/prefect/pull/2840)" + - "Add default `GITHUB_ACCESS_TOKEN` secret - [#2840](https://github.com/PrefectHQ/prefect/pull/2840)" diff --git a/docs/core/concepts/secrets.md b/docs/core/concepts/secrets.md index 7668e64d6b42..6d065d98b067 100644 --- a/docs/core/concepts/secrets.md +++ b/docs/core/concepts/secrets.md @@ -27,16 +27,16 @@ For example, given an environment with the environment variable `PREFECT__CONTEX ::: tab PrefectSecret ```python >>> from prefect.tasks.secrets import PrefectSecret ->>> p = PrefectSecret('foo') ->>> p.run() +>>> p = PrefectSecret('foo') +>>> p.run() 'mypassword' ``` ::: ::: tab Secret API ```python ->>> from prefect.client.secrets import Secret ->>> s = Secret("FOO") ->>> s.get() +>>> from prefect.client.secrets import Secret +>>> s = Secret("FOO") +>>> s.get() 'mypassword' ``` ::: @@ 
-74,6 +74,7 @@ The following is a list of the default names and contents of Prefect Secrets tha - `GCP_CREDENTIALS`: a dictionary containing a valid [Service Account Key](https://cloud.google.com/docs/authentication/getting-started) - `AWS_CREDENTIALS`: a dictionary containing two required keys: `ACCESS_KEY` and `SECRET_ACCESS_KEY`, and an optional `SESSION_TOKEN`, which are passed directly to [the `boto3` client](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html) +- `GITHUB_ACCESS_TOKEN`: a string value of a GitHub [personal access token](https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line) For example, when using local secrets, your Prefect installation can be configured to authenticate to AWS automatically by adding that specific `AWS_CREDENTIALS` key value pair into your secrets context like so: diff --git a/docs/core/idioms/file-based.md b/docs/core/idioms/file-based.md new file mode 100644 index 000000000000..75833d8d3388 --- /dev/null +++ b/docs/core/idioms/file-based.md @@ -0,0 +1,69 @@ +# Using file based flow storage + +Prefect version `0.12.1` began to implement support for storing flows as paths to files. This means that flow code can change in between (or even during) runs without needing to be reregistered. As long as the structure of the flow itself does not change, only the task content, then a Prefect API backend will be able to execute the flow. This is a useful storage mechanism especially for testing, debugging, CI/CD processes, and more! + +### Example file based workflow + +In this example we will walk through a potential workflow you may use when registering flows with [GitHub](/api/latest/environments/storage.html#github) storage. 
This example takes place in a GitHub repository with the following structure: + +``` +repo + README.md + flows/ + my_flow.py +``` + +First, compose your flow file and give the flow `GitHub` storage: + +```python +# flows/my_flow.py + +from prefect import task, Flow +from prefect.environments.storage import GitHub + +@task +def get_data(): + return [1, 2, 3, 4, 5] + +@task +def print_data(data): + print(data) + +with Flow("file-based-flow") as flow: + data = get_data() + print_data(data) + +flow.storage = GitHub( + repo="org/repo", # name of repo + path="flows/my_flow.py", # location of flow file in repo + secrets=["GITHUB_ACCESS_TOKEN"] # name of personal access token secret +) +``` + +Here's a breakdown of the three kwargs set on the `GitHub` storage: + +- `repo`: the name of the repo that this code will live in +- `path`: the location of the flow file in the repo. This must be an exact match to the path of the file. +- `secrets`: the name of a [default Prefect secret](/core/concepts/secrets.html#default-secrets) which is a GitHub [personal access token](https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line). This is set so that when the flow is executed it has the proper permissions to pull the file from the repo. + +Push this code to the repository: + +```bash +git add . +git commit -m 'Add my flow' +git push +``` + +Now that the file exists on the repo the flow needs to be registered with a Prefect API backend (either Core's server or Prefect Cloud). + +```bash +prefect register flow -f flows/my_flow.py +Result check: OK +Flow: http://localhost:8080/flow/9f5f7bea-186e-44d1-a746-417239663614 +``` + +The flow is ready to run! Every time you need to change the code inside your flow's respective tasks all you need to do is commit that code to the same location in the repository and each subsequent run will use that code. 
+ +::: warning Flow Structure +If you change any of the structure of your flow such as task names, rearrange task order, etc. then you will need to reregister that flow. +::: diff --git a/docs/core/idioms/idioms.md b/docs/core/idioms/idioms.md index f199e4b3cfb6..e9b79cba8577 100644 --- a/docs/core/idioms/idioms.md +++ b/docs/core/idioms/idioms.md @@ -8,3 +8,4 @@ - [Testing Prefect flows and tasks](testing-flows.html) - [Using Result targets for efficient caching](targets.html) - [Configuring notifications](notifications.html) +- [Using file based flow storage](file-based.html) \ No newline at end of file diff --git a/docs/orchestration/execution/storage_options.md b/docs/orchestration/execution/storage_options.md index 4667a65b153d..f61c9384d8cc 100644 --- a/docs/orchestration/execution/storage_options.md +++ b/docs/orchestration/execution/storage_options.md @@ -29,7 +29,7 @@ Additionally, in more recent releases of Core your flow will default to using a ## Azure Blob Storage -[Azure Storage](/api/latest/environments/storage.html#azure) is a storage option that uploads flows to an Azure Blob container. Flows stored using this option can be run by [local agents](/orchestration/agents/local.html) as long as the machine running the local agent is configured to download from that Azure Blob container using a connection string or by container-based agents using the method outlined [below](/orchestration/execution/storage_options.html#non-docker-storage-for-containerized-environments). +[Azure Storage](/api/latest/environments/storage.html#azure) is a storage option that uploads flows to an Azure Blob container. ```python from prefect import Flow @@ -54,7 +54,7 @@ Azure Storage uses an Azure [connection string](https://docs.microsoft.com/en-us ## AWS S3 -[S3 Storage](/api/latest/environments/storage.html#s3) is a storage option that uploads flows to an AWS S3 bucket. 
Flows stored using this option can be run by [local agents](/orchestration/agents/local.html) as long as the machine running the local agent is configured to download from an S3 bucket or by container-based agents using the method outlined [below](/orchestration/execution/storage_options.html#non-docker-storage-for-containerized-environments). +[S3 Storage](/api/latest/environments/storage.html#s3) is a storage option that uploads flows to an AWS S3 bucket. ```python from prefect import Flow @@ -79,7 +79,7 @@ S3 Storage uses AWS credentials the same way as [boto3](https://boto3.amazonaws. ## Google Cloud Storage -[GCS Storage](/api/latest/environments/storage.html#gcs) is a storage option that uploads flows to a Google Cloud Storage bucket. Flows stored using this option can be run by [local agents](/orchestration/agents/local.html) as long as the machine running the local agent is configured to download from a GCS bucket or by container-based agents using the method outlined [below](/orchestration/execution/storage_options.html#non-docker-storage-for-containerized-environments). +[GCS Storage](/api/latest/environments/storage.html#gcs) is a storage option that uploads flows to a Google Cloud Storage bucket. ```python from prefect import Flow @@ -102,6 +102,20 @@ Additionally, in more recent releases of Core your flow will default to using a GCS Storage uses Google Cloud credentials the same way as the standard [google.cloud library](https://cloud.google.com/docs/authentication/production#auth-cloud-implicit-python) which means both upload (build) and download (local agent) times need to have the proper Google Application Credentials configuration. ::: +## GitHub + +[GitHub Storage](/api/latest/environments/storage.html#github) is a storage option that references flows stored as `.py` files in a GitHub repository. + +For a detailed look at how to use GitHub storage, visit the [Using file based flow storage](/core/idioms/file-based.html) idiom. 
+ +::: tip Sensible Defaults +Flows registered with this storage option will automatically be labeled with `"github-flow-storage"`; this helps prevent agents not explicitly authenticated with your GitHub repo from attempting to run this flow. +::: + +::: tip GitHub Credentials +GitHub storage uses a [personal access token](https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line) for authenticating with repositories. +::: + ## Docker [Docker Storage](/api/latest/environments/storage.html#docker) is a storage option that puts flows inside of a Docker image and pushes them to a container registry. This method of Storage has deployment compatability with the [Docker Agent](/orchestration/agents/docker.html), [Kubernetes Agent](/orchestration/agents/kubernetes.html), and [Fargate Agent](/orchestration/agents/fargate.html). diff --git a/docs/outline.toml b/docs/outline.toml index 59aae85fb0c8..580f673917ec 100644 --- a/docs/outline.toml +++ b/docs/outline.toml @@ -179,7 +179,7 @@ classes = ["CloudFlowRunner", "CloudTaskRunner"] [pages.environments.storage] title = "Storage" module = "prefect.environments.storage" -classes = ["Docker", "Local", "S3", "GCS", "Azure"] +classes = ["Docker", "Local", "S3", "GCS", "Azure", "GitHub"] [pages.environments.execution] title = "Execution Environments" diff --git a/setup.py b/setup.py index cbd678539b70..f91c7fbad339 100644 --- a/setup.py +++ b/setup.py @@ -44,6 +44,7 @@ def run(self): "google-cloud-bigquery >= 1.6.0, < 2.0", "google-cloud-storage >= 1.13, < 2.0", ], + "github": ["PyGithub >= 1.51, < 2.0"], "google": [ "google-cloud-bigquery >= 1.6.0, < 2.0", "google-cloud-storage >= 1.13, < 2.0", diff --git a/src/prefect/agent/local/agent.py b/src/prefect/agent/local/agent.py index e5fe735cb6e1..a6ecd7ae1fae 100644 --- a/src/prefect/agent/local/agent.py +++ b/src/prefect/agent/local/agent.py @@ -6,7 +6,7 @@ from prefect import config from prefect.agent import Agent -from 
prefect.environments.storage import GCS, S3, Azure, Local +from prefect.environments.storage import GCS, S3, Azure, Local, GitHub from prefect.serialization.storage import StorageSchema from prefect.utilities.graphql import GraphQLResult @@ -83,7 +83,12 @@ def __init__( assert isinstance(self.labels, list) self.labels.append(hostname) self.labels.extend( - ["azure-flow-storage", "gcs-flow-storage", "s3-flow-storage"] + [ + "azure-flow-storage", + "gcs-flow-storage", + "s3-flow-storage", + "github-flow-storage", + ] ) self.logger.debug(f"Import paths: {self.import_paths}") @@ -117,7 +122,7 @@ def deploy_flow(self, flow_run: GraphQLResult) -> str: ) if not isinstance( - StorageSchema().load(flow_run.flow.storage), (Local, Azure, GCS, S3) + StorageSchema().load(flow_run.flow.storage), (Local, Azure, GCS, S3, GitHub) ): self.logger.error( "Storage for flow run {} is not a supported type.".format(flow_run.id) diff --git a/src/prefect/cli/__init__.py b/src/prefect/cli/__init__.py index e40f561c37b6..1826cc8e5a88 100644 --- a/src/prefect/cli/__init__.py +++ b/src/prefect/cli/__init__.py @@ -15,6 +15,7 @@ from .run import run as _run from .server import server as _server from .heartbeat import heartbeat as _heartbeat +from .register import register as _register CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) @@ -66,6 +67,7 @@ def cli(): cli.add_command(_run) cli.add_command(_server) cli.add_command(_heartbeat) +cli.add_command(_register) # Miscellaneous Commands diff --git a/src/prefect/cli/execute.py b/src/prefect/cli/execute.py index c56c6dcd586d..d55ea4897699 100644 --- a/src/prefect/cli/execute.py +++ b/src/prefect/cli/execute.py @@ -65,7 +65,7 @@ def cloud_flow(): for secret in storage.secrets: secrets[secret] = PrefectSecret(name=secret).run() - with prefect.context(secrets=secrets): + with prefect.context(secrets=secrets, loading_flow=True): flow = storage.get_flow(storage.flows[flow_data.name]) environment = flow.environment diff --git 
a/src/prefect/cli/register.py b/src/prefect/cli/register.py new file mode 100644 index 000000000000..f2eb9d546305 --- /dev/null +++ b/src/prefect/cli/register.py @@ -0,0 +1,67 @@ +import os + +import click + +import prefect +from prefect.utilities.storage import extract_flow_from_file + + +@click.group(hidden=True) +def register(): + """ + Register flows + """ + + +@register.command( + hidden=True, + context_settings=dict(ignore_unknown_options=True, allow_extra_args=True), +) +@click.option( + "--file", + "-f", + required=True, + help="A file that contains a flow", + hidden=True, + default=None, + type=click.Path(exists=True), +) +@click.option( + "--name", + "-n", + required=False, + help="The `flow.name` to pull out of the file provided.", + hidden=True, + default=None, +) +@click.option( + "--project", + "-p", + required=False, + help="The name of a Prefect Cloud project to register this flow.", + hidden=True, + default=None, +) +def flow(file, name, project): + """ + Register a flow from a file. This call will pull a Flow object out of a `.py` file + and call `flow.register` on it. + + \b + Options: + --file, -f TEXT The path to a local file which contains a flow [required] + --name, -n TEXT The `flow.name` to pull out of the file provided. If a name + is not provided then the first flow object found will be registered. 
+ --project TEXT The name of a Prefect Cloud project to register this flow + + \b + Examples: + $ prefect register flow --file my_flow.py --name My-Flow + """ + + # Don't run extra `run` and `register` functions inside file + with prefect.context({"loading_flow": True}): + file_path = os.path.abspath(file) + flow_obj = extract_flow_from_file(file_path=file_path, flow_name=name) + + flow_obj.register(project_name=project) diff --git a/src/prefect/core/flow.py b/src/prefect/core/flow.py index 849bf6b5e78b..174e7a423996 100644 --- a/src/prefect/core/flow.py +++ b/src/prefect/core/flow.py @@ -1068,6 +1068,11 @@ def run( Returns: - State: the state of the flow after its final run """ + if prefect.context.get("loading_flow", False): + raise RuntimeError( + "Attempting to call `flow.run` during execution of flow file will lead to unexpected results." + ) + # protect against old behavior if "return_tasks" in kwargs: raise ValueError( @@ -1452,6 +1457,11 @@ def register( Returns: - str: the ID of the flow that was registered """ + if prefect.context.get("loading_flow", False): + raise RuntimeError( + "Attempting to call `flow.register` during execution of flow file will lead to unexpected results." 
+ ) + + if self.storage is None: self.storage = get_default_storage_class()(**kwargs) diff --git a/src/prefect/environments/storage/__init__.py b/src/prefect/environments/storage/__init__.py index 2402f1690db7..75ae45e951d6 100644 --- a/src/prefect/environments/storage/__init__.py +++ b/src/prefect/environments/storage/__init__.py @@ -21,6 +21,7 @@ from prefect.environments.storage.azure import Azure from prefect.environments.storage.gcs import GCS from prefect.environments.storage.s3 import S3 +from prefect.environments.storage.github import GitHub def get_default_storage_class() -> type: diff --git a/src/prefect/environments/storage/github.py b/src/prefect/environments/storage/github.py new file mode 100644 index 000000000000..6a4ce565cf97 --- /dev/null +++ b/src/prefect/environments/storage/github.py @@ -0,0 +1,131 @@ +from typing import TYPE_CHECKING, Any, Dict, List + + +from prefect.environments.storage import Storage +from prefect.utilities.storage import extract_flow_from_file + +if TYPE_CHECKING: + from prefect.core.flow import Flow + + +class GitHub(Storage): + """ + GitHub storage class. This class represents the Storage interface for Flows stored + in `.py` files in a GitHub repository. + + This class represents a mapping of flow name to file paths contained in the git repo, + meaning that all flow files should be pushed independently. A typical workflow using + this storage type might look like the following: + + - Compose flow `.py` file where flow has GitHub storage: + + ```python + flow = Flow("my-flow") + flow.storage = GitHub(repo="my/repo", path="/flows/flow.py") + ``` + + - Push this `flow.py` file to the `my/repo` repository under `/flows/flow.py`. + + - Call `prefect register flow -f flow.py` to register this flow with GitHub storage. 
+ + Args: + - repo (str): the name of a GitHub repository to store this Flow + - path (str, optional): a path pointing to a flow file in the repo + - **kwargs (Any, optional): any additional `Storage` initialization options + """ + + def __init__(self, repo: str, path: str = None, **kwargs: Any) -> None: + self.flows = dict() # type: Dict[str, str] + self._flows = dict() # type: Dict[str, "Flow"] + self.repo = repo + self.path = path + + super().__init__(**kwargs) + + @property + def default_labels(self) -> List[str]: + return ["github-flow-storage"] + + def get_flow(self, flow_location: str) -> "Flow": + """ + Given a flow_location within this Storage object, returns the underlying Flow (if possible). + If the Flow file cannot be retrieved an error will be logged and the exception re-raised. + + Args: + - flow_location (str): the location of a flow within this Storage; in this case, + a file path on a repository where a Flow file has been committed + + Returns: + - Flow: the requested Flow + + Raises: + - UnknownObjectException: if the Flow file is unable to be retrieved + """ + from github import UnknownObjectException + + repo = self._github_client.get_repo(self.repo) + + try: + contents = repo.get_contents(flow_location) + decoded_contents = contents.decoded_content + except UnknownObjectException as exc: + self.logger.error( + "Error retrieving file contents from {} on repo {}. Ensure the file exists.".format( + flow_location, self.repo + ) + ) + raise exc + + return extract_flow_from_file(file_contents=decoded_contents) + + def add_flow(self, flow: "Flow") -> str: + """ + Method for adding a new flow to this Storage by recording the repo path of its file. 
+ + Args: + - flow (Flow): a Prefect Flow to add + + Returns: + - str: the location of the added flow in the repo + + Raises: + - ValueError: if a flow with the same name is already contained in this storage + """ + if flow.name in self: + raise ValueError( + 'Name conflict: Flow with the name "{}" is already present in this storage.'.format( + flow.name + ) + ) + + self.flows[flow.name] = self.path # type: ignore + self._flows[flow.name] = flow + return self.path # type: ignore + + def build(self) -> "Storage": + """ + Build the GitHub storage object and run basic healthchecks. Due to this object + supporting file based storage no files are committed to the repository during + this step. Instead, all files should be committed independently. + + Returns: + - Storage: a GitHub object that contains information about how and where + each flow is stored + """ + self.run_basic_healthchecks() + + return self + + def __contains__(self, obj: Any) -> bool: + """ + Method for determining whether an object is contained within this storage. 
+ """ + if not isinstance(obj, str): + return False + return obj in self.flows + + @property + def _github_client(self): # type: ignore + from prefect.utilities.git import get_github_client + + return get_github_client() diff --git a/src/prefect/serialization/storage.py b/src/prefect/serialization/storage.py index 9b265a096de9..7483c8023ff2 100644 --- a/src/prefect/serialization/storage.py +++ b/src/prefect/serialization/storage.py @@ -2,7 +2,7 @@ from marshmallow import fields, post_load -from prefect.environments.storage import GCS, S3, Azure, Docker, Local, Storage +from prefect.environments.storage import GCS, S3, Azure, Docker, Local, Storage, GitHub from prefect.utilities.serialization import JSONCompatible, ObjectSchema, OneOfSchema @@ -102,6 +102,23 @@ def create_object(self, data: dict, **kwargs: Any) -> S3: return base_obj +class GitHubSchema(ObjectSchema): + class Meta: + object_class = GitHub + + repo = fields.String(allow_none=False) + path = fields.String(allow_none=True) + flows = fields.Dict(key=fields.Str(), values=fields.Str()) + secrets = fields.List(fields.Str(), allow_none=True) + + @post_load + def create_object(self, data: dict, **kwargs: Any) -> GitHub: + flows = data.pop("flows", dict()) + base_obj = super().create_object(data) + base_obj.flows = flows + return base_obj + + class StorageSchema(OneOfSchema): """ Field that chooses between several nested schemas @@ -115,4 +132,5 @@ class StorageSchema(OneOfSchema): "Local": LocalSchema, "Storage": BaseStorageSchema, "S3": S3Schema, + "GitHub": GitHubSchema, } diff --git a/src/prefect/utilities/git.py b/src/prefect/utilities/git.py new file mode 100644 index 000000000000..441040aadf4d --- /dev/null +++ b/src/prefect/utilities/git.py @@ -0,0 +1,36 @@ +""" +Utility functions for interacting with git. 
+""" +import os +import prefect + +from github import Github +from typing import Any + + +def get_github_client(credentials: dict = None, **kwargs: Any) -> "Github": + """ + Utility function for loading github client objects from a given set of credentials. + + Args: + - credentials (dict, optional): a dictionary containing a `GITHUB_ACCESS_TOKEN` key used to initialize the Client; if + not provided, the `GITHUB_ACCESS_TOKEN` Prefect Secret is used; the environment variable is the final fallback + - **kwargs (Any, optional): additional keyword arguments to pass to the github Client + + Returns: + - Github: an initialized github Client + """ + access_token = None + + if credentials: + access_token = credentials.get("GITHUB_ACCESS_TOKEN") + else: + access_token = prefect.context.get("secrets", {}).get( + "GITHUB_ACCESS_TOKEN", None + ) + + # Attempt to grab out of env if not provided directly or through Prefect Secret + if not access_token: + access_token = os.getenv("GITHUB_ACCESS_TOKEN", None) + + return Github(access_token, **kwargs) diff --git a/src/prefect/utilities/storage.py b/src/prefect/utilities/storage.py index 8bc3bc650c20..74ed34845c75 100644 --- a/src/prefect/utilities/storage.py +++ b/src/prefect/utilities/storage.py @@ -33,3 +33,51 @@ def get_flow_image(flow: "Flow") -> str: ) return storage.name + + +def extract_flow_from_file( + file_path: str = None, file_contents: str = None, flow_name: str = None +) -> "Flow": + """ + Extract a flow object from a file. + + Args: + - file_path (str, optional): A file path pointing to a .py file containing a flow + - file_contents (str, optional): The string contents of a .py file containing a flow + - flow_name (str, optional): A specific name of a flow to extract from a file. + If not set then the first flow object retrieved from file will be returned. 
+ + Returns: + - Flow: A flow object extracted from a file + + Raises: + - ValueError: if both `file_path` and `file_contents` are provided or neither are, or if no matching flow is found. 
+ """ + if file_path and file_contents: + raise ValueError("Provide either `file_path` or `file_contents` but not both.") + + if not file_path and not file_contents: + raise ValueError("Provide either `file_path` or `file_contents`.") + + # Read file contents + if file_path: + with open(file_path, "r") as f: + contents = f.read() + + # Use contents directly + if file_contents: + contents = file_contents + + # Load objects from file into dict (NOTE: exec runs the file's code, so only load trusted files) + exec_vals = {} # type: ignore + exec(contents, exec_vals) + + # Return the first loaded Flow whose name matches `flow_name` (or the first Flow if no name was given) + for var in exec_vals: + if isinstance(exec_vals[var], prefect.Flow): + if flow_name and exec_vals[var].name == flow_name: + return exec_vals[var] + elif not flow_name: + return exec_vals[var] + + raise ValueError(f"No flow found in file.") diff --git a/tests/agent/test_local_agent.py b/tests/agent/test_local_agent.py index 53733ace62aa..fd1c2e5fbf1b 100644 --- a/tests/agent/test_local_agent.py +++ b/tests/agent/test_local_agent.py @@ -20,6 +20,7 @@ def test_local_agent_init(runner_token): "azure-flow-storage", "s3-flow-storage", "gcs-flow-storage", + "github-flow-storage", } assert agent.name == "agent" @@ -44,6 +45,7 @@ def test_local_agent_config_options(runner_token): "azure-flow-storage", "s3-flow-storage", "gcs-flow-storage", + "github-flow-storage", "test_label", } @@ -68,6 +70,7 @@ def test_local_agent_config_options_hostname(runner_token): "azure-flow-storage", "s3-flow-storage", "gcs-flow-storage", + "github-flow-storage", } @@ -136,6 +139,7 @@ def test_populate_env_vars(runner_token): "azure-flow-storage", "gcs-flow-storage", "s3-flow-storage", + "github-flow-storage", ] ), "PREFECT__CONTEXT__FLOW_RUN_ID": "id", @@ -175,6 +179,7 @@ def test_populate_env_vars_includes_agent_labels(runner_token): "azure-flow-storage", "gcs-flow-storage", "s3-flow-storage", + "github-flow-storage", ] ), 
"PREFECT__CONTEXT__FLOW_RUN_ID": "id", diff --git a/tests/cli/test_register.py b/tests/cli/test_register.py new file mode 
100644 index 000000000000..99f62f8b3ff5 --- /dev/null +++ b/tests/cli/test_register.py @@ -0,0 +1,46 @@ +import os +import tempfile +from unittest.mock import MagicMock + +from click.testing import CliRunner + +from prefect.cli.register import register + + +def test_register_init(): + runner = CliRunner() + result = runner.invoke(register) + assert result.exit_code == 0 + assert "Register flows" in result.output + + +def test_register_help(): + runner = CliRunner() + result = runner.invoke(register, ["--help"]) + assert result.exit_code == 0 + assert "Register flows" in result.output + + +def test_register_flow(): + runner = CliRunner() + result = runner.invoke(register, ["flow", "--help"]) + assert result.exit_code == 0 + assert "Register a flow" in result.output + + +def test_register_flow_kwargs(monkeypatch, tmpdir): + monkeypatch.setattr("prefect.Client", MagicMock()) + + contents = """from prefect import Flow\nf=Flow('test-flow')""" + + full_path = os.path.join(tmpdir, "flow.py") + + with open(full_path, "w") as f: + f.write(contents) + + runner = CliRunner() + result = runner.invoke( + register, + ["flow", "--file", full_path, "--name", "test-flow", "--project", "project",], + ) + assert result.exit_code == 0 diff --git a/tests/environments/storage/test_github_storage.py b/tests/environments/storage/test_github_storage.py new file mode 100644 index 000000000000..4dcc6caadc99 --- /dev/null +++ b/tests/environments/storage/test_github_storage.py @@ -0,0 +1,87 @@ +from unittest.mock import MagicMock + +import pytest + +from prefect import context, Flow +from prefect.environments.storage import GitHub + +pytest.importorskip("github") + + +def test_create_github_storage(): + storage = GitHub(repo="test/repo") + assert storage + assert storage.logger + + +def test_create_github_storage_init_args(): + storage = GitHub(repo="test/repo", path="flow.py", secrets=["auth"],) + assert storage + assert storage.flows == dict() + assert storage.repo == "test/repo" + assert 
storage.path == "flow.py" + assert storage.secrets == ["auth"] + + +def test_serialize_github_storage(): + storage = GitHub(repo="test/repo", path="flow.py", secrets=["auth"],) + serialized_storage = storage.serialize() + + assert serialized_storage["type"] == "GitHub" + assert serialized_storage["repo"] == "test/repo" + assert serialized_storage["path"] == "flow.py" + assert serialized_storage["secrets"] == ["auth"] + + +def test_github_client_property(monkeypatch): + github = MagicMock() + monkeypatch.setattr("prefect.utilities.git.Github", github) + + storage = GitHub(repo="test/repo") + + credentials = "ACCESS_TOKEN" + with context(secrets=dict(GITHUB_ACCESS_TOKEN=credentials)): + github_client = storage._github_client + assert github_client + github.assert_called_with("ACCESS_TOKEN",) + + +def test_add_flow_to_github_storage(): + storage = GitHub(repo="test/repo", path="flow.py") + + f = Flow("test") + assert f.name not in storage + assert storage.add_flow(f) == "flow.py" + assert f.name in storage + + +def test_add_flow_to_github_already_added(): + storage = GitHub(repo="test/repo", path="flow.py") + + f = Flow("test") + assert f.name not in storage + assert storage.add_flow(f) == "flow.py" + assert f.name in storage + + with pytest.raises(ValueError): + storage.add_flow(f) + + +def test_get_flow_github(monkeypatch): + f = Flow("test") + + github = MagicMock() + monkeypatch.setattr("prefect.utilities.git.Github", github) + + monkeypatch.setattr( + "prefect.environments.storage.github.extract_flow_from_file", + MagicMock(return_value=f), + ) + + storage = GitHub(repo="test/repo", path="flow") + + assert f.name not in storage + flow_location = storage.add_flow(f) + + new_flow = storage.get_flow(flow_location) + assert new_flow.run() diff --git a/tests/utilities/test_git.py b/tests/utilities/test_git.py new file mode 100644 index 000000000000..06cd0f520799 --- /dev/null +++ b/tests/utilities/test_git.py @@ -0,0 +1,38 @@ +from unittest.mock import MagicMock + 
+import pytest + +pytest.importorskip("github") + +import prefect +from prefect.utilities.git import get_github_client +from prefect.utilities.configuration import set_temporary_config + + +class TestGetGitHubClient: + def test_uses_context_secrets(self, monkeypatch): + github = MagicMock() + monkeypatch.setattr("prefect.utilities.git.Github", github) + with set_temporary_config({"cloud.use_local_secrets": True}): + with prefect.context(secrets=dict(GITHUB_ACCESS_TOKEN="ACCESS_TOKEN")): + get_github_client() + assert github.call_args[0][0] == "ACCESS_TOKEN" + + def test_prefers_passed_credentials_over_secrets(self, monkeypatch): + github = MagicMock() + monkeypatch.setattr("prefect.utilities.git.Github", github) + desired_credentials = {"GITHUB_ACCESS_TOKEN": "PROVIDED_KEY"} + with set_temporary_config({"cloud.use_local_secrets": True}): + with prefect.context(secrets=dict(GITHUB_ACCESS_TOKEN="ACCESS_TOKEN")): + get_github_client(credentials=desired_credentials) + assert github.call_args[0][0] == "PROVIDED_KEY" + + def test_creds_default_to_environment(self, monkeypatch): + github = MagicMock() + monkeypatch.setattr("prefect.utilities.git.Github", github) + get_github_client() + assert github.call_args[0][0] == None + + monkeypatch.setenv("GITHUB_ACCESS_TOKEN", "TOKEN") + get_github_client() + assert github.call_args[0][0] == "TOKEN" diff --git a/tests/utilities/test_storage.py b/tests/utilities/test_storage.py index e5c95ae7fdc1..2ba1f141b338 100644 --- a/tests/utilities/test_storage.py +++ b/tests/utilities/test_storage.py @@ -1,9 +1,13 @@ +import os +import tempfile + import pytest +import prefect from prefect import Flow from prefect.environments import LocalEnvironment from prefect.environments.storage import Docker, Local -from prefect.utilities.storage import get_flow_image +from prefect.utilities.storage import get_flow_image, extract_flow_from_file def test_get_flow_image_docker_storage(): @@ -30,3 +34,54 @@ def 
test_get_flow_image_raises_on_missing_info(): flow = Flow("test", environment=LocalEnvironment(), storage=Local(),) with pytest.raises(ValueError): image = get_flow_image(flow=flow) + + +def test_extract_flow_from_file(tmpdir): + contents = """from prefect import Flow\nf=Flow('test-flow')""" + + full_path = os.path.join(tmpdir, "flow.py") + + with open(full_path, "w") as f: + f.write(contents) + + flow = extract_flow_from_file(file_path=full_path) + assert flow.run().is_successful() + + flow = extract_flow_from_file(file_contents=contents) + assert flow.run().is_successful() + + flow = extract_flow_from_file(file_path=full_path, flow_name="test-flow") + assert flow.run().is_successful() + + with pytest.raises(ValueError): + extract_flow_from_file(file_path=full_path, flow_name="not-real") + + with pytest.raises(ValueError): + extract_flow_from_file(file_path=full_path, file_contents=contents) + + with pytest.raises(ValueError): + extract_flow_from_file() + + +def test_extract_flow_from_file_raises_on_run_register(tmpdir): + contents = """from prefect import Flow\nf=Flow('test-flow')\nf.run()""" + + full_path = os.path.join(tmpdir, "flow.py") + + with open(full_path, "w") as f: + f.write(contents) + + with prefect.context({"loading_flow": True}): + with pytest.raises(RuntimeError): + extract_flow_from_file(file_path=full_path) + + contents = """from prefect import Flow\nf=Flow('test-flow')\nf.register()""" + + full_path = os.path.join(tmpdir, "flow.py") + + with open(full_path, "w") as f: + f.write(contents) + + with prefect.context({"loading_flow": True}): + with pytest.raises(RuntimeError): + extract_flow_from_file(file_path=full_path)