Skip to content

Commit

Permalink
Added deletion modifier + tests.
Browse files Browse the repository at this point in the history
Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka committed Nov 9, 2020
1 parent ef02fe0 commit b7aefd0
Show file tree
Hide file tree
Showing 19 changed files with 1,073 additions and 452 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func runCompact(
return errors.Wrap(err, "get content of relabel configuration")
}

relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml)
relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml, block.SelectorSupportedRelabelActions)
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ func main() {
}

if len(confContentYaml) == 0 {
level.Info(logger).Log("msg", "Tracing will be disabled")
tracer = client.NoopTracer()
} else {
tracer, closer, err = client.NewTracer(ctx, logger, metrics, confContentYaml)
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func runStore(
return errors.Wrap(err, "get content of relabel configuration")
}

relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml)
relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml, block.SelectorSupportedRelabelActions)
if err != nil {
return err
}
Expand Down
123 changes: 106 additions & 17 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ package main

import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
Expand All @@ -25,11 +28,14 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
v1 "github.com/thanos-io/thanos/pkg/api/blocks"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/compactv2"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extkingpin"
Expand All @@ -47,6 +53,7 @@ import (
"github.com/thanos-io/thanos/pkg/verifier"
"golang.org/x/text/language"
"golang.org/x/text/message"
"gopkg.in/yaml.v3"
)

const extpromPrefix = "thanos_bucket_"
Expand Down Expand Up @@ -508,7 +515,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat
return errors.Wrap(err, "get content of relabel configuration")
}

relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml)
relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml, block.SelectorSupportedRelabelActions)
if err != nil {
return err
}
Expand Down Expand Up @@ -713,15 +720,20 @@ func compare(s1, s2 string) bool {
}

func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
cmd := app.Command(component.Rewrite.String(), "Rewrite chosen blocks in the bucket, while deleting or modifying series. Once rewritten, the old block is marked for deletion."+
cmd := app.Command(component.Rewrite.String(), "Rewrite chosen blocks in the bucket, while deleting or modifying series"+
"NOTE: It's recommended to turn off compactor while doing this operation. If the compactor is running and touching exactly same block that"+
"is being rewritten, the resulted rewritten block might only cause overlap (mitigated by marking overlapping block manually for deletion)"+
"and the data you wanted to rewrite could already part of bigger block.\n\n"+
"Use FILESYSTEM type of bucket to rewrite block on disk (suitable for vanilla Prometheus)"+
"WARNING: This procedure is *IRREVERSIBLE* after certain time (delete delay), so do backup your blocks first (you can use objstore.config-backup flags for this command)")
"Resources needed: 1 CPU and:"+
"* For deletions: At max 1/32 of posting offsets, 1/32 of symbols, largest labels for single series and biggest XOR chunk."+
"After rewrite, it's caller responsibility to delete or mark source block for deletion to avoid overlaps."+
"WARNING: This procedure is *IRREVERSIBLE* after certain time (delete delay), so do backup your blocks first.")
blockIDs := cmd.Flag("id", "ID (ULID) of the blocks for rewrite (repeated flag).").Required().Strings()
objStoreBackupConfig := extkingpin.RegisterCommonObjStoreFlags(cmd, "-backup", false, "Used for backup-ing block before rewrite if you choose so (only use in non-dry run mode).")
tmpDir := cmd.Flag("tmp.dir", "Working directory for temporary files").Default(filepath.Join(os.TempDir(), "thanos-rewrite")).String()
dryRun := cmd.Flag("dry-run", "Prints the series changes instead of doing them. Defaults to true, for user to double check. (: Pass --no-dry-run to skip this.").Default("true").Bool()
toDelete := extflag.RegisterPathOrContent(cmd, "rewrite.to-delete-config", "YAML file that contains []metadata.DeletionRequest that will be applied to blocks", true)
provideChangeLog := cmd.Flag("rewrite.add-change-log", "If specified, all modifications are written to new block directory. Disable if latency is to high.").Default("true").Bool()
cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
Expand All @@ -733,6 +745,16 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
return err
}

deletionsYaml, err := toDelete.Content()
if err != nil {
return err
}

var deletions []metadata.DeletionRequest
if err := yaml.Unmarshal(deletionsYaml, &deletions); err != nil {
return err
}

var ids []ulid.ULID
for _, id := range *blockIDs {
u, err := ulid.Parse(id)
Expand All @@ -742,23 +764,90 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
ids = append(ids, u)
}

var backupBkt objstore.InstrumentedBucket
if !*dryRun {
confContentYaml, err := objStoreBackupConfig.Content()
if err != nil {
return err
}

backupBkt, err = client.NewBucket(logger, confContentYaml, reg, component.Cleanup.String())
if err != nil {
return err
}
if err := os.RemoveAll(*tmpDir); err != nil {
return err
}
if err := os.MkdirAll(*tmpDir, os.ModePerm); err != nil {
return err
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
g.Add(func() error {
chunkPool := chunkenc.NewPool()
changeLog := compactv2.NewChangeLog(ioutil.Discard)
for _, id := range ids {
// Delete series from block & repair.
// Delete series from block & modify.
level.Info(logger).Log("msg", "downloading block", "source", id)
if err := block.Download(ctx, logger, bkt, id, filepath.Join(*tmpDir, id.String())); err != nil {
return errors.Wrapf(err, "download %v", id)
}

meta, err := metadata.Read(filepath.Join(*tmpDir, id.String()))
if err != nil {
return errors.Wrapf(err, "read meta of %v", id)
}
b, err := tsdb.OpenBlock(logger, filepath.Join(*tmpDir, id.String()), chunkPool)
if err != nil {
return errors.Wrapf(err, "open block %v", id)
}

p := compactv2.NewProgressLogger(logger, int(b.Meta().Stats.NumSeries))
newID := ulid.MustNew(ulid.Now(), rand.Reader)
meta.ULID = newID
meta.Thanos.Rewrite.Sources = append(meta.Thanos.Rewrite.Sources, meta.Compaction.Sources...)
meta.Thanos.Rewrite.DeletionsApplied = append(meta.Thanos.Rewrite.DeletionsApplied, deletions)
meta.Compaction.Sources = []ulid.ULID{newID}
meta.Thanos.Source = metadata.BucketRewriteSource

if err := os.MkdirAll(filepath.Join(*tmpDir, newID.String()), os.ModePerm); err != nil {
return err
}
if err := meta.WriteToDir(logger, filepath.Join(*tmpDir, newID.String())); err != nil {
return err
}

if *provideChangeLog {
f, err := os.Open(filepath.Join(*tmpDir, newID.String(), "change.log"))
if err != nil {
return err
}
defer runutil.CloseWithLogOnErr(logger, f, "close changelog")

changeLog = compactv2.NewChangeLog(f)
level.Info(logger).Log("msg", "changelog will be available", "file", filepath.Join(*tmpDir, newID.String(), "change.log"))
}

d, err := block.NewDiskWriter(ctx, logger, filepath.Join(*tmpDir, newID.String()))
if err != nil {
return err
}

var comp *compactv2.Compactor
if *dryRun {
comp = compactv2.NewDryRun(*tmpDir, logger, changeLog, chunkPool)
} else {
comp = compactv2.New(*tmpDir, logger, changeLog, chunkPool)
}

level.Info(logger).Log("msg", "starting rewrite for block", "source", id, "new", newID, "toDelete", string(deletionsYaml))
if err := comp.WriteSeries(ctx, []block.Reader{b}, d, p, compactv2.WithDeletionModifier(deletions...)); err != nil {
return errors.Wrapf(err, "writing series from %v to %v", id, newID)
}

if *dryRun {
level.Info(logger).Log("msg", "dry run finished. Changes should be printed to stderr")
return nil
}

level.Info(logger).Log("msg", "wrote new block after modifications; flushing", "source", id, "new", newID)
meta.Stats, err = d.Flush()
if err != nil {
return errors.Wrap(err, "flush")
}
level.Info(logger).Log("msg", "uploading new block", "source", id, "new", newID)
if err := block.Upload(ctx, logger, bkt, filepath.Join(*tmpDir, newID.String())); err != nil {
return errors.Wrap(err, "upload")
}
level.Info(logger).Log("msg", "uploaded", "source", id, "new", newID)
}
level.Info(logger).Log("msg", "marking for deletion done", "IDs", strings.Join(*blockIDs, ","))
return nil
Expand Down
62 changes: 62 additions & 0 deletions docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,68 @@ Flags:
process downsamplings.
```

### Bucket Rewrite

`tools bucket rewrite` reewrites chosen blocks in the bucket, while deleting or modifying series.


For example we can remove all non counters from the block you have on your disk (e.g in Prometheus dir):

```bash
thanos tools bucket rewrite --no-dry-run \
--id 01DN3SK96XDAEKRB1AN30AAW6E \
--objstore.config "
type: FILESYSTEM
config:
directory: <local dir>
" \
--rewrite.to-delete-config "
- matchers: \"{__name__!~\\\".*total\\\"}\"
"
```

[embedmd]:# (flags/tools_bucket_rewrite.txt $)
```$
usage: thanos tools bucket downsample [<flags>]
continuously downsamples blocks in an object store bucket
Flags:
-h, --help Show context-sensitive help (also try --help-long
and --help-man).
--version Show application version.
--log.level=info Log filtering level.
--log.format=logfmt Log format to use. Possible options: logfmt or
json.
--tracing.config-file=<file-path>
Path to YAML file with tracing configuration. See
format details:
https://thanos.io/tip/thanos/tracing.md/#configuration
--tracing.config=<content>
Alternative to 'tracing.config-file' flag (lower
priority). Content of YAML file with tracing
configuration. See format details:
https://thanos.io/tip/thanos/tracing.md/#configuration
--objstore.config-file=<file-path>
Path to YAML file that contains object store
configuration. See format details:
https://thanos.io/tip/thanos/storage.md/#configuration
--objstore.config=<content>
Alternative to 'objstore.config-file' flag (lower
priority). Content of YAML file that contains
object store configuration. See format details:
https://thanos.io/tip/thanos/storage.md/#configuration
--http-address="0.0.0.0:10902"
Listen host:port for HTTP endpoints.
--http-grace-period=2m Time to wait after an interrupt received for HTTP
Server.
--data-dir="./data" Data directory in which to cache blocks and
process downsamplings.
```

## Rules-check

The `tools rules-check` subcommand contains tools for validation of Prometheus rules.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ replace (
// Update to v1.1.1 to make sure windows CI pass.
github.com/elastic/go-sysinfo => github.com/elastic/go-sysinfo v1.1.1
// Make sure Prometheus version is pinned as Prometheus semver does not include Go APIs.
github.com/prometheus/prometheus => ../prometheus
github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20201108220916-6ba28869528e
github.com/sercand/kuberesolver => github.com/sercand/kuberesolver v2.4.0+incompatible
google.golang.org/grpc => google.golang.org/grpc v1.29.1

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -975,8 +975,8 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx
github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/prometheus v1.8.2-0.20201029103703-63be30dceed9 h1:T6pkPNGKXv21lLfgD/mnIABj9aOhmz8HphDmKllfKWs=
github.com/prometheus/prometheus v1.8.2-0.20201029103703-63be30dceed9/go.mod h1:MDRkz271loM/PrYN+wUNEaTMDGSP760MQzB0yEjdgSQ=
github.com/prometheus/prometheus v1.8.2-0.20201108220916-6ba28869528e h1:RL9eMwESCEcfN5a9xUNBB/1mTg/M3OZrPdPcVA5SCLE=
github.com/prometheus/prometheus v1.8.2-0.20201108220916-6ba28869528e/go.mod h1:MDRkz271loM/PrYN+wUNEaTMDGSP760MQzB0yEjdgSQ=
github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1/go.mod h1:JaY6n2sDr+z2WTsXkOmNRUfDy6FN0L6Nk7x06ndm4tY=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc=
Expand Down
7 changes: 5 additions & 2 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,13 +802,16 @@ func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.UL
return nil
}

var (
SelectorSupportedRelabelActions = map[relabel.Action]struct{}{relabel.Keep: {}, relabel.Drop: {}, relabel.HashMod: {}}
)

// ParseRelabelConfig parses relabel configuration.
func ParseRelabelConfig(contentYaml []byte) ([]*relabel.Config, error) {
func ParseRelabelConfig(contentYaml []byte, supportedActions map[relabel.Action]struct{}) ([]*relabel.Config, error) {
var relabelConfig []*relabel.Config
if err := yaml.Unmarshal(contentYaml, &relabelConfig); err != nil {
return nil, errors.Wrap(err, "parsing relabel configuration")
}
supportedActions := map[relabel.Action]struct{}{relabel.Keep: {}, relabel.Drop: {}, relabel.HashMod: {}}

for _, cfg := range relabelConfig {
if _, ok := supportedActions[cfg.Action]; !ok {
Expand Down
8 changes: 4 additions & 4 deletions pkg/block/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func TestLabelShardedMetaFilter_Filter_Basic(t *testing.T) {
source_labels:
- message
`
relabelConfig, err := ParseRelabelConfig([]byte(relabelContentYaml))
relabelConfig, err := ParseRelabelConfig([]byte(relabelContentYaml), SelectorSupportedRelabelActions)
testutil.Ok(t, err)

f := NewLabelShardedMetaFilter(relabelConfig)
Expand Down Expand Up @@ -375,7 +375,7 @@ func TestLabelShardedMetaFilter_Filter_Hashmod(t *testing.T) {
`
for i := 0; i < 3; i++ {
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
relabelConfig, err := ParseRelabelConfig([]byte(fmt.Sprintf(relabelContentYamlFmt, BlockIDLabel, i)))
relabelConfig, err := ParseRelabelConfig([]byte(fmt.Sprintf(relabelContentYamlFmt, BlockIDLabel, i)), SelectorSupportedRelabelActions)
testutil.Ok(t, err)

f := NewLabelShardedMetaFilter(relabelConfig)
Expand Down Expand Up @@ -1203,13 +1203,13 @@ func Test_ParseRelabelConfig(t *testing.T) {
regex: "A"
source_labels:
- cluster
`))
`), SelectorSupportedRelabelActions)
testutil.Ok(t, err)

_, err = ParseRelabelConfig([]byte(`
- action: labelmap
regex: "A"
`))
`), SelectorSupportedRelabelActions)
testutil.NotOk(t, err)
testutil.Equals(t, "unsupported relabel action: labelmap", err.Error())
}
8 changes: 4 additions & 4 deletions pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,15 +282,15 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceT

chunkw, err := chunks.NewWriter(filepath.Join(resdir, ChunksDirname))
if err != nil {
return resid, errors.Wrap(err, "open chunk seriesWriter")
return resid, errors.Wrap(err, "open chunk Compactor")
}
defer runutil.CloseWithErrCapture(&err, chunkw, "repair chunk seriesWriter")
defer runutil.CloseWithErrCapture(&err, chunkw, "repair chunk Compactor")

indexw, err := index.NewWriter(context.TODO(), filepath.Join(resdir, IndexFilename))
if err != nil {
return resid, errors.Wrap(err, "open index seriesWriter")
return resid, errors.Wrap(err, "open index Compactor")
}
defer runutil.CloseWithErrCapture(&err, indexw, "repair index seriesWriter")
defer runutil.CloseWithErrCapture(&err, indexw, "repair index Compactor")

// TODO(fabxc): adapt so we properly handle the version once we update to an upstream
// that has multiple.
Expand Down
Loading

0 comments on commit b7aefd0

Please sign in to comment.