compact: group concurrency improvements #1029

Merged 3 commits on Apr 17, 2019
45 changes: 30 additions & 15 deletions pkg/compact/compact.go
@@ -7,6 +7,7 @@ import (
 	"os"
 	"path/filepath"
 	"sort"
+	"strings"
 	"sync"
 	"time"
 
@@ -22,7 +23,6 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/tsdb"
 	"github.com/prometheus/tsdb/labels"
-	"golang.org/x/sync/errgroup"
 )
 
 type ResolutionLevel int64
@@ -921,17 +921,22 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
 	// Loop over bucket and compact until there's no work left.
 	for {
 		var (
-			errGroup, errGroupCtx = errgroup.WithContext(ctx)
-			groupChan             = make(chan *Group)
-			finishedAllGroups     = true
-			mtx                   sync.Mutex
+			wg                     sync.WaitGroup
+			workCtx, workCtxCancel = context.WithCancel(ctx)
+			groupChan              = make(chan *Group)
+			errChan                = make(chan error, c.concurrency)
+			finishedAllGroups      = true
+			mtx                    sync.Mutex
 		)
 
 		// Set up workers who will compact the groups when the groups are ready.
+		// They will compact available groups until they encounter an error, after which they will stop.
 		for i := 0; i < c.concurrency; i++ {
-			errGroup.Go(func() error {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
 				for g := range groupChan {
-					shouldRerunGroup, _, err := g.Compact(errGroupCtx, c.compactDir, c.comp)
+					shouldRerunGroup, _, err := g.Compact(workCtx, c.compactDir, c.comp)
 					if err == nil {
 						if shouldRerunGroup {
 							mtx.Lock()
@@ -942,17 +947,17 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
 					}
 
 					if IsIssue347Error(err) {
-						if err := RepairIssue347(errGroupCtx, c.logger, c.bkt, err); err == nil {
+						if err := RepairIssue347(workCtx, c.logger, c.bkt, err); err == nil {
 							mtx.Lock()
 							finishedAllGroups = false
 							mtx.Unlock()
 							continue
 						}
 					}
-					return errors.Wrap(err, "compaction")
+					errChan <- errors.Wrap(err, fmt.Sprintf("compaction failed for group %s", g.Key()))
+					return
 				}
-				return nil
-			})
+			}()
 		}
 
 		// Clean up the compaction temporary directory at the beginning of every compaction loop.
@@ -980,16 +985,26 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
 		}
 
 		// Send all groups found during this pass to the compaction workers.
+	groupLoop:
 		for _, g := range groups {
 			select {
-			case <-errGroupCtx.Done():
-				break
+			case err = <-errChan:
+				break groupLoop
 			case groupChan <- g:
 			}
 		}
 		close(groupChan)
-		if err := errGroup.Wait(); err != nil {
-			return err
+		wg.Wait()
+
+		close(errChan)
+		workCtxCancel()
+		if err != nil {
+			errMsgs := []string{err.Error()}
+			// Collect any other errors reported by the workers.
+			for e := range errChan {
+				errMsgs = append(errMsgs, e.Error())
+			}
+			return errors.New(strings.Join(errMsgs, "; "))
		}
 
 		if finishedAllGroups {
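For context, the sketch below shows the general shape of the concurrency pattern the diff moves to, stripped of Thanos specifics: a sync.WaitGroup-managed worker pool, an error channel buffered to one slot per worker so sends never block, and aggregation of all reported errors via strings.Join. The names compactAll, process, and the task strings are hypothetical stand-ins for BucketCompactor.Compact, Group.Compact, and the group channel; this is an illustration of the pattern under those assumptions, not the Thanos code itself.

package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"
)

// compactAll sketches the pattern from the diff: a fixed pool of workers
// drains a task channel, each failure is reported on a buffered error
// channel, and the caller joins every reported error after the workers stop.
func compactAll(ctx context.Context, tasks []string, concurrency int) error {
	var (
		wg                     sync.WaitGroup
		workCtx, workCtxCancel = context.WithCancel(ctx)
		taskChan               = make(chan string)
		errChan                = make(chan error, concurrency)
	)
	defer workCtxCancel()

	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range taskChan {
				if err := process(workCtx, t); err != nil {
					// Report the failure and stop picking up new work; the
					// buffered channel guarantees this send never blocks.
					errChan <- fmt.Errorf("processing %s: %w", t, err)
					return
				}
			}
		}()
	}

	// Feed tasks until every task is sent or a worker reports an error.
	var firstErr error
feedLoop:
	for _, t := range tasks {
		select {
		case firstErr = <-errChan:
			break feedLoop
		case taskChan <- t:
		}
	}
	close(taskChan)
	wg.Wait()
	close(errChan)

	// A receive on the closed, drained channel yields nil, so firstErr
	// stays nil when every task succeeded.
	if firstErr == nil {
		firstErr = <-errChan
	}
	if firstErr == nil {
		return nil
	}
	msgs := []string{firstErr.Error()}
	for e := range errChan {
		msgs = append(msgs, e.Error())
	}
	return errors.New(strings.Join(msgs, "; "))
}

// process stands in for Group.Compact in this sketch.
func process(ctx context.Context, task string) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if strings.HasPrefix(task, "bad") {
		return errors.New("simulated compaction failure")
	}
	return nil
}

func main() {
	// Tasks prefixed with "bad" fail; every failure the workers reported
	// ends up joined into the single returned error.
	err := compactAll(context.Background(), []string{"a", "bad-b", "c", "bad-d", "e"}, 2)
	fmt.Println(err)
}

Compared with errgroup, whose Wait method returns only the first error from the group, this arrangement lets every worker report its failure and the caller surface all of them in one joined message, which is the behavior the diff introduces with errChan and strings.Join.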