Skip to content

Commit

Permalink
pkg/gate: Prefix gate metrics for selects (#3154)
Browse files Browse the repository at this point in the history
* pkg/thanos: prefix gate metrics for concurrent selects

Thanos query exposed `gate_queries_in_flight` and
`gate_duration_seconds_bucket` metrics for concurrent selects. These
metrics are now prefixed by `thanos_query_concurrent_selects_`.

Signed-off-by: Simon Pasquier <[email protected]>

* *: refactor instrumentation of the gate package

This change deprecates the gate.(*Keeper) struct. When Keeper is used to
create several gates, the metric tracking the number of in-flight metric
isn't meaningful because it is hard to say whether requests are being
blocked or not.

As such the `thanos_query_concurrent_selects_gate_queries_in_flight` is
removed.

The following metrics have been added to record the maximum number of
concurrent requests per gate:
* `thanos_query_gate_queries_max`
* `thanos_bucket_store_series_gate_queries_max`, previously known as
  `thanos_bucket_store_queries_concurrent_max.`
* `thanos_memcached_getmulti_gate_queries_max`

Signed-off-by: Simon Pasquier <[email protected]>

* Decompose registry wrapping for concurrent selects

Signed-off-by: Simon Pasquier <[email protected]>

* Rename gateFn to gateProviderFn

Signed-off-by: Simon Pasquier <[email protected]>
  • Loading branch information
simonpasquier authored Sep 14, 2020
1 parent 787cd80 commit b1fdb63
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 81 deletions.
19 changes: 14 additions & 5 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/thanos-io/thanos/pkg/extgrpc"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/query"
Expand Down Expand Up @@ -304,8 +305,14 @@ func runQuery(
)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
queryableCreator = query.NewQueryableCreator(logger, reg, proxy, maxConcurrentSelects, queryTimeout)
engine = promql.NewEngine(
queryableCreator = query.NewQueryableCreator(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_", reg),
proxy,
maxConcurrentSelects,
queryTimeout,
)
engine = promql.NewEngine(
promql.EngineOpts{
Logger: logger,
Reg: reg,
Expand Down Expand Up @@ -420,11 +427,10 @@ func runQuery(

ins := extpromhttp.NewInstrumentationMiddleware(reg)
// TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting.
ui.NewQueryUI(logger, reg, stores, webExternalPrefix, webPrefixHeaderName).Register(router, ins)
ui.NewQueryUI(logger, stores, webExternalPrefix, webPrefixHeaderName).Register(router, ins)

api := v1.NewQueryAPI(
logger,
reg,
stores,
engine,
queryableCreator,
Expand All @@ -436,7 +442,10 @@ func runQuery(
queryReplicaLabels,
flagsMap,
instantDefaultMaxSourceResolution,
maxConcurrentQueries,
gate.New(
extprom.WrapRegistererWithPrefix("thanos_query_concurrent_", reg),
maxConcurrentQueries,
),
)

api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins, logMiddleware)
Expand Down
7 changes: 1 addition & 6 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/route"

blocksAPI "github.com/thanos-io/thanos/pkg/api/blocks"
Expand Down Expand Up @@ -286,11 +285,7 @@ func runStore(
return errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", maxConcurrency)
}

queriesGate := gate.NewKeeper(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg)).NewGate(maxConcurrency)
promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_bucket_store_queries_concurrent_max",
Help: "Number of maximum concurrent queries.",
}).Set(float64(maxConcurrency))
queriesGate := gate.New(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg), maxConcurrency)

bs, err := store.NewBucketStore(
logger,
Expand Down
9 changes: 2 additions & 7 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/go-kit/kit/log"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/pkg/labels"
Expand All @@ -40,7 +39,6 @@ import (
"github.com/prometheus/prometheus/storage"

"github.com/thanos-io/thanos/pkg/api"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/logging"
Expand All @@ -64,7 +62,6 @@ const (
type QueryAPI struct {
baseAPI *api.BaseAPI
logger log.Logger
reg prometheus.Registerer
gate gate.Gate
queryableCreate query.QueryableCreator
queryEngine *promql.Engine
Expand All @@ -82,7 +79,6 @@ type QueryAPI struct {
// NewQueryAPI returns an initialized QueryAPI type.
func NewQueryAPI(
logger log.Logger,
reg *prometheus.Registry,
storeSet *query.StoreSet,
qe *promql.Engine,
c query.QueryableCreator,
Expand All @@ -93,15 +89,14 @@ func NewQueryAPI(
replicaLabels []string,
flagsMap map[string]string,
defaultInstantQueryMaxSourceResolution time.Duration,
maxConcurrentQueries int,
gate gate.Gate,
) *QueryAPI {
return &QueryAPI{
baseAPI: api.NewBaseAPI(logger, flagsMap),
logger: logger,
reg: reg,
queryEngine: qe,
queryableCreate: c,
gate: gate.NewKeeper(extprom.WrapRegistererWithPrefix("thanos_query_concurrent_", reg)).NewGate(maxConcurrentQueries),
gate: gate,
ruleGroups: ruleGroups,

enableAutodownsampling: enableAutodownsampling,
Expand Down
7 changes: 4 additions & 3 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"github.com/prometheus/common/route"
promgate "github.com/prometheus/prometheus/pkg/gate"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/promql"
Expand Down Expand Up @@ -116,7 +117,7 @@ func TestEndpoints(t *testing.T) {
MaxSamples: 10000,
Timeout: timeout,
}),
gate: gate.NewKeeper(nil).NewGate(4),
gate: gate.New(nil, 4),
}

start := time.Unix(0, 0)
Expand Down Expand Up @@ -1053,7 +1054,7 @@ func TestParseDownsamplingParamMillis(t *testing.T) {
for i, test := range tests {
api := QueryAPI{
enableAutodownsampling: test.enableAutodownsampling,
gate: gate.NewKeeper(nil).NewGate(4),
gate: gate.New(nil, 4),
}
v := url.Values{}
v.Set(MaxSourceResolutionParam, test.maxSourceResolutionParam)
Expand Down Expand Up @@ -1101,7 +1102,7 @@ func TestParseStoreMatchersParam(t *testing.T) {
} {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
api := QueryAPI{
gate: gate.NewKeeper(nil).NewGate(4),
gate: promgate.New(4),
}
v := url.Values{}
v.Set(StoreMatcherParam, tc.storeMatchers)
Expand Down
6 changes: 4 additions & 2 deletions pkg/cacheutil/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,10 @@ func newMemcachedClient(
dnsProvider: dnsProvider,
asyncQueue: make(chan func(), config.MaxAsyncBufferSize),
stop: make(chan struct{}, 1),
getMultiGate: gate.NewKeeper(extprom.WrapRegistererWithPrefix("thanos_memcached_getmulti_", reg)).
NewGate(config.MaxGetMultiConcurrency),
getMultiGate: gate.New(
extprom.WrapRegistererWithPrefix("thanos_memcached_getmulti_", reg),
config.MaxGetMultiConcurrency,
),
}

c.clientInfo = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Expand Down
143 changes: 110 additions & 33 deletions pkg/gate/gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,67 +12,144 @@ import (
promgate "github.com/prometheus/prometheus/pkg/gate"
)

// Gate is an interface that mimics prometheus/pkg/gate behavior.
var (
MaxGaugeOpts = prometheus.GaugeOpts{
Name: "gate_queries_max",
Help: "Maximum number of concurrent queries.",
}
InFlightGaugeOpts = prometheus.GaugeOpts{
Name: "gate_queries_in_flight",
Help: "Number of queries that are currently in flight.",
}
DurationHistogramOpts = prometheus.HistogramOpts{
Name: "gate_duration_seconds",
Help: "How many seconds it took for queries to wait at the gate.",
Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720},
}
)

// Gate controls the maximum number of concurrently running and waiting queries.
//
// Example of use:
//
// g := gate.New(r, 5)
//
// if err := g.Start(ctx); err != nil {
// return
// }
// defer g.Done()
//
type Gate interface {
// Start initiates a new request and waits until it's our turn to fulfill a request.
Start(ctx context.Context) error
// Done finishes a query.
Done()
}

// Gate wraps the Prometheus gate with extra metrics.
type gate struct {
g *promgate.Gate
m *metrics
}

type metrics struct {
inflightQueries prometheus.Gauge
gateTiming prometheus.Histogram
}

// Keeper is used to create multiple gates sharing the same metrics.
//
// Deprecated: when Keeper is used to create several gates, the metric tracking
// the number of in-flight metric isn't meaningful because it is hard to say
// whether requests are being blocked or not. For clients that call
// gate.(*Keeper).NewGate only once, it is recommended to use gate.New()
// instead. Otherwise it is recommended to use the
// github.com/prometheus/prometheus/pkg/gate package directly and wrap the
// returned gate with gate.InstrumentGateDuration().
type Keeper struct {
m *metrics
reg prometheus.Registerer
}

// NewKeeper creates a new Keeper.
//
// Deprecated: see Keeper.
func NewKeeper(reg prometheus.Registerer) *Keeper {
return &Keeper{
m: &metrics{
inflightQueries: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "gate_queries_in_flight",
Help: "Number of queries that are currently in flight.",
}),
gateTiming: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "gate_duration_seconds",
Help: "How many seconds it took for queries to wait at the gate.",
Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720},
}),
},
reg: reg,
}
}

// NewGate returns a new Gate that collects metrics.
// NewGate returns a new Gate ready for use.
//
// Deprecated: see Keeper.
func (k *Keeper) NewGate(maxConcurrent int) Gate {
return &gate{g: promgate.New(maxConcurrent), m: k.m}
return New(k.reg, maxConcurrent)
}

// New returns an instrumented gate limiting the number of requests being
// executed concurrently.
//
// The gate implementation is based on the
// github.com/prometheus/prometheus/pkg/gate package.
//
// It can be called several times but not with the same registerer otherwise it
// will panic when trying to register the same metric multiple times.
func New(reg prometheus.Registerer, maxConcurrent int) Gate {
promauto.With(reg).NewGauge(MaxGaugeOpts).Set(float64(maxConcurrent))

return InstrumentGateDuration(
promauto.With(reg).NewHistogram(DurationHistogramOpts),
InstrumentGateInFlight(
promauto.With(reg).NewGauge(InFlightGaugeOpts),
promgate.New(maxConcurrent),
),
)
}

type instrumentedDurationGate struct {
g Gate
duration prometheus.Observer
}

// Start initiates a new request and waits until it's our turn to fulfill a request.
func (g *gate) Start(ctx context.Context) error {
// InstrumentGateDuration instruments the provided Gate to track how much time
// the request has been waiting in the gate.
func InstrumentGateDuration(duration prometheus.Observer, g Gate) Gate {
return &instrumentedDurationGate{
g: g,
duration: duration,
}
}

// Start implements the Gate interface.
func (g *instrumentedDurationGate) Start(ctx context.Context) error {
start := time.Now()
defer func() {
g.m.gateTiming.Observe(time.Since(start).Seconds())
g.duration.Observe(time.Since(start).Seconds())
}()

return g.g.Start(ctx)
}

// Done implements the Gate interface.
func (g *instrumentedDurationGate) Done() {
g.g.Done()
}

type instrumentedInFlightGate struct {
g Gate
inflight prometheus.Gauge
}

// InstrumentGateInFlight instruments the provided Gate to track how many
// requests are currently in flight.
func InstrumentGateInFlight(inflight prometheus.Gauge, g Gate) Gate {
return &instrumentedInFlightGate{
g: g,
inflight: inflight,
}
}

// Start implements the Gate interface.
func (g *instrumentedInFlightGate) Start(ctx context.Context) error {
if err := g.g.Start(ctx); err != nil {
return err
}

g.m.inflightQueries.Inc()
g.inflight.Inc()
return nil
}

// Done finishes a query.
func (g *gate) Done() {
g.m.inflightQueries.Dec()
// Done implements the Gate interface.
func (g *instrumentedInFlightGate) Done() {
g.inflight.Dec()
g.g.Done()
}
Loading

0 comments on commit b1fdb63

Please sign in to comment.