From 6b2e7c215566155554947fdc2d9cef09e379de30 Mon Sep 17 00:00:00 2001 From: Dmitry Prokopchenkov Date: Tue, 18 Jul 2017 12:26:55 +0300 Subject: [PATCH 1/6] Added batch spans sending --- py_zipkin/logging_helper.py | 66 ++++++++------ py_zipkin/thrift/__init__.py | 21 +++-- tests/integration/zipkin_integration_test.py | 27 ++++-- tests/logging_helper_test.py | 91 ++++++++++++++------ 4 files changed, 136 insertions(+), 69 deletions(-) diff --git a/py_zipkin/logging_helper.py b/py_zipkin/logging_helper.py index 660a84c..9552b2e 100644 --- a/py_zipkin/logging_helper.py +++ b/py_zipkin/logging_helper.py @@ -8,7 +8,7 @@ from py_zipkin.thrift import binary_annotation_list_builder from py_zipkin.thrift import copy_endpoint_with_new_service_name from py_zipkin.thrift import create_span -from py_zipkin.thrift import thrift_obj_in_bytes +from py_zipkin.thrift import thrift_objs_in_bytes from py_zipkin.util import generate_random_64bit_string @@ -86,6 +86,7 @@ def log_spans(self): ('cs' and 'cr') annotations. """ if self.zipkin_attrs.is_sampled: + span_sender = ZipkinBatchSender(self.transport_handler) end_timestamp = time.time() # Collect additional annotations from the logging handler annotations_by_span_id = defaultdict(dict) @@ -132,14 +133,13 @@ def log_spans(self): if span.get('sa_binary_annotations'): thrift_binary_annotations += span['sa_binary_annotations'] - log_span( + span_sender.add_span( span_id=span_id, parent_span_id=parent_span_id, trace_id=self.zipkin_attrs.trace_id, span_name=span['span_name'], annotations=thrift_annotations, binary_annotations=thrift_binary_annotations, - transport_handler=self.transport_handler, timestamp_s=timestamp, duration_s=duration, ) @@ -180,7 +180,7 @@ def log_spans(self): else: timestamp = duration = None - log_span( + span_sender.add_span( span_id=self.zipkin_attrs.span_id, parent_span_id=self.zipkin_attrs.parent_span_id, trace_id=self.zipkin_attrs.trace_id, @@ -189,8 +189,8 @@ def log_spans(self): binary_annotations=thrift_binary_annotations, timestamp_s=timestamp, duration_s=duration, - transport_handler=self.transport_handler, ) + span_sender.flush() def get_local_span_timestamp_and_duration(annotations): @@ -304,23 +304,16 @@ def emit(self, record): }) -def log_span( - span_id, - parent_span_id, - trace_id, - span_name, - annotations, - binary_annotations, - timestamp_s, - duration_s, - transport_handler, -): - """Creates a span and logs it using the given transport_handler.""" - # Be defensive about the lack of a transport handler - if not transport_handler: - return - - span = create_span( +class ZipkinBatchSender(object): + + MAX_PORTION_SIZE = 100 + + def __init__(self, transport_handler): + self.transport_handler = transport_handler + self.queue = [] + + def add_span( + self, span_id, parent_span_id, trace_id, @@ -329,6 +322,29 @@ def log_span( binary_annotations, timestamp_s, duration_s, - ) - message = thrift_obj_in_bytes(span) - transport_handler(message) + ): + if not self.transport_handler: + return + + thrift_span = create_span( + span_id, + parent_span_id, + trace_id, + span_name, + annotations, + binary_annotations, + timestamp_s, + duration_s, + ) + self._add_span_to_queue(thrift_span) + + def _add_span_to_queue(self, thrift_span): + self.queue.append(thrift_span) + if len(self.queue) >= self.MAX_PORTION_SIZE: + self.flush() + + def flush(self): + if self.transport_handler and len(self.queue) > 0: + message = thrift_objs_in_bytes(self.queue) + self.transport_handler(message) + self.queue = [] diff --git a/py_zipkin/thrift/__init__.py b/py_zipkin/thrift/__init__.py index 8f2cd7c..5882a29 100644 --- a/py_zipkin/thrift/__init__.py +++ b/py_zipkin/thrift/__init__.py @@ -4,7 +4,8 @@ import struct import thriftpy -from thriftpy.protocol.binary import TBinaryProtocol +from thriftpy.protocol.binary import TBinaryProtocol, write_list_begin +from thriftpy.thrift import TType from thriftpy.transport import TMemoryBuffer from py_zipkin.util import unsigned_hex_to_signed_int @@ -167,13 +168,17 @@ def create_span( return zipkin_core.Span(**span_dict) -def thrift_obj_in_bytes(thrift_obj): # pragma: no cover +def thrift_objs_in_bytes(thrift_obj_list): # pragma: no cover """ - Returns TBinaryProtocol encoded Thrift object. + Returns TBinaryProtocol encoded Thrift objects. - :param thrift_obj: thrift object to encode - :returns: thrift object in TBinaryProtocol format bytes. + :param thrift_obj_list: thrift objects list to encode + :returns: thrift objects in TBinaryProtocol format bytes. """ - trans = TMemoryBuffer() - thrift_obj.write(TBinaryProtocol(trans)) - return bytes(trans.getvalue()) + transport = TMemoryBuffer() + protocol = TBinaryProtocol(transport) + write_list_begin(transport, TType.STRUCT, len(thrift_obj_list)) + for thrift_obj in thrift_obj_list: + thrift_obj.write(protocol) + + return bytes(transport.getvalue()) diff --git a/tests/integration/zipkin_integration_test.py b/tests/integration/zipkin_integration_test.py index d3d678f..3bfe7b5 100644 --- a/tests/integration/zipkin_integration_test.py +++ b/tests/integration/zipkin_integration_test.py @@ -1,5 +1,5 @@ import pytest -from thriftpy.protocol.binary import TBinaryProtocol +from thriftpy.protocol.binary import TBinaryProtocol, read_list_begin from thriftpy.transport import TMemoryBuffer from py_zipkin import zipkin @@ -27,10 +27,19 @@ def mock_transport_handler(message): def _decode_binary_thrift_obj(obj): + spans = _decode_binary_thrift_objs(obj) + return spans[0] + + +def _decode_binary_thrift_objs(obj): + spans = [] trans = TMemoryBuffer(obj) - span = zipkin_core.Span() - span.read(TBinaryProtocol(trans)) - return span + _, size = read_list_begin(trans) + for _ in range(size): + span = zipkin_core.Span() + span.read(TBinaryProtocol(trans)) + spans.append(span) + return spans def test_starting_zipkin_trace_with_sampling_rate( @@ -101,8 +110,9 @@ def test_span_inside_trace(mock_logger): ): pass - root_span = _decode_binary_thrift_obj(mock_logs[1]) - nested_span = _decode_binary_thrift_obj(mock_logs[0]) + spans = _decode_binary_thrift_objs(mock_logs[0]) + nested_span = spans[0] + root_span = spans[1] assert nested_span.name == 'nested_span' assert nested_span.annotations[0].host.service_name == 'nested_service' assert nested_span.parent_id == root_span.id @@ -247,8 +257,9 @@ def test_log_debug_for_new_span(mock_logger): }) pass - logged_span = _decode_binary_thrift_obj(mock_logs[0]) - root_span = _decode_binary_thrift_obj(mock_logs[1]) + spans = _decode_binary_thrift_objs(mock_logs[0]) + logged_span = spans[0] + root_span = spans[1] assert logged_span.name == 'logged_name' assert logged_span.annotations[0].host.service_name == 'logged_service_name' assert logged_span.parent_id == root_span.id diff --git a/tests/logging_helper_test.py b/tests/logging_helper_test.py index 321b648..51a4866 100644 --- a/tests/logging_helper_test.py +++ b/tests/logging_helper_test.py @@ -57,7 +57,10 @@ def test_zipkin_logging_context(time_mock, mock_logger, context): @mock.patch('py_zipkin.logging_helper.time.time', autospec=True) -@mock.patch('py_zipkin.logging_helper.log_span', autospec=True) +@mock.patch('py_zipkin.logging_helper.ZipkinBatchSender.flush', + autospec=True) +@mock.patch('py_zipkin.logging_helper.ZipkinBatchSender.add_span', + autospec=True) @mock.patch('py_zipkin.logging_helper.annotation_list_builder', autospec=True) @mock.patch('py_zipkin.logging_helper.binary_annotation_list_builder', @@ -66,7 +69,7 @@ def test_zipkin_logging_context(time_mock, mock_logger, context): autospec=True) def test_zipkin_logging_server_context_log_spans( copy_endpoint_mock, bin_ann_list_builder, ann_list_builder, - log_span_mock, time_mock + add_span_mock, flush_mock, time_mock ): # This lengthy function tests that the logging context properly # logs both client and server spans, while attaching extra annotations @@ -137,7 +140,7 @@ def test_zipkin_logging_server_context_log_spans( expected_client_bin_annotations = {'bann1': 'aww', 'bann2': 'yiss'} context.log_spans() - client_log_call, server_log_call = log_span_mock.call_args_list + client_log_call, server_log_call = add_span_mock.call_args_list assert server_log_call[1] == { 'span_id': server_span_id, 'parent_span_id': parent_span_id, @@ -145,7 +148,6 @@ def test_zipkin_logging_server_context_log_spans( 'span_name': 'GET /foo', 'annotations': expected_server_annotations, 'binary_annotations': expected_server_bin_annotations, - 'transport_handler': transport_handler, 'duration_s': 18, 'timestamp_s': 24, } @@ -156,14 +158,18 @@ def test_zipkin_logging_server_context_log_spans( 'span_name': client_span_name, 'annotations': expected_client_annotations, 'binary_annotations': expected_client_bin_annotations, - 'transport_handler': transport_handler, 'duration_s': 4, 'timestamp_s': 26, } + assert flush_mock.call_count == 1 + @mock.patch('py_zipkin.logging_helper.time.time', autospec=True) -@mock.patch('py_zipkin.logging_helper.log_span', autospec=True) +@mock.patch('py_zipkin.logging_helper.ZipkinBatchSender.flush', + autospec=True) +@mock.patch('py_zipkin.logging_helper.ZipkinBatchSender.add_span', + autospec=True) @mock.patch('py_zipkin.logging_helper.annotation_list_builder', autospec=True) @mock.patch('py_zipkin.logging_helper.binary_annotation_list_builder', @@ -172,7 +178,7 @@ def test_zipkin_logging_server_context_log_spans( autospec=True) def test_zipkin_logging_client_context_log_spans( copy_endpoint_mock, bin_ann_list_builder, ann_list_builder, - log_span_mock, time_mock + add_span_mock, flush_mock, time_mock ): # This lengthy function tests that the logging context properly # logs root client span @@ -215,7 +221,7 @@ def test_zipkin_logging_client_context_log_spans( expected_server_bin_annotations = {'k': 'v'} context.log_spans() - log_call = log_span_mock.call_args_list[0] + log_call = add_span_mock.call_args_list[0] assert log_call[1] == { 'span_id': client_span_id, 'parent_span_id': None, @@ -223,14 +229,18 @@ def test_zipkin_logging_client_context_log_spans( 'span_name': 'GET /foo', 'annotations': expected_server_annotations, 'binary_annotations': expected_server_bin_annotations, - 'transport_handler': transport_handler, 'duration_s': 18, 'timestamp_s': 24, } + assert flush_mock.call_count == 1 -@mock.patch('py_zipkin.logging_helper.log_span', autospec=True) -def test_log_span_not_called_if_not_sampled(log_span_mock): +@mock.patch('py_zipkin.logging_helper.ZipkinBatchSender.flush', + autospec=True) +@mock.patch('py_zipkin.logging_helper.ZipkinBatchSender.add_span', + autospec=True) +def test_batch_sender_add_span_not_called_if_not_sampled(add_span_mock, + flush_mock): attr = ZipkinAttrs( trace_id='0000000000000001', span_id='0000000000000002', @@ -249,7 +259,8 @@ def test_log_span_not_called_if_not_sampled(log_span_mock): report_root_timestamp=False, ) context.log_spans() - assert log_span_mock.call_count == 0 + assert add_span_mock.call_count == 0 + assert flush_mock.call_count == 0 def test_zipkin_handler_init(): @@ -305,11 +316,11 @@ def test_zipkin_handler_raises_exception_if_ann_and_bann_not_provided( " for foo span" == str(excinfo.value)) -@mock.patch('py_zipkin.logging_helper.thrift_obj_in_bytes', autospec=True) -def test_log_span(thrift_obj): +@mock.patch('py_zipkin.logging_helper.thrift_objs_in_bytes', autospec=True) +def test_batch_sender_add_span(thrift_objs): # Not much logic here, so this is basically a smoke test - - logging_helper.log_span( + sender = logging_helper.ZipkinBatchSender(mock_transport_handler) + sender.add_span( span_id='0000000000000002', parent_span_id='0000000000000001', trace_id='000000000000000f', @@ -318,19 +329,42 @@ def test_log_span(thrift_obj): binary_annotations='binary_ann', timestamp_s=None, duration_s=None, - transport_handler=mock_transport_handler, ) - assert thrift_obj.call_count == 1 + sender.flush() + assert thrift_objs.call_count == 1 + + +@mock.patch('py_zipkin.logging_helper.thrift_objs_in_bytes', autospec=True) +def test_batch_sender_add_span_many_times(thrift_obj): + sender = logging_helper.ZipkinBatchSender(mock_transport_handler) + max_portion_size = logging_helper.ZipkinBatchSender.MAX_PORTION_SIZE + for _ in range(max_portion_size * 2 + 1): + sender.add_span( + span_id='0000000000000002', + parent_span_id='0000000000000001', + trace_id='000000000000000f', + span_name='span', + annotations='ann', + binary_annotations='binary_ann', + timestamp_s=None, + duration_s=None, + ) + sender.flush() + assert thrift_obj.call_count == 3 + assert len(thrift_obj.call_args_list[0][0][0]) == max_portion_size + assert len(thrift_obj.call_args_list[1][0][0]) == max_portion_size + assert len(thrift_obj.call_args_list[2][0][0]) == 1 @mock.patch('py_zipkin.logging_helper.create_span', autospec=True) -@mock.patch('py_zipkin.logging_helper.thrift_obj_in_bytes', autospec=True) -def test_log_span_calls_transport_handler_with_correct_params( - thrift_obj, +@mock.patch('py_zipkin.logging_helper.thrift_objs_in_bytes', autospec=True) +def test_batch_sender_flush_calls_transport_handler_with_correct_params( + thrift_objs, create_sp ): transport_handler = mock.Mock() - logging_helper.log_span( + sender = logging_helper.ZipkinBatchSender(transport_handler) + sender.add_span( span_id='0000000000000002', parent_span_id='0000000000000001', trace_id='00000000000000015', @@ -339,20 +373,21 @@ def test_log_span_calls_transport_handler_with_correct_params( binary_annotations='binary_ann', timestamp_s=None, duration_s=None, - transport_handler=transport_handler, ) - transport_handler.assert_called_once_with(thrift_obj.return_value) + sender.flush() + transport_handler.assert_called_once_with(thrift_objs.return_value) @mock.patch('py_zipkin.logging_helper.create_span', autospec=True) -@mock.patch('py_zipkin.logging_helper.thrift_obj_in_bytes', autospec=True) -def test_log_span_defensive_about_transport_handler( +@mock.patch('py_zipkin.logging_helper.thrift_objs_in_bytes', autospec=True) +def test_batch_sender_defensive_about_transport_handler( thrift_obj, create_sp ): """Make sure log_span doesn't try to call the transport handler if it's None.""" - logging_helper.log_span( + sender = logging_helper.ZipkinBatchSender(None) + sender.add_span( span_id='0000000000000002', parent_span_id='0000000000000001', trace_id='00000000000000015', @@ -361,8 +396,8 @@ def test_log_span_defensive_about_transport_handler( binary_annotations='binary_ann', timestamp_s=None, duration_s=None, - transport_handler=None, ) + sender.flush() assert thrift_obj.call_count == 0 assert create_sp.call_count == 0 From 85a9ebfcc06d51a0b72909317ad8ab00747eeb21 Mon Sep 17 00:00:00 2001 From: Dmitry Prokopchenkov Date: Tue, 18 Jul 2017 13:09:41 +0300 Subject: [PATCH 2/6] Added batch spans sending - minor changes --- README.md | 6 +----- tests/logging_helper_test.py | 1 - 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/README.md b/README.md index 2268d25..16a1bb8 100644 --- a/README.md +++ b/README.md @@ -142,13 +142,9 @@ your Zipkin collector is running at localhost:9411. import requests def http_transport(encoded_span): - # The collector expects a thrift-encoded list of spans. Instead of - # decoding and re-encoding the already thrift-encoded message, we can just - # add header bytes that specify that what follows is a list of length 1. - body = '\x0c\x00\x00\x00\x01' + encoded_span requests.post( 'http://localhost:9411/api/v1/spans', - data=body, + data=encoded_span, headers={'Content-Type': 'application/x-thrift'}, ) ``` diff --git a/tests/logging_helper_test.py b/tests/logging_helper_test.py index 51a4866..640ebf4 100644 --- a/tests/logging_helper_test.py +++ b/tests/logging_helper_test.py @@ -164,7 +164,6 @@ def test_zipkin_logging_server_context_log_spans( assert flush_mock.call_count == 1 - @mock.patch('py_zipkin.logging_helper.time.time', autospec=True) @mock.patch('py_zipkin.logging_helper.ZipkinBatchSender.flush', autospec=True) From 5f8335d4bf68669b4d7463925eae4f22f67b8b9d Mon Sep 17 00:00:00 2001 From: Dmitry Prokopchenkov Date: Mon, 24 Jul 2017 15:52:05 +0300 Subject: [PATCH 3/6] minor changes --- py_zipkin/logging_helper.py | 28 +++++++++--- py_zipkin/thrift/__init__.py | 3 +- py_zipkin/zipkin.py | 8 +++- tests/logging_helper_test.py | 84 ++++++++++++++++++------------------ tests/zipkin_test.py | 4 ++ 5 files changed, 77 insertions(+), 50 deletions(-) diff --git a/py_zipkin/logging_helper.py b/py_zipkin/logging_helper.py index 9552b2e..1c4229a 100644 --- a/py_zipkin/logging_helper.py +++ b/py_zipkin/logging_helper.py @@ -46,7 +46,8 @@ def __init__( report_root_timestamp, binary_annotations=None, add_logging_annotation=False, - client_context=False + client_context=False, + max_span_portion_size=None, ): self.zipkin_attrs = zipkin_attrs self.thrift_endpoint = thrift_endpoint @@ -59,6 +60,7 @@ def __init__( self.sa_binary_annotations = [] self.add_logging_annotation = add_logging_annotation self.client_context = client_context + self.max_span_portion_size = max_span_portion_size def start(self): """Actions to be taken before request is handled. @@ -85,8 +87,12 @@ def log_spans(self): a success. It also logs the service (`ss` and `sr`) or the client ('cs' and 'cr') annotations. """ - if self.zipkin_attrs.is_sampled: - span_sender = ZipkinBatchSender(self.transport_handler) + if not self.zipkin_attrs.is_sampled: + return + + span_sender = ZipkinBatchSender(self.transport_handler, + self.max_span_portion_size) + with span_sender: end_timestamp = time.time() # Collect additional annotations from the logging handler annotations_by_span_id = defaultdict(dict) @@ -190,7 +196,6 @@ def log_spans(self): timestamp_s=timestamp, duration_s=duration, ) - span_sender.flush() def get_local_span_timestamp_and_duration(annotations): @@ -308,9 +313,20 @@ class ZipkinBatchSender(object): MAX_PORTION_SIZE = 100 - def __init__(self, transport_handler): + def __init__(self, transport_handler, max_portion_size=None): self.transport_handler = transport_handler + self.max_portion_size = max_portion_size or self.MAX_PORTION_SIZE + + def __enter__(self): self.queue = [] + return self + + def __exit__(self, _exc_type, _exc_value, _exc_traceback): + if any((_exc_type, _exc_value, _exc_traceback)): + error = '{0}: {1}'.format(_exc_type.__name__, _exc_value) + raise ZipkinError(error) + else: + self.flush() def add_span( self, @@ -340,7 +356,7 @@ def add_span( def _add_span_to_queue(self, thrift_span): self.queue.append(thrift_span) - if len(self.queue) >= self.MAX_PORTION_SIZE: + if len(self.queue) >= self.max_portion_size: self.flush() def flush(self): diff --git a/py_zipkin/thrift/__init__.py b/py_zipkin/thrift/__init__.py index 5882a29..8e1f7d7 100644 --- a/py_zipkin/thrift/__init__.py +++ b/py_zipkin/thrift/__init__.py @@ -4,7 +4,8 @@ import struct import thriftpy -from thriftpy.protocol.binary import TBinaryProtocol, write_list_begin +from thriftpy.protocol.binary import TBinaryProtocol +from thriftpy.protocol.binary import write_list_begin from thriftpy.thrift import TType from thriftpy.transport import TMemoryBuffer diff --git a/py_zipkin/zipkin.py b/py_zipkin/zipkin.py index c83964d..8d6862b 100644 --- a/py_zipkin/zipkin.py +++ b/py_zipkin/zipkin.py @@ -104,6 +104,7 @@ def __init__( span_name='span', zipkin_attrs=None, transport_handler=None, + max_span_portion_size=None, annotations=None, binary_annotations=None, port=0, @@ -126,6 +127,9 @@ def __init__( :param transport_handler: Callback function that takes a message parameter and handles logging it :type transport_handler: function + :param max_span_portion_size: Spans in a trace are sent in batches, + max_span_portion_size defines max size of one batch + :type max_span_portion_size: int :param annotations: Optional dict of str -> timestamp annotations :type annotations: dict of str -> int :param binary_annotations: Optional dict of str -> str span attrs @@ -164,6 +168,7 @@ def __init__( self.span_name = span_name self.zipkin_attrs = zipkin_attrs self.transport_handler = transport_handler + self.max_span_portion_size = max_span_portion_size self.annotations = annotations or {} self.binary_annotations = binary_annotations or {} self.port = port @@ -293,7 +298,8 @@ def start(self): report_root_timestamp or self.report_root_timestamp_override, binary_annotations=self.binary_annotations, add_logging_annotation=self.add_logging_annotation, - client_context=client_context + client_context=client_context, + max_span_portion_size = self.max_span_portion_size, ) self.logging_context.start() self.logging_configured = True diff --git a/tests/logging_helper_test.py b/tests/logging_helper_test.py index 640ebf4..ef87c79 100644 --- a/tests/logging_helper_test.py +++ b/tests/logging_helper_test.py @@ -319,25 +319,7 @@ def test_zipkin_handler_raises_exception_if_ann_and_bann_not_provided( def test_batch_sender_add_span(thrift_objs): # Not much logic here, so this is basically a smoke test sender = logging_helper.ZipkinBatchSender(mock_transport_handler) - sender.add_span( - span_id='0000000000000002', - parent_span_id='0000000000000001', - trace_id='000000000000000f', - span_name='span', - annotations='ann', - binary_annotations='binary_ann', - timestamp_s=None, - duration_s=None, - ) - sender.flush() - assert thrift_objs.call_count == 1 - - -@mock.patch('py_zipkin.logging_helper.thrift_objs_in_bytes', autospec=True) -def test_batch_sender_add_span_many_times(thrift_obj): - sender = logging_helper.ZipkinBatchSender(mock_transport_handler) - max_portion_size = logging_helper.ZipkinBatchSender.MAX_PORTION_SIZE - for _ in range(max_portion_size * 2 + 1): + with sender: sender.add_span( span_id='0000000000000002', parent_span_id='0000000000000001', @@ -348,7 +330,25 @@ def test_batch_sender_add_span_many_times(thrift_obj): timestamp_s=None, duration_s=None, ) - sender.flush() + assert thrift_objs.call_count == 1 + + +@mock.patch('py_zipkin.logging_helper.thrift_objs_in_bytes', autospec=True) +def test_batch_sender_add_span_many_times(thrift_obj): + sender = logging_helper.ZipkinBatchSender(mock_transport_handler) + max_portion_size = logging_helper.ZipkinBatchSender.MAX_PORTION_SIZE + with sender: + for _ in range(max_portion_size * 2 + 1): + sender.add_span( + span_id='0000000000000002', + parent_span_id='0000000000000001', + trace_id='000000000000000f', + span_name='span', + annotations='ann', + binary_annotations='binary_ann', + timestamp_s=None, + duration_s=None, + ) assert thrift_obj.call_count == 3 assert len(thrift_obj.call_args_list[0][0][0]) == max_portion_size assert len(thrift_obj.call_args_list[1][0][0]) == max_portion_size @@ -363,17 +363,17 @@ def test_batch_sender_flush_calls_transport_handler_with_correct_params( ): transport_handler = mock.Mock() sender = logging_helper.ZipkinBatchSender(transport_handler) - sender.add_span( - span_id='0000000000000002', - parent_span_id='0000000000000001', - trace_id='00000000000000015', - span_name='span', - annotations='ann', - binary_annotations='binary_ann', - timestamp_s=None, - duration_s=None, - ) - sender.flush() + with sender: + sender.add_span( + span_id='0000000000000002', + parent_span_id='0000000000000001', + trace_id='00000000000000015', + span_name='span', + annotations='ann', + binary_annotations='binary_ann', + timestamp_s=None, + duration_s=None, + ) transport_handler.assert_called_once_with(thrift_objs.return_value) @@ -386,17 +386,17 @@ def test_batch_sender_defensive_about_transport_handler( """Make sure log_span doesn't try to call the transport handler if it's None.""" sender = logging_helper.ZipkinBatchSender(None) - sender.add_span( - span_id='0000000000000002', - parent_span_id='0000000000000001', - trace_id='00000000000000015', - span_name='span', - annotations='ann', - binary_annotations='binary_ann', - timestamp_s=None, - duration_s=None, - ) - sender.flush() + with sender: + sender.add_span( + span_id='0000000000000002', + parent_span_id='0000000000000001', + trace_id='00000000000000015', + span_name='span', + annotations='ann', + binary_annotations='binary_ann', + timestamp_s=None, + duration_s=None, + ) assert thrift_obj.call_count == 0 assert create_sp.call_count == 0 diff --git a/tests/zipkin_test.py b/tests/zipkin_test.py index 9169236..1767ea7 100644 --- a/tests/zipkin_test.py +++ b/tests/zipkin_test.py @@ -57,6 +57,7 @@ def test_zipkin_span_for_new_trace( binary_annotations={}, add_logging_annotation=False, client_context=False, + max_span_portion_size=None, ) pop_zipkin_attrs_mock.assert_called_once_with() @@ -110,6 +111,7 @@ def test_zipkin_span_passed_sampled_attrs( binary_annotations={}, add_logging_annotation=False, client_context=False, + max_span_portion_size=None, ) pop_zipkin_attrs_mock.assert_called_once_with() @@ -489,6 +491,7 @@ def test_func(a, b): binary_annotations={}, add_logging_annotation=False, client_context=False, + max_span_portion_size=None, ) pop_zipkin_attrs_mock.assert_called_once_with() @@ -544,6 +547,7 @@ def test_func(a, b): binary_annotations={}, add_logging_annotation=False, client_context=True, + max_span_portion_size=None, ) pop_zipkin_attrs_mock.assert_called_once_with() From df25e8c49d3da64fce6ed3e066bee36007b377b9 Mon Sep 17 00:00:00 2001 From: Dmitry Prokopchenkov Date: Mon, 24 Jul 2017 16:15:48 +0300 Subject: [PATCH 4/6] minor changes --- py_zipkin/zipkin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py_zipkin/zipkin.py b/py_zipkin/zipkin.py index 8d6862b..8deab54 100644 --- a/py_zipkin/zipkin.py +++ b/py_zipkin/zipkin.py @@ -299,7 +299,7 @@ def start(self): binary_annotations=self.binary_annotations, add_logging_annotation=self.add_logging_annotation, client_context=client_context, - max_span_portion_size = self.max_span_portion_size, + max_span_portion_size=self.max_span_portion_size, ) self.logging_context.start() self.logging_configured = True From bd6a02546b98fb79115b0997f5a5c9f54e0d9740 Mon Sep 17 00:00:00 2001 From: Dmitry Prokopchenkov Date: Mon, 24 Jul 2017 16:41:10 +0300 Subject: [PATCH 5/6] minor changes --- tests/logging_helper_test.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/logging_helper_test.py b/tests/logging_helper_test.py index ef87c79..343342d 100644 --- a/tests/logging_helper_test.py +++ b/tests/logging_helper_test.py @@ -333,6 +333,13 @@ def test_batch_sender_add_span(thrift_objs): assert thrift_objs.call_count == 1 +def test_batch_sender_with_error_on_exit(): + sender = logging_helper.ZipkinBatchSender(mock_transport_handler) + with pytest.raises(ZipkinError): + with sender: + raise Exception('Error!') + + @mock.patch('py_zipkin.logging_helper.thrift_objs_in_bytes', autospec=True) def test_batch_sender_add_span_many_times(thrift_obj): sender = logging_helper.ZipkinBatchSender(mock_transport_handler) From bdef103d5999ca9301b65f24cb5f8d4d33f6250e Mon Sep 17 00:00:00 2001 From: Dmitry Prokopchenkov Date: Wed, 26 Jul 2017 17:23:37 +0300 Subject: [PATCH 6/6] minor changes --- README.md | 1 + py_zipkin/logging_helper.py | 11 +++-------- py_zipkin/zipkin.py | 12 ++++++------ tests/integration/zipkin_integration_test.py | 3 ++- tests/logging_helper_test.py | 4 ++-- tests/zipkin_test.py | 8 ++++---- 6 files changed, 18 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 16a1bb8..09178e1 100644 --- a/README.md +++ b/README.md @@ -142,6 +142,7 @@ your Zipkin collector is running at localhost:9411. import requests def http_transport(encoded_span): + # The collector expects a thrift-encoded list of spans. requests.post( 'http://localhost:9411/api/v1/spans', data=encoded_span, diff --git a/py_zipkin/logging_helper.py b/py_zipkin/logging_helper.py index 1c4229a..7399a7d 100644 --- a/py_zipkin/logging_helper.py +++ b/py_zipkin/logging_helper.py @@ -47,7 +47,7 @@ def __init__( binary_annotations=None, add_logging_annotation=False, client_context=False, - max_span_portion_size=None, + max_span_batch_size=None, ): self.zipkin_attrs = zipkin_attrs self.thrift_endpoint = thrift_endpoint @@ -60,7 +60,7 @@ def __init__( self.sa_binary_annotations = [] self.add_logging_annotation = add_logging_annotation self.client_context = client_context - self.max_span_portion_size = max_span_portion_size + self.max_span_batch_size = max_span_batch_size def start(self): """Actions to be taken before request is handled. @@ -91,7 +91,7 @@ def log_spans(self): return span_sender = ZipkinBatchSender(self.transport_handler, - self.max_span_portion_size) + self.max_span_batch_size) with span_sender: end_timestamp = time.time() # Collect additional annotations from the logging handler @@ -339,9 +339,6 @@ def add_span( timestamp_s, duration_s, ): - if not self.transport_handler: - return - thrift_span = create_span( span_id, parent_span_id, @@ -352,9 +349,7 @@ def add_span( timestamp_s, duration_s, ) - self._add_span_to_queue(thrift_span) - def _add_span_to_queue(self, thrift_span): self.queue.append(thrift_span) if len(self.queue) >= self.max_portion_size: self.flush() diff --git a/py_zipkin/zipkin.py b/py_zipkin/zipkin.py index 8deab54..bf21df1 100644 --- a/py_zipkin/zipkin.py +++ b/py_zipkin/zipkin.py @@ -104,7 +104,7 @@ def __init__( span_name='span', zipkin_attrs=None, transport_handler=None, - max_span_portion_size=None, + max_span_batch_size=None, annotations=None, binary_annotations=None, port=0, @@ -127,9 +127,9 @@ def __init__( :param transport_handler: Callback function that takes a message parameter and handles logging it :type transport_handler: function - :param max_span_portion_size: Spans in a trace are sent in batches, - max_span_portion_size defines max size of one batch - :type max_span_portion_size: int + :param max_span_batch_size: Spans in a trace are sent in batches, + max_span_batch_size defines max size of one batch + :type max_span_batch_size: int :param annotations: Optional dict of str -> timestamp annotations :type annotations: dict of str -> int :param binary_annotations: Optional dict of str -> str span attrs @@ -168,7 +168,7 @@ def __init__( self.span_name = span_name self.zipkin_attrs = zipkin_attrs self.transport_handler = transport_handler - self.max_span_portion_size = max_span_portion_size + self.max_span_batch_size = max_span_batch_size self.annotations = annotations or {} self.binary_annotations = binary_annotations or {} self.port = port @@ -299,7 +299,7 @@ def start(self): binary_annotations=self.binary_annotations, add_logging_annotation=self.add_logging_annotation, client_context=client_context, - max_span_portion_size=self.max_span_portion_size, + max_span_batch_size=self.max_span_batch_size, ) self.logging_context.start() self.logging_configured = True diff --git a/tests/integration/zipkin_integration_test.py b/tests/integration/zipkin_integration_test.py index 3bfe7b5..3f1f503 100644 --- a/tests/integration/zipkin_integration_test.py +++ b/tests/integration/zipkin_integration_test.py @@ -1,5 +1,6 @@ import pytest -from thriftpy.protocol.binary import TBinaryProtocol, read_list_begin +from thriftpy.protocol.binary import TBinaryProtocol +from thriftpy.protocol.binary import read_list_begin from thriftpy.transport import TMemoryBuffer from py_zipkin import zipkin diff --git a/tests/logging_helper_test.py b/tests/logging_helper_test.py index 343342d..8aeec37 100644 --- a/tests/logging_helper_test.py +++ b/tests/logging_helper_test.py @@ -392,7 +392,7 @@ def test_batch_sender_defensive_about_transport_handler( ): """Make sure log_span doesn't try to call the transport handler if it's None.""" - sender = logging_helper.ZipkinBatchSender(None) + sender = logging_helper.ZipkinBatchSender(transport_handler=None) with sender: sender.add_span( span_id='0000000000000002', @@ -404,8 +404,8 @@ def test_batch_sender_defensive_about_transport_handler( timestamp_s=None, duration_s=None, ) + assert create_sp.call_count == 1 assert thrift_obj.call_count == 0 - assert create_sp.call_count == 0 def test_get_local_span_timestamp_and_duration_client(): diff --git a/tests/zipkin_test.py b/tests/zipkin_test.py index 1767ea7..9228c28 100644 --- a/tests/zipkin_test.py +++ b/tests/zipkin_test.py @@ -57,7 +57,7 @@ def test_zipkin_span_for_new_trace( binary_annotations={}, add_logging_annotation=False, client_context=False, - max_span_portion_size=None, + max_span_batch_size=None, ) pop_zipkin_attrs_mock.assert_called_once_with() @@ -111,7 +111,7 @@ def test_zipkin_span_passed_sampled_attrs( binary_annotations={}, add_logging_annotation=False, client_context=False, - max_span_portion_size=None, + max_span_batch_size=None, ) pop_zipkin_attrs_mock.assert_called_once_with() @@ -491,7 +491,7 @@ def test_func(a, b): binary_annotations={}, add_logging_annotation=False, client_context=False, - max_span_portion_size=None, + max_span_batch_size=None, ) pop_zipkin_attrs_mock.assert_called_once_with() @@ -547,7 +547,7 @@ def test_func(a, b): binary_annotations={}, add_logging_annotation=False, client_context=True, - max_span_portion_size=None, + max_span_batch_size=None, ) pop_zipkin_attrs_mock.assert_called_once_with()