Add tombstone CLI and mask store series using tombstones
Signed-off-by: yeya24 <[email protected]>
yeya24 committed Oct 18, 2021
1 parent fa476b5 commit 689ceb1
Showing 6 changed files with 226 additions and 7 deletions.
11 changes: 10 additions & 1 deletion cmd/thanos/store.go
@@ -355,8 +355,8 @@ func runStore(

// bucketStoreReady signals when bucket store is ready.
bucketStoreReady := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

@@ -382,6 +382,15 @@ func runStore(
cancel()
})
}
{
g.Add(func() error {
return runutil.Repeat(time.Minute*3, ctx.Done(), func() error {
return bs.SyncTombstones(ctx)
})
}, func(error) {
cancel()
})
}
// Start query (proxy) gRPC StoreAPI.
{
tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), conf.grpcConfig.tlsSrvCert, conf.grpcConfig.tlsSrvKey, conf.grpcConfig.tlsSrvClientCA)
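The new actor above follows the oklog/run pattern used throughout Thanos. Below is a minimal, compilable sketch of the same wiring, kept separate from the diff; the `tombstoneSyncer` interface and `addTombstoneSyncActor` helper are hypothetical names introduced for illustration (`*BucketStore` provides the real `SyncTombstones` method, added later in this commit).

```go
package storeactor

import (
	"context"
	"time"

	"github.com/oklog/run"

	"github.com/thanos-io/thanos/pkg/runutil"
)

// tombstoneSyncer is a hypothetical interface for this sketch; *BucketStore
// satisfies it via the SyncTombstones method added in pkg/store/bucket.go.
type tombstoneSyncer interface {
	SyncTombstones(ctx context.Context) error
}

// addTombstoneSyncActor wires the same actor as in the diff: refresh
// tombstones every 3 minutes until the group is interrupted.
func addTombstoneSyncActor(g *run.Group, bs tombstoneSyncer) {
	ctx, cancel := context.WithCancel(context.Background())
	g.Add(func() error {
		// runutil.Repeat calls the function every interval until ctx.Done()
		// is closed or the function returns a non-nil error.
		return runutil.Repeat(3*time.Minute, ctx.Done(), func() error {
			return bs.SyncTombstones(ctx)
		})
	}, func(error) {
		cancel()
	})
}
```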
59 changes: 59 additions & 0 deletions cmd/thanos/tools_bucket.go
@@ -31,6 +31,7 @@ import (
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"

@@ -54,6 +55,7 @@ import (
"github.com/thanos-io/thanos/pkg/runutil"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/tombstone"
"github.com/thanos-io/thanos/pkg/ui"
"github.com/thanos-io/thanos/pkg/verifier"
"golang.org/x/text/language"
@@ -139,6 +141,13 @@ type bucketMarkBlockConfig struct {
blockIDs []string
}

type bucketDeleteConfig struct {
timeout time.Duration
matchers string
author string
reason string
}

func (tbc *bucketVerifyConfig) registerBucketVerifyFlag(cmd extkingpin.FlagClause) *bucketVerifyConfig {
cmd.Flag("repair", "Attempt to repair blocks for which issues were detected").
Short('r').Default("false").BoolVar(&tbc.repair)
Expand Down Expand Up @@ -246,6 +255,15 @@ func (tbc *bucketRetentionConfig) registerBucketRetentionFlag(cmd extkingpin.Fla
return tbc
}

func (tbc *bucketDeleteConfig) registerBucketDeleteFlag(cmd extkingpin.FlagClause) *bucketDeleteConfig {
cmd.Flag("timeout", "Timeout to upload tombstone file to the remote storage").Default("5m").DurationVar(&tbc.timeout)
cmd.Flag("matchers", "The string representing label matchers").Default("").StringVar(&tbc.matchers)
cmd.Flag("author", "Author of the deletion request").Default("not specified").StringVar(&tbc.author)
cmd.Flag("reason", "Reason to perform the deletion request").Default("not specified").StringVar(&tbc.reason)

return tbc
}

func registerBucket(app extkingpin.AppClause) {
cmd := app.Command("bucket", "Bucket utility commands")

@@ -260,6 +278,7 @@ func registerBucket(app extkingpin.AppClause) {
registerBucketMarkBlock(cmd, objStoreConfig)
registerBucketRewrite(cmd, objStoreConfig)
registerBucketRetention(cmd, objStoreConfig)
registerBucketDelete(cmd, objStoreConfig)
}

func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
@@ -1285,3 +1304,43 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P
return nil
})
}

func registerBucketDelete(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
cmd := app.Command("delete", "Delete series command for the object storage. NOTE: Currently it only performs Store API masking in the object storage at chunk level with respect to the tombstones created by the user (Doesn't actually delete the data in objstore).")

tbc := &bucketDeleteConfig{}
tbc.registerBucketDeleteFlag(cmd)

minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to delete. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d (Calculates the actual timestamp at the tombstone creation time) or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z"))

maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to delete. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d (Calculates the actual timestamp at the tombstone creation time) or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, "delete")
if err != nil {
return err
}
defer runutil.CloseWithLogOnErr(logger, bkt, "tools delete")

// Dummy actor to immediately kill the group after the run function returns.
g.Add(func() error { return nil }, func(error) {})

m, err := parser.ParseMetricSelector(tbc.matchers)
if err != nil {
return err
}

ts := tombstone.NewTombstone(m, minTime.PrometheusTimestamp(), maxTime.PrometheusTimestamp(), tbc.author, tbc.reason)

ctx, cancel := context.WithTimeout(context.Background(), tbc.timeout)
defer cancel()
return tombstone.UploadTombstone(ctx, ts, bkt, logger)
})
}
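For reference, the following condensed sketch shows what the command does end to end, with illustrative flag values and an in-memory bucket (`objstore.NewInMemBucket`) standing in for a configured one; the selector, author, and reason are made up for the example.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/prometheus/pkg/timestamp"
	"github.com/prometheus/prometheus/promql/parser"

	"github.com/thanos-io/thanos/pkg/objstore"
	"github.com/thanos-io/thanos/pkg/tombstone"
)

func main() {
	// --matchers: parsed exactly as the command parses it.
	m, err := parser.ParseMetricSelector(`{job="node_exporter"}`)
	if err != nil {
		panic(err)
	}

	// --min-time / --max-time resolve to millisecond timestamps;
	// here: everything for the matched series up to now.
	ts := tombstone.NewTombstone(m, 0, timestamp.FromTime(time.Now()), "alice", "bad scrape")

	// --timeout bounds the upload, as in the command.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	bkt := objstore.NewInMemBucket() // stands in for the configured bucket
	if err := tombstone.UploadTombstone(ctx, ts, bkt, log.NewNopLogger()); err != nil {
		panic(err)
	}
	fmt.Println("tombstone uploaded to", tombstone.TombstoneDir)
}
```

Against a real bucket, the equivalent invocation would look like `thanos tools bucket delete --matchers='{job="node_exporter"}' --author=alice --reason='bad scrape'` plus the usual object-storage configuration flag (assumed here to be the standard `--objstore.config`/`--objstore.config-file` pair registered for bucket tools).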
12 changes: 12 additions & 0 deletions docs/components/tools.md
@@ -78,6 +78,12 @@ Subcommands:
Retention applies retention policies on the given bucket. Please make sure
no compactor is running on the same bucket at the same time.
tools bucket delete [<flags>]
Delete series from object storage. NOTE: for now this only uploads a
user-created tombstone; Store API requests mask matching series at chunk
level based on these tombstones, and no data is actually deleted from
object storage.
tools rules-check --rules=RULES
Check if the rule files are valid or not.
@@ -186,6 +192,12 @@ Subcommands:
Retention applies retention policies on the given bucket. Please make sure
no compactor is running on the same bucket at the same time.
tools bucket delete [<flags>]
Delete series from object storage. NOTE: for now this only uploads a
user-created tombstone; Store API requests mask matching series at chunk
level based on these tombstones, and no data is actually deleted from
object storage.
```

49 changes: 44 additions & 5 deletions pkg/store/bucket.go
@@ -32,6 +32,7 @@ import (
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/encoding"
"github.com/prometheus/prometheus/tsdb/index"
promtombstones "github.com/prometheus/prometheus/tsdb/tombstones"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -52,6 +53,7 @@ import (
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/strutil"
"github.com/thanos-io/thanos/pkg/tombstone"
"github.com/thanos-io/thanos/pkg/tracing"
)

@@ -297,6 +299,9 @@ type BucketStore struct {
advLabelSets []labelpb.ZLabelSet
enableCompatibilityLabel bool

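// tombstones are the deletion requests loaded from object storage,
// guarded by tombstonesMtx and refreshed periodically by SyncTombstones.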
tombstonesMtx sync.RWMutex
tombstones []*tombstone.Tombstone

// Every how many posting offset entry we pool in heap memory. Default in Prometheus is 32.
postingOffsetsInMemSampling int

@@ -772,6 +777,7 @@ func blockSeries(
skipChunks bool, // If true, chunks are not loaded.
minTime, maxTime int64, // Series must have data in this time range to be returned.
loadAggregates []storepb.Aggr, // List of aggregates to load when loading chunks.
tombstones []*tombstone.Tombstone,
) (storepb.SeriesSet, *queryStats, error) {
ps, err := indexr.ExpandedPostings(ctx, matchers)
if err != nil {
@@ -802,6 +808,7 @@ func blockSeries(
lset labels.Labels
chks []chunks.Meta
)
PostingsLoop:
for _, id := range ps {
ok, err := indexr.LoadSeriesForTime(id, &symbolizedLset, &chks, skipChunks, minTime, maxTime)
if err != nil {
@@ -813,11 +820,35 @@ func blockSeries(
}

s := seriesEntry{}
if err := indexr.LookupLabelsSymbols(symbolizedLset, &lset); err != nil {
return nil, nil, errors.Wrap(err, "Lookup labels symbols")
}
var tombstoneIntervals promtombstones.Intervals
TombstonesLoop:
for _, ts := range tombstones {
// A tombstone applies only when all of its matchers match the series labels.
for _, matcher := range ts.Matchers {
if !matcher.Matches(lset.Get(matcher.Name)) {
continue TombstonesLoop
}
}
if skipChunks {
continue PostingsLoop
}
tombstoneIntervals = tombstoneIntervals.Add(promtombstones.Interval{Mint: ts.MinTime, Maxt: ts.MaxTime})
}

if !skipChunks {
// Schedule loading chunks.
s.refs = make([]uint64, 0, len(chks))
s.chks = make([]storepb.AggrChunk, 0, len(chks))
ChunkMetasLoop:
for j, meta := range chks {
for _, it := range tombstoneIntervals {
if meta.OverlapsClosedInterval(it.Mint, it.Maxt) {
continue ChunkMetasLoop
}
}

// seriesEntry s is appended to res, but not at every outer loop iteration,
// therefore len(res) is the index we need here, not outer loop iteration number.
if err := chunkr.addLoad(meta.Ref, len(res), j); err != nil {
@@ -835,9 +866,6 @@ func blockSeries(
return nil, nil, errors.Wrap(err, "exceeded chunks limit")
}
}
if err := indexr.LookupLabelsSymbols(symbolizedLset, &lset); err != nil {
return nil, nil, errors.Wrap(err, "Lookup labels symbols")
}

s.lset = labelpb.ExtendSortedLabels(lset, extLset)
res = append(res, s)
@@ -1006,6 +1034,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
}

s.mtx.RLock()
s.tombstonesMtx.RLock()
tombstones := s.tombstones
s.tombstonesMtx.RUnlock()
for _, bs := range s.blockSets {
blockMatchers, ok := bs.labelMatchers(matchers...)
if !ok {
@@ -1058,6 +1089,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
req.SkipChunks,
req.MinTime, req.MaxTime,
req.Aggregates,
tombstones,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
@@ -1259,7 +1291,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq

result = strutil.MergeSlices(res, extRes)
} else {
seriesSet, _, err := blockSeries(newCtx, b.extLset, indexr, nil, reqSeriesMatchers, nil, seriesLimiter, true, req.Start, req.End, nil)
seriesSet, _, err := blockSeries(newCtx, b.extLset, indexr, nil, reqSeriesMatchers, nil, seriesLimiter, true, req.Start, req.End, nil, nil)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
}
@@ -1390,7 +1422,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
}
result = res
} else {
seriesSet, _, err := blockSeries(newCtx, b.extLset, indexr, nil, reqSeriesMatchers, nil, seriesLimiter, true, req.Start, req.End, nil)
seriesSet, _, err := blockSeries(newCtx, b.extLset, indexr, nil, reqSeriesMatchers, nil, seriesLimiter, true, req.Start, req.End, nil, nil)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
}
@@ -1443,6 +1475,13 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
}, nil
}

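// SyncTombstones refreshes the in-memory tombstone list from object storage.
// The store command runs it periodically (every 3 minutes).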
func (s *BucketStore) SyncTombstones(ctx context.Context) (err error) {
s.tombstonesMtx.Lock()
s.tombstones, err = tombstone.ReadTombstones(ctx, s.bkt, s.logger)
s.tombstonesMtx.Unlock()
return
}

// bucketBlockSet holds all blocks of an equal label set. It internally splits
// them up by downsampling resolution and allows querying.
type bucketBlockSet struct {
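The chunk-level masking added to blockSeries reduces to an interval-overlap check. Here is a standalone sketch of that rule, where `chunkMeta` is a stand-in for Prometheus' `chunks.Meta` and `maskChunks` is a hypothetical helper introduced for illustration:

```go
package sketch

import (
	promtombstones "github.com/prometheus/prometheus/tsdb/tombstones"
)

// chunkMeta stands in for Prometheus' chunks.Meta in this sketch.
type chunkMeta struct{ MinTime, MaxTime int64 }

// overlapsClosedInterval mirrors chunks.Meta.OverlapsClosedInterval:
// both interval bounds are inclusive.
func (m chunkMeta) overlapsClosedInterval(mint, maxt int64) bool {
	return m.MinTime <= maxt && mint <= m.MaxTime
}

// maskChunks drops every chunk that overlaps any tombstone interval,
// which is what the ChunkMetasLoop above does before scheduling loads.
func maskChunks(chks []chunkMeta, ivs promtombstones.Intervals) []chunkMeta {
	var kept []chunkMeta
ChunkLoop:
	for _, c := range chks {
		for _, it := range ivs {
			if c.overlapsClosedInterval(it.Mint, it.Maxt) {
				continue ChunkLoop
			}
		}
		kept = append(kept, c)
	}
	return kept
}
```

For example, with a tombstone interval [100, 300], a chunk spanning [250, 400] is dropped while one spanning [301, 500] survives, since the bounds are inclusive.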
2 changes: 1 addition & 1 deletion pkg/store/bucket_test.go
@@ -2290,7 +2290,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
indexReader := blk.indexReader()
chunkReader := blk.chunkReader()

seriesSet, _, err := blockSeries(context.Background(), nil, indexReader, chunkReader, matchers, chunksLimiter, seriesLimiter, req.SkipChunks, req.MinTime, req.MaxTime, req.Aggregates)
seriesSet, _, err := blockSeries(context.Background(), nil, indexReader, chunkReader, matchers, chunksLimiter, seriesLimiter, req.SkipChunks, req.MinTime, req.MaxTime, req.Aggregates, nil)
testutil.Ok(b, err)

// Ensure at least 1 series has been returned (as expected).
100 changes: 100 additions & 0 deletions pkg/tombstone/tombstone.go
@@ -0,0 +1,100 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package tombstone

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
"time"

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/prometheus/pkg/timestamp"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/runutil"
)

const (
// TombstoneDir is the name of the directory in object storage to which tombstones are uploaded.
TombstoneDir = "thanos/tombstones"
)

// Tombstone represents a deletion request: the series matchers to delete and the time range they cover.
type Tombstone struct {
Matchers metadata.Matchers `json:"matchers"`
MinTime int64 `json:"minTime"`
MaxTime int64 `json:"maxTime"`
CreationTime int64 `json:"creationTime"`
Author string `json:"author"`
Reason string `json:"reason"`
}

// NewTombstone returns a new instance of Tombstone.
func NewTombstone(matchers metadata.Matchers, minTime, maxTime int64, author string, reason string) *Tombstone {
return &Tombstone{
Matchers: matchers,
MinTime: minTime,
MaxTime: maxTime,
CreationTime: timestamp.FromTime(time.Now()),
Author: author,
Reason: reason,
}
}

// GenName generates a unique tombstone file name based on a ULID.
func GenName() string {
return fmt.Sprintf("tombstones-%s.json", ulid.MustNew(ulid.Now(), rand.Reader))
}

// UploadTombstone uploads the given tombstone to object storage.
func UploadTombstone(ctx context.Context, tombstone *Tombstone, bkt objstore.Bucket, logger log.Logger) error {
b, err := json.Marshal(tombstone)
if err != nil {
return err
}

tsPath := path.Join(os.TempDir(), "tombstone.json")
if err := ioutil.WriteFile(tsPath, b, 0644); err != nil {
return err
}

return objstore.UploadFile(ctx, logger, bkt, tsPath, path.Join(TombstoneDir, GenName()))
}

// ReadTombstones returns all the tombstones present in the object storage.
func ReadTombstones(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger) ([]*Tombstone, error) {
var ts []*Tombstone

if err := bkt.Iter(ctx, TombstoneDir, func(name string) error {
tombstoneFile, err := bkt.Get(ctx, name)
if err != nil {
return err
}
defer runutil.CloseWithLogOnErr(logger, tombstoneFile, "close bkt tombstone reader")

b, err := ioutil.ReadAll(tombstoneFile)
if err != nil {
return err
}
t := &Tombstone{}
if err := json.Unmarshal(b, t); err != nil {
return err
}
ts = append(ts, t)
return nil
}); err != nil {
return nil, err
}
return ts, nil
}
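A round-trip sketch of the new package against an in-memory bucket; `objstore.WithNoopInstr` is assumed available to satisfy the `InstrumentedBucketReader` parameter, and all values are illustrative.

```go
package main

import (
	"context"
	"fmt"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/prometheus/pkg/labels"

	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/objstore"
	"github.com/thanos-io/thanos/pkg/tombstone"
)

func main() {
	ctx := context.Background()
	logger := log.NewNopLogger()
	bkt := objstore.NewInMemBucket()

	// One matcher, one day of data (timestamps in milliseconds),
	// illustrative author and reason.
	m := metadata.Matchers{labels.MustNewMatcher(labels.MatchEqual, "job", "demo")}
	ts := tombstone.NewTombstone(m, 0, 86400000, "alice", "demo cleanup")

	if err := tombstone.UploadTombstone(ctx, ts, bkt, logger); err != nil {
		panic(err)
	}

	// ReadTombstones is what BucketStore.SyncTombstones calls on its 3-minute loop.
	got, err := tombstone.ReadTombstones(ctx, objstore.WithNoopInstr(bkt), logger)
	if err != nil {
		panic(err)
	}
	fmt.Printf("read %d tombstone(s), reason=%q\n", len(got), got[0].Reason)
}
```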
