-
Notifications
You must be signed in to change notification settings - Fork 660
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
sdk/trace/exporters: add batch span processor exporter #153
sdk/trace/exporters: add batch span processor exporter #153
Conversation
The exporters specification states that two built-in span processors should be implemented, the simple processor span and the batch processor span. This commit implements the later, it is mainly based on the opentelemetry/java one. The algorithm implements the following logic: - a condition variable is used to notify the worker thread in case the queue is half full, so that exporting can start before the queue gets full and spans are dropped. - export is called each schedule_delay_millis if there is a least one new span to export. - when the processor is shutdown all remaining spans are exported.
Wow, @mauriciovasquezbernal you're faster! I was writing the same thing last night :) |
opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
Outdated
Show resolved
Hide resolved
opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
Outdated
Show resolved
Hide resolved
self, | ||
span_exporter: SpanExporter, | ||
max_queue_size: int = 2048, | ||
schedule_delay_millis: int = 5000, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
int
-> float
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@reyang milliseconds aren't precise enough here?
"""Exports at most max_export_batch_size spans.""" | ||
idx = 0 | ||
spans = [] | ||
# currently only a single thread acts as consumer, so queue.get() will |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure how I could integrate it. It'd a big redesign.
except Exception as exc: | ||
logger.warning("Exception while exporting data: %s", exc) | ||
|
||
def flush(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def flush(self): | |
def _flush(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
- use // instead of / - set daemon=true - fix error message for max_batch_size - change schedule_delay_millis' type to float - make flush internal method
I handled most of the feedback, improved the way the timeout it calculated and added a couple of new tests. |
it doesn't work sometimes when using the exact same value of schedule_delay_millis, add some room
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the skipping of _flush is a bug, all others are nitpicks.
try: | ||
self.queue.put(span, block=False) | ||
except queue.Full: | ||
# TODO: dropped spans counter? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dropped span counter sounds like a plan. Or we could log a warning the first time a span is dropped.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea, I'd just log the first time a span is dropped. A better approach could be a rate-limited logging system that actually prints the number of spans being dropped per second or so.
opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
Outdated
Show resolved
Hide resolved
except queue.Full: | ||
# TODO: dropped spans counter? | ||
pass | ||
if self.queue.qsize() >= self.half_max_queue_size: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if self.queue.qsize() >= self.half_max_queue_size: | |
if self.queue.qsize() == self.half_max_queue_size: |
I think we send too many notifications otherwise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I create a variable to avoid this "notification storm", the equal comparison could not work because it is possible that the check misses it (two spans end at the same time...).
Please give me your feedback in the new solution.
) | ||
|
||
self.span_exporter = span_exporter | ||
self.queue = queue.Queue(max_queue_size) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You never call https://docs.python.org/3/library/queue.html#queue.Queue.task_done on the queue. Maybe a https://docs.python.org/3/library/collections.html#collections.deque would be the better (more light-weight) choice?
Deques support thread-safe, memory efficient appends and pops
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure it'll work. It doesn't provide a way to access the number of elements in the queue, so an external counter for the number of elements would be needed (not sure if this will work because deque drops elements without warning).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does, just use len(mydeque)
:
In addition to the above, deques support iteration, pickling, len(d), reversed(d), copy.copy(d), copy.deepcopy(d), membership testing with the in operator, and subscript references such as d[-1].
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're totally right, I need more coffee.
I used it, two changes:
- now older spans are dropped (that's the way deque works, cannot be changed).
- it is not possible to count the number of dropped spans (we can guess that spans would be dropped).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- now older spans are dropped
It sounds like we need to clarify that in the spec, I actually expected that we'd drop the oldest spans first.
- it is not possible to count the number of dropped spans
I think it is if we lock around adding spans to the deque, which we might need to do later anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we only consider CPython with its GIL, a plain list with a lock (condition) might actually be the best solution after all. But I expect the deque without locking at every added span to perform significantly better in GIL-less Python (pypy). By introducing a single lock that is called on every span.end(), we would effectively reintroduce a sort of GIL (even though we only hold this lock for a short time at once, it would be highly contended).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to avoid as much as possible having a lock when adding an element as it will content on all span endings.
That said, we could look more into detail of this implementation later on, there is too much room to improve and discuss.
opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
Outdated
Show resolved
Hide resolved
opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
Outdated
Show resolved
Hide resolved
opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
Outdated
Show resolved
Hide resolved
logger.warning("Exception while exporting data: %s", exc) | ||
|
||
def _flush(self): | ||
while not self.queue.empty(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of checking queue.empty() here again, we could have export return a bool.
pass | ||
|
||
def on_end(self, span: Span) -> None: | ||
try: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we should also check done
here and bail out with a warning if it is true.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think specification is not clear if onEnd()
could be called after shutdown()
, anyway, be defensive now and check it.
pass | ||
|
||
def on_end(self, span: Span) -> None: | ||
try: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another thing we should maybe do is check https://docs.python.org/3/library/threading.html#threading.Thread.is_alive for our worker and restart it (logging a warning) if it crashed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure about this one. Maybe if that thread could crash we should implement a health check somewhere else I think. I don't want to make this onEnd
slower because it's called to each span.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although, about the slowness, we could maybe only check it in cases where we notify the condition.
- log when dropping spans - check if processor is already shutdown - avoid using RLock - prereserve span array - introduce variable to avoid sending notification storm
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few non-blocking comments, and I think the tests could still use some attention, but otherwise it looks great. Thanks for picking this up!
|
||
if max_export_batch_size > max_queue_size: | ||
raise ValueError( | ||
"max_export_batch_size must be less and equal to max_export_batch_size." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"max_export_batch_size must be less and equal to max_export_batch_size." | |
"max_export_batch_size must be less than or equal to max_export_batch_size." |
Also FWIW the type annotations don't do anything at runtime, if you want to enforce int/float types here we need a type check too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That check is not strictly needed, I just want a number, if the user pass something else it'll fail at some point.
self.span_exporter.export(spans[:idx]) | ||
# pylint: disable=broad-except | ||
except Exception: | ||
logger.warning( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't think logger.exception
is justified here?
"Exception while exporting data.", exc_info=sys.exc_info() | ||
) | ||
|
||
return self.queue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why return the queue?
I see @Oberon00's comment now, maybe do this instead:
return self.queue | |
return bool(self.queue) |
But I think this is a surprising return type for export
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll just return to my original approach. export()
returns nothing and flush()
checks the size of the queue. It's clear after all.
return self.queue | ||
|
||
def _flush(self): | ||
while self.export(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe a problem for another PR, but we probably want a timeout on flush too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, let's keep an eye on it.
if len(self.queue) >= self.max_queue_size // 2: | ||
with self.condition: | ||
if not self.notified: | ||
self.notified = True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand this correctly, it's to prevent calling notify_all
when the worker isn't actually wait
ing, which could happen if we're adding spans to the queue fast enough that we never drain it more than halfway.
My intuition is that notify_all
is effectively a no-op if nothing is waiting on the cv, so I'm surprised you think it's worth the check to prevent the extra calls. In any case this is fine, just curious whether I'm missing something here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, my original suggestion that triggered this was to use ==
instead of >=
but @mauriciovasquezbernal rightly rejected that because with multiple threads adding spans, the condition might never be hit. But you are right that notify_all
is semantically a no-op if no one is waiting, so I'd say the self.notified
variable is overkill.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I got confused. The notified
variable I introduced is a horrible and wrong hack.
notify()
is not-op when nobody is waiting:
This method wakes up at most n of the threads waiting for the condition variable; it is a no-op if no threads are waiting.
return | ||
if len(self.queue) == self.max_queue_size: | ||
if not self._spans_dropped: | ||
logging.warning("Queue is full, likely spans will be dropped.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a consequence of adding items outside the condition, because the worker might wake up and consume some spans between here and appendleft
below?
I bet we'll want exact stats on dropped spans in the future, will probably need to change this behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think if we want exact stats we can either:
- Go back to using
queue.Queue
- Use a lock and a plain list
- Estimate the number with
n_ended_spans - (n_exported_or_exporting_spans + len(self.queue))
(becomes exact after shutdown, or whenever no spans are currently added or processed) - Maybe the queue implementation that @reyang suggested, I haven't looked at it though so I don't know if that would solve the problem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@c24t yes you right. There is not a way to actually now if the queue will be full when appendleft
will be called. I agree with both of you, if we need to support that use case we'll need another approach.
class MySpanExporter(export.SpanExporter): | ||
def __init__(self, destination): | ||
self.destination = destination | ||
tracer = trace.Tracer() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd vote to leave the tracer out of these tests and call on_end
directly with some mock spans instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this idea, make them look more like test units than integration tests.
I updated it.
|
||
# call shutdown on specific span processor | ||
# TODO: this call is missing in the tracer | ||
span_processor.shutdown() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this test and the one above just seem to check _flush
, it's probably worth adding a separate check that we only export max_export_batch_size
many spans at a time during normal operation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add it.
with self.condition: | ||
if not self.notified: | ||
self.notified = True | ||
self.condition.notify_all() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, out of curiosity, why this instead of notify
when there's only one worker?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to notify()
"""Exports at most max_export_batch_size spans.""" | ||
idx = 0 | ||
spans = [None] * self.max_export_batch_size | ||
# currently only a single thread acts as consumer, so queue.pop() will |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will raise or will not raise?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the not
got lost while wrapping the line.
def export(self) -> bool: | ||
"""Exports at most max_export_batch_size spans.""" | ||
idx = 0 | ||
spans = [None] * self.max_export_batch_size |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Continuing from #153 (comment)
(Just an optimization, maybe for another PR:) I suggest we either use some more clever preallocation size than the max, or we create the list once at thread startup in the worker function and reuse it every time in export.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Oberon00 one clever preallocation strategy is letting python handle this under the hood and just append
ing to an empty list. :P
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think if we reuse the same list and emtpy it every time, that would actually be good. 😉
The thing with the batch exporter is that it is all about performance (otherwise we could use the simple exporter). I understand that moving the bulk of the work to another thread is already the most important part though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the @Oberon00 could be a good approach, I implemented it. However if we really want to optimize it we should do some benchmark first.
- export returns nothing - added another test that sends more than max_queue_size elements - removed notified extra variable - move preallocated list to class
@mauriciovasquezbernal: once you fixed the checks and think this is ready for merging, please ping! |
for start_time and end_time Make lint happy Addressing comments Addressing comments Allowing 0 as start and end time Fix lint issues Metrics API RFC 0003 cont'd (open-telemetry#136) * Create functions Comments for Meter More comments Add more comments Fix typos * fix lint * Fix lint * fix typing * Remove options, constructors, seperate labels * Consistent naming for float and int * Abstract time series * Use ABC * Fix typo * Fix docs * seperate measure classes * Add examples * fix lint * Update to RFC 0003 * Add spancontext, measurebatch * Fix docs * Fix comments * fix lint * fix lint * fix lint * skip examples * white space * fix spacing * fix imports * fix imports * LabelValues to str * Black formatting * fix isort * Remove aggregation * Fix names * Remove aggregation from docs * Fix lint * metric changes * Typing * Fix lint * Fix lint * Add space * Fix lint * fix comments * address comments * fix comments Adding a working propagator, adding to integrations and example (open-telemetry#137) Adding a full, end-to-end example of propagation at work in the example application, including a test. Adding the use of propagators into the integrations. Metrics API RFC 0009 (open-telemetry#140) * Create functions Comments for Meter More comments Add more comments Fix typos * fix lint * Fix lint * fix typing * Remove options, constructors, seperate labels * Consistent naming for float and int * Abstract time series * Use ABC * Fix typo * Fix docs * seperate measure classes * Add examples * fix lint * Update to RFC 0003 * Add spancontext, measurebatch * Fix docs * Fix comments * fix lint * fix lint * fix lint * skip examples * white space * fix spacing * fix imports * fix imports * LabelValues to str * Black formatting * fix isort * Remove aggregation * Fix names * Remove aggregation from docs * Fix lint * metric changes * Typing * Fix lint * Fix lint * Add space * Fix lint * fix comments * handle, recordbatch * docs * Update recordbatch * black * Fix typo * remove ValueType * fix lint Console exporter (open-telemetry#156) Make use_span more flexible (closes open-telemetry#147). (open-telemetry#154) Co-Authored-By: Reiley Yang <[email protected]> Co-Authored-By: Chris Kleinknecht <[email protected]> WSGI fixes (open-telemetry#148) Fix http.url. Don't delay calling wrapped app. Skeleton for azure monitor exporters (open-telemetry#151) Add link to docs to README (open-telemetry#170) Move example app to the examples folder (open-telemetry#172) WSGI: Fix port 80 always appended in http.host (open-telemetry#173) Build and host docs via github action (open-telemetry#167) Add missing license boilerplate to a few files (open-telemetry#176) sdk/trace/exporters: add batch span processor exporter (open-telemetry#153) The exporters specification states that two built-in span processors should be implemented, the simple processor span and the batch processor span. This commit implements the latter, it is mainly based on the opentelemetry/java one. The algorithm implements the following logic: - a condition variable is used to notify the worker thread in case the queue is half full, so that exporting can start before the queue gets full and spans are dropped. - export is called each schedule_delay_millis if there is a least one new span to export. - when the processor is shutdown all remaining spans are exported. Implementing W3C TraceContext (fixes open-telemetry#116) (open-telemetry#180) * Implementing TraceContext (fixes open-telemetry#116) This introduces a w3c TraceContext propagator, primarily inspired by opencensus. fix time conversion bug (open-telemetry#182) Introduce Context.suppress_instrumentation (open-telemetry#181) Metrics Implementation (open-telemetry#160) * Create functions Comments for Meter More comments Add more comments Fix typos * fix lint * Fix lint * fix typing * Remove options, constructors, seperate labels * Consistent naming for float and int * Abstract time series * Use ABC * Fix typo * Fix docs * seperate measure classes * Add examples * fix lint * Update to RFC 0003 * Add spancontext, measurebatch * Fix docs * Fix comments * fix lint * fix lint * fix lint * skip examples * white space * fix spacing * fix imports * fix imports * LabelValues to str * Black formatting * fix isort * Remove aggregation * Fix names * Remove aggregation from docs * Fix lint * metric changes * Typing * Fix lint * Fix lint * Add space * Fix lint * fix comments * handle, recordbatch * docs * Update recordbatch * black * Fix typo * remove ValueType * fix lint * sdk * metrics * example * counter * Tests * Address comments * ADd tests * Fix typing and examples * black * fix lint * remove override * Fix tests * mypy * fix lint * fix type * fix typing * fix tests * isort * isort * isort * isort * noop * lint * lint * fix tuple typing * fix type * black * address comments * fix type * fix lint * remove imports * default tests * fix lint * usse sequence * remove ellipses * remove ellipses * black * Fix typo * fix example * fix type * fix type * address comments Implement Azure Monitor Exporter (open-telemetry#175) Span add override parameters for start_time and end_time (open-telemetry#179) CONTRIBUTING.md: Fix clone URL (open-telemetry#177) Add B3 exporter to alpha release table (open-telemetry#164) Update README for alpha release (open-telemetry#189) Update Contributing.md doc (open-telemetry#194) Add **simple** client/server examples (open-telemetry#191) Remove unused dev-requirements.txt (open-telemetry#200) The requirements are contained in tox.ini now. Fx bug in BoundedList for Python 3.4 and add tests (open-telemetry#199) * fix bug in BoundedList for python 3.4 and add tests collections.deque.copy() was introduced in python 3.5, this commit changes that by the deque constructor and adds some tests to BoundedList and BoundedDict to avoid similar problems in the future. Also, improve docstrings of BoundedList and BoundedDict classes Move util.time_ns to API. (open-telemetry#205) Add Jaeger exporter (open-telemetry#174) This adds a Jeager exporter for OpenTelemetry. This exporter is based on https://github.com/census-instrumentation/opencensus-python/tree/master/contrib/opencensus-ext-jaeger. The exporter uses thrift and can be configured to send data to the agent and also to a remote collector. There is a long discussion going on about how to include generated files in the repo, so for now just put them here. Add code coverage Revert latest commit Fix some "errors" found by mypy. (open-telemetry#204) Fix some errors found by mypy (split from open-telemetry#201). Update README for new milestones (open-telemetry#218) Refactor current span handling for newly created spans. (open-telemetry#198) 1. Make Tracer.start_span() simply create and start the Span, without setting it as the current instance. 2. Add an extra Tracer.start_as_current_span() to create the Span and set it as the current instance automatically. Co-Authored-By: Chris Kleinknecht <[email protected]> Add set_status to Span (open-telemetry#213) Initial commit Initial version
* fix: minor fix * fix: remove _ prefix from params
The exporters specification states that two built-in span processors should be
implemented, the simple processor span and the batch processor span.
This commit implements the later, it is mainly based on the opentelemetry/java
one.
The algorithm implements the following logic:
is half full, so that exporting can start before the queue gets full and spans
are dropped.
to export.