diff --git a/.github/.OwlBot.lock.yaml b/.github/.OwlBot.lock.yaml index 02a4dedce..1b3cb6c52 100644 --- a/.github/.OwlBot.lock.yaml +++ b/.github/.OwlBot.lock.yaml @@ -13,5 +13,5 @@ # limitations under the License. docker: image: gcr.io/cloud-devrel-public-resources/owlbot-python:latest - digest: sha256:240b5bcc2bafd450912d2da2be15e62bc6de2cf839823ae4bf94d4f392b451dc -# created: 2023-06-03T21:25:37.968717478Z + digest: sha256:ddf4551385d566771dc713090feb7b4c1164fb8a698fe52bbe7670b24236565b +# created: 2023-06-27T13:04:21.96690344Z diff --git a/google/cloud/bigtable/data/_async/_mutate_rows.py b/google/cloud/bigtable/data/_async/_mutate_rows.py index ac491adaf..db7728ee9 100644 --- a/google/cloud/bigtable/data/_async/_mutate_rows.py +++ b/google/cloud/bigtable/data/_async/_mutate_rows.py @@ -52,7 +52,7 @@ def __init__( table: "TableAsync", mutation_entries: list["RowMutationEntry"], operation_timeout: float, - per_request_timeout: float | None, + attempt_timeout: float | None, ): """ Args: @@ -60,7 +60,7 @@ def __init__( - table: the table associated with the request - mutation_entries: a list of RowMutationEntry objects to send to the server - operation_timeout: the timeout t o use for the entire operation, in seconds. - - per_request_timeout: the timeoutto use for each mutate_rows attempt, in seconds. + - attempt_timeout: the timeoutto use for each mutate_rows attempt, in seconds. If not specified, the request will run until operation_timeout is reached. """ # check that mutations are within limits @@ -99,7 +99,7 @@ def __init__( self._operation = _convert_retry_deadline(retry_wrapped, operation_timeout) # initialize state self.timeout_generator = _attempt_timeout_generator( - per_request_timeout, operation_timeout + attempt_timeout, operation_timeout ) self.mutations = mutation_entries self.remaining_indices = list(range(len(self.mutations))) diff --git a/google/cloud/bigtable/data/_async/_read_rows.py b/google/cloud/bigtable/data/_async/_read_rows.py index 910a01c4c..ec1e488c6 100644 --- a/google/cloud/bigtable/data/_async/_read_rows.py +++ b/google/cloud/bigtable/data/_async/_read_rows.py @@ -63,14 +63,14 @@ def __init__( client: BigtableAsyncClient, *, operation_timeout: float = 600.0, - per_request_timeout: float | None = None, + attempt_timeout: float | None = None, ): """ Args: - request: the request dict to send to the Bigtable API - client: the Bigtable client to use to make the request - operation_timeout: the timeout to use for the entire operation, in seconds - - per_request_timeout: the timeout to use when waiting for each individual grpc request, in seconds + - attempt_timeout: the timeout to use when waiting for each individual grpc request, in seconds If not specified, defaults to operation_timeout """ self._last_emitted_row_key: bytes | None = None @@ -79,7 +79,7 @@ def __init__( self.operation_timeout = operation_timeout # use generator to lower per-attempt timeout as we approach operation_timeout deadline attempt_timeout_gen = _attempt_timeout_generator( - per_request_timeout, operation_timeout + attempt_timeout, operation_timeout ) row_limit = request.get("rows_limit", 0) # lock in paramters for retryable wrapper diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index 3a5831799..77a5c180e 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -58,6 +58,7 @@ from google.cloud.bigtable.data._async._mutate_rows import _MutateRowsOperationAsync from 
google.cloud.bigtable.data._helpers import _make_metadata from google.cloud.bigtable.data._helpers import _convert_retry_deadline +from google.cloud.bigtable.data._helpers import _validate_timeouts from google.cloud.bigtable.data._async.mutations_batcher import MutationsBatcherAsync from google.cloud.bigtable.data._async.mutations_batcher import _MB_SIZE from google.cloud.bigtable.data._helpers import _attempt_timeout_generator @@ -340,14 +341,18 @@ async def _remove_instance_registration( except KeyError: return False - # TODO: revisit timeouts https://github.com/googleapis/python-bigtable/issues/782 def get_table( self, instance_id: str, table_id: str, app_profile_id: str | None = None, - default_operation_timeout: float = 600, - default_per_request_timeout: float | None = None, + *, + default_read_rows_operation_timeout: float = 600, + default_read_rows_attempt_timeout: float | None = None, + default_mutate_rows_operation_timeout: float = 600, + default_mutate_rows_attempt_timeout: float | None = None, + default_operation_timeout: float = 60, + default_attempt_timeout: float | None = None, ) -> TableAsync: """ Returns a table instance for making data API requests @@ -366,7 +371,7 @@ def get_table( table_id, app_profile_id, default_operation_timeout=default_operation_timeout, - default_per_request_timeout=default_per_request_timeout, + default_attempt_timeout=default_attempt_timeout, ) async def __aenter__(self): @@ -393,8 +398,12 @@ def __init__( table_id: str, app_profile_id: str | None = None, *, - default_operation_timeout: float = 600, - default_per_request_timeout: float | None = None, + default_read_rows_operation_timeout: float = 600, + default_read_rows_attempt_timeout: float | None = None, + default_mutate_rows_operation_timeout: float = 600, + default_mutate_rows_attempt_timeout: float | None = None, + default_operation_timeout: float = 60, + default_attempt_timeout: float | None = None, ): """ Initialize a Table instance @@ -410,23 +419,26 @@ def __init__( app_profile_id: (Optional) The app profile to associate with requests. 
https://cloud.google.com/bigtable/docs/app-profiles default_operation_timeout: (Optional) The default timeout, in seconds - default_per_request_timeout: (Optional) The default timeout for individual + default_attempt_timeout: (Optional) The default timeout for individual rpc requests, in seconds Raises: - RuntimeError if called outside of an async context (no running event loop) """ # validate timeouts - if default_operation_timeout <= 0: - raise ValueError("default_operation_timeout must be greater than 0") - if default_per_request_timeout is not None and default_per_request_timeout <= 0: - raise ValueError("default_per_request_timeout must be greater than 0") - if ( - default_per_request_timeout is not None - and default_per_request_timeout > default_operation_timeout - ): - raise ValueError( - "default_per_request_timeout must be less than default_operation_timeout" - ) + _validate_timeouts( + default_operation_timeout, default_attempt_timeout, allow_none=True + ) + _validate_timeouts( + default_read_rows_operation_timeout, + default_read_rows_attempt_timeout, + allow_none=True, + ) + _validate_timeouts( + default_mutate_rows_operation_timeout, + default_mutate_rows_attempt_timeout, + allow_none=True, + ) + self.client = client self.instance_id = instance_id self.instance_name = self.client._gapic_client.instance_path( @@ -439,7 +451,13 @@ def __init__( self.app_profile_id = app_profile_id self.default_operation_timeout = default_operation_timeout - self.default_per_request_timeout = default_per_request_timeout + self.default_attempt_timeout = default_attempt_timeout + self.default_read_rows_operation_timeout = default_read_rows_operation_timeout + self.default_read_rows_attempt_timeout = default_read_rows_attempt_timeout + self.default_mutate_rows_operation_timeout = ( + default_mutate_rows_operation_timeout + ) + self.default_mutate_rows_attempt_timeout = default_mutate_rows_attempt_timeout # raises RuntimeError if called outside of an async context (no running event loop) try: @@ -456,24 +474,24 @@ async def read_rows_stream( query: ReadRowsQuery | dict[str, Any], *, operation_timeout: float | None = None, - per_request_timeout: float | None = None, + attempt_timeout: float | None = None, ) -> ReadRowsAsyncIterator: """ + Read a set of rows from the table, based on the specified query. Returns an iterator to asynchronously stream back row data. - Failed requests within operation_timeout and operation_deadline policies will be retried. + Failed requests within operation_timeout will be retried. Args: - query: contains details about which rows to return - operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. - time is only counted while actively waiting on the network. - If None, defaults to the Table's default_operation_timeout - - per_request_timeout: the time budget for an individual network request, in seconds. + If None, defaults to the Table's default_read_rows_operation_timeout + - attempt_timeout: the time budget for an individual network request, in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted. - If None, defaults to the Table's default_per_request_timeout - + If None, defaults to the Table's default_read_rows_attempt_timeout, + or the operation_timeout if that is also None. 
Returns: - an asynchronous iterator that yields rows returned by the query Raises: @@ -483,35 +501,31 @@ async def read_rows_stream( - GoogleAPIError: raised if the request encounters an unrecoverable error - IdleTimeout: if iterator was abandoned """ + operation_timeout = ( + operation_timeout or self.default_read_rows_operation_timeout + ) + attempt_timeout = ( + attempt_timeout + or self.default_read_rows_attempt_timeout + or operation_timeout + ) + _validate_timeouts(operation_timeout, attempt_timeout) - operation_timeout = operation_timeout or self.default_operation_timeout - per_request_timeout = per_request_timeout or self.default_per_request_timeout - - if operation_timeout <= 0: - raise ValueError("operation_timeout must be greater than 0") - if per_request_timeout is not None and per_request_timeout <= 0: - raise ValueError("per_request_timeout must be greater than 0") - if per_request_timeout is not None and per_request_timeout > operation_timeout: - raise ValueError( - "per_request_timeout must not be greater than operation_timeout" - ) - if per_request_timeout is None: - per_request_timeout = operation_timeout request = query._to_dict() if isinstance(query, ReadRowsQuery) else query request["table_name"] = self.table_name if self.app_profile_id: request["app_profile_id"] = self.app_profile_id # read_rows smart retries is implemented using a series of iterators: - # - client.read_rows: outputs raw ReadRowsResponse objects from backend. Has attempt_timeout + # - client.read_rows: outputs raw ReadRowsResponse objects from backend. Has attempt_timeout # - ReadRowsOperation.merge_row_response_stream: parses chunks into rows - # - ReadRowsOperation.retryable_merge_rows: adds retries, caching, revised requests, per_request_timeout + # - ReadRowsOperation.retryable_merge_rows: adds retries, caching, revised requests, operation_timeout # - ReadRowsAsyncIterator: adds idle_timeout, moves stats out of stream and into attribute row_merger = _ReadRowsOperationAsync( request, self.client._gapic_client, operation_timeout=operation_timeout, - per_request_timeout=per_request_timeout, + attempt_timeout=attempt_timeout, ) output_generator = ReadRowsAsyncIterator(row_merger) # add idle timeout to clear resources if generator is abandoned @@ -524,20 +538,37 @@ async def read_rows( query: ReadRowsQuery | dict[str, Any], *, operation_timeout: float | None = None, - per_request_timeout: float | None = None, + attempt_timeout: float | None = None, ) -> list[Row]: """ - Helper function that returns a full list instead of a generator + Read a set of rows from the table, based on the specified query. + Returns results as a list of Row objects when the request is complete. + For streamed results, use read_rows_stream. - See read_rows_stream + Failed requests within operation_timeout will be retried. + Args: + - query: contains details about which rows to return + - operation_timeout: the time budget for the entire operation, in seconds. + Failed requests will be retried within the budget. + If None, defaults to the Table's default_read_rows_operation_timeout + - attempt_timeout: the time budget for an individual network request, in seconds. + If it takes longer than this time to complete, the request will be cancelled with + a DeadlineExceeded exception, and a retry will be attempted. + If None, defaults to the Table's default_read_rows_attempt_timeout, + or the operation_timeout if that is also None.
Returns: - - a list of the rows returned by the query + - a list of Rows returned by the query + Raises: + - DeadlineExceeded: raised after operation timeout + will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions + from any retries that failed + - GoogleAPIError: raised if the request encounters an unrecoverable error """ row_generator = await self.read_rows_stream( query, operation_timeout=operation_timeout, - per_request_timeout=per_request_timeout, + attempt_timeout=attempt_timeout, ) results = [row async for row in row_generator] return results @@ -547,18 +578,31 @@ async def read_row( row_key: str | bytes, *, row_filter: RowFilter | None = None, - operation_timeout: int | float | None = 60, - per_request_timeout: int | float | None = None, + operation_timeout: int | float | None = None, + attempt_timeout: int | float | None = None, ) -> Row | None: """ - Helper function to return a single row + Read a single row from the table, based on the specified key. - See read_rows_stream + Failed requests within operation_timeout will be retried. - Raises: - - google.cloud.bigtable.data.exceptions.RowNotFound: if the row does not exist + Args: + - row_key: the key of the row to read + - operation_timeout: the time budget for the entire operation, in seconds. + Failed requests will be retried within the budget. + If None, defaults to the Table's default_read_rows_operation_timeout + - attempt_timeout: the time budget for an individual network request, in seconds. + If it takes longer than this time to complete, the request will be cancelled with + a DeadlineExceeded exception, and a retry will be attempted. + If None, defaults to the Table's default_read_rows_attempt_timeout, or the operation_timeout + if that is also None. Returns: - - the individual row requested, or None if it does not exist + - a Row object if the row exists, otherwise None + Raises: + - DeadlineExceeded: raised after operation timeout + will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions + from any retries that failed + - GoogleAPIError: raised if the request encounters an unrecoverable error """ if row_key is None: raise ValueError("row_key must be string or bytes") @@ -566,7 +610,7 @@ async def read_row( results = await self.read_rows( query, operation_timeout=operation_timeout, - per_request_timeout=per_request_timeout, + attempt_timeout=attempt_timeout, ) if len(results) == 0: return None @@ -577,7 +621,7 @@ async def read_rows_sharded( sharded_query: ShardedQuery, *, operation_timeout: int | float | None = None, - per_request_timeout: int | float | None = None, + attempt_timeout: int | float | None = None, ) -> list[Row]: """ Runs a sharded query in parallel, then return the results in a single list. @@ -594,6 +638,14 @@ async def read_rows_sharded( Args: - sharded_query: a sharded query to execute + - operation_timeout: the time budget for the entire operation, in seconds. + Failed requests will be retried within the budget. + If None, defaults to the Table's default_read_rows_operation_timeout + - attempt_timeout: the time budget for an individual network request, in seconds. + If it takes longer than this time to complete, the request will be cancelled with + a DeadlineExceeded exception, and a retry will be attempted. + If None, defaults to the Table's default_read_rows_attempt_timeout, or the operation_timeout + if that is also None.
Raises: - ShardedReadRowsExceptionGroup: if any of the queries failed - ValueError: if the query_list is empty @@ -601,10 +653,15 @@ async def read_rows_sharded( if not sharded_query: raise ValueError("empty sharded_query") # reduce operation_timeout between batches - operation_timeout = operation_timeout or self.default_operation_timeout - per_request_timeout = ( - per_request_timeout or self.default_per_request_timeout or operation_timeout + operation_timeout = ( + operation_timeout or self.default_read_rows_operation_timeout + ) + attempt_timeout = ( + attempt_timeout + or self.default_read_rows_attempt_timeout + or operation_timeout ) + _validate_timeouts(operation_timeout, attempt_timeout) timeout_generator = _attempt_timeout_generator( operation_timeout, operation_timeout ) @@ -623,9 +680,7 @@ async def read_rows_sharded( self.read_rows( query, operation_timeout=batch_operation_timeout, - per_request_timeout=min( - per_request_timeout, batch_operation_timeout - ), + attempt_timeout=min(attempt_timeout, batch_operation_timeout), ) for query in batch ] @@ -652,19 +707,33 @@ async def row_exists( self, row_key: str | bytes, *, - operation_timeout: int | float | None = 60, - per_request_timeout: int | float | None = None, + operation_timeout: int | float | None = None, + attempt_timeout: int | float | None = None, ) -> bool: """ - Helper function to determine if a row exists - + Return a boolean indicating whether the specified row exists in the table. uses the filters: chain(limit cells per row = 1, strip value) - + Args: + - row_key: the key of the row to check + - operation_timeout: the time budget for the entire operation, in seconds. + Failed requests will be retried within the budget. + If None, defaults to the Table's default_read_rows_operation_timeout + - attempt_timeout: the time budget for an individual network request, in seconds. + If it takes longer than this time to complete, the request will be cancelled with + a DeadlineExceeded exception, and a retry will be attempted. + If None, defaults to the Table's default_read_rows_attempt_timeout, or the operation_timeout + if that is also None. Returns: - a bool indicating whether the row exists + Raises: + - DeadlineExceeded: raised after operation timeout + will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions + from any retries that failed + - GoogleAPIError: raised if the request encounters an unrecoverable error """ if row_key is None: raise ValueError("row_key must be string or bytes") + strip_filter = StripValueTransformerFilter(flag=True) limit_filter = CellsRowLimitFilter(1) chain_filter = RowFilterChain(filters=[limit_filter, strip_filter]) @@ -672,7 +741,7 @@ async def row_exists( results = await self.read_rows( query, operation_timeout=operation_timeout, - per_request_timeout=per_request_timeout, + attempt_timeout=attempt_timeout, ) return len(results) > 0 @@ -680,7 +749,7 @@ async def sample_row_keys( self, *, operation_timeout: float | None = None, - per_request_timeout: float | None = None, + attempt_timeout: float | None = None, ) -> RowKeySamples: """ Return a set of RowKeySamples that delimit contiguous sections of the table of @@ -693,25 +762,32 @@ async def sample_row_keys( RowKeySamples is simply a type alias for list[tuple[bytes, int]]; a list of row_keys, along with offset positions in the table + Args: + - operation_timeout: the time budget for the entire operation, in seconds. + Failed requests will be retried within the budget. 
+ If None, defaults to the Table's default_operation_timeout + - attempt_timeout: the time budget for an individual network request, in seconds. + If it takes longer than this time to complete, the request will be cancelled with + a DeadlineExceeded exception, and a retry will be attempted. + If None, defaults to the Table's default_attempt_timeout, or the operation_timeout + if that is also None. Returns: - a set of RowKeySamples the delimit contiguous sections of the table Raises: - - GoogleAPICallError: if the sample_row_keys request fails + - DeadlineExceeded: raised after operation timeout + will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions + from any retries that failed + - GoogleAPIError: raised if the request encounters an unrecoverable error """ # prepare timeouts operation_timeout = operation_timeout or self.default_operation_timeout - per_request_timeout = per_request_timeout or self.default_per_request_timeout + attempt_timeout = ( + attempt_timeout or self.default_attempt_timeout or operation_timeout + ) + _validate_timeouts(operation_timeout, attempt_timeout) - if operation_timeout <= 0: - raise ValueError("operation_timeout must be greater than 0") - if per_request_timeout is not None and per_request_timeout <= 0: - raise ValueError("per_request_timeout must be greater than 0") - if per_request_timeout is not None and per_request_timeout > operation_timeout: - raise ValueError( - "per_request_timeout must not be greater than operation_timeout" - ) attempt_timeout_gen = _attempt_timeout_generator( - per_request_timeout, operation_timeout + attempt_timeout, operation_timeout ) # prepare retryable predicate = retries.if_exception_type( @@ -761,7 +837,7 @@ def mutations_batcher( flow_control_max_mutation_count: int = 100_000, flow_control_max_bytes: int = 100 * _MB_SIZE, batch_operation_timeout: float | None = None, - batch_per_request_timeout: float | None = None, + batch_attempt_timeout: float | None = None, ) -> MutationsBatcherAsync: """ Returns a new mutations batcher instance. @@ -778,9 +854,10 @@ def mutations_batcher( - flow_control_max_mutation_count: Maximum number of inflight mutations. - flow_control_max_bytes: Maximum number of inflight bytes. - batch_operation_timeout: timeout for each mutate_rows operation, in seconds. If None, - table default_operation_timeout will be used - - batch_per_request_timeout: timeout for each individual request, in seconds. If None, - table default_per_request_timeout will be used + table default_mutate_rows_operation_timeout will be used + - batch_attempt_timeout: timeout for each individual request, in seconds. If None, + table default_mutate_rows_attempt_timeout will be used, or batch_operation_timeout + if that is also None. Returns: - a MutationsBatcherAsync context manager that can batch requests """ @@ -792,7 +869,7 @@ def mutations_batcher( flow_control_max_mutation_count=flow_control_max_mutation_count, flow_control_max_bytes=flow_control_max_bytes, batch_operation_timeout=batch_operation_timeout, - batch_per_request_timeout=batch_per_request_timeout, + batch_attempt_timeout=batch_attempt_timeout, ) async def mutate_row( @@ -800,8 +877,8 @@ async def mutate_row( row_key: str | bytes, mutations: list[Mutation] | Mutation, *, - operation_timeout: float | None = 60, - per_request_timeout: float | None = None, + operation_timeout: float | None = None, + attempt_timeout: float | None = None, ): """ Mutates a row atomically. @@ -813,17 +890,16 @@ async def mutate_row( retried on server failure. 
Non-idempotent operations will not. Args: - - row_key: the row to apply mutations to - - mutations: the set of mutations to apply to the row - - operation_timeout: the time budget for the entire operation, in seconds. - Failed requests will be retried within the budget. - time is only counted while actively waiting on the network. - DeadlineExceeded exception raised after timeout - - per_request_timeout: the time budget for an individual network request, - in seconds. If it takes longer than this time to complete, the request - will be cancelled with a DeadlineExceeded exception, and a retry will be - attempted if within operation_timeout budget - + - row_key: the row to apply mutations to + - mutations: the set of mutations to apply to the row + - operation_timeout: the time budget for the entire operation, in seconds. + Failed requests will be retried within the budget. + If None, defaults to the Table's default_operation_timeout + - attempt_timeout: the time budget for an individual network request, in seconds. + If it takes longer than this time to complete, the request will be cancelled with + a DeadlineExceeded exception, and a retry will be attempted. + If None, defaults to the Table's default_attempt_timeout, or the operation_timeout + if that is also None. Raises: - DeadlineExceeded: raised after operation timeout will be chained with a RetryExceptionGroup containing all @@ -832,14 +908,10 @@ async def mutate_row( safely retried. """ operation_timeout = operation_timeout or self.default_operation_timeout - per_request_timeout = per_request_timeout or self.default_per_request_timeout - - if operation_timeout <= 0: - raise ValueError("operation_timeout must be greater than 0") - if per_request_timeout is not None and per_request_timeout <= 0: - raise ValueError("per_request_timeout must be greater than 0") - if per_request_timeout is not None and per_request_timeout > operation_timeout: - raise ValueError("per_request_timeout must be less than operation_timeout") + attempt_timeout = ( + attempt_timeout or self.default_attempt_timeout or operation_timeout + ) + _validate_timeouts(operation_timeout, attempt_timeout) if isinstance(row_key, str): row_key = row_key.encode("utf-8") @@ -883,14 +955,16 @@ def on_error_fn(exc): ) metadata = _make_metadata(self.table_name, self.app_profile_id) # trigger rpc - await deadline_wrapped(request, timeout=per_request_timeout, metadata=metadata) + await deadline_wrapped( + request, timeout=attempt_timeout, metadata=metadata, retry=None + ) async def bulk_mutate_rows( self, mutation_entries: list[RowMutationEntry], *, - operation_timeout: float | None = 60, - per_request_timeout: float | None = None, + operation_timeout: float | None = None, + attempt_timeout: float | None = None, ): """ Applies mutations for multiple rows in a single batched request. @@ -910,32 +984,32 @@ async def bulk_mutate_rows( in arbitrary order - operation_timeout: the time budget for the entire operation, in seconds. Failed requests will be retried within the budget. - time is only counted while actively waiting on the network. - DeadlineExceeded exception raised after timeout - - per_request_timeout: the time budget for an individual network request, - in seconds. 
If it takes longer than this time to complete, the request - will be cancelled with a DeadlineExceeded exception, and a retry will - be attempted if within operation_timeout budget + If None, defaults to the Table's default_mutate_rows_operation_timeout + - attempt_timeout: the time budget for an individual network request, in seconds. + If it takes longer than this time to complete, the request will be cancelled with + a DeadlineExceeded exception, and a retry will be attempted. + If None, defaults to the Table's default_mutate_rows_attempt_timeout, + or the operation_timeout if that is also None. Raises: - MutationsExceptionGroup if one or more mutations fails Contains details about any failed entries in .exceptions """ - operation_timeout = operation_timeout or self.default_operation_timeout - per_request_timeout = per_request_timeout or self.default_per_request_timeout - - if operation_timeout <= 0: - raise ValueError("operation_timeout must be greater than 0") - if per_request_timeout is not None and per_request_timeout <= 0: - raise ValueError("per_request_timeout must be greater than 0") - if per_request_timeout is not None and per_request_timeout > operation_timeout: - raise ValueError("per_request_timeout must be less than operation_timeout") + operation_timeout = ( + operation_timeout or self.default_mutate_rows_operation_timeout + ) + attempt_timeout = ( + attempt_timeout + or self.default_mutate_rows_attempt_timeout + or operation_timeout + ) + _validate_timeouts(operation_timeout, attempt_timeout) operation = _MutateRowsOperationAsync( self.client._gapic_client, self, mutation_entries, operation_timeout, - per_request_timeout, + attempt_timeout, ) await operation.start() @@ -946,7 +1020,7 @@ async def check_and_mutate_row( *, true_case_mutations: Mutation | list[Mutation] | None = None, false_case_mutations: Mutation | list[Mutation] | None = None, - operation_timeout: int | float | None = 20, + operation_timeout: int | float | None = None, ) -> bool: """ Mutates a row atomically based on the output of a predicate filter @@ -974,7 +1048,8 @@ async def check_and_mutate_row( ones. Must contain at least one entry if `true_case_mutations is empty, and at most 100000. - operation_timeout: the time budget for the entire operation, in seconds. - Failed requests will not be retried. + Failed requests will not be retried. Defaults to the Table's default_operation_timeout + if None. Returns: - bool indicating whether the predicate was true or false Raises: @@ -1016,7 +1091,7 @@ async def read_modify_write_row( row_key: str | bytes, rules: ReadModifyWriteRule | list[ReadModifyWriteRule], *, - operation_timeout: int | float | None = 20, + operation_timeout: int | float | None = None, ) -> Row: """ Reads and modifies a row atomically according to input ReadModifyWriteRules, @@ -1032,8 +1107,9 @@ async def read_modify_write_row( - rules: A rule or set of rules to apply to the row. Rules are applied in order, meaning that earlier rules will affect the results of later ones. - - operation_timeout: the time budget for the entire operation, in seconds. - Failed requests will not be retried. + - operation_timeout: the time budget for the entire operation, in seconds. + Failed requests will not be retried. Defaults to the Table's default_operation_timeout + if None. 
Returns: - Row: containing cell data that was modified as part of the operation diff --git a/google/cloud/bigtable/data/_async/mutations_batcher.py b/google/cloud/bigtable/data/_async/mutations_batcher.py index 25aafc2a1..e13675ef1 100644 --- a/google/cloud/bigtable/data/_async/mutations_batcher.py +++ b/google/cloud/bigtable/data/_async/mutations_batcher.py @@ -23,6 +23,7 @@ from google.cloud.bigtable.data.mutations import RowMutationEntry from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup from google.cloud.bigtable.data.exceptions import FailedMutationEntryError +from google.cloud.bigtable.data._helpers import _validate_timeouts from google.cloud.bigtable.data._async._mutate_rows import _MutateRowsOperationAsync from google.cloud.bigtable.data._async._mutate_rows import ( @@ -189,7 +190,7 @@ def __init__( flow_control_max_mutation_count: int = 100_000, flow_control_max_bytes: int = 100 * _MB_SIZE, batch_operation_timeout: float | None = None, - batch_per_request_timeout: float | None = None, + batch_attempt_timeout: float | None = None, ): """ Args: @@ -203,25 +204,18 @@ def __init__( - flow_control_max_bytes: Maximum number of inflight bytes. - batch_operation_timeout: timeout for each mutate_rows operation, in seconds. If None, table default_operation_timeout will be used - - batch_per_request_timeout: timeout for each individual request, in seconds. If None, - table default_per_request_timeout will be used + - batch_attempt_timeout: timeout for each individual request, in seconds. If None, + table default_mutate_rows_attempt_timeout will be used """ self._operation_timeout: float = ( - batch_operation_timeout or table.default_operation_timeout + batch_operation_timeout or table.default_mutate_rows_operation_timeout ) - self._per_request_timeout: float = ( - batch_per_request_timeout - or table.default_per_request_timeout + self._attempt_timeout: float = ( + batch_attempt_timeout + or table.default_mutate_rows_attempt_timeout or self._operation_timeout ) - if self._operation_timeout <= 0: - raise ValueError("batch_operation_timeout must be greater than 0") - if self._per_request_timeout <= 0: - raise ValueError("batch_per_request_timeout must be greater than 0") - if self._per_request_timeout > self._operation_timeout: - raise ValueError( - "batch_per_request_timeout must be less than batch_operation_timeout" - ) + _validate_timeouts(self._operation_timeout, self._attempt_timeout) self.closed: bool = False self._table = table self._staged_entries: list[RowMutationEntry] = [] @@ -346,7 +340,7 @@ async def _execute_mutate_rows( Args: - batch: list of RowMutationEntry objects to send to server - - timeout: timeout in seconds. Used as operation_timeout and per_request_timeout. + - timeout: timeout in seconds. Used as operation_timeout and attempt_timeout. If not given, will use table defaults Returns: - list of FailedMutationEntryError objects for mutations that failed.
@@ -361,7 +355,7 @@ async def _execute_mutate_rows( self._table, batch, operation_timeout=self._operation_timeout, - per_request_timeout=self._per_request_timeout, + attempt_timeout=self._attempt_timeout, ) await operation.start() except MutationsExceptionGroup as e: diff --git a/google/cloud/bigtable/data/_helpers.py b/google/cloud/bigtable/data/_helpers.py index 64d91e108..ab816f9a7 100644 --- a/google/cloud/bigtable/data/_helpers.py +++ b/google/cloud/bigtable/data/_helpers.py @@ -109,3 +109,30 @@ def wrapper(*args, **kwargs): handle_error() return wrapper_async if iscoroutinefunction(func) else wrapper + + +def _validate_timeouts( + operation_timeout: float, attempt_timeout: float | None, allow_none: bool = False +): + """ + Helper function that will verify that timeout values are valid, and raise + an exception if they are not. + + Args: + - operation_timeout: The timeout value to use for the entire operation, in seconds. + - attempt_timeout: The timeout value to use for each attempt, in seconds. + - allow_none: If True, attempt_timeout can be None. If False, None values will raise an exception. + Raises: + - ValueError if operation_timeout or attempt_timeout are invalid. + """ + if operation_timeout <= 0: + raise ValueError("operation_timeout must be greater than 0") + if not allow_none and attempt_timeout is None: + raise ValueError("attempt_timeout must not be None") + elif attempt_timeout is not None: + if attempt_timeout <= 0: + raise ValueError("attempt_timeout must be greater than 0") + if attempt_timeout > operation_timeout: + raise ValueError( + "attempt_timeout must not be greater than operation_timeout" + ) diff --git a/google/cloud/bigtable/data/exceptions.py b/google/cloud/bigtable/data/exceptions.py index 9b6b4fe3f..6933d4a2a 100644 --- a/google/cloud/bigtable/data/exceptions.py +++ b/google/cloud/bigtable/data/exceptions.py @@ -74,7 +74,27 @@ def __init__(self, message, excs): if len(excs) == 0: raise ValueError("exceptions must be a non-empty sequence") self.exceptions = tuple(excs) - super().__init__(message) + # simulate an exception group in Python < 3.11 by adding exception info + # to the message + first_line = "--+---------------- 1 ----------------" + last_line = "+------------------------------------" + message_parts = [message + "\n" + first_line] + for idx, e in enumerate(excs[:15]): + if idx != 0: + message_parts.append(f"+---------------- {str(idx+1).rjust(2)} ----------------") + cause = e.__cause__ + if cause is not None: + message_parts.extend(f"| {type(cause).__name__}: {cause}".splitlines()) + message_parts.append("| ") + message_parts.append("| The above exception was the direct cause of the following exception:") + message_parts.append("| ") + message_parts.extend(f"| {type(e).__name__}: {e}".splitlines()) + if len(excs) > 15: + message_parts.append("+---------------- ... ---------------") + message_parts.append(f"| and {len(excs) - 15} more") + if last_line not in message_parts[-1]: + message_parts.append(last_line) + super().__init__("\n ".join(message_parts)) def __new__(cls, message, excs): if is_311_plus: @@ -83,11 +103,19 @@ def __new__(cls, message, excs): return super().__new__(cls) def __str__(self): + if is_311_plus: + # don't return built-in sub-exception message + return self.args[0] + return super().__str__() + + def __repr__(self): """ - String representation doesn't display sub-exceptions. 
Subexceptions are - described in message + repr representation should strip out sub-exception details """ - return self.args[0] + if is_311_plus: + return super().__repr__() + message = self.args[0].split("\n")[0] + return f"{self.__class__.__name__}({message!r}, {self.exceptions!r})" class MutationsExceptionGroup(_BigtableExceptionGroup): @@ -200,14 +228,12 @@ def __init__( idempotent_msg = ( "idempotent" if failed_mutation_entry.is_idempotent() else "non-idempotent" ) - index_msg = f" at index {failed_idx} " if failed_idx is not None else " " - message = ( - f"Failed {idempotent_msg} mutation entry{index_msg}with cause: {cause!r}" - ) + index_msg = f" at index {failed_idx}" if failed_idx is not None else "" + message = f"Failed {idempotent_msg} mutation entry{index_msg}" super().__init__(message) + self.__cause__ = cause self.index = failed_idx self.entry = failed_mutation_entry - self.__cause__ = cause class RetryExceptionGroup(_BigtableExceptionGroup): @@ -217,10 +243,8 @@ class RetryExceptionGroup(_BigtableExceptionGroup): def _format_message(excs: list[Exception]): if len(excs) == 0: return "No exceptions" - if len(excs) == 1: - return f"1 failed attempt: {type(excs[0]).__name__}" - else: - return f"{len(excs)} failed attempts. Latest: {type(excs[-1]).__name__}" + plural = "s" if len(excs) > 1 else "" + return f"{len(excs)} failed attempt{plural}" def __init__(self, excs: list[Exception]): super().__init__(self._format_message(excs), excs) @@ -268,8 +292,8 @@ def __init__( failed_query: "ReadRowsQuery" | dict[str, Any], cause: Exception, ): - message = f"Failed query at index {failed_index} with cause: {cause!r}" + message = f"Failed query at index {failed_index}" super().__init__(message) + self.__cause__ = cause self.index = failed_index self.query = failed_query - self.__cause__ = cause diff --git a/noxfile.py b/noxfile.py index 8499a610f..16447778e 100644 --- a/noxfile.py +++ b/noxfile.py @@ -366,10 +366,9 @@ def docfx(session): session.install("-e", ".") session.install( - "sphinx==4.0.1", + "gcp-sphinx-docfx-yaml", "alabaster", "recommonmark", - "gcp-sphinx-docfx-yaml", ) shutil.rmtree(os.path.join("docs", "_build"), ignore_errors=True) diff --git a/tests/unit/data/_async/test__mutate_rows.py b/tests/unit/data/_async/test__mutate_rows.py index f77455d60..9bebd35e6 100644 --- a/tests/unit/data/_async/test__mutate_rows.py +++ b/tests/unit/data/_async/test__mutate_rows.py @@ -48,7 +48,7 @@ def _make_one(self, *args, **kwargs): kwargs["table"] = kwargs.pop("table", AsyncMock()) kwargs["mutation_entries"] = kwargs.pop("mutation_entries", []) kwargs["operation_timeout"] = kwargs.pop("operation_timeout", 5) - kwargs["per_request_timeout"] = kwargs.pop("per_request_timeout", 0.1) + kwargs["attempt_timeout"] = kwargs.pop("attempt_timeout", 0.1) return self._target_class()(*args, **kwargs) async def _mock_stream(self, mutation_list, error_dict): @@ -267,7 +267,7 @@ async def test_run_attempt_single_entry_success(self): mock_gapic_fn = self._make_mock_gapic({0: mutation}) instance = self._make_one( mutation_entries=[mutation], - per_request_timeout=expected_timeout, + attempt_timeout=expected_timeout, ) with mock.patch.object(instance, "_gapic_fn", mock_gapic_fn): await instance._run_attempt() diff --git a/tests/unit/data/_async/test__read_rows.py b/tests/unit/data/_async/test__read_rows.py index c7d52280c..76e1148de 100644 --- a/tests/unit/data/_async/test__read_rows.py +++ b/tests/unit/data/_async/test__read_rows.py @@ -89,7 +89,7 @@ def test_ctor(self): request, client, 
operation_timeout=expected_operation_timeout, - per_request_timeout=expected_request_timeout, + attempt_timeout=expected_request_timeout, ) assert time_gen_mock.call_count == 1 time_gen_mock.assert_called_once_with( diff --git a/tests/unit/data/_async/test_client.py b/tests/unit/data/_async/test_client.py index 25006d725..336cf961f 100644 --- a/tests/unit/data/_async/test_client.py +++ b/tests/unit/data/_async/test_client.py @@ -974,7 +974,11 @@ async def test_table_ctor(self): expected_instance_id = "instance-id" expected_app_profile_id = "app-profile-id" expected_operation_timeout = 123 - expected_per_request_timeout = 12 + expected_attempt_timeout = 12 + expected_read_rows_operation_timeout = 1.5 + expected_read_rows_attempt_timeout = 0.5 + expected_mutate_rows_operation_timeout = 2.5 + expected_mutate_rows_attempt_timeout = 0.75 client = BigtableDataClientAsync() assert not client._active_instances @@ -984,7 +988,11 @@ async def test_table_ctor(self): expected_table_id, expected_app_profile_id, default_operation_timeout=expected_operation_timeout, - default_per_request_timeout=expected_per_request_timeout, + default_attempt_timeout=expected_attempt_timeout, + default_read_rows_operation_timeout=expected_read_rows_operation_timeout, + default_read_rows_attempt_timeout=expected_read_rows_attempt_timeout, + default_mutate_rows_operation_timeout=expected_mutate_rows_operation_timeout, + default_mutate_rows_attempt_timeout=expected_mutate_rows_attempt_timeout, ) await asyncio.sleep(0) assert table.table_id == expected_table_id @@ -997,7 +1005,23 @@ async def test_table_ctor(self): assert instance_key in client._active_instances assert client._instance_owners[instance_key] == {id(table)} assert table.default_operation_timeout == expected_operation_timeout - assert table.default_per_request_timeout == expected_per_request_timeout + assert table.default_attempt_timeout == expected_attempt_timeout + assert ( + table.default_read_rows_operation_timeout + == expected_read_rows_operation_timeout + ) + assert ( + table.default_read_rows_attempt_timeout + == expected_read_rows_attempt_timeout + ) + assert ( + table.default_mutate_rows_operation_timeout + == expected_mutate_rows_operation_timeout + ) + assert ( + table.default_mutate_rows_attempt_timeout + == expected_mutate_rows_attempt_timeout + ) # ensure task reaches completion await table._register_instance_task assert table._register_instance_task.done() @@ -1006,30 +1030,74 @@ async def test_table_ctor(self): await client.close() @pytest.mark.asyncio - async def test_table_ctor_bad_timeout_values(self): + async def test_table_ctor_defaults(self): + """ + should provide default timeout values and app_profile_id + """ from google.cloud.bigtable.data._async.client import BigtableDataClientAsync from google.cloud.bigtable.data._async.client import TableAsync + expected_table_id = "table-id" + expected_instance_id = "instance-id" client = BigtableDataClientAsync() + assert not client._active_instances - with pytest.raises(ValueError) as e: - TableAsync(client, "", "", default_per_request_timeout=-1) - assert "default_per_request_timeout must be greater than 0" in str(e.value) - with pytest.raises(ValueError) as e: - TableAsync(client, "", "", default_operation_timeout=-1) - assert "default_operation_timeout must be greater than 0" in str(e.value) - with pytest.raises(ValueError) as e: - TableAsync( - client, - "", - "", - default_operation_timeout=1, - default_per_request_timeout=2, - ) - assert ( - "default_per_request_timeout must be less than 
default_operation_timeout" - in str(e.value) + table = TableAsync( + client, + expected_instance_id, + expected_table_id, ) + await asyncio.sleep(0) + assert table.table_id == expected_table_id + assert table.instance_id == expected_instance_id + assert table.app_profile_id is None + assert table.client is client + assert table.default_operation_timeout == 60 + assert table.default_read_rows_operation_timeout == 600 + assert table.default_mutate_rows_operation_timeout == 600 + assert table.default_attempt_timeout is None + assert table.default_read_rows_attempt_timeout is None + assert table.default_mutate_rows_attempt_timeout is None + await client.close() + + @pytest.mark.asyncio + async def test_table_ctor_invalid_timeout_values(self): + """ + bad timeout values should raise ValueError + """ + from google.cloud.bigtable.data._async.client import BigtableDataClientAsync + from google.cloud.bigtable.data._async.client import TableAsync + + client = BigtableDataClientAsync() + + timeout_pairs = [ + ("default_operation_timeout", "default_attempt_timeout"), + ( + "default_read_rows_operation_timeout", + "default_read_rows_attempt_timeout", + ), + ( + "default_mutate_rows_operation_timeout", + "default_mutate_rows_attempt_timeout", + ), + ] + for operation_timeout, attempt_timeout in timeout_pairs: + with pytest.raises(ValueError) as e: + TableAsync(client, "", "", **{attempt_timeout: -1}) + assert "attempt_timeout must be greater than 0" in str(e.value) + with pytest.raises(ValueError) as e: + TableAsync(client, "", "", **{operation_timeout: -1}) + assert "operation_timeout must be greater than 0" in str(e.value) + with pytest.raises(ValueError) as e: + TableAsync( + client, + "", + "", + **{operation_timeout: 1, attempt_timeout: 2}, + ) + assert "attempt_timeout must not be greater than operation_timeout" in str( + e.value + ) await client.close() def test_table_ctor_sync(self): @@ -1240,15 +1308,15 @@ async def test_read_rows_timeout(self, operation_timeout): ], ) @pytest.mark.asyncio - async def test_read_rows_per_request_timeout( + async def test_read_rows_attempt_timeout( self, per_request_t, operation_t, expected_num ): """ - Ensures that the per_request_timeout is respected and that the number of + Ensures that the attempt_timeout is respected and that the number of requests is as expected. operation_timeout does not cancel the request, so we expect the number of - requests to be the ceiling of operation_timeout / per_request_timeout. + requests to be the ceiling of operation_timeout / attempt_timeout. 
""" from google.cloud.bigtable.data.exceptions import RetryExceptionGroup @@ -1268,7 +1336,7 @@ async def test_read_rows_per_request_timeout( await table.read_rows( query, operation_timeout=operation_t, - per_request_timeout=per_request_t, + attempt_timeout=per_request_t, ) except core_exceptions.DeadlineExceeded as e: retry_exc = e.__cause__ @@ -1437,12 +1505,12 @@ async def test_read_rows_default_timeouts(self): from google.cloud.bigtable.data._async._read_rows import _ReadRowsOperationAsync operation_timeout = 8 - per_request_timeout = 4 + attempt_timeout = 4 with mock.patch.object(_ReadRowsOperationAsync, "__init__") as mock_op: mock_op.side_effect = RuntimeError("mock error") async with self._make_table( - default_operation_timeout=operation_timeout, - default_per_request_timeout=per_request_timeout, + default_read_rows_operation_timeout=operation_timeout, + default_read_rows_attempt_timeout=attempt_timeout, ) as table: try: await table.read_rows(ReadRowsQuery()) @@ -1450,7 +1518,7 @@ async def test_read_rows_default_timeouts(self): pass kwargs = mock_op.call_args_list[0].kwargs assert kwargs["operation_timeout"] == operation_timeout - assert kwargs["per_request_timeout"] == per_request_timeout + assert kwargs["attempt_timeout"] == attempt_timeout @pytest.mark.asyncio async def test_read_rows_default_timeout_override(self): @@ -1460,23 +1528,23 @@ async def test_read_rows_default_timeout_override(self): from google.cloud.bigtable.data._async._read_rows import _ReadRowsOperationAsync operation_timeout = 8 - per_request_timeout = 4 + attempt_timeout = 4 with mock.patch.object(_ReadRowsOperationAsync, "__init__") as mock_op: mock_op.side_effect = RuntimeError("mock error") async with self._make_table( - default_operation_timeout=99, default_per_request_timeout=97 + default_operation_timeout=99, default_attempt_timeout=97 ) as table: try: await table.read_rows( ReadRowsQuery(), operation_timeout=operation_timeout, - per_request_timeout=per_request_timeout, + attempt_timeout=attempt_timeout, ) except RuntimeError: pass kwargs = mock_op.call_args_list[0].kwargs assert kwargs["operation_timeout"] == operation_timeout - assert kwargs["per_request_timeout"] == per_request_timeout + assert kwargs["attempt_timeout"] == attempt_timeout @pytest.mark.asyncio async def test_read_row(self): @@ -1492,13 +1560,13 @@ async def test_read_row(self): row = await table.read_row( row_key, operation_timeout=expected_op_timeout, - per_request_timeout=expected_req_timeout, + attempt_timeout=expected_req_timeout, ) assert row == expected_result assert read_rows.call_count == 1 args, kwargs = read_rows.call_args_list[0] assert kwargs["operation_timeout"] == expected_op_timeout - assert kwargs["per_request_timeout"] == expected_req_timeout + assert kwargs["attempt_timeout"] == expected_req_timeout assert len(args) == 1 assert isinstance(args[0], ReadRowsQuery) assert args[0]._to_dict() == { @@ -1523,14 +1591,14 @@ async def test_read_row_w_filter(self): row = await table.read_row( row_key, operation_timeout=expected_op_timeout, - per_request_timeout=expected_req_timeout, + attempt_timeout=expected_req_timeout, row_filter=expected_filter, ) assert row == expected_result assert read_rows.call_count == 1 args, kwargs = read_rows.call_args_list[0] assert kwargs["operation_timeout"] == expected_op_timeout - assert kwargs["per_request_timeout"] == expected_req_timeout + assert kwargs["attempt_timeout"] == expected_req_timeout assert len(args) == 1 assert isinstance(args[0], ReadRowsQuery) assert args[0]._to_dict() == { 
@@ -1553,13 +1621,13 @@ async def test_read_row_no_response(self): result = await table.read_row( row_key, operation_timeout=expected_op_timeout, - per_request_timeout=expected_req_timeout, + attempt_timeout=expected_req_timeout, ) assert result is None assert read_rows.call_count == 1 args, kwargs = read_rows.call_args_list[0] assert kwargs["operation_timeout"] == expected_op_timeout - assert kwargs["per_request_timeout"] == expected_req_timeout + assert kwargs["attempt_timeout"] == expected_req_timeout assert isinstance(args[0], ReadRowsQuery) assert args[0]._to_dict() == { "rows": {"row_keys": [row_key], "row_ranges": []}, @@ -1598,13 +1666,13 @@ async def test_row_exists(self, return_value, expected_result): result = await table.row_exists( row_key, operation_timeout=expected_op_timeout, - per_request_timeout=expected_req_timeout, + attempt_timeout=expected_req_timeout, ) assert expected_result == result assert read_rows.call_count == 1 args, kwargs = read_rows.call_args_list[0] assert kwargs["operation_timeout"] == expected_op_timeout - assert kwargs["per_request_timeout"] == expected_req_timeout + assert kwargs["attempt_timeout"] == expected_req_timeout assert isinstance(args[0], ReadRowsQuery) expected_filter = { "chain": { @@ -1798,9 +1866,9 @@ async def test_read_rows_sharded_batching(self): table_mock = AsyncMock() start_operation_timeout = 10 - start_per_request_timeout = 3 - table_mock.default_operation_timeout = start_operation_timeout - table_mock.default_per_request_timeout = start_per_request_timeout + start_attempt_timeout = 3 + table_mock.default_read_rows_operation_timeout = start_operation_timeout + table_mock.default_read_rows_attempt_timeout = start_attempt_timeout # clock ticks one second on each check with mock.patch("time.monotonic", side_effect=range(0, 100000)): with mock.patch("asyncio.gather", AsyncMock()) as gather_mock: @@ -1829,14 +1897,11 @@ async def test_read_rows_sharded_batching(self): req_kwargs["operation_timeout"] == expected_operation_timeout ) - # each per_request_timeout should start with default value, but decrease when operation_timeout reaches it - expected_per_request_timeout = min( - start_per_request_timeout, expected_operation_timeout - ) - assert ( - req_kwargs["per_request_timeout"] - == expected_per_request_timeout + # each attempt_timeout should start with default value, but decrease when operation_timeout reaches it + expected_attempt_timeout = min( + start_attempt_timeout, expected_operation_timeout ) + assert req_kwargs["attempt_timeout"] == expected_attempt_timeout # await all created coroutines to avoid warnings for i in range(len(gather_mock.call_args_list)): for j in range(len(gather_mock.call_args_list[i][0])): @@ -1891,14 +1956,14 @@ async def test_sample_row_keys_bad_timeout(self): await table.sample_row_keys(operation_timeout=-1) assert "operation_timeout must be greater than 0" in str(e.value) with pytest.raises(ValueError) as e: - await table.sample_row_keys(per_request_timeout=-1) - assert "per_request_timeout must be greater than 0" in str(e.value) + await table.sample_row_keys(attempt_timeout=-1) + assert "attempt_timeout must be greater than 0" in str(e.value) with pytest.raises(ValueError) as e: await table.sample_row_keys( - operation_timeout=10, per_request_timeout=20 + operation_timeout=10, attempt_timeout=20 ) assert ( - "per_request_timeout must not be greater than operation_timeout" + "attempt_timeout must not be greater than operation_timeout" in str(e.value) ) @@ -1936,7 +2001,7 @@ async def 
test_sample_row_keys_gapic_params(self): table.client._gapic_client, "sample_row_keys", AsyncMock() ) as sample_row_keys: sample_row_keys.return_value = self._make_gapic_stream([]) - await table.sample_row_keys(per_request_timeout=expected_timeout) + await table.sample_row_keys(attempt_timeout=expected_timeout) args, kwargs = sample_row_keys.call_args assert len(args) == 0 assert len(kwargs) == 4 @@ -2049,7 +2114,7 @@ def _make_client(self, *args, **kwargs): ) async def test_mutate_row(self, mutation_arg): """Test mutations with no errors""" - expected_per_request_timeout = 19 + expected_attempt_timeout = 19 async with self._make_client(project="project") as client: async with client.get_table("instance", "table") as table: with mock.patch.object( @@ -2059,9 +2124,10 @@ async def test_mutate_row(self, mutation_arg): await table.mutate_row( "row_key", mutation_arg, - per_request_timeout=expected_per_request_timeout, + attempt_timeout=expected_attempt_timeout, ) assert mock_gapic.call_count == 1 + kwargs = mock_gapic.call_args_list[0].kwargs request = mock_gapic.call_args[0][0] assert ( request["table_name"] @@ -2074,8 +2140,9 @@ async def test_mutate_row(self, mutation_arg): else [mutation_arg._to_dict()] ) assert request["mutations"] == formatted_mutations - found_per_request_timeout = mock_gapic.call_args[1]["timeout"] - assert found_per_request_timeout == expected_per_request_timeout + assert kwargs["timeout"] == expected_attempt_timeout + # make sure gapic layer is not retrying + assert kwargs["retry"] is None @pytest.mark.parametrize( "retryable_exception", @@ -2243,7 +2310,7 @@ async def generator(): ) async def test_bulk_mutate_rows(self, mutation_arg): """Test mutations with no errors""" - expected_per_request_timeout = 19 + expected_attempt_timeout = 19 async with self._make_client(project="project") as client: async with client.get_table("instance", "table") as table: with mock.patch.object( @@ -2253,7 +2320,7 @@ async def test_bulk_mutate_rows(self, mutation_arg): bulk_mutation = mutations.RowMutationEntry(b"row_key", mutation_arg) await table.bulk_mutate_rows( [bulk_mutation], - per_request_timeout=expected_per_request_timeout, + attempt_timeout=expected_attempt_timeout, ) assert mock_gapic.call_count == 1 kwargs = mock_gapic.call_args[1] @@ -2262,7 +2329,7 @@ async def test_bulk_mutate_rows(self, mutation_arg): == "projects/project/instances/instance/tables/table" ) assert kwargs["entries"] == [bulk_mutation._to_dict()] - assert kwargs["timeout"] == expected_per_request_timeout + assert kwargs["timeout"] == expected_attempt_timeout @pytest.mark.asyncio async def test_bulk_mutate_rows_multiple_entries(self): diff --git a/tests/unit/data/_async/test_mutations_batcher.py b/tests/unit/data/_async/test_mutations_batcher.py index 1b14cc128..25492c4e2 100644 --- a/tests/unit/data/_async/test_mutations_batcher.py +++ b/tests/unit/data/_async/test_mutations_batcher.py @@ -288,8 +288,8 @@ def _get_target_class(self): def _make_one(self, table=None, **kwargs): if table is None: table = mock.Mock() - table.default_operation_timeout = 10 - table.default_per_request_timeout = 10 + table.default_mutate_rows_operation_timeout = 10 + table.default_mutate_rows_attempt_timeout = 10 return self._get_target_class()(table, **kwargs) @@ -300,8 +300,8 @@ def _make_one(self, table=None, **kwargs): async def test_ctor_defaults(self, flush_timer_mock): flush_timer_mock.return_value = asyncio.create_task(asyncio.sleep(0)) table = mock.Mock() - table.default_operation_timeout = 10 - 
table.default_per_request_timeout = 8 + table.default_mutate_rows_operation_timeout = 10 + table.default_mutate_rows_attempt_timeout = 8 async with self._make_one(table) as instance: assert instance._table == table assert instance.closed is False @@ -316,8 +316,13 @@ async def test_ctor_defaults(self, flush_timer_mock): assert instance._flow_control._in_flight_mutation_count == 0 assert instance._flow_control._in_flight_mutation_bytes == 0 assert instance._entries_processed_since_last_raise == 0 - assert instance._operation_timeout == table.default_operation_timeout - assert instance._per_request_timeout == table.default_per_request_timeout + assert ( + instance._operation_timeout + == table.default_mutate_rows_operation_timeout + ) + assert ( + instance._attempt_timeout == table.default_mutate_rows_attempt_timeout + ) await asyncio.sleep(0) assert flush_timer_mock.call_count == 1 assert flush_timer_mock.call_args[0][0] == 5 @@ -337,7 +342,7 @@ async def test_ctor_explicit(self, flush_timer_mock): flow_control_max_mutation_count = 1001 flow_control_max_bytes = 12 operation_timeout = 11 - per_request_timeout = 2 + attempt_timeout = 2 async with self._make_one( table, flush_interval=flush_interval, @@ -346,7 +351,7 @@ async def test_ctor_explicit(self, flush_timer_mock): flow_control_max_mutation_count=flow_control_max_mutation_count, flow_control_max_bytes=flow_control_max_bytes, batch_operation_timeout=operation_timeout, - batch_per_request_timeout=per_request_timeout, + batch_attempt_timeout=attempt_timeout, ) as instance: assert instance._table == table assert instance.closed is False @@ -365,7 +370,7 @@ async def test_ctor_explicit(self, flush_timer_mock): assert instance._flow_control._in_flight_mutation_bytes == 0 assert instance._entries_processed_since_last_raise == 0 assert instance._operation_timeout == operation_timeout - assert instance._per_request_timeout == per_request_timeout + assert instance._attempt_timeout == attempt_timeout await asyncio.sleep(0) assert flush_timer_mock.call_count == 1 assert flush_timer_mock.call_args[0][0] == flush_interval @@ -379,8 +384,8 @@ async def test_ctor_no_flush_limits(self, flush_timer_mock): """Test with None for flush limits""" flush_timer_mock.return_value = asyncio.create_task(asyncio.sleep(0)) table = mock.Mock() - table.default_operation_timeout = 10 - table.default_per_request_timeout = 8 + table.default_mutate_rows_operation_timeout = 10 + table.default_mutate_rows_attempt_timeout = 8 flush_interval = None flush_limit_count = None flush_limit_bytes = None @@ -410,15 +415,14 @@ async def test_ctor_invalid_values(self): """Test that timeout values are positive, and fit within expected limits""" with pytest.raises(ValueError) as e: self._make_one(batch_operation_timeout=-1) - assert "batch_operation_timeout must be greater than 0" in str(e.value) + assert "operation_timeout must be greater than 0" in str(e.value) with pytest.raises(ValueError) as e: - self._make_one(batch_per_request_timeout=-1) - assert "batch_per_request_timeout must be greater than 0" in str(e.value) + self._make_one(batch_attempt_timeout=-1) + assert "attempt_timeout must be greater than 0" in str(e.value) with pytest.raises(ValueError) as e: - self._make_one(batch_operation_timeout=1, batch_per_request_timeout=2) - assert ( - "batch_per_request_timeout must be less than batch_operation_timeout" - in str(e.value) + self._make_one(batch_operation_timeout=1, batch_attempt_timeout=2) + assert "attempt_timeout must not be greater than operation_timeout" in str( + 
e.value ) def test_default_argument_consistency(self): @@ -857,7 +861,7 @@ async def test_timer_flush_end_to_end(self): async with self._make_one(flush_interval=0.05) as instance: instance._table.default_operation_timeout = 10 - instance._table.default_per_request_timeout = 9 + instance._table.default_attempt_timeout = 9 with mock.patch.object( instance._table.client._gapic_client, "mutate_rows" ) as gapic_mock: @@ -881,8 +885,8 @@ async def test__execute_mutate_rows(self, mutate_rows): table = mock.Mock() table.table_name = "test-table" table.app_profile_id = "test-app-profile" - table.default_operation_timeout = 17 - table.default_per_request_timeout = 13 + table.default_mutate_rows_operation_timeout = 17 + table.default_mutate_rows_attempt_timeout = 13 async with self._make_one(table) as instance: batch = [_make_mutation()] result = await instance._execute_mutate_rows(batch) @@ -892,7 +896,7 @@ async def test__execute_mutate_rows(self, mutate_rows): assert args[1] == table assert args[2] == batch kwargs["operation_timeout"] == 17 - kwargs["per_request_timeout"] == 13 + kwargs["attempt_timeout"] == 13 assert result == [] @pytest.mark.asyncio @@ -910,8 +914,8 @@ async def test__execute_mutate_rows_returns_errors(self, mutate_rows): err2 = FailedMutationEntryError(1, mock.Mock(), RuntimeError("test error")) mutate_rows.side_effect = MutationsExceptionGroup([err1, err2], 10) table = mock.Mock() - table.default_operation_timeout = 17 - table.default_per_request_timeout = 13 + table.default_mutate_rows_operation_timeout = 17 + table.default_mutate_rows_attempt_timeout = 13 async with self._make_one(table) as instance: batch = [_make_mutation()] result = await instance._execute_mutate_rows(batch) @@ -1026,24 +1030,24 @@ async def test_atexit_registration(self): ) async def test_timeout_args_passed(self, mutate_rows): """ - batch_operation_timeout and batch_per_request_timeout should be used + batch_operation_timeout and batch_attempt_timeout should be used in api calls """ mutate_rows.return_value = AsyncMock() expected_operation_timeout = 17 - expected_per_request_timeout = 13 + expected_attempt_timeout = 13 async with self._make_one( batch_operation_timeout=expected_operation_timeout, - batch_per_request_timeout=expected_per_request_timeout, + batch_attempt_timeout=expected_attempt_timeout, ) as instance: assert instance._operation_timeout == expected_operation_timeout - assert instance._per_request_timeout == expected_per_request_timeout + assert instance._attempt_timeout == expected_attempt_timeout # make simulated gapic call await instance._execute_mutate_rows([_make_mutation()]) assert mutate_rows.call_count == 1 kwargs = mutate_rows.call_args[1] assert kwargs["operation_timeout"] == expected_operation_timeout - assert kwargs["per_request_timeout"] == expected_per_request_timeout + assert kwargs["attempt_timeout"] == expected_attempt_timeout @pytest.mark.parametrize( "limit,in_e,start_e,end_e", diff --git a/tests/unit/data/test__helpers.py b/tests/unit/data/test__helpers.py index dc688bb0c..1671bf6f7 100644 --- a/tests/unit/data/test__helpers.py +++ b/tests/unit/data/test__helpers.py @@ -143,3 +143,48 @@ async def test_func(): assert isinstance(cause, bigtable_exceptions.RetryExceptionGroup) assert cause.exceptions == tuple(associated_errors) assert f"operation_timeout of {timeout}s exceeded" in str(e.value) + + +class TestValidateTimeouts: + def test_validate_timeouts_error_messages(self): + with pytest.raises(ValueError) as e: + _helpers._validate_timeouts(operation_timeout=1, 
attempt_timeout=-1) + assert "attempt_timeout must be greater than 0" in str(e.value) + with pytest.raises(ValueError) as e: + _helpers._validate_timeouts(operation_timeout=-1, attempt_timeout=1) + assert "operation_timeout must be greater than 0" in str(e.value) + with pytest.raises(ValueError) as e: + _helpers._validate_timeouts(operation_timeout=1, attempt_timeout=2) + assert "attempt_timeout must not be greater than operation_timeout" in str( + e.value + ) + + @pytest.mark.parametrize( + "args,expected", + [ + ([1, None, False], False), + ([1, None, True], True), + ([1, 1, False], True), + ([1, 1, True], True), + ([1, 1], True), + ([1, None], False), + ([2, 1], True), + ([1, 2], False), + ([0, 1], False), + ([1, 0], False), + ([60, None], False), + ([600, None], False), + ([600, 600], True), + ], + ) + def test_validate_with_inputs(self, args, expected): + """ + test whether an exception is thrown with different inputs + """ + success = False + try: + _helpers._validate_timeouts(*args) + success = True + except ValueError: + pass + assert success == expected diff --git a/tests/unit/data/test_exceptions.py b/tests/unit/data/test_exceptions.py index 9d1145e36..da543ce6e 100644 --- a/tests/unit/data/test_exceptions.py +++ b/tests/unit/data/test_exceptions.py @@ -25,57 +25,68 @@ import mock # type: ignore -class TestBigtableExceptionGroup: +class TracebackTests311: """ - Subclass for MutationsExceptionGroup, RetryExceptionGroup, and ShardedReadRowsExceptionGroup + Provides a set of tests that should be run on python 3.11 and above, + to verify that the exception traceback looks as expected """ - def _get_class(self): - from google.cloud.bigtable.data.exceptions import _BigtableExceptionGroup - - return _BigtableExceptionGroup - - def _make_one(self, message="test_message", excs=None): - if excs is None: - excs = [RuntimeError("mock")] - - return self._get_class()(message, excs=excs) - - def test_raise(self): + @pytest.mark.skipif( + sys.version_info < (3, 11), reason="requires python3.11 or higher" + ) + def test_311_traceback(self): """ - Create exception in raise statement, which calls __new__ and __init__ + Exception customizations should not break rich exception group traceback in python 3.11 """ - test_msg = "test message" - test_excs = [Exception(test_msg)] - with pytest.raises(self._get_class()) as e: - raise self._get_class()(test_msg, test_excs) - assert str(e.value) == test_msg - assert list(e.value.exceptions) == test_excs + import traceback - def test_raise_empty_list(self): - """ - Empty exception lists are not supported - """ - with pytest.raises(ValueError) as e: - raise self._make_one(excs=[]) - assert "non-empty sequence" in str(e.value) + sub_exc1 = RuntimeError("first sub exception") + sub_exc2 = ZeroDivisionError("second sub exception") + sub_group = self._make_one(excs=[sub_exc2]) + exc_group = self._make_one(excs=[sub_exc1, sub_group]) + + expected_traceback = ( + f" | google.cloud.bigtable.data.exceptions.{type(exc_group).__name__}: {str(exc_group)}", + " +-+---------------- 1 ----------------", + " | RuntimeError: first sub exception", + " +---------------- 2 ----------------", + f" | google.cloud.bigtable.data.exceptions.{type(sub_group).__name__}: {str(sub_group)}", + " +-+---------------- 1 ----------------", + " | ZeroDivisionError: second sub exception", + " +------------------------------------", + ) + exception_caught = False + try: + raise exc_group + except self._get_class(): + exception_caught = True + tb = traceback.format_exc() + tb_relevant_lines = 
tuple(tb.splitlines()[3:]) + assert expected_traceback == tb_relevant_lines + assert exception_caught @pytest.mark.skipif( sys.version_info < (3, 11), reason="requires python3.11 or higher" ) - def test_311_traceback(self): + def test_311_traceback_with_cause(self): """ - Exception customizations should not break rich exception group traceback in python 3.11 + traceback should display nicely with sub-exceptions with __cause__ set """ import traceback sub_exc1 = RuntimeError("first sub exception") + cause_exc = ImportError("cause exception") + sub_exc1.__cause__ = cause_exc sub_exc2 = ZeroDivisionError("second sub exception") exc_group = self._make_one(excs=[sub_exc1, sub_exc2]) expected_traceback = ( - f" | google.cloud.bigtable.data.exceptions.{type(exc_group).__name__}: {str(exc_group)}", + f" | google.cloud.bigtable.data.exceptions.{type(exc_group).__name__}: {str(exc_group)}", " +-+---------------- 1 ----------------", + " | ImportError: cause exception", + " | ", + " | The above exception was the direct cause of the following exception:", + " | ", " | RuntimeError: first sub exception", " +---------------- 2 ----------------", " | ZeroDivisionError: second sub exception", @@ -105,6 +116,124 @@ def test_311_exception_group(self): assert runtime_error.exceptions[0] == exceptions[0] assert others.exceptions[0] == exceptions[1] + +class TracebackTests310: + """ + Provides a set of tests that should be run on python 3.10 and under, + to verify that the exception traceback looks as expected + """ + + @pytest.mark.skipif( + sys.version_info >= (3, 11), reason="requires python3.10 or lower" + ) + def test_310_traceback(self): + """ + Exception customizations should not break rich exception group traceback in python 3.10 + """ + import traceback + + sub_exc1 = RuntimeError("first sub exception") + sub_exc2 = ZeroDivisionError("second sub exception") + sub_group = self._make_one(excs=[sub_exc2]) + exc_group = self._make_one(excs=[sub_exc1, sub_group]) + found_message = str(exc_group).splitlines()[0] + found_sub_message = str(sub_group).splitlines()[0] + + expected_traceback = ( + f"google.cloud.bigtable.data.exceptions.{type(exc_group).__name__}: {found_message}", + "--+---------------- 1 ----------------", + " | RuntimeError: first sub exception", + " +---------------- 2 ----------------", + f" | {type(sub_group).__name__}: {found_sub_message}", + " --+---------------- 1 ----------------", + " | ZeroDivisionError: second sub exception", + " +------------------------------------", + ) + exception_caught = False + try: + raise exc_group + except self._get_class(): + exception_caught = True + tb = traceback.format_exc() + tb_relevant_lines = tuple(tb.splitlines()[3:]) + assert expected_traceback == tb_relevant_lines + assert exception_caught + + @pytest.mark.skipif( + sys.version_info >= (3, 11), reason="requires python3.10 or lower" + ) + def test_310_traceback_with_cause(self): + """ + traceback should display nicely with sub-exceptions with __cause__ set + """ + import traceback + + sub_exc1 = RuntimeError("first sub exception") + cause_exc = ImportError("cause exception") + sub_exc1.__cause__ = cause_exc + sub_exc2 = ZeroDivisionError("second sub exception") + exc_group = self._make_one(excs=[sub_exc1, sub_exc2]) + found_message = str(exc_group).splitlines()[0] + + expected_traceback = ( + f"google.cloud.bigtable.data.exceptions.{type(exc_group).__name__}: {found_message}", + "--+---------------- 1 ----------------", + " | ImportError: cause exception", + " | ", + " | The above exception was 
the direct cause of the following exception:", + " | ", + " | RuntimeError: first sub exception", + " +---------------- 2 ----------------", + " | ZeroDivisionError: second sub exception", + " +------------------------------------", + ) + exception_caught = False + try: + raise exc_group + except self._get_class(): + exception_caught = True + tb = traceback.format_exc() + tb_relevant_lines = tuple(tb.splitlines()[3:]) + assert expected_traceback == tb_relevant_lines + assert exception_caught + + +class TestBigtableExceptionGroup(TracebackTests311, TracebackTests310): + """ + Subclass for MutationsExceptionGroup, RetryExceptionGroup, and ShardedReadRowsExceptionGroup + """ + + def _get_class(self): + from google.cloud.bigtable.data.exceptions import _BigtableExceptionGroup + + return _BigtableExceptionGroup + + def _make_one(self, message="test_message", excs=None): + if excs is None: + excs = [RuntimeError("mock")] + + return self._get_class()(message, excs=excs) + + def test_raise(self): + """ + Create exception in raise statement, which calls __new__ and __init__ + """ + test_msg = "test message" + test_excs = [Exception(test_msg)] + with pytest.raises(self._get_class()) as e: + raise self._get_class()(test_msg, test_excs) + found_message = str(e.value).splitlines()[0] # added to parse out subexceptions in <3.11 + assert found_message == test_msg + assert list(e.value.exceptions) == test_excs + + def test_raise_empty_list(self): + """ + Empty exception lists are not supported + """ + with pytest.raises(ValueError) as e: + raise self._make_one(excs=[]) + assert "non-empty sequence" in str(e.value) + def test_exception_handling(self): """ All versions should inherit from exception @@ -151,7 +280,8 @@ def test_raise(self, exception_list, total_entries, expected_message): """ with pytest.raises(self._get_class()) as e: raise self._get_class()(exception_list, total_entries) - assert str(e.value) == expected_message + found_message = str(e.value).splitlines()[0] # added to parse out subexceptions in <3.11 + assert found_message == expected_message assert list(e.value.exceptions) == exception_list def test_raise_custom_message(self): @@ -162,7 +292,8 @@ def test_raise_custom_message(self): exception_list = [Exception()] with pytest.raises(self._get_class()) as e: raise self._get_class()(exception_list, 5, message=custom_message) - assert str(e.value) == custom_message + found_message = str(e.value).splitlines()[0] # added to parse out subexceptions in <3.11 + assert found_message == custom_message assert list(e.value.exceptions) == exception_list @pytest.mark.parametrize( @@ -222,7 +353,8 @@ def test_from_truncated_lists( raise self._get_class().from_truncated_lists( first_list, second_list, total_excs, entry_count ) - assert str(e.value) == expected_message + found_message = str(e.value).splitlines()[0] # added to parse out subexceptions in <3.11 + assert found_message == expected_message assert list(e.value.exceptions) == first_list + second_list @@ -241,11 +373,11 @@ def _make_one(self, excs=None): @pytest.mark.parametrize( "exception_list,expected_message", [ - ([Exception()], "1 failed attempt: Exception"), - ([Exception(), RuntimeError()], "2 failed attempts. Latest: RuntimeError"), + ([Exception()], "1 failed attempt"), + ([Exception(), RuntimeError()], "2 failed attempts"), ( [Exception(), ValueError("test")], - "2 failed attempts. 
Latest: ValueError", + "2 failed attempts", ), ( [ @@ -253,7 +385,7 @@ def _make_one(self, excs=None): [Exception(), ValueError("test")] ) ], - "1 failed attempt: RetryExceptionGroup", + "1 failed attempt", ), ], ) @@ -263,7 +395,8 @@ def test_raise(self, exception_list, expected_message): """ with pytest.raises(self._get_class()) as e: raise self._get_class()(exception_list) - assert str(e.value) == expected_message + found_message = str(e.value).splitlines()[0] # added to prase out subexceptions in <3.11 + assert found_message == expected_message assert list(e.value.exceptions) == exception_list @@ -299,7 +432,8 @@ def test_raise(self, exception_list, succeeded, total_entries, expected_message) """ with pytest.raises(self._get_class()) as e: raise self._get_class()(exception_list, succeeded, total_entries) - assert str(e.value) == expected_message + found_message = str(e.value).splitlines()[0] # added to prase out subexceptions in <3.11 + assert found_message == expected_message assert list(e.value.exceptions) == exception_list assert e.value.successful_rows == succeeded @@ -323,10 +457,7 @@ def test_raise(self): test_exc = ValueError("test") with pytest.raises(self._get_class()) as e: raise self._get_class()(test_idx, test_entry, test_exc) - assert ( - str(e.value) - == "Failed idempotent mutation entry at index 2 with cause: ValueError('test')" - ) + assert str(e.value) == "Failed idempotent mutation entry at index 2" assert e.value.index == test_idx assert e.value.entry == test_entry assert e.value.__cause__ == test_exc @@ -343,10 +474,7 @@ def test_raise_idempotent(self): test_exc = ValueError("test") with pytest.raises(self._get_class()) as e: raise self._get_class()(test_idx, test_entry, test_exc) - assert ( - str(e.value) - == "Failed non-idempotent mutation entry at index 2 with cause: ValueError('test')" - ) + assert str(e.value) == "Failed non-idempotent mutation entry at index 2" assert e.value.index == test_idx assert e.value.entry == test_entry assert e.value.__cause__ == test_exc @@ -361,10 +489,7 @@ def test_no_index(self): test_exc = ValueError("test") with pytest.raises(self._get_class()) as e: raise self._get_class()(test_idx, test_entry, test_exc) - assert ( - str(e.value) - == "Failed idempotent mutation entry with cause: ValueError('test')" - ) + assert str(e.value) == "Failed idempotent mutation entry" assert e.value.index == test_idx assert e.value.entry == test_entry assert e.value.__cause__ == test_exc @@ -391,7 +516,7 @@ def test_raise(self): test_exc = ValueError("test") with pytest.raises(self._get_class()) as e: raise self._get_class()(test_idx, test_query, test_exc) - assert str(e.value) == "Failed query at index 2 with cause: ValueError('test')" + assert str(e.value) == "Failed query at index 2" assert e.value.index == test_idx assert e.value.query == test_query assert e.value.__cause__ == test_exc