diff --git a/tests/_helpers.py b/tests/_helpers.py index b120932397..667f9f8be1 100644 --- a/tests/_helpers.py +++ b/tests/_helpers.py @@ -132,3 +132,20 @@ def get_finished_spans(self): def reset(self): self.tearDown() + + def finished_spans_events_statuses(self): + span_list = self.get_finished_spans() + # Some event attributes are noisy/highly ephemeral + # and can't be directly compared against. + got_all_events = [] + imprecise_event_attributes = ["exception.stacktrace", "delay_seconds", "cause"] + for span in span_list: + for event in span.events: + evt_attributes = event.attributes.copy() + for attr_name in imprecise_event_attributes: + if attr_name in evt_attributes: + evt_attributes[attr_name] = "EPHEMERAL" + + got_all_events.append((event.name, evt_attributes)) + + return got_all_events diff --git a/tests/system/test_observability_options.py b/tests/system/test_observability_options.py index 631ea49897..2edec84bc2 100644 --- a/tests/system/test_observability_options.py +++ b/tests/system/test_observability_options.py @@ -16,6 +16,10 @@ from . import _helpers from google.cloud.spanner_v1 import Client +from google.api_core.exceptions import Aborted +from google.auth.credentials import AnonymousCredentials +from google.api_core.exceptions import Aborted +from google.rpc import code_pb2 HAS_OTEL_INSTALLED = False @@ -132,18 +136,7 @@ def test_propagation(enable_extended_tracing): test_propagation(False) -@pytest.mark.skipif( - not _helpers.USE_EMULATOR, - reason="Emulator needed to run this tests", -) -@pytest.mark.skipif( - not HAS_OTEL_INSTALLED, - reason="Tracing requires OpenTelemetry", -) -def test_transaction_abort_then_retry_spans(): - from google.auth.credentials import AnonymousCredentials - from google.api_core.exceptions import Aborted - from google.rpc import code_pb2 +def create_db_trace_exporter(): from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( InMemorySpanExporter, @@ -160,20 +153,6 @@ def test_transaction_abort_then_retry_spans(): NODE_COUNT = 5 LABELS = {"test": "true"} - counters = dict(aborted=0) - - def select_in_txn(txn): - results = txn.execute_sql("SELECT 1") - for row in results: - _ = row - - if counters["aborted"] == 0: - counters["aborted"] = 1 - raise Aborted( - "Thrown from ClientInterceptor for testing", - errors=[_helpers.FauxCall(code_pb2.ABORTED)], - ) - tracer_provider = TracerProvider(sampler=ALWAYS_ON) trace_exporter = InMemorySpanExporter() tracer_provider.add_span_processor(SimpleSpanProcessor(trace_exporter)) @@ -207,22 +186,74 @@ def select_in_txn(txn): except Exception: pass + return db, trace_exporter + + +@pytest.mark.skipif( + not _helpers.USE_EMULATOR, + reason="Emulator needed to run this test", +) +@pytest.mark.skipif( + not HAS_OTEL_INSTALLED, + reason="Tracing requires OpenTelemetry", +) +def test_transaction_abort_then_retry_spans(): + from opentelemetry.trace.status import StatusCode + + db, trace_exporter = create_db_trace_exporter() + + counters = dict(aborted=0) + + def select_in_txn(txn): + results = txn.execute_sql("SELECT 1") + for row in results: + _ = row + + if counters["aborted"] == 0: + counters["aborted"] = 1 + raise Aborted( + "Thrown from ClientInterceptor for testing", + errors=[_helpers.FauxCall(code_pb2.ABORTED)], + ) + db.run_in_transaction(select_in_txn) + got_statuses, got_events = finished_spans_statuses(trace_exporter) + + # Check for the series of events + want_events = [ + ("Acquiring session", {"kind": "BurstyPool"}), + ("Waiting for a session to become available", {"kind": "BurstyPool"}), + ("No sessions available in pool. Creating session", {"kind": "BurstyPool"}), + ("Creating Session", {}), + ("Creating Transaction", {}), + ( + "Transaction was aborted in user operation, retrying", + {"delay_seconds": "EPHEMERAL", "cause": "EPHEMERAL", "attempt": 1}, + ), + ("Creating Transaction", {}), + ("Starting Commit", {}), + ("Commit Done", {}), + ] + assert got_events == want_events + + # Check for the statues. + codes = StatusCode + want_statuses = [ + ("CloudSpanner.Database.run_in_transaction", codes.OK, None), + ("CloudSpanner.CreateSession", codes.OK, None), + ("CloudSpanner.Session.run_in_transaction", codes.OK, None), + ("CloudSpanner.Transaction.execute_streaming_sql", codes.OK, None), + ("CloudSpanner.Transaction.execute_streaming_sql", codes.OK, None), + ("CloudSpanner.Transaction.commit", codes.OK, None), + ] + assert got_statuses == want_statuses + + +def finished_spans_statuses(trace_exporter): span_list = trace_exporter.get_finished_spans() # Sort the spans by their start time in the hierarchy. span_list = sorted(span_list, key=lambda span: span.start_time) - got_span_names = [span.name for span in span_list] - want_span_names = [ - "CloudSpanner.Database.run_in_transaction", - "CloudSpanner.CreateSession", - "CloudSpanner.Session.run_in_transaction", - "CloudSpanner.Transaction.execute_streaming_sql", - "CloudSpanner.Transaction.execute_streaming_sql", - "CloudSpanner.Transaction.commit", - ] - - assert got_span_names == want_span_names got_events = [] got_statuses = [] @@ -234,6 +265,7 @@ def select_in_txn(txn): got_statuses.append( (span.name, span.status.status_code, span.status.description) ) + for event in span.events: evt_attributes = event.attributes.copy() for attr_name in imprecise_event_attributes: @@ -242,30 +274,70 @@ def select_in_txn(txn): got_events.append((event.name, evt_attributes)) + return got_statuses, got_events + + +@pytest.mark.skipif( + not _helpers.USE_EMULATOR, + reason="Emulator needed to run this test", +) +@pytest.mark.skipif( + not HAS_OTEL_INSTALLED, + reason="Tracing requires OpenTelemetry", +) +def test_database_partitioned(): + from opentelemetry.trace.status import StatusCode + + db, trace_exporter = create_db_trace_exporter() + + try: + db.execute_partitioned_dml("UPDATE NonExistent SET name = 'foo' WHERE id > 1") + except Exception: + pass + + got_statuses, got_events = finished_spans_statuses(trace_exporter) # Check for the series of events want_events = [ ("Acquiring session", {"kind": "BurstyPool"}), ("Waiting for a session to become available", {"kind": "BurstyPool"}), ("No sessions available in pool. Creating session", {"kind": "BurstyPool"}), ("Creating Session", {}), + ("Starting BeginTransaction", {}), ( - "Transaction was aborted in user operation, retrying", - {"delay_seconds": "EPHEMERAL", "cause": "EPHEMERAL", "attempt": 1}, + "exception", + { + "exception.type": "google.api_core.exceptions.InvalidArgument", + "exception.message": "400 Table not found: NonExistent [at 1:8]\nUPDATE NonExistent SET name = 'foo' WHERE id > 1\n ^", + "exception.stacktrace": "EPHEMERAL", + "exception.escaped": "False", + }, + ), + ( + "exception", + { + "exception.type": "google.api_core.exceptions.InvalidArgument", + "exception.message": "400 Table not found: NonExistent [at 1:8]\nUPDATE NonExistent SET name = 'foo' WHERE id > 1\n ^", + "exception.stacktrace": "EPHEMERAL", + "exception.escaped": "False", + }, ), - ("Starting Commit", {}), - ("Commit Done", {}), ] assert got_events == want_events # Check for the statues. codes = StatusCode want_statuses = [ - ("CloudSpanner.Database.run_in_transaction", codes.OK, None), + ( + "CloudSpanner.Database.execute_partitioned_pdml", + codes.ERROR, + "InvalidArgument: 400 Table not found: NonExistent [at 1:8]\nUPDATE NonExistent SET name = 'foo' WHERE id > 1\n ^", + ), ("CloudSpanner.CreateSession", codes.OK, None), - ("CloudSpanner.Session.run_in_transaction", codes.OK, None), - ("CloudSpanner.Transaction.execute_streaming_sql", codes.OK, None), - ("CloudSpanner.Transaction.execute_streaming_sql", codes.OK, None), - ("CloudSpanner.Transaction.commit", codes.OK, None), + ( + "CloudSpanner.ExecuteStreamingSql", + codes.ERROR, + "InvalidArgument: 400 Table not found: NonExistent [at 1:8]\nUPDATE NonExistent SET name = 'foo' WHERE id > 1\n ^", + ), ] assert got_statuses == want_statuses