diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 18e90ccc21..244ae5592d 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -258,7 +258,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto } func (p *PrometheusStore) queryPrometheus( - s storepb.Store_SeriesServer, + s flushableServer, r *storepb.SeriesRequest, extLsetToRemove map[string]struct{}, ) error { @@ -325,7 +325,7 @@ func (p *PrometheusStore) queryPrometheus( } } - return nil + return s.Flush() } func (p *PrometheusStore) handleSampledPrometheusResponse( diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index e0d6052b65..44fb4e2c20 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -49,12 +49,14 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { baseT := timestamp.FromTime(time.Now()) / 1000 * 1000 + // region is an external label; by adding it as an internal label too we also trigger + // the resorting code paths a := p.Appender() - _, err = a.Append(0, labels.FromStrings("a", "b", "region", "eu-west"), baseT+100, 1) + _, err = a.Append(0, labels.FromStrings("a", "b", "region", "local"), baseT+100, 1) testutil.Ok(t, err) - _, err = a.Append(0, labels.FromStrings("a", "b", "region", "eu-west"), baseT+200, 2) + _, err = a.Append(0, labels.FromStrings("a", "b", "region", "local"), baseT+200, 2) testutil.Ok(t, err) - _, err = a.Append(0, labels.FromStrings("a", "b", "region", "eu-west"), baseT+300, 3) + _, err = a.Append(0, labels.FromStrings("a", "b", "region", "local"), baseT+300, 3) testutil.Ok(t, err) testutil.Ok(t, a.Commit()) @@ -154,6 +156,38 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { testutil.Equals(t, []string(nil), srv.Warnings) testutil.Equals(t, "rpc error: code = InvalidArgument desc = no matchers specified (excluding external labels)", err.Error()) } + // Querying with pushdown. + { + srv := newStoreSeriesServer(ctx) + testutil.Ok(t, proxy.Series(&storepb.SeriesRequest{ + MinTime: baseT + 101, + MaxTime: baseT + 300, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"}, + }, + QueryHints: &storepb.QueryHints{Func: &storepb.Func{Name: "min_over_time"}, Range: &storepb.Range{Millis: 300}}, + }, srv)) + + testutil.Equals(t, 1, len(srv.SeriesSet)) + + testutil.Equals(t, []labelpb.ZLabel{ + {Name: "a", Value: "b"}, + {Name: "region", Value: "eu-west"}, + {Name: "__thanos_pushed_down", Value: "true"}, + }, srv.SeriesSet[0].Labels) + testutil.Equals(t, []string(nil), srv.Warnings) + testutil.Equals(t, 1, len(srv.SeriesSet[0].Chunks)) + + c := srv.SeriesSet[0].Chunks[0] + testutil.Equals(t, storepb.Chunk_XOR, c.Raw.Type) + + chk, err := chunkenc.FromData(chunkenc.EncXOR, c.Raw.Data) + testutil.Ok(t, err) + + samples := expandChunk(chk.Iterator(nil)) + testutil.Equals(t, []sample{{baseT + 300, 1}}, samples) + + } } type sample struct { diff --git a/pkg/testutil/e2eutil/prometheus.go b/pkg/testutil/e2eutil/prometheus.go index f7c0ad9813..bf7e900a9b 100644 --- a/pkg/testutil/e2eutil/prometheus.go +++ b/pkg/testutil/e2eutil/prometheus.go @@ -8,6 +8,7 @@ import ( "context" "encoding/json" "fmt" + "io" "math" "math/rand" "net/http" @@ -157,11 +158,16 @@ func newPrometheus(binPath, prefix string) (*Prometheus, error) { return nil, err } - // Just touch an empty config file. We don't need to actually scrape anything. - _, err = os.Create(filepath.Join(db.Dir(), "prometheus.yml")) + f, err := os.Create(filepath.Join(db.Dir(), "prometheus.yml")) if err != nil { return nil, err } + defer f.Close() + + // Some well-known external labels so that we can test label resorting + if _, err = io.WriteString(f, "global:\n external_labels:\n region: eu-west"); err != nil { + return nil, err + } return &Prometheus{ dir: db.Dir(),