Skip to content

Commit

Permalink
Tools: Added bucket unmark command to remove deletion or no-compact m…
Browse files Browse the repository at this point in the history
…arkers on the blocks.

Signed-off-by: maheshbaliga <[email protected]>
  • Loading branch information
maheshbaliga committed Dec 16, 2022
1 parent 4054531 commit be128f2
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5889](https://github.com/thanos-io/thanos/pull/5889) Query Frontend: Support sharding vertical sharding `label_replace` and `label_join` functions.
- [#5819](https://github.com/thanos-io/thanos/pull/5819) Store: Add a few objectives for Store's data touched/fetched amount and sizes. They are: 50, 95, and 99 quantiles.
- [#5940](https://github.com/thanos-io/thanos/pull/5940) Objstore: Support for authenticating to Swift using application credentials.
- [#5977](https://github.com/thanos-io/thanos/pull/5977) Tools: Added bucket unmark command to remove deletion or no-compact markers on the block.

### Changed

Expand Down
54 changes: 54 additions & 0 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,12 @@ func (tbc *bucketRetentionConfig) registerBucketRetentionFlag(cmd extkingpin.Fla
return tbc
}

func (tbc *bucketMarkBlockConfig) registerBucketUnmarkBlockFlag(cmd extkingpin.FlagClause) *bucketMarkBlockConfig {
cmd.Flag("id", "ID (ULID) of the blocks to be unmarked for deletion or no-compact (repeated flag)").Required().StringsVar(&tbc.blockIDs)
cmd.Flag("marker", "Marker that was put. One of: deletion-mark.json|no-compact-mark.json.").Required().EnumVar(&tbc.marker, metadata.DeletionMarkFilename, metadata.NoCompactMarkFilename)
return tbc
}

func registerBucket(app extkingpin.AppClause) {
cmd := app.Command("bucket", "Bucket utility commands")

Expand All @@ -277,6 +283,7 @@ func registerBucket(app extkingpin.AppClause) {
registerBucketMarkBlock(cmd, objStoreConfig)
registerBucketRewrite(cmd, objStoreConfig)
registerBucketRetention(cmd, objStoreConfig)
registerBucketUnmarkBlock(cmd, objStoreConfig)
}

func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
Expand Down Expand Up @@ -1370,3 +1377,50 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P
return nil
})
}

func registerBucketUnmarkBlock(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
cmd := app.Command(component.Unmark.String(), "Unmark block marked for deletion or no-compact in a safe way. NOTE: If the compactor is currently running compacting same block, this operation would be potentially a noop.")

tbc := &bucketMarkBlockConfig{}
tbc.registerBucketUnmarkBlockFlag(cmd)

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Unmark.String())
if err != nil {
return err
}

var ids []ulid.ULID
for _, id := range tbc.blockIDs {
u, err := ulid.Parse(id)
if err != nil {
return errors.Errorf("block.id is not a valid UUID, got: %v", id)
}
ids = append(ids, u)
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
g.Add(func() error {
for _, id := range ids {
switch tbc.marker {
case metadata.DeletionMarkFilename, metadata.NoCompactMarkFilename:
if err := block.UnmarkBlock(ctx, logger, bkt, id, promauto.With(nil).NewCounter(prometheus.CounterOpts{}), tbc.marker); err != nil {
return errors.Wrapf(err, "unmark %v for %v", id, tbc.marker)
}
default:
return errors.Errorf("not supported marker %v", tbc.marker)
}
}
level.Info(logger).Log("msg", "unmarking done", "IDs", strings.Join(tbc.blockIDs, ","))
return nil
}, func(err error) {
cancel()
})
return nil
})
}
71 changes: 71 additions & 0 deletions docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ Subcommands:
Retention applies retention policies on the given bucket. Please make sure
no compactor is running on the same bucket at the same time.
tools bucket unmark --id=ID --marker=MARKER
Unmark block marked for deletion or no-compact in a safe way. NOTE: If the
compactor is currently running compacting same block, this operation would
be potentially a noop.
tools rules-check --rules=RULES
Check if the rule files are valid or not.
Expand Down Expand Up @@ -187,6 +192,11 @@ Subcommands:
Retention applies retention policies on the given bucket. Please make sure
no compactor is running on the same bucket at the same time.
tools bucket unmark --id=ID --marker=MARKER
Unmark block marked for deletion or no-compact in a safe way. NOTE: If the
compactor is currently running compacting same block, this operation would
be potentially a noop.
```

Expand Down Expand Up @@ -718,6 +728,67 @@ Flags:
```

### Bucket unmark

`tools bucket unmark` can be used to manually unmark block for deletion or no-compact.

NOTE: If the [Compactor](compact.md) is currently running and compacting exactly same block, this operation would be potentially a noop."

```bash
thanos tools bucket unmark \
--id "01C8320GCGEWBZF51Q46TTQEH9" --id "01C8J352831FXGZQMN2NTJ08DY"
--marker "deletion-mark.json"
--objstore.config-file "bucket.yml"
```

The example content of `bucket.yml`:

```yaml mdox-exec="go run scripts/cfggen/main.go --name=gcs.Config"
type: GCS
config:
bucket: ""
service_account: ""
prefix: ""
```

```$ mdox-exec="thanos tools bucket unmark --help"
usage: thanos tools bucket unmark --id=ID --marker=MARKER
Unmark block marked for deletion or no-compact in a safe way. NOTE: If the
compactor is currently running compacting same block, this operation would be
potentially a noop.
Flags:
-h, --help Show context-sensitive help (also try --help-long and
--help-man).
--id=ID ... ID (ULID) of the blocks to be unmarked for deletion
or no-compact (repeated flag)
--log.format=logfmt Log format to use. Possible options: logfmt or json.
--log.level=info Log filtering level.
--marker=MARKER Marker that was put. One of:
deletion-mark.json|no-compact-mark.json.
--objstore.config=<content>
Alternative to 'objstore.config-file' flag (mutually
exclusive). Content of YAML file that contains
object store configuration. See format details:
https://thanos.io/tip/thanos/storage.md/#configuration
--objstore.config-file=<file-path>
Path to YAML file that contains object
store configuration. See format details:
https://thanos.io/tip/thanos/storage.md/#configuration
--tracing.config=<content>
Alternative to 'tracing.config-file' flag
(mutually exclusive). Content of YAML file
with tracing configuration. See format details:
https://thanos.io/tip/thanos/tracing.md/#configuration
--tracing.config-file=<file-path>
Path to YAML file with tracing
configuration. See format details:
https://thanos.io/tip/thanos/tracing.md/#configuration
--version Show application version.
```

### Bucket Rewrite

`tools bucket rewrite` rewrites chosen blocks in the bucket, while deleting or modifying series.
Expand Down
19 changes: 19 additions & 0 deletions pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,3 +433,22 @@ func MarkForNoDownsample(ctx context.Context, logger log.Logger, bkt objstore.Bu
level.Info(logger).Log("msg", "block has been marked for no downsample", "block", id)
return nil
}

// UnmarkBlock removes the file which marked the block for deletion or no-compact.
func UnmarkBlock(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, unmarkedBlock prometheus.Counter, markedFilename string) error {
markedFile := path.Join(id.String(), markedFilename)
markedFileExists, err := bkt.Exists(ctx, markedFile)
if err != nil {
return errors.Wrapf(err, "check if %s file exists in bucket", markedFile)
}
if !markedFileExists {
level.Warn(logger).Log("msg", "requested to unmark, but file does not exist; this should not happen; investigate", "err", errors.Errorf("file %s does not exist in bucket", markedFile))
return nil
}
if err := bkt.Delete(ctx, markedFile); err != nil {
return errors.Wrapf(err, "delete file %s from bucket", markedFile)
}
unmarkedBlock.Inc()
level.Info(logger).Log("msg", "block has been unmarked", "block", id)
return nil
}
94 changes: 94 additions & 0 deletions pkg/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,3 +592,97 @@ func (eb errBucket) Upload(ctx context.Context, name string, r io.Reader) error
}
return nil
}

func TestUnmarkForDeletion(t *testing.T) {
defer testutil.TolerantVerifyLeak(t)
ctx := context.Background()
tmpDir := t.TempDir()
for _, testcases := range []struct {
name string
preDelete func(t testing.TB, id ulid.ULID, bkt objstore.Bucket)
blocksUnmarked int
}{
{
name: "unmarked block for deletion",
preDelete: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {
deletionMark, err := json.Marshal(metadata.DeletionMark{
ID: id,
DeletionTime: time.Now().Unix(),
Version: metadata.DeletionMarkVersion1,
})
testutil.Ok(t, err)
testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.DeletionMarkFilename), bytes.NewReader(deletionMark)))
},
blocksUnmarked: 1,
},
{
name: "block not marked for deletion, message logged and metric not incremented",
preDelete: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {},
blocksUnmarked: 0,
},
} {
t.Run(testcases.name, func(t *testing.T) {
bkt := objstore.NewInMemBucket()
id, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "cluster-eu1", Value: "service-1"}},
{{Name: "cluster-eu1", Value: "service-2"}},
{{Name: "cluster-eu1", Value: "service-3"}},
{{Name: "cluster-us1", Value: "service-1"}},
{{Name: "cluster-us1", Value: "service-2"}},
}, 100, 0, 1000, labels.Labels{{Name: "region-1", Value: "eu-west"}}, 124, metadata.NoneFunc)
testutil.Ok(t, err)
testcases.preDelete(t, id, bkt)
counter := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
err = UnmarkBlock(ctx, log.NewNopLogger(), bkt, id, counter, metadata.DeletionMarkFilename)
testutil.Ok(t, err)
testutil.Equals(t, float64(testcases.blocksUnmarked), promtest.ToFloat64(counter))
})
}
}

func TestUnmarkForNoCompact(t *testing.T) {
defer testutil.TolerantVerifyLeak(t)
ctx := context.Background()
tmpDir := t.TempDir()
for _, testCases := range []struct {
name string
preDelete func(t testing.TB, id ulid.ULID, bkt objstore.Bucket)
blocksUnmarked int
}{
{
name: "unmarked block for no-compact",
preDelete: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {
m, err := json.Marshal(metadata.NoCompactMark{
ID: id,
NoCompactTime: time.Now().Unix(),
Version: metadata.NoCompactMarkVersion1,
})
testutil.Ok(t, err)
testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.NoCompactMarkFilename), bytes.NewReader(m)))
},
blocksUnmarked: 1,
},
{
name: "block not marked for no-compact, message logged and metric not incremented",
preDelete: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {},
blocksUnmarked: 0,
},
} {
t.Run(testCases.name, func(t *testing.T) {
bkt := objstore.NewInMemBucket()
id, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "cluster-eu1", Value: "service-1"}},
{{Name: "cluster-eu1", Value: "service-2"}},
{{Name: "cluster-eu1", Value: "service-3"}},
{{Name: "cluster-us1", Value: "service-1"}},
{{Name: "cluster-us1", Value: "service-2"}},
}, 100, 0, 1000, labels.Labels{{Name: "region-1", Value: "eu-west"}}, 124, metadata.NoneFunc)
testutil.Ok(t, err)
testCases.preDelete(t, id, bkt)
counter := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
err = UnmarkBlock(ctx, log.NewNopLogger(), bkt, id, counter, metadata.NoCompactMarkFilename)
testutil.Ok(t, err)
testutil.Equals(t, float64(testCases.blocksUnmarked), promtest.ToFloat64(counter))
})
}
}
1 change: 1 addition & 0 deletions pkg/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ var (
Downsample = source{component: component{name: "downsample"}}
Replicate = source{component: component{name: "replicate"}}
QueryFrontend = source{component: component{name: "query-frontend"}}
Unmark = source{component: component{name: "unmark"}}
Debug = sourceStoreAPI{component: component{name: "debug"}}
Receive = sourceStoreAPI{component: component{name: "receive"}}
Rule = sourceStoreAPI{component: component{name: "rule"}}
Expand Down

0 comments on commit be128f2

Please sign in to comment.