From 93fc7e246f6344e55b21a827c8479719de294354 Mon Sep 17 00:00:00 2001 From: Martin Dickson Date: Fri, 12 Apr 2019 11:14:09 +0100 Subject: [PATCH 1/3] group concurrency improvements --- pkg/compact/compact.go | 47 ++++++++++++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 15 deletions(-) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 9b820f3500..5f0cbe7ce5 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" "sort" + "strings" "sync" "time" @@ -22,7 +23,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb" "github.com/prometheus/tsdb/labels" - "golang.org/x/sync/errgroup" ) type ResolutionLevel int64 @@ -921,17 +921,22 @@ func (c *BucketCompactor) Compact(ctx context.Context) error { // Loop over bucket and compact until there's no work left. for { var ( - errGroup, errGroupCtx = errgroup.WithContext(ctx) - groupChan = make(chan *Group) - finishedAllGroups = true - mtx sync.Mutex + wg sync.WaitGroup + workCtx, workCtxCancel = context.WithCancel(ctx) + groupChan = make(chan *Group) + errChan = make(chan error, c.concurrency) + finishedAllGroups = true + mtx sync.Mutex ) // Set up workers who will compact the groups when the groups are ready. + // They will compact available groups until they encounter an error, after which they will stop. for i := 0; i < c.concurrency; i++ { - errGroup.Go(func() error { + go func() { + wg.Add(1) + defer wg.Done() for g := range groupChan { - shouldRerunGroup, _, err := g.Compact(errGroupCtx, c.compactDir, c.comp) + shouldRerunGroup, _, err := g.Compact(workCtx, c.compactDir, c.comp) if err == nil { if shouldRerunGroup { mtx.Lock() @@ -942,17 +947,17 @@ func (c *BucketCompactor) Compact(ctx context.Context) error { } if IsIssue347Error(err) { - if err := RepairIssue347(errGroupCtx, c.logger, c.bkt, err); err == nil { + if err := RepairIssue347(workCtx, c.logger, c.bkt, err); err == nil { mtx.Lock() finishedAllGroups = false mtx.Unlock() continue } } - return errors.Wrap(err, "compaction") + errChan <- errors.Wrap(err, fmt.Sprintf("compaction failed for group %s", g.Key())) + return } - return nil - }) + }() } // Clean up the compaction temporary directory at the beginning of every compaction loop. @@ -980,16 +985,28 @@ func (c *BucketCompactor) Compact(ctx context.Context) error { } // Send all groups found during this pass to the compaction workers. + groupLoop: for _, g := range groups { select { - case <-errGroupCtx.Done(): - break + case err = <-errChan: + if err != nil { + break groupLoop + } case groupChan <- g: } } close(groupChan) - if err := errGroup.Wait(); err != nil { - return err + wg.Wait() + + close(errChan) + workCtxCancel() + if err != nil { + errMsgs := []string{err.Error()} + // Collect any other errors reported by the workers. + for e := range errChan { + errMsgs = append(errMsgs, e.Error()) + } + return errors.New(strings.Join(errMsgs, "; ")) } if finishedAllGroups { From 4835e29d250582fbdd4fede75922c749daad7759 Mon Sep 17 00:00:00 2001 From: Martin Dickson Date: Tue, 16 Apr 2019 09:40:20 +0100 Subject: [PATCH 2/3] remove unnecessary error check --- pkg/compact/compact.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 5f0cbe7ce5..2f498d9851 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -989,9 +989,7 @@ func (c *BucketCompactor) Compact(ctx context.Context) error { for _, g := range groups { select { case err = <-errChan: - if err != nil { - break groupLoop - } + break groupLoop case groupChan <- g: } } From 548fa6a993bec13192e05c72943c6f65a5eebe3f Mon Sep 17 00:00:00 2001 From: Martin Dickson Date: Wed, 17 Apr 2019 09:38:03 +0100 Subject: [PATCH 3/3] add to wg in main goroutine --- pkg/compact/compact.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 2f498d9851..150451185d 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -932,8 +932,8 @@ func (c *BucketCompactor) Compact(ctx context.Context) error { // Set up workers who will compact the groups when the groups are ready. // They will compact available groups until they encounter an error, after which they will stop. for i := 0; i < c.concurrency; i++ { + wg.Add(1) go func() { - wg.Add(1) defer wg.Done() for g := range groupChan { shouldRerunGroup, _, err := g.Compact(workCtx, c.compactDir, c.comp)