diff --git a/CHANGELOG.md b/CHANGELOG.md index e8feac8514..fe71361ade 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#1970](https://github.com/thanos-io/thanos/issues/1970) *breaking* Receive: Use gRPC for forwarding requests between peers. Note that existing values for the `--receive.local-endpoint` flag and the endpoints in the hashring configuration file must now specify the receive gRPC port and must be updated to be a simple `host:port` combination, e.g. `127.0.0.1:10901`, rather than a full HTTP URL, e.g. `http://127.0.0.1:10902/api/v1/receive`. - [#1939](https://github.com/thanos-io/thanos/pull/1939) Ruler: Add TLS and authentication support for query endpoints with the `--query.config` and `--query.config-file` CLI flags. See [documentation](docs/components/rule.md/#configuration) for further information. - [#1982](https://github.com/thanos-io/thanos/pull/1982) Ruler: Add support for Alertmanager v2 API endpoints. +- [#2030](https://github.com/thanos-io/thanos/pull/2030) Query: Add `thanos_proxy_store_empty_stream_responses_total` metric for number of empty responses from stores.
### Changed diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 349e9bf2ea..b2edceceef 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -235,7 +235,7 @@ func runQuery( dialOpts, unhealthyStoreTimeout, ) - proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout) + proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout) queryableCreator = query.NewQueryableCreator(logger, proxy) engine = promql.NewEngine( promql.EngineOpts{ diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 574c5e0e4b..b38f2c952b 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -15,6 +15,7 @@ import ( grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -49,12 +50,34 @@ type ProxyStore struct { selectorLabels labels.Labels responseTimeout time.Duration + metrics *proxyStoreMetrics +} + +type proxyStoreMetrics struct { + emptyStreamResponses prometheus.Counter +} + +func newProxyStoreMetrics(reg prometheus.Registerer) *proxyStoreMetrics { + var m proxyStoreMetrics + + m.emptyStreamResponses = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_proxy_store_empty_stream_responses_total", + Help: "Total number of empty responses received.", + }) + + if reg != nil { + reg.MustRegister( + m.emptyStreamResponses, + ) + } + return &m } // NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client. // Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL). 
func NewProxyStore( logger log.Logger, + reg prometheus.Registerer, stores func() []Client, component component.StoreAPI, selectorLabels labels.Labels, @@ -64,12 +87,14 @@ func NewProxyStore( logger = log.NewNopLogger() } + metrics := newProxyStoreMetrics(reg) s := &ProxyStore{ logger: logger, stores: stores, component: component, selectorLabels: selectorLabels, responseTimeout: responseTimeout, + metrics: metrics, } return s } @@ -260,7 +285,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe // Schedule streamSeriesSet that translates gRPC streamed response // into seriesSet (if series) or respCh if warnings. seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, s.logger, closeSeries, - wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout)) + wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout, s.metrics.emptyStreamResponses)) } level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";")) @@ -330,6 +355,7 @@ func startStreamSeriesSet( name string, partialResponse bool, responseTimeout time.Duration, + emptyStreamResponses prometheus.Counter, ) *streamSeriesSet { s := &streamSeriesSet{ ctx: ctx, @@ -348,6 +374,12 @@ func startStreamSeriesSet( defer wg.Done() defer close(s.recvCh) + numResponses := 0 + defer func() { + if numResponses == 0 { + emptyStreamResponses.Inc() + } + }() for { r, err := s.stream.Recv() @@ -368,6 +400,8 @@ func startStreamSeriesSet( return } + numResponses++ + if w := r.GetWarning(); w != "" { s.warnCh.send(storepb.NewWarnSeriesResponse(errors.New(w))) continue diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index d19bf58f9a..ce26e2ba41 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -53,6 +53,7 @@ func TestProxyStore_Info(t *testing.T) { defer cancel() q := NewProxyStore(nil, + nil, func() []Client { return nil }, component.Query, nil, 0*time.Second, @@ -438,6 +439,7 @@ func TestProxyStore_Series(t 
*testing.T) { if ok := t.Run(tc.title, func(t *testing.T) { q := NewProxyStore(nil, + nil, func() []Client { return tc.storeAPIs }, component.Query, tc.selectorLabels, @@ -560,6 +562,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { } { if ok := t.Run(tc.title, func(t *testing.T) { q := NewProxyStore(nil, + nil, func() []Client { return tc.storeAPIs }, component.Query, tc.selectorLabels, @@ -602,6 +605,7 @@ func TestProxyStore_Series_RequestParamsProxied(t *testing.T) { }, } q := NewProxyStore(nil, + nil, func() []Client { return cls }, component.Query, nil, @@ -661,6 +665,7 @@ func TestProxyStore_Series_RegressionFillResponseChannel(t *testing.T) { } q := NewProxyStore(nil, + nil, func() []Client { return cls }, component.Query, labels.FromStrings("fed", "a"), @@ -699,6 +704,7 @@ func TestProxyStore_LabelValues(t *testing.T) { }}, } q := NewProxyStore(nil, + nil, func() []Client { return cls }, component.Query, nil, @@ -801,6 +807,7 @@ func TestProxyStore_LabelNames(t *testing.T) { } { if ok := t.Run(tc.title, func(t *testing.T) { q := NewProxyStore( + nil, nil, func() []Client { return tc.storeAPIs }, component.Query,