receive: Only wait for write quorum
This patch modifies receive replication slightly: it no longer always waits
for all requests to complete. Once a quorum of replication requests has
succeeded, it does not wait for the remaining requests to finish, as they are
no longer needed to reach quorum. In error cases where quorum is not reached,
it still waits for all requests to finish in an attempt to return a quorum
error.

Additionally, this patch moves the log lines printed in the parallelize
requests function to debug level. Calling functions already log the resulting
error(s), so these lines were previously just noise, even in cases where
requests actually succeeded.
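
To illustrate the behavior described above, here is a minimal sketch of the
only-wait-for-write-quorum pattern. It is not the actual Thanos handler code;
writeQuorum, the toy forward callback, and the endpoint names are hypothetical.
Each forward request sends its result to an error channel, the caller returns
as soon as a quorum of successes has arrived, and the remaining results are
drained in the background.

package main

import (
    "context"
    "errors"
    "fmt"
    "sync"
)

// writeQuorum fans one forward request out per endpoint and returns as soon
// as a quorum of them has succeeded, instead of waiting for every request.
func writeQuorum(ctx context.Context, endpoints []string, replicationFactor uint64,
    forward func(ctx context.Context, endpoint string) error) error {
    ec := make(chan error)
    var wg sync.WaitGroup

    for _, endpoint := range endpoints {
        wg.Add(1)
        go func(endpoint string) {
            defer wg.Done()
            ec <- forward(ctx, endpoint)
        }(endpoint)
    }
    // Close the result channel only once all requests have terminated.
    go func() {
        wg.Wait()
        close(ec)
    }()
    // Drain leftover results in the background so the forwarding goroutines
    // can exit even after we return early on quorum.
    defer func() {
        go func() {
            for range ec {
            }
        }()
    }()

    quorum := replicationFactor/2 + 1
    var success uint64
    var errs []error
    remaining := len(endpoints)

    for remaining > 0 {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case err := <-ec:
            remaining--
            if err == nil {
                success++
                if success >= quorum {
                    return nil // Quorum reached; don't wait for the rest.
                }
                continue
            }
            errs = append(errs, err)
        }
    }
    // All requests finished without reaching quorum; report the errors.
    return fmt.Errorf("did not meet replication threshold: %w", errors.Join(errs...))
}

func main() {
    endpoints := []string{"node-1", "node-2", "node-3"}
    err := writeQuorum(context.Background(), endpoints, 3,
        func(ctx context.Context, endpoint string) error {
            if endpoint == "node-3" {
                return errors.New("node-3 is down") // Simulate one failed replica.
            }
            return nil // Pretend the forward succeeded.
        })
    fmt.Println("quorum result:", err) // <nil>: 2 of 3 writes succeeded.
}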

Signed-off-by: Frederic Branczyk <[email protected]>
brancz committed May 18, 2020
1 parent 1859833 commit ce24cc8
Showing 5 changed files with 450 additions and 47 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -28,6 +28,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2532](https://github.com/thanos-io/thanos/pull/2532) Store: Added hidden option `--store.caching-bucket.config=<yaml content>` (or `--store.caching-bucket.config-file=<file.yaml>`) for experimental caching bucket, that can cache chunks into shared memcached. This can speed up querying and reduce number of requests to object storage.
- [#2579](https://github.com/thanos-io/thanos/pull/2579) Store: Experimental caching bucket can now cache metadata as well. Config has changed from #2532.
- [#2526](https://github.com/thanos-io/thanos/pull/2526) Compact: In case there are no labels left after deduplication via `--deduplication.replica-label`, assign first `replica-label` with value `deduped`.
- [#2532](https://github.com/thanos-io/thanos/pull/2532) Store: Added hidden option for experimental caching bucket, that can cache chunks into shared memcached. This can speed up querying and reduce number of requests to object storage.
- [#2621](https://github.com/thanos-io/thanos/pull/2621) Receive: add flag to configure forward request timeout.

### Changed

5 changes: 5 additions & 0 deletions cmd/thanos/receive.go
@@ -81,6 +81,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {

replicationFactor := cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64()

forwardTimeout := modelDuration(cmd.Flag("receive-forward-timeout", "Timeout for forward requests.").Default("5s").Hidden())

tsdbMinBlockDuration := modelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden())
tsdbMaxBlockDuration := modelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden())
ignoreBlockSize := cmd.Flag("shipper.ignore-unequal-block-size", "If true receive will not require min and max block size flags to be set to the same value. Only use this if you want to keep long retention and compaction enabled, as in the worst case it can result in ~2h data loss for your Thanos bucket storage.").Default("false").Hidden().Bool()
@@ -153,6 +155,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
*tenantLabelName,
*replicaHeader,
*replicationFactor,
time.Duration(*forwardTimeout),
comp,
)
}
@@ -190,6 +193,7 @@ func runReceive(
tenantLabelName string,
replicaHeader string,
replicationFactor uint64,
forwardTimeout time.Duration,
comp component.SourceStoreAPI,
) error {
logger = log.With(logger, "component", "receive")
@@ -256,6 +260,7 @@ func runReceive(
Tracer: tracer,
TLSConfig: rwTLSConfig,
DialOpts: dialOpts,
ForwardTimeout: forwardTimeout,
})

grpcProbe := prober.NewGRPC()
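
For reference, the replication and forward-timeout flags used above could be
combined when starting a receiver. The invocation below is an illustrative
assumption, not taken from the patch or its documentation; note that
--receive-forward-timeout is registered as hidden, so it does not appear in
--help.

thanos receive \
    --receive.replication-factor=3 \
    --receive-forward-timeout=5s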
132 changes: 87 additions & 45 deletions pkg/receive/handler.go
@@ -13,6 +13,7 @@ import (
"net/http"
"strconv"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
@@ -67,6 +68,7 @@ type Options struct {
Tracer opentracing.Tracer
TLSConfig *tls.Config
DialOpts []grpc.DialOption
ForwardTimeout time.Duration
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
@@ -324,29 +326,43 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p
}
h.mtx.RUnlock()

return h.parallelizeRequests(ctx, tenant, replicas, wreqs)
n, ec := h.parallelizeRequests(ctx, tenant, replicas, wreqs)
// Collect any errors from forwarding the time series.
// Rather than doing a wg.Wait here, we decrement a counter
// for every error received on the chan. This simplifies
// error collection and avoids data races with a separate
// error collection goroutine.
var errs terrors.MultiError
for ; n > 0; n-- {
if err := <-ec; err != nil {
errs.Add(err)
}
}

return errs.Err()
}

// parallelizeRequests parallelizes a given set of write requests.
// The function only returns when all requests have finished
// or the context is canceled.
func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replicas map[string]replica, wreqs map[string]*prompb.WriteRequest) error {
func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replicas map[string]replica, wreqs map[string]*prompb.WriteRequest) (int, <-chan error) {
ec := make(chan error)
defer close(ec)
// We don't wan't to use a sync.WaitGroup here because that
// introduces an unnecessary second synchronization mechanism,
// the first being the error chan. Plus, it saves us a goroutine
// as in order to collect errors while doing wg.Wait, we would
// need a separate error collection goroutine.

// We need the sync group as a second synchronization mechanism as we can
// only close the producing channel when all requests have terminated.
var wg sync.WaitGroup
var n int

for endpoint := range wreqs {
n++
// If the request is not yet replicated, let's replicate it.
// If the replication factor isn't greater than 1, let's
// just forward the requests.
if !replicas[endpoint].replicated && h.options.ReplicationFactor > 1 {
wg.Add(1)
go func(endpoint string) {
ec <- h.replicate(ctx, tenant, wreqs[endpoint])
defer wg.Done()
ec <- errors.Wrap(h.replicate(ctx, tenant, wreqs[endpoint]), "could not replicate write request")
}(endpoint)
continue
}
@@ -357,7 +373,9 @@ func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replic
// a failure to write locally as just another error that
// can be ignored if the replication factor is met.
if endpoint == h.options.Endpoint {
wg.Add(1)
go func(endpoint string) {
defer wg.Done()
var err error

tracing.DoInSpan(ctx, "receive_tsdb_write", func(ctx context.Context) {
@@ -375,15 +393,15 @@ func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replic
err = errors.New(errs.Error())
}
}
if err != nil {
level.Error(h.logger).Log("msg", "storing locally", "err", err, "endpoint", endpoint)
}
ec <- err
ec <- errors.Wrapf(err, "storing locally, endpoint %v", endpoint)
}(endpoint)
continue
}

wg.Add(1)
// Make a request to the specified endpoint.
go func(endpoint string) {
defer wg.Done()
var err error

// Increment the counters as necessary now that
@@ -398,8 +416,7 @@ func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replic

cl, err := h.peers.get(ctx, endpoint)
if err != nil {
level.Error(h.logger).Log("msg", "failed to get peer connection to forward request", "err", err, "endpoint", endpoint)
ec <- err
ec <- errors.Wrapf(err, "get peer connection for endpoint %v", endpoint)
return
}
// Create a span to track the request made to another receive node.
@@ -411,29 +428,17 @@ func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replic
Tenant: tenant,
Replica: int64(replicas[endpoint].n + 1), // increment replica since on-the-wire format is 1-indexed and 0 indicates unreplicated.
})
if err != nil {
level.Error(h.logger).Log("msg", "forwarding request", "err", err, "endpoint", endpoint)
ec <- err
return
}
ec <- nil
ec <- errors.Wrapf(err, "forwarding request to endpoint %v", endpoint)
})
}(endpoint)
}

// Collect any errors from forwarding the time series.
// Rather than doing a wg.Wait here, we decrement a counter
// for every error received on the chan. This simplifies
// error collection and avoids data races with a separate
// error collection goroutine.
var errs terrors.MultiError
for ; n > 0; n-- {
if err := <-ec; err != nil {
errs.Add(err)
}
}
go func() {
wg.Wait()
close(ec)
}()

return errs.Err()
return n, ec
}

// replicate replicates a write request to (replication-factor) nodes
@@ -464,20 +469,57 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri
}
h.mtx.RUnlock()

err := h.parallelizeRequests(ctx, tenant, replicas, wreqs)
if errs, ok := err.(terrors.MultiError); ok {
if uint64(countCause(errs, isNotReady)) >= (h.options.ReplicationFactor+1)/2 {
return tsdb.ErrNotReady
}
if uint64(countCause(errs, isConflict)) >= (h.options.ReplicationFactor+1)/2 {
return errors.Wrap(conflictErr, "did not meet replication threshold")
}
if uint64(len(errs)) >= (h.options.ReplicationFactor+1)/2 {
return errors.Wrap(err, "did not meet replication threshold")
ctx, cancel := context.WithTimeout(ctx, h.options.ForwardTimeout)
success := uint64(0)

n, ec := h.parallelizeRequests(ctx, tenant, replicas, wreqs)
defer func() {
go func() {
for {
err, more := <-ec
// Exhaust the channel, letting remaining unnecessary requests
// finish asynchronously.
if !more {
cancel()
return
}
level.Debug(h.logger).Log("msg", "request failed, but not needed to achieve quorum", "err", err)
}
}()
}()

var errs terrors.MultiError
for {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-ec:
if err == nil {
success++
if success >= (h.options.ReplicationFactor/2)+1 {
return nil
}
}
errs.Add(err)
n--
if n > 0 {
// Not done yet, need to keep going.
continue
}

if uint64(countCause(errs, isNotReady)) >= (h.options.ReplicationFactor+1)/2 {
return tsdb.ErrNotReady
}
if uint64(countCause(errs, isConflict)) >= (h.options.ReplicationFactor+1)/2 {
return errors.Wrap(conflictErr, "did not meet replication threshold")
}
if uint64(len(errs)) >= (h.options.ReplicationFactor+1)/2 {
return errors.Wrap(errs, "did not meet replication threshold")
}

return nil
}
return nil
}
return errors.Wrap(err, "could not replicate write request")
}

// RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore.
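
As a worked check of the two thresholds used in replicate above, the
standalone snippet below (an illustration, not part of the patch) evaluates
the success quorum (ReplicationFactor/2)+1 and the error threshold
(ReplicationFactor+1)/2 under Go's integer division; for odd replication
factors the two values coincide.

package main

import "fmt"

func main() {
    for _, rf := range []uint64{1, 2, 3, 5} {
        // Successes needed before replicate can return early.
        successQuorum := rf/2 + 1
        // Errors of a given cause needed to report a quorum failure.
        errorThreshold := (rf + 1) / 2
        fmt.Printf("replication factor %d: success quorum %d, error threshold %d\n",
            rf, successQuorum, errorThreshold)
    }
}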
