From d0f3fbee59e8c98a7b5cb2db0566cf54528a606e Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Thu, 5 Nov 2020 17:44:51 +0100 Subject: [PATCH] tools: Added thanos bucket tool rewrite (for now: allowing block series deletions). Signed-off-by: Bartlomiej Plotka --- CHANGELOG.md | 2 + Makefile | 1 - cmd/thanos/compact.go | 2 +- cmd/thanos/main.go | 1 - cmd/thanos/store.go | 2 +- cmd/thanos/tools_bucket.go | 158 ++++++++- docs/components/tools.md | 126 ++++++++ pkg/block/fetcher.go | 7 +- pkg/block/fetcher_test.go | 8 +- pkg/block/index.go | 4 +- pkg/block/metadata/meta.go | 32 ++ pkg/block/metadata/meta_test.go | 13 +- pkg/block/writer.go | 184 +++++++++++ pkg/compactv2/changelog.go | 35 ++ pkg/compactv2/chunk_series_set.go | 220 +++++++++++++ pkg/compactv2/compactor.go | 188 +++++++++++ pkg/compactv2/compactor_test.go | 443 ++++++++++++++++++++++++++ pkg/compactv2/modifiers.go | 329 +++++++++++++++++++ pkg/component/component.go | 1 + pkg/objstore/filesystem/filesystem.go | 4 +- pkg/store/bucket_test.go | 2 +- scripts/genflagdocs.sh | 2 +- 22 files changed, 1743 insertions(+), 21 deletions(-) create mode 100644 pkg/block/writer.go create mode 100644 pkg/compactv2/changelog.go create mode 100644 pkg/compactv2/chunk_series_set.go create mode 100644 pkg/compactv2/compactor.go create mode 100644 pkg/compactv2/compactor_test.go create mode 100644 pkg/compactv2/modifiers.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ffdff66b1..7c69f28406 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ + # Changelog All notable changes to this project will be documented in this file. @@ -14,6 +15,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re ### Added - [#3469](https://github.com/thanos-io/thanos/pull/3469) StoreAPI: Added `hints` field to `LabelNamesRequest` and `LabelValuesRequest`. Hints in an opaque data structure that can be used to carry additional information from the store and its content is implementation specific. +- [#3421](https://github.com/thanos-io/thanos/pull/3421) Tools: Added `thanos tools bucket rewrite` command allowing to delete series from given block. ### Fixed diff --git a/Makefile b/Makefile index 9736b84ec8..c6e81d52d1 100644 --- a/Makefile +++ b/Makefile @@ -310,7 +310,6 @@ github.com/prometheus/client_golang/prometheus.{DefaultGatherer,DefBuckets,NewUn github.com/prometheus/client_golang/prometheus.{NewCounter,NewCounterVec,NewCounterVec,NewGauge,NewGaugeVec,NewGaugeFunc,\ NewHistorgram,NewHistogramVec,NewSummary,NewSummaryVec}=github.com/prometheus/client_golang/prometheus/promauto.{NewCounter,\ NewCounterVec,NewCounterVec,NewGauge,NewGaugeVec,NewGaugeFunc,NewHistorgram,NewHistogramVec,NewSummary,NewSummaryVec},\ -github.com/prometheus/prometheus/tsdb/errors=github.com/thanos-io/thanos/pkg/errutil,\ sync/atomic=go.uber.org/atomic" ./... @$(FAILLINT) -paths "fmt.{Print,Println,Sprint}" -ignore-tests ./... @echo ">> linting all of the Go files GOGC=${GOGC}" diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 711c90261b..b741767d76 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -182,7 +182,7 @@ func runCompact( return errors.Wrap(err, "get content of relabel configuration") } - relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml) + relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml, block.SelectorSupportedRelabelActions) if err != nil { return err } diff --git a/cmd/thanos/main.go b/cmd/thanos/main.go index cf0b0f59b9..bcaf2b4a8e 100644 --- a/cmd/thanos/main.go +++ b/cmd/thanos/main.go @@ -90,7 +90,6 @@ func main() { } if len(confContentYaml) == 0 { - level.Info(logger).Log("msg", "Tracing will be disabled") tracer = client.NoopTracer() } else { tracer, closer, err = client.NewTracer(ctx, logger, metrics, confContentYaml) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 85735ece6f..f1751b7787 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -241,7 +241,7 @@ func runStore( return errors.Wrap(err, "get content of relabel configuration") } - relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml) + relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml, block.SelectorSupportedRelabelActions) if err != nil { return err } diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index 24f46e908e..29cdd8ab30 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -5,9 +5,12 @@ package main import ( "context" + "crypto/rand" "encoding/json" "fmt" + "io/ioutil" "os" + "path/filepath" "sort" "strconv" "strings" @@ -25,11 +28,14 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/route" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunkenc" v1 "github.com/thanos-io/thanos/pkg/api/blocks" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/compact/downsample" + "github.com/thanos-io/thanos/pkg/compactv2" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/extflag" "github.com/thanos-io/thanos/pkg/extkingpin" @@ -47,6 +53,7 @@ import ( "github.com/thanos-io/thanos/pkg/verifier" "golang.org/x/text/language" "golang.org/x/text/message" + "gopkg.in/yaml.v3" ) const extpromPrefix = "thanos_bucket_" @@ -74,6 +81,7 @@ func registerBucket(app extkingpin.AppClause) { registerBucketDownsample(cmd, objStoreConfig) registerBucketCleanup(cmd, objStoreConfig) registerBucketMarkBlock(cmd, objStoreConfig) + registerBucketRewrite(cmd, objStoreConfig) } func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) { @@ -500,7 +508,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat return errors.Wrap(err, "get content of relabel configuration") } - relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml) + relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml, block.SelectorSupportedRelabelActions) if err != nil { return err } @@ -709,13 +717,14 @@ func registerBucketMarkBlock(app extkingpin.AppClause, objStoreConfig *extflag.P blockIDs := cmd.Flag("id", "ID (ULID) of the blocks to be marked for deletion (repeated flag)").Required().Strings() marker := cmd.Flag("marker", "Marker to be put.").Required().Enum(metadata.DeletionMarkFilename, metadata.NoCompactMarkFilename) details := cmd.Flag("details", "Human readable details to be put into marker.").Required().String() + cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error { confContentYaml, err := objStoreConfig.Content() if err != nil { return err } - bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Cleanup.String()) + bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Mark.String()) if err != nil { return err } @@ -724,7 +733,7 @@ func registerBucketMarkBlock(app extkingpin.AppClause, objStoreConfig *extflag.P for _, id := range *blockIDs { u, err := ulid.Parse(id) if err != nil { - return errors.Errorf("id is not a valid block ULID, got: %v", id) + return errors.Errorf("block.id is not a valid UUID, got: %v", id) } ids = append(ids, u) } @@ -753,3 +762,146 @@ func registerBucketMarkBlock(app extkingpin.AppClause, objStoreConfig *extflag.P return nil }) } + +func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) { + cmd := app.Command(component.Rewrite.String(), "Rewrite chosen blocks in the bucket, while deleting or modifying series "+ + "Resulted block has modified stats in meta.json. Additionally compaction.sources are altered to not confuse readers of meta.json. "+ + "Instead thanos.rewrite section is added with useful info like old sources and deletion requests. "+ + "NOTE: It's recommended to turn off compactor while doing this operation. If the compactor is running and touching exactly same block that "+ + "is being rewritten, the resulted rewritten block might only cause overlap (mitigated by marking overlapping block manually for deletion) "+ + "and the data you wanted to rewrite could already part of bigger block.\n\n"+ + "Use FILESYSTEM type of bucket to rewrite block on disk (suitable for vanilla Prometheus) "+ + "After rewrite, it's caller responsibility to delete or mark source block for deletion to avoid overlaps. "+ + "WARNING: This procedure is *IRREVERSIBLE* after certain time (delete delay), so do backup your blocks first.") + blockIDs := cmd.Flag("id", "ID (ULID) of the blocks for rewrite (repeated flag).").Required().Strings() + tmpDir := cmd.Flag("tmp.dir", "Working directory for temporary files").Default(filepath.Join(os.TempDir(), "thanos-rewrite")).String() + dryRun := cmd.Flag("dry-run", "Prints the series changes instead of doing them. Defaults to true, for user to double check. (: Pass --no-dry-run to skip this.").Default("true").Bool() + toDelete := extflag.RegisterPathOrContent(cmd, "rewrite.to-delete-config", "YAML file that contains []metadata.DeletionRequest that will be applied to blocks", true) + provideChangeLog := cmd.Flag("rewrite.add-change-log", "If specified, all modifications are written to new block directory. Disable if latency is to high.").Default("true").Bool() + cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error { + confContentYaml, err := objStoreConfig.Content() + if err != nil { + return err + } + + bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Rewrite.String()) + if err != nil { + return err + } + + deletionsYaml, err := toDelete.Content() + if err != nil { + return err + } + + var deletions []metadata.DeletionRequest + if err := yaml.Unmarshal(deletionsYaml, &deletions); err != nil { + return err + } + + var ids []ulid.ULID + for _, id := range *blockIDs { + u, err := ulid.Parse(id) + if err != nil { + return errors.Errorf("id is not a valid block ULID, got: %v", id) + } + ids = append(ids, u) + } + + if err := os.RemoveAll(*tmpDir); err != nil { + return err + } + if err := os.MkdirAll(*tmpDir, os.ModePerm); err != nil { + return err + } + + ctx, cancel := context.WithCancel(context.Background()) + g.Add(func() error { + chunkPool := chunkenc.NewPool() + changeLog := compactv2.NewChangeLog(ioutil.Discard) + for _, id := range ids { + // Delete series from block & modify. + level.Info(logger).Log("msg", "downloading block", "source", id) + if err := block.Download(ctx, logger, bkt, id, filepath.Join(*tmpDir, id.String())); err != nil { + return errors.Wrapf(err, "download %v", id) + } + + meta, err := metadata.ReadFromDir(filepath.Join(*tmpDir, id.String())) + if err != nil { + return errors.Wrapf(err, "read meta of %v", id) + } + b, err := tsdb.OpenBlock(logger, filepath.Join(*tmpDir, id.String()), chunkPool) + if err != nil { + return errors.Wrapf(err, "open block %v", id) + } + + p := compactv2.NewProgressLogger(logger, int(b.Meta().Stats.NumSeries)) + newID := ulid.MustNew(ulid.Now(), rand.Reader) + meta.ULID = newID + meta.Thanos.Rewrites = append(meta.Thanos.Rewrites, metadata.Rewrite{ + Sources: meta.Compaction.Sources, + DeletionsApplied: deletions, + }) + meta.Compaction.Sources = []ulid.ULID{newID} + meta.Thanos.Source = metadata.BucketRewriteSource + + if err := os.MkdirAll(filepath.Join(*tmpDir, newID.String()), os.ModePerm); err != nil { + return err + } + + if *provideChangeLog { + f, err := os.OpenFile(filepath.Join(*tmpDir, newID.String(), "change.log"), os.O_CREATE|os.O_WRONLY, os.ModePerm) + if err != nil { + return err + } + defer runutil.CloseWithLogOnErr(logger, f, "close changelog") + + changeLog = compactv2.NewChangeLog(f) + level.Info(logger).Log("msg", "changelog will be available", "file", filepath.Join(*tmpDir, newID.String(), "change.log")) + } + + d, err := block.NewDiskWriter(ctx, logger, filepath.Join(*tmpDir, newID.String())) + if err != nil { + return err + } + + var comp *compactv2.Compactor + if *dryRun { + comp = compactv2.NewDryRun(*tmpDir, logger, changeLog, chunkPool) + } else { + comp = compactv2.New(*tmpDir, logger, changeLog, chunkPool) + } + + level.Info(logger).Log("msg", "starting rewrite for block", "source", id, "new", newID, "toDelete", string(deletionsYaml)) + if err := comp.WriteSeries(ctx, []block.Reader{b}, d, p, compactv2.WithDeletionModifier(deletions...)); err != nil { + return errors.Wrapf(err, "writing series from %v to %v", id, newID) + } + + if *dryRun { + level.Info(logger).Log("msg", "dry run finished. Changes should be printed to stderr") + return nil + } + + level.Info(logger).Log("msg", "wrote new block after modifications; flushing", "source", id, "new", newID) + meta.Stats, err = d.Flush() + if err != nil { + return errors.Wrap(err, "flush") + } + if err := meta.WriteToDir(logger, filepath.Join(*tmpDir, newID.String())); err != nil { + return err + } + + level.Info(logger).Log("msg", "uploading new block", "source", id, "new", newID) + if err := block.Upload(ctx, logger, bkt, filepath.Join(*tmpDir, newID.String())); err != nil { + return errors.Wrap(err, "upload") + } + level.Info(logger).Log("msg", "uploaded", "source", id, "new", newID) + } + level.Info(logger).Log("msg", "rewrite done", "IDs", strings.Join(*blockIDs, ",")) + return nil + }, func(err error) { + cancel() + }) + return nil + }) +} diff --git a/docs/components/tools.md b/docs/components/tools.md index b82eeb10c3..f87cb3be95 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -65,6 +65,23 @@ Subcommands: is currently running compacting same block, this operation would be potentially a noop. + tools bucket rewrite --id=ID [] + Rewrite chosen blocks in the bucket, while deleting or modifying + seriesResulted block has modified stats in meta.json. Additionally + compaction.sources are altered to not confuse readers of meta.json.Instead + thanos.rewrite section is added with useful info like old sources and + deletion requestsNOTE: It's recommended to turn off compactor while doing + this operation. If the compactor is running and touching exactly same block + thatis being rewritten, the resulted rewritten block might only cause + overlap (mitigated by marking overlapping block manually for deletion)and + the data you wanted to rewrite could already part of bigger block. + + Use FILESYSTEM type of bucket to rewrite block on disk (suitable for vanilla + Prometheus)After rewrite, it's caller responsibility to delete or mark + source block for deletion to avoid overlaps.WARNING: This procedure is + *IRREVERSIBLE* after certain time (delete delay), so do backup your blocks + first. + tools rules-check --rules=RULES Check if the rule files are valid or not. @@ -154,6 +171,23 @@ Subcommands: is currently running compacting same block, this operation would be potentially a noop. + tools bucket rewrite --id=ID [] + Rewrite chosen blocks in the bucket, while deleting or modifying + seriesResulted block has modified stats in meta.json. Additionally + compaction.sources are altered to not confuse readers of meta.json.Instead + thanos.rewrite section is added with useful info like old sources and + deletion requestsNOTE: It's recommended to turn off compactor while doing + this operation. If the compactor is running and touching exactly same block + thatis being rewritten, the resulted rewritten block might only cause + overlap (mitigated by marking overlapping block manually for deletion)and + the data you wanted to rewrite could already part of bigger block. + + Use FILESYSTEM type of bucket to rewrite block on disk (suitable for vanilla + Prometheus)After rewrite, it's caller responsibility to delete or mark + source block for deletion to avoid overlaps.WARNING: This procedure is + *IRREVERSIBLE* after certain time (delete delay), so do backup your blocks + first. + ``` @@ -619,6 +653,98 @@ Flags: ``` +### Bucket Rewrite + +`tools bucket rewrite` reewrites chosen blocks in the bucket, while deleting or modifying series. + +For example we can remove all non counters from the block you have on your disk (e.g in Prometheus dir): + +```bash +thanos tools bucket rewrite --no-dry-run \ + --id 01DN3SK96XDAEKRB1AN30AAW6E \ + --objstore.config " +type: FILESYSTEM +config: + directory: +" \ + --rewrite.to-delete-config " +- matchers: \"{__name__!~\\\".*total\\\"}\" +" +``` + +By default, rewrite also produces `change.log` in the tmp local dir. Look for log message like: + +``` +ts=2020-11-09T00:40:13.703322181Z caller=level.go:63 level=info msg="changelog will be available" file=/tmp/thanos-rewrite/01EPN74E401ZD2SQXS4SRY6DZX/change.log` +``` + +[embedmd]:# (flags/tools_bucket_rewrite.txt $) +```$ +usage: thanos tools bucket rewrite --id=ID [] + +Rewrite chosen blocks in the bucket, while deleting or modifying seriesResulted +block has modified stats in meta.json. Additionally compaction.sources are +altered to not confuse readers of meta.json.Instead thanos.rewrite section is +added with useful info like old sources and deletion requestsNOTE: It's +recommended to turn off compactor while doing this operation. If the compactor +is running and touching exactly same block thatis being rewritten, the resulted +rewritten block might only cause overlap (mitigated by marking overlapping block +manually for deletion)and the data you wanted to rewrite could already part of +bigger block. + +Use FILESYSTEM type of bucket to rewrite block on disk (suitable for vanilla +Prometheus)After rewrite, it's caller responsibility to delete or mark source +block for deletion to avoid overlaps.WARNING: This procedure is *IRREVERSIBLE* +after certain time (delete delay), so do backup your blocks first. + +Flags: + -h, --help Show context-sensitive help (also try + --help-long and --help-man). + --version Show application version. + --log.level=info Log filtering level. + --log.format=logfmt Log format to use. Possible options: logfmt or + json. + --tracing.config-file= + Path to YAML file with tracing configuration. + See format details: + https://thanos.io/tip/thanos/tracing.md/#configuration + --tracing.config= + Alternative to 'tracing.config-file' flag (lower + priority). Content of YAML file with tracing + configuration. See format details: + https://thanos.io/tip/thanos/tracing.md/#configuration + --objstore.config-file= + Path to YAML file that contains object store + configuration. See format details: + https://thanos.io/tip/thanos/storage.md/#configuration + --objstore.config= + Alternative to 'objstore.config-file' flag + (lower priority). Content of YAML file that + contains object store configuration. See format + details: + https://thanos.io/tip/thanos/storage.md/#configuration + --id=ID ... ID (ULID) of the blocks for rewrite (repeated + flag). + --tmp.dir="/tmp/thanos-rewrite" + Working directory for temporary files + --dry-run Prints the series changes instead of doing them. + Defaults to true, for user to double check. (: + Pass --no-dry-run to skip this. + --rewrite.to-delete-config-file= + Path to YAML file that contains + []metadata.DeletionRequest that will be applied + to blocks + --rewrite.to-delete-config= + Alternative to 'rewrite.to-delete-config-file' + flag (lower priority). Content of YAML file that + contains []metadata.DeletionRequest that will be + applied to blocks + --rewrite.add-change-log If specified, all modifications are written to + new block directory. Disable if latency is to + high. + +``` + ## Rules-check The `tools rules-check` subcommand contains tools for validation of Prometheus rules. diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index b5006e5926..07c5d7b77c 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -807,13 +807,16 @@ func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.UL return nil } +var ( + SelectorSupportedRelabelActions = map[relabel.Action]struct{}{relabel.Keep: {}, relabel.Drop: {}, relabel.HashMod: {}} +) + // ParseRelabelConfig parses relabel configuration. -func ParseRelabelConfig(contentYaml []byte) ([]*relabel.Config, error) { +func ParseRelabelConfig(contentYaml []byte, supportedActions map[relabel.Action]struct{}) ([]*relabel.Config, error) { var relabelConfig []*relabel.Config if err := yaml.Unmarshal(contentYaml, &relabelConfig); err != nil { return nil, errors.Wrap(err, "parsing relabel configuration") } - supportedActions := map[relabel.Action]struct{}{relabel.Keep: {}, relabel.Drop: {}, relabel.HashMod: {}} for _, cfg := range relabelConfig { if _, ok := supportedActions[cfg.Action]; !ok { diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go index a2e7d3cf0b..cf0692f92e 100644 --- a/pkg/block/fetcher_test.go +++ b/pkg/block/fetcher_test.go @@ -309,7 +309,7 @@ func TestLabelShardedMetaFilter_Filter_Basic(t *testing.T) { source_labels: - message ` - relabelConfig, err := ParseRelabelConfig([]byte(relabelContentYaml)) + relabelConfig, err := ParseRelabelConfig([]byte(relabelContentYaml), SelectorSupportedRelabelActions) testutil.Ok(t, err) f := NewLabelShardedMetaFilter(relabelConfig) @@ -375,7 +375,7 @@ func TestLabelShardedMetaFilter_Filter_Hashmod(t *testing.T) { ` for i := 0; i < 3; i++ { t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { - relabelConfig, err := ParseRelabelConfig([]byte(fmt.Sprintf(relabelContentYamlFmt, BlockIDLabel, i))) + relabelConfig, err := ParseRelabelConfig([]byte(fmt.Sprintf(relabelContentYamlFmt, BlockIDLabel, i)), SelectorSupportedRelabelActions) testutil.Ok(t, err) f := NewLabelShardedMetaFilter(relabelConfig) @@ -1203,13 +1203,13 @@ func Test_ParseRelabelConfig(t *testing.T) { regex: "A" source_labels: - cluster - `)) + `), SelectorSupportedRelabelActions) testutil.Ok(t, err) _, err = ParseRelabelConfig([]byte(` - action: labelmap regex: "A" - `)) + `), SelectorSupportedRelabelActions) testutil.NotOk(t, err) testutil.Equals(t, "unsupported relabel action: labelmap", err.Error()) } diff --git a/pkg/block/index.go b/pkg/block/index.go index 2c0721634a..00c8f0ba3a 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -559,9 +559,9 @@ func rewrite( series = []seriesRepair{} ) + var lset labels.Labels + var chks []chunks.Meta for all.Next() { - var lset labels.Labels - var chks []chunks.Meta id := all.At() if err := indexr.Series(id, &lset, &chks); err != nil { diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go index 5e18a48197..d6b4180a28 100644 --- a/pkg/block/metadata/meta.go +++ b/pkg/block/metadata/meta.go @@ -16,15 +16,21 @@ import ( "path/filepath" "github.com/go-kit/kit/log" + "github.com/oklog/ulid" "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/fileutil" + "github.com/prometheus/prometheus/tsdb/tombstones" "github.com/thanos-io/thanos/pkg/runutil" + "gopkg.in/yaml.v3" ) type SourceType string const ( + // TODO(bwplotka): Merge with pkg/component package. UnknownSource SourceType = "" SidecarSource SourceType = "sidecar" ReceiveSource SourceType = "receive" @@ -32,6 +38,7 @@ const ( CompactorRepairSource SourceType = "compactor.repair" RulerSource SourceType = "ruler" BucketRepairSource SourceType = "bucket.repair" + BucketRewriteSource SourceType = "bucket.rewrite" TestSource SourceType = "test" ) @@ -78,6 +85,31 @@ type Thanos struct { // Useful to avoid API call to get size of each file, as well as for debugging purposes. // Optional, added in v0.17.0. Files []File `json:"files,omitempty"` + + // Rewrites is present when any rewrite (deletion, relabel etc) were applied to this block. Optional. + Rewrites []Rewrite `json:"rewrites,omitempty"` +} + +type Rewrite struct { + // ULIDs of all source head blocks that went into the block. + Sources []ulid.ULID `json:"sources,omitempty"` + // Deletions if applied (in order). + DeletionsApplied []DeletionRequest `json:"deletions_applied,omitempty"` +} + +type Matchers []*labels.Matcher + +func (m *Matchers) UnmarshalYAML(value *yaml.Node) (err error) { + *m, err = parser.ParseMetricSelector(value.Value) + if err != nil { + return errors.Wrapf(err, "parse metric selector %v", value.Value) + } + return nil +} + +type DeletionRequest struct { + Matchers Matchers `json:"matchers" yaml:"matchers"` + Intervals tombstones.Intervals `json:"intervals,omitempty" yaml:"intervals,omitempty"` } type File struct { diff --git a/pkg/block/metadata/meta_test.go b/pkg/block/metadata/meta_test.go index 93049e54f5..9f44300c33 100644 --- a/pkg/block/metadata/meta_test.go +++ b/pkg/block/metadata/meta_test.go @@ -66,7 +66,9 @@ func TestMeta_ReadWrite(t *testing.T) { Labels: map[string]string{"ext": "lset1"}, Source: ReceiveSource, Files: []File{ - {RelPath: "index", SizeBytes: 1313}, + {RelPath: "chunks/000001", SizeBytes: 3751}, + {RelPath: "index", SizeBytes: 401}, + {RelPath: "meta.json"}, }, Downsample: ThanosDownsample{ Resolution: 123144, @@ -108,9 +110,16 @@ func TestMeta_ReadWrite(t *testing.T) { }, "source": "receive", "files": [ + { + "rel_path": "chunks/000001", + "size_bytes": 3751 + }, { "rel_path": "index", - "size_bytes": 1313 + "size_bytes": 401 + }, + { + "rel_path": "meta.json" } ] } diff --git a/pkg/block/writer.go b/pkg/block/writer.go new file mode 100644 index 0000000000..995d8f72ae --- /dev/null +++ b/pkg/block/writer.go @@ -0,0 +1,184 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package block + +import ( + "context" + "io" + "os" + "path/filepath" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunks" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/prometheus/prometheus/tsdb/fileutil" + "github.com/prometheus/prometheus/tsdb/index" +) + +// Reader is like tsdb.BlockReader but without tombstones and size methods. +type Reader interface { + // Index returns an IndexReader over the block's data. + Index() (tsdb.IndexReader, error) + + // Chunks returns a ChunkReader over the block's data. + Chunks() (tsdb.ChunkReader, error) + + // Meta returns block metadata file. + Meta() tsdb.BlockMeta +} + +// SeriesWriter is interface for writing series into one or multiple Blocks. +// Statistics has to be counted by implementation. +type SeriesWriter interface { + tsdb.IndexWriter + tsdb.ChunkWriter +} + +// Writer is interface for creating block(s). +type Writer interface { + SeriesWriter + + Flush() (tsdb.BlockStats, error) +} + +type DiskWriter struct { + statsGatheringSeriesWriter + + bTmp, bDir string + logger log.Logger + closers []io.Closer +} + +const tmpForCreationBlockDirSuffix = ".tmp-for-creation" + +// NewDiskWriter allows to write single TSDB block to disk and returns statistics. +// Destination block directory has to exists. +func NewDiskWriter(ctx context.Context, logger log.Logger, bDir string) (_ *DiskWriter, err error) { + bTmp := bDir + tmpForCreationBlockDirSuffix + + d := &DiskWriter{ + bTmp: bTmp, + bDir: bDir, + logger: logger, + } + defer func() { + if err != nil { + err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(d.closers)).Err() + if err := os.RemoveAll(bTmp); err != nil { + level.Error(logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error()) + } + } + }() + + if err = os.RemoveAll(bTmp); err != nil { + return nil, err + } + if err = os.MkdirAll(bTmp, 0777); err != nil { + return nil, err + } + + chunkw, err := chunks.NewWriter(filepath.Join(bTmp, ChunksDirname)) + if err != nil { + return nil, errors.Wrap(err, "open chunk writer") + } + d.closers = append(d.closers, chunkw) + + // TODO(bwplotka): Setup instrumentedChunkWriter if we want to upstream this code. + + indexw, err := index.NewWriter(ctx, filepath.Join(bTmp, IndexFilename)) + if err != nil { + return nil, errors.Wrap(err, "open index writer") + } + d.closers = append(d.closers, indexw) + d.statsGatheringSeriesWriter = statsGatheringSeriesWriter{iw: indexw, cw: chunkw} + return d, nil +} + +func (d *DiskWriter) Flush() (_ tsdb.BlockStats, err error) { + defer func() { + if err != nil { + err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(d.closers)).Err() + if err := os.RemoveAll(d.bTmp); err != nil { + level.Error(d.logger).Log("msg", "removed tmp folder failed after block(s) write", "err", err.Error()) + } + } + }() + df, err := fileutil.OpenDir(d.bTmp) + if err != nil { + return tsdb.BlockStats{}, errors.Wrap(err, "open temporary block dir") + } + defer func() { + if df != nil { + err = tsdb_errors.NewMulti(err, df.Close()).Err() + } + }() + + if err := df.Sync(); err != nil { + return tsdb.BlockStats{}, errors.Wrap(err, "sync temporary dir file") + } + + // Close temp dir before rename block dir (for windows platform). + if err = df.Close(); err != nil { + return tsdb.BlockStats{}, errors.Wrap(err, "close temporary dir") + } + df = nil + + if err := tsdb_errors.CloseAll(d.closers); err != nil { + d.closers = nil + return tsdb.BlockStats{}, err + } + d.closers = nil + + // Block files successfully written, make them visible by moving files from tmp dir. + if err := fileutil.Replace(filepath.Join(d.bTmp, IndexFilename), filepath.Join(d.bDir, IndexFilename)); err != nil { + return tsdb.BlockStats{}, errors.Wrap(err, "replace index file") + } + if err := fileutil.Replace(filepath.Join(d.bTmp, ChunksDirname), filepath.Join(d.bDir, ChunksDirname)); err != nil { + return tsdb.BlockStats{}, errors.Wrap(err, "replace chunks dir") + } + return d.stats, nil +} + +type statsGatheringSeriesWriter struct { + iw tsdb.IndexWriter + cw tsdb.ChunkWriter + + stats tsdb.BlockStats + symbols int64 +} + +func (s *statsGatheringSeriesWriter) AddSymbol(sym string) error { + if err := s.iw.AddSymbol(sym); err != nil { + return err + } + s.symbols++ + return nil +} + +func (s *statsGatheringSeriesWriter) AddSeries(ref uint64, l labels.Labels, chks ...chunks.Meta) error { + if err := s.iw.AddSeries(ref, l, chks...); err != nil { + return err + } + s.stats.NumSeries++ + return nil +} + +func (s *statsGatheringSeriesWriter) WriteChunks(chks ...chunks.Meta) error { + if err := s.cw.WriteChunks(chks...); err != nil { + return err + } + s.stats.NumChunks += uint64(len(chks)) + for _, chk := range chks { + s.stats.NumSamples += uint64(chk.Chunk.NumSamples()) + } + return nil +} + +func (s statsGatheringSeriesWriter) Close() error { + return tsdb_errors.NewMulti(s.iw.Close(), s.cw.Close()).Err() +} diff --git a/pkg/compactv2/changelog.go b/pkg/compactv2/changelog.go new file mode 100644 index 0000000000..7849319b63 --- /dev/null +++ b/pkg/compactv2/changelog.go @@ -0,0 +1,35 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package compactv2 + +import ( + "fmt" + "io" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb/tombstones" +) + +type ChangeLogger interface { + DeleteSeries(del labels.Labels, intervals tombstones.Intervals) + ModifySeries(old labels.Labels, new labels.Labels) +} + +type changeLog struct { + w io.Writer +} + +func NewChangeLog(w io.Writer) *changeLog { + return &changeLog{ + w: w, + } +} + +func (l *changeLog) DeleteSeries(del labels.Labels, intervals tombstones.Intervals) { + _, _ = fmt.Fprintf(l.w, "Deleted %v %v\n", del.String(), intervals) +} + +func (l *changeLog) ModifySeries(old labels.Labels, new labels.Labels) { + _, _ = fmt.Fprintf(l.w, "Relabelled %v %v\n", old.String(), new.String()) +} diff --git a/pkg/compactv2/chunk_series_set.go b/pkg/compactv2/chunk_series_set.go new file mode 100644 index 0000000000..6bcaa2abf6 --- /dev/null +++ b/pkg/compactv2/chunk_series_set.go @@ -0,0 +1,220 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package compactv2 + +import ( + "context" + + "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/index" + "github.com/thanos-io/thanos/pkg/block" +) + +type lazyPopulateChunkSeriesSet struct { + sReader seriesReader + + all index.Postings + + bufChks []chunks.Meta + bufLbls labels.Labels + + curr *storage.ChunkSeriesEntry + err error +} + +func newLazyPopulateChunkSeriesSet(sReader seriesReader, all index.Postings) *lazyPopulateChunkSeriesSet { + return &lazyPopulateChunkSeriesSet{sReader: sReader, all: all} +} + +func (s *lazyPopulateChunkSeriesSet) Next() bool { + for s.all.Next() { + if err := s.sReader.ir.Series(s.all.At(), &s.bufLbls, &s.bufChks); err != nil { + // Postings may be stale. Skip if no underlying series exists. + if errors.Cause(err) == storage.ErrNotFound { + continue + } + s.err = errors.Wrapf(err, "get series %d", s.all.At()) + return false + } + + if len(s.bufChks) == 0 { + continue + } + + for i := range s.bufChks { + s.bufChks[i].Chunk = &lazyPopulatableChunk{cr: s.sReader.cr, m: &s.bufChks[i]} + } + s.curr = &storage.ChunkSeriesEntry{ + Lset: make(labels.Labels, len(s.bufLbls)), + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(s.bufChks...) + }, + } + // TODO: Do we need to copy this? + copy(s.curr.Lset, s.bufLbls) + return true + } + return false +} + +func (s *lazyPopulateChunkSeriesSet) At() storage.ChunkSeries { + return s.curr +} + +func (s *lazyPopulateChunkSeriesSet) Err() error { + if s.err != nil { + return s.err + } + return s.all.Err() +} + +func (s *lazyPopulateChunkSeriesSet) Warnings() storage.Warnings { return nil } + +type lazyPopulatableChunk struct { + m *chunks.Meta + + cr tsdb.ChunkReader + + populated chunkenc.Chunk +} + +type errChunkIterator struct{ err error } + +func (e errChunkIterator) Seek(int64) bool { return false } +func (e errChunkIterator) At() (int64, float64) { return 0, 0 } +func (e errChunkIterator) Next() bool { return false } +func (e errChunkIterator) Err() error { return e.err } + +type errChunk struct{ err errChunkIterator } + +func (e errChunk) Bytes() []byte { return nil } +func (e errChunk) Encoding() chunkenc.Encoding { return chunkenc.EncXOR } +func (e errChunk) Appender() (chunkenc.Appender, error) { return nil, e.err.err } +func (e errChunk) Iterator(chunkenc.Iterator) chunkenc.Iterator { return e.err } +func (e errChunk) NumSamples() int { return 0 } +func (e errChunk) Compact() {} + +func (l *lazyPopulatableChunk) populate() { + // TODO(bwplotka): In most cases we don't need to parse anything, just copy. Extend reader/writer for this. + var err error + l.populated, err = l.cr.Chunk(l.m.Ref) + if err != nil { + l.m.Chunk = errChunk{err: errChunkIterator{err: errors.Wrapf(err, "cannot populate chunk %d", l.m.Ref)}} + return + } + + l.m.Chunk = l.populated +} + +func (l *lazyPopulatableChunk) Bytes() []byte { + if l.populated == nil { + l.populate() + } + return l.populated.Bytes() +} + +func (l *lazyPopulatableChunk) Encoding() chunkenc.Encoding { + if l.populated == nil { + l.populate() + } + return l.populated.Encoding() +} + +func (l *lazyPopulatableChunk) Appender() (chunkenc.Appender, error) { + if l.populated == nil { + l.populate() + } + return l.populated.Appender() +} + +func (l *lazyPopulatableChunk) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator { + if l.populated == nil { + l.populate() + } + return l.populated.Iterator(iterator) +} + +func (l *lazyPopulatableChunk) NumSamples() int { + if l.populated == nil { + l.populate() + } + return l.populated.NumSamples() +} + +func (l *lazyPopulatableChunk) Compact() { + if l.populated == nil { + l.populate() + } + l.populated.Compact() +} + +func (w *Compactor) write(ctx context.Context, symbols index.StringIter, populatedSet storage.ChunkSeriesSet, sWriter block.SeriesWriter, p ProgressLogger) error { + var ( + chks []chunks.Meta + ref uint64 + ) + + for symbols.Next() { + if err := sWriter.AddSymbol(symbols.At()); err != nil { + return errors.Wrap(err, "add symbol") + } + } + if err := symbols.Err(); err != nil { + return errors.Wrap(err, "symbols") + } + + // Iterate over all sorted chunk series. + for populatedSet.Next() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + s := populatedSet.At() + chksIter := s.Iterator() + chks = chks[:0] + for chksIter.Next() { + // We are not iterating in streaming way over chunk as it's more efficient to do bulk write for index and + // chunk file purposes. + chks = append(chks, chksIter.At()) + } + + if chksIter.Err() != nil { + return errors.Wrap(chksIter.Err(), "chunk iter") + } + + // Skip the series with all deleted chunks. + if len(chks) == 0 { + // All series will be ignored. + p.SeriesProcessed() + continue + } + + if err := sWriter.WriteChunks(chks...); err != nil { + return errors.Wrap(err, "write chunks") + } + if err := sWriter.AddSeries(ref, s.Labels(), chks...); err != nil { + return errors.Wrap(err, "add series") + } + for _, chk := range chks { + // ChunkPool is used by tsdb.OpenBlock BlockReader. + if err := w.chunkPool.Put(chk.Chunk); err != nil { + return errors.Wrap(err, "put chunk") + } + } + ref++ + p.SeriesProcessed() + } + if populatedSet.Err() != nil { + return errors.Wrap(populatedSet.Err(), "iterate populated chunk series set") + } + + return nil +} diff --git a/pkg/compactv2/compactor.go b/pkg/compactv2/compactor.go new file mode 100644 index 0000000000..3da860a998 --- /dev/null +++ b/pkg/compactv2/compactor.go @@ -0,0 +1,188 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package compactv2 + +import ( + "context" + "fmt" + "io" + "strings" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunkenc" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/prometheus/prometheus/tsdb/index" + "github.com/thanos-io/thanos/pkg/block" +) + +type ProgressLogger interface { + SeriesProcessed() +} + +type progressLogger struct { + logger log.Logger + + series int + processed int +} + +func NewProgressLogger(logger log.Logger, series int) *progressLogger { + return &progressLogger{logger: logger, series: series} +} + +func (p *progressLogger) SeriesProcessed() { + p.processed++ + if (p.series/10) == 0 || p.processed%(p.series/10) == 0 { + level.Info(p.logger).Log("msg", fmt.Sprintf("processed %0.2f%s of %v series", 100*(float64(p.processed)/float64(p.series)), "%", p.series)) + } +} + +type Compactor struct { + tmpDir string + logger log.Logger + + chunkPool chunkenc.Pool + changeLogger ChangeLogger + + dryRun bool +} + +type seriesReader struct { + ir tsdb.IndexReader + cr tsdb.ChunkReader +} + +func New(tmpDir string, logger log.Logger, changeLogger ChangeLogger, pool chunkenc.Pool) *Compactor { + return &Compactor{ + tmpDir: tmpDir, + logger: logger, + changeLogger: changeLogger, + chunkPool: pool, + } +} + +func NewDryRun(tmpDir string, logger log.Logger, changeLogger ChangeLogger, pool chunkenc.Pool) *Compactor { + s := New(tmpDir, logger, changeLogger, pool) + s.dryRun = true + return s +} + +// TODO(bwplotka): Upstream this. +func (w *Compactor) WriteSeries(ctx context.Context, readers []block.Reader, sWriter block.Writer, p ProgressLogger, modifiers ...Modifier) (err error) { + if len(readers) == 0 { + return errors.New("cannot write from no readers") + } + + var ( + sReaders []seriesReader + closers []io.Closer + ) + defer func() { + errs := tsdb_errors.NewMulti(err) + if cerr := tsdb_errors.CloseAll(closers); cerr != nil { + errs.Add(errors.Wrap(cerr, "close")) + } + err = errs.Err() + }() + + for _, b := range readers { + indexr, err := b.Index() + if err != nil { + return errors.Wrapf(err, "open index reader for block %+v", b.Meta()) + } + closers = append(closers, indexr) + + chunkr, err := b.Chunks() + if err != nil { + return errors.Wrapf(err, "open chunk reader for block %+v", b.Meta()) + } + closers = append(closers, chunkr) + sReaders = append(sReaders, seriesReader{ir: indexr, cr: chunkr}) + } + + symbols, set, err := compactSeries(ctx, sReaders...) + if err != nil { + return errors.Wrapf(err, "compact series from %v", func() string { + var metas []string + for _, m := range readers { + metas = append(metas, fmt.Sprintf("%v", m.Meta())) + } + return strings.Join(metas, ",") + }()) + } + + for _, m := range modifiers { + symbols, set = m.Modify(symbols, set, w.changeLogger, p) + } + + if w.dryRun { + // Even for dry run, we need to exhaust iterators to see potential changes. + for set.Next() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + s := set.At() + iter := s.Iterator() + for iter.Next() { + } + if err := iter.Err(); err != nil { + level.Error(w.logger).Log("msg", "error while iterating over chunks", "series", s.Labels(), "err", err) + } + p.SeriesProcessed() + } + if err := set.Err(); err != nil { + level.Error(w.logger).Log("msg", "error while iterating over set", "err", err) + } + return nil + } + + if err := w.write(ctx, symbols, set, sWriter, p); err != nil { + return errors.Wrap(err, "write") + } + return nil +} + +// compactSeries compacts blocks' series into symbols and one ChunkSeriesSet with lazy populating chunks. +func compactSeries(ctx context.Context, sReaders ...seriesReader) (symbols index.StringIter, set storage.ChunkSeriesSet, _ error) { + if len(sReaders) == 0 { + return nil, nil, errors.New("cannot populate block from no readers") + } + + var sets []storage.ChunkSeriesSet + for i, r := range sReaders { + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + default: + } + + k, v := index.AllPostingsKey() + all, err := r.ir.Postings(k, v) + if err != nil { + return nil, nil, err + } + all = r.ir.SortedPostings(all) + syms := r.ir.Symbols() + sets = append(sets, newLazyPopulateChunkSeriesSet(r, all)) + if i == 0 { + symbols = syms + set = sets[0] + continue + } + symbols = tsdb.NewMergedStringIter(symbols, syms) + } + + if len(sets) <= 1 { + return symbols, set, nil + } + // Merge series using compacting chunk series merger. + return symbols, storage.NewMergeChunkSeriesSet(sets, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)), nil +} diff --git a/pkg/compactv2/compactor_test.go b/pkg/compactv2/compactor_test.go new file mode 100644 index 0000000000..cc10a1897a --- /dev/null +++ b/pkg/compactv2/compactor_test.go @@ -0,0 +1,443 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package compactv2 + +import ( + "bytes" + "context" + "io/ioutil" + "math" + "os" + "path/filepath" + "sort" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/index" + "github.com/prometheus/prometheus/tsdb/tombstones" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestCompactor_WriteSeries_e2e(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + logger := log.NewLogfmtLogger(os.Stderr) + for _, tcase := range []struct { + name string + + input [][]seriesSamples + modifiers []Modifier + dryRun bool + + expected []seriesSamples + expectedErr error + expectedStats tsdb.BlockStats + expectedChanges string + }{ + { + name: "empty block", + expectedErr: errors.New("cannot write from no readers"), + }, + { + name: "1 blocks no modify", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + }, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + expectedStats: tsdb.BlockStats{ + NumSamples: 18, + NumSeries: 3, + NumChunks: 4, + }, + }, + { + name: "2 blocks compact no modify", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}}}, + }, + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{10, 12}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "4"}}, + chunks: [][]sample{{{10, 12}, {11, 11}, {20, 20}}}}, + }, + }, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 12}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "4"}}, + chunks: [][]sample{{{10, 12}, {11, 11}, {20, 20}}}}, + }, + expectedStats: tsdb.BlockStats{ + NumSamples: 21, + NumSeries: 4, + NumChunks: 7, + }, + }, + { + name: "1 blocks + delete modifier, empty deletion request", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + }, + modifiers: []Modifier{WithDeletionModifier()}, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + expectedStats: tsdb.BlockStats{ + NumSamples: 18, + NumSeries: 3, + NumChunks: 4, + }, + }, + { + name: "1 blocks + delete modifier, deletion request no deleting anything", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + }, + modifiers: []Modifier{WithDeletionModifier( + metadata.DeletionRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "0")}, + Intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: math.MaxInt64}}, + }, metadata.DeletionRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")}, + Intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: -1}}, + })}, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + expectedStats: tsdb.BlockStats{ + NumSamples: 18, + NumSeries: 3, + NumChunks: 4, + }, + }, + { + name: "1 blocks + delete modifier, deletion request no deleting anything - by specifying no intervals.", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + }, + modifiers: []Modifier{WithDeletionModifier( + metadata.DeletionRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "0")}, + }, metadata.DeletionRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")}, + Intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: -1}}, + })}, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + expectedStats: tsdb.BlockStats{ + NumSamples: 18, + NumSeries: 3, + NumChunks: 4, + }, + }, + { + name: "1 blocks + delete modifier, delete second series", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + }, + modifiers: []Modifier{WithDeletionModifier( + metadata.DeletionRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "2")}, + }, metadata.DeletionRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")}, + Intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: -1}}, + })}, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + expectedChanges: "Deleted {a=\"2\"} [{0 20}]\n", + expectedStats: tsdb.BlockStats{ + NumSamples: 12, + NumSeries: 2, + NumChunks: 2, + }, + }, + { + name: "1 blocks + delete modifier, delete second series and part of first 3rd", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + }, + modifiers: []Modifier{WithDeletionModifier( + metadata.DeletionRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "2")}, + }, metadata.DeletionRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")}, + Intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: -1}}, + }, metadata.DeletionRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "3")}, + Intervals: tombstones.Intervals{{Mint: 10, Maxt: 11}}, + })}, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {20, 20}}}}, + }, + expectedChanges: "Deleted {a=\"2\"} [{0 20}]\nDeleted {a=\"3\"} [{10 11}]\n", + expectedStats: tsdb.BlockStats{ + NumSamples: 10, + NumSeries: 2, + NumChunks: 2, + }, + }, + } { + t.Run(tcase.name, func(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "test-series-writer") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + chunkPool := chunkenc.NewPool() + + changes := bytes.Buffer{} + changeLog := &changeLog{w: &changes} + var s *Compactor + if tcase.dryRun { + s = NewDryRun(tmpDir, logger, changeLog, chunkPool) + } else { + s = New(tmpDir, logger, changeLog, chunkPool) + } + + var series int + var blocks []block.Reader + for _, b := range tcase.input { + series += len(b) + id := ulid.MustNew(uint64(len(blocks)+1), nil) + bdir := filepath.Join(tmpDir, id.String()) + testutil.Ok(t, os.MkdirAll(bdir, os.ModePerm)) + testutil.Ok(t, createBlockSeries(bdir, b)) + // Meta does not matter, but let's create for OpenBlock to work. + testutil.Ok(t, metadata.Meta{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: id}}.WriteToDir(logger, bdir)) + block, err := tsdb.OpenBlock(logger, bdir, chunkPool) + testutil.Ok(t, err) + blocks = append(blocks, block) + } + + id := ulid.MustNew(uint64(len(blocks)+1), nil) + d, err := block.NewDiskWriter(ctx, logger, filepath.Join(tmpDir, id.String())) + testutil.Ok(t, err) + p := NewProgressLogger(logger, series) + if tcase.expectedErr != nil { + err := s.WriteSeries(ctx, blocks, d, p, tcase.modifiers...) + testutil.NotOk(t, err) + testutil.Equals(t, tcase.expectedErr.Error(), err.Error()) + return + } + testutil.Ok(t, s.WriteSeries(ctx, blocks, d, p, tcase.modifiers...)) + + testutil.Ok(t, os.MkdirAll(filepath.Join(tmpDir, id.String()), os.ModePerm)) + stats, err := d.Flush() + testutil.Ok(t, err) + + testutil.Equals(t, tcase.expectedChanges, changes.String()) + testutil.Equals(t, tcase.expectedStats, stats) + testutil.Equals(t, tcase.expected, readBlockSeries(t, filepath.Join(tmpDir, id.String()))) + }) + } +} + +type sample struct { + t int64 + v float64 +} + +type seriesSamples struct { + lset labels.Labels + chunks [][]sample +} + +func readBlockSeries(t *testing.T, bDir string) []seriesSamples { + indexr, err := index.NewFileReader(filepath.Join(bDir, block.IndexFilename)) + testutil.Ok(t, err) + defer indexr.Close() + + chunkr, err := chunks.NewDirReader(filepath.Join(bDir, block.ChunksDirname), nil) + testutil.Ok(t, err) + defer chunkr.Close() + + all, err := indexr.Postings(index.AllPostingsKey()) + testutil.Ok(t, err) + all = indexr.SortedPostings(all) + + var series []seriesSamples + var chks []chunks.Meta + for all.Next() { + s := seriesSamples{} + testutil.Ok(t, indexr.Series(all.At(), &s.lset, &chks)) + + for _, c := range chks { + c.Chunk, err = chunkr.Chunk(c.Ref) + testutil.Ok(t, err) + + var chk []sample + iter := c.Chunk.Iterator(nil) + for iter.Next() { + sa := sample{} + sa.t, sa.v = iter.At() + chk = append(chk, sa) + } + testutil.Ok(t, iter.Err()) + s.chunks = append(s.chunks, chk) + } + series = append(series, s) + } + testutil.Ok(t, all.Err()) + return series +} + +func createBlockSeries(bDir string, inputSeries []seriesSamples) (err error) { + d, err := block.NewDiskWriter(context.TODO(), log.NewNopLogger(), bDir) + if err != nil { + return err + } + defer func() { + if err != nil { + _, _ = d.Flush() + _ = os.RemoveAll(bDir) + } + }() + + sort.Slice(inputSeries, func(i, j int) bool { + return labels.Compare(inputSeries[i].lset, inputSeries[j].lset) < 0 + }) + + // Gather symbols. + symbols := map[string]struct{}{} + for _, input := range inputSeries { + for _, l := range input.lset { + symbols[l.Name] = struct{}{} + symbols[l.Value] = struct{}{} + } + } + + symbolsSlice := make([]string, 0, len(symbols)) + for s := range symbols { + symbolsSlice = append(symbolsSlice, s) + } + sort.Strings(symbolsSlice) + for _, s := range symbolsSlice { + if err := d.AddSymbol(s); err != nil { + return err + } + } + var ref uint64 + for _, input := range inputSeries { + var chks []chunks.Meta + for _, chk := range input.chunks { + x := chunkenc.NewXORChunk() + a, err := x.Appender() + if err != nil { + return err + } + for _, sa := range chk { + a.Append(sa.t, sa.v) + } + chks = append(chks, chunks.Meta{Chunk: x, MinTime: chk[0].t, MaxTime: chk[len(chk)-1].t}) + } + if err := d.WriteChunks(chks...); err != nil { + return errors.Wrap(err, "write chunks") + } + if err := d.AddSeries(ref, input.lset, chks...); err != nil { + return errors.Wrap(err, "add series") + } + ref++ + } + + if _, err = d.Flush(); err != nil { + return errors.Wrap(err, "flush") + } + return nil +} diff --git a/pkg/compactv2/modifiers.go b/pkg/compactv2/modifiers.go new file mode 100644 index 0000000000..2139d98d85 --- /dev/null +++ b/pkg/compactv2/modifiers.go @@ -0,0 +1,329 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package compactv2 + +import ( + "github.com/pkg/errors" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/index" + "github.com/prometheus/prometheus/tsdb/tombstones" + "github.com/thanos-io/thanos/pkg/block/metadata" +) + +type Modifier interface { + Modify(sym index.StringIter, set storage.ChunkSeriesSet, log ChangeLogger, p ProgressLogger) (index.StringIter, storage.ChunkSeriesSet) +} + +type DeletionModifier struct { + deletions []metadata.DeletionRequest +} + +func WithDeletionModifier(deletions ...metadata.DeletionRequest) *DeletionModifier { + return &DeletionModifier{deletions: deletions} +} + +func (d *DeletionModifier) Modify(sym index.StringIter, set storage.ChunkSeriesSet, log ChangeLogger, p ProgressLogger) (index.StringIter, storage.ChunkSeriesSet) { + // TODO(bwplotka): Modify symbols as well. Otherwise large string will be kept forever. + // This is however what Prometheus already does. It does not increase index size too much though. + // This needs a bit of work due to sorting and tracking required to rebuild them.pp + + return sym, &delModifierSeriesSet{ + d: d, + + ChunkSeriesSet: set, + log: log, + p: p, + } +} + +type delModifierSeriesSet struct { + storage.ChunkSeriesSet + + d *DeletionModifier + log ChangeLogger + p ProgressLogger + + curr *storage.ChunkSeriesEntry + err error +} + +func (d *delModifierSeriesSet) Next() bool { +SeriesLoop: + for d.ChunkSeriesSet.Next() { + s := d.ChunkSeriesSet.At() + lbls := s.Labels() + + var intervals tombstones.Intervals + for _, deletions := range d.d.deletions { + for _, m := range deletions.Matchers { + v := lbls.Get(m.Name) + if v == "" { + continue + } + + if !m.Matches(v) { + continue + } + if len(deletions.Intervals) > 0 { + for _, in := range deletions.Intervals { + intervals = intervals.Add(in) + } + break + } + + // Special case: Delete whole series. + chksIter := s.Iterator() + var chks []chunks.Meta + for chksIter.Next() { + chks = append(chks, chksIter.At()) + } + if d.err = chksIter.Err(); d.err != nil { + return false + } + + var deleted tombstones.Intervals + if len(chks) > 0 { + deleted = deleted.Add(tombstones.Interval{Mint: chks[0].MinTime, Maxt: chks[len(chks)-1].MaxTime}) + } + d.log.DeleteSeries(lbls, deleted) + d.p.SeriesProcessed() + continue SeriesLoop + } + } + + d.curr = &storage.ChunkSeriesEntry{ + Lset: lbls, + ChunkIteratorFn: func() chunks.Iterator { + return NewDelGenericSeriesIterator(s.Iterator(), intervals, func(intervals tombstones.Intervals) { + d.log.DeleteSeries(lbls, intervals) + }).ToChunkSeriesIterator() + }, + } + return true + } + return false +} + +// intersection returns intersection between interval and range of intervals. +func intersection(i tombstones.Interval, dranges tombstones.Intervals) tombstones.Intervals { + var ret tombstones.Intervals + for _, r := range dranges { + isLeftIn := r.Mint <= i.Maxt + isRightIn := i.Mint <= r.Maxt + if !isLeftIn || !isRightIn { + continue + } + intersection := tombstones.Interval{Mint: r.Mint, Maxt: r.Maxt} + if intersection.Mint < i.Mint { + intersection.Mint = i.Mint + } + if intersection.Maxt > i.Maxt { + intersection.Maxt = i.Maxt + } + ret = ret.Add(intersection) + } + return ret +} + +func (d *delModifierSeriesSet) At() storage.ChunkSeries { + return d.curr +} + +func (d *delModifierSeriesSet) Err() error { + if d.err != nil { + return d.err + } + return d.ChunkSeriesSet.Err() +} + +func (d *delModifierSeriesSet) Warnings() storage.Warnings { + return d.ChunkSeriesSet.Warnings() +} + +type delGenericSeriesIterator struct { + chks chunks.Iterator + + err error + bufIter *tsdb.DeletedIterator + intervals tombstones.Intervals + + currDelIter chunkenc.Iterator + currChkMeta chunks.Meta + logDelete func(intervals tombstones.Intervals) + deleted tombstones.Intervals +} + +func NewDelGenericSeriesIterator( + chks chunks.Iterator, + intervals tombstones.Intervals, + logDelete func(intervals tombstones.Intervals), +) *delGenericSeriesIterator { + return &delGenericSeriesIterator{ + chks: chks, + bufIter: &tsdb.DeletedIterator{}, + intervals: intervals, + logDelete: logDelete, + } +} + +func (d *delGenericSeriesIterator) next() (ok bool) { + if d.err != nil { + return false + } + + for d.chks.Next() { + d.currChkMeta = d.chks.At() + + if chk := (tombstones.Interval{Mint: d.currChkMeta.MinTime, Maxt: d.currChkMeta.MaxTime}); chk.IsSubrange(d.intervals) { + d.deleted = d.deleted.Add(chk) + continue + } + d.bufIter.Intervals = d.bufIter.Intervals[:0] + for _, interval := range d.intervals { + if d.currChkMeta.OverlapsClosedInterval(interval.Mint, interval.Maxt) { + d.bufIter.Intervals = d.bufIter.Intervals.Add(interval) + } + } + if len(d.bufIter.Intervals) == 0 { + d.currDelIter = nil + return true + } + + for _, del := range intersection(tombstones.Interval{Mint: d.currChkMeta.MinTime, Maxt: d.currChkMeta.MaxTime}, d.bufIter.Intervals) { + d.deleted = d.deleted.Add(del) + } + + // We don't want full chunk, take just part of it. + d.bufIter.Iter = d.currChkMeta.Chunk.Iterator(nil) + d.currDelIter = d.bufIter + return true + } + if len(d.deleted) > 0 { + d.logDelete(d.deleted) + } + return false +} + +func (d *delGenericSeriesIterator) Err() error { + if d.err != nil { + return d.err + } + return d.chks.Err() +} + +func (d *delGenericSeriesIterator) ToSeriesIterator() chunkenc.Iterator { + return &delSeriesIterator{delGenericSeriesIterator: d} +} +func (d *delGenericSeriesIterator) ToChunkSeriesIterator() chunks.Iterator { + return &delChunkSeriesIterator{delGenericSeriesIterator: d} +} + +// delSeriesIterator allows to iterate over samples for the single series. +type delSeriesIterator struct { + *delGenericSeriesIterator + + curr chunkenc.Iterator +} + +func (p *delSeriesIterator) Next() bool { + if p.curr != nil && p.curr.Next() { + return true + } + + for p.next() { + if p.currDelIter != nil { + p.curr = p.currDelIter + } else { + p.curr = p.currChkMeta.Chunk.Iterator(nil) + } + if p.curr.Next() { + return true + } + } + return false +} + +func (p *delSeriesIterator) Seek(t int64) bool { + if p.curr != nil && p.curr.Seek(t) { + return true + } + for p.Next() { + if p.curr.Seek(t) { + return true + } + } + return false +} + +func (p *delSeriesIterator) At() (int64, float64) { return p.curr.At() } + +func (p *delSeriesIterator) Err() error { + if err := p.delGenericSeriesIterator.Err(); err != nil { + return err + } + if p.curr != nil { + return p.curr.Err() + } + return nil +} + +type delChunkSeriesIterator struct { + *delGenericSeriesIterator + + curr chunks.Meta +} + +func (p *delChunkSeriesIterator) Next() bool { + if !p.next() { + return false + } + + p.curr = p.currChkMeta + if p.currDelIter == nil { + return true + } + + // Re-encode the chunk if iterator is provider. This means that it has some samples to be deleted or chunk is opened. + newChunk := chunkenc.NewXORChunk() + app, err := newChunk.Appender() + if err != nil { + p.err = err + return false + } + + if !p.currDelIter.Next() { + if err := p.currDelIter.Err(); err != nil { + p.err = errors.Wrap(err, "iterate chunk while re-encoding") + return false + } + + // Empty chunk, this should not happen, as we assume full deletions being filtered before this iterator. + p.err = errors.Wrap(err, "populateWithDelChunkSeriesIterator: unexpected empty chunk found while rewriting chunk") + return false + } + + t, v := p.currDelIter.At() + p.curr.MinTime = t + app.Append(t, v) + + for p.currDelIter.Next() { + t, v = p.currDelIter.At() + app.Append(t, v) + } + if err := p.currDelIter.Err(); err != nil { + p.err = errors.Wrap(err, "iterate chunk while re-encoding") + return false + } + + p.curr.Chunk = newChunk + p.curr.MaxTime = t + return true +} + +func (p *delChunkSeriesIterator) At() chunks.Meta { return p.curr } + +// TODO(bwplotka): Add relabelling. diff --git a/pkg/component/component.go b/pkg/component/component.go index a418b9461f..91ec2f9ec5 100644 --- a/pkg/component/component.go +++ b/pkg/component/component.go @@ -92,6 +92,7 @@ var ( Bucket = source{component: component{name: "bucket"}} Cleanup = source{component: component{name: "cleanup"}} Mark = source{component: component{name: "mark"}} + Rewrite = source{component: component{name: "rewrite"}} Compact = source{component: component{name: "compact"}} Downsample = source{component: component{name: "downsample"}} Replicate = source{component: component{name: "replicate"}} diff --git a/pkg/objstore/filesystem/filesystem.go b/pkg/objstore/filesystem/filesystem.go index 3a42d0ccd3..72c0c017ef 100644 --- a/pkg/objstore/filesystem/filesystem.go +++ b/pkg/objstore/filesystem/filesystem.go @@ -54,7 +54,7 @@ func NewBucket(rootDir string) (*Bucket, error) { // Iter calls f for each entry in the given directory. The argument to f is the full // object name including the prefix of the inspected directory. -func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) error { +func (b *Bucket) Iter(_ context.Context, dir string, f func(string) error) error { absDir := filepath.Join(b.rootDir, dir) info, err := os.Stat(absDir) if err != nil { @@ -108,7 +108,7 @@ func (r *rangeReaderCloser) Close() error { } // Attributes returns information about the specified object. -func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { +func (b *Bucket) Attributes(_ context.Context, name string) (objstore.ObjectAttributes, error) { file := filepath.Join(b.rootDir, name) stat, err := os.Stat(file) if err != nil { diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 8173cb3ebc..e6aa993816 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -806,7 +806,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() } - relabelConf, err := block.ParseRelabelConfig([]byte(sc.relabel)) + relabelConf, err := block.ParseRelabelConfig([]byte(sc.relabel), block.SelectorSupportedRelabelActions) testutil.Ok(t, err) rec := &recorder{Bucket: bkt} diff --git a/scripts/genflagdocs.sh b/scripts/genflagdocs.sh index b661e03002..4688ac424c 100755 --- a/scripts/genflagdocs.sh +++ b/scripts/genflagdocs.sh @@ -47,7 +47,7 @@ for x in "${toolsCommands[@]}"; do ${THANOS_BIN} tools "${x}" --help &>"docs/components/flags/tools_${x}.txt" done -toolsBucketCommands=("verify" "ls" "inspect" "web" "replicate" "downsample" "cleanup" "mark") +toolsBucketCommands=("verify" "ls" "inspect" "web" "replicate" "downsample" "cleanup" "mark" "rewrite") for x in "${toolsBucketCommands[@]}"; do ${THANOS_BIN} tools bucket "${x}" --help &>"docs/components/flags/tools_bucket_${x}.txt" done