address CR
Martin Dickson committed Apr 4, 2019
1 parent a06cd58 commit 5b40b2e
Showing 3 changed files with 98 additions and 76 deletions.
8 changes: 4 additions & 4 deletions cmd/thanos/compact.go
@@ -98,7 +98,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage.").
Default("20").Int()

groupCompactConcurrency := cmd.Flag("group-compact-concurrency", "Number of goroutines to use when compacting group.").
compactionConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting group.").
Default("1").Int()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
@@ -119,7 +119,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
*disableDownsampling,
*maxCompactionLevel,
*blockSyncConcurrency,
*groupCompactConcurrency,
*compactionConcurrency,
)
}
}
@@ -140,7 +140,7 @@ func runCompact(
disableDownsampling bool,
maxCompactionLevel int,
blockSyncConcurrency int,
groupCompactConcurrency int,
concurrency int,
) error {
halted := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
@@ -217,7 +217,7 @@ func runCompact(

ctx, cancel := context.WithCancel(context.Background())
f := func() error {
if err := compactor.Compact(ctx, groupCompactConcurrency); err != nil {
if err := compactor.Compact(ctx, concurrency); err != nil {
return errors.Wrap(err, "compaction failed")
}
level.Info(logger).Log("msg", "compaction iterations done")
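Note: the hunk above renames the `group-compact-concurrency` flag to `compact.concurrency` and threads it into `runCompact` as `concurrency`. As a rough illustration of how such a kingpin flag is declared and read, here is a minimal, self-contained sketch; the `thanos`/`compact`/`compact.concurrency` names come from the diff, while the `main` wrapper and the print statement are illustrative only and not the actual Thanos wiring.

```go
package main

import (
	"fmt"
	"os"

	"gopkg.in/alecthomas/kingpin.v2"
)

func main() {
	// Minimal stand-in for the real registerCompact wiring.
	app := kingpin.New("thanos", "sketch of the renamed compaction flag")
	cmd := app.Command("compact", "continuously compacts blocks in an object store bucket")

	// Renamed flag: compact.concurrency replaces group-compact-concurrency.
	compactionConcurrency := cmd.Flag("compact.concurrency",
		"Number of goroutines to use when compacting groups.").
		Default("1").Int()

	if _, err := app.Parse(os.Args[1:]); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(2)
	}

	fmt.Printf("compacting with %d goroutines\n", *compactionConcurrency)
}
```

Invoked, for example, as `thanos compact --compact.concurrency=4 ...`; the old `--group-compact-concurrency` spelling no longer exists after this change.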
82 changes: 41 additions & 41 deletions docs/components/compact.md
@@ -31,46 +31,46 @@ usage: thanos compact [<flags>]
continuously compacts blocks in an object store bucket

Flags:
-h, --help Show context-sensitive help (also try --help-long and
--help-man).
--version Show application version.
--log.level=info Log filtering level.
--log.format=logfmt Log format to use.
--gcloudtrace.project=GCLOUDTRACE.PROJECT
GCP project to send Google Cloud Trace tracings to.
If empty, tracing will be disabled.
--gcloudtrace.sample-factor=1
How often we send traces (1/<sample-factor>). If 0 no
trace will be sent periodically, unless forced by
baggage item. See `pkg/tracing/tracing.go` for
details.
--http-address="0.0.0.0:10902"
Listen host:port for HTTP endpoints.
--data-dir="./data" Data directory in which to cache blocks and process
compactions.
--objstore.config-file=<bucket.config-yaml-path>
Path to YAML file that contains object store
configuration.
--objstore.config=<bucket.config-yaml>
Alternative to 'objstore.config-file' flag. Object
store configuration in YAML.
--sync-delay=30m Minimum age of fresh (non-compacted) blocks before
they are being processed.
--retention.resolution-raw=0d
How long to retain raw samples in bucket. 0d -
disables this retention
--retention.resolution-5m=0d
How long to retain samples of resolution 1 (5
minutes) in bucket. 0d - disables this retention
--retention.resolution-1h=0d
How long to retain samples of resolution 2 (1 hour)
in bucket. 0d - disables this retention
-w, --wait Do not exit after all compactions have been processed
and wait for new work.
--block-sync-concurrency=20
Number of goroutines to use when syncing block
metadata from object storage.
--group-compact-concurrency=1
Number of goroutines to use when compacting group.
-h, --help Show context-sensitive help (also try --help-long
and --help-man).
--version Show application version.
--log.level=info Log filtering level.
--log.format=logfmt Log format to use.
--gcloudtrace.project=GCLOUDTRACE.PROJECT
GCP project to send Google Cloud Trace tracings
to. If empty, tracing will be disabled.
--gcloudtrace.sample-factor=1
How often we send traces (1/<sample-factor>). If
0 no trace will be sent periodically, unless
forced by baggage item. See
`pkg/tracing/tracing.go` for details.
--http-address="0.0.0.0:10902"
Listen host:port for HTTP endpoints.
--data-dir="./data" Data directory in which to cache blocks and
process compactions.
--objstore.config-file=<bucket.config-yaml-path>
Path to YAML file that contains object store
configuration.
--objstore.config=<bucket.config-yaml>
Alternative to 'objstore.config-file' flag.
Object store configuration in YAML.
--sync-delay=30m Minimum age of fresh (non-compacted) blocks
before they are being processed.
--retention.resolution-raw=0d
How long to retain raw samples in bucket. 0d -
disables this retention
--retention.resolution-5m=0d
How long to retain samples of resolution 1 (5
minutes) in bucket. 0d - disables this retention
--retention.resolution-1h=0d
How long to retain samples of resolution 2 (1
hour) in bucket. 0d - disables this retention
-w, --wait Do not exit after all compactions have been
processed and wait for new work.
--block-sync-concurrency=20
Number of goroutines to use when syncing block
metadata from object storage.
--compact.concurrency=1 Number of goroutines to use when compacting
group.

```
84 changes: 53 additions & 31 deletions pkg/compact/compact.go
@@ -897,7 +897,48 @@ func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, comp
}

// Compact runs compaction over bucket.
func (c *BucketCompactor) Compact(ctx context.Context, groupCompactConcurrency int) error {
func (c *BucketCompactor) Compact(ctx context.Context, concurrency int) error {
// Set up workers who will compact the groups when the groups are ready.
var wg sync.WaitGroup
defer wg.Wait()

groupChan := make(chan *Group)
errChan := make(chan error, concurrency)
var finishedAllGroups bool
var mtx sync.Mutex

workCtx, cancel := context.WithCancel(ctx)
defer cancel()
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()

for g := range groupChan {
shouldRerunGroup, _, err := g.Compact(workCtx, c.compactDir, c.comp)
if err == nil {
if shouldRerunGroup {
mtx.Lock()
finishedAllGroups = false
mtx.Unlock()
}
continue
}

if IsIssue347Error(err) {
if err := RepairIssue347(workCtx, c.logger, c.bkt, err); err == nil {
mtx.Lock()
finishedAllGroups = false
mtx.Unlock()
continue
}
}
errChan <- errors.Wrap(err, "compaction")
return
}
}()
}

// Loop over bucket and compact until there's no work left.
for {
// Clean up the compaction temporary directory at the beginning of every compaction loop.
@@ -923,41 +964,22 @@ func (c *BucketCompactor) Compact(ctx context.Context, groupCompactConcurrency i
if err != nil {
return errors.Wrap(err, "build compaction groups")
}
finishedAllGroups := true
var wg sync.WaitGroup
errChan := make(chan error, len(groups))
groupChan := make(chan struct{}, groupCompactConcurrency)
defer close(groupChan)
for i := 0; i < groupCompactConcurrency; i++ {
groupChan <- struct{}{}
}
for _, g := range groups {
<-groupChan
wg.Add(1)
go func(g *Group) {
defer func() {
wg.Done()
groupChan <- struct{}{}
}()
shouldRerunGroup, _, err := g.Compact(ctx, c.compactDir, c.comp)
if err == nil {
if shouldRerunGroup {
finishedAllGroups = false
}
return
}

if IsIssue347Error(err) {
if err := RepairIssue347(ctx, c.logger, c.bkt, err); err == nil {
finishedAllGroups = false
return
}
// Send all groups found during this pass to the compaction workers, the workers will update finishedAllGroups.
finishedAllGroups = true
for _, g := range groups {
select {
case err := <-errChan:
if err != nil {
return err
}
errChan <- errors.Wrap(err, "compaction")
}(g)
case groupChan <- g:
}
}
close(groupChan)
wg.Wait()
close(errChan)

if err := <-errChan; err != nil {
return err
}
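Note: the refactor above replaces the per-iteration goroutine fan-out (token channel plus one goroutine per group) with a fixed pool of workers created once before the compaction loop, fed through `groupChan` and aborted via a buffered `errChan`. The sketch below shows the same bounded-concurrency pattern in isolation, with `compactGroup` as a hypothetical stand-in for `Group.Compact`; it is not the Thanos implementation itself.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// compactGroup is a hypothetical stand-in for Group.Compact: it "processes"
// one group and reports an error if something went wrong.
func compactGroup(ctx context.Context, id int) error {
	fmt.Printf("compacted group %d\n", id)
	return nil
}

func main() {
	const concurrency = 2
	groups := []int{1, 2, 3, 4, 5}

	var wg sync.WaitGroup
	groupChan := make(chan int)
	errChan := make(chan error, concurrency)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Fixed pool of workers, started once rather than per compaction pass.
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for g := range groupChan {
				if err := compactGroup(ctx, g); err != nil {
					errChan <- err
					return
				}
			}
		}()
	}

	// Producer: hand out groups, but stop as soon as any worker reports an error.
	for _, g := range groups {
		select {
		case err := <-errChan:
			fmt.Println("aborting:", err)
			close(groupChan)
			wg.Wait()
			return
		case groupChan <- g:
		}
	}

	close(groupChan)
	wg.Wait()
	close(errChan)

	// A closed, drained errChan yields nil; otherwise the first error surfaces here.
	if err := <-errChan; err != nil {
		fmt.Println("compaction failed:", err)
	}
}
```

Keeping the workers outside the outer `for {}` loop, as the new code does, avoids re-spawning goroutines on every pass and lets a single error stop work being handed out promptly.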
