Skip to content

Commit

Permalink
Yancao/add path on compute (Azure#34377)
Browse files Browse the repository at this point in the history
* add path_on_compute

* fix parsing error

* output parsing error

* fix blank

* fix import inputoutputbase

* add literal input convert

* fix comments

* fix conflict

* fix conflict

* Update output.py
  • Loading branch information
yancao6 authored Mar 28, 2024
1 parent 0c2f524 commit b605d0a
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 4 deletions.
11 changes: 11 additions & 0 deletions sdk/ml/azure-ai-ml/azure/ai/ml/_schema/job/input_output_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ def generate_path_property(azureml_type):
)


def generate_path_on_compute_property(azureml_type):  # pylint: disable=unused-argument
    """Build the marshmallow field used for a ``path_on_compute`` schema entry.

    Only local (``file:``-prefixed) paths are accepted; ``is_strict=True``
    makes the union reject any value that does not match the pattern.

    :param azureml_type: Accepted for signature parity with
        ``generate_path_property``; not currently used.
    :return: A strict union field matching ``file:``-prefixed local paths.
    """
    allowed_path_fields = [
        LocalPathField(pattern=r"^file:.*"),
    ]
    return UnionField(allowed_path_fields, is_strict=True)


def generate_datastore_property():
metadata = {
"description": "Name of the datastore to upload local paths to.",
Expand Down Expand Up @@ -103,6 +112,7 @@ class DataInputSchema(InputSchema):
]
)
path = generate_path_property(azureml_type=AzureMLResourceType.DATA)
path_on_compute = generate_path_on_compute_property(azureml_type=AzureMLResourceType.DATA)
datastore = generate_datastore_property()


Expand All @@ -119,6 +129,7 @@ class MLTableInputSchema(InputSchema):
)
type = StringTransformedEnum(allowed_values=[AssetTypes.MLTABLE])
path = generate_path_property(azureml_type=AzureMLResourceType.DATA)
path_on_compute = generate_path_on_compute_property(azureml_type=AzureMLResourceType.DATA)
datastore = generate_datastore_property()


Expand Down
18 changes: 17 additions & 1 deletion sdk/ml/azure-ai-ml/azure/ai/ml/entities/_inputs_outputs/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class Input(_InputOutputBase): # pylint: disable=too-many-instance-attributes
* 'download': Download the data to the compute target,
* 'direct': Pass in the URI as a string to be accessed at runtime
:paramtype mode: Optional[str]
:keyword path_on_compute: The access path of the data input for the compute target.
:paramtype path_on_compute: Optional[str]
:keyword default: The default value of the input. If a default is set, the input data will be optional.
:paramtype default: Union[str, int, float, bool]
:keyword min: The minimum value for the input. If a value smaller than the minimum is passed to the job, the job
Expand Down Expand Up @@ -70,7 +72,19 @@ class Input(_InputOutputBase): # pylint: disable=too-many-instance-attributes
"""

_EMPTY = Parameter.empty
_IO_KEYS = ["path", "type", "mode", "description", "default", "min", "max", "enum", "optional", "datastore"]
_IO_KEYS = [
"path",
"type",
"mode",
"path_on_compute",
"description",
"default",
"min",
"max",
"enum",
"optional",
"datastore",
]

@overload
def __init__(
Expand Down Expand Up @@ -205,6 +219,7 @@ def __init__(
type: str = "uri_folder",
path: Optional[str] = None,
mode: Optional[str] = None,
path_on_compute: Optional[str] = None,
default: Optional[Union[str, int, float, bool]] = None,
optional: Optional[bool] = None,
min: Optional[Union[int, float]] = None,
Expand All @@ -226,6 +241,7 @@ def __init__(
self.path = str(path)
else:
self.path = path
self.path_on_compute = path_on_compute
self.mode = None if self._is_primitive_type else mode
self._update_default(default)
self.optional = optional
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@


class Output(_InputOutputBase):
_IO_KEYS = ["name", "version", "path", "type", "mode", "description", "early_available"]
_IO_KEYS = ["name", "version", "path", "path_on_compute", "type", "mode", "description", "early_available"]

@overload
def __init__(
Expand Down Expand Up @@ -82,6 +82,8 @@ def __init__( # type: ignore[misc]
* 'upload': Upload the data from the compute target
* 'direct': Pass in the URI as a string
:paramtype mode: Optional[str]
:keyword path_on_compute: The access path of the data output for the compute target.
:paramtype path_on_compute: Optional[str]
:keyword description: The description of the output.
:paramtype description: Optional[str]
:keyword name: The name to be used to register the output as a Data or Model asset. A name can be set without
Expand Down Expand Up @@ -116,6 +118,7 @@ def __init__( # type: ignore[misc]
self._is_primitive_type = self.type in IOConstants.PRIMITIVE_STR_2_TYPE
self.description = description
self.path = path
self.path_on_compute = kwargs.pop("path_on_compute", None)
self.mode = mode
# use this field to mark Output for early node orchestrate, currently hide in kwargs
self.early_available = kwargs.pop("early_available", None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ def to_rest_dataset_literal_inputs(
# set mode attribute manually for binding job input
if input_value.mode:
input_data.mode = INPUT_MOUNT_MAPPING_TO_REST[input_value.mode]
if input_value.path_on_compute:
input_data.pathOnCompute = input_value.path_on_compute
input_data.job_input_type = JobInputType.LITERAL
else:
target_cls_dict = get_input_rest_cls_dict()
Expand Down Expand Up @@ -282,11 +284,15 @@ def from_rest_inputs_to_dataset_literal(inputs: Dict[str, RestJobInput]) -> Dict
if input_value.job_input_type in type_transfer_dict:
if input_value.uri:
path = input_value.uri

if getattr(input_value, "pathOnCompute", None) is not None:
sourcePathOnCompute = input_value.pathOnCompute
else:
sourcePathOnCompute = None
input_data = Input(
type=type_transfer_dict[input_value.job_input_type],
path=path,
mode=INPUT_MOUNT_MAPPING_FROM_REST[input_value.mode] if input_value.mode else None,
path_on_compute=sourcePathOnCompute,
)
elif input_value.job_input_type in (JobInputType.LITERAL, JobInputType.LITERAL):
# otherwise, the input is a literal, so just unpack the InputData value field
Expand Down Expand Up @@ -331,6 +337,7 @@ def to_rest_data_outputs(outputs: Optional[Dict]) -> Dict[str, RestJobOutput]:
asset_version=output_value.version,
uri=output_value.path,
mode=OUTPUT_MOUNT_MAPPING_TO_REST[output_value.mode.lower()] if output_value.mode else None,
pathOnCompute=output_value.path_on_compute if output_value.path_on_compute else None,
description=output_value.description,
)
else:
Expand Down Expand Up @@ -364,12 +371,16 @@ def from_rest_data_outputs(outputs: Dict[str, RestJobOutput]) -> Dict[str, Outpu
# deal with invalid output type submitted by feb api
# todo: backend help convert node level input/output type
normalize_job_input_output_type(output_value)

if getattr(output_value, "pathOnCompute", None) is not None:
sourcePathOnCompute = output_value.pathOnCompute
else:
sourcePathOnCompute = None
if output_value.job_output_type in output_type_mapping:
from_rest_outputs[output_name] = Output(
type=output_type_mapping[output_value.job_output_type],
path=output_value.uri,
mode=OUTPUT_MOUNT_MAPPING_FROM_REST[output_value.mode] if output_value.mode else None,
path_on_compute=sourcePathOnCompute,
description=output_value.description,
name=output_value.asset_name,
version=output_value.asset_version,
Expand Down

0 comments on commit b605d0a

Please sign in to comment.