From 6f42def1d3bed5fd1812316f0e8e0e62d02ff76c Mon Sep 17 00:00:00 2001 From: Jan Losinski Date: Mon, 8 Mar 2021 19:57:06 +0100 Subject: [PATCH] Add possibility to split oversized udp batches (#1500) --- CHANGELOG.md | 2 + .../opentelemetry/exporter/jaeger/__init__.py | 14 ++++- .../exporter/jaeger/send/thrift.py | 28 ++++++++-- .../tests/test_jaeger_exporter_thrift.py | 55 +++++++++++++++++++ .../sdk/environment_variables/__init__.py | 3 + 5 files changed, 96 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cd126d8a52c..d7268cde0ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry.trace.use_span()` will now overwrite previously set status on span in case an exception is raised inside the context manager and `set_status_on_exception` is set to `True`. ([#1668](https://github.com/open-telemetry/opentelemetry-python/pull/1668)) +- Add `udp_split_oversized_batches` support to jaeger exporter + ([#1500](https://github.com/open-telemetry/opentelemetry-python/pull/1500)) ### Changed - Rename `IdsGenerator` to `IdGenerator` diff --git a/exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/__init__.py b/exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/__init__.py index a98dca33edc..80c5f04317c 100644 --- a/exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/__init__.py +++ b/exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/__init__.py @@ -86,6 +86,7 @@ from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_JAEGER_AGENT_HOST, OTEL_EXPORTER_JAEGER_AGENT_PORT, + OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES, OTEL_EXPORTER_JAEGER_ENDPOINT, OTEL_EXPORTER_JAEGER_PASSWORD, OTEL_EXPORTER_JAEGER_USER, @@ -122,6 +123,7 @@ class JaegerExporter(SpanExporter): credentials: Credentials for server authentication. transport_format: Transport format for exporting spans to collector. max_tag_value_length: Max length string attribute values can have. Set to None to disable. + udp_split_oversized_batches: Re-emit oversized batches in smaller chunks. """ def __init__( @@ -136,6 +138,7 @@ def __init__( credentials: Optional[ChannelCredentials] = None, transport_format: Optional[str] = None, max_tag_value_length: Optional[int] = None, + udp_split_oversized_batches: bool = None, ): self.service_name = service_name self._max_tag_value_length = max_tag_value_length @@ -155,8 +158,17 @@ def __init__( env_variable=environ_agent_port, default=DEFAULT_AGENT_PORT, ) + self.udp_split_oversized_batches = _parameter_setter( + param=udp_split_oversized_batches, + env_variable=environ.get( + OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES + ), + default=False, + ) self._agent_client = AgentClientUDP( - host_name=self.agent_host_name, port=self.agent_port + host_name=self.agent_host_name, + port=self.agent_port, + split_oversized_batches=self.udp_split_oversized_batches, ) self.collector_endpoint = _parameter_setter( param=collector_endpoint, diff --git a/exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/send/thrift.py b/exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/send/thrift.py index 151e017b457..fe4f4633196 100644 --- a/exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/send/thrift.py +++ b/exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/send/thrift.py @@ -14,6 +14,7 @@ import base64 import logging +import math import socket from thrift.protocol import TBinaryProtocol, TCompactProtocol @@ -36,6 +37,7 @@ class AgentClientUDP: port: The port of the Jaeger server. max_packet_size: Maximum size of UDP packet. client: Class for creating new client objects for agencies. + split_oversized_batches: Re-emit oversized batches in smaller chunks. """ def __init__( @@ -44,6 +46,7 @@ def __init__( port, max_packet_size=UDP_PACKET_MAX_LENGTH, client=agent.Client, + split_oversized_batches=False, ): self.address = (host_name, port) self.max_packet_size = max_packet_size @@ -51,6 +54,7 @@ def __init__( self.client = client( iprot=TCompactProtocol.TCompactProtocol(trans=self.buffer) ) + self.split_oversized_batches = split_oversized_batches def emit(self, batch: jaeger.Batch): """ @@ -66,11 +70,25 @@ def emit(self, batch: jaeger.Batch): self.client.emitBatch(batch) buff = self.buffer.getvalue() if len(buff) > self.max_packet_size: - logger.warning( - "Data exceeds the max UDP packet size; size %r, max %r", - len(buff), - self.max_packet_size, - ) + if self.split_oversized_batches and len(batch.spans) > 1: + packets = math.ceil(len(buff) / self.max_packet_size) + div = math.ceil(len(batch.spans) / packets) + for packet in range(packets): + start = packet * div + end = (packet + 1) * div + if start < len(batch.spans): + self.emit( + jaeger.Batch( + process=batch.process, + spans=batch.spans[start:end], + ) + ) + else: + logger.warning( + "Data exceeds the max UDP packet size; size %r, max %r", + len(buff), + self.max_packet_size, + ) return with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket: diff --git a/exporter/opentelemetry-exporter-jaeger/tests/test_jaeger_exporter_thrift.py b/exporter/opentelemetry-exporter-jaeger/tests/test_jaeger_exporter_thrift.py index 5b611093ae2..750b050fe21 100644 --- a/exporter/opentelemetry-exporter-jaeger/tests/test_jaeger_exporter_thrift.py +++ b/exporter/opentelemetry-exporter-jaeger/tests/test_jaeger_exporter_thrift.py @@ -520,3 +520,58 @@ def test_max_tag_value_length(self): self.assertEqual("hello", tags_by_keys["key_string"]) self.assertEqual("('tup", tags_by_keys["key_tuple"]) self.assertEqual("some_", tags_by_keys["key_resource"]) + + def test_agent_client_split(self): + agent_client = jaeger_exporter.AgentClientUDP( + host_name="localhost", + port=6354, + max_packet_size=250, + split_oversized_batches=True, + ) + + translator = jaeger_exporter.Translate((self._test_span,)) + small_batch = jaeger.Batch( + # pylint: disable=protected-access + spans=translator._translate(ThriftTranslator()), + process=jaeger.Process(serviceName="xxx"), + ) + + with unittest.mock.patch( + "socket.socket.sendto", autospec=True + ) as fake_sendto: + agent_client.emit(small_batch) + self.assertEqual(fake_sendto.call_count, 1) + + translator = jaeger_exporter.Translate([self._test_span] * 2) + large_batch = jaeger.Batch( + # pylint: disable=protected-access + spans=translator._translate(ThriftTranslator()), + process=jaeger.Process(serviceName="xxx"), + ) + + with unittest.mock.patch( + "socket.socket.sendto", autospec=True + ) as fake_sendto: + agent_client.emit(large_batch) + self.assertEqual(fake_sendto.call_count, 2) + + def test_agent_client_dont_send_empty_spans(self): + agent_client = jaeger_exporter.AgentClientUDP( + host_name="localhost", + port=6354, + max_packet_size=415, + split_oversized_batches=True, + ) + + translator = jaeger_exporter.Translate([self._test_span] * 4) + large_batch = jaeger.Batch( + # pylint: disable=protected-access + spans=translator._translate(ThriftTranslator()), + process=jaeger.Process(serviceName="xxx"), + ) + + with unittest.mock.patch( + "socket.socket.sendto", autospec=True + ) as fake_sendto: + agent_client.emit(large_batch) + self.assertEqual(fake_sendto.call_count, 4) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py index 79d9c29c20f..283aaae4705 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py @@ -46,3 +46,6 @@ OTEL_EXPORTER_JAEGER_CERTIFICATE = "OTEL_EXPORTER_JAEGER_CERTIFICATE" OTEL_EXPORTER_OTLP_INSECURE = "OTEL_EXPORTER_OTLP_INSECURE" OTEL_EXPORTER_OTLP_SPAN_INSECURE = "OTEL_EXPORTER_OTLP_SPAN_INSECURE" +OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES = ( + "OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES" +)