Use block.MetaFetcher in Compactor.
Fixes: #1335
Fixes: #1919
Fixes: #1300

* Cleanup of meta files now starts only if the block being uploaded is older than 2 days (a mitigation only; see the sketch after this list).
* Blocks without meta.json are handled properly in all compactor phases.
* Prepares for the future implementation of https://thanos.io/proposals/201901-read-write-operations-bucket.md/
* Added a metric for partial-upload deletion attempts, and delayed those deletions.
* More tests.
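
A minimal sketch of the age gate from the first bullet, assuming the 2-day threshold (compact.PartialUploadThresholdAge in the diff below) and deriving block age from the ULID timestamp as the compactor does. isAbortedPartialUpload is an illustrative helper, not the real Thanos API:

package main

import (
	"fmt"
	"time"

	"github.com/oklog/ulid"
)

// isAbortedPartialUpload is hypothetical: treat a block as an aborted partial
// upload when it has no meta.json and its ULID timestamp is older than the
// assumed 2-day threshold.
func isAbortedPartialUpload(id ulid.ULID, hasMeta bool) bool {
	const partialUploadThresholdAge = 2 * 24 * time.Hour
	age := time.Duration(ulid.Now()-id.Time()) * time.Millisecond // ULIDs encode their creation time in ms.
	return !hasMeta && age > partialUploadThresholdAge
}

func main() {
	oldID, _ := ulid.New(ulid.Timestamp(time.Now().Add(-72*time.Hour)), nil)
	freshID, _ := ulid.New(ulid.Timestamp(time.Now()), nil)
	fmt.Println(isAbortedPartialUpload(oldID, false))   // true: old enough and meta.json missing.
	fmt.Println(isAbortedPartialUpload(freshID, false)) // false: too fresh to judge safely.
}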

Signed-off-by: Bartlomiej Plotka <[email protected]>
bwplotka committed Jan 3, 2020
1 parent 0c6ec24 commit bbf3d00
Showing 12 changed files with 317 additions and 498 deletions.
117 changes: 66 additions & 51 deletions cmd/thanos/compact.go
@@ -13,6 +13,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@@ -23,6 +24,7 @@ import (
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
@@ -31,6 +33,11 @@
"gopkg.in/alecthomas/kingpin.v2"
)

const (
metricIndexGenerateName = "thanos_compact_generated_index_total"
metricIndexGenerateHelp = "Total number of generated indexes."
)

var (
compactions = compactionSet{
1 * time.Hour,
@@ -85,7 +92,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {

objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

consistencyDelay := modelDuration(cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %s will be removed.", compact.MinimumAgeForRemoval)).
consistencyDelay := modelDuration(cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %v will be removed.", compact.PartialUploadThresholdAge)).
Default("30m"))

retentionRaw := modelDuration(cmd.Flag("retention.resolution-raw", "How long to retain raw samples in bucket. 0d - disables this retention").Default("0d"))
@@ -162,21 +169,28 @@ func runCompact(
) error {
halted := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
Help: "Set to 1 if the compactor halted due to an unexpected error",
Help: "Set to 1 if the compactor halted due to an unexpected error.",
})
halted.Set(0)
retried := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_compactor_retries_total",
Help: "Total number of retries after retriable compactor error",
Help: "Total number of retries after retriable compactor error.",
})
iterations := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_compactor_iterations_total",
Help: "Total number of iterations that were executed successfully",
Help: "Total number of iterations that were executed successfully.",
})
halted.Set(0)

reg.MustRegister(halted)
reg.MustRegister(retried)
reg.MustRegister(iterations)
consistencyDelayMetric := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "thanos_consistency_delay_seconds",
Help: "Configured consistency delay in seconds.",
}, func() float64 {
return consistencyDelay.Seconds()
})
partialUploadDeleteAttempts := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_compactor_aborted_partial_uploads_deletion_attempts_total",
Help: "Total number of started deletions of blocks that are assumed aborted and only partially uploaded.",
})
reg.MustRegister(halted, retried, iterations, consistencyDelayMetric, partialUploadDeleteAttempts)

downsampleMetrics := newDownsampleMetrics(reg)
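
The thanos_consistency_delay_seconds gauge above is built with prometheus.NewGaugeFunc, which exposes a configuration value evaluated at scrape time; a self-contained sketch of that pattern:

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()
	delay := 30 * time.Minute // stand-in for the --consistency-delay flag value

	// The gauge's value is computed by the callback whenever the registry is gathered.
	reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name: "thanos_consistency_delay_seconds",
		Help: "Configured consistency delay in seconds.",
	}, func() float64 { return delay.Seconds() }))

	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	fmt.Println(mfs[0].GetName(), mfs[0].GetMetric()[0].GetGauge().GetValue()) // thanos_consistency_delay_seconds 1800
}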

@@ -225,8 +239,15 @@
}
}()

sy, err := compact.NewSyncer(logger, reg, bkt, consistencyDelay,
blockSyncConcurrency, acceptMalformedIndex, false, relabelConfig)
metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg),
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
(&consistencyDelayMetaFilter{logger: logger, consistencyDelay: consistencyDelay}).Filter,
)
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}

sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, blockSyncConcurrency, acceptMalformedIndex, false)
if err != nil {
return errors.Wrap(err, "create syncer")
}
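
The filters passed to block.NewMetaFetcher above are plain functions, so they compose freely. A sketch wiring only the label-shard filter, with a made-up relabel config (the cluster label and the eu1 value are illustrative, not from this commit):

package main

import (
	"context"

	"github.com/prometheus/prometheus/pkg/relabel"
	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/objstore/inmem"
	"gopkg.in/yaml.v2"
)

func main() {
	// Keep only blocks whose external labels carry cluster="eu1".
	var relabelCfg []*relabel.Config
	cfg := `
- action: keep
  source_labels: [cluster]
  regex: eu1
`
	if err := yaml.Unmarshal([]byte(cfg), &relabelCfg); err != nil {
		panic(err)
	}

	fetcher, err := block.NewMetaFetcher(nil, 32, inmem.NewBucket(), "", nil,
		block.NewLabelShardedMetaFilter(relabelCfg).Filter,
	)
	if err != nil {
		panic(err)
	}
	metas, _, err := fetcher.Fetch(context.Background())
	if err != nil {
		panic(err)
	}
	_ = metas // Empty here; the in-memory bucket holds no blocks.
}
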
@@ -276,36 +297,36 @@
level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h])
}

f := func() error {
compactMainFn := func() error {
if err := compactor.Compact(ctx); err != nil {
return errors.Wrap(err, "compaction failed")
}
level.Info(logger).Log("msg", "compaction iterations done")

// TODO(bplotka): Remove "disableDownsampling" once https://github.com/thanos-io/thanos/issues/297 is fixed.
if !disableDownsampling {
// After all compactions are done, work down the downsampling backlog.
// We run two passes of this to ensure that the 1h downsampling is generated
// for 5m downsamplings created in the first run.
level.Info(logger).Log("msg", "start first pass of downsampling")

if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, downsamplingDir); err != nil {
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, metaFetcher, downsamplingDir); err != nil {
return errors.Wrap(err, "first pass of downsampling failed")
}

level.Info(logger).Log("msg", "start second pass of downsampling")

if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, downsamplingDir); err != nil {
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, metaFetcher, downsamplingDir); err != nil {
return errors.Wrap(err, "second pass of downsampling failed")
}
level.Info(logger).Log("msg", "downsampling iterations done")
} else {
level.Warn(logger).Log("msg", "downsampling was explicitly disabled")
}

if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, retentionByResolution); err != nil {
if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, metaFetcher, retentionByResolution); err != nil {
return errors.Wrap(err, fmt.Sprintf("retention failed"))
}

compact.BestEffortCleanAbortedPartialUploads(ctx, logger, metaFetcher, bkt, partialUploadDeleteAttempts)
return nil
}

@@ -314,18 +335,18 @@ func runCompact(

// Generate index file.
if generateMissingIndexCacheFiles {
if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, indexCacheDir); err != nil {
if err := genMissingIndexCacheFiles(ctx, logger, metaFetcher, reg, bkt, indexCacheDir); err != nil {
return err
}
}

if !wait {
return f()
return compactMainFn()
}

// --wait=true is specified.
return runutil.Repeat(5*time.Minute, ctx.Done(), func() error {
err := f()
err := compactMainFn()
if err == nil {
iterations.Inc()
return nil
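
runutil.Repeat, used above for the --wait loop, simply re-runs the callback on a fixed interval until the stop channel closes or the callback returns an error; a tiny standalone sketch:

package main

import (
	"fmt"
	"time"

	"github.com/thanos-io/thanos/pkg/runutil"
)

func main() {
	stop := make(chan struct{})
	time.AfterFunc(350*time.Millisecond, func() { close(stop) }) // stand-in for ctx.Done()

	// Prints "tick" a few times, then returns nil once stop is closed.
	err := runutil.Repeat(100*time.Millisecond, stop, func() error {
		fmt.Println("tick")
		return nil
	})
	fmt.Println("done:", err)
}
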
@@ -363,13 +384,27 @@ func runCompact(
return nil
}

const (
metricIndexGenerateName = "thanos_compact_generated_index_total"
metricIndexGenerateHelp = "Total number of generated indexes."
)
type consistencyDelayMetaFilter struct {
logger log.Logger
consistencyDelay time.Duration
}

func (f *consistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced block.GaugeLabeled, _ bool) {
for id, meta := range metas {
if ulid.Now()-id.Time() < uint64(f.consistencyDelay/time.Millisecond) &&
meta.Thanos.Source != metadata.BucketRepairSource &&
meta.Thanos.Source != metadata.CompactorSource &&
meta.Thanos.Source != metadata.CompactorRepairSource {

level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id)
synced.WithLabelValues(block.TooFreshMeta).Inc()
delete(metas, id)
}
}
}
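
The freshness test above compares the block's ULID timestamp (a millisecond clock) against the configured delay; the same check in isolation:

package main

import (
	"fmt"
	"time"

	"github.com/oklog/ulid"
)

func main() {
	consistencyDelay := 30 * time.Minute // the flag's default

	// A block whose ULID was minted 10 minutes ago is still "too fresh".
	id, _ := ulid.New(ulid.Timestamp(time.Now().Add(-10*time.Minute)), nil)

	tooFresh := ulid.Now()-id.Time() < uint64(consistencyDelay/time.Millisecond)
	fmt.Println(tooFresh) // true: the filter would drop this block for now.
}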

// genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage.
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, dir string) error {
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, fetcher block.MetadataFetcher, reg *prometheus.Registry, bkt objstore.Bucket, dir string) error {
genIndex := prometheus.NewCounter(prometheus.CounterOpts{
Name: metricIndexGenerateName,
Help: metricIndexGenerateHelp,
@@ -391,38 +426,18 @@ func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prom

level.Info(logger).Log("msg", "start index cache processing")

var metas []*metadata.Meta

if err := bkt.Iter(ctx, "", func(name string) error {
id, ok := block.IsBlockDir(name)
if !ok {
return nil
}

meta, err := block.DownloadMeta(ctx, logger, bkt, id)
if err != nil {
// Probably not finished block, skip it.
if bkt.IsObjNotFoundErr(errors.Cause(err)) {
level.Warn(logger).Log("msg", "meta file wasn't found", "block", id.String())
return nil
}
return errors.Wrap(err, "download metadata")
}
metas, _, err := fetcher.Fetch(ctx)
if err != nil {
return errors.Wrap(err, "fetch metas")
}

for _, meta := range metas {
// New version of compactor pushes index cache along with data block.
// Skip uncompacted blocks.
if meta.Compaction.Level == 1 {
return nil
continue
}

metas = append(metas, &meta)

return nil
}); err != nil {
return errors.Wrap(err, "retrieve bucket block metas")
}

for _, meta := range metas {
if err := generateIndexCacheFile(ctx, bkt, logger, dir, meta); err != nil {
return err
}
52 changes: 52 additions & 0 deletions cmd/thanos/compact_test.go
@@ -0,0 +1,52 @@
package main

import (
"bytes"
"context"
"path"
"testing"
"time"

"github.com/oklog/ulid"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestBe(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

bkt := inmem.NewBucket()

metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil)
testutil.Ok(t, err)

sy, err := compact.NewSyncer(nil, nil, bkt, metaFetcher, 1, false, false)
testutil.Ok(t, err)

// Generate one block that is older than MinimumAgeForRemoval and has chunk data but no meta.json. The compactor should delete it.
shouldDeleteID, err := ulid.New(uint64(time.Now().Add(-time.Hour).Unix()*1000), nil)
testutil.Ok(t, err)

var fakeChunk bytes.Buffer
fakeChunk.Write([]byte{0, 1, 2, 3})
testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldDeleteID.String(), "chunks", "000001"), &fakeChunk))

// Generate one block that is older than consistencyDelay but younger than MinimumAgeForRemoval, and has chunk
// data but no meta.json. The compactor should ignore it.
shouldIgnoreId, err := ulid.New(uint64(time.Now().Unix()*1000), nil)
testutil.Ok(t, err)

testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldIgnoreId.String(), "chunks", "000001"), &fakeChunk))

testutil.Ok(t, sy.SyncMetas(ctx))

exists, err := bkt.Exists(ctx, path.Join(shouldDeleteID.String(), "chunks", "000001"))
testutil.Ok(t, err)
testutil.Equals(t, false, exists)

exists, err = bkt.Exists(ctx, path.Join(shouldIgnoreId.String(), "chunks", "000001"))
testutil.Ok(t, err)
testutil.Equals(t, true, exists)
}
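
To run just this test from a Thanos checkout, standard Go tooling suffices:

go test ./cmd/thanos -run TestBe -v
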
31 changes: 11 additions & 20 deletions cmd/thanos/downsample.go
@@ -21,6 +21,7 @@ import (
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
@@ -88,6 +89,11 @@ func runDownsample(
return err
}

metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}

// Ensure we close up everything properly.
defer func() {
if err != nil {
Expand All @@ -107,13 +113,13 @@ func runDownsample(

level.Info(logger).Log("msg", "start first pass of downsampling")

if err := downsampleBucket(ctx, logger, metrics, bkt, dataDir); err != nil {
if err := downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dataDir); err != nil {
return errors.Wrap(err, "downsampling failed")
}

level.Info(logger).Log("msg", "start second pass of downsampling")

if err := downsampleBucket(ctx, logger, metrics, bkt, dataDir); err != nil {
if err := downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dataDir); err != nil {
return errors.Wrap(err, "downsampling failed")
}

@@ -148,6 +154,7 @@ func downsampleBucket(
logger log.Logger,
metrics *DownsampleMetrics,
bkt objstore.Bucket,
fetcher block.MetadataFetcher,
dir string,
) error {
if err := os.RemoveAll(dir); err != nil {
Expand All @@ -163,25 +170,9 @@ func downsampleBucket(
}
}()

var metas []*metadata.Meta

err := bkt.Iter(ctx, "", func(name string) error {
id, ok := block.IsBlockDir(name)
if !ok {
return nil
}

m, err := block.DownloadMeta(ctx, logger, bkt, id)
if err != nil {
return errors.Wrap(err, "download metadata")
}

metas = append(metas, &m)

return nil
})
metas, _, err := fetcher.Fetch(ctx)
if err != nil {
return errors.Wrap(err, "retrieve bucket block metas")
return errors.Wrap(err, "downsampling meta fetch")
}

// mapping from a hash over all source IDs to blocks. We don't need to downsample a block
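
downsampleBucket and genMissingIndexCacheFiles now consume metadata the same way; a self-contained sketch of the pattern, with the partial-results return discarded as in both call sites (the in-memory bucket is empty, so the map is too):

package main

import (
	"context"
	"fmt"

	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/objstore/inmem"
)

func main() {
	ctx := context.Background()

	// Nil logger and registerer are accepted, as in compact_test.go above.
	fetcher, err := block.NewMetaFetcher(nil, 32, inmem.NewBucket(), "", nil)
	if err != nil {
		panic(err)
	}

	metas, _, err := fetcher.Fetch(ctx) // map[ulid.ULID]*metadata.Meta
	if err != nil {
		panic(err)
	}
	for id, meta := range metas {
		fmt.Println(id, meta.Thanos.Source)
	}
}
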
1 change: 0 additions & 1 deletion pkg/block/fetcher.go
@@ -223,7 +223,6 @@ func (s *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met
level.Warn(s.logger).Log("msg", "best effort save of the meta.json to local dir failed; ignoring", "dir", cachedBlockDir, "err", err)
}
}

return m, nil
}
