From dbc242ec33a6470352e002bc71fdff885869eee0 Mon Sep 17 00:00:00 2001 From: cosmicBboy Date: Tue, 6 Apr 2021 09:48:24 -0400 Subject: [PATCH] add local integration tests Signed-off-by: cosmicBboy --- dev-requirements.in | 1 + dev-requirements.txt | 260 +++++++++++++++++- flytekit/control_plane/identifier.py | 2 +- flytekit/control_plane/launch_plan.py | 34 +-- flytekit/control_plane/nodes.py | 9 +- flytekit/control_plane/workflow_execution.py | 18 +- pytest.ini | 2 + .../control_plane/mock_flyte_repo/.gitignore | 1 + .../control_plane/mock_flyte_repo/README.md | 4 + .../control_plane/mock_flyte_repo/__init__.py | 0 .../mock_flyte_repo/in_container.mk | 24 ++ .../mock_flyte_repo/workflows/Dockerfile | 35 +++ .../mock_flyte_repo/workflows/Makefile | 208 ++++++++++++++ .../mock_flyte_repo/workflows/__init__.py | 0 .../workflows/basic/__init__.py | 0 .../workflows/basic/basic_workflow.py | 54 ++++ .../mock_flyte_repo/workflows/basic/files.py | 68 +++++ .../workflows/basic/folders.py | 77 ++++++ .../workflows/basic/hello_world.py | 41 +++ .../mock_flyte_repo/workflows/basic/lp.py | 89 ++++++ .../workflows/basic/mocking.py | 71 +++++ .../mock_flyte_repo/workflows/basic/task.py | 55 ++++ .../workflows/basic/task_cache.py | 48 ++++ .../mock_flyte_repo/workflows/requirements.in | 4 + .../workflows/requirements.txt | 136 +++++++++ .../mock_flyte_repo/workflows/sandbox.config | 7 + .../control_plane/test_workflow.py | 57 ++++ 27 files changed, 1262 insertions(+), 43 deletions(-) create mode 100644 pytest.ini create mode 100644 tests/flytekit/integration/control_plane/mock_flyte_repo/.gitignore create mode 100644 tests/flytekit/integration/control_plane/mock_flyte_repo/README.md create mode 100644 tests/flytekit/integration/control_plane/mock_flyte_repo/__init__.py create mode 100644 tests/flytekit/integration/control_plane/mock_flyte_repo/in_container.mk create mode 100644 tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/Dockerfile create mode 100644 
tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/Makefile create mode 100644 tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/__init__.py create mode 100644 tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/__init__.py create mode 100644 tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/basic_workflow.py create mode 100644 tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/files.py create mode 100644 tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/folders.py create mode 100644 tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/hello_world.py create mode 100644 tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/lp.py create mode 100644 tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/mocking.py create mode 100644 tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/task.py create mode 100644 tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/task_cache.py create mode 100644 tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/requirements.in create mode 100644 tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/requirements.txt create mode 100644 tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/sandbox.config create mode 100644 tests/flytekit/integration/control_plane/test_workflow.py diff --git a/dev-requirements.in b/dev-requirements.in index 6fa5ebb080..2571afa6ce 100644 --- a/dev-requirements.in +++ b/dev-requirements.in @@ -1,5 +1,6 @@ -c requirements.txt +git+git://github.com/flyteorg/pytest-flyte.git#egg=pytest_flyte black coverage[toml] flake8 diff --git a/dev-requirements.txt b/dev-requirements.txt index 5acc7855b8..d35b97e375 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -2,8 +2,12 @@ # This file is autogenerated by pip-compile # To update, run: # -# make 
dev-requirements.txt +# pip-compile dev-requirements.in # +-e file:.#egg=flytekit + # via + # -c requirements.txt + # pytest-flyte appdirs==1.4.4 # via # -c requirements.txt @@ -11,18 +15,80 @@ appdirs==1.4.4 attrs==20.3.0 # via # -c requirements.txt + # jsonschema # pytest + # pytest-docker + # scantree +bcrypt==3.2.0 + # via + # -c requirements.txt + # paramiko black==20.8b1 # via # -c requirements.txt # -r dev-requirements.in # flake8-black +certifi==2020.12.5 + # via + # -c requirements.txt + # requests +cffi==1.14.5 + # via + # -c requirements.txt + # bcrypt + # cryptography + # pynacl +chardet==4.0.0 + # via + # -c requirements.txt + # requests click==7.1.2 # via # -c requirements.txt # black + # flytekit coverage[toml]==5.5 # via -r dev-requirements.in +croniter==1.0.10 + # via + # -c requirements.txt + # flytekit +cryptography==3.4.7 + # via + # -c requirements.txt + # paramiko +dataclasses-json==0.5.2 + # via + # -c requirements.txt + # flytekit +decorator==4.4.2 + # via + # -c requirements.txt + # retry +deprecated==1.2.12 + # via + # -c requirements.txt + # flytekit +dirhash==0.2.1 + # via + # -c requirements.txt + # flytekit +distro==1.5.0 + # via docker-compose +docker-compose==1.28.6 + # via + # pytest-docker + # pytest-flyte +docker-image-py==0.1.10 + # via + # -c requirements.txt + # flytekit +docker[ssh]==4.4.4 + # via docker-compose +dockerpty==0.4.1 + # via docker-compose +docopt==0.6.2 + # via docker-compose flake8-black==0.2.1 # via -r dev-requirements.in flake8-isort==4.0.0 @@ -32,12 +98,53 @@ flake8==3.9.0 # -r dev-requirements.in # flake8-black # flake8-isort +flyteidl==0.18.26 + # via + # -c requirements.txt + # flytekit +grpcio==1.36.1 + # via + # -c requirements.txt + # flytekit +idna==2.10 + # via + # -c requirements.txt + # requests +importlib-metadata==3.9.1 + # via + # -c requirements.txt + # keyring iniconfig==1.1.1 # via pytest isort==5.8.0 # via # -r dev-requirements.in # flake8-isort +jinja2==2.11.3 + # via + # -c requirements.txt 
+ # pytest-flyte +jsonschema==3.2.0 + # via + # -c requirements.txt + # docker-compose +keyring==23.0.1 + # via + # -c requirements.txt + # flytekit +markupsafe==1.1.1 + # via + # -c requirements.txt + # jinja2 +marshmallow-enum==1.5.1 + # via + # -c requirements.txt + # dataclasses-json +marshmallow==3.11.0 + # via + # -c requirements.txt + # dataclasses-json + # marshmallow-enum mccabe==0.6.1 # via flake8 mock==4.0.3 @@ -47,38 +154,156 @@ mypy-extensions==0.4.3 # -c requirements.txt # black # mypy + # typing-inspect mypy==0.812 # via -r dev-requirements.in +natsort==7.1.1 + # via + # -c requirements.txt + # flytekit +numpy==1.20.2 + # via + # -c requirements.txt + # pandas + # pyarrow packaging==20.9 # via # -c requirements.txt # pytest +pandas==1.2.3 + # via + # -c requirements.txt + # flytekit +paramiko==2.7.2 + # via + # -c requirements.txt + # docker pathspec==0.8.1 # via # -c requirements.txt # black + # scantree pluggy==0.13.1 # via pytest +protobuf==3.15.6 + # via + # -c requirements.txt + # flyteidl + # flytekit py==1.10.0 # via # -c requirements.txt # pytest + # retry +pyarrow==3.0.0 + # via + # -c requirements.txt + # flytekit pycodestyle==2.7.0 # via flake8 +pycparser==2.20 + # via + # -c requirements.txt + # cffi pyflakes==2.3.1 # via flake8 +pynacl==1.4.0 + # via + # -c requirements.txt + # paramiko pyparsing==2.4.7 # via # -c requirements.txt # packaging -pytest==6.2.2 +pyrsistent==0.17.3 + # via + # -c requirements.txt + # jsonschema +pytest-docker==0.10.1 + # via pytest-flyte +git+git://github.com/flyteorg/pytest-flyte.git#egg=pytest-flyte # via -r dev-requirements.in +pytest==6.2.2 + # via + # -r dev-requirements.in + # pytest-docker + # pytest-flyte +python-dateutil==2.8.1 + # via + # -c requirements.txt + # croniter + # flytekit + # pandas +python-dotenv==0.17.0 + # via docker-compose +pytimeparse==1.1.8 + # via + # -c requirements.txt + # flytekit +pytz==2018.4 + # via + # -c requirements.txt + # flytekit + # pandas +pyyaml==5.4.1 + # via + # 
-c requirements.txt + # docker-compose regex==2021.3.17 # via # -c requirements.txt # black + # docker-image-py +requests==2.25.1 + # via + # -c requirements.txt + # docker + # docker-compose + # flytekit + # responses +responses==0.13.2 + # via + # -c requirements.txt + # flytekit +retry==0.9.2 + # via + # -c requirements.txt + # flytekit +scantree==0.0.1 + # via + # -c requirements.txt + # dirhash +six==1.15.0 + # via + # -c requirements.txt + # bcrypt + # docker + # dockerpty + # flytekit + # grpcio + # jsonschema + # protobuf + # pynacl + # python-dateutil + # responses + # scantree + # websocket-client +sortedcontainers==2.3.0 + # via + # -c requirements.txt + # flytekit +statsd==3.3.0 + # via + # -c requirements.txt + # flytekit +stringcase==1.2.0 + # via + # -c requirements.txt + # dataclasses-json testfixtures==6.17.1 # via flake8-isort +texttable==1.6.3 + # via docker-compose toml==0.10.2 # via # -c requirements.txt @@ -95,3 +320,34 @@ typing-extensions==3.7.4.3 # -c requirements.txt # black # mypy + # typing-inspect +typing-inspect==0.6.0 + # via + # -c requirements.txt + # dataclasses-json +urllib3==1.25.11 + # via + # -c requirements.txt + # flytekit + # requests + # responses +websocket-client==0.58.0 + # via + # docker + # docker-compose +wheel==0.36.2 + # via + # -c requirements.txt + # flytekit +wrapt==1.12.1 + # via + # -c requirements.txt + # deprecated + # flytekit +zipp==3.4.1 + # via + # -c requirements.txt + # importlib-metadata + +# The following packages are considered to be unsafe in a requirements file: +# setuptools diff --git a/flytekit/control_plane/identifier.py b/flytekit/control_plane/identifier.py index 7c7abd7d68..7913c4eca6 100644 --- a/flytekit/control_plane/identifier.py +++ b/flytekit/control_plane/identifier.py @@ -13,7 +13,7 @@ class Identifier(_core_identifier.Identifier): @classmethod def promote_from_model(cls, base_model: _core_identifier.Identifier) -> "Identifier": - return cls(base_model.response_type, 
base_model.project, base_model.domain, base_model.name, base_model.version) + return cls(base_model.resource_type, base_model.project, base_model.domain, base_model.name, base_model.version) @classmethod def from_urn(cls, urn: str) -> "Identifier": diff --git a/flytekit/control_plane/launch_plan.py b/flytekit/control_plane/launch_plan.py index 4619f5c424..946665272b 100644 --- a/flytekit/control_plane/launch_plan.py +++ b/flytekit/control_plane/launch_plan.py @@ -3,7 +3,6 @@ from flytekit.common.exceptions import scopes as _exception_scopes from flytekit.common.exceptions import user as _user_exceptions -from flytekit.common.mixins import launchable as _launchable_mixin from flytekit.configuration import sdk as _sdk_config from flytekit.control_plane import identifier as _identifier from flytekit.control_plane import interface as _interface @@ -12,16 +11,13 @@ from flytekit.engines.flyte import engine as _flyte_engine from flytekit.models import common as _common_models from flytekit.models import execution as _execution_models -from flytekit.models import identifier as _identifier_model from flytekit.models import interface as _interface_models from flytekit.models import launch_plan as _launch_plan_models from flytekit.models import literals as _literal_models +from flytekit.models.core import identifier as _identifier_model -class FlyteLaunchPlan( - _launchable_mixin.LaunchableEntity, - _launch_plan_models.LaunchPlanSpec, -): +class FlyteLaunchPlan(_launch_plan_models.LaunchPlanSpec): def __init__(self, *args, **kwargs): super(FlyteLaunchPlan, self).__init__(*args, **kwargs) # Set all the attributes we expect this class to have @@ -165,30 +161,6 @@ def update(self, state: _launch_plan_models.LaunchPlanState): ) return _flyte_engine.get_client().update_launch_plan(self.id, state) - @_deprecated(reason="Use launch_with_literals instead", version="0.9.0") - def execute_with_literals( - self, - project, - domain, - literal_inputs, - name=None, - 
notification_overrides=None, - label_overrides=None, - annotation_overrides=None, - ): - """ - Deprecated. - """ - return self.launch_with_literals( - project, - domain, - literal_inputs, - name, - notification_overrides, - label_overrides, - annotation_overrides, - ) - @_exception_scopes.system_entry_point def launch_with_literals( self, @@ -220,7 +192,7 @@ def launch_with_literals( if disable_all: notification_overrides = None else: - notification_overrides = _uuid.NotificationList(notification_overrides or []) + notification_overrides = _execution_models.NotificationList(notification_overrides or []) disable_all = None client = _flyte_engine.get_client() diff --git a/flytekit/control_plane/nodes.py b/flytekit/control_plane/nodes.py index eac38847af..199c2cd510 100644 --- a/flytekit/control_plane/nodes.py +++ b/flytekit/control_plane/nodes.py @@ -21,6 +21,7 @@ from flytekit.core.type_engine import TypeEngine from flytekit.engines.flyte import engine as _flyte_engine from flytekit.interfaces.data import data_proxy as _data_proxy +from flytekit.models import literals as _literal_models from flytekit.models import node_execution as _node_execution_models from flytekit.models import task as _task_model from flytekit.models.core import execution as _execution_models @@ -207,7 +208,9 @@ def inputs(self) -> Dict[str, Any]: _common_utils.load_proto_from_file(_literals_pb2.LiteralMap, tmp_name) ) - self._inputs = TypeEngine.literal_map_to_kwargs(ctx=FlyteContext.current_context(), lm=input_map) + # TODO: need to convert flyte literals to python types. 
For now just use literals + # self._inputs = TypeEngine.literal_map_to_kwargs(ctx=FlyteContext.current_context(), lm=input_map) + self._inputs = input_map return self._inputs @property @@ -239,7 +242,9 @@ def outputs(self) -> Dict[str, Any]: output_map = _literal_models.LiteralMap.from_flyte_idl( _common_utils.load_proto_from_file(_literals_pb2.LiteralMap, tmp_name) ) - self._outputs = TypeEngine.literal_map_to_kwargs(ctx=FlyteContext.current_context(), lm=output_map) + # TODO: need to convert flyte literals to python types. For now just use literals + # self._outputs = TypeEngine.literal_map_to_kwargs(ctx=FlyteContext.current_context(), lm=output_map) + self._outputs = output_map return self._outputs @property diff --git a/flytekit/control_plane/workflow_execution.py b/flytekit/control_plane/workflow_execution.py index 1aafa4b081..2936d2b203 100644 --- a/flytekit/control_plane/workflow_execution.py +++ b/flytekit/control_plane/workflow_execution.py @@ -1,10 +1,11 @@ import os as _os -from typing import Dict, List +from typing import Any, Dict, List from flyteidl.core import literals_pb2 as _literals_pb2 from flytekit.clients.helpers import iterate_node_executions as _iterate_node_executions from flytekit.common import utils as _common_utils +from flytekit.common.exceptions import user as _user_exceptions from flytekit.common.mixins import artifact as _artifact from flytekit.control_plane import identifier as _core_identifier from flytekit.control_plane import nodes as _nodes @@ -15,6 +16,7 @@ from flytekit.models import execution as _execution_models from flytekit.models import filters as _filter_models from flytekit.models import literals as _literal_models +from flytekit.models.core import execution as _core_execution_models class FlyteWorkflowExecution(_execution_models.Execution, _artifact.ExecutionArtifact): @@ -48,8 +50,9 @@ def inputs(self) -> Dict[str, Any]: input_map = _literal_models.LiteralMap.from_flyte_idl( 
_common_utils.load_proto_from_file(_literals_pb2.Literalmap, tmp_name) ) - - self._inputs = TypeEngine.literal_map_to_kwargs(ctx=FlyteContext.current_context(), lm=input_map) + # TODO: need to convert flyte literals to python types. For now just use literals + # self._inputs = TypeEngine.literal_map_to_kwargs(ctx=FlyteContext.current_context(), lm=input_map) + self._inputs = input_map return self._inputs @property @@ -80,12 +83,13 @@ def outputs(self) -> Dict[str, Any]: output_map = _literal_models.LiteralMap.from_flyte_idl( _common_utils.load_proto_from_file(_literals_pb2.LiteralMap, tmp_name) ) - - self._outputs = TypeEngine.literal_map_to_kwargs(ctx=FlyteContext.current_context(), lm=output_map) - self._outputs + # TODO: need to convert flyte literals to python types. For now just use literals + # self._outputs = TypeEngine.literal_map_to_kwargs(ctx=FlyteContext.current_context(), lm=output_map) + self._outputs = output_map + return self._outputs @property - def error(self) -> _execution_models.ExecutionError: + def error(self) -> _core_execution_models.ExecutionError: """ If execution is in progress, raise an exception. Otherwise, return None if no error was present upon reaching completion. 
diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000000..9fa7d30555 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +addopts = -p docker -p flyte diff --git a/tests/flytekit/integration/control_plane/mock_flyte_repo/.gitignore b/tests/flytekit/integration/control_plane/mock_flyte_repo/.gitignore new file mode 100644 index 0000000000..9bf95ea680 --- /dev/null +++ b/tests/flytekit/integration/control_plane/mock_flyte_repo/.gitignore @@ -0,0 +1 @@ +*.pb \ No newline at end of file diff --git a/tests/flytekit/integration/control_plane/mock_flyte_repo/README.md b/tests/flytekit/integration/control_plane/mock_flyte_repo/README.md new file mode 100644 index 0000000000..1972a7c658 --- /dev/null +++ b/tests/flytekit/integration/control_plane/mock_flyte_repo/README.md @@ -0,0 +1,4 @@ +# Mock Flyte Repo + +This is a trimmed down version of the [flytesnacks](https://github.com/flyteorg/flytesnacks) +repo for the purposes of local integration testing. diff --git a/tests/flytekit/integration/control_plane/mock_flyte_repo/__init__.py b/tests/flytekit/integration/control_plane/mock_flyte_repo/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/flytekit/integration/control_plane/mock_flyte_repo/in_container.mk b/tests/flytekit/integration/control_plane/mock_flyte_repo/in_container.mk new file mode 100644 index 0000000000..15bc979759 --- /dev/null +++ b/tests/flytekit/integration/control_plane/mock_flyte_repo/in_container.mk @@ -0,0 +1,24 @@ +SERIALIZED_PB_OUTPUT_DIR := /tmp/output + +.PHONY: clean +clean: + rm -rf $(SERIALIZED_PB_OUTPUT_DIR)/* + +$(SERIALIZED_PB_OUTPUT_DIR): clean + mkdir -p $(SERIALIZED_PB_OUTPUT_DIR) + +.PHONY: serialize +serialize: $(SERIALIZED_PB_OUTPUT_DIR) + pyflyte --config /root/sandbox.config serialize workflows -f $(SERIALIZED_PB_OUTPUT_DIR) + +.PHONY: register +register: serialize + flyte-cli register-files -h ${FLYTE_HOST} ${INSECURE_FLAG} -p ${PROJECT} -d development -v ${VERSION} 
--kubernetes-service-account ${SERVICE_ACCOUNT} --output-location-prefix ${OUTPUT_DATA_PREFIX} $(SERIALIZED_PB_OUTPUT_DIR)/* + +.PHONY: fast_serialize +fast_serialize: $(SERIALIZED_PB_OUTPUT_DIR) + pyflyte --config /root/sandbox.config serialize fast workflows -f $(SERIALIZED_PB_OUTPUT_DIR) + +.PHONY: fast_register +fast_register: fast_serialize + flyte-cli fast-register-files -h ${FLYTE_HOST} ${INSECURE_FLAG} -p ${PROJECT} -d development --kubernetes-service-account ${SERVICE_ACCOUNT} --output-location-prefix ${OUTPUT_DATA_PREFIX} --additional-distribution-dir ${ADDL_DISTRIBUTION_DIR} $(SERIALIZED_PB_OUTPUT_DIR)/* diff --git a/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/Dockerfile b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/Dockerfile new file mode 100644 index 0000000000..7e5d01829f --- /dev/null +++ b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/Dockerfile @@ -0,0 +1,35 @@ +FROM python:3.8-slim-buster +LABEL org.opencontainers.image.source https://github.com/flyteorg/flytesnacks + +WORKDIR /root +ENV VENV /opt/venv +ENV LANG C.UTF-8 +ENV LC_ALL C.UTF-8 +ENV PYTHONPATH /root + +# This is necessary for opencv to work +RUN apt-get update && apt-get install -y libsm6 libxext6 libxrender-dev ffmpeg build-essential + +# Install the AWS cli separately to prevent issues with boto being written over +RUN pip3 install awscli + +ENV VENV /opt/venv +# Virtual environment +RUN python3 -m venv ${VENV} +ENV PATH="${VENV}/bin:$PATH" + +# Install Python dependencies +COPY workflows/requirements.txt /root +RUN pip install -r /root/requirements.txt + +# Copy the makefile targets to expose on the container. 
This makes it easier to register +COPY in_container.mk /root/Makefile +COPY workflows/sandbox.config /root + +# Copy the actual code +COPY workflows /root/workflows + +# This tag is supplied by the build script and will be used to determine the version +# when registering tasks, workflows, and launch plans +ARG tag +ENV FLYTE_INTERNAL_IMAGE $tag diff --git a/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/Makefile b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/Makefile new file mode 100644 index 0000000000..5812f4893c --- /dev/null +++ b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/Makefile @@ -0,0 +1,208 @@ +.SILENT: + +PREFIX=workflows + +# This is used by the image building script referenced below. Normally it just takes the directory name but in this +# case we want it to be called something else. +IMAGE_NAME=flytecookbook +export VERSION ?= $(shell git rev-parse HEAD) + +define PIP_COMPILE +pip-compile $(1) ${PIP_ARGS} --upgrade --verbose +endef + +# Set SANDBOX=1 to automatically fill in sandbox config +ifdef SANDBOX + +# The url for Flyte Control plane +export FLYTE_HOST ?= localhost:30081 + +# Overrides s3 url. This is solely needed for SANDBOX deployments. Shouldn't be overriden in production AWS S3. +export FLYTE_AWS_ENDPOINT ?= http://localhost:30084/ + +# Used to authenticate to s3. For a production AWS S3, it's discouraged to use keys and key ids. +export FLYTE_AWS_ACCESS_KEY_ID ?= minio + +# Used to authenticate to s3. For a production AWS S3, it's discouraged to use keys and key ids. +export FLYTE_AWS_SECRET_ACCESS_KEY ?= miniostorage + +# Used to publish artifacts for fast registration +export ADDL_DISTRIBUTION_DIR ?= s3://my-s3-bucket/fast/ + +# The base of where Blobs, Schemas and other offloaded types are, by default, serialized. 
+export OUTPUT_DATA_PREFIX ?= s3://my-s3-bucket/raw-data + +# Instructs flyte-cli commands to use insecure channel when communicating with Flyte's control plane. +# If you're port-forwarding your service or running the sandbox Flyte deployment, specify INSECURE=1 before your make command. +# If your Flyte Admin is behind SSL, don't specify anything. +ifndef INSECURE + export INSECURE_FLAG=-i +endif + +# The docker registry that should be used to push images. +# e.g.: +# export REGISTRY ?= ghcr.io/flyteorg +endif + +# The Flyte project that we want to register under +export PROJECT ?= flytesnacks + +# If the REGISTRY environment variable has been set, that means the image name will not just be tagged as +# flytecookbook: but rather, +# ghcr.io/flyteorg/flytecookbook: or whatever your REGISTRY is. +ifdef REGISTRY + FULL_IMAGE_NAME = ${REGISTRY}/${IMAGE_NAME} +endif +ifndef REGISTRY + FULL_IMAGE_NAME = ${IMAGE_NAME} +endif + +# If you are using a different service account on your k8s cluster, add SERVICE_ACCOUNT=my_account before your make command +ifndef SERVICE_ACCOUNT + SERVICE_ACCOUNT=default +endif + +.PHONY: help +help: ## show help message + @awk 'BEGIN {FS = ":.*##"; printf "\nUsage:\n make \033[36m\033[0m\n"} /^[$$()% a-zA-Z_-]+:.*?##/ { printf " \033[36m%-15s\033[0m %s\n", $$1, $$2 } /^##@/ { printf "\n\033[1m%s\033[0m\n", substr($$0, 5) } ' $(MAKEFILE_LIST) + +.PHONY: debug +debug: + echo "IMAGE NAME ${IMAGE_NAME}" + echo "FULL IMAGE NAME ${FULL_IMAGE_NAME}" + echo "VERSION TAG ${VERSION}" + echo "REGISTRY ${REGISTRY}" + +TAGGED_IMAGE=${FULL_IMAGE_NAME}:${PREFIX}-${VERSION} + +# This should only be used by Admins to push images to the public Dockerhub repo. 
Make sure you +# specify REGISTRY=ghcr.io/flyteorg or your registry before the make command otherwise this won't actually push +# Also if you want to push the docker image for sagemaker consumption then +# specify ECR_REGISTRY +.PHONY: docker_push +docker_push: docker_build +ifdef REGISTRY + docker push ${TAGGED_IMAGE} +endif + +.PHONY: fmt +fmt: # Format code with black and isort + black . + isort . + +.PHONY: install-piptools +install-piptools: + pip install -U pip-tools + +.PHONY: setup +setup: install-piptools # Install requirements + pip-sync dev-requirements.txt + +.PHONY: lint +lint: # Run linters + flake8 . + +requirements.txt: export CUSTOM_COMPILE_COMMAND := $(MAKE) requirements.txt +requirements.txt: requirements.in install-piptools + $(call PIP_COMPILE,requirements.in) + +.PHONY: requirements +requirements: requirements.txt + +.PHONY: fast_serialize +fast_serialize: clean _pb_output + echo ${CURDIR} + docker run -it --rm \ + -e REGISTRY=${REGISTRY} \ + -e MAKEFLAGS=${MAKEFLAGS} \ + -e FLYTE_HOST=${FLYTE_HOST} \ + -e INSECURE_FLAG=${INSECURE_FLAG} \ + -e PROJECT=${PROJECT} \ + -e FLYTE_AWS_ENDPOINT=${FLYTE_AWS_ENDPOINT} \ + -e FLYTE_AWS_ACCESS_KEY_ID=${FLYTE_AWS_ACCESS_KEY_ID} \ + -e FLYTE_AWS_SECRET_ACCESS_KEY=${FLYTE_AWS_SECRET_ACCESS_KEY} \ + -e OUTPUT_DATA_PREFIX=${OUTPUT_DATA_PREFIX} \ + -e ADDL_DISTRIBUTION_DIR=${ADDL_DISTRIBUTION_DIR} \ + -e SERVICE_ACCOUNT=$(SERVICE_ACCOUNT) \ + -e VERSION=${VERSION} \ + -v ${CURDIR}/_pb_output:/tmp/output \ + -v ${CURDIR}:/root/$(shell basename $(CURDIR)) \ + ${TAGGED_IMAGE} make fast_serialize + +.PHONY: fast_register +fast_register: clean _pb_output ## Packages code and registers without building docker images. 
+ @echo "Tagged Image: " + @echo ${TAGGED_IMAGE} + @echo ${CURDIR} + docker run -it --rm \ + --network host \ + -e REGISTRY=${REGISTRY} \ + -e MAKEFLAGS=${MAKEFLAGS} \ + -e FLYTE_HOST=${FLYTE_HOST} \ + -e INSECURE_FLAG=${INSECURE_FLAG} \ + -e PROJECT=${PROJECT} \ + -e FLYTE_AWS_ENDPOINT=${FLYTE_AWS_ENDPOINT} \ + -e FLYTE_AWS_ACCESS_KEY_ID=${FLYTE_AWS_ACCESS_KEY_ID} \ + -e FLYTE_AWS_SECRET_ACCESS_KEY=${FLYTE_AWS_SECRET_ACCESS_KEY} \ + -e OUTPUT_DATA_PREFIX=${OUTPUT_DATA_PREFIX} \ + -e ADDL_DISTRIBUTION_DIR=${ADDL_DISTRIBUTION_DIR} \ + -e SERVICE_ACCOUNT=$(SERVICE_ACCOUNT) \ + -e VERSION=${VERSION} \ + -v ${CURDIR}/_pb_output:/tmp/output \ + -v ${CURDIR}:/root/$(shell basename $(CURDIR)) \ + ${TAGGED_IMAGE} make fast_register + +.PHONY: docker_build +docker_build: + echo "Tagged Image: " + echo ${TAGGED_IMAGE} + docker build ../ --build-arg tag="${TAGGED_IMAGE}" -t "${TAGGED_IMAGE}" -f Dockerfile + +.PHONY: serialize +serialize: clean _pb_output docker_build + @echo ${VERSION} + @echo ${CURDIR} + docker run -it --rm \ + -e REGISTRY=${REGISTRY} \ + -e MAKEFLAGS=${MAKEFLAGS} \ + -e FLYTE_HOST=${FLYTE_HOST} \ + -e INSECURE_FLAG=${INSECURE_FLAG} \ + -e PROJECT=${PROJECT} \ + -e FLYTE_AWS_ENDPOINT=${FLYTE_AWS_ENDPOINT} \ + -e FLYTE_AWS_ACCESS_KEY_ID=${FLYTE_AWS_ACCESS_KEY_ID} \ + -e FLYTE_AWS_SECRET_ACCESS_KEY=${FLYTE_AWS_SECRET_ACCESS_KEY} \ + -e OUTPUT_DATA_PREFIX=${OUTPUT_DATA_PREFIX} \ + -e ADDL_DISTRIBUTION_DIR=${ADDL_DISTRIBUTION_DIR} \ + -e SERVICE_ACCOUNT=$(SERVICE_ACCOUNT) \ + -e VERSION=${VERSION} \ + -v ${CURDIR}/_pb_output:/tmp/output \ + ${TAGGED_IMAGE} make serialize + + +.PHONY: register +register: clean _pb_output docker_push + @echo ${VERSION} + @echo ${CURDIR} + docker run -it --rm \ + --network host \ + -e REGISTRY=${REGISTRY} \ + -e MAKEFLAGS=${MAKEFLAGS} \ + -e FLYTE_HOST=${FLYTE_HOST} \ + -e INSECURE_FLAG=${INSECURE_FLAG} \ + -e PROJECT=${PROJECT} \ + -e FLYTE_AWS_ENDPOINT=${FLYTE_AWS_ENDPOINT} \ + -e 
FLYTE_AWS_ACCESS_KEY_ID=${FLYTE_AWS_ACCESS_KEY_ID} \ + -e FLYTE_AWS_SECRET_ACCESS_KEY=${FLYTE_AWS_SECRET_ACCESS_KEY} \ + -e OUTPUT_DATA_PREFIX=${OUTPUT_DATA_PREFIX} \ + -e ADDL_DISTRIBUTION_DIR=${ADDL_DISTRIBUTION_DIR} \ + -e SERVICE_ACCOUNT=$(SERVICE_ACCOUNT) \ + -e VERSION=${VERSION} \ + -v ${CURDIR}/_pb_output:/tmp/output \ + ${TAGGED_IMAGE} make register + +_pb_output: + mkdir -p _pb_output + +.PHONY: clean +clean: + rm -rf _pb_output/* diff --git a/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/__init__.py b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/__init__.py b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/basic_workflow.py b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/basic_workflow.py new file mode 100644 index 0000000000..49c42c5911 --- /dev/null +++ b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/basic_workflow.py @@ -0,0 +1,54 @@ +""" +Write a simple workflow +------------------------------ + +Once you've had a handle on tasks, we can move to workflows. Workflow are the other basic building block of Flyte. + +Workflows string together two or more tasks. They are also written as Python functions, but it is important to make a +critical distinction between tasks and workflows. + +The body of a task's function runs at "run time", i.e. on the K8s cluster, using the task's container. The body of a +workflow is not used for computation, it is only used to structure the tasks, i.e. the output of ``t1`` is an input +of ``t2`` in the workflow below. As such, the body of workflows is run at "registration" time. 
Please refer to the +registration docs for additional information as well since it is actually a two-step process. + +Take a look at the conceptual `discussion `__ +behind workflows for additional information. + +""" +import typing + +from flytekit import task, workflow + + +@task +def t1(a: int) -> typing.NamedTuple("OutputsBC", t1_int_output=int, c=str): + return a + 2, "world" + + +@task +def t2(a: str, b: str) -> str: + return b + a + + +# %% +# You can treat the outputs of a task as you normally would a Python function. Assign the output to two variables +# and use them in subsequent tasks as normal. See :py:func:`flytekit.workflow` +@workflow +def my_wf(a: int, b: str) -> (int, str): + x, y = t1(a=a) + d = t2(a=y, b=b) + return x, d + + +# %% +# Execute the Workflow, simply by invoking it like a function and passing in +# the necessary parameters +# +# .. note:: +# +# One thing to remember, currently we only support ``Keyword arguments``. So +# every argument should be passed in the form ``arg=value``. Failure to do so +# will result in an error +if __name__ == "__main__": + print(f"Running my_wf(a=50, b='hello') {my_wf(a=50, b='hello')}") diff --git a/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/files.py b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/files.py new file mode 100644 index 0000000000..027a5ec21e --- /dev/null +++ b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/files.py @@ -0,0 +1,68 @@ +""" +Work with files +------------------- + +Files are one of the most fundamental things that users of Python work with, and they are fully supported by Flyte. +In the IDL, they are known as `Blob `__ literals +and are backed by the `blob type `__ + +Note that the type message includes an optional ``format`` field which is a text-field used to denote the file extension. 
+""" + +import os +import urllib.request + +import cv2 + +import flytekit +from flytekit import task, workflow +from flytekit.types.file import FlyteFile + +# %% +# Let's assume our mission here is pretty simple. We want to take each of these links, download the picture, rotate it +# and return the file. +default_images = [ + "https://upload.wikimedia.org/wikipedia/commons/a/a8/Fractal_pyramid.jpg", + "https://upload.wikimedia.org/wikipedia/commons/thumb/a/ad/Julian_fractal.jpg/256px-Julian_fractal.jpg", +] + + +# %% +# Note the signature of the return type of this task is a ``FlyteFile``. Files do not have a native object in Python +# so we had to write one ourselves. There does exist the ``os.PathLike`` protocol, but nothing implements it. +# +# When this task finishes, the flytekit engine will detect the ``FlyteFile`` instance being returned, find a location +# in Flyte's object store (usually S3), upload the file to that location, and create a Blob literal pointing to it. +# +# Note that the ``FlyteFile`` literal is scoped with a string, which gets inserted into the format of the Blob type. +# The ``[]`` are entirely optional, and if you don't specify it, the format will just be an ``""``. 
+@task +def rotate(image_location: str) -> FlyteFile: + """ + Download the given image, rotate it by 180 degrees + """ + working_dir = flytekit.current_context().working_directory + local_image = os.path.join(working_dir, "incoming.jpg") + urllib.request.urlretrieve(image_location, local_image) + img = cv2.imread(local_image, 0) + if img is None: + raise Exception("Failed to read image") + (h, w) = img.shape[:2] + center = (w / 2, h / 2) + mat = cv2.getRotationMatrix2D(center, 180, 1) + res = cv2.warpAffine(img, mat, (w, h)) + out_path = os.path.join(working_dir, "rotated.jpg") + cv2.imwrite(out_path, res) + return FlyteFile["jpg"](path=out_path) + + +@workflow +def rotate_one_workflow(in_image: str) -> FlyteFile: + return rotate(image_location=in_image) + + +# %% +# Execute it +if __name__ == "__main__": + print(f"Running {__file__} main...") + print(f"Running rotate_one_workflow(in_image=default_images[0]) {rotate_one_workflow(in_image=default_images[0])}") diff --git a/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/folders.py b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/folders.py new file mode 100644 index 0000000000..5b7d99b3c2 --- /dev/null +++ b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/folders.py @@ -0,0 +1,77 @@ +""" +Work with folders +--------------------- + +Please also see the entry on files. After files, folders are the other fundamental operating system primitive users might find themselves working with. The Flyte IDL's support of folders takes the form of `multi-part blobs `__. +""" +import os +import pathlib +import urllib.request + +import cv2 + +import flytekit +from flytekit import task, workflow +from flytekit.types.directory import FlyteDirectory + +# %% +# Playing on the same example used in the File chapter, this first task downloads a bunch of files into a directory, +# and then returns a Flyte object referencing them.
+default_images = [ + "https://upload.wikimedia.org/wikipedia/commons/a/a8/Fractal_pyramid.jpg", + "https://upload.wikimedia.org/wikipedia/commons/thumb/a/ad/Julian_fractal.jpg/256px-Julian_fractal.jpg", +] + + +# %% +# This task downloads the two files above using non-Flyte libraries, and returns the path to the folder, in a FlyteDirectory object. +@task +def download_files() -> FlyteDirectory: + working_dir = flytekit.current_context().working_directory + pp = pathlib.Path(os.path.join(working_dir, "images")) + pp.mkdir(exist_ok=True) + for idx, remote_location in enumerate(default_images): + local_image = os.path.join(working_dir, "images", f"image_{idx}.jpg") + urllib.request.urlretrieve(remote_location, local_image) + + return FlyteDirectory(path=os.path.join(working_dir, "images")) + + +# %% +# This helper method is a purely Python function; no Flyte components here. +def rotate(local_image: str): + """ + In place rotation of the image + """ + img = cv2.imread(local_image, 0) + if img is None: + raise Exception("Failed to read image") + (h, w) = img.shape[:2] + center = (w / 2, h / 2) + mat = cv2.getRotationMatrix2D(center, 180, 1) + res = cv2.warpAffine(img, mat, (w, h)) + # out_path = os.path.join(working_dir, "rotated.jpg") + cv2.imwrite(local_image, res) + + +# %% +# This task accepts the previously downloaded folder, and calls the rotate function above on each. Since the rotate function does the image manipulation in place, we just create a new FlyteDirectory object pointed to the same place. 
+@task +def rotate_all(img_dir: FlyteDirectory) -> FlyteDirectory: + """ + Rotate every image in the given directory by 180 degrees, in place + """ + for img in [os.path.join(img_dir, x) for x in os.listdir(img_dir)]: + rotate(img) + return FlyteDirectory(path=img_dir.path) + + +@workflow +def download_and_rotate() -> FlyteDirectory: + directory = download_files() + return rotate_all(img_dir=directory) + + +if __name__ == "__main__": + print(f"Running {__file__} main...") + print(f"Running main {download_and_rotate()}") diff --git a/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/hello_world.py b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/hello_world.py new file mode 100644 index 0000000000..d7ecfe6705 --- /dev/null +++ b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/hello_world.py @@ -0,0 +1,41 @@ +""" +Hello World Workflow +-------------------- + +This simple workflow calls a task that returns "Hello World" and then just sets that as the final output of the workflow. + +""" +import typing + +from flytekit import task, workflow + + +# You can change the signature of the task to take in an argument like this: +# def say_hello(name: str) -> str: +@task +def say_hello() -> str: + return "hello world" + + +# %% +# You can treat the outputs of a task as you normally would a Python function. Assign the output to a variable +# and return it as the workflow's output. See :py:func:`flytekit.workflow` +# You can change the signature of the workflow to take in an argument like this: +# def my_wf(name: str) -> str: +@workflow +def my_wf() -> str: + res = say_hello() + return res + + +# %% +# Execute the Workflow, simply by invoking it like a function and passing in +# the necessary parameters +# +# .. note:: +# +# One thing to remember, currently we only support ``Keyword arguments``. So +# every argument should be passed in the form ``arg=value``.
Failure to do so +# will result in an error +if __name__ == "__main__": + print(f"Running my_wf() {my_wf()}") diff --git a/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/lp.py b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/lp.py new file mode 100644 index 0000000000..756cada1c4 --- /dev/null +++ b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/lp.py @@ -0,0 +1,89 @@ +""" +.. _launch_plans: + +Launch Plans +---------------------- + +Launch plans bind a partial or complete list of inputs necessary to launch a workflow along +with optional run-time overrides such as notifications, schedules and more. +Launch plan inputs must only assign inputs already defined in the reference workflow definition. +""" + +import calendar + +# %% +# Launch plans are the only means for invoking workflow executions. +# By default, a 'default' launch plan will be created during the serialization (and registration process), +# which will optionally bind any default workflow inputs and any default runtime options specified in the project +# flytekit config (such as user role, etc). +# The following example creates a default launch plan with no inputs during serialization. +import datetime + +from flytekit import LaunchPlan, current_context, task, workflow + + +@task +def square(val: int) -> int: + return val * val + + +@workflow +def my_wf(val: int) -> int: + result = square(val=val) + return result + + +default_lp = LaunchPlan.get_default_launch_plan(current_context(), my_wf) +square_3 = default_lp(val=3) + +# %% +# The following shows how to specify a user-defined launch plan that defaults the value of 'val' to 4. +my_lp = LaunchPlan.create("default_4_lp", my_wf, default_inputs={"val": 4}) +square_4 = my_lp() +square_5 = my_lp(val=5) + +# %% +# In some cases you may want to **fix** launch plan inputs, such that they can't be overridden at execution call time. 
+my_fixed_lp = LaunchPlan.create("always_2_lp", my_wf, fixed_inputs={"val": 2}) +square_2 = my_fixed_lp() +# error, because 'val' is a fixed input and cannot be overridden: +# square_1 = my_fixed_lp(val=1) + +# %% +# Putting it all together +# ####################### +# +# Default and fixed inputs can all be used in combination together to simplify individual executions +# or even programmatic ones. +# Let's take a look at a trivial example where we enthusiastically greet each day of the upcoming week: + + +@task +def greet(day_of_week: str, number: int, am: bool) -> str: + greeting = "Have a great " + day_of_week + " " + greeting += "morning" if am else "evening" + return greeting + "!" * number + + +@workflow +def go_greet(day_of_week: str, number: int, am: bool = False) -> str: + return greet(day_of_week=day_of_week, number=number, am=am) + + +morning_greeting = LaunchPlan.create( + "morning_greeting", + go_greet, + fixed_inputs={"am": True}, + default_inputs={"number": 1}, +) + +# Let's see if we can convincingly pass a Turing test! +today = datetime.datetime.today() +for n in range(7): + day = today + datetime.timedelta(days=n) + weekday = calendar.day_name[day.weekday()] + if day.weekday() < 5: + print(morning_greeting(day_of_week=weekday)) + else: + # We're extra enthusiastic on weekends + print(morning_greeting(number=3, day_of_week=weekday)) diff --git a/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/mocking.py b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/mocking.py new file mode 100644 index 0000000000..90fbf6d75b --- /dev/null +++ b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/mocking.py @@ -0,0 +1,71 @@ +""" +Mock Tasks for Testing +-------------------------- + +A lot of the tasks that you write you can run locally, but some of them you will not be able to, usually because they +are tasks that depend on a third-party only available on the backend.
Hive tasks are a common example, as most users +will not have access to the service that executes Hive queries from their development environment. However, it's still +useful to be able to locally run a workflow that calls such a task. In these instances, flytekit provides a couple +of utilities to help navigate this. + +""" + +import datetime + +import pandas + +from flytekit import SQLTask, TaskMetadata, kwtypes, task, workflow +from flytekit.testing import patch, task_mock +from flytekit.types.schema import FlyteSchema + +# %% +# This is a generic SQL task (and is by default not hooked up to any datastore nor handled by any plugin), and must +# be mocked. +sql = SQLTask( + "my-query", + query_template="SELECT * FROM hive.city.fact_airport_sessions WHERE ds = '{{ .Inputs.ds }}' LIMIT 10", + inputs=kwtypes(ds=datetime.datetime), + outputs=kwtypes(results=FlyteSchema), + metadata=TaskMetadata(retries=2), +) + + +# %% +# This is a task that can run locally +@task +def t1() -> datetime.datetime: + return datetime.datetime.now() + + +# %% +# Declare a workflow that chains these two tasks together. +@workflow +def my_wf() -> FlyteSchema: + dt = t1() + return sql(ds=dt) + + +# %% +# Without a mock, calling the workflow would typically raise an exception, but with the ``task_mock`` construct, which +# returns a ``MagicMock`` object, we can override the return value. +def main_1(): + with task_mock(sql) as mock: + mock.return_value = pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]}) + assert (my_wf().open().all() == pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})).all().all() + + +# %% +# There is another utility as well called ``patch`` which offers the same functionality, but in the traditional Python +# patching style, where the first argument is the ``MagicMock`` object. 
+def main_2(): + @patch(sql) + def test_user_demo_test(mock_sql): + mock_sql.return_value = pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]}) + assert (my_wf().open().all() == pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})).all().all() + + test_user_demo_test() + + +if __name__ == "__main__": + main_1() + main_2() diff --git a/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/task.py b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/task.py new file mode 100644 index 0000000000..14601b88bf --- /dev/null +++ b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/task.py @@ -0,0 +1,55 @@ +""" +Tasks +------ + +This example shows how to write a task in flytekit python. +Recap: In Flyte a task is a fundamental building block and an extension point. Flyte has multiple plugins for tasks, +which can be either a backend-plugin or can be a simple extension that is available in flytekit. + +A task in flytekit can be 2 types: + +1. A task that has a python function associated with it. The execution of the task would be an execution of this + function +#. A task that does not have a python function, for e.g a SQL query or some other portable task like Sagemaker prebuilt + algorithms, or something that just invokes an API + +This section will talk about how to write a Python Function task. Other type of tasks will be covered in later sections + +""" +# %% +# For any task in flyte, there is always one required import +from flytekit import task + + +# %% +# A ``PythonFunctionTask`` must always be decorated with the ``@task`` ``flytekit.task`` decorator. +# The task itself is a regular python function, with one exception, it needs all the inputs and outputs to be clearly +# annotated with the types. The types are regular python types; we'll go over more on this in the type-system section. 
+# :py:func:`flytekit.task` +@task +def square(n: int) -> int: + """ + Parameters: + n (int): name of the parameter for the task will be derived from the name of the input variable + the type will be automatically deduced to be Types.Integer + + Return: + int: The label for the output will be automatically assigned and type will be deduced from the annotation + + """ + return n * n + + +# %% +# In this task, one input is ``n`` which has type ``int``. +# the task ``square`` takes the number ``n`` and returns a new integer (squared value) +# +# .. note:: +# +# Flytekit will assign a default name to the output variable like ``out0`` +# In case of multiple outputs, each output will be numbered in the order +# starting with 0. For e.g. -> ``out0, out1, out2, ...`` +# +# Flyte tasks can be executed like normal functions +if __name__ == "__main__": + print(square(n=10)) diff --git a/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/task_cache.py b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/task_cache.py new file mode 100644 index 0000000000..d524db43e4 --- /dev/null +++ b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/basic/task_cache.py @@ -0,0 +1,48 @@ +""" +Task Cache +---------------- + +Flyte provides the ability to cache the output of task executions in order to make subsequent executions faster. A well-behaved Flyte Task should generate deterministic output given the same inputs and task functionality. +This is useful in situations where a user knows that many executions with the exact same inputs can occur. For example, your task may be periodically run on a schedule, run multiple times when debugging workflows, or +commonly shared across different workflows but receive the same inputs. +""" + +# %% +# For any task in flyte, there is always one required import +from flytekit import task + + +# %% +# Task caching is disabled by default. 
This is to avoid unintended consequences of caching tasks with side-effects. To enable caching and control its behavior, use the `cache` and `cache_version` parameters when constructing +# a task. `cache` controls whether caching is enabled or disabled overall and `cache_version` controls which version of the cache is used. Bumping this version is akin to invalidating the cache: the next execution of that task +# with the same input/interface will actually run, and the results will be cached so that subsequent executions are just replayed from the cache. +# +# :py:func:`flytekit.task` +@task(cache=True, cache_version="1.0") +def square(n: int) -> int: + """ + Parameters: + n (int): name of the parameter for the task will be derived from the name of the input variable + the type will be automatically deduced to be Types.Integer + + Return: + int: The label for the output will be automatically assigned and type will be deduced from the annotation + + """ + return n * n + + +# %% +# In the above example, calling `square(n=2)` twice (even if it's across different executions or different workflows), will only actually execute the multiplication operation once. The second time the output will be made +# available immediately - (captured from the previous execution with the same inputs) + +# %% +# If, in a subsequent code update, we update the signature of the task to return the original number along with the result, it'll automatically invalidate the cache (even though the cache version remains the same). :: +# +# :py:func:`flytekit.task` +# @task(cache=True, cache_version="1.0") +# def square(n: int) -> (int, int): +# ... + +# %% +# To read more about Task caching and how a unique signature is calculated, please proceed to the `Task Cache documentation `__.
diff --git a/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/requirements.in b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/requirements.in new file mode 100644 index 0000000000..f7d015b843 --- /dev/null +++ b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/requirements.in @@ -0,0 +1,4 @@ +flytekit>=0.17.0b0 +wheel +matplotlib +opencv-python diff --git a/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/requirements.txt b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/requirements.txt new file mode 100644 index 0000000000..7b2b880306 --- /dev/null +++ b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/requirements.txt @@ -0,0 +1,136 @@ +# +# This file is autogenerated by pip-compile +# To update, run: +# +# /Library/Developer/CommandLineTools/usr/bin/make requirements.txt +# +attrs==20.3.0 + # via scantree +certifi==2020.12.5 + # via requests +chardet==4.0.0 + # via requests +click==7.1.2 + # via flytekit +croniter==1.0.10 + # via flytekit +cycler==0.10.0 + # via matplotlib +dataclasses-json==0.5.2 + # via flytekit +decorator==5.0.4 + # via retry +deprecated==1.2.12 + # via flytekit +dirhash==0.2.1 + # via flytekit +docker-image-py==0.1.10 + # via flytekit +flyteidl==0.18.31 + # via flytekit +flytekit==0.17.0 + # via -r ../common/requirements-common.in +grpcio==1.36.1 + # via flytekit +idna==2.10 + # via requests +importlib-metadata==3.10.0 + # via keyring +keyring==23.0.1 + # via flytekit +kiwisolver==1.3.1 + # via matplotlib +marshmallow-enum==1.5.1 + # via dataclasses-json +marshmallow==3.11.1 + # via + # dataclasses-json + # marshmallow-enum +matplotlib==3.4.1 + # via -r ../common/requirements-common.in +mypy-extensions==0.4.3 + # via typing-inspect +natsort==7.1.1 + # via flytekit +numpy==1.20.2 + # via + # matplotlib + # opencv-python + # pandas + # pyarrow +opencv-python==4.5.1.48 + # via -r requirements.in +pandas==1.2.3 + # via flytekit 
+pathspec==0.8.1 + # via scantree +pillow==8.2.0 + # via matplotlib +protobuf==3.15.7 + # via + # flyteidl + # flytekit +py==1.10.0 + # via retry +pyarrow==3.0.0 + # via flytekit +pyparsing==2.4.7 + # via matplotlib +python-dateutil==2.8.1 + # via + # croniter + # flytekit + # matplotlib + # pandas +pytimeparse==1.1.8 + # via flytekit +pytz==2018.4 + # via + # flytekit + # pandas +regex==2021.3.17 + # via docker-image-py +requests==2.25.1 + # via + # flytekit + # responses +responses==0.13.2 + # via flytekit +retry==0.9.2 + # via flytekit +scantree==0.0.1 + # via dirhash +six==1.15.0 + # via + # cycler + # flytekit + # grpcio + # protobuf + # python-dateutil + # responses + # scantree +sortedcontainers==2.3.0 + # via flytekit +statsd==3.3.0 + # via flytekit +stringcase==1.2.0 + # via dataclasses-json +typing-extensions==3.7.4.3 + # via typing-inspect +typing-inspect==0.6.0 + # via dataclasses-json +urllib3==1.25.11 + # via + # flytekit + # requests + # responses +wheel==0.36.2 + # via + # -r ../common/requirements-common.in + # flytekit +wrapt==1.12.1 + # via + # deprecated + # flytekit +zipp==3.4.1 + # via importlib-metadata diff --git a/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/sandbox.config b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/sandbox.config new file mode 100644 index 0000000000..da3362a4b0 --- /dev/null +++ b/tests/flytekit/integration/control_plane/mock_flyte_repo/workflows/sandbox.config @@ -0,0 +1,7 @@ +[sdk] +workflow_packages=workflows +python_venv=flytekit_venv + +[auth] +assumable_iam_role=arn:aws:iam::173840052742:role/flytefunctionaltestsbatchworker-production-iad +raw_output_data_prefix=s3://lyft-modelbuilder/cookbook diff --git a/tests/flytekit/integration/control_plane/test_workflow.py b/tests/flytekit/integration/control_plane/test_workflow.py new file mode 100644 index 0000000000..71727c5c8c --- /dev/null +++ b/tests/flytekit/integration/control_plane/test_workflow.py @@ -0,0 +1,57 @@ 
+import os +import pathlib +import time + +import pytest + +from flytekit.control_plane import launch_plan +from flytekit.models import literals + +PROJECT = "flytesnacks" + + +@pytest.fixture(scope="session") +def flyte_workflows_source_dir(): + return pathlib.Path(os.path.dirname(__file__)) / "mock_flyte_repo" + + +@pytest.fixture(scope="session") +def flyte_workflows_register(docker_compose): + docker_compose.execute( + f"exec -w /flyteorg/src -e SANDBOX=1 -e PROJECT={PROJECT} -e VERSION=v{os.getpid()} " + "backend make -C workflows register" + ) + + +def test_client(flyteclient): + projects = flyteclient.list_projects_paginated(limit=5, token=None) + assert len(projects) <= 5 + + +def test_launch_workflow(flyteclient, flyte_workflows_register): + execution = launch_plan.FlyteLaunchPlan.fetch( + PROJECT, "development", "workflows.basic.basic_workflow.my_wf", f"v{os.getpid()}" + ).launch_with_literals( + PROJECT, + "development", + literals.LiteralMap( + { + "a": literals.Literal(literals.Scalar(literals.Primitive(integer=10))), + "b": literals.Literal(literals.Scalar(literals.Primitive(string_value="foobar"))), + } + ), + ) + + for _ in range(20): + if not execution.is_complete: + time.sleep(0.5) + execution.sync() + if execution.node_executions is not None and len(execution.node_executions) > 1: + execution.node_executions["n0"] + # TODO: monitor and inspect node executions + continue + else: + break + + assert execution.outputs.literals["o0"].scalar.primitive.integer == 12 + assert execution.outputs.literals["o1"].scalar.primitive.string_value == "foobarworld"