Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added batch spans sending #52

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,9 @@ your Zipkin collector is running at localhost:9411.
import requests

def http_transport(encoded_span):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A comment describing what type/format encoded_span is expected to be would be useful.

# The collector expects a thrift-encoded list of spans. Instead of
# decoding and re-encoding the already thrift-encoded message, we can just
# add header bytes that specify that what follows is a list of length 1.
body = '\x0c\x00\x00\x00\x01' + encoded_span
requests.post(
'http://localhost:9411/api/v1/spans',
data=body,
data=encoded_span,
headers={'Content-Type': 'application/x-thrift'},
)
```
Expand Down
66 changes: 41 additions & 25 deletions py_zipkin/logging_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from py_zipkin.thrift import binary_annotation_list_builder
from py_zipkin.thrift import copy_endpoint_with_new_service_name
from py_zipkin.thrift import create_span
from py_zipkin.thrift import thrift_obj_in_bytes
from py_zipkin.thrift import thrift_objs_in_bytes
from py_zipkin.util import generate_random_64bit_string


Expand Down Expand Up @@ -86,6 +86,7 @@ def log_spans(self):
('cs' and 'cr') annotations.
"""
if self.zipkin_attrs.is_sampled:
span_sender = ZipkinBatchSender(self.transport_handler)
end_timestamp = time.time()
# Collect additional annotations from the logging handler
annotations_by_span_id = defaultdict(dict)
Expand Down Expand Up @@ -132,14 +133,13 @@ def log_spans(self):
if span.get('sa_binary_annotations'):
thrift_binary_annotations += span['sa_binary_annotations']

log_span(
span_sender.add_span(
span_id=span_id,
parent_span_id=parent_span_id,
trace_id=self.zipkin_attrs.trace_id,
span_name=span['span_name'],
annotations=thrift_annotations,
binary_annotations=thrift_binary_annotations,
transport_handler=self.transport_handler,
timestamp_s=timestamp,
duration_s=duration,
)
Expand Down Expand Up @@ -180,7 +180,7 @@ def log_spans(self):
else:
timestamp = duration = None

log_span(
span_sender.add_span(
span_id=self.zipkin_attrs.span_id,
parent_span_id=self.zipkin_attrs.parent_span_id,
trace_id=self.zipkin_attrs.trace_id,
Expand All @@ -189,8 +189,8 @@ def log_spans(self):
binary_annotations=thrift_binary_annotations,
timestamp_s=timestamp,
duration_s=duration,
transport_handler=self.transport_handler,
)
span_sender.flush()


def get_local_span_timestamp_and_duration(annotations):
Expand Down Expand Up @@ -304,23 +304,16 @@ def emit(self, record):
})


def log_span(
span_id,
parent_span_id,
trace_id,
span_name,
annotations,
binary_annotations,
timestamp_s,
duration_s,
transport_handler,
):
"""Creates a span and logs it using the given transport_handler."""
# Be defensive about the lack of a transport handler
if not transport_handler:
return

span = create_span(
class ZipkinBatchSender(object):

MAX_PORTION_SIZE = 100
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make this configurable?


def __init__(self, transport_handler):
self.transport_handler = transport_handler
self.queue = []

def add_span(
self,
span_id,
parent_span_id,
trace_id,
Expand All @@ -329,6 +322,29 @@ def log_span(
binary_annotations,
timestamp_s,
duration_s,
)
message = thrift_obj_in_bytes(span)
transport_handler(message)
):
if not self.transport_handler:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this check is unnecessary. A logging context is only created if perform_logging is True, which is only True if (self.zipkin_attrs or self.sampling_rate is not None), which requires a check for self.transport_handler.

return

thrift_span = create_span(
span_id,
parent_span_id,
trace_id,
span_name,
annotations,
binary_annotations,
timestamp_s,
duration_s,
)
self._add_span_to_queue(thrift_span)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking we don't need a separate function just to add it to the queue. I'm OK with _add_span_to_queue's logic being inside add_span.


def _add_span_to_queue(self, thrift_span):
self.queue.append(thrift_span)
if len(self.queue) >= self.MAX_PORTION_SIZE:
self.flush()

def flush(self):
if self.transport_handler and len(self.queue) > 0:
message = thrift_objs_in_bytes(self.queue)
self.transport_handler(message)
self.queue = []
21 changes: 13 additions & 8 deletions py_zipkin/thrift/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import struct

import thriftpy
from thriftpy.protocol.binary import TBinaryProtocol
from thriftpy.protocol.binary import TBinaryProtocol, write_list_begin
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We split imports into one per line. If you run pre-commit, it should fix this. I believe pre-commit should be run as part of tox.

from thriftpy.thrift import TType
from thriftpy.transport import TMemoryBuffer

from py_zipkin.util import unsigned_hex_to_signed_int
Expand Down Expand Up @@ -167,13 +168,17 @@ def create_span(
return zipkin_core.Span(**span_dict)


def thrift_obj_in_bytes(thrift_obj): # pragma: no cover
def thrift_objs_in_bytes(thrift_obj_list): # pragma: no cover
"""
Returns TBinaryProtocol encoded Thrift object.
Returns TBinaryProtocol encoded Thrift objects.

:param thrift_obj: thrift object to encode
:returns: thrift object in TBinaryProtocol format bytes.
:param thrift_obj_list: thrift objects list to encode
:returns: thrift objects in TBinaryProtocol format bytes.
"""
trans = TMemoryBuffer()
thrift_obj.write(TBinaryProtocol(trans))
return bytes(trans.getvalue())
transport = TMemoryBuffer()
protocol = TBinaryProtocol(transport)
write_list_begin(transport, TType.STRUCT, len(thrift_obj_list))
for thrift_obj in thrift_obj_list:
thrift_obj.write(protocol)

return bytes(transport.getvalue())
27 changes: 19 additions & 8 deletions tests/integration/zipkin_integration_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import pytest
from thriftpy.protocol.binary import TBinaryProtocol
from thriftpy.protocol.binary import TBinaryProtocol, read_list_begin
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use two separate import lines for this.

from thriftpy.transport import TMemoryBuffer

from py_zipkin import zipkin
Expand Down Expand Up @@ -27,10 +27,19 @@ def mock_transport_handler(message):


def _decode_binary_thrift_obj(obj):
spans = _decode_binary_thrift_objs(obj)
return spans[0]


def _decode_binary_thrift_objs(obj):
spans = []
trans = TMemoryBuffer(obj)
span = zipkin_core.Span()
span.read(TBinaryProtocol(trans))
return span
_, size = read_list_begin(trans)
for _ in range(size):
span = zipkin_core.Span()
span.read(TBinaryProtocol(trans))
spans.append(span)
return spans


def test_starting_zipkin_trace_with_sampling_rate(
Expand Down Expand Up @@ -101,8 +110,9 @@ def test_span_inside_trace(mock_logger):
):
pass

root_span = _decode_binary_thrift_obj(mock_logs[1])
nested_span = _decode_binary_thrift_obj(mock_logs[0])
spans = _decode_binary_thrift_objs(mock_logs[0])
nested_span = spans[0]
root_span = spans[1]
assert nested_span.name == 'nested_span'
assert nested_span.annotations[0].host.service_name == 'nested_service'
assert nested_span.parent_id == root_span.id
Expand Down Expand Up @@ -247,8 +257,9 @@ def test_log_debug_for_new_span(mock_logger):
})
pass

logged_span = _decode_binary_thrift_obj(mock_logs[0])
root_span = _decode_binary_thrift_obj(mock_logs[1])
spans = _decode_binary_thrift_objs(mock_logs[0])
logged_span = spans[0]
root_span = spans[1]
assert logged_span.name == 'logged_name'
assert logged_span.annotations[0].host.service_name == 'logged_service_name'
assert logged_span.parent_id == root_span.id
Expand Down
Loading