Skip to content

Commit

Permalink
chore: shard limited queries with a fixed sharding factor (#13576)
Browse files Browse the repository at this point in the history
Signed-off-by: Edward Welch <[email protected]>
(cherry picked from commit 6631c0f)
  • Loading branch information
slim-bean committed Jul 19, 2024
1 parent 9202e81 commit 50c8f82
Showing 1 changed file with 23 additions and 2 deletions.
25 changes: 23 additions & 2 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func NewMiddleware(
return nil, nil, err
}

limitedTripperware, err := NewLimitedTripperware(cfg, log, limits, schema, metrics, codec, iqo, metricsNamespace)
limitedTripperware, err := NewLimitedTripperware(cfg, engineOpts, log, limits, schema, metrics, codec, iqo, indexStatsTripperware, metricsNamespace)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -615,14 +615,35 @@ func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Lo
}

// NewLimitedTripperware creates a new frontend tripperware responsible for handling log requests which are label matcher only, no filter expression.
func NewLimitedTripperware(cfg Config, log log.Logger, limits Limits, schema config.SchemaConfig, metrics *Metrics, merger base.Merger, iqo util.IngesterQueryOptions, metricsNamespace string) (base.Middleware, error) {
func NewLimitedTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, metrics *Metrics, merger base.Merger, iqo util.IngesterQueryOptions, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) {
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
statsHandler := indexStatsTripperware.Wrap(next)

queryRangeMiddleware := []base.Middleware{
StatsCollectorMiddleware(),
NewLimitsMiddleware(limits),
base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
SplitByIntervalMiddleware(schema.Configs, WithMaxParallelism(limits, limitedQuerySplits), merger, newDefaultSplitter(limits, iqo), metrics.SplitByMetrics),
}

if cfg.ShardedQueries {
queryRangeMiddleware = append(queryRangeMiddleware,
NewQueryShardMiddleware(
log,
schema.Configs,
engineOpts,
metrics.InstrumentMiddlewareMetrics, // instrumentation is included in the sharding middleware
metrics.MiddlewareMapperMetrics.shardMapper,
limits,
// Too many shards on limited queries results in slowing down this type of query
// and overwhelming the frontend, therefore we fix the number of shards to prevent this.
32, // 0 is unlimited shards
statsHandler,
cfg.ShardAggregations,
),
)
}

if cfg.MaxRetries > 0 {
queryRangeMiddleware = append(
queryRangeMiddleware, base.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics),
Expand Down

0 comments on commit 50c8f82

Please sign in to comment.