Skip to content

Commit

Permalink
Global markers bucket now deletes both markers, and only reports "Exi…
Browse files Browse the repository at this point in the history
…sts" if both markers exists. (#1015)

* Global markers bucket now deletes both files, and only reports "Exists" if both files exist.

Signed-off-by: Peter Štibraný <[email protected]>

* Use verifyPathExists at more places.

Signed-off-by: Peter Štibraný <[email protected]>

* CHANGELOG.md

Signed-off-by: Peter Štibraný <[email protected]>

* Fix test and typo.

Signed-off-by: Peter Štibraný <[email protected]>

* Extract commong getGlobalMarkPathFromBlockMark function.

Signed-off-by: Peter Štibraný <[email protected]>
  • Loading branch information
pstibrany authored Feb 4, 2022
1 parent 3ae9013 commit ec0b1c3
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@
* [BUGFIX] Querier: Disable query scheduler SRV DNS lookup. #689
* [BUGFIX] Query-frontend: fix API error messages that were mentioning Prometheus `--enable-feature=promql-negative-offset` and `--enable-feature=promql-at-modifier` flags. #688
* [BUGFIX] Query-frontend: worker's cancellation channels are now buffered to ensure that all request cancellations are properly handled. #741
* [BUGFIX] Compactor: compactor should now be able to correctly mark blocks for deletion and no-compaction, if such marking was previously interrupted. #1015

### Mixin (changes since `grafana/cortex-jsonnet` `1.9.0`)

Expand Down
1 change: 1 addition & 0 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,7 @@ func TestMultitenantCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/index", "some index content", nil)
bucketClient.MockExists("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", false, nil)
bucketClient.MockExists("user-1/markers/01DTVP434PA9VFXSW2JKB3392D-deletion-mark.json", false, nil)

bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", nil)
bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/index", nil)
Expand Down
75 changes: 52 additions & 23 deletions pkg/storage/tsdb/bucketindex/markers_bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"io/ioutil"
"path"

"github.com/grafana/dskit/multierror"
"github.com/oklog/ulid"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
Expand All @@ -34,12 +35,8 @@ func BucketWithGlobalMarkers(b objstore.Bucket) objstore.Bucket {

// Upload implements objstore.Bucket.
func (b *globalMarkersBucket) Upload(ctx context.Context, name string, r io.Reader) error {
globalMarkPath := ""
if blockID, ok := b.isBlockDeletionMark(name); ok {
globalMarkPath = path.Clean(path.Join(path.Dir(name), "../", BlockDeletionMarkFilepath(blockID)))
} else if blockID, ok := b.isNoCompactMark(name); ok {
globalMarkPath = path.Clean(path.Join(path.Dir(name), "../", NoCompactMarkFilepath(blockID)))
} else {
globalMarkPath := getGlobalMarkPathFromBlockMark(name)
if globalMarkPath == "" {
return b.parent.Upload(ctx, name, r)
}

Expand All @@ -60,28 +57,32 @@ func (b *globalMarkersBucket) Upload(ctx context.Context, name string, r io.Read

// Delete implements objstore.Bucket.
func (b *globalMarkersBucket) Delete(ctx context.Context, name string) error {
// Call the parent.
if err := b.parent.Delete(ctx, name); err != nil {
return err
// Call the parent. Only return error here (without deleting global marker too) if error is different than "not found".
err1 := b.parent.Delete(ctx, name)
if err1 != nil && !b.parent.IsObjNotFoundErr(err1) {
return err1
}

// Delete the marker in the global markers location too.
globalMarkPath := ""
if blockID, ok := b.isBlockDeletionMark(name); ok {
globalMarkPath = path.Clean(path.Join(path.Dir(name), "../", BlockDeletionMarkFilepath(blockID)))
} else if blockID, ok := b.isNoCompactMark(name); ok {
globalMarkPath = path.Clean(path.Join(path.Dir(name), "../", NoCompactMarkFilepath(blockID)))
globalMarkPath := getGlobalMarkPathFromBlockMark(name)
if globalMarkPath == "" {
return err1
}

if globalMarkPath != "" {
if err := b.parent.Delete(ctx, globalMarkPath); err != nil {
if !b.parent.IsObjNotFoundErr(err) {
return err
}
var err2 error
if err := b.parent.Delete(ctx, globalMarkPath); err != nil {
if !b.parent.IsObjNotFoundErr(err) {
err2 = err
}
}

return nil
if err1 != nil {
// In this case err1 is "ObjNotFound". If we tried to wrap it together with err2, we would need to
// handle this possibility in globalMarkersBucket.IsObjNotFoundErr(). Instead we just ignore err2, if any.
return err1
}

return err2
}

// Name implements objstore.Bucket.
Expand Down Expand Up @@ -111,7 +112,21 @@ func (b *globalMarkersBucket) GetRange(ctx context.Context, name string, off, le

// Exists implements objstore.Bucket.
func (b *globalMarkersBucket) Exists(ctx context.Context, name string) (bool, error) {
return b.parent.Exists(ctx, name)
globalMarkPath := getGlobalMarkPathFromBlockMark(name)
if globalMarkPath == "" {
return b.parent.Exists(ctx, name)
}

// Report "exists" only if BOTH (block-local, and global) files exist, otherwise Thanos
// code will never try to upload the file again, if it finds that it exist.
ok1, err1 := b.parent.Exists(ctx, name)
ok2, err2 := b.parent.Exists(ctx, globalMarkPath)

var me multierror.MultiError
me.Add(err1)
me.Add(err2)

return ok1 && ok2, me.Err()
}

// IsObjNotFoundErr implements objstore.Bucket.
Expand Down Expand Up @@ -142,7 +157,21 @@ func (b *globalMarkersBucket) ReaderWithExpectedErrs(fn objstore.IsOpFailureExpe
return b
}

func (b *globalMarkersBucket) isBlockDeletionMark(name string) (ulid.ULID, bool) {
// getGlobalMarkPathFromBlockMark returns path to global mark, if name points to a block-local mark file. If name
// doesn't point to a block-local mark file, returns empty string.
func getGlobalMarkPathFromBlockMark(name string) string {
if blockID, ok := isBlockDeletionMark(name); ok {
return path.Clean(path.Join(path.Dir(name), "../", BlockDeletionMarkFilepath(blockID)))
}

if blockID, ok := isNoCompactMark(name); ok {
return path.Clean(path.Join(path.Dir(name), "../", NoCompactMarkFilepath(blockID)))
}

return ""
}

func isBlockDeletionMark(name string) (ulid.ULID, bool) {
if path.Base(name) != metadata.DeletionMarkFilename {
return ulid.ULID{}, false
}
Expand All @@ -152,7 +181,7 @@ func (b *globalMarkersBucket) isBlockDeletionMark(name string) (ulid.ULID, bool)
return block.IsBlockDir(path.Dir(name))
}

func (b *globalMarkersBucket) isNoCompactMark(name string) (ulid.ULID, bool) {
func isNoCompactMark(name string) (ulid.ULID, bool) {
if path.Base(name) != metadata.NoCompactMarkFilename {
return ulid.ULID{}, false
}
Expand Down
156 changes: 139 additions & 17 deletions pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,149 @@ func TestGlobalMarkersBucket_Delete_ShouldSucceedIfDeletionMarkDoesNotExistInThe
}
}

func TestGlobalMarkersBucket_DeleteShouldDeleteGlobalMarkIfBlockMarkerDoesntExist(t *testing.T) {
ctx := context.Background()

blockID := ulid.MustNew(1, nil)

for name, tc := range map[string]struct {
blockMarker string
globalMarker string
}{
"deletion mark": {
blockMarker: path.Join(blockID.String(), metadata.DeletionMarkFilename),
globalMarker: BlockDeletionMarkFilepath(blockID),
},
"no compact": {
blockMarker: path.Join(blockID.String(), metadata.NoCompactMarkFilename),
globalMarker: NoCompactMarkFilepath(blockID),
},
} {
t.Run(name, func(t *testing.T) {
// Create a mocked block deletion mark in the global location.
bkt, _ := mimir_testutil.PrepareFilesystemBucket(t)
bkt = BucketWithGlobalMarkers(bkt)

// Upload global only
require.NoError(t, bkt.Upload(ctx, tc.globalMarker, strings.NewReader("{}")))

// Verify global exists.
verifyPathExists(t, bkt, tc.globalMarker, true)

// Delete block marker.
err := bkt.Delete(ctx, tc.blockMarker)
require.Error(t, err)
require.True(t, bkt.IsObjNotFoundErr(err))

// Ensure global one been actually deleted.
verifyPathExists(t, bkt, tc.globalMarker, false)
})
}
}

func TestUploadToGlobalMarkerPath(t *testing.T) {
bkt, _ := mimir_testutil.PrepareFilesystemBucket(t)
bkt = BucketWithGlobalMarkers(bkt)
blockID := ulid.MustNew(1, nil)
for name, tc := range map[string]struct {
blockMarker string
globalMarker string
}{
"deletion mark": {
blockMarker: path.Join(blockID.String(), metadata.DeletionMarkFilename),
globalMarker: BlockDeletionMarkFilepath(blockID),
},
"no compact": {
blockMarker: path.Join(blockID.String(), metadata.NoCompactMarkFilename),
globalMarker: NoCompactMarkFilepath(blockID),
},
} {
t.Run(name, func(t *testing.T) {
bkt, _ := mimir_testutil.PrepareFilesystemBucket(t)
bkt = BucketWithGlobalMarkers(bkt)

// Verify that uploading block mark file uploads it to the global markers location too.
require.NoError(t, bkt.Upload(context.Background(), tc.blockMarker, strings.NewReader("mark file")))

verifyPathExists(t, bkt, tc.globalMarker, true)
})
}
}

func TestGlobalMarkersBucket_ExistShouldReportTrueOnlyIfBothExist(t *testing.T) {
blockID := ulid.MustNew(1, nil)

// Verify that uploading deletion mark file uploads it to the global markers location too.
require.NoError(t, bkt.Upload(context.Background(), path.Join(blockID.String(), metadata.DeletionMarkFilename), strings.NewReader("mark file")))
ok, err := bkt.Exists(context.Background(), BlockDeletionMarkFilepath(blockID))
require.NoError(t, err)
require.True(t, ok)
for name, tc := range map[string]struct {
blockMarker string
globalMarker string
}{
"deletion mark": {
blockMarker: path.Join(blockID.String(), metadata.DeletionMarkFilename),
globalMarker: BlockDeletionMarkFilepath(blockID),
},
"no compact": {
blockMarker: path.Join(blockID.String(), metadata.NoCompactMarkFilename),
globalMarker: NoCompactMarkFilepath(blockID),
},
} {
t.Run(name, func(t *testing.T) {
bkt, _ := mimir_testutil.PrepareFilesystemBucket(t)
bkt = BucketWithGlobalMarkers(bkt)

// Upload to global marker only
require.NoError(t, bkt.Upload(context.Background(), tc.globalMarker, strings.NewReader("mark file")))

// Verify global exists, but block marker doesn't.
verifyPathExists(t, bkt, tc.globalMarker, true)
verifyPathExists(t, bkt, tc.blockMarker, false)

// Now upload to block marker (also overwrites global)
require.NoError(t, bkt.Upload(context.Background(), tc.blockMarker, strings.NewReader("mark file")))

// Verify global exists and block marker does too.
verifyPathExists(t, bkt, tc.globalMarker, true)
verifyPathExists(t, bkt, tc.blockMarker, true)

// Verify the same for no-compact mark.
require.NoError(t, bkt.Upload(context.Background(), path.Join(blockID.String(), metadata.NoCompactMarkFilename), strings.NewReader("mark file")))
ok, err = bkt.Exists(context.Background(), NoCompactMarkFilepath(blockID))
// Now delete global file, and only keep block.
require.NoError(t, bkt.Delete(context.Background(), tc.globalMarker))

// Verify global doesn't exist anymore. Block marker also returns false, even though it *does* exist.
verifyPathExists(t, bkt, tc.globalMarker, false)
verifyPathExists(t, bkt, tc.blockMarker, false)
})
}
}

func verifyPathExists(t *testing.T, bkt objstore.Bucket, name string, expected bool) {
t.Helper()

ok, err := bkt.Exists(context.Background(), name)
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, expected, ok)
}

func TestGlobalMarkersBucket_getGlobalMarkPathFromBlockMark(t *testing.T) {
type testCase struct {
name string
expected string
}

tests := []testCase{
{name: "", expected: ""},
{name: "01FV060K6XXCS8BCD2CH6C3GBR/index", expected: ""},
}

for _, marker := range []string{metadata.DeletionMarkFilename, metadata.NoCompactMarkFilename} {
tests = append(tests, testCase{name: marker, expected: ""})
tests = append(tests, testCase{name: "01FV060K6XXCS8BCD2CH6C3GBR/" + marker, expected: "markers/01FV060K6XXCS8BCD2CH6C3GBR-" + marker})
tests = append(tests, testCase{name: "/path/to/01FV060K6XXCS8BCD2CH6C3GBR/" + marker, expected: "/path/to/markers/01FV060K6XXCS8BCD2CH6C3GBR-" + marker})
tests = append(tests, testCase{name: "invalid-block-id/" + marker, expected: ""})
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
result := getGlobalMarkPathFromBlockMark(tc.name)
assert.Equal(t, tc.expected, result)
})
}
}

func TestGlobalMarkersBucket_isBlockDeletionMark(t *testing.T) {
Expand Down Expand Up @@ -96,11 +222,9 @@ func TestGlobalMarkersBucket_isBlockDeletionMark(t *testing.T) {
},
}

b := BucketWithGlobalMarkers(nil).(*globalMarkersBucket)

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
actualID, actualOk := b.isBlockDeletionMark(tc.name)
actualID, actualOk := isBlockDeletionMark(tc.name)
assert.Equal(t, tc.expectedOk, actualOk)
assert.Equal(t, tc.expectedID, actualID)
})
Expand Down Expand Up @@ -135,11 +259,9 @@ func TestGlobalMarkersBucket_isNoCompactMark(t *testing.T) {
},
}

b := BucketWithGlobalMarkers(nil).(*globalMarkersBucket)

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
actualID, actualOk := b.isNoCompactMark(tc.name)
actualID, actualOk := isNoCompactMark(tc.name)
assert.Equal(t, tc.expectedOk, actualOk)
assert.Equal(t, tc.expectedID, actualID)
})
Expand Down

0 comments on commit ec0b1c3

Please sign in to comment.