diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index bb3ea9f888c..0c0ef8d3b27 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -32,6 +32,7 @@ import ( "github.com/thanos-io/thanos/pkg/extgrpc" "github.com/thanos-io/thanos/pkg/extprom" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" + "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/logging" "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/query" @@ -426,11 +427,10 @@ func runQuery( ins := extpromhttp.NewInstrumentationMiddleware(reg) // TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting. - ui.NewQueryUI(logger, reg, stores, webExternalPrefix, webPrefixHeaderName).Register(router, ins) + ui.NewQueryUI(logger, stores, webExternalPrefix, webPrefixHeaderName).Register(router, ins) api := v1.NewQueryAPI( logger, - reg, stores, engine, queryableCreator, @@ -442,7 +442,10 @@ func runQuery( queryReplicaLabels, flagsMap, instantDefaultMaxSourceResolution, - maxConcurrentQueries, + gate.New( + extprom.WrapRegistererWithPrefix("thanos_query_concurrent_", reg), + maxConcurrentQueries, + ), ) api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins, logMiddleware) diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index dbd76e067b7..b674898ad63 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -30,7 +30,6 @@ import ( "github.com/go-kit/kit/log" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/common/route" "github.com/prometheus/prometheus/pkg/labels" @@ -40,7 +39,6 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/thanos-io/thanos/pkg/api" - "github.com/thanos-io/thanos/pkg/extprom" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/logging" @@ -64,7 +62,6 @@ const ( type QueryAPI struct { baseAPI *api.BaseAPI logger log.Logger - reg prometheus.Registerer gate gate.Gate queryableCreate query.QueryableCreator queryEngine *promql.Engine @@ -82,7 +79,6 @@ type QueryAPI struct { // NewQueryAPI returns an initialized QueryAPI type. func NewQueryAPI( logger log.Logger, - reg *prometheus.Registry, storeSet *query.StoreSet, qe *promql.Engine, c query.QueryableCreator, @@ -93,15 +89,14 @@ func NewQueryAPI( replicaLabels []string, flagsMap map[string]string, defaultInstantQueryMaxSourceResolution time.Duration, - maxConcurrentQueries int, + gate gate.Gate, ) *QueryAPI { return &QueryAPI{ baseAPI: api.NewBaseAPI(logger, flagsMap), logger: logger, - reg: reg, queryEngine: qe, queryableCreate: c, - gate: gate.NewKeeper(extprom.WrapRegistererWithPrefix("thanos_query_concurrent_", reg)).NewGate(maxConcurrentQueries), + gate: gate, ruleGroups: ruleGroups, enableAutodownsampling: enableAutodownsampling, diff --git a/pkg/api/query/v1_test.go b/pkg/api/query/v1_test.go index 6c300e01977..4cd1827eaad 100644 --- a/pkg/api/query/v1_test.go +++ b/pkg/api/query/v1_test.go @@ -116,7 +116,7 @@ func TestEndpoints(t *testing.T) { MaxSamples: 10000, Timeout: timeout, }), - gate: gate.NewKeeper(nil).NewGate(4), + gate: gate.New(nil, 4), } start := time.Unix(0, 0) @@ -1053,7 +1053,7 @@ func TestParseDownsamplingParamMillis(t *testing.T) { for i, test := range tests { api := QueryAPI{ enableAutodownsampling: test.enableAutodownsampling, - gate: gate.NewKeeper(nil).NewGate(4), + gate: gate.New(nil, 4), } v := url.Values{} v.Set(MaxSourceResolutionParam, test.maxSourceResolutionParam) diff --git a/pkg/cacheutil/memcached_client.go b/pkg/cacheutil/memcached_client.go index 1ac20d436b6..d4857ec7958 100644 --- a/pkg/cacheutil/memcached_client.go +++ b/pkg/cacheutil/memcached_client.go @@ -231,8 +231,10 @@ func newMemcachedClient( dnsProvider: dnsProvider, asyncQueue: make(chan func(), config.MaxAsyncBufferSize), stop: make(chan struct{}, 1), - getMultiGate: gate.NewKeeper(extprom.WrapRegistererWithPrefix("thanos_memcached_getmulti_", reg)). - NewGate(config.MaxGetMultiConcurrency), + getMultiGate: gate.New( + extprom.WrapRegistererWithPrefix("thanos_memcached_getmulti_", reg), + config.MaxGetMultiConcurrency, + ), } c.clientInfo = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{ diff --git a/pkg/gate/gate.go b/pkg/gate/gate.go index a87e3af5aa7..116358de510 100644 --- a/pkg/gate/gate.go +++ b/pkg/gate/gate.go @@ -12,67 +12,138 @@ import ( promgate "github.com/prometheus/prometheus/pkg/gate" ) -// Gate is an interface that mimics prometheus/pkg/gate behavior. +var ( + InFlightGaugeOpts = prometheus.GaugeOpts{ + Name: "gate_queries_in_flight", + Help: "Number of queries that are currently in flight.", + } + DurationHistogramOpts = prometheus.HistogramOpts{ + Name: "gate_duration_seconds", + Help: "How many seconds it took for queries to wait at the gate.", + Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}, + } +) + +// Gate controls the maximum number of concurrently running and waiting queries. +// +// Example of use: +// +// g := gate.New(r, 5) +// +// if err := g.Start(ctx); err != nil { +// return +// } +// defer g.Done() +// type Gate interface { + // Start initiates a new request and waits until it's our turn to fulfill a request. Start(ctx context.Context) error + // Done finishes a query. Done() } -// Gate wraps the Prometheus gate with extra metrics. -type gate struct { - g *promgate.Gate - m *metrics -} - -type metrics struct { - inflightQueries prometheus.Gauge - gateTiming prometheus.Histogram -} - // Keeper is used to create multiple gates sharing the same metrics. +// +// Deprecated: when Keeper is used to create several gates, the metric tracking +// the number of in-flight metric isn't meaningful because it is hard to say +// whether requests are being blocked or not. For clients that call +// gate.(*Keeper).NewGate only once, it is recommended to use gate.New() +// instead. Otherwise it is recommended to use the +// github.com/prometheus/prometheus/pkg/gate package directly and wrap the +// returned gate with gate.InstrumentGateDuration(). type Keeper struct { - m *metrics + reg prometheus.Registerer } // NewKeeper creates a new Keeper. +// +// Deprecated: see Keeper func NewKeeper(reg prometheus.Registerer) *Keeper { return &Keeper{ - m: &metrics{ - inflightQueries: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ - Name: "gate_queries_in_flight", - Help: "Number of queries that are currently in flight.", - }), - gateTiming: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ - Name: "gate_duration_seconds", - Help: "How many seconds it took for queries to wait at the gate.", - Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}, - }), - }, + reg: reg, } } -// NewGate returns a new Gate that collects metrics. +// NewGate returns a new Gate ready for use. +// +// Deprecated: see Keeper. func (k *Keeper) NewGate(maxConcurrent int) Gate { - return &gate{g: promgate.New(maxConcurrent), m: k.m} + return New(k.reg, maxConcurrent) +} + +// New returns an instrumented gate limiting the number of requests being +// executed concurrently. +// +// The gate implementation is based on the +// github.com/prometheus/prometheus/pkg/gate package. +// +// It can be called several times but not with the same registerer otherwise it +// will panic when trying to register the same metric multiple times. +func New(reg prometheus.Registerer, maxConcurrent int) Gate { + return InstrumentGateDuration( + promauto.With(reg).NewHistogram(DurationHistogramOpts), + InstrumentGateInFlight( + promauto.With(reg).NewGauge(InFlightGaugeOpts), + promgate.New(maxConcurrent), + ), + ) } -// Start initiates a new request and waits until it's our turn to fulfill a request. -func (g *gate) Start(ctx context.Context) error { +type instrumentedDurationGate struct { + g Gate + duration prometheus.Observer +} + +// InstrumentGateDuration instruments the provided Gate to track how much time +// the request has been waiting in the gate. +func InstrumentGateDuration(duration prometheus.Observer, g Gate) Gate { + return &instrumentedDurationGate{ + g: g, + duration: duration, + } +} + +// Start implements the Gate interface. +func (g *instrumentedDurationGate) Start(ctx context.Context) error { start := time.Now() defer func() { - g.m.gateTiming.Observe(time.Since(start).Seconds()) + g.duration.Observe(time.Since(start).Seconds()) }() + return g.g.Start(ctx) +} + +// Done implements the Gate interface. +func (g *instrumentedDurationGate) Done() { + g.g.Done() +} + +type instrumentedInFlightGate struct { + g Gate + inflight prometheus.Gauge +} + +// InstrumentGateInFlight instruments the provided Gate to track how many +// requests are currently in flight. +func InstrumentGateInFlight(inflight prometheus.Gauge, g Gate) Gate { + return &instrumentedInFlightGate{ + g: g, + inflight: inflight, + } +} + +// Start implements the Gate interface. +func (g *instrumentedInFlightGate) Start(ctx context.Context) error { if err := g.g.Start(ctx); err != nil { return err } - g.m.inflightQueries.Inc() + g.inflight.Inc() return nil } -// Done finishes a query. -func (g *gate) Done() { - g.m.inflightQueries.Dec() +// Done implements the Gate interface. +func (g *instrumentedInFlightGate) Done() { + g.inflight.Dec() g.g.Done() } diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 8d8765471b7..6624cd8deeb 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -13,6 +13,8 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + promgate "github.com/prometheus/prometheus/pkg/gate" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" @@ -32,20 +34,21 @@ type QueryableCreator func(deduplicate bool, replicaLabels []string, storeMatche // NewQueryableCreator creates QueryableCreator. func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy storepb.StoreServer, maxConcurrentSelects int, selectTimeout time.Duration) QueryableCreator { - keeper := gate.NewKeeper(reg) + duration := promauto.With(reg).NewHistogram(gate.DurationHistogramOpts) return func(deduplicate bool, replicaLabels []string, storeMatchers [][]storepb.LabelMatcher, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable { return &queryable{ - logger: logger, - reg: reg, - replicaLabels: replicaLabels, - storeMatchers: storeMatchers, - proxy: proxy, - deduplicate: deduplicate, - maxResolutionMillis: maxResolutionMillis, - partialResponse: partialResponse, - skipChunks: skipChunks, - gateKeeper: keeper, + logger: logger, + replicaLabels: replicaLabels, + storeMatchers: storeMatchers, + proxy: proxy, + deduplicate: deduplicate, + maxResolutionMillis: maxResolutionMillis, + partialResponse: partialResponse, + skipChunks: skipChunks, + gateFn: func() gate.Gate { + return gate.InstrumentGateDuration(duration, promgate.New(maxConcurrentSelects)) + }, maxConcurrentSelects: maxConcurrentSelects, selectTimeout: selectTimeout, } @@ -54,7 +57,6 @@ func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy sto type queryable struct { logger log.Logger - reg prometheus.Registerer replicaLabels []string storeMatchers [][]storepb.LabelMatcher proxy storepb.StoreServer @@ -62,20 +64,19 @@ type queryable struct { maxResolutionMillis int64 partialResponse bool skipChunks bool - gateKeeper *gate.Keeper + gateFn func() gate.Gate maxConcurrentSelects int selectTimeout time.Duration } // Querier returns a new storage querier against the underlying proxy store API. func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return newQuerier(ctx, q.logger, q.reg, mint, maxt, q.replicaLabels, q.storeMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateKeeper.NewGate(q.maxConcurrentSelects), q.selectTimeout), nil + return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.storeMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateFn(), q.selectTimeout), nil } type querier struct { ctx context.Context logger log.Logger - reg prometheus.Registerer cancel func() mint, maxt int64 replicaLabels map[string]struct{} @@ -94,7 +95,6 @@ type querier struct { func newQuerier( ctx context.Context, logger log.Logger, - reg prometheus.Registerer, mint, maxt int64, replicaLabels []string, storeMatchers [][]storepb.LabelMatcher, @@ -117,7 +117,6 @@ func newQuerier( return &querier{ ctx: ctx, logger: logger, - reg: reg, cancel: cancel, selectGate: selectGate, selectTimeout: selectTimeout, diff --git a/pkg/ui/query.go b/pkg/ui/query.go index 53f7bd13396..57b25326b8f 100644 --- a/pkg/ui/query.go +++ b/pkg/ui/query.go @@ -12,7 +12,6 @@ import ( "time" "github.com/go-kit/kit/log" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/common/route" @@ -31,11 +30,10 @@ type Query struct { cwd string birth time.Time version api.ThanosVersion - reg prometheus.Registerer now func() model.Time } -func NewQueryUI(logger log.Logger, reg prometheus.Registerer, storeSet *query.StoreSet, externalPrefix, prefixHeader string) *Query { +func NewQueryUI(logger log.Logger, storeSet *query.StoreSet, externalPrefix, prefixHeader string) *Query { tmplVariables := map[string]string{ "Component": component.Query.String(), } @@ -49,7 +47,6 @@ func NewQueryUI(logger log.Logger, reg prometheus.Registerer, storeSet *query.St cwd: runtimeInfo().CWD, birth: runtimeInfo().StartTime, version: *api.BuildInfo, - reg: reg, now: model.Now, } }