Skip to content

Commit

Permalink
chore: optimize gapic calls (#863)
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-sanche authored Dec 1, 2023
1 parent 3ac80a9 commit b191451
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 57 deletions.
98 changes: 42 additions & 56 deletions google/cloud/bigtable_v2/services/bigtable/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import functools
from collections import OrderedDict
import functools
import re
Expand Down Expand Up @@ -272,7 +273,8 @@ def read_rows(
"the individual field arguments should be set."
)

request = bigtable.ReadRowsRequest(request)
if not isinstance(request, bigtable.ReadRowsRequest):
request = bigtable.ReadRowsRequest(request)

# If we have keyword arguments corresponding to fields on the
# request, apply these.
Expand All @@ -283,12 +285,9 @@ def read_rows(

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = gapic_v1.method_async.wrap_method(
self._client._transport.read_rows,
default_timeout=43200.0,
client_info=DEFAULT_CLIENT_INFO,
)

rpc = self._client._transport._wrapped_methods[
self._client._transport.read_rows
]
# Certain fields should be provided within the metadata header;
# add these here.
metadata = tuple(metadata) + (
Expand Down Expand Up @@ -367,7 +366,8 @@ def sample_row_keys(
"the individual field arguments should be set."
)

request = bigtable.SampleRowKeysRequest(request)
if not isinstance(request, bigtable.SampleRowKeysRequest):
request = bigtable.SampleRowKeysRequest(request)

# If we have keyword arguments corresponding to fields on the
# request, apply these.
Expand All @@ -378,12 +378,9 @@ def sample_row_keys(

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = gapic_v1.method_async.wrap_method(
self._client._transport.sample_row_keys,
default_timeout=60.0,
client_info=DEFAULT_CLIENT_INFO,
)

rpc = self._client._transport._wrapped_methods[
self._client._transport.sample_row_keys
]
# Certain fields should be provided within the metadata header;
# add these here.
metadata = tuple(metadata) + (
Expand Down Expand Up @@ -479,7 +476,8 @@ async def mutate_row(
"the individual field arguments should be set."
)

request = bigtable.MutateRowRequest(request)
if not isinstance(request, bigtable.MutateRowRequest):
request = bigtable.MutateRowRequest(request)

# If we have keyword arguments corresponding to fields on the
# request, apply these.
Expand All @@ -494,21 +492,9 @@ async def mutate_row(

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = gapic_v1.method_async.wrap_method(
self._client._transport.mutate_row,
default_retry=retries.Retry(
initial=0.01,
maximum=60.0,
multiplier=2,
predicate=retries.if_exception_type(
core_exceptions.DeadlineExceeded,
core_exceptions.ServiceUnavailable,
),
deadline=60.0,
),
default_timeout=60.0,
client_info=DEFAULT_CLIENT_INFO,
)
rpc = self._client._transport._wrapped_methods[
self._client._transport.mutate_row
]

# Certain fields should be provided within the metadata header;
# add these here.
Expand Down Expand Up @@ -601,7 +587,8 @@ def mutate_rows(
"the individual field arguments should be set."
)

request = bigtable.MutateRowsRequest(request)
if not isinstance(request, bigtable.MutateRowsRequest):
request = bigtable.MutateRowsRequest(request)

# If we have keyword arguments corresponding to fields on the
# request, apply these.
Expand All @@ -614,11 +601,9 @@ def mutate_rows(

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = gapic_v1.method_async.wrap_method(
self._client._transport.mutate_rows,
default_timeout=600.0,
client_info=DEFAULT_CLIENT_INFO,
)
rpc = self._client._transport._wrapped_methods[
self._client._transport.mutate_rows
]

# Certain fields should be provided within the metadata header;
# add these here.
Expand Down Expand Up @@ -749,7 +734,8 @@ async def check_and_mutate_row(
"the individual field arguments should be set."
)

request = bigtable.CheckAndMutateRowRequest(request)
if not isinstance(request, bigtable.CheckAndMutateRowRequest):
request = bigtable.CheckAndMutateRowRequest(request)

# If we have keyword arguments corresponding to fields on the
# request, apply these.
Expand All @@ -768,11 +754,9 @@ async def check_and_mutate_row(

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = gapic_v1.method_async.wrap_method(
self._client._transport.check_and_mutate_row,
default_timeout=20.0,
client_info=DEFAULT_CLIENT_INFO,
)
rpc = self._client._transport._wrapped_methods[
self._client._transport.check_and_mutate_row
]

# Certain fields should be provided within the metadata header;
# add these here.
Expand Down Expand Up @@ -851,7 +835,8 @@ async def ping_and_warm(
"the individual field arguments should be set."
)

request = bigtable.PingAndWarmRequest(request)
if not isinstance(request, bigtable.PingAndWarmRequest):
request = bigtable.PingAndWarmRequest(request)

# If we have keyword arguments corresponding to fields on the
# request, apply these.
Expand All @@ -862,11 +847,9 @@ async def ping_and_warm(

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = gapic_v1.method_async.wrap_method(
self._client._transport.ping_and_warm,
default_timeout=None,
client_info=DEFAULT_CLIENT_INFO,
)
rpc = self._client._transport._wrapped_methods[
self._client._transport.ping_and_warm
]

# Certain fields should be provided within the metadata header;
# add these here.
Expand Down Expand Up @@ -968,7 +951,8 @@ async def read_modify_write_row(
"the individual field arguments should be set."
)

request = bigtable.ReadModifyWriteRowRequest(request)
if not isinstance(request, bigtable.ReadModifyWriteRowRequest):
request = bigtable.ReadModifyWriteRowRequest(request)

# If we have keyword arguments corresponding to fields on the
# request, apply these.
Expand All @@ -983,11 +967,9 @@ async def read_modify_write_row(

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = gapic_v1.method_async.wrap_method(
self._client._transport.read_modify_write_row,
default_timeout=20.0,
client_info=DEFAULT_CLIENT_INFO,
)
rpc = self._client._transport._wrapped_methods[
self._client._transport.read_modify_write_row
]

# Certain fields should be provided within the metadata header;
# add these here.
Expand Down Expand Up @@ -1076,7 +1058,10 @@ def generate_initial_change_stream_partitions(
"the individual field arguments should be set."
)

request = bigtable.GenerateInitialChangeStreamPartitionsRequest(request)
if not isinstance(
request, bigtable.GenerateInitialChangeStreamPartitionsRequest
):
request = bigtable.GenerateInitialChangeStreamPartitionsRequest(request)

# If we have keyword arguments corresponding to fields on the
# request, apply these.
Expand Down Expand Up @@ -1174,7 +1159,8 @@ def read_change_stream(
"the individual field arguments should be set."
)

request = bigtable.ReadChangeStreamRequest(request)
if not isinstance(request, bigtable.ReadChangeStreamRequest):
request = bigtable.ReadChangeStreamRequest(request)

# If we have keyword arguments corresponding to fields on the
# request, apply these.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

from google.api_core import gapic_v1
from google.api_core import grpc_helpers_async
from google.api_core import exceptions as core_exceptions
from google.api_core import retry as retries
from google.auth import credentials as ga_credentials # type: ignore
from google.auth.transport.grpc import SslCredentials # type: ignore

Expand Down Expand Up @@ -512,6 +514,66 @@ def read_change_stream(
)
return self._stubs["read_change_stream"]

def _prep_wrapped_messages(self, client_info):
# Precompute the wrapped methods.
self._wrapped_methods = {
self.read_rows: gapic_v1.method_async.wrap_method(
self.read_rows,
default_timeout=43200.0,
client_info=client_info,
),
self.sample_row_keys: gapic_v1.method_async.wrap_method(
self.sample_row_keys,
default_timeout=60.0,
client_info=client_info,
),
self.mutate_row: gapic_v1.method_async.wrap_method(
self.mutate_row,
default_retry=retries.Retry(
initial=0.01,
maximum=60.0,
multiplier=2,
predicate=retries.if_exception_type(
core_exceptions.DeadlineExceeded,
core_exceptions.ServiceUnavailable,
),
deadline=60.0,
),
default_timeout=60.0,
client_info=client_info,
),
self.mutate_rows: gapic_v1.method_async.wrap_method(
self.mutate_rows,
default_timeout=600.0,
client_info=client_info,
),
self.check_and_mutate_row: gapic_v1.method_async.wrap_method(
self.check_and_mutate_row,
default_timeout=20.0,
client_info=client_info,
),
self.ping_and_warm: gapic_v1.method_async.wrap_method(
self.ping_and_warm,
default_timeout=None,
client_info=client_info,
),
self.read_modify_write_row: gapic_v1.method_async.wrap_method(
self.read_modify_write_row,
default_timeout=20.0,
client_info=client_info,
),
self.generate_initial_change_stream_partitions: gapic_v1.method_async.wrap_method(
self.generate_initial_change_stream_partitions,
default_timeout=60.0,
client_info=client_info,
),
self.read_change_stream: gapic_v1.method_async.wrap_method(
self.read_change_stream,
default_timeout=43200.0,
client_info=client_info,
),
}

def close(self):
return self.grpc_channel.close()

Expand Down
2 changes: 1 addition & 1 deletion tests/unit/data/_async/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ async def test_ctor_dict_options(self):
async def test_veneer_grpc_headers(self):
# client_info should be populated with headers to
# detect as a veneer client
patch = mock.patch("google.api_core.gapic_v1.method.wrap_method")
patch = mock.patch("google.api_core.gapic_v1.method_async.wrap_method")
with patch as gapic_mock:
client = self._make_one(project="project-id")
wrapped_call_list = gapic_mock.call_args_list
Expand Down

0 comments on commit b191451

Please sign in to comment.