Skip to content

Commit

Permalink
opentracing updates: ver 2.3.0 (#8)
Browse files Browse the repository at this point in the history
* opentracing updates: ver 2.3.0 w/ContextVarsScopeManager support. Global Tracer support. protospan boolean tag value bugfix

* fix linting error
  • Loading branch information
rhilfers authored and ashishagg committed Jan 6, 2020
1 parent e400a43 commit 38dcb76
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 85 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ Further information can be found on [opentracing.io](https://opentracing.io/)
## Using this library
See examples in /examples directory. See opentracing [usage](https://github.com/opentracing/opentracing-python/#usage) for additional information.

It is important to consider the architecture of the application. In order for the tracer to manage spans properly, an appropriate `ScopeManager` implementation must be chosen. In most environments, the default `ThreadLocalScopeManager` will work just fine. In asynchronous frameworks, the `ContextVarsScopeManager` is a better choice.

First initialize the tracer at the application level by supplying a service name and recorder
First initialize the tracer at the application level by supplying a service name and span recorder
```python
import opentracing
from haystack import HaystackAgentRecorder
from haystack import HaystackTracer

opentracing.tracer = HaystackTracer("a_service", HaystackAgentRecorder())
tracer = HaystackTracer("a_service", HaystackAgentRecorder())
opentracing.set_global_tracer(tracer)
```

Starting a span can be done as a managed resource using `start_active_span()`
Expand Down
154 changes: 94 additions & 60 deletions examples/main.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,93 @@
import opentracing
import time
import logging
from opentracing.ext import tags
import time
from opentracing import tags
from haystack import HaystackTracer
from haystack import AsyncHttpRecorder
from haystack import LoggerRecorder
from haystack.text_propagator import TextPropagator
from haystack.propagator import PropagatorOpts

recorder = LoggerRecorder()
logging.basicConfig(level=logging.DEBUG)

def setup_tracer():
global recorder
recorder = LoggerRecorder()

def act_as_remote_service(headers):
# remote service would have it"s own tracer
with HaystackTracer("Service-B", recorder,) as tracer:
opentracing.tracer = tracer
# instantiate a haystack tracer for this service and set a common tag which applies to all traces
tracer = HaystackTracer("Service-A",
recorder,
common_tags={"app.version": "1234"})

# ---ability to use custom propagation headers if needed---
# prop_opts = PropagatorOpts("X-Trace-ID", "X-Span-ID", "X-Parent-Span")
# opentracing.tracer.register_propagator(opentracing.Format.HTTP_HEADERS, TextPropagator(prop_opts))
# now set the global tracer, so we can reference it with opentracing.tracer anywhere in our app
opentracing.set_global_tracer(tracer)


def handle_request(request_body):
logging.info(f"handling new request - {request_body}")

# this next line does a few things.. namely, it starts a new scope (which contains the span) to represent
# the scope of this "work". In this case, it should represent the work involved in processing the entire request
with opentracing.tracer.start_active_span("a_controller_method") as parent_scope:
# once within the context of an active span, there are three different ways to assign additional info or
# or attributes to the span
"""
First we'll add some tags to the span
Tags are key:value pairs that enable user-defined annotation of spans in order to query, filter, and
comprehend trace data
Tags have semantic conventions, see https://opentracing.io/specification/conventions/
*tags do NOT propagate to child spans
"""
parent_scope.span.set_tag(tags.HTTP_URL, "http://localhost/mocksvc")
parent_scope.span.set_tag(tags.HTTP_METHOD, "GET")
parent_scope.span.set_tag(tags.SPAN_KIND, "server")

"""
Next we'll add some baggage to the span.
Baggage carries data across process boundaries.. aka it DOES propagate to child spans
"""
parent_scope.span.set_baggage_item("business_id", "1234")

"""
Next lets assume you need to authenticate the client making the request
"""
with opentracing.tracer.start_active_span("authenticate"):
time.sleep(.25) # fake doing some authentication work..

"""
Finally, we'll add a log event to the request level span.
Logs are key:value pairs that are useful for capturing timed log messages and other
debugging or informational output from the application itself. Logs may be useful for
documenting a specific moment or event within the span (in contrast to tags which
should apply to the span regardless of time).
"""
parent_scope.span.log_kv(
{
"some_string_value": "foobar",
"an_int_value": 42,
"a_float_value": 4.2,
"a_bool_value": True,
"an_obj_as_value": {
"ok": "hmm",
"blah": 4324
}
})

try:
"""
Now lets say that as part of processing this request, we need to invoke some downstream service
"""
make_a_downstream_request()
except Exception:
# if that fails, we'll tag the request-scoped span with an error so we have success/fail metrics in haystack
parent_scope.span.set_tag(tags.ERROR, True)


def act_as_remote_service(headers):
# remote service would have it"s own tracer
with HaystackTracer("Service-B", recorder) as tracer:
# simulate network transfer delay
time.sleep(.25)

# now as-if this was executing on the remote service, extract the parent span ctx from headers
upstream_span_ctx = opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, headers)
with opentracing.tracer.start_active_span("controller_method", child_of=upstream_span_ctx) as parent_scope:
upstream_span_ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, headers)
with tracer.start_active_span("controller_method", child_of=upstream_span_ctx) as parent_scope:
parent_scope.span.set_tag(tags.SPAN_KIND, "server")
# simulate downstream service doing some work before replying
time.sleep(1)
Expand All @@ -35,65 +96,38 @@ def act_as_remote_service(headers):
def make_a_downstream_request():
# create a child span representing the downstream request from current span.
# Behind the scenes this uses the scope_manger to access the current active
# span and create a child of it.
# span (which would be our request-scoped span called "a_controller_method" and create a child of it.
with opentracing.tracer.start_active_span("downstream_req") as child_scope:

child_scope.span.set_tag(tags.SPAN_KIND, "client")

# add some baggage (i.e. something that should propagate across
# process boundaries)
child_scope.span.set_baggage_item("baggage-item", "baggage-item-value")

# carrier here represents http headers
carrier = {}
opentracing.tracer.inject(child_scope.span.context, opentracing.Format.HTTP_HEADERS, carrier)
act_as_remote_service(carrier)
# In order for the downstream client to use this trace as a parent, we must propagate the current span context.
# This is done by calling .inject() on the tracer
headers = {}
opentracing.tracer.inject(child_scope.span.context, opentracing.Format.HTTP_HEADERS, headers)
act_as_remote_service(headers)

# process the response from downstream
time.sleep(.15)


def use_http_recorder():
endpoint = "http://<replace_me>"
global recorder
recorder = AsyncHttpRecorder(collector_url=endpoint)


def main():
"""
Represents an application/service
"""
# instantiate a tracer with app version common tag and set it
# to opentracing.tracer property
opentracing.tracer = HaystackTracer("Service-A",
recorder,
common_tags={"app.version": "1234"})

logging.info("mock request received")
with opentracing.tracer.start_active_span("a_controller_method") as parent_scope:
This function represents a "parent" application/service.. i.e. the originating
service of our traces in this example.
# add a tag, tags are part of a span and do not propagate
# (tags have semantic conventions, see https://opentracing.io/specification/conventions/)
parent_scope.span.set_tag(tags.HTTP_URL, "http://localhost/mocksvc")
parent_scope.span.set_tag(tags.HTTP_METHOD, "GET")
parent_scope.span.set_tag(tags.SPAN_KIND, "server")

# doing some work.. validation, processing, etc
time.sleep(.25)

# tag the span with some information about the processing
parent_scope.span.log_kv(
{"string": "foobar", "int": 42, "float": 4.2, "bool": True, "obj": {"ok": "hmm", "blah": 4324}})
In this scenario, we're pretending to be a web server.
"""

make_a_downstream_request()
# at some point during application init, you'll want to instantiate the global tracer
setup_tracer()

# uncomment this line to tag the span with an error
# parent_scope.span.set_tag(tags.ERROR, True)
# here we assume the web framework invokes this method to handle the given request
handle_request("hello world")

logging.info("done in main")
# app shutdown
logging.info("done")


if __name__ == "__main__":
# uncomment line below to send traces to haystack collector using http recorder
# use_http_recorder()
logging.basicConfig(level=logging.DEBUG)
main()
7 changes: 4 additions & 3 deletions examples/serverless/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import opentracing
import os
from requests import RequestException
from opentracing.ext import tags
from opentracing import tags
from haystack import HaystackTracer
from haystack import SyncHttpRecorder

Expand All @@ -25,8 +25,9 @@
common_tags = {
"svc_ver": os["APP_VERSION"]
}
opentracing.tracer = HaystackTracer("example-service",
recorder, common_tags=common_tags)
tracer = HaystackTracer("example-service",
recorder, common_tags=common_tags)
opentracing.set_global_tracer(tracer)


def invoke_downstream(headers):
Expand Down
13 changes: 11 additions & 2 deletions haystack/agent_recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,29 @@


class HaystackAgentRecorder(SpanRecorder):
"""
HaystackAgentRecorder is to be used with the haystack-agent described
here (https://github.com/ExpediaInc/haystack-agent)
"""

def __init__(self, agent_host="haystack-agent", agent_port=35000):
logger.info("Initializing the remote grpc agent recorder, connecting "
f"at {agent_host}:{agent_port}")
channel = grpc.insecure_channel(f"{agent_host}:{agent_port}")
self._stub = spanAgent_pb2_grpc.SpanAgentStub(channel)

def record_span(self, span):
@staticmethod
def process_response(future):
try:
grpc_response = self._stub.dispatch(span_to_proto(span))
grpc_response = future.result()
if grpc_response.code != 0:
logger.error(f"Dispatch failed with {grpc_response.code} due "
f"to {grpc_response.error_message}")
else:
logger.debug("Successfully submitted span to haystack-agent")
except grpc.RpcError:
logger.exception(f"Dispatch failed due to RPC error")

def record_span(self, span):
future = self._stub.dispatch.future(span_to_proto(span))
future.add_done_callback(HaystackAgentRecorder.process_response)
12 changes: 9 additions & 3 deletions haystack/http_recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ def __init__(self,
@staticmethod
def get_json_payload(span):
json_span = span_to_json(span)
str_span = json.dumps(json_span)
str_span = json.dumps(
json_span,
default=lambda o: f"{o.__class__.__name__} is not serializable"
)
return str_span.encode("utf-8")

@staticmethod
Expand All @@ -64,8 +67,11 @@ def post_payload(self, payload):
f"to {e}")

def record_span(self, span):
payload = self.get_json_payload(span) if self._use_json_payload \
else self.get_binary_payload(span)
try:
payload = self.get_json_payload(span) if self._use_json_payload \
else self.get_binary_payload(span)
except TypeError:
logger.exception("failed to convert span")
self.post_payload(payload)


Expand Down
34 changes: 23 additions & 11 deletions haystack/util.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from .span_pb2 import Span, Tag
from .constants import SECONDS_TO_MICRO
from types import TracebackType
import numbers
import logging
import json
import traceback

logger = logging.getLogger("haystack")

Expand All @@ -25,45 +28,54 @@ def logs_as_list(logs):
return log_list


def set_tag_value(tag, value):
if isinstance(value, numbers.Integral):
tag.vLong = value
tag.type = Tag.LONG
elif isinstance(value, str):
def set_proto_tag_value(tag, value):
if isinstance(value, str):
tag.vStr = value
tag.type = Tag.STRING
elif isinstance(value, bool):
tag.vBool = value
tag.type = Tag.BOOL
elif isinstance(value, numbers.Integral):
tag.vLong = value
tag.type = Tag.LONG
elif isinstance(value, float):
tag.vDouble = value
tag.type = Tag.DOUBLE
elif isinstance(value, bytes):
tag.vBytes = value
tag.type = Tag.BINARY
elif isinstance(value, dict):
tag.vStr = json.dumps(value)
tag.type = Tag.STRING
elif isinstance(value, type):
tag.vStr = str(value)
tag.type = Tag.STRING
elif isinstance(value, TracebackType):
tag.vStr = str(traceback.format_tb(value))
tag.type = Tag.STRING
else:
logger.error(f"Dropped tag {tag.key} due to "
f"invalid value type of {type(value)}. "
f"Type must be Int, String, Bool, Float or Bytes")
tag.vStr = ""
f"Type must be Int, String, Bool, Float, Dict or Bytes")
tag.vStr = f"Unserializable object type: {str(type(value))}"
tag.type = Tag.STRING


def add_proto_tags(span_record, tags):
for key, value in tags.items():
tag = span_record.tags.add()
tag.key = key
set_tag_value(tag, value)
set_proto_tag_value(tag, value)


def add_proto_logs(span_record, logs):
for log_data in logs:
def add_proto_logs(span_record, span_logs):
for log_data in span_logs:
log_record = span_record.logs.add()
log_record.timestamp = int(log_data.timestamp * SECONDS_TO_MICRO)
for key, value in log_data.key_values.items():
tag = log_record.fields.add()
tag.key = key
set_tag_value(tag, value)
set_proto_tag_value(tag, value)


def span_to_proto(span):
Expand Down
9 changes: 5 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
author_email="[email protected]",
long_description=long_description,
long_description_content_type="text/markdown",
install_requires=["opentracing==2.0.0",
install_requires=["opentracing>=2.3.0,<3.0",
"requests>=2.19,<3.0",
"requests-futures==0.9.9,<1.0",
"protobuf>=3.6.0,<4.0",
"grpcio>=1.18.0,<2.0]"],
"requests-futures>=0.9.9,<1.0",
"protobuf>=3.11.2,<4.0",
"grpcio>=1.26.0,<2.0]"],
tests_require=["mock",
"nose",
"pytest",
Expand All @@ -26,6 +26,7 @@
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Operating System :: OS Independent",
"License :: OSI Approved :: Apache Software License",
],
Expand Down

0 comments on commit 38dcb76

Please sign in to comment.