Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: yeya24 <[email protected]>
  • Loading branch information
yeya24 committed May 17, 2021
1 parent f6ff075 commit 4fdedb0
Show file tree
Hide file tree
Showing 13 changed files with 921 additions and 886 deletions.
21 changes: 14 additions & 7 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/dedup"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/extprom"
Expand All @@ -52,6 +53,8 @@ var (
2 * 24 * time.Hour,
14 * 24 * time.Hour,
}

dedupAlgorithms = []string{compact.DedupAlgorithmCompact, compact.DedupAlgorithmPenalty}
)

type compactionSet []time.Duration
Expand Down Expand Up @@ -267,7 +270,7 @@ func runCompact(
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
noCompactMarkerFilter,
}, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, conf.dedupReplicaLabels)},
}, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, conf.dedupReplicaLabels, conf.dedupAlgorithm != compact.DedupAlgorithmCompact)},
)
cf.UpdateOnChange(func(blocks []metadata.Meta, err error) {
compactorView.Set(blocks, err)
Expand Down Expand Up @@ -306,8 +309,9 @@ func runCompact(
}()

var mergeFunc storage.VerticalChunkSeriesMergeFunc
if conf.enableDedup {
mergeFunc = compact.NewDedupChunkSeriesMerger()
switch conf.dedupAlgorithm {
case compact.DedupAlgorithmPenalty:
mergeFunc = dedup.NewDedupChunkSeriesMerger()

// Dedup mode needs to enable vertical compaction by default.
enableVerticalCompaction = true
Expand All @@ -316,6 +320,7 @@ func runCompact(
return errors.New("dedup mode needs at least one replica label specified")
}
}

// Instantiate the compactor with different time slices. Timestamps in TSDB
// are in milliseconds.
comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, levels, downsample.NewPool(), mergeFunc)
Expand Down Expand Up @@ -578,7 +583,7 @@ type compactConfig struct {
maxBlockIndexSize units.Base2Bytes
hashFunc string
enableVerticalCompaction bool
enableDedup bool
dedupAlgorithm string
}

func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -639,11 +644,13 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("compact.enable-vertical-compaction", "Experimental. When set to true, compactor will allow overlaps and perform **irreversible** vertical compaction. See https://thanos.io/tip/components/compact.md/#vertical-compactions to read more."+
"Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together)."+
"NOTE: This flag is ignored and (enabled) when --deduplication.replica-label or --compact.enable-dedup flag is set.").
"NOTE: This flag is ignored and (enabled) when --deduplication.replica-label flag is set or penalty based deduplication algorithm is used.").
Hidden().Default("false").BoolVar(&cc.enableVerticalCompaction)

cmd.Flag("compact.enable-dedup", "Experimental. When set to true, compactor will use the penalty algorithm to merge chunk series. At least one --deduplication.replica-label has to be set in this mode.").
Default("false").BoolVar(&cc.enableDedup)
cmd.Flag("compact.dedup-algorithm", "Experimental. Deduplication algorithm for merging overlapping blocks. "+
"The default value is compact, which performs 1:1 deduplication for samples. When set to penalty, penalty based deduplication "+
"algorithm will be used. At least one replica label has to be set via --deduplication.replica-label flag.").
Default(dedupAlgorithms[0]).EnumVar(&cc.dedupAlgorithm, dedupAlgorithms...)

// Update this. This flag works for both dedup version compactor and the original compactor.
cmd.Flag("deduplication.replica-label", "Label to treat as a replica indicator of blocks that can be deduplicated (repeated flag). This will merge multiple replica blocks into one. This process is irreversible."+
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat
block.NewConsistencyDelayMetaFilter(logger, *consistencyDelay, extprom.WrapRegistererWithPrefix(extpromPrefix, reg)),
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
}, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, make([]string, 0))},
}, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, make([]string, 0), false)},
)
sy, err = compact.NewMetaSyncer(
logger,
Expand Down
12 changes: 8 additions & 4 deletions docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -343,10 +343,14 @@ Flags:
happen at the end of an iteration.
--compact.concurrency=1 Number of goroutines to use when compacting
groups.
--compact.enable-dedup Experimental. When set to true, compactor will
use the penalty algorithm to merge chunk series.
At least one --deduplication.replica-label has
to be set in this mode.
--compact.dedup-algorithm=compact
Experimental. Deduplication algorithm for
merging overlapping blocks. The default value is
compact, which performs 1:1 deduplication for
samples. When set to penalty, penalty based
deduplication algorithm will be used. At least
one replica label has to be set via
--deduplication.replica-label flag.
--consistency-delay=30m Minimum age of fresh (non-compacted) blocks
before they are being processed. Malformed
blocks older than the maximum of
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ require (
github.com/prometheus/alertmanager v0.21.1-0.20210422101724-8176f78a70e1
github.com/prometheus/client_golang v1.10.0
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.21.0
github.com/prometheus/common v0.23.0
github.com/prometheus/exporter-toolkit v0.5.1
github.com/prometheus/prometheus v1.8.2-0.20210421143221-52df5ef7a3be
github.com/uber/jaeger-client-go v2.28.0+incompatible
Expand Down
10 changes: 8 additions & 2 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,11 +697,12 @@ type ReplicaLabelRemover struct {
logger log.Logger

replicaLabels []string
dedupEnabled bool
}

// NewReplicaLabelRemover creates a ReplicaLabelRemover.
func NewReplicaLabelRemover(logger log.Logger, replicaLabels []string) *ReplicaLabelRemover {
return &ReplicaLabelRemover{logger: logger, replicaLabels: replicaLabels}
func NewReplicaLabelRemover(logger log.Logger, replicaLabels []string, dedupEnabled bool) *ReplicaLabelRemover {
return &ReplicaLabelRemover{logger: logger, replicaLabels: replicaLabels, dedupEnabled: dedupEnabled}
}

// Modify modifies external labels of existing blocks, it removes given replica labels from the metadata of blocks that have it.
Expand All @@ -711,6 +712,11 @@ func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*met
}

for u, meta := range metas {
// Skip downsampled blocks for now if penalty based deduplication is enabled.
// TODO: remove this after downsampled blocks are supported.
if r.dedupEnabled && meta.Thanos.Downsample.Resolution != int64(0) {
continue
}
l := meta.Thanos.Labels
for _, replicaLabel := range r.replicaLabels {
if _, exists := l[replicaLabel]; exists {
Expand Down
6 changes: 3 additions & 3 deletions pkg/block/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,7 @@ func TestReplicaLabelRemover_Modify(t *testing.T) {
ULID(3): {Thanos: metadata.Thanos{Labels: map[string]string{"message": "something1"}}},
},
modified: 0,
replicaLabelRemover: NewReplicaLabelRemover(log.NewNopLogger(), []string{"replica", "rule_replica"}),
replicaLabelRemover: NewReplicaLabelRemover(log.NewNopLogger(), []string{"replica", "rule_replica"}, false),
},
{
name: "with replica labels",
Expand All @@ -916,7 +916,7 @@ func TestReplicaLabelRemover_Modify(t *testing.T) {
ULID(4): {Thanos: metadata.Thanos{Labels: map[string]string{"replica": "deduped"}}},
},
modified: 5.0,
replicaLabelRemover: NewReplicaLabelRemover(log.NewNopLogger(), []string{"replica", "rule_replica"}),
replicaLabelRemover: NewReplicaLabelRemover(log.NewNopLogger(), []string{"replica", "rule_replica"}, false),
},
{
name: "no replica label specified in the ReplicaLabelRemover",
Expand All @@ -931,7 +931,7 @@ func TestReplicaLabelRemover_Modify(t *testing.T) {
ULID(3): {Thanos: metadata.Thanos{Labels: map[string]string{"message": "something1"}}},
},
modified: 0,
replicaLabelRemover: NewReplicaLabelRemover(log.NewNopLogger(), []string{}),
replicaLabelRemover: NewReplicaLabelRemover(log.NewNopLogger(), []string{}, false),
},
} {
m := newTestFetcherMetrics()
Expand Down
9 changes: 9 additions & 0 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ const (
ResolutionLevel1h = ResolutionLevel(downsample.ResLevel2)
)

const (
// DedupAlgorithmCompact is the original compactor series merge algorithm with 1:1 deduplication.
DedupAlgorithmCompact = "compact"

// DedupAlgorithmPenalty is the penalty based compactor series merge algorithm.
// This is the same as the online deduplication of querier except counter reset handling.
DedupAlgorithmPenalty = "penalty"
)

// Syncer synchronizes block metas from a bucket into a local directory.
// It sorts them into compaction groups based on equal label sets.
type Syncer struct {
Expand Down
122 changes: 4 additions & 118 deletions pkg/compact/dedup.go → pkg/dedup/chunk_iter.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package compact
package dedup

import (
"container/heap"
Expand Down Expand Up @@ -131,10 +128,11 @@ func (d *dedupChunksIterator) Next() bool {
iter = (&seriesToChunkEncoder{Series: &storage.SeriesEntry{
Lset: nil,
SampleIteratorFn: func() chunkenc.Iterator {
it := newChunkToSeriesDecoder(nil, d.curr).Iterator()
var it adjustableSeriesIterator
it = noopAdjustableSeriesIterator{newChunkToSeriesDecoder(nil, d.curr).Iterator()}

for _, o := range overlapping {
it = newDedupSamplesIterator(it, o.Iterator())
it = newDedupSeriesIterator(it, noopAdjustableSeriesIterator{o.Iterator()})
}
return it
},
Expand All @@ -156,118 +154,6 @@ func (d *dedupChunksIterator) Err() error {
return d.err
}

type dedupSamplesIterator struct {
a, b chunkenc.Iterator

aok, bok bool

// TODO(bwplotka): Don't base on LastT, but on detected scrape interval. This will allow us to be more
// responsive to gaps: https://github.com/thanos-io/thanos/issues/981, let's do it in next PR.
lastT int64

penA, penB int64
useA bool
}

func newDedupSamplesIterator(a, b chunkenc.Iterator) *dedupSamplesIterator {
return &dedupSamplesIterator{
a: a,
b: b,
lastT: math.MinInt64,
aok: a.Next(),
bok: b.Next(),
}
}

func (it *dedupSamplesIterator) Next() bool {
// Advance both iterators to at least the next highest timestamp plus the potential penalty.
if it.aok {
it.aok = it.a.Seek(it.lastT + 1 + it.penA)
}
if it.bok {
it.bok = it.b.Seek(it.lastT + 1 + it.penB)
}

// Handle basic cases where one iterator is exhausted before the other.
if !it.aok {
it.useA = false
if it.bok {
it.lastT, _ = it.b.At()
it.penB = 0
}
return it.bok
}
if !it.bok {
it.useA = true
it.lastT, _ = it.a.At()
it.penA = 0
return true
}
// General case where both iterators still have data. We pick the one
// with the smaller timestamp.
// The applied penalty potentially already skipped potential samples already
// that would have resulted in exaggerated sampling frequency.
ta, _ := it.a.At()
tb, _ := it.b.At()

it.useA = ta <= tb

// For the series we didn't pick, add a penalty twice as high as the delta of the last two
// samples to the next seek against it.
// This ensures that we don't pick a sample too close, which would increase the overall
// sample frequency. It also guards against clock drift and inaccuracies during
// timestamp assignment.
// If we don't know a delta yet, we pick 5000 as a constant, which is based on the knowledge
// that timestamps are in milliseconds and sampling frequencies typically multiple seconds long.
const initialPenalty = 5000

if it.useA {
if it.lastT != math.MinInt64 {
it.penB = 2 * (ta - it.lastT)
} else {
it.penB = initialPenalty
}
it.penA = 0
it.lastT = ta
return true
}
if it.lastT != math.MinInt64 {
it.penA = 2 * (tb - it.lastT)
} else {
it.penA = initialPenalty
}
it.penB = 0
it.lastT = tb
return true
}

func (it *dedupSamplesIterator) Seek(t int64) bool {
// Don't use underlying Seek, but iterate over next to not miss gaps.
for {
ts, _ := it.At()
if ts >= t {
return true
}
if !it.Next() {
return false
}
}
}

func (it *dedupSamplesIterator) At() (int64, float64) {
if it.useA {
return it.a.At()
}
return it.b.At()
}

func (it *dedupSamplesIterator) Err() error {
if it.a.Err() != nil {
return it.a.Err()
}
return it.b.Err()
}

type seriesToChunkEncoder struct {
storage.Series
}
Expand Down
Loading

0 comments on commit 4fdedb0

Please sign in to comment.