Skip to content

Commit

Permalink
*: fix problems with pkg/gate (#6110)
Browse files Browse the repository at this point in the history
* query/querier: reuse same gate

We want to use the same gate everywhere.

Signed-off-by: Giedrius Statkevičius <[email protected]>

* gate: fix up metric name

Signed-off-by: Giedrius Statkevičius <[email protected]>

* query/gate: refactor gate

Signed-off-by: Giedrius Statkevičius <[email protected]>

---------

Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS authored Feb 8, 2023
1 parent 1839858 commit 83d2c14
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 35 deletions.
1 change: 1 addition & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ func runQuery(
gate.New(
extprom.WrapRegistererWithPrefix("thanos_query_concurrent_", reg),
maxConcurrentQueries,
gate.Queries,
),
store.NewSeriesStatsAggregator(
reg,
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func runStore(
return errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", conf.maxConcurrency)
}

queriesGate := gate.New(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg), int(conf.maxConcurrency))
queriesGate := gate.New(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg), int(conf.maxConcurrency), gate.Queries)

chunkPool, err := store.NewDefaultChunkBytesPool(uint64(conf.chunkPoolSize))
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func TestQueryEndpoints(t *testing.T) {
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout),
queryEngine: qe,
lookbackDeltaCreate: func(m int64) time.Duration { return time.Duration(0) },
gate: gate.New(nil, 4),
gate: gate.New(nil, 4, gate.Queries),
defaultRangeQueryStep: time.Second,
queryRangeHist: promauto.With(prometheus.NewRegistry()).NewHistogram(prometheus.HistogramOpts{
Name: "query_range_hist",
Expand Down Expand Up @@ -736,7 +736,7 @@ func TestMetadataEndpoints(t *testing.T) {
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout),
queryEngine: qe,
lookbackDeltaCreate: func(m int64) time.Duration { return time.Duration(0) },
gate: gate.New(nil, 4),
gate: gate.New(nil, 4, gate.Queries),
queryRangeHist: promauto.With(prometheus.NewRegistry()).NewHistogram(prometheus.HistogramOpts{
Name: "query_range_hist",
}),
Expand All @@ -749,7 +749,7 @@ func TestMetadataEndpoints(t *testing.T) {
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout),
queryEngine: qe,
lookbackDeltaCreate: func(m int64) time.Duration { return time.Duration(0) },
gate: gate.New(nil, 4),
gate: gate.New(nil, 4, gate.Queries),
defaultMetadataTimeRange: apiLookbackDelta,
queryRangeHist: promauto.With(prometheus.NewRegistry()).NewHistogram(prometheus.HistogramOpts{
Name: "query_range_hist",
Expand Down Expand Up @@ -1461,7 +1461,7 @@ func TestParseDownsamplingParamMillis(t *testing.T) {
for i, test := range tests {
api := QueryAPI{
enableAutodownsampling: test.enableAutodownsampling,
gate: gate.New(nil, 4),
gate: gate.New(nil, 4, gate.Queries),
queryRangeHist: promauto.With(prometheus.NewRegistry()).NewHistogram(prometheus.HistogramOpts{
Name: "query_range_hist",
}),
Expand Down
2 changes: 1 addition & 1 deletion pkg/cacheutil/cacheutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestDoWithBatch(t *testing.T) {
items: []string{"key1", "key2", "key3", "key4", "key5"},
batchSize: 2,
expectedBatches: 3,
concurrency: gate.New(prometheus.NewPedanticRegistry(), 1),
concurrency: gate.New(prometheus.NewPedanticRegistry(), 1, gate.Queries),
},
}

Expand Down
1 change: 1 addition & 0 deletions pkg/cacheutil/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ func newMemcachedClient(
getMultiGate: gate.New(
extprom.WrapRegistererWithPrefix("thanos_memcached_getmulti_", reg),
config.MaxGetMultiConcurrency,
gate.Gets,
),
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/cacheutil/redis_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,12 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient
getMultiGate: gate.New(
extprom.WrapRegistererWithPrefix("thanos_redis_getmulti_", reg),
config.MaxGetMultiConcurrency,
gate.Gets,
),
setMultiGate: gate.New(
extprom.WrapRegistererWithPrefix("thanos_redis_setmulti_", reg),
config.MaxSetMultiConcurrency,
gate.Sets,
),
}
duration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Expand Down
120 changes: 94 additions & 26 deletions pkg/gate/gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,14 @@ package gate

import (
"context"
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
promgate "github.com/prometheus/prometheus/util/gate"
)

var (
MaxGaugeOpts = prometheus.GaugeOpts{
Name: "gate_queries_max",
Help: "Maximum number of concurrent queries.",
}
InFlightGaugeOpts = prometheus.GaugeOpts{
Name: "gate_queries_in_flight",
Help: "Number of queries that are currently in flight.",
}
TotalCounterOpts = prometheus.CounterOpts{
Name: "gate_queries_total",
Help: "Total number of queries.",
}
DurationHistogramOpts = prometheus.HistogramOpts{
Name: "gate_duration_seconds",
Help: "How many seconds it took for queries to wait at the gate.",
Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720},
}
)

// Gate controls the maximum number of concurrently running and waiting queries.
//
// Example of use:
Expand Down Expand Up @@ -75,7 +56,94 @@ func NewKeeper(reg prometheus.Registerer) *Keeper {
//
// Deprecated: see Keeper.
func (k *Keeper) NewGate(maxConcurrent int) Gate {
return New(k.reg, maxConcurrent)
return New(k.reg, maxConcurrent, Queries)
}

// OperationName names the kind of operation a gate limits. The value is
// interpolated into the names and help texts of the gate's metrics
// (for example "gate_<name>_max").
type OperationName string

// Operation names available to callers when creating a gate. Each value ends
// up verbatim inside the metric names, e.g. Queries yields "gate_queries_max".
const (
Queries OperationName = "queries"
Selects OperationName = "selects"
Gets OperationName = "gets"
Sets OperationName = "sets"
WriteRequests OperationName = "write_requests"
)

// GateFactory creates Gate instances on demand.
// See NewGateFactory for the implementation provided by this package.
type GateFactory interface {
// New returns a Gate.
New() Gate
}

// gateProducer is the GateFactory returned by NewGateFactory. It keeps the
// metrics that were created once up front and passes the same instances to
// every Gate it produces.
type gateProducer struct {
// reg is the registerer the metrics were created with.
reg prometheus.Registerer
// opName is the operation name used when naming the metrics.
opName OperationName
// maxConcurrent is the limit given to each produced gate; a value <= 0
// makes New use the gate returned by NewNoop instead.
maxConcurrent int

// Metrics shared by all gates produced by this factory.
durationHist prometheus.Histogram
total prometheus.Counter
inflight prometheus.Gauge
}

// New builds a fresh Gate with its own limit and wraps it with the factory's
// shared in-flight, total and duration instruments.
//
// When maxConcurrent is not positive the underlying gate is the one returned
// by NewNoop; otherwise it is a promgate limited to maxConcurrent.
func (g *gateProducer) New() Gate {
	var limiter Gate
	if g.maxConcurrent > 0 {
		limiter = promgate.New(g.maxConcurrent)
	} else {
		limiter = NewNoop()
	}

	// Wrap from the inside out: in-flight gauge, then total counter, then
	// the duration histogram as the outermost layer.
	withInFlight := InstrumentGateInFlight(g.inflight, limiter)
	withTotal := InstrumentGateTotal(g.total, withInFlight)
	return InstrumentGateDuration(g.durationHist, withTotal)
}

// Builders for the metric options of a gate. Every builder embeds the given
// operation name into the metric name and its help text.
var (
	// maxGaugeOpts describes the gauge holding the configured concurrency limit.
	maxGaugeOpts = func(op OperationName) prometheus.GaugeOpts {
		var opts prometheus.GaugeOpts
		opts.Name = fmt.Sprintf("gate_%s_max", op)
		opts.Help = fmt.Sprintf("Maximum number of concurrent %s.", op)
		return opts
	}
	// inFlightGaugeOpts describes the gauge tracking operations in flight.
	inFlightGaugeOpts = func(op OperationName) prometheus.GaugeOpts {
		var opts prometheus.GaugeOpts
		opts.Name = fmt.Sprintf("gate_%s_in_flight", op)
		opts.Help = fmt.Sprintf("Number of %s that are currently in flight.", op)
		return opts
	}
	// totalCounterOpts describes the counter of all operations.
	totalCounterOpts = func(op OperationName) prometheus.CounterOpts {
		var opts prometheus.CounterOpts
		opts.Name = fmt.Sprintf("gate_%s_total", op)
		opts.Help = fmt.Sprintf("Total number of %s.", op)
		return opts
	}
	// durationHistogramOpts describes the histogram of time spent waiting at
	// the gate, in seconds.
	durationHistogramOpts = func(op OperationName) prometheus.HistogramOpts {
		var opts prometheus.HistogramOpts
		opts.Name = fmt.Sprintf("gate_%s_duration_seconds", op)
		opts.Help = fmt.Sprintf("How many seconds it took for %s to wait at the gate.", op)
		opts.Buckets = []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}
		return opts
	}
)

// NewGateFactory returns a GateFactory. Every Gate it produces enforces
// maxConcurrent on its own, while all of them report into one shared set of
// metrics.
//
// The metrics are created with reg right away and are named after opName; the
// "max" gauge is set to maxConcurrent once, here.
func NewGateFactory(reg prometheus.Registerer, maxConcurrent int, opName OperationName) GateFactory {
	metrics := promauto.With(reg)

	// Publish the configured limit.
	metrics.NewGauge(maxGaugeOpts(opName)).Set(float64(maxConcurrent))

	producer := &gateProducer{
		reg:           reg,
		opName:        opName,
		maxConcurrent: maxConcurrent,
	}
	// Creation order: duration histogram, total counter, in-flight gauge.
	producer.durationHist = metrics.NewHistogram(durationHistogramOpts(opName))
	producer.total = metrics.NewCounter(totalCounterOpts(opName))
	producer.inflight = metrics.NewGauge(inFlightGaugeOpts(opName))

	return producer
}

// New returns an instrumented gate limiting the number of requests being
Expand All @@ -86,8 +154,8 @@ func (k *Keeper) NewGate(maxConcurrent int) Gate {
//
// It can be called several times but not with the same registerer otherwise it
// will panic when trying to register the same metric multiple times.
func New(reg prometheus.Registerer, maxConcurrent int) Gate {
promauto.With(reg).NewGauge(MaxGaugeOpts).Set(float64(maxConcurrent))
func New(reg prometheus.Registerer, maxConcurrent int, opName OperationName) Gate {
promauto.With(reg).NewGauge(maxGaugeOpts(opName)).Set(float64(maxConcurrent))

var gate Gate
if maxConcurrent <= 0 {
Expand All @@ -97,11 +165,11 @@ func New(reg prometheus.Registerer, maxConcurrent int) Gate {
}

return InstrumentGateDuration(
promauto.With(reg).NewHistogram(DurationHistogramOpts),
promauto.With(reg).NewHistogram(durationHistogramOpts(opName)),
InstrumentGateTotal(
promauto.With(reg).NewCounter(TotalCounterOpts),
promauto.With(reg).NewCounter(totalCounterOpts(opName)),
InstrumentGateInFlight(
promauto.With(reg).NewGauge(InFlightGaugeOpts),
promauto.With(reg).NewGauge(inFlightGaugeOpts(opName)),
gate,
),
),
Expand Down
2 changes: 1 addition & 1 deletion pkg/gate/gate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

func TestGateAllowsDisablingLimits(t *testing.T) {
reg := prometheus.NewRegistry()
g := New(reg, 0)
g := New(reg, 0, Queries)

require.NoError(t, g.Start(context.Background()))
}
5 changes: 3 additions & 2 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func NewQueryableCreator(
maxConcurrentSelects int,
selectTimeout time.Duration,
) QueryableCreator {
gf := gate.NewGateFactory(extprom.WrapRegistererWithPrefix("concurrent_selects_", reg), maxConcurrentSelects, gate.Selects)

return func(
deduplicate bool,
replicaLabels []string,
Expand All @@ -77,7 +79,6 @@ func NewQueryableCreator(
shardInfo *storepb.ShardInfo,
seriesStatsReporter seriesStatsReporter,
) storage.Queryable {
reg = extprom.WrapRegistererWithPrefix("concurrent_selects_", reg)
return &queryable{
logger: logger,
replicaLabels: replicaLabels,
Expand All @@ -88,7 +89,7 @@ func NewQueryableCreator(
partialResponse: partialResponse,
skipChunks: skipChunks,
gateProviderFn: func() gate.Gate {
return gate.New(reg, maxConcurrentSelects)
return gf.New()
},
maxConcurrentSelects: maxConcurrentSelects,
selectTimeout: selectTimeout,
Expand Down
1 change: 1 addition & 0 deletions pkg/receive/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func (l *Limiter) loadConfig() error {
l.registerer,
),
int(maxWriteConcurrency),
gate.WriteRequests,
)
}
l.requestLimiter = newConfigRequestLimiter(
Expand Down

0 comments on commit 83d2c14

Please sign in to comment.