Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compact: group concurrency #1010

Merged
merged 17 commits into from
Apr 10, 2019
7 changes: 6 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage.").
Default("20").Int()

compactionConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting group.").
Default("1").Int()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runCompact(g, logger, reg,
*httpAddr,
Expand All @@ -116,6 +119,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
*disableDownsampling,
*maxCompactionLevel,
*blockSyncConcurrency,
*compactionConcurrency,
)
}
}
Expand All @@ -136,6 +140,7 @@ func runCompact(
disableDownsampling bool,
maxCompactionLevel int,
blockSyncConcurrency int,
concurrency int,
) error {
halted := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
Expand Down Expand Up @@ -212,7 +217,7 @@ func runCompact(

ctx, cancel := context.WithCancel(context.Background())
f := func() error {
if err := compactor.Compact(ctx); err != nil {
if err := compactor.Compact(ctx, concurrency); err != nil {
mjd95 marked this conversation as resolved.
Show resolved Hide resolved
return errors.Wrap(err, "compaction failed")
}
level.Info(logger).Log("msg", "compaction iterations done")
Expand Down
62 changes: 32 additions & 30 deletions docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,44 +31,46 @@ usage: thanos compact [<flags>]
continuously compacts blocks in an object store bucket

Flags:
-h, --help Show context-sensitive help (also try --help-long and
--help-man).
--version Show application version.
--log.level=info Log filtering level.
--log.format=logfmt Log format to use.
-h, --help Show context-sensitive help (also try --help-long
and --help-man).
--version Show application version.
--log.level=info Log filtering level.
--log.format=logfmt Log format to use.
--gcloudtrace.project=GCLOUDTRACE.PROJECT
GCP project to send Google Cloud Trace tracings to.
If empty, tracing will be disabled.
GCP project to send Google Cloud Trace tracings
to. If empty, tracing will be disabled.
--gcloudtrace.sample-factor=1
How often we send traces (1/<sample-factor>). If 0 no
trace will be sent periodically, unless forced by
baggage item. See `pkg/tracing/tracing.go` for
details.
How often we send traces (1/<sample-factor>). If
0 no trace will be sent periodically, unless
forced by baggage item. See
`pkg/tracing/tracing.go` for details.
--http-address="0.0.0.0:10902"
Listen host:port for HTTP endpoints.
--data-dir="./data" Data directory in which to cache blocks and process
compactions.
Listen host:port for HTTP endpoints.
--data-dir="./data" Data directory in which to cache blocks and
process compactions.
--objstore.config-file=<bucket.config-yaml-path>
Path to YAML file that contains object store
configuration.
Path to YAML file that contains object store
configuration.
--objstore.config=<bucket.config-yaml>
Alternative to 'objstore.config-file' flag. Object
store configuration in YAML.
--sync-delay=30m Minimum age of fresh (non-compacted) blocks before
they are being processed.
Alternative to 'objstore.config-file' flag.
Object store configuration in YAML.
--sync-delay=30m Minimum age of fresh (non-compacted) blocks
before they are being processed.
--retention.resolution-raw=0d
How long to retain raw samples in bucket. 0d -
disables this retention
How long to retain raw samples in bucket. 0d -
disables this retention
--retention.resolution-5m=0d
How long to retain samples of resolution 1 (5
minutes) in bucket. 0d - disables this retention
How long to retain samples of resolution 1 (5
minutes) in bucket. 0d - disables this retention
--retention.resolution-1h=0d
How long to retain samples of resolution 2 (1 hour)
in bucket. 0d - disables this retention
-w, --wait Do not exit after all compactions have been processed
and wait for new work.
How long to retain samples of resolution 2 (1
hour) in bucket. 0d - disables this retention
-w, --wait Do not exit after all compactions have been
processed and wait for new work.
--block-sync-concurrency=20
Number of goroutines to use when syncing block
metadata from object storage.
Number of goroutines to use when syncing block
metadata from object storage.
--compact.concurrency=1 Number of goroutines to use when compacting
group.

```
70 changes: 56 additions & 14 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,48 @@ func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, comp
}

// Compact runs compaction over bucket.
func (c *BucketCompactor) Compact(ctx context.Context) error {
func (c *BucketCompactor) Compact(ctx context.Context, concurrency int) error {
// Set up workers that compact groups as they are received on groupChan.
mjd95 marked this conversation as resolved.
Show resolved Hide resolved
var wg sync.WaitGroup
defer wg.Wait()

groupChan := make(chan *Group)
errChan := make(chan error, concurrency)
var finishedAllGroups bool
var mtx sync.Mutex
mjd95 marked this conversation as resolved.
Show resolved Hide resolved

workCtx, cancel := context.WithCancel(ctx)
mjd95 marked this conversation as resolved.
Show resolved Hide resolved
defer cancel()
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()

for g := range groupChan {
shouldRerunGroup, _, err := g.Compact(workCtx, c.compactDir, c.comp)
if err == nil {
if shouldRerunGroup {
mtx.Lock()
finishedAllGroups = false
mtx.Unlock()
}
continue
}

if IsIssue347Error(err) {
if err := RepairIssue347(workCtx, c.logger, c.bkt, err); err == nil {
mtx.Lock()
finishedAllGroups = false
mtx.Unlock()
continue
}
}
errChan <- errors.Wrap(err, "compaction")
return
}
}()
}

// Loop over bucket and compact until there's no work left.
for {
// Clean up the compaction temporary directory at the beginning of every compaction loop.
Expand All @@ -923,23 +964,24 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
if err != nil {
return errors.Wrap(err, "build compaction groups")
}
finishedAllGroups := true

// Send all groups found during this pass to the compaction workers; a worker sets finishedAllGroups to false when a group needs another pass.
finishedAllGroups = true
for _, g := range groups {
povilasv marked this conversation as resolved.
Show resolved Hide resolved
shouldRerunGroup, _, err := g.Compact(ctx, c.compactDir, c.comp)
if err == nil {
if shouldRerunGroup {
finishedAllGroups = false
select {
mjd95 marked this conversation as resolved.
Show resolved Hide resolved
case err := <-errChan:
if err != nil {
return err
}
continue
case groupChan <- g:
}
}
close(groupChan)
wg.Wait()
close(errChan)

if IsIssue347Error(err) {
if err := RepairIssue347(ctx, c.logger, c.bkt, err); err == nil {
finishedAllGroups = false
continue
}
}
return errors.Wrap(err, "compaction")
if err := <-errChan; err != nil {
return err
}
if finishedAllGroups {
break
Expand Down