Skip to content

Commit

Permalink
query: Improve store response timeouts (#1789)
Browse files Browse the repository at this point in the history
* Improve proxyStore timeouts.

Signed-off-by: Aleskey Sin <[email protected]>

* Fix send to closed channel.

Signed-off-by: Aleskey Sin <[email protected]>

* Update for PR.

Signed-off-by: Aleskey Sin <[email protected]>

* Fix recv done channel.

Signed-off-by: Aleskey Sin <[email protected]>

* PR fixes.

Signed-off-by: Aleskey Sin <[email protected]>
  • Loading branch information
IKSIN authored Feb 18, 2020
1 parent 2e3ece1 commit a354bfb
Show file tree
Hide file tree
Showing 2 changed files with 429 additions and 93 deletions.
122 changes: 64 additions & 58 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,7 @@ func newRespCh(ctx context.Context, buffer int) (*ctxRespSender, <-chan *storepb
}

func (s ctxRespSender) send(r *storepb.SeriesResponse) {
select {
case <-s.ctx.Done():
return
case s.ch <- r:
return
}
s.ch <- r
}

// Series returns all series for a requested time range and label matcher. Requested series are taken from other
Expand Down Expand Up @@ -348,6 +343,21 @@ type streamSeriesSet struct {
closeSeries context.CancelFunc
}

type recvResponse struct {
r *storepb.SeriesResponse
err error
}

func frameCtx(responseTimeout time.Duration) (context.Context, context.CancelFunc) {
frameTimeoutCtx := context.Background()
var cancel context.CancelFunc
if responseTimeout != 0 {
frameTimeoutCtx, cancel = context.WithTimeout(frameTimeoutCtx, responseTimeout)
return frameTimeoutCtx, cancel
}
return frameTimeoutCtx, func() {}
}

func startStreamSeriesSet(
ctx context.Context,
logger log.Logger,
Expand Down Expand Up @@ -383,78 +393,74 @@ func startStreamSeriesSet(
emptyStreamResponses.Inc()
}
}()
for {
r, err := s.stream.Recv()

if err == io.EOF {
return
}

if err != nil {
wrapErr := errors.Wrapf(err, "receive series from %s", s.name)
if partialResponse {
s.warnCh.send(storepb.NewWarnSeriesResponse(wrapErr))
rCh := make(chan *recvResponse)
done := make(chan struct{})
go func() {
for {
r, err := s.stream.Recv()
select {
case <-done:
close(rCh)
return
case rCh <- &recvResponse{r: r, err: err}:
}
}
}()
for {
frameTimeoutCtx, cancel := frameCtx(s.responseTimeout)
defer cancel()
var rr *recvResponse
select {
case <-ctx.Done():
s.handleErr(errors.Wrapf(ctx.Err(), "failed to receive any data from %s", s.name), done)
return
case <-frameTimeoutCtx.Done():
s.handleErr(errors.Wrapf(ctx.Err(), "failed to receive any data in %s from %s", s.responseTimeout.String(), s.name), done)
return
case rr = <-rCh:
}

s.errMtx.Lock()
s.err = wrapErr
s.errMtx.Unlock()
if rr.err == io.EOF {
close(done)
return
}

if rr.err != nil {
wrapErr := errors.Wrapf(rr.err, "receive series from %s", s.name)
s.handleErr(wrapErr, done)
return
}
numResponses++

if w := r.GetWarning(); w != "" {
if w := rr.r.GetWarning(); w != "" {
s.warnCh.send(storepb.NewWarnSeriesResponse(errors.New(w)))
continue
}

select {
case s.recvCh <- r.GetSeries():
continue
case <-ctx.Done():
return
}

s.recvCh <- rr.r.GetSeries()
}
}()
return s
}

// Next blocks until new message is received or stream is closed or operation is timed out.
func (s *streamSeriesSet) Next() (ok bool) {
ctx := s.ctx
timeoutMsg := fmt.Sprintf("failed to receive any data from %s", s.name)

if s.responseTimeout != 0 {
timeoutMsg = fmt.Sprintf("failed to receive any data in %s from %s", s.responseTimeout.String(), s.name)
func (s *streamSeriesSet) handleErr(err error, done chan struct{}) {
defer close(done)
s.closeSeries()

timeoutCtx, done := context.WithTimeout(s.ctx, s.responseTimeout)
defer done()
ctx = timeoutCtx
if s.partialResponse {
level.Warn(s.logger).Log("err", err, "msg", "returning partial response")
s.warnCh.send(storepb.NewWarnSeriesResponse(err))
return
}
s.errMtx.Lock()
s.err = err
s.errMtx.Unlock()
}

select {
case s.currSeries, ok = <-s.recvCh:
return ok
case <-ctx.Done():
// closeSeries to shutdown a goroutine in startStreamSeriesSet.
s.closeSeries()

err := errors.Wrap(ctx.Err(), timeoutMsg)
if s.partialResponse {
level.Warn(s.logger).Log("err", err, "msg", "returning partial response")
s.warnCh.send(storepb.NewWarnSeriesResponse(err))
return false
}
s.errMtx.Lock()
s.err = err
s.errMtx.Unlock()

level.Warn(s.logger).Log("err", err, "msg", "partial response disabled; aborting request")
return false
}
// Next blocks until new message is received or stream is closed or operation is timed out.
func (s *streamSeriesSet) Next() (ok bool) {
s.currSeries, ok = <-s.recvCh
return ok
}

func (s *streamSeriesSet) At() ([]storepb.Label, []storepb.AggrChunk) {
Expand Down
Loading

0 comments on commit a354bfb

Please sign in to comment.