Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for Directed Reads #1000

Merged
merged 26 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
2f70017
changes
asthamohta Feb 23, 2023
88c70ce
changes
asthamohta Feb 23, 2023
69c3f6e
docs
asthamohta Feb 23, 2023
aa9f18c
docs
asthamohta Feb 23, 2023
5a0eadc
linting
asthamohta Feb 23, 2023
6b2ca49
Merge branch 'main' into direct-read-main-code
harshachinta Sep 16, 2023
e6ae0de
Merge branch 'main' into direct-read-main-code
harshachinta Nov 19, 2023
3036bb6
feat(spanner): remove client side validations for directed read options
harshachinta Nov 20, 2023
0e68229
feat(spanner): update the auto_failover_disabled field
harshachinta Nov 20, 2023
dad4a2d
feat(spanner): update unit tests
harshachinta Nov 20, 2023
c18e0a3
feat(spanner): update test
harshachinta Nov 22, 2023
f06a7bc
Merge branch 'main' into direct-read-main-code
harshachinta Dec 8, 2023
76cbdac
feat(spanner): update documentation
harshachinta Dec 8, 2023
71d728d
feat(spanner): add system test to validate exception in case of RW tr…
harshachinta Dec 8, 2023
d349418
feat(spanner): update unit test
harshachinta Dec 8, 2023
34026ca
Merge branch 'main' into direct-read-main-code
harshachinta Dec 8, 2023
927da67
feat(spanner): add dro for batchsnapshot and update system tests
harshachinta Dec 9, 2023
2dfc406
feat(spanner): fix unit tests for batchsnapshot
harshachinta Dec 9, 2023
f9af1d8
feat(spanner): add unit tests for partition read and query
harshachinta Dec 9, 2023
54a1e16
Merge branch 'main' into direct-read-main-code
harshachinta Dec 15, 2023
366f2f8
Merge branch 'main' into direct-read-main-code
harshachinta Dec 27, 2023
426143a
Merge branch 'main' into direct-read-main-code
harshachinta Jan 7, 2024
12157b4
feat(spanner): lint fixes
harshachinta Jan 8, 2024
ce21ed8
feat(spanner): code refactor remove TransactionType
harshachinta Jan 8, 2024
37abf81
feat(spanner): comment refactor
harshachinta Jan 8, 2024
7162a42
feat(spanner): remove comments
harshachinta Jan 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions google/cloud/spanner_v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from .types.spanner import CommitRequest
from .types.spanner import CreateSessionRequest
from .types.spanner import DeleteSessionRequest
from .types.spanner import DirectedReadOptions
from .types.spanner import ExecuteBatchDmlRequest
from .types.spanner import ExecuteBatchDmlResponse
from .types.spanner import ExecuteSqlRequest
Expand Down Expand Up @@ -77,6 +78,7 @@
This value can only be used for timestamp columns that have set the option
``(allow_commit_timestamp=true)`` in the schema.
"""
TransactionTypes = DirectedReadOptions.ReplicaSelection.Type
surbhigarg92 marked this conversation as resolved.
Show resolved Hide resolved


__all__ = (
Expand All @@ -96,6 +98,7 @@
"TransactionPingingPool",
# local
"COMMIT_TIMESTAMP",
"TransactionTypes",
# google.cloud.spanner_v1.types
"BatchCreateSessionsRequest",
"BatchCreateSessionsResponse",
Expand All @@ -104,6 +107,7 @@
"CommitResponse",
"CreateSessionRequest",
"DeleteSessionRequest",
"DirectedReadOptions",
"ExecuteBatchDmlRequest",
"ExecuteBatchDmlResponse",
"ExecuteSqlRequest",
Expand Down
50 changes: 50 additions & 0 deletions google/cloud/spanner_v1/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
from google.cloud.spanner_v1 import TypeCode
from google.cloud.spanner_v1 import ExecuteSqlRequest
from google.cloud.spanner_v1 import JsonObject
from google.cloud.spanner_v1 import DirectedReadOptions
from google.api_core.exceptions import InvalidArgument

# Validation error messages
NUMERIC_MAX_SCALE_ERR_MSG = (
Expand Down Expand Up @@ -358,3 +360,51 @@ def _metadata_with_leader_aware_routing(value, **kw):
List[Tuple[str, str]]: RPC metadata with leader aware routing header
"""
return ("x-goog-spanner-route-to-leader", str(value).lower())


def verify_directed_read_options(directed_read_options):
harshachinta marked this conversation as resolved.
Show resolved Hide resolved
"""Verifies if value of directed_read_options is correct.
:type directed_read_options: :class:`~googlecloud.spanner_v1.types.DirectedReadOptions`
or :class:`dict`
:param directed_read_options: directed_read_options for ReadRequests and ExecuteSqlRequests.

:raises InvalidArguement: if directed_read_options is incorrect.
"""
if type(directed_read_options) == dict:
if (
"include_replicas" in directed_read_options.keys()
and "exclude_replicas" in directed_read_options.keys()
):
raise InvalidArgument(
"Only one of include_replicas or exclude_replicas can be set"
)
if (
"include_replicas" in directed_read_options.keys()
and "replica_selections" in directed_read_options["include_replicas"].keys()
and len(directed_read_options["include_replicas"]["replica_selections"])
> 10
) or (
"exclude_replicas" in directed_read_options.keys()
and "replica_selections" in directed_read_options["exclude_replicas"].keys()
and len(directed_read_options["exclude_replicas"]["replica_selections"])
> 10
):
raise InvalidArgument("Maximum length of replica selection allowed is 10")
elif isinstance(directed_read_options, DirectedReadOptions):
if (
directed_read_options.include_replicas is not None
and directed_read_options.exclude_replicas is not None
):
raise InvalidArgument(
"Only one of include_replicas or exclude_replicas can be set"
)
if (
directed_read_options.include_replicas is not None
and directed_read_options.include_replicas.replica_selections is not None
and len(directed_read_options.include_replicas.replica_selections) > 10
) or (
directed_read_options.include_replicas is not None
and directed_read_options.exclude_replicas.replica_selections is not None
and len(directed_read_options.exclude_replicas.replica_selections) > 10
):
raise InvalidArgument("Maximum length of replica selection allowed is 10")
31 changes: 31 additions & 0 deletions google/cloud/spanner_v1/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from google.cloud.spanner_v1 import ExecuteSqlRequest
from google.cloud.spanner_v1._helpers import _merge_query_options
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
from google.cloud.spanner_v1._helpers import verify_directed_read_options
from google.cloud.spanner_v1.instance import Instance

_CLIENT_INFO = client_info.ClientInfo(client_library_version=__version__)
Expand Down Expand Up @@ -120,6 +121,12 @@ class Client(ClientWithProject):
disable leader aware routing. Disabling leader aware routing would
route all requests in RW/PDML transactions to the closest region.

:type directed_read_options: :class:`~googlecloud.spanner_v1.types.DirectedReadOptions`
or :class:`dict`
:param directed_read_options: (Optional) Client options used to set the directed_read_options
for all ReadRequests and ExecuteSqlRequests for the Client which indicate which replicas
or regions should be used for non-transactional reads or queries.

:raises: :class:`ValueError <exceptions.ValueError>` if both ``read_only``
and ``admin`` are :data:`True`
"""
Expand All @@ -139,6 +146,7 @@ def __init__(
client_options=None,
query_options=None,
route_to_leader_enabled=True,
directed_read_options=None,
):
self._emulator_host = _get_spanner_emulator_host()

Expand Down Expand Up @@ -179,6 +187,8 @@ def __init__(
warnings.warn(_EMULATOR_HOST_HTTP_SCHEME)

self._route_to_leader_enabled = route_to_leader_enabled
verify_directed_read_options(directed_read_options)
self._directed_read_options = directed_read_options

@property
def credentials(self):
Expand Down Expand Up @@ -260,6 +270,17 @@ def route_to_leader_enabled(self):
"""
return self._route_to_leader_enabled


def directed_read_options(self):
"""Getter for directed_read_options.

:rtype:
:class:`~googlecloud.spanner_v1.types.DirectedReadOptions`
or :class:`dict`
:returns: The directed_read_options for the client.
"""
return self._directed_read_options

def copy(self):
"""Make a copy of this client.

Expand Down Expand Up @@ -383,3 +404,13 @@ def list_instances(self, filter_="", page_size=None):
request=request, metadata=metadata
)
return page_iter

def set_directed_read_options(self, directed_read_options):
"""Sets directed_read_options for the client
:type directed_read_options: :class:`~googlecloud.spanner_v1.types.DirectedReadOptions`
or :class:`dict`
:param directed_read_options: Client options used to set the directed_read_options
for all ReadRequests and ExecuteSqlRequests for the Client which indicate which replicas
or regions should be used for non-transactional reads or queries.
"""
self._directed_read_options = directed_read_options
1 change: 1 addition & 0 deletions google/cloud/spanner_v1/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ def __init__(
self._route_to_leader_enabled = self._instance._client.route_to_leader_enabled
self._enable_drop_protection = enable_drop_protection
self._reconciling = False
self._directed_read_options = self._instance._client.directed_read_options

if pool is None:
pool = BurstyPool(database_role=database_role)
Expand Down
52 changes: 52 additions & 0 deletions google/cloud/spanner_v1/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from google.api_core.exceptions import InternalServerError
from google.api_core.exceptions import ServiceUnavailable
from google.api_core.exceptions import InvalidArgument
from google.api_core.exceptions import BadRequest
from google.api_core import gapic_v1
from google.cloud.spanner_v1._helpers import (
_make_value_pb,
Expand All @@ -41,6 +42,7 @@
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
from google.cloud.spanner_v1.streamed import StreamedResultSet
from google.cloud.spanner_v1 import RequestOptions
from google.cloud.spanner_v1._helpers import verify_directed_read_options

_STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES = (
"RST_STREAM",
Expand Down Expand Up @@ -176,6 +178,7 @@ def read(
*,
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
directed_read_options=None,
):
"""Perform a ``StreamingRead`` API request for rows in a table.

Expand Down Expand Up @@ -224,12 +227,22 @@ def read(
``partition_token``, the API will return an
``INVALID_ARGUMENT`` error.

:type directed_read_options: :class:`~googlecloud.spanner_v1.types.DirectedReadOptions`
or :class:`dict`
:param directed_read_options: (Optional) Client options used to set the directed_read_options
for all ReadRequests and ExecuteSqlRequests for the Client which indicate which replicas
or regions should be used for non-transactional reads or queries.

:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.

:raises ValueError:
for reuse of single-use snapshots, or if a transaction ID is
already pending for multiple-use snapshots.

:raises InvalidArguement: if directed_read_options is incorrect.

:raises BadRequest: if directed_read_options are set for READ-WRITE Transaction or PDML.
"""
if self._read_request_count > 0:
if not self._multi_use:
Expand All @@ -255,6 +268,19 @@ def read(
request_options.transaction_tag = None
elif self.transaction_tag is not None:
request_options.transaction_tag = self.transaction_tag
if (
directed_read_options is None
and database._directed_read_options is not None
):
directed_read_options = database._directed_read_options
else:
if self.transaction_tag is not None:
request_options.transaction_tag = self.transaction_tag
if directed_read_options is not None:
raise BadRequest(
"directed_read_options can't be set for readWrite transactions or partitioned dml requests"
)
verify_directed_read_options(directed_read_options)

request = ReadRequest(
session=self._session.name,
Expand All @@ -266,6 +292,7 @@ def read(
partition_token=partition,
request_options=request_options,
data_boost_enabled=data_boost_enabled,
directed_read_options=directed_read_options,
)
restart = functools.partial(
api.streaming_read,
Expand Down Expand Up @@ -322,6 +349,7 @@ def execute_sql(
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
data_boost_enabled=False,
directed_read_options=None,
):
"""Perform an ``ExecuteStreamingSql`` API request.

Expand Down Expand Up @@ -379,9 +407,19 @@ def execute_sql(
``partition_token``, the API will return an
``INVALID_ARGUMENT`` error.

:type directed_read_options: :class:`~googlecloud.spanner_v1.types.DirectedReadOptions`
or :class:`dict`
:param directed_read_options: (Optional) Client options used to set the directed_read_options
for all ReadRequests and ExecuteSqlRequests for the Client which indicate which replicas
or regions should be used for non-transactional reads or queries.

:raises ValueError:
for reuse of single-use snapshots, or if a transaction ID is
already pending for multiple-use snapshots.

:raises InvalidArguement: if directed_read_options is incorrect.

:raises BadRequest: if directed_read_options are set for READ-WRITE Transaction or PDML.
"""
if self._read_request_count > 0:
if not self._multi_use:
Expand Down Expand Up @@ -421,6 +459,19 @@ def execute_sql(
request_options.transaction_tag = None
elif self.transaction_tag is not None:
request_options.transaction_tag = self.transaction_tag
if (
directed_read_options is None
and database._directed_read_options is not None
):
directed_read_options = database._directed_read_options
else:
if self.transaction_tag is not None:
request_options.transaction_tag = self.transaction_tag
if directed_read_options is not None:
raise BadRequest(
"directed_read_options can't be set for readWrite transactions or partitioned dml requests"
)
verify_directed_read_options(directed_read_options)

request = ExecuteSqlRequest(
session=self._session.name,
Expand All @@ -433,6 +484,7 @@ def execute_sql(
query_options=query_options,
request_options=request_options,
data_boost_enabled=data_boost_enabled,
directed_read_options=directed_read_options,
)
restart = functools.partial(
api.execute_streaming_sql,
Expand Down
Loading