-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
batch_span_processor.go
409 lines (358 loc) · 11.1 KB
/
batch_span_processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package trace // import "go.opentelemetry.io/otel/sdk/trace"
import (
"context"
"sync"
"sync/atomic"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/internal/env"
"go.opentelemetry.io/otel/trace"
)
// Defaults for BatchSpanProcessorOptions.
const (
DefaultMaxQueueSize = 2048
DefaultScheduleDelay = 5000
DefaultExportTimeout = 30000
DefaultMaxExportBatchSize = 512
)
// BatchSpanProcessorOption configures a BatchSpanProcessor.
type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)
// BatchSpanProcessorOptions is configuration settings for a
// BatchSpanProcessor.
type BatchSpanProcessorOptions struct {
// MaxQueueSize is the maximum queue size to buffer spans for delayed processing. If the
// queue gets full it drops the spans. Use BlockOnQueueFull to change this behavior.
// The default value of MaxQueueSize is 2048.
MaxQueueSize int
// BatchTimeout is the maximum duration for constructing a batch. Processor
// forcefully sends available spans when timeout is reached.
// The default value of BatchTimeout is 5000 msec.
BatchTimeout time.Duration
// ExportTimeout specifies the maximum duration for exporting spans. If the timeout
// is reached, the export will be cancelled.
// The default value of ExportTimeout is 30000 msec.
ExportTimeout time.Duration
// MaxExportBatchSize is the maximum number of spans to process in a single batch.
// If there are more than one batch worth of spans then it processes multiple batches
// of spans one batch after the other without any delay.
// The default value of MaxExportBatchSize is 512.
MaxExportBatchSize int
// BlockOnQueueFull blocks onEnd() and onStart() method if the queue is full
// AND if BlockOnQueueFull is set to true.
// Blocking option should be used carefully as it can severely affect the performance of an
// application.
BlockOnQueueFull bool
}
// batchSpanProcessor is a SpanProcessor that batches asynchronously-received
// spans and sends them to a trace.Exporter when complete.
type batchSpanProcessor struct {
e SpanExporter
o BatchSpanProcessorOptions
queue chan ReadOnlySpan
dropped uint32
batch []ReadOnlySpan
batchMutex sync.Mutex
timer *time.Timer
stopWait sync.WaitGroup
stopOnce sync.Once
stopCh chan struct{}
stopped atomic.Bool
}
var _ SpanProcessor = (*batchSpanProcessor)(nil)
// NewBatchSpanProcessor creates a new SpanProcessor that will send completed
// span batches to the exporter with the supplied options.
//
// If the exporter is nil, the span processor will perform no action.
func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor {
maxQueueSize := env.BatchSpanProcessorMaxQueueSize(DefaultMaxQueueSize)
maxExportBatchSize := env.BatchSpanProcessorMaxExportBatchSize(DefaultMaxExportBatchSize)
if maxExportBatchSize > maxQueueSize {
if DefaultMaxExportBatchSize > maxQueueSize {
maxExportBatchSize = maxQueueSize
} else {
maxExportBatchSize = DefaultMaxExportBatchSize
}
}
o := BatchSpanProcessorOptions{
BatchTimeout: time.Duration(env.BatchSpanProcessorScheduleDelay(DefaultScheduleDelay)) * time.Millisecond,
ExportTimeout: time.Duration(env.BatchSpanProcessorExportTimeout(DefaultExportTimeout)) * time.Millisecond,
MaxQueueSize: maxQueueSize,
MaxExportBatchSize: maxExportBatchSize,
}
for _, opt := range options {
opt(&o)
}
bsp := &batchSpanProcessor{
e: exporter,
o: o,
batch: make([]ReadOnlySpan, 0, o.MaxExportBatchSize),
timer: time.NewTimer(o.BatchTimeout),
queue: make(chan ReadOnlySpan, o.MaxQueueSize),
stopCh: make(chan struct{}),
}
bsp.stopWait.Add(1)
go func() {
defer bsp.stopWait.Done()
bsp.processQueue()
bsp.drainQueue()
}()
return bsp
}
// OnStart method does nothing.
func (bsp *batchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan) {}
// OnEnd method enqueues a ReadOnlySpan for later processing.
func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) {
// Do not enqueue spans after Shutdown.
if bsp.stopped.Load() {
return
}
// Do not enqueue spans if we are just going to drop them.
if bsp.e == nil {
return
}
bsp.enqueue(s)
}
// Shutdown flushes the queue and waits until all spans are processed.
// It only executes once. Subsequent call does nothing.
func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
var err error
bsp.stopOnce.Do(func() {
bsp.stopped.Store(true)
wait := make(chan struct{})
go func() {
close(bsp.stopCh)
bsp.stopWait.Wait()
if bsp.e != nil {
if err := bsp.e.Shutdown(ctx); err != nil {
otel.Handle(err)
}
}
close(wait)
}()
// Wait until the wait group is done or the context is cancelled
select {
case <-wait:
case <-ctx.Done():
err = ctx.Err()
}
})
return err
}
type forceFlushSpan struct {
ReadOnlySpan
flushed chan struct{}
}
func (f forceFlushSpan) SpanContext() trace.SpanContext {
return trace.NewSpanContext(trace.SpanContextConfig{TraceFlags: trace.FlagsSampled})
}
// ForceFlush exports all ended spans that have not yet been exported.
func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
// Interrupt if context is already canceled.
if err := ctx.Err(); err != nil {
return err
}
// Do nothing after Shutdown.
if bsp.stopped.Load() {
return nil
}
var err error
if bsp.e != nil {
flushCh := make(chan struct{})
if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}) {
select {
case <-bsp.stopCh:
// The batchSpanProcessor is Shutdown.
return nil
case <-flushCh:
// Processed any items in queue prior to ForceFlush being called
case <-ctx.Done():
return ctx.Err()
}
}
wait := make(chan error)
go func() {
wait <- bsp.exportSpans(ctx)
close(wait)
}()
// Wait until the export is finished or the context is cancelled/timed out
select {
case err = <-wait:
case <-ctx.Done():
err = ctx.Err()
}
}
return err
}
// WithMaxQueueSize returns a BatchSpanProcessorOption that configures the
// maximum queue size allowed for a BatchSpanProcessor.
func WithMaxQueueSize(size int) BatchSpanProcessorOption {
return func(o *BatchSpanProcessorOptions) {
o.MaxQueueSize = size
}
}
// WithMaxExportBatchSize returns a BatchSpanProcessorOption that configures
// the maximum export batch size allowed for a BatchSpanProcessor.
func WithMaxExportBatchSize(size int) BatchSpanProcessorOption {
return func(o *BatchSpanProcessorOptions) {
o.MaxExportBatchSize = size
}
}
// WithBatchTimeout returns a BatchSpanProcessorOption that configures the
// maximum delay allowed for a BatchSpanProcessor before it will export any
// held span (whether the queue is full or not).
func WithBatchTimeout(delay time.Duration) BatchSpanProcessorOption {
return func(o *BatchSpanProcessorOptions) {
o.BatchTimeout = delay
}
}
// WithExportTimeout returns a BatchSpanProcessorOption that configures the
// amount of time a BatchSpanProcessor waits for an exporter to export before
// abandoning the export.
func WithExportTimeout(timeout time.Duration) BatchSpanProcessorOption {
return func(o *BatchSpanProcessorOptions) {
o.ExportTimeout = timeout
}
}
// WithBlocking returns a BatchSpanProcessorOption that configures a
// BatchSpanProcessor to wait for enqueue operations to succeed instead of
// dropping data when the queue is full.
func WithBlocking() BatchSpanProcessorOption {
return func(o *BatchSpanProcessorOptions) {
o.BlockOnQueueFull = true
}
}
// exportSpans is a subroutine of processing and draining the queue.
func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
bsp.timer.Reset(bsp.o.BatchTimeout)
bsp.batchMutex.Lock()
defer bsp.batchMutex.Unlock()
if bsp.o.ExportTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, bsp.o.ExportTimeout)
defer cancel()
}
if l := len(bsp.batch); l > 0 {
global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped))
err := bsp.e.ExportSpans(ctx, bsp.batch)
// A new batch is always created after exporting, even if the batch failed to be exported.
//
// It is up to the exporter to implement any type of retry logic if a batch is failing
// to be exported, since it is specific to the protocol and backend being sent to.
bsp.batch = bsp.batch[:0]
if err != nil {
return err
}
}
return nil
}
// processQueue removes spans from the `queue` channel until processor
// is shut down. It calls the exporter in batches of up to MaxExportBatchSize
// waiting up to BatchTimeout to form a batch.
func (bsp *batchSpanProcessor) processQueue() {
defer bsp.timer.Stop()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
select {
case <-bsp.stopCh:
return
case <-bsp.timer.C:
if err := bsp.exportSpans(ctx); err != nil {
otel.Handle(err)
}
case sd := <-bsp.queue:
if ffs, ok := sd.(forceFlushSpan); ok {
close(ffs.flushed)
continue
}
bsp.batchMutex.Lock()
bsp.batch = append(bsp.batch, sd)
shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize
bsp.batchMutex.Unlock()
if shouldExport {
if !bsp.timer.Stop() {
<-bsp.timer.C
}
if err := bsp.exportSpans(ctx); err != nil {
otel.Handle(err)
}
}
}
}
}
// drainQueue awaits the any caller that had added to bsp.stopWait
// to finish the enqueue, then exports the final batch.
func (bsp *batchSpanProcessor) drainQueue() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
select {
case sd := <-bsp.queue:
if _, ok := sd.(forceFlushSpan); ok {
// Ignore flush requests as they are not valid spans.
continue
}
bsp.batchMutex.Lock()
bsp.batch = append(bsp.batch, sd)
shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize
bsp.batchMutex.Unlock()
if shouldExport {
if err := bsp.exportSpans(ctx); err != nil {
otel.Handle(err)
}
}
default:
// There are no more enqueued spans. Make final export.
if err := bsp.exportSpans(ctx); err != nil {
otel.Handle(err)
}
return
}
}
}
func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {
ctx := context.TODO()
if bsp.o.BlockOnQueueFull {
bsp.enqueueBlockOnQueueFull(ctx, sd)
} else {
bsp.enqueueDrop(ctx, sd)
}
}
func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan) bool {
if !sd.SpanContext().IsSampled() {
return false
}
select {
case bsp.queue <- sd:
return true
case <-ctx.Done():
return false
}
}
func (bsp *batchSpanProcessor) enqueueDrop(_ context.Context, sd ReadOnlySpan) bool {
if !sd.SpanContext().IsSampled() {
return false
}
select {
case bsp.queue <- sd:
return true
default:
atomic.AddUint32(&bsp.dropped, 1)
}
return false
}
// MarshalLog is the marshaling function used by the logging system to represent this Span Processor.
func (bsp *batchSpanProcessor) MarshalLog() interface{} {
return struct {
Type string
SpanExporter SpanExporter
Config BatchSpanProcessorOptions
}{
Type: "BatchSpanProcessor",
SpanExporter: bsp.e,
Config: bsp.o,
}
}