[libbeat] Add configurable exponential backoff for disk queue write errors #21493

Merged (8 commits) on Oct 2, 2020
52 changes: 47 additions & 5 deletions libbeat/publisher/queue/diskqueue/config.go
@@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"path/filepath"
"time"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/cfgtype"
@@ -61,16 +62,26 @@ type Settings struct {
// A listener that should be sent ACKs when an event is successfully
// written to disk.
WriteToDiskListener queue.ACKListener

// RetryInterval specifies how long to wait before retrying a fatal error
// writing to disk. If MaxRetryInterval is nonzero, subsequent retries will
// use exponential backoff up to the specified limit.
RetryInterval time.Duration
MaxRetryInterval time.Duration
}

// userConfig holds the parameters for a disk queue that are configurable
// by the end user in the beats yml file.
type userConfig struct {
Path string `config:"path"`
MaxSize cfgtype.ByteSize `config:"max_size" validate:"required"`
SegmentSize *cfgtype.ByteSize `config:"segment_size"`

ReadAheadLimit *int `config:"read_ahead"`
WriteAheadLimit *int `config:"write_ahead"`

RetryInterval *time.Duration `config:"retry_interval" validate:"positive"`
MaxRetryInterval *time.Duration `config:"max_retry_interval" validate:"positive"`
}

func (c *userConfig) Validate() error {
@@ -96,6 +107,13 @@ func (c *userConfig) Validate() error {
"Disk queue segment_size (%d) cannot be less than 1MB", *c.SegmentSize)
}

if c.RetryInterval != nil && c.MaxRetryInterval != nil &&
*c.MaxRetryInterval < *c.RetryInterval {
return fmt.Errorf(
"Disk queue max_retry_interval (%v) can't be less than retry_interval (%v)",
*c.MaxRetryInterval, *c.RetryInterval)
}

return nil
}

@@ -108,6 +126,9 @@ func DefaultSettings() Settings {

ReadAheadLimit: 512,
WriteAheadLimit: 2048,

RetryInterval: 1 * time.Second,
MaxRetryInterval: 30 * time.Second,
}
}

@@ -137,6 +158,13 @@ func SettingsForUserConfig(config *common.Config) (Settings, error) {
settings.WriteAheadLimit = *userConfig.WriteAheadLimit
}

if userConfig.RetryInterval != nil {
settings.RetryInterval = *userConfig.RetryInterval
}
if userConfig.MaxRetryInterval != nil {
settings.MaxRetryInterval = *userConfig.MaxRetryInterval
}

return settings, nil
}

@@ -164,3 +192,17 @@ func (settings Settings) segmentPath(segmentID segmentID) string {
func (settings Settings) maxSegmentOffset() segmentOffset {
return segmentOffset(settings.MaxSegmentSize - segmentHeaderSize)
}

// Given a retry interval, nextRetryInterval returns the next higher level
// of backoff.
func (settings Settings) nextRetryInterval(
currentInterval time.Duration,
) time.Duration {
if settings.MaxRetryInterval > 0 {
currentInterval *= 2
if currentInterval > settings.MaxRetryInterval {
currentInterval = settings.MaxRetryInterval
}
}
return currentInterval
}
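To make the backoff progression concrete, here is a small standalone sketch that mirrors the doubling-and-capping logic of nextRetryInterval above. The helper name nextInterval and the loop in main are illustrative only, not part of the package.

package main

import (
    "fmt"
    "time"
)

// nextInterval mirrors Settings.nextRetryInterval: double the current
// interval, but never exceed the configured maximum (when one is set).
func nextInterval(current, max time.Duration) time.Duration {
    if max > 0 {
        current *= 2
        if current > max {
            current = max
        }
    }
    return current
}

func main() {
    // With the defaults above (RetryInterval: 1s, MaxRetryInterval: 30s),
    // the waits between consecutive failed retries are:
    // 1s, 2s, 4s, 8s, 16s, 30s, 30s, ...
    interval := time.Second
    for i := 0; i < 7; i++ {
        fmt.Println(interval)
        interval = nextInterval(interval, 30*time.Second)
    }
}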
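As for how the new options reach these settings from user configuration, a rough sketch using SettingsForUserConfig follows. The map keys mirror the config: tags in userConfig above; the concrete values and the use of common.NewConfigFrom are hypothetical usage, not part of this change. In a beat's yml file the same keys would sit under the disk queue's configuration block.

cfg, err := common.NewConfigFrom(map[string]interface{}{
    "max_size":           "10GB", // required by userConfig validation
    "retry_interval":     "1s",   // first wait after a failed write
    "max_retry_interval": "30s",  // cap for the exponential backoff
})
if err != nil {
    // handle the error
}
settings, err := SettingsForUserConfig(cfg)
// settings.RetryInterval is now 1s and settings.MaxRetryInterval is 30s.

If retry_interval and max_retry_interval are omitted, the defaults from DefaultSettings (1s and 30s) stay in place.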
9 changes: 7 additions & 2 deletions libbeat/publisher/queue/diskqueue/deleter_loop.go
@@ -57,6 +57,7 @@ func newDeleterLoop(settings Settings) *deleterLoop {
}

func (dl *deleterLoop) run() {
currentRetryInterval := dl.settings.RetryInterval
for {
request, ok := <-dl.requestChan
if !ok {
@@ -87,10 +88,14 @@ func (dl *deleterLoop) run() {
// The delay can be interrupted if the request channel is closed,
// indicating queue shutdown.
select {
case <-time.After(currentRetryInterval):
case <-dl.requestChan:
}
currentRetryInterval =
dl.settings.nextRetryInterval(currentRetryInterval)
} else {
// If we made progress, reset the retry interval.
currentRetryInterval = dl.settings.RetryInterval
}
dl.responseChan <- deleterLoopResponse{
results: results,
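The shape of this retry loop (wait for the current interval, let queue shutdown cut the wait short, grow the interval on repeated failure, and reset it once progress is made) can be sketched standalone as follows. Everything here (attempt, shutdown, the shortened intervals) is illustrative and not taken from the package.

package main

import (
    "errors"
    "fmt"
    "time"
)

func main() {
    const (
        retryInterval    = 100 * time.Millisecond // base wait (1s in the queue defaults)
        maxRetryInterval = time.Second            // cap (30s in the queue defaults)
    )
    shutdown := make(chan struct{}) // stands in for the closed request channel
    current := retryInterval

    // attempt stands in for a deletion (or write) pass that keeps failing.
    attempt := func() error { return errors.New("still failing") }

    for i := 0; i < 4; i++ {
        if err := attempt(); err != nil {
            fmt.Printf("attempt failed, waiting %v: %v\n", current, err)
            // Wait, but allow shutdown to interrupt the delay.
            select {
            case <-time.After(current):
            case <-shutdown:
                return
            }
            // Exponential backoff, capped at the maximum.
            if current *= 2; current > maxRetryInterval {
                current = maxRetryInterval
            }
        } else {
            // Progress was made: start over from the base interval.
            current = retryInterval
        }
    }
}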
9 changes: 7 additions & 2 deletions libbeat/publisher/queue/diskqueue/segments.go
@@ -207,10 +207,15 @@ func (segment *queueSegment) getWriter(
// retry callback returns true. This is used for timed retries when
// creating a queue segment from the writer loop.
func (segment *queueSegment) getWriterWithRetry(
queueSettings Settings, retry func(err error, firstTime bool) bool,
) (*os.File, error) {
firstTime := true
file, err := segment.getWriter(queueSettings)
for err != nil && retry(err, firstTime) {
// Set firstTime to false so the retry callback can apply backoff or
// other bookkeeping on subsequent attempts.
firstTime = false

// Try again
file, err = segment.getWriter(queueSettings)
}
21 changes: 18 additions & 3 deletions libbeat/publisher/queue/diskqueue/util.go
@@ -69,16 +69,31 @@ func writeErrorIsRetriable(err error) bool {
// "wrapped" field in-place as long as it isn't captured by the callback.
type callbackRetryWriter struct {
wrapped io.Writer

// The retry callback is called with the error that was produced and whether
// this is the first error since the last successful write. If the callback
// returns false, Write gives up and returns the error to the caller.
retry func(err error, firstTime bool) bool
}

func (w callbackRetryWriter) Write(p []byte) (int, error) {
// firstTime tracks whether the current error is the first in a consecutive
// run of errors being passed to the retry callback. This lets the callback
// reset its internal counters when it is using exponential backoff or a
// retry limit.
firstTime := true
bytesWritten := 0
writer := w.wrapped
n, err := writer.Write(p)
for n < len(p) {
if err != nil {
shouldRetry := w.retry(err, firstTime)
firstTime = false
if !shouldRetry {
return bytesWritten + n, err
}
} else {
// If we made progress without an error, reset firstTime.
firstTime = true
}
// Advance p and try again.
bytesWritten += n
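To show how a wrapper like this behaves end to end, here is a self-contained sketch; flakyWriter and retryWriter are illustrative stand-ins that mirror callbackRetryWriter above, not part of the package.

package main

import (
    "errors"
    "fmt"
    "io"
)

// flakyWriter simulates a disk that rejects the first few writes.
type flakyWriter struct{ failsLeft int }

func (f *flakyWriter) Write(p []byte) (int, error) {
    if f.failsLeft > 0 {
        f.failsLeft--
        return 0, errors.New("simulated write failure")
    }
    return len(p), nil
}

// retryWriter mirrors callbackRetryWriter: keep calling the wrapped writer
// until all bytes are written or the retry callback says to stop.
type retryWriter struct {
    wrapped io.Writer
    retry   func(err error, firstTime bool) bool
}

func (w retryWriter) Write(p []byte) (int, error) {
    firstTime := true
    total := 0
    for len(p) > 0 {
        n, err := w.wrapped.Write(p)
        total += n
        p = p[n:]
        if len(p) == 0 {
            return total, nil
        }
        if err != nil {
            if !w.retry(err, firstTime) {
                return total, err
            }
            firstTime = false
        } else {
            // Progress without an error: reset firstTime, as above.
            firstTime = true
        }
    }
    return total, nil
}

func main() {
    w := retryWriter{
        wrapped: &flakyWriter{failsLeft: 2},
        retry: func(err error, firstTime bool) bool {
            fmt.Printf("retrying (firstTime=%v): %v\n", firstTime, err)
            return true // a real callback would back off and check for shutdown
        },
    }
    n, err := w.Write([]byte("frame data"))
    fmt.Println(n, err) // 10 <nil>
}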
28 changes: 24 additions & 4 deletions libbeat/publisher/queue/diskqueue/writer_loop.go
@@ -82,6 +82,8 @@ type writerLoop struct {
// The file handle corresponding to currentSegment. When currentSegment
// changes, this handle is closed and a new one is created.
outputFile *os.File

currentRetryInterval time.Duration
}

func newWriterLoop(logger *logp.Logger, settings Settings) *writerLoop {
@@ -91,6 +93,8 @@ func newWriterLoop(logger *logp.Logger, settings Settings) *writerLoop {

requestChan: make(chan writerLoopRequest, 1),
responseChan: make(chan writerLoopResponse),

currentRetryInterval: settings.RetryInterval,
}
}

@@ -215,23 +219,39 @@ outerLoop:
return append(bytesWritten, curBytesWritten)
}

func (wl *writerLoop) applyRetryBackoff() {
wl.currentRetryInterval =
wl.settings.nextRetryInterval(wl.currentRetryInterval)
}

func (wl *writerLoop) resetRetryBackoff() {
wl.currentRetryInterval = wl.settings.RetryInterval
}

// retryCallback is called (by way of callbackRetryWriter) when there is
// an error writing to a segment file. It pauses for a configurable
// interval and returns true if the operation should be retried (which
// it always should, unless the queue is being closed).
func (wl *writerLoop) retryCallback(err error, firstTime bool) bool {
if firstTime {
// Reset any exponential backoff in the retry interval.
wl.resetRetryBackoff()
}
if writeErrorIsRetriable(err) {
return true
}
// The error wasn't immediately retriable, so grow the exponential backoff
// after this attempt (deferred so the wait below uses the current interval).
defer wl.applyRetryBackoff()

// If the error is not immediately retriable, log the error
// and wait for the retry interval before trying again, but
// abort if the queue is closed (indicated by the request channel
// becoming unblocked).
wl.logger.Errorf("Writing to segment %v: %v",
wl.currentSegment.id, err)
select {
case <-time.After(wl.currentRetryInterval):
return true
case <-wl.requestChan:
return false