diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 9b820f3500..150451185d 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" "sort" + "strings" "sync" "time" @@ -22,7 +23,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb" "github.com/prometheus/tsdb/labels" - "golang.org/x/sync/errgroup" ) type ResolutionLevel int64 @@ -921,17 +921,22 @@ func (c *BucketCompactor) Compact(ctx context.Context) error { // Loop over bucket and compact until there's no work left. for { var ( - errGroup, errGroupCtx = errgroup.WithContext(ctx) - groupChan = make(chan *Group) - finishedAllGroups = true - mtx sync.Mutex + wg sync.WaitGroup + workCtx, workCtxCancel = context.WithCancel(ctx) + groupChan = make(chan *Group) + errChan = make(chan error, c.concurrency) + finishedAllGroups = true + mtx sync.Mutex ) // Set up workers who will compact the groups when the groups are ready. + // They will compact available groups until they encounter an error, after which they will stop. for i := 0; i < c.concurrency; i++ { - errGroup.Go(func() error { + wg.Add(1) + go func() { + defer wg.Done() for g := range groupChan { - shouldRerunGroup, _, err := g.Compact(errGroupCtx, c.compactDir, c.comp) + shouldRerunGroup, _, err := g.Compact(workCtx, c.compactDir, c.comp) if err == nil { if shouldRerunGroup { mtx.Lock() @@ -942,17 +947,17 @@ func (c *BucketCompactor) Compact(ctx context.Context) error { } if IsIssue347Error(err) { - if err := RepairIssue347(errGroupCtx, c.logger, c.bkt, err); err == nil { + if err := RepairIssue347(workCtx, c.logger, c.bkt, err); err == nil { mtx.Lock() finishedAllGroups = false mtx.Unlock() continue } } - return errors.Wrap(err, "compaction") + errChan <- errors.Wrap(err, fmt.Sprintf("compaction failed for group %s", g.Key())) + return } - return nil - }) + }() } // Clean up the compaction temporary directory at the beginning of every compaction loop. @@ -980,16 +985,26 @@ func (c *BucketCompactor) Compact(ctx context.Context) error { } // Send all groups found during this pass to the compaction workers. + groupLoop: for _, g := range groups { select { - case <-errGroupCtx.Done(): - break + case err = <-errChan: + break groupLoop case groupChan <- g: } } close(groupChan) - if err := errGroup.Wait(); err != nil { - return err + wg.Wait() + + close(errChan) + workCtxCancel() + if err != nil { + errMsgs := []string{err.Error()} + // Collect any other errors reported by the workers. + for e := range errChan { + errMsgs = append(errMsgs, e.Error()) + } + return errors.New(strings.Join(errMsgs, "; ")) } if finishedAllGroups {