-
Notifications
You must be signed in to change notification settings - Fork 205
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add OpenTelemetry integration #149
Changes from all commits
6ebdf8e
84b3a12
b629f9d
a2d5037
ce1bdda
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
# Copyright 2020, Google LLC All rights reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import logging | ||
from contextlib import contextmanager | ||
|
||
from google.api_core.exceptions import GoogleAPICallError | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
try: | ||
from opentelemetry import trace | ||
from opentelemetry import propagators | ||
from opentelemetry.trace import SpanContext | ||
from opentelemetry.trace import get_current_span | ||
from opentelemetry.trace import set_span_in_context | ||
from opentelemetry.trace.status import Status | ||
from opentelemetry.instrumentation.utils import http_status_to_canonical_code | ||
|
||
USE_OPENTELEMETRY = True | ||
except ImportError: | ||
_LOGGER.info( | ||
"This service supports OpenTelemetry, but OpenTelemetry could" | ||
"not be imported. To use OpenTelemetry, please install the" | ||
"opentelemetry-api and opentelemetry-instrumentation" | ||
"pip modules. See also" | ||
"https://opentelemetry-python.readthedocs.io/en/stable/getting-started.html" | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are no spaces at the end of these lines, which will cause words to be displayed together (e.g. couldnot, rather than could not). |
||
USE_OPENTELEMETRY = False | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just a suggestion: In addition to the import try, wouldn't it be possible to use an environment variable for explicitly disabling opentelemetry? That way the tests below would not need to mess with sys.modules. |
||
|
||
|
||
@contextmanager | ||
def create_span(span_name, attributes=None, parent=None): | ||
""" Create a new OpenTelemetry span | ||
|
||
Args: | ||
span_name (str): the name of the new span | ||
attributes Optional[dict]: A dictionary | ||
containing all attributes to add to a span. Defaults to None. | ||
parent Optional[dict]: A dictionary | ||
containing the attributes of a parent span's span | ||
context. Defaults to None. | ||
|
||
Yields: | ||
[opentelemetry.trace.Span]: The newly created span, or None if | ||
OpenTelemetry could not be imported | ||
""" | ||
|
||
# OpenTelemetry could not be imported. | ||
if not USE_OPENTELEMETRY: | ||
yield None | ||
return | ||
|
||
tracer = trace.get_tracer(__name__) | ||
|
||
if parent is not None: | ||
# Form the parent's context from the parent dict provided | ||
try: | ||
parent_span_context = SpanContext(**parent) | ||
except TypeError: | ||
_LOGGER.warning( | ||
"A parent span was provided but it could not be" | ||
"converted into a SpanContext. Ensure that the" | ||
"parent is a mapping with at least a trace_id, span_id" | ||
"and is_remote keys." | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same issue from above, where spaces are missing at the end of the lines. |
||
) | ||
parent_span_context = None | ||
else: | ||
parent_span_context = None | ||
|
||
# Create a new span and yield it | ||
with tracer.start_as_current_span( | ||
span_name, attributes=attributes, parent=parent_span_context | ||
) as span: | ||
try: | ||
yield span | ||
except GoogleAPICallError as error: | ||
if error.code is not None: | ||
span.set_status(Status(http_status_to_canonical_code(error.code))) | ||
raise |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,8 @@ | |
import pkg_resources | ||
import threading | ||
import time | ||
import sys | ||
import json | ||
|
||
import grpc | ||
import six | ||
|
@@ -29,6 +31,7 @@ | |
|
||
from google.cloud.pubsub_v1 import _gapic | ||
from google.cloud.pubsub_v1 import types | ||
from google.cloud.pubsub_v1.opentelemetry_tracing import create_span | ||
from google.cloud.pubsub_v1.gapic import publisher_client | ||
from google.cloud.pubsub_v1.gapic.transports import publisher_grpc_transport | ||
from google.cloud.pubsub_v1.publisher import exceptions | ||
|
@@ -369,38 +372,54 @@ def publish(self, topic, data, ordering_key="", **attrs): | |
"be sent as text strings." | ||
) | ||
|
||
# Create the Pub/Sub message object. | ||
message = types.PubsubMessage( | ||
data=data, ordering_key=ordering_key, attributes=attrs | ||
) | ||
span_name = "{} publisher".format(topic) | ||
span_attributes = {"data": data.decode()} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the argument for doing this? It seems to me that, best case, it results in the duplication of data in the message. But in the worst case it actually leaks sensitive information into observability stack, just like logging the output of a SQL query would. |
||
with create_span(span_name, attributes=span_attributes) as span: | ||
if span is not None: | ||
|
||
# Messages should go through flow control to prevent excessive | ||
# queuing on the client side (depending on the settings). | ||
try: | ||
self._flow_controller.add(message) | ||
except exceptions.FlowControlLimitError as exc: | ||
future = futures.Future() | ||
future.set_exception(exc) | ||
return future | ||
if "googclient_OpenTelemetrySpanContext" in attrs: | ||
_LOGGER.warning( | ||
"googclient_OpenTelemetrySpanContext set on message" | ||
"as an attribute, but will be overridden." | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No space between "message" and "as". |
||
) | ||
|
||
def on_publish_done(future): | ||
self._flow_controller.release(message) | ||
# Add the context of the span as an attribute | ||
attrs["googclient_OpenTelemetrySpanContext"] = json.dumps( | ||
sethmaxwl marked this conversation as resolved.
Show resolved
Hide resolved
|
||
span.get_context().__dict__ | ||
) | ||
|
||
with self._batch_lock: | ||
if self._is_stopped: | ||
raise RuntimeError("Cannot publish on a stopped publisher.") | ||
# Create the Pub/Sub message object. | ||
message = types.PubsubMessage( | ||
data=data, ordering_key=ordering_key, attributes=attrs | ||
) | ||
|
||
sequencer = self._get_or_create_sequencer(topic, ordering_key) | ||
# Messages should go through flow control to prevent excessive | ||
# queuing on the client side (depending on the settings). | ||
try: | ||
self._flow_controller.add(message) | ||
except exceptions.FlowControlLimitError as exc: | ||
future = futures.Future() | ||
future.set_exception(exc) | ||
return future | ||
|
||
# Delegate the publishing to the sequencer. | ||
future = sequencer.publish(message) | ||
future.add_done_callback(on_publish_done) | ||
def on_publish_done(future): | ||
self._flow_controller.release(message) | ||
|
||
# Create a timer thread if necessary to enforce the batching | ||
# timeout. | ||
self._ensure_commit_timer_runs_no_lock() | ||
with self._batch_lock: | ||
if self._is_stopped: | ||
raise RuntimeError("Cannot publish on a stopped publisher.") | ||
|
||
sequencer = self._get_or_create_sequencer(topic, ordering_key) | ||
|
||
# Delegate the publishing to the sequencer. | ||
future = sequencer.publish(message) | ||
future.add_done_callback(on_publish_done) | ||
|
||
# Create a timer thread if necessary to enforce the batching | ||
# timeout. | ||
self._ensure_commit_timer_runs_no_lock() | ||
|
||
return future | ||
return future | ||
|
||
def ensure_cleanup_and_commit_timer_runs(self): | ||
""" Ensure a cleanup/commit timer thread is running. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -279,4 +279,17 @@ def _merge_dict(d1, d2): | |
# ---------------------------------------------------------------------------- | ||
python.py_samples() | ||
|
||
# ---------------------------------------------------------------------------- | ||
# Additional unit test dependincies | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Typo: dependencies. |
||
# ---------------------------------------------------------------------------- | ||
s.replace( | ||
"noxfile.py", | ||
r'session\.install\("mock", "pytest", "pytest-cov"\)', | ||
"""\g<0> | ||
session.install( | ||
"mock", "pytest", "pytest-cov", | ||
"opentelemetry-api", "opentelemetry-sdk", "opentelemetry-instrumentation", | ||
)""", | ||
) | ||
|
||
s.shell.run(["nox", "-s", "blacken"], hide_output=False) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should be using a
BatchSpanProcessor
here.SimpleExportSpanProcessor
exports all spans sequentially and will be very slow.