Skip to content

Commit

Permalink
Add possibility to split oversized udp batches (#1500)
Browse files Browse the repository at this point in the history
  • Loading branch information
janLo authored Mar 8, 2021
1 parent 9254c17 commit 6f42def
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 6 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry.trace.use_span()` will now overwrite previously set status on span in case an
exception is raised inside the context manager and `set_status_on_exception` is set to `True`.
([#1668](https://github.com/open-telemetry/opentelemetry-python/pull/1668))
- Add `udp_split_oversized_batches` support to the Jaeger exporter
([#1500](https://github.com/open-telemetry/opentelemetry-python/pull/1500))

### Changed
- Rename `IdsGenerator` to `IdGenerator`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_JAEGER_AGENT_HOST,
OTEL_EXPORTER_JAEGER_AGENT_PORT,
OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES,
OTEL_EXPORTER_JAEGER_ENDPOINT,
OTEL_EXPORTER_JAEGER_PASSWORD,
OTEL_EXPORTER_JAEGER_USER,
Expand Down Expand Up @@ -122,6 +123,7 @@ class JaegerExporter(SpanExporter):
credentials: Credentials for server authentication.
transport_format: Transport format for exporting spans to collector.
max_tag_value_length: Max length string attribute values can have. Set to None to disable.
udp_split_oversized_batches: Re-emit oversized batches in smaller chunks.
"""

def __init__(
Expand All @@ -136,6 +138,7 @@ def __init__(
credentials: Optional[ChannelCredentials] = None,
transport_format: Optional[str] = None,
max_tag_value_length: Optional[int] = None,
udp_split_oversized_batches: bool = None,
):
self.service_name = service_name
self._max_tag_value_length = max_tag_value_length
Expand All @@ -155,8 +158,17 @@ def __init__(
env_variable=environ_agent_port,
default=DEFAULT_AGENT_PORT,
)
self.udp_split_oversized_batches = _parameter_setter(
param=udp_split_oversized_batches,
env_variable=environ.get(
OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES
),
default=False,
)
self._agent_client = AgentClientUDP(
host_name=self.agent_host_name, port=self.agent_port
host_name=self.agent_host_name,
port=self.agent_port,
split_oversized_batches=self.udp_split_oversized_batches,
)
self.collector_endpoint = _parameter_setter(
param=collector_endpoint,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import base64
import logging
import math
import socket

from thrift.protocol import TBinaryProtocol, TCompactProtocol
Expand All @@ -36,6 +37,7 @@ class AgentClientUDP:
port: The port of the Jaeger server.
max_packet_size: Maximum size of UDP packet.
client: Class used to create new agent client objects.
split_oversized_batches: Re-emit oversized batches in smaller chunks.
"""

def __init__(
Expand All @@ -44,13 +46,15 @@ def __init__(
port,
max_packet_size=UDP_PACKET_MAX_LENGTH,
client=agent.Client,
split_oversized_batches=False,
):
self.address = (host_name, port)
self.max_packet_size = max_packet_size
self.buffer = TTransport.TMemoryBuffer()
self.client = client(
iprot=TCompactProtocol.TCompactProtocol(trans=self.buffer)
)
self.split_oversized_batches = split_oversized_batches

def emit(self, batch: jaeger.Batch):
"""
Expand All @@ -66,11 +70,25 @@ def emit(self, batch: jaeger.Batch):
self.client.emitBatch(batch)
buff = self.buffer.getvalue()
if len(buff) > self.max_packet_size:
logger.warning(
"Data exceeds the max UDP packet size; size %r, max %r",
len(buff),
self.max_packet_size,
)
if self.split_oversized_batches and len(batch.spans) > 1:
packets = math.ceil(len(buff) / self.max_packet_size)
div = math.ceil(len(batch.spans) / packets)
for packet in range(packets):
start = packet * div
end = (packet + 1) * div
if start < len(batch.spans):
self.emit(
jaeger.Batch(
process=batch.process,
spans=batch.spans[start:end],
)
)
else:
logger.warning(
"Data exceeds the max UDP packet size; size %r, max %r",
len(buff),
self.max_packet_size,
)
return

with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,3 +520,58 @@ def test_max_tag_value_length(self):
self.assertEqual("hello", tags_by_keys["key_string"])
self.assertEqual("('tup", tags_by_keys["key_tuple"])
self.assertEqual("some_", tags_by_keys["key_resource"])

def test_agent_client_split(self):
    """Check that an oversized batch is re-emitted as smaller packets.

    The client is built with ``split_oversized_batches`` enabled and a
    maximum packet size of 250. A batch holding one span is expected to
    cause a single ``sendto`` call, a batch holding two spans is expected
    to cause two.
    """
    udp_client = jaeger_exporter.AgentClientUDP(
        host_name="localhost",
        port=6354,
        max_packet_size=250,
        split_oversized_batches=True,
    )

    # Each case pairs the spans to translate with the number of
    # ``sendto`` calls expected when the resulting batch is emitted.
    cases = (
        ((self._test_span,), 1),
        ([self._test_span] * 2, 2),
    )

    for source_spans, expected_sends in cases:
        span_translator = jaeger_exporter.Translate(source_spans)
        # pylint: disable=protected-access
        thrift_spans = span_translator._translate(ThriftTranslator())
        batch = jaeger.Batch(
            spans=thrift_spans,
            process=jaeger.Process(serviceName="xxx"),
        )

        # Patch the socket so nothing is actually sent; only the number
        # of datagrams handed to ``sendto`` is of interest.
        with unittest.mock.patch(
            "socket.socket.sendto", autospec=True
        ) as sendto_mock:
            udp_client.emit(batch)
            self.assertEqual(sendto_mock.call_count, expected_sends)

def test_agent_client_dont_send_empty_spans(self):
    """Check that splitting a batch issues no surplus ``sendto`` calls.

    A batch of four spans is emitted through a client with splitting
    enabled and a maximum packet size of 415; exactly four ``sendto``
    calls are expected.
    """
    span_count = 4
    udp_client = jaeger_exporter.AgentClientUDP(
        host_name="localhost",
        port=6354,
        max_packet_size=415,
        split_oversized_batches=True,
    )

    span_translator = jaeger_exporter.Translate(
        [self._test_span] * span_count
    )
    # pylint: disable=protected-access
    thrift_spans = span_translator._translate(ThriftTranslator())
    batch = jaeger.Batch(
        spans=thrift_spans,
        process=jaeger.Process(serviceName="xxx"),
    )

    # Patch the socket so nothing is actually sent; only the number of
    # datagrams handed to ``sendto`` is of interest.
    with unittest.mock.patch(
        "socket.socket.sendto", autospec=True
    ) as sendto_mock:
        udp_client.emit(batch)
        self.assertEqual(sendto_mock.call_count, 4)
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,6 @@
OTEL_EXPORTER_JAEGER_CERTIFICATE = "OTEL_EXPORTER_JAEGER_CERTIFICATE"
OTEL_EXPORTER_OTLP_INSECURE = "OTEL_EXPORTER_OTLP_INSECURE"
OTEL_EXPORTER_OTLP_SPAN_INSECURE = "OTEL_EXPORTER_OTLP_SPAN_INSECURE"
# Name of the environment variable read by the Jaeger exporter to configure
# its ``udp_split_oversized_batches`` option (re-emitting oversized UDP
# batches in smaller chunks).
OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES = (
"OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES"
)

0 comments on commit 6f42def

Please sign in to comment.