From 3de93f83db566fb1363c24eed04d23e8b4da2af5 Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Mon, 18 May 2020 10:50:37 +0200 Subject: [PATCH] receive: Only wait for write quorum This patch modifies receive replication slightly, in that it doesn't always wait for all requests to complete anymore. If quorum amount of replication requests were successful it now does not wait for the remaining request to finish as it's not necessary to reach quorum anymore. In error cases where quorum is not reached, it still continues to wait for all requests to finish in an attempt to return a quorum error. Additionally this patch moves log lines printed in the parallelize requests function to debug logging. Calling functions already print the resulting error(s), so this was previously just noise, even in cases where requests actually succeeded. Signed-off-by: Frederic Branczyk --- CHANGELOG.md | 1 + cmd/thanos/receive.go | 5 + pkg/receive/handler.go | 130 +++++++++----- pkg/receive/handler_test.go | 349 +++++++++++++++++++++++++++++++++++- pkg/receive/writer.go | 9 + 5 files changed, 447 insertions(+), 47 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c14164f1ae..a259255de75 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2532](https://github.com/thanos-io/thanos/pull/2532) Store: Added hidden option `--store.caching-bucket.config=` (or `--store.caching-bucket.config-file=`) for experimental caching bucket, that can cache chunks into shared memcached. This can speed up querying and reduce number of requests to object storage. - [#2579](https://github.com/thanos-io/thanos/pull/2579) Store: Experimental caching bucket can now cache metadata as well. Config has changed from #2532. - [#2526](https://github.com/thanos-io/thanos/pull/2526) Compact: In case there are no labels left after deduplication via `--deduplication.replica-label`, assign first `replica-label` with value `deduped`. +- [#2621](https://github.com/thanos-io/thanos/pull/2621) Receive: add flag to configure forward request timeout. ### Changed diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index cf2f6fba21e..972e02f2504 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -81,6 +81,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { replicationFactor := cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64() + forwardTimeout := modelDuration(cmd.Flag("receive-forward-timeout", "Timeout for forward requests.").Default("5s").Hidden()) + tsdbMinBlockDuration := modelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden()) tsdbMaxBlockDuration := modelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden()) ignoreBlockSize := cmd.Flag("shipper.ignore-unequal-block-size", "If true receive will not require min and max block size flags to be set to the same value. Only use this if you want to keep long retention and compaction enabled, as in the worst case it can result in ~2h data loss for your Thanos bucket storage.").Default("false").Hidden().Bool() @@ -153,6 +155,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { *tenantLabelName, *replicaHeader, *replicationFactor, + time.Duration(*forwardTimeout), comp, ) } @@ -190,6 +193,7 @@ func runReceive( tenantLabelName string, replicaHeader string, replicationFactor uint64, + forwardTimeout time.Duration, comp component.SourceStoreAPI, ) error { logger = log.With(logger, "component", "receive") @@ -256,6 +260,7 @@ func runReceive( Tracer: tracer, TLSConfig: rwTLSConfig, DialOpts: dialOpts, + ForwardTimeout: forwardTimeout, }) grpcProbe := prober.NewGRPC() diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index ded80595277..943e0f489e3 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -13,6 +13,7 @@ import ( "net/http" "strconv" "sync" + "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -67,6 +68,7 @@ type Options struct { Tracer opentracing.Tracer TLSConfig *tls.Config DialOpts []grpc.DialOption + ForwardTimeout time.Duration } // Handler serves a Prometheus remote write receiving HTTP endpoint. @@ -324,29 +326,39 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p } h.mtx.RUnlock() - return h.parallelizeRequests(ctx, tenant, replicas, wreqs) + n, ec := h.parallelizeRequests(ctx, tenant, replicas, wreqs) + // Collect any errors from forwarding the time series. + var errs terrors.MultiError + for ; n > 0; n-- { + if err := <-ec; err != nil { + errs.Add(err) + } + } + + return errs.Err() } // parallelizeRequests parallelizes a given set of write requests. // The function only returns when all requests have finished // or the context is canceled. -func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replicas map[string]replica, wreqs map[string]*prompb.WriteRequest) error { +func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replicas map[string]replica, wreqs map[string]*prompb.WriteRequest) (int, <-chan error) { ec := make(chan error) - defer close(ec) - // We don't wan't to use a sync.WaitGroup here because that - // introduces an unnecessary second synchronization mechanism, - // the first being the error chan. Plus, it saves us a goroutine - // as in order to collect errors while doing wg.Wait, we would - // need a separate error collection goroutine. + + // We need the sync group as a second synchronization mechanism as we can + // only close the producing channel when all requests have terminated. + var wg sync.WaitGroup var n int + for endpoint := range wreqs { n++ // If the request is not yet replicated, let's replicate it. // If the replication factor isn't greater than 1, let's // just forward the requests. if !replicas[endpoint].replicated && h.options.ReplicationFactor > 1 { + wg.Add(1) go func(endpoint string) { - ec <- h.replicate(ctx, tenant, wreqs[endpoint]) + defer wg.Done() + ec <- errors.Wrap(h.replicate(ctx, tenant, wreqs[endpoint]), "could not replicate write request") }(endpoint) continue } @@ -357,7 +369,9 @@ func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replic // a failure to write locally as just another error that // can be ignored if the replication factor is met. if endpoint == h.options.Endpoint { + wg.Add(1) go func(endpoint string) { + defer wg.Done() var err error tracing.DoInSpan(ctx, "receive_tsdb_write", func(ctx context.Context) { @@ -375,15 +389,15 @@ func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replic err = errors.New(errs.Error()) } } - if err != nil { - level.Error(h.logger).Log("msg", "storing locally", "err", err, "endpoint", endpoint) - } - ec <- err + ec <- errors.Wrapf(err, "storing locally, endpoint %v", endpoint) }(endpoint) continue } + + wg.Add(1) // Make a request to the specified endpoint. go func(endpoint string) { + defer wg.Done() var err error // Increment the counters as necessary now that @@ -398,8 +412,7 @@ func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replic cl, err := h.peers.get(ctx, endpoint) if err != nil { - level.Error(h.logger).Log("msg", "failed to get peer connection to forward request", "err", err, "endpoint", endpoint) - ec <- err + ec <- errors.Wrapf(err, "get peer connection for endpoint %v", endpoint) return } // Create a span to track the request made to another receive node. @@ -411,29 +424,17 @@ func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replic Tenant: tenant, Replica: int64(replicas[endpoint].n + 1), // increment replica since on-the-wire format is 1-indexed and 0 indicates unreplicated. }) - if err != nil { - level.Error(h.logger).Log("msg", "forwarding request", "err", err, "endpoint", endpoint) - ec <- err - return - } - ec <- nil + ec <- errors.Wrapf(err, "forwarding request to endpoint %v", endpoint) }) }(endpoint) } - // Collect any errors from forwarding the time series. - // Rather than doing a wg.Wait here, we decrement a counter - // for every error received on the chan. This simplifies - // error collection and avoids data races with a separate - // error collection goroutine. - var errs terrors.MultiError - for ; n > 0; n-- { - if err := <-ec; err != nil { - errs.Add(err) - } - } + go func() { + wg.Wait() + close(ec) + }() - return errs.Err() + return n, ec } // replicate replicates a write request to (replication-factor) nodes @@ -464,20 +465,59 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri } h.mtx.RUnlock() - err := h.parallelizeRequests(ctx, tenant, replicas, wreqs) - if errs, ok := err.(terrors.MultiError); ok { - if uint64(countCause(errs, isNotReady)) >= (h.options.ReplicationFactor+1)/2 { - return tsdb.ErrNotReady - } - if uint64(countCause(errs, isConflict)) >= (h.options.ReplicationFactor+1)/2 { - return errors.Wrap(conflictErr, "did not meet replication threshold") - } - if uint64(len(errs)) >= (h.options.ReplicationFactor+1)/2 { - return errors.Wrap(err, "did not meet replication threshold") + ctx, cancel := context.WithTimeout(ctx, h.options.ForwardTimeout) + success := uint64(0) + + n, ec := h.parallelizeRequests(ctx, tenant, replicas, wreqs) + defer func() { + go func() { + defer cancel() + for { + err, more := <-ec + // Exhaust the channel, letting remaining unnecessary requests + // finish asnychronously. + if !more { + return + } + if err != nil { + level.Debug(h.logger).Log("msg", "request failed, but not needed to achieve quorum", "err", err) + } + } + }() + }() + + var errs terrors.MultiError + for { + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-ec: + if err == nil { + success++ + if success >= (h.options.ReplicationFactor/2)+1 { + return nil + } + } + errs.Add(err) + n-- + if n > 0 { + // Not done yet need to keep going. + continue + } + + if uint64(countCause(errs, isNotReady)) >= (h.options.ReplicationFactor+1)/2 { + return tsdb.ErrNotReady + } + if uint64(countCause(errs, isConflict)) >= (h.options.ReplicationFactor+1)/2 { + return errors.Wrap(conflictErr, "did not meet replication threshold") + } + if uint64(len(errs)) >= (h.options.ReplicationFactor+1)/2 { + return errors.Wrap(errs, "did not meet replication threshold") + } + + return nil } - return nil } - return errors.Wrap(err, "could not replicate write request") } // RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore. diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 9612a13b839..731c821b408 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -165,6 +165,7 @@ func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) TenantHeader: DefaultTenantHeader, ReplicaHeader: DefaultReplicaHeader, ReplicationFactor: replicationFactor, + ForwardTimeout: 5 * time.Second, Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i])), }) handlers = append(handlers, h) @@ -181,7 +182,7 @@ func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) return handlers, hashring } -func TestReceive(t *testing.T) { +func TestReceiveQuorum(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second) appenderErrFn := func() error { return errors.New("failed to get appender") } @@ -491,6 +492,350 @@ func TestReceive(t *testing.T) { t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i, tc.status, rec.Code, rec.Body.String()) } } + // Test that each time series is stored + // the correct amount of times in each fake DB. + for _, ts := range tc.wreq.Timeseries { + lset := make(labels.Labels, len(ts.Labels)) + for j := range ts.Labels { + lset[j] = labels.Label{ + Name: ts.Labels[j].Name, + Value: ts.Labels[j].Value, + } + } + for j, a := range tc.appendables { + var expectedMin int + n := a.appender.(*fakeAppender).Get(lset) + got := uint64(len(n)) + if a.appenderErr == nil && endpointHit(t, hashring, tc.replicationFactor, handlers[j].options.Endpoint, tenant, &ts) { + // We have len(handlers) copies of each sample because the test case + // is run once for each handler and they all use the same appender. + expectedMin = int((tc.replicationFactor/2)+1) * len(ts.Samples) + } + if uint64(expectedMin) > got { + t.Errorf("handler: %d, labels %q: expected minimum of %d samples, got %d", j, lset.String(), expectedMin, got) + } + } + } + }) + } +} + +func TestReceiveWithConsistencyDelay(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second) + + appenderErrFn := func() error { return errors.New("failed to get appender") } + conflictErrFn := func() error { return storage.ErrOutOfBounds } + commitErrFn := func() error { return errors.New("failed to commit") } + wreq1 := &prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + { + Name: "foo", + Value: "bar", + }, + }, + Samples: []prompb.Sample{ + { + Value: 1, + Timestamp: 1, + }, + { + Value: 2, + Timestamp: 2, + }, + { + Value: 3, + Timestamp: 3, + }, + }, + }, + }, + } + for _, tc := range []struct { + name string + status int + replicationFactor uint64 + wreq *prompb.WriteRequest + appendables []*fakeAppendable + }{ + { + name: "size 1 success", + status: http.StatusOK, + replicationFactor: 1, + wreq: wreq1, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, nil, nil, nil), + }, + }, + }, + { + name: "size 1 commit error", + status: http.StatusInternalServerError, + replicationFactor: 1, + wreq: wreq1, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, nil, commitErrFn, nil), + }, + }, + }, + { + name: "size 1 conflict", + status: http.StatusConflict, + replicationFactor: 1, + wreq: wreq1, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(conflictErrFn, nil, nil, nil), + }, + }, + }, + { + name: "size 2 success", + status: http.StatusOK, + replicationFactor: 1, + wreq: wreq1, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil, nil), + }, + }, + }, + { + name: "size 3 success", + status: http.StatusOK, + replicationFactor: 1, + wreq: wreq1, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil, nil), + }, + }, + }, + { + name: "size 3 success with replication", + status: http.StatusOK, + replicationFactor: 3, + wreq: wreq1, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil, nil), + }, + }, + }, + { + name: "size 3 commit error", + status: http.StatusInternalServerError, + replicationFactor: 1, + wreq: wreq1, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, nil, commitErrFn, nil), + }, + { + appender: newFakeAppender(nil, nil, commitErrFn, nil), + }, + { + appender: newFakeAppender(nil, nil, commitErrFn, nil), + }, + }, + }, + { + name: "size 3 commit error with replication", + status: http.StatusInternalServerError, + replicationFactor: 3, + wreq: wreq1, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, nil, commitErrFn, nil), + }, + { + appender: newFakeAppender(nil, nil, commitErrFn, nil), + }, + { + appender: newFakeAppender(nil, nil, commitErrFn, nil), + }, + }, + }, + { + name: "size 3 appender error with replication", + status: http.StatusInternalServerError, + replicationFactor: 3, + wreq: wreq1, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, nil, nil, nil), + appenderErr: appenderErrFn, + }, + { + appender: newFakeAppender(nil, nil, nil, nil), + appenderErr: appenderErrFn, + }, + { + appender: newFakeAppender(nil, nil, nil, nil), + appenderErr: appenderErrFn, + }, + }, + }, + { + name: "size 3 conflict with replication", + status: http.StatusConflict, + replicationFactor: 3, + wreq: wreq1, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(conflictErrFn, nil, nil, nil), + }, + { + appender: newFakeAppender(conflictErrFn, nil, nil, nil), + }, + { + appender: newFakeAppender(conflictErrFn, nil, nil, nil), + }, + }, + }, + { + name: "size 3 conflict and commit error with replication", + status: http.StatusConflict, + replicationFactor: 3, + wreq: wreq1, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(conflictErrFn, nil, commitErrFn, nil), + }, + { + appender: newFakeAppender(conflictErrFn, nil, commitErrFn, nil), + }, + { + appender: newFakeAppender(conflictErrFn, nil, commitErrFn, nil), + }, + }, + }, + { + name: "size 3 with replication and one faulty", + status: http.StatusOK, + replicationFactor: 3, + wreq: wreq1, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil, nil), + }, + }, + }, + { + name: "size 3 with replication and one commit error", + status: http.StatusOK, + replicationFactor: 3, + wreq: wreq1, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, nil, commitErrFn, nil), + }, + { + appender: newFakeAppender(nil, nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil, nil), + }, + }, + }, + { + name: "size 3 with replication and two conflicts", + status: http.StatusConflict, + replicationFactor: 3, + wreq: wreq1, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil, nil), + }, + { + appender: newFakeAppender(conflictErrFn, nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil, nil), + }, + }, + }, + { + name: "size 3 with replication one conflict and one commit error", + status: http.StatusInternalServerError, + replicationFactor: 3, + wreq: wreq1, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, commitErrFn, nil), + }, + { + appender: newFakeAppender(nil, nil, nil, nil), + }, + }, + }, + { + name: "size 3 with replication two commit errors", + status: http.StatusInternalServerError, + replicationFactor: 3, + wreq: wreq1, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, nil, commitErrFn, nil), + }, + { + appender: newFakeAppender(nil, nil, commitErrFn, nil), + }, + { + appender: newFakeAppender(nil, nil, nil, nil), + }, + }, + }, + } { + // Run the quorum tests with consistency delay, which should allow us + // to see all requests completing all the time, since we're using local + // network we are not expecting anything to go wrong with these. + t.Run(tc.name, func(t *testing.T) { + handlers, hashring := newHandlerHashring(tc.appendables, tc.replicationFactor) + tenant := "test" + // Test from the point of view of every node + // so that we know status code does not depend + // on which node is erroring and which node is receiving. + for i, handler := range handlers { + // Test that the correct status is returned. + rec, err := makeRequest(handler, tenant, tc.wreq) + if err != nil { + t.Fatalf("handler %d: unexpectedly failed making HTTP request: %v", tc.status, err) + } + if rec.Code != tc.status { + t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i, tc.status, rec.Code, rec.Body.String()) + } + } + + time.Sleep(50 * time.Millisecond) + // Test that each time series is stored // the correct amount of times in each fake DB. for _, ts := range tc.wreq.Timeseries { @@ -503,7 +848,7 @@ func TestReceive(t *testing.T) { } for j, a := range tc.appendables { var expected int - n := a.appender.(*fakeAppender).samples[lset.Hash()] + n := a.appender.(*fakeAppender).Get(lset) got := uint64(len(n)) if a.appenderErr == nil && endpointHit(t, hashring, tc.replicationFactor, handlers[j].options.Endpoint, tenant, &ts) { // We have len(handlers) copies of each sample because the test case diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index 728abbe7424..f5ad27ef959 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -171,6 +171,15 @@ func newFakeAppender(addErr, addFastErr, commitErr, rollbackErr func() error) *f } } +func (f *fakeAppender) Get(l labels.Labels) []prompb.Sample { + f.Lock() + defer f.Unlock() + s := f.samples[l.Hash()] + res := make([]prompb.Sample, len(s)) + copy(res, s) + return res +} + func (f *fakeAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) { f.Lock() defer f.Unlock()