feat: sharded read rows #766
@@ -31,13 +31,15 @@
 import warnings
 import sys
 import random
+from itertools import chain

 from google.cloud.bigtable_v2.services.bigtable.client import BigtableClientMeta
 from google.cloud.bigtable_v2.services.bigtable.async_client import BigtableAsyncClient
 from google.cloud.bigtable_v2.services.bigtable.async_client import DEFAULT_CLIENT_INFO
 from google.cloud.bigtable_v2.services.bigtable.transports.pooled_grpc_asyncio import (
     PooledBigtableGrpcAsyncIOTransport,
 )
+from google.cloud.bigtable_v2.types.bigtable import PingAndWarmRequest
 from google.cloud.client import ClientWithProject
 from google.api_core.exceptions import GoogleAPICallError
 from google.api_core import retry_async as retries
@@ -50,17 +52,24 @@
 from google.cloud.bigtable.row import Row
 from google.cloud.bigtable.read_rows_query import ReadRowsQuery
 from google.cloud.bigtable.iterators import ReadRowsIterator
+from google.cloud.bigtable.exceptions import FailedQueryShardError
+from google.cloud.bigtable.exceptions import ShardedReadRowsExceptionGroup

 from google.cloud.bigtable.mutations import Mutation, RowMutationEntry
 from google.cloud.bigtable._mutate_rows import _MutateRowsOperation
 from google.cloud.bigtable._helpers import _make_metadata
 from google.cloud.bigtable._helpers import _convert_retry_deadline
+from google.cloud.bigtable._helpers import _attempt_timeout_generator

 if TYPE_CHECKING:
     from google.cloud.bigtable.mutations_batcher import MutationsBatcher
     from google.cloud.bigtable import RowKeySamples
     from google.cloud.bigtable.row_filters import RowFilter
     from google.cloud.bigtable.read_modify_write_rules import ReadModifyWriteRule

+# used by read_rows_sharded to limit how many requests are attempted in parallel
+CONCURRENCY_LIMIT = 10


 class BigtableDataClient(ClientWithProject):
     def __init__(
@@ -186,10 +195,13 @@ async def _ping_and_warm_instances(
         - sequence of results or exceptions from the ping requests
         """
         ping_rpc = channel.unary_unary(
-            "/google.bigtable.v2.Bigtable/PingAndWarmChannel"
+            "/google.bigtable.v2.Bigtable/PingAndWarm",
+            request_serializer=PingAndWarmRequest.serialize,
         )
         tasks = [ping_rpc({"name": n}) for n in self._active_instances]
-        return await asyncio.gather(*tasks, return_exceptions=True)
+        result = await asyncio.gather(*tasks, return_exceptions=True)
+        # return None in place of empty successful responses
+        return [r or None for r in result]

     async def _manage_channel(
         self,
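The `r or None` normalization above relies on empty response messages being falsy. A toy illustration of the idea, using stand-in types rather than the real Bigtable response classes:

```python
# toy stand-ins, not the real Bigtable types: an "empty" falsy response
# and an exception as returned by asyncio.gather(return_exceptions=True)
class EmptyResponse:
    def __bool__(self) -> bool:
        return False

results = [EmptyResponse(), RuntimeError("boom")]
normalized = [r or None for r in results]
assert normalized[0] is None                    # empty response becomes None
assert isinstance(normalized[1], RuntimeError)  # exceptions pass through unchanged
```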
@@ -517,20 +529,59 @@ async def read_rows_sharded(
         self,
         query_list: list[ReadRowsQuery] | list[dict[str, Any]],
         *,
-        limit: int | None,
-        operation_timeout: int | float | None = 60,
+        operation_timeout: int | float | None = None,
         per_request_timeout: int | float | None = None,
-    ) -> ReadRowsIterator:
+    ) -> list[Row]:
         """
-        Runs a sharded query in parallel
+        Runs a sharded query in parallel, then returns the results in a single list.
+        Results will be returned in the order of the input queries.

-        Each query in query list will be run concurrently, with results yielded as they are ready
-        yielded results may be out of order
+        This function is intended to be run on the results of a query.shard() call:
+
+        ```
+        table_shard_keys = await table.sample_row_keys()
+        query = ReadRowsQuery(...)
+        shard_queries = query.shard(table_shard_keys)
+        results = await table.read_rows_sharded(shard_queries)
+        ```

         Args:
             - query_list: a list of queries to run in parallel
+        Raises:
+            - ShardedReadRowsExceptionGroup: if any of the queries failed
+            - ValueError: if the query_list is empty
         """
-        raise NotImplementedError
+        if not query_list:
+            raise ValueError("query_list must contain at least one query")
+        routine_list = [
+            self.read_rows(
+                query,
+                operation_timeout=operation_timeout,
+                per_request_timeout=per_request_timeout,
+            )
+            for query in query_list
+        ]
+        # submit requests in batches to limit concurrency
+        batched_routines = [
+            routine_list[i : i + CONCURRENCY_LIMIT]
+            for i in range(0, len(routine_list), CONCURRENCY_LIMIT)
+        ]
+        # run batches and collect results
+        results_list = []
+        for batch in batched_routines:
+            batch_result = await asyncio.gather(*batch, return_exceptions=True)
+            results_list.extend(batch_result)
+        # collect exceptions
+        exception_list = [
+            FailedQueryShardError(idx, query_list[idx], e)
+            for idx, e in enumerate(results_list)
+            if isinstance(e, Exception)
+        ]
+        if exception_list:
+            # if any sub-request failed, raise an exception instead of returning results
+            raise ShardedReadRowsExceptionGroup(exception_list, len(query_list))
+        combined_list = list(chain.from_iterable(results_list))
+        return combined_list

     async def row_exists(
         self,

Review thread on the ShardedReadRowsExceptionGroup raise:

Since you are taking the effort to let all of the shards finish despite the error, you might as well add the partial results in the exception.

Good point, I added a …
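The chunked-gather pattern above is worth seeing in isolation. Below is a minimal, self-contained sketch of the same technique; the helper name and demo workload are hypothetical, not part of the client:

```python
import asyncio

CONCURRENCY_LIMIT = 10  # mirrors the module-level constant in the diff


async def gather_in_batches(coros, batch_size=CONCURRENCY_LIMIT):
    """Run coroutines at most batch_size at a time, preserving input order.

    Exceptions are captured in-place rather than raised, matching the
    return_exceptions=True behavior used by read_rows_sharded.
    """
    results = []
    for i in range(0, len(coros), batch_size):
        batch = coros[i : i + batch_size]
        results.extend(await asyncio.gather(*batch, return_exceptions=True))
    return results


async def main():
    async def work(n):
        await asyncio.sleep(0.01)
        if n == 7:
            raise ValueError("simulated shard failure")
        return n * n

    results = await gather_in_batches([work(n) for n in range(25)])
    # collect failures by index, analogous to building FailedQueryShardError entries
    failures = [(i, r) for i, r in enumerate(results) if isinstance(r, Exception)]
    print(f"{len(failures)} failed out of {len(results)}")


asyncio.run(main())
```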
@@ -549,32 +600,81 @@ async def row_exists(
         """
         raise NotImplementedError

-    async def sample_keys(
+    async def sample_row_keys(
         self,
         *,
-        operation_timeout: int | float | None = 60,
-        per_sample_timeout: int | float | None = 10,
-        per_request_timeout: int | float | None = None,
+        operation_timeout: float | None = None,
+        per_request_timeout: float | None = None,
     ) -> RowKeySamples:
         """
         Return a set of RowKeySamples that delimit contiguous sections of the table of
         approximately equal size

         RowKeySamples output can be used with ReadRowsQuery.shard() to create a sharded query
         that can be parallelized across multiple backend nodes. read_rows and read_rows_stream
-        requests will call sample_keys internally for this purpose when sharding is enabled
+        requests will call sample_row_keys internally for this purpose when sharding is enabled

         RowKeySamples is simply a type alias for list[tuple[bytes, int]]; a list of
         row_keys, along with offset positions in the table

         Returns:
             - a set of RowKeySamples that delimit contiguous sections of the table
         Raises:
+            - DeadlineExceeded: raised after operation timeout
+                will be chained with a RetryExceptionGroup containing all GoogleAPIError
+                exceptions from any retries that failed
             - GoogleAPICallError: if the sample_row_keys request fails
         """
-        raise NotImplementedError
+        # prepare timeouts
+        operation_timeout = operation_timeout or self.default_operation_timeout
+        per_request_timeout = per_request_timeout or self.default_per_request_timeout
+
+        if operation_timeout <= 0:
+            raise ValueError("operation_timeout must be greater than 0")
+        if per_request_timeout is not None and per_request_timeout <= 0:
+            raise ValueError("per_request_timeout must be greater than 0")
+        if per_request_timeout is not None and per_request_timeout > operation_timeout:
+            raise ValueError(
+                "per_request_timeout must not be greater than operation_timeout"
+            )
+        attempt_timeout_gen = _attempt_timeout_generator(
+            per_request_timeout, operation_timeout
+        )
+        # prepare retryable
+        predicate = retries.if_exception_type(
+            core_exceptions.DeadlineExceeded,
+            core_exceptions.ServiceUnavailable,
+        )
+        transient_errors = []
+
+        def on_error_fn(exc):
+            # add errors to list if retryable
+            if predicate(exc):
+                transient_errors.append(exc)
+
+        retry = retries.AsyncRetry(
+            predicate=predicate,
+            timeout=operation_timeout,
+            initial=0.01,
+            multiplier=2,
+            maximum=60,
+            on_error=on_error_fn,
+            is_stream=False,
+        )
+
+        # prepare request
+        metadata = _make_metadata(self.table_name, self.app_profile_id)
+
+        async def execute_rpc():
+            results = await self.client._gapic_client.sample_row_keys(
+                table_name=self.table_name,
+                app_profile_id=self.app_profile_id,
+                timeout=next(attempt_timeout_gen),
+                metadata=metadata,
+            )
+            return [(s.row_key, s.offset_bytes) async for s in results]
+
+        wrapped_fn = _convert_retry_deadline(
+            retry(execute_rpc), operation_timeout, transient_errors
+        )
+        return await wrapped_fn()

     def mutations_batcher(self, **kwargs) -> MutationsBatcher:
         """
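`_attempt_timeout_generator` is a private helper whose implementation is not shown in this diff. A plausible sketch of its behavior, offered as an assumption rather than the actual helper: yield a timeout for each attempt, capped by the time remaining before the overall operation deadline.

```python
import time
from typing import Generator, Optional


def attempt_timeout_generator(
    per_request_timeout: Optional[float], operation_timeout: float
) -> Generator[float, None, None]:
    """Hypothetical sketch of the _attempt_timeout_generator helper.

    Yields a per-attempt timeout that never exceeds the time left
    before the overall operation deadline expires.
    """
    deadline = time.monotonic() + operation_timeout
    while True:
        remaining = max(0.0, deadline - time.monotonic())
        if per_request_timeout is None:
            yield remaining
        else:
            yield min(per_request_timeout, remaining)
```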
@@ -819,16 +919,17 @@ async def close(self):
         """
         Called to close the Table instance and release any resources held by it.
         """
+        self._register_instance_task.cancel()
         await self.client._remove_instance_registration(self.instance_id, self)

     async def __aenter__(self):
         """
         Implement async context manager protocol

-        Register this instance with the client, so that
+        Ensure registration task has time to run, so that
         grpc channels will be warmed for the specified instance
         """
-        await self.client._register_instance(self.instance_id, self)
+        await self._register_instance_task
         return self

     async def __aexit__(self, exc_type, exc_val, exc_tb):
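A hedged usage sketch of the context-manager behavior above; the `get_table()` accessor and import path are assumed rather than confirmed by this diff:

```python
import asyncio

from google.cloud.bigtable import BigtableDataClient  # import path assumed


async def main():
    client = BigtableDataClient()
    # entering the context awaits the registration task, so gRPC channels
    # are warmed for this instance before the first request is sent
    async with client.get_table("my-instance", "my-table") as table:
        samples = await table.sample_row_keys()
        print(f"received {len(samples)} row key samples")
    # on exit, the table is closed and the registration task cancelled


asyncio.run(main())
```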
Do we want to raise an error if any of the shard queries overlap? Or is it ok to get duplicate rows?

I don't think we need an error. Also, the rows will be de-duplicated on the server side.

How does the de-duplication work if we're requesting the duplicates in separate rpcs?

I don't think the same key can exist in multiple RPCs in the current implementation. The same key value will be put in the shard, and we aren't segmenting the shard, so it should end up in the rpc.

Yeah, assuming they use the query.shard() function, that should be the case. But this method allows passing in a generic list of queries, so users may pass in overlapping queries, right?

You are right that it's possible. I think we should avoid this situation, but not by throwing an error; I think we should make it impossible to happen. Perhaps we can do the following:

Create a batch-fetching context that end users create. The context will automatically call SampleRowKeys and cache the result, and maybe refresh it every X minutes. The end user then interacts with this object by passing it lists of keys and ranges that the context shards.

And then move the read_rows_sharded(unsharded_query) function onto the context object? Or something else? I'd be a bit hesitant to add more background tasks if we can avoid it, but we can probably work something out.

Another option that would be very simple to add would be to make query.shard return a custom ShardedQuery object that just wraps the query list, and then only accept that as input for read_rows_sharded. Or even simpler, just make it a type alias.

Is this something we can create an issue for and address after the first alpha, or do you want it resolved before merging this?

I think this would need to come before alpha, as it's part of the public surface.

Ok, I made a custom type for ShardedQueries, which should discourage people from passing their own custom queries. We can discuss more advanced changes later. Let me know what you think.
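A minimal sketch of what the type-alias option could look like; the names are illustrative and may not match what the PR ultimately merged:

```python
from typing import List

from google.cloud.bigtable.read_rows_query import ReadRowsQuery

# illustrative alias: marks a list of queries as the product of
# ReadRowsQuery.shard(), signalling that read_rows_sharded expects
# pre-sharded, non-overlapping queries rather than an arbitrary list
ShardedQuery = List[ReadRowsQuery]
```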