Skip to content

Commit

Permalink
Merge branch 'master' into py3.6
Browse files Browse the repository at this point in the history
  • Loading branch information
ji-yaqi authored Feb 15, 2022
2 parents c1e8080 + 9be1720 commit b008ba3
Show file tree
Hide file tree
Showing 42 changed files with 1,215 additions and 132 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
google\_cloud\_pipeline\_components.v1.batch_predict_job module
===============================================================

.. automodule:: google_cloud_pipeline_components.v1.batch_predict_job
:members:
:undoc-members:
:show-inheritance:
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
google\_cloud\_pipeline\_components.v1.bigquery module
======================================================

.. automodule:: google_cloud_pipeline_components.v1.bigquery
:members:
:undoc-members:
:show-inheritance:
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
google\_cloud\_pipeline\_components.v1.custom\_job module
=========================================================

.. automodule:: google_cloud_pipeline_components.v1.custom_job
:members:
:undoc-members:
:show-inheritance:
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
google\_cloud\_pipeline\_components.v1.dataflow module
======================================================

.. automodule:: google_cloud_pipeline_components.v1.dataflow
:members:
:undoc-members:
:show-inheritance:
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
google\_cloud\_pipeline\_components.v1.dataset module
=====================================================

.. automodule:: google_cloud_pipeline_components.v1.dataset
:members:
:undoc-members:
:show-inheritance:
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
google\_cloud\_pipeline\_components.v1.endpoint module
======================================================

.. automodule:: google_cloud_pipeline_components.v1.endpoint
:members:
:undoc-members:
:show-inheritance:
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
google\_cloud\_pipeline\_components.v1.hyperparameter\_tuning\_job module
==========================================================================

.. automodule:: google_cloud_pipeline_components.v1.hyperparameter_tuning_job
:members:
:undoc-members:
:show-inheritance:
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
google\_cloud\_pipeline\_components.v1.model module
===================================================

.. automodule:: google_cloud_pipeline_components.v1.model
:members:
:undoc-members:
:show-inheritance:
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
google\_cloud\_pipeline\_components.v1 components
=================================================

Components
-----------

.. toctree::
:maxdepth: 1

google_cloud_pipeline_components.v1.batch_predict_job
google_cloud_pipeline_components.v1.bigquery
google_cloud_pipeline_components.v1.custom_job
google_cloud_pipeline_components.v1.dataflow
google_cloud_pipeline_components.v1.dataset
google_cloud_pipeline_components.v1.endpoint
google_cloud_pipeline_components.v1.hyperparameter_tuning_job
google_cloud_pipeline_components.v1.model
google_cloud_pipeline_components.v1.wait_gcp_resources
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
google\_cloud\_pipeline\_components.v1.wait\_gcp\_resources module
===================================================================

.. automodule:: google_cloud_pipeline_components.v1.wait_gcp_resources
:members:
:undoc-members:
:show-inheritance:
1 change: 1 addition & 0 deletions components/google-cloud/docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ To learn more about Google Cloud Pipeline Components see

google_cloud_pipeline_components.aiplatform
google_cloud_pipeline_components.experimental
google_cloud_pipeline_components.v1

Indices and tables
==================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from google.cloud import aiplatform
from google_cloud_pipeline_components.aiplatform import utils
from ..utils import execution_context

INIT_KEY = 'init'
METHOD_KEY = 'method'
Expand Down Expand Up @@ -188,7 +189,7 @@ def prepare_parameters(kwargs: Dict[str, Any],
1. Determines the annotation type that should used with the parameter
2. Reads input values if needed
3. Deserializes thos value where appropriate
3. Deserializes those values where appropriate
4. Or casts to the correct type.
Args:
Expand Down Expand Up @@ -240,14 +241,17 @@ def runner(cls_name, method_name, executor_input, kwargs):
method = getattr(obj, method_name)
prepare_parameters(serialized_args[METHOD_KEY], method, is_init=False)

print(
f'method:{method} is being called with parameters {serialized_args[METHOD_KEY]}'
)
output = method(**serialized_args[METHOD_KEY])

if output:
write_to_artifact(executor_input, make_output(output))
return output
with execution_context.ExecutionContext(
on_cancel=getattr(obj, 'cancel', None)):
print(
f'method:{method} is being called with parameters {serialized_args[METHOD_KEY]}'
)
output = method(**serialized_args[METHOD_KEY])
print('resource_name: %s', obj.resource_name)

if output:
write_to_artifact(executor_input, make_output(output))
return output


def main():
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Copyright 2022 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Google Cloud Pipeline Components Container Utils."""
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright 2022 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Execution context for launcher clients to support cancellation propagation."""
import logging
import os
import signal


class ExecutionContext:
  """Manages a GCPC launcher's lifetime so pipeline cancellation propagates.

  While active (used as a ``with`` block), a SIGTERM delivered to the
  process — KFP's cancel signal — is intercepted and forwarded to the
  optional ``on_cancel`` callback, after which the process hard-exits.
  On normal exit the previously installed SIGTERM handler is restored.

  Args:
    on_cancel: optional, function to handle KFP cancel event.
  """

  def __init__(self, on_cancel=None):
    logging.info('Starting GCPC context')
    self._on_cancel = on_cancel
    # Filled in by __enter__ so __exit__ can restore the prior handler.
    self._original_sigterm_handler = None

  def __enter__(self):
    logging.info('Adding signal handler')
    previous_handler = signal.signal(signal.SIGTERM, self._exit_gracefully)
    self._original_sigterm_handler = previous_handler
    return self

  def __exit__(self, *_):
    logging.info('Exiting GCPC context')
    signal.signal(signal.SIGTERM, self._original_sigterm_handler)

  def _exit_gracefully(self, *_):
    # SIGTERM handler installed by __enter__.
    logging.info('SIGTERM signal received.')
    if self._on_cancel:
      logging.info('Cancelling...')
      self._on_cancel()

    # Hard-exit here so no downstream code runs against the cancelled job.
    logging.info('Exiting GCPC context due to cancellation.')
    os._exit(0)
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def download_blob(source_blob_path, destination_file_path):


class Process:
"""Helper class to redirec the subprocess output."""
"""Helper class to redirect the subprocess output."""

def __init__(self, cmd):
self._cmd = cmd
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import google.auth
import google.auth.transport.requests

from ...utils import execution_context
from .utils import json_util
from .utils import artifact_util
from google.cloud import bigquery
Expand Down Expand Up @@ -169,26 +170,43 @@ def _create_query_job(project, location, payload,

def _poll_job(job_uri, creds) -> dict:
  """Poll the bigquery job till it reaches a final state.

  Polling runs inside an ExecutionContext so that a pipeline cancel
  (SIGTERM) triggers ``_send_cancel_request`` for this job before exit.

  Args:
    job_uri: URI of the BigQuery job resource to poll.
    creds: Google auth credentials used to authorize the polling requests;
      refreshed whenever they expire.

  Returns:
    The final job resource as a dict, as returned by the BigQuery REST API.

  Raises:
    RuntimeError: If the job finishes with an ``errorResult`` in its status.
  """
  with execution_context.ExecutionContext(
      on_cancel=lambda: _send_cancel_request(job_uri, creds)):
    job = {}
    # A job is final once its status reports state == 'done'.
    while ('status' not in job) or ('state' not in job['status']) or (
        job['status']['state'].lower() != 'done'):
      time.sleep(_POLLING_INTERVAL_IN_SECONDS)
      logging.info('The job is running...')
      if not creds.valid:
        creds.refresh(google.auth.transport.requests.Request())
      headers = {
          'Content-type': 'application/json',
          'Authorization': 'Bearer ' + creds.token
      }
      job = requests.get(job_uri, headers=headers).json()
      if 'status' in job and 'errorResult' in job['status']:
        raise RuntimeError('The BigQuery job failed. Error: {}'.format(
            job['status']))

  logging.info('BigQuery Job completed successfully. Job: %s.', job)
  return job


def _send_cancel_request(job_uri, creds):
  """Ask the BigQuery REST API to cancel the job behind ``job_uri``.

  Best-effort: the response is only logged, never inspected, so a failed
  cancellation does not raise.

  Args:
    job_uri: URI of the running BigQuery job; any query string is stripped
      before appending the ``/cancel`` action.
    creds: Google auth credentials; refreshed first if no longer valid.
  """
  if not creds.valid:
    creds.refresh(google.auth.transport.requests.Request())

  request_headers = {
      'Content-type': 'application/json',
      'Authorization': 'Bearer ' + creds.token,
  }
  # Bigquery cancel API:
  # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/cancel
  base_uri = job_uri.split('?')[0]
  cancel_response = requests.post(
      url=base_uri + '/cancel', data='', headers=request_headers)
  logging.info('Cancel response: %s', cancel_response)


def bigquery_query_job(
type,
project,
Expand Down
Loading

0 comments on commit b008ba3

Please sign in to comment.