From b7aefd069abaddfe2328df8dd20b6d45fde0c43c Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Sun, 8 Nov 2020 19:42:42 +0100 Subject: [PATCH] Added deletion modifier + tests. Signed-off-by: Bartlomiej Plotka --- cmd/thanos/compact.go | 2 +- cmd/thanos/main.go | 1 - cmd/thanos/store.go | 2 +- cmd/thanos/tools_bucket.go | 123 ++++- docs/components/tools.md | 62 +++ go.mod | 2 +- go.sum | 4 +- pkg/block/fetcher.go | 7 +- pkg/block/fetcher_test.go | 8 +- pkg/block/index.go | 8 +- pkg/block/metadata/meta.go | 32 ++ pkg/block/writer_modifiers.go | 105 ----- pkg/block/writer_series_test.go | 258 ---------- .../compactor.go} | 127 +++-- pkg/compactv2/compactor_test.go | 439 ++++++++++++++++++ pkg/compactv2/modifiers.go | 334 +++++++++++++ pkg/objstore/filesystem/filesystem.go | 7 +- pkg/store/bucket_test.go | 2 +- scripts/genflagdocs.sh | 2 +- 19 files changed, 1073 insertions(+), 452 deletions(-) delete mode 100644 pkg/block/writer_modifiers.go delete mode 100644 pkg/block/writer_series_test.go rename pkg/{block/writer_series.go => compactv2/compactor.go} (78%) create mode 100644 pkg/compactv2/compactor_test.go create mode 100644 pkg/compactv2/modifiers.go diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index ba09d20b465..0a76989dc43 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -178,7 +178,7 @@ func runCompact( return errors.Wrap(err, "get content of relabel configuration") } - relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml) + relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml, block.SelectorSupportedRelabelActions) if err != nil { return err } diff --git a/cmd/thanos/main.go b/cmd/thanos/main.go index b4f127f679f..a4a3dd5bac7 100644 --- a/cmd/thanos/main.go +++ b/cmd/thanos/main.go @@ -97,7 +97,6 @@ func main() { } if len(confContentYaml) == 0 { - level.Info(logger).Log("msg", "Tracing will be disabled") tracer = client.NoopTracer() } else { tracer, closer, err = client.NewTracer(ctx, logger, metrics, confContentYaml) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 17c13391824..82e4ec67b3f 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -235,7 +235,7 @@ func runStore( return errors.Wrap(err, "get content of relabel configuration") } - relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml) + relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml, block.SelectorSupportedRelabelActions) if err != nil { return err } diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index 13e1e54ff56..f6a6e4e54d1 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -5,9 +5,12 @@ package main import ( "context" + "crypto/rand" "encoding/json" "fmt" + "io/ioutil" "os" + "path/filepath" "sort" "strconv" "strings" @@ -25,11 +28,14 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/route" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunkenc" v1 "github.com/thanos-io/thanos/pkg/api/blocks" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/compact/downsample" + "github.com/thanos-io/thanos/pkg/compactv2" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/extflag" "github.com/thanos-io/thanos/pkg/extkingpin" @@ -47,6 +53,7 @@ import ( "github.com/thanos-io/thanos/pkg/verifier" "golang.org/x/text/language" "golang.org/x/text/message" + "gopkg.in/yaml.v3" ) const extpromPrefix = "thanos_bucket_" @@ -508,7 +515,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat return errors.Wrap(err, "get content of relabel configuration") } - relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml) + relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml, block.SelectorSupportedRelabelActions) if err != nil { return err } @@ -713,15 +720,20 @@ func compare(s1, s2 string) bool { } func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) { - cmd := app.Command(component.Rewrite.String(), "Rewrite chosen blocks in the bucket, while deleting or modifying series. Once rewritten, the old block is marked for deletion."+ + cmd := app.Command(component.Rewrite.String(), "Rewrite chosen blocks in the bucket, while deleting or modifying series"+ "NOTE: It's recommended to turn off compactor while doing this operation. If the compactor is running and touching exactly same block that"+ "is being rewritten, the resulted rewritten block might only cause overlap (mitigated by marking overlapping block manually for deletion)"+ "and the data you wanted to rewrite could already part of bigger block.\n\n"+ "Use FILESYSTEM type of bucket to rewrite block on disk (suitable for vanilla Prometheus)"+ - "WARNING: This procedure is *IRREVERSIBLE* after certain time (delete delay), so do backup your blocks first (you can use objstore.config-backup flags for this command)") + "Resources needed: 1 CPU and:"+ + "* For deletions: At max 1/32 of posting offsets, 1/32 of symbols, largest labels for single series and biggest XOR chunk."+ + "After rewrite, it's caller responsibility to delete or mark source block for deletion to avoid overlaps."+ + "WARNING: This procedure is *IRREVERSIBLE* after certain time (delete delay), so do backup your blocks first.") blockIDs := cmd.Flag("id", "ID (ULID) of the blocks for rewrite (repeated flag).").Required().Strings() - objStoreBackupConfig := extkingpin.RegisterCommonObjStoreFlags(cmd, "-backup", false, "Used for backup-ing block before rewrite if you choose so (only use in non-dry run mode).") + tmpDir := cmd.Flag("tmp.dir", "Working directory for temporary files").Default(filepath.Join(os.TempDir(), "thanos-rewrite")).String() dryRun := cmd.Flag("dry-run", "Prints the series changes instead of doing them. Defaults to true, for user to double check. (: Pass --no-dry-run to skip this.").Default("true").Bool() + toDelete := extflag.RegisterPathOrContent(cmd, "rewrite.to-delete-config", "YAML file that contains []metadata.DeletionRequest that will be applied to blocks", true) + provideChangeLog := cmd.Flag("rewrite.add-change-log", "If specified, all modifications are written to new block directory. Disable if latency is to high.").Default("true").Bool() cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error { confContentYaml, err := objStoreConfig.Content() if err != nil { @@ -733,6 +745,16 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat return err } + deletionsYaml, err := toDelete.Content() + if err != nil { + return err + } + + var deletions []metadata.DeletionRequest + if err := yaml.Unmarshal(deletionsYaml, &deletions); err != nil { + return err + } + var ids []ulid.ULID for _, id := range *blockIDs { u, err := ulid.Parse(id) @@ -742,23 +764,90 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat ids = append(ids, u) } - var backupBkt objstore.InstrumentedBucket - if !*dryRun { - confContentYaml, err := objStoreBackupConfig.Content() - if err != nil { - return err - } - - backupBkt, err = client.NewBucket(logger, confContentYaml, reg, component.Cleanup.String()) - if err != nil { - return err - } + if err := os.RemoveAll(*tmpDir); err != nil { + return err + } + if err := os.MkdirAll(*tmpDir, os.ModePerm); err != nil { + return err } - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) g.Add(func() error { + chunkPool := chunkenc.NewPool() + changeLog := compactv2.NewChangeLog(ioutil.Discard) for _, id := range ids { - // Delete series from block & repair. + // Delete series from block & modify. + level.Info(logger).Log("msg", "downloading block", "source", id) + if err := block.Download(ctx, logger, bkt, id, filepath.Join(*tmpDir, id.String())); err != nil { + return errors.Wrapf(err, "download %v", id) + } + + meta, err := metadata.Read(filepath.Join(*tmpDir, id.String())) + if err != nil { + return errors.Wrapf(err, "read meta of %v", id) + } + b, err := tsdb.OpenBlock(logger, filepath.Join(*tmpDir, id.String()), chunkPool) + if err != nil { + return errors.Wrapf(err, "open block %v", id) + } + + p := compactv2.NewProgressLogger(logger, int(b.Meta().Stats.NumSeries)) + newID := ulid.MustNew(ulid.Now(), rand.Reader) + meta.ULID = newID + meta.Thanos.Rewrite.Sources = append(meta.Thanos.Rewrite.Sources, meta.Compaction.Sources...) + meta.Thanos.Rewrite.DeletionsApplied = append(meta.Thanos.Rewrite.DeletionsApplied, deletions) + meta.Compaction.Sources = []ulid.ULID{newID} + meta.Thanos.Source = metadata.BucketRewriteSource + + if err := os.MkdirAll(filepath.Join(*tmpDir, newID.String()), os.ModePerm); err != nil { + return err + } + if err := meta.WriteToDir(logger, filepath.Join(*tmpDir, newID.String())); err != nil { + return err + } + + if *provideChangeLog { + f, err := os.Open(filepath.Join(*tmpDir, newID.String(), "change.log")) + if err != nil { + return err + } + defer runutil.CloseWithLogOnErr(logger, f, "close changelog") + + changeLog = compactv2.NewChangeLog(f) + level.Info(logger).Log("msg", "changelog will be available", "file", filepath.Join(*tmpDir, newID.String(), "change.log")) + } + + d, err := block.NewDiskWriter(ctx, logger, filepath.Join(*tmpDir, newID.String())) + if err != nil { + return err + } + + var comp *compactv2.Compactor + if *dryRun { + comp = compactv2.NewDryRun(*tmpDir, logger, changeLog, chunkPool) + } else { + comp = compactv2.New(*tmpDir, logger, changeLog, chunkPool) + } + + level.Info(logger).Log("msg", "starting rewrite for block", "source", id, "new", newID, "toDelete", string(deletionsYaml)) + if err := comp.WriteSeries(ctx, []block.Reader{b}, d, p, compactv2.WithDeletionModifier(deletions...)); err != nil { + return errors.Wrapf(err, "writing series from %v to %v", id, newID) + } + + if *dryRun { + level.Info(logger).Log("msg", "dry run finished. Changes should be printed to stderr") + return nil + } + + level.Info(logger).Log("msg", "wrote new block after modifications; flushing", "source", id, "new", newID) + meta.Stats, err = d.Flush() + if err != nil { + return errors.Wrap(err, "flush") + } + level.Info(logger).Log("msg", "uploading new block", "source", id, "new", newID) + if err := block.Upload(ctx, logger, bkt, filepath.Join(*tmpDir, newID.String())); err != nil { + return errors.Wrap(err, "upload") + } + level.Info(logger).Log("msg", "uploaded", "source", id, "new", newID) } level.Info(logger).Log("msg", "marking for deletion done", "IDs", strings.Join(*blockIDs, ",")) return nil diff --git a/docs/components/tools.md b/docs/components/tools.md index 2179f9d5523..90316c2feae 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -549,6 +549,68 @@ Flags: process downsamplings. ``` + +### Bucket Rewrite + +`tools bucket rewrite` reewrites chosen blocks in the bucket, while deleting or modifying series. + + +For example we can remove all non counters from the block you have on your disk (e.g in Prometheus dir): + +```bash +thanos tools bucket rewrite --no-dry-run \ + --id 01DN3SK96XDAEKRB1AN30AAW6E \ + --objstore.config " +type: FILESYSTEM +config: + directory: +" \ + --rewrite.to-delete-config " +- matchers: \"{__name__!~\\\".*total\\\"}\" +" + +``` + +[embedmd]:# (flags/tools_bucket_rewrite.txt $) +```$ +usage: thanos tools bucket downsample [] + +continuously downsamples blocks in an object store bucket + +Flags: + -h, --help Show context-sensitive help (also try --help-long + and --help-man). + --version Show application version. + --log.level=info Log filtering level. + --log.format=logfmt Log format to use. Possible options: logfmt or + json. + --tracing.config-file= + Path to YAML file with tracing configuration. See + format details: + https://thanos.io/tip/thanos/tracing.md/#configuration + --tracing.config= + Alternative to 'tracing.config-file' flag (lower + priority). Content of YAML file with tracing + configuration. See format details: + https://thanos.io/tip/thanos/tracing.md/#configuration + --objstore.config-file= + Path to YAML file that contains object store + configuration. See format details: + https://thanos.io/tip/thanos/storage.md/#configuration + --objstore.config= + Alternative to 'objstore.config-file' flag (lower + priority). Content of YAML file that contains + object store configuration. See format details: + https://thanos.io/tip/thanos/storage.md/#configuration + --http-address="0.0.0.0:10902" + Listen host:port for HTTP endpoints. + --http-grace-period=2m Time to wait after an interrupt received for HTTP + Server. + --data-dir="./data" Data directory in which to cache blocks and + process downsamplings. + +``` + ## Rules-check The `tools rules-check` subcommand contains tools for validation of Prometheus rules. diff --git a/go.mod b/go.mod index 1cf45a9a266..752b159e8f6 100644 --- a/go.mod +++ b/go.mod @@ -77,7 +77,7 @@ replace ( // Update to v1.1.1 to make sure windows CI pass. github.com/elastic/go-sysinfo => github.com/elastic/go-sysinfo v1.1.1 // Make sure Prometheus version is pinned as Prometheus semver does not include Go APIs. - github.com/prometheus/prometheus => ../prometheus + github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20201108220916-6ba28869528e github.com/sercand/kuberesolver => github.com/sercand/kuberesolver v2.4.0+incompatible google.golang.org/grpc => google.golang.org/grpc v1.29.1 diff --git a/go.sum b/go.sum index 2caed653262..3d2e5e51e5d 100644 --- a/go.sum +++ b/go.sum @@ -975,8 +975,8 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= -github.com/prometheus/prometheus v1.8.2-0.20201029103703-63be30dceed9 h1:T6pkPNGKXv21lLfgD/mnIABj9aOhmz8HphDmKllfKWs= -github.com/prometheus/prometheus v1.8.2-0.20201029103703-63be30dceed9/go.mod h1:MDRkz271loM/PrYN+wUNEaTMDGSP760MQzB0yEjdgSQ= +github.com/prometheus/prometheus v1.8.2-0.20201108220916-6ba28869528e h1:RL9eMwESCEcfN5a9xUNBB/1mTg/M3OZrPdPcVA5SCLE= +github.com/prometheus/prometheus v1.8.2-0.20201108220916-6ba28869528e/go.mod h1:MDRkz271loM/PrYN+wUNEaTMDGSP760MQzB0yEjdgSQ= github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1/go.mod h1:JaY6n2sDr+z2WTsXkOmNRUfDy6FN0L6Nk7x06ndm4tY= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc= diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 45cd47b6260..dda4a035638 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -802,13 +802,16 @@ func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.UL return nil } +var ( + SelectorSupportedRelabelActions = map[relabel.Action]struct{}{relabel.Keep: {}, relabel.Drop: {}, relabel.HashMod: {}} +) + // ParseRelabelConfig parses relabel configuration. -func ParseRelabelConfig(contentYaml []byte) ([]*relabel.Config, error) { +func ParseRelabelConfig(contentYaml []byte, supportedActions map[relabel.Action]struct{}) ([]*relabel.Config, error) { var relabelConfig []*relabel.Config if err := yaml.Unmarshal(contentYaml, &relabelConfig); err != nil { return nil, errors.Wrap(err, "parsing relabel configuration") } - supportedActions := map[relabel.Action]struct{}{relabel.Keep: {}, relabel.Drop: {}, relabel.HashMod: {}} for _, cfg := range relabelConfig { if _, ok := supportedActions[cfg.Action]; !ok { diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go index a2e7d3cf0bd..cf0692f92e5 100644 --- a/pkg/block/fetcher_test.go +++ b/pkg/block/fetcher_test.go @@ -309,7 +309,7 @@ func TestLabelShardedMetaFilter_Filter_Basic(t *testing.T) { source_labels: - message ` - relabelConfig, err := ParseRelabelConfig([]byte(relabelContentYaml)) + relabelConfig, err := ParseRelabelConfig([]byte(relabelContentYaml), SelectorSupportedRelabelActions) testutil.Ok(t, err) f := NewLabelShardedMetaFilter(relabelConfig) @@ -375,7 +375,7 @@ func TestLabelShardedMetaFilter_Filter_Hashmod(t *testing.T) { ` for i := 0; i < 3; i++ { t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { - relabelConfig, err := ParseRelabelConfig([]byte(fmt.Sprintf(relabelContentYamlFmt, BlockIDLabel, i))) + relabelConfig, err := ParseRelabelConfig([]byte(fmt.Sprintf(relabelContentYamlFmt, BlockIDLabel, i)), SelectorSupportedRelabelActions) testutil.Ok(t, err) f := NewLabelShardedMetaFilter(relabelConfig) @@ -1203,13 +1203,13 @@ func Test_ParseRelabelConfig(t *testing.T) { regex: "A" source_labels: - cluster - `)) + `), SelectorSupportedRelabelActions) testutil.Ok(t, err) _, err = ParseRelabelConfig([]byte(` - action: labelmap regex: "A" - `)) + `), SelectorSupportedRelabelActions) testutil.NotOk(t, err) testutil.Equals(t, "unsupported relabel action: labelmap", err.Error()) } diff --git a/pkg/block/index.go b/pkg/block/index.go index d245f99b91b..599d7543b68 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -282,15 +282,15 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceT chunkw, err := chunks.NewWriter(filepath.Join(resdir, ChunksDirname)) if err != nil { - return resid, errors.Wrap(err, "open chunk seriesWriter") + return resid, errors.Wrap(err, "open chunk Compactor") } - defer runutil.CloseWithErrCapture(&err, chunkw, "repair chunk seriesWriter") + defer runutil.CloseWithErrCapture(&err, chunkw, "repair chunk Compactor") indexw, err := index.NewWriter(context.TODO(), filepath.Join(resdir, IndexFilename)) if err != nil { - return resid, errors.Wrap(err, "open index seriesWriter") + return resid, errors.Wrap(err, "open index Compactor") } - defer runutil.CloseWithErrCapture(&err, indexw, "repair index seriesWriter") + defer runutil.CloseWithErrCapture(&err, indexw, "repair index Compactor") // TODO(fabxc): adapt so we properly handle the version once we update to an upstream // that has multiple. diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go index db9e3792aa4..cd595b51dba 100644 --- a/pkg/block/metadata/meta.go +++ b/pkg/block/metadata/meta.go @@ -16,15 +16,21 @@ import ( "path/filepath" "github.com/go-kit/kit/log" + "github.com/oklog/ulid" "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/fileutil" + "github.com/prometheus/prometheus/tsdb/tombstones" "github.com/thanos-io/thanos/pkg/runutil" + "gopkg.in/yaml.v3" ) type SourceType string const ( + // TODO(bwplotka): Merge with pkg/component package. UnknownSource SourceType = "" SidecarSource SourceType = "sidecar" ReceiveSource SourceType = "receive" @@ -32,6 +38,7 @@ const ( CompactorRepairSource SourceType = "compactor.repair" RulerSource SourceType = "ruler" BucketRepairSource SourceType = "bucket.repair" + BucketRewriteSource SourceType = "bucket.rewrite" TestSource SourceType = "test" ) @@ -72,6 +79,31 @@ type Thanos struct { // Useful to avoid API call to get size of each file, as well as for debugging purposes. // Optional, added in v0.17.0. Files []File `json:"files,omitempty"` + + // Rewrite is present when any rewrite (deletion, relabel etc) were applied to this block. + Rewrite RewriteMeta `json:"rewrite"` +} + +type RewriteMeta struct { + // ULIDs of all source head blocks that went into the block. + Sources []ulid.ULID `json:"sources,omitempty"` + // Deletions if applied (in order). + DeletionsApplied [][]DeletionRequest `json:"deletions_applied,omitempty"` +} + +type Matchers []*labels.Matcher + +func (m *Matchers) UnmarshalYAML(value *yaml.Node) (err error) { + *m, err = parser.ParseMetricSelector(value.Value) + if err != nil { + return err + } + return nil +} + +type DeletionRequest struct { + Matchers Matchers `json:"matchers" yaml:"matchers"` + Intervals tombstones.Intervals `json:"intervals,omitempty" yaml:"intervals,omitempty"` } type File struct { diff --git a/pkg/block/writer_modifiers.go b/pkg/block/writer_modifiers.go deleted file mode 100644 index 12abff1d803..00000000000 --- a/pkg/block/writer_modifiers.go +++ /dev/null @@ -1,105 +0,0 @@ -package block - -import ( - "math" - - "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/tsdb/chunks" - "github.com/prometheus/prometheus/tsdb/index" - "github.com/prometheus/prometheus/tsdb/tombstones" -) - -type Modifier interface { - Modify(sym index.StringIter, set storage.ChunkSeriesSet, log printChangeLog) (index.StringIter, storage.ChunkSeriesSet) -} - -type DeletionModifier struct { - deletions []DeleteRequest -} - -func WithDeletionModifier(deletions []DeleteRequest) *DeletionModifier { - return &DeletionModifier{deletions: deletions} -} - -func (d *DeletionModifier) Modify(sym index.StringIter, set storage.ChunkSeriesSet, log printChangeLog) (index.StringIter, storage.ChunkSeriesSet) { - return sym, &delModifierSeriesSet{ - d: d, - - ChunkSeriesSet: set, - log: log, - } -} - -type delModifierSeriesSet struct { - storage.ChunkSeriesSet - - d *DeletionModifier - log printChangeLog - - err error -} - -func (d *delModifierSeriesSet) Next() bool { - for d.ChunkSeriesSet.Next() { - s := d.ChunkSeriesSet.At() - lbls := s.Labels() - - var intervals tombstones.Intervals - for _, deletions := range d.d.deletions { - for _, m := range deletions.Matchers { - v := lbls.Get(m.Name) - if v == "" { - continue - } - - if m.Matches(v) { - continue - } - for _, in := range deletions.intervals { - intervals = intervals.Add(in) - } - break - } - } - - if (tombstones.Interval{Mint: math.MinInt64, Maxt: math.MaxInt64}.IsSubrange(intervals)) { - // Quick path for skipping series completely. - chksIter := d.ChunkSeriesSet.At().Iterator() - var chks []chunks.Meta - for chksIter.Next() { - chks = append(chks, chksIter.At()) - } - d.err = chksIter.Err() - if d.err != nil { - return false - } - - deleted := tombstones.Intervals{} - if len(chks) > 0 { - deleted.Add(tombstones.Interval{Mint: chks[0].MinTime, Maxt: chks[len(chks)].MaxTime}) - } - d.log.DeleteSeries(lbls, deleted) - continue - } - } - return false -} -func (d *delModifierSeriesSet) At() storage.ChunkSeries { - -} - -func (d *delModifierSeriesSet) Err() error { - panic("implement me") -} - -func (d *delModifierSeriesSet) Warnings() storage.Warnings { - panic("implement me") -} - -// TODO(bwplotka): Add relabelling. - -type DeleteRequest struct { - Matchers []*labels.Matcher - intervals tombstones.Intervals -} diff --git a/pkg/block/writer_series_test.go b/pkg/block/writer_series_test.go deleted file mode 100644 index 1b3560e17b7..00000000000 --- a/pkg/block/writer_series_test.go +++ /dev/null @@ -1,258 +0,0 @@ -package block - -import ( - "context" - "io/ioutil" - "os" - "path/filepath" - "sort" - "testing" - "time" - - "github.com/go-kit/kit/log" - "github.com/oklog/ulid" - "github.com/pkg/errors" - "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/tsdb" - "github.com/prometheus/prometheus/tsdb/chunkenc" - "github.com/prometheus/prometheus/tsdb/chunks" - "github.com/prometheus/prometheus/tsdb/index" - "github.com/thanos-io/thanos/pkg/block/metadata" - "github.com/thanos-io/thanos/pkg/testutil" -) - -func TestSeriesWriter_WriteSeries_e2e(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) - defer cancel() - - logger := log.NewLogfmtLogger(os.Stderr) - for _, tcase := range []struct { - name string - - input [][]seriesSamples - expected []seriesSamples - expectedErr error - expectedStats tsdb.BlockStats - modifiers struct{} - }{ - { - name: "empty block", - expectedErr: errors.New("cannot write from no readers"), - }, - { - name: "1 blocks no modify", - input: [][]seriesSamples{ - { - {lset: labels.Labels{{Name: "a", Value: "1"}}, - chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, - {lset: labels.Labels{{Name: "a", Value: "2"}}, - chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, - {lset: labels.Labels{{Name: "a", Value: "3"}}, - chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, - }, - }, - expected: []seriesSamples{ - {lset: labels.Labels{{Name: "a", Value: "1"}}, - chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, - {lset: labels.Labels{{Name: "a", Value: "2"}}, - chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, - {lset: labels.Labels{{Name: "a", Value: "3"}}, - chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, - }, - expectedStats: tsdb.BlockStats{ - NumSamples: 18, - NumSeries: 3, - NumChunks: 4, - }, - }, - { - name: "2 blocks compact no modify", - input: [][]seriesSamples{ - { - {lset: labels.Labels{{Name: "a", Value: "1"}}, - chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}}}, - {lset: labels.Labels{{Name: "a", Value: "2"}}, - chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}}}, - {lset: labels.Labels{{Name: "a", Value: "3"}}, - chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}}}, - }, - { - {lset: labels.Labels{{Name: "a", Value: "1"}}, - chunks: [][]sample{{{10, 10}, {11, 11}, {20, 20}}}}, - {lset: labels.Labels{{Name: "a", Value: "2"}}, - chunks: [][]sample{{{10, 11}, {11, 11}, {20, 20}}}}, - {lset: labels.Labels{{Name: "a", Value: "3"}}, - chunks: [][]sample{{{10, 12}, {11, 11}, {20, 20}}}}, - {lset: labels.Labels{{Name: "a", Value: "4"}}, - chunks: [][]sample{{{10, 12}, {11, 11}, {20, 20}}}}, - }, - }, - expected: []seriesSamples{ - {lset: labels.Labels{{Name: "a", Value: "1"}}, - chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}}}}, - {lset: labels.Labels{{Name: "a", Value: "2"}}, - chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, - {lset: labels.Labels{{Name: "a", Value: "3"}}, - chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 12}, {11, 11}, {20, 20}}}}, - {lset: labels.Labels{{Name: "a", Value: "4"}}, - chunks: [][]sample{{{10, 12}, {11, 11}, {20, 20}}}}, - }, - expectedStats: tsdb.BlockStats{ - NumSamples: 21, - NumSeries: 4, - NumChunks: 7, - }, - }, - } { - t.Run(tcase.name, func(t *testing.T) { - tmpDir, err := ioutil.TempDir("", "test-series-writer") - testutil.Ok(t, err) - defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() - - chunkPool := chunkenc.NewPool() - s := NewSeriesWriter(tmpDir, logger, chunkPool) - - var blocks []Reader - for _, b := range tcase.input { - id := ulid.MustNew(uint64(len(blocks)+1), nil) - bdir := filepath.Join(tmpDir, id.String()) - testutil.Ok(t, os.MkdirAll(bdir, os.ModePerm)) - testutil.Ok(t, createBlockSeries(bdir, b)) - // Meta does not matter, but let's create for OpenBlock to work. - testutil.Ok(t, metadata.Meta{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: id}}.WriteToDir(logger, bdir)) - block, err := tsdb.OpenBlock(logger, bdir, chunkPool) - testutil.Ok(t, err) - blocks = append(blocks, block) - } - - id := ulid.MustNew(uint64(len(blocks)+1), nil) - d, err := NewDiskWriter(ctx, logger, filepath.Join(tmpDir, id.String())) - testutil.Ok(t, err) - if tcase.expectedErr != nil { - err := s.WriteSeries(ctx, nil, d) - testutil.NotOk(t, err) - testutil.Equals(t, tcase.expectedErr.Error(), err.Error()) - return - } - testutil.Ok(t, s.WriteSeries(ctx, blocks, d)) - - stats, err := d.Flush() - testutil.Ok(t, err) - testutil.Equals(t, tcase.expectedStats, stats) - testutil.Equals(t, tcase.expected, readBlockSeries(t, filepath.Join(tmpDir, id.String()))) - }) - } -} - -type sample struct { - t int64 - v float64 -} - -type seriesSamples struct { - lset labels.Labels - chunks [][]sample -} - -func readBlockSeries(t *testing.T, bDir string) []seriesSamples { - indexr, err := index.NewFileReader(filepath.Join(bDir, IndexFilename)) - testutil.Ok(t, err) - defer indexr.Close() - - chunkr, err := chunks.NewDirReader(filepath.Join(bDir, ChunksDirname), nil) - testutil.Ok(t, err) - defer chunkr.Close() - - all, err := indexr.Postings(index.AllPostingsKey()) - testutil.Ok(t, err) - all = indexr.SortedPostings(all) - - var series []seriesSamples - var chks []chunks.Meta - for all.Next() { - s := seriesSamples{} - testutil.Ok(t, indexr.Series(all.At(), &s.lset, &chks)) - - for _, c := range chks { - c.Chunk, err = chunkr.Chunk(c.Ref) - testutil.Ok(t, err) - - var chk []sample - iter := c.Chunk.Iterator(nil) - for iter.Next() { - sa := sample{} - sa.t, sa.v = iter.At() - chk = append(chk, sa) - } - testutil.Ok(t, iter.Err()) - s.chunks = append(s.chunks, chk) - } - series = append(series, s) - } - testutil.Ok(t, all.Err()) - return series -} - -func createBlockSeries(bDir string, inputSeries []seriesSamples) (err error) { - d, err := NewDiskWriter(context.TODO(), log.NewNopLogger(), bDir) - if err != nil { - return err - } - defer func() { - if err != nil { - _, _ = d.Flush() - _ = os.RemoveAll(bDir) - } - }() - - sort.Slice(inputSeries, func(i, j int) bool { - return labels.Compare(inputSeries[i].lset, inputSeries[j].lset) < 0 - }) - - // Gather symbols. - symbols := map[string]struct{}{} - for _, input := range inputSeries { - for _, l := range input.lset { - symbols[l.Name] = struct{}{} - symbols[l.Value] = struct{}{} - } - } - - symbolsSlice := make([]string, 0, len(symbols)) - for s := range symbols { - symbolsSlice = append(symbolsSlice, s) - } - sort.Strings(symbolsSlice) - for _, s := range symbolsSlice { - if err := d.AddSymbol(s); err != nil { - return err - } - } - var ref uint64 - for _, input := range inputSeries { - var chks []chunks.Meta - for _, chk := range input.chunks { - x := chunkenc.NewXORChunk() - a, err := x.Appender() - if err != nil { - return err - } - for _, sa := range chk { - a.Append(sa.t, sa.v) - } - chks = append(chks, chunks.Meta{Chunk: x, MinTime: chk[0].t, MaxTime: chk[len(chk)-1].t}) - } - if err := d.WriteChunks(chks...); err != nil { - return errors.Wrap(err, "write chunks") - } - if err := d.AddSeries(ref, input.lset, chks...); err != nil { - return errors.Wrap(err, "add series") - } - ref++ - } - - if _, err = d.Flush(); err != nil { - return errors.Wrap(err, "flush") - } - return nil -} diff --git a/pkg/block/writer_series.go b/pkg/compactv2/compactor.go similarity index 78% rename from pkg/block/writer_series.go rename to pkg/compactv2/compactor.go index 5a90b748264..694a1934481 100644 --- a/pkg/block/writer_series.go +++ b/pkg/compactv2/compactor.go @@ -1,4 +1,4 @@ -package block +package compactv2 import ( "context" @@ -7,6 +7,7 @@ import ( "strings" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" @@ -16,9 +17,10 @@ import ( tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/tombstones" + "github.com/thanos-io/thanos/pkg/block" ) -type printChangeLog interface { +type ChangeLogger interface { DeleteSeries(del labels.Labels, intervals tombstones.Intervals) ModifySeries(old labels.Labels, new labels.Labels) } @@ -27,6 +29,12 @@ type changeLog struct { w io.Writer } +func NewChangeLog(w io.Writer) *changeLog { + return &changeLog{ + w: w, + } +} + func (l *changeLog) DeleteSeries(del labels.Labels, intervals tombstones.Intervals) { _, _ = fmt.Fprintf(l.w, "Deleted %v %v\n", del.String(), intervals) } @@ -35,12 +43,34 @@ func (l *changeLog) ModifySeries(old labels.Labels, new labels.Labels) { _, _ = fmt.Fprintf(l.w, "Relabelled %v %v\n", old.String(), new.String()) } -type seriesWriter struct { +type ProgressLogger interface { + SeriesProcessed() +} + +type progressLogger struct { + logger log.Logger + + series int + processed int +} + +func NewProgressLogger(logger log.Logger, series int) *progressLogger { + return &progressLogger{logger: logger, series: series} +} + +func (p *progressLogger) SeriesProcessed() { + p.processed++ + if (p.series/10) == 0 || p.processed%(p.series/10) == 0 { + level.Info(p.logger).Log("msg", fmt.Sprintf("processed %0.2f%s of %v series", 100*(float64(p.processed)/float64(p.series)), "%", p.series)) + } +} + +type Compactor struct { tmpDir string logger log.Logger chunkPool chunkenc.Pool - changeLogger printChangeLog + changeLogger ChangeLogger dryRun bool } @@ -50,8 +80,8 @@ type seriesReader struct { cr tsdb.ChunkReader } -func NewSeriesWriter(tmpDir string, logger log.Logger, changeLogger printChangeLog, pool chunkenc.Pool) *seriesWriter { - return &seriesWriter{ +func New(tmpDir string, logger log.Logger, changeLogger ChangeLogger, pool chunkenc.Pool) *Compactor { + return &Compactor{ tmpDir: tmpDir, logger: logger, changeLogger: changeLogger, @@ -59,8 +89,14 @@ func NewSeriesWriter(tmpDir string, logger log.Logger, changeLogger printChangeL } } +func NewDryRun(tmpDir string, logger log.Logger, changeLogger ChangeLogger, pool chunkenc.Pool) *Compactor { + s := New(tmpDir, logger, changeLogger, pool) + s.dryRun = true + return s +} + // TODO(bwplotka): Upstream this. -func (w *seriesWriter) WriteSeries(ctx context.Context, readers []Reader, sWriter Writer, modifiers ...Modifier) (err error) { +func (w *Compactor) WriteSeries(ctx context.Context, readers []block.Reader, sWriter block.Writer, p ProgressLogger, modifiers ...Modifier) (err error) { if len(readers) == 0 { return errors.New("cannot write from no readers") } @@ -104,14 +140,34 @@ func (w *seriesWriter) WriteSeries(ctx context.Context, readers []Reader, sWrite } for _, m := range modifiers { - symbols, set = m.Modify(symbols, set, w.changeLogger) + symbols, set = m.Modify(symbols, set, w.changeLogger, p) } if w.dryRun { + // Even for dry run, we need to exhaust iterators to see potential changes. + for set.Next() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + s := set.At() + iter := s.Iterator() + for iter.Next() { + } + if err := iter.Err(); err != nil { + level.Error(w.logger).Log("msg", "error while iterating over chunks", "series", s.Labels(), "err", err) + } + p.SeriesProcessed() + } + if err := set.Err(); err != nil { + level.Error(w.logger).Log("msg", "error while iterating over set", "err", err) + } return nil } - if err := w.write(ctx, symbols, set, sWriter); err != nil { + if err := w.write(ctx, symbols, set, sWriter, p); err != nil { return errors.Wrap(err, "write") } return nil @@ -214,18 +270,12 @@ func (s *lazyPopulateChunkSeriesSet) Err() error { func (s *lazyPopulateChunkSeriesSet) Warnings() storage.Warnings { return nil } -// populatableChunk allows to trigger when you want to have chunks populated. -type populatableChunk interface { - Populate(intervals tombstones.Intervals) (err error) -} - type lazyPopulatableChunk struct { m *chunks.Meta cr tsdb.ChunkReader populated chunkenc.Chunk - bufIter *tsdb.DeletedIterator } type errChunkIterator struct{ err error } @@ -246,12 +296,7 @@ func (e errChunk) Iterator(chunkenc.Iterator) chunkenc.Iterator { return e.err } func (e errChunk) NumSamples() int { return 0 } func (e errChunk) Compact() {} -func (l *lazyPopulatableChunk) Populate(intervals tombstones.Intervals) { - if len(intervals) > 0 && (tombstones.Interval{Mint: l.m.MinTime, Maxt: l.m.MaxTime}.IsSubrange(intervals)) { - l.m.Chunk = EmptyChunk - return - } - +func (l *lazyPopulatableChunk) populate() { // TODO(bwplotka): In most cases we don't need to parse anything, just copy. Extend reader/writer for this. var err error l.populated, err = l.cr.Chunk(l.m.Ref) @@ -260,70 +305,52 @@ func (l *lazyPopulatableChunk) Populate(intervals tombstones.Intervals) { return } - var matching tombstones.Intervals - for _, interval := range intervals { - if l.m.OverlapsClosedInterval(interval.Mint, interval.Maxt) { - matching = matching.Add(interval) - } - } - - if len(matching) == 0 { - l.m.Chunk = l.populated - return - } - - // TODO(bwplotka): Optimize by using passed iterator. - l.bufIter = &tsdb.DeletedIterator{Intervals: matching, Iter: l.populated.Iterator(nil)} - return - + l.m.Chunk = l.populated } func (l *lazyPopulatableChunk) Bytes() []byte { if l.populated == nil { - l.Populate(nil) + l.populate() } return l.populated.Bytes() } func (l *lazyPopulatableChunk) Encoding() chunkenc.Encoding { if l.populated == nil { - l.Populate(nil) + l.populate() } return l.populated.Encoding() } func (l *lazyPopulatableChunk) Appender() (chunkenc.Appender, error) { if l.populated == nil { - l.Populate(nil) + l.populate() } return l.populated.Appender() } func (l *lazyPopulatableChunk) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator { if l.populated == nil { - l.Populate(nil) + l.populate() } - if l.bufIter == nil { - return l.populated.Iterator(iterator) - } - return l.bufIter + return l.populated.Iterator(iterator) } func (l *lazyPopulatableChunk) NumSamples() int { if l.populated == nil { - l.Populate(nil) + l.populate() } return l.populated.NumSamples() } func (l *lazyPopulatableChunk) Compact() { if l.populated == nil { - l.Populate(nil) + l.populate() } l.populated.Compact() } -func (w *seriesWriter) write(ctx context.Context, symbols index.StringIter, populatedSet storage.ChunkSeriesSet, sWriter SeriesWriter) error { +func (w *Compactor) write(ctx context.Context, symbols index.StringIter, populatedSet storage.ChunkSeriesSet, sWriter block.SeriesWriter, p ProgressLogger) error { var ( chks []chunks.Meta ref uint64 @@ -361,6 +388,8 @@ func (w *seriesWriter) write(ctx context.Context, symbols index.StringIter, popu // Skip the series with all deleted chunks. if len(chks) == 0 { + // All series will be ignored. + p.SeriesProcessed() continue } @@ -370,13 +399,13 @@ func (w *seriesWriter) write(ctx context.Context, symbols index.StringIter, popu if err := sWriter.AddSeries(ref, s.Labels(), chks...); err != nil { return errors.Wrap(err, "add series") } - for _, chk := range chks { if err := w.chunkPool.Put(chk.Chunk); err != nil { return errors.Wrap(err, "put chunk") } } ref++ + p.SeriesProcessed() } if populatedSet.Err() != nil { return errors.Wrap(populatedSet.Err(), "iterate populated chunk series set") diff --git a/pkg/compactv2/compactor_test.go b/pkg/compactv2/compactor_test.go new file mode 100644 index 00000000000..a0eb7877e8c --- /dev/null +++ b/pkg/compactv2/compactor_test.go @@ -0,0 +1,439 @@ +package compactv2 + +import ( + "bytes" + "context" + "io/ioutil" + "math" + "os" + "path/filepath" + "sort" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/index" + "github.com/prometheus/prometheus/tsdb/tombstones" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestSeriesWriter_WriteSeries_e2e(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + logger := log.NewLogfmtLogger(os.Stderr) + for _, tcase := range []struct { + name string + + input [][]seriesSamples + modifiers []Modifier + dryRun bool + + expected []seriesSamples + expectedErr error + expectedStats tsdb.BlockStats + expectedChanges string + }{ + { + name: "empty block", + expectedErr: errors.New("cannot write from no readers"), + }, + { + name: "1 blocks no modify", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + }, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + expectedStats: tsdb.BlockStats{ + NumSamples: 18, + NumSeries: 3, + NumChunks: 4, + }, + }, + { + name: "2 blocks compact no modify", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}}}, + }, + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{10, 12}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "4"}}, + chunks: [][]sample{{{10, 12}, {11, 11}, {20, 20}}}}, + }, + }, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 12}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "4"}}, + chunks: [][]sample{{{10, 12}, {11, 11}, {20, 20}}}}, + }, + expectedStats: tsdb.BlockStats{ + NumSamples: 21, + NumSeries: 4, + NumChunks: 7, + }, + }, + { + name: "1 blocks + delete modifier, empty deletion request", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + }, + modifiers: []Modifier{WithDeletionModifier()}, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + expectedStats: tsdb.BlockStats{ + NumSamples: 18, + NumSeries: 3, + NumChunks: 4, + }, + }, + { + name: "1 blocks + delete modifier, deletion request no deleting anything", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + }, + modifiers: []Modifier{WithDeletionModifier( + metadata.DeletionRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "0")}, + Intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: math.MaxInt64}}, + }, metadata.DeletionRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")}, + Intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: -1}}, + })}, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + expectedStats: tsdb.BlockStats{ + NumSamples: 18, + NumSeries: 3, + NumChunks: 4, + }, + }, + { + name: "1 blocks + delete modifier, deletion request no deleting anything - by specifying no intervals.", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + }, + modifiers: []Modifier{WithDeletionModifier( + metadata.DeletionRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "0")}, + }, metadata.DeletionRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")}, + Intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: -1}}, + })}, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + expectedStats: tsdb.BlockStats{ + NumSamples: 18, + NumSeries: 3, + NumChunks: 4, + }, + }, + { + name: "1 blocks + delete modifier, delete second series", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + }, + modifiers: []Modifier{WithDeletionModifier( + metadata.DeletionRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "2")}, + }, metadata.DeletionRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")}, + Intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: -1}}, + })}, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + expectedChanges: "Deleted {a=\"2\"} [{0 20}]\n", + expectedStats: tsdb.BlockStats{ + NumSamples: 12, + NumSeries: 2, + NumChunks: 2, + }, + }, + { + name: "1 blocks + delete modifier, delete second series and part of first 3rd", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + }, + modifiers: []Modifier{WithDeletionModifier( + metadata.DeletionRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "2")}, + }, metadata.DeletionRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")}, + Intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: -1}}, + }, metadata.DeletionRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "3")}, + Intervals: tombstones.Intervals{{Mint: 10, Maxt: 11}}, + })}, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {20, 20}}}}, + }, + expectedChanges: "Deleted {a=\"2\"} [{0 20}]\nDeleted {a=\"3\"} [{10 11}]\n", + expectedStats: tsdb.BlockStats{ + NumSamples: 10, + NumSeries: 2, + NumChunks: 2, + }, + }, + } { + t.Run(tcase.name, func(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "test-series-writer") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + chunkPool := chunkenc.NewPool() + + changes := bytes.Buffer{} + changeLog := &changeLog{w: &changes} + var s *Compactor + if tcase.dryRun { + s = NewDryRun(tmpDir, logger, changeLog, chunkPool) + } else { + s = New(tmpDir, logger, changeLog, chunkPool) + } + + var series int + var blocks []block.Reader + for _, b := range tcase.input { + series += len(b) + id := ulid.MustNew(uint64(len(blocks)+1), nil) + bdir := filepath.Join(tmpDir, id.String()) + testutil.Ok(t, os.MkdirAll(bdir, os.ModePerm)) + testutil.Ok(t, createBlockSeries(bdir, b)) + // Meta does not matter, but let's create for OpenBlock to work. + testutil.Ok(t, metadata.Meta{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: id}}.WriteToDir(logger, bdir)) + block, err := tsdb.OpenBlock(logger, bdir, chunkPool) + testutil.Ok(t, err) + blocks = append(blocks, block) + } + + id := ulid.MustNew(uint64(len(blocks)+1), nil) + d, err := block.NewDiskWriter(ctx, logger, filepath.Join(tmpDir, id.String())) + testutil.Ok(t, err) + p := NewProgressLogger(logger, series) + if tcase.expectedErr != nil { + err := s.WriteSeries(ctx, blocks, d, p, tcase.modifiers...) + testutil.NotOk(t, err) + testutil.Equals(t, tcase.expectedErr.Error(), err.Error()) + return + } + testutil.Ok(t, s.WriteSeries(ctx, blocks, d, p, tcase.modifiers...)) + + stats, err := d.Flush() + testutil.Ok(t, err) + + testutil.Equals(t, tcase.expectedChanges, changes.String()) + testutil.Equals(t, tcase.expectedStats, stats) + testutil.Equals(t, tcase.expected, readBlockSeries(t, filepath.Join(tmpDir, id.String()))) + }) + } +} + +type sample struct { + t int64 + v float64 +} + +type seriesSamples struct { + lset labels.Labels + chunks [][]sample +} + +func readBlockSeries(t *testing.T, bDir string) []seriesSamples { + indexr, err := index.NewFileReader(filepath.Join(bDir, block.IndexFilename)) + testutil.Ok(t, err) + defer indexr.Close() + + chunkr, err := chunks.NewDirReader(filepath.Join(bDir, block.ChunksDirname), nil) + testutil.Ok(t, err) + defer chunkr.Close() + + all, err := indexr.Postings(index.AllPostingsKey()) + testutil.Ok(t, err) + all = indexr.SortedPostings(all) + + var series []seriesSamples + var chks []chunks.Meta + for all.Next() { + s := seriesSamples{} + testutil.Ok(t, indexr.Series(all.At(), &s.lset, &chks)) + + for _, c := range chks { + c.Chunk, err = chunkr.Chunk(c.Ref) + testutil.Ok(t, err) + + var chk []sample + iter := c.Chunk.Iterator(nil) + for iter.Next() { + sa := sample{} + sa.t, sa.v = iter.At() + chk = append(chk, sa) + } + testutil.Ok(t, iter.Err()) + s.chunks = append(s.chunks, chk) + } + series = append(series, s) + } + testutil.Ok(t, all.Err()) + return series +} + +func createBlockSeries(bDir string, inputSeries []seriesSamples) (err error) { + d, err := block.NewDiskWriter(context.TODO(), log.NewNopLogger(), bDir) + if err != nil { + return err + } + defer func() { + if err != nil { + _, _ = d.Flush() + _ = os.RemoveAll(bDir) + } + }() + + sort.Slice(inputSeries, func(i, j int) bool { + return labels.Compare(inputSeries[i].lset, inputSeries[j].lset) < 0 + }) + + // Gather symbols. + symbols := map[string]struct{}{} + for _, input := range inputSeries { + for _, l := range input.lset { + symbols[l.Name] = struct{}{} + symbols[l.Value] = struct{}{} + } + } + + symbolsSlice := make([]string, 0, len(symbols)) + for s := range symbols { + symbolsSlice = append(symbolsSlice, s) + } + sort.Strings(symbolsSlice) + for _, s := range symbolsSlice { + if err := d.AddSymbol(s); err != nil { + return err + } + } + var ref uint64 + for _, input := range inputSeries { + var chks []chunks.Meta + for _, chk := range input.chunks { + x := chunkenc.NewXORChunk() + a, err := x.Appender() + if err != nil { + return err + } + for _, sa := range chk { + a.Append(sa.t, sa.v) + } + chks = append(chks, chunks.Meta{Chunk: x, MinTime: chk[0].t, MaxTime: chk[len(chk)-1].t}) + } + if err := d.WriteChunks(chks...); err != nil { + return errors.Wrap(err, "write chunks") + } + if err := d.AddSeries(ref, input.lset, chks...); err != nil { + return errors.Wrap(err, "add series") + } + ref++ + } + + if _, err = d.Flush(); err != nil { + return errors.Wrap(err, "flush") + } + return nil +} diff --git a/pkg/compactv2/modifiers.go b/pkg/compactv2/modifiers.go new file mode 100644 index 00000000000..b1d396ff1c0 --- /dev/null +++ b/pkg/compactv2/modifiers.go @@ -0,0 +1,334 @@ +package compactv2 + +import ( + "fmt" + + "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/relabel" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/index" + "github.com/prometheus/prometheus/tsdb/tombstones" + "github.com/thanos-io/thanos/pkg/block/metadata" +) + +type Modifier interface { + Modify(sym index.StringIter, set storage.ChunkSeriesSet, log ChangeLogger, p ProgressLogger) (index.StringIter, storage.ChunkSeriesSet) +} + +type DeletionModifier struct { + deletions []metadata.DeletionRequest +} + +var ( + SupportedRelabelActions = map[relabel.Action]struct{}{relabel.Keep: {}, relabel.Drop: {}} +) + +func WithDeletionModifier(deletions ...metadata.DeletionRequest) *DeletionModifier { + return &DeletionModifier{deletions: deletions} +} + +func (d *DeletionModifier) Modify(sym index.StringIter, set storage.ChunkSeriesSet, log ChangeLogger, p ProgressLogger) (index.StringIter, storage.ChunkSeriesSet) { + // Modify symbols as well ): + + fmt.Println("modify", d.deletions) + return sym, &delModifierSeriesSet{ + d: d, + + ChunkSeriesSet: set, + log: log, + p: p, + } +} + +type delModifierSeriesSet struct { + storage.ChunkSeriesSet + + d *DeletionModifier + log ChangeLogger + p ProgressLogger + + curr *storage.ChunkSeriesEntry + err error +} + +func (d *delModifierSeriesSet) Next() bool { +SeriesLoop: + for d.ChunkSeriesSet.Next() { + s := d.ChunkSeriesSet.At() + lbls := s.Labels() + + var intervals tombstones.Intervals + for _, deletions := range d.d.deletions { + for _, m := range deletions.Matchers { + v := lbls.Get(m.Name) + if v == "" { + continue + } + + if !m.Matches(v) { + continue + } + if len(deletions.Intervals) > 0 { + for _, in := range deletions.Intervals { + intervals = intervals.Add(in) + } + break + } + + // Special case: Delete whole series. + chksIter := s.Iterator() + var chks []chunks.Meta + for chksIter.Next() { + chks = append(chks, chksIter.At()) + } + if d.err = chksIter.Err(); d.err != nil { + return false + } + + var deleted tombstones.Intervals + if len(chks) > 0 { + deleted = deleted.Add(tombstones.Interval{Mint: chks[0].MinTime, Maxt: chks[len(chks)-1].MaxTime}) + } + d.log.DeleteSeries(lbls, deleted) + d.p.SeriesProcessed() + continue SeriesLoop + } + } + + d.curr = &storage.ChunkSeriesEntry{ + Lset: lbls, + ChunkIteratorFn: func() chunks.Iterator { + return NewDelGenericSeriesIterator(s.Iterator(), intervals, func(intervals tombstones.Intervals) { + d.log.DeleteSeries(lbls, intervals) + }).ToChunkSeriesIterator() + }, + } + return true + } + return false +} + +// Intersection returns intersection between interval and range of intervals. +func Intersection(i tombstones.Interval, dranges tombstones.Intervals) tombstones.Intervals { + var ret tombstones.Intervals + for _, r := range dranges { + isLeftIn := r.Mint <= i.Maxt + isRightIn := i.Mint <= r.Maxt + if !isLeftIn || !isRightIn { + continue + } + intersection := tombstones.Interval{Mint: r.Mint, Maxt: r.Maxt} + if intersection.Mint < i.Mint { + intersection.Mint = i.Mint + } + if intersection.Maxt > i.Maxt { + intersection.Maxt = i.Maxt + } + ret = ret.Add(intersection) + } + return ret +} + +func (d *delModifierSeriesSet) At() storage.ChunkSeries { + return d.curr +} + +func (d *delModifierSeriesSet) Err() error { + if d.err != nil { + return d.err + } + return d.ChunkSeriesSet.Err() +} + +func (d *delModifierSeriesSet) Warnings() storage.Warnings { + return d.ChunkSeriesSet.Warnings() +} + +type delGenericSeriesIterator struct { + chks chunks.Iterator + + err error + bufIter *tsdb.DeletedIterator + intervals tombstones.Intervals + + currDelIter chunkenc.Iterator + currChkMeta chunks.Meta + logDelete func(intervals tombstones.Intervals) + deleted tombstones.Intervals + + passedAny bool +} + +func NewDelGenericSeriesIterator( + chks chunks.Iterator, + intervals tombstones.Intervals, + logDelete func(intervals tombstones.Intervals), +) *delGenericSeriesIterator { + return &delGenericSeriesIterator{ + chks: chks, + bufIter: &tsdb.DeletedIterator{}, + intervals: intervals, + logDelete: logDelete, + } +} + +func (d *delGenericSeriesIterator) next() (ok bool) { + if d.err != nil { + return false + } + + for d.chks.Next() { + d.currChkMeta = d.chks.At() + + if chk := (tombstones.Interval{Mint: d.currChkMeta.MinTime, Maxt: d.currChkMeta.MaxTime}); chk.IsSubrange(d.intervals) { + d.deleted = d.deleted.Add(chk) + continue + } + d.bufIter.Intervals = d.bufIter.Intervals[:0] + for _, interval := range d.intervals { + if d.currChkMeta.OverlapsClosedInterval(interval.Mint, interval.Maxt) { + d.bufIter.Intervals = d.bufIter.Intervals.Add(interval) + } + } + if len(d.bufIter.Intervals) == 0 { + d.currDelIter = nil + return true + } + + for _, del := range Intersection(tombstones.Interval{Mint: d.currChkMeta.MinTime, Maxt: d.currChkMeta.MaxTime}, d.bufIter.Intervals) { + d.deleted = d.deleted.Add(del) + } + + // We don't want full chunk, take just part of it. + d.bufIter.Iter = d.currChkMeta.Chunk.Iterator(nil) + d.currDelIter = d.bufIter + return true + } + if len(d.deleted) > 0 { + d.logDelete(d.deleted) + } + return false +} + +func (d *delGenericSeriesIterator) Err() error { + if d.err != nil { + return d.err + } + return d.chks.Err() +} + +func (d *delGenericSeriesIterator) ToSeriesIterator() chunkenc.Iterator { + return &delSeriesIterator{delGenericSeriesIterator: d} +} +func (d *delGenericSeriesIterator) ToChunkSeriesIterator() chunks.Iterator { + return &delChunkSeriesIterator{delGenericSeriesIterator: d} +} + +// delSeriesIterator allows to iterate over samples for the single series. +type delSeriesIterator struct { + *delGenericSeriesIterator + + curr chunkenc.Iterator +} + +func (p *delSeriesIterator) Next() bool { + if p.curr != nil && p.curr.Next() { + return true + } + + for p.next() { + if p.currDelIter != nil { + p.curr = p.currDelIter + } else { + p.curr = p.currChkMeta.Chunk.Iterator(nil) + } + if p.curr.Next() { + return true + } + } + return false +} + +func (p *delSeriesIterator) Seek(t int64) bool { + if p.curr != nil && p.curr.Seek(t) { + return true + } + for p.Next() { + if p.curr.Seek(t) { + return true + } + } + return false +} + +func (p *delSeriesIterator) At() (int64, float64) { return p.curr.At() } + +func (p *delSeriesIterator) Err() error { + if err := p.delGenericSeriesIterator.Err(); err != nil { + return err + } + if p.curr != nil { + return p.curr.Err() + } + return nil +} + +type delChunkSeriesIterator struct { + *delGenericSeriesIterator + + curr chunks.Meta +} + +func (p *delChunkSeriesIterator) Next() bool { + if !p.next() { + return false + } + + p.curr = p.currChkMeta + if p.currDelIter == nil { + return true + } + + // Re-encode the chunk if iterator is provider. This means that it has some samples to be deleted or chunk is opened. + newChunk := chunkenc.NewXORChunk() + app, err := newChunk.Appender() + if err != nil { + p.err = err + return false + } + + if !p.currDelIter.Next() { + if err := p.currDelIter.Err(); err != nil { + p.err = errors.Wrap(err, "iterate chunk while re-encoding") + return false + } + + // Empty chunk, this should not happen, as we assume full deletions being filtered before this iterator. + p.err = errors.Wrap(err, "populateWithDelChunkSeriesIterator: unexpected empty chunk found while rewriting chunk") + return false + } + + t, v := p.currDelIter.At() + p.curr.MinTime = t + app.Append(t, v) + + for p.currDelIter.Next() { + t, v = p.currDelIter.At() + app.Append(t, v) + } + if err := p.currDelIter.Err(); err != nil { + p.err = errors.Wrap(err, "iterate chunk while re-encoding") + return false + } + + p.curr.Chunk = newChunk + p.curr.MaxTime = t + return true +} + +func (p *delChunkSeriesIterator) At() chunks.Meta { return p.curr } + +// TODO(bwplotka): Add relabelling. diff --git a/pkg/objstore/filesystem/filesystem.go b/pkg/objstore/filesystem/filesystem.go index 3a42d0ccd37..d281fda7a1a 100644 --- a/pkg/objstore/filesystem/filesystem.go +++ b/pkg/objstore/filesystem/filesystem.go @@ -54,13 +54,10 @@ func NewBucket(rootDir string) (*Bucket, error) { // Iter calls f for each entry in the given directory. The argument to f is the full // object name including the prefix of the inspected directory. -func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) error { +func (b *Bucket) Iter(_ context.Context, dir string, f func(string) error) error { absDir := filepath.Join(b.rootDir, dir) info, err := os.Stat(absDir) if err != nil { - if os.IsNotExist(err) { - return nil - } return errors.Wrapf(err, "stat %s", absDir) } if !info.IsDir() { @@ -108,7 +105,7 @@ func (r *rangeReaderCloser) Close() error { } // Attributes returns information about the specified object. -func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { +func (b *Bucket) Attributes(_ context.Context, name string) (objstore.ObjectAttributes, error) { file := filepath.Join(b.rootDir, name) stat, err := os.Stat(file) if err != nil { diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 75086b5b955..d241d98cc74 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -802,7 +802,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() } - relabelConf, err := block.ParseRelabelConfig([]byte(sc.relabel)) + relabelConf, err := block.ParseRelabelConfig([]byte(sc.relabel), block.SelectorSupportedRelabelActions) testutil.Ok(t, err) rec := &recorder{Bucket: bkt} diff --git a/scripts/genflagdocs.sh b/scripts/genflagdocs.sh index 3dc4cbba404..d54ecc83a87 100755 --- a/scripts/genflagdocs.sh +++ b/scripts/genflagdocs.sh @@ -47,7 +47,7 @@ for x in "${toolsCommands[@]}"; do ${THANOS_BIN} tools "${x}" --help &>"docs/components/flags/tools_${x}.txt" done -toolsBucketCommands=("verify" "ls" "inspect" "web" "replicate" "downsample" "cleanup") +toolsBucketCommands=("verify" "ls" "inspect" "web" "replicate" "downsample" "cleanup", "rewrite") for x in "${toolsBucketCommands[@]}"; do ${THANOS_BIN} tools bucket "${x}" --help &>"docs/components/flags/tools_bucket_${x}.txt" done