Skip to content

Commit

Permalink
store: add consistency delay to fetch blocks
Browse files Browse the repository at this point in the history
Signed-off-by: khyatisoneji <[email protected]>
  • Loading branch information
khyatisoneji committed Feb 14, 2020
1 parent 5a93f51 commit b01e437
Show file tree
Hide file tree
Showing 9 changed files with 499 additions and 250 deletions.
37 changes: 7 additions & 30 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -184,17 +183,11 @@ func runCompact(
Name: "thanos_compactor_iterations_total",
Help: "Total number of iterations that were executed successfully.",
})
consistencyDelayMetric := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "thanos_consistency_delay_seconds",
Help: "Configured consistency delay in seconds.",
}, func() float64 {
return consistencyDelay.Seconds()
})
partialUploadDeleteAttempts := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_compactor_aborted_partial_uploads_deletion_attempts_total",
Help: "Total number of started deletions of blocks that are assumed aborted and only partially uploaded.",
})
reg.MustRegister(halted, retried, iterations, consistencyDelayMetric, partialUploadDeleteAttempts)
reg.MustRegister(halted, retried, iterations, partialUploadDeleteAttempts)

downsampleMetrics := newDownsampleMetrics(reg)

Expand Down Expand Up @@ -247,15 +240,18 @@ func runCompact(
}
}()

metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg),
duplicateBlocksFilter := block.NewDeduplicateFilter()
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)
metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer,
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
(&consistencyDelayMetaFilter{logger: logger, consistencyDelay: consistencyDelay}).Filter,
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter,
duplicateBlocksFilter.Filter,
)
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}

sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, blockSyncConcurrency, acceptMalformedIndex, false)
sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, duplicateBlocksFilter, blockSyncConcurrency, acceptMalformedIndex, false)
if err != nil {
return errors.Wrap(err, "create syncer")
}
Expand Down Expand Up @@ -392,25 +388,6 @@ func runCompact(
return nil
}

type consistencyDelayMetaFilter struct {
logger log.Logger
consistencyDelay time.Duration
}

func (f *consistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced block.GaugeLabeled, _ bool) {
for id, meta := range metas {
if ulid.Now()-id.Time() < uint64(f.consistencyDelay/time.Millisecond) &&
meta.Thanos.Source != metadata.BucketRepairSource &&
meta.Thanos.Source != metadata.CompactorSource &&
meta.Thanos.Source != metadata.CompactorRepairSource {

level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id)
synced.WithLabelValues(block.TooFreshMeta).Inc()
delete(metas, id)
}
}
}

// genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage.
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, fetcher block.MetadataFetcher, dir string) error {
genIndex := prometheus.NewCounter(prometheus.CounterOpts{
Expand Down
9 changes: 8 additions & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
enableIndexHeader := cmd.Flag("experimental.enable-index-header", "If true, Store Gateway will recreate index-header instead of index-cache.json for each block. This will replace index-cache.json permanently once it will be out of experimental stage.").
Hidden().Default("false").Bool()

consistencyDelay := modelDuration(cmd.Flag("consistency-delay", "Minimum age of all blocks before they are being read.").
Default("30m"))

m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
Expand Down Expand Up @@ -116,6 +119,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
selectorRelabelConf,
*advertiseCompatibilityLabel,
*enableIndexHeader,
time.Duration(*consistencyDelay),
)
}
}
Expand Down Expand Up @@ -148,6 +152,7 @@ func runStore(
selectorRelabelConf *extflag.PathOrContent,
advertiseCompatibilityLabel bool,
enableIndexHeader bool,
consistencyDelay time.Duration,
) error {
grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
Expand Down Expand Up @@ -220,9 +225,11 @@ func runStore(
return errors.Wrap(err, "create index cache")
}

metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)
metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, prometheusRegisterer,
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime).Filter,
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter,
block.NewDeduplicateFilter().Filter,
)
if err != nil {
Expand Down
15 changes: 9 additions & 6 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ In general about 1MB of local disk space is required per TSDB block stored in th
## Flags
[embedmd]:# (flags/store.txt $)
[embedmd]: # "flags/store.txt $"
```$
usage: thanos store [<flags>]

Expand Down Expand Up @@ -137,6 +138,7 @@ Flags:
Prometheus relabel-config syntax. See format
details:
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
--consistency-delay=30m Minimum age of all blocks before they are being read.

```

Expand Down Expand Up @@ -179,7 +181,8 @@ The `in-memory` index cache is enabled by default and its max size can be config

Alternatively, the `in-memory` index cache can also by configured using `--index-cache.config-file` to reference to the configuration file or `--index-cache.config` to put yaml config directly:

[embedmd]:# (../flags/config_index_cache_in_memory.txt yaml)
[embedmd]: # "../flags/config_index_cache_in_memory.txt yaml"

```yaml
type: IN-MEMORY
config:
Expand All @@ -196,7 +199,8 @@ All the settings are **optional**:

The `memcached` index cache allows to use [Memcached](https://memcached.org) as cache backend. This cache type is configured using `--index-cache.config-file` to reference to the configuration file or `--index-cache.config` to put yaml config directly:

[embedmd]:# (../flags/config_index_cache_memcached.txt yaml)
[embedmd]: # "../flags/config_index_cache_memcached.txt yaml"

```yaml
type: MEMCACHED
config:
Expand Down Expand Up @@ -224,13 +228,12 @@ While the remaining settings are **optional**:
- `max_get_multi_batch_size`: maximum number of keys a single underlying operation should fetch. If more keys are specified, internally keys are splitted into multiple batches and fetched concurrently, honoring `max_get_multi_concurrency`. If set to `0`, the batch size is unlimited.
- `dns_provider_update_interval`: the DNS discovery update interval.


## Index Header

In order to query series inside blocks from object storage, Store Gateway has to know certain initial info about each block such as:

* symbols table to unintern string values
* postings offset for posting lookup
- symbols table to unintern string values
- postings offset for posting lookup

In order to achieve so, on startup for each block `index-header` is built from pieces of original block's index and stored on disk.
Such `index-header` file is then mmaped and used by Store Gateway.
Expand Down
77 changes: 68 additions & 9 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func (f *LabelShardedMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, sync

// DeduplicateFilter is a MetaFetcher filter that filters out older blocks that have exactly the same data.
type DeduplicateFilter struct {
DuplicateIDs []ulid.ULID
duplicateIDs []ulid.ULID
}

// NewDeduplicateFilter creates DeduplicateFilter.
Expand All @@ -428,16 +428,30 @@ func NewDeduplicateFilter() *DeduplicateFilter {
// Filter filters out duplicate blocks that can be formed
// from two or more overlapping blocks that fully submatches the source blocks of the older blocks.
func (f *DeduplicateFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) {
root := NewNode(&metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(uint64(0), nil),
},
})
var wg sync.WaitGroup

metaSlice := []*metadata.Meta{}
metasByResolution := make(map[int64][]*metadata.Meta)
for _, meta := range metas {
metaSlice = append(metaSlice, meta)
res := meta.Thanos.Downsample.Resolution
metasByResolution[res] = append(metasByResolution[res], meta)
}

for res := range metasByResolution {
wg.Add(1)
go func(res int64) {
defer wg.Done()
f.filterForResolution(NewNode(&metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(uint64(0), nil),
},
}), metasByResolution[res], metas, res, synced)
}(res)
}

wg.Wait()
}

func (f *DeduplicateFilter) filterForResolution(root *Node, metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, res int64, synced GaugeLabeled) {
sort.Slice(metaSlice, func(i, j int) bool {
ilen := len(metaSlice[i].Compaction.Sources)
jlen := len(metaSlice[j].Compaction.Sources)
Expand All @@ -456,13 +470,19 @@ func (f *DeduplicateFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced Ga
duplicateULIDs := getNonRootIDs(root)
for _, id := range duplicateULIDs {
if metas[id] != nil {
f.DuplicateIDs = append(f.DuplicateIDs, id)
f.duplicateIDs = append(f.duplicateIDs, id)
}
synced.WithLabelValues(duplicateMeta).Inc()
delete(metas, id)
}
}

// DuplicateIDs returns slice of block ids
// that are filtered out by DeduplicateFilter.
func (f *DeduplicateFilter) DuplicateIDs() []ulid.ULID {
return f.duplicateIDs
}

func addNodeBySources(root *Node, add *Node) bool {
var rootNode *Node
for _, node := range root.Children {
Expand Down Expand Up @@ -506,3 +526,42 @@ func contains(s1 []ulid.ULID, s2 []ulid.ULID) bool {
}
return true
}

// ConsistencyDelayMetaFilter is a MetaFetcher filter that filters out blocks that are created before a specified consistency delay.
type ConsistencyDelayMetaFilter struct {
logger log.Logger
consistencyDelay time.Duration
}

// NewConsistencyDelayMetaFilter creates ConsistencyDelayMetaFilter.
func NewConsistencyDelayMetaFilter(logger log.Logger, consistencyDelay time.Duration, reg prometheus.Registerer) *ConsistencyDelayMetaFilter {
consistencyDelayMetric := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "consistency_delay_seconds",
Help: "Configured consistency delay in seconds.",
}, func() float64 {
return consistencyDelay.Seconds()
})
reg.MustRegister(consistencyDelayMetric)

return &ConsistencyDelayMetaFilter{
logger: logger,
consistencyDelay: consistencyDelay,
}
}

// Filter filters out blocks that filters blocks that have are created before a specified consistency delay.
func (f *ConsistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) {
for id, meta := range metas {
// TODO(khyatisoneji): Remove the checks about Thanos Source
// by implementing delete delay to fetch metas.
if ulid.Now()-id.Time() < uint64(f.consistencyDelay/time.Millisecond) &&
meta.Thanos.Source != metadata.BucketRepairSource &&
meta.Thanos.Source != metadata.CompactorSource &&
meta.Thanos.Source != metadata.CompactorRepairSource {

level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id)
synced.WithLabelValues(TooFreshMeta).Inc()
delete(metas, id)
}
}
}
Loading

0 comments on commit b01e437

Please sign in to comment.