diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index 1146337d0fa..ca24ed889dd 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -324,15 +324,9 @@ compactor: # CLI flag: -compactor.ring.wait-active-instance-timeout [wait_active_instance_timeout: | default = 10m] - # The compaction strategy to use. Supported values are: default, - # split-and-merge. - # CLI flag: -compactor.compaction-strategy - [compaction_strategy: | default = "default"] - # The sorting to use when deciding which compaction jobs should run first for - # a given tenant. Changing this setting is not supported by the default - # compaction strategy. Supported values are: - # smallest-range-oldest-blocks-first, newest-blocks-first. + # a given tenant. Supported values are: smallest-range-oldest-blocks-first, + # newest-blocks-first. # CLI flag: -compactor.compaction-jobs-order [compaction_jobs_order: | default = "smallest-range-oldest-blocks-first"] ``` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 36eca3bfb9e..ed76fe9f534 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3683,14 +3683,8 @@ sharding_ring: # CLI flag: -compactor.ring.wait-active-instance-timeout [wait_active_instance_timeout: | default = 10m] -# The compaction strategy to use. Supported values are: default, -# split-and-merge. -# CLI flag: -compactor.compaction-strategy -[compaction_strategy: | default = "default"] - # The sorting to use when deciding which compaction jobs should run first for a -# given tenant. Changing this setting is not supported by the default compaction -# strategy. Supported values are: smallest-range-oldest-blocks-first, +# given tenant. Supported values are: smallest-range-oldest-blocks-first, # newest-blocks-first. # CLI flag: -compactor.compaction-jobs-order [compaction_jobs_order: | default = "smallest-range-oldest-blocks-first"] diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 0eadda1f3f3..c754e3ac651 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -49,21 +49,15 @@ const ( // PartialUploadThresholdAge is a time after partial block is assumed aborted and ready to be cleaned. // Keep it long as it is based on block creation time not upload start time. PartialUploadThresholdAge = 2 * 24 * time.Hour - - CompactionStrategyDefault = "default" - CompactionStrategySplitMerge = "split-and-merge" ) var ( errInvalidBlockRanges = "compactor block range periods should be divisible by the previous one, but %s is not divisible by %s" errInvalidCompactionOrder = fmt.Errorf("unsupported compaction order (supported values: %s)", strings.Join(CompactionOrders, ", ")) - errUnsupportedCompactionOrder = "the %s compaction strategy does not support %s order" errInvalidMaxOpeningBlocksConcurrency = fmt.Errorf("invalid max-opening-blocks-concurrency value, must be positive") errInvalidMaxClosingBlocksConcurrency = fmt.Errorf("invalid max-closing-blocks-concurrency value, must be positive") errInvalidSymbolFlushersConcurrency = fmt.Errorf("invalid symbols-flushers-concurrency value, must be positive") RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil) - - compactionStrategies = []string{CompactionStrategyDefault, CompactionStrategySplitMerge} ) // BlocksGrouperFactory builds and returns the grouper to use to compact a tenant's blocks. 
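Note on the documentation and flag changes above: once `-compactor.compaction-strategy` is gone, `-compactor.compaction-jobs-order` is the only ordering knob left and both of its values are valid unconditionally. A minimal, standalone sketch of the validation that remains (the constant values and error text mirror the patch, but this is an illustration, not the Mimir code itself):

```go
package main

import (
	"fmt"
	"strings"
)

// Supported job orders after this change. The names mirror the constants
// referenced in the patch (CompactionOrderOldestFirst / CompactionOrderNewestFirst),
// but this sketch is self-contained and illustrative only.
const (
	compactionOrderOldestFirst = "smallest-range-oldest-blocks-first"
	compactionOrderNewestFirst = "newest-blocks-first"
)

var compactionOrders = []string{compactionOrderOldestFirst, compactionOrderNewestFirst}

// validateJobsOrder approximates the remaining check: with the strategy option
// removed, only the order value itself needs to be validated.
func validateJobsOrder(order string) error {
	for _, o := range compactionOrders {
		if o == order {
			return nil
		}
	}
	return fmt.Errorf("unsupported compaction order (supported values: %s)", strings.Join(compactionOrders, ", "))
}

func main() {
	fmt.Println(validateJobsOrder("newest-blocks-first")) // <nil>
	fmt.Println(validateJobsOrder("default"))             // error: no strategy-specific orders remain
}
```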
@@ -112,8 +106,6 @@ type Config struct { ShardingEnabled bool `yaml:"sharding_enabled"` ShardingRing RingConfig `yaml:"sharding_ring"` - // Compaction strategy. - CompactionStrategy string `yaml:"compaction_strategy"` CompactionJobsOrder string `yaml:"compaction_jobs_order"` // No need to add options to customize the retry backoff, @@ -147,8 +139,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.CleanupInterval, "compactor.cleanup-interval", 15*time.Minute, "How frequently compactor should run blocks cleanup and maintenance, as well as update the bucket index.") f.IntVar(&cfg.CleanupConcurrency, "compactor.cleanup-concurrency", 20, "Max number of tenants for which blocks cleanup and maintenance should run concurrently.") f.BoolVar(&cfg.ShardingEnabled, "compactor.sharding-enabled", false, "Shard tenants across multiple compactor instances. Sharding is required if you run multiple compactor instances, in order to coordinate compactions and avoid race conditions leading to the same tenant blocks simultaneously compacted by different instances.") - f.StringVar(&cfg.CompactionStrategy, "compactor.compaction-strategy", CompactionStrategyDefault, fmt.Sprintf("The compaction strategy to use. Supported values are: %s.", strings.Join(compactionStrategies, ", "))) - f.StringVar(&cfg.CompactionJobsOrder, "compactor.compaction-jobs-order", CompactionOrderOldestFirst, fmt.Sprintf("The sorting to use when deciding which compaction jobs should run first for a given tenant. Changing this setting is not supported by the default compaction strategy. Supported values are: %s.", strings.Join(CompactionOrders, ", "))) + f.StringVar(&cfg.CompactionJobsOrder, "compactor.compaction-jobs-order", CompactionOrderOldestFirst, fmt.Sprintf("The sorting to use when deciding which compaction jobs should run first for a given tenant. Supported values are: %s.", strings.Join(CompactionOrders, ", "))) f.DurationVar(&cfg.DeletionDelay, "compactor.deletion-delay", 12*time.Hour, "Time before a block marked for deletion is deleted from bucket. "+ "If not 0, blocks will be marked for deletion and compactor component will permanently delete blocks marked for deletion from the bucket. "+ "If 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures.") @@ -180,18 +171,10 @@ func (cfg *Config) Validate() error { return errInvalidSymbolFlushersConcurrency } - if !util.StringsContain(compactionStrategies, cfg.CompactionStrategy) { - return fmt.Errorf("unsupported compaction strategy (supported values: %s)", strings.Join(compactionStrategies, ", ")) - } - if !util.StringsContain(CompactionOrders, cfg.CompactionJobsOrder) { return errInvalidCompactionOrder } - if cfg.CompactionStrategy == CompactionStrategyDefault && cfg.CompactionJobsOrder != CompactionOrderOldestFirst { - return fmt.Errorf(errUnsupportedCompactionOrder, cfg.CompactionStrategy, cfg.CompactionJobsOrder) - } - return nil } @@ -281,10 +264,8 @@ func NewMultitenantCompactor(compactorCfg Config, storageCfg mimir_tsdb.BlocksSt // Configure the compactor and grouper factories. if compactorCfg.BlocksGrouperFactory != nil && compactorCfg.BlocksCompactorFactory != nil { // Nothing to do because it was already set by a downstream project. 
- } else if compactorCfg.CompactionStrategy == CompactionStrategySplitMerge { - configureSplitAndMergeCompactor(&compactorCfg) } else { - configureDefaultCompactor(&compactorCfg) + configureSplitAndMergeCompactor(&compactorCfg) } blocksGrouperFactory := compactorCfg.BlocksGrouperFactory @@ -466,16 +447,10 @@ func (c *MultitenantCompactor) starting(ctx context.Context) error { } allowedTenants := util.NewAllowedTenants(c.compactorCfg.EnabledTenants, c.compactorCfg.DisabledTenants) - switch { - case !c.compactorCfg.ShardingEnabled: + if !c.compactorCfg.ShardingEnabled { c.shardingStrategy = newNoShardingStrategy(allowedTenants) - case c.compactorCfg.CompactionStrategy == CompactionStrategyDefault: - c.shardingStrategy = newDefaultShardingStrategy(allowedTenants, c.ring, c.ringLifecycler) - case c.compactorCfg.CompactionStrategy == CompactionStrategySplitMerge: + } else { c.shardingStrategy = newSplitAndMergeShardingStrategy(allowedTenants, c.ring, c.ringLifecycler, c.cfgProvider) - default: - // Should not happen, but just in case. - return errors.Errorf("invalid configuration: sharding is enabled, but using unknown compaction strategy: %s", c.compactorCfg.CompactionStrategy) } // Create the blocks cleaner (service). @@ -803,42 +778,6 @@ func (n *noShardingStrategy) ownJob(job *Job) (bool, error) { return n.ownUser(job.UserID()), nil } -// defaultShardingStrategy is used with default compaction strategy. Only one compactor -// can own the user. There is no shuffle sharding. -type defaultShardingStrategy struct { - allowedTenants *util.AllowedTenants - ring *ring.Ring - ringLifecycler *ring.Lifecycler -} - -func newDefaultShardingStrategy(allowedTenants *util.AllowedTenants, ring *ring.Ring, ringLifecycler *ring.Lifecycler) *defaultShardingStrategy { - return &defaultShardingStrategy{ - allowedTenants: allowedTenants, - ring: ring, - ringLifecycler: ringLifecycler, - } -} - -func (d *defaultShardingStrategy) ownUser(userID string) (bool, error) { - if !d.allowedTenants.IsAllowed(userID) { - return false, nil - } - - return instanceOwnsTokenInRing(d.ring, d.ringLifecycler.Addr, userID) -} - -func (d *defaultShardingStrategy) blocksCleanerOwnUser(userID string) (bool, error) { - return d.ownUser(userID) -} - -func (d *defaultShardingStrategy) compactorOwnUser(userID string) (bool, error) { - return d.ownUser(userID) -} - -func (d *defaultShardingStrategy) ownJob(job *Job) (bool, error) { - return d.ownUser(job.UserID()) -} - // splitAndMergeShardingStrategy is used with split-and-merge compaction strategy. // All compactors from user's shard own the user for compaction purposes, and plan jobs. // Each job is only owned and executed by single compactor. 
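With the default strategy removed, the startup logic above reduces to a two-way choice: no sharding, or the split-and-merge sharding strategy. The retained comment describes its ownership rules, where every compactor in the tenant's shard owns the user for planning, while each job runs on exactly one instance. The sketch below only illustrates that split under stated assumptions: the real strategy resolves ownership through the dskit ring, and the sharding-key string here is made up for the example.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

type compactorInstance struct{ addr string }

// ownJob picks exactly one owner per job by hashing a job sharding key onto the
// instances in the tenant's shard. This is a stand-in for the ring lookup used
// by splitAndMergeShardingStrategy, not a reproduction of it.
func ownJob(shard []compactorInstance, self compactorInstance, jobShardingKey string) bool {
	h := fnv.New32a()
	_, _ = h.Write([]byte(jobShardingKey))
	owner := shard[h.Sum32()%uint32(len(shard))]
	return owner.addr == self.addr
}

func main() {
	shard := []compactorInstance{{"10.0.0.1"}, {"10.0.0.2"}, {"10.0.0.3"}}
	key := "user-1/0@17241709254077376921-merge--1574776800000-1574784000000" // hypothetical sharding key

	// Every instance in the shard owns the *user* (so all of them plan jobs),
	// but only one of them reports ownership of this particular job.
	for _, c := range shard {
		fmt.Printf("%s owns job: %v\n", c.addr, ownJob(shard, c, key))
	}
}
```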
diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 2261e0552a6..643cdc3a5f2 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -16,6 +16,7 @@ import ( "os" "path" "path/filepath" + "regexp" "strconv" "strings" "testing" @@ -119,13 +120,6 @@ func TestConfig_Validate(t *testing.T) { }, expected: errInvalidCompactionOrder.Error(), }, - "should fail on unsupported compaction jobs order": { - setup: func(cfg *Config) { - cfg.CompactionStrategy = CompactionStrategyDefault - cfg.CompactionJobsOrder = CompactionOrderNewestFirst - }, - expected: errors.Errorf(errUnsupportedCompactionOrder, CompactionStrategyDefault, CompactionOrderNewestFirst).Error(), - }, "should fail on invalid value of max-opening-blocks-concurrency": { setup: func(cfg *Config) { cfg.MaxOpeningBlocksConcurrency = 0 }, expected: errInvalidMaxOpeningBlocksConcurrency.Error(), @@ -444,12 +438,15 @@ func TestMultitenantCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASi userID := "test-user" bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{userID}, nil) - bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D"}, nil) + bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D", userID + "/01DTW0ZCPDDNV4BV83Q2SV4QAZ"}, nil) bucketClient.MockIter(userID+"/markers/", nil, nil) bucketClient.MockExists(path.Join(userID, mimir_tsdb.TenantDeletionMarkPath), false, nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil) + bucketClient.MockGet(userID+"/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil) + bucketClient.MockGet(userID+"/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", "", nil) + bucketClient.MockGet(userID+"/01DTW0ZCPDDNV4BV83Q2SV4QAZ/no-compact-mark.json", "", nil) bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil) bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil) @@ -458,7 +455,7 @@ func TestMultitenantCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASi require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) // Wait until all retry attempts have completed. - test.Poll(t, time.Second, 1.0, func() interface{} { + test.Poll(t, time.Minute, 1.0, func() interface{} { return prom_testutil.ToFloat64(c.compactionRunsFailed) }) @@ -486,19 +483,26 @@ func TestMultitenantCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASi func TestMultitenantCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { t.Parallel() - // Mock the bucket to contain two users, each one with one block. + // Mock the bucket to contain two users, each one with two blocks (to make sure that grouper doesn't skip them). 
bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) bucketClient.MockExists(path.Join("user-1", mimir_tsdb.TenantDeletionMarkPath), false, nil) bucketClient.MockExists(path.Join("user-2", mimir_tsdb.TenantDeletionMarkPath), false, nil) - bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D"}, nil) - bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ"}, nil) + bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01FS51A7GQ1RQWV35DBVYQM4KF"}, nil) + bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-2/01FRSF035J26D6CGX7STCSD1KG"}, nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil) + bucketClient.MockGet("user-1/01FS51A7GQ1RQWV35DBVYQM4KF/meta.json", mockBlockMetaJSON("01FS51A7GQ1RQWV35DBVYQM4KF"), nil) + bucketClient.MockGet("user-1/01FS51A7GQ1RQWV35DBVYQM4KF/deletion-mark.json", "", nil) + bucketClient.MockGet("user-1/01FS51A7GQ1RQWV35DBVYQM4KF/no-compact-mark.json", "", nil) + bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil) bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", "", nil) bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/no-compact-mark.json", "", nil) + bucketClient.MockGet("user-2/01FRSF035J26D6CGX7STCSD1KG/meta.json", mockBlockMetaJSON("01FRSF035J26D6CGX7STCSD1KG"), nil) + bucketClient.MockGet("user-2/01FRSF035J26D6CGX7STCSD1KG/deletion-mark.json", "", nil) + bucketClient.MockGet("user-2/01FRSF035J26D6CGX7STCSD1KG/no-compact-mark.json", "", nil) bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil) bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil) bucketClient.MockIter("user-1/markers/", nil, nil) @@ -535,15 +539,17 @@ func TestMultitenantCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing. 
`level=info component=compactor msg="starting compaction of user blocks" user=user-1`, `level=info component=compactor org_id=user-1 msg="start sync of metas"`, `level=info component=compactor org_id=user-1 msg="start of GC"`, + `level=debug component=compactor org_id=user-1 msg="grouper found a compactable blocks group" groupKey=0@17241709254077376921-merge--1574776800000-1574784000000 job="stage: merge, range start: 1574776800000, range end: 1574784000000, shard: , blocks: 01DTVP434PA9VFXSW2JKB3392D (min time: 2019-11-26 14:00:00 +0000 UTC, max time: 2019-11-26 16:00:00 +0000 UTC),01FS51A7GQ1RQWV35DBVYQM4KF (min time: 2019-11-26 14:00:00 +0000 UTC, max time: 2019-11-26 16:00:00 +0000 UTC)"`, `level=info component=compactor org_id=user-1 msg="start of compactions"`, - `level=info component=compactor org_id=user-1 groupKey=0@17241709254077376921 msg="compaction job finished" success=true`, + `level=info component=compactor org_id=user-1 groupKey=0@17241709254077376921-merge--1574776800000-1574784000000 msg="compaction job finished" success=true`, `level=info component=compactor org_id=user-1 msg="compaction iterations done"`, `level=info component=compactor msg="successfully compacted user blocks" user=user-1`, `level=info component=compactor msg="starting compaction of user blocks" user=user-2`, `level=info component=compactor org_id=user-2 msg="start sync of metas"`, `level=info component=compactor org_id=user-2 msg="start of GC"`, + `level=debug component=compactor org_id=user-2 msg="grouper found a compactable blocks group" groupKey=0@17241709254077376921-merge--1574776800000-1574784000000 job="stage: merge, range start: 1574776800000, range end: 1574784000000, shard: , blocks: 01DTW0ZCPDDNV4BV83Q2SV4QAZ (min time: 2019-11-26 14:00:00 +0000 UTC, max time: 2019-11-26 16:00:00 +0000 UTC),01FRSF035J26D6CGX7STCSD1KG (min time: 2019-11-26 14:00:00 +0000 UTC, max time: 2019-11-26 16:00:00 +0000 UTC)"`, `level=info component=compactor org_id=user-2 msg="start of compactions"`, - `level=info component=compactor org_id=user-2 groupKey=0@17241709254077376921 msg="compaction job finished" success=true`, + `level=info component=compactor org_id=user-2 groupKey=0@17241709254077376921-merge--1574776800000-1574784000000 msg="compaction job finished" success=true`, `level=info component=compactor org_id=user-2 msg="compaction iterations done"`, `level=info component=compactor msg="successfully compacted user blocks" user=user-2`, }, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n"))) @@ -616,18 +622,24 @@ func TestMultitenantCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing. func TestMultitenantCompactor_ShouldStopCompactingTenantOnReachingMaxCompactionTime(t *testing.T) { t.Parallel() - // By using two blocks with different labels, we get two compaction jobs. Only one of these jobs will be started, + // By using blocks with different labels, we get two compaction jobs. Only one of these jobs will be started, // and since its planning will take longer than maxCompactionTime, we stop compactions early. 
bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1"}, nil) bucketClient.MockExists(path.Join("user-1", mimir_tsdb.TenantDeletionMarkPath), false, nil) - bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01FN3VCQV5X342W2ZKMQQXAZRX"}, nil) + bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01FN3VCQV5X342W2ZKMQQXAZRX", "user-1/01FS51A7GQ1RQWV35DBVYQM4KF", "user-1/01FRQGQB7RWQ2TS0VWA82QTPXE"}, nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSONWithTimeRangeAndLabels("01DTVP434PA9VFXSW2JKB3392D", 1574776800000, 1574784000000, map[string]string{"A": "B"}), nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil) - bucketClient.MockGet("user-1/01FN3VCQV5X342W2ZKMQQXAZRX/meta.json", mockBlockMetaJSONWithTimeRangeAndLabels("01FN3VCQV5X342W2ZKMQQXAZRX", 1637839280000, 1637842892000, map[string]string{"C": "D"}), nil) + bucketClient.MockGet("user-1/01FS51A7GQ1RQWV35DBVYQM4KF/meta.json", mockBlockMetaJSONWithTimeRangeAndLabels("01FS51A7GQ1RQWV35DBVYQM4KF", 1574776800000, 1574784000000, map[string]string{"A": "B"}), nil) + bucketClient.MockGet("user-1/01FS51A7GQ1RQWV35DBVYQM4KF/deletion-mark.json", "", nil) + bucketClient.MockGet("user-1/01FS51A7GQ1RQWV35DBVYQM4KF/no-compact-mark.json", "", nil) + bucketClient.MockGet("user-1/01FN3VCQV5X342W2ZKMQQXAZRX/meta.json", mockBlockMetaJSONWithTimeRangeAndLabels("01FN3VCQV5X342W2ZKMQQXAZRX", 1574776800000, 1574784000000, map[string]string{"C": "D"}), nil) bucketClient.MockGet("user-1/01FN3VCQV5X342W2ZKMQQXAZRX/deletion-mark.json", "", nil) bucketClient.MockGet("user-1/01FN3VCQV5X342W2ZKMQQXAZRX/no-compact-mark.json", "", nil) + bucketClient.MockGet("user-1/01FRQGQB7RWQ2TS0VWA82QTPXE/meta.json", mockBlockMetaJSONWithTimeRangeAndLabels("01FRQGQB7RWQ2TS0VWA82QTPXE", 1574776800000, 1574784000000, map[string]string{"C": "D"}), nil) + bucketClient.MockGet("user-1/01FRQGQB7RWQ2TS0VWA82QTPXE/deletion-mark.json", "", nil) + bucketClient.MockGet("user-1/01FRQGQB7RWQ2TS0VWA82QTPXE/no-compact-mark.json", "", nil) bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil) bucketClient.MockIter("user-1/markers/", nil, nil) bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) @@ -663,9 +675,11 @@ func TestMultitenantCompactor_ShouldStopCompactingTenantOnReachingMaxCompactionT `level=info component=compactor msg="starting compaction of user blocks" user=user-1`, `level=info component=compactor org_id=user-1 msg="start sync of metas"`, `level=info component=compactor org_id=user-1 msg="start of GC"`, + `level=debug component=compactor org_id=user-1 msg="grouper found a compactable blocks group" groupKey=0@414047632870839233-merge--1574776800000-1574784000000 job="stage: merge, range start: 1574776800000, range end: 1574784000000, shard: , blocks: 01DTVP434PA9VFXSW2JKB3392D (min time: 2019-11-26 14:00:00 +0000 UTC, max time: 2019-11-26 16:00:00 +0000 UTC),01FS51A7GQ1RQWV35DBVYQM4KF (min time: 2019-11-26 14:00:00 +0000 UTC, max time: 2019-11-26 16:00:00 +0000 UTC)"`, + `level=debug component=compactor org_id=user-1 msg="grouper found a compactable blocks group" groupKey=0@12695595599644216241-merge--1574776800000-1574784000000 job="stage: merge, range start: 1574776800000, range end: 1574784000000, shard: , blocks: 01FN3VCQV5X342W2ZKMQQXAZRX (min time: 2019-11-26 14:00:00 +0000 UTC, max time: 2019-11-26 
16:00:00 +0000 UTC),01FRQGQB7RWQ2TS0VWA82QTPXE (min time: 2019-11-26 14:00:00 +0000 UTC, max time: 2019-11-26 16:00:00 +0000 UTC)"`, `level=info component=compactor org_id=user-1 msg="start of compactions"`, `level=info component=compactor org_id=user-1 msg="max compaction time reached, no more compactions will be started"`, - `level=info component=compactor org_id=user-1 groupKey=0@12695595599644216241 msg="compaction job finished" success=true`, + `level=info component=compactor org_id=user-1 groupKey=0@12695595599644216241-merge--1574776800000-1574784000000 msg="compaction job finished" success=true`, `level=info component=compactor org_id=user-1 msg="compaction iterations done"`, `level=info component=compactor msg="successfully compacted user blocks" user=user-1`, }, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n"))) @@ -944,16 +958,22 @@ func TestMultitenantCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneIn bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) bucketClient.MockExists(path.Join("user-1", mimir_tsdb.TenantDeletionMarkPath), false, nil) bucketClient.MockExists(path.Join("user-2", mimir_tsdb.TenantDeletionMarkPath), false, nil) - bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D"}, nil) - bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ"}, nil) + bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01FSTQ95C8FS0ZAGTQS2EF1NEG"}, nil) + bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-2/01FSV54G6QFQH1G9QE93G3B9TB"}, nil) bucketClient.MockIter("user-1/markers/", nil, nil) bucketClient.MockIter("user-2/markers/", nil, nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil) + bucketClient.MockGet("user-1/01FSTQ95C8FS0ZAGTQS2EF1NEG/meta.json", mockBlockMetaJSON("01FSTQ95C8FS0ZAGTQS2EF1NEG"), nil) + bucketClient.MockGet("user-1/01FSTQ95C8FS0ZAGTQS2EF1NEG/deletion-mark.json", "", nil) + bucketClient.MockGet("user-1/01FSTQ95C8FS0ZAGTQS2EF1NEG/no-compact-mark.json", "", nil) bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil) bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", "", nil) bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/no-compact-mark.json", "", nil) + bucketClient.MockGet("user-2/01FSV54G6QFQH1G9QE93G3B9TB/meta.json", mockBlockMetaJSON("01FSV54G6QFQH1G9QE93G3B9TB"), nil) + bucketClient.MockGet("user-2/01FSV54G6QFQH1G9QE93G3B9TB/deletion-mark.json", "", nil) + bucketClient.MockGet("user-2/01FSV54G6QFQH1G9QE93G3B9TB/no-compact-mark.json", "", nil) bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil) bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil) bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) @@ -998,15 +1018,17 @@ func TestMultitenantCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneIn `level=info component=compactor msg="starting compaction of user blocks" user=user-1`, `level=info component=compactor org_id=user-1 msg="start sync of metas"`, `level=info component=compactor org_id=user-1 msg="start of GC"`, + `level=debug component=compactor org_id=user-1 msg="grouper found a compactable blocks group" 
groupKey=0@17241709254077376921-merge--1574776800000-1574784000000 job="stage: merge, range start: 1574776800000, range end: 1574784000000, shard: , blocks: 01DTVP434PA9VFXSW2JKB3392D (min time: 2019-11-26 14:00:00 +0000 UTC, max time: 2019-11-26 16:00:00 +0000 UTC),01FSTQ95C8FS0ZAGTQS2EF1NEG (min time: 2019-11-26 14:00:00 +0000 UTC, max time: 2019-11-26 16:00:00 +0000 UTC)"`, `level=info component=compactor org_id=user-1 msg="start of compactions"`, - `level=info component=compactor org_id=user-1 groupKey=0@17241709254077376921 msg="compaction job finished" success=true`, + `level=info component=compactor org_id=user-1 groupKey=0@17241709254077376921-merge--1574776800000-1574784000000 msg="compaction job finished" success=true`, `level=info component=compactor org_id=user-1 msg="compaction iterations done"`, `level=info component=compactor msg="successfully compacted user blocks" user=user-1`, `level=info component=compactor msg="starting compaction of user blocks" user=user-2`, `level=info component=compactor org_id=user-2 msg="start sync of metas"`, `level=info component=compactor org_id=user-2 msg="start of GC"`, + `level=debug component=compactor org_id=user-2 msg="grouper found a compactable blocks group" groupKey=0@17241709254077376921-merge--1574776800000-1574784000000 job="stage: merge, range start: 1574776800000, range end: 1574784000000, shard: , blocks: 01DTW0ZCPDDNV4BV83Q2SV4QAZ (min time: 2019-11-26 14:00:00 +0000 UTC, max time: 2019-11-26 16:00:00 +0000 UTC),01FSV54G6QFQH1G9QE93G3B9TB (min time: 2019-11-26 14:00:00 +0000 UTC, max time: 2019-11-26 16:00:00 +0000 UTC)"`, `level=info component=compactor org_id=user-2 msg="start of compactions"`, - `level=info component=compactor org_id=user-2 groupKey=0@17241709254077376921 msg="compaction job finished" success=true`, + `level=info component=compactor org_id=user-2 groupKey=0@17241709254077376921-merge--1574776800000-1574784000000 msg="compaction job finished" success=true`, `level=info component=compactor org_id=user-2 msg="compaction iterations done"`, `level=info component=compactor msg="successfully compacted user blocks" user=user-2`, }, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n"))) @@ -1158,7 +1180,6 @@ func TestMultitenantCompactor_ShouldSkipCompactionForJobsNoMoreOwnedAfterPlannin cfg := prepareConfig() cfg.CompactionConcurrency = 1 - cfg.CompactionStrategy = CompactionStrategySplitMerge cfg.ShardingEnabled = true cfg.ShardingRing.InstanceID = "compactor-1" cfg.ShardingRing.InstanceAddr = "1.2.3.4" @@ -1482,12 +1503,7 @@ func prepareWithConfigProvider(t *testing.T, compactorCfg Config, bucketClient o return tsdbCompactor, tsdbPlanner, nil } - grouper := defaultBlocksGrouperFactory - if compactorCfg.CompactionStrategy == CompactionStrategySplitMerge { - grouper = splitAndMergeGrouperFactory - } - - c, err := newMultitenantCompactor(compactorCfg, storageCfg, limits, logger, registry, bucketClientFactory, grouper, blocksCompactorFactory) + c, err := newMultitenantCompactor(compactorCfg, storageCfg, limits, logger, registry, bucketClientFactory, splitAndMergeGrouperFactory, blocksCompactorFactory) require.NoError(t, err) return c, tsdbCompactor, tsdbPlanner, logs, registry @@ -1744,12 +1760,11 @@ const ( func TestOwnUser(t *testing.T) { type testCase struct { - compactors int - compactionStrategy string - sharding bool - enabledUsers []string - disabledUsers []string - compactorShards map[string]int + compactors int + sharding bool + enabledUsers []string + disabledUsers []string + 
compactorShards map[string]int check func(t *testing.T, comps []*MultitenantCompactor) } @@ -1759,25 +1774,9 @@ func TestOwnUser(t *testing.T) { testCases := map[string]testCase{ "5 compactors, no sharding": { - compactors: 5, - compactionStrategy: CompactionStrategyDefault, - sharding: false, - compactorShards: map[string]int{user1: 2}, // Not used when sharding is disabled. - - check: func(t *testing.T, comps []*MultitenantCompactor) { - require.Len(t, owningCompactors(t, comps, user1, ownUserReasonCompactor), 5) - require.Len(t, owningCompactors(t, comps, user1, ownUserReasonBlocksCleaner), 5) - - require.Len(t, owningCompactors(t, comps, user2, ownUserReasonCompactor), 5) - require.Len(t, owningCompactors(t, comps, user2, ownUserReasonBlocksCleaner), 5) - }, - }, - - "5 compactors, no sharding, split-and-merge": { - compactors: 5, - compactionStrategy: CompactionStrategySplitMerge, - sharding: false, - compactorShards: map[string]int{user1: 2}, // Not used when sharding is disabled. + compactors: 5, + sharding: false, + compactorShards: map[string]int{user1: 2}, // Not used when sharding is disabled. check: func(t *testing.T, comps []*MultitenantCompactor) { require.Len(t, owningCompactors(t, comps, user1, ownUserReasonCompactor), 5) @@ -1788,26 +1787,10 @@ func TestOwnUser(t *testing.T) { }, }, - "5 compactors, sharding enabled, default strategy": { - compactors: 5, - compactionStrategy: CompactionStrategyDefault, - sharding: true, - compactorShards: map[string]int{user1: 2}, // Not used for CompactionStrategyDefault. - - check: func(t *testing.T, comps []*MultitenantCompactor) { - require.Len(t, owningCompactors(t, comps, user1, ownUserReasonCompactor), 1) - require.Len(t, owningCompactors(t, comps, user1, ownUserReasonBlocksCleaner), 1) - - require.Len(t, owningCompactors(t, comps, user2, ownUserReasonCompactor), 1) - require.Len(t, owningCompactors(t, comps, user2, ownUserReasonBlocksCleaner), 1) - }, - }, - - "5 compactors, sharding enabled, split-merge strategy, no compactor shard size": { - compactors: 5, - compactionStrategy: CompactionStrategySplitMerge, - sharding: true, - compactorShards: nil, // no limits + "5 compactors, sharding enabled, no compactor shard size": { + compactors: 5, + sharding: true, + compactorShards: nil, // no limits check: func(t *testing.T, comps []*MultitenantCompactor) { require.Len(t, owningCompactors(t, comps, user1, ownUserReasonCompactor), 5) @@ -1818,11 +1801,10 @@ func TestOwnUser(t *testing.T) { }, }, - "10 compactors, sharding enabled, split-merge strategy, with non-zero shard sizes": { - compactors: 10, - compactionStrategy: CompactionStrategySplitMerge, - sharding: true, - compactorShards: map[string]int{user1: 2, user2: 3}, + "10 compactors, sharding enabled, with non-zero shard sizes": { + compactors: 10, + sharding: true, + compactorShards: map[string]int{user1: 2, user2: 3}, check: func(t *testing.T, comps []*MultitenantCompactor) { require.Len(t, owningCompactors(t, comps, user1, ownUserReasonCompactor), 2) @@ -1837,11 +1819,10 @@ func TestOwnUser(t *testing.T) { }, }, - "10 compactors, sharding enabled, split-merge strategy, with zero shard size": { - compactors: 10, - compactionStrategy: CompactionStrategySplitMerge, - sharding: true, - compactorShards: map[string]int{user2: 0}, + "10 compactors, sharding enabled, with zero shard size": { + compactors: 10, + sharding: true, + compactorShards: map[string]int{user2: 0}, check: func(t *testing.T, comps []*MultitenantCompactor) { require.Len(t, owningCompactors(t, comps, user2, 
ownUserReasonCompactor), 10) @@ -1865,7 +1846,6 @@ func TestOwnUser(t *testing.T) { cfg := prepareConfig() cfg.CompactionInterval = 10 * time.Minute // We will only call compaction manually. - cfg.CompactionStrategy = tc.compactionStrategy cfg.EnabledTenants = tc.enabledUsers cfg.DisabledTenants = tc.disabledUsers @@ -1945,6 +1925,8 @@ func TestMultitenantCompactor_OutOfOrderCompaction(t *testing.T) { Chunks: []chunks.Meta{ tsdbutil.ChunkFromSamples([]tsdbutil.Sample{newSample(20, 20), newSample(21, 21)}), tsdbutil.ChunkFromSamples([]tsdbutil.Sample{newSample(10, 10), newSample(11, 11)}), + // Extend block to cover 2h. + tsdbutil.ChunkFromSamples([]tsdbutil.Sample{newSample(0, 0), newSample(2*time.Hour.Milliseconds()-1, 0)}), }, }, } @@ -1952,7 +1934,10 @@ func TestMultitenantCompactor_OutOfOrderCompaction(t *testing.T) { const user = "user" storageDir := t.TempDir() - meta, err := testutil.GenerateBlockFromSpec(user, filepath.Join(storageDir, user), specs) + // We need two blocks to start compaction. + meta1, err := testutil.GenerateBlockFromSpec(user, filepath.Join(storageDir, user), specs) + require.NoError(t, err) + meta2, err := testutil.GenerateBlockFromSpec(user, filepath.Join(storageDir, user), specs) require.NoError(t, err) bkt, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) @@ -1961,7 +1946,7 @@ func TestMultitenantCompactor_OutOfOrderCompaction(t *testing.T) { cfg := prepareConfig() c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bkt) - tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{meta}, nil) + tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{meta1, meta2}, nil) // Start the compactor require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) @@ -1975,11 +1960,16 @@ func TestMultitenantCompactor_OutOfOrderCompaction(t *testing.T) { require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) // Verify that compactor has found block with out of order chunks, and this block is now marked for no-compaction. - require.Contains(t, logs.String(), "level=info component=compactor org_id=user msg=\"block has been marked for no compaction\" block="+meta.ULID.String()) + r := regexp.MustCompile("level=info component=compactor org_id=user msg=\"block has been marked for no compaction\" block=([0-9A-Z]+)") + matches := r.FindStringSubmatch(logs.String()) + require.Len(t, matches, 2) // Entire string match + single group match. 
+ + skippedBlock := matches[1] + require.True(t, skippedBlock == meta1.ULID.String() || skippedBlock == meta2.ULID.String()) m := &metadata.NoCompactMark{} - require.NoError(t, metadata.ReadMarker(context.Background(), log.NewNopLogger(), objstore.WithNoopInstr(bkt), path.Join(user, meta.ULID.String()), m)) - require.Equal(t, meta.ULID, m.ID) + require.NoError(t, metadata.ReadMarker(context.Background(), log.NewNopLogger(), objstore.WithNoopInstr(bkt), path.Join(user, skippedBlock), m)) + require.Equal(t, skippedBlock, m.ID.String()) require.NotZero(t, m.NoCompactTime) require.Equal(t, metadata.NoCompactReason(metadata.OutOfOrderChunksNoCompactReason), m.Reason) diff --git a/pkg/compactor/default_compactor.go b/pkg/compactor/default_compactor.go deleted file mode 100644 index 17a4a3fefb6..00000000000 --- a/pkg/compactor/default_compactor.go +++ /dev/null @@ -1,32 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only - -package compactor - -import ( - "context" - - "github.com/go-kit/log" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/tsdb" - "github.com/thanos-io/thanos/pkg/block/metadata" - "github.com/thanos-io/thanos/pkg/compact/downsample" -) - -func defaultBlocksGrouperFactory(ctx context.Context, cfg Config, cfgProvider ConfigProvider, userID string, logger log.Logger, reg prometheus.Registerer) Grouper { - return NewDefaultGrouper(userID, metadata.NoneFunc) -} - -func defaultBlocksCompactorFactory(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (Compactor, Planner, error) { - compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool(), nil) - if err != nil { - return nil, nil, err - } - - planner := NewTSDBBasedPlanner(logger, cfg.BlockRanges.ToMilliseconds()) - return compactor, planner, nil -} - -func configureDefaultCompactor(cfg *Config) { - cfg.BlocksGrouperFactory = defaultBlocksGrouperFactory - cfg.BlocksCompactorFactory = defaultBlocksCompactorFactory -} diff --git a/pkg/compactor/split_merge_compactor_test.go b/pkg/compactor/split_merge_compactor_test.go index e06d368fba9..9bffd5665a1 100644 --- a/pkg/compactor/split_merge_compactor_test.go +++ b/pkg/compactor/split_merge_compactor_test.go @@ -4,7 +4,6 @@ package compactor import ( "context" - "io/ioutil" "os" "strings" "testing" @@ -566,7 +565,6 @@ func TestMultitenantCompactor_ShouldSupportSplitAndMergeCompactor(t *testing.T) compactorCfg := prepareConfig() compactorCfg.DataDir = workDir compactorCfg.BlockRanges = compactionRanges - compactorCfg.CompactionStrategy = CompactionStrategySplitMerge cfgProvider := newMockConfigProvider() cfgProvider.splitAndMergeShards[userID] = testData.numShards @@ -625,171 +623,6 @@ func TestMultitenantCompactor_ShouldSupportSplitAndMergeCompactor(t *testing.T) } } -func TestMultitenantCompactor_ShouldSupportRollbackFromSplitAndMergeToDefaultCompactor(t *testing.T) { - const ( - userID = "user-1" - numSeries = 100 - blockRange = 2 * time.Hour - numShards = 2 - ) - - var ( - ctx = context.Background() - logger = log.NewLogfmtLogger(os.Stdout) - blockRangeMillis = blockRange.Milliseconds() - compactionRanges = mimir_tsdb.DurationList{blockRange, 2 * blockRange} - ) - - // Create a temporary directory for local storage. 
- storageDir, err := ioutil.TempDir(os.TempDir(), "storage") - require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, os.RemoveAll(storageDir)) - }) - - storageCfg := mimir_tsdb.BlocksStorageConfig{} - flagext.DefaultValues(&storageCfg) - storageCfg.Bucket.Backend = bucket.Filesystem - storageCfg.Bucket.Filesystem.Directory = storageDir - - bkt, err := bucket.NewClient(ctx, storageCfg.Bucket, "test", logger, nil) - require.NoError(t, err) - - // Create some blocks to compact in the storage. - block1 := createTSDBBlock(t, bkt, userID, 0*blockRangeMillis, 1*blockRangeMillis, numSeries, nil) - block2 := createTSDBBlock(t, bkt, userID, 1*blockRangeMillis, 2*blockRangeMillis, numSeries, nil) - - expected := []metadata.Meta{ - { - BlockMeta: tsdb.BlockMeta{ - MinTime: 0, - MaxTime: 2 * blockRangeMillis, - Compaction: tsdb.BlockMetaCompaction{ - Sources: []ulid.ULID{block1, block2}, - }, - }, - Thanos: metadata.Thanos{ - Labels: map[string]string{mimir_tsdb.CompactorShardIDExternalLabel: "1_of_2"}, - }, - }, { - BlockMeta: tsdb.BlockMeta{ - MinTime: 0, - MaxTime: 2 * blockRangeMillis, - Compaction: tsdb.BlockMetaCompaction{ - Sources: []ulid.ULID{block1, block2}, - }, - }, - Thanos: metadata.Thanos{ - Labels: map[string]string{mimir_tsdb.CompactorShardIDExternalLabel: "2_of_2"}, - }, - }, - } - - workDir := t.TempDir() - fetcherDir := t.TempDir() - - compactorCfg := prepareConfig() - compactorCfg.DataDir = workDir - compactorCfg.BlockRanges = compactionRanges - - cfgProvider := newMockConfigProvider() - cfgProvider.splitAndMergeShards[userID] = numShards - - t.Run("run split-and-merge compaction strategy", func(t *testing.T) { - compactorCfg.CompactionStrategy = CompactionStrategySplitMerge - - reg := prometheus.NewPedanticRegistry() - c, err := NewMultitenantCompactor(compactorCfg, storageCfg, cfgProvider, logger, reg) - require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) - t.Cleanup(func() { - require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) - }) - - // Wait until the first compaction run completed. - test.Poll(t, 15*time.Second, nil, func() interface{} { - return testutil.GatherAndCompare(reg, strings.NewReader(` - # HELP cortex_compactor_runs_completed_total Total number of compaction runs successfully completed. - # TYPE cortex_compactor_runs_completed_total counter - cortex_compactor_runs_completed_total 1 - `), "cortex_compactor_runs_completed_total") - }) - - // List any (non deleted) block from the storage. - userBucket := bucket.NewUserBucketClient(userID, bkt, nil) - fetcher, err := block.NewMetaFetcher(logger, - 1, - userBucket, - fetcherDir, - reg, - []block.MetadataFilter{NewExcludeMarkedForDeletionFilter(userBucket)}, - nil) - require.NoError(t, err) - metas, partials, err := fetcher.Fetch(ctx) - require.NoError(t, err) - require.Empty(t, partials) - - // Sort blocks by MinTime and labels so that we get a stable comparison. - actual := sortMetasByMinTime(convertMetasMapToSlice(metas)) - - // Compare actual blocks with the expected ones. 
- require.Len(t, actual, len(expected)) - for i, e := range expected { - assert.Equal(t, e.MinTime, actual[i].MinTime) - assert.Equal(t, e.MaxTime, actual[i].MaxTime) - assert.Equal(t, e.Compaction.Sources, actual[i].Compaction.Sources) - assert.Equal(t, e.Thanos.Labels, actual[i].Thanos.Labels) - } - }) - - t.Run("rollback to default compaction strategy", func(t *testing.T) { - compactorCfg.CompactionStrategy = CompactionStrategyDefault - - reg := prometheus.NewPedanticRegistry() - c, err := NewMultitenantCompactor(compactorCfg, storageCfg, cfgProvider, logger, reg) - require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) - t.Cleanup(func() { - require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) - }) - - // Wait until the first compaction run completed. - test.Poll(t, 15*time.Second, nil, func() interface{} { - return testutil.GatherAndCompare(reg, strings.NewReader(` - # HELP cortex_compactor_runs_completed_total Total number of compaction runs successfully completed. - # TYPE cortex_compactor_runs_completed_total counter - cortex_compactor_runs_completed_total 1 - `), "cortex_compactor_runs_completed_total") - }) - - // List any (non deleted) block from the storage. - userBucket := bucket.NewUserBucketClient(userID, bkt, nil) - fetcher, err := block.NewMetaFetcher(logger, - 1, - userBucket, - fetcherDir, - reg, - []block.MetadataFilter{NewExcludeMarkedForDeletionFilter(userBucket)}, - nil) - require.NoError(t, err) - metas, partials, err := fetcher.Fetch(ctx) - require.NoError(t, err) - require.Empty(t, partials) - - // Sort blocks by MinTime and labels so that we get a stable comparison. - actual := sortMetasByMinTime(convertMetasMapToSlice(metas)) - - // Compare actual blocks with the expected ones. - require.Len(t, actual, len(expected)) - for i, e := range expected { - assert.Equal(t, e.MinTime, actual[i].MinTime) - assert.Equal(t, e.MaxTime, actual[i].MaxTime) - assert.Equal(t, e.Compaction.Sources, actual[i].Compaction.Sources) - assert.Equal(t, e.Thanos.Labels, actual[i].Thanos.Labels) - } - }) -} - func convertMetasMapToSlice(metas map[ulid.ULID]*metadata.Meta) []*metadata.Meta { var out []*metadata.Meta for _, m := range metas { diff --git a/pkg/compactor/split_merge_grouper.go b/pkg/compactor/split_merge_grouper.go index 8ee8fe72a30..61a98c2ccb5 100644 --- a/pkg/compactor/split_merge_grouper.go +++ b/pkg/compactor/split_merge_grouper.go @@ -161,6 +161,12 @@ func planCompaction(userID string, blocks []*metadata.Meta, ranges []int64, shar jobs = append(jobs[:idx], jobs[idx+1:]...) } + // Jobs will be sorted later using configured job sorting algorithm. + // Here we sort them by sharding key, to keep the output stable for testing. + sort.SliceStable(jobs, func(i, j int) bool { + return jobs[i].shardingKey() < jobs[j].shardingKey() + }) + return jobs } diff --git a/pkg/compactor/split_merge_job.go b/pkg/compactor/split_merge_job.go index 14880613af9..abb68b3e5bb 100644 --- a/pkg/compactor/split_merge_job.go +++ b/pkg/compactor/split_merge_job.go @@ -4,6 +4,7 @@ package compactor import ( "fmt" + "sort" "strings" "time" @@ -84,6 +85,9 @@ func (j *job) String() string { blocks = append(blocks, fmt.Sprintf("%s (min time: %s, max time: %s)", block.ULID.String(), minT.String(), maxT.String())) } + // Keep the output stable for tests. + sort.Strings(blocks) + return fmt.Sprintf("stage: %s, range start: %d, range end: %d, shard: %s, blocks: %s", j.stage, j.rangeStart, j.rangeEnd, j.shardID, strings.Join(blocks, ",")) }
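The final two hunks make planning output deterministic, which is also why the expected group keys in the tests above grew longer. The sketch below only decodes the key format visible in those log lines and demonstrates the added sorting; the field meanings (for example, reading the leading 0 as a downsampling resolution) are an assumption, not taken from the compactor source.

```go
package main

import (
	"fmt"
	"sort"
)

// groupKey rebuilds a key of the shape seen in the updated test expectations:
// <resolution>@<group hash>-<stage>-<shard ID>-<range start>-<range end>.
// The segment meanings are inferred from the logs, not from the source code.
func groupKey(resolution int64, groupHash uint64, stage, shardID string, rangeStart, rangeEnd int64) string {
	return fmt.Sprintf("%d@%d-%s-%s-%d-%d", resolution, groupHash, stage, shardID, rangeStart, rangeEnd)
}

func main() {
	// Matches the key logged for user-1 in TestMultitenantCompactor_ShouldIterateOverUsersAndRunCompaction;
	// the empty shard ID is what produces the double dash.
	fmt.Println(groupKey(0, 17241709254077376921, "merge", "", 1574776800000, 1574784000000))

	// The last two hunks add deterministic ordering so these strings are stable
	// across runs: jobs are sorted by sharding key, and the block IDs inside a
	// job's String() output are sorted lexicographically, as below.
	blocks := []string{"01FS51A7GQ1RQWV35DBVYQM4KF", "01DTVP434PA9VFXSW2JKB3392D"}
	sort.Strings(blocks)
	fmt.Println(blocks) // [01DTVP434PA9VFXSW2JKB3392D 01FS51A7GQ1RQWV35DBVYQM4KF]
}
```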