Skip to content

Commit

Permalink
*: refactor instrumentation of the gate package
Browse files Browse the repository at this point in the history
This change deprecates the gate.(*Keeper) struct. When Keeper is used to
create several gates, the metric tracking the number of in-flight metric
isn't meaningful because it is hard to say whether requests are being
blocked or not.

As such the `thanos_query_concurrent_selects_gate_queries_in_flight` is
removed.

Signed-off-by: Simon Pasquier <[email protected]>
  • Loading branch information
simonpasquier committed Sep 11, 2020
1 parent 60950ff commit fff2500
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 73 deletions.
9 changes: 6 additions & 3 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/thanos-io/thanos/pkg/extgrpc"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/query"
Expand Down Expand Up @@ -426,11 +427,10 @@ func runQuery(

ins := extpromhttp.NewInstrumentationMiddleware(reg)
// TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting.
ui.NewQueryUI(logger, reg, stores, webExternalPrefix, webPrefixHeaderName).Register(router, ins)
ui.NewQueryUI(logger, stores, webExternalPrefix, webPrefixHeaderName).Register(router, ins)

api := v1.NewQueryAPI(
logger,
reg,
stores,
engine,
queryableCreator,
Expand All @@ -442,7 +442,10 @@ func runQuery(
queryReplicaLabels,
flagsMap,
instantDefaultMaxSourceResolution,
maxConcurrentQueries,
gate.New(
extprom.WrapRegistererWithPrefix("thanos_query_concurrent_", reg),
maxConcurrentQueries,
),
)

api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins, logMiddleware)
Expand Down
9 changes: 2 additions & 7 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/go-kit/kit/log"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/pkg/labels"
Expand All @@ -40,7 +39,6 @@ import (
"github.com/prometheus/prometheus/storage"

"github.com/thanos-io/thanos/pkg/api"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/logging"
Expand All @@ -64,7 +62,6 @@ const (
type QueryAPI struct {
baseAPI *api.BaseAPI
logger log.Logger
reg prometheus.Registerer
gate gate.Gate
queryableCreate query.QueryableCreator
queryEngine *promql.Engine
Expand All @@ -82,7 +79,6 @@ type QueryAPI struct {
// NewQueryAPI returns an initialized QueryAPI type.
func NewQueryAPI(
logger log.Logger,
reg *prometheus.Registry,
storeSet *query.StoreSet,
qe *promql.Engine,
c query.QueryableCreator,
Expand All @@ -93,15 +89,14 @@ func NewQueryAPI(
replicaLabels []string,
flagsMap map[string]string,
defaultInstantQueryMaxSourceResolution time.Duration,
maxConcurrentQueries int,
gate gate.Gate,
) *QueryAPI {
return &QueryAPI{
baseAPI: api.NewBaseAPI(logger, flagsMap),
logger: logger,
reg: reg,
queryEngine: qe,
queryableCreate: c,
gate: gate.NewKeeper(extprom.WrapRegistererWithPrefix("thanos_query_concurrent_", reg)).NewGate(maxConcurrentQueries),
gate: gate,
ruleGroups: ruleGroups,

enableAutodownsampling: enableAutodownsampling,
Expand Down
7 changes: 4 additions & 3 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"github.com/prometheus/common/route"
promgate "github.com/prometheus/prometheus/pkg/gate"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/promql"
Expand Down Expand Up @@ -116,7 +117,7 @@ func TestEndpoints(t *testing.T) {
MaxSamples: 10000,
Timeout: timeout,
}),
gate: gate.NewKeeper(nil).NewGate(4),
gate: gate.New(nil, 4),
}

start := time.Unix(0, 0)
Expand Down Expand Up @@ -1053,7 +1054,7 @@ func TestParseDownsamplingParamMillis(t *testing.T) {
for i, test := range tests {
api := QueryAPI{
enableAutodownsampling: test.enableAutodownsampling,
gate: gate.NewKeeper(nil).NewGate(4),
gate: gate.New(nil, 4),
}
v := url.Values{}
v.Set(MaxSourceResolutionParam, test.maxSourceResolutionParam)
Expand Down Expand Up @@ -1101,7 +1102,7 @@ func TestParseStoreMatchersParam(t *testing.T) {
} {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
api := QueryAPI{
gate: gate.NewKeeper(nil).NewGate(4),
gate: promgate.New(4),
}
v := url.Values{}
v.Set(StoreMatcherParam, tc.storeMatchers)
Expand Down
6 changes: 4 additions & 2 deletions pkg/cacheutil/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,10 @@ func newMemcachedClient(
dnsProvider: dnsProvider,
asyncQueue: make(chan func(), config.MaxAsyncBufferSize),
stop: make(chan struct{}, 1),
getMultiGate: gate.NewKeeper(extprom.WrapRegistererWithPrefix("thanos_memcached_getmulti_", reg)).
NewGate(config.MaxGetMultiConcurrency),
getMultiGate: gate.New(
extprom.WrapRegistererWithPrefix("thanos_memcached_getmulti_", reg),
config.MaxGetMultiConcurrency,
),
}

c.clientInfo = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Expand Down
137 changes: 104 additions & 33 deletions pkg/gate/gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,67 +12,138 @@ import (
promgate "github.com/prometheus/prometheus/pkg/gate"
)

// Gate is an interface that mimics prometheus/pkg/gate behavior.
var (
InFlightGaugeOpts = prometheus.GaugeOpts{
Name: "gate_queries_in_flight",
Help: "Number of queries that are currently in flight.",
}
DurationHistogramOpts = prometheus.HistogramOpts{
Name: "gate_duration_seconds",
Help: "How many seconds it took for queries to wait at the gate.",
Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720},
}
)

// Gate controls the maximum number of concurrently running and waiting queries.
//
// Example of use:
//
// g := gate.New(r, 5)
//
// if err := g.Start(ctx); err != nil {
// return
// }
// defer g.Done()
//
type Gate interface {
// Start initiates a new request and waits until it's our turn to fulfill a request.
Start(ctx context.Context) error
// Done finishes a query.
Done()
}

// Gate wraps the Prometheus gate with extra metrics.
type gate struct {
g *promgate.Gate
m *metrics
}

type metrics struct {
inflightQueries prometheus.Gauge
gateTiming prometheus.Histogram
}

// Keeper is used to create multiple gates sharing the same metrics.
//
// Deprecated: when Keeper is used to create several gates, the metric tracking
// the number of in-flight metric isn't meaningful because it is hard to say
// whether requests are being blocked or not. For clients that call
// gate.(*Keeper).NewGate only once, it is recommended to use gate.New()
// instead. Otherwise it is recommended to use the
// github.com/prometheus/prometheus/pkg/gate package directly and wrap the
// returned gate with gate.InstrumentGateDuration().
type Keeper struct {
m *metrics
reg prometheus.Registerer
}

// NewKeeper creates a new Keeper.
//
// Deprecated: see Keeper
func NewKeeper(reg prometheus.Registerer) *Keeper {
return &Keeper{
m: &metrics{
inflightQueries: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "gate_queries_in_flight",
Help: "Number of queries that are currently in flight.",
}),
gateTiming: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "gate_duration_seconds",
Help: "How many seconds it took for queries to wait at the gate.",
Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720},
}),
},
reg: reg,
}
}

// NewGate returns a new Gate that collects metrics.
// NewGate returns a new Gate ready for use.
//
// Deprecated: see Keeper.
func (k *Keeper) NewGate(maxConcurrent int) Gate {
return &gate{g: promgate.New(maxConcurrent), m: k.m}
return New(k.reg, maxConcurrent)
}

// New returns an instrumented gate limiting the number of requests being
// executed concurrently.
//
// The gate implementation is based on the
// github.com/prometheus/prometheus/pkg/gate package.
//
// It can be called several times but not with the same registerer otherwise it
// will panic when trying to register the same metric multiple times.
func New(reg prometheus.Registerer, maxConcurrent int) Gate {
return InstrumentGateDuration(
promauto.With(reg).NewHistogram(DurationHistogramOpts),
InstrumentGateInFlight(
promauto.With(reg).NewGauge(InFlightGaugeOpts),
promgate.New(maxConcurrent),
),
)
}

// Start initiates a new request and waits until it's our turn to fulfill a request.
func (g *gate) Start(ctx context.Context) error {
type instrumentedDurationGate struct {
g Gate
duration prometheus.Observer
}

// InstrumentGateDuration instruments the provided Gate to track how much time
// the request has been waiting in the gate.
func InstrumentGateDuration(duration prometheus.Observer, g Gate) Gate {
return &instrumentedDurationGate{
g: g,
duration: duration,
}
}

// Start implements the Gate interface.
func (g *instrumentedDurationGate) Start(ctx context.Context) error {
start := time.Now()
defer func() {
g.m.gateTiming.Observe(time.Since(start).Seconds())
g.duration.Observe(time.Since(start).Seconds())
}()

return g.g.Start(ctx)
}

// Done implements the Gate interface.
func (g *instrumentedDurationGate) Done() {
g.g.Done()
}

type instrumentedInFlightGate struct {
g Gate
inflight prometheus.Gauge
}

// InstrumentGateInFlight instruments the provided Gate to track how many
// requests are currently in flight.
func InstrumentGateInFlight(inflight prometheus.Gauge, g Gate) Gate {
return &instrumentedInFlightGate{
g: g,
inflight: inflight,
}
}

// Start implements the Gate interface.
func (g *instrumentedInFlightGate) Start(ctx context.Context) error {
if err := g.g.Start(ctx); err != nil {
return err
}

g.m.inflightQueries.Inc()
g.inflight.Inc()
return nil
}

// Done finishes a query.
func (g *gate) Done() {
g.m.inflightQueries.Dec()
// Done implements the Gate interface.
func (g *instrumentedInFlightGate) Done() {
g.inflight.Dec()
g.g.Done()
}
33 changes: 16 additions & 17 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
promgate "github.com/prometheus/prometheus/pkg/gate"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"

Expand All @@ -32,20 +34,21 @@ type QueryableCreator func(deduplicate bool, replicaLabels []string, storeMatche

// NewQueryableCreator creates QueryableCreator.
func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy storepb.StoreServer, maxConcurrentSelects int, selectTimeout time.Duration) QueryableCreator {
keeper := gate.NewKeeper(reg)
duration := promauto.With(reg).NewHistogram(gate.DurationHistogramOpts)

return func(deduplicate bool, replicaLabels []string, storeMatchers [][]storepb.LabelMatcher, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable {
return &queryable{
logger: logger,
reg: reg,
replicaLabels: replicaLabels,
storeMatchers: storeMatchers,
proxy: proxy,
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
skipChunks: skipChunks,
gateKeeper: keeper,
logger: logger,
replicaLabels: replicaLabels,
storeMatchers: storeMatchers,
proxy: proxy,
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
skipChunks: skipChunks,
gateFn: func() gate.Gate {
return gate.InstrumentGateDuration(duration, promgate.New(maxConcurrentSelects))
},
maxConcurrentSelects: maxConcurrentSelects,
selectTimeout: selectTimeout,
}
Expand All @@ -54,28 +57,26 @@ func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy sto

type queryable struct {
logger log.Logger
reg prometheus.Registerer
replicaLabels []string
storeMatchers [][]storepb.LabelMatcher
proxy storepb.StoreServer
deduplicate bool
maxResolutionMillis int64
partialResponse bool
skipChunks bool
gateKeeper *gate.Keeper
gateFn func() gate.Gate
maxConcurrentSelects int
selectTimeout time.Duration
}

// Querier returns a new storage querier against the underlying proxy store API.
func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return newQuerier(ctx, q.logger, q.reg, mint, maxt, q.replicaLabels, q.storeMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateKeeper.NewGate(q.maxConcurrentSelects), q.selectTimeout), nil
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.storeMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateFn(), q.selectTimeout), nil
}

type querier struct {
ctx context.Context
logger log.Logger
reg prometheus.Registerer
cancel func()
mint, maxt int64
replicaLabels map[string]struct{}
Expand All @@ -94,7 +95,6 @@ type querier struct {
func newQuerier(
ctx context.Context,
logger log.Logger,
reg prometheus.Registerer,
mint, maxt int64,
replicaLabels []string,
storeMatchers [][]storepb.LabelMatcher,
Expand All @@ -117,7 +117,6 @@ func newQuerier(
return &querier{
ctx: ctx,
logger: logger,
reg: reg,
cancel: cancel,
selectGate: selectGate,
selectTimeout: selectTimeout,
Expand Down
Loading

0 comments on commit fff2500

Please sign in to comment.