diff --git a/flytekit/clients/friendly.py b/flytekit/clients/friendly.py index 2b15dfbd50..2bae266a53 100644 --- a/flytekit/clients/friendly.py +++ b/flytekit/clients/friendly.py @@ -1018,3 +1018,9 @@ def get_download_signed_url( expires_in=expires_in_pb, ) ) + + def get_data(self, flyte_uri: str) -> _data_proxy_pb2.GetDataResponse: + req = _data_proxy_pb2.GetDataRequest(flyte_url=flyte_uri) + + resp = self._dataproxy_stub.GetData(req, metadata=self._metadata) + return resp diff --git a/flytekit/clients/raw.py b/flytekit/clients/raw.py index db20b3769f..836d5ffa3b 100644 --- a/flytekit/clients/raw.py +++ b/flytekit/clients/raw.py @@ -579,9 +579,9 @@ def create_upload_location( def create_download_location( self, create_download_location_request: _dataproxy_pb2.CreateDownloadLocationRequest ) -> _dataproxy_pb2.CreateDownloadLocationResponse: - """ - Get a signed url to be used during fast registration - :param flyteidl.service.dataproxy_pb2.CreateDownloadLocationRequest create_download_location_request: - :rtype: flyteidl.service.dataproxy_pb2.CreateDownloadLocationResponse - """ return self._dataproxy_stub.CreateDownloadLocation(create_download_location_request, metadata=self._metadata) + + def create_download_link( + self, create_download_link_request: _dataproxy_pb2.CreateDownloadLinkRequest + ) -> _dataproxy_pb2.CreateDownloadLinkResponse: + return self._dataproxy_stub.CreateDownloadLink(create_download_link_request, metadata=self._metadata) diff --git a/flytekit/deck/deck.py b/flytekit/deck/deck.py index 0d53ec18d6..53c89ed003 100644 --- a/flytekit/deck/deck.py +++ b/flytekit/deck/deck.py @@ -4,6 +4,7 @@ from flytekit.core.context_manager import ExecutionParameters, ExecutionState, FlyteContext, FlyteContextManager from flytekit.loggers import logger +from flytekit.tools.interactive import ipython_check OUTPUT_DIR_JUPYTER_PREFIX = "jupyter" DECK_FILE_NAME = "deck.html" @@ -124,22 +125,6 @@ def html(self) -> str: return gantt_chart_html + time_table_html + note -def _ipython_check() -> bool: - """ - Check if interface is launching from iPython (not colab) - :return is_ipython (bool): True or False - """ - is_ipython = False - try: # Check if running interactively using ipython. - from IPython import get_ipython - - if get_ipython() is not None: - is_ipython = True - except (ImportError, NameError): - pass - return is_ipython - - def _get_deck( new_user_params: ExecutionParameters, ignore_jupyter: bool = False ) -> typing.Union[str, "IPython.core.display.HTML"]: # type:ignore @@ -149,7 +134,7 @@ def _get_deck( """ deck_map = {deck.name: deck.html for deck in new_user_params.decks} raw_html = get_deck_template().render(metadata=deck_map) - if not ignore_jupyter and _ipython_check(): + if not ignore_jupyter and ipython_check(): try: from IPython.core.display import HTML except ImportError: diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index d455ba4b4b..a34c93ad74 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -22,7 +22,6 @@ from flyteidl.admin.signal_pb2 import Signal, SignalListRequest, SignalSetRequest from flyteidl.core import literals_pb2 as literals_pb2 -from flytekit import Literal from flytekit.clients.friendly import SynchronousFlyteClient from flytekit.clients.helpers import iterate_node_executions, iterate_task_executions from flytekit.configuration import Config, FastSerializationSettings, ImageConfig, SerializationSettings @@ -61,6 +60,7 @@ NotificationList, WorkflowExecutionGetDataResponse, ) +from flytekit.models.literals import Literal, LiteralMap from flytekit.remote.backfill import create_backfill_workflow from flytekit.remote.entities import FlyteLaunchPlan, FlyteNode, FlyteTask, FlyteTaskNode, FlyteWorkflow from flytekit.remote.executions import FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflowExecution @@ -68,6 +68,7 @@ from flytekit.remote.lazy_entity import LazyEntity from flytekit.remote.remote_callable import RemoteEntity from flytekit.tools.fast_registration import fast_package +from flytekit.tools.interactive import ipython_check from flytekit.tools.script_mode import compress_scripts, hash_file from flytekit.tools.translator import ( FlyteControlPlaneEntity, @@ -77,6 +78,11 @@ get_serializable_launch_plan, ) +try: + from IPython.core.display import HTML +except ImportError: + ... + ExecutionDataResponse = typing.Union[WorkflowExecutionGetDataResponse, NodeExecutionGetDataResponse] MOST_RECENT_FIRST = admin_common_models.Sort("created_at", admin_common_models.Sort.Direction.DESCENDING) @@ -219,6 +225,41 @@ def file_access(self) -> FileAccessProvider: """File access provider to use for offloading non-literal inputs/outputs.""" return self._file_access + def get( + self, flyte_uri: typing.Optional[str] = None + ) -> typing.Optional[typing.Union[LiteralsResolver, HTML, bytes]]: + if flyte_uri is None: + raise user_exceptions.FlyteUserException("flyte_uri cannot be empty") + ctx = self._ctx or FlyteContextManager.current_context() + try: + data_response = self.client.get_data(flyte_uri) + + if data_response.HasField("literal_map"): + lm = LiteralMap.from_flyte_idl(data_response.literal_map) + return LiteralsResolver(lm.literals) + elif data_response.HasField("pre_signed_urls"): + if len(data_response.pre_signed_urls.signed_url) == 0: + raise ValueError(f"Flyte url {flyte_uri} resolved to empty download link") + d = data_response.pre_signed_urls.signed_url[0] + remote_logger.debug(f"Download link is {d}") + fs = ctx.file_access.get_filesystem_for_path(d) + + # If the venv has IPython, then return IPython's HTML + if ipython_check(): + remote_logger.debug(f"IPython found, returning HTML from {flyte_uri}") + with fs.open(d, "rb") as r: + html = HTML(str(r.read())) + return html + # If not return bytes + else: + remote_logger.debug(f"IPython not found, returning HTML as bytes from {flyte_uri}") + return fs.open(d, "rb").read() + + except user_exceptions.FlyteUserException as e: + remote_logger.info(f"Error from Flyte backend when trying to fetch data: {e.__cause__}") + + remote_logger.debug(f"Nothing found from {flyte_uri}") + def remote_context(self): """Context manager with remote-specific configuration.""" return FlyteContextManager.with_context( diff --git a/flytekit/tools/interactive.py b/flytekit/tools/interactive.py new file mode 100644 index 0000000000..a0d022df18 --- /dev/null +++ b/flytekit/tools/interactive.py @@ -0,0 +1,14 @@ +def ipython_check() -> bool: + """ + Check if interface is launching from iPython (not colab) + :return is_ipython (bool): True or False + """ + is_ipython = False + try: # Check if running interactively using ipython. + from IPython import get_ipython + + if get_ipython() is not None: + is_ipython = True + except (ImportError, NameError): + pass + return is_ipython diff --git a/tests/flytekit/unit/deck/test_deck.py b/tests/flytekit/unit/deck/test_deck.py index f65c94b877..ebba6d23dc 100644 --- a/tests/flytekit/unit/deck/test_deck.py +++ b/tests/flytekit/unit/deck/test_deck.py @@ -91,7 +91,7 @@ def t_df(a: str) -> int: assert len(ctx.user_space_params.decks) == expected_decks -@mock.patch("flytekit.deck.deck._ipython_check") +@mock.patch("flytekit.deck.deck.ipython_check") def test_deck_in_jupyter(mock_ipython_check): mock_ipython_check.return_value = True diff --git a/tests/flytekit/unit/remote/test_remote.py b/tests/flytekit/unit/remote/test_remote.py index 5bfd7e4bf6..94b03b044a 100644 --- a/tests/flytekit/unit/remote/test_remote.py +++ b/tests/flytekit/unit/remote/test_remote.py @@ -1,17 +1,22 @@ import os import pathlib import tempfile +import typing from collections import OrderedDict from datetime import datetime, timedelta +import mock import pytest from flyteidl.core import compiler_pb2 as _compiler_pb2 +from flyteidl.service import dataproxy_pb2 from mock import MagicMock, patch import flytekit.configuration from flytekit import CronSchedule, LaunchPlan, task, workflow from flytekit.configuration import Config, DefaultImages, ImageConfig from flytekit.core.base_task import PythonTask +from flytekit.core.context_manager import FlyteContextManager +from flytekit.core.type_engine import TypeEngine from flytekit.exceptions import user as user_exceptions from flytekit.models import common as common_models from flytekit.models import security @@ -338,3 +343,21 @@ def test_launch_backfill(remote): wf = remote.launch_backfill("p", "d", start_date, end_date, "daily2", "v1", dry_run=True) assert wf + + +@mock.patch("flytekit.remote.remote.FlyteRemote.client") +def test_local_server(mock_client): + ctx = FlyteContextManager.current_context() + lt = TypeEngine.to_literal_type(typing.Dict[str, int]) + lm = TypeEngine.to_literal(ctx, {"hello": 55}, typing.Dict[str, int], lt) + lm = lm.map.to_flyte_idl() + + mock_client.get_data.return_value = dataproxy_pb2.GetDataResponse(literal_map=lm) + + rr = FlyteRemote( + Config.for_sandbox(), + default_project="flytesnacks", + default_domain="development", + ) + lr = rr.get("flyte://v1/flytesnacks/development/f6988c7bdad554a4da7a/n0/o") + assert lr.get("hello", int) == 55