Skip to content

Commit

Permalink
Add possibility to split oversized udp batches
Browse files Browse the repository at this point in the history
If we use the BatchExportSpanProcessor combined with the
JaegerSpanExporter and use instrumentations that add a lot of metadata
to the spans like sqlalchemy, then we run occationally into the "Data
exceeds the max UDP packet size" warning causing dropped spans and
incomplete data.

The option to reduce the general batch-size to a very small number (in
my case >30) may cause a performance issue as the worker thread of the
batch exporter gets very busy. Instead this change allows the user to
ask the exporter to split oversized batches when they get detected and
send the splits separately instead of dropping them. Depending on the
usecase this is a better option than reducing the batch-size to a very
small value because every now and then they contain a couple of large
spans.
  • Loading branch information
janLo committed Mar 4, 2021
1 parent 9bf28fb commit b9bca15
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 7 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Add `max_attr_value_length` support to Jaeger exporter
([#1633])(https://github.com/open-telemetry/opentelemetry-python/pull/1633)
- Add `udp_split_oversized_batches` support to jaeger exporter
([#1500](https://github.com/open-telemetry/opentelemetry-python/pull/1500))

### Changed
- Rename `IdsGenerator` to `IdGenerator`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_JAEGER_AGENT_HOST,
OTEL_EXPORTER_JAEGER_AGENT_PORT,
OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES,
OTEL_EXPORTER_JAEGER_ENDPOINT,
OTEL_EXPORTER_JAEGER_PASSWORD,
OTEL_EXPORTER_JAEGER_USER,
Expand Down Expand Up @@ -122,6 +123,7 @@ class JaegerSpanExporter(SpanExporter):
credentials: Credentials for server authentication.
transport_format: Transport format for exporting spans to collector.
max_tag_value_length: Max length string attribute values can have. Set to None to disable.
udp_split_oversized_batches: Re-emit oversized batches in smaller chunks.
"""

def __init__(
Expand All @@ -136,6 +138,7 @@ def __init__(
credentials: Optional[ChannelCredentials] = None,
transport_format: Optional[str] = None,
max_tag_value_length: Optional[int] = None,
udp_split_oversized_batches: bool = None,
):
self.service_name = service_name
self._max_tag_value_length = max_tag_value_length
Expand All @@ -155,8 +158,17 @@ def __init__(
env_variable=environ_agent_port,
default=DEFAULT_AGENT_PORT,
)
self.udp_split_oversized_batches = _parameter_setter(
param=udp_split_oversized_batches,
env_variable=environ.get(
OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES
),
default=False,
)
self._agent_client = AgentClientUDP(
host_name=self.agent_host_name, port=self.agent_port
host_name=self.agent_host_name,
port=self.agent_port,
split_oversized_batches=self.udp_split_oversized_batches,
)
self.collector_endpoint = _parameter_setter(
param=collector_endpoint,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import base64
import logging
import math
import socket

from thrift.protocol import TBinaryProtocol, TCompactProtocol
Expand All @@ -36,6 +37,7 @@ class AgentClientUDP:
port: The port of the Jaeger server.
max_packet_size: Maximum size of UDP packet.
client: Class for creating new client objects for agencies.
split_oversized_batches: Re-emit oversized batches in smaller chunks.
"""

def __init__(
Expand All @@ -44,13 +46,15 @@ def __init__(
port,
max_packet_size=UDP_PACKET_MAX_LENGTH,
client=agent.Client,
split_oversized_batches=False,
):
self.address = (host_name, port)
self.max_packet_size = max_packet_size
self.buffer = TTransport.TMemoryBuffer()
self.client = client(
iprot=TCompactProtocol.TCompactProtocol(trans=self.buffer)
)
self.split_oversized_batches = split_oversized_batches

def emit(self, batch: jaeger.Batch):
"""
Expand All @@ -66,11 +70,25 @@ def emit(self, batch: jaeger.Batch):
self.client.emitBatch(batch)
buff = self.buffer.getvalue()
if len(buff) > self.max_packet_size:
logger.warning(
"Data exceeds the max UDP packet size; size %r, max %r",
len(buff),
self.max_packet_size,
)
if self.split_oversized_batches and len(batch.spans) > 1:
packets = math.ceil(len(buff) / self.max_packet_size)
div = math.ceil(len(batch.spans) / packets)
for packet in range(packets):
start = packet * div
end = (packet + 1) * div
if start < len(batch.spans):
self.emit(
jaeger.Batch(
process=batch.process,
spans=batch.spans[start:end],
)
)
else:
logger.warning(
"Data exceeds the max UDP packet size; size %r, max %r",
len(buff),
self.max_packet_size,
)
return

with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
from opentelemetry import trace as trace_api
from opentelemetry.exporter.jaeger.gen.jaeger import ttypes as jaeger
from opentelemetry.exporter.jaeger.translate import Translate
from opentelemetry.exporter.jaeger.translate.thrift import ThriftTranslator
from opentelemetry.exporter.jaeger.translate.thrift import (
ThriftTranslator,
Translator,
)
from opentelemetry.sdk import trace
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_JAEGER_AGENT_HOST,
Expand Down Expand Up @@ -520,3 +523,58 @@ def test_max_tag_value_length(self):
self.assertEqual("hello", tags_by_keys["key_string"])
self.assertEqual("('tup", tags_by_keys["key_tuple"])
self.assertEqual("some_", tags_by_keys["key_resource"])

def test_agent_client_split(self):
agent_client = jaeger_exporter.AgentClientUDP(
host_name="localhost",
port=6354,
max_packet_size=250,
split_oversized_batches=True,
)

translator = jaeger_exporter.Translate((self._test_span,))
small_batch = jaeger.Batch(
# pylint: disable=protected-access
spans=translator._translate(ThriftTranslator()),
process=jaeger.Process(serviceName="xxx"),
)

with unittest.mock.patch(
"socket.socket.sendto", autospec=True
) as fake_sendto:
agent_client.emit(small_batch)
self.assertEqual(fake_sendto.call_count, 1)

translator = jaeger_exporter.Translate([self._test_span] * 2)
large_batch = jaeger.Batch(
# pylint: disable=protected-access
spans=translator._translate(ThriftTranslator()),
process=jaeger.Process(serviceName="xxx"),
)

with unittest.mock.patch(
"socket.socket.sendto", autospec=True
) as fake_sendto:
agent_client.emit(large_batch)
self.assertEqual(fake_sendto.call_count, 2)

def test_agent_client_dont_send_empty_spans(self):
agent_client = jaeger_exporter.AgentClientUDP(
host_name="localhost",
port=6354,
max_packet_size=415,
split_oversized_batches=True,
)

translator = jaeger_exporter.Translate([self._test_span] * 4)
large_batch = jaeger.Batch(
# pylint: disable=protected-access
spans=translator._translate(ThriftTranslator()),
process=jaeger.Process(serviceName="xxx"),
)

with unittest.mock.patch(
"socket.socket.sendto", autospec=True
) as fake_sendto:
agent_client.emit(large_batch)
self.assertEqual(fake_sendto.call_count, 4)
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,6 @@
OTEL_EXPORTER_JAEGER_CERTIFICATE = "OTEL_EXPORTER_JAEGER_CERTIFICATE"
OTEL_EXPORTER_OTLP_INSECURE = "OTEL_EXPORTER_OTLP_INSECURE"
OTEL_EXPORTER_OTLP_SPAN_INSECURE = "OTEL_EXPORTER_OTLP_SPAN_INSECURE"
OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES = (
"OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES"
)

0 comments on commit b9bca15

Please sign in to comment.