Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sharded read rows #766

Merged
merged 405 commits into from
Jun 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
405 commits
Select commit Hold shift + click to select a range
38e5662
did some restructuring
daniel-sanche Apr 1, 2023
5155800
got some tests working
daniel-sanche Apr 1, 2023
522f7fa
improved tests
daniel-sanche Apr 2, 2023
9429244
renamed RowResponse and CellResponse to Row and Cell
daniel-sanche Apr 2, 2023
1aa7424
fixed tests
daniel-sanche Apr 2, 2023
a603649
simplified row construction
daniel-sanche Apr 2, 2023
68a5a0f
added RowRange object
daniel-sanche Apr 3, 2023
cc2e7c8
added comments
daniel-sanche Apr 3, 2023
ba629c8
added api-core submodule
daniel-sanche Apr 3, 2023
75d2c10
copied in rough retryable logic
daniel-sanche Apr 3, 2023
d5eca2a
Merge branch 'v3_row_response' into read_rows_state_machine
daniel-sanche Apr 3, 2023
2a26797
updated Row and Cell class names
daniel-sanche Apr 3, 2023
bcd394f
fixed tests
daniel-sanche Apr 3, 2023
037af0d
added last scanned row class
daniel-sanche Apr 3, 2023
e17d9bc
ran blacken
daniel-sanche Apr 3, 2023
db80d22
Merge branch 'read_rows_state_machine' into read_rows_retries
daniel-sanche Apr 3, 2023
b3d977d
handle last scanned rows
daniel-sanche Apr 3, 2023
1f85462
Merge branch 'add_new_transport' into read_rows_retries
daniel-sanche Apr 3, 2023
1fba6ea
updated add_keys
daniel-sanche Apr 3, 2023
c4f82b0
removed chaining
daniel-sanche Apr 3, 2023
caca14c
improved to_dicts
daniel-sanche Apr 3, 2023
5f9ce85
improving row_ranges
daniel-sanche Apr 3, 2023
8e5f60a
fixed properties
daniel-sanche Apr 3, 2023
57184c1
added type checking to range
daniel-sanche Apr 3, 2023
3eda7f4
got tests passing
daniel-sanche Apr 3, 2023
65f5a2a
blacken, mypy
daniel-sanche Apr 3, 2023
3e724db
ran blacken
daniel-sanche Apr 3, 2023
45eadce
improved API usage
daniel-sanche Apr 3, 2023
c06213f
use invalid chunk
daniel-sanche Apr 3, 2023
6e75a2f
added per request timeouts
daniel-sanche Apr 3, 2023
a205e93
account for RequestStats
daniel-sanche Apr 3, 2023
ce3eb75
added output generator wrapper
daniel-sanche Apr 3, 2023
74029c9
updated template
daniel-sanche Apr 3, 2023
7f2be30
got tests passing
daniel-sanche Apr 3, 2023
2b044ce
removed metadata
daniel-sanche Apr 4, 2023
1743098
added sleep between swwapping and closing channels
daniel-sanche Apr 4, 2023
e5fa4b6
ran blacken
daniel-sanche Apr 4, 2023
8955ec5
got tests working
daniel-sanche Apr 4, 2023
002bc5f
fixed lint issue
daniel-sanche Apr 4, 2023
65f0d2f
fixed tests
daniel-sanche Apr 4, 2023
664a6d2
Merge branch 'add_new_transport' into read_rows_retries
daniel-sanche Apr 4, 2023
d3db731
Merge branch 'add_new_transport' into read_rows_state_machine
daniel-sanche Apr 4, 2023
5f41c06
changed return type
daniel-sanche Apr 4, 2023
aa26911
Merge branch 'v3_read_rows_query' into read_rows_state_machine
daniel-sanche Apr 4, 2023
7b68207
fixed typing issues
daniel-sanche Apr 4, 2023
a776cb5
Merge branch 'read_rows_state_machine' into read_rows_retries
daniel-sanche Apr 4, 2023
c164a47
adjusted types
daniel-sanche Apr 4, 2023
96d58d1
added per-row-rimeout to merge_row_stream_with_cache
daniel-sanche Apr 4, 2023
216610e
cancel stream on exception
daniel-sanche Apr 4, 2023
c505c39
moved retry logic into RetryableRowMerger
daniel-sanche Apr 4, 2023
179c8b8
fixed issues in merger
daniel-sanche Apr 4, 2023
3cc5380
moved streaming into cache into RetryableRowMerger
daniel-sanche Apr 4, 2023
4af0218
restructuring
daniel-sanche Apr 4, 2023
d6a323f
added idle timeout
daniel-sanche Apr 4, 2023
7b6d1db
keep track of last_raised
daniel-sanche Apr 4, 2023
733a393
fixed mypy issues
daniel-sanche Apr 4, 2023
12807e0
made idle timeout internal value
daniel-sanche Apr 4, 2023
0e3d32c
combined row merger functions
daniel-sanche Apr 4, 2023
5b055b4
made adjustments to RowMerger
daniel-sanche Apr 4, 2023
dbf19c9
holds a gapic client instead of inherits from it
daniel-sanche Apr 5, 2023
ab7931c
Merge branch 'add_new_transport' into read_rows_retries
daniel-sanche Apr 5, 2023
88f14f6
don't emit _LastScannedRows
daniel-sanche Apr 5, 2023
9f15a6a
fixed type issues
daniel-sanche Apr 5, 2023
b3c32b0
got tests passing
daniel-sanche Apr 5, 2023
770d9f5
added comments
daniel-sanche Apr 5, 2023
9f3e0c5
added comment
daniel-sanche Apr 5, 2023
a0620ea
added random noise to refresh intervals
daniel-sanche Apr 5, 2023
4f5ed46
improving comments; clean up
daniel-sanche Apr 5, 2023
c169ba8
fixed param order
daniel-sanche Apr 5, 2023
9ec3697
working on getting end-to-end read_rows working
daniel-sanche Apr 5, 2023
b6873e8
fixed issue in pulling from cache
daniel-sanche Apr 5, 2023
2facc79
added timeout to results generator
daniel-sanche Apr 5, 2023
ee826bb
added acceptance tests for read_rows
daniel-sanche Apr 5, 2023
25af0c0
adding tests
daniel-sanche Apr 5, 2023
2f7778d
got operation deadline error working properly
daniel-sanche Apr 5, 2023
d6b8e6b
made RowMerger back into an iterable
daniel-sanche Apr 5, 2023
3f085a9
added test for per-row timeout
daniel-sanche Apr 6, 2023
6abb9d4
don't attach retry errors if there are none
daniel-sanche Apr 6, 2023
128320c
added tests for per_request_timeout
daniel-sanche Apr 6, 2023
a048536
added idle timeout test
daniel-sanche Apr 6, 2023
371dd64
remove row merger after error
daniel-sanche Apr 6, 2023
ebbaa1e
reorganized retryable_merge_rows
daniel-sanche Apr 6, 2023
2a3e379
improved resource clean up on retries and expiration
daniel-sanche Apr 6, 2023
2e50c51
added tests for request stats
daniel-sanche Apr 6, 2023
0b63b2b
added tests for exceptions
daniel-sanche Apr 6, 2023
de102bb
clean up on_error
daniel-sanche Apr 6, 2023
bbdb8e6
await sleep
daniel-sanche Apr 6, 2023
83472dc
got tests working
daniel-sanche Apr 6, 2023
bef40bd
updated api-core
daniel-sanche Apr 6, 2023
29a98ed
Merge branch 'v3' into read_rows_retries
daniel-sanche Apr 6, 2023
534005a
ran blacken
daniel-sanche Apr 6, 2023
6f1c781
made invalid chunk a server error
daniel-sanche Apr 6, 2023
38f66e5
moved invalid chunk with other exceptions
daniel-sanche Apr 6, 2023
bf24c25
made row merger and classes private
daniel-sanche Apr 6, 2023
4dbacb5
added read_rows
daniel-sanche Apr 6, 2023
6e6978e
ran blacken
daniel-sanche Apr 6, 2023
21f7846
added comments
daniel-sanche Apr 6, 2023
52e9dbf
added test for revise rowset
daniel-sanche Apr 6, 2023
715be51
fixed lint issues
daniel-sanche Apr 6, 2023
2f50cb7
moved ReadRowsIterator into new file
daniel-sanche Apr 6, 2023
1486d5a
Merge branch 'v3' into add_new_transport
daniel-sanche Apr 6, 2023
28d5a7a
fixed lint issues
daniel-sanche Apr 6, 2023
3b11580
Merge branch 'add_new_transport' into read_rows_retries
daniel-sanche Apr 6, 2023
d47c941
changed comment
daniel-sanche Apr 6, 2023
d1bd128
added comments to iterator
daniel-sanche Apr 6, 2023
039d623
added var for idle timeout
daniel-sanche Apr 6, 2023
3d34dcd
sped up acceptance tests
daniel-sanche Apr 6, 2023
70fbff9
reduced size of template by making subclass
daniel-sanche Apr 7, 2023
383d8eb
reverted unintentional gapic generation changes
daniel-sanche Apr 7, 2023
018fe03
updated submodule
daniel-sanche Apr 7, 2023
3764a98
added default timeouts to table surface
daniel-sanche Apr 7, 2023
745ae38
end after row_limit rows
daniel-sanche Apr 13, 2023
3d11d55
changed retryable exceptions
daniel-sanche Apr 13, 2023
f0403e7
changed warning stack level
daniel-sanche Apr 13, 2023
84a775a
changed retryable errors
daniel-sanche Apr 13, 2023
15a9d23
improved comments
daniel-sanche Apr 13, 2023
8636654
improved idle timeouts
daniel-sanche Apr 13, 2023
1aca392
changed retry parameters
daniel-sanche Apr 13, 2023
45fef1e
added limit revision to each retry
daniel-sanche Apr 13, 2023
951a77b
removed unneeded check
daniel-sanche Apr 13, 2023
e3a0b66
fixed idle timeout test
daniel-sanche Apr 13, 2023
6089934
removed tracking of emitted rows
daniel-sanche Apr 13, 2023
fb4b0ca
removed revise_on_retry flag
daniel-sanche Apr 14, 2023
83b908c
changed initial sleep
daniel-sanche Apr 14, 2023
5688561
added extra timeout check
daniel-sanche Apr 14, 2023
7f57e7c
implemented sample_keys
daniel-sanche Apr 14, 2023
a6a140b
initial implementation of query.shard
daniel-sanche Apr 14, 2023
0f03aea
added read_rows_sharded implementation
daniel-sanche Apr 14, 2023
cfa181d
fixed bugs in implementation
daniel-sanche Apr 14, 2023
e8007c8
added str and equal to query and range
daniel-sanche Apr 14, 2023
e190dc6
added a test for sharding
daniel-sanche Apr 14, 2023
5aa89da
got first set of tests passing
daniel-sanche Apr 15, 2023
a565f47
added table scan test
daniel-sanche Apr 15, 2023
7041dfd
added more tests
daniel-sanche Apr 15, 2023
2f7973a
made row ranges into set
daniel-sanche Apr 15, 2023
872480f
added unsorted test
daniel-sanche Apr 15, 2023
47be958
ran blacken
daniel-sanche Apr 15, 2023
c945687
fixed mypy issues
daniel-sanche Apr 15, 2023
b0dbaed
fixed lint issues
daniel-sanche Apr 15, 2023
53878a9
fixed bug in from_dict
daniel-sanche Apr 15, 2023
ff2dfca
fixed tests
daniel-sanche Apr 17, 2023
ff3724d
removed outdated test
daniel-sanche Apr 17, 2023
78a309c
fixed type annotations
daniel-sanche Apr 17, 2023
c50ae18
added slots
daniel-sanche Apr 17, 2023
d73121b
renamed cache to buffer
daniel-sanche Apr 17, 2023
14d8527
renamed errors
daniel-sanche Apr 17, 2023
4b89c86
replaced type check with None check
daniel-sanche Apr 17, 2023
9f89577
added comment for last_scanned_row heartbeat
daniel-sanche Apr 17, 2023
4b229b9
added early return
daniel-sanche Apr 17, 2023
152bccf
moved validation
daniel-sanche Apr 17, 2023
67c2911
added close call to ReadRowsIterator
daniel-sanche Apr 18, 2023
ff11ad3
removed del
daniel-sanche Apr 18, 2023
78bd5d3
pull out buffer control logic
daniel-sanche Apr 18, 2023
ca4a16d
got buffering working
daniel-sanche Apr 18, 2023
0dba121
check for full table scan revision
daniel-sanche Apr 18, 2023
3537566
renamed and added underscores
daniel-sanche Apr 18, 2023
981f169
added extra check
daniel-sanche Apr 18, 2023
d3d4c76
removed unneeded validation
daniel-sanche Apr 18, 2023
1901094
renamed RowMerger to ReadRowsOperation
daniel-sanche Apr 18, 2023
947fe9b
changed _read_rows test file name
daniel-sanche Apr 18, 2023
773d4e5
added row builder tests
daniel-sanche Apr 18, 2023
cbb0513
added revise_row tests
daniel-sanche Apr 19, 2023
2bec693
ran blacken
daniel-sanche Apr 19, 2023
5cd8e00
added constructor tests
daniel-sanche Apr 19, 2023
d6f3ae1
upgraded submodule
daniel-sanche Apr 19, 2023
f2d7e71
added tests
daniel-sanche Apr 19, 2023
cb23d32
update docstring
daniel-sanche Apr 19, 2023
bc31ab8
update docstring
daniel-sanche Apr 19, 2023
f54dfde
fix typo
daniel-sanche Apr 19, 2023
46cfc49
docstring improvements
daniel-sanche Apr 19, 2023
573bbd1
made creating table outside loop into error
daniel-sanche Apr 19, 2023
4f2657d
make tables own active instances, and remove instances when tables close
daniel-sanche Apr 19, 2023
59955be
added pool_size and channels as public properties
daniel-sanche Apr 19, 2023
377a8c9
fixed typo
daniel-sanche Apr 19, 2023
8a29898
simplified pooled multicallable
daniel-sanche Apr 20, 2023
50aa5ba
ran blacken
daniel-sanche Apr 20, 2023
42a52a3
associate ids with instances, instead of Table objects
daniel-sanche Apr 20, 2023
abc7a2d
fixed tests
daniel-sanche Apr 20, 2023
836af0f
made sure that empty strings are valid family and qualifier inputs
daniel-sanche Apr 20, 2023
e73551d
added tests for state machine
daniel-sanche Apr 20, 2023
792aba1
added state machine tests
daniel-sanche Apr 20, 2023
e57c510
fixed broken mock
daniel-sanche Apr 20, 2023
88748a9
added additional tests
daniel-sanche Apr 20, 2023
0c38981
ran blacken
daniel-sanche Apr 20, 2023
50dc608
reverted pooled multicallable changes
daniel-sanche Apr 20, 2023
b116755
pass scopes to created channels
daniel-sanche Apr 21, 2023
ec5eb07
added basic ping system test
daniel-sanche Apr 21, 2023
55cdcc2
keep both the names and ids in table object
daniel-sanche Apr 21, 2023
0253692
Merge branch 'add_new_transport' into read_rows_retries
daniel-sanche Apr 21, 2023
3855333
added api-core to noxfile tests
daniel-sanche Apr 21, 2023
213519e
added basic read rows stream to system tests
daniel-sanche Apr 21, 2023
9e3b411
pull project details out of env vars
daniel-sanche Apr 21, 2023
d8cf158
added automatic row creation for system tests
daniel-sanche Apr 21, 2023
c9b8217
added read_rows non stream
daniel-sanche Apr 21, 2023
500eff0
added range query system test
daniel-sanche Apr 21, 2023
27130f0
added logic for temporary test tables and instances
daniel-sanche Apr 21, 2023
f4f4fac
made iterator active into a property
daniel-sanche Apr 21, 2023
06dee54
added more read_rows system tests
daniel-sanche Apr 21, 2023
9e11f88
fixed lint issues
daniel-sanche Apr 21, 2023
794c55a
added iterator tests
daniel-sanche Apr 21, 2023
ccd9545
added tests for timeouts
daniel-sanche Apr 21, 2023
ca84b96
ran black
daniel-sanche Apr 21, 2023
eb936cf
fixed lint issues
daniel-sanche Apr 21, 2023
ab43138
restructured test_client
daniel-sanche Apr 21, 2023
cb1884d
changed how random is mocked
daniel-sanche Apr 21, 2023
9a89d74
ran black
daniel-sanche Apr 21, 2023
7f783fc
restructred test_client
daniel-sanche Apr 21, 2023
6a6d219
Merge branch 'add_new_transport' into read_rows_retries
daniel-sanche Apr 21, 2023
72eca75
restructured test_client_read_rows
daniel-sanche Apr 21, 2023
ad42436
moved read rows tests in test_client
daniel-sanche Apr 21, 2023
7606e3a
update submodules in nox
daniel-sanche Apr 22, 2023
829e68f
ran black
daniel-sanche Apr 22, 2023
e8eff39
Merge branch 'v3' into read_rows_retries
daniel-sanche Apr 24, 2023
6a58e86
removed submodule update
daniel-sanche Apr 24, 2023
9be5b07
removed unneeded import
daniel-sanche Apr 24, 2023
4f819b2
Merge branch 'read_rows_retries' into sharded_read_rows
daniel-sanche Apr 24, 2023
f476ad7
Merge branch 'v3' into sharded_read_rows
daniel-sanche May 24, 2023
62dcbb5
cleaned up read_rows_sharded function
daniel-sanche May 30, 2023
b4a95b3
refactored read_rows_query tests to match other files
daniel-sanche May 30, 2023
5972722
finished read_rows_query tests
daniel-sanche May 30, 2023
7c1643c
fixed issue with ping and warm
daniel-sanche May 30, 2023
a39d931
added tests for sharded queries
daniel-sanche May 30, 2023
482eed9
added new exception type for sharded rpcs
daniel-sanche May 30, 2023
faec93e
added test for concurrency
daniel-sanche May 30, 2023
6f6e010
removed subclass for sharded tests
daniel-sanche May 30, 2023
a005ec8
added sample_key samples
daniel-sanche May 30, 2023
dd10624
added system tests
daniel-sanche May 30, 2023
82789ec
refactoring shard function
daniel-sanche May 31, 2023
d39fd0f
added extra checks to query class
daniel-sanche May 31, 2023
7e26d40
fixed comment
daniel-sanche May 31, 2023
34aea1a
added extra docstring
daniel-sanche May 31, 2023
42cac01
renamed sample_keys to sample_row_keys
daniel-sanche May 31, 2023
632a106
Merge branch 'v3' into sharded_read_rows
daniel-sanche Jun 6, 2023
05a311e
added metadata to sample_row_keys
daniel-sanche Jun 6, 2023
f53af32
changed shard points to be range ends instead of starts
daniel-sanche Jun 6, 2023
ac4378d
added concurrency limit
daniel-sanche Jun 7, 2023
9eaa279
added retries for sample_keys
daniel-sanche Jun 7, 2023
6cca7cf
cleaned up code block
daniel-sanche Jun 15, 2023
88e88d4
documented and simplified sharding function
daniel-sanche Jun 15, 2023
26ffe0c
Merge branch 'v3' into sharded_read_rows
daniel-sanche Jun 18, 2023
9302286
split row_range sharding into own helper
daniel-sanche Jun 18, 2023
3f4dd0e
added type alias
daniel-sanche Jun 21, 2023
bb72b5e
modify timeouts with batch
daniel-sanche Jun 21, 2023
71b034c
added successfult rows to ShardedReadRowsExceptionGroup
daniel-sanche Jun 21, 2023
ceb8129
improved end segment search
daniel-sanche Jun 21, 2023
0e277f4
removed changes to mutation exception
daniel-sanche Jun 21, 2023
37b4967
added excaption tests for new exception types
daniel-sanche Jun 21, 2023
9508a0f
fixed error in sharded_read_rows
daniel-sanche Jun 22, 2023
d3f6b0f
added timeouts to batching test
daniel-sanche Jun 22, 2023
a4f606e
ran black
daniel-sanche Jun 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions google/cloud/bigtable/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@

# Type alias for the output of sample_keys
RowKeySamples = List[Tuple[bytes, int]]
# type alias for the output of query.shard()
ShardedQuery = List[ReadRowsQuery]

__version__: str = package_version.__version__

Expand Down
163 changes: 141 additions & 22 deletions google/cloud/bigtable/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from google.cloud.bigtable_v2.services.bigtable.transports.pooled_grpc_asyncio import (
PooledBigtableGrpcAsyncIOTransport,
)
from google.cloud.bigtable_v2.types.bigtable import PingAndWarmRequest
from google.cloud.client import ClientWithProject
from google.api_core.exceptions import GoogleAPICallError
from google.api_core import retry_async as retries
Expand All @@ -50,10 +51,14 @@
from google.cloud.bigtable.row import Row
from google.cloud.bigtable.read_rows_query import ReadRowsQuery
from google.cloud.bigtable.iterators import ReadRowsIterator
from google.cloud.bigtable.exceptions import FailedQueryShardError
from google.cloud.bigtable.exceptions import ShardedReadRowsExceptionGroup

from google.cloud.bigtable.mutations import Mutation, RowMutationEntry
from google.cloud.bigtable._mutate_rows import _MutateRowsOperation
from google.cloud.bigtable._helpers import _make_metadata
from google.cloud.bigtable._helpers import _convert_retry_deadline
from google.cloud.bigtable._helpers import _attempt_timeout_generator

from google.cloud.bigtable.read_modify_write_rules import ReadModifyWriteRule
from google.cloud.bigtable.row_filters import RowFilter
Expand All @@ -64,6 +69,10 @@
if TYPE_CHECKING:
from google.cloud.bigtable.mutations_batcher import MutationsBatcher
from google.cloud.bigtable import RowKeySamples
from google.cloud.bigtable import ShardedQuery

# used by read_rows_sharded to limit how many requests are attempted in parallel
CONCURRENCY_LIMIT = 10


class BigtableDataClient(ClientWithProject):
Expand Down Expand Up @@ -190,10 +199,13 @@ async def _ping_and_warm_instances(
- sequence of results or exceptions from the ping requests
"""
ping_rpc = channel.unary_unary(
"/google.bigtable.v2.Bigtable/PingAndWarmChannel"
"/google.bigtable.v2.Bigtable/PingAndWarm",
request_serializer=PingAndWarmRequest.serialize,
)
tasks = [ping_rpc({"name": n}) for n in self._active_instances]
return await asyncio.gather(*tasks, return_exceptions=True)
result = await asyncio.gather(*tasks, return_exceptions=True)
# return None in place of empty successful responses
return [r or None for r in result]

async def _manage_channel(
self,
Expand Down Expand Up @@ -532,22 +544,79 @@ async def read_row(

async def read_rows_sharded(
self,
query_list: list[ReadRowsQuery] | list[dict[str, Any]],
sharded_query: ShardedQuery,
*,
limit: int | None,
operation_timeout: int | float | None = 60,
operation_timeout: int | float | None = None,
per_request_timeout: int | float | None = None,
) -> ReadRowsIterator:
) -> list[Row]:
"""
Runs a sharded query in parallel
Runs a sharded query in parallel, then return the results in a single list.
Results will be returned in the order of the input queries.

This function is intended to be run on the results on a query.shard() call:

Each query in query list will be run concurrently, with results yielded as they are ready
yielded results may be out of order
```
table_shard_keys = await table.sample_row_keys()
query = ReadRowsQuery(...)
shard_queries = query.shard(table_shard_keys)
results = await table.read_rows_sharded(shard_queries)
```

Args:
- query_list: a list of queries to run in parallel
- sharded_query: a sharded query to execute
Raises:
- ShardedReadRowsExceptionGroup: if any of the queries failed
- ValueError: if the query_list is empty
"""
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to raise an error if any of the shard queries overlap? Or is it ok to get duplicate rows?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont think we need an error. Also the rows will be de-duplicated on the serverside

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does the de-duplication work if we're requesting the duplicates in separate rpcs?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont think the same key can exist in multiple RPCs in the current implementation. The same key value will be put in the shard and we arent segmenting the shard. So it should end up in the rpc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah assuming they use the query.shard() function, that should be the case. But this method allows passing in a generic list of queries, so users may pass in overlapping queries, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right that its possible. I think we should avoid this situation, but not by throwing an error. I think we should make it impossible to happen. Perhaps we can do the following:

Create a Batch fetching context that end users create. The context will automatically call SampleRowKeys and cache the result. And maybe refresh it every X minutes.

The end user then interact with this object by passing it lists of keys and ranges that the context shards

Copy link
Contributor Author

@daniel-sanche daniel-sanche Jun 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And then move the read_rows_sharded(unsharded_query) function onto the context object? Or something else? I'd be a bit hesitant to add more background tasks if we can avoid it, but we can probably work something out.

Another option that would be very simple to add would be to make query.shard return a custom ShardedQuery object that just wraps the query list, and then only accept that as input for read_rows_sharded. Or even simpler, just make it a type alias

Is this something we can create an issue for and address after the first alpha, or do you want it resolved before merging this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would need to come before alpha as its part of the public surface.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I made a custom type for ShardedQueries, which should discourage people from passing their own custom queries. We can discuss more advanced changes later. Let me know what you think

raise NotImplementedError
if not sharded_query:
raise ValueError("empty sharded_query")
# reduce operation_timeout between batches
operation_timeout = operation_timeout or self.default_operation_timeout
per_request_timeout = (
per_request_timeout or self.default_per_request_timeout or operation_timeout
)
timeout_generator = _attempt_timeout_generator(
operation_timeout, operation_timeout
)
# submit shards in batches if the number of shards goes over CONCURRENCY_LIMIT
batched_queries = [
sharded_query[i : i + CONCURRENCY_LIMIT]
for i in range(0, len(sharded_query), CONCURRENCY_LIMIT)
]
# run batches and collect results
results_list = []
error_dict = {}
shard_idx = 0
for batch in batched_queries:
batch_operation_timeout = next(timeout_generator)
routine_list = [
self.read_rows(
query,
operation_timeout=batch_operation_timeout,
per_request_timeout=min(
per_request_timeout, batch_operation_timeout
),
)
for query in batch
]
batch_result = await asyncio.gather(*routine_list, return_exceptions=True)
for result in batch_result:
if isinstance(result, Exception):
error_dict[shard_idx] = result
else:
results_list.extend(result)
shard_idx += 1
if error_dict:
# if any sub-request failed, raise an exception instead of returning results
raise ShardedReadRowsExceptionGroup(
[
FailedQueryShardError(idx, sharded_query[idx], e)
for idx, e in error_dict.items()
],
results_list,
len(sharded_query),
)
return results_list

async def row_exists(
self,
Expand Down Expand Up @@ -577,32 +646,81 @@ async def row_exists(
)
return len(results) > 0

async def sample_keys(
async def sample_row_keys(
self,
*,
operation_timeout: int | float | None = 60,
per_sample_timeout: int | float | None = 10,
per_request_timeout: int | float | None = None,
operation_timeout: float | None = None,
per_request_timeout: float | None = None,
) -> RowKeySamples:
"""
Return a set of RowKeySamples that delimit contiguous sections of the table of
approximately equal size

RowKeySamples output can be used with ReadRowsQuery.shard() to create a sharded query that
can be parallelized across multiple backend nodes read_rows and read_rows_stream
requests will call sample_keys internally for this purpose when sharding is enabled
requests will call sample_row_keys internally for this purpose when sharding is enabled

RowKeySamples is simply a type alias for list[tuple[bytes, int]]; a list of
row_keys, along with offset positions in the table

Returns:
- a set of RowKeySamples the delimit contiguous sections of the table
Raises:
- DeadlineExceeded: raised after operation timeout
will be chained with a RetryExceptionGroup containing all GoogleAPIError
exceptions from any retries that failed
- GoogleAPICallError: if the sample_row_keys request fails
"""
raise NotImplementedError
# prepare timeouts
operation_timeout = operation_timeout or self.default_operation_timeout
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
per_request_timeout = per_request_timeout or self.default_per_request_timeout

if operation_timeout <= 0:
raise ValueError("operation_timeout must be greater than 0")
if per_request_timeout is not None and per_request_timeout <= 0:
raise ValueError("per_request_timeout must be greater than 0")
if per_request_timeout is not None and per_request_timeout > operation_timeout:
raise ValueError(
"per_request_timeout must not be greater than operation_timeout"
)
attempt_timeout_gen = _attempt_timeout_generator(
per_request_timeout, operation_timeout
)
# prepare retryable
predicate = retries.if_exception_type(
core_exceptions.DeadlineExceeded,
core_exceptions.ServiceUnavailable,
)
transient_errors = []

def on_error_fn(exc):
# add errors to list if retryable
if predicate(exc):
transient_errors.append(exc)

retry = retries.AsyncRetry(
predicate=predicate,
timeout=operation_timeout,
initial=0.01,
multiplier=2,
maximum=60,
on_error=on_error_fn,
is_stream=False,
)

# prepare request
metadata = _make_metadata(self.table_name, self.app_profile_id)

async def execute_rpc():
results = await self.client._gapic_client.sample_row_keys(
table_name=self.table_name,
app_profile_id=self.app_profile_id,
timeout=next(attempt_timeout_gen),
metadata=metadata,
)
return [(s.row_key, s.offset_bytes) async for s in results]

wrapped_fn = _convert_retry_deadline(
retry(execute_rpc), operation_timeout, transient_errors
)
return await wrapped_fn()

def mutations_batcher(self, **kwargs) -> MutationsBatcher:
"""
Expand Down Expand Up @@ -896,16 +1014,17 @@ async def close(self):
"""
Called to close the Table instance and release any resources held by it.
"""
self._register_instance_task.cancel()
await self.client._remove_instance_registration(self.instance_id, self)

async def __aenter__(self):
"""
Implement async context manager protocol

Register this instance with the client, so that
Ensure registration task has time to run, so that
grpc channels will be warmed for the specified instance
"""
await self.client._register_instance(self.instance_id, self)
await self._register_instance_task
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
Expand Down
50 changes: 49 additions & 1 deletion google/cloud/bigtable/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@

import sys

from typing import TYPE_CHECKING
from typing import Any, TYPE_CHECKING

from google.api_core import exceptions as core_exceptions
from google.cloud.bigtable.row import Row

is_311_plus = sys.version_info >= (3, 11)

if TYPE_CHECKING:
from google.cloud.bigtable.mutations import RowMutationEntry
from google.cloud.bigtable.read_rows_query import ReadRowsQuery


class IdleTimeout(core_exceptions.DeadlineExceeded):
Expand Down Expand Up @@ -137,3 +139,49 @@ def __init__(self, excs: list[Exception]):

def __new__(cls, excs: list[Exception]):
return super().__new__(cls, cls._format_message(excs), excs)


class ShardedReadRowsExceptionGroup(BigtableExceptionGroup):
"""
Represents one or more exceptions that occur during a sharded read rows operation
"""

@staticmethod
def _format_message(excs: list[FailedQueryShardError], total_queries: int):
query_str = "query" if total_queries == 1 else "queries"
plural_str = "" if len(excs) == 1 else "s"
return f"{len(excs)} sub-exception{plural_str} (from {total_queries} {query_str} attempted)"

def __init__(
self,
excs: list[FailedQueryShardError],
succeeded: list[Row],
total_queries: int,
):
super().__init__(self._format_message(excs, total_queries), excs)
self.successful_rows = succeeded

def __new__(
cls, excs: list[FailedQueryShardError], succeeded: list[Row], total_queries: int
):
instance = super().__new__(cls, cls._format_message(excs, total_queries), excs)
instance.successful_rows = succeeded
return instance


class FailedQueryShardError(Exception):
"""
Represents an individual failed query in a sharded read rows operation
"""

def __init__(
self,
failed_index: int,
failed_query: "ReadRowsQuery" | dict[str, Any],
cause: Exception,
):
message = f"Failed query at index {failed_index} with cause: {cause!r}"
super().__init__(message)
self.index = failed_index
self.query = failed_query
self.__cause__ = cause
Loading