diff --git a/README.md b/README.md index 78276d2..2761648 100644 --- a/README.md +++ b/README.md @@ -9,14 +9,16 @@ Further information can be found on [opentracing.io](https://opentracing.io/) ## Using this library See examples in /examples directory. See opentracing [usage](https://github.com/opentracing/opentracing-python/#usage) for additional information. +It is important to consider the architecture of the application. In order for the tracer to manage spans properly, an appropriate `ScopeManager` implementation must be chosen. In most environments, the default `ThreadLocalScopeManager` will work just fine. In asynchronous frameworks, the `ContextVarsScopeManager` is a better choice. -First initialize the tracer at the application level by supplying a service name and recorder +First initialize the tracer at the application level by supplying a service name and span recorder ```python import opentracing from haystack import HaystackAgentRecorder from haystack import HaystackTracer -opentracing.tracer = HaystackTracer("a_service", HaystackAgentRecorder()) +tracer = HaystackTracer("a_service", HaystackAgentRecorder()) +opentracing.set_global_tracer(tracer) ``` Starting a span can be done as a managed resource using `start_active_span()` diff --git a/examples/main.py b/examples/main.py index 20e8346..bccdf1f 100644 --- a/examples/main.py +++ b/examples/main.py @@ -1,32 +1,93 @@ import opentracing -import time import logging -from opentracing.ext import tags +import time +from opentracing import tags from haystack import HaystackTracer -from haystack import AsyncHttpRecorder from haystack import LoggerRecorder -from haystack.text_propagator import TextPropagator -from haystack.propagator import PropagatorOpts -recorder = LoggerRecorder() -logging.basicConfig(level=logging.DEBUG) +def setup_tracer(): + global recorder + recorder = LoggerRecorder() -def act_as_remote_service(headers): - # remote service would have it"s own tracer - with HaystackTracer("Service-B", 
recorder,) as tracer: - opentracing.tracer = tracer + # instantiate a haystack tracer for this service and set a common tag which applies to all traces + tracer = HaystackTracer("Service-A", + recorder, + common_tags={"app.version": "1234"}) - # ---ability to use custom propagation headers if needed--- - # prop_opts = PropagatorOpts("X-Trace-ID", "X-Span-ID", "X-Parent-Span") - # opentracing.tracer.register_propagator(opentracing.Format.HTTP_HEADERS, TextPropagator(prop_opts)) + # now set the global tracer, so we can reference it with opentracing.tracer anywhere in our app + opentracing.set_global_tracer(tracer) + +def handle_request(request_body): + logging.info(f"handling new request - {request_body}") + + # this next line does a few things.. namely, it starts a new scope (which contains the span) to represent + # the scope of this "work". In this case, it should represent the work involved in processing the entire request + with opentracing.tracer.start_active_span("a_controller_method") as parent_scope: + # once within the context of an active span, there are three different ways to assign additional info or + # attributes to the span + """ + First we'll add some tags to the span + Tags are key:value pairs that enable user-defined annotation of spans in order to query, filter, and + comprehend trace data + Tags have semantic conventions, see https://opentracing.io/specification/conventions/ + *tags do NOT propagate to child spans + """ + parent_scope.span.set_tag(tags.HTTP_URL, "http://localhost/mocksvc") + parent_scope.span.set_tag(tags.HTTP_METHOD, "GET") + parent_scope.span.set_tag(tags.SPAN_KIND, "server") + + """ + Next we'll add some baggage to the span. + Baggage carries data across process boundaries.. 
aka it DOES propagate to child spans + """ + parent_scope.span.set_baggage_item("business_id", "1234") + + """ + Next let's assume you need to authenticate the client making the request + """ + with opentracing.tracer.start_active_span("authenticate"): + time.sleep(.25) # fake doing some authentication work.. + + """ + Finally, we'll add a log event to the request level span. + Logs are key:value pairs that are useful for capturing timed log messages and other + debugging or informational output from the application itself. Logs may be useful for + documenting a specific moment or event within the span (in contrast to tags which + should apply to the span regardless of time). + """ + parent_scope.span.log_kv( + { + "some_string_value": "foobar", + "an_int_value": 42, + "a_float_value": 4.2, + "a_bool_value": True, + "an_obj_as_value": { + "ok": "hmm", + "blah": 4324 + } + }) + + try: + """ + Now let's say that as part of processing this request, we need to invoke some downstream service + """ + make_a_downstream_request() + except Exception: + # if that fails, we'll tag the request-scoped span with an error so we have success/fail metrics in haystack + parent_scope.span.set_tag(tags.ERROR, True) + + +def act_as_remote_service(headers): + # remote service would have its own tracer + with HaystackTracer("Service-B", recorder) as tracer: # simulate network transfer delay time.sleep(.25) # now as-if this was executing on the remote service, extract the parent span ctx from headers - upstream_span_ctx = opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, headers) - with opentracing.tracer.start_active_span("controller_method", child_of=upstream_span_ctx) as parent_scope: + upstream_span_ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, headers) + with tracer.start_active_span("controller_method", child_of=upstream_span_ctx) as parent_scope: parent_scope.span.set_tag(tags.SPAN_KIND, "server") # simulate downstream service doing some work before replying 
time.sleep(1) @@ -35,65 +96,38 @@ def act_as_remote_service(headers): def make_a_downstream_request(): # create a child span representing the downstream request from current span. # Behind the scenes this uses the scope_manger to access the current active - # span and create a child of it. + # span (which would be our request-scoped span called "a_controller_method") and create a child of it. with opentracing.tracer.start_active_span("downstream_req") as child_scope: - child_scope.span.set_tag(tags.SPAN_KIND, "client") - # add some baggage (i.e. something that should propagate across - # process boundaries) - child_scope.span.set_baggage_item("baggage-item", "baggage-item-value") - - # carrier here represents http headers - carrier = {} - opentracing.tracer.inject(child_scope.span.context, opentracing.Format.HTTP_HEADERS, carrier) - act_as_remote_service(carrier) + # In order for the downstream client to use this trace as a parent, we must propagate the current span context. + # This is done by calling .inject() on the tracer + headers = {} + opentracing.tracer.inject(child_scope.span.context, opentracing.Format.HTTP_HEADERS, headers) + act_as_remote_service(headers) # process the response from downstream time.sleep(.15) -def use_http_recorder(): - endpoint = "http://" - global recorder - recorder = AsyncHttpRecorder(collector_url=endpoint) - - def main(): """ - Represents an application/service - """ - # instantiate a tracer with app version common tag and set it - # to opentracing.tracer property - opentracing.tracer = HaystackTracer("Service-A", - recorder, - common_tags={"app.version": "1234"}) - - logging.info("mock request received") - with opentracing.tracer.start_active_span("a_controller_method") as parent_scope: + This function represents a "parent" application/service.. i.e. the originating + service of our traces in this example. 
- # add a tag, tags are part of a span and do not propagate - # (tags have semantic conventions, see https://opentracing.io/specification/conventions/) - parent_scope.span.set_tag(tags.HTTP_URL, "http://localhost/mocksvc") - parent_scope.span.set_tag(tags.HTTP_METHOD, "GET") - parent_scope.span.set_tag(tags.SPAN_KIND, "server") - - # doing some work.. validation, processing, etc - time.sleep(.25) - - # tag the span with some information about the processing - parent_scope.span.log_kv( - {"string": "foobar", "int": 42, "float": 4.2, "bool": True, "obj": {"ok": "hmm", "blah": 4324}}) + In this scenario, we're pretending to be a web server. + """ - make_a_downstream_request() + # at some point during application init, you'll want to instantiate the global tracer + setup_tracer() - # uncomment this line to tag the span with an error - # parent_scope.span.set_tag(tags.ERROR, True) + # here we assume the web framework invokes this method to handle the given request + handle_request("hello world") - logging.info("done in main") + # app shutdown + logging.info("done") if __name__ == "__main__": - # uncomment line below to send traces to haystack collector using http recorder - # use_http_recorder() + logging.basicConfig(level=logging.DEBUG) main() diff --git a/examples/serverless/handler.py b/examples/serverless/handler.py index 2eb1cc4..bfe1692 100644 --- a/examples/serverless/handler.py +++ b/examples/serverless/handler.py @@ -2,7 +2,7 @@ import opentracing import os from requests import RequestException -from opentracing.ext import tags +from opentracing import tags from haystack import HaystackTracer from haystack import SyncHttpRecorder @@ -25,8 +25,9 @@ common_tags = { "svc_ver": os["APP_VERSION"] } -opentracing.tracer = HaystackTracer("example-service", - recorder, common_tags=common_tags) +tracer = HaystackTracer("example-service", + recorder, common_tags=common_tags) +opentracing.set_global_tracer(tracer) def invoke_downstream(headers): diff --git 
a/haystack/agent_recorder.py b/haystack/agent_recorder.py index df3685b..71baa8c 100644 --- a/haystack/agent_recorder.py +++ b/haystack/agent_recorder.py @@ -8,6 +8,10 @@ class HaystackAgentRecorder(SpanRecorder): + """ + HaystackAgentRecorder is to be used with the haystack-agent described + here (https://github.com/ExpediaInc/haystack-agent) + """ def __init__(self, agent_host="haystack-agent", agent_port=35000): logger.info("Initializing the remote grpc agent recorder, connecting " @@ -15,9 +19,10 @@ def __init__(self, agent_host="haystack-agent", agent_port=35000): channel = grpc.insecure_channel(f"{agent_host}:{agent_port}") self._stub = spanAgent_pb2_grpc.SpanAgentStub(channel) - def record_span(self, span): + @staticmethod + def process_response(future): try: - grpc_response = self._stub.dispatch(span_to_proto(span)) + grpc_response = future.result() if grpc_response.code != 0: logger.error(f"Dispatch failed with {grpc_response.code} due " f"to {grpc_response.error_message}") @@ -25,3 +30,7 @@ def record_span(self, span): logger.debug("Successfully submitted span to haystack-agent") except grpc.RpcError: logger.exception(f"Dispatch failed due to RPC error") + + def record_span(self, span): + future = self._stub.dispatch.future(span_to_proto(span)) + future.add_done_callback(HaystackAgentRecorder.process_response) diff --git a/haystack/http_recorder.py b/haystack/http_recorder.py index cca3302..772d1f0 100644 --- a/haystack/http_recorder.py +++ b/haystack/http_recorder.py @@ -45,7 +45,10 @@ def __init__(self, @staticmethod def get_json_payload(span): json_span = span_to_json(span) - str_span = json.dumps(json_span) + str_span = json.dumps( + json_span, + default=lambda o: f"{o.__class__.__name__} is not serializable" + ) return str_span.encode("utf-8") @staticmethod @@ -64,8 +67,11 @@ def post_payload(self, payload): f"to {e}") def record_span(self, span): - payload = self.get_json_payload(span) if self._use_json_payload \ - else self.get_binary_payload(span) 
+ try: + payload = self.get_json_payload(span) if self._use_json_payload \ + else self.get_binary_payload(span) + except TypeError: + logger.exception("failed to convert span") self.post_payload(payload) diff --git a/haystack/util.py b/haystack/util.py index f06f8b5..229363e 100644 --- a/haystack/util.py +++ b/haystack/util.py @@ -1,7 +1,10 @@ from .span_pb2 import Span, Tag from .constants import SECONDS_TO_MICRO +from types import TracebackType import numbers import logging +import json +import traceback logger = logging.getLogger("haystack") @@ -25,27 +28,36 @@ def logs_as_list(logs): return log_list -def set_tag_value(tag, value): - if isinstance(value, numbers.Integral): - tag.vLong = value - tag.type = Tag.LONG - elif isinstance(value, str): +def set_proto_tag_value(tag, value): + if isinstance(value, str): tag.vStr = value tag.type = Tag.STRING elif isinstance(value, bool): tag.vBool = value tag.type = Tag.BOOL + elif isinstance(value, numbers.Integral): + tag.vLong = value + tag.type = Tag.LONG elif isinstance(value, float): tag.vDouble = value tag.type = Tag.DOUBLE elif isinstance(value, bytes): tag.vBytes = value tag.type = Tag.BINARY + elif isinstance(value, dict): + tag.vStr = json.dumps(value) + tag.type = Tag.STRING + elif isinstance(value, type): + tag.vStr = str(value) + tag.type = Tag.STRING + elif isinstance(value, TracebackType): + tag.vStr = str(traceback.format_tb(value)) + tag.type = Tag.STRING else: logger.error(f"Dropped tag {tag.key} due to " f"invalid value type of {type(value)}. 
" - f"Type must be Int, String, Bool, Float or Bytes") - tag.vStr = "" + f"Type must be Int, String, Bool, Float, Dict or Bytes") + tag.vStr = f"Unserializable object type: {str(type(value))}" tag.type = Tag.STRING @@ -53,17 +65,17 @@ def add_proto_tags(span_record, tags): for key, value in tags.items(): tag = span_record.tags.add() tag.key = key - set_tag_value(tag, value) + set_proto_tag_value(tag, value) -def add_proto_logs(span_record, logs): - for log_data in logs: +def add_proto_logs(span_record, span_logs): + for log_data in span_logs: log_record = span_record.logs.add() log_record.timestamp = int(log_data.timestamp * SECONDS_TO_MICRO) for key, value in log_data.key_values.items(): tag = log_record.fields.add() tag.key = key - set_tag_value(tag, value) + set_proto_tag_value(tag, value) def span_to_proto(span): diff --git a/setup.py b/setup.py index 6f4a5b6..2bb9e59 100644 --- a/setup.py +++ b/setup.py @@ -12,11 +12,11 @@ author_email="haystack@expediagroup.com", long_description=long_description, long_description_content_type="text/markdown", - install_requires=["opentracing==2.0.0", + install_requires=["opentracing>=2.3.0,<3.0", "requests>=2.19,<3.0", - "requests-futures==0.9.9,<1.0", - "protobuf>=3.6.0,<4.0", - "grpcio>=1.18.0,<2.0]"], + "requests-futures>=0.9.9,<1.0", + "protobuf>=3.11.2,<4.0", + "grpcio>=1.26.0,<2.0]"], tests_require=["mock", "nose", "pytest", @@ -26,6 +26,7 @@ "Programming Language :: Python :: 3.5", "Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", "Operating System :: OS Independent", "License :: OSI Approved :: Apache Software License", ],