diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index d23b9a5b31..1753592a57 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -125,15 +125,21 @@ func (p *PrometheusStore) putBuffer(b *[]byte) { // Series returns all series for a requested time range and label matcher. func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_SeriesServer) error { - ext := p.externalLabels() + externalLabels := p.externalLabels() - match, newMatchers, err := labelsMatches(ext, r.Matchers) + match, newMatchers, err := matchesExternalLabels(r.Matchers, externalLabels) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } + if !match { return nil } + + if len(newMatchers) == 0 { + return status.Error(codes.InvalidArgument, errors.New("no matchers specified (excluding external labels)").Error()) + } + q := prompb.Query{StartTimestampMs: r.MinTime, EndTimestampMs: r.MaxTime} // TODO(fabxc): import common definitions from prompb once we have a stable gRPC @@ -166,7 +172,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie span.SetTag("series_count", len(resp.Results[0].Timeseries)) for _, e := range resp.Results[0].Timeseries { - lset := p.translateAndExtendLabels(e.Labels, ext) + lset := p.translateAndExtendLabels(e.Labels, externalLabels) if len(e.Samples) == 0 { // As found in https://github.com/improbable-eng/thanos/issues/381 @@ -286,8 +292,10 @@ func (p *PrometheusStore) promSeries(ctx context.Context, q prompb.Query) (*prom return &data, nil } -func labelsMatches(lset labels.Labels, ms []storepb.LabelMatcher) (bool, []storepb.LabelMatcher, error) { - if len(lset) == 0 { +// matchesExternalLabels filters out external labels matching from matcher if exsits as the local storage does not have them. +// It also returns false if given matchers are not matching external labels. +func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels) (bool, []storepb.LabelMatcher, error) { + if len(externalLabels) == 0 { return true, ms, nil } @@ -299,7 +307,7 @@ func labelsMatches(lset labels.Labels, ms []storepb.LabelMatcher) (bool, []store return false, nil, err } - extValue := lset.Get(m.Name) + extValue := externalLabels.Get(m.Name) if extValue == "" { // Agnostic to external labels. newMatcher = append(newMatcher, m) @@ -312,6 +320,7 @@ func labelsMatches(lset labels.Labels, ms []storepb.LabelMatcher) (bool, []store return false, nil, nil } } + return true, newMatcher, nil } diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index b6c812ed17..386e4a48db 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -61,36 +61,53 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { }, nil) testutil.Ok(t, err) - // Query all three samples except for the first one. Since we round up queried data - // to seconds, we can test whether the extra sample gets stripped properly. - srv := newStoreSeriesServer(ctx) + { + // Query all three samples except for the first one. Since we round up queried data + // to seconds, we can test whether the extra sample gets stripped properly. + srv := newStoreSeriesServer(ctx) + + err = proxy.Series(&storepb.SeriesRequest{ + MinTime: baseT + 101, + MaxTime: baseT + 300, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"}, + }, + }, srv) + testutil.Ok(t, err) - err = proxy.Series(&storepb.SeriesRequest{ - MinTime: baseT + 101, - MaxTime: baseT + 300, - Matchers: []storepb.LabelMatcher{ - {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"}, - }, - }, srv) - testutil.Ok(t, err) + testutil.Equals(t, 1, len(srv.SeriesSet)) - testutil.Equals(t, 1, len(srv.SeriesSet)) + testutil.Equals(t, []storepb.Label{ + {Name: "a", Value: "b"}, + {Name: "region", Value: "eu-west"}, + }, srv.SeriesSet[0].Labels) - testutil.Equals(t, []storepb.Label{ - {Name: "a", Value: "b"}, - {Name: "region", Value: "eu-west"}, - }, srv.SeriesSet[0].Labels) + testutil.Equals(t, 1, len(srv.SeriesSet[0].Chunks)) - testutil.Equals(t, 1, len(srv.SeriesSet[0].Chunks)) + c := srv.SeriesSet[0].Chunks[0] + testutil.Equals(t, storepb.Chunk_XOR, c.Raw.Type) - c := srv.SeriesSet[0].Chunks[0] - testutil.Equals(t, storepb.Chunk_XOR, c.Raw.Type) + chk, err := chunkenc.FromData(chunkenc.EncXOR, c.Raw.Data) + testutil.Ok(t, err) - chk, err := chunkenc.FromData(chunkenc.EncXOR, c.Raw.Data) - testutil.Ok(t, err) + samples := expandChunk(chk.Iterator()) + testutil.Equals(t, []sample{{baseT + 200, 2}, {baseT + 300, 3}}, samples) - samples := expandChunk(chk.Iterator()) - testutil.Equals(t, []sample{{baseT + 200, 2}, {baseT + 300, 3}}, samples) + } + // Querying by external labels only. + { + srv := newStoreSeriesServer(ctx) + + err = proxy.Series(&storepb.SeriesRequest{ + MinTime: baseT + 101, + MaxTime: baseT + 300, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "region", Value: "eu-west"}, + }, + }, srv) + testutil.NotOk(t, err) + testutil.Equals(t, "rpc error: code = InvalidArgument desc = no matchers specified (excluding external labels)", err.Error()) + } } type sample struct { diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index c99b21d867..bcb470743f 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -181,7 +181,7 @@ func (s ctxRespSender) send(r *storepb.SeriesResponse) { // Series returns all series for a requested time range and label matcher. Requested series are taken from other // stores and proxied to RPC client. NOTE: Resulted data are not trimmed exactly to min and max time range. func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { - match, newMatchers, err := labelsMatches(s.selectorLabels, r.Matchers) + match, newMatchers, err := matchesExternalLabels(r.Matchers, s.selectorLabels) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } @@ -189,6 +189,10 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe return nil } + if len(newMatchers) == 0 { + return status.Error(codes.InvalidArgument, errors.New("no matchers specified (excluding external labels)").Error()) + } + var ( g, gctx = errgroup.WithContext(srv.Context()) @@ -220,7 +224,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe for _, st := range s.stores() { // We might be able to skip the store if its meta information indicates // it cannot have series matching our query. - // NOTE: all matchers are validated in labelsMatches method so we explicitly ignore error. + // NOTE: all matchers are validated in matchesExternalLabels method so we explicitly ignore error. spanStoreMathes, gctx := tracing.StartSpan(gctx, "store_matches") ok, _ := storeMatches(st, r.MinTime, r.MaxTime, r.Matchers...) spanStoreMathes.Finish() diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 74915d7bb4..872cec293a 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -644,7 +644,7 @@ func TestProxyStore_Series_RegressionFillResponseChannel(t *testing.T) { &storepb.SeriesRequest{ MinTime: 1, MaxTime: 300, - Matchers: []storepb.LabelMatcher{{Name: "fed", Value: "a", Type: storepb.LabelMatcher_EQ}}, + Matchers: []storepb.LabelMatcher{{Name: "any", Value: ".*", Type: storepb.LabelMatcher_RE}}, }, s, )) testutil.Equals(t, 0, len(s.SeriesSet)) diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 126829e827..80cf75b9a5 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -22,29 +22,29 @@ import ( // It attaches the provided external labels to all results. It only responds with raw data // and does not support downsampling. type TSDBStore struct { - logger log.Logger - db *tsdb.DB - component component.SourceStoreAPI - labels labels.Labels + logger log.Logger + db *tsdb.DB + component component.SourceStoreAPI + externalLabels labels.Labels } // NewTSDBStore creates a new TSDBStore. -func NewTSDBStore(logger log.Logger, reg prometheus.Registerer, db *tsdb.DB, component component.SourceStoreAPI, externalLabels labels.Labels) *TSDBStore { +func NewTSDBStore(logger log.Logger, _ prometheus.Registerer, db *tsdb.DB, component component.SourceStoreAPI, externalLabels labels.Labels) *TSDBStore { if logger == nil { logger = log.NewNopLogger() } return &TSDBStore{ - logger: logger, - db: db, - component: component, - labels: externalLabels, + logger: logger, + db: db, + component: component, + externalLabels: externalLabels, } } // Info returns store information about the Prometheus instance. func (s *TSDBStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb.InfoResponse, error) { res := &storepb.InfoResponse{ - Labels: make([]storepb.Label, 0, len(s.labels)), + Labels: make([]storepb.Label, 0, len(s.externalLabels)), StoreType: s.component.ToProto(), MinTime: 0, MaxTime: math.MaxInt64, @@ -52,7 +52,7 @@ func (s *TSDBStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb. if blocks := s.db.Blocks(); len(blocks) > 0 { res.MinTime = blocks[0].Meta().MinTime } - for _, l := range s.labels { + for _, l := range s.externalLabels { res.Labels = append(res.Labels, storepb.Label{ Name: l.Name, Value: l.Value, @@ -73,13 +73,19 @@ func (s *TSDBStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb. // Series returns all series for a requested time range and label matcher. The returned data may // exceed the requested time bounds. func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { - match, newMatchers, err := labelsMatches(s.labels, r.Matchers) + match, newMatchers, err := matchesExternalLabels(r.Matchers, s.externalLabels) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } + if !match { return nil } + + if len(newMatchers) == 0 { + return status.Error(codes.InvalidArgument, errors.New("no matchers specified (excluding external labels)").Error()) + } + matchers, err := translateMatchers(newMatchers) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) @@ -113,7 +119,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer return status.Errorf(codes.Internal, "encode chunk: %s", err) } - respSeries.Labels = s.translateAndExtendLabels(series.Labels(), s.labels) + respSeries.Labels = s.translateAndExtendLabels(series.Labels(), s.externalLabels) respSeries.Chunks = append(respSeries.Chunks[:0], c...) if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil {