Skip to content

Commit

Permalink
Loki: Implement timeouts migration (grafana#7555)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
Improve the experience of migrating to our new timeouts behavior, which
is the unifying of the engine:timeout and querier:query_timeout into a
new configuration named limits_config:query_timeout.
This PR makes sure users won't be surprised by their timeouts being
shorter, but at the same time, it makes it easier to migrate to the new
configuration.

Co-authored-by: Owen Diehl <[email protected]>
  • Loading branch information
2 people authored and changhyuni committed Nov 8, 2022
1 parent a7737b1 commit 4e790a2
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 17 deletions.
1 change: 0 additions & 1 deletion pkg/logql/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ func (ng *DownstreamEngine) Query(ctx context.Context, p Params, mapped syntax.E
logger: ng.logger,
params: p,
evaluator: NewDownstreamEvaluator(ng.downstreamable.Downstreamer(ctx)),
timeout: ng.opts.Timeout,
parse: func(_ context.Context, _ string) (syntax.Expr, error) {
return mapped, nil
},
Expand Down
24 changes: 9 additions & 15 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ import (
"github.com/grafana/loki/pkg/util/validation"
)

const (
DefaultEngineTimeout = 5 * time.Minute
)

var (
QueryTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "logql",
Expand Down Expand Up @@ -110,7 +114,7 @@ type EngineOpts struct {

func (opts *EngineOpts) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
// TODO: remove this configuration after next release.
f.DurationVar(&opts.Timeout, prefix+".engine.timeout", 5*time.Minute, "Timeout for query execution. Instead, rely only on querier.query-timeout. (deprecated)")
f.DurationVar(&opts.Timeout, prefix+".engine.timeout", DefaultEngineTimeout, "Timeout for query execution. Instead, rely only on querier.query-timeout. (deprecated)")
f.DurationVar(&opts.MaxLookBackPeriod, prefix+".engine.max-lookback-period", 30*time.Second, "The maximum amount of time to look back for log lines. Used only for instant log queries.")
}

Expand Down Expand Up @@ -152,9 +156,8 @@ func (ng *Engine) Query(params Params) Query {
parse: func(_ context.Context, query string) (syntax.Expr, error) {
return syntax.ParseExpr(query)
},
record: true,
limits: ng.limits,
timeout: ng.Timeout,
record: true,
limits: ng.limits,
}
}

Expand All @@ -169,7 +172,6 @@ type query struct {
params Params
parse func(context.Context, string) (syntax.Expr, error)
limits Limits
timeout time.Duration
evaluator Evaluator
record bool
}
Expand Down Expand Up @@ -238,16 +240,8 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {
}

func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) {
queryTimeout := q.timeout
if q.timeout == 0 {
queryTimeout = time.Minute * 5
userID, err := tenant.TenantID(ctx)
if err != nil {
level.Warn(q.logger).Log("msg", fmt.Sprintf("couldn't fetch tenantID to evaluate query timeout, using default value of %s", queryTimeout), "err", err)
return nil, err
}
queryTimeout = q.limits.QueryTimeout(userID) + time.Second
}
userID, _ := tenant.TenantID(ctx)
queryTimeout := q.limits.QueryTimeout(userID)

ctx, cancel := context.WithTimeout(ctx, queryTimeout)
defer cancel()
Expand Down
59 changes: 59 additions & 0 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/grafana/loki/pkg/distributor"
"github.com/grafana/loki/pkg/ingester"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/loki/common"
"github.com/grafana/loki/pkg/lokifrontend"
"github.com/grafana/loki/pkg/querier"
Expand Down Expand Up @@ -221,6 +222,64 @@ func (c *Config) Validate() error {
return err
}

if err := AdjustForTimeoutsMigration(c); err != nil {
return err
}

return nil
}

// AdjustForTimeoutsMigration will adjust Loki timeouts configuration to be in accordance with the next major release.
//
// We're preparing to unify the querier:engine:timeout and querier:query_timeout into a single timeout named limits_config:query_timeout.
// The migration encompasses of:
// - If limits_config:query_timeout is explicitly configured, use it everywhere as it is a new configuration and by
// configuring it, users are expressing that they're willing of using it.
// - If none are explicitly configured, use the default engine:timeout everywhere as it is longer than the default limits_config:query_timeout
// and otherwise users would start to experience shorter timeouts without expecting it.
// - If only the querier:engine:timeout was explicitly configured, warn the user and use it everywhere.
func AdjustForTimeoutsMigration(c *Config) error {
engineTimeoutIsDefault := c.Querier.Engine.Timeout == logql.DefaultEngineTimeout
perTenantTimeoutIsDefault := c.LimitsConfig.QueryTimeout.String() == validation.DefaultPerTenantQueryTimeout
if engineTimeoutIsDefault && perTenantTimeoutIsDefault {
if err := c.LimitsConfig.QueryTimeout.Set(c.Querier.Engine.Timeout.String()); err != nil {
return fmt.Errorf("couldn't set per-tenant query_timeout as the engine timeout value: %w", err)
}
level.Warn(util_log.Logger).Log("msg",
fmt.Sprintf(
"per-tenant timeout not configured, using default engine timeout (%q). This behavior will change in the next major to always use the default per-tenant timeout (%q).",
c.Querier.Engine.Timeout.String(),
c.LimitsConfig.QueryTimeout.String(),
),
)
return nil
}

if !perTenantTimeoutIsDefault && !engineTimeoutIsDefault {
level.Warn(util_log.Logger).Log("msg",
fmt.Sprintf(
"using configured per-tenant timeout (%q) as the default (can be overridden per-tenant in the limits_config). Configured engine timeout (%q) is deprecated and will be ignored.",
c.LimitsConfig.QueryTimeout.String(),
c.Querier.Engine.Timeout.String(),
),
)
return nil
}

if perTenantTimeoutIsDefault && !engineTimeoutIsDefault {
if err := c.LimitsConfig.QueryTimeout.Set(c.Querier.Engine.Timeout.String()); err != nil {
return fmt.Errorf("couldn't set per-tenant query_timeout as the engine timeout value: %w", err)
}
level.Warn(util_log.Logger).Log("msg",
fmt.Sprintf(
"using configured engine timeout (%q) as the default (can be overridden per-tenant in the limits_config). Be aware that engine timeout (%q) is deprecated and will be removed in the next major version.",
c.Querier.Engine.Timeout.String(),
c.LimitsConfig.QueryTimeout.String(),
),
)
return nil
}

return nil
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const (

defaultPerStreamRateLimit = 3 << 20 // 3MB
defaultPerStreamBurstLimit = 5 * defaultPerStreamRateLimit

DefaultPerTenantQueryTimeout = "1m"
)

// Limits describe all the limits for users; can be used to describe global default
Expand Down Expand Up @@ -195,7 +197,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
_ = l.MaxQueryLength.Set("721h")
f.Var(&l.MaxQueryLength, "store.max-query-length", "Limit to length of chunk store queries, 0 to disable.")
f.IntVar(&l.MaxQuerySeries, "querier.max-query-series", 500, "Limit the maximum of unique series returned by a metric query. When the limit is reached an error is returned.")
_ = l.QueryTimeout.Set("1m")
_ = l.QueryTimeout.Set(DefaultPerTenantQueryTimeout)
f.Var(&l.QueryTimeout, "querier.query-timeout", "Timeout when querying backends (ingesters or storage) during the execution of a query request. If a specific per-tenant timeout is used, this timeout is ignored.")

_ = l.MaxQueryLookback.Set("0s")
Expand Down

0 comments on commit 4e790a2

Please sign in to comment.