-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
batch_sender.go
233 lines (206 loc) · 6.45 KB
/
batch_sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"
import (
"context"
"sync"
"sync/atomic"
"time"
"go.uber.org/zap"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
)
// batchSender is a component that places requests into batches before passing them to the downstream senders.
// Batches are sent out when any of the following conditions is met:
// - batch size reaches cfg.MinSizeItems
// - cfg.FlushTimeout has elapsed since the previous batch was sent out.
// - concurrencyLimit is reached.
type batchSender struct {
// Embedded base sender. bs.nextSender, used by send, is not declared in this struct,
// so it presumably comes from this embedded type.
baseRequestSender
// cfg holds the batching settings read by this sender
// (MinSizeItems, MaxSizeItems, MaxSizeConfig and FlushTimeout).
cfg exporterbatcher.Config
// mergeFunc combines the active batch's request with an incoming one (see sendMergeBatch).
mergeFunc exporterbatcher.BatchMergeFunc[Request]
// mergeSplitFunc combines and splits requests when cfg.MaxSizeItems > 0 (see sendMergeSplitBatch).
mergeSplitFunc exporterbatcher.BatchMergeSplitFunc[Request]
// concurrencyLimit is the maximum number of goroutines that can be blocked by the batcher.
// If this number is reached and all the goroutines are busy, the batch will be sent right away.
// Populated from the number of queue consumers if queue is enabled.
concurrencyLimit uint64
// activeRequests counts calls currently inside sendMergeBatch/sendMergeSplitBatch;
// it is incremented on entry and decremented when they return.
activeRequests atomic.Uint64
// resetTimerCh tells the goroutine started by Start to restart the flush timer.
resetTimerCh chan struct{}
// mu must be held while reading or replacing activeBatch.
mu sync.Mutex
// activeBatch is the batch currently accumulating requests; it is replaced by a
// fresh empty batch each time it is exported.
activeBatch *batch
// logger is taken from the exporter's CreateSettings in newBatchSender.
logger *zap.Logger
// shutdownCh is closed by Shutdown to ask the background goroutine to stop.
shutdownCh chan struct{}
// shutdownCompleteCh is closed by the background goroutine once it has finished.
shutdownCompleteCh chan struct{}
// stopped is set to true by Shutdown; from then on send passes requests straight through.
stopped *atomic.Bool
}
// newBatchSender builds a batchSender from the batching config, the exporter
// settings (only the logger is used here) and the merge / merge-split functions.
// The returned sender starts with an empty active batch and is not running
// until Start is called.
func newBatchSender(cfg exporterbatcher.Config, set exporter.CreateSettings,
	mf exporterbatcher.BatchMergeFunc[Request], msf exporterbatcher.BatchMergeSplitFunc[Request]) *batchSender {
	return &batchSender{
		cfg:            cfg,
		logger:         set.Logger,
		mergeFunc:      mf,
		mergeSplitFunc: msf,
		activeBatch:    newEmptyBatch(),

		// Unbuffered signalling channels used between send paths, Shutdown
		// and the background goroutine started by Start.
		resetTimerCh:       make(chan struct{}),
		shutdownCh:         make(chan struct{}),
		shutdownCompleteCh: make(chan struct{}),
		stopped:            new(atomic.Bool),
	}
}
// Start launches the background goroutine that drives time-based flushing.
// The goroutine reacts to three events:
//   - the flush timer firing: export the active batch if it holds a request, then re-arm the timer;
//   - a signal on resetTimerCh: restart the timer from a full cfg.FlushTimeout;
//   - shutdownCh being closed: flush until no request is active, stop the timer,
//     close shutdownCompleteCh and exit.
//
// It always returns nil.
func (bs *batchSender) Start(_ context.Context, _ component.Host) error {
	flushTimer := time.NewTimer(bs.cfg.FlushTimeout)

	// flushPending exports the active batch under the lock when it is non-empty.
	flushPending := func() {
		bs.mu.Lock()
		defer bs.mu.Unlock()
		if bs.activeBatch.request != nil {
			bs.exportActiveBatch()
		}
	}

	// stopAndDrain stops the timer and, if it had already fired, consumes the
	// pending tick so that a later Reset starts from a clean channel.
	stopAndDrain := func() {
		if !flushTimer.Stop() {
			<-flushTimer.C
		}
	}

	go func() {
		for {
			select {
			case <-bs.shutdownCh:
				// There is a minimal chance that another request is added after the shutdown signal.
				// Keep flushing until every in-flight request has returned.
				for bs.activeRequests.Load() > 0 {
					flushPending()
				}
				stopAndDrain()
				close(bs.shutdownCompleteCh)
				return

			case <-flushTimer.C:
				flushPending()
				flushTimer.Reset(bs.cfg.FlushTimeout)

			case <-bs.resetTimerCh:
				stopAndDrain()
				flushTimer.Reset(bs.cfg.FlushTimeout)
			}
		}
	}()

	return nil
}
// batch is one unit of accumulated work: the merged request, the context it
// will be exported with, and the means for waiting callers to learn the outcome.
type batch struct {
// ctx is the context used when the batch is exported. It starts as
// context.Background() and is replaced by the context of the first request added.
ctx context.Context
// request is the merged request accumulated so far; nil while the batch is empty.
request Request
// done is closed once the export of this batch has finished.
done chan struct{}
// err is the export result. It is written before done is closed, so it should
// only be read after receiving from done.
err error
}
// newEmptyBatch returns a batch with no request, a background context and an
// open done channel.
func newEmptyBatch() *batch {
	b := new(batch)
	b.ctx = context.Background()
	b.done = make(chan struct{})
	return b
}
// exportActiveBatch exports the active batch asynchronously and replaces it with a new one.
// The batch is handed to bs.nextSender rather than exported directly, so the
// downstream senders still see batched requests, the same way they see
// requests on the pass-through path in send.
// Caller must hold the lock.
func (bs *batchSender) exportActiveBatch() {
	go func(b *batch) {
		// err is written before done is closed so that every goroutine waiting
		// on done observes the final result.
		b.err = bs.nextSender.send(b.ctx, b.request)
		close(b.done)
	}(bs.activeBatch)
	bs.activeBatch = newEmptyBatch()
}
// resetTimer asks the background goroutine to restart the flush timer.
// It is a no-op once the sender has been stopped.
//
// Callers invoke this while holding bs.mu. resetTimerCh is unbuffered, so a
// plain send would block forever if Shutdown happened after the stopped check:
// the background goroutine would then be in its shutdown loop waiting for
// bs.mu and would never receive. Selecting on shutdownCh as well lets the
// caller give up as soon as shutdown has been signalled.
func (bs *batchSender) resetTimer() {
	if bs.stopped.Load() {
		return
	}
	select {
	case bs.resetTimerCh <- struct{}{}:
	case <-bs.shutdownCh:
		// Shutdown is in progress; the timer is about to be stopped anyway.
	}
}
// isActiveBatchReady reports whether the active batch should be exported now:
// either it holds at least cfg.MinSizeItems items, or a concurrency limit is
// configured and the number of active requests has reached it.
// Caller must hold the lock.
func (bs *batchSender) isActiveBatchReady() bool {
	if bs.activeBatch.request.ItemsCount() >= bs.cfg.MinSizeItems {
		return true
	}
	// A zero limit means no concurrency-based flushing.
	if bs.concurrencyLimit == 0 {
		return false
	}
	return bs.activeRequests.Load() >= bs.concurrencyLimit
}
// send routes a request through the batcher. It picks one of three paths:
// pass-through to the next sender once stopped, merge-and-split batching when
// cfg.MaxSizeItems is set, or merge-only batching otherwise.
func (bs *batchSender) send(ctx context.Context, req Request) error {
	switch {
	case bs.stopped.Load():
		// Stopped batch sender should act as pass-through to allow the queue to be drained.
		return bs.nextSender.send(ctx, req)
	case bs.cfg.MaxSizeItems > 0:
		return bs.sendMergeSplitBatch(ctx, req)
	default:
		return bs.sendMergeBatch(ctx, req)
	}
}
// sendMergeSplitBatch merges req with the active batch's request via
// mergeSplitFunc, which may return several requests.
//
// If the merge produced a single request, or the active batch already held
// data, the first resulting request becomes the active batch and the caller
// waits for that batch to be exported. The batch is exported immediately when
// it is ready or when the merge produced more than one request. Any remaining
// requests are sent individually through bs.nextSender, the same downstream
// path used for batches, instead of being exported directly.
//
// It returns the first error encountered: from mergeSplitFunc, from the batch
// export, or from sending a remaining request.
func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) error {
	bs.mu.Lock()
	bs.activeRequests.Add(1)
	// Adding the all-ones value wraps around and decrements the unsigned counter by one.
	defer bs.activeRequests.Add(^uint64(0))

	reqs, err := bs.mergeSplitFunc(ctx, bs.cfg.MaxSizeConfig, bs.activeBatch.request, req)
	if err != nil || len(reqs) == 0 {
		bs.mu.Unlock()
		return err
	}

	if len(reqs) == 1 || bs.activeBatch.request != nil {
		bs.updateActiveBatch(ctx, reqs[0])
		// Keep a reference: exportActiveBatch swaps bs.activeBatch for a new one.
		batch := bs.activeBatch
		if bs.isActiveBatchReady() || len(reqs) > 1 {
			bs.exportActiveBatch()
			bs.resetTimer()
		}
		bs.mu.Unlock()
		<-batch.done
		if batch.err != nil {
			return batch.err
		}
		reqs = reqs[1:]
	} else {
		bs.mu.Unlock()
	}

	// Intentionally do not put the last request in the active batch to not block it.
	// TODO: Consider including the partial request in the error to avoid double publishing.
	for _, r := range reqs {
		if err := bs.nextSender.send(ctx, r); err != nil {
			return err
		}
	}
	return nil
}
// sendMergeBatch folds req into the active batch (merging with whatever is
// already pending), triggers an export if the batch is ready, and then blocks
// until the batch that contains req has been exported. It returns the merge
// error, if any, or otherwise the export result of that batch.
func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error {
	bs.mu.Lock()
	bs.activeRequests.Add(1)
	// Adding the all-ones value wraps around and decrements the unsigned counter by one.
	defer bs.activeRequests.Add(^uint64(0))

	merged := req
	if pending := bs.activeBatch.request; pending != nil {
		var mergeErr error
		if merged, mergeErr = bs.mergeFunc(ctx, pending, req); mergeErr != nil {
			bs.mu.Unlock()
			return mergeErr
		}
	}
	bs.updateActiveBatch(ctx, merged)

	// Capture the batch before a possible export replaces bs.activeBatch.
	joined := bs.activeBatch
	if bs.isActiveBatchReady() {
		bs.exportActiveBatch()
		bs.resetTimer()
	}
	bs.mu.Unlock()

	<-joined.done
	return joined.err
}
// updateActiveBatch stores req as the active batch's request.
// The batch context is taken from ctx only when the batch was empty, i.e. on
// the first request; later calls leave it untouched.
// Merging the context would be complex and require an additional goroutine to handle the context cancellation.
// We take the approach of using the context from the first request since it's likely to have the shortest timeout.
func (bs *batchSender) updateActiveBatch(ctx context.Context, req Request) {
	active := bs.activeBatch
	if active.request == nil {
		active.ctx = ctx
	}
	active.request = req
}
// Shutdown marks the sender as stopped, signals the background goroutine to
// flush and exit, and waits until that goroutine reports completion.
// Once stopped, send passes requests straight to the next sender.
//
// Shutdown may be called more than once: only the first call closes
// shutdownCh (closing it twice would panic); later calls just wait for the
// completion signal. It always returns nil.
func (bs *batchSender) Shutdown(context.Context) error {
	// Swap returns the previous value, so it is false only for the first caller.
	if !bs.stopped.Swap(true) {
		close(bs.shutdownCh)
	}
	<-bs.shutdownCompleteCh
	return nil
}