diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index a9b7c313f0..425d06bc4c 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -183,6 +183,7 @@ def get_one_of(*args) -> str: @contextlib.contextmanager def setup_execution( raw_output_data_prefix: str, + output_metadata_prefix: Optional[str] = None, checkpoint_path: Optional[str] = None, prev_checkpoint: Optional[str] = None, dynamic_addl_distro: Optional[str] = None, @@ -190,7 +191,8 @@ def setup_execution( ): """ - :param raw_output_data_prefix: + :param raw_output_data_prefix: Where to write offloaded data (files, directories, dataframes). + :param output_metadata_prefix: Where to write primitive outputs. :param checkpoint_path: :param prev_checkpoint: :param dynamic_addl_distro: Works in concert with the other dynamic arg. If present, indicates that if a dynamic @@ -247,6 +249,7 @@ def setup_execution( logging=user_space_logger, tmp_dir=user_workspace_dir, raw_output_prefix=raw_output_data_prefix, + output_metadata_prefix=output_metadata_prefix, checkpoint=checkpointer, task_id=_identifier.Identifier(_identifier.ResourceType.TASK, tk_project, tk_domain, tk_name, tk_version), ) @@ -337,6 +340,7 @@ def _execute_task( with setup_execution( raw_output_data_prefix, + output_prefix, checkpoint_path, prev_checkpoint, dynamic_addl_distro, diff --git a/flytekit/core/context_manager.py b/flytekit/core/context_manager.py index e2923bfc7f..53ff504d55 100644 --- a/flytekit/core/context_manager.py +++ b/flytekit/core/context_manager.py @@ -151,6 +151,7 @@ def __init__( execution_id: typing.Optional[_identifier.WorkflowExecutionIdentifier], logging, raw_output_prefix, + output_metadata_prefix=None, checkpoint=None, decks=None, task_id: typing.Optional[_identifier.Identifier] = None, @@ -173,6 +174,7 @@ def __init__( self._execution_id = execution_id self._logging = logging self._raw_output_prefix = raw_output_prefix + self._output_metadata_prefix = output_metadata_prefix # AutoDeletingTempDir's should be used with a with block, which creates upon entry self._attrs = kwargs # It is safe to recreate the Secrets Manager @@ -201,6 +203,10 @@ def logging(self) -> _logging.Logger: def raw_output_prefix(self) -> str: return self._raw_output_prefix + @property + def output_metadata_prefix(self) -> str: + return self._output_metadata_prefix + @property def working_directory(self) -> str: """ diff --git a/flytekit/core/data_persistence.py b/flytekit/core/data_persistence.py index 2bd86ce896..615709b7b5 100644 --- a/flytekit/core/data_persistence.py +++ b/flytekit/core/data_persistence.py @@ -203,7 +203,7 @@ def get(self, from_path: str, to_path: str, recursive: bool = False): return file_system.get(from_path, to_path, recursive=recursive) raise oe - def put(self, from_path: str, to_path: str, recursive: bool = False): + def put(self, from_path: str, to_path: str, recursive: bool = False, **kwargs): file_system = self.get_filesystem_for_path(to_path) from_path = self.strip_file_header(from_path) if recursive: @@ -217,7 +217,7 @@ def put(self, from_path: str, to_path: str, recursive: bool = False): self.strip_file_header(from_path), self.strip_file_header(to_path), dirs_exist_ok=True ) from_path, to_path = self.recursive_paths(from_path, to_path) - return file_system.put(from_path, to_path, recursive=recursive) + return file_system.put(from_path, to_path, recursive=recursive, **kwargs) def get_random_remote_path(self, file_path_or_file_name: typing.Optional[str] = None) -> str: """ @@ -304,7 +304,7 @@ def get_data(self, remote_path: str, local_path: str, is_multipart: bool = False ) @timeit("Upload data to remote") - def put_data(self, local_path: Union[str, os.PathLike], remote_path: str, is_multipart: bool = False): + def put_data(self, local_path: Union[str, os.PathLike], remote_path: str, is_multipart: bool = False, **kwargs): """ The implication here is that we're always going to put data to the remote location, so we .remote to ensure we don't use the true local proxy if the remote path is a file:// @@ -316,7 +316,7 @@ def put_data(self, local_path: Union[str, os.PathLike], remote_path: str, is_mul try: local_path = str(local_path) - self.put(cast(str, local_path), remote_path, recursive=is_multipart) + self.put(cast(str, local_path), remote_path, recursive=is_multipart, **kwargs) except Exception as ex: raise FlyteAssertion( f"Failed to put data from {local_path} to {remote_path} (recursive={is_multipart}).\n\n" diff --git a/flytekit/deck/deck.py b/flytekit/deck/deck.py index 53c89ed003..c011a218a6 100644 --- a/flytekit/deck/deck.py +++ b/flytekit/deck/deck.py @@ -23,7 +23,7 @@ class Deck: Each task has a least three decks (input, output, default). Input/output decks are used to render tasks' input/output data, and the default deck is used to render line plots, - scatter plots or markdown text. In addition, users can create new decks to render + scatter plots or Markdown text. In addition, users can create new decks to render their data with custom renderers. .. warning:: @@ -145,14 +145,18 @@ def _get_deck( def _output_deck(task_name: str, new_user_params: ExecutionParameters): ctx = FlyteContext.current_context() - if ctx.execution_state.mode == ExecutionState.Mode.TASK_EXECUTION: - output_dir = ctx.execution_state.engine_dir - else: - output_dir = ctx.file_access.get_random_local_directory() - deck_path = os.path.join(output_dir, DECK_FILE_NAME) - with open(deck_path, "w") as f: + local_dir = ctx.file_access.get_random_local_directory() + local_path = f"{local_dir}{os.sep}{DECK_FILE_NAME}" + with open(local_path, "w") as f: f.write(_get_deck(new_user_params, ignore_jupyter=True)) - logger.info(f"{task_name} task creates flyte deck html to file://{deck_path}") + logger.info(f"{task_name} task creates flyte deck html to file://{local_path}") + if ctx.execution_state.mode == ExecutionState.Mode.TASK_EXECUTION: + remote_path = f"{new_user_params.output_metadata_prefix}{os.sep}{DECK_FILE_NAME}" + kwargs: typing.Dict[str, str] = { + "ContentType": "text/html", # For s3 + "content_type": "text/html", # For gcs + } + ctx.file_access.put_data(local_path, remote_path, **kwargs) def get_deck_template() -> "Template":