Skip to content

Commit

Permalink
feat: add timeout arg across SDK (#1099)
Browse files Browse the repository at this point in the history
* feat: add timeout arg and tests to model upload

* feat: add timeout arg and tests to dataset create method

* feat: add timeout arg to dataset import and tensorboard create

* Update system tests with timeout arg

* rename timeout arg with method name and update tests

* add deploy_request_timeout to Model deploy

* add create_request_timeout arg to pipeline job run and submit

* add timeout arg and tests to training_jobs

* add timeout arg tests for training_jobs

* update system tests with timeout arg

* add timeout arg tests and run linter

* add timeout arg and tests to tensorboard

* add timeout arg and tests to featurestore

* fix failing tests

* update system tests with timeout arg

* fix broken tests and run linter

* update handling of import_request_timeout arg

* update timeout arg in tests and run linter

* finish moving timeout arg to end of function signatures
  • Loading branch information
sararob authored Apr 1, 2022
1 parent a3ce143 commit 184f7f3
Show file tree
Hide file tree
Showing 32 changed files with 2,275 additions and 57 deletions.
47 changes: 41 additions & 6 deletions google/cloud/aiplatform/datasets/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ def create(
labels: Optional[Dict[str, str]] = None,
encryption_spec_key_name: Optional[str] = None,
sync: bool = True,
create_request_timeout: Optional[float] = None,
) -> "_Dataset":
"""Creates a new dataset and optionally imports data into dataset when
source and import_schema_uri are passed.
Expand Down Expand Up @@ -203,6 +204,8 @@ def create(
Whether to execute this method synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
be immediately returned and synced when the Future has completed.
create_request_timeout (float):
Optional. The timeout for the create request in seconds.
Returns:
dataset (Dataset):
Expand Down Expand Up @@ -240,6 +243,7 @@ def create(
encryption_spec_key_name=encryption_spec_key_name
),
sync=sync,
create_request_timeout=create_request_timeout,
)

@classmethod
Expand All @@ -258,6 +262,8 @@ def _create_and_import(
labels: Optional[Dict[str, str]] = None,
encryption_spec: Optional[gca_encryption_spec.EncryptionSpec] = None,
sync: bool = True,
create_request_timeout: Optional[float] = None,
import_request_timeout: Optional[float] = None,
) -> "_Dataset":
"""Creates a new dataset and optionally imports data into dataset when
source and import_schema_uri are passed.
Expand Down Expand Up @@ -313,6 +319,10 @@ def _create_and_import(
Whether to execute this method synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
be immediately returned and synced when the Future has completed.
create_request_timeout (float):
Optional. The timeout for the create request in seconds.
import_request_timeout (float):
Optional. The timeout for the import request in seconds.
Returns:
dataset (Dataset):
Expand All @@ -328,6 +338,7 @@ def _create_and_import(
request_metadata=request_metadata,
labels=labels,
encryption_spec=encryption_spec,
create_request_timeout=create_request_timeout,
)

_LOGGER.log_create_with_lro(cls, create_dataset_lro)
Expand All @@ -345,18 +356,26 @@ def _create_and_import(

# Import if import datasource is DatasourceImportable
if isinstance(datasource, _datasources.DatasourceImportable):
dataset_obj._import_and_wait(datasource)
dataset_obj._import_and_wait(
datasource, import_request_timeout=import_request_timeout
)

return dataset_obj

def _import_and_wait(self, datasource):
def _import_and_wait(
self,
datasource,
import_request_timeout: Optional[float] = None,
):
_LOGGER.log_action_start_against_resource(
"Importing",
"data",
self,
)

import_lro = self._import(datasource=datasource)
import_lro = self._import(
datasource=datasource, import_request_timeout=import_request_timeout
)

_LOGGER.log_action_started_against_resource_with_lro(
"Import", "data", self.__class__, import_lro
Expand All @@ -377,6 +396,7 @@ def _create(
request_metadata: Sequence[Tuple[str, str]] = (),
labels: Optional[Dict[str, str]] = None,
encryption_spec: Optional[gca_encryption_spec.EncryptionSpec] = None,
create_request_timeout: Optional[float] = None,
) -> operation.Operation:
"""Creates a new managed dataset by directly calling API client.
Expand Down Expand Up @@ -419,6 +439,8 @@ def _create(
resource is created.
If set, this Dataset and all sub-resources of this Dataset will be secured by this key.
create_request_timeout (float):
Optional. The timeout for the create request in seconds.
Returns:
operation (Operation):
An object representing a long-running operation.
Expand All @@ -433,25 +455,33 @@ def _create(
)

return api_client.create_dataset(
parent=parent, dataset=gapic_dataset, metadata=request_metadata
parent=parent,
dataset=gapic_dataset,
metadata=request_metadata,
timeout=create_request_timeout,
)

def _import(
self,
datasource: _datasources.DatasourceImportable,
import_request_timeout: Optional[float] = None,
) -> operation.Operation:
"""Imports data into managed dataset by directly calling API client.
Args:
datasource (_datasources.DatasourceImportable):
Required. Datasource for importing data to an existing dataset for Vertex AI.
import_request_timeout (float):
Optional. The timeout for the import request in seconds.
Returns:
operation (Operation):
An object representing a long-running operation.
"""
return self.api_client.import_data(
name=self.resource_name, import_configs=[datasource.import_data_config]
name=self.resource_name,
import_configs=[datasource.import_data_config],
timeout=import_request_timeout,
)

@base.optional_sync(return_input_arg="self")
Expand All @@ -461,6 +491,7 @@ def import_data(
import_schema_uri: str,
data_item_labels: Optional[Dict] = None,
sync: bool = True,
import_request_timeout: Optional[float] = None,
) -> "_Dataset":
"""Upload data to existing managed dataset.
Expand Down Expand Up @@ -498,6 +529,8 @@ def import_data(
Whether to execute this method synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
be immediately returned and synced when the Future has completed.
import_request_timeout (float):
Optional. The timeout for the import request in seconds.
Returns:
dataset (Dataset):
Expand All @@ -510,7 +543,9 @@ def import_data(
data_item_labels=data_item_labels,
)

self._import_and_wait(datasource=datasource)
self._import_and_wait(
datasource=datasource, import_request_timeout=import_request_timeout
)
return self

# TODO(b/174751568) add optional sync support
Expand Down
4 changes: 4 additions & 0 deletions google/cloud/aiplatform/datasets/image_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def create(
labels: Optional[Dict[str, str]] = None,
encryption_spec_key_name: Optional[str] = None,
sync: bool = True,
create_request_timeout: Optional[float] = None,
) -> "ImageDataset":
"""Creates a new image dataset and optionally imports data into dataset
when source and import_schema_uri are passed.
Expand Down Expand Up @@ -121,6 +122,8 @@ def create(
Whether to execute this method synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
be immediately returned and synced when the Future has completed.
create_request_timeout (float):
Optional. The timeout for the create request in seconds.
Returns:
image_dataset (ImageDataset):
Expand Down Expand Up @@ -159,4 +162,5 @@ def create(
encryption_spec_key_name=encryption_spec_key_name
),
sync=sync,
create_request_timeout=create_request_timeout,
)
4 changes: 4 additions & 0 deletions google/cloud/aiplatform/datasets/tabular_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def create(
labels: Optional[Dict[str, str]] = None,
encryption_spec_key_name: Optional[str] = None,
sync: bool = True,
create_request_timeout: Optional[float] = None,
) -> "TabularDataset":
"""Creates a new tabular dataset.
Expand Down Expand Up @@ -102,6 +103,8 @@ def create(
Whether to execute this method synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
be immediately returned and synced when the Future has completed.
create_request_timeout (float):
Optional. The timeout for the create request in seconds.
Returns:
tabular_dataset (TabularDataset):
Expand Down Expand Up @@ -139,6 +142,7 @@ def create(
encryption_spec_key_name=encryption_spec_key_name
),
sync=sync,
create_request_timeout=create_request_timeout,
)

def import_data(self):
Expand Down
4 changes: 4 additions & 0 deletions google/cloud/aiplatform/datasets/text_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def create(
labels: Optional[Dict[str, str]] = None,
encryption_spec_key_name: Optional[str] = None,
sync: bool = True,
create_request_timeout: Optional[float] = None,
) -> "TextDataset":
"""Creates a new text dataset and optionally imports data into dataset
when source and import_schema_uri are passed.
Expand Down Expand Up @@ -124,6 +125,8 @@ def create(
If set, this Dataset and all sub-resources of this Dataset will be secured by this key.
Overrides encryption_spec_key_name set in aiplatform.init.
create_request_timeout (float):
Optional. The timeout for the create request in seconds.
sync (bool):
Whether to execute this method synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
Expand Down Expand Up @@ -166,4 +169,5 @@ def create(
encryption_spec_key_name=encryption_spec_key_name
),
sync=sync,
create_request_timeout=create_request_timeout,
)
4 changes: 4 additions & 0 deletions google/cloud/aiplatform/datasets/video_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def create(
labels: Optional[Dict[str, str]] = None,
encryption_spec_key_name: Optional[str] = None,
sync: bool = True,
create_request_timeout: Optional[float] = None,
) -> "VideoDataset":
"""Creates a new video dataset and optionally imports data into dataset
when source and import_schema_uri are passed.
Expand Down Expand Up @@ -117,6 +118,8 @@ def create(
If set, this Dataset and all sub-resources of this Dataset will be secured by this key.
Overrides encryption_spec_key_name set in aiplatform.init.
create_request_timeout (float):
Optional. The timeout for the create request in seconds.
sync (bool):
Whether to execute this method synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
Expand Down Expand Up @@ -159,4 +162,5 @@ def create(
encryption_spec_key_name=encryption_spec_key_name
),
sync=sync,
create_request_timeout=create_request_timeout,
)
Loading

0 comments on commit 184f7f3

Please sign in to comment.