Remove exponential backoff code aws#23
PettitWesley committed May 12, 2020
1 parent 7252a82 commit 1c36935
Showing 2 changed files with 1 addition and 53 deletions.
11 changes: 1 addition & 10 deletions firehose/firehose.go
@@ -60,7 +60,6 @@ type OutputPlugin struct {
     client PutRecordBatcher
     records []*firehose.Record
     dataLength int
-    backoff *plugins.Backoff
     timer *plugins.Timeout
     PluginID int
 }
@@ -106,7 +105,6 @@ func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, endpoint, timeKe
         client: client,
         records: records,
         dataKeys: dataKeys,
-        backoff: plugins.NewBackoff(),
         timer: timer,
         timeKey: timeKey,
         fmtStrftime: timeFormatter,
@@ -212,7 +210,6 @@ func (output *OutputPlugin) processRecord(record map[interface{}]interface{}) ([
 }
 
 func (output *OutputPlugin) sendCurrentBatch() error {
-    output.backoff.Wait()
     output.timer.Check()
 
     response, err := output.client.PutRecordBatch(&firehose.PutRecordBatchInput{
@@ -225,9 +222,6 @@ func (output *OutputPlugin) sendCurrentBatch() error {
         if aerr, ok := err.(awserr.Error); ok {
             if aerr.Code() == firehose.ErrCodeServiceUnavailableException {
                 logrus.Warnf("[firehose %d] Throughput limits for the delivery stream may have been exceeded.", output.PluginID)
-                // https://docs.aws.amazon.com/sdk-for-go/api/service/firehose/#Firehose.PutRecordBatch
-                // Firehose recommends backoff when this error is encountered
-                output.backoff.StartBackoff()
             }
         }
         return err
@@ -256,9 +250,7 @@ func (output *OutputPlugin) processAPIResponse(response *firehose.PutRecordBatch
                 failedRecords = append(failedRecords, output.records[i])
             }
             if aws.StringValue(record.ErrorCode) == firehose.ErrCodeServiceUnavailableException {
-                // https://docs.aws.amazon.com/sdk-for-go/api/service/firehose/#Firehose.PutRecordBatch
-                // Firehose recommends backoff when this error is encountered
-                output.backoff.StartBackoff()
+                logrus.Warnf("[firehose %d] Throughput limits for the delivery stream may have been exceeded.", output.PluginID)
             }
         }
 
@@ -272,7 +264,6 @@ func (output *OutputPlugin) processAPIResponse(response *firehose.PutRecordBatch
     } else {
         // request fully succeeded
         output.timer.Reset()
-        output.backoff.Reset()
         output.records = output.records[:0]
         output.dataLength = 0
     }
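
For reference, the calls deleted above followed a gate-then-report pattern around PutRecordBatch: Wait() slept only if a previous throttling error had triggered StartBackoff(), and Reset() ended the backoff once a batch fully succeeded. The sketch below is illustrative rather than the repository's code; the function name sendWithBackoff is hypothetical, and the plugins import path assumes this repo's module layout and the pre-removal Backoff API.

// Minimal sketch of the pre-removal call pattern (not the plugin's actual code).
package example

import (
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/awserr"
    "github.com/aws/aws-sdk-go/service/firehose"

    "github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins"
)

func sendWithBackoff(client *firehose.Firehose, stream string, records []*firehose.Record, backoff *plugins.Backoff) error {
    backoff.Wait() // sleeps only if StartBackoff() was called for an earlier error

    response, err := client.PutRecordBatch(&firehose.PutRecordBatchInput{
        DeliveryStreamName: aws.String(stream),
        Records:            records,
    })
    if err != nil {
        if aerr, ok := err.(awserr.Error); ok && aerr.Code() == firehose.ErrCodeServiceUnavailableException {
            backoff.StartBackoff() // throttled: begin exponential backoff
        }
        return err
    }
    if aws.Int64Value(response.FailedPutCount) == 0 {
        backoff.Reset() // batch fully succeeded: end backoff
    }
    return nil
}

Note that this targets the pre-removal API: after this commit, plugins.Backoff no longer exists.
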
43 changes: 0 additions & 43 deletions plugins/plugins.go
@@ -20,7 +20,6 @@ import (
     "strings"
     "time"
 
-    retry "github.com/cenkalti/backoff"
     "github.com/sirupsen/logrus"
 )
 
@@ -34,48 +33,6 @@ const (
     maxInterval = 10 * time.Second
 )
 
-// Backoff wraps github.com/cenkalti/backoff
-// Wait() is called for each AWS API call that may need back off
-// But backoff only occurs if StartBackoff() has previously been called
-// Reset() should be called whenever backoff can end.
-type Backoff struct {
-    doBackoff bool
-    expBackoff *retry.ExponentialBackOff
-}
-
-// Reset ends the exponential backoff
-func (b *Backoff) Reset() {
-    b.doBackoff = false
-    b.expBackoff.Reset()
-}
-
-// Wait enacts the exponential backoff, if StartBackoff() has been called
-func (b *Backoff) Wait() {
-    if b.doBackoff {
-        d := b.expBackoff.NextBackOff()
-        logrus.Debugf("[go plugin] In exponential backoff, waiting %v", d)
-        time.Sleep(d)
-    }
-}
-
-// StartBackoff begins exponential backoff
-// its a no-op if backoff has already started
-func (b *Backoff) StartBackoff() {
-    b.doBackoff = true
-}
-
-// NewBackoff creates a new Backoff struct with default values
-func NewBackoff() *Backoff {
-    b := retry.NewExponentialBackOff()
-    b.InitialInterval = initialInterval
-    b.MaxElapsedTime = 0 // The backoff object never expires
-    b.MaxInterval = maxInterval
-    return &Backoff{
-        doBackoff: false,
-        expBackoff: b,
-    }
-}
-
 // Timeout is a simple timeout for single-threaded programming
 // (Goroutines are expensive in Cgo)
 type Timeout struct {
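
The deleted wrapper delegated its delay schedule to github.com/cenkalti/backoff, which this commit also drops from the import list. As a minimal standalone illustration (the interval values are placeholders, not the repo's constants), ExponentialBackOff hands out successively longer, jittered delays capped at MaxInterval, and Reset() restarts the schedule:

// Standalone sketch of the library behavior the removed Backoff wrapper relied on.
package main

import (
    "fmt"
    "time"

    retry "github.com/cenkalti/backoff"
)

func main() {
    b := retry.NewExponentialBackOff()
    b.InitialInterval = 500 * time.Millisecond // placeholder for the repo's initialInterval
    b.MaxInterval = 10 * time.Second           // matches maxInterval in plugins.go
    b.MaxElapsedTime = 0                       // 0 means the schedule never expires

    for i := 0; i < 5; i++ {
        fmt.Println(b.NextBackOff()) // grows roughly geometrically, with jitter
    }

    b.Reset() // back to InitialInterval for the next failure streak
    fmt.Println(b.NextBackOff())
}

Setting MaxElapsedTime to 0, as the removed NewBackoff() did, keeps NextBackOff() from ever returning the library's Stop sentinel.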
