diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 1af4242d28..c422f516f8 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -198,12 +198,7 @@ func newRespCh(ctx context.Context, buffer int) (*ctxRespSender, <-chan *storepb } func (s ctxRespSender) send(r *storepb.SeriesResponse) { - select { - case <-s.ctx.Done(): - return - case s.ch <- r: - return - } + s.ch <- r } // Series returns all series for a requested time range and label matcher. Requested series are taken from other @@ -348,6 +343,21 @@ type streamSeriesSet struct { closeSeries context.CancelFunc } +type recvResponse struct { + r *storepb.SeriesResponse + err error +} + +func frameCtx(responseTimeout time.Duration) (context.Context, context.CancelFunc) { + frameTimeoutCtx := context.Background() + var cancel context.CancelFunc + if responseTimeout != 0 { + frameTimeoutCtx, cancel = context.WithTimeout(frameTimeoutCtx, responseTimeout) + return frameTimeoutCtx, cancel + } + return frameTimeoutCtx, func() {} +} + func startStreamSeriesSet( ctx context.Context, logger log.Logger, @@ -383,78 +393,74 @@ func startStreamSeriesSet( emptyStreamResponses.Inc() } }() - for { - r, err := s.stream.Recv() - if err == io.EOF { - return - } - - if err != nil { - wrapErr := errors.Wrapf(err, "receive series from %s", s.name) - if partialResponse { - s.warnCh.send(storepb.NewWarnSeriesResponse(wrapErr)) + rCh := make(chan *recvResponse) + done := make(chan struct{}) + go func() { + for { + r, err := s.stream.Recv() + select { + case <-done: + close(rCh) return + case rCh <- &recvResponse{r: r, err: err}: } + } + }() + for { + frameTimeoutCtx, cancel := frameCtx(s.responseTimeout) + defer cancel() + var rr *recvResponse + select { + case <-ctx.Done(): + s.handleErr(errors.Wrapf(ctx.Err(), "failed to receive any data from %s", s.name), done) + return + case <-frameTimeoutCtx.Done(): + s.handleErr(errors.Wrapf(ctx.Err(), "failed to receive any data in %s from %s", s.responseTimeout.String(), s.name), done) + return + case rr = <-rCh: + } - s.errMtx.Lock() - s.err = wrapErr - s.errMtx.Unlock() + if rr.err == io.EOF { + close(done) return } + if rr.err != nil { + wrapErr := errors.Wrapf(rr.err, "receive series from %s", s.name) + s.handleErr(wrapErr, done) + return + } numResponses++ - if w := r.GetWarning(); w != "" { + if w := rr.r.GetWarning(); w != "" { s.warnCh.send(storepb.NewWarnSeriesResponse(errors.New(w))) continue } - - select { - case s.recvCh <- r.GetSeries(): - continue - case <-ctx.Done(): - return - } - + s.recvCh <- rr.r.GetSeries() } }() return s } -// Next blocks until new message is received or stream is closed or operation is timed out. -func (s *streamSeriesSet) Next() (ok bool) { - ctx := s.ctx - timeoutMsg := fmt.Sprintf("failed to receive any data from %s", s.name) - - if s.responseTimeout != 0 { - timeoutMsg = fmt.Sprintf("failed to receive any data in %s from %s", s.responseTimeout.String(), s.name) +func (s *streamSeriesSet) handleErr(err error, done chan struct{}) { + defer close(done) + s.closeSeries() - timeoutCtx, done := context.WithTimeout(s.ctx, s.responseTimeout) - defer done() - ctx = timeoutCtx + if s.partialResponse { + level.Warn(s.logger).Log("err", err, "msg", "returning partial response") + s.warnCh.send(storepb.NewWarnSeriesResponse(err)) + return } + s.errMtx.Lock() + s.err = err + s.errMtx.Unlock() +} - select { - case s.currSeries, ok = <-s.recvCh: - return ok - case <-ctx.Done(): - // closeSeries to shutdown a goroutine in startStreamSeriesSet. - s.closeSeries() - - err := errors.Wrap(ctx.Err(), timeoutMsg) - if s.partialResponse { - level.Warn(s.logger).Log("err", err, "msg", "returning partial response") - s.warnCh.send(storepb.NewWarnSeriesResponse(err)) - return false - } - s.errMtx.Lock() - s.err = err - s.errMtx.Unlock() - - level.Warn(s.logger).Log("err", err, "msg", "partial response disabled; aborting request") - return false - } +// Next blocks until new message is received or stream is closed or operation is timed out. +func (s *streamSeriesSet) Next() (ok bool) { + s.currSeries, ok = <-s.recvCh + return ok } func (s *streamSeriesSet) At() ([]storepb.Label, []storepb.AggrChunk) { diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 0e5af9492f..cdc3ac7728 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -5,6 +5,7 @@ package store import ( "context" + "fmt" "io" "math" "os" @@ -49,6 +50,7 @@ func (c *testClient) String() string { func (c *testClient) Addr() string { return "testaddr" } + func TestProxyStore_Info(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() @@ -412,32 +414,6 @@ func TestProxyStore_Series(t *testing.T) { }, expectedErr: errors.New("fetch series for [name:\"ext\" value:\"1\" ] test: error!"), }, - { - title: "use no chunk to only get labels", - storeAPIs: []Client{ - &testClient{ - StoreClient: &mockedStoreAPI{ - RespSeries: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labels.FromStrings("a", "a")), - }, - }, - minTime: 1, - maxTime: 300, - labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, - }, - }, - req: &storepb.SeriesRequest{ - MinTime: 1, - MaxTime: 300, - Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, - SkipChunks: true, - }, - expectedSeries: []rawSeries{ - { - lset: []storepb.Label{{Name: "a", Value: "a"}}, - }, - }, - }, } { if ok := t.Run(tc.title, func(t *testing.T) { @@ -488,8 +464,54 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { expectedWarningsLen int }{ { - title: "partial response disabled one thanos query is slow to respond", + title: "partial response disabled; 1st store is slow, 2nd store is fast;", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + RespDuration: 10 * time.Second, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + PartialResponseDisabled: true, + }, + expectedErr: errors.New("test: failed to receive any data in 4s from test: context deadline exceeded"), + }, + { + title: "partial response disabled; 1st store is fast, 2nd store is slow;", storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, &testClient{ StoreClient: &mockedStoreAPI{ RespSeries: []*storepb.SeriesResponse{ @@ -502,6 +524,33 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { minTime: 1, maxTime: 300, }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + PartialResponseDisabled: true, + }, + expectedErr: errors.New("test: failed to receive any data in 4s from test: context deadline exceeded"), + }, + { + title: "partial response disabled; 1st store is slow on 2nd series, 2nd store is fast;", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{3, 1}, {4, 2}, {5, 3}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{6, 1}, {7, 2}, {8, 3}}), + }, + RespDuration: 10 * time.Second, + SlowSeriesIndex: 2, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, &testClient{ StoreClient: &mockedStoreAPI{ RespSeries: []*storepb.SeriesResponse{ @@ -523,7 +572,125 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { expectedErr: errors.New("test: failed to receive any data in 4s from test: context deadline exceeded"), }, { - title: "partial response enabled one thanos query is slow to respond", + title: "partial response disabled; 1st store is fast to respond, 2nd store is slow on 2nd series;", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{3, 1}, {4, 2}, {5, 3}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{6, 1}, {7, 2}, {8, 3}}), + }, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + RespDuration: 10 * time.Second, + SlowSeriesIndex: 2, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + PartialResponseDisabled: true, + }, + expectedErr: errors.New("test: failed to receive any data in 4s from test: context deadline exceeded"), + }, + { + title: "partial response enabled; 1st store is slow to respond, 2nd store is fast;", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + RespDuration: 10 * time.Second, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + }, + expectedSeries: []rawSeries{ + { + lset: []storepb.Label{{Name: "b", Value: "c"}}, + chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}}, + }, + }, + expectedWarningsLen: 2, + }, + { + title: "partial response enabled; 1st store is fast, 2nd store is slow;", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + RespDuration: 10 * time.Second, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + }, + expectedSeries: []rawSeries{ + { + lset: []storepb.Label{{Name: "a", Value: "b"}}, + chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}}, + }, + }, + expectedWarningsLen: 2, + }, + { + title: "partial response enabled; 1st store is fast, 2-3 is slow, 4th is fast;", storeAPIs: []Client{ &testClient{ StoreClient: &mockedStoreAPI{ @@ -548,6 +715,154 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { minTime: 1, maxTime: 300, }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("c", "d"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + RespDuration: 10 * time.Second, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("d", "f"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + }, + expectedSeries: []rawSeries{ + { + lset: []storepb.Label{{Name: "a", Value: "b"}}, + chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}}, + }, + { + lset: []storepb.Label{{Name: "d", Value: "f"}}, + chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}}, + }, + }, + expectedWarningsLen: 4, + }, + { + title: "partial response enabled; 1st store is slow on 2nd series, 2nd store is fast", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{4, 1}, {5, 2}, {6, 3}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{7, 1}, {8, 2}, {9, 3}}), + }, + RespDuration: 10 * time.Second, + SlowSeriesIndex: 2, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + }, + expectedSeries: []rawSeries{ + { + lset: []storepb.Label{{Name: "a", Value: "b"}}, + chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}}, + }, + { + lset: []storepb.Label{{Name: "b", Value: "c"}}, + chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}}, + }, + }, + expectedWarningsLen: 3, + }, + { + title: "partial response disabled; all stores respond 3s", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{4, 1}, {5, 2}, {6, 3}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{7, 1}, {8, 2}, {9, 3}}), + }, + RespDuration: 3 * time.Second, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + PartialResponseDisabled: true, + }, + expectedSeries: []rawSeries{ + { + lset: []storepb.Label{{Name: "a", Value: "b"}}, + chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}}, + }, + }, + expectedErr: errors.New("test: failed to receive any data from test: context deadline exceeded"), + }, + { + title: "partial response enabled; all stores respond 3s", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{4, 1}, {5, 2}, {6, 3}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{7, 1}, {8, 2}, {9, 3}}), + }, + RespDuration: 3 * time.Second, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{1, 1}, {2, 2}, {3, 3}}), + storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{4, 1}, {5, 2}, {6, 3}}), + storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{7, 1}, {8, 2}, {9, 3}}), + }, + RespDuration: 3 * time.Second, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, }, req: &storepb.SeriesRequest{ MinTime: 1, @@ -559,6 +874,10 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { lset: []storepb.Label{{Name: "a", Value: "b"}}, chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}}, }, + { + lset: []storepb.Label{{Name: "b", Value: "c"}}, + chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}}, + }, }, expectedWarningsLen: 2, }, @@ -572,9 +891,13 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { 4*time.Second, ) - s := newStoreSeriesServer(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + s := newStoreSeriesServer(ctx) + t0 := time.Now() err := q.Series(tc.req, s) + elapsedTime := time.Since(t0) if tc.expectedErr != nil { testutil.NotOk(t, err) testutil.Equals(t, tc.expectedErr.Error(), err.Error()) @@ -585,6 +908,8 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { seriesEquals(t, tc.expectedSeries, s.SeriesSet) testutil.Equals(t, tc.expectedWarningsLen, len(s.Warnings), "got %v", s.Warnings) + + testutil.Assert(t, elapsedTime < 5010*time.Millisecond, fmt.Sprintf("Request has taken %f, expected: <%d, it seems that responseTimeout doesn't work properly.", elapsedTime.Seconds(), 5)) }); !ok { return } @@ -1015,6 +1340,8 @@ type mockedStoreAPI struct { RespLabelNames *storepb.LabelNamesResponse RespError error RespDuration time.Duration + // Index of series in store to slow response. + SlowSeriesIndex int LastSeriesReq *storepb.SeriesRequest LastLabelValuesReq *storepb.LabelValuesRequest @@ -1028,7 +1355,7 @@ func (s *mockedStoreAPI) Info(ctx context.Context, req *storepb.InfoRequest, _ . func (s *mockedStoreAPI) Series(ctx context.Context, req *storepb.SeriesRequest, _ ...grpc.CallOption) (storepb.Store_SeriesClient, error) { s.LastSeriesReq = req - return &StoreSeriesClient{ctx: ctx, respSet: s.RespSeries, respDur: s.RespDuration}, s.RespError + return &StoreSeriesClient{ctx: ctx, respSet: s.RespSeries, respDur: s.RespDuration, slowSeriesIndex: s.SlowSeriesIndex}, s.RespError } func (s *mockedStoreAPI) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest, _ ...grpc.CallOption) (*storepb.LabelNamesResponse, error) { @@ -1047,14 +1374,17 @@ func (s *mockedStoreAPI) LabelValues(ctx context.Context, req *storepb.LabelValu type StoreSeriesClient struct { // This field just exist to pseudo-implement the unused methods of the interface. storepb.Store_SeriesClient - ctx context.Context - i int - respSet []*storepb.SeriesResponse - respDur time.Duration + ctx context.Context + i int + respSet []*storepb.SeriesResponse + respDur time.Duration + slowSeriesIndex int } func (c *StoreSeriesClient) Recv() (*storepb.SeriesResponse, error) { - time.Sleep(c.respDur) + if c.respDur != 0 && (c.slowSeriesIndex == c.i || c.slowSeriesIndex == 0) { + time.Sleep(c.respDur) + } if c.i >= len(c.respSet) { return nil, io.EOF