From ae44825f98d6d5fe37986f521c115c26eafe7243 Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario <653394+eapolinario@users.noreply.github.com> Date: Fri, 23 Sep 2022 14:40:17 -0700 Subject: [PATCH 1/7] Open HashMethod to all types (#1171) * Open HashMethod to all types * Fix test in test_type_hints.py Signed-off-by: Eduardo Apolinario Signed-off-by: Eduardo Apolinario Co-authored-by: Eduardo Apolinario Signed-off-by: LN --- flytekit/core/type_engine.py | 11 ++--------- flytekit/types/schema/types_pandas.py | 1 - flytekit/types/structured/structured_dataset.py | 3 --- tests/flytekit/unit/core/test_local_cache.py | 16 ++++++++-------- tests/flytekit/unit/core/test_type_engine.py | 7 +++---- tests/flytekit/unit/core/test_type_hints.py | 6 +++--- 6 files changed, 16 insertions(+), 28 deletions(-) diff --git a/flytekit/core/type_engine.py b/flytekit/core/type_engine.py index 43c11065ba..79e869107d 100644 --- a/flytekit/core/type_engine.py +++ b/flytekit/core/type_engine.py @@ -62,13 +62,10 @@ class TypeTransformer(typing.Generic[T]): Base transformer type that should be implemented for every python native type that can be handled by flytekit """ - def __init__(self, name: str, t: Type[T], enable_type_assertions: bool = True, hash_overridable: bool = False): + def __init__(self, name: str, t: Type[T], enable_type_assertions: bool = True): self._t = t self._name = name self._type_assertions_enabled = enable_type_assertions - # `hash_overridable` indicates that the literals produced by this type transformer can set their hashes if needed. - # See (link to documentation where this feature is explained). - self._hash_overridable = hash_overridable @property def name(self): @@ -88,10 +85,6 @@ def type_assertions_enabled(self) -> bool: """ return self._type_assertions_enabled - @property - def hash_overridable(self) -> bool: - return self._hash_overridable - def assert_type(self, t: Type[T], v: T): if not hasattr(t, "__origin__") and not isinstance(v, t): raise TypeTransformerFailedError(f"Type of Val '{v}' is not an instance of {t}") @@ -742,7 +735,7 @@ def to_literal(cls, ctx: FlyteContext, python_val: typing.Any, python_type: Type # In case the value is an annotated type we inspect the annotations and look for hash-related annotations. hash = None - if transformer.hash_overridable and get_origin(python_type) is Annotated: + if get_origin(python_type) is Annotated: # We are now dealing with one of two cases: # 1. The annotated type is a `HashMethod`, which indicates that we should we should produce the hash using # the method indicated in the annotation. diff --git a/flytekit/types/schema/types_pandas.py b/flytekit/types/schema/types_pandas.py index 0d3bc8cd78..e4c6078e94 100644 --- a/flytekit/types/schema/types_pandas.py +++ b/flytekit/types/schema/types_pandas.py @@ -81,7 +81,6 @@ class PandasDataFrameTransformer(TypeTransformer[pandas.DataFrame]): def __init__(self): super().__init__("PandasDataFrame<->GenericSchema", pandas.DataFrame) self._parquet_engine = ParquetIO() - self._hash_overridable = True @staticmethod def _get_schema_type() -> SchemaType: diff --git a/flytekit/types/structured/structured_dataset.py b/flytekit/types/structured/structured_dataset.py index bdad752b16..e57fdb2480 100644 --- a/flytekit/types/structured/structured_dataset.py +++ b/flytekit/types/structured/structured_dataset.py @@ -376,9 +376,6 @@ def __init__(self): super().__init__("StructuredDataset Transformer", StructuredDataset) self._type_assertions_enabled = False - # Instances of StructuredDataset opt-in to the ability of being cached. - self._hash_overridable = True - @classmethod def register_renderer(cls, python_type: Type, renderer: Renderable): cls.Renderers[python_type] = renderer diff --git a/tests/flytekit/unit/core/test_local_cache.py b/tests/flytekit/unit/core/test_local_cache.py index 674f6176e1..4648c87e2b 100644 --- a/tests/flytekit/unit/core/test_local_cache.py +++ b/tests/flytekit/unit/core/test_local_cache.py @@ -261,9 +261,9 @@ def my_wf(a: int, b: str) -> (int, str): assert n_cached_task_calls == 2 -def test_set_integer_literal_hash_is_not_cached(): +def test_set_integer_literal_hash_is_cached(): """ - Test to confirm that the local cache is not set in the case of integers, even if we + Test to confirm that the local cache is set in the case of integers, even if we return an annotated integer. In order to make this very explicit, we define a constant hash function, i.e. the same value is returned by it regardless of the input. """ @@ -289,13 +289,13 @@ def wf(a: int) -> int: assert n_cached_task_calls == 0 assert wf(a=3) == 3 assert n_cached_task_calls == 1 - # Confirm that the value is not cached, even though we set a hash function that - # returns a constant value and that the task has only one input. - assert wf(a=2) == 2 - assert n_cached_task_calls == 2 + # Confirm that the value is cached due to the fact the hash value is constant, regardless + # of the value passed to the cacheable task. + assert wf(a=2) == 3 + assert n_cached_task_calls == 1 # Confirm that the cache is hit if we execute the workflow with the same value as previous run. - assert wf(a=2) == 2 - assert n_cached_task_calls == 2 + assert wf(a=2) == 3 + assert n_cached_task_calls == 1 def test_pass_annotated_to_downstream_tasks(): diff --git a/tests/flytekit/unit/core/test_type_engine.py b/tests/flytekit/unit/core/test_type_engine.py index 0fe2513908..dd63cf82dd 100644 --- a/tests/flytekit/unit/core/test_type_engine.py +++ b/tests/flytekit/unit/core/test_type_engine.py @@ -1254,17 +1254,16 @@ def t1(a: int) -> int: assert t1(a=3) == 9 -def test_literal_hash_int_not_set(): +def test_literal_hash_int_can_be_set(): """ - Test to confirm that annotating an integer with `HashMethod` does not force the literal to have its - hash set. + Test to confirm that annotating an integer with `HashMethod` is allowed. """ ctx = FlyteContext.current_context() lv = TypeEngine.to_literal( ctx, 42, Annotated[int, HashMethod(str)], LiteralType(simple=model_types.SimpleType.INTEGER) ) assert lv.scalar.primitive.integer == 42 - assert lv.hash is None + assert lv.hash == "42" def test_literal_hash_to_python_value(): diff --git a/tests/flytekit/unit/core/test_type_hints.py b/tests/flytekit/unit/core/test_type_hints.py index abdf69f5b0..04ea85113e 100644 --- a/tests/flytekit/unit/core/test_type_hints.py +++ b/tests/flytekit/unit/core/test_type_hints.py @@ -1811,11 +1811,11 @@ def wf(a: int) -> str: del TypeEngine._REGISTRY[MyInt] -def test_task_annotate_primitive_type_has_no_effect(): +def test_task_annotate_primitive_type_is_allowed(): @task def plus_two( a: int, - ) -> Annotated[int, HashMethod(str)]: # Note the use of `str` as the hash function for ints. This has no effect. + ) -> Annotated[int, HashMethod(lambda x: str(x + 1))]: return a + 2 assert plus_two(a=1) == 3 @@ -1832,7 +1832,7 @@ def plus_two( ), ) assert output_lm.literals["o0"].scalar.primitive.integer == 5 - assert output_lm.literals["o0"].hash is None + assert output_lm.literals["o0"].hash == "6" def test_task_hash_return_pandas_dataframe(): From 2110598afecd24ca608c512144b78fb9a2407fe1 Mon Sep 17 00:00:00 2001 From: Andrew Dye Date: Sat, 24 Sep 2022 13:04:02 -0700 Subject: [PATCH 2/7] Return None for SyncCheckpoint.read() when src is empty (#1189) Signed-off-by: Andrew Dye Signed-off-by: Andrew Dye Signed-off-by: LN --- flytekit/core/checkpointer.py | 4 +++- tests/flytekit/unit/core/test_checkpoint.py | 10 ++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/flytekit/core/checkpointer.py b/flytekit/core/checkpointer.py index bd17b7748b..bed776ed4c 100644 --- a/flytekit/core/checkpointer.py +++ b/flytekit/core/checkpointer.py @@ -146,7 +146,9 @@ def read(self) -> typing.Optional[bytes]: if p is None: return None files = list(p.iterdir()) - if len(files) == 0 or len(files) > 1: + if len(files) == 0: + return None + if len(files) > 1: raise ValueError(f"Expected exactly one checkpoint - found {len(files)}") f = files[0] return f.read_bytes() diff --git a/tests/flytekit/unit/core/test_checkpoint.py b/tests/flytekit/unit/core/test_checkpoint.py index 0199737335..f1dbbbd5ef 100644 --- a/tests/flytekit/unit/core/test_checkpoint.py +++ b/tests/flytekit/unit/core/test_checkpoint.py @@ -83,6 +83,16 @@ def test_sync_checkpoint_restore_default_path(tmpdir): assert cp.restore() == cp._prev_download_path +def test_sync_checkpoint_read_empty_dir(tmpdir): + td_path = Path(tmpdir) + dest = td_path.joinpath("dest") + dest.mkdir() + src = td_path.joinpath("src") + src.mkdir() + cp = SyncCheckpoint(checkpoint_dest=str(dest), checkpoint_src=str(src)) + assert cp.read() is None + + def test_sync_checkpoint_read_multiple_files(tmpdir): """ Read can only work with one file. From 24817420f74e1c57dedee44066d6ba626e0e6221 Mon Sep 17 00:00:00 2001 From: Snyk bot Date: Wed, 28 Sep 2022 10:40:56 +0300 Subject: [PATCH 3/7] fix: plugins/flytekit-k8s-pod/requirements.txt to reduce vulnerabilities (#1192) The following vulnerabilities are fixed by pinning transitive dependencies: - https://snyk.io/vuln/SNYK-PYTHON-PROTOBUF-3031740 Signed-off-by: LN --- plugins/flytekit-k8s-pod/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/flytekit-k8s-pod/requirements.txt b/plugins/flytekit-k8s-pod/requirements.txt index cc9a88b96a..75c7639aed 100644 --- a/plugins/flytekit-k8s-pod/requirements.txt +++ b/plugins/flytekit-k8s-pod/requirements.txt @@ -113,7 +113,7 @@ packaging==21.3 # via marshmallow pandas==1.3.5 # via flytekit -protobuf==3.20.1 +protobuf==3.20.2 # via # flyteidl # flytekit From ae5e01fdffb8171c76e2c9cbdd408b8ad5deb25b Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 29 Sep 2022 22:13:19 +0800 Subject: [PATCH 4/7] pyflyte non-fast register (#1205) * pyflyte run non-fast register Signed-off-by: Kevin Su * lint Signed-off-by: Kevin Su Signed-off-by: Kevin Su Signed-off-by: LN --- flytekit/clis/sdk_in_container/register.py | 36 ++++++++++++------- .../unit/cli/pyflyte/test_register.py | 19 ++++++++++ 2 files changed, 43 insertions(+), 12 deletions(-) diff --git a/flytekit/clis/sdk_in_container/register.py b/flytekit/clis/sdk_in_container/register.py index 024b70edde..c0bdcd2416 100644 --- a/flytekit/clis/sdk_in_container/register.py +++ b/flytekit/clis/sdk_in_container/register.py @@ -13,6 +13,7 @@ from flytekit.tools.fast_registration import fast_package from flytekit.tools.repo import find_common_root, load_packages_and_modules from flytekit.tools.repo import register as repo_register +from flytekit.tools.script_mode import hash_file from flytekit.tools.translator import Options _register_help = """ @@ -105,6 +106,12 @@ is_flag=True, help="Enables symlink dereferencing when packaging files in fast registration", ) +@click.option( + "--non-fast", + default=False, + is_flag=True, + help="Enables to skip zipping and uploading the package", +) @click.argument("package-or-module", type=click.Path(exists=True, readable=True, resolve_path=True), nargs=-1) @click.pass_context def register( @@ -118,6 +125,7 @@ def register( raw_data_prefix: str, version: typing.Optional[str], deref_symlinks: bool, + non_fast: bool, package_or_module: typing.Tuple[str], ): """ @@ -138,22 +146,30 @@ def register( cli_logger.debug( f"Running pyflyte register from {os.getcwd()} " f"with images {image_config} " - f"and image destinationfolder {destination_dir} " + f"and image destination folder {destination_dir} " f"on {len(package_or_module)} package(s) {package_or_module}" ) # Create and save FlyteRemote, remote = get_and_save_remote_with_click_context(ctx, project, domain) - # Todo: add switch for non-fast - skip the zipping and uploading and no fastserializationsettings - # Create a zip file containing all the entries. detected_root = find_common_root(package_or_module) cli_logger.debug(f"Using {detected_root} as root folder for project") - zip_file = fast_package(detected_root, output, deref_symlinks) + fast_serialization_settings = None - # Upload zip file to Admin using FlyteRemote. - md5_bytes, native_url = remote._upload_file(pathlib.Path(zip_file)) - cli_logger.debug(f"Uploaded zip {zip_file} to {native_url}") + # Create a zip file containing all the entries. + zip_file = fast_package(detected_root, output, deref_symlinks) + md5_bytes, _ = hash_file(pathlib.Path(zip_file)) + + if non_fast is False: + # Upload zip file to Admin using FlyteRemote. + md5_bytes, native_url = remote._upload_file(pathlib.Path(zip_file)) + cli_logger.debug(f"Uploaded zip {zip_file} to {native_url}") + fast_serialization_settings = FastSerializationSettings( + enabled=not non_fast, + destination_dir=destination_dir, + distribution_location=native_url, + ) # Create serialization settings # Todo: Rely on default Python interpreter for now, this will break custom Spark containers @@ -161,11 +177,7 @@ def register( project=project, domain=domain, image_config=image_config, - fast_serialization_settings=FastSerializationSettings( - enabled=True, - destination_dir=destination_dir, - distribution_location=native_url, - ), + fast_serialization_settings=fast_serialization_settings, ) options = Options.default_from(k8s_service_account=service_account, raw_data_prefix=raw_data_prefix) diff --git a/tests/flytekit/unit/cli/pyflyte/test_register.py b/tests/flytekit/unit/cli/pyflyte/test_register.py index d078851e1b..e9661dff6a 100644 --- a/tests/flytekit/unit/cli/pyflyte/test_register.py +++ b/tests/flytekit/unit/cli/pyflyte/test_register.py @@ -63,3 +63,22 @@ def test_register_with_no_output_dir_passed(mock_client, mock_remote): result = runner.invoke(pyflyte.main, ["register", "core"]) assert "Output given as None, using a temporary directory at" in result.output shutil.rmtree("core") + + +@mock.patch("flytekit.clis.sdk_in_container.helpers.FlyteRemote", spec=FlyteRemote) +@mock.patch("flytekit.clients.friendly.SynchronousFlyteClient", spec=SynchronousFlyteClient) +def test_non_fast_register(mock_client, mock_remote): + mock_remote._client = mock_client + mock_remote.return_value._version_from_hash.return_value = "dummy_version_from_hash" + mock_remote.return_value._upload_file.return_value = "dummy_md5_bytes", "dummy_native_url" + runner = CliRunner() + with runner.isolated_filesystem(): + out = subprocess.run(["git", "init"], capture_output=True) + assert out.returncode == 0 + os.makedirs("core", exist_ok=True) + with open(os.path.join("core", "sample.py"), "w") as f: + f.write(sample_file_contents) + f.close() + result = runner.invoke(pyflyte.main, ["register", "--non-fast", "core"]) + assert "Output given as None, using a temporary directory at" in result.output + shutil.rmtree("core") From 55538a7f8c72952f7fc6a65c8026abb52e16b6d2 Mon Sep 17 00:00:00 2001 From: LN <91385411+Ln11211@users.noreply.github.com> Date: Sun, 2 Oct 2022 16:50:14 +0530 Subject: [PATCH 5/7] Update control_plane.rst Added about large response while using Remote.Sync() and how to handle it in the doc Signed-off-by: LN --- docs/source/design/control_plane.rst | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/source/design/control_plane.rst b/docs/source/design/control_plane.rst index d4c57d733d..f581a7c852 100644 --- a/docs/source/design/control_plane.rst +++ b/docs/source/design/control_plane.rst @@ -299,6 +299,11 @@ You can use :meth:`~flytekit.remote.remote.FlyteRemote.sync` to sync the entity synced_execution = remote.sync(execution, sync_nodes=True) node_keys = synced_execution.node_executions.keys() +when using ``FlyteRemote.sync`` + +- ``Received message larger than max (xxx vs. 4194304)`` usually crops up when the message size is too large. +- To fix this, edit the flyte-admin-base-config config map to increase ``maxMessageSizeBytes`` value. + ``node_executions`` will fetch all the underlying node executions recursively. To fetch output of a specific node execution: From 9f7505f0e16aff522f59882cd9881ae90f6c83e7 Mon Sep 17 00:00:00 2001 From: LN <91385411+Ln11211@users.noreply.github.com> Date: Thu, 6 Oct 2022 23:23:14 +0530 Subject: [PATCH 6/7] Update docs/source/design/control_plane.rst updating FlyteRemote sync message Co-authored-by: Samhita Alla --- docs/source/design/control_plane.rst | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/source/design/control_plane.rst b/docs/source/design/control_plane.rst index f581a7c852..e7f6a7d6de 100644 --- a/docs/source/design/control_plane.rst +++ b/docs/source/design/control_plane.rst @@ -299,7 +299,9 @@ You can use :meth:`~flytekit.remote.remote.FlyteRemote.sync` to sync the entity synced_execution = remote.sync(execution, sync_nodes=True) node_keys = synced_execution.node_executions.keys() -when using ``FlyteRemote.sync`` +.. note:: + + During the sync, you may come across ``Received message larger than max (xxx vs. 4194304)`` error if the message size is too large. In that case, edit the ``flyte-admin-base-config`` config map using the command ``kubectl edit cm flyte-admin-base-config -n flyte`` to increase the ``maxMessageSizeBytes`` value. Refer to the :ref:`flyte: ` in case you've queries about the command's usage. - ``Received message larger than max (xxx vs. 4194304)`` usually crops up when the message size is too large. - To fix this, edit the flyte-admin-base-config config map to increase ``maxMessageSizeBytes`` value. From 976803cafbbe94e0440942ddd436e9a79f87cf05 Mon Sep 17 00:00:00 2001 From: LN <91385411+Ln11211@users.noreply.github.com> Date: Thu, 6 Oct 2022 23:44:14 +0530 Subject: [PATCH 7/7] Update control_plane.rst Updated the fix for the error and also removed the repeated error message line. Signed-off-by: LN <91385411+Ln11211@users.noreply.github.com> --- docs/source/design/control_plane.rst | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/source/design/control_plane.rst b/docs/source/design/control_plane.rst index e7f6a7d6de..7540fea73b 100644 --- a/docs/source/design/control_plane.rst +++ b/docs/source/design/control_plane.rst @@ -303,8 +303,7 @@ You can use :meth:`~flytekit.remote.remote.FlyteRemote.sync` to sync the entity During the sync, you may come across ``Received message larger than max (xxx vs. 4194304)`` error if the message size is too large. In that case, edit the ``flyte-admin-base-config`` config map using the command ``kubectl edit cm flyte-admin-base-config -n flyte`` to increase the ``maxMessageSizeBytes`` value. Refer to the :ref:`flyte: ` in case you've queries about the command's usage. -- ``Received message larger than max (xxx vs. 4194304)`` usually crops up when the message size is too large. -- To fix this, edit the flyte-admin-base-config config map to increase ``maxMessageSizeBytes`` value. +- To fix this error, edit the flyte-admin-base-config config map to increase ``maxMessageSizeBytes`` value. ``node_executions`` will fetch all the underlying node executions recursively.