diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b3e8f3054..82b10be0a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## master / unreleased * [FEATURE] Compactor: Added `-compactor.block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction. #4784 * [ENHANCEMENT] Querier/Ruler: Retry store-gateway in case of unexpected failure, instead of failing the query. #4532 +* [FEATURE] Compactor: Added `-compactor.blocks-fetch-concurrency` allowing to configure number of goroutines for fetching blocks during compaction. #4787 ## 1.13.0 in progress * [CHANGE] Changed default for `-ingester.min-ready-duration` from 1 minute to 15 seconds. #4539 diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index e1aaaf44cb..6156b0edd1 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -157,6 +157,11 @@ compactor: # CLI flag: -compactor.block-files-concurrency [block_files_concurrency: <int> | default = 10] + # Number of goroutines to use when fetching blocks from object storage when + # compacting. + # CLI flag: -compactor.blocks-fetch-concurrency + [blocks_fetch_concurrency: <int> | default = 3] + # When enabled, at compactor startup the bucket will be scanned and all found # deletion marks inside the block location will be copied to the markers # global location too. This option can (and should) be safely disabled as soon diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 0fc0f54b97..f15c69e05b 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -5322,6 +5322,11 @@ The `compactor_config` configures the compactor for the blocks storage. # CLI flag: -compactor.block-files-concurrency [block_files_concurrency: <int> | default = 10] +# Number of goroutines to use when fetching blocks from object storage when +# compacting. +# CLI flag: -compactor.blocks-fetch-concurrency +[blocks_fetch_concurrency: <int> | default = 3] + # When enabled, at compactor startup the bucket will be scanned and all found # deletion marks inside the block location will be copied to the markers global # location too.
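The two options control different levels of parallelism: `-compactor.blocks-fetch-concurrency` bounds how many blocks of a single compaction plan are downloaded at once, while `-compactor.block-files-concurrency` bounds the goroutines used for the files inside each block. A minimal sketch of how such nesting can look, assuming a hypothetical `downloadBlockFiles` helper and using `errgroup` as the Thanos change further down in this diff does (names are illustrative, not the actual Cortex/Thanos API):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// downloadBlockFiles is a hypothetical stand-in for fetching one block's
// files with a bounded number of goroutines (block-files-concurrency).
func downloadBlockFiles(ctx context.Context, blockID string, fileConcurrency int) error {
	fmt.Printf("fetching %s with %d file goroutines\n", blockID, fileConcurrency)
	return nil
}

// fetchBlocks downloads the planned blocks, at most blocksFetchConcurrency at a time.
func fetchBlocks(ctx context.Context, blockIDs []string, blocksFetchConcurrency, blockFilesConcurrency int) error {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(blocksFetchConcurrency) // e.g. the default of 3

	for _, id := range blockIDs {
		id := id // pin the loop variable for the goroutine (pre-Go 1.22 semantics)
		g.Go(func() error {
			return downloadBlockFiles(ctx, id, blockFilesConcurrency) // e.g. the default of 10
		})
	}
	return g.Wait() // first error cancels ctx and is returned
}

func main() {
	_ = fetchBlocks(context.Background(), []string{"01A", "01B", "01C"}, 3, 10)
}
```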
This option can (and should) be safely disabled as soon as the diff --git a/go.mod b/go.mod index bbb8a6a222..def0639e8e 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( github.com/hashicorp/memberlist v0.3.1 github.com/json-iterator/go v1.1.12 github.com/lib/pq v1.3.0 - github.com/minio/minio-go/v7 v7.0.30 + github.com/minio/minio-go/v7 v7.0.32-0.20220706200439-ef3e45ed9cdb github.com/mitchellh/go-wordwrap v1.0.0 github.com/ncw/swift v1.0.52 github.com/oklog/ulid v1.3.1 @@ -56,7 +56,7 @@ require ( github.com/sony/gobreaker v0.4.1 github.com/spf13/afero v1.6.0 github.com/stretchr/testify v1.7.2 - github.com/thanos-io/thanos v0.27.0-rc.0.0.20220712060227-ca4fe82d74f6 + github.com/thanos-io/thanos v0.27.0-rc.0.0.20220715025542-84880ea6d7f8 github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/weaveworks/common v0.0.0-20210913144402-035033b78a78 go.etcd.io/bbolt v1.3.6 diff --git a/go.sum b/go.sum index 1a6c77dc46..903b681c43 100644 --- a/go.sum +++ b/go.sum @@ -1551,8 +1551,8 @@ github.com/minio/minio-go/v6 v6.0.44/go.mod h1:qD0lajrGW49lKZLtXKtCB4X/qkMf0a5tB github.com/minio/minio-go/v6 v6.0.56/go.mod h1:KQMM+/44DSlSGSQWSfRrAZ12FVMmpWNuX37i2AX0jfI= github.com/minio/minio-go/v7 v7.0.2/go.mod h1:dJ80Mv2HeGkYLH1sqS/ksz07ON6csH3S6JUMSQ2zAns= github.com/minio/minio-go/v7 v7.0.10/go.mod h1:td4gW1ldOsj1PbSNS+WYK43j+P1XVhX/8W8awaYlBFo= -github.com/minio/minio-go/v7 v7.0.30 h1:Re+qlwA+LB3mgFGYbztVPzlEjKtGzRVV5Sk38np858k= -github.com/minio/minio-go/v7 v7.0.30/go.mod h1:/sjRKkKIA75CKh1iu8E3qBy7ktBmCCDGII0zbXGwbUk= +github.com/minio/minio-go/v7 v7.0.32-0.20220706200439-ef3e45ed9cdb h1:J7jRWqlD+K3Tp4YbLWcyBKiHoNRy49JR5HA4RetFrAY= +github.com/minio/minio-go/v7 v7.0.32-0.20220706200439-ef3e45ed9cdb/go.mod h1:/sjRKkKIA75CKh1iu8E3qBy7ktBmCCDGII0zbXGwbUk= github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= @@ -2040,8 +2040,8 @@ github.com/thanos-io/thanos v0.13.1-0.20210224074000-659446cab117/go.mod h1:kdqF github.com/thanos-io/thanos v0.13.1-0.20210226164558-03dace0a1aa1/go.mod h1:gMCy4oCteKTT7VuXVvXLTPGzzjovX1VPE5p+HgL1hyU= github.com/thanos-io/thanos v0.13.1-0.20210401085038-d7dff0c84d17/go.mod h1:zU8KqE+6A+HksK4wiep8e/3UvCZLm+Wrw9AqZGaAm9k= github.com/thanos-io/thanos v0.22.0/go.mod h1:SZDWz3phcUcBr4MYFoPFRvl+Z9Nbi45HlwQlwSZSt+Q= -github.com/thanos-io/thanos v0.27.0-rc.0.0.20220712060227-ca4fe82d74f6 h1:6jxorvQx1sk912ykRj29pizu8U480zKmjZb7HnQe+hM= -github.com/thanos-io/thanos v0.27.0-rc.0.0.20220712060227-ca4fe82d74f6/go.mod h1:sklyj/ttQrL8iY3g/pQEAdIhayKW4HvOpbA7TEYK0Xs= +github.com/thanos-io/thanos v0.27.0-rc.0.0.20220715025542-84880ea6d7f8 h1:kdpa2sXUyxQy3KAk00T+4R9rZFCX5vfdwFmf+TWk5BE= +github.com/thanos-io/thanos v0.27.0-rc.0.0.20220715025542-84880ea6d7f8/go.mod h1:Ga4NVtqoPVGjZmKiu5Wz2x6V9o4ohdeV9ZAcztvH13M= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY= github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index aab64cb6c8..f6a208591f 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -64,7 +64,8 @@ var ( garbageCollectedBlocks, 
blocksMarkedForNoCompaction, metadata.NoneFunc, - cfg.BlockFilesConcurrency) + cfg.BlockFilesConcurrency, + cfg.BlocksFetchConcurrency) } ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string) compact.Grouper { @@ -84,7 +85,8 @@ var ( ringLifecycle.Addr, limits, userID, - cfg.BlockFilesConcurrency) + cfg.BlockFilesConcurrency, + cfg.BlocksFetchConcurrency) } DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) { @@ -166,6 +168,7 @@ type Config struct { TenantCleanupDelay time.Duration `yaml:"tenant_cleanup_delay"` SkipBlocksWithOutOfOrderChunksEnabled bool `yaml:"skip_blocks_with_out_of_order_chunks_enabled"` BlockFilesConcurrency int `yaml:"block_files_concurrency"` + BlocksFetchConcurrency int `yaml:"blocks_fetch_concurrency"` // Whether the migration of block deletion marks to the global markers location is enabled. BlockDeletionMarksMigrationEnabled bool `yaml:"block_deletion_marks_migration_enabled"` @@ -216,6 +219,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.BlockDeletionMarksMigrationEnabled, "compactor.block-deletion-marks-migration-enabled", false, "When enabled, at compactor startup the bucket will be scanned and all found deletion marks inside the block location will be copied to the markers global location too. This option can (and should) be safely disabled as soon as the compactor has successfully run at least once.") f.BoolVar(&cfg.SkipBlocksWithOutOfOrderChunksEnabled, "compactor.skip-blocks-with-out-of-order-chunks-enabled", false, "When enabled, mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction.") f.IntVar(&cfg.BlockFilesConcurrency, "compactor.block-files-concurrency", 10, "Number of goroutines to use when fetching/uploading block files from object storage.") + f.IntVar(&cfg.BlocksFetchConcurrency, "compactor.blocks-fetch-concurrency", 3, "Number of goroutines to use when fetching blocks from object storage when compacting.") f.Var(&cfg.EnabledTenants, "compactor.enabled-tenants", "Comma separated list of tenants that can be compacted. If specified, only these tenants will be compacted by compactor, otherwise all tenants can be compacted. Subject to sharding.") f.Var(&cfg.DisabledTenants, "compactor.disabled-tenants", "Comma separated list of tenants that cannot be compacted by this compactor. 
If specified, and compactor would normally pick given tenant for compaction (via -compactor.enabled-tenants or sharding), it will be ignored instead.") diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go index f5667d83e7..146fd62721 100644 --- a/pkg/compactor/shuffle_sharding_grouper.go +++ b/pkg/compactor/shuffle_sharding_grouper.go @@ -41,6 +41,7 @@ type ShuffleShardingGrouper struct { limits Limits userID string blockFilesConcurrency int + blocksFetchConcurrency int ring ring.ReadRing ringLifecyclerAddr string @@ -63,6 +64,7 @@ func NewShuffleShardingGrouper( limits Limits, userID string, blockFilesConcurrency int, + blocksFetchConcurrency int, ) *ShuffleShardingGrouper { if logger == nil { logger = log.NewNopLogger() @@ -100,12 +102,13 @@ func NewShuffleShardingGrouper( Name: "thanos_compact_group_vertical_compactions_total", Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.", }, []string{"group"}), - compactorCfg: compactorCfg, - ring: ring, - ringLifecyclerAddr: ringLifecyclerAddr, - limits: limits, - userID: userID, - blockFilesConcurrency: blockFilesConcurrency, + compactorCfg: compactorCfg, + ring: ring, + ringLifecyclerAddr: ringLifecyclerAddr, + limits: limits, + userID: userID, + blockFilesConcurrency: blockFilesConcurrency, + blocksFetchConcurrency: blocksFetchConcurrency, } } @@ -184,6 +187,7 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re g.blocksMarkedForNoCompact, g.hashFunc, g.blockFilesConcurrency, + g.blocksFetchConcurrency, ) if err != nil { return nil, errors.Wrap(err, "create compaction group") diff --git a/pkg/compactor/shuffle_sharding_grouper_test.go b/pkg/compactor/shuffle_sharding_grouper_test.go index f6480285e1..d1ae8d2a8e 100644 --- a/pkg/compactor/shuffle_sharding_grouper_test.go +++ b/pkg/compactor/shuffle_sharding_grouper_test.go @@ -241,7 +241,8 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { "test-addr", overrides, "", - 10) + 10, + 3) actual, err := g.Groups(testData.blocks) require.NoError(t, err) require.Len(t, actual, len(testData.expected)) diff --git a/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go b/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go index 7f145bb978..11b3a5255f 100644 --- a/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go +++ b/vendor/github.com/minio/minio-go/v7/api-put-object-streaming.go @@ -130,32 +130,32 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN var complMultipartUpload completeMultipartUpload // Declare a channel that sends the next part number to be uploaded. - // Buffered to 10000 because thats the maximum number of parts allowed - // by S3. - uploadPartsCh := make(chan uploadPartReq, 10000) + uploadPartsCh := make(chan uploadPartReq) // Declare a channel that sends back the response of a part upload. - // Buffered to 10000 because thats the maximum number of parts allowed - // by S3. - uploadedPartsCh := make(chan uploadedPartRes, 10000) + uploadedPartsCh := make(chan uploadedPartRes) // Used for readability, lastPartNumber is always totalPartsCount. lastPartNumber := totalPartsCount + partitionCtx, partitionCancel := context.WithCancel(ctx) + defer partitionCancel() // Send each part number to the channel to be processed. 
- for p := 1; p <= totalPartsCount; p++ { - uploadPartsCh <- uploadPartReq{PartNum: p} - } - close(uploadPartsCh) - - partsBuf := make([][]byte, opts.getNumThreads()) - for i := range partsBuf { - partsBuf[i] = make([]byte, 0, partSize) - } + go func() { + defer close(uploadPartsCh) + + for p := 1; p <= totalPartsCount; p++ { + select { + case <-partitionCtx.Done(): + return + case uploadPartsCh <- uploadPartReq{PartNum: p}: + } + } + }() // Receive each part number from the channel allowing three parallel uploads. for w := 1; w <= opts.getNumThreads(); w++ { - go func(w int, partSize int64) { + go func(partSize int64) { for { var uploadReq uploadPartReq var ok bool @@ -181,21 +181,11 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN partSize = lastPartSize } - n, rerr := readFull(io.NewSectionReader(reader, readOffset, partSize), partsBuf[w-1][:partSize]) - if rerr != nil && rerr != io.ErrUnexpectedEOF && rerr != io.EOF { - uploadedPartsCh <- uploadedPartRes{ - Error: rerr, - } - // Exit the goroutine. - return - } - - // Get a section reader on a particular offset. - hookReader := newHook(bytes.NewReader(partsBuf[w-1][:n]), opts.Progress) + sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), opts.Progress) // Proceed to upload the part. objPart, err := c.uploadPart(ctx, bucketName, objectName, - uploadID, hookReader, uploadReq.PartNum, + uploadID, sectionReader, uploadReq.PartNum, "", "", partSize, opts.ServerSideEncryption, !opts.DisableContentSha256, @@ -218,7 +208,7 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN Part: uploadReq.Part, } } - }(w, partSize) + }(partSize) } // Gather the responses as they occur and update any @@ -229,12 +219,12 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN return UploadInfo{}, ctx.Err() case uploadRes := <-uploadedPartsCh: if uploadRes.Error != nil { + return UploadInfo{}, uploadRes.Error } // Update the totalUploadedSize. totalUploadedSize += uploadRes.Size - // Store the parts to be completed in order. complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{ ETag: uploadRes.Part.ETag, PartNumber: uploadRes.Part.PartNumber, diff --git a/vendor/github.com/minio/minio-go/v7/api.go b/vendor/github.com/minio/minio-go/v7/api.go index a6d6220443..31a9a078d4 100644 --- a/vendor/github.com/minio/minio-go/v7/api.go +++ b/vendor/github.com/minio/minio-go/v7/api.go @@ -111,7 +111,7 @@ type Options struct { // Global constants. const ( libraryName = "minio-go" - libraryVersion = "v7.0.30" + libraryVersion = "v7.0.32" ) // User Agent should always following the below style. 
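The streaming-upload rework above drops the two 10000-entry buffered channels (and the per-worker part buffers) in favour of an unbuffered request channel fed by a producer goroutine that stops as soon as the surrounding context is cancelled, while workers read parts directly via section readers. A rough, dependency-free sketch of that producer/worker shape, with a hypothetical `uploadPart` standing in for the real per-part upload call:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type partReq struct{ num int }
type partRes struct {
	num int
	err error
}

// uploadPart is a hypothetical stand-in for the real per-part upload.
func uploadPart(ctx context.Context, p partReq) partRes {
	return partRes{num: p.num}
}

func uploadAllParts(ctx context.Context, totalParts, workers int) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	reqs := make(chan partReq)    // unbuffered: producer blocks until a worker is free
	results := make(chan partRes) // unbuffered: drained by the loop below

	// Producer: emits part numbers and stops early if ctx is cancelled.
	go func() {
		defer close(reqs)
		for p := 1; p <= totalParts; p++ {
			select {
			case <-ctx.Done():
				return
			case reqs <- partReq{num: p}:
			}
		}
	}()

	// Workers: drain the request channel until it is closed.
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for req := range reqs {
				select {
				case <-ctx.Done():
					return
				case results <- uploadPart(ctx, req):
				}
			}
		}()
	}
	go func() { wg.Wait(); close(results) }()

	// Collect results; the first error cancels producer and workers via the deferred cancel.
	for res := range results {
		if res.err != nil {
			return res.err
		}
		fmt.Println("uploaded part", res.num)
	}
	return nil
}

func main() {
	_ = uploadAllParts(context.Background(), 5, 3)
}
```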
@@ -379,7 +379,6 @@ func (c *Client) HealthCheck(hcDuration time.Duration) (context.CancelFunc, erro atomic.StoreInt32(&c.healthStatus, unknown) return case <-timer.C: - timer.Reset(duration) // Do health check the first time and ONLY if the connection is marked offline if c.IsOffline() { gctx, gcancel := context.WithTimeout(context.Background(), 3*time.Second) @@ -394,6 +393,8 @@ func (c *Client) HealthCheck(hcDuration time.Duration) (context.CancelFunc, erro atomic.CompareAndSwapInt32(&c.healthStatus, offline, online) } } + + timer.Reset(duration) } } }(hcDuration) diff --git a/vendor/github.com/minio/minio-go/v7/hook-reader.go b/vendor/github.com/minio/minio-go/v7/hook-reader.go index f251c1e95d..07bc7dbcfc 100644 --- a/vendor/github.com/minio/minio-go/v7/hook-reader.go +++ b/vendor/github.com/minio/minio-go/v7/hook-reader.go @@ -20,6 +20,7 @@ package minio import ( "fmt" "io" + "sync" ) // hookReader hooks additional reader in the source stream. It is @@ -27,6 +28,7 @@ import ( // notified about the exact number of bytes read from the primary // source on each Read operation. type hookReader struct { + mu sync.RWMutex source io.Reader hook io.Reader } @@ -34,6 +36,9 @@ type hookReader struct { // Seek implements io.Seeker. Seeks source first, and if necessary // seeks hook if Seek method is appropriately found. func (hr *hookReader) Seek(offset int64, whence int) (n int64, err error) { + hr.mu.Lock() + defer hr.mu.Unlock() + // Verify for source has embedded Seeker, use it. sourceSeeker, ok := hr.source.(io.Seeker) if ok { @@ -43,18 +48,21 @@ func (hr *hookReader) Seek(offset int64, whence int) (n int64, err error) { } } - // Verify if hook has embedded Seeker, use it. - hookSeeker, ok := hr.hook.(io.Seeker) - if ok { - var m int64 - m, err = hookSeeker.Seek(offset, whence) - if err != nil { - return 0, err - } - if n != m { - return 0, fmt.Errorf("hook seeker seeked %d bytes, expected source %d bytes", m, n) + if hr.hook != nil { + // Verify if hook has embedded Seeker, use it. + hookSeeker, ok := hr.hook.(io.Seeker) + if ok { + var m int64 + m, err = hookSeeker.Seek(offset, whence) + if err != nil { + return 0, err + } + if n != m { + return 0, fmt.Errorf("hook seeker seeked %d bytes, expected source %d bytes", m, n) + } } } + return n, nil } @@ -62,14 +70,19 @@ func (hr *hookReader) Seek(offset int64, whence int) (n int64, err error) { // value 'n' number of bytes are reported through the hook. Returns // error for all non io.EOF conditions. func (hr *hookReader) Read(b []byte) (n int, err error) { + hr.mu.RLock() + defer hr.mu.RUnlock() + n, err = hr.source.Read(b) if err != nil && err != io.EOF { return n, err } - // Progress the hook with the total read bytes from the source. - if _, herr := hr.hook.Read(b[:n]); herr != nil { - if herr != io.EOF { - return n, herr + if hr.hook != nil { + // Progress the hook with the total read bytes from the source. + if _, herr := hr.hook.Read(b[:n]); herr != nil { + if herr != io.EOF { + return n, herr + } } } return n, err @@ -79,7 +92,10 @@ func (hr *hookReader) Read(b []byte) (n int, err error) { // reports the data read from the source to the hook. 
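The hook-reader change makes the wrapper tolerant of concurrent use and of a missing hook: `Read` takes a read lock, `Seek` takes a write lock, and both skip the hook when it is nil (the constructor that follows always returns a `*hookReader`, even when no hook is supplied). A small self-contained sketch of the same idea, as a hedged analogue rather than the minio-go type itself — an `io.Reader` wrapper whose reads also drive an optional progress hook under a mutex:

```go
package main

import (
	"fmt"
	"io"
	"strings"
	"sync"
)

// progressReader mirrors the hookReader idea: every Read on source is also
// reported to an optional hook reader (e.g. a progress tracker).
type progressReader struct {
	mu     sync.RWMutex
	source io.Reader
	hook   io.Reader // may be nil
}

func (r *progressReader) Read(b []byte) (int, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	n, err := r.source.Read(b)
	if err != nil && err != io.EOF {
		return n, err
	}
	// Feed the bytes we just read to the hook, if one is set.
	if r.hook != nil {
		if _, herr := r.hook.Read(b[:n]); herr != nil && herr != io.EOF {
			return n, herr
		}
	}
	return n, err
}

// countingHook just counts how many bytes it has been shown.
type countingHook struct{ total int }

func (c *countingHook) Read(b []byte) (int, error) {
	c.total += len(b)
	return len(b), nil
}

func main() {
	hook := &countingHook{}
	r := &progressReader{source: strings.NewReader("hello, compactor"), hook: hook}
	data, _ := io.ReadAll(r)
	fmt.Printf("read %q, hook saw %d bytes\n", data, hook.total)
}
```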
func newHook(source, hook io.Reader) io.Reader { if hook == nil { - return source + return &hookReader{source: source} + } + return &hookReader{ + source: source, + hook: hook, } - return &hookReader{source, hook} } diff --git a/vendor/github.com/thanos-io/thanos/pkg/compact/compact.go b/vendor/github.com/thanos-io/thanos/pkg/compact/compact.go index 354d1580d8..e9a5dcdd34 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/compact/compact.go +++ b/vendor/github.com/thanos-io/thanos/pkg/compact/compact.go @@ -219,20 +219,21 @@ type Grouper interface { // DefaultGrouper is the Thanos built-in grouper. It groups blocks based on downsample // resolution and block's labels. type DefaultGrouper struct { - bkt objstore.Bucket - logger log.Logger - acceptMalformedIndex bool - enableVerticalCompaction bool - compactions *prometheus.CounterVec - compactionRunsStarted *prometheus.CounterVec - compactionRunsCompleted *prometheus.CounterVec - compactionFailures *prometheus.CounterVec - verticalCompactions *prometheus.CounterVec - garbageCollectedBlocks prometheus.Counter - blocksMarkedForDeletion prometheus.Counter - blocksMarkedForNoCompact prometheus.Counter - hashFunc metadata.HashFunc - blockFilesConcurrency int + bkt objstore.Bucket + logger log.Logger + acceptMalformedIndex bool + enableVerticalCompaction bool + compactions *prometheus.CounterVec + compactionRunsStarted *prometheus.CounterVec + compactionRunsCompleted *prometheus.CounterVec + compactionFailures *prometheus.CounterVec + verticalCompactions *prometheus.CounterVec + garbageCollectedBlocks prometheus.Counter + blocksMarkedForDeletion prometheus.Counter + blocksMarkedForNoCompact prometheus.Counter + hashFunc metadata.HashFunc + blockFilesConcurrency int + compactBlocksFetchConcurrency int } // NewDefaultGrouper makes a new DefaultGrouper. @@ -247,6 +248,7 @@ func NewDefaultGrouper( blocksMarkedForNoCompact prometheus.Counter, hashFunc metadata.HashFunc, blockFilesConcurrency int, + compactBlocksFetchConcurrency int, ) *DefaultGrouper { return &DefaultGrouper{ bkt: bkt, @@ -273,11 +275,12 @@ func NewDefaultGrouper( Name: "thanos_compact_group_vertical_compactions_total", Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.", }, []string{"group"}), - blocksMarkedForNoCompact: blocksMarkedForNoCompact, - garbageCollectedBlocks: garbageCollectedBlocks, - blocksMarkedForDeletion: blocksMarkedForDeletion, - hashFunc: hashFunc, - blockFilesConcurrency: blockFilesConcurrency, + blocksMarkedForNoCompact: blocksMarkedForNoCompact, + garbageCollectedBlocks: garbageCollectedBlocks, + blocksMarkedForDeletion: blocksMarkedForDeletion, + hashFunc: hashFunc, + blockFilesConcurrency: blockFilesConcurrency, + compactBlocksFetchConcurrency: compactBlocksFetchConcurrency, } } @@ -308,6 +311,7 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro g.blocksMarkedForNoCompact, g.hashFunc, g.blockFilesConcurrency, + g.compactBlocksFetchConcurrency, ) if err != nil { return nil, errors.Wrap(err, "create compaction group") @@ -328,25 +332,26 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro // Group captures a set of blocks that have the same origin labels and downsampling resolution. // Those blocks generally contain the same series and can thus efficiently be compacted. 
type Group struct { - logger log.Logger - bkt objstore.Bucket - key string - labels labels.Labels - resolution int64 - mtx sync.Mutex - metasByMinTime []*metadata.Meta - acceptMalformedIndex bool - enableVerticalCompaction bool - compactions prometheus.Counter - compactionRunsStarted prometheus.Counter - compactionRunsCompleted prometheus.Counter - compactionFailures prometheus.Counter - verticalCompactions prometheus.Counter - groupGarbageCollectedBlocks prometheus.Counter - blocksMarkedForDeletion prometheus.Counter - blocksMarkedForNoCompact prometheus.Counter - hashFunc metadata.HashFunc - blockFilesConcurrency int + logger log.Logger + bkt objstore.Bucket + key string + labels labels.Labels + resolution int64 + mtx sync.Mutex + metasByMinTime []*metadata.Meta + acceptMalformedIndex bool + enableVerticalCompaction bool + compactions prometheus.Counter + compactionRunsStarted prometheus.Counter + compactionRunsCompleted prometheus.Counter + compactionFailures prometheus.Counter + verticalCompactions prometheus.Counter + groupGarbageCollectedBlocks prometheus.Counter + blocksMarkedForDeletion prometheus.Counter + blocksMarkedForNoCompact prometheus.Counter + hashFunc metadata.HashFunc + blockFilesConcurrency int + compactBlocksFetchConcurrency int } // NewGroup returns a new compaction group. @@ -368,6 +373,7 @@ func NewGroup( blocksMarkedForNoCompact prometheus.Counter, hashFunc metadata.HashFunc, blockFilesConcurrency int, + compactBlocksFetchConcurrency int, ) (*Group, error) { if logger == nil { logger = log.NewNopLogger() @@ -378,23 +384,24 @@ func NewGroup( } g := &Group{ - logger: logger, - bkt: bkt, - key: key, - labels: lset, - resolution: resolution, - acceptMalformedIndex: acceptMalformedIndex, - enableVerticalCompaction: enableVerticalCompaction, - compactions: compactions, - compactionRunsStarted: compactionRunsStarted, - compactionRunsCompleted: compactionRunsCompleted, - compactionFailures: compactionFailures, - verticalCompactions: verticalCompactions, - groupGarbageCollectedBlocks: groupGarbageCollectedBlocks, - blocksMarkedForDeletion: blocksMarkedForDeletion, - blocksMarkedForNoCompact: blocksMarkedForNoCompact, - hashFunc: hashFunc, - blockFilesConcurrency: blockFilesConcurrency, + logger: logger, + bkt: bkt, + key: key, + labels: lset, + resolution: resolution, + acceptMalformedIndex: acceptMalformedIndex, + enableVerticalCompaction: enableVerticalCompaction, + compactions: compactions, + compactionRunsStarted: compactionRunsStarted, + compactionRunsCompleted: compactionRunsCompleted, + compactionFailures: compactionFailures, + verticalCompactions: verticalCompactions, + groupGarbageCollectedBlocks: groupGarbageCollectedBlocks, + blocksMarkedForDeletion: blocksMarkedForDeletion, + blocksMarkedForNoCompact: blocksMarkedForNoCompact, + hashFunc: hashFunc, + blockFilesConcurrency: blockFilesConcurrency, + compactBlocksFetchConcurrency: compactBlocksFetchConcurrency, } return g, nil } @@ -766,8 +773,7 @@ func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir") } - var err error - tracing.DoInSpanWithErr(ctx, "compaction_group", func(ctx context.Context) error { + err := tracing.DoInSpanWithErr(ctx, "compaction_group", func(ctx context.Context) (err error) { shouldRerun, compID, err = cg.compact(ctx, subDir, planner, comp) return err }, opentracing.Tags{"group.key": cg.Key()}) @@ -970,7 +976,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, 
return nil } -func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Compactor) (shouldRerun bool, compID ulid.ULID, err error) { +func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Compactor) (shouldRerun bool, compID ulid.ULID, _ error) { cg.mtx.Lock() defer cg.mtx.Unlock() @@ -987,11 +993,10 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp } var toCompact []*metadata.Meta - tracing.DoInSpanWithErr(ctx, "compaction_planning", func(ctx context.Context) error { - toCompact, err = planner.Plan(ctx, cg.metasByMinTime) - return err - }) - if err != nil { + if err := tracing.DoInSpanWithErr(ctx, "compaction_planning", func(ctx context.Context) (e error) { + toCompact, e = planner.Plan(ctx, cg.metasByMinTime) + return e + }); err != nil { return false, ulid.ULID{}, errors.Wrap(err, "plan compaction") } if len(toCompact) == 0 { @@ -1007,61 +1012,69 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp // Once we have a plan we need to download the actual data. begin := time.Now() + g, errCtx := errgroup.WithContext(ctx) + g.SetLimit(cg.compactBlocksFetchConcurrency) toCompactDirs := make([]string, 0, len(toCompact)) - for _, meta := range toCompact { - bdir := filepath.Join(dir, meta.ULID.String()) - for _, s := range meta.Compaction.Sources { + for _, m := range toCompact { + bdir := filepath.Join(dir, m.ULID.String()) + for _, s := range m.Compaction.Sources { if _, ok := uniqueSources[s]; ok { return false, ulid.ULID{}, halt(errors.Errorf("overlapping sources detected for plan %v", toCompact)) } uniqueSources[s] = struct{}{} } + func(ctx context.Context, meta *metadata.Meta) { + g.Go(func() error { + if err := tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error { + return block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir, objstore.WithFetchConcurrency(cg.blockFilesConcurrency)) + }, opentracing.Tags{"block.id": meta.ULID}); err != nil { + return retry(errors.Wrapf(err, "download block %s", meta.ULID)) + } - tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error { - err = block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir, objstore.WithFetchConcurrency(cg.blockFilesConcurrency)) - return err - }, opentracing.Tags{"block.id": meta.ULID}) - if err != nil { - return false, ulid.ULID{}, retry(errors.Wrapf(err, "download block %s", meta.ULID)) - } + // Ensure all input blocks are valid. + var stats block.HealthStats + if err := tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) (e error) { + stats, e = block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime) + return e + }, opentracing.Tags{"block.id": meta.ULID}); err != nil { + return errors.Wrapf(err, "gather index issues for block %s", bdir) + } - // Ensure all input blocks are valid. 
- var stats block.HealthStats - tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) error { - stats, err = block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime) - return err - }, opentracing.Tags{"block.id": meta.ULID}) - if err != nil { - return false, ulid.ULID{}, errors.Wrapf(err, "gather index issues for block %s", bdir) - } + if err := stats.CriticalErr(); err != nil { + return halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels)) + } - if err := stats.CriticalErr(); err != nil { - return false, ulid.ULID{}, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels)) - } + if err := stats.OutOfOrderChunksErr(); err != nil { + return outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID) + } - if err := stats.OutOfOrderChunksErr(); err != nil { - return false, ulid.ULID{}, outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID) - } + if err := stats.Issue347OutsideChunksErr(); err != nil { + return issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID) + } - if err := stats.Issue347OutsideChunksErr(); err != nil { - return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID) - } + if err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil { + return errors.Wrapf(err, + "block id %s, try running with --debug.accept-malformed-index", meta.ULID) + } + return nil + }) + }(errCtx, m) - if err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil { - return false, ulid.ULID{}, errors.Wrapf(err, - "block id %s, try running with --debug.accept-malformed-index", meta.ULID) - } toCompactDirs = append(toCompactDirs, bdir) } + + if err := g.Wait(); err != nil { + return false, ulid.ULID{}, err + } + level.Info(cg.logger).Log("msg", "downloaded and verified blocks; compacting blocks", "plan", fmt.Sprintf("%v", toCompactDirs), "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) begin = time.Now() - tracing.DoInSpanWithErr(ctx, "compaction", func(ctx context.Context) error { - compID, err = comp.Compact(dir, toCompactDirs, nil) - return err - }) - if err != nil { + if err := tracing.DoInSpanWithErr(ctx, "compaction", func(ctx context.Context) (e error) { + compID, e = comp.Compact(dir, toCompactDirs, nil) + return e + }); err != nil { return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact blocks %v", toCompactDirs)) } if compID == (ulid.ULID{}) { @@ -1102,9 +1115,8 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp } // Ensure the output block is valid. 
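One detail worth calling out in the download rework above: each loop iteration wraps `g.Go` in an immediately-invoked function that receives `meta` (and the errgroup's context) as parameters. Before Go 1.22, a goroutine that closed over the loop variable directly could observe a later iteration's value, so passing it as an argument pins the value per goroutine; passing `errCtx` also means each download observes the errgroup context, which is cancelled as soon as a sibling goroutine fails. A minimal illustration of the capture pattern, with a placeholder body instead of the real block download:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

func main() {
	metas := []string{"block-A", "block-B", "block-C"}

	var mu sync.Mutex
	var seen []string
	var wg sync.WaitGroup

	// Pass the loop variable in as a parameter (the pattern used in the diff,
	// there with g.Go inside an immediately-invoked func(ctx, meta)).
	for _, m := range metas {
		wg.Add(1)
		go func(meta string) {
			defer wg.Done()
			// Placeholder for: download the block, gather index health stats, etc.
			mu.Lock()
			seen = append(seen, meta)
			mu.Unlock()
		}(m)
	}
	wg.Wait()

	sort.Strings(seen)
	fmt.Println(seen) // always [block-A block-B block-C]

	// Closing over m directly (go func() { ... use m ... }()) could, before
	// Go 1.22, make several goroutines see the same, final value of m.
}
```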
- tracing.DoInSpanWithErr(ctx, "compaction_verify_index", func(ctx context.Context) error { - err = block.VerifyIndex(cg.logger, index, newMeta.MinTime, newMeta.MaxTime) - return err + err = tracing.DoInSpanWithErr(ctx, "compaction_verify_index", func(ctx context.Context) error { + return block.VerifyIndex(cg.logger, index, newMeta.MinTime, newMeta.MaxTime) }) if !cg.acceptMalformedIndex && err != nil { return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir)) @@ -1120,9 +1132,8 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp begin = time.Now() - tracing.DoInSpanWithErr(ctx, "compaction_block_upload", func(ctx context.Context) error { - err = block.Upload(ctx, cg.logger, cg.bkt, bdir, cg.hashFunc, objstore.WithUploadConcurrency(cg.blockFilesConcurrency)) - return err + err = tracing.DoInSpanWithErr(ctx, "compaction_block_upload", func(ctx context.Context) error { + return block.Upload(ctx, cg.logger, cg.bkt, bdir, cg.hashFunc, objstore.WithUploadConcurrency(cg.blockFilesConcurrency)) }) if err != nil { return false, ulid.ULID{}, retry(errors.Wrapf(err, "upload of %s failed", compID)) @@ -1133,9 +1144,8 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp // into the next planning cycle. // Eventually the block we just uploaded should get synced into the group again (including sync-delay). for _, meta := range toCompact { - tracing.DoInSpanWithErr(ctx, "compaction_block_delete", func(ctx context.Context) error { - err = cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())) - return err + err = tracing.DoInSpanWithErr(ctx, "compaction_block_delete", func(ctx context.Context) error { + return cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())) }, opentracing.Tags{"block.id": meta.ULID}) if err != nil { return false, ulid.ULID{}, retry(errors.Wrapf(err, "mark old block for deletion from bucket")) diff --git a/vendor/github.com/thanos-io/thanos/pkg/objstore/s3/s3.go b/vendor/github.com/thanos-io/thanos/pkg/objstore/s3/s3.go index e8b0f80dea..1fdc5347dc 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/objstore/s3/s3.go +++ b/vendor/github.com/thanos-io/thanos/pkg/objstore/s3/s3.go @@ -485,6 +485,10 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { PartSize: partSize, ServerSideEncryption: sse, UserMetadata: b.putUserMetadata, + // 4 is what minio-go have as the default. To be certain we do micro benchmark before any changes we + // ensure we pin this number to four. + // TODO(bwplotka): Consider adjusting this number to GOMAXPROCS or to expose this in config if it becomes bottleneck. + NumThreads: 4, }, ); err != nil { return errors.Wrap(err, "upload s3 object") diff --git a/vendor/github.com/thanos-io/thanos/pkg/tracing/tracing.go b/vendor/github.com/thanos-io/thanos/pkg/tracing/tracing.go index 1987382d58..9ddb904280 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/tracing/tracing.go +++ b/vendor/github.com/thanos-io/thanos/pkg/tracing/tracing.go @@ -76,13 +76,15 @@ func StartSpan(ctx context.Context, operationName string, opts ...opentracing.St // DoInSpanWtihErr executes function doFn inside new span with `operationName` name and hooking as child to a span found within given context if any. // It uses opentracing.Tracer propagated in context. If no found, it uses noop tracer notification. // It logs the error inside the new span created, which differentiates it from DoInSpan and DoWithSpan. 
-func DoInSpanWithErr(ctx context.Context, operationName string, doFn func(context.Context) error, opts ...opentracing.StartSpanOption) { +func DoInSpanWithErr(ctx context.Context, operationName string, doFn func(context.Context) error, opts ...opentracing.StartSpanOption) error { span, newCtx := StartSpan(ctx, operationName, opts...) defer span.Finish() err := doFn(newCtx) if err != nil { ext.LogError(span, err) } + + return err } // DoInSpan executes function doFn inside new span with `operationName` name and hooking as child to a span found within given context if any. diff --git a/vendor/modules.txt b/vendor/modules.txt index 279ac6b7d8..32d49bd5a3 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -448,7 +448,7 @@ github.com/matttproud/golang_protobuf_extensions/pbutil github.com/miekg/dns # github.com/minio/md5-simd v1.1.2 github.com/minio/md5-simd -# github.com/minio/minio-go/v7 v7.0.30 +# github.com/minio/minio-go/v7 v7.0.32-0.20220706200439-ef3e45ed9cdb ## explicit github.com/minio/minio-go/v7 github.com/minio/minio-go/v7/pkg/credentials @@ -655,7 +655,7 @@ github.com/stretchr/objx github.com/stretchr/testify/assert github.com/stretchr/testify/mock github.com/stretchr/testify/require -# github.com/thanos-io/thanos v0.27.0-rc.0.0.20220712060227-ca4fe82d74f6 +# github.com/thanos-io/thanos v0.27.0-rc.0.0.20220715025542-84880ea6d7f8 ## explicit github.com/thanos-io/thanos/pkg/block github.com/thanos-io/thanos/pkg/block/indexheader
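With `DoInSpanWithErr` now returning the callback's error (see the tracing change above), the call sites in `compact.go` can collapse the previous "assign to an outer `err`, then check it" sequence into a single expression, typically using a named result in the closure to also capture a second value. A simplified, tracing-free sketch of that calling pattern, where `withSpan` is a stand-in for the real helper rather than the Thanos API itself:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// withSpan is a stand-in for tracing.DoInSpanWithErr: it would normally start
// a span, run fn, record the error on the span, and return that error.
func withSpan(ctx context.Context, name string, fn func(context.Context) error) error {
	err := fn(ctx)
	if err != nil {
		fmt.Printf("span %q recorded error: %v\n", name, err)
	}
	return err
}

func plan(ctx context.Context) ([]string, error) {
	return nil, errors.New("nothing to compact")
}

func run(ctx context.Context) error {
	// A named result in the closure captures toCompact and the error in one go,
	// and the returned error can be wrapped and propagated immediately.
	var toCompact []string
	if err := withSpan(ctx, "compaction_planning", func(ctx context.Context) (e error) {
		toCompact, e = plan(ctx)
		return e
	}); err != nil {
		return fmt.Errorf("plan compaction: %w", err)
	}
	fmt.Println("to compact:", toCompact)
	return nil
}

func main() {
	if err := run(context.Background()); err != nil {
		fmt.Println("error:", err)
	}
}
```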