From d89e17890514b416e1877e3b22877426bc34407e Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Thu, 20 Aug 2020 22:54:38 -0700 Subject: [PATCH 01/16] add an alternative entrypoint; add a helper function to pack string map to literal map --- flytekit/bin/entrypoint_alterative.py | 62 +++++++++++++++++++++++++++ flytekit/common/types/helpers.py | 14 ++++++ setup.py | 1 + 3 files changed, 77 insertions(+) create mode 100644 flytekit/bin/entrypoint_alterative.py diff --git a/flytekit/bin/entrypoint_alterative.py b/flytekit/bin/entrypoint_alterative.py new file mode 100644 index 0000000000..6c9ea5ffaa --- /dev/null +++ b/flytekit/bin/entrypoint_alterative.py @@ -0,0 +1,62 @@ +from __future__ import absolute_import + +import importlib as _importlib +import os as _os + +import click as _click +import datetime as _datetime +import random as _random +from flyteidl.core import literals_pb2 as _literals_pb2 + +from flytekit.common import utils as _utils +from flytekit.common.exceptions import scopes as _scopes, system as _system_exceptions +from flytekit.configuration import internal as _internal_config, TemporaryConfiguration as _TemporaryConfiguration +from flytekit.engines import loader as _engine_loader +from flytekit.interfaces.data import data_proxy as _data_proxy +from flytekit.interfaces import random as _flyte_random +from flytekit.models import literals as _literal_models + + +@_scopes.system_entry_point +def _execute_task(task_module, task_name, output_prefix, test, sagemaker_args): + with _TemporaryConfiguration(_internal_config.CONFIGURATION_PATH.get()): + with _utils.AutoDeletingTempDir('input_dir') as input_dir: + # Load user code + task_module = _importlib.import_module(task_module) + task_def = getattr(task_module, task_name) + + + if not test: + local_inputs_file = input_dir.get_named_tempfile('inputs.pb') + + # TODO: parse the unknown arguments, and create a litealmap out from the task definition to replace these two lines: + # 
_data_proxy.Data.get_data(inputs, local_inputs_file) + # input_proto = _utils.load_proto_from_file(_literals_pb2.LiteralMap, local_inputs_file) + + input_proto = createLiteralMap(sagemaker_args, task_def) + _engine_loader.get_engine().get_task(task_def).execute( + _literal_models.LiteralMap.from_flyte_idl(input_proto), + context={'output_prefix': output_prefix} + ) + + +@_click.group() +def _pass_through(): + pass + + +@_pass_through.command('pyflyte-execute-alternative', context_settings=dict(ignore_unknown_options=True)) +@_click.option('--task-module', required=True) +@_click.option('--task-name', required=True) +@_click.option('--output-prefix', required=True) +@_click.option('--test', is_flag=True) +@_click.argument('sagemaker_args', nargs=-1, type=_click.UNPROCESSED) +def execute_task_cmd(task_module, task_name, output_prefix, test, sagemaker_args): + _click.echo(_utils.get_version_message()) + _click.echo('unknown_args : {}'.format(sagemaker_args)) + _click.echo(type(sagemaker_args)) + # _execute_task(task_module, task_name, inputs, output_prefix, test, sagemaker_args) + + +if __name__ == '__main__': + _pass_through() \ No newline at end of file diff --git a/flytekit/common/types/helpers.py b/flytekit/common/types/helpers.py index 2bb19bdf4d..0b667cfb61 100644 --- a/flytekit/common/types/helpers.py +++ b/flytekit/common/types/helpers.py @@ -131,3 +131,17 @@ def pack_python_std_map_to_literal_map(std_map, type_map): k: v.from_python_std(std_map[k]) for k, v in _six.iteritems(type_map) } ) + + +def pack_python_string_map_to_literal_map(str_map, type_map): + """ + :param dict[Text, Text] str_map: + :param dict[Text, flytekit.common.types.base_sdk_types.FlyteSdkType] type_map: + :rtype: flytekit.models.literals.LiteralMap + :raises: flytekit.common.exceptions.user.FlyteTypeException + """ + return _literal_models.LiteralMap( + literals={ + k: v.from_string(str_map[k]) for k, v in _six.iteritems(type_map) + } + ) diff --git a/setup.py b/setup.py index 
e016bcb313..09c90f4a62 100644 --- a/setup.py +++ b/setup.py @@ -25,6 +25,7 @@ entry_points={ 'console_scripts': [ 'pyflyte-execute=flytekit.bin.entrypoint:execute_task_cmd', + 'pyflyte-execute-alternative=flytekit.bin.entrypoint_alternative:execute_task_cmd', 'pyflyte=flytekit.clis.sdk_in_container.pyflyte:main', 'flyte-cli=flytekit.clis.flyte_cli.main:_flyte_cli' ] From 7c16723a2ac8d9b8c133d26a5b2a2419066f4b8e Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Thu, 20 Aug 2020 22:59:05 -0700 Subject: [PATCH 02/16] bump patch version --- flytekit/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/__init__.py b/flytekit/__init__.py index 58c8b56c21..dfac1b9501 100644 --- a/flytekit/__init__.py +++ b/flytekit/__init__.py @@ -2,4 +2,4 @@ import flytekit.plugins # noqa: F401 -__version__ = "0.12.0b1" +__version__ = "0.12.1b0" From 3d84cdb7ae8cc691db1b061187a43f25740272dd Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Thu, 20 Aug 2020 23:07:24 -0700 Subject: [PATCH 03/16] create input literal map from string-typed arguments in pyflyte-execute-alternative --- flytekit/bin/entrypoint_alterative.py | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/flytekit/bin/entrypoint_alterative.py b/flytekit/bin/entrypoint_alterative.py index 6c9ea5ffaa..2ee39f7db6 100644 --- a/flytekit/bin/entrypoint_alterative.py +++ b/flytekit/bin/entrypoint_alterative.py @@ -15,6 +15,7 @@ from flytekit.interfaces.data import data_proxy as _data_proxy from flytekit.interfaces import random as _flyte_random from flytekit.models import literals as _literal_models +import flytekit.common.types.helpers as _type_helpers @_scopes.system_entry_point @@ -25,17 +26,31 @@ def _execute_task(task_module, task_name, output_prefix, test, sagemaker_args): task_module = _importlib.import_module(task_module) task_def = getattr(task_module, task_name) - if not test: local_inputs_file = input_dir.get_named_tempfile('inputs.pb') - # 
TODO: parse the unknown arguments, and create a litealmap out from the task definition to replace these two lines: + # TODO: parse the unknown arguments, and create a litealmap out from the task definition + # to replace these two lines: # _data_proxy.Data.get_data(inputs, local_inputs_file) # input_proto = _utils.load_proto_from_file(_literals_pb2.LiteralMap, local_inputs_file) + map_of_input_values = {} + # Here we have an assumption that each option key will come with a value right after the key + for i in sagemaker_args[::2]: + map_of_input_values[sagemaker_args[i]] = sagemaker_args[i+1] + + map_of_literal_types = {} + map_of_sdk_types = {} + for k, v in task_def.interface.inputs.items(): + map_of_literal_types[k] = v.type + map_of_sdk_types[k] = _type_helpers.get_sdk_type_from_literal_type(v.type) + + input_literal_map = _type_helpers.pack_python_string_map_to_literal_map( + map_of_input_values, + map_of_sdk_types, + ) - input_proto = createLiteralMap(sagemaker_args, task_def) _engine_loader.get_engine().get_task(task_def).execute( - _literal_models.LiteralMap.from_flyte_idl(input_proto), + input_literal_map, context={'output_prefix': output_prefix} ) @@ -55,7 +70,7 @@ def execute_task_cmd(task_module, task_name, output_prefix, test, sagemaker_args _click.echo(_utils.get_version_message()) _click.echo('unknown_args : {}'.format(sagemaker_args)) _click.echo(type(sagemaker_args)) - # _execute_task(task_module, task_name, inputs, output_prefix, test, sagemaker_args) + _execute_task(task_module, task_name, inputs, output_prefix, test, sagemaker_args) if __name__ == '__main__': From 89337256bb93dcfd9e6abf7ecd3c8e25bb84d6af Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Thu, 20 Aug 2020 23:13:45 -0700 Subject: [PATCH 04/16] add new line at EOF --- flytekit/bin/entrypoint_alterative.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/bin/entrypoint_alterative.py b/flytekit/bin/entrypoint_alterative.py index 2ee39f7db6..93d6275c53 100644 
--- a/flytekit/bin/entrypoint_alterative.py +++ b/flytekit/bin/entrypoint_alterative.py @@ -74,4 +74,4 @@ def execute_task_cmd(task_module, task_name, output_prefix, test, sagemaker_args if __name__ == '__main__': - _pass_through() \ No newline at end of file + _pass_through() From d367266b61f87a88ab63b8c9a3d0980df479db29 Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Thu, 20 Aug 2020 23:32:31 -0700 Subject: [PATCH 05/16] lint errors --- flytekit/bin/entrypoint_alterative.py | 14 +++----------- flytekit/clients/helpers.py | 2 ++ 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/flytekit/bin/entrypoint_alterative.py b/flytekit/bin/entrypoint_alterative.py index 93d6275c53..ead2e3b774 100644 --- a/flytekit/bin/entrypoint_alterative.py +++ b/flytekit/bin/entrypoint_alterative.py @@ -1,21 +1,14 @@ from __future__ import absolute_import import importlib as _importlib -import os as _os import click as _click -import datetime as _datetime -import random as _random -from flyteidl.core import literals_pb2 as _literals_pb2 +import flytekit.common.types.helpers as _type_helpers from flytekit.common import utils as _utils -from flytekit.common.exceptions import scopes as _scopes, system as _system_exceptions +from flytekit.common.exceptions import scopes as _scopes from flytekit.configuration import internal as _internal_config, TemporaryConfiguration as _TemporaryConfiguration from flytekit.engines import loader as _engine_loader -from flytekit.interfaces.data import data_proxy as _data_proxy -from flytekit.interfaces import random as _flyte_random -from flytekit.models import literals as _literal_models -import flytekit.common.types.helpers as _type_helpers @_scopes.system_entry_point @@ -27,7 +20,6 @@ def _execute_task(task_module, task_name, output_prefix, test, sagemaker_args): task_def = getattr(task_module, task_name) if not test: - local_inputs_file = input_dir.get_named_tempfile('inputs.pb') # TODO: parse the unknown arguments, and create a litealmap 
out from the task definition # to replace these two lines: @@ -70,7 +62,7 @@ def execute_task_cmd(task_module, task_name, output_prefix, test, sagemaker_args): _click.echo(_utils.get_version_message()) _click.echo('unknown_args : {}'.format(sagemaker_args)) _click.echo(type(sagemaker_args)) - _execute_task(task_module, task_name, inputs, output_prefix, test, sagemaker_args) + _execute_task(task_module, task_name, output_prefix, test, sagemaker_args) if __name__ == '__main__': diff --git a/flytekit/clients/helpers.py b/flytekit/clients/helpers.py index 75b2232636..9663a4e928 100644 --- a/flytekit/clients/helpers.py +++ b/flytekit/clients/helpers.py @@ -1,3 +1,5 @@ + + def iterate_node_executions( client, workflow_execution_identifier=None, task_execution_identifier=None, limit=None, filters=None, ): From 4f4c6c7a368952b04f27d99195430879b77a9968 Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Thu, 20 Aug 2020 23:33:23 -0700 Subject: [PATCH 06/16] commented redundant code --- flytekit/bin/entrypoint_alterative.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flytekit/bin/entrypoint_alterative.py b/flytekit/bin/entrypoint_alterative.py index ead2e3b774..cbc2847fae 100644 --- a/flytekit/bin/entrypoint_alterative.py +++ b/flytekit/bin/entrypoint_alterative.py @@ -30,10 +30,10 @@ def _execute_task(task_module, task_name, output_prefix, test, sagemaker_args): for i in sagemaker_args[::2]: map_of_input_values[sagemaker_args[i]] = sagemaker_args[i+1] - map_of_literal_types = {} + # map_of_literal_types = {} map_of_sdk_types = {} for k, v in task_def.interface.inputs.items(): - map_of_literal_types[k] = v.type + # map_of_literal_types[k] = v.type map_of_sdk_types[k] = _type_helpers.get_sdk_type_from_literal_type(v.type) input_literal_map = _type_helpers.pack_python_string_map_to_literal_map( From 2fe092f6761d6408c4b7e1c3a592b09816a0b0d3 Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Fri, 21 Aug 2020 09:47:57 -0700 Subject: [PATCH 07/16] shortened
the name to pyflyte-execute-alt --- flytekit/bin/{entrypoint_alterative.py => entrypoint_alt.py} | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename flytekit/bin/{entrypoint_alterative.py => entrypoint_alt.py} (96%) diff --git a/flytekit/bin/entrypoint_alterative.py b/flytekit/bin/entrypoint_alt.py similarity index 96% rename from flytekit/bin/entrypoint_alterative.py rename to flytekit/bin/entrypoint_alt.py index cbc2847fae..6a90e26ad7 100644 --- a/flytekit/bin/entrypoint_alterative.py +++ b/flytekit/bin/entrypoint_alt.py @@ -52,7 +52,7 @@ def _pass_through(): pass -@_pass_through.command('pyflyte-execute-alternative', context_settings=dict(ignore_unknown_options=True)) +@_pass_through.command('pyflyte-execute-alt', context_settings=dict(ignore_unknown_options=True)) @_click.option('--task-module', required=True) @_click.option('--task-name', required=True) @_click.option('--output-prefix', required=True) diff --git a/setup.py b/setup.py index 1adc73e330..085356e281 100644 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ entry_points={ "console_scripts": [ "pyflyte-execute=flytekit.bin.entrypoint:execute_task_cmd", - "pyflyte-execute-alternative=flytekit.bin.entrypoint_alternative:execute_task_cmd", + "pyflyte-execute-alt=flytekit.bin.entrypoint_alt:execute_task_cmd", "pyflyte=flytekit.clis.sdk_in_container.pyflyte:main", "flyte-cli=flytekit.clis.flyte_cli.main:_flyte_cli", ] From f6d1804ab99b601cff39292ab751d173f053ef6a Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Fri, 21 Aug 2020 13:29:21 -0700 Subject: [PATCH 08/16] add unit tests for the pyflyte-execute-alt entrypoint --- flytekit/bin/entrypoint_alt.py | 13 +- tests/flytekit/common/task_definitions.py | 29 +++++ .../unit/bin/test_python_entrypoint_alt.py | 117 ++++++++++++++++++ 3 files changed, 154 insertions(+), 5 deletions(-) create mode 100644 tests/flytekit/unit/bin/test_python_entrypoint_alt.py diff --git a/flytekit/bin/entrypoint_alt.py b/flytekit/bin/entrypoint_alt.py 
index 6a90e26ad7..9a33075182 100644 --- a/flytekit/bin/entrypoint_alt.py +++ b/flytekit/bin/entrypoint_alt.py @@ -14,7 +14,8 @@ @_scopes.system_entry_point def _execute_task(task_module, task_name, output_prefix, test, sagemaker_args): with _TemporaryConfiguration(_internal_config.CONFIGURATION_PATH.get()): - with _utils.AutoDeletingTempDir('input_dir') as input_dir: + with _utils.AutoDeletingTempDir("input_dir") as input_dir: + # Load user code task_module = _importlib.import_module(task_module) task_def = getattr(task_module, task_name) @@ -27,8 +28,10 @@ def _execute_task(task_module, task_name, output_prefix, test, sagemaker_args): # input_proto = _utils.load_proto_from_file(_literals_pb2.LiteralMap, local_inputs_file) map_of_input_values = {} # Here we have an assumption that each option key will come with a value right after the key - for i in sagemaker_args[::2]: - map_of_input_values[sagemaker_args[i]] = sagemaker_args[i+1] + for i in range(0, len(sagemaker_args), 2): + # Since the sagemaker_args are unprocessed, each of the option keys comes with a leading "--" + # We need to remove them + map_of_input_values[sagemaker_args[i][2:]] = sagemaker_args[i+1] # map_of_literal_types = {} map_of_sdk_types = {} @@ -60,8 +63,8 @@ def _pass_through(): @_click.argument('sagemaker_args', nargs=-1, type=_click.UNPROCESSED) def execute_task_cmd(task_module, task_name, output_prefix, test, sagemaker_args): _click.echo(_utils.get_version_message()) - _click.echo('unknown_args : {}'.format(sagemaker_args)) - _click.echo(type(sagemaker_args)) + _click.echo('sagemaker_args : {}'.format(sagemaker_args)) + # Note that the unknown arguments are entirely unprocessed, so the leading "--" are still there _execute_task(task_module, task_name, output_prefix, test, sagemaker_args) diff --git a/tests/flytekit/common/task_definitions.py b/tests/flytekit/common/task_definitions.py index 0381b203d1..1d5252eb6f 100644 --- a/tests/flytekit/common/task_definitions.py +++ 
b/tests/flytekit/common/task_definitions.py @@ -9,3 +9,32 @@ @python_task def add_one(wf_params, a, b): b.set(a + 1) + + +@inputs( + train=Types.CSV, + validation=Types.MultiPartBlob, + a=Types.Integer, + b=Types.Float, + c=Types.String, + d=Types.Boolean, + e=Types.Datetime, +) +@outputs( + otrain=Types.CSV, + ovalidation=Types.MultiPartBlob, + oa=Types.Integer, + ob=Types.Float, + oc=Types.String, + od=Types.Boolean, + oe=Types.Datetime, +) +@python_task +def dummy_for_entrypoint_alt(wf_params, train, validation, a, b, c, d, e, otrain, ovalidation, oa, ob, oc, od, oe): + otrain.set(train) + ovalidation.set(validation) + oa.set(a) + ob.set(b) + oc.set(c) + od.set(d) + oe.set(e) diff --git a/tests/flytekit/unit/bin/test_python_entrypoint_alt.py b/tests/flytekit/unit/bin/test_python_entrypoint_alt.py new file mode 100644 index 0000000000..2f07181c15 --- /dev/null +++ b/tests/flytekit/unit/bin/test_python_entrypoint_alt.py @@ -0,0 +1,117 @@ +from __future__ import absolute_import + +import os + +import six +from click.testing import CliRunner +from dateutil import parser +from flyteidl.core import literals_pb2 as _literals_pb2 + +from flytekit.bin.entrypoint_alt import _execute_task, execute_task_cmd +from flytekit.common import constants as _constants +from flytekit.common import utils as _utils +from flytekit.common.types import helpers as _type_helpers +from flytekit.configuration import TemporaryConfiguration as _TemporaryConfiguration +from flytekit.models import literals as _literal_models +from tests.flytekit.common import task_definitions as _task_defs + + +def _type_map_from_variable_map(variable_map): + return {k: _type_helpers.get_sdk_type_from_literal_type(v.type) for k, v in six.iteritems(variable_map)} + + +def test_single_step_entrypoint_in_proc(): + with _TemporaryConfiguration( + os.path.join(os.path.dirname(__file__), "fake.config"), + internal_overrides={"project": "test", "domain": "development"}, + ): + raw_args = ( + "--train", "s3://dummy", + 
"--validation", "s3://dummy", + "--a", "1", + "--b", "0.5", + "--c", "val", + "--d", "0", + "--e", "20180612T09:55:22Z") + with _utils.AutoDeletingTempDir("out") as output_dir: + _execute_task( + task_module=_task_defs.dummy_for_entrypoint_alt.task_module, + task_name=_task_defs.dummy_for_entrypoint_alt.task_function_name, + output_prefix=output_dir.name, + test=False, + sagemaker_args=raw_args, + ) + p = _utils.load_proto_from_file( + _literals_pb2.LiteralMap, os.path.join(output_dir.name, _constants.OUTPUT_FILE_NAME), + ) + + raw_args_map = {} + for i in range(0, len(raw_args), 2): + raw_args_map[raw_args[i][2:]] = raw_args[i + 1] + + raw_map = _type_helpers.unpack_literal_map_to_sdk_python_std( + _literal_models.LiteralMap.from_flyte_idl(p), + _type_map_from_variable_map(_task_defs.dummy_for_entrypoint_alt.interface.outputs), + ) + + assert len(raw_map) == 7 + assert raw_map["otrain"].uri.rstrip("/") == raw_args_map["train"].rstrip("/") + assert raw_map["ovalidation"].uri.rstrip("/") == raw_args_map["validation"].rstrip("/") + assert raw_map["oa"] == 1 + assert raw_map["ob"] == 0.5 + assert raw_map["oc"] == "val" + assert raw_map["od"] is False + assert raw_map["oe"] == parser.parse("20180612T09:55:22Z") + + +def test_single_step_entrypoint_out_of_proc(): + with _TemporaryConfiguration( + os.path.join(os.path.dirname(__file__), "fake.config"), + internal_overrides={"project": "test", "domain": "development"}, + ): + with _utils.AutoDeletingTempDir("in") as input_dir: + literal_map = _type_helpers.pack_python_std_map_to_literal_map( + {"a": 9}, _type_map_from_variable_map(_task_defs.add_one.interface.inputs), + ) + input_file = os.path.join(input_dir.name, "inputs.pb") + _utils.write_proto_to_file(literal_map.to_flyte_idl(), input_file) + + raw_args = ( + "--train", "s3://dummy", + "--validation", "s3://dummy", + "--a", "1", + "--b", "0.5", + "--c", "val", + "--d", "0", + "--e", "20180612T09:55:22Z") + + with _utils.AutoDeletingTempDir("out") as output_dir: + cmd 
= [] + cmd.extend(["--task-module", _task_defs.dummy_for_entrypoint_alt.task_module]) + cmd.extend(["--task-name", _task_defs.dummy_for_entrypoint_alt.task_function_name]) + cmd.extend(["--output-prefix", output_dir.name]) + cmd.extend(raw_args) + result = CliRunner().invoke(execute_task_cmd, cmd) + + assert result.exit_code == 0 + p = _utils.load_proto_from_file( + _literals_pb2.LiteralMap, os.path.join(output_dir.name, _constants.OUTPUT_FILE_NAME), + ) + + raw_args_map = {} + for i in range(0, len(raw_args), 2): + raw_args_map[raw_args[i][2:]] = raw_args[i + 1] + + raw_map = _type_helpers.unpack_literal_map_to_sdk_python_std( + _literal_models.LiteralMap.from_flyte_idl(p), + _type_map_from_variable_map(_task_defs.dummy_for_entrypoint_alt.interface.outputs), + ) + + assert len(raw_map) == 7 + assert raw_map["otrain"].uri.rstrip("/") == raw_args_map["train"].rstrip("/") + assert raw_map["ovalidation"].uri.rstrip("/") == raw_args_map["validation"].rstrip("/") + assert raw_map["oa"] == 1 + assert raw_map["ob"] == 0.5 + assert raw_map["oc"] == "val" + assert raw_map["od"] is False + assert raw_map["oe"] == parser.parse("20180612T09:55:22Z") From 5f874dbba320d860cac64ea204823f1dd91b467c Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Fri, 21 Aug 2020 13:33:14 -0700 Subject: [PATCH 09/16] remove redundant temp input dir --- flytekit/bin/entrypoint_alt.py | 66 +++++++++++++++++----------------- 1 file changed, 32 insertions(+), 34 deletions(-) diff --git a/flytekit/bin/entrypoint_alt.py b/flytekit/bin/entrypoint_alt.py index 9a33075182..a1928d5ecf 100644 --- a/flytekit/bin/entrypoint_alt.py +++ b/flytekit/bin/entrypoint_alt.py @@ -14,40 +14,38 @@ @_scopes.system_entry_point def _execute_task(task_module, task_name, output_prefix, test, sagemaker_args): with _TemporaryConfiguration(_internal_config.CONFIGURATION_PATH.get()): - with _utils.AutoDeletingTempDir("input_dir") as input_dir: - - # Load user code - task_module = _importlib.import_module(task_module) - 
task_def = getattr(task_module, task_name) - - if not test: - - # TODO: parse the unknown arguments, and create a litealmap out from the task definition - # to replace these two lines: - # _data_proxy.Data.get_data(inputs, local_inputs_file) - # input_proto = _utils.load_proto_from_file(_literals_pb2.LiteralMap, local_inputs_file) - map_of_input_values = {} - # Here we have an assumption that each option key will come with a value right after the key - for i in range(0, len(sagemaker_args), 2): - # Since the sagemaker_args are unprocessed, each of the option keys comes with a leading "--" - # We need to remove them - map_of_input_values[sagemaker_args[i][2:]] = sagemaker_args[i+1] - - # map_of_literal_types = {} - map_of_sdk_types = {} - for k, v in task_def.interface.inputs.items(): - # map_of_literal_types[k] = v.type - map_of_sdk_types[k] = _type_helpers.get_sdk_type_from_literal_type(v.type) - - input_literal_map = _type_helpers.pack_python_string_map_to_literal_map( - map_of_input_values, - map_of_sdk_types, - ) - - _engine_loader.get_engine().get_task(task_def).execute( - input_literal_map, - context={'output_prefix': output_prefix} - ) + # Load user code + task_module = _importlib.import_module(task_module) + task_def = getattr(task_module, task_name) + + if not test: + + # TODO: parse the unknown arguments, and create a litealmap out from the task definition + # to replace these two lines: + # _data_proxy.Data.get_data(inputs, local_inputs_file) + # input_proto = _utils.load_proto_from_file(_literals_pb2.LiteralMap, local_inputs_file) + map_of_input_values = {} + # Here we have an assumption that each option key will come with a value right after the key + for i in range(0, len(sagemaker_args), 2): + # Since the sagemaker_args are unprocessed, each of the option keys comes with a leading "--" + # We need to remove them + map_of_input_values[sagemaker_args[i][2:]] = sagemaker_args[i+1] + + # map_of_literal_types = {} + map_of_sdk_types = {} + for k, v in 
task_def.interface.inputs.items(): + # map_of_literal_types[k] = v.type + map_of_sdk_types[k] = _type_helpers.get_sdk_type_from_literal_type(v.type) + + input_literal_map = _type_helpers.pack_python_string_map_to_literal_map( + map_of_input_values, + map_of_sdk_types, + ) + + _engine_loader.get_engine().get_task(task_def).execute( + input_literal_map, + context={'output_prefix': output_prefix} + ) @_click.group() From e26097e022f976054eea1938584820222ced4eac Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Fri, 21 Aug 2020 13:42:43 -0700 Subject: [PATCH 10/16] add comments --- flytekit/bin/entrypoint_alt.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/flytekit/bin/entrypoint_alt.py b/flytekit/bin/entrypoint_alt.py index a1928d5ecf..9a1b2da485 100644 --- a/flytekit/bin/entrypoint_alt.py +++ b/flytekit/bin/entrypoint_alt.py @@ -20,10 +20,7 @@ def _execute_task(task_module, task_name, output_prefix, test, sagemaker_args): if not test: - # TODO: parse the unknown arguments, and create a litealmap out from the task definition - # to replace these two lines: - # _data_proxy.Data.get_data(inputs, local_inputs_file) - # input_proto = _utils.load_proto_from_file(_literals_pb2.LiteralMap, local_inputs_file) + # Parse the unknown arguments, and create a litealmap out from the task definition map_of_input_values = {} # Here we have an assumption that each option key will come with a value right after the key for i in range(0, len(sagemaker_args), 2): @@ -31,12 +28,15 @@ def _execute_task(task_module, task_name, output_prefix, test, sagemaker_args): # We need to remove them map_of_input_values[sagemaker_args[i][2:]] = sagemaker_args[i+1] - # map_of_literal_types = {} + # TODO: we might need to do some special handling of the blob-typed inputs, i.e., read them from predefined + # locations in the container + map_of_sdk_types = {} for k, v in task_def.interface.inputs.items(): # map_of_literal_types[k] = v.type map_of_sdk_types[k] = 
_type_helpers.get_sdk_type_from_literal_type(v.type) + input_literal_map = _type_helpers.pack_python_string_map_to_literal_map( map_of_input_values, map_of_sdk_types, @@ -53,6 +53,9 @@ def _pass_through(): pass +# pyflyte-execute-alt is an alternative pyflyte entrypoint specifically designed for SageMaker (currently) +# This entrypoint assumes no --inputs command-line option, and therefore it doesn't accept the input.pb file +# All the inputs will be passed into the entrypoint as unknown arguments @_pass_through.command('pyflyte-execute-alt', context_settings=dict(ignore_unknown_options=True)) @_click.option('--task-module', required=True) @_click.option('--task-name', required=True) From 93106575d7a6fa5c8600b4690d857782c2319eaa Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Fri, 21 Aug 2020 13:43:41 -0700 Subject: [PATCH 11/16] lint errors --- flytekit/bin/entrypoint_alt.py | 1 - 1 file changed, 1 deletion(-) diff --git a/flytekit/bin/entrypoint_alt.py b/flytekit/bin/entrypoint_alt.py index 9a1b2da485..f98f2e8864 100644 --- a/flytekit/bin/entrypoint_alt.py +++ b/flytekit/bin/entrypoint_alt.py @@ -36,7 +36,6 @@ def _execute_task(task_module, task_name, output_prefix, test, sagemaker_args): # map_of_literal_types[k] = v.type map_of_sdk_types[k] = _type_helpers.get_sdk_type_from_literal_type(v.type) - input_literal_map = _type_helpers.pack_python_string_map_to_literal_map( map_of_input_values, map_of_sdk_types, From ef1a6eaf38341e134280a19395283885d5859666 Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Fri, 21 Aug 2020 17:34:00 -0700 Subject: [PATCH 12/16] wip proper redirection of blob inputs --- flytekit/bin/entrypoint_alt.py | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/flytekit/bin/entrypoint_alt.py b/flytekit/bin/entrypoint_alt.py index f98f2e8864..e38c20d1fb 100644 --- a/flytekit/bin/entrypoint_alt.py +++ b/flytekit/bin/entrypoint_alt.py @@ -9,6 +9,24 @@ from flytekit.common.exceptions import scopes as 
_scopes from flytekit.configuration import internal as _internal_config, TemporaryConfiguration as _TemporaryConfiguration from flytekit.engines import loader as _engine_loader +from flytekit.sdk.types import Types +from flytekit.common.types import base_sdk_types as _base_sdk_types + +SAGEMAKER_CONTAINER_LOCAL_INPUT_PREFIX = "/opt/ml/input" + + +def _get_blob_inputs_local_paths(map_of_sdk_types): + """ + :param dict[Text, Any] map_of_sdk_types: + :rtype dict[Text, Text]: + """ + ret = {} + for k, v in map_of_sdk_types.items(): + try: + if isinstance(v, _base_sdk_types.InstantiableType): + ret[k] = "{}/{}".format(SAGEMAKER_CONTAINER_LOCAL_INPUT_PREFIX, k) + + return ret @_scopes.system_entry_point @@ -28,14 +46,15 @@ def _execute_task(task_module, task_name, output_prefix, test, sagemaker_args): # We need to remove them map_of_input_values[sagemaker_args[i][2:]] = sagemaker_args[i+1] - # TODO: we might need to do some special handling of the blob-typed inputs, i.e., read them from predefined - # locations in the container - map_of_sdk_types = {} for k, v in task_def.interface.inputs.items(): # map_of_literal_types[k] = v.type map_of_sdk_types[k] = _type_helpers.get_sdk_type_from_literal_type(v.type) + # TODO: we might need to do some special handling of the blob-typed inputs, i.e., read them from predefined + # locations in the container + map_of_input_values.update(_get_blob_inputs_local_paths(map_of_sdk_types)) + input_literal_map = _type_helpers.pack_python_string_map_to_literal_map( map_of_input_values, map_of_sdk_types, From d67d3e64f8366fb2ce13216f029b2b00c32e803a Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Fri, 21 Aug 2020 18:09:49 -0700 Subject: [PATCH 13/16] replace blob/schema/csv with local paths --- flytekit/bin/entrypoint_alt.py | 27 +++++-------------- .../unit/bin/test_python_entrypoint_alt.py | 14 +++++----- 2 files changed, 15 insertions(+), 26 deletions(-) diff --git a/flytekit/bin/entrypoint_alt.py b/flytekit/bin/entrypoint_alt.py index 
e38c20d1fb..4767ddc6b9 100644 --- a/flytekit/bin/entrypoint_alt.py +++ b/flytekit/bin/entrypoint_alt.py @@ -9,24 +9,8 @@ from flytekit.common.exceptions import scopes as _scopes from flytekit.configuration import internal as _internal_config, TemporaryConfiguration as _TemporaryConfiguration from flytekit.engines import loader as _engine_loader -from flytekit.sdk.types import Types -from flytekit.common.types import base_sdk_types as _base_sdk_types -SAGEMAKER_CONTAINER_LOCAL_INPUT_PREFIX = "/opt/ml/input" - - -def _get_blob_inputs_local_paths(map_of_sdk_types): - """ - :param dict[Text, Any] map_of_sdk_types: - :rtype dict[Text, Text]: - """ - ret = {} - for k, v in map_of_sdk_types.items(): - try: - if isinstance(v, _base_sdk_types.InstantiableType): - ret[k] = "{}/{}".format(SAGEMAKER_CONTAINER_LOCAL_INPUT_PREFIX, k) - - return ret +SAGEMAKER_CONTAINER_LOCAL_INPUT_PREFIX = "/opt/ml/input/data" @_scopes.system_entry_point @@ -47,13 +31,16 @@ def _execute_task(task_module, task_name, output_prefix, test, sagemaker_args): map_of_input_values[sagemaker_args[i][2:]] = sagemaker_args[i+1] map_of_sdk_types = {} + blob_and_schema_local_path_map = {} for k, v in task_def.interface.inputs.items(): # map_of_literal_types[k] = v.type + if v.type.blob is not None or v.type.schema is not None: + blob_and_schema_local_path_map[k] = "{}/{}".format(SAGEMAKER_CONTAINER_LOCAL_INPUT_PREFIX, k) map_of_sdk_types[k] = _type_helpers.get_sdk_type_from_literal_type(v.type) - # TODO: we might need to do some special handling of the blob-typed inputs, i.e., read them from predefined - # locations in the container - map_of_input_values.update(_get_blob_inputs_local_paths(map_of_sdk_types)) + # We need to do some special handling of the blob-typed inputs, i.e., read them from predefined + # locations in the container + map_of_input_values.update(blob_and_schema_local_path_map) input_literal_map = _type_helpers.pack_python_string_map_to_literal_map( map_of_input_values, diff --git 
a/tests/flytekit/unit/bin/test_python_entrypoint_alt.py b/tests/flytekit/unit/bin/test_python_entrypoint_alt.py index 2f07181c15..2e068f9943 100644 --- a/tests/flytekit/unit/bin/test_python_entrypoint_alt.py +++ b/tests/flytekit/unit/bin/test_python_entrypoint_alt.py @@ -7,7 +7,7 @@ from dateutil import parser from flyteidl.core import literals_pb2 as _literals_pb2 -from flytekit.bin.entrypoint_alt import _execute_task, execute_task_cmd +from flytekit.bin.entrypoint_alt import _execute_task, execute_task_cmd, SAGEMAKER_CONTAINER_LOCAL_INPUT_PREFIX from flytekit.common import constants as _constants from flytekit.common import utils as _utils from flytekit.common.types import helpers as _type_helpers @@ -26,7 +26,7 @@ def test_single_step_entrypoint_in_proc(): internal_overrides={"project": "test", "domain": "development"}, ): raw_args = ( - "--train", "s3://dummy", + "--train", "/local/host", "--validation", "s3://dummy", "--a", "1", "--b", "0.5", @@ -55,8 +55,8 @@ def test_single_step_entrypoint_in_proc(): ) assert len(raw_map) == 7 - assert raw_map["otrain"].uri.rstrip("/") == raw_args_map["train"].rstrip("/") - assert raw_map["ovalidation"].uri.rstrip("/") == raw_args_map["validation"].rstrip("/") + assert raw_map["otrain"].uri.rstrip("/") == "{}/{}".format(SAGEMAKER_CONTAINER_LOCAL_INPUT_PREFIX, "train") + assert raw_map["ovalidation"].uri.rstrip("/") == "{}/{}".format(SAGEMAKER_CONTAINER_LOCAL_INPUT_PREFIX, "validation") assert raw_map["oa"] == 1 assert raw_map["ob"] == 0.5 assert raw_map["oc"] == "val" @@ -108,8 +108,10 @@ def test_single_step_entrypoint_out_of_proc(): ) assert len(raw_map) == 7 - assert raw_map["otrain"].uri.rstrip("/") == raw_args_map["train"].rstrip("/") - assert raw_map["ovalidation"].uri.rstrip("/") == raw_args_map["validation"].rstrip("/") + assert raw_map["otrain"].uri.rstrip("/") == "{}/{}".format( + SAGEMAKER_CONTAINER_LOCAL_INPUT_PREFIX, "train") + assert raw_map["ovalidation"].uri.rstrip("/") == "{}/{}".format( + 
SAGEMAKER_CONTAINER_LOCAL_INPUT_PREFIX, "validation") assert raw_map["oa"] == 1 assert raw_map["ob"] == 0.5 assert raw_map["oc"] == "val" From ac5b11e0a517bae4129a970eeb578c6ca1cd0cdc Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Fri, 21 Aug 2020 18:56:52 -0700 Subject: [PATCH 14/16] make fmt --- flytekit/bin/entrypoint_alt.py | 24 ++++----- flytekit/clients/helpers.py | 2 - .../unit/bin/test_python_entrypoint_alt.py | 54 +++++++++++++------ 3 files changed, 48 insertions(+), 32 deletions(-) diff --git a/flytekit/bin/entrypoint_alt.py b/flytekit/bin/entrypoint_alt.py index 4767ddc6b9..cd068ec378 100644 --- a/flytekit/bin/entrypoint_alt.py +++ b/flytekit/bin/entrypoint_alt.py @@ -28,7 +28,7 @@ def _execute_task(task_module, task_name, output_prefix, test, sagemaker_args): for i in range(0, len(sagemaker_args), 2): # Since the sagemaker_args are unprocessed, each of the option keys comes with a leading "--" # We need to remove them - map_of_input_values[sagemaker_args[i][2:]] = sagemaker_args[i+1] + map_of_input_values[sagemaker_args[i][2:]] = sagemaker_args[i + 1] map_of_sdk_types = {} blob_and_schema_local_path_map = {} @@ -43,13 +43,11 @@ def _execute_task(task_module, task_name, output_prefix, test, sagemaker_args): map_of_input_values.update(blob_and_schema_local_path_map) input_literal_map = _type_helpers.pack_python_string_map_to_literal_map( - map_of_input_values, - map_of_sdk_types, + map_of_input_values, map_of_sdk_types, ) _engine_loader.get_engine().get_task(task_def).execute( - input_literal_map, - context={'output_prefix': output_prefix} + input_literal_map, context={"output_prefix": output_prefix} ) @@ -61,18 +59,18 @@ def _pass_through(): # pyflyte-execute-alt is an alternative pyflyte entrypoint specifically designed for SageMaker (currently) # This entrypoint assumes no --inputs command-line option, and therefore it doesn't accept the input.pb file # All the inputs will be passed into the entrypoint as unknown arguments 
-@_pass_through.command('pyflyte-execute-alt', context_settings=dict(ignore_unknown_options=True)) -@_click.option('--task-module', required=True) -@_click.option('--task-name', required=True) -@_click.option('--output-prefix', required=True) -@_click.option('--test', is_flag=True) -@_click.argument('sagemaker_args', nargs=-1, type=_click.UNPROCESSED) +@_pass_through.command("pyflyte-execute-alt", context_settings=dict(ignore_unknown_options=True)) +@_click.option("--task-module", required=True) +@_click.option("--task-name", required=True) +@_click.option("--output-prefix", required=True) +@_click.option("--test", is_flag=True) +@_click.argument("sagemaker_args", nargs=-1, type=_click.UNPROCESSED) def execute_task_cmd(task_module, task_name, output_prefix, test, sagemaker_args): _click.echo(_utils.get_version_message()) - _click.echo('sagemaker_args : {}'.format(sagemaker_args)) + _click.echo("sagemaker_args : {}".format(sagemaker_args)) # Note that the unknown arguments are entirely unprocessed, so the leading "--" are still there _execute_task(task_module, task_name, output_prefix, test, sagemaker_args) -if __name__ == '__main__': +if __name__ == "__main__": _pass_through() diff --git a/flytekit/clients/helpers.py b/flytekit/clients/helpers.py index 9663a4e928..75b2232636 100644 --- a/flytekit/clients/helpers.py +++ b/flytekit/clients/helpers.py @@ -1,5 +1,3 @@ - - def iterate_node_executions( client, workflow_execution_identifier=None, task_execution_identifier=None, limit=None, filters=None, ): diff --git a/tests/flytekit/unit/bin/test_python_entrypoint_alt.py b/tests/flytekit/unit/bin/test_python_entrypoint_alt.py index 2e068f9943..831b0cb24a 100644 --- a/tests/flytekit/unit/bin/test_python_entrypoint_alt.py +++ b/tests/flytekit/unit/bin/test_python_entrypoint_alt.py @@ -26,13 +26,21 @@ def test_single_step_entrypoint_in_proc(): internal_overrides={"project": "test", "domain": "development"}, ): raw_args = ( - "--train", "/local/host", - "--validation", 
"s3://dummy", - "--a", "1", - "--b", "0.5", - "--c", "val", - "--d", "0", - "--e", "20180612T09:55:22Z") + "--train", + "/local/host", + "--validation", + "s3://dummy", + "--a", + "1", + "--b", + "0.5", + "--c", + "val", + "--d", + "0", + "--e", + "20180612T09:55:22Z", + ) with _utils.AutoDeletingTempDir("out") as output_dir: _execute_task( task_module=_task_defs.dummy_for_entrypoint_alt.task_module, @@ -56,7 +64,9 @@ def test_single_step_entrypoint_in_proc(): assert len(raw_map) == 7 assert raw_map["otrain"].uri.rstrip("/") == "{}/{}".format(SAGEMAKER_CONTAINER_LOCAL_INPUT_PREFIX, "train") - assert raw_map["ovalidation"].uri.rstrip("/") == "{}/{}".format(SAGEMAKER_CONTAINER_LOCAL_INPUT_PREFIX, "validation") + assert raw_map["ovalidation"].uri.rstrip("/") == "{}/{}".format( + SAGEMAKER_CONTAINER_LOCAL_INPUT_PREFIX, "validation" + ) assert raw_map["oa"] == 1 assert raw_map["ob"] == 0.5 assert raw_map["oc"] == "val" @@ -77,13 +87,21 @@ def test_single_step_entrypoint_out_of_proc(): _utils.write_proto_to_file(literal_map.to_flyte_idl(), input_file) raw_args = ( - "--train", "s3://dummy", - "--validation", "s3://dummy", - "--a", "1", - "--b", "0.5", - "--c", "val", - "--d", "0", - "--e", "20180612T09:55:22Z") + "--train", + "s3://dummy", + "--validation", + "s3://dummy", + "--a", + "1", + "--b", + "0.5", + "--c", + "val", + "--d", + "0", + "--e", + "20180612T09:55:22Z", + ) with _utils.AutoDeletingTempDir("out") as output_dir: cmd = [] @@ -109,9 +127,11 @@ def test_single_step_entrypoint_out_of_proc(): assert len(raw_map) == 7 assert raw_map["otrain"].uri.rstrip("/") == "{}/{}".format( - SAGEMAKER_CONTAINER_LOCAL_INPUT_PREFIX, "train") + SAGEMAKER_CONTAINER_LOCAL_INPUT_PREFIX, "train" + ) assert raw_map["ovalidation"].uri.rstrip("/") == "{}/{}".format( - SAGEMAKER_CONTAINER_LOCAL_INPUT_PREFIX, "validation") + SAGEMAKER_CONTAINER_LOCAL_INPUT_PREFIX, "validation" + ) assert raw_map["oa"] == 1 assert raw_map["ob"] == 0.5 assert raw_map["oc"] == "val" From 
f09b3f86410dc5eae0f5f0db30b5040c6d6b135e Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Fri, 21 Aug 2020 21:07:17 -0700 Subject: [PATCH 15/16] fix lint --- flytekit/bin/entrypoint_alt.py | 3 ++- tests/flytekit/unit/bin/test_python_entrypoint_alt.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/flytekit/bin/entrypoint_alt.py b/flytekit/bin/entrypoint_alt.py index cd068ec378..25d8872bb5 100644 --- a/flytekit/bin/entrypoint_alt.py +++ b/flytekit/bin/entrypoint_alt.py @@ -7,7 +7,8 @@ import flytekit.common.types.helpers as _type_helpers from flytekit.common import utils as _utils from flytekit.common.exceptions import scopes as _scopes -from flytekit.configuration import internal as _internal_config, TemporaryConfiguration as _TemporaryConfiguration +from flytekit.configuration import TemporaryConfiguration as _TemporaryConfiguration +from flytekit.configuration import internal as _internal_config from flytekit.engines import loader as _engine_loader SAGEMAKER_CONTAINER_LOCAL_INPUT_PREFIX = "/opt/ml/input/data" diff --git a/tests/flytekit/unit/bin/test_python_entrypoint_alt.py b/tests/flytekit/unit/bin/test_python_entrypoint_alt.py index 831b0cb24a..00daf48e92 100644 --- a/tests/flytekit/unit/bin/test_python_entrypoint_alt.py +++ b/tests/flytekit/unit/bin/test_python_entrypoint_alt.py @@ -7,7 +7,7 @@ from dateutil import parser from flyteidl.core import literals_pb2 as _literals_pb2 -from flytekit.bin.entrypoint_alt import _execute_task, execute_task_cmd, SAGEMAKER_CONTAINER_LOCAL_INPUT_PREFIX +from flytekit.bin.entrypoint_alt import SAGEMAKER_CONTAINER_LOCAL_INPUT_PREFIX, _execute_task, execute_task_cmd from flytekit.common import constants as _constants from flytekit.common import utils as _utils from flytekit.common.types import helpers as _type_helpers From af690a095210bd3361dcec6dee8a20988d2c8f8c Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Tue, 25 Aug 2020 13:02:12 -0700 Subject: [PATCH 16/16] add back inputs to entrypoint-alt 
--- flytekit/bin/entrypoint_alt.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/flytekit/bin/entrypoint_alt.py b/flytekit/bin/entrypoint_alt.py index 25d8872bb5..827f6d5187 100644 --- a/flytekit/bin/entrypoint_alt.py +++ b/flytekit/bin/entrypoint_alt.py @@ -13,9 +13,11 @@ SAGEMAKER_CONTAINER_LOCAL_INPUT_PREFIX = "/opt/ml/input/data" +def build_sdk_type_map_from_typed_interface(interface): + @_scopes.system_entry_point -def _execute_task(task_module, task_name, output_prefix, test, sagemaker_args): +def _execute_task(task_module, task_name, inputs, output_prefix, test, sagemaker_args): with _TemporaryConfiguration(_internal_config.CONFIGURATION_PATH.get()): # Load user code task_module = _importlib.import_module(task_module) @@ -33,10 +35,8 @@ def _execute_task(task_module, task_name, output_prefix, test, sagemaker_args): map_of_sdk_types = {} blob_and_schema_local_path_map = {} + for k, v in task_def.interface.inputs.items(): - # map_of_literal_types[k] = v.type - if v.type.blob is not None or v.type.schema is not None: - blob_and_schema_local_path_map[k] = "{}/{}".format(SAGEMAKER_CONTAINER_LOCAL_INPUT_PREFIX, k) map_of_sdk_types[k] = _type_helpers.get_sdk_type_from_literal_type(v.type) # We need to do some special handling of the blob-typed inputs, i.e., read them from predefined @@ -47,6 +47,12 @@ def _execute_task(task_module, task_name, output_prefix, test, sagemaker_args): map_of_input_values, map_of_sdk_types, ) + # TODO 1. need to handle the case of "collection of blobs" or even "hierarchical collection of blobs" + # TODO 2. 
replace the blob uris with local + for k, v in task_def.interface.inputs.items(): + if v.type.blob is not None or v.type.schema is not None: + blob_and_schema_local_path_map[k] = "{}/{}".format(SAGEMAKER_CONTAINER_LOCAL_INPUT_PREFIX, k) + _engine_loader.get_engine().get_task(task_def).execute( input_literal_map, context={"output_prefix": output_prefix} ) @@ -63,14 +69,15 @@ def _pass_through(): @_pass_through.command("pyflyte-execute-alt", context_settings=dict(ignore_unknown_options=True)) @_click.option("--task-module", required=True) @_click.option("--task-name", required=True) +@_click.option("--inputs", required=True) @_click.option("--output-prefix", required=True) @_click.option("--test", is_flag=True) @_click.argument("sagemaker_args", nargs=-1, type=_click.UNPROCESSED) -def execute_task_cmd(task_module, task_name, output_prefix, test, sagemaker_args): +def execute_task_cmd(task_module, task_name, inputs, output_prefix, test, sagemaker_args): _click.echo(_utils.get_version_message()) _click.echo("sagemaker_args : {}".format(sagemaker_args)) # Note that the unknown arguments are entirely unprocessed, so the leading "--" are still there - _execute_task(task_module, task_name, output_prefix, test, sagemaker_args) + _execute_task(task_module, task_name, inputs, output_prefix, test, sagemaker_args) if __name__ == "__main__":