Skip to content

Commit

Permalink
feat: add flag for distributed queries with overlapping intervals
Browse files Browse the repository at this point in the history
  • Loading branch information
abelsimonn committed Dec 17, 2024
1 parent 84ee778 commit 719254b
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 40 deletions.
18 changes: 12 additions & 6 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ func registerQuery(app *extkingpin.App) {
Strings()
queryPartitionLabels := cmd.Flag("query.partition-label", "Labels that partition the leaf queriers. This is used to scope down the labelsets of leaf queriers when using the distributed query mode. If set, these labels must form a partition of the leaf queriers. Partition labels must not intersect with replica labels. Every TSDB of a leaf querier must have these labels. This is useful when there are multiple external labels that are irrelevant for the partition as it allows the distributed engine to ignore them for some optimizations. If this is empty then all labels are used as partition labels.").Strings()

// Currently, we choose the highest MinT of an engine when querying multiple engines. This flag allows changing this behavior to choose the lowest MinT instead.
queryDistributedWithOverlappingInterval := cmd.Flag("query.distributed-with-overlapping-interval", "Allow for distributed queries using an engines lowest MinT.").Hidden().Default("false").Bool()

instantDefaultMaxSourceResolution := extkingpin.ModelDuration(cmd.Flag("query.instant.default.max_source_resolution", "default value for max_source_resolution for instant queries. If not set, defaults to 0s only taking raw resolution into account. 1h can be a good value if you use instant queries over time ranges that incorporate times outside of your raw-retention.").Default("0s").Hidden())

defaultMetadataTimeRange := cmd.Flag("query.metadata.default-time-range", "The default metadata time range duration for retrieving labels through Labels and Series API when the range parameters are not specified. The zero value means range covers the time since the beginning.").Default("0s").Duration()
Expand Down Expand Up @@ -371,12 +374,13 @@ func registerQuery(app *extkingpin.App) {
*tenantCertField,
*enforceTenancy,
*tenantLabel,
*queryDistributedWithOverlappingInterval,
)
})
}

// runQuery starts a server that exposes PromQL Query API. It is responsible for querying configured
// store nodes, merging and duplicating the data to satisfy user query.
// store nodes, merging and deduplicating the data to satisfy user query.
func runQuery(
g *run.Group,
logger log.Logger,
Expand Down Expand Up @@ -453,6 +457,7 @@ func runQuery(
tenantCertField string,
enforceTenancy bool,
tenantLabel string,
queryDistributedWithOverlappingInterval bool,
) error {
comp := component.Query
if alertQueryURL == "" {
Expand Down Expand Up @@ -688,11 +693,12 @@ func runQuery(
level.Info(logger).Log("msg", "Distributed query mode enabled, using Thanos as the default query engine.")
defaultEngine = string(apiv1.PromqlEngineThanos)
remoteEngineEndpoints = query.NewRemoteEndpoints(logger, endpoints.GetQueryAPIClients, query.Opts{
AutoDownsample: enableAutodownsampling,
ReplicaLabels: queryReplicaLabels,
PartitionLabels: queryPartitionLabels,
Timeout: queryTimeout,
EnablePartialResponse: enableQueryPartialResponse,
AutoDownsample: enableAutodownsampling,
ReplicaLabels: queryReplicaLabels,
PartitionLabels: queryPartitionLabels,
Timeout: queryTimeout,
EnablePartialResponse: enableQueryPartialResponse,
QueryDistributedWithOverlappingInterval: queryDistributedWithOverlappingInterval,
})
}

Expand Down
18 changes: 12 additions & 6 deletions pkg/query/remote_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ import (

// Opts are the options for a PromQL query.
type Opts struct {
AutoDownsample bool
ReplicaLabels []string
PartitionLabels []string
Timeout time.Duration
EnablePartialResponse bool
AutoDownsample bool
ReplicaLabels []string
PartitionLabels []string
Timeout time.Duration
EnablePartialResponse bool
// QueryDistributedWithOverlappingInterval, when true, makes remoteEngine.MinT keep the
// lowest MinTime seen per label set instead of the highest one.
QueryDistributedWithOverlappingInterval bool
}

// Client is a query client that executes PromQL queries.
Expand Down Expand Up @@ -114,6 +115,7 @@ func NewRemoteEngine(logger log.Logger, queryClient Client, opts Opts) *remoteEn
// a block due to retention before other replicas did the same.
// See https://github.com/thanos-io/promql-engine/issues/187.
func (r *remoteEngine) MinT() int64 {

r.mintOnce.Do(func() {
var (
hashBuf = make([]byte, 0, 128)
Expand All @@ -126,7 +128,11 @@ func (r *remoteEngine) MinT() int64 {
highestMintByLabelSet[key] = lset.MinTime
continue
}
if lset.MinTime > lsetMinT {
// If we are querying with overlapping intervals, we want the earliest (lowest) MinTime seen for this label set;
// otherwise, we want the latest (highest) one.
if r.opts.QueryDistributedWithOverlappingInterval && lset.MinTime < lsetMinT {
highestMintByLabelSet[key] = lset.MinTime
} else if !r.opts.QueryDistributedWithOverlappingInterval && lset.MinTime > lsetMinT {
highestMintByLabelSet[key] = lset.MinTime
}
}
Expand Down
53 changes: 25 additions & 28 deletions test/e2e/e2ethanos/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,10 @@ type QuerierBuilder struct {
endpoints []string
strictEndpoints []string

engine apiv1.PromqlEngineType
queryMode string
enableXFunctions bool
engine apiv1.PromqlEngineType
queryMode string
queryDistributedWithOverlappingInterval bool
enableXFunctions bool

replicaLabels []string
tracingConfig string
Expand Down Expand Up @@ -376,6 +377,10 @@ func (q *QuerierBuilder) WithQueryMode(mode string) *QuerierBuilder {
q.queryMode = mode
return q
}
// WithDistributedOverlap records whether the querier should be started with the
// --query.distributed-with-overlapping-interval flag and returns the same builder.
func (q *QuerierBuilder) WithDistributedOverlap(overlap bool) *QuerierBuilder {
q.queryDistributedWithOverlappingInterval = overlap
return q
}

func (q *QuerierBuilder) WithEnableXFunctions() *QuerierBuilder {
q.enableXFunctions = true
Expand Down Expand Up @@ -513,6 +518,9 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) {
if q.queryMode != "" {
args = append(args, "--query.mode="+q.queryMode)
}
if q.queryDistributedWithOverlappingInterval {
args = append(args, "--query.distributed-with-overlapping-interval")
}
if q.engine != "" {
args = append(args, "--query.promql-engine="+string(q.engine))
}
Expand All @@ -538,21 +546,20 @@ type ReceiveBuilder struct {

f e2e.FutureRunnable

maxExemplars int
capnp bool
ingestion bool
expandedPostingsCache bool
limit int
tenantsLimits receive.TenantsWriteLimitsConfig
metaMonitoring string
metaMonitoringQuery string
hashringConfigs []receive.HashringConfig
relabelConfigs []*relabel.Config
replication int
image string
nativeHistograms bool
labels []string
tenantSplitLabel string
maxExemplars int
capnp bool
ingestion bool
limit int
tenantsLimits receive.TenantsWriteLimitsConfig
metaMonitoring string
metaMonitoringQuery string
hashringConfigs []receive.HashringConfig
relabelConfigs []*relabel.Config
replication int
image string
nativeHistograms bool
labels []string
tenantSplitLabel string
}

func NewReceiveBuilder(e e2e.Environment, name string) *ReceiveBuilder {
Expand Down Expand Up @@ -583,11 +590,6 @@ func (r *ReceiveBuilder) WithIngestionEnabled() *ReceiveBuilder {
return r
}

// WithExpandedPostingsCache sets the expandedPostingsCache field on the builder and
// returns the same builder.
func (r *ReceiveBuilder) WithExpandedPostingsCache() *ReceiveBuilder {
r.expandedPostingsCache = true
return r
}

func (r *ReceiveBuilder) WithLabel(name, value string) *ReceiveBuilder {
r.labels = append(r.labels, fmt.Sprintf(`%s="%s"`, name, value))
return r
Expand Down Expand Up @@ -667,11 +669,6 @@ func (r *ReceiveBuilder) Init() *e2eobs.Observable {
args["--receive.local-endpoint"] = r.InternalEndpoint("grpc")
}

if r.expandedPostingsCache {
args["--tsdb.head.expanded-postings-cache-size"] = "1000"
args["--tsdb.block.expanded-postings-cache-size"] = "1000"
}

if r.limit != 0 && r.metaMonitoring != "" {
cfg := receive.RootLimitsConfig{
WriteLimits: receive.WriteLimitsConfig{
Expand Down

0 comments on commit 719254b

Please sign in to comment.