From 64cce006432bbd6c523cdd9d53390f0ce181ddde Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 25 Aug 2020 16:41:49 -0700 Subject: [PATCH 1/8] wip --- flytekit/bin/entrypoint.py | 12 ++++++++---- flytekit/common/tasks/sdk_runnable.py | 14 +++++++++++++- flytekit/engines/common.py | 7 ++++++- flytekit/engines/flyte/engine.py | 5 +++-- flytekit/interfaces/data/data_proxy.py | 11 ++++++----- flytekit/interfaces/data/gcs/gcs_proxy.py | 16 +++++++++++++++- flytekit/interfaces/data/s3/s3proxy.py | 21 +++++++++++++++++++-- 7 files changed, 70 insertions(+), 16 deletions(-) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index 7ad447de4a..cd7c84dd9b 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -53,7 +53,7 @@ def _map_job_index_to_child_index(local_input_dir, datadir, index): @_scopes.system_entry_point -def _execute_task(task_module, task_name, inputs, output_prefix, test): +def _execute_task(task_module, task_name, inputs, output_prefix, raw_output_data_prefix, test): with _TemporaryConfiguration(_internal_config.CONFIGURATION_PATH.get()): with _utils.AutoDeletingTempDir("input_dir") as input_dir: # Load user code @@ -83,7 +83,10 @@ def _execute_task(task_module, task_name, inputs, output_prefix, test): _data_proxy.Data.get_data(inputs, local_inputs_file) input_proto = _utils.load_proto_from_file(_literals_pb2.LiteralMap, local_inputs_file) _engine_loader.get_engine().get_task(task_def).execute( - _literal_models.LiteralMap.from_flyte_idl(input_proto), context={"output_prefix": output_prefix}, + _literal_models.LiteralMap.from_flyte_idl(input_proto), context={ + "output_prefix": output_prefix, + "raw_output_data_prefix": raw_output_data_prefix, + }, ) @@ -97,10 +100,11 @@ def _pass_through(): @_click.option("--task-name", required=True) @_click.option("--inputs", required=True) @_click.option("--output-prefix", required=True) +@_click.option("--raw-data-output-prefix", required=False) @_click.option("--test", is_flag=True) -def execute_task_cmd(task_module, task_name, inputs, output_prefix, test): +def execute_task_cmd(task_module, task_name, inputs, output_prefix, raw_output_data_prefix, test): _click.echo(_utils.get_version_message()) - _execute_task(task_module, task_name, inputs, output_prefix, test) + _execute_task(task_module, task_name, inputs, output_prefix, raw_output_data_prefix, test) if __name__ == "__main__": diff --git a/flytekit/common/tasks/sdk_runnable.py b/flytekit/common/tasks/sdk_runnable.py index f868475037..7fa7c4db99 100644 --- a/flytekit/common/tasks/sdk_runnable.py +++ b/flytekit/common/tasks/sdk_runnable.py @@ -32,12 +32,13 @@ class ExecutionParameters(object): decorated function. """ - def __init__(self, execution_date, tmp_dir, stats, execution_id, logging): + def __init__(self, execution_date, tmp_dir, stats, execution_id, logging, raw_output_data_prefix=None): self._stats = stats self._execution_date = execution_date self._working_directory = tmp_dir self._execution_id = execution_id self._logging = logging + self._raw_output_data_prefix = raw_output_data_prefix @property def stats(self): @@ -102,6 +103,16 @@ def execution_id(self): """ return self._execution_id + @property + def raw_output_data_prefix(self) -> str: + """ + This is the prefix/location that offloaded data structures like Blobs and Schemas will use to store data. You + shouldn't need to access this from within a task, but it's here just in case + + :rtype: Text + """ + return self._raw_output_data_prefix + class SdkRunnableContainer(_six.with_metaclass(_sdk_bases.ExtendedSdkType, _task_models.Container)): def __init__( @@ -338,6 +349,7 @@ def _execute_user_code(self, context, inputs): stats=context.stats, logging=context.logging, tmp_dir=context.working_directory, + raw_output_data_prefix=context.raw_output_data_prefix, ), **inputs ) diff --git a/flytekit/engines/common.py b/flytekit/engines/common.py index 68c8c6f1d4..48d7864c77 100644 --- a/flytekit/engines/common.py +++ b/flytekit/engines/common.py @@ -396,12 +396,13 @@ def fetch_workflow(self, workflow_id): class EngineContext(object): - def __init__(self, execution_date, tmp_dir, stats, execution_id, logging): + def __init__(self, execution_date, tmp_dir, stats, execution_id, logging, raw_output_data_prefix=None): self._stats = stats self._execution_date = execution_date self._working_directory = tmp_dir self._execution_id = execution_id self._logging = logging + self._raw_output_data_prefix = raw_output_data_prefix @property def stats(self): @@ -437,3 +438,7 @@ def execution_id(self): :rtype: flytekit.models.core.identifier.WorkflowExecutionIdentifier """ return self._execution_id + + @property + def raw_output_data_prefix(self) -> str: + return self._raw_output_data_prefix diff --git a/flytekit/engines/flyte/engine.py b/flytekit/engines/flyte/engine.py index 458eca15c8..f62e226ffa 100644 --- a/flytekit/engines/flyte/engine.py +++ b/flytekit/engines/flyte/engine.py @@ -270,11 +270,11 @@ def execute(self, inputs, context=None): :param dict[Text, Text] context: :rtype: dict[Text, flytekit.models.common.FlyteIdlEntity] """ - with _common_utils.AutoDeletingTempDir("engine_dir") as temp_dir: with _common_utils.AutoDeletingTempDir("task_dir") as task_dir: with _data_proxy.LocalWorkingDirectoryContext(task_dir): - with _data_proxy.RemoteDataContext(): + raw_output_data_prefix = context.get("raw_output_data_prefix", None) + with _data_proxy.RemoteDataContext(raw_output_data_prefix_override=raw_output_data_prefix): output_file_dict = dict() # This sets the logging level for user code and is the only place an sdk setting gets @@ -311,6 +311,7 @@ def execute(self, inputs, context=None): ), logging=_logging, tmp_dir=task_dir, + raw_output_data_prefix=context['raw_output_data_prefix'] if "raw_output_data_prefix" in context else None, ), inputs, ) diff --git a/flytekit/interfaces/data/data_proxy.py b/flytekit/interfaces/data/data_proxy.py index e51659cdd0..9a24135204 100644 --- a/flytekit/interfaces/data/data_proxy.py +++ b/flytekit/interfaces/data/data_proxy.py @@ -64,22 +64,23 @@ def __init__(self, sandbox): class RemoteDataContext(_OutputDataContext): _CLOUD_PROVIDER_TO_PROXIES = { - _constants.CloudProvider.AWS: _s3proxy.AwsS3Proxy(), - _constants.CloudProvider.GCP: _gcs_proxy.GCSProxy(), + _constants.CloudProvider.AWS: _s3proxy.AwsS3Proxy, + _constants.CloudProvider.GCP: _gcs_proxy.GCSProxy, } - def __init__(self, cloud_provider=None): + def __init__(self, cloud_provider=None, raw_output_data_prefix_override=None): """ :param Optional[Text] cloud_provider: From flytekit.common.constants.CloudProvider enum """ cloud_provider = cloud_provider or _platform_config.CLOUD_PROVIDER.get() - proxy = type(self)._CLOUD_PROVIDER_TO_PROXIES.get(cloud_provider, None) - if proxy is None: + proxy_class = type(self)._CLOUD_PROVIDER_TO_PROXIES.get(cloud_provider, None) + if proxy_class is None: raise _user_exception.FlyteAssertion( "Configured cloud provider is not supported for data I/O. Received: {}, expected one of: {}".format( cloud_provider, list(type(self)._CLOUD_PROVIDER_TO_PROXIES.keys()) ) ) + proxy = proxy_class(raw_output_data_prefix_override) super(RemoteDataContext, self).__init__(proxy) diff --git a/flytekit/interfaces/data/gcs/gcs_proxy.py b/flytekit/interfaces/data/gcs/gcs_proxy.py index 2313b38136..f8b99dbb36 100644 --- a/flytekit/interfaces/data/gcs/gcs_proxy.py +++ b/flytekit/interfaces/data/gcs/gcs_proxy.py @@ -28,6 +28,19 @@ def _amend_path(path): class GCSProxy(_common_data.DataProxy): _GS_UTIL_CLI = "gsutil" + def __init__(self, raw_output_data_prefix_override: str = None): + """ + :param raw_output_data_prefix_override: Instead of relying on the AWS or GCS configuration (see + S3_SHARD_FORMATTER for AWS and GCS_PREFIX for GCP) setting when computing the shard + path (_get_shard_path), use this prefix instead as a base. This code assumes that the + path passed in is correct. That is, an S3 path won't be passed in when running on GCP. + """ + self._raw_output_data_prefix_override = raw_output_data_prefix_override + + @property + def raw_output_data_prefix_override(self) -> str: + return self._raw_output_data_prefix_override + @staticmethod def _check_binary(): """ @@ -124,7 +137,8 @@ def get_random_path(self): :rtype: Text """ key = _uuid.UUID(int=_flyte_random.random.getrandbits(128)).hex - return _os.path.join(_gcp_config.GCS_PREFIX.get(), key) + prefix = self.raw_output_data_prefix_override or _gcp_config.GCS_PREFIX.get() + return _os.path.join(prefix, key) def get_random_directory(self): """ diff --git a/flytekit/interfaces/data/s3/s3proxy.py b/flytekit/interfaces/data/s3/s3proxy.py index 8ed560b35d..5948b7d59f 100644 --- a/flytekit/interfaces/data/s3/s3proxy.py +++ b/flytekit/interfaces/data/s3/s3proxy.py @@ -41,6 +41,19 @@ class AwsS3Proxy(_common_data.DataProxy): _AWS_CLI = "aws" _SHARD_CHARACTERS = [_text_type(x) for x in _six_moves.range(10)] + list(_string.ascii_lowercase) + def __init__(self, raw_output_data_prefix_override: str = None): + """ + :param raw_output_data_prefix_override: Instead of relying on the AWS or GCS configuration (see + S3_SHARD_FORMATTER for AWS and GCS_PREFIX for GCP) setting when computing the shard + path (_get_shard_path), use this prefix instead as a base. This code assumes that the + path passed in is correct. That is, an S3 path won't be passed in when running on GCP. + """ + self._raw_output_data_prefix_override = raw_output_data_prefix_override + + @property + def raw_output_data_prefix_override(self) -> str: + return self._raw_output_data_prefix_override + @staticmethod def _check_binary(): """ @@ -179,10 +192,14 @@ def get_random_directory(self): """ return self.get_random_path() + "/" - def _get_shard_path(self): + def _get_shard_path(self) -> str: """ - :rtype: Text + If this object was created with a raw output data prefix, usually set by Propeller/Plugins at execution time + and piped all the way here, it will be used instead of referencing the S3 shard configuration. """ + if self.raw_output_data_prefix_override: + return self.raw_output_data_prefix_override + shard = "" for _ in _six_moves.range(_aws_config.S3_SHARD_STRING_LENGTH.get()): shard += _flyte_random.random.choice(self._SHARD_CHARACTERS) From 45e908e596c6c59c1e3d5ff693aa47e18ac834fd Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 25 Aug 2020 16:47:19 -0700 Subject: [PATCH 2/8] comments and stuff --- flytekit/common/tasks/sdk_runnable.py | 14 +------------- flytekit/interfaces/data/gcs/gcs_proxy.py | 5 +++-- 2 files changed, 4 insertions(+), 15 deletions(-) diff --git a/flytekit/common/tasks/sdk_runnable.py b/flytekit/common/tasks/sdk_runnable.py index 7fa7c4db99..f868475037 100644 --- a/flytekit/common/tasks/sdk_runnable.py +++ b/flytekit/common/tasks/sdk_runnable.py @@ -32,13 +32,12 @@ class ExecutionParameters(object): decorated function. """ - def __init__(self, execution_date, tmp_dir, stats, execution_id, logging, raw_output_data_prefix=None): + def __init__(self, execution_date, tmp_dir, stats, execution_id, logging): self._stats = stats self._execution_date = execution_date self._working_directory = tmp_dir self._execution_id = execution_id self._logging = logging - self._raw_output_data_prefix = raw_output_data_prefix @property def stats(self): @@ -103,16 +102,6 @@ def execution_id(self): """ return self._execution_id - @property - def raw_output_data_prefix(self) -> str: - """ - This is the prefix/location that offloaded data structures like Blobs and Schemas will use to store data. You - shouldn't need to access this from within a task, but it's here just in case - - :rtype: Text - """ - return self._raw_output_data_prefix - class SdkRunnableContainer(_six.with_metaclass(_sdk_bases.ExtendedSdkType, _task_models.Container)): def __init__( @@ -349,7 +338,6 @@ def _execute_user_code(self, context, inputs): stats=context.stats, logging=context.logging, tmp_dir=context.working_directory, - raw_output_data_prefix=context.raw_output_data_prefix, ), **inputs ) diff --git a/flytekit/interfaces/data/gcs/gcs_proxy.py b/flytekit/interfaces/data/gcs/gcs_proxy.py index f8b99dbb36..3b40e664d3 100644 --- a/flytekit/interfaces/data/gcs/gcs_proxy.py +++ b/flytekit/interfaces/data/gcs/gcs_proxy.py @@ -132,9 +132,10 @@ def upload_directory(self, local_path, remote_path): ) return _update_cmd_config_and_execute(cmd) - def get_random_path(self): + def get_random_path(self) -> str: """ - :rtype: Text + If this object was created with a raw output data prefix, usually set by Propeller/Plugins at execution time + and piped all the way here, it will be used instead of referencing the GCS_PREFIX configuration. """ key = _uuid.UUID(int=_flyte_random.random.getrandbits(128)).hex prefix = self.raw_output_data_prefix_override or _gcp_config.GCS_PREFIX.get() From 39567bd921e404faa41c8bf6b18ca2b0a0a18428 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 25 Aug 2020 16:54:57 -0700 Subject: [PATCH 3/8] oops, plugins not done yet? --- flytekit/common/tasks/sdk_runnable.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flytekit/common/tasks/sdk_runnable.py b/flytekit/common/tasks/sdk_runnable.py index f868475037..02367460ad 100644 --- a/flytekit/common/tasks/sdk_runnable.py +++ b/flytekit/common/tasks/sdk_runnable.py @@ -447,6 +447,8 @@ def _get_container_definition( "{{.input}}", "--output-prefix", "{{.outputPrefix}}", + "--raw-data-output-prefix", + "{{.outputPrefix}}" ], resources=_task_models.Resources(limits=limits, requests=requests), env=environment, From 9d033bb38e7ee37aca076024b60a53cc6bdd69f6 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Wed, 26 Aug 2020 12:20:38 -0700 Subject: [PATCH 4/8] make fmt and couple tests --- flytekit/bin/entrypoint.py | 14 +++++--- flytekit/common/tasks/sdk_runnable.py | 2 +- flytekit/contrib/notebook/tasks.py | 2 ++ flytekit/engines/flyte/engine.py | 4 ++- .../unit/bin/test_python_entrypoint.py | 32 ++++++++++++++++++- 5 files changed, 46 insertions(+), 8 deletions(-) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index cd7c84dd9b..cc0571252c 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -83,10 +83,8 @@ def _execute_task(task_module, task_name, inputs, output_prefix, raw_output_data _data_proxy.Data.get_data(inputs, local_inputs_file) input_proto = _utils.load_proto_from_file(_literals_pb2.LiteralMap, local_inputs_file) _engine_loader.get_engine().get_task(task_def).execute( - _literal_models.LiteralMap.from_flyte_idl(input_proto), context={ - "output_prefix": output_prefix, - "raw_output_data_prefix": raw_output_data_prefix, - }, + _literal_models.LiteralMap.from_flyte_idl(input_proto), + context={"output_prefix": output_prefix, "raw_output_data_prefix": raw_output_data_prefix,}, ) @@ -100,10 +98,16 @@ def _pass_through(): @_click.option("--task-name", required=True) @_click.option("--inputs", required=True) @_click.option("--output-prefix", required=True) -@_click.option("--raw-data-output-prefix", required=False) +@_click.option("--raw-output-data-prefix", required=False) @_click.option("--test", is_flag=True) def execute_task_cmd(task_module, task_name, inputs, output_prefix, raw_output_data_prefix, test): _click.echo(_utils.get_version_message()) + # Backwards compatibility - if Propeller hasn't filled this in, then it'll come through here as the original + # template string, so let's explicitly set it to None so that the downstream functions will know to fall back + # to the original shard formatter/prefix config. + if raw_output_data_prefix == "{{.rawOutputDataPrefix}}": + raw_output_data_prefix = None + _execute_task(task_module, task_name, inputs, output_prefix, raw_output_data_prefix, test) diff --git a/flytekit/common/tasks/sdk_runnable.py b/flytekit/common/tasks/sdk_runnable.py index 02367460ad..a67b612ec7 100644 --- a/flytekit/common/tasks/sdk_runnable.py +++ b/flytekit/common/tasks/sdk_runnable.py @@ -448,7 +448,7 @@ def _get_container_definition( "--output-prefix", "{{.outputPrefix}}", "--raw-data-output-prefix", - "{{.outputPrefix}}" + "{{.rawOutputDataPrefix}}", ], resources=_task_models.Resources(limits=limits, requests=requests), env=environment, diff --git a/flytekit/contrib/notebook/tasks.py b/flytekit/contrib/notebook/tasks.py index e78ae859ed..8a15016d87 100644 --- a/flytekit/contrib/notebook/tasks.py +++ b/flytekit/contrib/notebook/tasks.py @@ -329,6 +329,8 @@ def container(self): "{{.input}}", "--output-prefix", "{{.outputPrefix}}", + "--raw-data-output-prefix", + "{{.rawOutputDataPrefix}}", ] return self._container diff --git a/flytekit/engines/flyte/engine.py b/flytekit/engines/flyte/engine.py index f62e226ffa..32b7c39ea4 100644 --- a/flytekit/engines/flyte/engine.py +++ b/flytekit/engines/flyte/engine.py @@ -311,7 +311,9 @@ def execute(self, inputs, context=None): ), logging=_logging, tmp_dir=task_dir, - raw_output_data_prefix=context['raw_output_data_prefix'] if "raw_output_data_prefix" in context else None, + raw_output_data_prefix=context["raw_output_data_prefix"] + if "raw_output_data_prefix" in context + else None, ), inputs, ) diff --git a/tests/flytekit/unit/bin/test_python_entrypoint.py b/tests/flytekit/unit/bin/test_python_entrypoint.py index ea2ac10568..eb948e947c 100644 --- a/tests/flytekit/unit/bin/test_python_entrypoint.py +++ b/tests/flytekit/unit/bin/test_python_entrypoint.py @@ -2,6 +2,7 @@ import os +import mock import six from click.testing import CliRunner from flyteidl.core import literals_pb2 as _literals_pb2 @@ -38,6 +39,7 @@ def test_single_step_entrypoint_in_proc(): _task_defs.add_one.task_function_name, input_file, output_dir.name, + output_dir.name, False, ) @@ -113,7 +115,12 @@ def test_arrayjob_entrypoint_in_proc(): os.environ["AWS_BATCH_JOB_ARRAY_INDEX"] = "0" _execute_task( - _task_defs.add_one.task_module, _task_defs.add_one.task_function_name, dir.name, dir.name, False, + _task_defs.add_one.task_module, + _task_defs.add_one.task_function_name, + dir.name, + dir.name, + dir.name, + False, ) raw_map = _type_helpers.unpack_literal_map_to_sdk_python_std( @@ -132,3 +139,26 @@ def test_arrayjob_entrypoint_in_proc(): os.environ["BATCH_JOB_ARRAY_INDEX_VAR_NAME"] = orig_env_index_var_name if orig_env_array_index: os.environ["AWS_BATCH_JOB_ARRAY_INDEX"] = orig_env_array_index + + +@mock.patch("flytekit.bin.entrypoint._execute_task") +def test_backwards_compatible_replacement(mock_execute_task): + def return_args(*args, **kwargs): + assert args[4] is None + + mock_execute_task.side_effect = return_args + + with _TemporaryConfiguration( + os.path.join(os.path.dirname(__file__), "fake.config"), + internal_overrides={"project": "test", "domain": "development"}, + ): + with _utils.AutoDeletingTempDir("in") as input_dir: + with _utils.AutoDeletingTempDir("out") as output_dir: + cmd = [] + cmd.extend(["--task-module", "fake"]) + cmd.extend(["--task-name", "fake"]) + cmd.extend(["--inputs", "fake"]) + cmd.extend(["--output-prefix", "fake"]) + cmd.extend(["--raw-output-data-prefix", "{{.rawOutputDataPrefix}}"]) + result = CliRunner().invoke(execute_task_cmd, cmd) + assert result.exit_code == 0 From 2c8b7e92c43bcab4b99bfc7b20c636d208d61d2f Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Wed, 26 Aug 2020 13:01:52 -0700 Subject: [PATCH 5/8] more tests --- .../interfaces/data/gcs/test_gcs_proxy.py | 13 +++++++-- .../unit/interfaces/data/s3/test_s3_proxy.py | 27 +++++++++++++++++++ 2 files changed, 38 insertions(+), 2 deletions(-) create mode 100644 tests/flytekit/unit/interfaces/data/s3/test_s3_proxy.py diff --git a/tests/flytekit/unit/interfaces/data/gcs/test_gcs_proxy.py b/tests/flytekit/unit/interfaces/data/gcs/test_gcs_proxy.py index d1b29232d1..ea799ccf4e 100644 --- a/tests/flytekit/unit/interfaces/data/gcs/test_gcs_proxy.py +++ b/tests/flytekit/unit/interfaces/data/gcs/test_gcs_proxy.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import os as _os import mock as _mock @@ -69,3 +67,14 @@ def test_upload_directory_with_parallelism(mock_update_cmd_config_and_execute, g local_path, remote_path = "/foo/*", "gs://bar/0/" gcs_proxy.upload_directory(local_path, remote_path) mock_update_cmd_config_and_execute.assert_called_once_with(["gsutil", "-m", "cp", "-r", local_path, remote_path]) + + +def test_raw_prefix_property(mock_update_cmd_config_and_execute, gsutil_parallelism, gcs_proxy): + gcs_with_raw_prefix = _gcs_proxy.GCSProxy("gcs://stuff") + assert gcs_with_raw_prefix.raw_output_data_prefix_override == "gcs://stuff" + + +def test_random_path(mock_update_cmd_config_and_execute, gsutil_parallelism, gcs_proxy): + gcs_with_raw_prefix = _gcs_proxy.GCSProxy("gcs://stuff") + result = gcs_with_raw_prefix.get_random_path() + assert result.startswith("gcs://stuff") diff --git a/tests/flytekit/unit/interfaces/data/s3/test_s3_proxy.py b/tests/flytekit/unit/interfaces/data/s3/test_s3_proxy.py new file mode 100644 index 0000000000..7f12f112d4 --- /dev/null +++ b/tests/flytekit/unit/interfaces/data/s3/test_s3_proxy.py @@ -0,0 +1,27 @@ +import os as _os + +import mock as _mock +import pytest as _pytest +from flytekit.configuration import aws as _aws_config + +from flytekit.interfaces.data.s3.s3proxy import AwsS3Proxy as _AwsS3Proxy + + +def test_property(): + aws = _AwsS3Proxy("s3://raw-output") + assert aws.raw_output_data_prefix_override == "s3://raw-output" + + +@_mock.patch("flytekit.configuration.aws.S3_SHARD_FORMATTER") +def test_random_path(mock_formatter): + mock_formatter.get.return_value = "s3://flyte/{}/" + + # Without raw output data prefix override + aws = _AwsS3Proxy() + p = str(aws.get_random_path()) + assert p.startswith("s3://flyte") + + # With override + aws = _AwsS3Proxy("s3://raw-output") + p = str(aws.get_random_path()) + assert p.startswith("s3://raw-output") From 49b1a14fc8041f18108a8503ff50535aa16de1ef Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Wed, 26 Aug 2020 13:05:29 -0700 Subject: [PATCH 6/8] lint --- flytekit/bin/entrypoint.py | 2 +- tests/flytekit/unit/bin/test_python_entrypoint.py | 4 ++-- tests/flytekit/unit/interfaces/data/s3/test_s3_proxy.py | 4 ---- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index cc0571252c..98e1a464ac 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -84,7 +84,7 @@ def _execute_task(task_module, task_name, inputs, output_prefix, raw_output_data input_proto = _utils.load_proto_from_file(_literals_pb2.LiteralMap, local_inputs_file) _engine_loader.get_engine().get_task(task_def).execute( _literal_models.LiteralMap.from_flyte_idl(input_proto), - context={"output_prefix": output_prefix, "raw_output_data_prefix": raw_output_data_prefix,}, + context={"output_prefix": output_prefix, "raw_output_data_prefix": raw_output_data_prefix}, ) diff --git a/tests/flytekit/unit/bin/test_python_entrypoint.py b/tests/flytekit/unit/bin/test_python_entrypoint.py index eb948e947c..1060a3658e 100644 --- a/tests/flytekit/unit/bin/test_python_entrypoint.py +++ b/tests/flytekit/unit/bin/test_python_entrypoint.py @@ -152,8 +152,8 @@ def return_args(*args, **kwargs): os.path.join(os.path.dirname(__file__), "fake.config"), internal_overrides={"project": "test", "domain": "development"}, ): - with _utils.AutoDeletingTempDir("in") as input_dir: - with _utils.AutoDeletingTempDir("out") as output_dir: + with _utils.AutoDeletingTempDir("in"): + with _utils.AutoDeletingTempDir("out"): cmd = [] cmd.extend(["--task-module", "fake"]) cmd.extend(["--task-name", "fake"]) diff --git a/tests/flytekit/unit/interfaces/data/s3/test_s3_proxy.py b/tests/flytekit/unit/interfaces/data/s3/test_s3_proxy.py index 7f12f112d4..38978aa38e 100644 --- a/tests/flytekit/unit/interfaces/data/s3/test_s3_proxy.py +++ b/tests/flytekit/unit/interfaces/data/s3/test_s3_proxy.py @@ -1,8 +1,4 @@ -import os as _os - import mock as _mock -import pytest as _pytest -from flytekit.configuration import aws as _aws_config from flytekit.interfaces.data.s3.s3proxy import AwsS3Proxy as _AwsS3Proxy From 03bf797ad1c5ab2523783ce65560171a9cca35bb Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Wed, 26 Aug 2020 13:21:24 -0700 Subject: [PATCH 7/8] tests --- flytekit/common/tasks/sdk_runnable.py | 2 +- flytekit/contrib/notebook/tasks.py | 2 +- tests/flytekit/unit/sdk/tasks/test_dynamic_sidecar_tasks.py | 4 ++-- tests/flytekit/unit/sdk/tasks/test_sidecar_tasks.py | 2 ++ 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/flytekit/common/tasks/sdk_runnable.py b/flytekit/common/tasks/sdk_runnable.py index a67b612ec7..08dd815fa5 100644 --- a/flytekit/common/tasks/sdk_runnable.py +++ b/flytekit/common/tasks/sdk_runnable.py @@ -447,7 +447,7 @@ def _get_container_definition( "{{.input}}", "--output-prefix", "{{.outputPrefix}}", - "--raw-data-output-prefix", + "--raw-output-data-prefix", "{{.rawOutputDataPrefix}}", ], resources=_task_models.Resources(limits=limits, requests=requests), diff --git a/flytekit/contrib/notebook/tasks.py b/flytekit/contrib/notebook/tasks.py index 8a15016d87..ca47eabd62 100644 --- a/flytekit/contrib/notebook/tasks.py +++ b/flytekit/contrib/notebook/tasks.py @@ -329,7 +329,7 @@ def container(self): "{{.input}}", "--output-prefix", "{{.outputPrefix}}", - "--raw-data-output-prefix", + "--raw-output-data-prefix", "{{.rawOutputDataPrefix}}", ] return self._container diff --git a/tests/flytekit/unit/sdk/tasks/test_dynamic_sidecar_tasks.py b/tests/flytekit/unit/sdk/tasks/test_dynamic_sidecar_tasks.py index be903a9c82..532a92d5a8 100644 --- a/tests/flytekit/unit/sdk/tasks/test_dynamic_sidecar_tasks.py +++ b/tests/flytekit/unit/sdk/tasks/test_dynamic_sidecar_tasks.py @@ -1,5 +1,3 @@ -from __future__ import absolute_import - import mock from k8s.io.api.core.v1 import generated_pb2 @@ -62,6 +60,8 @@ def test_dynamic_sidecar_task(): "{{.input}}", "--output-prefix", "{{.outputPrefix}}", + "--raw-output-data-prefix", + "{{.rawOutputDataPrefix}}", ] assert primary_container["volumeMounts"] == [{"mountPath": "/scratch", "name": "scratch"}] assert {"name": "foo", "value": "bar"} in primary_container["env"] diff --git a/tests/flytekit/unit/sdk/tasks/test_sidecar_tasks.py b/tests/flytekit/unit/sdk/tasks/test_sidecar_tasks.py index 72989eeffc..503202706f 100644 --- a/tests/flytekit/unit/sdk/tasks/test_sidecar_tasks.py +++ b/tests/flytekit/unit/sdk/tasks/test_sidecar_tasks.py @@ -58,6 +58,8 @@ def test_sidecar_task(): "{{.input}}", "--output-prefix", "{{.outputPrefix}}", + "--raw-output-data-prefix", + "{{.rawOutputDataPrefix}}", ] assert primary_container["volumeMounts"] == [{"mountPath": "some/where", "name": "volume mount"}] assert {"name": "foo", "value": "bar"} in primary_container["env"] From 74e47ccb462433371a0fa397e5104cc2a58a09a0 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Wed, 26 Aug 2020 14:06:38 -0700 Subject: [PATCH 8/8] bump --- flytekit/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/__init__.py b/flytekit/__init__.py index c8ab6eb438..e7ca1c2abd 100644 --- a/flytekit/__init__.py +++ b/flytekit/__init__.py @@ -2,4 +2,4 @@ import flytekit.plugins # noqa: F401 -__version__ = "0.12.0" +__version__ = "0.12.1"