From 4069c37bc7ac3c71c97fcd963e1d46c5fe15b3e6 Mon Sep 17 00:00:00 2001 From: Connor Adams Date: Tue, 28 Jul 2020 21:51:16 -0400 Subject: [PATCH] feat: add OpenTelemetry tracing to spanner calls (#107) * feat: add optional span creation with OpenTelemetry * bring back support for python2.7 * address comments * fix 2.7 tests * nit fixes * db.statement join with ; * Update docs/opentelemetry-tracing.rst Co-authored-by: larkee <31196561+larkee@users.noreply.github.com> --- docs/index.rst | 1 + docs/opentelemetry-tracing.rst | 36 ++++ .../spanner_v1/_opentelemetry_tracing.py | 65 ++++++ google/cloud/spanner_v1/batch.py | 15 +- google/cloud/spanner_v1/session.py | 25 ++- google/cloud/spanner_v1/snapshot.py | 77 +++++--- google/cloud/spanner_v1/transaction.py | 70 ++++--- noxfile.py | 13 ++ tests/_helpers.py | 50 +++++ tests/system/test_system.py | 186 +++++++++++++++++- tests/unit/test__opentelemetry_tracing.py | 129 ++++++++++++ tests/unit/test_batch.py | 26 ++- tests/unit/test_session.py | 98 ++++++++- tests/unit/test_snapshot.py | 146 +++++++++++++- tests/unit/test_transaction.py | 65 +++++- 15 files changed, 910 insertions(+), 92 deletions(-) create mode 100644 docs/opentelemetry-tracing.rst create mode 100644 google/cloud/spanner_v1/_opentelemetry_tracing.py create mode 100644 tests/_helpers.py create mode 100644 tests/unit/test__opentelemetry_tracing.py diff --git a/docs/index.rst b/docs/index.rst index 64c5c65c7f..cabf56157c 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -23,6 +23,7 @@ API Documentation api-reference advanced-session-pool-topics + opentelemetry-tracing Changelog --------- diff --git a/docs/opentelemetry-tracing.rst b/docs/opentelemetry-tracing.rst new file mode 100644 index 0000000000..8906db43b6 --- /dev/null +++ b/docs/opentelemetry-tracing.rst @@ -0,0 +1,36 @@ +Tracing with OpenTelemetry +================================== +This library uses `OpenTelemetry `_ to automatically generate traces providing insight on calls to Cloud Spanner. 
+For information on the benefits and utility of tracing, see the `Cloud Trace docs `_. + +To take advantage of these traces, we first need to install opentelemetry: + +.. code-block:: sh + + pip install opentelemetry-api opentelemetry-sdk opentelemetry-instrumentation + +We also need to tell OpenTelemetry which exporter to use. For example, to export python-spanner traces to `Cloud Tracing `_, add the following lines to your application: + +.. code:: python + + from opentelemetry import trace + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.trace.sampling import ProbabilitySampler + from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter + # BatchExportSpanProcessor exports spans to Cloud Trace + # in a separate thread to not block on the main thread + from opentelemetry.sdk.trace.export import BatchExportSpanProcessor + + # Create and export one trace every 1000 requests + sampler = ProbabilitySampler(1/1000) + # Use the default tracer provider + trace.set_tracer_provider(TracerProvider(sampler=sampler)) + trace.get_tracer_provider().add_span_processor( + # Initialize the cloud tracing exporter + BatchExportSpanProcessor(CloudTraceSpanExporter()) + ) + +Generated spanner traces should now be available on `Cloud Trace `_. + +Tracing is most effective when many libraries are instrumented to provide insight over the entire lifespan of a request. +For a list of libraries that can be instrumented, see the `OpenTelemetry Integrations` section of the `OpenTelemetry Python docs `_ diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py new file mode 100644 index 0000000000..60e68598e9 --- /dev/null +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -0,0 +1,65 @@ +# Copyright 2020 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Manages OpenTelemetry trace creation and handling""" + +from contextlib import contextmanager + +from google.api_core.exceptions import GoogleAPICallError +from google.cloud.spanner_v1.gapic import spanner_client + +try: + from opentelemetry import trace + from opentelemetry.trace.status import Status, StatusCanonicalCode + from opentelemetry.instrumentation.utils import http_status_to_canonical_code + + HAS_OPENTELEMETRY_INSTALLED = True +except ImportError: + HAS_OPENTELEMETRY_INSTALLED = False + + +@contextmanager +def trace_call(name, session, extra_attributes=None): + if not HAS_OPENTELEMETRY_INSTALLED or not session: + # Empty context manager. 
Users will have to check if the generated value is None or a span + yield None + return + + tracer = trace.get_tracer(__name__) + + # Set base attributes that we know for every trace created + attributes = { + "db.type": "spanner", + "db.url": spanner_client.SpannerClient.SERVICE_ADDRESS, + "db.instance": session._database.name, + "net.host.name": spanner_client.SpannerClient.SERVICE_ADDRESS, + } + + if extra_attributes: + attributes.update(extra_attributes) + + with tracer.start_as_current_span( + name, kind=trace.SpanKind.CLIENT, attributes=attributes + ) as span: + try: + yield span + except GoogleAPICallError as error: + if error.code is not None: + span.set_status(Status(http_status_to_canonical_code(error.code))) + elif error.grpc_status_code is not None: + span.set_status( + # OpenTelemetry's StatusCanonicalCode maps 1-1 with grpc status codes + Status(StatusCanonicalCode(error.grpc_status_code.value[0])) + ) + raise diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index e62763d7fd..7ab394b215 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -22,6 +22,7 @@ from google.cloud.spanner_v1._helpers import _SessionWrapper from google.cloud.spanner_v1._helpers import _make_list_value_pbs from google.cloud.spanner_v1._helpers import _metadata_with_prefix +from google.cloud.spanner_v1._opentelemetry_tracing import trace_call # pylint: enable=ungrouped-imports @@ -147,12 +148,14 @@ def commit(self): api = database.spanner_api metadata = _metadata_with_prefix(database.name) txn_options = TransactionOptions(read_write=TransactionOptions.ReadWrite()) - response = api.commit( - self._session.name, - mutations=self._mutations, - single_use_transaction=txn_options, - metadata=metadata, - ) + trace_attributes = {"num_mutations": len(self._mutations)} + with trace_call("CloudSpanner.Commit", self._session, trace_attributes): + response = api.commit( + self._session.name, + mutations=self._mutations, + 
single_use_transaction=txn_options, + metadata=metadata, + ) self.committed = _pb_timestamp_to_datetime(response.commit_timestamp) return self.committed diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index a84aaa7c6d..b3a1b7e6d8 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -26,6 +26,7 @@ from google.cloud.spanner_v1.batch import Batch from google.cloud.spanner_v1.snapshot import Snapshot from google.cloud.spanner_v1.transaction import Transaction +from google.cloud.spanner_v1._opentelemetry_tracing import trace_call import random # pylint: enable=ungrouped-imports @@ -114,7 +115,11 @@ def create(self): kw = {} if self._labels: kw = {"session": {"labels": self._labels}} - session_pb = api.create_session(self._database.name, metadata=metadata, **kw) + + with trace_call("CloudSpanner.CreateSession", self, self._labels): + session_pb = api.create_session( + self._database.name, metadata=metadata, **kw + ) self._session_id = session_pb.name.split("/")[-1] def exists(self): @@ -130,10 +135,16 @@ def exists(self): return False api = self._database.spanner_api metadata = _metadata_with_prefix(self._database.name) - try: - api.get_session(self.name, metadata=metadata) - except NotFound: - return False + + with trace_call("CloudSpanner.GetSession", self) as span: + try: + api.get_session(self.name, metadata=metadata) + if span: + span.set_attribute("session_found", True) + except NotFound: + if span: + span.set_attribute("session_found", False) + return False return True @@ -150,8 +161,8 @@ def delete(self): raise ValueError("Session ID not set by back-end") api = self._database.spanner_api metadata = _metadata_with_prefix(self._database.name) - - api.delete_session(self.name, metadata=metadata) + with trace_call("CloudSpanner.DeleteSession", self): + api.delete_session(self.name, metadata=metadata) def ping(self): """Ping the session to keep it alive by executing "SELECT 1". 
diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index dcb6e32d88..0b5ee1d894 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -30,9 +30,10 @@ from google.cloud.spanner_v1._helpers import _SessionWrapper from google.cloud.spanner_v1.streamed import StreamedResultSet from google.cloud.spanner_v1.types import PartitionOptions +from google.cloud.spanner_v1._opentelemetry_tracing import trace_call -def _restart_on_unavailable(restart): +def _restart_on_unavailable(restart, trace_name=None, session=None, attributes=None): """Restart iteration after :exc:`.ServiceUnavailable`. :type restart: callable @@ -40,7 +41,8 @@ def _restart_on_unavailable(restart): """ resume_token = b"" item_buffer = [] - iterator = restart() + with trace_call(trace_name, session, attributes): + iterator = restart() while True: try: for item in iterator: @@ -50,7 +52,8 @@ def _restart_on_unavailable(restart): break except ServiceUnavailable: del item_buffer[:] - iterator = restart(resume_token=resume_token) + with trace_call(trace_name, session, attributes): + iterator = restart(resume_token=resume_token) continue if len(item_buffer) == 0: @@ -143,7 +146,10 @@ def read(self, table, columns, keyset, index="", limit=0, partition=None): metadata=metadata, ) - iterator = _restart_on_unavailable(restart) + trace_attributes = {"table_id": table, "columns": columns} + iterator = _restart_on_unavailable( + restart, "CloudSpanner.ReadOnlyTransaction", self._session, trace_attributes + ) self._read_request_count += 1 @@ -243,7 +249,13 @@ def execute_sql( timeout=timeout, ) - iterator = _restart_on_unavailable(restart) + trace_attributes = {"db.statement": sql} + iterator = _restart_on_unavailable( + restart, + "CloudSpanner.ReadWriteTransaction", + self._session, + trace_attributes, + ) self._read_request_count += 1 self._execute_sql_count += 1 @@ -309,16 +321,20 @@ def partition_read( 
partition_size_bytes=partition_size_bytes, max_partitions=max_partitions ) - response = api.partition_read( - session=self._session.name, - table=table, - columns=columns, - key_set=keyset._to_pb(), - transaction=transaction, - index=index, - partition_options=partition_options, - metadata=metadata, - ) + trace_attributes = {"table_id": table, "columns": columns} + with trace_call( + "CloudSpanner.PartitionReadOnlyTransaction", self._session, trace_attributes + ): + response = api.partition_read( + session=self._session.name, + table=table, + columns=columns, + key_set=keyset._to_pb(), + transaction=transaction, + index=index, + partition_options=partition_options, + metadata=metadata, + ) return [partition.partition_token for partition in response.partitions] @@ -385,15 +401,21 @@ def partition_query( partition_size_bytes=partition_size_bytes, max_partitions=max_partitions ) - response = api.partition_query( - session=self._session.name, - sql=sql, - transaction=transaction, - params=params_pb, - param_types=param_types, - partition_options=partition_options, - metadata=metadata, - ) + trace_attributes = {"db.statement": sql} + with trace_call( + "CloudSpanner.PartitionReadWriteTransaction", + self._session, + trace_attributes, + ): + response = api.partition_query( + session=self._session.name, + sql=sql, + transaction=transaction, + params=params_pb, + param_types=param_types, + partition_options=partition_options, + metadata=metadata, + ) return [partition.partition_token for partition in response.partitions] @@ -515,8 +537,9 @@ def begin(self): api = database.spanner_api metadata = _metadata_with_prefix(database.name) txn_selector = self._make_txn_selector() - response = api.begin_transaction( - self._session.name, txn_selector.begin, metadata=metadata - ) + with trace_call("CloudSpanner.BeginTransaction", self._session): + response = api.begin_transaction( + self._session.name, txn_selector.begin, metadata=metadata + ) self._transaction_id = response.id 
return self._transaction_id diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index 3c1abc7326..40116a9bbb 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -26,6 +26,7 @@ from google.cloud.spanner_v1.proto.transaction_pb2 import TransactionOptions from google.cloud.spanner_v1.snapshot import _SnapshotBase from google.cloud.spanner_v1.batch import _BatchBase +from google.cloud.spanner_v1._opentelemetry_tracing import trace_call class Transaction(_SnapshotBase, _BatchBase): @@ -95,9 +96,10 @@ def begin(self): api = database.spanner_api metadata = _metadata_with_prefix(database.name) txn_options = TransactionOptions(read_write=TransactionOptions.ReadWrite()) - response = api.begin_transaction( - self._session.name, txn_options, metadata=metadata - ) + with trace_call("CloudSpanner.BeginTransaction", self._session): + response = api.begin_transaction( + self._session.name, txn_options, metadata=metadata + ) self._transaction_id = response.id return self._transaction_id @@ -107,7 +109,8 @@ def rollback(self): database = self._session._database api = database.spanner_api metadata = _metadata_with_prefix(database.name) - api.rollback(self._session.name, self._transaction_id, metadata=metadata) + with trace_call("CloudSpanner.Rollback", self._session): + api.rollback(self._session.name, self._transaction_id, metadata=metadata) self.rolled_back = True del self._session._transaction @@ -123,12 +126,14 @@ def commit(self): database = self._session._database api = database.spanner_api metadata = _metadata_with_prefix(database.name) - response = api.commit( - self._session.name, - mutations=self._mutations, - transaction_id=self._transaction_id, - metadata=metadata, - ) + trace_attributes = {"num_mutations": len(self._mutations)} + with trace_call("CloudSpanner.Commit", self._session, trace_attributes): + response = api.commit( + self._session.name, + mutations=self._mutations, + 
transaction_id=self._transaction_id, + metadata=metadata, + ) self.committed = _pb_timestamp_to_datetime(response.commit_timestamp) del self._session._transaction return self.committed @@ -212,17 +217,21 @@ def execute_update( default_query_options = database._instance._client._query_options query_options = _merge_query_options(default_query_options, query_options) - response = api.execute_sql( - self._session.name, - dml, - transaction=transaction, - params=params_pb, - param_types=param_types, - query_mode=query_mode, - query_options=query_options, - seqno=seqno, - metadata=metadata, - ) + trace_attributes = {"db.statement": dml} + with trace_call( + "CloudSpanner.ReadWriteTransaction", self._session, trace_attributes + ): + response = api.execute_sql( + self._session.name, + dml, + transaction=transaction, + params=params_pb, + param_types=param_types, + query_mode=query_mode, + query_options=query_options, + seqno=seqno, + metadata=metadata, + ) return response.stats.row_count_exact def batch_update(self, statements): @@ -268,13 +277,18 @@ def batch_update(self, statements): self._execute_sql_count + 1, ) - response = api.execute_batch_dml( - session=self._session.name, - transaction=transaction, - statements=parsed, - seqno=seqno, - metadata=metadata, - ) + trace_attributes = { + # Get just the queries from the DML statement batch + "db.statement": ";".join([statement["sql"] for statement in parsed]) + } + with trace_call("CloudSpanner.DMLTransaction", self._session, trace_attributes): + response = api.execute_batch_dml( + session=self._session.name, + transaction=transaction, + statements=parsed, + seqno=seqno, + metadata=metadata, + ) row_counts = [ result_set.stats.row_count_exact for result_set in response.result_sets ] diff --git a/noxfile.py b/noxfile.py index ee0e4c8b78..91de61a9de 100644 --- a/noxfile.py +++ b/noxfile.py @@ -66,6 +66,13 @@ def lint_setup_py(session): def default(session): # Install all test dependencies, then install this package 
in-place. session.install("mock", "pytest", "pytest-cov") + + # Install opentelemetry dependencies if python3+ + if session.python != "2.7": + session.install( + "opentelemetry-api", "opentelemetry-sdk", "opentelemetry-instrumentation" + ) + session.install("-e", ".") # Run py.test against the unit tests. @@ -115,6 +122,12 @@ def system(session): # virtualenv's dist-packages. session.install("mock", "pytest") + # Install opentelemetry dependencies if not 2.7 + if session.python != "2.7": + session.install( + "opentelemetry-api", "opentelemetry-sdk", "opentelemetry-instrumentation" + ) + session.install("-e", ".") session.install("-e", "test_utils/") diff --git a/tests/_helpers.py b/tests/_helpers.py new file mode 100644 index 0000000000..6ebc4bb374 --- /dev/null +++ b/tests/_helpers.py @@ -0,0 +1,50 @@ +import unittest +import mock + +try: + from opentelemetry import trace as trace_api + from opentelemetry.trace.status import StatusCanonicalCode + + from opentelemetry.sdk.trace import TracerProvider, export + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, + ) + + HAS_OPENTELEMETRY_INSTALLED = True +except ImportError: + HAS_OPENTELEMETRY_INSTALLED = False + + StatusCanonicalCode = mock.Mock() + + +class OpenTelemetryBase(unittest.TestCase): + def setUp(self): + if HAS_OPENTELEMETRY_INSTALLED: + self.original_tracer_provider = trace_api.get_tracer_provider() + self.tracer_provider = TracerProvider() + self.memory_exporter = InMemorySpanExporter() + span_processor = export.SimpleExportSpanProcessor(self.memory_exporter) + self.tracer_provider.add_span_processor(span_processor) + trace_api.set_tracer_provider(self.tracer_provider) + + def tearDown(self): + if HAS_OPENTELEMETRY_INSTALLED: + trace_api.set_tracer_provider(self.original_tracer_provider) + + def assertNoSpans(self): + if HAS_OPENTELEMETRY_INSTALLED: + span_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(span_list), 0) + + def 
assertSpanAttributes( + self, name, status=StatusCanonicalCode.OK, attributes=None, span=None + ): + if HAS_OPENTELEMETRY_INSTALLED: + if not span: + span_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(span_list), 1) + span = span_list[0] + + self.assertEqual(span.name, name) + self.assertEqual(span.status.canonical_code, status) + self.assertEqual(span.attributes, attributes) diff --git a/tests/system/test_system.py b/tests/system/test_system.py index 9fde7db0c3..7779769c8f 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -52,6 +52,7 @@ from test_utils.retry import RetryResult from test_utils.system import unique_resource_id from tests._fixtures import DDL_STATEMENTS +from tests._helpers import OpenTelemetryBase, HAS_OPENTELEMETRY_INSTALLED CREATE_INSTANCE = os.getenv("GOOGLE_CLOUD_TESTS_CREATE_SPANNER_INSTANCE") is not None @@ -67,6 +68,12 @@ COUNTERS_TABLE = "counters" COUNTERS_COLUMNS = ("name", "value") +BASE_ATTRIBUTES = { + "db.type": "spanner", + "db.url": "spanner.googleapis.com:443", + "net.host.name": "spanner.googleapis.com:443", +} + _STATUS_CODE_TO_GRPC_STATUS_CODE = { member.value[0]: member for member in grpc.StatusCode } @@ -726,7 +733,7 @@ def test_list_backups(self): NANO_TIME = DatetimeWithNanoseconds(1995, 8, 31, nanosecond=987654321) POS_INF = float("+inf") NEG_INF = float("-inf") -OTHER_NAN, = struct.unpack("