Skip to content

Commit

Permalink
pkg/receive: Decouple replication and quorum success
Browse files Browse the repository at this point in the history
  • Loading branch information
brancz committed May 19, 2020
1 parent af6b0df commit 437fe34
Showing 1 changed file with 24 additions and 14 deletions.
38 changes: 24 additions & 14 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func (h *Handler) writeQuorum() int {

// fanoutForward fans out the given set of write requests concurrently. It returns status immediately when a quorum of
// requests succeeds or fails, or if the context is cancelled.
func (h *Handler) fanoutForward(ctx context.Context, tenant string, replicas map[string]replica, wreqs map[string]*prompb.WriteRequest, successQuorum int) error {
func (h *Handler) fanoutForward(ctx context.Context, tenant string, replicas map[string]replica, wreqs map[string]*prompb.WriteRequest, successThreshold int) error {
ec := make(chan error)

var wg sync.WaitGroup
Expand Down Expand Up @@ -457,24 +457,22 @@ func (h *Handler) fanoutForward(ctx context.Context, tenant string, replicas map
select {
case <-ctx.Done():
return ctx.Err()
case err := <-ec:
case err, more := <-ec:
if !more {
return errs
}
if err == nil {
success++
if success >= successQuorum {
// Quorum met, claim success.
if success >= successThreshold {
// In case the success threshold is lower than the total
// number of requests, then we can finish early here. This
// is the case for quorum writes for example.
return nil
}
continue
}
errs.Add(err)
if countCause(errs, isNotReady) >= len(wreqs)-successQuorum {
return tsdb.ErrNotReady
}
if countCause(errs, isConflict) >= len(wreqs)-successQuorum {
return errors.Wrap(conflictErr, "did not meet replication threshold")
}
if len(errs) >= len(wreqs)-successQuorum {
return errors.Wrap(errs, "did not meet replication threshold")
if err != nil {
errs.Add(err)
}
}
}
Expand Down Expand Up @@ -511,7 +509,19 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri
ctx, cancel := context.WithTimeout(ctx, h.options.ForwardTimeout)
defer cancel()

return h.fanoutForward(ctx, tenant, replicas, wreqs, h.writeQuorum())
quorum := h.writeQuorum()
err := h.fanoutForward(ctx, tenant, replicas, wreqs, quorum)
if countCause(err, isNotReady) >= quorum {
return tsdb.ErrNotReady
}
if countCause(err, isConflict) >= quorum {
return errors.Wrap(conflictErr, "did not meet success threshold due to conflict")
}
if err != nil {
return errors.Wrap(err, "replicate")
}

return nil
}

// RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore.
Expand Down

0 comments on commit 437fe34

Please sign in to comment.