diff --git a/docs/index.rst b/docs/index.rst
index 64c5c65c7f..cabf56157c 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -23,6 +23,7 @@ API Documentation
api-reference
advanced-session-pool-topics
+ opentelemetry-tracing
Changelog
---------
diff --git a/docs/opentelemetry-tracing.rst b/docs/opentelemetry-tracing.rst
new file mode 100644
index 0000000000..8906db43b6
--- /dev/null
+++ b/docs/opentelemetry-tracing.rst
@@ -0,0 +1,36 @@
+Tracing with OpenTelemetry
+==================================
+This library uses `OpenTelemetry <https://opentelemetry.io/>`_ to automatically generate traces providing insight on calls to Cloud Spanner.
+For information on the benefits and utility of tracing, see the `Cloud Trace docs <https://cloud.google.com/trace/docs/overview>`_.
+
+To take advantage of these traces, we first need to install opentelemetry:
+
+.. code-block:: sh
+
+ pip install opentelemetry-api opentelemetry-sdk opentelemetry-instrumentation
+
+We also need to tell OpenTelemetry which exporter to use. For example, to export Cloud Spanner traces to `Cloud Tracing <https://cloud.google.com/trace>`_, add the following lines to your application:
+
+.. code:: python
+
+ from opentelemetry import trace
+ from opentelemetry.sdk.trace import TracerProvider
+ from opentelemetry.trace.sampling import ProbabilitySampler
+ from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
+ # BatchExportSpanProcessor exports spans to Cloud Trace
+ # in a separate thread so that exporting does not block the main thread
+ from opentelemetry.sdk.trace.export import BatchExportSpanProcessor
+
+ # Sample (create and export) roughly one trace out of every 1000 requests
+ sampler = ProbabilitySampler(1/1000)
+ # Use the default tracer provider
+ trace.set_tracer_provider(TracerProvider(sampler=sampler))
+ trace.get_tracer_provider().add_span_processor(
+ # Initialize the cloud tracing exporter
+ BatchExportSpanProcessor(CloudTraceSpanExporter())
+ )
+
+Generated Spanner traces should now be available on `Cloud Trace <https://console.cloud.google.com/traces>`_.
+
+Tracing is most effective when many libraries are instrumented to provide insight over the entire lifespan of a request.
+For a list of libraries that can be instrumented, see the `OpenTelemetry Integrations` section of the `OpenTelemetry Python docs <https://opentelemetry-python.readthedocs.io/en/stable/>`_.
diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py
new file mode 100644
index 0000000000..60e68598e9
--- /dev/null
+++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py
@@ -0,0 +1,65 @@
+# Copyright 2020 Google LLC All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Manages OpenTelemetry trace creation and handling"""
+
+from contextlib import contextmanager
+
+from google.api_core.exceptions import GoogleAPICallError
+from google.cloud.spanner_v1.gapic import spanner_client
+
+try:
+ from opentelemetry import trace
+ from opentelemetry.trace.status import Status, StatusCanonicalCode
+ from opentelemetry.instrumentation.utils import http_status_to_canonical_code
+
+ HAS_OPENTELEMETRY_INSTALLED = True
+except ImportError:
+ HAS_OPENTELEMETRY_INSTALLED = False
+
+
+@contextmanager
+def trace_call(name, session, extra_attributes=None):
+ if not HAS_OPENTELEMETRY_INSTALLED or not session:
+ # Empty context manager. Users will have to check if the generated value is None or a span
+ yield None
+ return
+
+ tracer = trace.get_tracer(__name__)
+
+ # Set base attributes that we know for every trace created
+ attributes = {
+ "db.type": "spanner",
+ "db.url": spanner_client.SpannerClient.SERVICE_ADDRESS,
+ "db.instance": session._database.name,
+ "net.host.name": spanner_client.SpannerClient.SERVICE_ADDRESS,
+ }
+
+ if extra_attributes:
+ attributes.update(extra_attributes)
+
+ with tracer.start_as_current_span(
+ name, kind=trace.SpanKind.CLIENT, attributes=attributes
+ ) as span:
+ try:
+ yield span
+ except GoogleAPICallError as error:
+ if error.code is not None:
+ span.set_status(Status(http_status_to_canonical_code(error.code)))
+ elif error.grpc_status_code is not None:
+ span.set_status(
+ # OpenTelemetry's StatusCanonicalCode maps 1-1 with grpc status codes
+ Status(StatusCanonicalCode(error.grpc_status_code.value[0]))
+ )
+ raise
diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py
index e62763d7fd..7ab394b215 100644
--- a/google/cloud/spanner_v1/batch.py
+++ b/google/cloud/spanner_v1/batch.py
@@ -22,6 +22,7 @@
from google.cloud.spanner_v1._helpers import _SessionWrapper
from google.cloud.spanner_v1._helpers import _make_list_value_pbs
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
+from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
# pylint: enable=ungrouped-imports
@@ -147,12 +148,14 @@ def commit(self):
api = database.spanner_api
metadata = _metadata_with_prefix(database.name)
txn_options = TransactionOptions(read_write=TransactionOptions.ReadWrite())
- response = api.commit(
- self._session.name,
- mutations=self._mutations,
- single_use_transaction=txn_options,
- metadata=metadata,
- )
+ trace_attributes = {"num_mutations": len(self._mutations)}
+ with trace_call("CloudSpanner.Commit", self._session, trace_attributes):
+ response = api.commit(
+ self._session.name,
+ mutations=self._mutations,
+ single_use_transaction=txn_options,
+ metadata=metadata,
+ )
self.committed = _pb_timestamp_to_datetime(response.commit_timestamp)
return self.committed
diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py
index a84aaa7c6d..b3a1b7e6d8 100644
--- a/google/cloud/spanner_v1/session.py
+++ b/google/cloud/spanner_v1/session.py
@@ -26,6 +26,7 @@
from google.cloud.spanner_v1.batch import Batch
from google.cloud.spanner_v1.snapshot import Snapshot
from google.cloud.spanner_v1.transaction import Transaction
+from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
import random
# pylint: enable=ungrouped-imports
@@ -114,7 +115,11 @@ def create(self):
kw = {}
if self._labels:
kw = {"session": {"labels": self._labels}}
- session_pb = api.create_session(self._database.name, metadata=metadata, **kw)
+
+ with trace_call("CloudSpanner.CreateSession", self, self._labels):
+ session_pb = api.create_session(
+ self._database.name, metadata=metadata, **kw
+ )
self._session_id = session_pb.name.split("/")[-1]
def exists(self):
@@ -130,10 +135,16 @@ def exists(self):
return False
api = self._database.spanner_api
metadata = _metadata_with_prefix(self._database.name)
- try:
- api.get_session(self.name, metadata=metadata)
- except NotFound:
- return False
+
+ with trace_call("CloudSpanner.GetSession", self) as span:
+ try:
+ api.get_session(self.name, metadata=metadata)
+ if span:
+ span.set_attribute("session_found", True)
+ except NotFound:
+ if span:
+ span.set_attribute("session_found", False)
+ return False
return True
@@ -150,8 +161,8 @@ def delete(self):
raise ValueError("Session ID not set by back-end")
api = self._database.spanner_api
metadata = _metadata_with_prefix(self._database.name)
-
- api.delete_session(self.name, metadata=metadata)
+ with trace_call("CloudSpanner.DeleteSession", self):
+ api.delete_session(self.name, metadata=metadata)
def ping(self):
"""Ping the session to keep it alive by executing "SELECT 1".
diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py
index dcb6e32d88..0b5ee1d894 100644
--- a/google/cloud/spanner_v1/snapshot.py
+++ b/google/cloud/spanner_v1/snapshot.py
@@ -30,9 +30,10 @@
from google.cloud.spanner_v1._helpers import _SessionWrapper
from google.cloud.spanner_v1.streamed import StreamedResultSet
from google.cloud.spanner_v1.types import PartitionOptions
+from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
-def _restart_on_unavailable(restart):
+def _restart_on_unavailable(restart, trace_name=None, session=None, attributes=None):
"""Restart iteration after :exc:`.ServiceUnavailable`.
:type restart: callable
@@ -40,7 +41,8 @@ def _restart_on_unavailable(restart):
"""
resume_token = b""
item_buffer = []
- iterator = restart()
+ with trace_call(trace_name, session, attributes):
+ iterator = restart()
while True:
try:
for item in iterator:
@@ -50,7 +52,8 @@ def _restart_on_unavailable(restart):
break
except ServiceUnavailable:
del item_buffer[:]
- iterator = restart(resume_token=resume_token)
+ with trace_call(trace_name, session, attributes):
+ iterator = restart(resume_token=resume_token)
continue
if len(item_buffer) == 0:
@@ -143,7 +146,10 @@ def read(self, table, columns, keyset, index="", limit=0, partition=None):
metadata=metadata,
)
- iterator = _restart_on_unavailable(restart)
+ trace_attributes = {"table_id": table, "columns": columns}
+ iterator = _restart_on_unavailable(
+ restart, "CloudSpanner.ReadOnlyTransaction", self._session, trace_attributes
+ )
self._read_request_count += 1
@@ -243,7 +249,13 @@ def execute_sql(
timeout=timeout,
)
- iterator = _restart_on_unavailable(restart)
+ trace_attributes = {"db.statement": sql}
+ iterator = _restart_on_unavailable(
+ restart,
+ "CloudSpanner.ReadWriteTransaction",
+ self._session,
+ trace_attributes,
+ )
self._read_request_count += 1
self._execute_sql_count += 1
@@ -309,16 +321,20 @@ def partition_read(
partition_size_bytes=partition_size_bytes, max_partitions=max_partitions
)
- response = api.partition_read(
- session=self._session.name,
- table=table,
- columns=columns,
- key_set=keyset._to_pb(),
- transaction=transaction,
- index=index,
- partition_options=partition_options,
- metadata=metadata,
- )
+ trace_attributes = {"table_id": table, "columns": columns}
+ with trace_call(
+ "CloudSpanner.PartitionReadOnlyTransaction", self._session, trace_attributes
+ ):
+ response = api.partition_read(
+ session=self._session.name,
+ table=table,
+ columns=columns,
+ key_set=keyset._to_pb(),
+ transaction=transaction,
+ index=index,
+ partition_options=partition_options,
+ metadata=metadata,
+ )
return [partition.partition_token for partition in response.partitions]
@@ -385,15 +401,21 @@ def partition_query(
partition_size_bytes=partition_size_bytes, max_partitions=max_partitions
)
- response = api.partition_query(
- session=self._session.name,
- sql=sql,
- transaction=transaction,
- params=params_pb,
- param_types=param_types,
- partition_options=partition_options,
- metadata=metadata,
- )
+ trace_attributes = {"db.statement": sql}
+ with trace_call(
+ "CloudSpanner.PartitionReadWriteTransaction",
+ self._session,
+ trace_attributes,
+ ):
+ response = api.partition_query(
+ session=self._session.name,
+ sql=sql,
+ transaction=transaction,
+ params=params_pb,
+ param_types=param_types,
+ partition_options=partition_options,
+ metadata=metadata,
+ )
return [partition.partition_token for partition in response.partitions]
@@ -515,8 +537,9 @@ def begin(self):
api = database.spanner_api
metadata = _metadata_with_prefix(database.name)
txn_selector = self._make_txn_selector()
- response = api.begin_transaction(
- self._session.name, txn_selector.begin, metadata=metadata
- )
+ with trace_call("CloudSpanner.BeginTransaction", self._session):
+ response = api.begin_transaction(
+ self._session.name, txn_selector.begin, metadata=metadata
+ )
self._transaction_id = response.id
return self._transaction_id
diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py
index 3c1abc7326..40116a9bbb 100644
--- a/google/cloud/spanner_v1/transaction.py
+++ b/google/cloud/spanner_v1/transaction.py
@@ -26,6 +26,7 @@
from google.cloud.spanner_v1.proto.transaction_pb2 import TransactionOptions
from google.cloud.spanner_v1.snapshot import _SnapshotBase
from google.cloud.spanner_v1.batch import _BatchBase
+from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
class Transaction(_SnapshotBase, _BatchBase):
@@ -95,9 +96,10 @@ def begin(self):
api = database.spanner_api
metadata = _metadata_with_prefix(database.name)
txn_options = TransactionOptions(read_write=TransactionOptions.ReadWrite())
- response = api.begin_transaction(
- self._session.name, txn_options, metadata=metadata
- )
+ with trace_call("CloudSpanner.BeginTransaction", self._session):
+ response = api.begin_transaction(
+ self._session.name, txn_options, metadata=metadata
+ )
self._transaction_id = response.id
return self._transaction_id
@@ -107,7 +109,8 @@ def rollback(self):
database = self._session._database
api = database.spanner_api
metadata = _metadata_with_prefix(database.name)
- api.rollback(self._session.name, self._transaction_id, metadata=metadata)
+ with trace_call("CloudSpanner.Rollback", self._session):
+ api.rollback(self._session.name, self._transaction_id, metadata=metadata)
self.rolled_back = True
del self._session._transaction
@@ -123,12 +126,14 @@ def commit(self):
database = self._session._database
api = database.spanner_api
metadata = _metadata_with_prefix(database.name)
- response = api.commit(
- self._session.name,
- mutations=self._mutations,
- transaction_id=self._transaction_id,
- metadata=metadata,
- )
+ trace_attributes = {"num_mutations": len(self._mutations)}
+ with trace_call("CloudSpanner.Commit", self._session, trace_attributes):
+ response = api.commit(
+ self._session.name,
+ mutations=self._mutations,
+ transaction_id=self._transaction_id,
+ metadata=metadata,
+ )
self.committed = _pb_timestamp_to_datetime(response.commit_timestamp)
del self._session._transaction
return self.committed
@@ -212,17 +217,21 @@ def execute_update(
default_query_options = database._instance._client._query_options
query_options = _merge_query_options(default_query_options, query_options)
- response = api.execute_sql(
- self._session.name,
- dml,
- transaction=transaction,
- params=params_pb,
- param_types=param_types,
- query_mode=query_mode,
- query_options=query_options,
- seqno=seqno,
- metadata=metadata,
- )
+ trace_attributes = {"db.statement": dml}
+ with trace_call(
+ "CloudSpanner.ReadWriteTransaction", self._session, trace_attributes
+ ):
+ response = api.execute_sql(
+ self._session.name,
+ dml,
+ transaction=transaction,
+ params=params_pb,
+ param_types=param_types,
+ query_mode=query_mode,
+ query_options=query_options,
+ seqno=seqno,
+ metadata=metadata,
+ )
return response.stats.row_count_exact
def batch_update(self, statements):
@@ -268,13 +277,18 @@ def batch_update(self, statements):
self._execute_sql_count + 1,
)
- response = api.execute_batch_dml(
- session=self._session.name,
- transaction=transaction,
- statements=parsed,
- seqno=seqno,
- metadata=metadata,
- )
+ trace_attributes = {
+ # Get just the queries from the DML statement batch
+ "db.statement": ";".join([statement["sql"] for statement in parsed])
+ }
+ with trace_call("CloudSpanner.DMLTransaction", self._session, trace_attributes):
+ response = api.execute_batch_dml(
+ session=self._session.name,
+ transaction=transaction,
+ statements=parsed,
+ seqno=seqno,
+ metadata=metadata,
+ )
row_counts = [
result_set.stats.row_count_exact for result_set in response.result_sets
]
diff --git a/noxfile.py b/noxfile.py
index ee0e4c8b78..91de61a9de 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -66,6 +66,13 @@ def lint_setup_py(session):
def default(session):
# Install all test dependencies, then install this package in-place.
session.install("mock", "pytest", "pytest-cov")
+
+ # Install opentelemetry dependencies if python3+
+ if session.python != "2.7":
+ session.install(
+ "opentelemetry-api", "opentelemetry-sdk", "opentelemetry-instrumentation"
+ )
+
session.install("-e", ".")
# Run py.test against the unit tests.
@@ -115,6 +122,12 @@ def system(session):
# virtualenv's dist-packages.
session.install("mock", "pytest")
+ # Install opentelemetry dependencies if not 2.7
+ if session.python != "2.7":
+ session.install(
+ "opentelemetry-api", "opentelemetry-sdk", "opentelemetry-instrumentation"
+ )
+
session.install("-e", ".")
session.install("-e", "test_utils/")
diff --git a/tests/_helpers.py b/tests/_helpers.py
new file mode 100644
index 0000000000..6ebc4bb374
--- /dev/null
+++ b/tests/_helpers.py
@@ -0,0 +1,50 @@
+import unittest
+import mock
+
+try:
+ from opentelemetry import trace as trace_api
+ from opentelemetry.trace.status import StatusCanonicalCode
+
+ from opentelemetry.sdk.trace import TracerProvider, export
+ from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
+ InMemorySpanExporter,
+ )
+
+ HAS_OPENTELEMETRY_INSTALLED = True
+except ImportError:
+ HAS_OPENTELEMETRY_INSTALLED = False
+
+ StatusCanonicalCode = mock.Mock()
+
+
+class OpenTelemetryBase(unittest.TestCase):
+ def setUp(self):
+ if HAS_OPENTELEMETRY_INSTALLED:
+ self.original_tracer_provider = trace_api.get_tracer_provider()
+ self.tracer_provider = TracerProvider()
+ self.memory_exporter = InMemorySpanExporter()
+ span_processor = export.SimpleExportSpanProcessor(self.memory_exporter)
+ self.tracer_provider.add_span_processor(span_processor)
+ trace_api.set_tracer_provider(self.tracer_provider)
+
+ def tearDown(self):
+ if HAS_OPENTELEMETRY_INSTALLED:
+ trace_api.set_tracer_provider(self.original_tracer_provider)
+
+ def assertNoSpans(self):
+ if HAS_OPENTELEMETRY_INSTALLED:
+ span_list = self.memory_exporter.get_finished_spans()
+ self.assertEqual(len(span_list), 0)
+
+ def assertSpanAttributes(
+ self, name, status=StatusCanonicalCode.OK, attributes=None, span=None
+ ):
+ if HAS_OPENTELEMETRY_INSTALLED:
+ if not span:
+ span_list = self.memory_exporter.get_finished_spans()
+ self.assertEqual(len(span_list), 1)
+ span = span_list[0]
+
+ self.assertEqual(span.name, name)
+ self.assertEqual(span.status.canonical_code, status)
+ self.assertEqual(span.attributes, attributes)
diff --git a/tests/system/test_system.py b/tests/system/test_system.py
index 9fde7db0c3..7779769c8f 100644
--- a/tests/system/test_system.py
+++ b/tests/system/test_system.py
@@ -52,6 +52,7 @@
from test_utils.retry import RetryResult
from test_utils.system import unique_resource_id
from tests._fixtures import DDL_STATEMENTS
+from tests._helpers import OpenTelemetryBase, HAS_OPENTELEMETRY_INSTALLED
CREATE_INSTANCE = os.getenv("GOOGLE_CLOUD_TESTS_CREATE_SPANNER_INSTANCE") is not None
@@ -67,6 +68,12 @@
COUNTERS_TABLE = "counters"
COUNTERS_COLUMNS = ("name", "value")
+BASE_ATTRIBUTES = {
+ "db.type": "spanner",
+ "db.url": "spanner.googleapis.com:443",
+ "net.host.name": "spanner.googleapis.com:443",
+}
+
_STATUS_CODE_TO_GRPC_STATUS_CODE = {
member.value[0]: member for member in grpc.StatusCode
}
@@ -726,7 +733,7 @@ def test_list_backups(self):
NANO_TIME = DatetimeWithNanoseconds(1995, 8, 31, nanosecond=987654321)
POS_INF = float("+inf")
NEG_INF = float("-inf")
-OTHER_NAN, = struct.unpack("