diff --git a/firehose/firehose.go b/firehose/firehose.go index 48d22c5..b708bdc 100644 --- a/firehose/firehose.go +++ b/firehose/firehose.go @@ -60,7 +60,6 @@ type OutputPlugin struct { client PutRecordBatcher records []*firehose.Record dataLength int - backoff *plugins.Backoff timer *plugins.Timeout PluginID int } @@ -106,7 +105,6 @@ func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, endpoint, timeKe client: client, records: records, dataKeys: dataKeys, - backoff: plugins.NewBackoff(), timer: timer, timeKey: timeKey, fmtStrftime: timeFormatter, @@ -212,7 +210,6 @@ func (output *OutputPlugin) processRecord(record map[interface{}]interface{}) ([ } func (output *OutputPlugin) sendCurrentBatch() error { - output.backoff.Wait() output.timer.Check() response, err := output.client.PutRecordBatch(&firehose.PutRecordBatchInput{ @@ -225,9 +222,6 @@ func (output *OutputPlugin) sendCurrentBatch() error { if aerr, ok := err.(awserr.Error); ok { if aerr.Code() == firehose.ErrCodeServiceUnavailableException { logrus.Warnf("[firehose %d] Throughput limits for the delivery stream may have been exceeded.", output.PluginID) - // https://docs.aws.amazon.com/sdk-for-go/api/service/firehose/#Firehose.PutRecordBatch - // Firehose recommends backoff when this error is encountered - output.backoff.StartBackoff() } } return err @@ -256,9 +250,7 @@ func (output *OutputPlugin) processAPIResponse(response *firehose.PutRecordBatch failedRecords = append(failedRecords, output.records[i]) } if aws.StringValue(record.ErrorCode) == firehose.ErrCodeServiceUnavailableException { - // https://docs.aws.amazon.com/sdk-for-go/api/service/firehose/#Firehose.PutRecordBatch - // Firehose recommends backoff when this error is encountered - output.backoff.StartBackoff() + logrus.Warnf("[firehose %d] Throughput limits for the delivery stream may have been exceeded.", output.PluginID) } } @@ -272,7 +264,6 @@ func (output *OutputPlugin) processAPIResponse(response *firehose.PutRecordBatch } else { // request fully succeeded output.timer.Reset() - output.backoff.Reset() output.records = output.records[:0] output.dataLength = 0 } diff --git a/plugins/plugins.go b/plugins/plugins.go index 38f67dc..106c62c 100644 --- a/plugins/plugins.go +++ b/plugins/plugins.go @@ -20,7 +20,6 @@ import ( "strings" "time" - retry "github.com/cenkalti/backoff" "github.com/sirupsen/logrus" ) @@ -34,48 +33,6 @@ const ( maxInterval = 10 * time.Second ) -// Backoff wraps github.com/cenkalti/backoff -// Wait() is called for each AWS API call that may need back off -// But backoff only occurs if StartBackoff() has previously been called -// Reset() should be called whenever backoff can end. -type Backoff struct { - doBackoff bool - expBackoff *retry.ExponentialBackOff -} - -// Reset ends the exponential backoff -func (b *Backoff) Reset() { - b.doBackoff = false - b.expBackoff.Reset() -} - -// Wait enacts the exponential backoff, if StartBackoff() has been called -func (b *Backoff) Wait() { - if b.doBackoff { - d := b.expBackoff.NextBackOff() - logrus.Debugf("[go plugin] In exponential backoff, waiting %v", d) - time.Sleep(d) - } -} - -// StartBackoff begins exponential backoff -// its a no-op if backoff has already started -func (b *Backoff) StartBackoff() { - b.doBackoff = true -} - -// NewBackoff creates a new Backoff struct with default values -func NewBackoff() *Backoff { - b := retry.NewExponentialBackOff() - b.InitialInterval = initialInterval - b.MaxElapsedTime = 0 // The backoff object never expires - b.MaxInterval = maxInterval - return &Backoff{ - doBackoff: false, - expBackoff: b, - } -} - // Timeout is a simple timeout for single-threaded programming // (Goroutines are expensive in Cgo) type Timeout struct {