Skip to content

Commit

Permalink
Remove -compactor.compaction-strategy and default compactor. (#820)
Browse files Browse the repository at this point in the history
* Remove -compactor.compaction-strategy and default compactor.

Signed-off-by: Peter Štibraný <[email protected]>
  • Loading branch information
pstibrany authored Jan 21, 2022
1 parent 2164aa9 commit 59da7cc
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 369 deletions.
10 changes: 2 additions & 8 deletions docs/blocks-storage/compactor.md
Original file line number Diff line number Diff line change
Expand Up @@ -324,15 +324,9 @@ compactor:
# CLI flag: -compactor.ring.wait-active-instance-timeout
[wait_active_instance_timeout: <duration> | default = 10m]

# The compaction strategy to use. Supported values are: default,
# split-and-merge.
# CLI flag: -compactor.compaction-strategy
[compaction_strategy: <string> | default = "default"]

# The sorting to use when deciding which compaction jobs should run first for
# a given tenant. Changing this setting is not supported by the default
# compaction strategy. Supported values are:
# smallest-range-oldest-blocks-first, newest-blocks-first.
# a given tenant. Supported values are: smallest-range-oldest-blocks-first,
# newest-blocks-first.
# CLI flag: -compactor.compaction-jobs-order
[compaction_jobs_order: <string> | default = "smallest-range-oldest-blocks-first"]
```
8 changes: 1 addition & 7 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3600,14 +3600,8 @@ sharding_ring:
# CLI flag: -compactor.ring.wait-active-instance-timeout
[wait_active_instance_timeout: <duration> | default = 10m]
# The compaction strategy to use. Supported values are: default,
# split-and-merge.
# CLI flag: -compactor.compaction-strategy
[compaction_strategy: <string> | default = "default"]
# The sorting to use when deciding which compaction jobs should run first for a
# given tenant. Changing this setting is not supported by the default compaction
# strategy. Supported values are: smallest-range-oldest-blocks-first,
# given tenant. Supported values are: smallest-range-oldest-blocks-first,
# newest-blocks-first.
# CLI flag: -compactor.compaction-jobs-order
[compaction_jobs_order: <string> | default = "smallest-range-oldest-blocks-first"]
Expand Down
69 changes: 4 additions & 65 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,15 @@ const (
// PartialUploadThresholdAge is a time after partial block is assumed aborted and ready to be cleaned.
// Keep it long as it is based on block creation time not upload start time.
PartialUploadThresholdAge = 2 * 24 * time.Hour

CompactionStrategyDefault = "default"
CompactionStrategySplitMerge = "split-and-merge"
)

var (
errInvalidBlockRanges = "compactor block range periods should be divisible by the previous one, but %s is not divisible by %s"
errInvalidCompactionOrder = fmt.Errorf("unsupported compaction order (supported values: %s)", strings.Join(CompactionOrders, ", "))
errUnsupportedCompactionOrder = "the %s compaction strategy does not support %s order"
errInvalidMaxOpeningBlocksConcurrency = fmt.Errorf("invalid max-opening-blocks-concurrency value, must be positive")
errInvalidMaxClosingBlocksConcurrency = fmt.Errorf("invalid max-closing-blocks-concurrency value, must be positive")
errInvalidSymbolFlushersConcurrency = fmt.Errorf("invalid symbols-flushers-concurrency value, must be positive")
RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)

compactionStrategies = []string{CompactionStrategyDefault, CompactionStrategySplitMerge}
)

// BlocksGrouperFactory builds and returns the grouper to use to compact a tenant's blocks.
Expand Down Expand Up @@ -112,8 +106,6 @@ type Config struct {
ShardingEnabled bool `yaml:"sharding_enabled"`
ShardingRing RingConfig `yaml:"sharding_ring"`

// Compaction strategy.
CompactionStrategy string `yaml:"compaction_strategy"`
CompactionJobsOrder string `yaml:"compaction_jobs_order"`

// No need to add options to customize the retry backoff,
Expand Down Expand Up @@ -147,8 +139,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.CleanupInterval, "compactor.cleanup-interval", 15*time.Minute, "How frequently compactor should run blocks cleanup and maintenance, as well as update the bucket index.")
f.IntVar(&cfg.CleanupConcurrency, "compactor.cleanup-concurrency", 20, "Max number of tenants for which blocks cleanup and maintenance should run concurrently.")
f.BoolVar(&cfg.ShardingEnabled, "compactor.sharding-enabled", false, "Shard tenants across multiple compactor instances. Sharding is required if you run multiple compactor instances, in order to coordinate compactions and avoid race conditions leading to the same tenant blocks simultaneously compacted by different instances.")
f.StringVar(&cfg.CompactionStrategy, "compactor.compaction-strategy", CompactionStrategyDefault, fmt.Sprintf("The compaction strategy to use. Supported values are: %s.", strings.Join(compactionStrategies, ", ")))
f.StringVar(&cfg.CompactionJobsOrder, "compactor.compaction-jobs-order", CompactionOrderOldestFirst, fmt.Sprintf("The sorting to use when deciding which compaction jobs should run first for a given tenant. Changing this setting is not supported by the default compaction strategy. Supported values are: %s.", strings.Join(CompactionOrders, ", ")))
f.StringVar(&cfg.CompactionJobsOrder, "compactor.compaction-jobs-order", CompactionOrderOldestFirst, fmt.Sprintf("The sorting to use when deciding which compaction jobs should run first for a given tenant. Supported values are: %s.", strings.Join(CompactionOrders, ", ")))
f.DurationVar(&cfg.DeletionDelay, "compactor.deletion-delay", 12*time.Hour, "Time before a block marked for deletion is deleted from bucket. "+
"If not 0, blocks will be marked for deletion and compactor component will permanently delete blocks marked for deletion from the bucket. "+
"If 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures.")
Expand Down Expand Up @@ -180,18 +171,10 @@ func (cfg *Config) Validate() error {
return errInvalidSymbolFlushersConcurrency
}

if !util.StringsContain(compactionStrategies, cfg.CompactionStrategy) {
return fmt.Errorf("unsupported compaction strategy (supported values: %s)", strings.Join(compactionStrategies, ", "))
}

if !util.StringsContain(CompactionOrders, cfg.CompactionJobsOrder) {
return errInvalidCompactionOrder
}

if cfg.CompactionStrategy == CompactionStrategyDefault && cfg.CompactionJobsOrder != CompactionOrderOldestFirst {
return fmt.Errorf(errUnsupportedCompactionOrder, cfg.CompactionStrategy, cfg.CompactionJobsOrder)
}

return nil
}

Expand Down Expand Up @@ -281,10 +264,8 @@ func NewMultitenantCompactor(compactorCfg Config, storageCfg mimir_tsdb.BlocksSt
// Configure the compactor and grouper factories.
if compactorCfg.BlocksGrouperFactory != nil && compactorCfg.BlocksCompactorFactory != nil {
// Nothing to do because it was already set by a downstream project.
} else if compactorCfg.CompactionStrategy == CompactionStrategySplitMerge {
configureSplitAndMergeCompactor(&compactorCfg)
} else {
configureDefaultCompactor(&compactorCfg)
configureSplitAndMergeCompactor(&compactorCfg)
}

blocksGrouperFactory := compactorCfg.BlocksGrouperFactory
Expand Down Expand Up @@ -466,16 +447,10 @@ func (c *MultitenantCompactor) starting(ctx context.Context) error {
}

allowedTenants := util.NewAllowedTenants(c.compactorCfg.EnabledTenants, c.compactorCfg.DisabledTenants)
switch {
case !c.compactorCfg.ShardingEnabled:
if !c.compactorCfg.ShardingEnabled {
c.shardingStrategy = newNoShardingStrategy(allowedTenants)
case c.compactorCfg.CompactionStrategy == CompactionStrategyDefault:
c.shardingStrategy = newDefaultShardingStrategy(allowedTenants, c.ring, c.ringLifecycler)
case c.compactorCfg.CompactionStrategy == CompactionStrategySplitMerge:
} else {
c.shardingStrategy = newSplitAndMergeShardingStrategy(allowedTenants, c.ring, c.ringLifecycler, c.cfgProvider)
default:
// Should not happen, but just in case.
return errors.Errorf("invalid configuration: sharding is enabled, but using unknown compaction strategy: %s", c.compactorCfg.CompactionStrategy)
}

// Create the blocks cleaner (service).
Expand Down Expand Up @@ -803,42 +778,6 @@ func (n *noShardingStrategy) ownJob(job *Job) (bool, error) {
return n.ownUser(job.UserID()), nil
}

// defaultShardingStrategy is used with default compaction strategy. Only one compactor
// can own the user. There is no shuffle sharding.
type defaultShardingStrategy struct {
allowedTenants *util.AllowedTenants
ring *ring.Ring
ringLifecycler *ring.Lifecycler
}

func newDefaultShardingStrategy(allowedTenants *util.AllowedTenants, ring *ring.Ring, ringLifecycler *ring.Lifecycler) *defaultShardingStrategy {
return &defaultShardingStrategy{
allowedTenants: allowedTenants,
ring: ring,
ringLifecycler: ringLifecycler,
}
}

func (d *defaultShardingStrategy) ownUser(userID string) (bool, error) {
if !d.allowedTenants.IsAllowed(userID) {
return false, nil
}

return instanceOwnsTokenInRing(d.ring, d.ringLifecycler.Addr, userID)
}

func (d *defaultShardingStrategy) blocksCleanerOwnUser(userID string) (bool, error) {
return d.ownUser(userID)
}

func (d *defaultShardingStrategy) compactorOwnUser(userID string) (bool, error) {
return d.ownUser(userID)
}

func (d *defaultShardingStrategy) ownJob(job *Job) (bool, error) {
return d.ownUser(job.UserID())
}

// splitAndMergeShardingStrategy is used with split-and-merge compaction strategy.
// All compactors from user's shard own the user for compaction purposes, and plan jobs.
// Each job is only owned and executed by single compactor.
Expand Down
Loading

0 comments on commit 59da7cc

Please sign in to comment.