diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/_schema/job/input_output_entry.py b/sdk/ml/azure-ai-ml/azure/ai/ml/_schema/job/input_output_entry.py
index 1f06bc9dce91..1300ab0737ba 100644
--- a/sdk/ml/azure-ai-ml/azure/ai/ml/_schema/job/input_output_entry.py
+++ b/sdk/ml/azure-ai-ml/azure/ai/ml/_schema/job/input_output_entry.py
@@ -59,6 +59,15 @@ def generate_path_property(azureml_type):
     )
 
 
+def generate_path_on_compute_property(azureml_type):
+    return UnionField(
+        [
+            LocalPathField(pattern=r"^file:.*"),
+        ],
+        is_strict=True,
+    )
+
+
 def generate_datastore_property():
     metadata = {
         "description": "Name of the datastore to upload local paths to.",
@@ -103,6 +112,7 @@ class DataInputSchema(InputSchema):
         ]
     )
     path = generate_path_property(azureml_type=AzureMLResourceType.DATA)
+    path_on_compute = generate_path_on_compute_property(azureml_type=AzureMLResourceType.DATA)
     datastore = generate_datastore_property()
 
 
@@ -119,6 +129,7 @@ class MLTableInputSchema(InputSchema):
     )
     type = StringTransformedEnum(allowed_values=[AssetTypes.MLTABLE])
     path = generate_path_property(azureml_type=AzureMLResourceType.DATA)
+    path_on_compute = generate_path_on_compute_property(azureml_type=AzureMLResourceType.DATA)
     datastore = generate_datastore_property()
 
 
diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_inputs_outputs/input.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_inputs_outputs/input.py
index b5e11a188b13..1fa944c74883 100644
--- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_inputs_outputs/input.py
+++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_inputs_outputs/input.py
@@ -40,6 +40,8 @@ class Input(_InputOutputBase):  # pylint: disable=too-many-instance-attributes
         * 'download': Download the data to the compute target,
         * 'direct': Pass in the URI as a string to be accessed at runtime
     :paramtype mode: Optional[str]
+    :keyword path_on_compute: The access path of the data input on the compute target.
+    :paramtype path_on_compute: Optional[str]
     :keyword default: The default value of the input. If a default is set, the input data will be optional.
     :paramtype default: Union[str, int, float, bool]
     :keyword min: The minimum value for the input. If a value smaller than the minimum is passed to the job, the job
@@ -70,7 +72,19 @@ class Input(_InputOutputBase):  # pylint: disable=too-many-instance-attributes
     """
 
     _EMPTY = Parameter.empty
-    _IO_KEYS = ["path", "type", "mode", "description", "default", "min", "max", "enum", "optional", "datastore"]
+    _IO_KEYS = [
+        "path",
+        "type",
+        "mode",
+        "path_on_compute",
+        "description",
+        "default",
+        "min",
+        "max",
+        "enum",
+        "optional",
+        "datastore",
+    ]
 
     @overload
     def __init__(
@@ -205,6 +219,7 @@ def __init__(
         type: str = "uri_folder",
         path: Optional[str] = None,
         mode: Optional[str] = None,
+        path_on_compute: Optional[str] = None,
         default: Optional[Union[str, int, float, bool]] = None,
         optional: Optional[bool] = None,
         min: Optional[Union[int, float]] = None,
@@ -226,6 +241,7 @@
             self.path = str(path)
         else:
             self.path = path
+        self.path_on_compute = path_on_compute
         self.mode = None if self._is_primitive_type else mode
         self._update_default(default)
         self.optional = optional
diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_inputs_outputs/output.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_inputs_outputs/output.py
index 573931a53243..0981f101d054 100644
--- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_inputs_outputs/output.py
+++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_inputs_outputs/output.py
@@ -18,7 +18,7 @@
 
 
 class Output(_InputOutputBase):
-    _IO_KEYS = ["name", "version", "path", "type", "mode", "description", "early_available"]
+    _IO_KEYS = ["name", "version", "path", "path_on_compute", "type", "mode", "description", "early_available"]
 
     @overload
     def __init__(
@@ -82,6 +82,8 @@ def __init__(  # type: ignore[misc]
         * 'upload': Upload the data from the compute target
         * 'direct': Pass in the URI as a string
     :paramtype mode: Optional[str]
+    :keyword path_on_compute: The access path of the data output on the compute target.
+    :paramtype path_on_compute: Optional[str]
     :keyword description: The description of the output.
     :paramtype description: Optional[str]
     :keyword name: The name to be used to register the output as a Data or Model asset. A name can be set without
@@ -116,6 +118,7 @@ def __init__(  # type: ignore[misc]
         self._is_primitive_type = self.type in IOConstants.PRIMITIVE_STR_2_TYPE
         self.description = description
         self.path = path
+        self.path_on_compute = kwargs.pop("path_on_compute", None)
         self.mode = mode
         # use this field to mark Output for early node orchestrate, currently hide in kwargs
         self.early_available = kwargs.pop("early_available", None)
diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py
index 7f9836f29cb7..efce705ca1bd 100644
--- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py
+++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py
@@ -219,6 +219,8 @@ def to_rest_dataset_literal_inputs(
             # set mode attribute manually for binding job input
             if input_value.mode:
                 input_data.mode = INPUT_MOUNT_MAPPING_TO_REST[input_value.mode]
+            if input_value.path_on_compute:
+                input_data.pathOnCompute = input_value.path_on_compute
             input_data.job_input_type = JobInputType.LITERAL
         else:
             target_cls_dict = get_input_rest_cls_dict()
@@ -282,11 +284,15 @@ def from_rest_inputs_to_dataset_literal(inputs: Dict[str, RestJobInput]) -> Dict
         if input_value.job_input_type in type_transfer_dict:
             if input_value.uri:
                 path = input_value.uri
-
+            if getattr(input_value, "pathOnCompute", None) is not None:
+                source_path_on_compute = input_value.pathOnCompute
+            else:
+                source_path_on_compute = None
             input_data = Input(
                 type=type_transfer_dict[input_value.job_input_type],
                 path=path,
                 mode=INPUT_MOUNT_MAPPING_FROM_REST[input_value.mode] if input_value.mode else None,
+                path_on_compute=source_path_on_compute,
             )
         elif input_value.job_input_type in (JobInputType.LITERAL, JobInputType.LITERAL):
             # otherwise, the input is a literal, so just unpack the InputData value field
@@ -331,6 +337,7 @@ def to_rest_data_outputs(outputs: Optional[Dict]) -> Dict[str, RestJobOutput]:
                 asset_version=output_value.version,
                 uri=output_value.path,
                 mode=OUTPUT_MOUNT_MAPPING_TO_REST[output_value.mode.lower()] if output_value.mode else None,
+                pathOnCompute=output_value.path_on_compute if output_value.path_on_compute else None,
                 description=output_value.description,
             )
         else:
@@ -364,12 +371,16 @@ def from_rest_data_outputs(outputs: Dict[str, RestJobOutput]) -> Dict[str, Output]:
         # deal with invalid output type submitted by feb api
         # todo: backend help convert node level input/output type
         normalize_job_input_output_type(output_value)
-
+        if getattr(output_value, "pathOnCompute", None) is not None:
+            source_path_on_compute = output_value.pathOnCompute
+        else:
+            source_path_on_compute = None
         if output_value.job_output_type in output_type_mapping:
             from_rest_outputs[output_name] = Output(
                 type=output_type_mapping[output_value.job_output_type],
                 path=output_value.uri,
                 mode=OUTPUT_MOUNT_MAPPING_FROM_REST[output_value.mode] if output_value.mode else None,
+                path_on_compute=source_path_on_compute,
                 description=output_value.description,
                 name=output_value.asset_name,
                 version=output_value.asset_version,
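
For context, here is a minimal usage sketch (not part of the diff) of how the new `path_on_compute` keyword on `Input` and `Output` could be exercised from the SDK. The datastore URI, compute name, environment, and local paths are illustrative placeholders, not values from this change:

```python
# Hypothetical example: pin where a job's data lands on the compute.
# The workspace assets referenced here are placeholders.
from azure.ai.ml import Input, Output, command

job = command(
    command="python train.py --data ${{inputs.training_data}} --model ${{outputs.model_dir}}",
    inputs={
        "training_data": Input(
            type="uri_folder",
            path="azureml://datastores/workspaceblobstore/paths/data/",
            mode="download",
            # New in this change: the local path where the input is
            # made available on the compute target.
            path_on_compute="/tmp/training_data",
        )
    },
    outputs={
        "model_dir": Output(
            type="uri_folder",
            mode="upload",
            # Likewise for outputs: written locally here, then uploaded.
            path_on_compute="/tmp/model_dir",
        )
    },
    environment="azureml:AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest",
    compute="cpu-cluster",
)
```

On submission, the helpers above round-trip this field: `to_rest_dataset_literal_inputs` / `to_rest_data_outputs` map `path_on_compute` to the REST-side `pathOnCompute` property, and the `from_rest_*` counterparts read it back via `getattr` so older service responses without the property still deserialize.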