Skip to content

Commit

Permalink
Explicitly set the content type for flyte deck (#1658)
Browse files Browse the repository at this point in the history
* Set content type for flyte deck

Signed-off-by: Kevin Su <[email protected]>

* test

Signed-off-by: Kevin Su <[email protected]>

* test

Signed-off-by: Kevin Su <[email protected]>

* test

Signed-off-by: Kevin Su <[email protected]>

* test

Signed-off-by: Kevin Su <[email protected]>

* test

Signed-off-by: Kevin Su <[email protected]>

* test

Signed-off-by: Kevin Su <[email protected]>

* test

Signed-off-by: Kevin Su <[email protected]>

* test

Signed-off-by: Kevin Su <[email protected]>

* test

Signed-off-by: Kevin Su <[email protected]>

* test

Signed-off-by: Kevin Su <[email protected]>

* test

Signed-off-by: Kevin Su <[email protected]>

* test

Signed-off-by: Kevin Su <[email protected]>

* test

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

* test

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

---------

Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw authored Jun 1, 2023
1 parent d76dceb commit c8433ea
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 13 deletions.
6 changes: 5 additions & 1 deletion flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,16 @@ def get_one_of(*args) -> str:
@contextlib.contextmanager
def setup_execution(
raw_output_data_prefix: str,
output_metadata_prefix: Optional[str] = None,
checkpoint_path: Optional[str] = None,
prev_checkpoint: Optional[str] = None,
dynamic_addl_distro: Optional[str] = None,
dynamic_dest_dir: Optional[str] = None,
):
"""
:param raw_output_data_prefix:
:param raw_output_data_prefix: Where to write offloaded data (files, directories, dataframes).
:param output_metadata_prefix: Where to write primitive outputs.
:param checkpoint_path:
:param prev_checkpoint:
:param dynamic_addl_distro: Works in concert with the other dynamic arg. If present, indicates that if a dynamic
Expand Down Expand Up @@ -247,6 +249,7 @@ def setup_execution(
logging=user_space_logger,
tmp_dir=user_workspace_dir,
raw_output_prefix=raw_output_data_prefix,
output_metadata_prefix=output_metadata_prefix,
checkpoint=checkpointer,
task_id=_identifier.Identifier(_identifier.ResourceType.TASK, tk_project, tk_domain, tk_name, tk_version),
)
Expand Down Expand Up @@ -337,6 +340,7 @@ def _execute_task(

with setup_execution(
raw_output_data_prefix,
output_prefix,
checkpoint_path,
prev_checkpoint,
dynamic_addl_distro,
Expand Down
6 changes: 6 additions & 0 deletions flytekit/core/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ def __init__(
execution_id: typing.Optional[_identifier.WorkflowExecutionIdentifier],
logging,
raw_output_prefix,
output_metadata_prefix=None,
checkpoint=None,
decks=None,
task_id: typing.Optional[_identifier.Identifier] = None,
Expand All @@ -173,6 +174,7 @@ def __init__(
self._execution_id = execution_id
self._logging = logging
self._raw_output_prefix = raw_output_prefix
self._output_metadata_prefix = output_metadata_prefix
# AutoDeletingTempDir's should be used with a with block, which creates upon entry
self._attrs = kwargs
# It is safe to recreate the Secrets Manager
Expand Down Expand Up @@ -201,6 +203,10 @@ def logging(self) -> _logging.Logger:
def raw_output_prefix(self) -> str:
return self._raw_output_prefix

@property
def output_metadata_prefix(self) -> str:
return self._output_metadata_prefix

@property
def working_directory(self) -> str:
"""
Expand Down
8 changes: 4 additions & 4 deletions flytekit/core/data_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def get(self, from_path: str, to_path: str, recursive: bool = False):
return file_system.get(from_path, to_path, recursive=recursive)
raise oe

def put(self, from_path: str, to_path: str, recursive: bool = False):
def put(self, from_path: str, to_path: str, recursive: bool = False, **kwargs):
file_system = self.get_filesystem_for_path(to_path)
from_path = self.strip_file_header(from_path)
if recursive:
Expand All @@ -217,7 +217,7 @@ def put(self, from_path: str, to_path: str, recursive: bool = False):
self.strip_file_header(from_path), self.strip_file_header(to_path), dirs_exist_ok=True
)
from_path, to_path = self.recursive_paths(from_path, to_path)
return file_system.put(from_path, to_path, recursive=recursive)
return file_system.put(from_path, to_path, recursive=recursive, **kwargs)

def get_random_remote_path(self, file_path_or_file_name: typing.Optional[str] = None) -> str:
"""
Expand Down Expand Up @@ -304,7 +304,7 @@ def get_data(self, remote_path: str, local_path: str, is_multipart: bool = False
)

@timeit("Upload data to remote")
def put_data(self, local_path: Union[str, os.PathLike], remote_path: str, is_multipart: bool = False):
def put_data(self, local_path: Union[str, os.PathLike], remote_path: str, is_multipart: bool = False, **kwargs):
"""
The implication here is that we're always going to put data to the remote location, so we .remote to ensure
we don't use the true local proxy if the remote path is a file://
Expand All @@ -316,7 +316,7 @@ def put_data(self, local_path: Union[str, os.PathLike], remote_path: str, is_mul
try:
local_path = str(local_path)

self.put(cast(str, local_path), remote_path, recursive=is_multipart)
self.put(cast(str, local_path), remote_path, recursive=is_multipart, **kwargs)
except Exception as ex:
raise FlyteAssertion(
f"Failed to put data from {local_path} to {remote_path} (recursive={is_multipart}).\n\n"
Expand Down
20 changes: 12 additions & 8 deletions flytekit/deck/deck.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class Deck:
Each task has a least three decks (input, output, default). Input/output decks are
used to render tasks' input/output data, and the default deck is used to render line plots,
scatter plots or markdown text. In addition, users can create new decks to render
scatter plots or Markdown text. In addition, users can create new decks to render
their data with custom renderers.
.. warning::
Expand Down Expand Up @@ -145,14 +145,18 @@ def _get_deck(

def _output_deck(task_name: str, new_user_params: ExecutionParameters):
ctx = FlyteContext.current_context()
if ctx.execution_state.mode == ExecutionState.Mode.TASK_EXECUTION:
output_dir = ctx.execution_state.engine_dir
else:
output_dir = ctx.file_access.get_random_local_directory()
deck_path = os.path.join(output_dir, DECK_FILE_NAME)
with open(deck_path, "w") as f:
local_dir = ctx.file_access.get_random_local_directory()
local_path = f"{local_dir}{os.sep}{DECK_FILE_NAME}"
with open(local_path, "w") as f:
f.write(_get_deck(new_user_params, ignore_jupyter=True))
logger.info(f"{task_name} task creates flyte deck html to file://{deck_path}")
logger.info(f"{task_name} task creates flyte deck html to file://{local_path}")
if ctx.execution_state.mode == ExecutionState.Mode.TASK_EXECUTION:
remote_path = f"{new_user_params.output_metadata_prefix}{os.sep}{DECK_FILE_NAME}"
kwargs: typing.Dict[str, str] = {
"ContentType": "text/html", # For s3
"content_type": "text/html", # For gcs
}
ctx.file_access.put_data(local_path, remote_path, **kwargs)


def get_deck_template() -> "Template":
Expand Down

0 comments on commit c8433ea

Please sign in to comment.