From 80a24ab22aef7428428c64ab6010925b894ea790 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 15 Feb 2022 15:22:33 -0800 Subject: [PATCH 1/6] add anonymous retry Signed-off-by: Yee Hing Tong --- flytekit/extras/persistence/s3_awscli.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/flytekit/extras/persistence/s3_awscli.py b/flytekit/extras/persistence/s3_awscli.py index 3b24fef94b..8d072782e6 100644 --- a/flytekit/extras/persistence/s3_awscli.py +++ b/flytekit/extras/persistence/s3_awscli.py @@ -10,8 +10,11 @@ from flytekit.configuration import aws from flytekit.core.data_persistence import DataPersistence, DataPersistencePlugins from flytekit.exceptions.user import FlyteUserException +from flytekit.loggers import logger from flytekit.tools import subprocess +S3_ANONYMOUS_FLAG = "--no-sign-request" + def _update_cmd_config_and_execute(cmd: List[str]): env = _os.environ.copy() @@ -32,7 +35,17 @@ def _update_cmd_config_and_execute(cmd: List[str]): retry = 0 while True: try: - return subprocess.check_call(cmd, env=env) + try: + return subprocess.check_call(cmd, env=env) + except Exception: + if retry > 0: + logger.info(f"AWS command failed with error {e}, command: {cmd}, retry {retry}") + + logger.debug(f"Appending anonymous flag and retrying command {cmd}") + anonymous_cmd = cmd[:] # strings only, so this is deep enough + anonymous_cmd.insert(1, S3_ANONYMOUS_FLAG) + return subprocess.check_call(anonymous_cmd, env=env) + except Exception as e: logging.error(f"Exception when trying to execute {cmd}, reason: {str(e)}") retry += 1 From 1674057e3920268693611c1cdc38c2b8b95b804a Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 15 Feb 2022 17:00:24 -0800 Subject: [PATCH 2/6] hack up changes to fsspec plugin as well Signed-off-by: Yee Hing Tong --- .../flytekitplugins/fsspec/persist.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py b/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py index d2ea879ce0..71693f5f40 100644 --- a/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py +++ b/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py @@ -74,7 +74,16 @@ def get(self, from_path: str, to_path: str, recursive: bool = False): fs = self._get_filesystem(from_path) if recursive: from_path, to_path = self.recursive_paths(from_path, to_path) - return fs.get(from_path, to_path, recursive=recursive) + try: + return fs.get(from_path, to_path, recursive=recursive) + except OSError as oe: + logger.debug(f"Error in getting {from_path} to {to_path} rec {recursive} {oe}") + protocol = FSSpecPersistence._get_protocol(from_path) + if protocol == "s3": + logger.debug(f"S3 source Attempting anonymous S3 access") + kwargs = s3_setup_args() + anonymous_fs = fsspec.filesystem(protocol, anon=True, **kwargs) # type: ignore + return anonymous_fs.get(from_path, to_path, recursive=recursive) def put(self, from_path: str, to_path: str, recursive: bool = False): fs = self._get_filesystem(to_path) From 5477a81510f96b30d05d0fec1bac04c509b9fd62 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 15 Feb 2022 17:21:45 -0800 Subject: [PATCH 3/6] copy paste hack to exists Signed-off-by: Yee Hing Tong --- .../flytekitplugins/fsspec/persist.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py b/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py index 71693f5f40..9560e063e3 100644 --- a/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py +++ b/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py @@ -67,8 +67,18 @@ def recursive_paths(f: str, t: str) -> typing.Tuple[str, str]: return f, t def exists(self, path: str) -> bool: - fs = self._get_filesystem(path) - return fs.exists(path) + try: + fs = self._get_filesystem(path) + return fs.exists(path) + except OSError as oe: + logger.debug(f"Error in exists checking {path} {oe}") + protocol = FSSpecPersistence._get_protocol(path) + if protocol == "s3": + logger.debug(f"S3 source detected, attempting anonymous S3 exists check") + kwargs = s3_setup_args() + anonymous_fs = fsspec.filesystem(protocol, anon=True, **kwargs) # type: ignore + return anonymous_fs.exists(path) + raise oe def get(self, from_path: str, to_path: str, recursive: bool = False): fs = self._get_filesystem(from_path) @@ -80,10 +90,11 @@ def get(self, from_path: str, to_path: str, recursive: bool = False): logger.debug(f"Error in getting {from_path} to {to_path} rec {recursive} {oe}") protocol = FSSpecPersistence._get_protocol(from_path) if protocol == "s3": - logger.debug(f"S3 source Attempting anonymous S3 access") + logger.debug(f"S3 source detected, attempting anonymous S3 access") kwargs = s3_setup_args() anonymous_fs = fsspec.filesystem(protocol, anon=True, **kwargs) # type: ignore return anonymous_fs.get(from_path, to_path, recursive=recursive) + raise oe def put(self, from_path: str, to_path: str, recursive: bool = False): fs = self._get_filesystem(to_path) From 3262aa322f22ed31b2a15aad3e5acf91c9b3283f Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 15 Feb 2022 17:22:35 -0800 Subject: [PATCH 4/6] swap logger Signed-off-by: Yee Hing Tong --- flytekit/extras/persistence/s3_awscli.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/flytekit/extras/persistence/s3_awscli.py b/flytekit/extras/persistence/s3_awscli.py index 8d072782e6..fe5222d025 100644 --- a/flytekit/extras/persistence/s3_awscli.py +++ b/flytekit/extras/persistence/s3_awscli.py @@ -1,4 +1,3 @@ -import logging import os as _os import re as _re import string as _string @@ -47,14 +46,14 @@ def _update_cmd_config_and_execute(cmd: List[str]): return subprocess.check_call(anonymous_cmd, env=env) except Exception as e: - logging.error(f"Exception when trying to execute {cmd}, reason: {str(e)}") + logger.error(f"Exception when trying to execute {cmd}, reason: {str(e)}") retry += 1 if retry > aws.RETRIES.get(): raise secs = aws.BACKOFF_SECONDS.get() - logging.info(f"Sleeping before retrying again, after {secs} seconds") + logger.info(f"Sleeping before retrying again, after {secs} seconds") time.sleep(secs) - logging.info("Retrying again") + logger.info("Retrying again") def _extra_args(extra_args: Dict[str, str]) -> List[str]: From 5ca8a977cba6d9c80e506d254475517990d0ac99 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 15 Feb 2022 17:34:19 -0800 Subject: [PATCH 5/6] test fix Signed-off-by: Yee Hing Tong --- flytekit/extras/persistence/s3_awscli.py | 2 +- tests/flytekit/unit/extras/persistence/test_s3_awscli.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flytekit/extras/persistence/s3_awscli.py b/flytekit/extras/persistence/s3_awscli.py index fe5222d025..64e09e219c 100644 --- a/flytekit/extras/persistence/s3_awscli.py +++ b/flytekit/extras/persistence/s3_awscli.py @@ -36,7 +36,7 @@ def _update_cmd_config_and_execute(cmd: List[str]): try: try: return subprocess.check_call(cmd, env=env) - except Exception: + except Exception as e: if retry > 0: logger.info(f"AWS command failed with error {e}, command: {cmd}, retry {retry}") diff --git a/tests/flytekit/unit/extras/persistence/test_s3_awscli.py b/tests/flytekit/unit/extras/persistence/test_s3_awscli.py index 78f7d67b88..bcf1fd3495 100644 --- a/tests/flytekit/unit/extras/persistence/test_s3_awscli.py +++ b/tests/flytekit/unit/extras/persistence/test_s3_awscli.py @@ -25,7 +25,7 @@ def test_retries(mock_subprocess, mock_delay, mock_check): proxy = S3Persistence() assert proxy.exists("s3://test/fdsa/fdsa") is False - assert mock_subprocess.check_call.call_count == 4 + assert mock_subprocess.check_call.call_count == 8 def test_extra_args(): From aa2c912526243dfcf27bae6d71845b1a4f50ab3a Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Wed, 16 Feb 2022 10:18:20 -0800 Subject: [PATCH 6/6] nit Signed-off-by: Yee Hing Tong --- .../flytekit-data-fsspec/flytekitplugins/fsspec/persist.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py b/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py index 9560e063e3..68bc92b493 100644 --- a/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py +++ b/plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py @@ -74,7 +74,7 @@ def exists(self, path: str) -> bool: logger.debug(f"Error in exists checking {path} {oe}") protocol = FSSpecPersistence._get_protocol(path) if protocol == "s3": - logger.debug(f"S3 source detected, attempting anonymous S3 exists check") + logger.debug("S3 source detected, attempting anonymous S3 exists check") kwargs = s3_setup_args() anonymous_fs = fsspec.filesystem(protocol, anon=True, **kwargs) # type: ignore return anonymous_fs.exists(path) @@ -90,7 +90,7 @@ def get(self, from_path: str, to_path: str, recursive: bool = False): logger.debug(f"Error in getting {from_path} to {to_path} rec {recursive} {oe}") protocol = FSSpecPersistence._get_protocol(from_path) if protocol == "s3": - logger.debug(f"S3 source detected, attempting anonymous S3 access") + logger.debug("S3 source detected, attempting anonymous S3 access") kwargs = s3_setup_args() anonymous_fs = fsspec.filesystem(protocol, anon=True, **kwargs) # type: ignore return anonymous_fs.get(from_path, to_path, recursive=recursive)