From e74c424dc04ff58bb9d76fd82a8a124eb8feeee0 Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Wed, 17 Jun 2020 10:37:01 -0400 Subject: [PATCH 01/20] Working flow save and load for Local storage as file --- src/prefect/environments/storage/local.py | 96 ++++++++++++++++++++--- 1 file changed, 86 insertions(+), 10 deletions(-) diff --git a/src/prefect/environments/storage/local.py b/src/prefect/environments/storage/local.py index 16f8d0ae10a5..294bba0e898a 100644 --- a/src/prefect/environments/storage/local.py +++ b/src/prefect/environments/storage/local.py @@ -1,4 +1,5 @@ import os +from pathlib import Path import socket from typing import TYPE_CHECKING, Any, Dict, List @@ -33,7 +34,11 @@ class Local(Storage): """ def __init__( - self, directory: str = None, validate: bool = True, **kwargs: Any + self, + directory: str = None, + validate: bool = True, + store_as_pickle: bool = True, + **kwargs: Any, ) -> None: directory = directory or os.path.join(prefect.config.home_dir, "flows") self.flows = dict() # type: Dict[str, str] @@ -46,6 +51,9 @@ def __init__( else: abs_directory = directory + # This needs to be added to the serializers + self.store_as_pickle = store_as_pickle + self.directory = abs_directory result = LocalResult(self.directory, validate_dir=validate) super().__init__(result=result, **kwargs) @@ -71,10 +79,26 @@ def get_flow(self, flow_location: str) -> "Flow": Raises: - ValueError: if the flow is not contained in this storage """ - if not flow_location in self.flows.values(): - raise ValueError("Flow is not contained in this Storage") - return prefect.core.flow.Flow.load(flow_location) + if not self.store_as_pickle: + with open(flow_location, "r") as file: + contents = file.read() + + exec_vals = {} + exec(contents, exec_vals) + + # Grab flow name from values loaded via exec + # This will only work with one flow per file + for i in exec_vals: + if isinstance(exec_vals[i], prefect.Flow): + return exec_vals[i] + else: + # Temporarily moving this value error due to having to save in one file + # and load in another + # We need to figure out a good workflow for this + if not flow_location in self.flows.values(): + raise ValueError("Flow is not contained in this Storage") + return prefect.core.flow.Flow.load(flow_location) def add_flow(self, flow: "Flow") -> str: """ @@ -96,12 +120,64 @@ def add_flow(self, flow: "Flow") -> str: ) ) - flow_location = os.path.join( - self.directory, "{}.prefect".format(slugify(flow.name)) - ) - flow_location = flow.save(flow_location) - self.flows[flow.name] = flow_location - self._flows[flow.name] = flow + import inspect + import re + + if not self.store_as_pickle: + # Grab file off the stack, however this will only work if calling f.serialize(build=True) + # when calling .add_flow directly it will be [1] level down the stack + frame = inspect.stack()[2] + filename = frame[1] + path_to_flow_file = os.path.abspath(filename) + + # Grab contents of file + with open(path_to_flow_file, "r") as file: + contents = file.read() + + # Need to strip out things which could cause circular problems + # In this case I'm calling serialize(build=True) directly and we don't want this + contents = re.sub(r"^.*\b(serialize)\b.*$", "", contents, flags=re.M) + + # Save to where flows normally save + # Could be merged with flow.save where this is ripped from + path = "{home}/flows".format(home=prefect.context.config.home_dir) + fpath = Path(os.path.expanduser(path)) / f"{filename}" + assert fpath is not None # mypy assert + fpath.parent.mkdir(exist_ok=True, parents=True) + + flow_location = str(fpath) + + # Write contents to file, should be moved to flow.save somehow + with open(flow_location, "w") as file: + file.write(contents) + + + # Exec the file to get the flow name + exec_vals = {} + exec(contents, exec_vals) + + # Grab flow name from values loaded via exec + # This will only work with one flow per file + for i in exec_vals: + if isinstance(exec_vals[i], prefect.Flow): + flow_name = exec_vals[i].name + flow = exec_vals[i] + break + + # Update self.flows with file path + self.flows[flow_name] = flow_location + self._flows[flow_name] = flow + else: + # TODO: This logic needs to be moved to build to follow other storage types + flow_location = os.path.join( + self.directory, "{}.prefect".format(slugify(flow.name)) + ) + flow_location = flow.save(flow_location) + self.flows[flow.name] = flow_location + self._flows[flow.name] = flow + + # Temporary convenience print + print(flow_location) return flow_location def __contains__(self, obj: Any) -> bool: From 2941ade9e088c62452d38a06c14c09cd2b8484ab Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Wed, 17 Jun 2020 13:09:46 -0400 Subject: [PATCH 02/20] Checkpoint comments --- src/prefect/environments/storage/local.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/prefect/environments/storage/local.py b/src/prefect/environments/storage/local.py index 294bba0e898a..c3d83a3ee6ed 100644 --- a/src/prefect/environments/storage/local.py +++ b/src/prefect/environments/storage/local.py @@ -136,10 +136,13 @@ def add_flow(self, flow: "Flow") -> str: # Need to strip out things which could cause circular problems # In this case I'm calling serialize(build=True) directly and we don't want this + # exec may not run __main__ which could resolve this? TBD contents = re.sub(r"^.*\b(serialize)\b.*$", "", contents, flags=re.M) # Save to where flows normally save # Could be merged with flow.save where this is ripped from + # When we register a script, we shouldn't move it and instead run the + # flow from that workdir. Helps with import problems. path = "{home}/flows".format(home=prefect.context.config.home_dir) fpath = Path(os.path.expanduser(path)) / f"{filename}" assert fpath is not None # mypy assert @@ -151,8 +154,8 @@ def add_flow(self, flow: "Flow") -> str: with open(flow_location, "w") as file: file.write(contents) - # Exec the file to get the flow name + # when giving dict to exec, contents are not run with main exec_vals = {} exec(contents, exec_vals) From 7683e06dc4424770d502e4cccaa948afafa969ea Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Thu, 18 Jun 2020 17:25:01 -0400 Subject: [PATCH 03/20] Implement base GitHub storage POC --- src/prefect/cli/__init__.py | 2 + src/prefect/cli/register.py | 64 +++++++++ src/prefect/environments/storage/__init__.py | 1 + src/prefect/environments/storage/github.py | 133 +++++++++++++++++++ 4 files changed, 200 insertions(+) create mode 100644 src/prefect/cli/register.py create mode 100644 src/prefect/environments/storage/github.py diff --git a/src/prefect/cli/__init__.py b/src/prefect/cli/__init__.py index e40f561c37b6..1826cc8e5a88 100644 --- a/src/prefect/cli/__init__.py +++ b/src/prefect/cli/__init__.py @@ -15,6 +15,7 @@ from .run import run as _run from .server import server as _server from .heartbeat import heartbeat as _heartbeat +from .register import register as _register CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) @@ -66,6 +67,7 @@ def cli(): cli.add_command(_run) cli.add_command(_server) cli.add_command(_heartbeat) +cli.add_command(_register) # Miscellaneous Commands diff --git a/src/prefect/cli/register.py b/src/prefect/cli/register.py new file mode 100644 index 000000000000..b71b59728497 --- /dev/null +++ b/src/prefect/cli/register.py @@ -0,0 +1,64 @@ +import os + +import click + +import prefect + +@click.group(hidden=True) +def register(): + """ + Register flows + """ + + +@register.command( + hidden=True, + context_settings=dict(ignore_unknown_options=True, allow_extra_args=True), +) +@click.option( + "--file", + "-f", + required=False, + help="A file that contains a flow", + hidden=True, + default=None, + type=click.Path(exists=True) +) +def flow(file): + """ + Register a flow + """ + + file_path = os.path.abspath(file) + + # Grab contents of file + with open(file_path, "r") as f: + contents = f.read() + + # Exec the file to get the flow name + # when giving dict to exec, contents are not run with main + exec_vals = {} + exec(contents, exec_vals) + + # Grab flow name from values loaded via exec + # This will only work with one flow per file, could add name option + for i in exec_vals: + if isinstance(exec_vals[i], prefect.Flow): + flow_name = exec_vals[i].name + flow = exec_vals[i] + break + + + # print(flow.storage.flows) + # return + # Add to storage (will replace with add_flow) + # Temp until Local is updated to move write to build + # flow.storage.flows[flow_name] = file_path + # flow.storage._flows[flow_name] = flow + flow.storage.add_flow(file=file, file_path=file_path) + + # Replace with register + flow.storage.build() + # flow.serialize(build=True) + + print("Registered!") \ No newline at end of file diff --git a/src/prefect/environments/storage/__init__.py b/src/prefect/environments/storage/__init__.py index 2402f1690db7..75ae45e951d6 100644 --- a/src/prefect/environments/storage/__init__.py +++ b/src/prefect/environments/storage/__init__.py @@ -21,6 +21,7 @@ from prefect.environments.storage.azure import Azure from prefect.environments.storage.gcs import GCS from prefect.environments.storage.s3 import S3 +from prefect.environments.storage.github import GitHub def get_default_storage_class() -> type: diff --git a/src/prefect/environments/storage/github.py b/src/prefect/environments/storage/github.py new file mode 100644 index 000000000000..63dc8085b6f4 --- /dev/null +++ b/src/prefect/environments/storage/github.py @@ -0,0 +1,133 @@ +import io +from typing import TYPE_CHECKING, Any, Dict, List + +import cloudpickle +import pendulum +from slugify import slugify + +import prefect +from prefect.engine.results import S3Result +from prefect.environments.storage import Storage + +if TYPE_CHECKING: + from prefect.core.flow import Flow + + +class GitHub(Storage): + """ + GitHub storage class + """ + + def __init__( + self, repo: str, **kwargs: Any + ) -> None: + self.flows = dict() # type: Dict[str, str] + self._flows = dict() # type: Dict[str, "Flow"] + self.repo = repo + + super().__init__(**kwargs) + + @property + def default_labels(self) -> List[str]: + return ["github-flow-storage"] + + def get_flow(self, file: str) -> "Flow": + """ + Given a flow_location within this Storage object, returns the underlying Flow (if possible). + If the Flow is not found an error will be logged and `None` will be returned. + + Args: + - flow_location (str): the location of a flow within this Storage; in this case, + a file path where a Flow has been serialized to + + Returns: + - Flow: the requested Flow + + Raises: + - ValueError: if the Flow is not contained in this storage + - botocore.ClientError: if there is an issue downloading the Flow from S3 + """ + repo = self._github_client.get_repo(self.repo) + + contents = repo.get_contents(file) + f = contents.decoded_content + + exec_vals = {} + exec(f, exec_vals) + + # Grab flow name from values loaded via exec + # This will only work with one flow per file + for i in exec_vals: + if isinstance(exec_vals[i], prefect.Flow): + return exec_vals[i] + # return cloudpickle.loads(output) + + def add_flow(self, file: str, file_path: str) -> str: + """ + Method for storing a new flow as bytes in the local filesytem. + + Args: + - flow (Flow): a Prefect Flow to add + + Returns: + - str: the location of the newly added flow in this Storage object + + Raises: + - ValueError: if a flow with the same name is already contained in this storage + """ + if file in self: + raise ValueError( + 'Name conflict: Flow with the name "{}" is already present in this storage.'.format( + file + ) + ) + + # the file name should be able to be specified or we default to the name of the original .py file + self.flows[file] = file_path + return file + + def build(self) -> "Storage": + """ + Build the S3 storage object by uploading Flows to an S3 bucket. This will upload + all of the flows found in `storage.flows`. If there is an issue uploading to the + S3 bucket an error will be logged. + + Returns: + - Storage: an S3 object that contains information about how and where + each flow is stored + + Raises: + - botocore.ClientError: if there is an issue uploading a Flow to S3 + """ + self.run_basic_healthchecks() + + for file, file_path in self.flows.items(): + repo = self._github_client.get_repo(self.repo) + # try: + # need to figure this out + # repo.get_contents(file) + # need to update + # except: + # Doesn't exist + with open(file_path, "r") as f: + content = f.read() + repo.create_file(path=file, content=content, message="I am flow") + + return self + + def __contains__(self, obj: Any) -> bool: + """ + Method for determining whether an object is contained within this storage. + """ + if not isinstance(obj, str): + return False + return obj in self.flows + + @property + def _github_client(self): # type: ignore + from github import Github + import os + + # We should have some prefect secret defaults for this + # add support for user/pass/access_token and github enterprise + return Github(os.getenv("ACCESS_TOKEN")) From fed569bb856871047415f171db39153c36dfeb4a Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Fri, 19 Jun 2020 14:20:28 -0400 Subject: [PATCH 04/20] First roundtrip file based flow storage execution --- src/prefect/agent/local/agent.py | 11 +++-- src/prefect/cli/register.py | 36 ++++------------ src/prefect/environments/storage/github.py | 48 ++++++++++------------ src/prefect/serialization/storage.py | 18 +++++++- src/prefect/utilities/storage.py | 30 ++++++++++++++ 5 files changed, 86 insertions(+), 57 deletions(-) diff --git a/src/prefect/agent/local/agent.py b/src/prefect/agent/local/agent.py index e5fe735cb6e1..a6ecd7ae1fae 100644 --- a/src/prefect/agent/local/agent.py +++ b/src/prefect/agent/local/agent.py @@ -6,7 +6,7 @@ from prefect import config from prefect.agent import Agent -from prefect.environments.storage import GCS, S3, Azure, Local +from prefect.environments.storage import GCS, S3, Azure, Local, GitHub from prefect.serialization.storage import StorageSchema from prefect.utilities.graphql import GraphQLResult @@ -83,7 +83,12 @@ def __init__( assert isinstance(self.labels, list) self.labels.append(hostname) self.labels.extend( - ["azure-flow-storage", "gcs-flow-storage", "s3-flow-storage"] + [ + "azure-flow-storage", + "gcs-flow-storage", + "s3-flow-storage", + "github-flow-storage", + ] ) self.logger.debug(f"Import paths: {self.import_paths}") @@ -117,7 +122,7 @@ def deploy_flow(self, flow_run: GraphQLResult) -> str: ) if not isinstance( - StorageSchema().load(flow_run.flow.storage), (Local, Azure, GCS, S3) + StorageSchema().load(flow_run.flow.storage), (Local, Azure, GCS, S3, GitHub) ): self.logger.error( "Storage for flow run {} is not a supported type.".format(flow_run.id) diff --git a/src/prefect/cli/register.py b/src/prefect/cli/register.py index b71b59728497..99e3f42c0cdb 100644 --- a/src/prefect/cli/register.py +++ b/src/prefect/cli/register.py @@ -3,6 +3,7 @@ import click import prefect +from prefect.utilities.storage import extract_flow_from_file @click.group(hidden=True) def register(): @@ -30,35 +31,16 @@ def flow(file): """ file_path = os.path.abspath(file) + flow_obj = extract_flow_from_file(file_path=file_path) - # Grab contents of file - with open(file_path, "r") as f: - contents = f.read() - - # Exec the file to get the flow name - # when giving dict to exec, contents are not run with main - exec_vals = {} - exec(contents, exec_vals) - - # Grab flow name from values loaded via exec - # This will only work with one flow per file, could add name option - for i in exec_vals: - if isinstance(exec_vals[i], prefect.Flow): - flow_name = exec_vals[i].name - flow = exec_vals[i] - break - - - # print(flow.storage.flows) - # return - # Add to storage (will replace with add_flow) - # Temp until Local is updated to move write to build - # flow.storage.flows[flow_name] = file_path - # flow.storage._flows[flow_name] = flow - flow.storage.add_flow(file=file, file_path=file_path) + # Allow a path to be set here + # Maybe we can overload the add_flow to take both flow and flow.name + # OR keep passing in the flow object and then pull the name off of it in there + # which is probably the best option + flow_obj.storage.add_flow(flow_name=flow_obj.name) # Replace with register - flow.storage.build() - # flow.serialize(build=True) + # Should have a build option here + flow_obj.register() print("Registered!") \ No newline at end of file diff --git a/src/prefect/environments/storage/github.py b/src/prefect/environments/storage/github.py index 63dc8085b6f4..c436fabdade3 100644 --- a/src/prefect/environments/storage/github.py +++ b/src/prefect/environments/storage/github.py @@ -8,6 +8,7 @@ import prefect from prefect.engine.results import S3Result from prefect.environments.storage import Storage +from prefect.utilities.storage import extract_flow_from_file if TYPE_CHECKING: from prefect.core.flow import Flow @@ -19,11 +20,12 @@ class GitHub(Storage): """ def __init__( - self, repo: str, **kwargs: Any + self, repo: str, path: str = None, **kwargs: Any ) -> None: self.flows = dict() # type: Dict[str, str] self._flows = dict() # type: Dict[str, "Flow"] self.repo = repo + self.path = path super().__init__(**kwargs) @@ -49,20 +51,14 @@ def get_flow(self, file: str) -> "Flow": """ repo = self._github_client.get_repo(self.repo) + # Needs some error handling contents = repo.get_contents(file) - f = contents.decoded_content + decoded_contents = contents.decoded_content - exec_vals = {} - exec(f, exec_vals) + return extract_flow_from_file(file_contents=decoded_contents) - # Grab flow name from values loaded via exec - # This will only work with one flow per file - for i in exec_vals: - if isinstance(exec_vals[i], prefect.Flow): - return exec_vals[i] - # return cloudpickle.loads(output) - def add_flow(self, file: str, file_path: str) -> str: + def add_flow(self, flow_name: str, path: str = None) -> str: """ Method for storing a new flow as bytes in the local filesytem. @@ -75,16 +71,16 @@ def add_flow(self, file: str, file_path: str) -> str: Raises: - ValueError: if a flow with the same name is already contained in this storage """ - if file in self: + if flow_name in self: raise ValueError( 'Name conflict: Flow with the name "{}" is already present in this storage.'.format( - file + flow_name ) ) # the file name should be able to be specified or we default to the name of the original .py file - self.flows[file] = file_path - return file + self.flows[flow_name] = path or self.path + return path or self.path def build(self) -> "Storage": """ @@ -101,17 +97,17 @@ def build(self) -> "Storage": """ self.run_basic_healthchecks() - for file, file_path in self.flows.items(): - repo = self._github_client.get_repo(self.repo) - # try: - # need to figure this out - # repo.get_contents(file) - # need to update - # except: - # Doesn't exist - with open(file_path, "r") as f: - content = f.read() - repo.create_file(path=file, content=content, message="I am flow") + # for file, file_path in self.flows.items(): + # repo = self._github_client.get_repo(self.repo) + # # try: + # # need to figure this out + # # repo.get_contents(file) + # # need to update + # # except: + # # Doesn't exist + # with open(file_path, "r") as f: + # content = f.read() + # repo.create_file(path=file, content=content, message="I am flow") return self diff --git a/src/prefect/serialization/storage.py b/src/prefect/serialization/storage.py index 9b265a096de9..342b9046d86a 100644 --- a/src/prefect/serialization/storage.py +++ b/src/prefect/serialization/storage.py @@ -2,7 +2,7 @@ from marshmallow import fields, post_load -from prefect.environments.storage import GCS, S3, Azure, Docker, Local, Storage +from prefect.environments.storage import GCS, S3, Azure, Docker, Local, Storage, GitHub from prefect.utilities.serialization import JSONCompatible, ObjectSchema, OneOfSchema @@ -101,6 +101,21 @@ def create_object(self, data: dict, **kwargs: Any) -> S3: base_obj.flows = flows return base_obj +class GitHubSchema(ObjectSchema): + class Meta: + object_class = GitHub + + repo = fields.String(allow_none=False) + path = fields.String(allow_none=True) + flows = fields.Dict(key=fields.Str(), values=fields.Str()) + secrets = fields.List(fields.Str(), allow_none=True) + + @post_load + def create_object(self, data: dict, **kwargs: Any) -> GitHub: + flows = data.pop("flows", dict()) + base_obj = super().create_object(data) + base_obj.flows = flows + return base_obj class StorageSchema(OneOfSchema): """ @@ -115,4 +130,5 @@ class StorageSchema(OneOfSchema): "Local": LocalSchema, "Storage": BaseStorageSchema, "S3": S3Schema, + "GitHub": GitHubSchema, } diff --git a/src/prefect/utilities/storage.py b/src/prefect/utilities/storage.py index 8bc3bc650c20..2d1229a205ae 100644 --- a/src/prefect/utilities/storage.py +++ b/src/prefect/utilities/storage.py @@ -33,3 +33,33 @@ def get_flow_image(flow: "Flow") -> str: ) return storage.name + +def extract_flow_from_file(file_path: str = None, file_contents: str = None) -> "Flow": + """ + Extract a flow object from a file + """ + # TODO: Add support for passing name, otherwise get first flow found + + if file_path and file_contents: + raise ValueError("Only one can be used") + + # Read file contents + if file_path: + with open(file_path, "r") as f: + contents = f.read() + + # Use contents directly + if file_contents: + contents = file_contents + + # Load objects from file into dict + exec_vals = {} + exec(contents, exec_vals) + + # Grab flow name from values loaded via exec + for var in exec_vals: + if isinstance(exec_vals[var], prefect.Flow): + # flow_name = exec_vals[var].name + return exec_vals[var] + + raise ValueError(f"No flow found in {file_path}") \ No newline at end of file From a748afea67355edd2443965ac1851d4376a1dee6 Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Fri, 19 Jun 2020 15:47:11 -0400 Subject: [PATCH 05/20] Fix up register CLI command --- src/prefect/cli/register.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/prefect/cli/register.py b/src/prefect/cli/register.py index 99e3f42c0cdb..a55afdce4614 100644 --- a/src/prefect/cli/register.py +++ b/src/prefect/cli/register.py @@ -43,4 +43,4 @@ def flow(file): # Should have a build option here flow_obj.register() - print("Registered!") \ No newline at end of file + # print("Registered!") \ No newline at end of file From da786fcc95a1b4177d0e1433eba6b686b9ba9eaa Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Mon, 22 Jun 2020 13:02:00 -0400 Subject: [PATCH 06/20] Clean up GitHub storage --- src/prefect/environments/storage/github.py | 95 ++++++++++++---------- 1 file changed, 51 insertions(+), 44 deletions(-) diff --git a/src/prefect/environments/storage/github.py b/src/prefect/environments/storage/github.py index c436fabdade3..9f8a1045e1db 100644 --- a/src/prefect/environments/storage/github.py +++ b/src/prefect/environments/storage/github.py @@ -1,12 +1,6 @@ -import io from typing import TYPE_CHECKING, Any, Dict, List -import cloudpickle -import pendulum -from slugify import slugify -import prefect -from prefect.engine.results import S3Result from prefect.environments.storage import Storage from prefect.utilities.storage import extract_flow_from_file @@ -16,12 +10,31 @@ class GitHub(Storage): """ - GitHub storage class + GitHub storage class. This class represents the Storage interface for Flows stored + in `.py` files in a GitHub repository. + + This class represents a mapping of flow name to file paths contained in the git repo, + meaning that all flow files should be pushed independently. A typical workflow using + this storage type might look like the following: + + Compose flow `.py` file where flow has GitHub storage: + + ```python + flow = Flow("my-flow") + flow.storage = GitHub(repo="my/repo", path="/flows/flow.py") + ``` + + Push this `flow.py` file to the `my/repo` repository under `/flows/flow.py`. + + Call `prefect register -f flow.py` to register this flow with GitHub storage. + + Args: + - repo (str): the name of a GitHub repository to store this Flow + - path (str, optional): a path pointing to a flow file in the repo + - **kwargs (Any, optional): any additional `Storage` initialization options """ - def __init__( - self, repo: str, path: str = None, **kwargs: Any - ) -> None: + def __init__(self, repo: str, path: str = None, **kwargs: Any) -> None: self.flows = dict() # type: Dict[str, str] self._flows = dict() # type: Dict[str, "Flow"] self.repo = repo @@ -33,82 +46,76 @@ def __init__( def default_labels(self) -> List[str]: return ["github-flow-storage"] - def get_flow(self, file: str) -> "Flow": + def get_flow(self, flow_location: str) -> "Flow": """ Given a flow_location within this Storage object, returns the underlying Flow (if possible). If the Flow is not found an error will be logged and `None` will be returned. Args: - flow_location (str): the location of a flow within this Storage; in this case, - a file path where a Flow has been serialized to + a file path on a repository where a Flow file has been committed Returns: - Flow: the requested Flow Raises: - - ValueError: if the Flow is not contained in this storage - - botocore.ClientError: if there is an issue downloading the Flow from S3 + - UnknownObjectException: if the Flow file is unable to be retrieved """ + from github import UnknownObjectException + repo = self._github_client.get_repo(self.repo) - # Needs some error handling - contents = repo.get_contents(file) - decoded_contents = contents.decoded_content + try: + contents = repo.get_contents(flow_location) + decoded_contents = contents.decoded_content + except UnknownObjectException as exc: + self.logger.error( + "Error retrieving file contents from {} on repo {}. Ensure the file exists.".format( + flow_location, self.repo + ) + ) + raise exc return extract_flow_from_file(file_contents=decoded_contents) - - def add_flow(self, flow_name: str, path: str = None) -> str: + def add_flow(self, flow: "Flow", path: str = None) -> str: """ Method for storing a new flow as bytes in the local filesytem. Args: - flow (Flow): a Prefect Flow to add + - path (str, optional): location of `.py` file in the repo. Defaults to the + value set in `self.path`. Returns: - - str: the location of the newly added flow in this Storage object + - str: the location of the added flow in the repo Raises: - ValueError: if a flow with the same name is already contained in this storage """ - if flow_name in self: + if flow.name in self: raise ValueError( 'Name conflict: Flow with the name "{}" is already present in this storage.'.format( - flow_name + flow.name ) ) - # the file name should be able to be specified or we default to the name of the original .py file - self.flows[flow_name] = path or self.path - return path or self.path + self.flows[flow.name] = path or self.path # type: ignore + self._flows[flow.name] = flow + return path or self.path # type: ignore def build(self) -> "Storage": """ - Build the S3 storage object by uploading Flows to an S3 bucket. This will upload - all of the flows found in `storage.flows`. If there is an issue uploading to the - S3 bucket an error will be logged. + Build the GitHub storage object and run basic healthchecks. Due to this object + supporting file based storage no files are committed to the repository during + this step. Instead, all files should be committed independently. Returns: - - Storage: an S3 object that contains information about how and where + - Storage: a GitHub object that contains information about how and where each flow is stored - - Raises: - - botocore.ClientError: if there is an issue uploading a Flow to S3 """ self.run_basic_healthchecks() - # for file, file_path in self.flows.items(): - # repo = self._github_client.get_repo(self.repo) - # # try: - # # need to figure this out - # # repo.get_contents(file) - # # need to update - # # except: - # # Doesn't exist - # with open(file_path, "r") as f: - # content = f.read() - # repo.create_file(path=file, content=content, message="I am flow") - return self def __contains__(self, obj: Any) -> bool: From 82553ffa8124a46718a834362d4a853880ace359 Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Mon, 22 Jun 2020 14:32:05 -0400 Subject: [PATCH 07/20] Revert local storage, smooth registration process, add github client util, add default github secret --- setup.py | 1 + src/prefect/cli/register.py | 52 ++++++++---- src/prefect/environments/storage/github.py | 17 ++-- src/prefect/environments/storage/local.py | 99 +++------------------- src/prefect/utilities/git.py | 37 ++++++++ src/prefect/utilities/storage.py | 31 +++++-- 6 files changed, 111 insertions(+), 126 deletions(-) create mode 100644 src/prefect/utilities/git.py diff --git a/setup.py b/setup.py index 1b284efae367..6d8432fc69a2 100644 --- a/setup.py +++ b/setup.py @@ -44,6 +44,7 @@ def run(self): "google-cloud-bigquery >= 1.6.0, < 2.0", "google-cloud-storage >= 1.13, < 2.0", ], + "github": ["PyGithub >= 1.51, < 2.0"], "google": [ "google-cloud-bigquery >= 1.6.0, < 2.0", "google-cloud-storage >= 1.13, < 2.0", diff --git a/src/prefect/cli/register.py b/src/prefect/cli/register.py index a55afdce4614..37628784b862 100644 --- a/src/prefect/cli/register.py +++ b/src/prefect/cli/register.py @@ -2,9 +2,9 @@ import click -import prefect from prefect.utilities.storage import extract_flow_from_file + @click.group(hidden=True) def register(): """ @@ -19,28 +19,44 @@ def register(): @click.option( "--file", "-f", - required=False, + required=True, help="A file that contains a flow", hidden=True, default=None, - type=click.Path(exists=True) + type=click.Path(exists=True), +) +@click.option( + "--name", + "-n", + required=False, + help="The name of a flow to pull out of the file provided.", + hidden=True, + default=None, ) -def flow(file): +@click.option( + "--project", + required=False, + help="The name of a Prefect Cloud project to register this flow.", + hidden=True, + default=None, +) +def flow(file, name, project): """ - Register a flow + Register a flow from a file. This call will pull a Flow object out of a `.py` file + and call `flow.register` on it. + + \b + Options: + --file, -f TEXT The path to a local file which contains a flow [required] + --name, -n TEXT The name of a flow to pull out of the file provided. If a name + is not provided then the first flow object found will be registered. + --project TEXT The name of a Prefect Cloud project to register this flow + + \b + Examples: + $ prefect register flow --file my_flow.py --name My-Flow """ - file_path = os.path.abspath(file) - flow_obj = extract_flow_from_file(file_path=file_path) - - # Allow a path to be set here - # Maybe we can overload the add_flow to take both flow and flow.name - # OR keep passing in the flow object and then pull the name off of it in there - # which is probably the best option - flow_obj.storage.add_flow(flow_name=flow_obj.name) - - # Replace with register - # Should have a build option here - flow_obj.register() + flow_obj = extract_flow_from_file(file_path=file_path, flow_name=name) - # print("Registered!") \ No newline at end of file + flow_obj.register(project_name=project) diff --git a/src/prefect/environments/storage/github.py b/src/prefect/environments/storage/github.py index 9f8a1045e1db..548faa6b4363 100644 --- a/src/prefect/environments/storage/github.py +++ b/src/prefect/environments/storage/github.py @@ -62,7 +62,7 @@ def get_flow(self, flow_location: str) -> "Flow": - UnknownObjectException: if the Flow file is unable to be retrieved """ from github import UnknownObjectException - + print("HERE") repo = self._github_client.get_repo(self.repo) try: @@ -78,14 +78,12 @@ def get_flow(self, flow_location: str) -> "Flow": return extract_flow_from_file(file_contents=decoded_contents) - def add_flow(self, flow: "Flow", path: str = None) -> str: + def add_flow(self, flow: "Flow") -> str: """ Method for storing a new flow as bytes in the local filesytem. Args: - flow (Flow): a Prefect Flow to add - - path (str, optional): location of `.py` file in the repo. Defaults to the - value set in `self.path`. Returns: - str: the location of the added flow in the repo @@ -100,9 +98,9 @@ def add_flow(self, flow: "Flow", path: str = None) -> str: ) ) - self.flows[flow.name] = path or self.path # type: ignore + self.flows[flow.name] = self.path # type: ignore self._flows[flow.name] = flow - return path or self.path # type: ignore + return self.path # type: ignore def build(self) -> "Storage": """ @@ -128,9 +126,6 @@ def __contains__(self, obj: Any) -> bool: @property def _github_client(self): # type: ignore - from github import Github - import os + from prefect.utilities.git import get_github_client - # We should have some prefect secret defaults for this - # add support for user/pass/access_token and github enterprise - return Github(os.getenv("ACCESS_TOKEN")) + return get_github_client() diff --git a/src/prefect/environments/storage/local.py b/src/prefect/environments/storage/local.py index c3d83a3ee6ed..16f8d0ae10a5 100644 --- a/src/prefect/environments/storage/local.py +++ b/src/prefect/environments/storage/local.py @@ -1,5 +1,4 @@ import os -from pathlib import Path import socket from typing import TYPE_CHECKING, Any, Dict, List @@ -34,11 +33,7 @@ class Local(Storage): """ def __init__( - self, - directory: str = None, - validate: bool = True, - store_as_pickle: bool = True, - **kwargs: Any, + self, directory: str = None, validate: bool = True, **kwargs: Any ) -> None: directory = directory or os.path.join(prefect.config.home_dir, "flows") self.flows = dict() # type: Dict[str, str] @@ -51,9 +46,6 @@ def __init__( else: abs_directory = directory - # This needs to be added to the serializers - self.store_as_pickle = store_as_pickle - self.directory = abs_directory result = LocalResult(self.directory, validate_dir=validate) super().__init__(result=result, **kwargs) @@ -79,26 +71,10 @@ def get_flow(self, flow_location: str) -> "Flow": Raises: - ValueError: if the flow is not contained in this storage """ + if not flow_location in self.flows.values(): + raise ValueError("Flow is not contained in this Storage") - if not self.store_as_pickle: - with open(flow_location, "r") as file: - contents = file.read() - - exec_vals = {} - exec(contents, exec_vals) - - # Grab flow name from values loaded via exec - # This will only work with one flow per file - for i in exec_vals: - if isinstance(exec_vals[i], prefect.Flow): - return exec_vals[i] - else: - # Temporarily moving this value error due to having to save in one file - # and load in another - # We need to figure out a good workflow for this - if not flow_location in self.flows.values(): - raise ValueError("Flow is not contained in this Storage") - return prefect.core.flow.Flow.load(flow_location) + return prefect.core.flow.Flow.load(flow_location) def add_flow(self, flow: "Flow") -> str: """ @@ -120,67 +96,12 @@ def add_flow(self, flow: "Flow") -> str: ) ) - import inspect - import re - - if not self.store_as_pickle: - # Grab file off the stack, however this will only work if calling f.serialize(build=True) - # when calling .add_flow directly it will be [1] level down the stack - frame = inspect.stack()[2] - filename = frame[1] - path_to_flow_file = os.path.abspath(filename) - - # Grab contents of file - with open(path_to_flow_file, "r") as file: - contents = file.read() - - # Need to strip out things which could cause circular problems - # In this case I'm calling serialize(build=True) directly and we don't want this - # exec may not run __main__ which could resolve this? TBD - contents = re.sub(r"^.*\b(serialize)\b.*$", "", contents, flags=re.M) - - # Save to where flows normally save - # Could be merged with flow.save where this is ripped from - # When we register a script, we shouldn't move it and instead run the - # flow from that workdir. Helps with import problems. - path = "{home}/flows".format(home=prefect.context.config.home_dir) - fpath = Path(os.path.expanduser(path)) / f"{filename}" - assert fpath is not None # mypy assert - fpath.parent.mkdir(exist_ok=True, parents=True) - - flow_location = str(fpath) - - # Write contents to file, should be moved to flow.save somehow - with open(flow_location, "w") as file: - file.write(contents) - - # Exec the file to get the flow name - # when giving dict to exec, contents are not run with main - exec_vals = {} - exec(contents, exec_vals) - - # Grab flow name from values loaded via exec - # This will only work with one flow per file - for i in exec_vals: - if isinstance(exec_vals[i], prefect.Flow): - flow_name = exec_vals[i].name - flow = exec_vals[i] - break - - # Update self.flows with file path - self.flows[flow_name] = flow_location - self._flows[flow_name] = flow - else: - # TODO: This logic needs to be moved to build to follow other storage types - flow_location = os.path.join( - self.directory, "{}.prefect".format(slugify(flow.name)) - ) - flow_location = flow.save(flow_location) - self.flows[flow.name] = flow_location - self._flows[flow.name] = flow - - # Temporary convenience print - print(flow_location) + flow_location = os.path.join( + self.directory, "{}.prefect".format(slugify(flow.name)) + ) + flow_location = flow.save(flow_location) + self.flows[flow.name] = flow_location + self._flows[flow.name] = flow return flow_location def __contains__(self, obj: Any) -> bool: diff --git a/src/prefect/utilities/git.py b/src/prefect/utilities/git.py new file mode 100644 index 000000000000..280cdc5b332c --- /dev/null +++ b/src/prefect/utilities/git.py @@ -0,0 +1,37 @@ +""" +Utility functions for interacting with git. +""" +import os +import prefect + +from github import Github +from typing import Any + + +def get_github_client(credentials: dict = None, **kwargs: Any) -> "Github": + """ + Utility function for loading github client objects from a given set of credentials. + + Args: + - credentials (dict, optional): a dictionary of AWS credentials used to initialize the Client; if + not provided, will attempt to load the Client using ambient environment settings + - **kwargs (Any, optional): additional keyword arguments to pass to the github Client + + Returns: + - Client: an initialized and authenticated github Client + """ + # https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line + access_token = None + + if credentials: + access_token = credentials.get("GITHUB_ACCESS_TOKEN") + else: + access_token = prefect.context.get("secrets", {}).get( + "GITHUB_ACCESS_TOKEN", None + ) + + # Attempt to grab out of env if not provided directly or through Prefect Secret + if not access_token: + access_token = os.getenv("GITHUB_ACCESS_TOKEN", None) + + return Github(access_token, **kwargs) diff --git a/src/prefect/utilities/storage.py b/src/prefect/utilities/storage.py index 2d1229a205ae..65571517f429 100644 --- a/src/prefect/utilities/storage.py +++ b/src/prefect/utilities/storage.py @@ -34,14 +34,27 @@ def get_flow_image(flow: "Flow") -> str: return storage.name -def extract_flow_from_file(file_path: str = None, file_contents: str = None) -> "Flow": +def extract_flow_from_file(file_path: str = None, file_contents: str = None, flow_name: str = None) -> "Flow": """ - Extract a flow object from a file - """ - # TODO: Add support for passing name, otherwise get first flow found + Extract a flow object from a file. + + Args: + - file_path (str, optional): A file path pointing to a .py file containing a flow + - file_contents (str, optional): The string contents of a .py file containing a flow + - flow_name (str, optional): A specific name of a flow to extract from a file. + If not set then the first flow object retrieved from file will be returned. + + Returns: + - Flow: A flow object extracted from a file + Raises: + - ValueError: if both `file_path` and `file_contents` are provided or neither are. + """ if file_path and file_contents: - raise ValueError("Only one can be used") + raise ValueError("Provide either `file_path` or `file_contents` but not both.") + + if not file_path and not file_contents: + raise ValueError("Provide either `file_path` or `file_contents`.") # Read file contents if file_path: @@ -59,7 +72,9 @@ def extract_flow_from_file(file_path: str = None, file_contents: str = None) -> # Grab flow name from values loaded via exec for var in exec_vals: if isinstance(exec_vals[var], prefect.Flow): - # flow_name = exec_vals[var].name - return exec_vals[var] + if flow_name and exec_vals[var].name == flow_name: + return exec_vals[var] + elif not flow_name: + return exec_vals[var] - raise ValueError(f"No flow found in {file_path}") \ No newline at end of file + raise ValueError(f"No flow found in file.") \ No newline at end of file From 129a5220d75950607a2189f25cf2fa7aa01c3bd9 Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Mon, 22 Jun 2020 15:56:28 -0400 Subject: [PATCH 08/20] Add unit tests for file based storage pattern and new functions --- src/prefect/environments/storage/github.py | 2 +- src/prefect/serialization/storage.py | 2 + src/prefect/utilities/git.py | 1 - src/prefect/utilities/storage.py | 9 +- tests/cli/test_register.py | 56 ++++++++++++ .../storage/test_github_storage.py | 88 +++++++++++++++++++ tests/utilities/test_git.py | 38 ++++++++ tests/utilities/test_storage.py | 34 ++++++- 8 files changed, 224 insertions(+), 6 deletions(-) create mode 100644 tests/cli/test_register.py create mode 100644 tests/environments/storage/test_github_storage.py create mode 100644 tests/utilities/test_git.py diff --git a/src/prefect/environments/storage/github.py b/src/prefect/environments/storage/github.py index 548faa6b4363..408bf79be1c7 100644 --- a/src/prefect/environments/storage/github.py +++ b/src/prefect/environments/storage/github.py @@ -62,7 +62,7 @@ def get_flow(self, flow_location: str) -> "Flow": - UnknownObjectException: if the Flow file is unable to be retrieved """ from github import UnknownObjectException - print("HERE") + repo = self._github_client.get_repo(self.repo) try: diff --git a/src/prefect/serialization/storage.py b/src/prefect/serialization/storage.py index 342b9046d86a..7483c8023ff2 100644 --- a/src/prefect/serialization/storage.py +++ b/src/prefect/serialization/storage.py @@ -101,6 +101,7 @@ def create_object(self, data: dict, **kwargs: Any) -> S3: base_obj.flows = flows return base_obj + class GitHubSchema(ObjectSchema): class Meta: object_class = GitHub @@ -117,6 +118,7 @@ def create_object(self, data: dict, **kwargs: Any) -> GitHub: base_obj.flows = flows return base_obj + class StorageSchema(OneOfSchema): """ Field that chooses between several nested schemas diff --git a/src/prefect/utilities/git.py b/src/prefect/utilities/git.py index 280cdc5b332c..441040aadf4d 100644 --- a/src/prefect/utilities/git.py +++ b/src/prefect/utilities/git.py @@ -20,7 +20,6 @@ def get_github_client(credentials: dict = None, **kwargs: Any) -> "Github": Returns: - Client: an initialized and authenticated github Client """ - # https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line access_token = None if credentials: diff --git a/src/prefect/utilities/storage.py b/src/prefect/utilities/storage.py index 65571517f429..74ed34845c75 100644 --- a/src/prefect/utilities/storage.py +++ b/src/prefect/utilities/storage.py @@ -34,7 +34,10 @@ def get_flow_image(flow: "Flow") -> str: return storage.name -def extract_flow_from_file(file_path: str = None, file_contents: str = None, flow_name: str = None) -> "Flow": + +def extract_flow_from_file( + file_path: str = None, file_contents: str = None, flow_name: str = None +) -> "Flow": """ Extract a flow object from a file. @@ -66,7 +69,7 @@ def extract_flow_from_file(file_path: str = None, file_contents: str = None, flo contents = file_contents # Load objects from file into dict - exec_vals = {} + exec_vals = {} # type: ignore exec(contents, exec_vals) # Grab flow name from values loaded via exec @@ -77,4 +80,4 @@ def extract_flow_from_file(file_path: str = None, file_contents: str = None, flo elif not flow_name: return exec_vals[var] - raise ValueError(f"No flow found in file.") \ No newline at end of file + raise ValueError(f"No flow found in file.") diff --git a/tests/cli/test_register.py b/tests/cli/test_register.py new file mode 100644 index 000000000000..ffd4e254c1ca --- /dev/null +++ b/tests/cli/test_register.py @@ -0,0 +1,56 @@ +import os +import tempfile +from unittest.mock import MagicMock + +from click.testing import CliRunner + +from prefect.cli.register import register + + +def test_register_init(): + runner = CliRunner() + result = runner.invoke(register) + assert result.exit_code == 0 + assert "Register flows" in result.output + + +def test_register_help(): + runner = CliRunner() + result = runner.invoke(register, ["--help"]) + assert result.exit_code == 0 + assert "Register flows" in result.output + + +def test_register_flow(): + runner = CliRunner() + result = runner.invoke(register, ["flow", "--help"]) + assert result.exit_code == 0 + assert "Register a flow" in result.output + + +def test_register_flow_kwargs(monkeypatch): + monkeypatch.setattr("prefect.Client", MagicMock()) + + with tempfile.TemporaryDirectory() as tmpdir: + + contents = """from prefect import Flow\nf=Flow('test-flow')""" + + full_path = os.path.join(tmpdir, "flow.py") + + with open(full_path, "w") as f: + f.write(contents) + + runner = CliRunner() + result = runner.invoke( + register, + [ + "flow", + "--file", + full_path, + "--name", + "test-flow", + "--project", + "project", + ], + ) + assert result.exit_code == 0 diff --git a/tests/environments/storage/test_github_storage.py b/tests/environments/storage/test_github_storage.py new file mode 100644 index 000000000000..46a0a325f045 --- /dev/null +++ b/tests/environments/storage/test_github_storage.py @@ -0,0 +1,88 @@ +from unittest.mock import MagicMock + +import pytest + +from prefect import context, Flow +from prefect.environments.storage import GitHub + +pytest.importorskip("github") + + +def test_create_github_storage(): + storage = GitHub(repo="test/repo") + assert storage + assert storage.logger + + +def test_create_github_storage_init_args(): + storage = GitHub(repo="test/repo", path="flow.py", secrets=["auth"],) + assert storage + assert storage.flows == dict() + assert storage.repo == "test/repo" + assert storage.path == "flow.py" + assert storage.secrets == ["auth"] + + +def test_serialize_github_storage(): + storage = GitHub(repo="test/repo", path="flow.py", secrets=["auth"],) + serialized_storage = storage.serialize() + + assert serialized_storage["type"] == "GitHub" + assert serialized_storage["repo"] == "test/repo" + assert serialized_storage["path"] == "flow.py" + assert serialized_storage["secrets"] == ["auth"] + + +def test_github_client_property(monkeypatch): + client = MagicMock() + github = MagicMock(MagicMock(return_value=client)) + monkeypatch.setattr("github.Github", github) + + storage = GitHub(repo="test/repo") + + credentials = "ACCESS_TOKEN" + with context(secrets=dict(GITHUB_ACCESS_TOKEN=credentials)): + github_client = storage._github_client + assert github_client + github.assert_called_with("ACCESS_TOKEN",) + + +def test_add_flow_to_github_storage(): + storage = GitHub(repo="test/repo", path="flow.py") + + f = Flow("test") + assert f.name not in storage + assert storage.add_flow(f) == "flow.py" + assert f.name in storage + + +def test_add_flow_to_github_already_added(): + storage = GitHub(repo="test/repo", path="flow.py") + + f = Flow("test") + assert f.name not in storage + assert storage.add_flow(f) == "flow.py" + assert f.name in storage + + with pytest.raises(ValueError): + storage.add_flow(f) + + +def test_get_flow_github(monkeypatch): + f = Flow("test") + + github = MagicMock(MagicMock(return_value=MagicMock())) + monkeypatch.setattr("github.Github", github) + + monkeypatch.setattr( + "prefect.environments.storage.github.extract_flow_from_file", + MagicMock(return_value=f), + ) + + storage = GitHub(repo="test/repo", path="flow") + + assert f.name not in storage + flow_location = storage.add_flow(f) + + new_flow = storage.get_flow(flow_location) + assert new_flow.run() diff --git a/tests/utilities/test_git.py b/tests/utilities/test_git.py new file mode 100644 index 000000000000..06cd0f520799 --- /dev/null +++ b/tests/utilities/test_git.py @@ -0,0 +1,38 @@ +from unittest.mock import MagicMock + +import pytest + +pytest.importorskip("github") + +import prefect +from prefect.utilities.git import get_github_client +from prefect.utilities.configuration import set_temporary_config + + +class TestGetGitHubClient: + def test_uses_context_secrets(self, monkeypatch): + github = MagicMock() + monkeypatch.setattr("prefect.utilities.git.Github", github) + with set_temporary_config({"cloud.use_local_secrets": True}): + with prefect.context(secrets=dict(GITHUB_ACCESS_TOKEN="ACCESS_TOKEN")): + get_github_client() + assert github.call_args[0][0] == "ACCESS_TOKEN" + + def test_prefers_passed_credentials_over_secrets(self, monkeypatch): + github = MagicMock() + monkeypatch.setattr("prefect.utilities.git.Github", github) + desired_credentials = {"GITHUB_ACCESS_TOKEN": "PROVIDED_KEY"} + with set_temporary_config({"cloud.use_local_secrets": True}): + with prefect.context(secrets=dict(GITHUB_ACCESS_TOKEN="ACCESS_TOKEN")): + get_github_client(credentials=desired_credentials) + assert github.call_args[0][0] == "PROVIDED_KEY" + + def test_creds_default_to_environment(self, monkeypatch): + github = MagicMock() + monkeypatch.setattr("prefect.utilities.git.Github", github) + get_github_client() + assert github.call_args[0][0] == None + + monkeypatch.setenv("GITHUB_ACCESS_TOKEN", "TOKEN") + get_github_client() + assert github.call_args[0][0] == "TOKEN" diff --git a/tests/utilities/test_storage.py b/tests/utilities/test_storage.py index e5c95ae7fdc1..4514b5a86f9e 100644 --- a/tests/utilities/test_storage.py +++ b/tests/utilities/test_storage.py @@ -1,9 +1,12 @@ +import os +import tempfile + import pytest from prefect import Flow from prefect.environments import LocalEnvironment from prefect.environments.storage import Docker, Local -from prefect.utilities.storage import get_flow_image +from prefect.utilities.storage import get_flow_image, extract_flow_from_file def test_get_flow_image_docker_storage(): @@ -30,3 +33,32 @@ def test_get_flow_image_raises_on_missing_info(): flow = Flow("test", environment=LocalEnvironment(), storage=Local(),) with pytest.raises(ValueError): image = get_flow_image(flow=flow) + + +def test_extract_flow_from_file(): + with tempfile.TemporaryDirectory() as tmpdir: + + contents = """from prefect import Flow\nf=Flow('test-flow')""" + + full_path = os.path.join(tmpdir, "flow.py") + + with open(full_path, "w") as f: + f.write(contents) + + flow = extract_flow_from_file(file_path=full_path) + assert flow.run().is_successful() + + flow = extract_flow_from_file(file_contents=contents) + assert flow.run().is_successful() + + flow = extract_flow_from_file(file_path=full_path, flow_name="test-flow") + assert flow.run().is_successful() + + with pytest.raises(ValueError): + extract_flow_from_file(file_path=full_path, flow_name="not-real") + + with pytest.raises(ValueError): + extract_flow_from_file(file_path=full_path, file_contents=contents) + + with pytest.raises(ValueError): + extract_flow_from_file() From 0fd55f938e55a4a340f039372403c0c89acca3c0 Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Mon, 22 Jun 2020 16:17:50 -0400 Subject: [PATCH 09/20] Fix labeling in LocalAgent tests --- tests/agent/test_local_agent.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/agent/test_local_agent.py b/tests/agent/test_local_agent.py index 53733ace62aa..fd1c2e5fbf1b 100644 --- a/tests/agent/test_local_agent.py +++ b/tests/agent/test_local_agent.py @@ -20,6 +20,7 @@ def test_local_agent_init(runner_token): "azure-flow-storage", "s3-flow-storage", "gcs-flow-storage", + "github-flow-storage", } assert agent.name == "agent" @@ -44,6 +45,7 @@ def test_local_agent_config_options(runner_token): "azure-flow-storage", "s3-flow-storage", "gcs-flow-storage", + "github-flow-storage", "test_label", } @@ -68,6 +70,7 @@ def test_local_agent_config_options_hostname(runner_token): "azure-flow-storage", "s3-flow-storage", "gcs-flow-storage", + "github-flow-storage", } @@ -136,6 +139,7 @@ def test_populate_env_vars(runner_token): "azure-flow-storage", "gcs-flow-storage", "s3-flow-storage", + "github-flow-storage", ] ), "PREFECT__CONTEXT__FLOW_RUN_ID": "id", @@ -175,6 +179,7 @@ def test_populate_env_vars_includes_agent_labels(runner_token): "azure-flow-storage", "gcs-flow-storage", "s3-flow-storage", + "github-flow-storage", ] ), "PREFECT__CONTEXT__FLOW_RUN_ID": "id", From 861b5532c4c6229208affbf998dac9d4f72d3816 Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Mon, 22 Jun 2020 16:57:34 -0400 Subject: [PATCH 10/20] Update github client mock --- tests/environments/storage/test_github_storage.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/environments/storage/test_github_storage.py b/tests/environments/storage/test_github_storage.py index 46a0a325f045..4dcc6caadc99 100644 --- a/tests/environments/storage/test_github_storage.py +++ b/tests/environments/storage/test_github_storage.py @@ -34,9 +34,8 @@ def test_serialize_github_storage(): def test_github_client_property(monkeypatch): - client = MagicMock() - github = MagicMock(MagicMock(return_value=client)) - monkeypatch.setattr("github.Github", github) + github = MagicMock() + monkeypatch.setattr("prefect.utilities.git.Github", github) storage = GitHub(repo="test/repo") @@ -71,8 +70,8 @@ def test_add_flow_to_github_already_added(): def test_get_flow_github(monkeypatch): f = Flow("test") - github = MagicMock(MagicMock(return_value=MagicMock())) - monkeypatch.setattr("github.Github", github) + github = MagicMock() + monkeypatch.setattr("prefect.utilities.git.Github", github) monkeypatch.setattr( "prefect.environments.storage.github.extract_flow_from_file", From 598bcc18f86e069e0ff2de857c7e3cd1d30d86e3 Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Tue, 23 Jun 2020 14:43:30 -0400 Subject: [PATCH 11/20] Add function gate when using file based execution --- src/prefect/cli/execute.py | 2 +- src/prefect/cli/register.py | 1 + src/prefect/core/flow.py | 10 ++++++++++ tests/utilities/test_storage.py | 27 +++++++++++++++++++++++++++ 4 files changed, 39 insertions(+), 1 deletion(-) diff --git a/src/prefect/cli/execute.py b/src/prefect/cli/execute.py index c56c6dcd586d..477043753c6e 100644 --- a/src/prefect/cli/execute.py +++ b/src/prefect/cli/execute.py @@ -65,7 +65,7 @@ def cloud_flow(): for secret in storage.secrets: secrets[secret] = PrefectSecret(name=secret).run() - with prefect.context(secrets=secrets): + with prefect.context(secrets=secrets, function_gate=True): flow = storage.get_flow(storage.flows[flow_data.name]) environment = flow.environment diff --git a/src/prefect/cli/register.py b/src/prefect/cli/register.py index 37628784b862..e23f76072a91 100644 --- a/src/prefect/cli/register.py +++ b/src/prefect/cli/register.py @@ -35,6 +35,7 @@ def register(): ) @click.option( "--project", + "-p", required=False, help="The name of a Prefect Cloud project to register this flow.", hidden=True, diff --git a/src/prefect/core/flow.py b/src/prefect/core/flow.py index 849bf6b5e78b..cf1a9913e30d 100644 --- a/src/prefect/core/flow.py +++ b/src/prefect/core/flow.py @@ -1068,6 +1068,11 @@ def run( Returns: - State: the state of the flow after its final run """ + if prefect.context.get("function_gate", False): + raise RuntimeError( + "Attempting to call `flow.run` during execution of flow file will lead to unexpected results." + ) + # protect against old behavior if "return_tasks" in kwargs: raise ValueError( @@ -1452,6 +1457,11 @@ def register( Returns: - str: the ID of the flow that was registered """ + if prefect.context.get("function_gate", False): + raise RuntimeError( + "Attempting to call `flow.register` during execution of flow file will lead to unexpected results." + ) + if self.storage is None: self.storage = get_default_storage_class()(**kwargs) diff --git a/tests/utilities/test_storage.py b/tests/utilities/test_storage.py index 4514b5a86f9e..3269df6c1bcc 100644 --- a/tests/utilities/test_storage.py +++ b/tests/utilities/test_storage.py @@ -3,6 +3,7 @@ import pytest +import prefect from prefect import Flow from prefect.environments import LocalEnvironment from prefect.environments.storage import Docker, Local @@ -62,3 +63,29 @@ def test_extract_flow_from_file(): with pytest.raises(ValueError): extract_flow_from_file() + + +def test_extract_flow_from_file_raises_on_run_register(): + with tempfile.TemporaryDirectory() as tmpdir: + + contents = """from prefect import Flow\nf=Flow('test-flow')\nf.run()""" + + full_path = os.path.join(tmpdir, "flow.py") + + with open(full_path, "w") as f: + f.write(contents) + + with prefect.context({"function_gate": True}): + with pytest.raises(RuntimeError): + extract_flow_from_file(file_path=full_path) + + contents = """from prefect import Flow\nf=Flow('test-flow')\nf.register()""" + + full_path = os.path.join(tmpdir, "flow.py") + + with open(full_path, "w") as f: + f.write(contents) + + with prefect.context({"function_gate": True}): + with pytest.raises(RuntimeError): + extract_flow_from_file(file_path=full_path) From 3fc228ec7d77b40d008013736b424492fe3cc904 Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Tue, 23 Jun 2020 15:17:24 -0400 Subject: [PATCH 12/20] Update docs and add file based storage idiom --- docs/core/concepts/secrets.md | 11 +-- docs/core/idioms/file-based.md | 69 +++++++++++++++++++ docs/core/idioms/idioms.md | 1 + .../execution/storage_options.md | 14 ++++ docs/outline.toml | 2 +- 5 files changed, 91 insertions(+), 6 deletions(-) create mode 100644 docs/core/idioms/file-based.md diff --git a/docs/core/concepts/secrets.md b/docs/core/concepts/secrets.md index 7668e64d6b42..6d065d98b067 100644 --- a/docs/core/concepts/secrets.md +++ b/docs/core/concepts/secrets.md @@ -27,16 +27,16 @@ For example, given an environment with the environment variable `PREFECT__CONTEX ::: tab PrefectSecret ```python >>> from prefect.tasks.secrets import PrefectSecret ->>> p = PrefectSecret('foo') ->>> p.run() +>>> p = PrefectSecret('foo') +>>> p.run() 'mypassword' ``` ::: ::: tab Secret API ```python ->>> from prefect.client.secrets import Secret ->>> s = Secret("FOO") ->>> s.get() +>>> from prefect.client.secrets import Secret +>>> s = Secret("FOO") +>>> s.get() 'mypassword' ``` ::: @@ -74,6 +74,7 @@ The following is a list of the default names and contents of Prefect Secrets tha - `GCP_CREDENTIALS`: a dictionary containing a valid [Service Account Key](https://cloud.google.com/docs/authentication/getting-started) - `AWS_CREDENTIALS`: a dictionary containing two required keys: `ACCESS_KEY` and `SECRET_ACCESS_KEY`, and an optional `SESSION_TOKEN`, which are passed directly to [the `boto3` client](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html) +- `GITHUB_ACCESS_TOKEN`: a string value of a GitHub [personal access token](https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line) For example, when using local secrets, your Prefect installation can be configured to authenticate to AWS automatically by adding that specific `AWS_CREDENTIALS` key value pair into your secrets context like so: diff --git a/docs/core/idioms/file-based.md b/docs/core/idioms/file-based.md new file mode 100644 index 000000000000..73c0f55a03af --- /dev/null +++ b/docs/core/idioms/file-based.md @@ -0,0 +1,69 @@ +# Using file based flow storage + +Prefect version `0.12.1` began to implement support for storing flows as paths to files. This means that flow code can change in between (or even during) runs without needing to be reregistered. As long as the structure of the flow itself does not change, only the task content, then a Prefect API backend will be able to execute the flow. This is a useful storage mechanism especially for testing, debugging, CI/CD processes, and more! + +### Example file based workflow + +In this example we will walk through a potential workflow you may use when registering flows with [GitHub](/api/latest/environments/storage.html#github) storage. This example takes place in a GitHub repository with the following structure: + +``` +repo + README.md + flows/ + my_flow.py +``` + +First, compose your flow file and give the flow `GitHub` storage: + +```python +# /flows/my_flow.py + +from prefect import task, Flow +from prefect.environments.storage import GitHub + +@task +def get_data(): + return [1, 2, 3, 4, 5] + +@task +def print_data(data): + print(data) + +with Flow("file-based-flow") as flow: + data = get_data() + print_data(data) + +flow.storage = GitHub( + repo="org/repo", # name of repo + path="/flows/my_flow.py", # location of flow file in repo + secrets=["GITHUB_ACCESS_TOKEN"] # name of personal access token secret +) +``` + +Here's a breakdown of the three kwargs set on the `GitHub` storage: + +- `repo`: the name of the repo that this code will live in +- `path`: the location of the flow file in the repo. This must be an exact match to the path of the file. +- `secrets`: the name of a [default Prefect secret](http://localhost:8081/core/concepts/secrets.html#default-secrets) which is a GitHub [personal access token](https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line). This is set so that when the flow is executed it has the proper permissions to pull the file from the repo. + +Push this code to the repository: + +```bash +git add . +git commit -m 'Add my flow' +git push +``` + +Now that the file exists on the repo the flow needs to be registered with a Prefect API backend (either Core's server or Prefect Cloud). + +```bash +prefect register -f /flows/my_flow.py +Result check: OK +Flow: http://localhost:8080/flow/9f5f7bea-186e-44d1-a746-417239663614 +``` + +The flow is ready to run! Every time you need to change the code inside your flow's respective tasks all you need to do is commit that code to the same location in the repository and each subsequent run will use that code. + +::: warning Flow Structure +If you change any of the structure of your flow such as task names, rearrange task order, etc. then you will need to reregister that flow. +::: diff --git a/docs/core/idioms/idioms.md b/docs/core/idioms/idioms.md index f199e4b3cfb6..e9b79cba8577 100644 --- a/docs/core/idioms/idioms.md +++ b/docs/core/idioms/idioms.md @@ -8,3 +8,4 @@ - [Testing Prefect flows and tasks](testing-flows.html) - [Using Result targets for efficient caching](targets.html) - [Configuring notifications](notifications.html) +- [Using file based flow storage](file-based.html) \ No newline at end of file diff --git a/docs/orchestration/execution/storage_options.md b/docs/orchestration/execution/storage_options.md index 4667a65b153d..fd107cbdf616 100644 --- a/docs/orchestration/execution/storage_options.md +++ b/docs/orchestration/execution/storage_options.md @@ -102,6 +102,20 @@ Additionally, in more recent releases of Core your flow will default to using a GCS Storage uses Google Cloud credentials the same way as the standard [google.cloud library](https://cloud.google.com/docs/authentication/production#auth-cloud-implicit-python) which means both upload (build) and download (local agent) times need to have the proper Google Application Credentials configuration. ::: +## GitHub + +[GitHub Storage](/api/latest/environments/storage.html#github) is a storage option that uploads flows to a GitHub repository as `.py` files. Flows stored using this option can be run by [local agents](/orchestration/agents/local.html) as long as the machine running the local agent is configured to pull from a git repo or by container-based agents using the method outlined [below](/orchestration/execution/storage_options.html#non-docker-storage-for-containerized-environments). + +For a detailed look on how to use GitHub storage visit the [Using file based storage](/core/idioms/file-based.html) idiom. + +::: tip Sensible Defaults +Flows registered with this storage option will automatically be labeled with `"github-flow-storage"`; this helps prevents agents not explicitly authenticated with your GitHub repo from attempting to run this flow. +::: + +:::tip GitHub Credentials +GitHub storage uses a [personal access token](https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line) for authenticating with repositories. +::: + ## Docker [Docker Storage](/api/latest/environments/storage.html#docker) is a storage option that puts flows inside of a Docker image and pushes them to a container registry. This method of Storage has deployment compatability with the [Docker Agent](/orchestration/agents/docker.html), [Kubernetes Agent](/orchestration/agents/kubernetes.html), and [Fargate Agent](/orchestration/agents/fargate.html). diff --git a/docs/outline.toml b/docs/outline.toml index 59aae85fb0c8..580f673917ec 100644 --- a/docs/outline.toml +++ b/docs/outline.toml @@ -179,7 +179,7 @@ classes = ["CloudFlowRunner", "CloudTaskRunner"] [pages.environments.storage] title = "Storage" module = "prefect.environments.storage" -classes = ["Docker", "Local", "S3", "GCS", "Azure"] +classes = ["Docker", "Local", "S3", "GCS", "Azure", "GitHub"] [pages.environments.execution] title = "Execution Environments" From 408ffc13fd7ca808063d411e5ecc1596310bd9de Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Tue, 23 Jun 2020 15:20:26 -0400 Subject: [PATCH 13/20] Add changelog entry --- changes/pr2840.yaml | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 changes/pr2840.yaml diff --git a/changes/pr2840.yaml b/changes/pr2840.yaml new file mode 100644 index 000000000000..f51b9aa071cc --- /dev/null +++ b/changes/pr2840.yaml @@ -0,0 +1,7 @@ +feature: + - "Flows can now be stored and executed using file-based storage - [#2840](https://github.com/PrefectHQ/prefect/pull/2840)" + +enhancement: + - "Add GitHub storage for storing flows as files in a GitHub repo - [#2840](https://github.com/PrefectHQ/prefect/pull/2840)" + - "Add `prefect register flow` CLI command for registering flows from files - [#2840](https://github.com/PrefectHQ/prefect/pull/2840)" + - "Add default `GITHUB_ACCESS_TOKEN` secret - [#2840](https://github.com/PrefectHQ/prefect/pull/2840)" From 5606431c8bd3985bf94e952ba4ffee2d68156d7f Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Tue, 23 Jun 2020 15:21:26 -0400 Subject: [PATCH 14/20] Update changelog entries --- changes/pr2839.yaml | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/changes/pr2839.yaml b/changes/pr2839.yaml index 320c2bc64892..8d8a21e28007 100644 --- a/changes/pr2839.yaml +++ b/changes/pr2839.yaml @@ -18,8 +18,7 @@ # Here's an example of a PR that adds an enhancement enhancement: - - "Forward state change status back to core - [#2839](https://github.com/PrefectHQ/prefect/pull/2839)" - - contributor: - - "[Alex Cano](https://github.com/alexisprince1994)" - \ No newline at end of file + - "Forward state change status back to core - [#2839](https://github.com/PrefectHQ/prefect/pull/2839)" + +contributor: + - "[Alex Cano](https://github.com/alexisprince1994)" From 7da4daa1d6db5078b4d9f91f56277a6a439f2a41 Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Tue, 23 Jun 2020 15:36:41 -0400 Subject: [PATCH 15/20] Add function gate to register CLI command --- src/prefect/cli/register.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/prefect/cli/register.py b/src/prefect/cli/register.py index e23f76072a91..07de2327a262 100644 --- a/src/prefect/cli/register.py +++ b/src/prefect/cli/register.py @@ -2,6 +2,7 @@ import click +import prefect from prefect.utilities.storage import extract_flow_from_file @@ -57,7 +58,10 @@ def flow(file, name, project): Examples: $ prefect register flow --file my_flow.py --name My-Flow """ - file_path = os.path.abspath(file) - flow_obj = extract_flow_from_file(file_path=file_path, flow_name=name) + + # Don't run extra `run` and `register` functions inside file + with prefect.context({"function_gate": True}): + file_path = os.path.abspath(file) + flow_obj = extract_flow_from_file(file_path=file_path, flow_name=name) flow_obj.register(project_name=project) From b6a8c3d05bd2026240a29eb5199abb3ee36d2a9a Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Tue, 23 Jun 2020 15:37:21 -0400 Subject: [PATCH 16/20] Remove leading slash in file based idiom --- docs/core/idioms/file-based.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/core/idioms/file-based.md b/docs/core/idioms/file-based.md index 73c0f55a03af..19dccd625cf9 100644 --- a/docs/core/idioms/file-based.md +++ b/docs/core/idioms/file-based.md @@ -16,7 +16,7 @@ repo First, compose your flow file and give the flow `GitHub` storage: ```python -# /flows/my_flow.py +# flows/my_flow.py from prefect import task, Flow from prefect.environments.storage import GitHub @@ -35,7 +35,7 @@ with Flow("file-based-flow") as flow: flow.storage = GitHub( repo="org/repo", # name of repo - path="/flows/my_flow.py", # location of flow file in repo + path="flows/my_flow.py", # location of flow file in repo secrets=["GITHUB_ACCESS_TOKEN"] # name of personal access token secret ) ``` @@ -57,7 +57,7 @@ git push Now that the file exists on the repo the flow needs to be registered with a Prefect API backend (either Core's server or Prefect Cloud). ```bash -prefect register -f /flows/my_flow.py +prefect register -f flows/my_flow.py Result check: OK Flow: http://localhost:8080/flow/9f5f7bea-186e-44d1-a746-417239663614 ``` From 95199aee6e153e14318ce819485aa35e52a3171c Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Wed, 24 Jun 2020 08:42:58 -0400 Subject: [PATCH 17/20] Update function_gate flag to loading_flow --- src/prefect/cli/execute.py | 2 +- src/prefect/cli/register.py | 2 +- src/prefect/core/flow.py | 4 ++-- tests/utilities/test_storage.py | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/prefect/cli/execute.py b/src/prefect/cli/execute.py index 477043753c6e..d55ea4897699 100644 --- a/src/prefect/cli/execute.py +++ b/src/prefect/cli/execute.py @@ -65,7 +65,7 @@ def cloud_flow(): for secret in storage.secrets: secrets[secret] = PrefectSecret(name=secret).run() - with prefect.context(secrets=secrets, function_gate=True): + with prefect.context(secrets=secrets, loading_flow=True): flow = storage.get_flow(storage.flows[flow_data.name]) environment = flow.environment diff --git a/src/prefect/cli/register.py b/src/prefect/cli/register.py index 07de2327a262..4cf076115e0b 100644 --- a/src/prefect/cli/register.py +++ b/src/prefect/cli/register.py @@ -60,7 +60,7 @@ def flow(file, name, project): """ # Don't run extra `run` and `register` functions inside file - with prefect.context({"function_gate": True}): + with prefect.context({"loading_flow": True}): file_path = os.path.abspath(file) flow_obj = extract_flow_from_file(file_path=file_path, flow_name=name) diff --git a/src/prefect/core/flow.py b/src/prefect/core/flow.py index cf1a9913e30d..174e7a423996 100644 --- a/src/prefect/core/flow.py +++ b/src/prefect/core/flow.py @@ -1068,7 +1068,7 @@ def run( Returns: - State: the state of the flow after its final run """ - if prefect.context.get("function_gate", False): + if prefect.context.get("loading_flow", False): raise RuntimeError( "Attempting to call `flow.run` during execution of flow file will lead to unexpected results." ) @@ -1457,7 +1457,7 @@ def register( Returns: - str: the ID of the flow that was registered """ - if prefect.context.get("function_gate", False): + if prefect.context.get("loading_flow", False): raise RuntimeError( "Attempting to call `flow.register` during execution of flow file will lead to unexpected results." ) diff --git a/tests/utilities/test_storage.py b/tests/utilities/test_storage.py index 3269df6c1bcc..03e3923b8717 100644 --- a/tests/utilities/test_storage.py +++ b/tests/utilities/test_storage.py @@ -75,7 +75,7 @@ def test_extract_flow_from_file_raises_on_run_register(): with open(full_path, "w") as f: f.write(contents) - with prefect.context({"function_gate": True}): + with prefect.context({"loading_flow": True}): with pytest.raises(RuntimeError): extract_flow_from_file(file_path=full_path) @@ -86,6 +86,6 @@ def test_extract_flow_from_file_raises_on_run_register(): with open(full_path, "w") as f: f.write(contents) - with prefect.context({"function_gate": True}): + with prefect.context({"loading_flow": True}): with pytest.raises(RuntimeError): extract_flow_from_file(file_path=full_path) From 8c5d383b5c2a1f52197ca379bae50a290c73c1c3 Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Wed, 24 Jun 2020 08:47:37 -0400 Subject: [PATCH 18/20] Address comments and fix tests --- src/prefect/cli/register.py | 4 +- src/prefect/environments/storage/github.py | 6 +- tests/cli/test_register.py | 38 +++++------- tests/utilities/test_storage.py | 68 ++++++++++------------ 4 files changed, 51 insertions(+), 65 deletions(-) diff --git a/src/prefect/cli/register.py b/src/prefect/cli/register.py index 4cf076115e0b..f2eb9d546305 100644 --- a/src/prefect/cli/register.py +++ b/src/prefect/cli/register.py @@ -30,7 +30,7 @@ def register(): "--name", "-n", required=False, - help="The name of a flow to pull out of the file provided.", + help="The `flow.name` to pull out of the file provided.", hidden=True, default=None, ) @@ -50,7 +50,7 @@ def flow(file, name, project): \b Options: --file, -f TEXT The path to a local file which contains a flow [required] - --name, -n TEXT The name of a flow to pull out of the file provided. If a name + --name, -n TEXT The `flow.name` to pull out of the file provided. If a name is not provided then the first flow object found will be registered. --project TEXT The name of a Prefect Cloud project to register this flow diff --git a/src/prefect/environments/storage/github.py b/src/prefect/environments/storage/github.py index 408bf79be1c7..6a4ce565cf97 100644 --- a/src/prefect/environments/storage/github.py +++ b/src/prefect/environments/storage/github.py @@ -17,16 +17,16 @@ class GitHub(Storage): meaning that all flow files should be pushed independently. A typical workflow using this storage type might look like the following: - Compose flow `.py` file where flow has GitHub storage: + - Compose flow `.py` file where flow has GitHub storage: ```python flow = Flow("my-flow") flow.storage = GitHub(repo="my/repo", path="/flows/flow.py") ``` - Push this `flow.py` file to the `my/repo` repository under `/flows/flow.py`. + - Push this `flow.py` file to the `my/repo` repository under `/flows/flow.py`. - Call `prefect register -f flow.py` to register this flow with GitHub storage. + - Call `prefect register -f flow.py` to register this flow with GitHub storage. Args: - repo (str): the name of a GitHub repository to store this Flow diff --git a/tests/cli/test_register.py b/tests/cli/test_register.py index ffd4e254c1ca..99f62f8b3ff5 100644 --- a/tests/cli/test_register.py +++ b/tests/cli/test_register.py @@ -28,29 +28,19 @@ def test_register_flow(): assert "Register a flow" in result.output -def test_register_flow_kwargs(monkeypatch): +def test_register_flow_kwargs(monkeypatch, tmpdir): monkeypatch.setattr("prefect.Client", MagicMock()) - with tempfile.TemporaryDirectory() as tmpdir: - - contents = """from prefect import Flow\nf=Flow('test-flow')""" - - full_path = os.path.join(tmpdir, "flow.py") - - with open(full_path, "w") as f: - f.write(contents) - - runner = CliRunner() - result = runner.invoke( - register, - [ - "flow", - "--file", - full_path, - "--name", - "test-flow", - "--project", - "project", - ], - ) - assert result.exit_code == 0 + contents = """from prefect import Flow\nf=Flow('test-flow')""" + + full_path = os.path.join(tmpdir, "flow.py") + + with open(full_path, "w") as f: + f.write(contents) + + runner = CliRunner() + result = runner.invoke( + register, + ["flow", "--file", full_path, "--name", "test-flow", "--project", "project",], + ) + assert result.exit_code == 0 diff --git a/tests/utilities/test_storage.py b/tests/utilities/test_storage.py index 03e3923b8717..2ba1f141b338 100644 --- a/tests/utilities/test_storage.py +++ b/tests/utilities/test_storage.py @@ -36,56 +36,52 @@ def test_get_flow_image_raises_on_missing_info(): image = get_flow_image(flow=flow) -def test_extract_flow_from_file(): - with tempfile.TemporaryDirectory() as tmpdir: +def test_extract_flow_from_file(tmpdir): + contents = """from prefect import Flow\nf=Flow('test-flow')""" - contents = """from prefect import Flow\nf=Flow('test-flow')""" + full_path = os.path.join(tmpdir, "flow.py") - full_path = os.path.join(tmpdir, "flow.py") + with open(full_path, "w") as f: + f.write(contents) - with open(full_path, "w") as f: - f.write(contents) + flow = extract_flow_from_file(file_path=full_path) + assert flow.run().is_successful() - flow = extract_flow_from_file(file_path=full_path) - assert flow.run().is_successful() + flow = extract_flow_from_file(file_contents=contents) + assert flow.run().is_successful() - flow = extract_flow_from_file(file_contents=contents) - assert flow.run().is_successful() + flow = extract_flow_from_file(file_path=full_path, flow_name="test-flow") + assert flow.run().is_successful() - flow = extract_flow_from_file(file_path=full_path, flow_name="test-flow") - assert flow.run().is_successful() - - with pytest.raises(ValueError): - extract_flow_from_file(file_path=full_path, flow_name="not-real") - - with pytest.raises(ValueError): - extract_flow_from_file(file_path=full_path, file_contents=contents) + with pytest.raises(ValueError): + extract_flow_from_file(file_path=full_path, flow_name="not-real") - with pytest.raises(ValueError): - extract_flow_from_file() + with pytest.raises(ValueError): + extract_flow_from_file(file_path=full_path, file_contents=contents) + with pytest.raises(ValueError): + extract_flow_from_file() -def test_extract_flow_from_file_raises_on_run_register(): - with tempfile.TemporaryDirectory() as tmpdir: - contents = """from prefect import Flow\nf=Flow('test-flow')\nf.run()""" +def test_extract_flow_from_file_raises_on_run_register(tmpdir): + contents = """from prefect import Flow\nf=Flow('test-flow')\nf.run()""" - full_path = os.path.join(tmpdir, "flow.py") + full_path = os.path.join(tmpdir, "flow.py") - with open(full_path, "w") as f: - f.write(contents) + with open(full_path, "w") as f: + f.write(contents) - with prefect.context({"loading_flow": True}): - with pytest.raises(RuntimeError): - extract_flow_from_file(file_path=full_path) + with prefect.context({"loading_flow": True}): + with pytest.raises(RuntimeError): + extract_flow_from_file(file_path=full_path) - contents = """from prefect import Flow\nf=Flow('test-flow')\nf.register()""" + contents = """from prefect import Flow\nf=Flow('test-flow')\nf.register()""" - full_path = os.path.join(tmpdir, "flow.py") + full_path = os.path.join(tmpdir, "flow.py") - with open(full_path, "w") as f: - f.write(contents) + with open(full_path, "w") as f: + f.write(contents) - with prefect.context({"loading_flow": True}): - with pytest.raises(RuntimeError): - extract_flow_from_file(file_path=full_path) + with prefect.context({"loading_flow": True}): + with pytest.raises(RuntimeError): + extract_flow_from_file(file_path=full_path) From 5ec815aa0d33ae8389d6cdca3aab34f32512ece2 Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Wed, 24 Jun 2020 08:50:38 -0400 Subject: [PATCH 19/20] Remove doc section about storage agent limitations --- docs/orchestration/execution/storage_options.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/orchestration/execution/storage_options.md b/docs/orchestration/execution/storage_options.md index fd107cbdf616..f61c9384d8cc 100644 --- a/docs/orchestration/execution/storage_options.md +++ b/docs/orchestration/execution/storage_options.md @@ -29,7 +29,7 @@ Additionally, in more recent releases of Core your flow will default to using a ## Azure Blob Storage -[Azure Storage](/api/latest/environments/storage.html#azure) is a storage option that uploads flows to an Azure Blob container. Flows stored using this option can be run by [local agents](/orchestration/agents/local.html) as long as the machine running the local agent is configured to download from that Azure Blob container using a connection string or by container-based agents using the method outlined [below](/orchestration/execution/storage_options.html#non-docker-storage-for-containerized-environments). +[Azure Storage](/api/latest/environments/storage.html#azure) is a storage option that uploads flows to an Azure Blob container. ```python from prefect import Flow @@ -54,7 +54,7 @@ Azure Storage uses an Azure [connection string](https://docs.microsoft.com/en-us ## AWS S3 -[S3 Storage](/api/latest/environments/storage.html#s3) is a storage option that uploads flows to an AWS S3 bucket. Flows stored using this option can be run by [local agents](/orchestration/agents/local.html) as long as the machine running the local agent is configured to download from an S3 bucket or by container-based agents using the method outlined [below](/orchestration/execution/storage_options.html#non-docker-storage-for-containerized-environments). +[S3 Storage](/api/latest/environments/storage.html#s3) is a storage option that uploads flows to an AWS S3 bucket. ```python from prefect import Flow @@ -79,7 +79,7 @@ S3 Storage uses AWS credentials the same way as [boto3](https://boto3.amazonaws. ## Google Cloud Storage -[GCS Storage](/api/latest/environments/storage.html#gcs) is a storage option that uploads flows to a Google Cloud Storage bucket. Flows stored using this option can be run by [local agents](/orchestration/agents/local.html) as long as the machine running the local agent is configured to download from a GCS bucket or by container-based agents using the method outlined [below](/orchestration/execution/storage_options.html#non-docker-storage-for-containerized-environments). +[GCS Storage](/api/latest/environments/storage.html#gcs) is a storage option that uploads flows to a Google Cloud Storage bucket. ```python from prefect import Flow @@ -104,7 +104,7 @@ GCS Storage uses Google Cloud credentials the same way as the standard [google.c ## GitHub -[GitHub Storage](/api/latest/environments/storage.html#github) is a storage option that uploads flows to a GitHub repository as `.py` files. Flows stored using this option can be run by [local agents](/orchestration/agents/local.html) as long as the machine running the local agent is configured to pull from a git repo or by container-based agents using the method outlined [below](/orchestration/execution/storage_options.html#non-docker-storage-for-containerized-environments). +[GitHub Storage](/api/latest/environments/storage.html#github) is a storage option that uploads flows to a GitHub repository as `.py` files. For a detailed look on how to use GitHub storage visit the [Using file based storage](/core/idioms/file-based.html) idiom. From bebf596068e2d7c270318034a3b5e4e29104517e Mon Sep 17 00:00:00 2001 From: Josh Meek <40716964+joshmeek@users.noreply.github.com> Date: Wed, 24 Jun 2020 13:00:08 -0400 Subject: [PATCH 20/20] Update docs/core/idioms/file-based.md --- docs/core/idioms/file-based.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/core/idioms/file-based.md b/docs/core/idioms/file-based.md index 19dccd625cf9..75833d8d3388 100644 --- a/docs/core/idioms/file-based.md +++ b/docs/core/idioms/file-based.md @@ -44,7 +44,7 @@ Here's a breakdown of the three kwargs set on the `GitHub` storage: - `repo`: the name of the repo that this code will live in - `path`: the location of the flow file in the repo. This must be an exact match to the path of the file. -- `secrets`: the name of a [default Prefect secret](http://localhost:8081/core/concepts/secrets.html#default-secrets) which is a GitHub [personal access token](https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line). This is set so that when the flow is executed it has the proper permissions to pull the file from the repo. +- `secrets`: the name of a [default Prefect secret](/core/concepts/secrets.html#default-secrets) which is a GitHub [personal access token](https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line). This is set so that when the flow is executed it has the proper permissions to pull the file from the repo. Push this code to the repository: