From a06b00c65602728f723bc562f8d1ee23606774b3 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Fri, 20 Dec 2024 00:03:18 -0800 Subject: [PATCH] More plumbing for Database DDL methods --- google/cloud/spanner_v1/database.py | 53 +++++++++++++++++----- google/cloud/spanner_v1/pool.py | 4 +- google/cloud/spanner_v1/session.py | 26 ++++++++--- tests/unit/test_pool.py | 30 +++++++++++++ tests/unit/test_session.py | 70 +++++++++++++++++++++++++++-- 5 files changed, 161 insertions(+), 22 deletions(-) diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index a33ca8b0fb..942dcf28ff 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -515,7 +515,10 @@ def create(self): database_dialect=self._database_dialect, proto_descriptors=self._proto_descriptors, ) - future = api.create_database(request=request, metadata=metadata) + future = api.create_database( + request=request, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), + ) return future def exists(self): @@ -531,7 +534,12 @@ def exists(self): metadata = _metadata_with_prefix(self.name) try: - api.get_database_ddl(database=self.name, metadata=metadata) + api.get_database_ddl( + database=self.name, + metadata=self.metadata_with_request_id( + self._next_nth_request, 1, metadata + ), + ) except NotFound: return False return True @@ -548,10 +556,16 @@ def reload(self): """ api = self._instance._client.database_admin_api metadata = _metadata_with_prefix(self.name) - response = api.get_database_ddl(database=self.name, metadata=metadata) + response = api.get_database_ddl( + database=self.name, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), + ) self._ddl_statements = tuple(response.statements) self._proto_descriptors = response.proto_descriptors - response = api.get_database(name=self.name, metadata=metadata) + response = api.get_database( + name=self.name, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), + ) self._state = DatabasePB.State(response.state) self._create_time = response.create_time self._restore_info = response.restore_info @@ -596,7 +610,10 @@ def update_ddl(self, ddl_statements, operation_id="", proto_descriptors=None): proto_descriptors=proto_descriptors, ) - future = api.update_database_ddl(request=request, metadata=metadata) + future = api.update_database_ddl( + request=request, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), + ) return future def update(self, fields): @@ -634,7 +651,9 @@ def update(self, fields): metadata = _metadata_with_prefix(self.name) future = api.update_database( - database=database_pb, update_mask=field_mask, metadata=metadata + database=database_pb, + update_mask=field_mask, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), ) return future @@ -647,7 +666,10 @@ def drop(self): """ api = self._instance._client.database_admin_api metadata = _metadata_with_prefix(self.name) - api.drop_database(database=self.name, metadata=metadata) + api.drop_database( + database=self.name, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), + ) def execute_partitioned_dml( self, @@ -995,7 +1017,7 @@ def restore(self, source): ) future = api.restore_database( request=request, - metadata=metadata, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), ) return future @@ -1064,7 +1086,10 @@ def list_database_roles(self, page_size=None): parent=self.name, page_size=page_size, ) - return api.list_database_roles(request=request, metadata=metadata) + return api.list_database_roles( + request=request, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), + ) def table(self, table_id): """Factory to create a table object within this database. @@ -1148,7 +1173,10 @@ def get_iam_policy(self, policy_version=None): requested_policy_version=policy_version ), ) - response = api.get_iam_policy(request=request, metadata=metadata) + response = api.get_iam_policy( + request=request, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), + ) return response def set_iam_policy(self, policy): @@ -1170,7 +1198,10 @@ def set_iam_policy(self, policy): resource=self.name, policy=policy, ) - response = api.set_iam_policy(request=request, metadata=metadata) + response = api.set_iam_policy( + request=request, + metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata), + ) return response @property diff --git a/google/cloud/spanner_v1/pool.py b/google/cloud/spanner_v1/pool.py index efae909da9..ce8fc49854 100644 --- a/google/cloud/spanner_v1/pool.py +++ b/google/cloud/spanner_v1/pool.py @@ -576,7 +576,9 @@ def bind(self, database): while created_session_count < self.size: resp = api.batch_create_sessions( request=request, - metadata=metadata, + metadata=database.metadata_with_request_id( + database._next_nth_request, 1, metadata + ), ) for session_pb in resp.session: session = self._new_session() diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index 3495e50a08..88d730743e 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -168,7 +168,9 @@ def create(self): ): session_pb = api.create_session( request=request, - metadata=metadata, + metadata=self._database.metadata_with_request_id( + self._database._next_nth_request, 1, metadata + ), ) self._session_id = session_pb.name.split("/")[-1] @@ -257,7 +259,12 @@ def delete(self): }, observability_options=observability_options, ): - api.delete_session(name=self.name, metadata=metadata) + api.delete_session( + name=self.name, + metadata=database.metadata_with_request_id( + database._next_nth_request, 1, metadata + ), + ) def ping(self): """Ping the session to keep it alive by executing "SELECT 1". @@ -266,13 +273,18 @@ def ping(self): """ if self._session_id is None: raise ValueError("Session ID not set by back-end") - api = self._database.spanner_api database = self._database - metadata = database.metadata_with_request_id( - database._next_nth_request, 1, _metadata_with_prefix(database.name) - ) + api = database.spanner_api + database = self._database request = ExecuteSqlRequest(session=self.name, sql="SELECT 1") - api.execute_sql(request=request, metadata=metadata) + api.execute_sql( + request=request, + metadata=database.metadata_with_request_id( + database._next_nth_request, + 1, + _metadata_with_prefix(database.name), + ), + ) self._last_use_time = datetime.now() def snapshot(self, **kw): diff --git a/tests/unit/test_pool.py b/tests/unit/test_pool.py index 89715c741d..c529d5d6fd 100644 --- a/tests/unit/test_pool.py +++ b/tests/unit/test_pool.py @@ -19,6 +19,11 @@ from datetime import datetime, timedelta import mock +from google.cloud.spanner_v1._helpers import ( + _metadata_with_request_id, + AtomicCounter, +) + from google.cloud.spanner_v1._opentelemetry_tracing import trace_call from tests._helpers import ( OpenTelemetryBase, @@ -1179,6 +1184,9 @@ def session_id(self): class _Database(object): + NTH_REQUEST = AtomicCounter() + NTH_CLIENT_ID = AtomicCounter() + def __init__(self, name): self.name = name self._sessions = [] @@ -1233,6 +1241,28 @@ def session(self, **kwargs): def observability_options(self): return dict(db_name=self.name) + @property + def _next_nth_request(self): + return self.NTH_REQUEST.increment() + + @property + def _nth_client_id(self): + return self.NTH_CLIENT_ID.increment() + + def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + client_id = self._nth_client_id + return _metadata_with_request_id( + self._nth_client_id, + self._channel_id, + nth_request, + nth_attempt, + prior_metadata, + ) + + @property + def _channel_id(self): + return 1 + class _Queue(object): _size = 1 diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index 0d60e98cd0..9803dbc0e7 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -22,6 +22,11 @@ StatusCode, enrich_with_otel_scope, ) +from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID +from google.cloud.spanner_v1._helpers import ( + _metadata_with_request_id, + AtomicCounter, +) def _make_rpc_error(error_cls, trailing_metadata=None): @@ -66,6 +71,7 @@ def _make_database(name=DATABASE_NAME, database_role=None): database.log_commit_stats = False database.database_role = database_role database._route_to_leader_enabled = True + database.NTH_CLIENT = AtomicCounter() return database @staticmethod @@ -168,6 +174,10 @@ def test_create_w_database_role(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -206,6 +216,10 @@ def test_create_session_span_annotations(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -235,6 +249,10 @@ def test_create_wo_database_role(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -265,6 +283,10 @@ def test_create_ok(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -298,6 +320,10 @@ def test_create_w_labels(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -483,7 +509,13 @@ def test_ping_hit(self): gax_api.execute_sql.assert_called_once_with( request=request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), + ], ) def test_ping_miss(self): @@ -507,7 +539,13 @@ def test_ping_miss(self): gax_api.execute_sql.assert_called_once_with( request=request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), + ], ) def test_ping_error(self): @@ -531,7 +569,13 @@ def test_ping_error(self): gax_api.execute_sql.assert_called_once_with( request=request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), + ], ) def test_delete_wo_session_id(self): @@ -1722,6 +1766,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -1785,6 +1833,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) request = CommitRequest( @@ -1798,6 +1850,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -1885,6 +1941,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) ] @@ -1904,6 +1964,10 @@ def unit_of_work(txn, *args, **kw): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1", + ), ], ) ]