Skip to content

Commit

Permalink
query: fix retry query case.
Browse files Browse the repository at this point in the history
In case the backend is very unstable and times out the batch we
need to make sure ongoing queryJobs are droped and already
registered queryJobs are removed from the heap as well.
  • Loading branch information
ziggie1984 committed Apr 23, 2024
1 parent 43f5a58 commit 63e861a
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 56 deletions.
1 change: 1 addition & 0 deletions query/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func defaultQueryOptions() *queryOptions {
timeout: defaultQueryTimeout,
encoding: defaultQueryEncoding,
numRetries: defaultNumRetries,
cancelChan: make(chan struct{}),
}
}

Expand Down
28 changes: 23 additions & 5 deletions query/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ var (
// queryJob is the internal struct that wraps the Query to work on, in
// addition to some information about the query.
type queryJob struct {
tries uint8
index uint64
timeout time.Duration
encoding wire.MessageEncoding
cancelChan <-chan struct{}
tries uint8
index uint64
timeout time.Duration
encoding wire.MessageEncoding
cancelChan <-chan struct{}
internalCancelChan <-chan struct{}
*Request
}

Expand Down Expand Up @@ -128,6 +129,16 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
// result will be sent back.
break

case <-job.internalCancelChan:
log.Tracef("Worker %v found job with index %v "+
"already internally canceled (batch timed out)",
peer.Addr(), job.Index())

// We break to the below loop, where we'll check the
// internal cancel channel again and the ErrJobCanceled
// result will be sent back.
break

// We received a non-canceled query job, send it to the peer.
default:
log.Tracef("Worker %v queuing job %T with index %v",
Expand Down Expand Up @@ -214,6 +225,13 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
jobErr = ErrJobCanceled
break Loop

case <-job.internalCancelChan:
log.Tracef("Worker %v job %v internally "+
"canceled", peer.Addr(), job.Index())

jobErr = ErrJobCanceled
break Loop

case <-quit:
return
}
Expand Down
127 changes: 76 additions & 51 deletions query/workmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func (w *peerWorkManager) workDispatcher() {
timeout <-chan time.Time
rem int
errChan chan error
cancelChan chan struct{}
}

// We set up a batch index counter to keep track of batches that still
Expand Down Expand Up @@ -309,7 +310,20 @@ Loop:
// turns out to be an error.
batchNum := currentQueries[result.job.index]
delete(currentQueries, result.job.index)
batch := currentBatches[batchNum]

// In case the batch is already canceled we return
// early.
batch, ok := currentBatches[batchNum]
if !ok {
log.Warnf("Query(%d) result from peer %v "+
"discarded with retries %d, because "+
"batch already canceled: %v",
result.job.index,
result.peer.Addr(),
result.job.tries, result.err)

continue Loop
}

switch {
// If the query ended because it was canceled, drop it.
Expand All @@ -322,30 +336,34 @@ Loop:
// was canceled, forward the error on the
// batch's error channel. We do this since a
// cancellation applies to the whole batch.
if batch != nil {
batch.errChan <- result.err
delete(currentBatches, batchNum)
batch.errChan <- result.err
delete(currentBatches, batchNum)

log.Debugf("Canceled batch %v",
batchNum)
continue Loop
}
log.Debugf("Canceled batch %v", batchNum)
continue Loop

// If the query ended with any other error, put it back
// into the work queue if it has not reached the
// maximum number of retries.
case result.err != nil:
// Punish the peer for the failed query.
w.cfg.Ranking.Punish(result.peer.Addr())
// Refresh peer rank on disconnect.
if result.err == ErrPeerDisconnected {
w.cfg.Ranking.ResetRanking(
result.peer.Addr(),
)
} else {
// Punish the peer for the failed query.
w.cfg.Ranking.Punish(result.peer.Addr())
}

if batch != nil && !batch.noRetryMax {
if !batch.noRetryMax {
result.job.tries++
}

// Check if this query has reached its maximum
// number of retries. If so, remove it from the
// batch and don't reschedule it.
if batch != nil && !batch.noRetryMax &&
if !batch.noRetryMax &&
result.job.tries >= batch.maxRetries {

log.Warnf("Query(%d) from peer %v "+
Expand Down Expand Up @@ -380,11 +398,6 @@ Loop:
result.job.timeout = newTimeout
}

// Refresh peer rank on disconnect.
if result.err == ErrPeerDisconnected {
w.cfg.Ranking.ResetRanking(result.peer.Addr())
}

heap.Push(work, result.job)
currentQueries[result.job.index] = batchNum

Expand All @@ -396,42 +409,47 @@ Loop:

// Decrement the number of queries remaining in
// the batch.
if batch != nil {
batch.rem--
log.Tracef("Remaining jobs for batch "+
"%v: %v ", batchNum, batch.rem)

// If this was the last query in flight
// for this batch, we can notify that
// it finished, and delete it.
if batch.rem == 0 {
batch.errChan <- nil
delete(currentBatches, batchNum)

log.Tracef("Batch %v done",
batchNum)
continue Loop
}
batch.rem--
log.Tracef("Remaining jobs for batch "+
"%v: %v ", batchNum, batch.rem)

// If this was the last query in flight
// for this batch, we can notify that
// it finished, and delete it.
if batch.rem == 0 {
batch.errChan <- nil
delete(currentBatches, batchNum)

log.Tracef("Batch %v done",
batchNum)
continue Loop
}
}

// If the total timeout for this batch has passed,
// return an error.
if batch != nil {
select {
case <-batch.timeout:
batch.errChan <- ErrQueryTimeout
delete(currentBatches, batchNum)
select {
case <-batch.timeout:
batch.errChan <- ErrQueryTimeout
delete(currentBatches, batchNum)

// When deleting the particular batch
// number we need to make sure to cancel
// all queued and ongoing queryJobs
// to not waste resources when the batch
// call is already canceled.
if batch.cancelChan != nil {
close(batch.cancelChan)
}

log.Warnf("Query(%d) failed with "+
"error: %v. Timing out.",
result.job.index, result.err)
log.Warnf("Query(%d) failed with "+
"error: %v. Timing out.",
result.job.index, result.err)

log.Debugf("Batch %v timed out",
batchNum)
log.Warnf("Batch %v timed out",
batchNum)

default:
}
default:
}

// A new batch of queries where scheduled.
Expand All @@ -442,13 +460,17 @@ Loop:
log.Debugf("Adding new batch(%d) of %d queries to "+
"work queue", batchIndex, len(batch.requests))

// Internal cancel channel of a batch request.
cancelChan := make(chan struct{})

for _, q := range batch.requests {
heap.Push(work, &queryJob{
index: queryIndex,
timeout: minQueryTimeout,
encoding: batch.options.encoding,
cancelChan: batch.options.cancelChan,
Request: q,
index: queryIndex,
timeout: minQueryTimeout,
encoding: batch.options.encoding,
cancelChan: batch.options.cancelChan,
internalCancelChan: cancelChan,
Request: q,
})
currentQueries[queryIndex] = batchIndex
queryIndex++
Expand All @@ -457,9 +479,12 @@ Loop:
currentBatches[batchIndex] = &batchProgress{
noRetryMax: batch.options.noRetryMax,
maxRetries: batch.options.numRetries,
timeout: time.After(batch.options.timeout),
timeout: time.After(
batch.options.timeout,
),
rem: len(batch.requests),
errChan: batch.errChan,
cancelChan: cancelChan,
}
batchIndex++

Expand Down

0 comments on commit 63e861a

Please sign in to comment.