Skip to content

Commit

Permalink
Updated Thanos to latest (#5459)
Browse files Browse the repository at this point in the history
* Updated Thanos to latest. Fixed Cortex compactor and bucket client code due to Thanos change.

Signed-off-by: Alex Le <[email protected]>

* fix tests

Signed-off-by: Alex Le <[email protected]>

* fix tests

Signed-off-by: Alex Le <[email protected]>

* trigger workflow

Signed-off-by: Alex Le <[email protected]>

---------

Signed-off-by: Alex Le <[email protected]>
  • Loading branch information
alexqyle authored Jul 15, 2023
1 parent 21eb075 commit 150209c
Show file tree
Hide file tree
Showing 25 changed files with 480 additions and 333 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ require (
github.com/sony/gobreaker v0.5.0
github.com/spf13/afero v1.9.5
github.com/stretchr/testify v1.8.4
github.com/thanos-io/objstore v0.0.0-20230710163637-47c0118da0ca
github.com/thanos-io/objstore v0.0.0-20230713070940-eb01c83b89a4
github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea
github.com/thanos-io/thanos v0.31.1-0.20230712154708-a395c5dbd054
github.com/thanos-io/thanos v0.31.1-0.20230714171248-723dfd08764a
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/weaveworks/common v0.0.0-20221201103051-7c2720a9024d
go.etcd.io/etcd/api/v3 v3.5.8
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1160,12 +1160,12 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl
github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM=
github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1Zsv7OAU9iQhZwigp50Yl38W10g/vd5NC8Rdk1Jzng=
github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM=
github.com/thanos-io/objstore v0.0.0-20230710163637-47c0118da0ca h1:JRF7i58HovirZQVJGwCClQsMK6CCmK2fvialXjeoSpI=
github.com/thanos-io/objstore v0.0.0-20230710163637-47c0118da0ca/go.mod h1:5V7lzXuaxwt6XFQoA/zJrhdnQrxq1+r0bwQ1iYOq3gM=
github.com/thanos-io/objstore v0.0.0-20230713070940-eb01c83b89a4 h1:SYs56N3zGaE8wwkU+QAfqeAC9SMjGWQORzrYSs58NAQ=
github.com/thanos-io/objstore v0.0.0-20230713070940-eb01c83b89a4/go.mod h1:Vc+D0zxX8fT7VOe8Gj0J6vzw0kcTrMCEgE140wCz1c0=
github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea h1:kzK8sBn2+mo3NAxP+XjAjAqr1hwfxxFUy5CybaBkjAI=
github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea/go.mod h1:eIgPaXWgOhNAv6CPPrgu09r0AtT7byBTZy+7WkX0D18=
github.com/thanos-io/thanos v0.31.1-0.20230712154708-a395c5dbd054 h1:kBuXA0B+jXX89JAJTymw7g/v/4jyjCSgfPcWQeFUOoM=
github.com/thanos-io/thanos v0.31.1-0.20230712154708-a395c5dbd054/go.mod h1:C0Cdk0kFFEDS3qkTgScF9ONSjrPxqnScGPoIgah3NJY=
github.com/thanos-io/thanos v0.31.1-0.20230714171248-723dfd08764a h1:6GRazFOeBtPpoDFLO7s8AS5upOWPCzQ96mq/UpH/4QI=
github.com/thanos-io/thanos v0.31.1-0.20230714171248-723dfd08764a/go.mod h1:PGAlwITP7IvWQXra3VbWomRqz8xrSlAR3ee6Z8k4No0=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
Expand Down
33 changes: 19 additions & 14 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant(
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)

c, _, tsdbPlannerMock, _, registry := prepare(t, prepareConfig(), bucketClient, nil)
tsdbPlannerMock.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, errors.New("Failed to plan"))
tsdbPlannerMock.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, errors.New("Failed to plan"))
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

// Wait until all retry attempts have completed.
Expand Down Expand Up @@ -585,7 +585,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
// in order to simplify tests (all in all, we just want to
// test our logic and not TSDB compactor which we expect to
// be already tested).
tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)
tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)

require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

Expand Down Expand Up @@ -850,7 +850,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) {

c, _, tsdbPlanner, _, registry := prepare(t, prepareConfig(), bucketClient, nil)

tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)
tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)

require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

Expand Down Expand Up @@ -907,7 +907,7 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T)
// in order to simplify tests (all in all, we just want to
// test our logic and not TSDB compactor which we expect to
// be already tested).
tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)
tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)

require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

Expand Down Expand Up @@ -1002,9 +1002,9 @@ func TestCompactor_ShouldSkipOutOrOrderBlocks(t *testing.T) {
cfg.SkipBlocksWithOutOfOrderChunksEnabled = true
c, tsdbCompac, tsdbPlanner, _, registry := prepare(t, cfg, bucketClient, nil)

tsdbCompac.On("Compact", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(b1, nil)
tsdbCompac.On("CompactWithBlockPopulator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(b1, nil)

tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{
tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{
{
BlockMeta: tsdb.BlockMeta{
ULID: b1,
Expand Down Expand Up @@ -1097,7 +1097,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
// in order to simplify tests (all in all, we just want to
// test our logic and not TSDB compactor which we expect to
// be already tested).
tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)
tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)

require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

Expand Down Expand Up @@ -1193,7 +1193,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM
// in order to simplify tests (all in all, we just want to
// test our logic and not TSDB compactor which we expect to
// be already tested).
tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)
tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)
}

// Start all compactors
Expand Down Expand Up @@ -1325,7 +1325,7 @@ func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWit
// in order to simplify tests (all in all, we just want to
// test our logic and not TSDB compactor which we expect to
// be already tested).
tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)
tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)
}

// Start all compactors
Expand Down Expand Up @@ -1647,13 +1647,18 @@ func (m *tsdbCompactorMock) Compact(dest string, dirs []string, open []*tsdb.Blo
return args.Get(0).(ulid.ULID), args.Error(1)
}

func (m *tsdbCompactorMock) CompactWithBlockPopulator(dest string, dirs []string, open []*tsdb.Block, blockPopulator tsdb.BlockPopulator) (uid ulid.ULID, err error) {
args := m.Called(dest, dirs, open, blockPopulator)
return args.Get(0).(ulid.ULID), args.Error(1)
}

type tsdbPlannerMock struct {
mock.Mock
noCompactMarkFilters []*compact.GatherNoCompactionMarkFilter
}

func (m *tsdbPlannerMock) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
args := m.Called(ctx, metasByMinTime)
func (m *tsdbPlannerMock) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, errChan chan error, extensions any) ([]*metadata.Meta, error) {
args := m.Called(ctx, metasByMinTime, errChan, extensions)
return args.Get(0).([]*metadata.Meta), args.Error(1)
}

Expand Down Expand Up @@ -1809,7 +1814,7 @@ func TestCompactor_DeleteLocalSyncFiles(t *testing.T) {
// in order to simplify tests (all in all, we just want to
// test our logic and not TSDB compactor which we expect to
// be already tested).
tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)
tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)
}

require.Equal(t, 2, len(compactors))
Expand Down Expand Up @@ -1892,10 +1897,10 @@ func TestCompactor_ShouldNotTreatInterruptionsAsErrors(t *testing.T) {
c, tsdbCompactor, tsdbPlanner, logs, registry := prepare(t, prepareConfig(), bucketClient, nil)

ctx, cancel := context.WithCancel(context.Background())
tsdbCompactor.On("Compact", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(ulid.ULID{}, context.Canceled).Run(func(args mock.Arguments) {
tsdbCompactor.On("CompactWithBlockPopulator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(ulid.ULID{}, context.Canceled).Run(func(args mock.Arguments) {
cancel()
})
tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{
tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{
{
BlockMeta: tsdb.BlockMeta{
ULID: b1,
Expand Down
2 changes: 1 addition & 1 deletion pkg/compactor/shuffle_sharding_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewShuffleShardingPlanner(
}
}

func (p *ShuffleShardingPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
func (p *ShuffleShardingPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta, _ chan error, _ any) ([]*metadata.Meta, error) {
// Ensure all blocks fits within the largest range. This is a double check
// to ensure there's no bug in the previous blocks grouping, given this Plan()
// is just a pass-through.
Expand Down
2 changes: 1 addition & 1 deletion pkg/compactor/shuffle_sharding_planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func TestShuffleShardingPlanner_Plan(t *testing.T) {
blockVisitMarkerReadFailed,
blockVisitMarkerWriteFailed,
)
actual, err := p.Plan(context.Background(), testData.blocks)
actual, err := p.Plan(context.Background(), testData.blocks, nil, nil)

if testData.expectedErr != nil {
assert.Equal(t, err, testData.expectedErr)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cortex/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"

"github.com/opentracing/opentracing-go"
objstoretracing "github.com/thanos-io/objstore/tracing"
objstoretracing "github.com/thanos-io/objstore/tracing/opentracing"
"github.com/thanos-io/thanos/pkg/tracing"
"google.golang.org/grpc"
)
Expand Down
10 changes: 6 additions & 4 deletions pkg/storage/bucket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/tracing/opentracing"

"github.com/cortexproject/cortex/pkg/storage/bucket/azure"
"github.com/cortexproject/cortex/pkg/storage/bucket/filesystem"
Expand Down Expand Up @@ -122,7 +123,7 @@ func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger,
return nil, err
}

client = objstore.NewTracingBucket(bucketWithMetrics(client, name, reg))
client = opentracing.WrapWithTraces(bucketWithMetrics(client, name, reg))

// Wrap the client with any provided middleware
for _, wrap := range cfg.Middlewares {
Expand All @@ -140,8 +141,9 @@ func bucketWithMetrics(bucketClient objstore.Bucket, name string, reg prometheus
return bucketClient
}

return objstore.BucketWithMetrics(
"", // bucket label value
return objstore.WrapWithMetrics(
bucketClient,
prometheus.WrapRegistererWith(prometheus.Labels{"component": name}, prometheus.WrapRegistererWithPrefix("thanos_", reg)))
prometheus.WrapRegistererWith(prometheus.Labels{"component": name}, prometheus.WrapRegistererWithPrefix("thanos_", reg)),
"", // bucket label value
)
}
2 changes: 1 addition & 1 deletion pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func TestBucketWithGlobalMarkers_ShouldWorkCorrectlyWithBucketMetrics(t *testing
// global markers (intentionally in the middle of the chain) and
// user prefix.
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
bkt = objstore.BucketWithMetrics("", bkt, prometheus.WrapRegistererWithPrefix("thanos_", reg))
bkt = objstore.WrapWithMetrics(bkt, prometheus.WrapRegistererWithPrefix("thanos_", reg), "")
bkt = BucketWithGlobalMarkers(bkt)
userBkt := bucket.NewUserBucketClient("user-1", bkt, nil)

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/tsdb/bucketindex/markers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestMigrateBlockDeletionMarksToGlobalLocation(t *testing.T) {

t.Run("doesn't increase thanos_objstore_bucket_operation_failures_total for NotFound deletion markers", func(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
bkt = objstore.BucketWithMetrics("", bkt, prometheus.WrapRegistererWithPrefix("thanos_", reg))
bkt = objstore.WrapWithMetrics(bkt, prometheus.WrapRegistererWithPrefix("thanos_", reg), "")
require.NoError(t, bkt.Upload(ctx, path.Join("user-1", block2.String(), metadata.MetaFilename), strings.NewReader("{}")))
require.NoError(t, MigrateBlockDeletionMarksToGlobalLocation(ctx, bkt, "user-1", nil))

Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/tsdb/bucketindex/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestUpdater_UpdateIndex_ShouldNotIncreaseOperationFailureMetric(t *testing.

// Mock some blocks in the storage.
bkt = BucketWithGlobalMarkers(bkt)
bkt = objstore.BucketWithMetrics("test-bucket", bkt, prometheus.WrapRegistererWithPrefix("thanos_", registry))
bkt = objstore.WrapWithMetrics(bkt, prometheus.WrapRegistererWithPrefix("thanos_", registry), "test-bucket")
block1 := testutil.MockStorageBlock(t, bkt, userID, 10, 20)
block2 := testutil.MockStorageBlock(t, bkt, userID, 20, 30)
block3 := testutil.MockStorageBlock(t, bkt, userID, 30, 40)
Expand Down Expand Up @@ -149,7 +149,7 @@ func TestUpdater_UpdateIndex_ShouldNotIncreaseOperationFailureMetricCustomerKey(

// Mock some blocks in the storage.
bkt = BucketWithGlobalMarkers(bkt)
bkt = objstore.BucketWithMetrics("test-bucket", bkt, prometheus.WrapRegistererWithPrefix("thanos_", registry))
bkt = objstore.WrapWithMetrics(bkt, prometheus.WrapRegistererWithPrefix("thanos_", registry), "test-bucket")
block1 := testutil.MockStorageBlock(t, bkt, userID, 10, 20)
block2 := testutil.MockStorageBlock(t, bkt, userID, 20, 30)

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/tsdb/testutil/objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func PrepareFilesystemBucket(t testing.TB) (objstore.Bucket, string) {
bkt, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
require.NoError(t, err)

return objstore.BucketWithMetrics("test", bkt, nil), storageDir
return objstore.WrapWithMetrics(bkt, nil, "test"), storageDir
}

type MockBucketFailure struct {
Expand Down
7 changes: 5 additions & 2 deletions vendor/github.com/thanos-io/objstore/CHANGELOG.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 12 additions & 12 deletions vendor/github.com/thanos-io/objstore/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 150209c

Please sign in to comment.