Skip to content

Commit

Permalink
Add more system tests
Browse files Browse the repository at this point in the history
  • Loading branch information
odeke-em committed Dec 17, 2024
1 parent 6777f97 commit 441bb9a
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 46 deletions.
17 changes: 17 additions & 0 deletions tests/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,20 @@ def get_finished_spans(self):

def reset(self):
self.tearDown()

def finished_spans_events_statuses(self):
span_list = self.get_finished_spans()
# Some event attributes are noisy/highly ephemeral
# and can't be directly compared against.
got_all_events = []
imprecise_event_attributes = ["exception.stacktrace", "delay_seconds", "cause"]
for span in span_list:
for event in span.events:
evt_attributes = event.attributes.copy()
for attr_name in imprecise_event_attributes:
if attr_name in evt_attributes:
evt_attributes[attr_name] = "EPHEMERAL"

got_all_events.append((event.name, evt_attributes))

return got_all_events
164 changes: 118 additions & 46 deletions tests/system/test_observability_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

from . import _helpers
from google.cloud.spanner_v1 import Client
from google.api_core.exceptions import Aborted
from google.auth.credentials import AnonymousCredentials
from google.api_core.exceptions import Aborted
from google.rpc import code_pb2

HAS_OTEL_INSTALLED = False

Expand Down Expand Up @@ -132,18 +136,7 @@ def test_propagation(enable_extended_tracing):
test_propagation(False)


@pytest.mark.skipif(
not _helpers.USE_EMULATOR,
reason="Emulator needed to run this tests",
)
@pytest.mark.skipif(
not HAS_OTEL_INSTALLED,
reason="Tracing requires OpenTelemetry",
)
def test_transaction_abort_then_retry_spans():
from google.auth.credentials import AnonymousCredentials
from google.api_core.exceptions import Aborted
from google.rpc import code_pb2
def create_db_trace_exporter():
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
InMemorySpanExporter,
Expand All @@ -160,20 +153,6 @@ def test_transaction_abort_then_retry_spans():
NODE_COUNT = 5
LABELS = {"test": "true"}

counters = dict(aborted=0)

def select_in_txn(txn):
results = txn.execute_sql("SELECT 1")
for row in results:
_ = row

if counters["aborted"] == 0:
counters["aborted"] = 1
raise Aborted(
"Thrown from ClientInterceptor for testing",
errors=[_helpers.FauxCall(code_pb2.ABORTED)],
)

tracer_provider = TracerProvider(sampler=ALWAYS_ON)
trace_exporter = InMemorySpanExporter()
tracer_provider.add_span_processor(SimpleSpanProcessor(trace_exporter))
Expand Down Expand Up @@ -207,22 +186,74 @@ def select_in_txn(txn):
except Exception:
pass

return db, trace_exporter


@pytest.mark.skipif(
not _helpers.USE_EMULATOR,
reason="Emulator needed to run this tests",
)
@pytest.mark.skipif(
not HAS_OTEL_INSTALLED,
reason="Tracing requires OpenTelemetry",
)
def test_transaction_abort_then_retry_spans():
from opentelemetry.trace.status import StatusCode

db, trace_exporter = create_db_trace_exporter()

counters = dict(aborted=0)

def select_in_txn(txn):
results = txn.execute_sql("SELECT 1")
for row in results:
_ = row

if counters["aborted"] == 0:
counters["aborted"] = 1
raise Aborted(
"Thrown from ClientInterceptor for testing",
errors=[_helpers.FauxCall(code_pb2.ABORTED)],
)

db.run_in_transaction(select_in_txn)

got_statuses, got_events = finished_spans_statuses(trace_exporter)

# Check for the series of events
want_events = [
("Acquiring session", {"kind": "BurstyPool"}),
("Waiting for a session to become available", {"kind": "BurstyPool"}),
("No sessions available in pool. Creating session", {"kind": "BurstyPool"}),
("Creating Session", {}),
("Creating Transaction", {}),
(
"Transaction was aborted in user operation, retrying",
{"delay_seconds": "EPHEMERAL", "cause": "EPHEMERAL", "attempt": 1},
),
("Creating Transaction", {}),
("Starting Commit", {}),
("Commit Done", {}),
]
assert got_events == want_events

# Check for the statues.
codes = StatusCode
want_statuses = [
("CloudSpanner.Database.run_in_transaction", codes.OK, None),
("CloudSpanner.CreateSession", codes.OK, None),
("CloudSpanner.Session.run_in_transaction", codes.OK, None),
("CloudSpanner.Transaction.execute_streaming_sql", codes.OK, None),
("CloudSpanner.Transaction.execute_streaming_sql", codes.OK, None),
("CloudSpanner.Transaction.commit", codes.OK, None),
]
assert got_statuses == want_statuses


def finished_spans_statuses(trace_exporter):
span_list = trace_exporter.get_finished_spans()
# Sort the spans by their start time in the hierarchy.
span_list = sorted(span_list, key=lambda span: span.start_time)
got_span_names = [span.name for span in span_list]
want_span_names = [
"CloudSpanner.Database.run_in_transaction",
"CloudSpanner.CreateSession",
"CloudSpanner.Session.run_in_transaction",
"CloudSpanner.Transaction.execute_streaming_sql",
"CloudSpanner.Transaction.execute_streaming_sql",
"CloudSpanner.Transaction.commit",
]

assert got_span_names == want_span_names

got_events = []
got_statuses = []
Expand All @@ -234,6 +265,7 @@ def select_in_txn(txn):
got_statuses.append(
(span.name, span.status.status_code, span.status.description)
)

for event in span.events:
evt_attributes = event.attributes.copy()
for attr_name in imprecise_event_attributes:
Expand All @@ -242,30 +274,70 @@ def select_in_txn(txn):

got_events.append((event.name, evt_attributes))

return got_statuses, got_events


@pytest.mark.skipif(
not _helpers.USE_EMULATOR,
reason="Emulator needed to run this tests",
)
@pytest.mark.skipif(
not HAS_OTEL_INSTALLED,
reason="Tracing requires OpenTelemetry",
)
def test_database_partitioned():
from opentelemetry.trace.status import StatusCode

db, trace_exporter = create_db_trace_exporter()

try:
db.execute_partitioned_dml("UPDATE NonExistent SET name = 'foo' WHERE id > 1")
except Exception:
pass

got_statuses, got_events = finished_spans_statuses(trace_exporter)
# Check for the series of events
want_events = [
("Acquiring session", {"kind": "BurstyPool"}),
("Waiting for a session to become available", {"kind": "BurstyPool"}),
("No sessions available in pool. Creating session", {"kind": "BurstyPool"}),
("Creating Session", {}),
("Starting BeginTransaction", {}),
(
"Transaction was aborted in user operation, retrying",
{"delay_seconds": "EPHEMERAL", "cause": "EPHEMERAL", "attempt": 1},
"exception",
{
"exception.type": "google.api_core.exceptions.InvalidArgument",
"exception.message": "400 Table not found: NonExistent [at 1:8]\nUPDATE NonExistent SET name = 'foo' WHERE id > 1\n ^",
"exception.stacktrace": "EPHEMERAL",
"exception.escaped": "False",
},
),
(
"exception",
{
"exception.type": "google.api_core.exceptions.InvalidArgument",
"exception.message": "400 Table not found: NonExistent [at 1:8]\nUPDATE NonExistent SET name = 'foo' WHERE id > 1\n ^",
"exception.stacktrace": "EPHEMERAL",
"exception.escaped": "False",
},
),
("Starting Commit", {}),
("Commit Done", {}),
]
assert got_events == want_events

# Check for the statues.
codes = StatusCode
want_statuses = [
("CloudSpanner.Database.run_in_transaction", codes.OK, None),
(
"CloudSpanner.Database.execute_partitioned_pdml",
codes.ERROR,
"InvalidArgument: 400 Table not found: NonExistent [at 1:8]\nUPDATE NonExistent SET name = 'foo' WHERE id > 1\n ^",
),
("CloudSpanner.CreateSession", codes.OK, None),
("CloudSpanner.Session.run_in_transaction", codes.OK, None),
("CloudSpanner.Transaction.execute_streaming_sql", codes.OK, None),
("CloudSpanner.Transaction.execute_streaming_sql", codes.OK, None),
("CloudSpanner.Transaction.commit", codes.OK, None),
(
"CloudSpanner.ExecuteStreamingSql",
codes.ERROR,
"InvalidArgument: 400 Table not found: NonExistent [at 1:8]\nUPDATE NonExistent SET name = 'foo' WHERE id > 1\n ^",
),
]
assert got_statuses == want_statuses

Expand Down

0 comments on commit 441bb9a

Please sign in to comment.