Skip to content

Commit

Permalink
store: add consistency delay to fetch blocks
Browse files Browse the repository at this point in the history
Signed-off-by: khyatisoneji <[email protected]>
  • Loading branch information
khyatisoneji committed Jan 24, 2020
1 parent 860460c commit 3c68464
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 100 deletions.
37 changes: 7 additions & 30 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -23,6 +22,7 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/compact/garbagecollector"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extprom"
Expand Down Expand Up @@ -181,17 +181,11 @@ func runCompact(
Name: "thanos_compactor_iterations_total",
Help: "Total number of iterations that were executed successfully.",
})
consistencyDelayMetric := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "thanos_consistency_delay_seconds",
Help: "Configured consistency delay in seconds.",
}, func() float64 {
return consistencyDelay.Seconds()
})
partialUploadDeleteAttempts := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_compactor_aborted_partial_uploads_deletion_attempts_total",
Help: "Total number of started deletions of blocks that are assumed aborted and only partially uploaded.",
})
reg.MustRegister(halted, retried, iterations, consistencyDelayMetric, partialUploadDeleteAttempts)
reg.MustRegister(halted, retried, iterations, partialUploadDeleteAttempts)

downsampleMetrics := newDownsampleMetrics(reg)

Expand Down Expand Up @@ -240,15 +234,17 @@ func runCompact(
}
}()

metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg),
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)
metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer,
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
(&consistencyDelayMetaFilter{logger: logger, consistencyDelay: consistencyDelay}).Filter,
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter,
)
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}

sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, blockSyncConcurrency, acceptMalformedIndex, false)
garbageBlocksFinder := garbagecollector.NewGarbageBlocksFinder()
sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, garbageBlocksFinder, blockSyncConcurrency, acceptMalformedIndex, false)
if err != nil {
return errors.Wrap(err, "create syncer")
}
Expand Down Expand Up @@ -385,25 +381,6 @@ func runCompact(
return nil
}

type consistencyDelayMetaFilter struct {
logger log.Logger
consistencyDelay time.Duration
}

func (f *consistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced block.GaugeLabeled, _ bool) {
for id, meta := range metas {
if ulid.Now()-id.Time() < uint64(f.consistencyDelay/time.Millisecond) &&
meta.Thanos.Source != metadata.BucketRepairSource &&
meta.Thanos.Source != metadata.CompactorSource &&
meta.Thanos.Source != metadata.CompactorRepairSource {

level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id)
synced.WithLabelValues(block.TooFreshMeta).Inc()
delete(metas, id)
}
}
}

// genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage.
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, fetcher block.MetadataFetcher, dir string) error {
genIndex := prometheus.NewCounter(prometheus.CounterOpts{
Expand Down
12 changes: 11 additions & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/compact/garbagecollector"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extprom"
Expand Down Expand Up @@ -78,6 +79,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
enableIndexHeader := cmd.Flag("experimental.enable-index-header", "If true, Store Gateway will recreate index-header instead of index-cache.json for each block. This will replace index-cache.json permanently once it will be out of experimental stage.").
Hidden().Default("false").Bool()

consistencyDelay := modelDuration(cmd.Flag("consistency-delay", "Minimum age of fresh (non-compacted) blocks before they are being processed.").
Default("30m"))

m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
Expand Down Expand Up @@ -113,6 +117,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
selectorRelabelConf,
*advertiseCompatibilityLabel,
*enableIndexHeader,
time.Duration(*consistencyDelay),
)
}
}
Expand Down Expand Up @@ -145,6 +150,7 @@ func runStore(
selectorRelabelConf *extflag.PathOrContent,
advertiseCompatibilityLabel bool,
enableIndexHeader bool,
consistencyDelay time.Duration,
) error {
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
statusProber := prober.New(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
Expand Down Expand Up @@ -211,9 +217,13 @@ func runStore(
return errors.Wrap(err, "create index cache")
}

metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
garbageBlocksFinder := garbagecollector.NewGarbageBlocksFinder()
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)
metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, prometheusRegisterer,
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime).Filter,
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter,
garbagecollector.NewGarbageBlocksFilter(garbageBlocksFinder).Filter,
)
if err != nil {
return errors.Wrap(err, "meta fetcher")
Expand Down
2 changes: 2 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ Flags:
Prometheus relabel-config syntax. See format
details:
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
--consistency-delay=30m Minimum age of fresh (non-compacted) blocks
before they are being processed.

```

Expand Down
37 changes: 37 additions & 0 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,3 +407,40 @@ func (f *LabelShardedMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, sync
delete(metas, id)
}
}

// ConsistencyDelayMetaFilter is a MetaFetcher filter that filters out blocks that are created before a specified consistency delay.
type ConsistencyDelayMetaFilter struct {
logger log.Logger
consistencyDelay time.Duration
}

// NewConsistencyDelayMetaFilter creates ConsistencyDelayMetaFilter.
func NewConsistencyDelayMetaFilter(logger log.Logger, consistencyDelay time.Duration, reg prometheus.Registerer) *ConsistencyDelayMetaFilter {
consistencyDelayMetric := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "consistency_delay_seconds",
Help: "Configured consistency delay in seconds.",
}, func() float64 {
return consistencyDelay.Seconds()
})
reg.MustRegister(consistencyDelayMetric)

return &ConsistencyDelayMetaFilter{
logger: logger,
consistencyDelay: consistencyDelay,
}
}

// Filter filters out blocks that filters blocks that have are created before a specified consistency delay.
func (f *ConsistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) {
for id, meta := range metas {
if ulid.Now()-id.Time() < uint64(f.consistencyDelay/time.Millisecond) &&
meta.Thanos.Source != metadata.BucketRepairSource &&
meta.Thanos.Source != metadata.CompactorSource &&
meta.Thanos.Source != metadata.CompactorRepairSource {

level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id)
synced.WithLabelValues(TooFreshMeta).Inc()
delete(metas, id)
}
}
}
66 changes: 5 additions & 61 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/thanos-io/thanos/pkg/block/indexheader"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/compact/garbagecollector"
"github.com/thanos-io/thanos/pkg/objstore"
)

Expand All @@ -46,6 +47,7 @@ type Syncer struct {
metrics *syncerMetrics
acceptMalformedIndex bool
enableVerticalCompaction bool
garbageBlocksFinder *garbagecollector.GarbageBlocksFinder
}

type syncerMetrics struct {
Expand Down Expand Up @@ -120,7 +122,7 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {

// NewMetaSyncer returns a new Syncer for the given Bucket and directory.
// Blocks must be at least as old as the sync delay for being considered.
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, blockSyncConcurrency int, acceptMalformedIndex bool, enableVerticalCompaction bool) (*Syncer, error) {
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, garbageBlocksFinder *garbagecollector.GarbageBlocksFinder, blockSyncConcurrency int, acceptMalformedIndex bool, enableVerticalCompaction bool) (*Syncer, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -131,6 +133,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket
fetcher: fetcher,
blocks: map[ulid.ULID]*metadata.Meta{},
metrics: newSyncerMetrics(reg),
garbageBlocksFinder: garbageBlocksFinder,
blockSyncConcurrency: blockSyncConcurrency,
acceptMalformedIndex: acceptMalformedIndex,
// The syncer offers an option to enable vertical compaction, even if it's
Expand Down Expand Up @@ -245,67 +248,8 @@ func (s *Syncer) GarbageCollect(ctx context.Context) error {
return nil
}

func (s *Syncer) GarbageBlocks(resolution int64) (ids []ulid.ULID, err error) {
// Map each block to its highest priority parent. Initial blocks have themselves
// in their source section, i.e. are their own parent.
parents := map[ulid.ULID]ulid.ULID{}

for id, meta := range s.blocks {
// Skip any block that has a different resolution.
if meta.Thanos.Downsample.Resolution != resolution {
continue
}

// For each source block we contain, check whether we are the highest priority parent block.
for _, sid := range meta.Compaction.Sources {
pid, ok := parents[sid]
// No parents for the source block so far.
if !ok {
parents[sid] = id
continue
}
pmeta, ok := s.blocks[pid]
if !ok {
return nil, errors.Errorf("previous parent block %s not found", pid)
}
// The current block is the higher priority parent for the source if its
// compaction level is higher than that of the previously set parent.
// If compaction levels are equal, the more recent ULID wins.
//
// The ULID recency alone is not sufficient since races, e.g. induced
// by downtime of garbage collection, may re-compact blocks that are
// were already compacted into higher-level blocks multiple times.
level, plevel := meta.Compaction.Level, pmeta.Compaction.Level

if level > plevel || (level == plevel && id.Compare(pid) > 0) {
parents[sid] = id
}
}
}

// A block can safely be deleted if they are not the highest priority parent for
// any source block.
topParents := map[ulid.ULID]struct{}{}
for _, pid := range parents {
topParents[pid] = struct{}{}
}

for id, meta := range s.blocks {
// Skip any block that has a different resolution.
if meta.Thanos.Downsample.Resolution != resolution {
continue
}
if _, ok := topParents[id]; ok {
continue
}

ids = append(ids, id)
}
return ids, nil
}

func (s *Syncer) garbageCollect(ctx context.Context, resolution int64) error {
garbageIds, err := s.GarbageBlocks(resolution)
garbageIds, err := s.garbageBlocksFinder.GarbageBlocks(resolution, s.blocks)
if err != nil {
return err
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import (
"github.com/thanos-io/thanos/pkg/testutil"
)

const (
BlockDelay = 30 * time.Minute
)

func TestSyncer_GarbageCollect_e2e(t *testing.T) {
objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
Expand Down Expand Up @@ -90,7 +94,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil)
testutil.Ok(t, err)

sy, err := NewSyncer(nil, nil, bkt, metaFetcher, 1, false, false)
sy, err := NewSyncer(nil, nil, bkt, metaFetcher, nil, 1, false, false)
testutil.Ok(t, err)

// Do one initial synchronization with the bucket.
Expand Down Expand Up @@ -163,7 +167,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil)
testutil.Ok(t, err)

sy, err := NewSyncer(nil, nil, bkt, metaFetcher, 5, false, false)
sy, err := NewSyncer(nil, nil, bkt, metaFetcher, nil, 5, false, false)
testutil.Ok(t, err)

comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil)
Expand Down Expand Up @@ -380,7 +384,7 @@ func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec) (
if b.numSamples == 0 {
id, err = createEmptyBlock(prepareDir, b.mint, b.maxt, b.extLset, b.res)
} else {
id, err = testutil.CreateBlock(ctx, prepareDir, b.series, b.numSamples, b.mint, b.maxt, b.extLset, b.res)
id, err = testutil.CreateBlockWithBlockDelay(ctx, prepareDir, b.series, b.numSamples, b.mint, b.maxt, BlockDelay, b.extLset, b.res)
}
testutil.Ok(t, err)

Expand Down
Loading

0 comments on commit 3c68464

Please sign in to comment.