Skip to content

Commit

Permalink
Adding a working propagator, adding to integrations and example
Browse files Browse the repository at this point in the history
Adding a full, end-to-end example of propagation at work in the
example application, including a test.

Adding the use of propagators into the integrations.
  • Loading branch information
toumorokoshi committed Sep 15, 2019
1 parent 876f34c commit 384496c
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

from requests.sessions import Session

import opentelemetry.propagator as propagator


# NOTE: Currently we force passing a tracer. But in turn, this forces the user
# to configure a SDK before enabling this integration. In turn, this means that
Expand Down Expand Up @@ -72,6 +74,11 @@ def instrumented_request(self, method, url, *args, **kwargs):
# TODO: Propagate the trace context via headers once we have a way
# to access propagators.

headers = kwargs.get("headers", {})
propagator.get_global_propagator().inject(
tracer, type(headers).__setitem__, headers
)
kwargs["headers"] = headers
result = wrapped(self, method, url, *args, **kwargs) # *** PROCEED

span.set_attribute("http.status_code", result.status_code)
Expand Down
25 changes: 19 additions & 6 deletions ext/opentelemetry-ext-wsgi/src/opentelemetry/ext/wsgi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
"""

import functools
import typing
import wsgiref.util as wsgiref_util

from opentelemetry import trace
from opentelemetry import propagator, trace
from opentelemetry.ext.wsgi.version import __version__ # noqa


Expand All @@ -35,12 +36,9 @@ class OpenTelemetryMiddleware:
wsgi: The WSGI application callable.
"""

def __init__(self, wsgi, propagators=None):
def __init__(self, wsgi):
self.wsgi = wsgi

# TODO: implement context propagation
self.propagators = propagators

@staticmethod
def _add_request_attributes(span, environ):
span.set_attribute("component", "http")
Expand Down Expand Up @@ -87,8 +85,11 @@ def __call__(self, environ, start_response):

tracer = trace.tracer()
path_info = environ["PATH_INFO"] or "/"
parent_span = propagator.get_global_propagator().extract(
get_header_from_environ, environ
)

with tracer.start_span(path_info) as span:
with tracer.start_span(path_info, parent_span) as span:
self._add_request_attributes(span, environ)
start_response = self._create_start_response(span, start_response)

Expand All @@ -99,3 +100,15 @@ def __call__(self, environ, start_response):
finally:
if hasattr(iterable, "close"):
iterable.close()


def get_header_from_environ(
environ: dict, header_name: str
) -> typing.Optional[str]:
"""Retrieve the header value from the wsgi environ dictionary.
Returns:
A string with the header value if it exists, else None.
"""
environ_key = "HTTP_" + header_name.upper().replace("-", "_")
return [environ.get(environ_key)]
4 changes: 3 additions & 1 deletion ext/opentelemetry-ext-wsgi/tests/test_wsgi_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ def validate_response(self, response, error=None):
self.assertIsNone(self.exc_info)

# Verify that start_span has been called
self.start_span.assert_called_once_with("/")
self.start_span.assert_called_once_with(
"/", trace_api.INVALID_SPAN_CONTEXT
)

def test_basic_wsgi_call(self):
app = OpenTelemetryMiddleware(simple_wsgi)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import opentelemetry.trace as trace
from opentelemetry.context.propagation import httptextformat


class TraceStateHTTPTextFormat(httptextformat.HTTPTextFormat):
"""TODO: a propagator that extracts and injects tracestate.
"""

def extract(
self, _get_from_carrier: httptextformat.Getter, _carrier: object
) -> trace.SpanContext:
return trace.INVALID_SPAN_CONTEXT

def inject(
self,
context: trace.SpanContext,
set_in_carrier: httptextformat.Setter,
carrier: object,
) -> None:
pass
77 changes: 77 additions & 0 deletions opentelemetry-api/src/opentelemetry/propagator/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import typing

import opentelemetry.context.propagation.httptextformat as httptextformat
import opentelemetry.trace as trace
from opentelemetry.context.propagation.tracestatehttptextformat import (
TraceStateHTTPTextFormat,
)


class Propagator:
"""Class which encapsulates propagation of values to and from context.
In contrast to using the formatters directly, a propagator object can
help own configuration around which formatters to use, as well as
help simplify the work require for integrations to use the intended
formatters.
"""

def __init__(self, httptextformat_instance: httptextformat.HTTPTextFormat):
self._httptextformat = httptextformat_instance

def extract(
self, get_from_carrier: httptextformat.Getter, carrier: object
) -> typing.Union[trace.SpanContext, trace.Span, None]:
"""Load the parent SpanContext from values in the carrier.
Using the specified HTTPTextFormatter, the propagator will
extract a SpanContext from the carrier. If one is found,
it will be set as the parent context of the current span.
Args:
get_from_carrier: a function that can retrieve zero
or more values from the carrier. In the case that
the value does not exist, return an empty list.
carrier: and object which contains values that are
used to construct a SpanContext. This object
must be paired with an appropriate get_from_carrier
which understands how to extract a value from it.
"""
span_context = self._httptextformat.extract(get_from_carrier, carrier)
return span_context if span_context else trace.Tracer.CURRENT_SPAN

def inject(
self,
tracer: trace.Tracer,
set_in_carrier: httptextformat.Setter,
carrier: object,
) -> None:
"""Inject values from the current context into the carrier.
inject enables the propagation of values into HTTP clients or
other objects which perform an HTTP request. Implementations
should use the set_in_carrier method to set values on the
carrier.
Args:
set_in_carrier: A setter function that can set values
on the carrier.
carrier: An object that a place to define HTTP headers.
Should be paired with set_in_carrier, which should
know how to set header values on the carrier.
"""
self._httptextformat.inject(
tracer.get_current_span().get_context(), set_in_carrier, carrier
)


_PROPAGATOR = Propagator(TraceStateHTTPTextFormat())


def get_global_propagator() -> Propagator:
return _PROPAGATOR


def set_global_propagator(propagator: Propagator) -> None:
global _PROPAGATOR # pylint:disable=global-statement
_PROPAGATOR = propagator
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
This module serves as an example to integrate with flask, using
the requests library to perform downstream requests
"""
import time

import flask
import requests

import opentelemetry.ext.http_requests
from opentelemetry import trace
from opentelemetry import propagator, trace
from opentelemetry.ext.wsgi import OpenTelemetryMiddleware
from opentelemetry.sdk.context.propagation.b3_format import B3Format
from opentelemetry.sdk.trace import Tracer


Expand All @@ -39,14 +39,20 @@ def configure_opentelemetry(flask_app: flask.Flask):
* processors?
* exporters?
* propagators?
"""
# Start by configuring all objects required to ensure
# a complete end to end workflow.
# the preferred implementation of these objects must be set,
# as the opentelemetry-api defines the interface with a no-op
# implementation.
trace.set_preferred_tracer_implementation(lambda _: Tracer())
# Next, we need to configure how the values that are used by
# traces and metrics are propagated (such as what specific headers
# carry this value).

# TBD: can remove once default TraceState propagators are installed.
propagator.set_global_propagator(propagator.Propagator(B3Format()))

# Integrations are the glue that binds the OpenTelemetry API
# and the frameworks and libraries that are used together, automatically
# creating Spans and propagating context as appropriate.
Expand All @@ -61,8 +67,8 @@ def configure_opentelemetry(flask_app: flask.Flask):
def hello():
# emit a trace that measures how long the
# sleep takes
with trace.tracer().start_span("sleep"):
time.sleep(0.001)
with trace.tracer().start_span("example-request"):
requests.get("http://www.example.com")
return "hello"


Expand Down
49 changes: 46 additions & 3 deletions opentelemetry-example-app/tests/test_flask_example.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,57 @@
import unittest
from unittest import mock

import requests
from werkzeug.test import Client
from werkzeug.wrappers import BaseResponse

import opentelemetry_example_app.flask_example as flask_example
from opentelemetry.sdk import trace
from opentelemetry.sdk.context.propagation import b3_format


class TestFlaskExample(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.app = flask_example.app

def setUp(self):
mocked_response = requests.models.Response()
mocked_response.status_code = 200
mocked_response.reason = "Roger that!"
self.send_patcher = mock.patch.object(
requests.Session,
"send",
autospec=True,
spec_set=True,
return_value=mocked_response,
)
self.send = self.send_patcher.start()

def tearDown(self):
self.send_patcher.stop()

def test_full_path(self):
with self.app.test_client() as client:
response = client.get("/")
assert response.data.decode() == "hello"
trace_id = trace.generate_trace_id()
# We need to use the Werkzeug test app because
# The headers are injected at the wsgi layer.
# The flask test app will not include these, and
# result in the values not propagated.
client = Client(self.app.wsgi_app, BaseResponse)
# emulate b3 headers
client.get(
"/",
headers={
"x-b3-traceid": b3_format.format_trace_id(trace_id),
"x-b3-spanid": b3_format.format_span_id(
trace.generate_span_id()
),
"x-b3-sampled": "1",
},
)
# assert the http request header was propagated through.
prepared_request = self.send.call_args[0][1]
headers = prepared_request.headers
for required_header in {"x-b3-traceid", "x-b3-spanid", "x-b3-sampled"}:
assert required_header in headers
assert headers["x-b3-traceid"] == b3_format.format_trace_id(trace_id)
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ def extract(cls, get_from_carrier, carrier):
# header is set to allow.
if sampled in cls._SAMPLE_PROPAGATE_VALUES or flags == "1":
options |= trace.TraceOptions.RECORDED

return trace.SpanContext(
# trace an span ids are encoded in hex, so must be converted
trace_id=int(trace_id, 16),
Expand Down

0 comments on commit 384496c

Please sign in to comment.