Implement the BatchingProcessor #5093
Conversation
Codecov Report

Attention: Patch coverage is

Additional details and impacted files

```diff
@@           Coverage Diff           @@
##            main   #5093     +/-   ##
=======================================
+ Coverage   84.3%   84.5%   +0.1%
=======================================
  Files        258     258
  Lines      17042   17110     +68
=======================================
+ Hits       14382   14466     +84
+ Misses      2361    2345     -16
  Partials     299     299
```
`3604420` to `5ce16bf`
Check the `len` of records in the eventually assertion, given that is what we are going to measure.
The batching log processor will generate records from 4 different locations (polling, OnEmit, ForceFlush, Shutdown). In order to ensure an Exporter is called serially, as is required by the interface, this function will be used in the processor to centralize the interaction with its Exporter. Part of open-telemetry#5063. See open-telemetry#5093 for the implementation use.
The BatchingProcessor is not expected to ultimately contain configuration fields for queue size or export parameters (see open-telemetry#5093). This will break TestNewBatchingProcessorConfiguration which tests the configuration by evaluating the BatchingProcessor directly. Instead, test the batchingConfig and rename the test to TestNewBatchingConfig to match what is being tested.
Do not spawn a goroutine for the flush operation.
All my blocking comments are resolved. I will do my best to do another look on Thursday.
Co-authored-by: Robert Pająk <[email protected]>
Plan is to wait until Monday to merge. @dashpole and @MadVikingGod are planning to take a look at this.
@dashpole does this look good to you?
@dashpole: planning to merge given this is blocking other work.
Part of #5063

Adds a feature-complete implementation of the `BatchingProcessor`. There remain performance improvements to be addressed as follow-up tasks (tracked below).

Design
At a high level, the `BatchingProcessor` is designed to provide the highest throughput of log Records possible while being compatible with OpenTelemetry. The entry point of log Records is the `OnEmit` method. This method is designed to receive Records as fast as possible while still honoring shutdown commands. All Records received are enqueued to a `queue`.

In order to block `OnEmit` as little as possible, a separate "poll" goroutine is spawned at the creation of a `BatchingProcessor`. This goroutine is responsible for batching the queue at regular polled intervals, or when it is directly signaled to.

To keep the polling goroutine from backing up, all batches it makes are exported with a `bufferedExporter`. This exporter allows the poll goroutine to enqueue an export payload that will be handled in a separate goroutine dedicated to the export. This asynchronous behavior allows the poll goroutine to maintain accurate interval polling.

Release valve
When the `BatchingProcessor` receives more Records than it is able to batch and export, it needs a way to not block the user application still sending Records. The "release valve" that prevents this is the `queue` the records are stored in. This `queue` is backed by a ring buffer. It will overwrite the oldest records first when writes to `OnEmit` are made faster than the queue can be batched.

Shutdown and ForceFlush
The asynchronous design of the `BatchingProcessor` that allows for high throughput also needs to incorporate the synchronous nature of calls to `Shutdown` and `ForceFlush`.
To handle the flushing operation for both of these operations, the `queue` type can be explicitly flushed. The data returned can be sent to the `bufferExporter`'s `Export` method to be exported synchronously (along with flushing any pending exports).

The shutdown state of the processor is managed at a high level using an `atomic.Bool` that is checked by all exported methods before they perform operations.

Enabled
This updates the `Enabled` method to return `false` when the processor has been shut down.

Follow up tasks
Tracked in #5063

- `BatchingProcessor` created without using `NewBatchingProcessor` does not panic