Add tombstone cli and mask store series using tombstones #4790

Closed
wants to merge 14 commits into from
32 changes: 27 additions & 5 deletions cmd/thanos/store.go
@@ -9,6 +9,7 @@ import (
"time"

"github.com/alecthomas/units"
extflag "github.com/efficientgo/tools/extkingpin"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
grpclogging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
@@ -17,11 +18,8 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/route"

commonmodel "github.com/prometheus/common/model"

extflag "github.com/efficientgo/tools/extkingpin"
"github.com/prometheus/common/route"

blocksAPI "github.com/thanos-io/thanos/pkg/api/blocks"
"github.com/thanos-io/thanos/pkg/block"
@@ -45,6 +43,7 @@ import (
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/tombstone"
"github.com/thanos-io/thanos/pkg/ui"
)

@@ -62,8 +61,10 @@ type storeConfig struct {
component component.StoreAPI
debugLogging bool
syncInterval time.Duration
syncTombstonesInterval time.Duration
yeya24 marked this conversation as resolved.
blockSyncConcurrency int
blockMetaFetchConcurrency int
tombstoneFetchConcurrency int
filterConf *store.FilterConfig
selectorRelabelConf extflag.PathOrContent
advertiseCompatibilityLabel bool
@@ -123,6 +124,12 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage.").
Default("32").IntVar(&sc.blockMetaFetchConcurrency)

cmd.Flag("sync-tombstone-duration", "Repeat interval for syncing the tombstones between local and remote view.").
Comment from the PR author (Contributor):

Should we add another flag to enable or disable tombstone masking?

Default("3m").DurationVar(&sc.syncTombstonesInterval)

cmd.Flag("tombstone-fetch-concurrency", "Number of goroutines to use when fetching tombstones from object storage.").
Default("4").IntVar(&sc.tombstoneFetchConcurrency)

sc.filterConf = &store.FilterConfig{}

cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
@@ -310,6 +317,11 @@ func runStore(
return errors.Wrap(err, "meta fetcher")
}

tombstoneFetcher, err := tombstone.NewFetcher(logger, conf.tombstoneFetchConcurrency, bkt, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg), []tombstone.Filter{})
if err != nil {
return errors.Wrap(err, "tombstone fetcher")
}

// Limit the concurrency on queries against the Thanos store.
if conf.maxConcurrency < 0 {
return errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", conf.maxConcurrency)
@@ -338,6 +350,7 @@ func runStore(
bs, err := store.NewBucketStore(
bkt,
metaFetcher,
tombstoneFetcher,
conf.dataDir,
store.NewChunksLimiterFactory(conf.maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk.
store.NewSeriesLimiterFactory(conf.maxTouchedSeriesCount),
@@ -356,8 +369,8 @@ func runStore(

// bucketStoreReady signals when bucket store is ready.
bucketStoreReady := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

@@ -401,6 +414,15 @@ func runStore(
}),
)

{
g.Add(func() error {
return runutil.Repeat(conf.syncTombstonesInterval, ctx.Done(), func() error {
return bs.SyncTombstones(ctx)
})
}, func(error) {
cancel()
})
}
// Start query (proxy) gRPC StoreAPI.
{
tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), conf.grpcConfig.tlsSrvCert, conf.grpcConfig.tlsSrvKey, conf.grpcConfig.tlsSrvClientCA)
61 changes: 61 additions & 0 deletions cmd/thanos/tools_bucket.go
@@ -33,6 +33,7 @@ import (
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"

@@ -60,6 +61,7 @@ import (
"github.com/thanos-io/thanos/pkg/runutil"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/tombstone"
"github.com/thanos-io/thanos/pkg/ui"
"github.com/thanos-io/thanos/pkg/verifier"
)
@@ -153,6 +155,13 @@ type bucketMarkBlockConfig struct {
blockIDs []string
}

type bucketDeleteConfig struct {
Comment from the PR author (Contributor):

Should we rename this? A delete command is ambiguous: it could mean deleting series or deleting a whole block.
Maybe we can rename it to delete-series or create-tombstone, since we may need undelete-series or remove-tombstone in the future.

timeout time.Duration
matchers string
author string
reason string
}

func (tbc *bucketVerifyConfig) registerBucketVerifyFlag(cmd extkingpin.FlagClause) *bucketVerifyConfig {
cmd.Flag("repair", "Attempt to repair blocks for which issues were detected").
Short('r').Default("false").BoolVar(&tbc.repair)
@@ -264,6 +273,15 @@ func (tbc *bucketRetentionConfig) registerBucketRetentionFlag(cmd extkingpin.Fla
return tbc
}

func (tbc *bucketDeleteConfig) registerBucketDeleteFlag(cmd extkingpin.FlagClause) *bucketDeleteConfig {
cmd.Flag("timeout", "Timeout to upload tombstone file to the remote storage").Default("5m").DurationVar(&tbc.timeout)
cmd.Flag("matchers", "The string representing label matchers").Default("").StringVar(&tbc.matchers)
cmd.Flag("author", "Author of the deletion request").Default("not specified").StringVar(&tbc.author)
cmd.Flag("reason", "Reason to perform the deletion request").Default("not specified").StringVar(&tbc.reason)

return tbc
}

func registerBucket(app extkingpin.AppClause) {
cmd := app.Command("bucket", "Bucket utility commands")

@@ -278,6 +296,7 @@ func registerBucket(app extkingpin.AppClause) {
registerBucketMarkBlock(cmd, objStoreConfig)
registerBucketRewrite(cmd, objStoreConfig)
registerBucketRetention(cmd, objStoreConfig)
registerBucketDelete(cmd, objStoreConfig)
}

func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
@@ -1367,3 +1386,45 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P
return nil
})
}

func registerBucketDelete(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
cmd := app.Command("delete", "Delete series command for the object storage. NOTE: Currently it only performs Store API masking in the object storage at chunk level with respect to the tombstones created by the user (Doesn't actually delete the data in objstore).")

tbc := &bucketDeleteConfig{}
tbc.registerBucketDeleteFlag(cmd)

minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to delete. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d (Calculates the actual timestamp at the tombstone creation time) or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z"))

maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to delete. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d (Calculates the actual timestamp at the tombstone creation time) or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, "delete")
if err != nil {
return err
}
defer runutil.CloseWithLogOnErr(logger, bkt, "tools delete")

// Dummy actor to immediately kill the group after the run function returns.
g.Add(func() error { return nil }, func(error) {})

m, err := parser.ParseMetricSelector(tbc.matchers)
if err != nil {
return err
}

newID := ulid.MustNew(ulid.Now(), rand.Reader)
ts := tombstone.NewTombstone(newID, m, minTime.PrometheusTimestamp(), maxTime.PrometheusTimestamp(),
tbc.author, tbc.reason)

ctx, cancel := context.WithTimeout(context.Background(), tbc.timeout)
defer cancel()
return tombstone.UploadTombstone(ctx, ts, bkt)
})
}
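The command above only uploads a tombstone (matchers, time range, author, reason); the store gateway is then expected to hide matching data at query time. The sketch below illustrates one way such masking could work. All types and functions here are hypothetical, matchers are limited to equality, and dropping every chunk that overlaps the tombstone range is an assumption of this sketch rather than the confirmed behavior of the PR.

```go
package main

import "fmt"

// equalMatcher is a simplified, equality-only label matcher (assumption).
type equalMatcher struct{ name, value string }

// tombstone is a hypothetical stand-in for the PR's tombstone type.
type tombstone struct {
	matchers         []equalMatcher
	minTime, maxTime int64 // milliseconds since epoch, inclusive
}

// chunk carries only the time range of a chunk of samples.
type chunk struct{ minTime, maxTime int64 }

// matchesSeries reports whether every matcher agrees with the label set.
func (t tombstone) matchesSeries(lset map[string]string) bool {
	for _, m := range t.matchers {
		if lset[m.name] != m.value {
			return false
		}
	}
	return true
}

// overlaps reports whether the chunk intersects the tombstone's time range.
func (t tombstone) overlaps(c chunk) bool {
	return c.minTime <= t.maxTime && t.minTime <= c.maxTime
}

// visibleChunks returns the chunks not masked by any matching tombstone.
func visibleChunks(lset map[string]string, chunks []chunk, tombstones []tombstone) []chunk {
	var out []chunk
	for _, c := range chunks {
		masked := false
		for _, t := range tombstones {
			if t.matchesSeries(lset) && t.overlaps(c) {
				masked = true
				break
			}
		}
		if !masked {
			out = append(out, c)
		}
	}
	return out
}

func main() {
	ts := []tombstone{{
		matchers: []equalMatcher{{"__name__", "up"}, {"job", "node"}},
		minTime:  100,
		maxTime:  200,
	}}
	chunks := []chunk{{0, 99}, {150, 250}, {201, 300}}

	// The series matches the tombstone, so the overlapping chunk is hidden.
	fmt.Println(visibleChunks(map[string]string{"__name__": "up", "job": "node"}, chunks, ts))
	// A different job does not match, so all chunks stay visible.
	fmt.Println(visibleChunks(map[string]string{"__name__": "up", "job": "api"}, chunks, ts))
}
```

Masking whole chunks is coarse: samples outside the tombstone range that share a chunk with deleted samples are hidden as well. Trimming at sample level would be more precise but requires decoding the chunk.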
6 changes: 6 additions & 0 deletions docs/components/store.md
@@ -180,6 +180,12 @@ Flags:
this limit is exceeded. 0 means no limit.
--sync-block-duration=3m Repeat interval for syncing the blocks between
local and remote view.
--sync-tombstone-duration=3m
Repeat interval for syncing the tombstones
between local and remote view.
--tombstone-fetch-concurrency=4
Number of goroutines to use when fetching
tombstones from object storage.
--tracing.config=<content>
Alternative to 'tracing.config-file' flag
(mutually exclusive). Content of YAML file with
12 changes: 12 additions & 0 deletions docs/components/tools.md
@@ -78,6 +78,12 @@ Subcommands:
Retention applies retention policies on the given bucket. Please make sure
no compactor is running on the same bucket at the same time.

tools bucket delete [<flags>]
Delete series command for the object storage. NOTE: Currently it only
performs Store API masking in the object storage at chunk level with respect
to the tombstones created by the user (Doesn't actually delete the data in
objstore).

tools rules-check --rules=RULES
Check if the rule files are valid or not.

@@ -187,6 +193,12 @@ Subcommands:
Retention applies retention policies on the given bucket. Please make sure
no compactor is running on the same bucket at the same time.

tools bucket delete [<flags>]
Delete series command for the object storage. NOTE: Currently it only
performs Store API masking in the object storage at chunk level with respect
to the tombstones created by the user (Doesn't actually delete the data in
objstore).


```

2 changes: 1 addition & 1 deletion go.mod
@@ -31,7 +31,6 @@ require (
github.com/felixge/fgprof v0.9.1
github.com/fortytw2/leaktest v1.3.0
github.com/fsnotify/fsnotify v1.5.1
github.com/go-kit/kit v0.12.0 // indirect
github.com/go-kit/log v0.2.0
github.com/go-openapi/strfmt v0.21.1
github.com/go-redis/redis/v8 v8.11.4
@@ -131,6 +130,7 @@ require (
github.com/elastic/go-sysinfo v1.1.1 // indirect
github.com/elastic/go-windows v1.0.1 // indirect
github.com/felixge/httpsnoop v1.0.2 // indirect
github.com/go-kit/kit v0.12.0 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.2.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
2 changes: 1 addition & 1 deletion pkg/block/fetcher.go
@@ -144,7 +144,7 @@ type MetadataFetcher interface {
UpdateOnChange(func([]metadata.Meta, error))
}

// Filter allows filtering or modifying metas from the provided map or returns error.
// MetadataFilter allows filtering or modifying metas from the provided map or returns error.
type MetadataFilter interface {
Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, modified *extprom.TxGaugeVec) error
}