diff --git a/google/cloud/bigtable_v2/services/bigtable/async_client.py b/google/cloud/bigtable_v2/services/bigtable/async_client.py index a80be70af..d325564c0 100644 --- a/google/cloud/bigtable_v2/services/bigtable/async_client.py +++ b/google/cloud/bigtable_v2/services/bigtable/async_client.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import functools from collections import OrderedDict import functools import re @@ -272,7 +273,8 @@ def read_rows( "the individual field arguments should be set." ) - request = bigtable.ReadRowsRequest(request) + if not isinstance(request, bigtable.ReadRowsRequest): + request = bigtable.ReadRowsRequest(request) # If we have keyword arguments corresponding to fields on the # request, apply these. @@ -283,12 +285,9 @@ def read_rows( # Wrap the RPC method; this adds retry and timeout information, # and friendly error handling. - rpc = gapic_v1.method_async.wrap_method( - self._client._transport.read_rows, - default_timeout=43200.0, - client_info=DEFAULT_CLIENT_INFO, - ) - + rpc = self._client._transport._wrapped_methods[ + self._client._transport.read_rows + ] # Certain fields should be provided within the metadata header; # add these here. metadata = tuple(metadata) + ( @@ -367,7 +366,8 @@ def sample_row_keys( "the individual field arguments should be set." ) - request = bigtable.SampleRowKeysRequest(request) + if not isinstance(request, bigtable.SampleRowKeysRequest): + request = bigtable.SampleRowKeysRequest(request) # If we have keyword arguments corresponding to fields on the # request, apply these. @@ -378,12 +378,9 @@ def sample_row_keys( # Wrap the RPC method; this adds retry and timeout information, # and friendly error handling. - rpc = gapic_v1.method_async.wrap_method( - self._client._transport.sample_row_keys, - default_timeout=60.0, - client_info=DEFAULT_CLIENT_INFO, - ) - + rpc = self._client._transport._wrapped_methods[ + self._client._transport.sample_row_keys + ] # Certain fields should be provided within the metadata header; # add these here. metadata = tuple(metadata) + ( @@ -479,7 +476,8 @@ async def mutate_row( "the individual field arguments should be set." ) - request = bigtable.MutateRowRequest(request) + if not isinstance(request, bigtable.MutateRowRequest): + request = bigtable.MutateRowRequest(request) # If we have keyword arguments corresponding to fields on the # request, apply these. @@ -494,21 +492,9 @@ async def mutate_row( # Wrap the RPC method; this adds retry and timeout information, # and friendly error handling. - rpc = gapic_v1.method_async.wrap_method( - self._client._transport.mutate_row, - default_retry=retries.Retry( - initial=0.01, - maximum=60.0, - multiplier=2, - predicate=retries.if_exception_type( - core_exceptions.DeadlineExceeded, - core_exceptions.ServiceUnavailable, - ), - deadline=60.0, - ), - default_timeout=60.0, - client_info=DEFAULT_CLIENT_INFO, - ) + rpc = self._client._transport._wrapped_methods[ + self._client._transport.mutate_row + ] # Certain fields should be provided within the metadata header; # add these here. @@ -601,7 +587,8 @@ def mutate_rows( "the individual field arguments should be set." ) - request = bigtable.MutateRowsRequest(request) + if not isinstance(request, bigtable.MutateRowsRequest): + request = bigtable.MutateRowsRequest(request) # If we have keyword arguments corresponding to fields on the # request, apply these. @@ -614,11 +601,9 @@ def mutate_rows( # Wrap the RPC method; this adds retry and timeout information, # and friendly error handling. - rpc = gapic_v1.method_async.wrap_method( - self._client._transport.mutate_rows, - default_timeout=600.0, - client_info=DEFAULT_CLIENT_INFO, - ) + rpc = self._client._transport._wrapped_methods[ + self._client._transport.mutate_rows + ] # Certain fields should be provided within the metadata header; # add these here. @@ -749,7 +734,8 @@ async def check_and_mutate_row( "the individual field arguments should be set." ) - request = bigtable.CheckAndMutateRowRequest(request) + if not isinstance(request, bigtable.CheckAndMutateRowRequest): + request = bigtable.CheckAndMutateRowRequest(request) # If we have keyword arguments corresponding to fields on the # request, apply these. @@ -768,11 +754,9 @@ async def check_and_mutate_row( # Wrap the RPC method; this adds retry and timeout information, # and friendly error handling. - rpc = gapic_v1.method_async.wrap_method( - self._client._transport.check_and_mutate_row, - default_timeout=20.0, - client_info=DEFAULT_CLIENT_INFO, - ) + rpc = self._client._transport._wrapped_methods[ + self._client._transport.check_and_mutate_row + ] # Certain fields should be provided within the metadata header; # add these here. @@ -851,7 +835,8 @@ async def ping_and_warm( "the individual field arguments should be set." ) - request = bigtable.PingAndWarmRequest(request) + if not isinstance(request, bigtable.PingAndWarmRequest): + request = bigtable.PingAndWarmRequest(request) # If we have keyword arguments corresponding to fields on the # request, apply these. @@ -862,11 +847,9 @@ async def ping_and_warm( # Wrap the RPC method; this adds retry and timeout information, # and friendly error handling. - rpc = gapic_v1.method_async.wrap_method( - self._client._transport.ping_and_warm, - default_timeout=None, - client_info=DEFAULT_CLIENT_INFO, - ) + rpc = self._client._transport._wrapped_methods[ + self._client._transport.ping_and_warm + ] # Certain fields should be provided within the metadata header; # add these here. @@ -968,7 +951,8 @@ async def read_modify_write_row( "the individual field arguments should be set." ) - request = bigtable.ReadModifyWriteRowRequest(request) + if not isinstance(request, bigtable.ReadModifyWriteRowRequest): + request = bigtable.ReadModifyWriteRowRequest(request) # If we have keyword arguments corresponding to fields on the # request, apply these. @@ -983,11 +967,9 @@ async def read_modify_write_row( # Wrap the RPC method; this adds retry and timeout information, # and friendly error handling. - rpc = gapic_v1.method_async.wrap_method( - self._client._transport.read_modify_write_row, - default_timeout=20.0, - client_info=DEFAULT_CLIENT_INFO, - ) + rpc = self._client._transport._wrapped_methods[ + self._client._transport.read_modify_write_row + ] # Certain fields should be provided within the metadata header; # add these here. @@ -1076,7 +1058,10 @@ def generate_initial_change_stream_partitions( "the individual field arguments should be set." ) - request = bigtable.GenerateInitialChangeStreamPartitionsRequest(request) + if not isinstance( + request, bigtable.GenerateInitialChangeStreamPartitionsRequest + ): + request = bigtable.GenerateInitialChangeStreamPartitionsRequest(request) # If we have keyword arguments corresponding to fields on the # request, apply these. @@ -1174,7 +1159,8 @@ def read_change_stream( "the individual field arguments should be set." ) - request = bigtable.ReadChangeStreamRequest(request) + if not isinstance(request, bigtable.ReadChangeStreamRequest): + request = bigtable.ReadChangeStreamRequest(request) # If we have keyword arguments corresponding to fields on the # request, apply these. diff --git a/google/cloud/bigtable_v2/services/bigtable/transports/grpc_asyncio.py b/google/cloud/bigtable_v2/services/bigtable/transports/grpc_asyncio.py index 8bf02ce77..3450d4969 100644 --- a/google/cloud/bigtable_v2/services/bigtable/transports/grpc_asyncio.py +++ b/google/cloud/bigtable_v2/services/bigtable/transports/grpc_asyncio.py @@ -18,6 +18,8 @@ from google.api_core import gapic_v1 from google.api_core import grpc_helpers_async +from google.api_core import exceptions as core_exceptions +from google.api_core import retry as retries from google.auth import credentials as ga_credentials # type: ignore from google.auth.transport.grpc import SslCredentials # type: ignore @@ -512,6 +514,66 @@ def read_change_stream( ) return self._stubs["read_change_stream"] + def _prep_wrapped_messages(self, client_info): + # Precompute the wrapped methods. + self._wrapped_methods = { + self.read_rows: gapic_v1.method_async.wrap_method( + self.read_rows, + default_timeout=43200.0, + client_info=client_info, + ), + self.sample_row_keys: gapic_v1.method_async.wrap_method( + self.sample_row_keys, + default_timeout=60.0, + client_info=client_info, + ), + self.mutate_row: gapic_v1.method_async.wrap_method( + self.mutate_row, + default_retry=retries.Retry( + initial=0.01, + maximum=60.0, + multiplier=2, + predicate=retries.if_exception_type( + core_exceptions.DeadlineExceeded, + core_exceptions.ServiceUnavailable, + ), + deadline=60.0, + ), + default_timeout=60.0, + client_info=client_info, + ), + self.mutate_rows: gapic_v1.method_async.wrap_method( + self.mutate_rows, + default_timeout=600.0, + client_info=client_info, + ), + self.check_and_mutate_row: gapic_v1.method_async.wrap_method( + self.check_and_mutate_row, + default_timeout=20.0, + client_info=client_info, + ), + self.ping_and_warm: gapic_v1.method_async.wrap_method( + self.ping_and_warm, + default_timeout=None, + client_info=client_info, + ), + self.read_modify_write_row: gapic_v1.method_async.wrap_method( + self.read_modify_write_row, + default_timeout=20.0, + client_info=client_info, + ), + self.generate_initial_change_stream_partitions: gapic_v1.method_async.wrap_method( + self.generate_initial_change_stream_partitions, + default_timeout=60.0, + client_info=client_info, + ), + self.read_change_stream: gapic_v1.method_async.wrap_method( + self.read_change_stream, + default_timeout=43200.0, + client_info=client_info, + ), + } + def close(self): return self.grpc_channel.close() diff --git a/tests/unit/data/_async/test_client.py b/tests/unit/data/_async/test_client.py index 7afecc5b0..7718246fc 100644 --- a/tests/unit/data/_async/test_client.py +++ b/tests/unit/data/_async/test_client.py @@ -141,7 +141,7 @@ async def test_ctor_dict_options(self): async def test_veneer_grpc_headers(self): # client_info should be populated with headers to # detect as a veneer client - patch = mock.patch("google.api_core.gapic_v1.method.wrap_method") + patch = mock.patch("google.api_core.gapic_v1.method_async.wrap_method") with patch as gapic_mock: client = self._make_one(project="project-id") wrapped_call_list = gapic_mock.call_args_list