Remove -compactor.compaction-strategy and default compactor. #820

Merged
merged 2 commits on Jan 21, 2022

docs/blocks-storage/compactor.md (2 additions, 8 deletions)
@@ -324,15 +324,9 @@ compactor:
 # CLI flag: -compactor.ring.wait-active-instance-timeout
 [wait_active_instance_timeout: <duration> | default = 10m]
 
-# The compaction strategy to use. Supported values are: default,
-# split-and-merge.
-# CLI flag: -compactor.compaction-strategy
-[compaction_strategy: <string> | default = "default"]
-
 # The sorting to use when deciding which compaction jobs should run first for
-# a given tenant. Changing this setting is not supported by the default
-# compaction strategy. Supported values are:
-# smallest-range-oldest-blocks-first, newest-blocks-first.
+# a given tenant. Supported values are: smallest-range-oldest-blocks-first,
+# newest-blocks-first.
 # CLI flag: -compactor.compaction-jobs-order
 [compaction_jobs_order: <string> | default = "smallest-range-oldest-blocks-first"]
 ```
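
For illustration, a minimal standalone Go sketch (not code from this PR; the flag name, default, and error text only mirror what the diff shows) of how the two documented values behave when the remaining flag is parsed:

```go
package main

import (
	"flag"
	"fmt"
	"strings"
)

// The two supported orders documented above.
var compactionOrders = []string{"smallest-range-oldest-blocks-first", "newest-blocks-first"}

func main() {
	fs := flag.NewFlagSet("compactor", flag.ExitOnError)
	order := fs.String("compactor.compaction-jobs-order", "smallest-range-oldest-blocks-first",
		"The sorting to use when deciding which compaction jobs should run first for a given tenant.")

	// Simulate passing the flag on the command line.
	_ = fs.Parse([]string{"-compactor.compaction-jobs-order=newest-blocks-first"})

	for _, o := range compactionOrders {
		if o == *order {
			fmt.Println("using compaction jobs order:", *order)
			return
		}
	}
	fmt.Printf("unsupported compaction order (supported values: %s)\n", strings.Join(compactionOrders, ", "))
}
```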

docs/configuration/config-file-reference.md (1 addition, 7 deletions)
@@ -3683,14 +3683,8 @@ sharding_ring:
 # CLI flag: -compactor.ring.wait-active-instance-timeout
 [wait_active_instance_timeout: <duration> | default = 10m]
 
-# The compaction strategy to use. Supported values are: default,
-# split-and-merge.
-# CLI flag: -compactor.compaction-strategy
-[compaction_strategy: <string> | default = "default"]
-
 # The sorting to use when deciding which compaction jobs should run first for a
-# given tenant. Changing this setting is not supported by the default compaction
-# strategy. Supported values are: smallest-range-oldest-blocks-first,
+# given tenant. Supported values are: smallest-range-oldest-blocks-first,
 # newest-blocks-first.
 # CLI flag: -compactor.compaction-jobs-order
 [compaction_jobs_order: <string> | default = "smallest-range-oldest-blocks-first"]

pkg/compactor/compactor.go (4 additions, 65 deletions)
@@ -49,21 +49,15 @@ const (
 	// PartialUploadThresholdAge is a time after partial block is assumed aborted and ready to be cleaned.
 	// Keep it long as it is based on block creation time not upload start time.
 	PartialUploadThresholdAge = 2 * 24 * time.Hour
-
-	CompactionStrategyDefault    = "default"
-	CompactionStrategySplitMerge = "split-and-merge"
 )
 
 var (
 	errInvalidBlockRanges                 = "compactor block range periods should be divisible by the previous one, but %s is not divisible by %s"
 	errInvalidCompactionOrder             = fmt.Errorf("unsupported compaction order (supported values: %s)", strings.Join(CompactionOrders, ", "))
-	errUnsupportedCompactionOrder         = "the %s compaction strategy does not support %s order"
 	errInvalidMaxOpeningBlocksConcurrency = fmt.Errorf("invalid max-opening-blocks-concurrency value, must be positive")
 	errInvalidMaxClosingBlocksConcurrency = fmt.Errorf("invalid max-closing-blocks-concurrency value, must be positive")
 	errInvalidSymbolFlushersConcurrency   = fmt.Errorf("invalid symbols-flushers-concurrency value, must be positive")
 	RingOp                                = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
-
-	compactionStrategies = []string{CompactionStrategyDefault, CompactionStrategySplitMerge}
 )
 
 // BlocksGrouperFactory builds and returns the grouper to use to compact a tenant's blocks.
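
One detail this hunk preserves: errInvalidBlockRanges stays a plain format string rather than an error value (as errUnsupportedCompactionOrder was before its removal), because the offending values are filled in at a call site that sits outside this diff. A small self-contained illustration of that pattern, reusing the string from the block above:

```go
package main

import "fmt"

const errInvalidBlockRanges = "compactor block range periods should be divisible by the previous one, but %s is not divisible by %s"

func main() {
	// The format string becomes an error only once the offending
	// block range periods are known, e.g.:
	err := fmt.Errorf(errInvalidBlockRanges, "12h", "5h")
	fmt.Println(err)
}
```
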
@@ -112,8 +106,6 @@ type Config struct {
 	ShardingEnabled bool       `yaml:"sharding_enabled"`
 	ShardingRing    RingConfig `yaml:"sharding_ring"`
 
-	// Compaction strategy.
-	CompactionStrategy  string `yaml:"compaction_strategy"`
 	CompactionJobsOrder string `yaml:"compaction_jobs_order"`
 
 	// No need to add options to customize the retry backoff,
@@ -147,8 +139,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.DurationVar(&cfg.CleanupInterval, "compactor.cleanup-interval", 15*time.Minute, "How frequently compactor should run blocks cleanup and maintenance, as well as update the bucket index.")
 	f.IntVar(&cfg.CleanupConcurrency, "compactor.cleanup-concurrency", 20, "Max number of tenants for which blocks cleanup and maintenance should run concurrently.")
 	f.BoolVar(&cfg.ShardingEnabled, "compactor.sharding-enabled", false, "Shard tenants across multiple compactor instances. Sharding is required if you run multiple compactor instances, in order to coordinate compactions and avoid race conditions leading to the same tenant blocks simultaneously compacted by different instances.")
-	f.StringVar(&cfg.CompactionStrategy, "compactor.compaction-strategy", CompactionStrategyDefault, fmt.Sprintf("The compaction strategy to use. Supported values are: %s.", strings.Join(compactionStrategies, ", ")))
-	f.StringVar(&cfg.CompactionJobsOrder, "compactor.compaction-jobs-order", CompactionOrderOldestFirst, fmt.Sprintf("The sorting to use when deciding which compaction jobs should run first for a given tenant. Changing this setting is not supported by the default compaction strategy. Supported values are: %s.", strings.Join(CompactionOrders, ", ")))
+	f.StringVar(&cfg.CompactionJobsOrder, "compactor.compaction-jobs-order", CompactionOrderOldestFirst, fmt.Sprintf("The sorting to use when deciding which compaction jobs should run first for a given tenant. Supported values are: %s.", strings.Join(CompactionOrders, ", ")))
 	f.DurationVar(&cfg.DeletionDelay, "compactor.deletion-delay", 12*time.Hour, "Time before a block marked for deletion is deleted from bucket. "+
 		"If not 0, blocks will be marked for deletion and compactor component will permanently delete blocks marked for deletion from the bucket. "+
 		"If 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures.")
@@ -180,18 +171,10 @@ func (cfg *Config) Validate() error {
 		return errInvalidSymbolFlushersConcurrency
 	}
 
-	if !util.StringsContain(compactionStrategies, cfg.CompactionStrategy) {
-		return fmt.Errorf("unsupported compaction strategy (supported values: %s)", strings.Join(compactionStrategies, ", "))
-	}
-
 	if !util.StringsContain(CompactionOrders, cfg.CompactionJobsOrder) {
 		return errInvalidCompactionOrder
 	}
 
-	if cfg.CompactionStrategy == CompactionStrategyDefault && cfg.CompactionJobsOrder != CompactionOrderOldestFirst {
-		return fmt.Errorf(errUnsupportedCompactionOrder, cfg.CompactionStrategy, cfg.CompactionJobsOrder)
-	}
-
 	return nil
 }
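
With both strategy checks gone, the only compaction-specific validation left is the jobs-order membership test. A self-contained sketch of that remaining behaviour, with a local loop standing in for util.StringsContain:

```go
package main

import (
	"fmt"
	"strings"
)

var compactionOrders = []string{"smallest-range-oldest-blocks-first", "newest-blocks-first"}

// stringsContain stands in for util.StringsContain: it reports whether
// values contains search.
func stringsContain(values []string, search string) bool {
	for _, v := range values {
		if v == search {
			return true
		}
	}
	return false
}

func validateJobsOrder(order string) error {
	if !stringsContain(compactionOrders, order) {
		return fmt.Errorf("unsupported compaction order (supported values: %s)", strings.Join(compactionOrders, ", "))
	}
	return nil
}

func main() {
	fmt.Println(validateJobsOrder("newest-blocks-first")) // <nil>
	fmt.Println(validateJobsOrder("alphabetical"))        // unsupported compaction order (...)
}
```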

@@ -281,10 +264,8 @@ func NewMultitenantCompactor(compactorCfg Config, storageCfg mimir_tsdb.BlocksSt
 	// Configure the compactor and grouper factories.
 	if compactorCfg.BlocksGrouperFactory != nil && compactorCfg.BlocksCompactorFactory != nil {
 		// Nothing to do because it was already set by a downstream project.
-	} else if compactorCfg.CompactionStrategy == CompactionStrategySplitMerge {
-		configureSplitAndMergeCompactor(&compactorCfg)
 	} else {
-		configureDefaultCompactor(&compactorCfg)
+		configureSplitAndMergeCompactor(&compactorCfg)
 	}
 
 	blocksGrouperFactory := compactorCfg.BlocksGrouperFactory
@@ -466,16 +447,10 @@ func (c *MultitenantCompactor) starting(ctx context.Context) error {
 	}
 
 	allowedTenants := util.NewAllowedTenants(c.compactorCfg.EnabledTenants, c.compactorCfg.DisabledTenants)
-	switch {
-	case !c.compactorCfg.ShardingEnabled:
+	if !c.compactorCfg.ShardingEnabled {
 		c.shardingStrategy = newNoShardingStrategy(allowedTenants)
-	case c.compactorCfg.CompactionStrategy == CompactionStrategyDefault:
-		c.shardingStrategy = newDefaultShardingStrategy(allowedTenants, c.ring, c.ringLifecycler)
-	case c.compactorCfg.CompactionStrategy == CompactionStrategySplitMerge:
+	} else {
 		c.shardingStrategy = newSplitAndMergeShardingStrategy(allowedTenants, c.ring, c.ringLifecycler, c.cfgProvider)
-	default:
-		// Should not happen, but just in case.
-		return errors.Errorf("invalid configuration: sharding is enabled, but using unknown compaction strategy: %s", c.compactorCfg.CompactionStrategy)
 	}
 
 	// Create the blocks cleaner (service).
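
Both branches assign to c.shardingStrategy. The interface definition sits outside this diff, but judging from the methods the strategies implement later in this file, it plausibly looks like the sketch below (inferred, not authoritative; Job is the type those methods already take):

```go
// Inferred from the blocksCleanerOwnUser/compactorOwnUser/ownJob methods
// implemented by the strategies in this file; the real definition is not
// shown in this diff.
type shardingStrategy interface {
	blocksCleanerOwnUser(userID string) (bool, error)
	compactorOwnUser(userID string) (bool, error)
	ownJob(job *Job) (bool, error)
}
```
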
@@ -803,42 +778,6 @@ func (n *noShardingStrategy) ownJob(job *Job) (bool, error) {
 	return n.ownUser(job.UserID()), nil
 }
 
-// defaultShardingStrategy is used with default compaction strategy. Only one compactor
-// can own the user. There is no shuffle sharding.
-type defaultShardingStrategy struct {
-	allowedTenants *util.AllowedTenants
-	ring           *ring.Ring
-	ringLifecycler *ring.Lifecycler
-}
-
-func newDefaultShardingStrategy(allowedTenants *util.AllowedTenants, ring *ring.Ring, ringLifecycler *ring.Lifecycler) *defaultShardingStrategy {
-	return &defaultShardingStrategy{
-		allowedTenants: allowedTenants,
-		ring:           ring,
-		ringLifecycler: ringLifecycler,
-	}
-}
-
-func (d *defaultShardingStrategy) ownUser(userID string) (bool, error) {
-	if !d.allowedTenants.IsAllowed(userID) {
-		return false, nil
-	}
-
-	return instanceOwnsTokenInRing(d.ring, d.ringLifecycler.Addr, userID)
-}
-
-func (d *defaultShardingStrategy) blocksCleanerOwnUser(userID string) (bool, error) {
-	return d.ownUser(userID)
-}
-
-func (d *defaultShardingStrategy) compactorOwnUser(userID string) (bool, error) {
-	return d.ownUser(userID)
-}
-
-func (d *defaultShardingStrategy) ownJob(job *Job) (bool, error) {
-	return d.ownUser(job.UserID())
-}
-
 // splitAndMergeShardingStrategy is used with split-and-merge compaction strategy.
 // All compactors from user's shard own the user for compaction purposes, and plan jobs.
 // Each job is only owned and executed by single compactor.
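
For contrast with that comment: the deleted defaultShardingStrategy gave each user exactly one owner, namely the instance owning the ring token the user ID hashes to (the instanceOwnsTokenInRing call above), while split-and-merge spreads planning across the user's shard and applies single ownership per job. A toy, self-contained sketch of token-hash ownership, with a plain map standing in for the real ring:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// tokenFor hashes a tenant ID onto a 32-bit token space, the same idea the
// real ring uses (its actual hashing scheme lives in the ring package).
func tokenFor(userID string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(userID))
	return h.Sum32()
}

// ownsUser reports whether addr owns the user's token; compare with the
// deleted instanceOwnsTokenInRing call above.
func ownsUser(owners map[uint32]string, addr, userID string) bool {
	return owners[tokenFor(userID)] == addr
}

func main() {
	owners := map[uint32]string{tokenFor("tenant-1"): "10.0.0.1:9095"}
	fmt.Println(ownsUser(owners, "10.0.0.1:9095", "tenant-1")) // true
	fmt.Println(ownsUser(owners, "10.0.0.2:9095", "tenant-1")) // false
}
```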