From 646380a63745a3712c9e1731f90d4e8666b0fa0e Mon Sep 17 00:00:00 2001
From: Calvin Kim
Date: Mon, 2 Dec 2024 12:07:04 +0900
Subject: [PATCH] query: refactor workDispatcher

workDispatcher was too long and hard to read. This change refactors the
code that handles each separate case into its own method so that it's
easier to follow what's going on.
---
 query/workmanager.go | 519 ++++++++++++++++++++++---------------------
 1 file changed, 264 insertions(+), 255 deletions(-)

diff --git a/query/workmanager.go b/query/workmanager.go
index 9d6317fb..01599dc2 100644
--- a/query/workmanager.go
+++ b/query/workmanager.go
@@ -76,9 +76,9 @@ type PeerRanking interface {
 // we have given to it.
 // TODO(halseth): support more than one active job at a time.
 type activeWorker struct {
-	w          Worker
+	w          Worker
 	activeJobs map[uint64]*queryJob
-	onExit     chan struct{}
+	onExit     chan struct{}
 }
 
 // Config holds the configuration options for a new WorkManager.
@@ -114,6 +114,15 @@ type peerWorkManager struct {
 	// workers will be sent.
 	jobResults chan *jobResult
 
+	workQueue *workQueue
+	workers   map[string]*activeWorker
+
+	batchIndex     uint64
+	currentBatches map[uint64]*batchProgress
+
+	queryIndex     uint64
+	currentQueries map[uint64]uint64
+
 	quit chan struct{}
 	wg   sync.WaitGroup
 }
@@ -124,11 +133,18 @@ var _ WorkManager = (*peerWorkManager)(nil)
 // NewWorkManager returns a new WorkManager with the regular worker
 // implementation.
 func NewWorkManager(cfg *Config) WorkManager {
+	work := &workQueue{}
+	heap.Init(work)
+
 	return &peerWorkManager{
-		cfg:        cfg,
-		newBatches: make(chan *batch),
-		jobResults: make(chan *jobResult),
-		quit:       make(chan struct{}),
+		cfg:            cfg,
+		newBatches:     make(chan *batch),
+		jobResults:     make(chan *jobResult),
+		workQueue:      work,
+		workers:        make(map[string]*activeWorker),
+		currentBatches: make(map[uint64]*batchProgress),
+		currentQueries: make(map[uint64]uint64),
+		quit:           make(chan struct{}),
 	}
 }
 
@@ -152,6 +168,238 @@ func (w *peerWorkManager) Stop() error {
 	return nil
 }
 
+func (w *peerWorkManager) handlePeerConnected(peer Peer) {
+	log.Debugf("Starting worker for peer %v",
+		peer.Addr())
+
+	r := w.cfg.NewWorker(peer)
+
+	// We'll create a channel that will close after the
+	// worker's Run method returns, to know when we can
+	// remove it from our set of active workers.
+	onExit := make(chan struct{})
+	w.workers[peer.Addr()] = &activeWorker{
+		w:          r,
+		activeJobs: make(map[uint64]*queryJob),
+		onExit:     onExit,
+	}
+
+	w.cfg.Ranking.AddPeer(peer.Addr())
+
+	w.wg.Add(1)
+	go func() {
+		defer w.wg.Done()
+		defer close(onExit)
+
+		r.Run(w.jobResults, w.quit)
+	}()
+}
+
+func (w *peerWorkManager) handleJobResult(result *jobResult) {
+	log.Tracef("Result for job %v received from peer %v "+
+		"(err=%v)", result.job.index,
+		result.peer.Addr(), result.err)
+
+	// Delete the job from the worker's active job, such
+	// that the slot gets opened for more work.
+	r := w.workers[result.peer.Addr()]
+	delete(r.activeJobs, result.job.Index())
+
+	// Get the index of this query's batch, and delete it
+	// from the map of current queries, since we don't have
+	// to track it anymore. We'll add it back if the result
+	// turns out to be an error.
+	batchNum := w.currentQueries[result.job.index]
+	delete(w.currentQueries, result.job.index)
+
+	// In case the batch is already canceled we return
+	// early.
+	batch, ok := w.currentBatches[batchNum]
+	if !ok {
+		log.Warnf("Query(%d) result from peer %v "+
+			"discarded with retries %d, because "+
+			"batch already canceled: %v",
+			result.job.index,
+			result.peer.Addr(),
+			result.job.tries, result.err)
+
+		return
+	}
+
+	switch {
+	// If the query ended because it was canceled, drop it.
+	case result.err == ErrJobCanceled:
+		log.Tracef("Query(%d) was canceled before "+
+			"result was available from peer %v",
+			result.job.index, result.peer.Addr())
+
+		// If this is the first job in this batch that
+		// was canceled, forward the error on the
+		// batch's error channel. We do this since a
+		// cancellation applies to the whole batch.
+		batch.errChan <- result.err
+		delete(w.currentBatches, batchNum)
+
+		log.Debugf("Canceled batch %v", batchNum)
+		return
+
+	// If the query ended with any other error, put it back
+	// into the work queue if it has not reached the
+	// maximum number of retries.
+	case result.err != nil:
+		// Refresh peer rank on disconnect.
+		if result.err == ErrPeerDisconnected {
+			w.cfg.Ranking.ResetRanking(
+				result.peer.Addr(),
+			)
+		} else {
+			// Punish the peer for the failed query.
+			w.cfg.Ranking.Punish(result.peer.Addr())
+		}
+
+		if !batch.noRetryMax {
+			result.job.tries++
+		}
+
+		// Check if this query has reached its maximum
+		// number of retries. If so, remove it from the
+		// batch and don't reschedule it.
+		if !batch.noRetryMax &&
+			result.job.tries >= batch.maxRetries {
+
+			log.Warnf("Query(%d) from peer %v "+
+				"failed and reached maximum "+
+				"number of retries, not "+
+				"rescheduling: %v",
+				result.job.index,
+				result.peer.Addr(), result.err)
+
+			// Return the error and cancel the
+			// batch.
+			batch.errChan <- result.err
+			delete(w.currentBatches, batchNum)
+
+			log.Debugf("Canceled batch %v",
+				batchNum)
+
+			return
+		}
+
+		log.Warnf("Query(%d) from peer %v failed, "+
+			"rescheduling: %v", result.job.index,
+			result.peer.Addr(), result.err)
+
+		// If it was a timeout, we dynamically increase
+		// it for the next attempt.
+		if result.err == ErrQueryTimeout {
+			newTimeout := result.job.timeout * 2
+			if newTimeout > maxQueryTimeout {
+				newTimeout = maxQueryTimeout
+			}
+			result.job.timeout = newTimeout
+		}
+
+		heap.Push(w.workQueue, result.job)
+		w.currentQueries[result.job.index] = batchNum
+
+	// Otherwise, we got a successful result and update the
+	// status of the batch this query is a part of.
+	default:
+		// Reward the peer for the successful query.
+		w.cfg.Ranking.Reward(result.peer.Addr())
+
+		// Decrement the number of queries remaining in
+		// the batch.
+		batch.rem--
+		log.Tracef("Remaining jobs for batch "+
+			"%v: %v ", batchNum, batch.rem)
+
+		// If this was the last query in flight
+		// for this batch, we can notify that
+		// it finished, and delete it.
+		if batch.rem == 0 {
+			batch.errChan <- nil
+			delete(w.currentBatches, batchNum)
+
+			log.Tracef("Batch %v done",
+				batchNum)
+			return
+		}
+	}
+
+	// If the total timeout for this batch has passed,
+	// return an error.
+	select {
+	case <-batch.timeout:
+		batch.errChan <- ErrQueryTimeout
+		delete(w.currentBatches, batchNum)
+
+		// When deleting the particular batch
+		// number we need to make sure to cancel
+		// all queued and ongoing queryJobs
+		// to not waste resources when the batch
+		// call is already canceled.
+		if batch.cancelChan != nil {
+			close(batch.cancelChan)
+		}
+
+		log.Warnf("Query(%d) failed with "+
+			"error: %v. Timing out.",
+			result.job.index, result.err)
+
+		log.Warnf("Batch %v timed out",
+			batchNum)
+
+	default:
+	}
+}
+
+type batchProgress struct {
+	noRetryMax bool
+	maxRetries uint8
+	timeout    <-chan time.Time
+	rem        int
+	errChan    chan error
+	cancelChan chan struct{}
+}
+
+func (w *peerWorkManager) handleNewBatches(batch *batch) {
+	// Add all new queries in the batch to our work queue,
+	// with priority given by the order they were
+	// scheduled.
+	log.Debugf("Adding new batch(%d) of %d queries to "+
+		"work queue", w.batchIndex, len(batch.requests))
+
+	// Internal cancel channel of a batch request.
+	cancelChan := make(chan struct{})
+
+	for _, q := range batch.requests {
+		heap.Push(w.workQueue, &queryJob{
+			index:              w.queryIndex,
+			timeout:            minQueryTimeout,
+			encoding:           batch.options.encoding,
+			cancelChan:         batch.options.cancelChan,
+			internalCancelChan: cancelChan,
+			Request:            q,
+		})
+		w.currentQueries[w.queryIndex] = w.batchIndex
+		w.queryIndex++
+	}
+
+	w.currentBatches[w.batchIndex] = &batchProgress{
+		noRetryMax: batch.options.noRetryMax,
+		maxRetries: batch.options.numRetries,
+		timeout: time.After(
+			batch.options.timeout,
+		),
+		rem:        len(batch.requests),
+		errChan:    batch.errChan,
+		cancelChan: cancelChan,
+	}
+	w.batchIndex++
+}
+
 // workDispatcher receives batches of queries to be performed from external
 // callers, and dispatches these to active workers. It makes sure to
 // prioritize the queries in the order they come in, such that early queries
@@ -171,53 +419,24 @@ func (w *peerWorkManager) workDispatcher() {
 	}
 	defer cancel()
 
-	// Init a work queue which will be used to sort the incoming queries in
-	// a first come first served fashion. We use a heap structure such
-	// that we can efficiently put failed queries back in the queue.
-	work := &workQueue{}
-	heap.Init(work)
-
-	type batchProgress struct {
-		noRetryMax bool
-		maxRetries uint8
-		timeout    <-chan time.Time
-		rem        int
-		errChan    chan error
-		cancelChan chan struct{}
-	}
-
-	// We set up a batch index counter to keep track of batches that still
-	// have queries in flight. This lets us track when all queries for a
-	// batch have been finished, and return an (non-)error to the caller.
-	batchIndex := uint64(0)
-	currentBatches := make(map[uint64]*batchProgress)
-
 	// When the work dispatcher exits, we'll loop through the remaining
 	// batches and send on their error channel.
 	defer func() {
-		for _, b := range currentBatches {
+		for _, b := range w.currentBatches {
 			b.errChan <- ErrWorkManagerShuttingDown
 		}
 	}()
 
-	// We set up a counter that we'll increase with each incoming query,
-	// and will serve as the priority of each. In addition we map each
-	// query to the batch they are part of.
-	queryIndex := uint64(0)
-	currentQueries := make(map[uint64]uint64)
-
-	workers := make(map[string]*activeWorker)
-
 Loop:
 	for {
 		// If the work queue is non-empty, we'll take out the first
 		// element in order to distribute it to a worker.
-		if work.Len() > 0 {
-			next := work.Peek().(*queryJob)
+		if w.workQueue.Len() > 0 {
+			next := w.workQueue.Peek().(*queryJob)
 
 			// Find the peers with free work slots available.
 			var freeWorkers []string
-			for p, r := range workers {
+			for p, r := range w.workers {
 				// Only one active job at a time is currently
 				// supported.
 				if len(r.activeJobs) >= 1 {
@@ -233,7 +452,7 @@ Loop:
 			// Give the job to the highest ranked peer with free
 			// slots available.
 			for _, p := range freeWorkers {
-				r := workers[p]
+				r := w.workers[p]
 
 				// The worker has free work slots, it should
 				// pick up the query.
@@ -241,7 +460,7 @@
 				case r.w.NewJob() <- next:
 					log.Tracef("Sent job %v to worker %v",
 						next.Index(), p)
-					heap.Pop(work)
+					heap.Pop(w.workQueue)
 					r.activeJobs[next.Index()] = next
 
 					// Go back to start of loop, to check
@@ -251,7 +470,7 @@
 
 				// Remove workers no longer active.
 				case <-r.onExit:
-					delete(workers, p)
+					delete(w.workers, p)
 					continue
 
 				case <-w.quit:
@@ -268,225 +487,15 @@
 		// Spin up a goroutine that runs a worker each time a peer
 		// connects.
 		case peer := <-peersConnected:
-			log.Debugf("Starting worker for peer %v",
-				peer.Addr())
-
-			r := w.cfg.NewWorker(peer)
-
-			// We'll create a channel that will close after the
-			// worker's Run method returns, to know when we can
-			// remove it from our set of active workers.
-			onExit := make(chan struct{})
-			workers[peer.Addr()] = &activeWorker{
-				w:          r,
-				activeJobs: make(map[uint64]*queryJob),
-				onExit:     onExit,
-			}
-
-			w.cfg.Ranking.AddPeer(peer.Addr())
-
-			w.wg.Add(1)
-			go func() {
-				defer w.wg.Done()
-				defer close(onExit)
-
-				r.Run(w.jobResults, w.quit)
-			}()
+			w.handlePeerConnected(peer)
 
 		// A new result came back.
 		case result := <-w.jobResults:
-			log.Tracef("Result for job %v received from peer %v "+
-				"(err=%v)", result.job.index,
-				result.peer.Addr(), result.err)
-
-			// Delete the job from the worker's active job, such
-			// that the slot gets opened for more work.
-			r := workers[result.peer.Addr()]
-			delete(r.activeJobs, result.job.Index())
-
-			// Get the index of this query's batch, and delete it
-			// from the map of current queries, since we don't have
-			// to track it anymore. We'll add it back if the result
-			// turns out to be an error.
-			batchNum := currentQueries[result.job.index]
-			delete(currentQueries, result.job.index)
-
-			// In case the batch is already canceled we return
-			// early.
-			batch, ok := currentBatches[batchNum]
-			if !ok {
-				log.Warnf("Query(%d) result from peer %v "+
-					"discarded with retries %d, because "+
-					"batch already canceled: %v",
-					result.job.index,
-					result.peer.Addr(),
-					result.job.tries, result.err)
-
-				continue Loop
-			}
-
-			switch {
-			// If the query ended because it was canceled, drop it.
-			case result.err == ErrJobCanceled:
-				log.Tracef("Query(%d) was canceled before "+
-					"result was available from peer %v",
-					result.job.index, result.peer.Addr())
-
-				// If this is the first job in this batch that
-				// was canceled, forward the error on the
-				// batch's error channel. We do this since a
-				// cancellation applies to the whole batch.
-				batch.errChan <- result.err
-				delete(currentBatches, batchNum)
-
-				log.Debugf("Canceled batch %v", batchNum)
-				continue Loop
-
-			// If the query ended with any other error, put it back
-			// into the work queue if it has not reached the
-			// maximum number of retries.
-			case result.err != nil:
-				// Refresh peer rank on disconnect.
-				if result.err == ErrPeerDisconnected {
-					w.cfg.Ranking.ResetRanking(
-						result.peer.Addr(),
-					)
-				} else {
-					// Punish the peer for the failed query.
-					w.cfg.Ranking.Punish(result.peer.Addr())
-				}
-
-				if !batch.noRetryMax {
-					result.job.tries++
-				}
-
-				// Check if this query has reached its maximum
-				// number of retries. If so, remove it from the
-				// batch and don't reschedule it.
-				if !batch.noRetryMax &&
-					result.job.tries >= batch.maxRetries {
-
-					log.Warnf("Query(%d) from peer %v "+
-						"failed and reached maximum "+
-						"number of retries, not "+
-						"rescheduling: %v",
-						result.job.index,
-						result.peer.Addr(), result.err)
-
-					// Return the error and cancel the
-					// batch.
-					batch.errChan <- result.err
-					delete(currentBatches, batchNum)
-
-					log.Debugf("Canceled batch %v",
-						batchNum)
-
-					continue Loop
-				}
-
-				log.Warnf("Query(%d) from peer %v failed, "+
-					"rescheduling: %v", result.job.index,
-					result.peer.Addr(), result.err)
-
-				// If it was a timeout, we dynamically increase
-				// it for the next attempt.
-				if result.err == ErrQueryTimeout {
-					newTimeout := result.job.timeout * 2
-					if newTimeout > maxQueryTimeout {
-						newTimeout = maxQueryTimeout
-					}
-					result.job.timeout = newTimeout
-				}
-
-				heap.Push(work, result.job)
-				currentQueries[result.job.index] = batchNum
-
-			// Otherwise, we got a successful result and update the
-			// status of the batch this query is a part of.
-			default:
-				// Reward the peer for the successful query.
-				w.cfg.Ranking.Reward(result.peer.Addr())
-
-				// Decrement the number of queries remaining in
-				// the batch.
-				batch.rem--
-				log.Tracef("Remaining jobs for batch "+
-					"%v: %v ", batchNum, batch.rem)
-
-				// If this was the last query in flight
-				// for this batch, we can notify that
-				// it finished, and delete it.
-				if batch.rem == 0 {
-					batch.errChan <- nil
-					delete(currentBatches, batchNum)
-
-					log.Tracef("Batch %v done",
-						batchNum)
-					continue Loop
-				}
-			}
-
-			// If the total timeout for this batch has passed,
-			// return an error.
-			select {
-			case <-batch.timeout:
-				batch.errChan <- ErrQueryTimeout
-				delete(currentBatches, batchNum)
-
-				// When deleting the particular batch
-				// number we need to make sure to cancel
-				// all queued and ongoing queryJobs
-				// to not waste resources when the batch
-				// call is already canceled.
-				if batch.cancelChan != nil {
-					close(batch.cancelChan)
-				}
-
-				log.Warnf("Query(%d) failed with "+
-					"error: %v. Timing out.",
-					result.job.index, result.err)
-
-				log.Warnf("Batch %v timed out",
-					batchNum)
-
-			default:
-			}
+			w.handleJobResult(result)
 
 		// A new batch of queries where scheduled.
 		case batch := <-w.newBatches:
-			// Add all new queries in the batch to our work queue,
-			// with priority given by the order they were
-			// scheduled.
-			log.Debugf("Adding new batch(%d) of %d queries to "+
-				"work queue", batchIndex, len(batch.requests))
-
-			// Internal cancel channel of a batch request.
-			cancelChan := make(chan struct{})
-
-			for _, q := range batch.requests {
-				heap.Push(work, &queryJob{
-					index:              queryIndex,
-					timeout:            minQueryTimeout,
-					encoding:           batch.options.encoding,
-					cancelChan:         batch.options.cancelChan,
-					internalCancelChan: cancelChan,
-					Request:            q,
-				})
-				currentQueries[queryIndex] = batchIndex
-				queryIndex++
-			}
-
-			currentBatches[batchIndex] = &batchProgress{
-				noRetryMax: batch.options.noRetryMax,
-				maxRetries: batch.options.numRetries,
-				timeout: time.After(
-					batch.options.timeout,
-				),
-				rem:        len(batch.requests),
-				errChan:    batch.errChan,
-				cancelChan: cancelChan,
-			}
-			batchIndex++
+			w.handleNewBatches(batch)
 
 		case <-w.quit:
 			return