From 3efc0854454f10f71328fbc0d68c62630cba7187 Mon Sep 17 00:00:00 2001 From: Jan Losinski Date: Tue, 22 Dec 2020 14:19:45 +0100 Subject: [PATCH] Add possibility to split oversized udp batches If we use the BatchExportSpanProcessor combined with the JaegerSpanExporter and use instrumentations that add a lot of metadata to the spans like sqlalchemy, then we occasionally run into the "Data exceeds the max UDP packet size" warning causing dropped spans and incomplete data. The option to reduce the general batch-size to a very small number (in my case >30) may cause a performance issue as the worker thread of the batch exporter gets very busy. Instead, this change allows the user to ask the exporter to split oversized batches when they get detected and send the splits separately instead of dropping them. Depending on the use case this is a better option than reducing the batch-size to a very small value because every now and then they contain a couple of large spans. --- CHANGELOG.md | 4 +++ .../opentelemetry/exporter/jaeger/__init__.py | 14 +++++++- .../exporter/jaeger/send/thrift.py | 26 +++++++++++--- .../tests/test_jaeger_exporter_thrift.py | 34 +++++++++++++++++++ .../sdk/environment_variables/__init__.py | 3 ++ 5 files changed, 75 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bcb8d65d4b8..9163ab5405f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v0.18b0...HEAD) +### Added +- Add `udp_split_oversized_batches` support to jaeger exporter + ([#1500](https://github.com/open-telemetry/opentelemetry-python/pull/1500)) + ## [0.18b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v0.18b0) - 2021-02-16 ### Added diff --git a/exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/__init__.py 
b/exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/__init__.py index 20b4bda3d9f..547018a2790 100644 --- a/exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/__init__.py +++ b/exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/__init__.py @@ -85,6 +85,7 @@ from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_JAEGER_AGENT_HOST, OTEL_EXPORTER_JAEGER_AGENT_PORT, + OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES, OTEL_EXPORTER_JAEGER_ENDPOINT, OTEL_EXPORTER_JAEGER_PASSWORD, OTEL_EXPORTER_JAEGER_USER, @@ -120,6 +121,7 @@ class JaegerSpanExporter(SpanExporter): insecure: True if collector has no encryption or authentication credentials: Credentials for server authentication. transport_format: Transport format for exporting spans to collector. + udp_split_oversized_batches: Re-emit oversized batches in smaller chunks. """ def __init__( @@ -133,6 +135,7 @@ def __init__( insecure: Optional[bool] = None, credentials: Optional[ChannelCredentials] = None, transport_format: Optional[str] = None, + udp_split_oversized_batches: bool = None, ): self.service_name = service_name self.agent_host_name = _parameter_setter( @@ -151,8 +154,17 @@ def __init__( env_variable=environ_agent_port, default=DEFAULT_AGENT_PORT, ) + self.udp_split_oversized_batches = _parameter_setter( + param=udp_split_oversized_batches, + env_variable=environ.get( + OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES + ), + default=False, + ) self._agent_client = AgentClientUDP( - host_name=self.agent_host_name, port=self.agent_port + host_name=self.agent_host_name, + port=self.agent_port, + split_oversized_batches=self.udp_split_oversized_batches, ) self.collector_endpoint = _parameter_setter( param=collector_endpoint, diff --git a/exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/send/thrift.py b/exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/send/thrift.py index 
151e017b457..7260ffc0728 100644 --- a/exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/send/thrift.py +++ b/exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/send/thrift.py @@ -14,6 +14,7 @@ import base64 import logging +import math import socket from thrift.protocol import TBinaryProtocol, TCompactProtocol @@ -36,6 +37,7 @@ class AgentClientUDP: port: The port of the Jaeger server. max_packet_size: Maximum size of UDP packet. client: Class for creating new client objects for agencies. + split_oversized_batches: Re-emit oversized batches in smaller chunks. """ def __init__( @@ -44,6 +46,7 @@ def __init__( port, max_packet_size=UDP_PACKET_MAX_LENGTH, client=agent.Client, + split_oversized_batches=False, ): self.address = (host_name, port) self.max_packet_size = max_packet_size @@ -51,6 +54,7 @@ def __init__( self.client = client( iprot=TCompactProtocol.TCompactProtocol(trans=self.buffer) ) + self.split_oversized_batches = split_oversized_batches def emit(self, batch: jaeger.Batch): """ @@ -66,11 +70,23 @@ def emit(self, batch: jaeger.Batch): self.client.emitBatch(batch) buff = self.buffer.getvalue() if len(buff) > self.max_packet_size: - logger.warning( - "Data exceeds the max UDP packet size; size %r, max %r", - len(buff), - self.max_packet_size, - ) + if self.split_oversized_batches and len(batch.spans) > 1: + packets = math.ceil(len(buff) / self.max_packet_size) + div = math.ceil(len(batch.spans) / packets) + for packet in range(packets): + start = packet * div + end = (packet + 1) * div + self.emit( + jaeger.Batch( + process=batch.process, spans=batch.spans[start:end] + ) + ) + else: + logger.warning( + "Data exceeds the max UDP packet size; size %r, max %r", + len(buff), + self.max_packet_size, + ) return with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket: diff --git a/exporter/opentelemetry-exporter-jaeger/tests/test_jaeger_exporter_thrift.py 
b/exporter/opentelemetry-exporter-jaeger/tests/test_jaeger_exporter_thrift.py index c0faafc1b6e..3c42b0e2085 100644 --- a/exporter/opentelemetry-exporter-jaeger/tests/test_jaeger_exporter_thrift.py +++ b/exporter/opentelemetry-exporter-jaeger/tests/test_jaeger_exporter_thrift.py @@ -465,3 +465,37 @@ def test_agent_client(self): ) agent_client.emit(batch) + + def test_agent_client_split(self): + agent_client = jaeger_exporter.AgentClientUDP( + host_name="localhost", + port=6354, + max_packet_size=250, + split_oversized_batches=True, + ) + + translator = jaeger_exporter.Translate((self._test_span,)) + small_batch = jaeger.Batch( + # pylint: disable=protected-access + spans=translator._translate(jaeger_exporter.ThriftTranslator()), + process=jaeger.Process(serviceName="xxx"), + ) + + with unittest.mock.patch( + "socket.socket.sendto", autospec=True + ) as fake_sendto: + agent_client.emit(small_batch) + self.assertEqual(fake_sendto.call_count, 1) + + translator = jaeger_exporter.Translate([self._test_span] * 2) + large_batch = jaeger.Batch( + # pylint: disable=protected-access + spans=translator._translate(jaeger_exporter.ThriftTranslator()), + process=jaeger.Process(serviceName="xxx"), + ) + + with unittest.mock.patch( + "socket.socket.sendto", autospec=True + ) as fake_sendto: + agent_client.emit(large_batch) + self.assertEqual(fake_sendto.call_count, 2) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py index 79d9c29c20f..283aaae4705 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py @@ -46,3 +46,6 @@ OTEL_EXPORTER_JAEGER_CERTIFICATE = "OTEL_EXPORTER_JAEGER_CERTIFICATE" OTEL_EXPORTER_OTLP_INSECURE = "OTEL_EXPORTER_OTLP_INSECURE" OTEL_EXPORTER_OTLP_SPAN_INSECURE = "OTEL_EXPORTER_OTLP_SPAN_INSECURE" 
+OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES = ( + "OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES" +)