Skip to content

Commit

Permalink
Add Zipkin reporter (#16)
Browse files Browse the repository at this point in the history
* Add Zipkin reporter

* Modify stuff

* Add comments

* Fix docs

* Add unit test

* Fix time zone issue

* Fix coverage

* Make timestamp name more readable
  • Loading branch information
liyanhui1228 authored Aug 16, 2017
1 parent 9f8bddc commit 146627e
Show file tree
Hide file tree
Showing 7 changed files with 366 additions and 35 deletions.
10 changes: 5 additions & 5 deletions trace/opencensus/trace/reporters/file_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ class FileReporter(object):
def __init__(self, file_name):
self.file_name = file_name

def report(self, traces):
def report(self, trace):
"""
:type traces: dict
:param traces: Traces collected.
:type trace: dict
:param trace: Trace collected.
"""
with open(self.file_name, 'w+') as file:
traces_str = json.dumps(traces)
file.write(traces_str)
trace_str = json.dumps(trace)
file.write(trace_str)
6 changes: 3 additions & 3 deletions trace/opencensus/trace/reporters/logging_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ def __init__(self, handler=None):
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)

def report(self, traces):
def report(self, trace):
"""
:type traces: dict
:param traces: Traces collected.
:param traces: Trace collected.
"""
self.logger.info(traces)
self.logger.info(trace)
12 changes: 6 additions & 6 deletions trace/opencensus/trace/reporters/print_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@


class PrintReporter(object):
def report(self, traces):
def report(self, trace):
"""Report the traces by printing it out.
:type traces: dict
:param traces: Traces collected.
:type trace: dict
:param trace: Trace collected.
:rtype: dict
:returns: Traces printed.
:returns: Trace printed.
"""
print(traces)
return traces
print(trace)
return trace
158 changes: 158 additions & 0 deletions trace/opencensus/trace/reporters/zipkin_reporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
# Copyright 2017 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Export the spans data to Zipkin Collector."""

import datetime
import json
import logging
import requests
import calendar

DEFAULT_ENDPOINT = '/api/v2/spans'
DEFAULT_HOST_NAME = 'localhost'
DEFAULT_PORT = 9411
ZIPKIN_HEADERS = {'Content-Type': 'application/json'}

ISO_DATETIME_REGEX = '%Y-%m-%dT%H:%M:%S.%fZ'

SPAN_KIND_MAP = {
0: None, # span kind unspecified
1: "SERVER",
2: "CLIENT",
}

SUCCESS_STATUS_CODE = (200, 202)


class ZipkinReporter(object):
"""Report the spans to Zipkin.
See: http://zipkin.io/zipkin-api/#
:type service_name: str
:param service_name: Service that logged an annotation in a trace.
Classifier when query for spans.
:type host_name: str
:param host_name: (Optional) The host name of the Zipkin server.
:type port: int
:param port: (Optional) The port of the Zipkin server.
:type end_point: str
:param end_point: (Optional) The path for the span reporting endpoint.
"""

def __init__(
self,
service_name,
host_name=DEFAULT_HOST_NAME,
port=DEFAULT_PORT,
endpoint=DEFAULT_ENDPOINT):
self.service_name = service_name
self.host_name = host_name
self.port = port
self.endpoint = endpoint
self.url = self.get_url

@property
def get_url(self):
return 'http://{}:{}{}'.format(
self.host_name,
self.port,
self.endpoint)

def report(self, trace):
"""Send trace to Zipkin server, default using the v1 API.
:type trace: dict
:param trace: Trace data in dictionary format.
"""
trace_id = trace.get('traceId')
spans = trace.get('spans')

try:
zipkin_spans = self.translate_to_zipkin(trace_id, spans)
result = requests.post(
url=self.url,
data=json.dumps(zipkin_spans),
headers=ZIPKIN_HEADERS)

if result.status_code not in SUCCESS_STATUS_CODE:
logging.error(
"Failed to send spans to Zipkin server! Spans are {}"
.format(zipkin_spans))
except Exception as e: # pragma: NO COVER
logging.error(e.message)

def translate_to_zipkin(self, trace_id, spans):
"""Translate the opencensus spans to zipkin spans.
:type trace_id: str
:param trace_id: Trace ID.
:type spans: list
:param spans: List of spans to be reported.
:rtype: list
:returns: List of zipkin format spans.
"""
local_endpoint = {
'serviceName': self.service_name,
'ipv4': self.host_name,
'port': self.port,
}

zipkin_spans = []

for span in spans:
# Timestamp in zipkin spans is int of microseconds.
start_datetime = datetime.datetime.strptime(
span.get('startTime'),
ISO_DATETIME_REGEX)
start_timestamp_ms = calendar.timegm(
start_datetime.timetuple()) * 1000

end_datetime = datetime.datetime.strptime(
span.get('endTime'),
ISO_DATETIME_REGEX)
end_timestamp_ms = calendar.timegm(
end_datetime.timetuple()) * 1000

duration_ms = end_timestamp_ms - start_timestamp_ms

zipkin_span = {
'traceId': trace_id,
'id': str(span.get('spanId')),
'parentId': str(span.get('parentSpanId')),
'name': span.get('name'),
'timestamp': int(round(start_timestamp_ms)),
'duration': int(round(duration_ms)),
'localEndpoint': local_endpoint,
'tags': span.get('labels'),
}

span_kind = span.get('kind')

if span_kind is not None:
kind = SPAN_KIND_MAP.get(span_kind)
# Zipkin API for span kind only accept
# enum(CLIENT|SERVER|PRODUCER|CONSUMER|Absent)
if kind is not None:
zipkin_span['kind'] = kind

zipkin_spans.append(zipkin_span)

return zipkin_spans
16 changes: 6 additions & 10 deletions trace/opencensus/trace/tracer/context_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ def end_trace(self):
return

# Send the traces when finish
traces = self.get_traces_json()
trace = self.get_trace_json()

if traces is not None:
self.reporter.report(traces)
if trace is not None:
self.reporter.report(trace)

self.cur_trace.finish()

Expand Down Expand Up @@ -198,8 +198,8 @@ def add_label_to_spans(self, label_key, label_value):
for span in self.cur_trace.spans:
span.add_label(label_key, label_value)

def get_traces_json(self):
"""Get the JSON format traces."""
def get_trace_json(self):
"""Get the JSON format trace."""
spans_list = []
for root_span in self.cur_trace.spans:
span_tree = list(iter(root_span))
Expand All @@ -216,11 +216,7 @@ def get_traces_json(self):
'spans': spans_list,
}

traces = {
'traces': [trace],
}

return traces
return trace


class NullObject(object):
Expand Down
Loading

0 comments on commit 146627e

Please sign in to comment.