Skip to content

Commit

Permalink
Use block.MetaFetcher in Compactor.
Browse files Browse the repository at this point in the history
Fixes: #1335
Fixes: #1919
Fixes: #1300

* Cleanup of meta files is now started only if the block being uploaded is older than 2 days (only a mitigation).
* Blocks without meta.json are handled properly for all compactor phases.
* Prepare for future implementation of https://thanos.io/proposals/201901-read-write-operations-bucket.md/
* Added a metric for partial-upload deletion attempts, and delayed those deletions.
* More tests.

Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka committed Jan 3, 2020
1 parent 0c6ec24 commit 876d5c8
Show file tree
Hide file tree
Showing 12 changed files with 273 additions and 503 deletions.
117 changes: 66 additions & 51 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
Expand All @@ -31,6 +33,11 @@ import (
"gopkg.in/alecthomas/kingpin.v2"
)

const (
metricIndexGenerateName = "thanos_compact_generated_index_total"
metricIndexGenerateHelp = "Total number of generated indexes."
)

var (
compactions = compactionSet{
1 * time.Hour,
Expand Down Expand Up @@ -85,7 +92,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {

objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

consistencyDelay := modelDuration(cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %s will be removed.", compact.MinimumAgeForRemoval)).
consistencyDelay := modelDuration(cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %v will be removed.", compact.PartialUploadThresholdAge)).
Default("30m"))

retentionRaw := modelDuration(cmd.Flag("retention.resolution-raw", "How long to retain raw samples in bucket. 0d - disables this retention").Default("0d"))
Expand Down Expand Up @@ -162,21 +169,28 @@ func runCompact(
) error {
halted := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
Help: "Set to 1 if the compactor halted due to an unexpected error",
Help: "Set to 1 if the compactor halted due to an unexpected error.",
})
halted.Set(0)
retried := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_compactor_retries_total",
Help: "Total number of retries after retriable compactor error",
Help: "Total number of retries after retriable compactor error.",
})
iterations := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_compactor_iterations_total",
Help: "Total number of iterations that were executed successfully",
Help: "Total number of iterations that were executed successfully.",
})
halted.Set(0)

reg.MustRegister(halted)
reg.MustRegister(retried)
reg.MustRegister(iterations)
consistencyDelayMetric := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "thanos_consistency_delay_seconds",
Help: "Configured consistency delay in seconds.",
}, func() float64 {
return consistencyDelay.Seconds()
})
partialUploadDeleteAttempts := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_compactor_aborted_partial_uploads_deletion_attempts_total",
Help: "Total number of started deletions of blocks that are assumed aborted and only partially uploaded.",
})
reg.MustRegister(halted, retried, iterations, consistencyDelayMetric, partialUploadDeleteAttempts)

downsampleMetrics := newDownsampleMetrics(reg)

Expand Down Expand Up @@ -225,8 +239,15 @@ func runCompact(
}
}()

sy, err := compact.NewSyncer(logger, reg, bkt, consistencyDelay,
blockSyncConcurrency, acceptMalformedIndex, false, relabelConfig)
metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg),
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
(&consistencyDelayMetaFilter{logger: logger, consistencyDelay: consistencyDelay}).Filter,
)
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}

sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, blockSyncConcurrency, acceptMalformedIndex, false)
if err != nil {
return errors.Wrap(err, "create syncer")
}
Expand Down Expand Up @@ -276,36 +297,36 @@ func runCompact(
level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h])
}

f := func() error {
compactMainFn := func() error {
if err := compactor.Compact(ctx); err != nil {
return errors.Wrap(err, "compaction failed")
}
level.Info(logger).Log("msg", "compaction iterations done")

// TODO(bplotka): Remove "disableDownsampling" once https://github.com/thanos-io/thanos/issues/297 is fixed.
if !disableDownsampling {
// After all compactions are done, work down the downsampling backlog.
// We run two passes of this to ensure that the 1h downsampling is generated
// for 5m downsamplings created in the first run.
level.Info(logger).Log("msg", "start first pass of downsampling")

if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, downsamplingDir); err != nil {
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, metaFetcher, downsamplingDir); err != nil {
return errors.Wrap(err, "first pass of downsampling failed")
}

level.Info(logger).Log("msg", "start second pass of downsampling")

if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, downsamplingDir); err != nil {
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, metaFetcher, downsamplingDir); err != nil {
return errors.Wrap(err, "second pass of downsampling failed")
}
level.Info(logger).Log("msg", "downsampling iterations done")
} else {
level.Warn(logger).Log("msg", "downsampling was explicitly disabled")
}

if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, retentionByResolution); err != nil {
if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, metaFetcher, retentionByResolution); err != nil {
return errors.Wrap(err, fmt.Sprintf("retention failed"))
}

compact.BestEffortCleanAbortedPartialUploads(ctx, logger, metaFetcher, bkt, partialUploadDeleteAttempts)
return nil
}

Expand All @@ -314,18 +335,18 @@ func runCompact(

// Generate index file.
if generateMissingIndexCacheFiles {
if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, indexCacheDir); err != nil {
if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, metaFetcher, indexCacheDir); err != nil {
return err
}
}

if !wait {
return f()
return compactMainFn()
}

// --wait=true is specified.
return runutil.Repeat(5*time.Minute, ctx.Done(), func() error {
err := f()
err := compactMainFn()
if err == nil {
iterations.Inc()
return nil
Expand Down Expand Up @@ -363,13 +384,27 @@ func runCompact(
return nil
}

const (
metricIndexGenerateName = "thanos_compact_generated_index_total"
metricIndexGenerateHelp = "Total number of generated indexes."
)
// consistencyDelayMetaFilter is a MetaFetcher filter that drops blocks younger
// than the configured consistency delay, so that a block still in the middle of
// an eventually-consistent upload is not picked up by the compactor.
// Blocks produced by Thanos itself (bucket repair, compactor, compactor repair
// sources) are exempt, since their uploads are trusted to be complete.
type consistencyDelayMetaFilter struct {
	logger           log.Logger
	consistencyDelay time.Duration
}

// Filter removes from metas every block whose ULID creation time (milliseconds)
// is within consistencyDelay of now, unless its Thanos source is one of the
// exempt Thanos-generated sources. Each removed block is logged at debug level
// and counted under the block.TooFreshMeta synced label.
func (f *consistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced block.GaugeLabeled, _ bool) {
	for id, meta := range metas {
		// ulid.Now() and id.Time() are both in milliseconds since epoch, so the
		// delay is converted to the same unit for the comparison.
		if ulid.Now()-id.Time() < uint64(f.consistencyDelay/time.Millisecond) &&
			meta.Thanos.Source != metadata.BucketRepairSource &&
			meta.Thanos.Source != metadata.CompactorSource &&
			meta.Thanos.Source != metadata.CompactorRepairSource {

			level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id)
			synced.WithLabelValues(block.TooFreshMeta).Inc()
			delete(metas, id)
		}
	}
}

// genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage.
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, dir string) error {
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, fetcher block.MetadataFetcher, dir string) error {
genIndex := prometheus.NewCounter(prometheus.CounterOpts{
Name: metricIndexGenerateName,
Help: metricIndexGenerateHelp,
Expand All @@ -391,38 +426,18 @@ func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prom

level.Info(logger).Log("msg", "start index cache processing")

var metas []*metadata.Meta

if err := bkt.Iter(ctx, "", func(name string) error {
id, ok := block.IsBlockDir(name)
if !ok {
return nil
}

meta, err := block.DownloadMeta(ctx, logger, bkt, id)
if err != nil {
// Probably not finished block, skip it.
if bkt.IsObjNotFoundErr(errors.Cause(err)) {
level.Warn(logger).Log("msg", "meta file wasn't found", "block", id.String())
return nil
}
return errors.Wrap(err, "download metadata")
}
metas, _, err := fetcher.Fetch(ctx)
if err != nil {
return errors.Wrap(err, "fetch metas")
}

for _, meta := range metas {
// New version of compactor pushes index cache along with data block.
// Skip uncompacted blocks.
if meta.Compaction.Level == 1 {
return nil
continue
}

metas = append(metas, &meta)

return nil
}); err != nil {
return errors.Wrap(err, "retrieve bucket block metas")
}

for _, meta := range metas {
if err := generateIndexCacheFile(ctx, bkt, logger, dir, meta); err != nil {
return err
}
Expand Down
31 changes: 11 additions & 20 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
Expand Down Expand Up @@ -88,6 +89,11 @@ func runDownsample(
return err
}

metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}

// Ensure we close up everything properly.
defer func() {
if err != nil {
Expand All @@ -107,13 +113,13 @@ func runDownsample(

level.Info(logger).Log("msg", "start first pass of downsampling")

if err := downsampleBucket(ctx, logger, metrics, bkt, dataDir); err != nil {
if err := downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dataDir); err != nil {
return errors.Wrap(err, "downsampling failed")
}

level.Info(logger).Log("msg", "start second pass of downsampling")

if err := downsampleBucket(ctx, logger, metrics, bkt, dataDir); err != nil {
if err := downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dataDir); err != nil {
return errors.Wrap(err, "downsampling failed")
}

Expand Down Expand Up @@ -148,6 +154,7 @@ func downsampleBucket(
logger log.Logger,
metrics *DownsampleMetrics,
bkt objstore.Bucket,
fetcher block.MetadataFetcher,
dir string,
) error {
if err := os.RemoveAll(dir); err != nil {
Expand All @@ -163,25 +170,9 @@ func downsampleBucket(
}
}()

var metas []*metadata.Meta

err := bkt.Iter(ctx, "", func(name string) error {
id, ok := block.IsBlockDir(name)
if !ok {
return nil
}

m, err := block.DownloadMeta(ctx, logger, bkt, id)
if err != nil {
return errors.Wrap(err, "download metadata")
}

metas = append(metas, &m)

return nil
})
metas, _, err := fetcher.Fetch(ctx)
if err != nil {
return errors.Wrap(err, "retrieve bucket block metas")
return errors.Wrap(err, "downsampling meta fetch")
}

// mapping from a hash over all source IDs to blocks. We don't need to downsample a block
Expand Down
10 changes: 8 additions & 2 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ func TestCleanupIndexCacheFolder(t *testing.T) {
})
expReg.MustRegister(genIndexExp)

testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, dir))
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil)
testutil.Ok(t, err)

testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, metaFetcher, dir))

genIndexExp.Inc()
testutil.GatherAndCompare(t, expReg, reg, metricIndexGenerateName)
Expand Down Expand Up @@ -112,7 +115,10 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos))))
testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, dir))
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil)
testutil.Ok(t, err)

testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dir))
testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos))))

_, err = os.Stat(dir)
Expand Down
1 change: 0 additions & 1 deletion pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ func (s *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met
level.Warn(s.logger).Log("msg", "best effort save of the meta.json to local dir failed; ignoring", "dir", cachedBlockDir, "err", err)
}
}

return m, nil
}

Expand Down
48 changes: 48 additions & 0 deletions pkg/compact/clean.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package compact

import (
"context"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/objstore"
)

const (
	// PartialUploadThresholdAge is the age after which a partially uploaded
	// block is assumed aborted and eligible for cleanup. Keep it long: the age
	// is derived from the block's ULID creation time, not the upload start
	// time, so a legitimate upload may begin well after the block was created.
	PartialUploadThresholdAge = 2 * 24 * time.Hour
)

// BestEffortCleanAbortedPartialUploads deletes partially uploaded blocks (IDs
// reported as partial by the fetcher, i.e. present in the bucket without a
// readable meta.json) that are older than PartialUploadThresholdAge.
// It is best effort: every failure is logged and skipped, never propagated.
// deleteAttempts is incremented once per started deletion attempt.
func BestEffortCleanAbortedPartialUploads(ctx context.Context, logger log.Logger, fetcher block.MetadataFetcher, bkt objstore.Bucket, deleteAttempts prometheus.Counter) {
	level.Info(logger).Log("msg", "started cleaning of aborted partial uploads")

	_, partial, err := fetcher.Fetch(ctx)
	if err != nil {
		level.Warn(logger).Log("msg", "failed to fetch metadata for cleaning of aborted partial uploads; skipping", "err", err)
		// Without the partial-block listing there is nothing to clean; return
		// here so we do not fall through and log a misleading "done" below.
		return
	}

	// Delete partial blocks that are older than PartialUploadThresholdAge.
	// TODO(bwplotka): This can cause data loss if a block is:
	// * being uploaded for longer than PartialUploadThresholdAge, or
	// * uploaded by a process that started long after the block's ULID was
	//   created, so the ULID-derived age overestimates the upload age.
	// Keep PartialUploadThresholdAge long for now. Mitigate this by adding
	// ModifiedTime to bkt and checking that instead of the ULID (block creation time).
	for id := range partial {
		if ulid.Now()-id.Time() <= uint64(PartialUploadThresholdAge/time.Millisecond) {
			// Minimum delay has not expired; ignore for now.
			continue
		}

		deleteAttempts.Inc()
		if err := block.Delete(ctx, logger, bkt, id); err != nil {
			// Best effort: move on to the next partial block rather than
			// aborting the whole cleanup pass on one failed deletion.
			level.Warn(logger).Log("msg", "failed to delete aborted partial upload; skipping", "block", id, "thresholdAge", PartialUploadThresholdAge, "err", err)
			continue
		}
		level.Info(logger).Log("msg", "deleted aborted partial upload", "block", id, "thresholdAge", PartialUploadThresholdAge)
	}
	level.Info(logger).Log("msg", "cleaning of aborted partial uploads done")
}
Loading

0 comments on commit 876d5c8

Please sign in to comment.