Skip to content

Commit

Permalink
query: Added PromQL tests framework supporing multiple stores. First …
Browse files Browse the repository at this point in the history
…step towars Query pushdown! (#3631)

* store: Added inprocess server to client.

Signed-off-by: Bartlomiej Plotka <[email protected]>

* Added initial PromQL acceptance tests.

* Improved logging.
* Improved debug matching info.
* Fixed important matching bug.
* Found reset bug, added issue #3644 and commented code.

Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka authored Dec 19, 2020
1 parent 4723967 commit 202fb4d
Show file tree
Hide file tree
Showing 35 changed files with 4,023 additions and 208 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ func runRule(

// Start gRPC server.
{
tsdbStore := store.NewTSDBStore(logger, reg, db, component.Rule, lset)
tsdbStore := store.NewTSDBStore(logger, db, component.Rule, lset)

tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), grpcCert, grpcKey, grpcClientCA)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestQueryEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout),
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout),
queryEngine: func(int64) *promql.Engine {
return qe
},
Expand Down Expand Up @@ -674,7 +674,7 @@ func TestMetadataEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout),
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout),
queryEngine: func(int64) *promql.Engine {
return qe
},
Expand All @@ -684,7 +684,7 @@ func TestMetadataEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout),
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout),
queryEngine: func(int64) *promql.Engine {
return qe
},
Expand Down
3 changes: 2 additions & 1 deletion pkg/compact/downsample/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (ch
return chk, err
}

// Handle counters by reading them properly.
// Handle counters by applying resets directly.
acs := make([]chunkenc.Iterator, 0, len(chks))
for _, achk := range chks {
c, err := achk.Get(AggrCounter)
Expand Down Expand Up @@ -580,6 +580,7 @@ type sample struct {
// It handles overlapped chunks (removes overlaps).
// NOTE: It is important to deduplicate with care ensuring that you don't hit
// issue https://github.com/thanos-io/thanos/issues/2401#issuecomment-621958839.
// NOTE(bwplotka): This hides resets from PromQL engine. This means it will not work for PromQL resets function.
type ApplyCounterResetsSeriesIterator struct {
chks []chunkenc.Iterator
i int // Current chunk.
Expand Down
4 changes: 2 additions & 2 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,12 +653,12 @@ func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string

// SeriesInGRPC returns the labels from Prometheus series API. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []storepb.LabelMatcher, startTime, endTime int64) ([]map[string]string, error) {
func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64) ([]map[string]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/series")
q := u.Query()

q.Add("match[]", storepb.MatchersToString(matchers...))
q.Add("match[]", storepb.PromMatchersToString(matchers...))
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
u.RawQuery = q.Encode()
Expand Down
1 change: 1 addition & 0 deletions pkg/query/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func (s *chunkSeries) Iterator() chunkenc.Iterator {
for _, c := range s.chunks {
its = append(its, getFirstIterator(c.Counter, c.Raw))
}
// TODO(bwplotka): This breaks resets function. See https://github.com/thanos-io/thanos/issues/3644
sit = downsample.NewApplyCounterResetsIterator(its...)
default:
return errSeriesIterator{err: errors.Errorf("unexpected result aggregate type %v", s.aggrs)}
Expand Down
3 changes: 2 additions & 1 deletion pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match
}

func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, error) {
sms, err := storepb.TranslatePromMatchers(ms...)
sms, err := storepb.PromMatchersToMatchers(ms...)
if err != nil {
return nil, errors.Wrap(err, "convert matchers")
}
Expand All @@ -265,6 +265,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
// TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context.
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers)

// TODO(bwplotka): Use inprocess gRPC.
resp := &seriesServer{ctx: ctx}
if err := q.proxy.Series(&storepb.SeriesRequest{
MinTime: hints.Start,
Expand Down
11 changes: 5 additions & 6 deletions pkg/query/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
Expand All @@ -41,7 +40,7 @@ type sample struct {
}

func TestQueryableCreator_MaxResolution(t *testing.T) {
testProxy := &storeServer{resps: []*storepb.SeriesResponse{}}
testProxy := &testStoreServer{resps: []*storepb.SeriesResponse{}}
queryableCreator := NewQueryableCreator(nil, nil, testProxy, 2, 5*time.Second)

oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond)
Expand All @@ -60,7 +59,7 @@ func TestQueryableCreator_MaxResolution(t *testing.T) {

// Tests E2E how PromQL works with downsampled data.
func TestQuerier_DownsampledData(t *testing.T) {
testProxy := &storeServer{
testProxy := &testStoreServer{
resps: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("__name__", "a", "zzz", "a", "aaa", "bbb"), []sample{{99, 1}, {199, 5}}), // Downsampled chunk from Store.
storeSeriesResponse(t, labels.FromStrings("__name__", "a", "zzz", "b", "bbbb", "eee"), []sample{{99, 3}, {199, 8}}), // Downsampled chunk from Store.
Expand Down Expand Up @@ -411,7 +410,7 @@ func TestQuerier_Select(t *testing.T) {
}{
{
name: "select overlapping data with partial error",
storeAPI: &storeServer{
storeAPI: &testStoreServer{
resps: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storepb.NewWarnSeriesResponse(errors.New("partial error")),
Expand Down Expand Up @@ -1468,14 +1467,14 @@ func BenchmarkDedupSeriesIterator(b *testing.B) {
})
}

type storeServer struct {
type testStoreServer struct {
// This field just exist to pseudo-implement the unused methods of the interface.
storepb.StoreServer

resps []*storepb.SeriesResponse
}

func (s *storeServer) Series(_ *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
func (s *testStoreServer) Series(_ *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
for _, resp := range s.resps {
err := srv.Send(resp)
if err != nil {
Expand Down
121 changes: 121 additions & 0 deletions pkg/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,132 @@
package query

import (
"context"
"fmt"
"os"
"path/filepath"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestMain(m *testing.M) {
testutil.TolerantVerifyLeakMain(m)
}

type inProcessClient struct {
t testing.TB

name string

storepb.StoreClient
extLset labels.Labels
}

func (i inProcessClient) LabelSets() []labels.Labels {
return []labels.Labels{i.extLset}
}

func (i inProcessClient) TimeRange() (mint int64, maxt int64) {
r, err := i.Info(context.TODO(), &storepb.InfoRequest{})
testutil.Ok(i.t, err)
return r.MinTime, r.MaxTime
}

func (i inProcessClient) String() string { return i.name }
func (i inProcessClient) Addr() string { return i.name }

func TestQuerier_Proxy(t *testing.T) {
files, err := filepath.Glob("testdata/promql/**/*.test")
testutil.Ok(t, err)
testutil.Equals(t, 10, len(files), "%v", files)

logger := log.NewLogfmtLogger(os.Stderr)
t.Run("proxy", func(t *testing.T) {
var clients []store.Client
q := NewQueryableCreator(
logger,
nil,
store.NewProxyStore(logger, nil, func() []store.Client { return clients },
component.Debug, nil, 5*time.Minute),
1000000,
5*time.Minute,
)

createQueryableFn := func(stores []*testStore) storage.Queryable {
clients = clients[:0]
for i, st := range stores {
m, err := storepb.PromMatchersToMatchers(st.matchers...)
testutil.Ok(t, err)

// TODO(bwplotka): Parse external labels.
clients = append(clients, inProcessClient{
t: t,
StoreClient: storepb.ServerAsClient(SelectedStore(store.NewTSDBStore(logger, st.storage.DB, component.Debug, nil), m, st.mint, st.maxt), 0),
name: fmt.Sprintf("store number %v", i),
})
}
return q(true, nil, nil, 0, false, false)
}

for _, fn := range files {
t.Run(fn, func(t *testing.T) {
te, err := newTestFromFile(t, fn)
testutil.Ok(t, err)
testutil.Ok(t, te.run(createQueryableFn))
te.close()
})
}
})
}

// SelectStore allows wrapping another storeAPI with additional time and matcher selection.
type SelectStore struct {
matchers []storepb.LabelMatcher

storepb.StoreServer
mint, maxt int64
}

// SelectedStore wraps given store with SelectStore.
func SelectedStore(wrapped storepb.StoreServer, matchers []storepb.LabelMatcher, mint, maxt int64) *SelectStore {
return &SelectStore{
StoreServer: wrapped,
matchers: matchers,
mint: mint,
maxt: maxt,
}
}

func (s *SelectStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb.InfoResponse, error) {
resp, err := s.StoreServer.Info(ctx, r)
if err != nil {
return nil, err
}
if resp.MinTime < s.mint {
resp.MinTime = s.mint
}
if resp.MaxTime > s.maxt {
resp.MaxTime = s.maxt
}
// TODO(bwplotka): Match labelsets and expose only those?
return resp, nil
}

func (s *SelectStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
if r.MinTime < s.mint {
r.MinTime = s.mint
}
if r.MaxTime > s.maxt {
r.MaxTime = s.maxt
}
r.Matchers = append(r.Matchers, s.matchers...)
return s.StoreServer.Series(r, srv)
}
22 changes: 11 additions & 11 deletions pkg/query/storeset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,29 @@ var testGRPCOpts = []grpc.DialOption{
grpc.WithInsecure(),
}

type testStore struct {
type mockedStore struct {
infoDelay time.Duration
info storepb.InfoResponse
}

func (s *testStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb.InfoResponse, error) {
func (s *mockedStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb.InfoResponse, error) {
if s.infoDelay > 0 {
time.Sleep(s.infoDelay)
}
return &s.info, nil
}

func (s *testStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
func (s *mockedStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
return status.Error(codes.Unimplemented, "not implemented")
}

func (s *testStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (
func (s *mockedStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (
*storepb.LabelNamesResponse, error,
) {
return nil, status.Error(codes.Unimplemented, "not implemented")
}

func (s *testStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequest) (
func (s *mockedStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequest) (
*storepb.LabelValuesResponse, error,
) {
return nil, status.Error(codes.Unimplemented, "not implemented")
Expand Down Expand Up @@ -84,7 +84,7 @@ func startTestStores(storeMetas []testStoreMeta) (*testStores, error) {

srv := grpc.NewServer()

storeSrv := &testStore{
storeSrv := &mockedStore{
info: storepb.InfoResponse{
LabelSets: meta.extlsetFn(listener.Addr().String()),
MaxTime: meta.maxTime,
Expand Down Expand Up @@ -1012,12 +1012,12 @@ func TestUpdateStoreStateLastError(t *testing.T) {
storeStatuses: map[string]*StoreStatus{},
}
mockStoreRef := &storeRef{
addr: "testStore",
addr: "mockedStore",
}

mockStoreSet.updateStoreStatus(mockStoreRef, tc.InputError)

b, err := json.Marshal(mockStoreSet.storeStatuses["testStore"].LastError)
b, err := json.Marshal(mockStoreSet.storeStatuses["mockedStore"].LastError)
testutil.Ok(t, err)
testutil.Equals(t, tc.ExpectedLastErr, string(b))
}
Expand All @@ -1028,19 +1028,19 @@ func TestUpdateStoreStateForgetsPreviousErrors(t *testing.T) {
storeStatuses: map[string]*StoreStatus{},
}
mockStoreRef := &storeRef{
addr: "testStore",
addr: "mockedStore",
}

mockStoreSet.updateStoreStatus(mockStoreRef, errors.New("test err"))

b, err := json.Marshal(mockStoreSet.storeStatuses["testStore"].LastError)
b, err := json.Marshal(mockStoreSet.storeStatuses["mockedStore"].LastError)
testutil.Ok(t, err)
testutil.Equals(t, `"test err"`, string(b))

// updating status without and error should clear the previous one.
mockStoreSet.updateStoreStatus(mockStoreRef, nil)

b, err = json.Marshal(mockStoreSet.storeStatuses["testStore"].LastError)
b, err = json.Marshal(mockStoreSet.storeStatuses["mockedStore"].LastError)
testutil.Ok(t, err)
testutil.Equals(t, `null`, string(b))
}
Loading

0 comments on commit 202fb4d

Please sign in to comment.