diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go
index 00aad05475495..a31aa91fac225 100644
--- a/pkg/ingester/flush.go
+++ b/pkg/ingester/flush.go
@@ -2,12 +2,14 @@ package ingester
 
 import (
 	"bytes"
+	"errors"
 	"fmt"
 	"net/http"
 	"sync"
 	"time"
 
 	"github.com/go-kit/log/level"
+	"github.com/grafana/dskit/backoff"
 	"github.com/grafana/dskit/ring"
 	"github.com/grafana/dskit/user"
 	"github.com/prometheus/client_golang/prometheus"
@@ -147,7 +149,7 @@ func (i *Ingester) flushLoop(j int) {
 		}
 		op := o.(*flushOp)
 
-		err := i.flushUserSeries(op.userID, op.fp, op.immediate)
+		err := i.flushOp(op)
 		if err != nil {
 			level.Error(util_log.WithUserID(op.userID, i.logger)).Log("msg", "failed to flush", "err", err)
 		}
@@ -161,7 +163,39 @@
 	}
 }
 
-func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
+func (i *Ingester) flushOp(op *flushOp) error {
+	// A flush is retried until either it is successful, the maximum number
+	// of retries is exceeded, or the timeout has expired. The context is
+	// used to cancel the backoff should the latter happen.
+	ctx, cancelFunc := context.WithTimeout(context.Background(), i.cfg.FlushOpTimeout)
+	defer cancelFunc()
+
+	b := backoff.New(ctx, i.cfg.FlushOpBackoff)
+	for b.Ongoing() {
+		err := i.flushUserSeries(ctx, op.userID, op.fp, op.immediate)
+		if err == nil {
+			break
+		}
+		level.Error(util_log.WithUserID(op.userID, i.logger)).Log("msg", "failed to flush", "retries", b.NumRetries(), "err", err)
+		b.Wait()
+	}
+
+	if err := b.Err(); err != nil {
+		// If we got here then either the maximum number of retries has
+		// been exceeded or the timeout expired. We do not need to check
+		// ctx.Err() as it is checked in b.Err().
+		if errors.Is(err, context.DeadlineExceeded) {
+			return fmt.Errorf("timed out after %s: %w", i.cfg.FlushOpTimeout, err)
+		}
+		return err
+	}
+
+	return nil
+}
+
+func (i *Ingester) flushUserSeries(ctx context.Context, userID string, fp model.Fingerprint, immediate bool) error {
+	ctx = user.InjectOrgID(ctx, userID)
+
 	instance, ok := i.getInstanceByID(userID)
 	if !ok {
 		return nil
@@ -175,9 +209,6 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat
 	lbs := labels.String()
 	level.Info(i.logger).Log("msg", "flushing stream", "user", userID, "fp", fp, "immediate", immediate, "num_chunks", len(chunks), "labels", lbs)
 
-	ctx := user.InjectOrgID(context.Background(), userID)
-	ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
-	defer cancel()
 	err := i.flushChunks(ctx, fp, labels, chunks, chunkMtx)
 	if err != nil {
 		return fmt.Errorf("failed to flush chunks: %w, num_chunks: %d, labels: %s", err, len(chunks), lbs)
diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go
index 6fd52bafa066f..21b2fcb1d07cb 100644
--- a/pkg/ingester/flush_test.go
+++ b/pkg/ingester/flush_test.go
@@ -2,6 +2,7 @@ package ingester
 
 import (
+	"errors"
 	"fmt"
 	"os"
 	"sort"
 	"sync"
@@ -102,6 +103,99 @@ func Benchmark_FlushLoop(b *testing.B) {
 	}
 }
 
+func Test_FlushOp(t *testing.T) {
+	t.Run("no error", func(t *testing.T) {
+		cfg := defaultIngesterTestConfig(t)
+		cfg.FlushOpBackoff.MinBackoff = time.Second
+		cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
+		cfg.FlushOpBackoff.MaxRetries = 1
+		cfg.FlushCheckPeriod = 100 * time.Millisecond
+
+		_, ing := newTestStore(t, cfg, nil)
+
+		ctx := user.InjectOrgID(context.Background(), "foo")
+		ins, err := ing.GetOrCreateInstance("foo")
+		require.NoError(t, err)
+
+		lbs := makeRandomLabels()
+		req := &logproto.PushRequest{Streams: []logproto.Stream{{
+			Labels:  lbs.String(),
+			Entries: entries(5, time.Now()),
+		}}}
+		require.NoError(t, ins.Push(ctx, req))
+
+		time.Sleep(cfg.FlushCheckPeriod)
+		require.NoError(t, ing.flushOp(&flushOp{
+			immediate: true,
+			userID:    "foo",
+			fp:        ins.getHashForLabels(lbs),
+		}))
+	})
+
+	t.Run("max retries exceeded", func(t *testing.T) {
+		cfg := defaultIngesterTestConfig(t)
+		cfg.FlushOpBackoff.MinBackoff = time.Second
+		cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
+		cfg.FlushOpBackoff.MaxRetries = 1
+		cfg.FlushCheckPeriod = 100 * time.Millisecond
+
+		store, ing := newTestStore(t, cfg, nil)
+		store.onPut = func(_ context.Context, _ []chunk.Chunk) error {
+			return errors.New("failed to write chunks")
+		}
+
+		ctx := user.InjectOrgID(context.Background(), "foo")
+		ins, err := ing.GetOrCreateInstance("foo")
+		require.NoError(t, err)
+
+		lbs := makeRandomLabels()
+		req := &logproto.PushRequest{Streams: []logproto.Stream{{
+			Labels:  lbs.String(),
+			Entries: entries(5, time.Now()),
+		}}}
+		require.NoError(t, ins.Push(ctx, req))
+
+		time.Sleep(cfg.FlushCheckPeriod)
+		require.EqualError(t, ing.flushOp(&flushOp{
+			immediate: true,
+			userID:    "foo",
+			fp:        ins.getHashForLabels(lbs),
+		}), "terminated after 1 retries")
+	})
+
+	t.Run("timeout expired", func(t *testing.T) {
+		cfg := defaultIngesterTestConfig(t)
+		cfg.FlushOpBackoff.MinBackoff = time.Second
+		cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
+		cfg.FlushOpBackoff.MaxRetries = 1
+		cfg.FlushOpTimeout = time.Second
+		cfg.FlushCheckPeriod = 100 * time.Millisecond
+
+		store, ing := newTestStore(t, cfg, nil)
+		store.onPut = func(_ context.Context, _ []chunk.Chunk) error {
+			return errors.New("store is unavailable")
+		}
+
+		ctx := user.InjectOrgID(context.Background(), "foo")
+		ins, err := ing.GetOrCreateInstance("foo")
+		require.NoError(t, err)
+
+		lbs := makeRandomLabels()
+		req := &logproto.PushRequest{Streams: []logproto.Stream{{
+			Labels:  lbs.String(),
+			Entries: entries(5, time.Now()),
+		}}}
+		require.NoError(t, ins.Push(ctx, req))
+
+		time.Sleep(cfg.FlushCheckPeriod)
+		require.EqualError(t, ing.flushOp(&flushOp{
+			immediate: true,
+			userID:    "foo",
+			fp:        ins.getHashForLabels(lbs),
+		}), "timed out after 1s: context deadline exceeded")
+	})
+}
+
 func Test_Flush(t *testing.T) {
 	var (
 		store, ing = newTestStore(t, defaultIngesterTestConfig(t), nil)
@@ -297,6 +391,10 @@ func defaultIngesterTestConfig(t testing.TB) Config {
 
 	cfg := Config{}
 	flagext.DefaultValues(&cfg)
+	cfg.FlushOpBackoff.MinBackoff = 100 * time.Millisecond
+	cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
+	cfg.FlushOpBackoff.MaxRetries = 1
+	cfg.FlushOpTimeout = 15 * time.Second
 	cfg.FlushCheckPeriod = 99999 * time.Hour
 	cfg.MaxChunkIdle = 99999 * time.Hour
 	cfg.ConcurrentFlushes = 1
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 6d27d349c93f4..b837d28342fa9 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -20,6 +20,7 @@ import (
 
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
+	"github.com/grafana/dskit/backoff"
 	"github.com/grafana/dskit/concurrency"
 	"github.com/grafana/dskit/modules"
 	"github.com/grafana/dskit/multierror"
@@ -81,6 +82,7 @@ type Config struct {
 
 	ConcurrentFlushes int           `yaml:"concurrent_flushes"`
 	FlushCheckPeriod  time.Duration `yaml:"flush_check_period"`
+	FlushOpBackoff    backoff.Config `yaml:"flush_op_backoff"`
 	FlushOpTimeout    time.Duration `yaml:"flush_op_timeout"`
 	RetainPeriod      time.Duration `yaml:"chunk_retain_period"`
 	MaxChunkIdle      time.Duration `yaml:"chunk_idle_period"`
@@ -126,7 +128,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 
 	f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.")
 	f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-check-period", 30*time.Second, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")
-	f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout before a flush is cancelled.")
+	f.DurationVar(&cfg.FlushOpBackoff.MinBackoff, "ingester.flush-op-backoff-min-period", 10*time.Second, "Minimum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.")
+	f.DurationVar(&cfg.FlushOpBackoff.MaxBackoff, "ingester.flush-op-backoff-max-period", time.Minute, "Maximum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.")
+	f.IntVar(&cfg.FlushOpBackoff.MaxRetries, "ingester.flush-op-backoff-retries", 10, "Maximum retries for failed flushes. Retries are canceled when `ingester.flush-op-timeout` is exceeded.")
+	f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout before a flush is canceled.")
 	f.DurationVar(&cfg.RetainPeriod, "ingester.chunks-retain-period", 0, "How long chunks should be retained in-memory after they've been flushed.")
 	f.DurationVar(&cfg.MaxChunkIdle, "ingester.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.")
 	f.IntVar(&cfg.BlockSize, "ingester.chunks-block-size", 256*1024, "The targeted _uncompressed_ size in bytes of a chunk block When this threshold is exceeded the head block will be cut and compressed inside the chunk.")
@@ -154,6 +159,15 @@ func (cfg *Config) Validate() error {
 		return err
 	}
 
+	if cfg.FlushOpBackoff.MinBackoff > cfg.FlushOpBackoff.MaxBackoff {
+		return errors.New("invalid flush op min backoff: cannot be larger than max backoff")
+	}
+	if cfg.FlushOpBackoff.MaxRetries <= 0 {
+		return fmt.Errorf("invalid flush op max retries: %d", cfg.FlushOpBackoff.MaxRetries)
+	}
+	if cfg.FlushOpTimeout <= 0 {
+		return fmt.Errorf("invalid flush op timeout: %s", cfg.FlushOpTimeout)
+	}
 	if cfg.IndexShards <= 0 {
 		return fmt.Errorf("invalid ingester index shard factor: %d", cfg.IndexShards)
 	}
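
Not part of the change above: a minimal, self-contained sketch of the dskit backoff pattern that the new flushOp follows, for reviewers who have not used the library. The flakyFlush helper, the durations, and the failure count are made up for illustration; only the backoff.Config fields and the Ongoing/Wait/Err/NumRetries calls mirror what the diff uses.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/grafana/dskit/backoff"
)

// flakyFlush is a hypothetical stand-in for a flush that fails a few
// times before succeeding; it only exists to exercise the retry loop.
func flakyFlush(failuresLeft *int) error {
	if *failuresLeft > 0 {
		*failuresLeft--
		return errors.New("store is unavailable")
	}
	return nil
}

func main() {
	// Overall deadline for all attempts, analogous to ingester.flush-op-timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Per-operation backoff, analogous to the new ingester.flush-op-backoff-* flags.
	b := backoff.New(ctx, backoff.Config{
		MinBackoff: 100 * time.Millisecond,
		MaxBackoff: time.Second,
		MaxRetries: 10,
	})

	failuresLeft := 2
	for b.Ongoing() {
		if err := flakyFlush(&failuresLeft); err == nil {
			fmt.Println("flushed after", b.NumRetries(), "retries")
			return
		}
		b.Wait()
	}

	// Either the retries were exhausted or the context deadline expired;
	// b.Err() reports which, since it checks ctx.Err() internally.
	if err := b.Err(); err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			fmt.Println("timed out:", err)
			return
		}
		fmt.Println("gave up:", err)
	}
}

This is also why flushOp only wraps the deadline case: b.Err() surfaces an exhausted retry budget as a plain "terminated after N retries" error, which the "max retries exceeded" test asserts verbatim, while a timeout comes back as context.DeadlineExceeded and gets wrapped with the configured FlushOpTimeout.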