Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: initial support for Extended Operations #344

Merged
merged 5 commits into from
Mar 2, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 206 additions & 0 deletions google/api_core/extended_operation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Futures for extended long-running operations returned from Google Cloud APIs.

These futures can be used to synchronously wait for the result of a
lon-running operations using :meth:`ExtendedOperation.result`:

.. code-block:: python

extended_operation = my_api_client.long_running_method()

extended_operation.result()

Or asynchronously using callbacks and :meth:`Operation.add_done_callback`:

.. code-block:: python

extended_operation = my_api_client.long_running_method()

def my_callback(ex_op):
print(f"Operation {ex_op.name} completed")

extended_operation.add_done_callback(my_callback)

"""

import threading

from google.api_core import exceptions
from google.api_core.future import polling


class ExtendedOperation(polling.PollingFuture):
"""An ExtendedOperation future for interacting with a Google API Long-Running Operation.

Args:
extended_operation (proto.Message): The initial operation.
refresh (Callable[[], type(extended_operation)]): A callable that returns
the latest state of the operation.
cancel (Callable[[], None]): A callable that tries to cancel the operation.
retry: Optional(google.api_core.retry.Retry): The retry configuration used
when polling. This can be used to control how often :meth:`done`
is polled. Regardless of the retry's ``deadline``, it will be
overridden by the ``timeout`` argument to :meth:`result`.

Note: Most long-running API methods use google.api_core.operation.Operation
This class is a wrapper for a subset of methods that use alternative
Long-Running Operation (LRO) semantics.

Note: there is not a concrete type the extended operation must be.
It MUST have fields that correspond to the following, POSSIBLY WITH DIFFERENT NAMES:
* name: str
* status: Union[str, bool, enum.Enum]
* error_code: int
* error_message: str
"""

def __init__(
self, extended_operation, refresh, cancel, retry=polling.DEFAULT_RETRY
):
super().__init__(retry=retry)
self._extended_operation = extended_operation
self._refresh = refresh
self._cancel = cancel
# Note: the extended operation does not give a good way to indicate cancellation.
# We make do with manually tracking cancellation and checking for doneness.
self._cancelled = False
self._completion_lock = threading.Lock()
# Invoke in case the operation came back already complete.
self._handle_refreshed_operation()

# Note: the following four properties MUST be overridden in a subclass
# if, and only if, the fields in the corresponding extended operation message
# have different names.
#
# E.g. we have an extended operation class that looks like
#
# class MyOperation(proto.Message):
# moniker = proto.Field(proto.STRING, number=1)
# status_msg = proto.Field(proto.STRING, number=2)
# optional http_error_code = proto.Field(proto.INT32, number=3)
# optional http_error_msg = proto.Field(proto.STRING, number=4)
#
# the ExtendedOperation subclass would provide property overrrides that map
# to these (poorly named) fields.
@property
def name(self):
return self._extended_operation.name

@property
def status(self):
return self._extended_operation.status

@property
def error_code(self):
return self._extended_operation.error_code

@property
def error_message(self):
return self._extended_operation.error_message

def done(self, retry=polling.DEFAULT_RETRY):
self._refresh_and_update(retry)
return self._extended_operation.done

def cancel(self):
if self.done():
return False

self._cancel()
self._cancelled = True
return True

def cancelled(self):
# TODO(dovs): there is not currently a good way to determine whether the
# operation has been cancelled.
# The best we can do is manually keep track of cancellation
# and check for doneness.
if not self._cancelled:
return False

self._refresh_and_update()
return self._extended_operation.done

def _refresh_and_update(self, retry=polling.DEFAULT_RETRY):
if not self._extended_operation.done:
self._extended_operation = self._refresh(retry=retry)
self._handle_refreshed_operation()

def _handle_refreshed_operation(self):
with self._completion_lock:
if not self._extended_operation.done:
return

if self.error_code and self.error_message:
exception = exceptions.from_http_status(
status_code=self.error_code,
message=self.error_message,
response=self._extended_operation,
)
self.set_exception(exception)
elif self.error_code or self.error_message:
exception = exceptions.GoogleAPICallError(
f"Unexpected error {self.error_code}: {self.error_message}"
)
self.set_exception(exception)
else:
# Extended operations have no payload.
self.set_result(None)

@classmethod
def make(cls, refresh, cancel, extended_operation, **kwargs):
"""
Return an instantiated ExtendedOperation (or child) that wraps
* a refresh callable
* a cancel callable (can be a no-op)
* an initial result

.. note::
It is the caller's responsibility to set up refresh and cancel
with their correct request argument.
The reason for this is that the services that use Extended Operations
have rpcs that look something like the following:

// service.proto
service MyLongService {
rpc StartLongTask(StartLongTaskRequest) returns (ExtendedOperation) {
option (google.cloud.operation_service) = "CustomOperationService";
}
}

service CustomOperationService {
rpc Get(GetOperationRequest) returns (ExtendedOperation) {
option (google.cloud.operation_polling_method) = true;
}
}

Any info needed for the poll, e.g. a name, path params, etc.
is held in the request, which the initial client method is in a much
better position to make made because the caller made the initial request.

TL;DR: the caller sets up closures for refresh and cancel that carry
the properly configured requests.

Args:
refresh (Callable[Optional[Retry]][type(extended_operation)]): A callable that
returns the latest state of the operation.
cancel (Callable[][Any]): A callable that tries to cancel the operation
on a best effort basis.
extended_operation (Any): The initial response of the long running method.
See the docstring for ExtendedOperation.__init__ for requirements on
the type and fields of extended_operation
"""
return cls(extended_operation, refresh, cancel, **kwargs)
46 changes: 29 additions & 17 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def default(session, install_grpc=True):
)

# Install all test dependencies, then install this package in-place.
session.install("mock", "pytest", "pytest-cov")
session.install("dataclasses", "mock", "pytest", "pytest-cov", "pytest-xdist")
if install_grpc:
session.install("-e", ".[grpc]", "-c", constraints_path)
else:
Expand All @@ -102,28 +102,36 @@ def default(session, install_grpc=True):
"python",
"-m",
"py.test",
"--quiet",
"--cov=google.api_core",
"--cov=tests.unit",
"--cov-append",
"--cov-config=.coveragerc",
"--cov-report=",
"--cov-fail-under=0",
os.path.join("tests", "unit"),
*(
# Helpful for running a single test or testfile.
session.posargs
or [
"--quiet",
"--cov=google.api_core",
"--cov=tests.unit",
"--cov-append",
"--cov-config=.coveragerc",
"--cov-report=",
"--cov-fail-under=0",
# Running individual tests with parallelism enabled is usually not helpful.
"-n=auto",
os.path.join("tests", "unit"),
]
),
]
pytest_args.extend(session.posargs)

# Inject AsyncIO content and proto-plus, if version >= 3.6.
# proto-plus is needed for a field mask test in test_protobuf_helpers.py
if _greater_or_equal_than_36(session.python):
session.install("asyncmock", "pytest-asyncio", "proto-plus")

pytest_args.append("--cov=tests.asyncio")
pytest_args.append(os.path.join("tests", "asyncio"))
session.run(*pytest_args)
else:
# Run py.test against the unit tests.
session.run(*pytest_args)
# Having positional arguments means the user wants to run specific tests.
# Best not to add additional tests to that list.
if not session.posargs:
pytest_args.append("--cov=tests.asyncio")
pytest_args.append(os.path.join("tests", "asyncio"))

session.run(*pytest_args)


@nox.session(python=["3.6", "3.7", "3.8", "3.9", "3.10"])
Expand Down Expand Up @@ -171,7 +179,11 @@ def mypy(session):
"""Run type-checking."""
session.install(".[grpc, grpcgcp]", "mypy")
session.install(
"types-setuptools", "types-requests", "types-protobuf", "types-mock"
"types-setuptools",
"types-requests",
"types-protobuf",
"types-mock",
"types-dataclasses",
)
session.run("mypy", "google", "tests")

Expand Down
Loading