Skip to content

Commit

Permalink
store: proxy: fix queries never timing out bug (#2411) (#2443)
Browse files Browse the repository at this point in the history
* store: proxy: add test for deadlocking problem

Signed-off-by: Giedrius Statkevičius <[email protected]>

* store: proxy: add fix for timeouts

Checking here if the series context has ended is the correct fix here.
We want to check it because if any of the other Series() calls error out
then the context is canceled. So, it is equal to checking for errors
"downstream", in `mergedSeriesSet`.

Also, `handleErr()` here is the correct function to use because in such
a case we want to set `s.err` -- if `io.EOF` still hasn't been received
then it means that StoreAPI still has some data that it wants to send
but hasn't yet.

With this, the previously added test passes.

Signed-off-by: Giedrius Statkevičius <[email protected]>

Co-authored-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
squat and GiedriusS authored Apr 15, 2020
1 parent 69f9c4e commit c983b0b
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 2 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ We use *breaking* word for marking changes that are not backward compatible (rel

## Unreleased

### Fixed

- [#2411](https://github.com/thanos-io/thanos/pull/2411) Query: fix a bug where queries might not time out sometimes due to issues with one or more StoreAPIs

## [v0.12.0](https://github.com/thanos-io/thanos/releases/tag/v0.12.0) - 2020.04.15

### Fixed
Expand Down
7 changes: 6 additions & 1 deletion pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,12 @@ func startStreamSeriesSet(
s.warnCh.send(storepb.NewWarnSeriesResponse(errors.New(w)))
continue
}
s.recvCh <- rr.r.GetSeries()
select {
case s.recvCh <- rr.r.GetSeries():
case <-ctx.Done():
s.handleErr(errors.Wrapf(ctx.Err(), "failed to receive any data from %s", s.name), done)
return
}
}
}()
return s
Expand Down
66 changes: 65 additions & 1 deletion pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,60 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
expectedErr error
expectedWarningsLen int
}{
{
title: "partial response disabled; 1st errors out after some delay; 2nd store is fast",
storeAPIs: []Client{
&testClient{
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storepb.NewWarnSeriesResponse(errors.New("warning")),
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}),
},
RespDuration: 2 * time.Second,
SlowSeriesIndex: 1,
injectedError: errors.New("test"),
injectedErrorIndex: 1,
},
labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}},
minTime: 1,
maxTime: 300,
},
&testClient{
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storepb.NewWarnSeriesResponse(errors.New("warning")),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),

storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),

storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
},
},
labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}},
minTime: 1,
maxTime: 300,
},
},
req: &storepb.SeriesRequest{
MinTime: 1,
MaxTime: 300,
Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}},
PartialResponseDisabled: true,
},
expectedErr: errors.New("test: receive series from test: test"),
},
{
title: "partial response disabled; 1st store is slow, 2nd store is fast;",
storeAPIs: []Client{
Expand Down Expand Up @@ -1349,6 +1403,10 @@ type mockedStoreAPI struct {
LastSeriesReq *storepb.SeriesRequest
LastLabelValuesReq *storepb.LabelValuesRequest
LastLabelNamesReq *storepb.LabelNamesRequest

// injectedError will be injected into Recv() if not nil.
injectedError error
injectedErrorIndex int
}

func (s *mockedStoreAPI) Info(ctx context.Context, req *storepb.InfoRequest, _ ...grpc.CallOption) (*storepb.InfoResponse, error) {
Expand All @@ -1358,7 +1416,7 @@ func (s *mockedStoreAPI) Info(ctx context.Context, req *storepb.InfoRequest, _ .
func (s *mockedStoreAPI) Series(ctx context.Context, req *storepb.SeriesRequest, _ ...grpc.CallOption) (storepb.Store_SeriesClient, error) {
s.LastSeriesReq = req

return &StoreSeriesClient{ctx: ctx, respSet: s.RespSeries, respDur: s.RespDuration, slowSeriesIndex: s.SlowSeriesIndex}, s.RespError
return &StoreSeriesClient{injectedErrorIndex: s.injectedErrorIndex, injectedError: s.injectedError, ctx: ctx, respSet: s.RespSeries, respDur: s.RespDuration, slowSeriesIndex: s.SlowSeriesIndex}, s.RespError
}

func (s *mockedStoreAPI) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest, _ ...grpc.CallOption) (*storepb.LabelNamesResponse, error) {
Expand All @@ -1382,12 +1440,18 @@ type StoreSeriesClient struct {
respSet []*storepb.SeriesResponse
respDur time.Duration
slowSeriesIndex int

injectedError error
injectedErrorIndex int
}

func (c *StoreSeriesClient) Recv() (*storepb.SeriesResponse, error) {
if c.respDur != 0 && (c.slowSeriesIndex == c.i || c.slowSeriesIndex == 0) {
time.Sleep(c.respDur)
}
if c.injectedError != nil && (c.injectedErrorIndex == c.i || c.injectedErrorIndex == 0) {
return nil, c.injectedError
}

if c.i >= len(c.respSet) {
return nil, io.EOF
Expand Down

0 comments on commit c983b0b

Please sign in to comment.