Skip to content

Commit

Permalink
Add possibility to split oversized udp batches
Browse files Browse the repository at this point in the history
If we use the BatchExportSpanProcessor combined with the
JaegerSpanExporter and use instrumentations that add a lot of metadata
to the spans, like sqlalchemy, then we occasionally run into the "Data
exceeds the max UDP packet size" warning, causing dropped spans and
incomplete data.

The option to reduce the general batch size to a very small number (in
my case >30) may cause a performance issue, as the worker thread of the
batch exporter gets very busy. Instead, this change allows the user to
ask the exporter to split oversized batches when they are detected and
send the splits separately instead of dropping them. Depending on the
use case, this is a better option than reducing the batch size to a very
small value just because, every now and then, a batch contains a couple
of large spans.
  • Loading branch information
janLo committed Feb 26, 2021
1 parent 99128b3 commit 3efc085
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 6 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v0.18b0...HEAD)

### Added
- Add `udp_split_oversized_batches` support to jaeger exporter
([#1500](https://github.com/open-telemetry/opentelemetry-python/pull/1500))

## [0.18b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v0.18b0) - 2021-02-16

### Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_JAEGER_AGENT_HOST,
OTEL_EXPORTER_JAEGER_AGENT_PORT,
OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES,
OTEL_EXPORTER_JAEGER_ENDPOINT,
OTEL_EXPORTER_JAEGER_PASSWORD,
OTEL_EXPORTER_JAEGER_USER,
Expand Down Expand Up @@ -120,6 +121,7 @@ class JaegerSpanExporter(SpanExporter):
insecure: True if collector has no encryption or authentication
credentials: Credentials for server authentication.
transport_format: Transport format for exporting spans to collector.
udp_split_oversized_batches: Re-emit oversized batches in smaller chunks.
"""

def __init__(
Expand All @@ -133,6 +135,7 @@ def __init__(
insecure: Optional[bool] = None,
credentials: Optional[ChannelCredentials] = None,
transport_format: Optional[str] = None,
udp_split_oversized_batches: bool = None,
):
self.service_name = service_name
self.agent_host_name = _parameter_setter(
Expand All @@ -151,8 +154,17 @@ def __init__(
env_variable=environ_agent_port,
default=DEFAULT_AGENT_PORT,
)
self.udp_split_oversized_batches = _parameter_setter(
param=udp_split_oversized_batches,
env_variable=environ.get(
OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES
),
default=False,
)
self._agent_client = AgentClientUDP(
host_name=self.agent_host_name, port=self.agent_port
host_name=self.agent_host_name,
port=self.agent_port,
split_oversized_batches=self.udp_split_oversized_batches,
)
self.collector_endpoint = _parameter_setter(
param=collector_endpoint,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import base64
import logging
import math
import socket

from thrift.protocol import TBinaryProtocol, TCompactProtocol
Expand All @@ -36,6 +37,7 @@ class AgentClientUDP:
port: The port of the Jaeger server.
max_packet_size: Maximum size of UDP packet.
client: Class for creating new client objects for agencies.
split_oversized_batches: Re-emit oversized batches in smaller chunks.
"""

def __init__(
Expand All @@ -44,13 +46,15 @@ def __init__(
port,
max_packet_size=UDP_PACKET_MAX_LENGTH,
client=agent.Client,
split_oversized_batches=False,
):
self.address = (host_name, port)
self.max_packet_size = max_packet_size
self.buffer = TTransport.TMemoryBuffer()
self.client = client(
iprot=TCompactProtocol.TCompactProtocol(trans=self.buffer)
)
self.split_oversized_batches = split_oversized_batches

def emit(self, batch: jaeger.Batch):
"""
Expand All @@ -66,11 +70,23 @@ def emit(self, batch: jaeger.Batch):
self.client.emitBatch(batch)
buff = self.buffer.getvalue()
if len(buff) > self.max_packet_size:
logger.warning(
"Data exceeds the max UDP packet size; size %r, max %r",
len(buff),
self.max_packet_size,
)
if self.split_oversized_batches and len(batch.spans) > 1:
packets = math.ceil(len(buff) / self.max_packet_size)
div = math.ceil(len(batch.spans) / packets)
for packet in range(packets):
start = packet * div
end = (packet + 1) * div
self.emit(
jaeger.Batch(
process=batch.process, spans=batch.spans[start:end]
)
)
else:
logger.warning(
"Data exceeds the max UDP packet size; size %r, max %r",
len(buff),
self.max_packet_size,
)
return

with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,3 +465,37 @@ def test_agent_client(self):
)

agent_client.emit(batch)

def test_agent_client_split(self):
    """Check how many UDP sends a split-enabled agent client performs.

    With ``max_packet_size=250`` the test expects one ``sendto`` call for
    a batch of one span and two calls for a batch of two spans.
    """
    udp_client = jaeger_exporter.AgentClientUDP(
        host_name="localhost",
        port=6354,
        max_packet_size=250,
        split_oversized_batches=True,
    )

    # (spans handed to the translator, expected number of sendto calls)
    scenarios = (
        ((self._test_span,), 1),
        ([self._test_span] * 2, 2),
    )

    for source_spans, expected_sends in scenarios:
        # pylint: disable=protected-access
        thrift_spans = jaeger_exporter.Translate(source_spans)._translate(
            jaeger_exporter.ThriftTranslator()
        )
        batch = jaeger.Batch(
            spans=thrift_spans,
            process=jaeger.Process(serviceName="xxx"),
        )

        # Patch the socket send so nothing leaves the process and the
        # number of emitted datagrams can be counted.
        with unittest.mock.patch(
            "socket.socket.sendto", autospec=True
        ) as sendto_mock:
            udp_client.emit(batch)
            self.assertEqual(sendto_mock.call_count, expected_sends)
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,6 @@
OTEL_EXPORTER_JAEGER_CERTIFICATE = "OTEL_EXPORTER_JAEGER_CERTIFICATE"
OTEL_EXPORTER_OTLP_INSECURE = "OTEL_EXPORTER_OTLP_INSECURE"
OTEL_EXPORTER_OTLP_SPAN_INSECURE = "OTEL_EXPORTER_OTLP_SPAN_INSECURE"
# Name of the environment variable the Jaeger exporter consults (when the
# `udp_split_oversized_batches` argument is not given) to decide whether
# oversized UDP batches are re-emitted in smaller chunks.
OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES = (
"OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES"
)

0 comments on commit 3efc085

Please sign in to comment.