Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[experimental] add simplified bsp and grpc exporter #3465

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import collections
import threading
import typing

from opentelemetry.sdk.trace import ReadableSpan


class SpanAccumulator:
    """
    A thread-safe container designed to collect and batch spans. It accumulates spans until a specified batch size is
    reached, at which point the accumulated spans are moved into a FIFO queue. Provides methods to add spans, check if
    the accumulator is non-empty, and retrieve the earliest batch of spans from the queue.
    """

    def __init__(self, batch_size: int):
        self._batch_size = batch_size
        self._spans: typing.List[ReadableSpan] = []
        # TODO: give the deque a max size to bound memory if the exporter falls behind
        self._batches = collections.deque()
        self._lock = threading.Lock()

    def nonempty(self) -> bool:
        """
        Checks if the accumulator contains any spans or batches. It returns True if either the span list or the batch
        queue is non-empty, and False otherwise.
        """
        with self._lock:
            return len(self._spans) > 0 or len(self._batches) > 0

    def push(self, span: ReadableSpan) -> bool:
        """
        Adds a span to the accumulator. If the addition causes the number of spans to reach the
        specified batch size, the accumulated spans are moved into a FIFO queue as a new batch. Returns True if a new
        batch was created, otherwise returns False.
        """
        with self._lock:
            self._spans.append(span)
            if len(self._spans) < self._batch_size:
                return False
            # Newest batches go on the left; batch() pops from the right, yielding FIFO order.
            self._batches.appendleft(self._spans)
            self._spans = []
            return True

    def batch(self) -> typing.List[ReadableSpan]:
        """
        Returns the earliest (first in line) batch of spans from the FIFO queue. If the queue is empty, returns any
        remaining spans that haven't been batched (possibly an empty list).
        """
        # Fix: the deque pop previously happened outside the lock. CPython's deque.pop is atomic, but the
        # class advertises thread safety, so the pop and the partial-batch fallback are now performed under
        # the same lock, keeping them consistent with concurrent push() calls.
        with self._lock:
            try:
                return self._batches.pop()
            except IndexError:
                # no full batches left: hand back whatever partial batch has accumulated
                out = self._spans
                self._spans = []
                return out
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import abc
import time
import typing

import grpc

from opentelemetry.exporter.otlp.proto.common._internal import trace_encoder
from opentelemetry.proto.collector.trace.v1 import trace_service_pb2_grpc as pb
from opentelemetry.sdk.trace import ReadableSpan


class GrpcClientABC(abc.ABC):
    """Interface for gRPC clients that export a batch of spans and report the resulting status code."""

    @abc.abstractmethod
    def send(self, batch: typing.Sequence[ReadableSpan]) -> grpc.StatusCode:
        """Sends the batch to the backend and returns the gRPC status code of the attempt."""
        pass


class GrpcClient(GrpcClientABC):
    """
    A gRPC client that exports a batch of spans to the configured endpoint over an insecure channel.
    Performs a single attempt per call; wrap this in a RetryingGrpcClient for retry/backoff behavior.
    """

    def __init__(
        self,
        target: str = 'localhost:4317',
        timeout_sec: int = 10,
    ):
        self._timeout_sec = timeout_sec
        self._stub = pb.TraceServiceStub(grpc.insecure_channel(target))

    def send(self, batch: typing.Sequence[ReadableSpan]) -> grpc.StatusCode:
        """Encodes and exports the batch, returning OK on success or the failed call's status code."""
        try:
            request = trace_encoder.encode_spans(batch)
            self._stub.Export(request=request, timeout=self._timeout_sec)
        except grpc.RpcError as err:
            # noinspection PyUnresolvedReferences
            return err.code()
        return grpc.StatusCode.OK


class RetryingGrpcClient(GrpcClientABC):
    """
    A GRPC client implementation that wraps another GRPC client and retries failed requests using exponential backoff.
    The `sleepfunc` arg can be passed in to fake time-based sleeping for testing.
    """

    def __init__(
        self,
        client: GrpcClientABC,
        # fix: sleep times are floats (initial default is 0.5), so the annotations were wrong
        sleepfunc: typing.Callable[[float], None] = time.sleep,
        max_retries: int = 4,
        initial_sleep_time_sec: float = 0.5,
    ):
        self._client = client
        self._sleep = sleepfunc
        self._max_retries = max_retries
        self._initial_sleep_time_sec = initial_sleep_time_sec

    def send(self, batch: typing.Sequence[ReadableSpan]) -> grpc.StatusCode:
        """
        Sends the batch, making up to `max_retries` attempts with exponential backoff between them.
        Returns OK on the first success, otherwise the status code of the last failed attempt.
        (If max_retries is 0 no attempt is made and OK is returned, matching the original behavior.)
        """
        sleep_time_sec = self._initial_sleep_time_sec
        out = grpc.StatusCode.OK
        for attempt in range(self._max_retries):
            out = self._client.send(batch)
            if out == grpc.StatusCode.OK:
                return out
            # fix: only back off when another attempt remains; previously this slept once more
            # after the final failure, pointlessly delaying the caller.
            if attempt < self._max_retries - 1:
                self._sleep(sleep_time_sec)
                sleep_time_sec *= 2
        return out


class FakeGrpcClient(GrpcClientABC):
    """
    A fake GRPC client that can be used for testing. To fake status codes, optionally set the `status_codes` arg to a
    list/tuple of status codes you want the send() method to return. If there are more calls to send() than there are
    status codes, the last status code is reused. Set the `sleep_time_sec` arg to a positive value to sleep every time
    there is a send(), simulating network transmission time.
    """

    def __init__(
        self,
        # Sequence, not List: the default is a tuple, and the codes are only indexed, never mutated.
        status_codes: typing.Sequence[grpc.StatusCode] = (grpc.StatusCode.OK,),
        sleep_time_sec: float = 0,
    ):
        self._status_codes = status_codes
        self._sleep_time_sec = sleep_time_sec
        self._batches = []

    def send(self, batch: typing.Sequence[ReadableSpan]) -> grpc.StatusCode:
        """Records the batch and returns the scripted status code for this call (last code is reused)."""
        time.sleep(self._sleep_time_sec)
        self._batches.append(batch)
        num_sends = len(self._batches)
        # clamp to the last configured code once the call count exceeds the script length
        idx = min(num_sends, len(self._status_codes)) - 1
        return self._status_codes[idx]

    def get_batches(self):
        """Returns every batch passed to send(), in call order."""
        return self._batches

    def num_batches(self):
        """Returns how many times send() has been called."""
        return len(self._batches)

    def num_spans_in_batch(self, idx: int):
        """Returns the number of spans in the idx-th recorded batch."""
        return len(self._batches[idx])
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import typing
from enum import Enum

import grpc

from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from opentelemetry.sdk.trace.export.experimental.client import GrpcClientABC, RetryingGrpcClient, GrpcClient


class ExporterFlushResult(Enum):
    """Possible outcomes of an exporter force_flush() call."""
    SUCCESS = 1
    FAILURE = 2
    TIMEOUT = 3


class OTLPSpanExporter2(SpanExporter):
    """
    An implementation of SpanExporter. Accepts an optional client. If one is not supplied, creates a retrying client.
    Sends spans immediately -- has no queue to flush or separate thread to shut down.
    """

    def __init__(self, client: typing.Optional[GrpcClientABC] = None):
        # fix: the default client was previously built in the signature, which opened a gRPC channel as a
        # side effect at module import time and shared a single client across every default-constructed
        # exporter. Using a None sentinel creates a fresh retrying client per instance, on demand.
        self._client = client if client is not None else RetryingGrpcClient(GrpcClient())

    def export(self, batch: typing.Sequence[ReadableSpan]) -> SpanExportResult:
        """Sends the batch via the client; an OK status maps to SUCCESS, anything else to FAILURE."""
        status_code = self._client.send(batch)
        return SpanExportResult.SUCCESS if status_code == grpc.StatusCode.OK else SpanExportResult.FAILURE

    def force_flush(self, timeout_millis: int = 30000) -> ExporterFlushResult:
        """
        Nothing to flush -- spans are sent as soon as export() is called.
        """
        return ExporterFlushResult.SUCCESS

    def shutdown(self) -> None:
        """
        Nothing to shut down.
        """
        pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import typing

from opentelemetry.context import Context
from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor, Span
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from opentelemetry.sdk.trace.export.experimental.accumulator import SpanAccumulator
from opentelemetry.sdk.trace.export.experimental.timer import TimerABC, EventBasedTimer, ThreadBasedTimer


class BatchSpanProcessor2(SpanProcessor):
    """
    A SpanProcessor that sends spans in batches on an interval or when a maximum number of spans has been reached,
    whichever comes first.
    """

    def __init__(
        self,
        exporter: SpanExporter,
        max_batch_size: int = 512,
        interval_sec: int = 5,
        timer: typing.Optional[TimerABC] = None,
    ):
        self._exporter = exporter
        self._accumulator = SpanAccumulator(max_batch_size)
        self._timer = timer or ThreadBasedTimer(interval_sec)
        self._timer.set_callback(self._export_batch)
        self._timer.start()

    def on_start(self, span: Span, parent_context: typing.Optional[Context] = None) -> None:
        pass

    def on_end(self, span: ReadableSpan) -> None:
        """
        This method must be extremely fast. It adds the span to the accumulator for later sending and pokes the timer
        if the number of spans waiting to be sent has reached the maximum batch size.
        """
        full = self._accumulator.push(span)
        if full:
            self._timer.poke()

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        """
        Stops the timer, exports any spans in the accumulator, then restarts the timer.
        Returns True if all pending spans were exported successfully, False otherwise.
        """
        self._timer.stop()
        try:
            self._exporter.force_flush(timeout_millis)  # this may be a no-op depending on the impl
            while self._accumulator.nonempty():
                result = self._export_batch()
                if result != SpanExportResult.SUCCESS:
                    return False
            # fix: previously fell off the end and returned None despite the declared bool return type
            return True
        finally:
            # fix: restart the timer even when an export fails; previously an early False return left
            # the timer stopped, permanently disabling periodic exporting.
            self._timer.start()

    def shutdown(self) -> None:
        """Stops the timer, drains any remaining spans (best effort), then shuts down the exporter."""
        self._timer.stop()
        while self._accumulator.nonempty():
            self._export_batch()
        self._exporter.shutdown()

    def _export_batch(self) -> SpanExportResult:
        """
        This is the timer's callback. It runs on its own thread.
        """
        batch = self._accumulator.batch()
        if len(batch) > 0:
            return self._exporter.export(batch)
        return SpanExportResult.SUCCESS
Loading
Loading