Skip to content

Commit

Permalink
merge master and resolve conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Yee Hing Tong <[email protected]>
  • Loading branch information
wild-endeavor committed Nov 2, 2021
2 parents 6b95c1d + 751425d commit 7c39a9f
Show file tree
Hide file tree
Showing 54 changed files with 862 additions and 94 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ jobs:
build:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: [3.7, 3.8, 3.9]
spark-version-suffix: ["", "-spark2"]
Expand Down Expand Up @@ -63,6 +64,7 @@ jobs:
- flytekit-greatexpectations
- flytekit-hive
- flytekit-k8s-pod
- flytekit-kf-mpi
- flytekit-kf-pytorch
- flytekit-kf-tensorflow
- flytekit-papermill
Expand Down
4 changes: 2 additions & 2 deletions docs/source/design/authoring.rst
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,15 @@ Before going further, there is a special object that's worth mentioning, the :py
Let's assume we have a workflow like ::

@task
def t1(a: int) -> (int, str):
def t1(a: int) -> Tuple[int, str]:
return a + 2, "world"

@task
def t2(a: str, b: str) -> str:
return b + a

@workflow
def my_wf(a: int, b: str) -> (int, str):
def my_wf(a: int, b: str) -> Tuple[int, str]:
x, y = t1(a=a).with_overrides(...)
d = t2(a=y, b=b)
return x, d
Expand Down
24 changes: 24 additions & 0 deletions flytekit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,25 @@
Secret
SecurityContext
Common Flyte IDL Objects
=========================
.. autosummary::
:nosignatures:
:template: custom.rst
:toctree: generated/
AuthRole
Labels
Annotations
WorkflowExecutionPhase
Blob
BlobMetadata
Literal
Scalar
LiteralType
BlobType
"""

import sys
Expand Down Expand Up @@ -162,6 +181,11 @@
from flytekit.core.workflow import WorkflowFailurePolicy, reference_workflow, workflow
from flytekit.extras.persistence import GCSPersistence, HttpPersistence, S3Persistence
from flytekit.loggers import logger
from flytekit.models.common import Annotations, AuthRole, Labels
from flytekit.models.core.execution import WorkflowExecutionPhase
from flytekit.models.literals import Blob, BlobMetadata, Literal, Scalar
from flytekit.models.core.types import BlobType
from flytekit.models.types import LiteralType
from flytekit.types import directory, file, schema

__version__ = "0.0.0+develop"
Expand Down
54 changes: 28 additions & 26 deletions flytekit/clients/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,33 +125,35 @@ def handler(*args, **kwargs):
"""
max_retries = 3
max_wait_time = 1000
try:
for i in range(max_retries):
try:
return fn(*args, **kwargs)
except _RpcError as e:
if e.code() == _GrpcStatusCode.UNAUTHENTICATED:
# Always retry auth errors.
if i == (max_retries - 1):
# Exit the loop and wrap the authentication error.
raise _user_exceptions.FlyteAuthenticationException(_six.text_type(e))
cli_logger.error(f"Unauthenticated RPC error {e}, refreshing credentials and retrying\n")
refresh_handler_fn = _get_refresh_handler(_creds_config.AUTH_MODE.get())
refresh_handler_fn(args[0])

for i in range(max_retries):
try:
return fn(*args, **kwargs)
except _RpcError as e:
if e.code() == _GrpcStatusCode.UNAUTHENTICATED:
# Always retry auth errors.
if i == (max_retries - 1):
# Exit the loop and wrap the authentication error.
raise _user_exceptions.FlyteAuthenticationException(_six.text_type(e))
cli_logger.error(f"Unauthenticated RPC error {e}, refreshing credentials and retrying\n")
refresh_handler_fn = _get_refresh_handler(_creds_config.AUTH_MODE.get())
refresh_handler_fn(args[0])
# There are two cases that we should throw error immediately
# 1. Entity already exists when we register entity
# 2. Entity not found when we fetch entity
elif e.code() == _GrpcStatusCode.ALREADY_EXISTS:
raise _user_exceptions.FlyteEntityAlreadyExistsException(e)
elif e.code() == _GrpcStatusCode.NOT_FOUND:
raise _user_exceptions.FlyteEntityNotExistException(e)
else:
# No more retries if retry=False or max_retries reached.
if (retry is False) or i == (max_retries - 1):
raise
else:
# No more retries if retry=False or max_retries reached.
if (retry is False) or i == (max_retries - 1):
raise
else:
# Retry: Start with 200ms wait-time and exponentially back-off up to 1 second.
wait_time = min(200 * (2 ** i), max_wait_time)
cli_logger.error(f"Non-auth RPC error {e}, sleeping {wait_time}ms and retrying")
time.sleep(wait_time / 1000)
except _RpcError as e:
if e.code() == _GrpcStatusCode.ALREADY_EXISTS:
raise _user_exceptions.FlyteEntityAlreadyExistsException(_six.text_type(e))
else:
raise
# Retry: Start with 200ms wait-time and exponentially back-off up to 1 second.
wait_time = min(200 * (2 ** i), max_wait_time)
cli_logger.error(f"Non-auth RPC error {e}, sleeping {wait_time}ms and retrying")
time.sleep(wait_time / 1000)

return handler

Expand Down
16 changes: 14 additions & 2 deletions flytekit/clis/flyte_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,19 @@


def _welcome_message():
_click.secho("Welcome to Flyte CLI! Version: {}".format(_tt(__version__)), bold=True)
_click.secho(
"\n################################################################################################################################",
bold=True,
)
_click.secho(
"# flyte-cli is being deprecated in favor of flytectl. More details about flytectl in https://docs.flyte.org/projects/flytectl/ #",
bold=True,
)
_click.secho(
"################################################################################################################################\n",
bold=True,
)
_click.secho("Welcome to Flyte CLI! Version: {}\n".format(_tt(__version__)), bold=True)


def _get_user_filepath_home():
Expand Down Expand Up @@ -596,7 +608,7 @@ def make_context(self, cmd_name, args, parent=None):
"the sub-command's parameter takes precedence.",
)
@_insecure_option
@_click.group("flyte-cli")
@_click.group("flyte-cli", deprecated=True)
@_click.pass_context
def _flyte_cli(ctx, host, config, project, domain, name, insecure):
"""
Expand Down
1 change: 0 additions & 1 deletion flytekit/common/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ def get_serializable_task(
)
if settings.should_fast_serialize() and isinstance(entity, PythonAutoContainerTask):
entity.reset_command_fn()

return task_models.TaskSpec(template=tt)


Expand Down
42 changes: 38 additions & 4 deletions flytekit/core/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import collections
import copy
import inspect
import logging as _logging
import typing
from collections import OrderedDict
from typing import Any, Dict, Generator, List, Optional, Tuple, Type, TypeVar, Union
Expand All @@ -13,6 +14,9 @@
from flytekit.core.type_engine import TypeEngine
from flytekit.loggers import logger
from flytekit.models import interface as _interface_models
from flytekit.types.pickle import FlytePickle

T = typing.TypeVar("T")


class Interface(object):
Expand Down Expand Up @@ -244,20 +248,41 @@ def transform_interface_to_list_interface(interface: Interface) -> Interface:
return Interface(inputs=map_inputs, outputs=map_outputs)


def _change_unrecognized_type_to_pickle(t: Type[T]) -> Type[T]:
try:
if hasattr(t, "__origin__") and hasattr(t, "__args__"):
if t.__origin__ == list:
return typing.List[_change_unrecognized_type_to_pickle(t.__args__[0])]
elif t.__origin__ == dict and t.__args__[0] == str:
return typing.Dict[str, _change_unrecognized_type_to_pickle(t.__args__[1])]
else:
TypeEngine.get_transformer(t)
except ValueError:
_logging.warning(
f"Unsupported Type {t} found, Flyte will default to use PickleFile as the transport. "
f"Pickle can only be used to send objects between the exact same version of Python, "
f"and we strongly recommend to use python type that flyte support."
)
return FlytePickle[t]
return t


def transform_signature_to_interface(signature: inspect.Signature, docstring: Optional[Docstring] = None) -> Interface:
"""
From the annotations on a task function that the user should have provided, and the output names they want to use
for each output parameter, construct the TypedInterface object
For now the fancy object, maybe in the future a dumb object.
"""
outputs = extract_return_annotation(signature.return_annotation)

for k, v in outputs.items():
outputs[k] = _change_unrecognized_type_to_pickle(v)
inputs = OrderedDict()
for k, v in signature.parameters.items():
annotation = v.annotation
default = v.default if v.default is not inspect.Parameter.empty else None
# Inputs with default values are currently ignored, we may want to look into that in the future
inputs[k] = (v.annotation, v.default if v.default is not inspect.Parameter.empty else None)
inputs[k] = (_change_unrecognized_type_to_pickle(annotation), default)

# This is just for typing.NamedTuples - in those cases, the user can select a name to call the NamedTuple. We
# would like to preserve that name in our custom collections.namedtuple.
Expand All @@ -273,7 +298,8 @@ def transform_signature_to_interface(signature: inspect.Signature, docstring: Op


def transform_variable_map(
variable_map: Dict[str, type], descriptions: Dict[str, str] = {}
variable_map: Dict[str, type],
descriptions: Dict[str, str] = {},
) -> Dict[str, _interface_models.Variable]:
"""
Given a map of str (names of inputs for instance) to their Python native types, return a map of the name to a
Expand All @@ -283,6 +309,14 @@ def transform_variable_map(
if variable_map:
for k, v in variable_map.items():
res[k] = transform_type(v, descriptions.get(k, k))
sub_type: Type[T] = v
if hasattr(v, "__origin__") and hasattr(v, "__args__"):
if v.__origin__ is list:
sub_type = v.__args__[0]
elif v.__origin__ is dict:
sub_type = v.__args__[1]
if hasattr(sub_type, "__origin__") and sub_type.__origin__ is FlytePickle:
res[k].type.metadata = {"python_class_name": sub_type.python_type().__name__}

return res

Expand Down
1 change: 1 addition & 0 deletions flytekit/core/promise.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def my_wf(in1: int, in2: int) -> int:
def extract_value(
ctx: FlyteContext, input_val: Any, val_type: type, flyte_literal_type: _type_models.LiteralType
) -> _literal_models.Literal:

if isinstance(input_val, list):
if flyte_literal_type.collection_type is None:
raise TypeError(f"Not a collection type {flyte_literal_type} but got a list {input_val}")
Expand Down
2 changes: 1 addition & 1 deletion flytekit/core/type_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ def guess_python_type(cls, flyte_type: LiteralType) -> type:
except ValueError:
logger.debug(f"Skipping transformer {transformer.name} for {flyte_type}")

# Because the dataclass transformer is handled explicity in the get_transformer code, we have to handle it
# Because the dataclass transformer is handled explicitly in the get_transformer code, we have to handle it
# separately here too.
try:
return cls._DATACLASS_TRANSFORMER.guess_python_type(literal_type=flyte_type)
Expand Down
2 changes: 1 addition & 1 deletion flytekit/extras/cloud_pickle_resolver.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from base64 import b64decode, b64encode
from typing import List

import cloudpickle # intentionally not yet part of setup.py
import cloudpickle

from flytekit.core.base_task import TaskResolverMixin
from flytekit.core.context_manager import SerializationSettings
Expand Down
7 changes: 4 additions & 3 deletions flytekit/models/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,10 +413,11 @@ def from_flyte_idl(cls, pb):
class AuthRole(FlyteIdlEntity):
def __init__(self, assumable_iam_role=None, kubernetes_service_account=None):
"""
At most one of assumable_iam_role or kubernetes_service_account can be set.
Either one or both of the assumable IAM role and/or the K8s service account can be set.
:param Text assumable_iam_role: IAM identity with set permissions policies.
:param Text kubernetes_service_account: Provides an identity for workflow execution resources. Flyte deployment
administrators are responsible for handling permissions as they relate to the service account.
:param Text kubernetes_service_account: Provides an identity for workflow execution resources.
Flyte deployment administrators are responsible for handling permissions as they
relate to the service account.
"""
self._assumable_iam_role = assumable_iam_role
self._kubernetes_service_account = kubernetes_service_account
Expand Down
5 changes: 5 additions & 0 deletions flytekit/models/core/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@


class WorkflowExecutionPhase(object):
"""
This class holds enum values used for setting notifications. See :py:class:`flytekit.Email`
for sample usage.
"""

UNDEFINED = _execution_pb2.WorkflowExecution.UNDEFINED
QUEUED = _execution_pb2.WorkflowExecution.QUEUED
RUNNING = _execution_pb2.WorkflowExecution.RUNNING
Expand Down
3 changes: 3 additions & 0 deletions flytekit/models/core/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ def from_flyte_idl(cls, proto: _types_pb2.EnumType):


class BlobType(_common.FlyteIdlEntity):
"""
This type represents offloaded data and is typically used for things like files.
"""
class BlobDimensionality(object):
SINGLE = _types_pb2.BlobType.SINGLE
MULTIPART = _types_pb2.BlobType.MULTIPART
Expand Down
9 changes: 9 additions & 0 deletions flytekit/models/literals.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ def from_flyte_idl(cls, pb2_object):


class BlobMetadata(_common.FlyteIdlEntity):
"""
This is metadata for the Blob literal.
"""

def __init__(self, type):
"""
:param flytekit.models.core.types.BlobType type: The type of the underlying blob
Expand Down Expand Up @@ -237,6 +241,9 @@ def from_flyte_idl(cls, proto):
class Blob(_common.FlyteIdlEntity):
def __init__(self, metadata, uri):
"""
This literal model is used to represent binary data offloaded to some storage location which is
identifiable with a unique string. See :py:class:`flytekit.FlyteFile` as an example.
:param BlobMetadata metadata:
:param Text uri: The location of this blob
"""
Expand Down Expand Up @@ -721,6 +728,8 @@ def from_flyte_idl(cls, pb2_object):
class Literal(_common.FlyteIdlEntity):
def __init__(self, scalar: Scalar = None, collection: LiteralCollection = None, map: LiteralMap = None):
"""
This IDL message represents a literal value in the Flyte ecosystem.
:param Scalar scalar:
:param LiteralCollection collection:
:param LiteralMap map:
Expand Down
7 changes: 6 additions & 1 deletion flytekit/models/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ def __init__(
metadata=None,
):
"""
Only one of the kwargs may be set.
This is a oneof message, only one of the kwargs may be set, representing one of the Flyte types.
:param int simple: Enum type from SimpleType
:param SchemaType schema: Type definition for a dataframe-like object.
:param LiteralType collection_type: For list-like objects, this is the type of each entry in the list.
Expand Down Expand Up @@ -165,6 +166,10 @@ def metadata(self):
"""
return self._metadata

@metadata.setter
def metadata(self, value):
self._metadata = value

def to_flyte_idl(self):
"""
:rtype: flyteidl.core.types_pb2.LiteralType
Expand Down
Loading

0 comments on commit 7c39a9f

Please sign in to comment.