Skip to content

Commit

Permalink
tools: Added thanos bucket tool rewrite (for now: allowing block seri…
Browse files Browse the repository at this point in the history
…es deletions).

Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka committed Nov 23, 2020
1 parent 1970b17 commit 870a9ce
Show file tree
Hide file tree
Showing 22 changed files with 1,746 additions and 21 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

# Changelog

All notable changes to this project will be documented in this file.
Expand All @@ -11,6 +12,10 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re

## Unreleased

- [#3421](https://github.com/thanos-io/thanos/pull/3421) Tools: Added `thanos tools bucket rewrite` command allowing to delete series from given block.

## v0.17.0

### Added

- [#3469](https://github.com/thanos-io/thanos/pull/3469) StoreAPI: Added `hints` field to `LabelNamesRequest` and `LabelValuesRequest`. Hints in an opaque data structure that can be used to carry additional information from the store and its content is implementation specific.
Expand Down
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,6 @@ github.com/prometheus/client_golang/prometheus.{DefaultGatherer,DefBuckets,NewUn
github.com/prometheus/client_golang/prometheus.{NewCounter,NewCounterVec,NewCounterVec,NewGauge,NewGaugeVec,NewGaugeFunc,\
NewHistorgram,NewHistogramVec,NewSummary,NewSummaryVec}=github.com/prometheus/client_golang/prometheus/promauto.{NewCounter,\
NewCounterVec,NewCounterVec,NewGauge,NewGaugeVec,NewGaugeFunc,NewHistorgram,NewHistogramVec,NewSummary,NewSummaryVec},\
github.com/prometheus/prometheus/tsdb/errors=github.com/thanos-io/thanos/pkg/errutil,\
sync/atomic=go.uber.org/atomic" ./...
@$(FAILLINT) -paths "fmt.{Print,Println,Sprint}" -ignore-tests ./...
@echo ">> linting all of the Go files GOGC=${GOGC}"
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func runCompact(
return errors.Wrap(err, "get content of relabel configuration")
}

relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml)
relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml, block.SelectorSupportedRelabelActions)
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ func main() {
}

if len(confContentYaml) == 0 {
level.Info(logger).Log("msg", "Tracing will be disabled")
tracer = client.NoopTracer()
} else {
tracer, closer, err = client.NewTracer(ctx, logger, metrics, confContentYaml)
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func runStore(
return errors.Wrap(err, "get content of relabel configuration")
}

relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml)
relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml, block.SelectorSupportedRelabelActions)
if err != nil {
return err
}
Expand Down
158 changes: 155 additions & 3 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ package main

import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
Expand All @@ -25,11 +28,14 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
v1 "github.com/thanos-io/thanos/pkg/api/blocks"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/compactv2"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extkingpin"
Expand All @@ -47,6 +53,7 @@ import (
"github.com/thanos-io/thanos/pkg/verifier"
"golang.org/x/text/language"
"golang.org/x/text/message"
"gopkg.in/yaml.v3"
)

const extpromPrefix = "thanos_bucket_"
Expand Down Expand Up @@ -74,6 +81,7 @@ func registerBucket(app extkingpin.AppClause) {
registerBucketDownsample(cmd, objStoreConfig)
registerBucketCleanup(cmd, objStoreConfig)
registerBucketMarkBlock(cmd, objStoreConfig)
registerBucketRewrite(cmd, objStoreConfig)
}

func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
Expand Down Expand Up @@ -500,7 +508,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat
return errors.Wrap(err, "get content of relabel configuration")
}

relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml)
relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml, block.SelectorSupportedRelabelActions)
if err != nil {
return err
}
Expand Down Expand Up @@ -709,13 +717,14 @@ func registerBucketMarkBlock(app extkingpin.AppClause, objStoreConfig *extflag.P
blockIDs := cmd.Flag("id", "ID (ULID) of the blocks to be marked for deletion (repeated flag)").Required().Strings()
marker := cmd.Flag("marker", "Marker to be put.").Required().Enum(metadata.DeletionMarkFilename, metadata.NoCompactMarkFilename)
details := cmd.Flag("details", "Human readable details to be put into marker.").Required().String()

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Cleanup.String())
bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Mark.String())
if err != nil {
return err
}
Expand All @@ -724,7 +733,7 @@ func registerBucketMarkBlock(app extkingpin.AppClause, objStoreConfig *extflag.P
for _, id := range *blockIDs {
u, err := ulid.Parse(id)
if err != nil {
return errors.Errorf("id is not a valid block ULID, got: %v", id)
return errors.Errorf("block.id is not a valid UUID, got: %v", id)
}
ids = append(ids, u)
}
Expand Down Expand Up @@ -753,3 +762,146 @@ func registerBucketMarkBlock(app extkingpin.AppClause, objStoreConfig *extflag.P
return nil
})
}

func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
cmd := app.Command(component.Rewrite.String(), "Rewrite chosen blocks in the bucket, while deleting or modifying series "+
"Resulted block has modified stats in meta.json. Additionally compaction.sources are altered to not confuse readers of meta.json. "+
"Instead thanos.rewrite section is added with useful info like old sources and deletion requests. "+
"NOTE: It's recommended to turn off compactor while doing this operation. If the compactor is running and touching exactly same block that "+
"is being rewritten, the resulted rewritten block might only cause overlap (mitigated by marking overlapping block manually for deletion) "+
"and the data you wanted to rewrite could already part of bigger block.\n\n"+
"Use FILESYSTEM type of bucket to rewrite block on disk (suitable for vanilla Prometheus) "+
"After rewrite, it's caller responsibility to delete or mark source block for deletion to avoid overlaps. "+
"WARNING: This procedure is *IRREVERSIBLE* after certain time (delete delay), so do backup your blocks first.")
blockIDs := cmd.Flag("id", "ID (ULID) of the blocks for rewrite (repeated flag).").Required().Strings()
tmpDir := cmd.Flag("tmp.dir", "Working directory for temporary files").Default(filepath.Join(os.TempDir(), "thanos-rewrite")).String()
dryRun := cmd.Flag("dry-run", "Prints the series changes instead of doing them. Defaults to true, for user to double check. (: Pass --no-dry-run to skip this.").Default("true").Bool()
toDelete := extflag.RegisterPathOrContent(cmd, "rewrite.to-delete-config", "YAML file that contains []metadata.DeletionRequest that will be applied to blocks", true)
provideChangeLog := cmd.Flag("rewrite.add-change-log", "If specified, all modifications are written to new block directory. Disable if latency is to high.").Default("true").Bool()
cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Rewrite.String())
if err != nil {
return err
}

deletionsYaml, err := toDelete.Content()
if err != nil {
return err
}

var deletions []metadata.DeletionRequest
if err := yaml.Unmarshal(deletionsYaml, &deletions); err != nil {
return err
}

var ids []ulid.ULID
for _, id := range *blockIDs {
u, err := ulid.Parse(id)
if err != nil {
return errors.Errorf("id is not a valid block ULID, got: %v", id)
}
ids = append(ids, u)
}

if err := os.RemoveAll(*tmpDir); err != nil {
return err
}
if err := os.MkdirAll(*tmpDir, os.ModePerm); err != nil {
return err
}

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
chunkPool := chunkenc.NewPool()
changeLog := compactv2.NewChangeLog(ioutil.Discard)
for _, id := range ids {
// Delete series from block & modify.
level.Info(logger).Log("msg", "downloading block", "source", id)
if err := block.Download(ctx, logger, bkt, id, filepath.Join(*tmpDir, id.String())); err != nil {
return errors.Wrapf(err, "download %v", id)
}

meta, err := metadata.ReadFromDir(filepath.Join(*tmpDir, id.String()))
if err != nil {
return errors.Wrapf(err, "read meta of %v", id)
}
b, err := tsdb.OpenBlock(logger, filepath.Join(*tmpDir, id.String()), chunkPool)
if err != nil {
return errors.Wrapf(err, "open block %v", id)
}

p := compactv2.NewProgressLogger(logger, int(b.Meta().Stats.NumSeries))
newID := ulid.MustNew(ulid.Now(), rand.Reader)
meta.ULID = newID
meta.Thanos.Rewrites = append(meta.Thanos.Rewrites, metadata.Rewrite{
Sources: meta.Compaction.Sources,
DeletionsApplied: deletions,
})
meta.Compaction.Sources = []ulid.ULID{newID}
meta.Thanos.Source = metadata.BucketRewriteSource

if err := os.MkdirAll(filepath.Join(*tmpDir, newID.String()), os.ModePerm); err != nil {
return err
}

if *provideChangeLog {
f, err := os.OpenFile(filepath.Join(*tmpDir, newID.String(), "change.log"), os.O_CREATE|os.O_WRONLY, os.ModePerm)
if err != nil {
return err
}
defer runutil.CloseWithLogOnErr(logger, f, "close changelog")

changeLog = compactv2.NewChangeLog(f)
level.Info(logger).Log("msg", "changelog will be available", "file", filepath.Join(*tmpDir, newID.String(), "change.log"))
}

d, err := block.NewDiskWriter(ctx, logger, filepath.Join(*tmpDir, newID.String()))
if err != nil {
return err
}

var comp *compactv2.Compactor
if *dryRun {
comp = compactv2.NewDryRun(*tmpDir, logger, changeLog, chunkPool)
} else {
comp = compactv2.New(*tmpDir, logger, changeLog, chunkPool)
}

level.Info(logger).Log("msg", "starting rewrite for block", "source", id, "new", newID, "toDelete", string(deletionsYaml))
if err := comp.WriteSeries(ctx, []block.Reader{b}, d, p, compactv2.WithDeletionModifier(deletions...)); err != nil {
return errors.Wrapf(err, "writing series from %v to %v", id, newID)
}

if *dryRun {
level.Info(logger).Log("msg", "dry run finished. Changes should be printed to stderr")
return nil
}

level.Info(logger).Log("msg", "wrote new block after modifications; flushing", "source", id, "new", newID)
meta.Stats, err = d.Flush()
if err != nil {
return errors.Wrap(err, "flush")
}
if err := meta.WriteToDir(logger, filepath.Join(*tmpDir, newID.String())); err != nil {
return err
}

level.Info(logger).Log("msg", "uploading new block", "source", id, "new", newID)
if err := block.Upload(ctx, logger, bkt, filepath.Join(*tmpDir, newID.String())); err != nil {
return errors.Wrap(err, "upload")
}
level.Info(logger).Log("msg", "uploaded", "source", id, "new", newID)
}
level.Info(logger).Log("msg", "rewrite done", "IDs", strings.Join(*blockIDs, ","))
return nil
}, func(err error) {
cancel()
})
return nil
})
}
Loading

0 comments on commit 870a9ce

Please sign in to comment.