// Package producer implements an Amazon Kinesis producer: a KPL-like batch
// producer built on top of the official Go AWS SDK, using the same aggregation
// format that the KPL uses.
//
// Note: this project started as a fork of `tj/go-kinesis`. If you are not
// interested in the KPL aggregation logic, you probably want to check that
// project out instead.
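//
// A minimal usage sketch (illustrative only; Config fields such as the target
// stream and AWS client are defined outside this file and are represented here
// by a placeholder comment):
//
//	pr, err := producer.New(&producer.Config{
//		// stream name, Kinesis client, batching and flush settings go here
//	})
//	if err != nil {
//		log.Fatal(err)
//	}
//	pr.Start()
//	// handle records that could not be delivered
//	go func() {
//		for err := range pr.NotifyFailures() {
//			log.Println("failed record:", err)
//		}
//	}()
//	if err := pr.Put([]byte("hello"), "partition-key-1"); err != nil {
//		log.Println("put error:", err)
//	}
//	pr.Stop()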
package producer
import (
"sync"
"time"
)
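// Producer aggregates user records into KPL-style Kinesis records and
// publishes them through a pool of workers.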
type Producer struct {
sync.RWMutex
*Config
shardMap *ShardMap
// semaphore controlling the size of the Put backlog before blocking
backlog semaphore
pool *WorkerPool
// stopped signals that the producer is no longer accepting Puts
stopped chan struct{}
// signal for the main loop that stop has been called and it should drain the backlog
done chan struct{}
failures chan error
}
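// New creates a Producer with the given Config, applying defaults and
// fetching the initial shard list for the stream.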
func New(config *Config) (*Producer, error) {
config.defaults()
p := &Producer{
Config: config,
backlog: make(chan struct{}, config.BacklogCount),
pool: NewWorkerPool(config),
stopped: make(chan struct{}),
done: make(chan struct{}),
}
shards, _, err := p.GetShards(nil)
if err != nil {
return nil, err
}
p.shardMap = NewShardMap(shards, p.AggregateBatchCount)
return p, nil
}
// Put `data` using `partitionKey` asynchronously. This method is thread-safe.
//
// Under the covers, the Producer will automatically re-attempt puts in case of
// transient errors.
// When an unrecoverable error is detected (e.g. trying to put to a stream that
// doesn't exist), the message will be returned by the Producer.
// Add a listener with `Producer.NotifyFailures` to handle undeliverable messages.
func (p *Producer) Put(data []byte, partitionKey string) error {
return p.PutUserRecord(NewDataRecord(data, partitionKey))
}
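// PutUserRecord queues a UserRecord for aggregation and publishing. It blocks
// while the backlog is full, validates the partition key and record size, and
// sends records larger than the aggregation size as stand-alone Kinesis
// records. Like Put, it is safe for concurrent use.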
func (p *Producer) PutUserRecord(userRecord UserRecord) error {
select {
case <-p.stopped:
return userRecord.(*ErrStoppedProducer)
// same as p.backlog.acquire(), but using the channel primitive directly so it can be used as a select case
case p.backlog <- struct{}{}:
}
var release = true
defer func() {
if release {
p.backlog.release()
}
}()
partitionKey := userRecord.PartitionKey()
partitionKeySize := len(partitionKey)
if partitionKeySize < 1 || partitionKeySize > 256 {
return userRecord.(*ErrIllegalPartitionKey)
}
// Kinesis counts partition key size towards size limits
recordSize := userRecord.Size() + partitionKeySize
if recordSize > maxRecordSize {
return userRecord.(*ErrRecordSizeExceeded)
}
var (
record *AggregatedRecordRequest
err error
)
// if the record size is bigger than the aggregation size,
// handle it as a simple Kinesis record
// TODO: this logic is not enforced when re-aggregating after a shard refresh
if recordSize > p.AggregateBatchSize {
record = NewAggregatedRecordRequest(userRecord.Data(), &partitionKey, nil, []UserRecord{userRecord})
} else {
record, err = p.shardMap.Put(userRecord)
}
if record != nil {
// if we are going to send a record over the records channel,
// we hold the semaphore until that record has been sent.
// This way we can rely on p.backlog.wait() to mean that all waiting puts have
// completed and future puts are blocked
release = false
go func() {
p.pool.Add(record)
p.backlog.release()
}()
}
return err
}
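// Start launches the worker pool and the producer's main loop, and begins
// forwarding worker pool errors to failure listeners.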
func (p *Producer) Start() {
poolErrs := p.pool.Errors()
// listen for errors from the worker pool; p.notify() will send on the failures
// channel if p.NotifyFailures() has been called
go func() {
for err := range poolErrs {
p.notify(err)
}
// we can close p.failures after the pool's error channel has closed, because
// the worker pool will no longer produce errors at that point
p.Lock()
if p.failures != nil {
close(p.failures)
p.failures = nil
}
p.Unlock()
}()
p.pool.Start()
go p.loop()
}
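// Stop gracefully shuts down the producer: it blocks new Puts, lets the main
// loop drain the backlog and the aggregator, waits for the worker pool to
// finish, and returns once the main loop has exited.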
func (p *Producer) Stop() {
// signal to stop any future Puts
close(p.stopped)
// signal to main loop to begin cleanup process
p.done <- struct{}{}
// wait for the worker pool to complete
p.pool.Wait()
// send another signal to main loop to exit
p.done <- struct{}{}
<-p.done
}
// NotifyFailures registers and returns a listener to handle undeliverable messages.
// The incoming struct has a copy of the Data and the PartitionKey along with some
// error information about why publishing failed.
func (p *Producer) NotifyFailures() <-chan error {
p.Lock()
defer p.Unlock()
if p.failures == nil {
p.failures = make(chan error, p.BacklogCount)
}
return p.failures
}
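// loop is the producer's main run loop: it flushes the aggregator on every
// FlushInterval tick, refreshes the shard map on every ShardRefreshInterval
// tick (when configured), and coordinates the two-step shutdown started by
// Stop.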
func (p *Producer) loop() {
var (
stop chan struct{}
done chan struct{} = p.done
flushTick *time.Ticker = time.NewTicker(p.FlushInterval)
flushTickC <-chan time.Time = flushTick.C
shardTick *time.Ticker
shardTickC <-chan time.Time
)
if p.ShardRefreshInterval != 0 {
shardTick = time.NewTicker(p.ShardRefreshInterval)
shardTickC = shardTick.C
defer shardTick.Stop()
}
defer flushTick.Stop()
defer close(p.done)
flush := func() {
records := p.drain()
for _, record := range records {
p.pool.Add(record)
}
p.pool.Flush()
}
for {
select {
case <-flushTickC:
flush()
case <-shardTickC:
err := p.updateShards(done == nil)
if err != nil {
p.Logger.Error("UpdateShards error", err)
p.notify(err)
}
case <-done:
// after waiting for the pool to finish, Stop() will send another signal to the done
// channel, the second time signaling that it is safe to end this goroutine
stop, done = done, nil
// once we are done we no longer need the flush tick, as we are already
// flushing the backlog
flushTickC = nil
// block any more puts from happening
p.backlog.wait(p.BacklogCount)
// backlog is flushed and no more records are incoming;
// flush any remaining records in the aggregator
flush()
// with puts blocked and the flush complete, we can safely close the input channel
p.pool.Close()
case <-stop:
return
}
}
}
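// updateShards refreshes the shard list and, if it changed, pauses the worker
// pool, re-aggregates pending records against the new shards, and resumes the
// pool. Unless shutdown has already begun (done == true), the backlog is
// blocked for the duration and reopened afterwards.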
func (p *Producer) updateShards(done bool) error {
old := p.shardMap.Shards()
shards, updated, err := p.GetShards(old)
if err != nil {
return err
}
if !updated {
return nil
}
if p.Verbose {
p.Logger.Info("waiting for backlog to fully release")
}
if !done {
// if done signal has not been received yet, flush all backlogged puts into the worker
// pool and block additional puts
p.backlog.wait(p.BacklogCount)
}
// pause and drain the worker pool
if p.Verbose {
p.Logger.Info("pausing worker pool")
}
pending := p.pool.Pause()
if p.Verbose {
p.Logger.Info("updating shards")
}
// update the shards and reaggregate pending records
records, err := p.shardMap.UpdateShards(shards, pending)
if p.Verbose {
p.Logger.Info("resuming worker pool")
}
// resume the worker pool
p.pool.Resume(records)
if p.Verbose {
p.Logger.Info("waiting for backlog to reopen")
}
if !done {
// if done signal has not been received yet, re-open the backlog to accept more Puts
p.backlog.open(p.BacklogCount)
}
if p.Verbose {
p.Logger.Info("done updating shards, returning")
}
return err
}
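// drain empties the aggregator and reports any aggregation errors to failure
// listeners.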
func (p *Producer) drain() []*AggregatedRecordRequest {
if p.shardMap.Size() == 0 {
return nil
}
records, errs := p.shardMap.Drain()
if len(errs) > 0 {
p.notify(errs...)
}
return records
}
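// notify forwards errors to the failures channel when a listener has been
// registered via NotifyFailures.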
func (p *Producer) notify(errs ...error) {
p.RLock()
if p.failures != nil {
for _, err := range errs {
p.failures <- err
}
}
p.RUnlock()
}