From 83d2c14d30d793047c9ce7642910195119d35928 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Wed, 8 Feb 2023 14:55:04 +0200 Subject: [PATCH] *: fix problems with pkg/gate (#6110) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * query/querier: reuse same gate We want to use the same gate everywhere. Signed-off-by: Giedrius Statkevičius * gate: fix up metric name Signed-off-by: Giedrius Statkevičius * query/gate: refactor gate Signed-off-by: Giedrius Statkevičius --------- Signed-off-by: Giedrius Statkevičius --- cmd/thanos/query.go | 1 + cmd/thanos/store.go | 2 +- pkg/api/query/v1_test.go | 8 +- pkg/cacheutil/cacheutil_test.go | 2 +- pkg/cacheutil/memcached_client.go | 1 + pkg/cacheutil/redis_client.go | 2 + pkg/gate/gate.go | 120 +++++++++++++++++++++++------- pkg/gate/gate_test.go | 2 +- pkg/query/querier.go | 5 +- pkg/receive/limiter.go | 1 + 10 files changed, 109 insertions(+), 35 deletions(-) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 3f76d73fd3..2c0539ffaa 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -699,6 +699,7 @@ func runQuery( gate.New( extprom.WrapRegistererWithPrefix("thanos_query_concurrent_", reg), maxConcurrentQueries, + gate.Queries, ), store.NewSeriesStatsAggregator( reg, diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 0a454a4c92..207937f1ce 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -344,7 +344,7 @@ func runStore( return errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", conf.maxConcurrency) } - queriesGate := gate.New(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg), int(conf.maxConcurrency)) + queriesGate := gate.New(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg), int(conf.maxConcurrency), gate.Queries) chunkPool, err := store.NewDefaultChunkBytesPool(uint64(conf.chunkPoolSize)) if err != nil { diff --git a/pkg/api/query/v1_test.go b/pkg/api/query/v1_test.go index 55706a1bc9..d8403522e9 100644 --- a/pkg/api/query/v1_test.go +++ b/pkg/api/query/v1_test.go @@ -195,7 +195,7 @@ func TestQueryEndpoints(t *testing.T) { queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout), queryEngine: qe, lookbackDeltaCreate: func(m int64) time.Duration { return time.Duration(0) }, - gate: gate.New(nil, 4), + gate: gate.New(nil, 4, gate.Queries), defaultRangeQueryStep: time.Second, queryRangeHist: promauto.With(prometheus.NewRegistry()).NewHistogram(prometheus.HistogramOpts{ Name: "query_range_hist", @@ -736,7 +736,7 @@ func TestMetadataEndpoints(t *testing.T) { queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout), queryEngine: qe, lookbackDeltaCreate: func(m int64) time.Duration { return time.Duration(0) }, - gate: gate.New(nil, 4), + gate: gate.New(nil, 4, gate.Queries), queryRangeHist: promauto.With(prometheus.NewRegistry()).NewHistogram(prometheus.HistogramOpts{ Name: "query_range_hist", }), @@ -749,7 +749,7 @@ func TestMetadataEndpoints(t *testing.T) { queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout), queryEngine: qe, lookbackDeltaCreate: func(m int64) time.Duration { return time.Duration(0) }, - gate: gate.New(nil, 4), + gate: gate.New(nil, 4, gate.Queries), defaultMetadataTimeRange: apiLookbackDelta, queryRangeHist: promauto.With(prometheus.NewRegistry()).NewHistogram(prometheus.HistogramOpts{ Name: "query_range_hist", @@ -1461,7 +1461,7 @@ func TestParseDownsamplingParamMillis(t *testing.T) { for i, test := range tests { api := QueryAPI{ enableAutodownsampling: test.enableAutodownsampling, - gate: gate.New(nil, 4), + gate: gate.New(nil, 4, gate.Queries), queryRangeHist: promauto.With(prometheus.NewRegistry()).NewHistogram(prometheus.HistogramOpts{ Name: "query_range_hist", }), diff --git a/pkg/cacheutil/cacheutil_test.go b/pkg/cacheutil/cacheutil_test.go index d47a997972..f31416ce59 100644 --- a/pkg/cacheutil/cacheutil_test.go +++ b/pkg/cacheutil/cacheutil_test.go @@ -62,7 +62,7 @@ func TestDoWithBatch(t *testing.T) { items: []string{"key1", "key2", "key3", "key4", "key5"}, batchSize: 2, expectedBatches: 3, - concurrency: gate.New(prometheus.NewPedanticRegistry(), 1), + concurrency: gate.New(prometheus.NewPedanticRegistry(), 1, gate.Queries), }, } diff --git a/pkg/cacheutil/memcached_client.go b/pkg/cacheutil/memcached_client.go index 1478b58863..eceb34699c 100644 --- a/pkg/cacheutil/memcached_client.go +++ b/pkg/cacheutil/memcached_client.go @@ -286,6 +286,7 @@ func newMemcachedClient( getMultiGate: gate.New( extprom.WrapRegistererWithPrefix("thanos_memcached_getmulti_", reg), config.MaxGetMultiConcurrency, + gate.Gets, ), } diff --git a/pkg/cacheutil/redis_client.go b/pkg/cacheutil/redis_client.go index 7785207301..0d0b8e8cc2 100644 --- a/pkg/cacheutil/redis_client.go +++ b/pkg/cacheutil/redis_client.go @@ -233,10 +233,12 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient getMultiGate: gate.New( extprom.WrapRegistererWithPrefix("thanos_redis_getmulti_", reg), config.MaxGetMultiConcurrency, + gate.Gets, ), setMultiGate: gate.New( extprom.WrapRegistererWithPrefix("thanos_redis_setmulti_", reg), config.MaxSetMultiConcurrency, + gate.Sets, ), } duration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ diff --git a/pkg/gate/gate.go b/pkg/gate/gate.go index 3df0fc6989..f5858b86ab 100644 --- a/pkg/gate/gate.go +++ b/pkg/gate/gate.go @@ -5,6 +5,7 @@ package gate import ( "context" + "fmt" "time" "github.com/prometheus/client_golang/prometheus" @@ -12,26 +13,6 @@ import ( promgate "github.com/prometheus/prometheus/util/gate" ) -var ( - MaxGaugeOpts = prometheus.GaugeOpts{ - Name: "gate_queries_max", - Help: "Maximum number of concurrent queries.", - } - InFlightGaugeOpts = prometheus.GaugeOpts{ - Name: "gate_queries_in_flight", - Help: "Number of queries that are currently in flight.", - } - TotalCounterOpts = prometheus.CounterOpts{ - Name: "gate_queries_total", - Help: "Total number of queries.", - } - DurationHistogramOpts = prometheus.HistogramOpts{ - Name: "gate_duration_seconds", - Help: "How many seconds it took for queries to wait at the gate.", - Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}, - } -) - // Gate controls the maximum number of concurrently running and waiting queries. // // Example of use: @@ -75,7 +56,94 @@ func NewKeeper(reg prometheus.Registerer) *Keeper { // // Deprecated: see Keeper. func (k *Keeper) NewGate(maxConcurrent int) Gate { - return New(k.reg, maxConcurrent) + return New(k.reg, maxConcurrent, Queries) +} + +type OperationName string + +const ( + Queries OperationName = "queries" + Selects OperationName = "selects" + Gets OperationName = "gets" + Sets OperationName = "sets" + WriteRequests OperationName = "write_requests" +) + +type GateFactory interface { + New() Gate +} + +type gateProducer struct { + reg prometheus.Registerer + opName OperationName + maxConcurrent int + + durationHist prometheus.Histogram + total prometheus.Counter + inflight prometheus.Gauge +} + +func (g *gateProducer) New() Gate { + var gate Gate + if g.maxConcurrent <= 0 { + gate = NewNoop() + } else { + gate = promgate.New(g.maxConcurrent) + } + + return InstrumentGateDuration( + g.durationHist, + InstrumentGateTotal( + g.total, + InstrumentGateInFlight( + g.inflight, + gate, + ), + ), + ) +} + +var ( + maxGaugeOpts = func(opName OperationName) prometheus.GaugeOpts { + return prometheus.GaugeOpts{ + Name: fmt.Sprintf("gate_%s_max", opName), + Help: fmt.Sprintf("Maximum number of concurrent %s.", opName), + } + } + inFlightGaugeOpts = func(opName OperationName) prometheus.GaugeOpts { + return prometheus.GaugeOpts{ + Name: fmt.Sprintf("gate_%s_in_flight", opName), + Help: fmt.Sprintf("Number of %s that are currently in flight.", opName), + } + } + totalCounterOpts = func(opName OperationName) prometheus.CounterOpts { + return prometheus.CounterOpts{ + Name: fmt.Sprintf("gate_%s_total", opName), + Help: fmt.Sprintf("Total number of %s.", opName), + } + } + durationHistogramOpts = func(opName OperationName) prometheus.HistogramOpts { + return prometheus.HistogramOpts{ + Name: fmt.Sprintf("gate_%s_duration_seconds", opName), + Help: fmt.Sprintf("How many seconds it took for %s to wait at the gate.", opName), + Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}, + } + } +) + +// NewGateFactory creates a Gate factory. They act like Gate but each produced Gate +// acts individually in terms of the limit and they have unified metrics. +func NewGateFactory(reg prometheus.Registerer, maxConcurrent int, opName OperationName) GateFactory { + promauto.With(reg).NewGauge(maxGaugeOpts(opName)).Set(float64(maxConcurrent)) + + return &gateProducer{ + reg: reg, + opName: opName, + maxConcurrent: maxConcurrent, + durationHist: promauto.With(reg).NewHistogram(durationHistogramOpts(opName)), + total: promauto.With(reg).NewCounter(totalCounterOpts(opName)), + inflight: promauto.With(reg).NewGauge(inFlightGaugeOpts(opName)), + } } // New returns an instrumented gate limiting the number of requests being @@ -86,8 +154,8 @@ func (k *Keeper) NewGate(maxConcurrent int) Gate { // // It can be called several times but not with the same registerer otherwise it // will panic when trying to register the same metric multiple times. -func New(reg prometheus.Registerer, maxConcurrent int) Gate { - promauto.With(reg).NewGauge(MaxGaugeOpts).Set(float64(maxConcurrent)) +func New(reg prometheus.Registerer, maxConcurrent int, opName OperationName) Gate { + promauto.With(reg).NewGauge(maxGaugeOpts(opName)).Set(float64(maxConcurrent)) var gate Gate if maxConcurrent <= 0 { @@ -97,11 +165,11 @@ func New(reg prometheus.Registerer, maxConcurrent int) Gate { } return InstrumentGateDuration( - promauto.With(reg).NewHistogram(DurationHistogramOpts), + promauto.With(reg).NewHistogram(durationHistogramOpts(opName)), InstrumentGateTotal( - promauto.With(reg).NewCounter(TotalCounterOpts), + promauto.With(reg).NewCounter(totalCounterOpts(opName)), InstrumentGateInFlight( - promauto.With(reg).NewGauge(InFlightGaugeOpts), + promauto.With(reg).NewGauge(inFlightGaugeOpts(opName)), gate, ), ), diff --git a/pkg/gate/gate_test.go b/pkg/gate/gate_test.go index 4159ceec2d..d79e426f84 100644 --- a/pkg/gate/gate_test.go +++ b/pkg/gate/gate_test.go @@ -13,7 +13,7 @@ import ( func TestGateAllowsDisablingLimits(t *testing.T) { reg := prometheus.NewRegistry() - g := New(reg, 0) + g := New(reg, 0, Queries) require.NoError(t, g.Start(context.Background())) } diff --git a/pkg/query/querier.go b/pkg/query/querier.go index e0eeab489f..e3b7570437 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -66,6 +66,8 @@ func NewQueryableCreator( maxConcurrentSelects int, selectTimeout time.Duration, ) QueryableCreator { + gf := gate.NewGateFactory(extprom.WrapRegistererWithPrefix("concurrent_selects_", reg), maxConcurrentSelects, gate.Selects) + return func( deduplicate bool, replicaLabels []string, @@ -77,7 +79,6 @@ func NewQueryableCreator( shardInfo *storepb.ShardInfo, seriesStatsReporter seriesStatsReporter, ) storage.Queryable { - reg = extprom.WrapRegistererWithPrefix("concurrent_selects_", reg) return &queryable{ logger: logger, replicaLabels: replicaLabels, @@ -88,7 +89,7 @@ func NewQueryableCreator( partialResponse: partialResponse, skipChunks: skipChunks, gateProviderFn: func() gate.Gate { - return gate.New(reg, maxConcurrentSelects) + return gf.New() }, maxConcurrentSelects: maxConcurrentSelects, selectTimeout: selectTimeout, diff --git a/pkg/receive/limiter.go b/pkg/receive/limiter.go index ff5bbe3199..ac79b4c5bd 100644 --- a/pkg/receive/limiter.go +++ b/pkg/receive/limiter.go @@ -144,6 +144,7 @@ func (l *Limiter) loadConfig() error { l.registerer, ), int(maxWriteConcurrency), + gate.WriteRequests, ) } l.requestLimiter = newConfigRequestLimiter(