From a33bb932973ebd03aa5bf2043ca34d6c713ed481 Mon Sep 17 00:00:00 2001
From: Fae Charlton
Date: Mon, 5 Oct 2020 09:58:05 -0400
Subject: [PATCH] [libbeat] Add configurable exponential backoff for disk queue
 write errors (#21493) (#21497)

(cherry picked from commit b0236ee8afacb87e47a98ab075b2b189e5911ff1)
---
 libbeat/publisher/queue/diskqueue/config.go   | 52 +++++++++++++++++--
 .../publisher/queue/diskqueue/deleter_loop.go |  9 +++-
 libbeat/publisher/queue/diskqueue/segments.go |  9 +++-
 libbeat/publisher/queue/diskqueue/util.go     | 22 ++++++--
 .../publisher/queue/diskqueue/writer_loop.go  | 28 ++++++++--
 5 files changed, 104 insertions(+), 16 deletions(-)

diff --git a/libbeat/publisher/queue/diskqueue/config.go b/libbeat/publisher/queue/diskqueue/config.go
index 6a165a665db..b8ef456d03d 100644
--- a/libbeat/publisher/queue/diskqueue/config.go
+++ b/libbeat/publisher/queue/diskqueue/config.go
@@ -21,6 +21,7 @@ import (
 	"errors"
 	"fmt"
 	"path/filepath"
+	"time"
 
 	"github.com/elastic/beats/v7/libbeat/common"
 	"github.com/elastic/beats/v7/libbeat/common/cfgtype"
@@ -61,16 +62,26 @@ type Settings struct {
 	// A listener that should be sent ACKs when an event is successfully
 	// written to disk.
 	WriteToDiskListener queue.ACKListener
+
+	// RetryInterval specifies how long to wait before retrying a fatal error
+	// writing to disk. If MaxRetryInterval is nonzero, subsequent retries will
+	// use exponential backoff up to the specified limit.
+	RetryInterval    time.Duration
+	MaxRetryInterval time.Duration
 }
 
 // userConfig holds the parameters for a disk queue that are configurable
 // by the end user in the beats yml file.
 type userConfig struct {
-	Path            string            `config:"path"`
-	MaxSize         cfgtype.ByteSize  `config:"max_size" validate:"required"`
-	SegmentSize     *cfgtype.ByteSize `config:"segment_size"`
-	ReadAheadLimit  *int              `config:"read_ahead"`
-	WriteAheadLimit *int              `config:"write_ahead"`
+	Path        string            `config:"path"`
+	MaxSize     cfgtype.ByteSize  `config:"max_size" validate:"required"`
+	SegmentSize *cfgtype.ByteSize `config:"segment_size"`
+
+	ReadAheadLimit  *int `config:"read_ahead"`
+	WriteAheadLimit *int `config:"write_ahead"`
+
+	RetryInterval    *time.Duration `config:"retry_interval" validate:"positive"`
+	MaxRetryInterval *time.Duration `config:"max_retry_interval" validate:"positive"`
 }
 
 func (c *userConfig) Validate() error {
@@ -96,6 +107,13 @@ func (c *userConfig) Validate() error {
 			"Disk queue segment_size (%d) cannot be less than 1MB", *c.SegmentSize)
 	}
 
+	if c.RetryInterval != nil && c.MaxRetryInterval != nil &&
+		*c.MaxRetryInterval < *c.RetryInterval {
+		return fmt.Errorf(
+			"Disk queue max_retry_interval (%v) can't be less than retry_interval (%v)",
+			*c.MaxRetryInterval, *c.RetryInterval)
+	}
+
 	return nil
 }
 
@@ -108,6 +126,9 @@ func DefaultSettings() Settings {
 
 		ReadAheadLimit:  512,
 		WriteAheadLimit: 2048,
+
+		RetryInterval:    1 * time.Second,
+		MaxRetryInterval: 30 * time.Second,
 	}
 }
 
@@ -137,6 +158,13 @@ func SettingsForUserConfig(config *common.Config) (Settings, error) {
 		settings.WriteAheadLimit = *userConfig.WriteAheadLimit
 	}
 
+	if userConfig.RetryInterval != nil {
+		settings.RetryInterval = *userConfig.RetryInterval
+	}
+	if userConfig.MaxRetryInterval != nil {
+		settings.MaxRetryInterval = *userConfig.MaxRetryInterval
+	}
+
 	return settings, nil
 }
 
@@ -164,3 +192,17 @@ func (settings Settings) segmentPath(segmentID segmentID) string {
 func (settings Settings) maxSegmentOffset() segmentOffset {
 	return segmentOffset(settings.MaxSegmentSize - segmentHeaderSize)
 }
+
+// Given a retry interval, nextRetryInterval returns the next higher level
+// of backoff.
+func (settings Settings) nextRetryInterval(
+	currentInterval time.Duration,
+) time.Duration {
+	if settings.MaxRetryInterval > 0 {
+		currentInterval *= 2
+		if currentInterval > settings.MaxRetryInterval {
+			currentInterval = settings.MaxRetryInterval
+		}
+	}
+	return currentInterval
+}
diff --git a/libbeat/publisher/queue/diskqueue/deleter_loop.go b/libbeat/publisher/queue/diskqueue/deleter_loop.go
index 4e685285948..3948d200cbc 100644
--- a/libbeat/publisher/queue/diskqueue/deleter_loop.go
+++ b/libbeat/publisher/queue/diskqueue/deleter_loop.go
@@ -57,6 +57,7 @@ func newDeleterLoop(settings Settings) *deleterLoop {
 }
 
 func (dl *deleterLoop) run() {
+	currentRetryInterval := dl.settings.RetryInterval
 	for {
 		request, ok := <-dl.requestChan
 		if !ok {
@@ -87,10 +88,14 @@ func (dl *deleterLoop) run() {
 			// The delay can be interrupted if the request channel is closed,
 			// indicating queue shutdown.
 			select {
-			// TODO: make the retry interval configurable.
-			case <-time.After(time.Second):
+			case <-time.After(currentRetryInterval):
 			case <-dl.requestChan:
 			}
+			currentRetryInterval =
+				dl.settings.nextRetryInterval(currentRetryInterval)
+		} else {
+			// If we made progress, reset the retry interval.
+			currentRetryInterval = dl.settings.RetryInterval
 		}
 		dl.responseChan <- deleterLoopResponse{
 			results: results,
diff --git a/libbeat/publisher/queue/diskqueue/segments.go b/libbeat/publisher/queue/diskqueue/segments.go
index 5ce0dc49962..617b089110e 100644
--- a/libbeat/publisher/queue/diskqueue/segments.go
+++ b/libbeat/publisher/queue/diskqueue/segments.go
@@ -207,10 +207,15 @@ func (segment *queueSegment) getWriter(
 // retry callback returns true. This is used for timed retries when
 // creating a queue segment from the writer loop.
 func (segment *queueSegment) getWriterWithRetry(
-	queueSettings Settings, retry func(error) bool,
+	queueSettings Settings, retry func(err error, firstTime bool) bool,
 ) (*os.File, error) {
+	firstTime := true
 	file, err := segment.getWriter(queueSettings)
-	for err != nil && retry(err) {
+	for err != nil && retry(err, firstTime) {
+		// Set firstTime to false so the retry callback can apply backoff
+		// if needed.
+		firstTime = false
+		// Try again
 		file, err = segment.getWriter(queueSettings)
 	}
 	return file, err
diff --git a/libbeat/publisher/queue/diskqueue/util.go b/libbeat/publisher/queue/diskqueue/util.go
index 60c529a9992..c54a26154e8 100644
--- a/libbeat/publisher/queue/diskqueue/util.go
+++ b/libbeat/publisher/queue/diskqueue/util.go
@@ -69,16 +69,32 @@ func writeErrorIsRetriable(err error) bool {
 // "wrapped" field in-place as long as it isn't captured by the callback.
 type callbackRetryWriter struct {
 	wrapped io.Writer
-	retry   func(error) bool
+
+	// The retry callback is called with the error that was produced and
+	// whether this is the first error arising from this particular
+	// write call.
+	retry func(err error, firstTime bool) bool
 }
 
 func (w callbackRetryWriter) Write(p []byte) (int, error) {
+	// firstTime tracks whether the current error is the first in a
+	// consecutive run of errors passed to the retry callback. This is so
+	// that the callback can reset its internal counters in case it is
+	// using exponential backoff or a retry limit.
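+	// (Illustration: with the default RetryInterval of 1s and
+	// MaxRetryInterval of 30s, a callback that backs off on each
+	// consecutive failure waits 1s, 2s, 4s, ..., capped at 30s; a new
+	// run starting with firstTime=true begins again at 1s.)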
+	firstTime := true
 	bytesWritten := 0
 	writer := w.wrapped
 	n, err := writer.Write(p)
 	for n < len(p) {
-		if err != nil && !w.retry(err) {
-			return bytesWritten + n, err
+		if err != nil {
+			shouldRetry := w.retry(err, firstTime)
+			firstTime = false
+			if !shouldRetry {
+				return bytesWritten + n, err
+			}
+		} else {
+			// If we made progress without an error, reset firstTime.
+			firstTime = true
 		}
 		// Advance p and try again.
 		bytesWritten += n
diff --git a/libbeat/publisher/queue/diskqueue/writer_loop.go b/libbeat/publisher/queue/diskqueue/writer_loop.go
index b42e4573cab..ff1ff97616a 100644
--- a/libbeat/publisher/queue/diskqueue/writer_loop.go
+++ b/libbeat/publisher/queue/diskqueue/writer_loop.go
@@ -82,6 +82,8 @@ type writerLoop struct {
 	// The file handle corresponding to currentSegment. When currentSegment
 	// changes, this handle is closed and a new one is created.
 	outputFile *os.File
+
+	currentRetryInterval time.Duration
 }
 
 func newWriterLoop(logger *logp.Logger, settings Settings) *writerLoop {
@@ -91,6 +93,8 @@ func newWriterLoop(logger *logp.Logger, settings Settings) *writerLoop {
 
 		requestChan:  make(chan writerLoopRequest, 1),
 		responseChan: make(chan writerLoopResponse),
+
+		currentRetryInterval: settings.RetryInterval,
 	}
 }
 
@@ -215,14 +219,31 @@ outerLoop:
 	return append(bytesWritten, curBytesWritten)
 }
 
-// retryCallback is called (by way of retryCallbackWriter) when there is
+func (wl *writerLoop) applyRetryBackoff() {
+	wl.currentRetryInterval =
+		wl.settings.nextRetryInterval(wl.currentRetryInterval)
+}
+
+func (wl *writerLoop) resetRetryBackoff() {
+	wl.currentRetryInterval = wl.settings.RetryInterval
+}
+
+// retryCallback is called (by way of callbackRetryWriter) when there is
 // an error writing to a segment file. It pauses for a configurable
 // interval and returns true if the operation should be retried (which
 // it always should, unless the queue is being closed).
-func (wl *writerLoop) retryCallback(err error) bool {
+func (wl *writerLoop) retryCallback(err error, firstTime bool) bool {
+	if firstTime {
+		// Reset any exponential backoff in the retry interval.
+		wl.resetRetryBackoff()
+	}
 	if writeErrorIsRetriable(err) {
 		return true
 	}
+	// If this error isn't immediately retriable, increase the exponential
+	// backoff afterwards.
+	defer wl.applyRetryBackoff()
+
 	// If the error is not immediately retriable, log the error
 	// and wait for the retry interval before trying again, but
 	// abort if the queue is closed (indicated by the request channel
@@ -230,8 +251,7 @@ func (wl *writerLoop) retryCallback(err error) bool {
 	wl.logger.Errorf("Writing to segment %v: %v",
 		wl.currentSegment.id, err)
 	select {
-	case <-time.After(time.Second):
-		// TODO: use a configurable interval here
+	case <-time.After(wl.currentRetryInterval):
 		return true
 	case <-wl.requestChan:
 		return false
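
For a sense of how the new settings behave end to end, here is a minimal
configuration sketch (values are illustrative only; this assumes the disk
queue's queue.disk namespace in the beats yml):

    queue.disk:
      max_size: 10GB
      retry_interval: 1s
      max_retry_interval: 30s

And a self-contained Go sketch of the backoff sequence those values produce.
The settings type and main harness below are stand-ins invented for
illustration; nextRetryInterval mirrors the method added to config.go above:

    package main

    import (
    	"fmt"
    	"time"
    )

    type settings struct {
    	RetryInterval    time.Duration
    	MaxRetryInterval time.Duration
    }

    // Mirrors Settings.nextRetryInterval: double the interval, capping it
    // at MaxRetryInterval whenever that limit is nonzero.
    func (s settings) nextRetryInterval(cur time.Duration) time.Duration {
    	if s.MaxRetryInterval > 0 {
    		cur *= 2
    		if cur > s.MaxRetryInterval {
    			cur = s.MaxRetryInterval
    		}
    	}
    	return cur
    }

    func main() {
    	s := settings{RetryInterval: time.Second, MaxRetryInterval: 30 * time.Second}
    	interval := s.RetryInterval
    	for i := 0; i < 8; i++ {
    		fmt.Println(interval) // 1s 2s 4s 8s 16s 30s 30s 30s
    		interval = s.nextRetryInterval(interval)
    	}
    }

Note that a successful write resets the sequence: the writer loop calls
resetRetryBackoff whenever retryCallback sees firstTime=true, so the wait
returns to retry_interval as soon as a write makes progress.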