diff --git a/CHANGELOG.md b/CHANGELOG.md index f257ced33a8..dd7c1664ae5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Add `max_attr_value_length` support to Jaeger exporter ([#1633])(https://github.com/open-telemetry/opentelemetry-python/pull/1633) +- Add `udp_split_oversized_batches` support to jaeger exporter + ([#1500](https://github.com/open-telemetry/opentelemetry-python/pull/1500)) ### Changed - Rename `IdsGenerator` to `IdGenerator` diff --git a/exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/__init__.py b/exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/__init__.py index be81bfaf0bd..240d35b3e6f 100644 --- a/exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/__init__.py +++ b/exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/__init__.py @@ -86,6 +86,7 @@ from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_JAEGER_AGENT_HOST, OTEL_EXPORTER_JAEGER_AGENT_PORT, + OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES, OTEL_EXPORTER_JAEGER_ENDPOINT, OTEL_EXPORTER_JAEGER_PASSWORD, OTEL_EXPORTER_JAEGER_USER, @@ -122,6 +123,7 @@ class JaegerSpanExporter(SpanExporter): credentials: Credentials for server authentication. transport_format: Transport format for exporting spans to collector. max_tag_value_length: Max length string attribute values can have. Set to None to disable. + udp_split_oversized_batches: Re-emit oversized batches in smaller chunks. """ def __init__( @@ -136,6 +138,7 @@ def __init__( credentials: Optional[ChannelCredentials] = None, transport_format: Optional[str] = None, max_tag_value_length: Optional[int] = None, + udp_split_oversized_batches: bool = None, ): self.service_name = service_name self._max_tag_value_length = max_tag_value_length @@ -155,8 +158,17 @@ def __init__( env_variable=environ_agent_port, default=DEFAULT_AGENT_PORT, ) + self.udp_split_oversized_batches = _parameter_setter( + param=udp_split_oversized_batches, + env_variable=environ.get( + OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES + ), + default=False, + ) self._agent_client = AgentClientUDP( - host_name=self.agent_host_name, port=self.agent_port + host_name=self.agent_host_name, + port=self.agent_port, + split_oversized_batches=self.udp_split_oversized_batches, ) self.collector_endpoint = _parameter_setter( param=collector_endpoint, diff --git a/exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/send/thrift.py b/exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/send/thrift.py index 151e017b457..fe4f4633196 100644 --- a/exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/send/thrift.py +++ b/exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/send/thrift.py @@ -14,6 +14,7 @@ import base64 import logging +import math import socket from thrift.protocol import TBinaryProtocol, TCompactProtocol @@ -36,6 +37,7 @@ class AgentClientUDP: port: The port of the Jaeger server. max_packet_size: Maximum size of UDP packet. client: Class for creating new client objects for agencies. + split_oversized_batches: Re-emit oversized batches in smaller chunks. """ def __init__( @@ -44,6 +46,7 @@ def __init__( port, max_packet_size=UDP_PACKET_MAX_LENGTH, client=agent.Client, + split_oversized_batches=False, ): self.address = (host_name, port) self.max_packet_size = max_packet_size @@ -51,6 +54,7 @@ def __init__( self.client = client( iprot=TCompactProtocol.TCompactProtocol(trans=self.buffer) ) + self.split_oversized_batches = split_oversized_batches def emit(self, batch: jaeger.Batch): """ @@ -66,11 +70,25 @@ def emit(self, batch: jaeger.Batch): self.client.emitBatch(batch) buff = self.buffer.getvalue() if len(buff) > self.max_packet_size: - logger.warning( - "Data exceeds the max UDP packet size; size %r, max %r", - len(buff), - self.max_packet_size, - ) + if self.split_oversized_batches and len(batch.spans) > 1: + packets = math.ceil(len(buff) / self.max_packet_size) + div = math.ceil(len(batch.spans) / packets) + for packet in range(packets): + start = packet * div + end = (packet + 1) * div + if start < len(batch.spans): + self.emit( + jaeger.Batch( + process=batch.process, + spans=batch.spans[start:end], + ) + ) + else: + logger.warning( + "Data exceeds the max UDP packet size; size %r, max %r", + len(buff), + self.max_packet_size, + ) return with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket: diff --git a/exporter/opentelemetry-exporter-jaeger/tests/test_jaeger_exporter_thrift.py b/exporter/opentelemetry-exporter-jaeger/tests/test_jaeger_exporter_thrift.py index 0bee785760d..a85f051366e 100644 --- a/exporter/opentelemetry-exporter-jaeger/tests/test_jaeger_exporter_thrift.py +++ b/exporter/opentelemetry-exporter-jaeger/tests/test_jaeger_exporter_thrift.py @@ -22,7 +22,10 @@ from opentelemetry import trace as trace_api from opentelemetry.exporter.jaeger.gen.jaeger import ttypes as jaeger from opentelemetry.exporter.jaeger.translate import Translate -from opentelemetry.exporter.jaeger.translate.thrift import ThriftTranslator +from opentelemetry.exporter.jaeger.translate.thrift import ( + ThriftTranslator, + Translator, +) from opentelemetry.sdk import trace from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_JAEGER_AGENT_HOST, @@ -520,3 +523,58 @@ def test_max_tag_value_length(self): self.assertEqual("hello", tags_by_keys["key_string"]) self.assertEqual("('tup", tags_by_keys["key_tuple"]) self.assertEqual("some_", tags_by_keys["key_resource"]) + + def test_agent_client_split(self): + agent_client = jaeger_exporter.AgentClientUDP( + host_name="localhost", + port=6354, + max_packet_size=250, + split_oversized_batches=True, + ) + + translator = jaeger_exporter.Translate((self._test_span,)) + small_batch = jaeger.Batch( + # pylint: disable=protected-access + spans=translator._translate(ThriftTranslator()), + process=jaeger.Process(serviceName="xxx"), + ) + + with unittest.mock.patch( + "socket.socket.sendto", autospec=True + ) as fake_sendto: + agent_client.emit(small_batch) + self.assertEqual(fake_sendto.call_count, 1) + + translator = jaeger_exporter.Translate([self._test_span] * 2) + large_batch = jaeger.Batch( + # pylint: disable=protected-access + spans=translator._translate(ThriftTranslator()), + process=jaeger.Process(serviceName="xxx"), + ) + + with unittest.mock.patch( + "socket.socket.sendto", autospec=True + ) as fake_sendto: + agent_client.emit(large_batch) + self.assertEqual(fake_sendto.call_count, 2) + + def test_agent_client_dont_send_empty_spans(self): + agent_client = jaeger_exporter.AgentClientUDP( + host_name="localhost", + port=6354, + max_packet_size=415, + split_oversized_batches=True, + ) + + translator = jaeger_exporter.Translate([self._test_span] * 4) + large_batch = jaeger.Batch( + # pylint: disable=protected-access + spans=translator._translate(ThriftTranslator()), + process=jaeger.Process(serviceName="xxx"), + ) + + with unittest.mock.patch( + "socket.socket.sendto", autospec=True + ) as fake_sendto: + agent_client.emit(large_batch) + self.assertEqual(fake_sendto.call_count, 4) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py index 79d9c29c20f..283aaae4705 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py @@ -46,3 +46,6 @@ OTEL_EXPORTER_JAEGER_CERTIFICATE = "OTEL_EXPORTER_JAEGER_CERTIFICATE" OTEL_EXPORTER_OTLP_INSECURE = "OTEL_EXPORTER_OTLP_INSECURE" OTEL_EXPORTER_OTLP_SPAN_INSECURE = "OTEL_EXPORTER_OTLP_SPAN_INSECURE" +OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES = ( + "OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES" +)