auth_endpoints=_credentials_access.get_authorization_endpoints()token_endpoint=auth_endpoints.token_endpointclient_secret=_basic_auth.get_secret()
- _logging.debug('Basic authorization flow with client id {} scope {}',_CLIENT_ID.get(),_SCOPE.get())
+ _logging.debug('Basic authorization flow with client id {} scope {}'.format(_CLIENT_ID.get(),_SCOPE.get()))authorization_header=_basic_auth.get_basic_authorization_header(_CLIENT_ID.get(),client_secret)token,expires_in=_basic_auth.get_token(token_endpoint,authorization_header,_SCOPE.get())_logging.info('Retrieved new token, expires in {}'.format(expires_in))
@@ -251,6 +252,21 @@
Source code for flytekit.clients.raw
returnhandler
+def_handle_invalid_create_request(fn):
+ defhandler(self,create_request):
+ try:
+ fn(self,create_request)
+ except_RpcErrorase:
+ ife.code()==_GrpcStatusCode.INVALID_ARGUMENT:
+ _logging.error("Error creating Flyte entity because of invalid arguments. Create request: ")
+ _logging.error(_MessageToJson(create_request))
+
+ # In any case, re-raise since we're not truly handling the error here
+ raisee
+
+ returnhandler
+
+
[docs]classRawSynchronousFlyteClient(object):""" This is a thin synchronous wrapper around the auto-generated GRPC stubs for communicating with the admin service.
@@ -302,6 +318,7 @@
[docs]@_handle_rpc_error
+ @_handle_invalid_create_requestdefcreate_task(self,task_create_request):""" This will create a task definition in the Admin database. Once successful, the task object can be
@@ -388,6 +405,7 @@
[docs]@_handle_rpc_error
+ @_handle_invalid_create_requestdefcreate_workflow(self,workflow_create_request):""" This will create a workflow definition in the Admin database. Once successful, the workflow object can be
@@ -474,6 +492,7 @@
[docs]@_handle_rpc_error
+ @_handle_invalid_create_requestdefcreate_launch_plan(self,launch_plan_create_request):""" This will create a launch plan definition in the Admin database. Once successful, the launch plan object can be
@@ -727,7 +746,31 @@
[docs]@_handle_rpc_error
+ defupdate_project_domain_attributes(self,project_domain_attributes_update_request):
+ """
+ This updates the attributes for a project and domain registered with the Flyte Admin Service
+ :param flyteidl.admin..ProjectDomainAttributesUpdateRequest project_domain_attributes_update_request:
+ :rtype: flyteidl.admin..ProjectDomainAttributesUpdateResponse
+ """
+ returnself._stub.UpdateProjectDomainAttributes(project_domain_attributes_update_request,
+ metadata=self._metadata)
+
+
[docs]@_handle_rpc_error
+ defupdate_workflow_attributes(self,workflow_attributes_update_request):
+ """
+ This updates the attributes for a project, domain, and workflow registered with the Flyte Admin Service
+ :param flyteidl.admin..UpdateWorkflowAttributes workflow_attributes_update_request:
+ :rtype: flyteidl.admin..workflow_attributes_update_requestResponse
+ """
+ returnself._stub.UpdateWorkflowAttributes(workflow_attributes_update_request,metadata=self._metadata)
+ PYTORCH_TASK="pytorch"
+ # Raw container task is just a name, it defaults to using the regular container task (like python etc), but sets the data_config in the container
+ RAW_CONTAINER_TASK="raw-container"
[docs]@_exception_scopes.system_entry_point
+ deflaunch_with_literals(self,project,domain,literal_inputs,name=None,notification_overrides=None,
+ label_overrides=None,annotation_overrides=None):
+ """ Executes the launch plan and returns the execution identifier. This version of execution is meant for when you already have a LiteralMap of inputs.
@@ -351,7 +361,7 @@
Source code for flytekit.common.launch_plan
"""# Kubernetes requires names starting with an alphabet for some resources.name=nameor"f"+_uuid.uuid4().hex[:19]
- execution=_engine_loader.get_engine().get_launch_plan(self).execute(
+ execution=_engine_loader.get_engine().get_launch_plan(self).launch(project,domain,name,
@@ -416,7 +426,7 @@
:param flytekit.models.common.Annotations annotations: Any custom kubernetes annotations to apply to workflows executed by this launch plan. Any custom kubernetes annotations to apply to workflows executed by this launch plan.
- :param flytekit.models.launch_plan.Auth auth: The auth method with which to execute the workflow.
+ :param flytekit.models.common.Authrole auth_role: The auth method with which to execute the workflow. """
- ifroleandauth:
+ ifroleandauth_role:raiseValueError("Cannot set both role and auth. Role is deprecated, use auth instead.")fixed_inputs=fixed_inputsor{}default_inputs=default_inputsor{}ifrole:
- auth=_launch_plan_models.Auth(assumable_iam_role=role)
+ auth_role=_common_models.AuthRole(assumable_iam_role=role)# The constructor for SdkLaunchPlan sets the id to None anyways so we don't bother passing in an ID. The ID# should be set in one of three places,
@@ -464,7 +474,7 @@
[docs]deflaunch(self,project,domain,inputs=None,name=None,notification_overrides=None,label_overrides=None,
+ annotation_overrides=None):
+ """
+ Creates a remote execution from the entity and returns the execution identifier.
+ This version of launch is meant for when inputs are specified as Python native types/structures.
+
+ :param Text project:
+ :param Text domain:
+ :param dict[Text, Any] inputs: A dictionary of Python standard inputs that will be type-checked, then compiled
+ to a LiteralMap.
+ :param Text name: [Optional] If specified, an execution will be created with this name. Note: the name must
+ be unique within the context of the project and domain.
+ :param list[flytekit.common.notifications.Notification] notification_overrides: [Optional] If specified, these
+ are the notifications that will be honored for this execution. An empty list signals to disable all
+ notifications.
+ :param flytekit.models.common.Labels label_overrides:
+ :param flytekit.models.common.Annotations annotation_overrides:
+ :rtype: T
+
+ """
+ returnself.launch_with_literals(
+ project,
+ domain,
+ self._python_std_input_map_to_literal_map(inputsor{}),
+ name=name,
+ notification_overrides=notification_overrides,
+ label_overrides=label_overrides,
+ annotation_overrides=annotation_overrides,
+ )
[docs]@_abc.abstractmethod
+ deflaunch_with_literals(self,project,domain,literal_inputs,name=None,notification_overrides=None,
+ label_overrides=None,annotation_overrides=None):
+ """
+ Executes the entity and returns the execution identifier. This version of execution is meant for when
+ you already have a LiteralMap of inputs.
+
+ :param Text project:
+ :param Text domain:
+ :param flytekit.models.literals.LiteralMap literal_inputs: Inputs to the execution.
+ :param Text name: [Optional] If specified, an execution will be created with this name. Note: the name must
+ be unique within the context of the project and domain.
+ :param list[flytekit.common.notifications.Notification] notification_overrides: [Optional] If specified, these
+ are the notifications that will be honored for this execution. An empty list signals to disable all
+ notifications.
+ :param flytekit.models.common.Labels label_overrides:
+ :param flytekit.models.common.Annotations annotation_overrides:
+ :rtype: flytekit.models.core.identifier.WorkflowExecutionIdentifier:
+ """
+ pass
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/_modules/flytekit/common/mixins/registerable.html b/_modules/flytekit/common/mixins/registerable.html
index 48a87d6c77..c53fbcdc0b 100644
--- a/_modules/flytekit/common/mixins/registerable.html
+++ b/_modules/flytekit/common/mixins/registerable.html
@@ -8,7 +8,7 @@
- flytekit.common.mixins.registerable — Flyte 0.4.0 documentation
+ flytekit.common.mixins.registerable — Flyte 0.5.0 documentation
@@ -311,10 +311,19 @@
Source code for flytekit.common.mixins.registerable
m =_importlib.import_module(self.instantiated_in)forkindir(m):
- ifgetattr(m,k)==self:
- self._platform_valid_name=_utils.fqdn(m.__name__,k,entity_type=self.resource_type)
- _logging.debug("Auto-assigning name to {}".format(self._platform_valid_name))
- return
+ try:
+ ifgetattr(m,k)==self:
+ self._platform_valid_name=_utils.fqdn(m.__name__,k,entity_type=self.resource_type)
+ _logging.debug("Auto-assigning name to {}".format(self._platform_valid_name))
+ return
+ exceptValueErroraserr:
+ # Empty pandas dataframes behave weirdly here such that calling `m.df` raises:
+ # ValueError: The truth value of a {type(self).__name__} is ambiguous. Use a.empty, a.bool(), a.item(),
+ # a.any() or a.all()
+ # Since dataframes aren't registrable entities to begin with we swallow any errors they raise and
+ # continue looping through m.
+ _logging.warning("Caught ValueError {} while attempting to auto-assign name".format(err))
+ pass_logging.error("Could not auto-assign name")raise_system_exceptions.FlyteSystemException("Error looking for object while auto-assigning name.")
[docs]classSdkRawContainerTask(_base_task.SdkTask):
+ """
+ Use this task when you want to run an arbitrary container as a task (e.g. external tools, binaries compiled
+ separately as a container completely separate from the container where your Flyte workflow is defined.
+ """
+ METADATA_FORMAT_JSON=_task_models.DataLoadingConfig.LITERALMAP_FORMAT_JSON
+ METADATA_FORMAT_YAML=_task_models.DataLoadingConfig.LITERALMAP_FORMAT_YAML
+ METADATA_FORMAT_PROTO=_task_models.DataLoadingConfig.LITERALMAP_FORMAT_PROTO
+
+ def__init__(
+ self,
+ inputs:Dict[str,FlyteSdkType],
+ image:str,
+ outputs:Dict[str,FlyteSdkType]=None,
+ input_data_dir:str=None,
+ output_data_dir:str=None,
+ metadata_format:int=METADATA_FORMAT_JSON,
+ io_strategy:_task_models.IOStrategy=None,
+ command:List[str]=None,
+ args:List[str]=None,
+ storage_request:str=None,
+ cpu_request:str=None,
+ gpu_request:str=None,
+ memory_request:str=None,
+ storage_limit:str=None,
+ cpu_limit:str=None,
+ gpu_limit:str=None,
+ memory_limit:str=None,
+ environment:Dict[str,str]=None,
+ interruptible:bool=False,
+ discoverable:bool=False,
+ discovery_version:str=None,
+ retries:int=1,
+ timeout:_datetime.timedelta=None,
+ ):
+ """
+ :param inputs:
+ :param outputs:
+ :param image:
+ :param command:
+ :param args:
+ :param storage_request:
+ :param cpu_request:
+ :param gpu_request:
+ :param memory_request:
+ :param storage_limit:
+ :param cpu_limit:
+ :param gpu_limit:
+ :param memory_limit:
+ :param environment:
+ :param interruptible:
+ :param discoverable:
+ :param discovery_version:
+ :param retries:
+ :param timeout:
+ :param input_data_dir: This is the directory where data will be downloaded to
+ :param output_data_dir: This is the directory where data will be uploaded from
+ :param metadata_format: Format in which the metadata will be available for the script
+ """
+
+ # Set as class fields which are used down below to configure implicit
+ # parameters
+ self._data_loading_config=_task_models.DataLoadingConfig(
+ input_path=input_data_dir,
+ output_path=output_data_dir,
+ format=metadata_format,
+ enabled=True,
+ io_strategy=io_strategy,
+ )
+
+ metadata=_task_models.TaskMetadata(
+ discoverable,
+ # This needs to have the proper version reflected in it
+ _task_models.RuntimeMetadata(
+ _task_models.RuntimeMetadata.RuntimeType.FLYTE_SDK,__version__,
+ "python"),
+ timeoutor_datetime.timedelta(seconds=0),
+ _literals.RetryStrategy(retries),
+ interruptible,
+ discovery_version,
+ None
+ )
+
+ # The interface is defined using the inputs and outputs
+ i=_interface.TypedInterface(inputs=types_to_variable(inputs),outputs=types_to_variable(outputs))
+
+ # This sets the base SDKTask with container etc
+ super(SdkRawContainerTask,self).__init__(
+ _constants.SdkTaskType.RAW_CONTAINER_TASK,
+ metadata,
+ i,
+ None,
+ container=_get_container_definition(
+ image=image,
+ args=args,
+ command=command,
+ data_loading_config=self._data_loading_config,
+ storage_request=storage_request,
+ cpu_request=cpu_request,
+ gpu_request=gpu_request,
+ memory_request=memory_request,
+ storage_limit=storage_limit,
+ cpu_limit=cpu_limit,
+ gpu_limit=gpu_limit,
+ memory_limit=memory_limit,
+ environment=environment,
+ )
+ )
+
+
+
[docs]@_exception_scopes.system_entry_point
+ defadd_inputs(self,inputs:Dict[str,Variable]):
+ """
+ Adds the inputs to this task. This can be called multiple times, but it will fail if an input with a given
+ name is added more than once, a name collides with an output, or if the name doesn't exist as an arg name in
+ the wrapped function.
+ :param dict[Text, flytekit.models.interface.Variable] inputs: names and variables
+ """
+ self._validate_inputs(inputs)
+ self.interface.inputs.update(inputs)
+
+
[docs]@_exception_scopes.system_entry_point
+ defadd_outputs(self,outputs:Dict[str,Variable]):
+ """
+ Adds the inputs to this task. This can be called multiple times, but it will fail if an input with a given
+ name is added more than once, a name collides with an output, or if the name doesn't exist as an arg name in
+ the wrapped function.
+ :param dict[Text, flytekit.models.interface.Variable] outputs: names and variables
+ """
+ self._validate_outputs(outputs)
+ self.interface.outputs.update(outputs)
+ )
+
+ def_python_std_input_map_to_literal_map(self,inputs):
+ """
+ :param dict[Text,Any] inputs: A dictionary of Python standard inputs that will be type-checked and compiled
+ to a LiteralMap
+ :rtype: flytekit.models.literals.LiteralMap
+ """
+ return_type_helpers.pack_python_std_map_to_literal_map(inputs,{
+ k:_type_helpers.get_sdk_type_from_literal_type(v.type)
+ fork,vin_six.iteritems(self.interface.inputs)
+ })
+
+ def_produce_deterministic_version(self,version=None):
+ """
+ :param Text version:
+ :return Text:
+ """
+
+ ifself.containerisnotNoneandself.container.data_loading_configisNone:
+ # Only in the case of raw container tasks (which are the only valid tasks with container definitions that
+ # can assign a client-side task version) their data config will be None.
+ raiseValueError("Client-side task versions are not supported for {} task type".format(self.type))
+ ifversionisnotNone:
+ returnversion
+ custom=_json_format.Parse(_json.dumps(self.custom,sort_keys=True),_struct.Struct())ifself.customelseNone
+
+ # The task body is the entirety of the task template MINUS the identifier. The identifier is omitted because
+ # 1) this method is used to compute the version portion of the identifier and
+ # 2 ) the SDK will actually generate a unique name on every task instantiation which is not great for
+ # the reproducibility this method attempts.
+ task_body=(self.type,self.metadata.to_flyte_idl().SerializeToString(deterministic=True),
+ self.interface.to_flyte_idl().SerializeToString(deterministic=True),custom)
+ return_hashlib.md5(str(task_body).encode('utf-8')).hexdigest()
+
+
[docs]@_exception_scopes.system_entry_point
+ defregister_and_launch(self,project,domain,name=None,version=None,inputs=None):
+ """
+ :param Text project: The project in which to register and launch this task.
+ :param Text domain: The domain in which to register and launch this task.
+ :param Text name: The name to give this task.
+ :param Text version: The version in which to register this task
+ :param dict[Text, Any] inputs: A dictionary of Python standard inputs that will be type-checked, then compiled
+ to a LiteralMap.
+
+ :rtype: flytekit.common.workflow_execution.SdkWorkflowExecution
+ """
+ self.validate()
+ version=self._produce_deterministic_version(version)
+
+ ifnameisNone:
+ try:
+ self.auto_assign_name()
+ generated_name=self._platform_valid_name
+ except_system_exceptions.FlyteSystemException:
+ # If we're not able to assign a platform valid name, use the deterministically-produced version instead.
+ generated_name=version
+ name=nameifnameelsegenerated_name
+ id_to_register=_identifier.Identifier(_identifier_model.ResourceType.TASK,project,domain,name,version)
+ old_id=self.id
+ try:
+ self._id=id_to_register
+ _engine_loader.get_engine().get_task(self).register(id_to_register)
+ except:
+ self._id=old_id
+ raise
+ returnself.launch(project,domain,inputs=inputs)
+
+
[docs]@_exception_scopes.system_entry_point
+ deflaunch_with_literals(self,project,domain,literal_inputs,name=None,notification_overrides=None,
+ label_overrides=None,annotation_overrides=None):
+ """
+ Launches a single task execution and returns the execution identifier.
+ :param Text project:
+ :param Text domain:
+ :param flytekit.models.literals.LiteralMap literal_inputs: Inputs to the execution.
+ :param Text name: [Optional] If specified, an execution will be created with this name. Note: the name must
+ be unique within the context of the project and domain.
+ :param list[flytekit.common.notifications.Notification] notification_overrides: [Optional] If specified, these
+ are the notifications that will be honored for this execution. An empty list signals to disable all
+ notifications.
+ :param flytekit.models.common.Labels label_overrides:
+ :param flytekit.models.common.Annotations annotation_overrides:
+ :rtype: flytekit.common.workflow_execution.SdkWorkflowExecution
+ """
+ execution=_engine_loader.get_engine().get_task(self).launch(
+ project,
+ domain,
+ name=name,
+ inputs=literal_inputs,
+ notification_overrides=notification_overrides,
+ label_overrides=label_overrides,
+ annotation_overrides=annotation_overrides,
+ )
+ return_workflow_execution.SdkWorkflowExecution.promote_from_model(execution)
[docs]@_abc.abstractmethod
+ deflaunch(self,project,domain,name,inputs,notification_overrides=None,label_overrides=None,
+ annotation_overrides=None):""" Registers the launch plan and returns the identifier. :param Text project:
@@ -381,7 +381,7 @@
[docs]@_abc.abstractmethoddefupdate(self,identifier,state):""" :param flytekit.models.core.identifier.Identifier identifier: ID for launch plan to update
@@ -416,6 +416,24 @@
[docs]@_deprecated(reason="Use launch instead",version='0.9.0')
+ defexecute(self,project,domain,name,inputs,notification_overrides=None,label_overrides=None,annotation_overrides=None):"""
- Executes the launch plan.
+ Deprecated. Use launch instead.
+ """
+ returnself.launch(project,domain,name,inputs,notification_overrides,label_overrides,annotation_overrides)
+
+
[docs]deflaunch(self,project,domain,name,inputs,notification_overrides=None,label_overrides=None,
+ annotation_overrides=None):
+ """
+ Creates a workflow execution using parameters specified in the launch plan. :param Text project: :param Text domain: :param Text name:
@@ -495,7 +506,68 @@
[docs]defregister(self,identifier,version):
- raise_user_exceptions.FlyteAssertion("You cannot register unit test tasks.")
+ raise_user_exceptions.FlyteAssertion("You cannot register unit test tasks.")
+
+
[docs]deflaunch(self,project,domain,name=None,inputs=None,notification_overrides=None,label_overrides=None,
+ annotation_overrides=None,auth_role=None):
+ raise_user_exceptions.FlyteAssertion("You cannot launch unit test tasks.")
[docs]classAuthRole(FlyteIdlEntity):
+ def__init__(self,assumable_iam_role=None,kubernetes_service_account=None):
+ """
+ At most one of assumable_iam_role or kubernetes_service_account can be set.
+ :param Text assumable_iam_role: IAM identity with set permissions policies.
+ :param Text kubernetes_service_account: Provides an identity for workflow execution resources. Flyte deployment
+ administrators are responsible for handling permissions as they relate to the service account.
+ """
+ ifassumable_iam_roleandkubernetes_service_account:
+ raiseValueError("Only one of assumable_iam_role or kubernetes_service_account can be set")
+ self._assumable_iam_role=assumable_iam_role
+ self._kubernetes_service_account=kubernetes_service_account
+
+ @property
+ defassumable_iam_role(self):
+ """
+ The IAM role to execute the workflow with
+ :rtype: Text
+ """
+ returnself._assumable_iam_role
+
+ @property
+ defkubernetes_service_account(self):
+ """
+ The kubernetes service account to execute the workflow with
+ :rtype: Text
+ """
+ returnself._kubernetes_service_account
+
+
[docs]classOnFailurePolicy(object):
+ """
+ Defines the execution behavior of the workflow when a failure is detected.
+
+ Attributes:
+ FAIL_IMMEDIATELY Instructs the system to fail as soon as a node fails in the
+ workflow. It'll automatically abort all currently running nodes and
+ clean up resources before finally marking the workflow executions as failed.
+
+ FAIL_AFTER_EXECUTABLE_NODES_COMPLETE Instructs the system to make as much progress as it can. The system
+ will not alter the dependencies of the execution graph so any node
+ that depend on the failed node will not be run. Other nodes that will
+ be executed to completion before cleaning up resources and marking
+ the workflow execution as failed.
+ """
+
+ FAIL_IMMEDIATELY=_core_workflow.WorkflowMetadata.FAIL_IMMEDIATELY
+ FAIL_AFTER_EXECUTABLE_NODES_COMPLETE=_core_workflow.WorkflowMetadata.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE
+
+ def__init__(self,queuing_budget=None,on_failure=None):""" Metadata for the workflow. :param queuing_budget datetime.timedelta: [Optional] Budget that specifies the amount of time a workflow can be queued up for execution.
+ :param on_failure flytekit.models.core.workflow.WorkflowMetadata.OnFailurePolicy: [Optional] The execution policy when the workflow detects a failure. """self._queuing_budget=queuing_budget
+ self._on_failure=on_failure@propertydefqueuing_budget(self):
@@ -619,6 +640,13 @@
[docs]classExecutionSpec(_common_models.FlyteIdlEntity):def__init__(self,launch_plan,metadata,notifications=None,disable_all=None,labels=None,
- annotations=None):
+ annotations=None,auth_role=None):""" :param flytekit.models.core.identifier.Identifier launch_plan: Launch plan unique identifier to execute :param ExecutionMetadata metadata: The metadata to be associated with this execution
@@ -242,6 +242,7 @@
Source code for flytekit.models.execution
:param bool disable_all: If true, all notifications should be disabled. :param flytekit.models.common.Labels labels: Labels to apply to the execution. :param flytekit.models.common.Annotations annotations: Annotations to apply to the execution
+ :param flytekit.models.common.AuthRole auth_role: The authorization method with which to execute the workflow. """self._launch_plan=launch_plan
@@ -250,6 +251,7 @@
[docs]classAuth(_common.FlyteIdlEntity):def__init__(self,assumable_iam_role=None,kubernetes_service_account=None):"""
+ DEPRECATED. Do not use. Use flytekit.models.common.AuthRole instead At most one of assumable_iam_role or kubernetes_service_account can be set. :param Text assumable_iam_role: IAM identity with set permissions policies. :param Text kubernetes_service_account: Provides an identity for workflow execution resources. Flyte deployment
@@ -266,7 +267,7 @@
Source code for flytekit.models.launch_plan
[docs]classLaunchPlanSpec(_common.FlyteIdlEntity):
- def__init__(self,workflow_id,entity_metadata,default_inputs,fixed_inputs,labels,annotations,auth):
+ def__init__(self,workflow_id,entity_metadata,default_inputs,fixed_inputs,labels,annotations,auth_role):""" The spec for a Launch Plan.
@@ -278,7 +279,7 @@
Source code for flytekit.models.launch_plan
Any custom kubernetes labels to apply to workflows executed by this launch plan. :param flyteidl.admin.common_pb2.Annotations annotations: Any custom kubernetes annotations to apply to workflows executed by this launch plan.
- :param flytekit.models.launch_plan.Auth auth: The auth method with which to execute the workflow.
+ :param flytekit.models.common.Auth auth_role: The auth method with which to execute the workflow. """self._workflow_id=workflow_idself._entity_metadata=entity_metadata
@@ -286,7 +287,7 @@
returnself._annotations@property
- defauth(self):
+ defauth_role(self):""" The authorization method with which to execute the workflow.
- :return: flytekit.models.launch_plan.Auth
+ :return: flytekit.models.common.Auth """
- returnself._auth
+ returnself._auth_role
[docs]classClusterResourceAttributes(_common.FlyteIdlEntity):
+
+ def__init__(self,attributes):
+ """
+ Custom resource attributes which will be applied in cluster resource creation (e.g. quotas).
+ Dict keys are the *case-sensitive* names of variables in templatized resource files.
+ Dict values should be the custom values which get substituted during resource creation.
+
+ :param dict[Text, Text] attributes: Applied in cluster resource creation (e.g. quotas).
+ """
+ self._attributes=attributes
+
+ @property
+ defattributes(self):
+ """
+ Custom resource attributes which will be applied in cluster resource management
+ :rtype: dict[Text, Text]
+ """
+ returnself._attributes
+
+
[docs]classExecutionClusterLabel(_common.FlyteIdlEntity):
+
+ def__init__(self,value):
+ """
+ Label value to determine where the execution will be run
+
+ :param Text value:
+ """
+ self._value=value
+
+ @property
+ defvalue(self):
+ """
+ :rtype: Text
+ """
+ returnself._value
+
+
[docs]classTaskMetadata(_common.FlyteIdlEntity):
- def__init__(self,discoverable,runtime,timeout,retries,interruptible,discovery_version,deprecated_error_message):
+ def__init__(self,discoverable,runtime,timeout,retries,interruptible,discovery_version,
+ deprecated_error_message):""" Information needed at runtime to determine behavior such as whether or not outputs are discoverable, timeouts, and retries.
@@ -808,7 +807,7 @@
[docs]classIOStrategy(_common.FlyteIdlEntity):
+ """
+ Provides methods to manage data in and out of the Raw container using Download Modes. This can only be used if DataLoadingConfig is enabled.
+ """
+ DOWNLOAD_MODE_EAGER=_core_task.IOStrategy.DOWNLOAD_EAGER
+ DOWNLOAD_MODE_STREAM=_core_task.IOStrategy.DOWNLOAD_STREAM
+ DOWNLOAD_MODE_NO_DOWNLOAD=_core_task.IOStrategy.DO_NOT_DOWNLOAD
+
+ UPLOAD_MODE_EAGER=_core_task.IOStrategy.UPLOAD_EAGER
+ UPLOAD_MODE_ON_EXIT=_core_task.IOStrategy.UPLOAD_ON_EXIT
+ UPLOAD_MODE_NO_UPLOAD=_core_task.IOStrategy.DO_NOT_UPLOAD
+
+ def__init__(self,
+ download_mode:_core_task.IOStrategy.DownloadMode=DOWNLOAD_MODE_EAGER,
+ upload_mode:_core_task.IOStrategy.UploadMode=UPLOAD_MODE_ON_EXIT):
+ self._download_mode=download_mode
+ self._upload_mode=upload_mode
+
+
[docs]@classmethod
+ deffrom_flyte_idl(cls,pb2:_core_task.DataLoadingConfig):
+ # TODO use python 3.7+ only and then https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel -> DataLoadingConfig:
+ ifpb2isNone:
+ returnNone
+ returncls(
+ input_path=pb2.input_path,
+ output_path=pb2.output_path,
+ enabled=pb2.enabled,
+ format=pb2.format,
+ io_strategy=IOStrategy.from_flyte_idl(pb2.io_strategy)ifpb2.HasField("io_strategy")elseNone,
+ )
+
+
[docs]classContainer(_common.FlyteIdlEntity):
- def__init__(self,image,command,args,resources,env,config):
+ def__init__(self,image,command,args,resources,env,config,data_loading_config=None):"""
- This defines a container target. It will execute the appropriate command line on the appropriate image with
- the given configurations.
+ This defines a container target. It will execute the appropriate command line on the appropriate image with
+ the given configurations.
- :param Text image: The fully-qualified identifier for the image.
- :param list[Text] command: A list of 'words' for the command. i.e. ['aws', 's3', 'ls']
- :param list[Text] args: A list of arguments for the command. i.e. ['s3://some/path', '/tmp/local/path']
- :param Resources resources: A definition of requisite compute resources.
- :param dict[Text, Text] env: A definition of key-value pairs for environment variables.
- :param dict[Text, Text] config: A definition of configuration key-value pairs.
- """
+ :param Text image: The fully-qualified identifier for the image.
+ :param list[Text] command: A list of 'words' for the command. i.e. ['aws', 's3', 'ls']
+ :param list[Text] args: A list of arguments for the command. i.e. ['s3://some/path', '/tmp/local/path']
+ :param Resources resources: A definition of requisite compute resources.
+ :param dict[Text, Text] env: A definition of key-value pairs for environment variables.
+ :param dict[Text, Text] config: A definition of configuration key-value pairs.
+ :type DataLoadingConfig data_loading_config: object
+ """
+ self._data_loading_config=data_loading_configself._image=imageself._command=commandself._args=args
@@ -888,6 +963,13 @@
[docs]defworkflow_class(_workflow_metaclass=None,cls=None,queuing_budget=None,on_failure=None):""" This is a decorator for wrapping class definitions into workflows.
@@ -221,11 +221,12 @@
Source code for flytekit.sdk.workflow
by users extending the base Flyte programming model. If set, it must be a subclass of :py:class:`flytekit.common.workflow.SdkWorkflow`. :param queuing_budget datetime.timedelta: [Optional] Budget that specifies the amount of time a workflow can be queued up for execution.
+ :param on_failure flytekit.models.core.workflow.WorkflowMetadata.OnFailurePolicy: [Optional] The execution policy when the workflow detects a failure. :rtype: flytekit.common.workflow.SdkWorkflow """defwrapper(metaclass):
- wf=_common_workflow.build_sdk_workflow_from_metaclass(metaclass,cls=cls,queuing_budget=queuing_budget)
+ wf=_common_workflow.build_sdk_workflow_from_metaclass(metaclass,cls=cls,queuing_budget=queuing_budget,on_failure=on_failure)returnwfif_workflow_metaclassisnotNone:
@@ -233,7 +234,7 @@
[docs]defworkflow(nodes,inputs=None,outputs=None,cls=None,queuing_budget=None,on_failure=None):""" This function provides a user-friendly interface for authoring workflows.
@@ -267,6 +268,7 @@
Source code for flytekit.sdk.workflow
by users extending the base Flyte programming model. If set, it must be a subclass of :py:class:`flytekit.common.workflow.SdkWorkflow`. :param queuing_budget datetime.timedelta: [Optional] Budget that specifies the amount of time a workflow can be queued up for execution.
+ :param on_failure flytekit.models.core.workflow.WorkflowMetadata.OnFailurePolicy: [Optional] The execution policy when the workflow detects a failure. :rtype: flytekit.common.workflow.SdkWorkflow """wf=(clsor_common_workflow.SdkWorkflow)(
diff --git a/_modules/flytekit/tools/lazy_loader.html b/_modules/flytekit/tools/lazy_loader.html
index abcaf9d736..07df079071 100644
--- a/_modules/flytekit/tools/lazy_loader.html
+++ b/_modules/flytekit/tools/lazy_loader.html
@@ -8,7 +8,7 @@
- flytekit.tools.lazy_loader — Flyte 0.4.0 documentation
+ flytekit.tools.lazy_loader — Flyte 0.5.0 documentation
diff --git a/_modules/flytekit/tools/module_loader.html b/_modules/flytekit/tools/module_loader.html
index 8d42dc287f..3d8d5c93f0 100644
--- a/_modules/flytekit/tools/module_loader.html
+++ b/_modules/flytekit/tools/module_loader.html
@@ -8,7 +8,7 @@
- flytekit.tools.module_loader — Flyte 0.4.0 documentation
+ flytekit.tools.module_loader — Flyte 0.5.0 documentation
diff --git a/_modules/flytekit/tools/subprocess.html b/_modules/flytekit/tools/subprocess.html
index 99e66a3e26..a0a4735611 100644
--- a/_modules/flytekit/tools/subprocess.html
+++ b/_modules/flytekit/tools/subprocess.html
@@ -8,7 +8,7 @@
- flytekit.tools.subprocess — Flyte 0.4.0 documentation
+ flytekit.tools.subprocess — Flyte 0.5.0 documentation
diff --git a/_modules/flytekit/type_engines/common.html b/_modules/flytekit/type_engines/common.html
index 16e1dd94b0..1a529df061 100644
--- a/_modules/flytekit/type_engines/common.html
+++ b/_modules/flytekit/type_engines/common.html
@@ -8,7 +8,7 @@
- flytekit.type_engines.common — Flyte 0.4.0 documentation
+ flytekit.type_engines.common — Flyte 0.5.0 documentation
diff --git a/_modules/flytekit/type_engines/default/flyte.html b/_modules/flytekit/type_engines/default/flyte.html
index 04a3dad11a..5a261d373f 100644
--- a/_modules/flytekit/type_engines/default/flyte.html
+++ b/_modules/flytekit/type_engines/default/flyte.html
@@ -8,7 +8,7 @@
- flytekit.type_engines.default.flyte — Flyte 0.4.0 documentation
+ flytekit.type_engines.default.flyte — Flyte 0.5.0 documentation
diff --git a/_modules/index.html b/_modules/index.html
index 0f02d5827a..d73e07427e 100644
--- a/_modules/index.html
+++ b/_modules/index.html
@@ -8,7 +8,7 @@
- Overview: module code — Flyte 0.4.0 documentation
+ Overview: module code — Flyte 0.5.0 documentation
@@ -177,8 +177,8 @@
diff --git a/_modules/random.html b/_modules/random.html
index c3ff308084..c2780e253c 100644
--- a/_modules/random.html
+++ b/_modules/random.html
@@ -8,7 +8,7 @@
- random — Flyte 0.4.0 documentation
+ random — Flyte 0.5.0 documentation
diff --git a/_sources/flyteidl/core/workflow.proto.rst.txt b/_sources/flyteidl/core/workflow.proto.rst.txt
index 35c6c2286a..3ef7d40f90 100644
--- a/_sources/flyteidl/core/workflow.proto.rst.txt
+++ b/_sources/flyteidl/core/workflow.proto.rst.txt
@@ -8,7 +8,7 @@ workflow.proto
flyteidl.core.IfBlock
---------------------
-`[flyteidl.core.IfBlock proto] `_
+`[flyteidl.core.IfBlock proto] `_
Defines a condition and the execution unit that should be executed if the condition is satisfied.
@@ -36,7 +36,7 @@ then_node
flyteidl.core.IfElseBlock
-------------------------
-`[flyteidl.core.IfElseBlock proto] `_
+`[flyteidl.core.IfElseBlock proto] `_
Defines a series of if/else blocks. The first branch whose condition evaluates to true is the one to execute.
If no conditions were satisfied, the else_node or the error will execute.
@@ -89,7 +89,7 @@ error
flyteidl.core.BranchNode
------------------------
-`[flyteidl.core.BranchNode proto] `_
+`[flyteidl.core.BranchNode proto] `_
BranchNode is a special node that alters the flow of the workflow graph. It allows the control flow to branch at
runtime based on a series of conditions that get evaluated on various parameters (e.g. inputs, primitives).
@@ -113,7 +113,7 @@ if_else
flyteidl.core.TaskNode
----------------------
-`[flyteidl.core.TaskNode proto] `_
+`[flyteidl.core.TaskNode proto] `_
Refers to the task that the Node is to execute.
@@ -137,7 +137,7 @@ reference_id
flyteidl.core.WorkflowNode
--------------------------
-`[flyteidl.core.WorkflowNode proto] `_
+`[flyteidl.core.WorkflowNode proto] `_
Refers to a the workflow the node is to execute.
@@ -173,7 +173,7 @@ sub_workflow_ref
flyteidl.core.NodeMetadata
--------------------------
-`[flyteidl.core.NodeMetadata proto] `_
+`[flyteidl.core.NodeMetadata proto] `_
Defines extra information about the Node.
@@ -218,7 +218,7 @@ interruptible
flyteidl.core.Alias
-------------------
-`[flyteidl.core.Alias proto] `_
+`[flyteidl.core.Alias proto] `_
Links a variable to an alias.
@@ -248,7 +248,7 @@ alias
flyteidl.core.Node
------------------
-`[flyteidl.core.Node proto] `_
+`[flyteidl.core.Node proto] `_
A Workflow graph Node. One unit of execution in the graph. Each node can be linked to a Task, a Workflow or a branch
node.
@@ -339,15 +339,15 @@ branch_node
flyteidl.core.WorkflowMetadata
------------------------------
-`[flyteidl.core.WorkflowMetadata proto] `_
+`[flyteidl.core.WorkflowMetadata proto] `_
-Metadata for the entire workflow.
-To be used in the future.
+Metadata for the entire workflow. Defines execution behavior that does not change the final outputs of the workflow.
.. code-block:: json
{
- "queuing_budget": "{...}"
+ "queuing_budget": "{...}",
+ "on_failure": "..."
}
.. _api_field_flyteidl.core.WorkflowMetadata.queuing_budget:
@@ -356,14 +356,46 @@ queuing_budget
(:ref:`google.protobuf.Duration `) Total wait time a workflow can be delayed by queueing.
+.. _api_field_flyteidl.core.WorkflowMetadata.on_failure:
+on_failure
+ (:ref:`flyteidl.core.WorkflowMetadata.OnFailurePolicy `) Defines how the system should behave when a failure is detected in the workflow execution.
+
+
+
+.. _api_enum_flyteidl.core.WorkflowMetadata.OnFailurePolicy:
+
+Enum flyteidl.core.WorkflowMetadata.OnFailurePolicy
+---------------------------------------------------
+
+`[flyteidl.core.WorkflowMetadata.OnFailurePolicy proto] `_
+
+Failure Handling Strategy
+
+.. _api_enum_value_flyteidl.core.WorkflowMetadata.OnFailurePolicy.FAIL_IMMEDIATELY:
+
+FAIL_IMMEDIATELY
+ *(DEFAULT)* FAIL_IMMEDIATELY instructs the system to fail as soon as a node fails in the workflow. It'll automatically
+ abort all currently running nodes and clean up resources before finally marking the workflow executions as
+ failed.
+
+
+.. _api_enum_value_flyteidl.core.WorkflowMetadata.OnFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE:
+
+FAIL_AFTER_EXECUTABLE_NODES_COMPLETE
+ FAIL_AFTER_EXECUTABLE_NODES_COMPLETE instructs the system to make as much progress as it can. The system will
+ not alter the dependencies of the execution graph so any node that depends on the failed node will not be run.
+ Other nodes will be executed to completion before cleaning up resources and marking the workflow
+ execution as failed.
+
+
.. _api_msg_flyteidl.core.WorkflowMetadataDefaults:
flyteidl.core.WorkflowMetadataDefaults
--------------------------------------
-`[flyteidl.core.WorkflowMetadataDefaults proto] `_
+`[flyteidl.core.WorkflowMetadataDefaults proto] `_
Default Workflow Metadata for the entire workflow.
@@ -388,7 +420,7 @@ interruptible
flyteidl.core.WorkflowTemplate
------------------------------
-`[flyteidl.core.WorkflowTemplate proto] `_
+`[flyteidl.core.WorkflowTemplate proto] `_
Flyte Workflow Structure that encapsulates task, branch and subworkflow nodes to form a statically analyzable,
directed acyclic graph.
diff --git a/_sources/flytekit/flytekit.common.mixins.rst.txt b/_sources/flytekit/flytekit.common.mixins.rst.txt
index f91011e3e3..a19c332f45 100644
--- a/_sources/flytekit/flytekit.common.mixins.rst.txt
+++ b/_sources/flytekit/flytekit.common.mixins.rst.txt
@@ -12,18 +12,18 @@ flytekit.common.mixins.artifact module
:undoc-members:
:show-inheritance:
-flytekit.common.mixins.executable module
-----------------------------------------
+flytekit.common.mixins.hash module
+----------------------------------
-.. automodule:: flytekit.common.mixins.executable
+.. automodule:: flytekit.common.mixins.hash
:members:
:undoc-members:
:show-inheritance:
-flytekit.common.mixins.hash module
-----------------------------------
+flytekit.common.mixins.launchable module
+----------------------------------------
-.. automodule:: flytekit.common.mixins.hash
+.. automodule:: flytekit.common.mixins.launchable
:members:
:undoc-members:
:show-inheritance:
diff --git a/_sources/flytekit/flytekit.common.tasks.rst.txt b/_sources/flytekit/flytekit.common.tasks.rst.txt
index a9023fd22a..b017d143be 100644
--- a/_sources/flytekit/flytekit.common.tasks.rst.txt
+++ b/_sources/flytekit/flytekit.common.tasks.rst.txt
@@ -52,6 +52,14 @@ flytekit.common.tasks.pytorch\_task module
:undoc-members:
:show-inheritance:
+flytekit.common.tasks.raw\_container module
+-------------------------------------------
+
+.. automodule:: flytekit.common.tasks.raw_container
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
flytekit.common.tasks.sdk\_dynamic module
-----------------------------------------
diff --git a/_sources/flytekit/flytekit.models.rst.txt b/_sources/flytekit/flytekit.models.rst.txt
index 3fab151540..8a27629957 100644
--- a/_sources/flytekit/flytekit.models.rst.txt
+++ b/_sources/flytekit/flytekit.models.rst.txt
@@ -76,6 +76,14 @@ flytekit.models.literals module
:undoc-members:
:show-inheritance:
+flytekit.models.matchable\_resource module
+------------------------------------------
+
+.. automodule:: flytekit.models.matchable_resource
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
flytekit.models.named\_entity module
------------------------------------
diff --git a/_sources/user/features/index.rst.txt b/_sources/user/features/index.rst.txt
index 162eba13bf..27f3c52a9d 100644
--- a/_sources/user/features/index.rst.txt
+++ b/_sources/user/features/index.rst.txt
@@ -14,3 +14,5 @@ Flyte Features
launchplans
task_cache
roles
+ single_task_execution
+ on_failure_policy
diff --git a/_sources/user/features/on_failure_policy.rst.txt b/_sources/user/features/on_failure_policy.rst.txt
new file mode 100644
index 0000000000..e4d164c9f7
--- /dev/null
+++ b/_sources/user/features/on_failure_policy.rst.txt
@@ -0,0 +1,48 @@
+.. _on-failure-policy:
+
+On Failure Policy
+#################
+
+What is it
+----------
+
+The default behavior for when a node fails in a workflow is to immediately abort the entire workflow. The reasoning behind this behavior
+is to avoid wasting resources since the workflow will end up failing anyway. There are certain cases however, when it's desired for the
+workflow to carry on executing the branches it can execute.
+
+For example when the remaining tasks are marked as :ref:`cacheable `.
+Once the failure has been fixed and the workflow is relaunched, cached tasks will be bypassed quickly.
+
+How to use it
+-------------
+
+Use on_failure attribute on workflow_class.
+
+.. code:: python
+
+ from flytekit.models.core.workflow import WorkflowMetadata
+
+ @workflow_class(on_failure=WorkflowMetadata.OnFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
+ class RunToCompletionWF(object):
+ pass
+
+Available values in the policy:
+
+.. code:: python
+
+ class OnFailurePolicy(object):
+ """
+ Defines the execution behavior of the workflow when a failure is detected.
+ Attributes:
+ FAIL_IMMEDIATELY Instructs the system to fail as soon as a node fails in the
+ workflow. It'll automatically abort all currently running nodes and
+ clean up resources before finally marking the workflow executions as failed.
+ FAIL_AFTER_EXECUTABLE_NODES_COMPLETE Instructs the system to make as much progress as it can. The system
+ will not alter the dependencies of the execution graph so any node
+ that depend on the failed node will not be run. Other nodes that will
+ be executed to completion before cleaning up resources and marking
+ the workflow execution as failed.
+ """
+
+ FAIL_IMMEDIATELY = _core_workflow.WorkflowMetadata.FAIL_IMMEDIATELY
+ FAIL_AFTER_EXECUTABLE_NODES_COMPLETE = _core_workflow.WorkflowMetadata.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE
diff --git a/_sources/user/features/single_task_execution.rst.txt b/_sources/user/features/single_task_execution.rst.txt
new file mode 100644
index 0000000000..d3d258ec29
--- /dev/null
+++ b/_sources/user/features/single_task_execution.rst.txt
@@ -0,0 +1,90 @@
+.. _features-singletaskexec:
+
+Single Task Execution
+#####################
+
+What are single task executions?
+================================
+
+Tasks are the most atomic unit of execution in Flyte. Although workflows are traditionally composed of multiple tasks with dependencies
+defined by shared inputs and outputs it can be helpful to execute a single task during the process of iterating on its definition.
+It can be tedious to write a new workflow definition every time you want to execute a single task under development but single task
+executions can be used to easily iterate on task logic.
+
+Launch a single task
+====================
+
+After you've built an image with your updated task code, create an execution using launch:
+
+.. code-block:: python
+
+ @inputs(plant=Types.String)
+ @outputs(out=Types.String)
+ @python_task
+ def my_task(wf_params, plant, out)
+ ...
+
+
+ my_single_task_execution = my_task.launch(project="my_flyte_project", domain="development", inputs={'plant': 'ficus'})
+ print("Created {}".format(my_single_task_execution.id))
+
+Just like workflow executions, you can optionally pass a user-defined name, labels, annotations, and/or notifications when launching a single task.
+
+The type of ``my_single_task_execution`` is `SdkWorkflowExecution `_
+and has the full set of methods and functionality available for conventional WorkflowExecutions.
+
+
+Fetch and launch a single task
+==============================
+
+Single task executions aren't limited to just tasks you've defined in your code. You can reference previously registered tasks and launch a single task execution like so:
+
+.. code-block:: python
+
+ from flytekit.common.tasks import task as _task
+
+ my_task = _task.SdkTask.fetch("my_flyte_project", "production", "workflows.my_task", "abc123") # project, domain, name, version
+
+ my_task_exec = my_task.launch(project="my_other_project", domain="development", inputs={'plant': 'philodendron'})
+ my_task_exec.wait_for_completion()
+
+
+Launch a single task from the commandline
+=========================================
+
+Previously registered tasks can also be launched from the command-line using :ref:`flyte-cli `
+
+.. code-block:: console
+
+ $ flyte-cli -h example.com -p my_flyte_project -d development launch-task \
+ -u tsk:my_flyte_project:production:my_complicated_task:abc123 -- an_input=hi \
+ other_input=123 more_input=qwerty
+
+
+Monitoring single task executions in the Flyte console
+======================================================
+
+Single task executions don't yet have native support in the Flyte console but they're accessible using the same URLs as ordinary workflow executions.
+
+For example, for a console hosted example.com you can visit ``example.com/console/projects//domains//executions/`` to track the progress of
+your execution. Log links and status changes will be available as your execution progresses.
+
+
+Registering and launching a single task
+=======================================
+
+A certain category of tasks don't rely on custom containers with registered images to run. Therefore, you may find it convenient to use
+``register_and_launch`` on a task definition to immediately launch a single task execution, like so:
+
+.. code-block:: python
+
+ containerless_task = SdkPrestoTask(
+ task_inputs=inputs(ds=Types.String, count=Types.Integer, rg=Types.String),
+ statement="SELECT * FROM flyte.widgets WHERE ds = '{{ .Inputs.ds}}' LIMIT '{{ .Inputs.count}}'",
+ output_schema=Types.Schema([("a", Types.String), ("b", Types.Integer)]),
+ routing_group="{{ .Inputs.rg }}",
+ )
+
+ my_single_task_execution = containerless_task.register_and_launch(project="my_flyte_project", domain="development",
+ inputs={'ds': '2020-02-29', 'count': 10, 'rg': 'my_routing_group'})
+
diff --git a/_sources/user/tasktypes/index.rst.txt b/_sources/user/tasktypes/index.rst.txt
index f09095f6f2..1450799296 100644
--- a/_sources/user/tasktypes/index.rst.txt
+++ b/_sources/user/tasktypes/index.rst.txt
@@ -14,3 +14,4 @@ Flyte Task Types
spark
dynamic
sidecar
+ pytorch
diff --git a/_sources/user/tasktypes/pytorch.rst.txt b/_sources/user/tasktypes/pytorch.rst.txt
new file mode 100644
index 0000000000..17bd98e50e
--- /dev/null
+++ b/_sources/user/tasktypes/pytorch.rst.txt
@@ -0,0 +1,112 @@
+.. _pytorch-task-type:
+
+PyTorch Task
+============
+
+PyTorch Task Type allows users to run distributed PyTorch training jobs on the Kubernetes cluster via `PyTorch Operator`_.
+
+#####
+Setup
+#####
+
+In order to build the image that is to be eventually submitted to Kubernetes, you'll need to make sure it includes the following:
+ - pytorch and its dependencies (GPU support, distributed communication backend libs, etc.)
+ - flytekit with pytorch extra (``pip install flytekit[pytorch]``)
+ - user defined flyte workflows and its dependencies
+
+You might want to leverage official `Dockerfile`_ or `prebuilt images`_.
+
+Also make sure that your flyte installation is compliant with these requirements:
+ - pytorch plugin is enabled in flytepropeller's config
+ - `Kubeflow pytorch operator`_ is installed in your k8s cluster (you can use `base`_ and configure it in your deploy)
+ - [if using GPU] `GPU device plugin`_ is deployed as well
+
+
+#####
+Usage
+#####
+
+Use pytorch_task_ decorator for configuring job execution resources. Here you can specify number of worker replicas (in addition to single master) and resource `requests and limits`_ on per replica basis.
+
+.. code-block:: python
+ :caption: PyTorch task example (an excerpt from `flytesnacks`_)
+
+ @inputs(
+ batch_size=Types.Integer,
+ test_batch_size=Types.Integer,
+ epochs=Types.Integer,
+ learning_rate=Types.Float,
+ sgd_momentum=Types.Float,
+ seed=Types.Integer,
+ log_interval=Types.Integer,
+ dir=Types.String)
+ @outputs(epoch_accuracies=[Types.Float], model_state=Types.Blob)
+ @pytorch_task(
+ workers_count=2,
+ per_replica_cpu_request="500m",
+ per_replica_memory_request="4Gi",
+ per_replica_memory_limit="8Gi",
+ per_replica_gpu_limit="1",
+ )
+ def mnist_pytorch_job(workflow_params, batch_size, test_batch_size, epochs, learning_rate, sgd_momentum, seed, log_interval, dir, epoch_accuracies, model_state):
+ backend_type = dist.Backend.GLOO
+
+ torch.manual_seed(seed)
+
+ device = torch.device('cuda' if torch.cuda.is_available else 'cpu')
+
+ if should_distribute():
+ dist.init_process_group(backend=backend_type)
+
+ kwargs = {'num_workers': 1, 'pin_memory': True} if torch.cuda.is_available else {}
+ train_loader = torch.utils.data.DataLoader(
+ datasets.MNIST('../data', train=True, download=True,
+ transform=transforms.Compose([
+ transforms.ToTensor(),
+ transforms.Normalize((0.1307,), (0.3081,))
+ ])),
+ batch_size=batch_size, shuffle=True, **kwargs)
+ test_loader = torch.utils.data.DataLoader(
+ datasets.MNIST('../data', train=False, transform=transforms.Compose([
+ transforms.ToTensor(),
+ transforms.Normalize((0.1307,), (0.3081,))
+ ])),
+ batch_size=test_batch_size, shuffle=False, **kwargs)
+
+ model = Net().to(device)
+
+ if is_distributed():
+ Distributor = nn.parallel.DistributedDataParallel if torch.cuda.is_available \
+ else nn.parallel.DistributedDataParallelCPU
+ model = Distributor(model)
+
+ optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=sgd_momentum)
+
+ accuracies = [epoch_step(model, device, train_loader, test_loader, optimizer, epoch, writer, log_interval) for epoch in range(1, epochs + 1)]
+
+ model_file = "mnist_cnn.pt"
+ torch.save(model.state_dict(), model_file)
+
+ model_state.set(model_file)
+ epoch_accuracies.set(accuracies)
+
+ def should_distribute():
+ return dist.is_available() and WORLD_SIZE > 1
+
+
+ def is_distributed():
+ return dist.is_available() and dist.is_initialized()
+
+
+Note that if you request GPU resources, a toleration like `flyte/gpu=dedicated:NoSchedule` (configured in the common flyteplugins configuration) is added to the pod spec automatically. So you can use the respective taint_ to make GPU-enabled nodes available exclusively for flyte-originated GPU-oriented tasks.
+
+.. _`PyTorch Operator`: https://github.com/kubeflow/pytorch-operator
+.. _Dockerfile: https://github.com/pytorch/pytorch/blob/master/docker/pytorch/Dockerfile
+.. _`prebuilt images`: https://hub.docker.com/r/pytorch/pytorch/tags
+.. _pytorch_task: https://lyft.github.io/flyte/flytekit/flytekit.sdk.html#flytekit.sdk.tasks.pytorch_task
+.. _`requests and limits`: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#requests-and-limits
+.. _taint: https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/
+.. _`Kubeflow pytorch operator`: https://github.com/kubeflow/pytorch-operator
+.. _`base`: https://github.com/lyft/flyte/blob/master/kustomize/base/operators/kfoperators/pytorch/kustomization.yaml
+.. _`GPU device plugin`: https://kubernetes.io/docs/tasks/manage-gpus/scheduling-gpus/#deploying-nvidia-gpu-device-plugin
+.. _`flytesnacks`: https://github.com/lyft/flytesnacks/blob/761426a2a41809c339a5444d111dfc637434f015/pytorch/workflows/mnist.py#L1
\ No newline at end of file
diff --git a/_static/documentation_options.js b/_static/documentation_options.js
index 24568a7ac0..9f0af9a337 100644
--- a/_static/documentation_options.js
+++ b/_static/documentation_options.js
@@ -1,6 +1,6 @@
var DOCUMENTATION_OPTIONS = {
URL_ROOT: document.getElementById("documentation_options").getAttribute('data-url_root'),
- VERSION: '0.4.0',
+ VERSION: '0.5.0',
LANGUAGE: 'None',
COLLAPSE_INDEX: false,
FILE_SUFFIX: '.html',
diff --git a/administrator/architecture.html b/administrator/architecture.html
index 8aaf038b9d..83461dffbb 100644
--- a/administrator/architecture.html
+++ b/administrator/architecture.html
@@ -8,7 +8,7 @@
- Architecture — Flyte 0.4.0 documentation
+ Architecture — Flyte 0.5.0 documentation
diff --git a/administrator/index.html b/administrator/index.html
index cbcabe7c37..13fcbee81f 100644
--- a/administrator/index.html
+++ b/administrator/index.html
@@ -8,7 +8,7 @@
- Administrator Docs — Flyte 0.4.0 documentation
+ Administrator Docs — Flyte 0.5.0 documentation
@@ -36,7 +36,7 @@
-
+
@@ -203,7 +203,7 @@
Next
- Previous
+ Previous
Defines a series of if/else blocks. The first branch whose condition evaluates to true is the one to execute.
If no conditions were satisfied, the else_node or the error will execute.
BranchNode is a special node that alter the flow of the workflow graph. It allows the control flow to branch at
runtime based on a series of conditions that get evaluated on various parameters (e.g. inputs, primtives).
(DEFAULT) FAIL_IMMEDIATELY instructs the system to fail as soon as a node fails in the workflow. It’ll automatically
+abort all currently running nodes and clean up resources before finally marking the workflow executions as
+failed.
+
+
+
+
FAIL_AFTER_EXECUTABLE_NODES_COMPLETE
FAIL_AFTER_EXECUTABLE_NODES_COMPLETE instructs the system to make as much progress as it can. The system will
+not alter the dependencies of the execution graph so any node that depend on the failed node will not be run.
+Other nodes that will be executed to completion before cleaning up resources and marking the workflow
+execution as failed.
Sets custom attributes for a project and domain combination.
+:param Text project:
+:param Text domain:
+:param flytekit.models.MatchingAttributes matching_attributes:
+:return:
Sets custom attributes for a project, domain, and workflow combination.
+:param Text project:
+:param Text domain:
+:param Text workflow:
+:param flytekit.models.MatchingAttributes matching_attributes:
+:return:
name (Text) – [Optional] If specified, an execution will be created with this name. Note: the name must
-be unique within the context of the project and domain.
-
notification_overrides (list[flytekit.common.notifications.Notification]) – [Optional] If specified, these
-are the notifications that will be honored for this execution. An empty list signals to disable all
-notifications.
name (Text) – [Optional] If specified, an execution will be created with this name. Note: the name must
+be unique within the context of the project and domain.
+
notification_overrides (list[flytekit.common.notifications.Notification]) – [Optional] If specified, these
+are the notifications that will be honored for this execution. An empty list signals to disable all
+notifications.
Executes the entity and returns the execution identifier. This version of execution is meant for when
-inputs are specified as Python native types/structures.
Creates a remote execution from the entity and returns the execution identifier.
+This version of launch is meant for when inputs are specified as Python native types/structures.
Use this task when you want to run an arbitrary container as a task (e.g. external tools, binaries compiled
+separately as a container completely separate from the container where your Flyte workflow is defined).
Adds the inputs to this task. This can be called multiple times, but it will fail if an input with a given
+name is added more than once, a name collides with an output, or if the name doesn’t exist as an arg name in
+the wrapped function.
+:param dict[Text, flytekit.models.interface.Variable] inputs: names and variables
Adds the inputs to this task. This can be called multiple times, but it will fail if an input with a given
+name is added more than once, a name collides with an output, or if the name doesn’t exist as an arg name in
+the wrapped function.
+:param dict[Text, flytekit.models.interface.Variable] outputs: names and variables
Launches a single task execution and returns the execution identifier.
+:param Text project:
+:param Text domain:
+:param flytekit.models.literals.LiteralMap literal_inputs: Inputs to the execution.
+:param Text name: [Optional] If specified, an execution will be created with this name. Note: the name must
+
+
be unique within the context of the project and domain.
+
+
+
Parameters
+
+
notification_overrides (list[flytekit.common.notifications.Notification]) – [Optional] If specified, these
+are the notifications that will be honored for this execution. An empty list signals to disable all
+notifications.
Creates a workflow execution using parameters specified in the launch plan.
:param Text project:
:param Text domain:
:param Text name:
@@ -455,6 +461,32 @@
Executes the task as a single task execution and returns the identifier.
+:param Text project:
+:param Text domain:
+:param Text name:
+:param flytekit.models.literals.LiteralMap inputs: The inputs to pass
+:param list[flytekit.models.common.Notification] notification_overrides: If specified, override the
Executes the task as a single task execution and returns the identifier.
+:param Text project:
+:param Text domain:
+:param Text name:
+:param flytekit.models.literals.LiteralMap inputs: The inputs to pass
+:param list[flytekit.models.common.Notification] notification_overrides: If specified, override the
Executes the task as a single task execution and returns the identifier.
+:param Text project:
+:param Text domain:
+:param Text name:
+:param flytekit.models.literals.LiteralMap inputs: The inputs to pass
+:param list[flytekit.models.common.Notification] notification_overrides: If specified, override the
-flytekit.interfaces.random.random = <random.Random object at 0x19ca018>¶
+flytekit.interfaces.random.random = <random.Random object at 0x2b9ff08>¶
An instance of the global random number generator used by flytekit. Flytekit maintains it’s own random instance
to ensure that calls to random.seed(…) do not affect the pseudo-random behavior of flytekit. This random should be
used by flytekit components in all cases where random.random would have been used. Components who want additional
diff --git a/flytekit/flytekit.interfaces.stats.html b/flytekit/flytekit.interfaces.stats.html
index c5373a4e99..81bca2c685 100644
--- a/flytekit/flytekit.interfaces.stats.html
+++ b/flytekit/flytekit.interfaces.stats.html
@@ -8,7 +8,7 @@
-
Defines the execution behavior of the workflow when a failure is detected.
+
+
Attributes:
+
FAIL_IMMEDIATELY Instructs the system to fail as soon as a node fails in the
workflow. It’ll automatically abort all currently running nodes and
+clean up resources before finally marking the workflow executions as failed.
+
+
FAIL_AFTER_EXECUTABLE_NODES_COMPLETE Instructs the system to make as much progress as it can. The system
will not alter the dependencies of the execution graph so any node
+that depend on the failed node will not be run. Other nodes that will
+be executed to completion before cleaning up resources and marking
+the workflow execution as failed.
The default behavior for when a node fails in a workflow is to immediately abort the entire workflow. The reasoning behind this thinking
+is to avoid wasting resources since the workflow will end up failing anyway. There are certain cases however, when it’s desired for the
+workflow to carry on executing the branches it can execute.
+
For example when the remaining tasks are marked as cacheable.
+Once the failure has been fixed and the workflow is relaunched, cached tasks will be bypassed quickly.
classOnFailurePolicy(object):
+ """
+ Defines the execution behavior of the workflow when a failure is detected.
+ Attributes:
+ FAIL_IMMEDIATELY Instructs the system to fail as soon as a node fails in the
+ workflow. It'll automatically abort all currently running nodes and
+ clean up resources before finally marking the workflow executions as failed.
+ FAIL_AFTER_EXECUTABLE_NODES_COMPLETE Instructs the system to make as much progress as it can. The system
+ will not alter the dependencies of the execution graph so any node
+ that depend on the failed node will not be run. Other nodes that will
+ be executed to completion before cleaning up resources and marking
+ the workflow execution as failed.
+ """
+
+ FAIL_IMMEDIATELY=_core_workflow.WorkflowMetadata.FAIL_IMMEDIATELY
+ FAIL_AFTER_EXECUTABLE_NODES_COMPLETE=_core_workflow.WorkflowMetadata.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE
+
Tasks are the most atomic unit of execution in Flyte. Although workflows are traditionally composed of multiple tasks with dependencies
+defined by shared inputs and outputs it can be helpful to execute a single task during the process of iterating on its definition.
+It can be tedious to write a new workflow definition every time you want to excecute a single task under development but single task
+executions can be used to easily iterate on task logic.
Just like workflow executions, you can optionally pass a user-defined name, labels, annotations, and/or notifications when launching a single task.
+
The type of my_single_task_execution is SdkWorkflowExecution
+and has the full set of methods and functionality available for conventional WorkflowExecutions.
Single task executions aren’t limited to just tasks you’ve defined in your code. You can reference previously registered tasks and launch a single task execution like so:
Monitoring single task executions in the Flyte console¶
+
Single task executions don’t yet have native support in the Flyte console but they’re accessible using the same URLs as ordinary workflow executions.
+
For example, for a console hosted example.com you can visit example.com/console/projects/<my_project>/domains/<my_domain>/executions/<execution_name> to track the progress of
+your execution. Log links and status changes will be available as your execution progresses.
A certain category of tasks don’t rely on custom containers with registered images to run. Therefore, you may find it convenient to use
+register_and_launch on a task definition to immediately launch a single task execution, like so:
Use pytorch_task decorator for configuring job execution resources. Here you can specify number of worker replicas (in addition to single master) and resource requests and limits on per replica basis.
+
+
PyTorch task example (an excerpt from flytesnacks)¶
Note that if you request GPU resources, toleration like, flyte/gpu=dedicated:NoSchedule (configured in the common flyteplugins configuration) is added to pod spec automatically. So you can use respective taint to make GPU-enabled nodes available exclusively for flyte-originated GPU-oriented tasks.