diff --git a/query/workmanager.go b/query/workmanager.go
index 9d6317fb..01599dc2 100644
--- a/query/workmanager.go
+++ b/query/workmanager.go
@@ -76,9 +76,9 @@ type PeerRanking interface {
 // we have given to it.
 // TODO(halseth): support more than one active job at a time.
 type activeWorker struct {
-	w          Worker
+	w          Worker
 	activeJobs map[uint64]*queryJob
-	onExit     chan struct{}
+	onExit     chan struct{}
 }
 
 // Config holds the configuration options for a new WorkManager.
@@ -114,6 +114,15 @@ type peerWorkManager struct {
 	// workers will be sent.
 	jobResults chan *jobResult
 
+	workQueue *workQueue
+	workers   map[string]*activeWorker
+
+	batchIndex     uint64
+	currentBatches map[uint64]*batchProgress
+
+	queryIndex     uint64
+	currentQueries map[uint64]uint64
+
 	quit chan struct{}
 	wg   sync.WaitGroup
 }
@@ -124,11 +133,18 @@ var _ WorkManager = (*peerWorkManager)(nil)
 // NewWorkManager returns a new WorkManager with the regular worker
 // implementation.
 func NewWorkManager(cfg *Config) WorkManager {
+	work := &workQueue{}
+	heap.Init(work)
+
 	return &peerWorkManager{
-		cfg:        cfg,
-		newBatches: make(chan *batch),
-		jobResults: make(chan *jobResult),
-		quit:       make(chan struct{}),
+		cfg:            cfg,
+		newBatches:     make(chan *batch),
+		jobResults:     make(chan *jobResult),
+		workQueue:      work,
+		workers:        make(map[string]*activeWorker),
+		currentBatches: make(map[uint64]*batchProgress),
+		currentQueries: make(map[uint64]uint64),
+		quit:           make(chan struct{}),
 	}
 }
 
@@ -152,6 +168,238 @@ func (w *peerWorkManager) Stop() error {
 	return nil
 }
 
+func (w *peerWorkManager) handlePeerConnected(peer Peer) {
+	log.Debugf("Starting worker for peer %v",
+		peer.Addr())
+
+	r := w.cfg.NewWorker(peer)
+
+	// We'll create a channel that will close after the
+	// worker's Run method returns, to know when we can
+	// remove it from our set of active workers.
+	onExit := make(chan struct{})
+	w.workers[peer.Addr()] = &activeWorker{
+		w:          r,
+		activeJobs: make(map[uint64]*queryJob),
+		onExit:     onExit,
+	}
+
+	w.cfg.Ranking.AddPeer(peer.Addr())
+
+	w.wg.Add(1)
+	go func() {
+		defer w.wg.Done()
+		defer close(onExit)
+
+		r.Run(w.jobResults, w.quit)
+	}()
+}
+
+func (w *peerWorkManager) handleJobResult(result *jobResult) {
+	log.Tracef("Result for job %v received from peer %v "+
+		"(err=%v)", result.job.index,
+		result.peer.Addr(), result.err)
+
+	// Delete the job from the worker's active jobs, such
+	// that the slot gets opened for more work.
+	r := w.workers[result.peer.Addr()]
+	delete(r.activeJobs, result.job.Index())
+
+	// Get the index of this query's batch, and delete it
+	// from the map of current queries, since we don't have
+	// to track it anymore. We'll add it back if the result
+	// turns out to be an error.
+	batchNum := w.currentQueries[result.job.index]
+	delete(w.currentQueries, result.job.index)
+
+	// In case the batch is already canceled, we return
+	// early.
+	batch, ok := w.currentBatches[batchNum]
+	if !ok {
+		log.Warnf("Query(%d) result from peer %v "+
+			"discarded with retries %d, because "+
+			"batch already canceled: %v",
+			result.job.index,
+			result.peer.Addr(),
+			result.job.tries, result.err)
+
+		return
+	}
+
+	switch {
+	// If the query ended because it was canceled, drop it.
+	case result.err == ErrJobCanceled:
+		log.Tracef("Query(%d) was canceled before "+
+			"result was available from peer %v",
+			result.job.index, result.peer.Addr())
+
+		// If this is the first job in this batch that
+		// was canceled, forward the error on the
+		// batch's error channel. We do this since a
+		// cancellation applies to the whole batch.
+		batch.errChan <- result.err
+		delete(w.currentBatches, batchNum)
+
+		log.Debugf("Canceled batch %v", batchNum)
+		return
+
+	// If the query ended with any other error, put it back
+	// into the work queue if it has not reached the
+	// maximum number of retries.
+	case result.err != nil:
+		// Refresh peer rank on disconnect.
+		if result.err == ErrPeerDisconnected {
+			w.cfg.Ranking.ResetRanking(
+				result.peer.Addr(),
+			)
+		} else {
+			// Punish the peer for the failed query.
+			w.cfg.Ranking.Punish(result.peer.Addr())
+		}
+
+		if !batch.noRetryMax {
+			result.job.tries++
+		}
+
+		// Check if this query has reached its maximum
+		// number of retries. If so, remove it from the
+		// batch and don't reschedule it.
+		if !batch.noRetryMax &&
+			result.job.tries >= batch.maxRetries {
+
+			log.Warnf("Query(%d) from peer %v "+
+				"failed and reached maximum "+
+				"number of retries, not "+
+				"rescheduling: %v",
+				result.job.index,
+				result.peer.Addr(), result.err)
+
+			// Return the error and cancel the
+			// batch.
+			batch.errChan <- result.err
+			delete(w.currentBatches, batchNum)
+
+			log.Debugf("Canceled batch %v",
+				batchNum)
+
+			return
+		}
+
+		log.Warnf("Query(%d) from peer %v failed, "+
+			"rescheduling: %v", result.job.index,
+			result.peer.Addr(), result.err)
+
+		// If it was a timeout, we dynamically increase
+		// it for the next attempt.
+		if result.err == ErrQueryTimeout {
+			newTimeout := result.job.timeout * 2
+			if newTimeout > maxQueryTimeout {
+				newTimeout = maxQueryTimeout
+			}
+			result.job.timeout = newTimeout
+		}
+
+		heap.Push(w.workQueue, result.job)
+		w.currentQueries[result.job.index] = batchNum
+
+	// Otherwise, we got a successful result and update the
+	// status of the batch this query is a part of.
+	default:
+		// Reward the peer for the successful query.
+		w.cfg.Ranking.Reward(result.peer.Addr())
+
+		// Decrement the number of queries remaining in
+		// the batch.
+		batch.rem--
+		log.Tracef("Remaining jobs for batch "+
+			"%v: %v ", batchNum, batch.rem)
+
+		// If this was the last query in flight
+		// for this batch, we can notify that
+		// it finished, and delete it.
+		if batch.rem == 0 {
+			batch.errChan <- nil
+			delete(w.currentBatches, batchNum)
+
+			log.Tracef("Batch %v done",
+				batchNum)
+			return
+		}
+	}
+
+	// If the total timeout for this batch has passed,
+	// return an error.
+	select {
+	case <-batch.timeout:
+		batch.errChan <- ErrQueryTimeout
+		delete(w.currentBatches, batchNum)
+
+		// When deleting the particular batch
+		// number we need to make sure to cancel
+		// all queued and ongoing queryJobs
+		// to not waste resources when the batch
+		// call is already canceled.
+		if batch.cancelChan != nil {
+			close(batch.cancelChan)
+		}
+
+		log.Warnf("Query(%d) failed with "+
+			"error: %v. Timing out.",
+			result.job.index, result.err)
+
+		log.Warnf("Batch %v timed out",
+			batchNum)
+
+	default:
+	}
+
+}
+
+type batchProgress struct {
+	noRetryMax bool
+	maxRetries uint8
+	timeout    <-chan time.Time
+	rem        int
+	errChan    chan error
+	cancelChan chan struct{}
+}
+
+func (w *peerWorkManager) handleNewBatches(batch *batch) {
+	// Add all new queries in the batch to our work queue,
+	// with priority given by the order they were
+	// scheduled.
+	log.Debugf("Adding new batch(%d) of %d queries to "+
+		"work queue", w.batchIndex, len(batch.requests))
+
+	// Internal cancel channel of a batch request.
+	cancelChan := make(chan struct{})
+
+	for _, q := range batch.requests {
+		heap.Push(w.workQueue, &queryJob{
+			index:              w.queryIndex,
+			timeout:            minQueryTimeout,
+			encoding:           batch.options.encoding,
+			cancelChan:         batch.options.cancelChan,
+			internalCancelChan: cancelChan,
+			Request:            q,
+		})
+		w.currentQueries[w.queryIndex] = w.batchIndex
+		w.queryIndex++
+	}
+
+	w.currentBatches[w.batchIndex] = &batchProgress{
+		noRetryMax: batch.options.noRetryMax,
+		maxRetries: batch.options.numRetries,
+		timeout: time.After(
+			batch.options.timeout,
+		),
+		rem:        len(batch.requests),
+		errChan:    batch.errChan,
+		cancelChan: cancelChan,
+	}
+	w.batchIndex++
+}
+
 // workDispatcher receives batches of queries to be performed from external
 // callers, and dispatches these to active workers. It makes sure to
 // prioritize the queries in the order they come in, such that early queries
@@ -171,53 +419,24 @@ func (w *peerWorkManager) workDispatcher() {
 	}
 	defer cancel()
 
-	// Init a work queue which will be used to sort the incoming queries in
-	// a first come first served fashion. We use a heap structure such
-	// that we can efficiently put failed queries back in the queue.
-	work := &workQueue{}
-	heap.Init(work)
-
-	type batchProgress struct {
-		noRetryMax bool
-		maxRetries uint8
-		timeout    <-chan time.Time
-		rem        int
-		errChan    chan error
-		cancelChan chan struct{}
-	}
-
-	// We set up a batch index counter to keep track of batches that still
-	// have queries in flight. This lets us track when all queries for a
-	// batch have been finished, and return an (non-)error to the caller.
-	batchIndex := uint64(0)
-	currentBatches := make(map[uint64]*batchProgress)
-
 	// When the work dispatcher exits, we'll loop through the remaining
 	// batches and send on their error channel.
 	defer func() {
-		for _, b := range currentBatches {
+		for _, b := range w.currentBatches {
 			b.errChan <- ErrWorkManagerShuttingDown
 		}
 	}()
 
-	// We set up a counter that we'll increase with each incoming query,
-	// and will serve as the priority of each. In addition we map each
-	// query to the batch they are part of.
-	queryIndex := uint64(0)
-	currentQueries := make(map[uint64]uint64)
-
-	workers := make(map[string]*activeWorker)
-
 Loop:
 	for {
 		// If the work queue is non-empty, we'll take out the first
 		// element in order to distribute it to a worker.
-		if work.Len() > 0 {
-			next := work.Peek().(*queryJob)
+		if w.workQueue.Len() > 0 {
+			next := w.workQueue.Peek().(*queryJob)
 
 			// Find the peers with free work slots available.
 			var freeWorkers []string
-			for p, r := range workers {
+			for p, r := range w.workers {
 				// Only one active job at a time is currently
 				// supported.
 				if len(r.activeJobs) >= 1 {
@@ -233,7 +452,7 @@ Loop:
 			// Give the job to the highest ranked peer with free
 			// slots available.
 			for _, p := range freeWorkers {
-				r := workers[p]
+				r := w.workers[p]
 
 				// The worker has free work slots, it should
 				// pick up the query.
@@ -241,7 +460,7 @@ case r.w.NewJob() <- next:
 					log.Tracef("Sent job %v to worker %v",
 						next.Index(), p)
 
-					heap.Pop(work)
+					heap.Pop(w.workQueue)
 					r.activeJobs[next.Index()] = next
 
 					// Go back to start of loop, to check
@@ -251,7 +470,7 @@ Loop:
 
 				// Remove workers no longer active.
 				case <-r.onExit:
-					delete(workers, p)
+					delete(w.workers, p)
 					continue
 
 				case <-w.quit:
@@ -268,225 +487,15 @@ Loop:
 		// Spin up a goroutine that runs a worker each time a peer
 		// connects.
 		case peer := <-peersConnected:
-			log.Debugf("Starting worker for peer %v",
-				peer.Addr())
-
-			r := w.cfg.NewWorker(peer)
-
-			// We'll create a channel that will close after the
-			// worker's Run method returns, to know when we can
-			// remove it from our set of active workers.
-			onExit := make(chan struct{})
-			workers[peer.Addr()] = &activeWorker{
-				w:          r,
-				activeJobs: make(map[uint64]*queryJob),
-				onExit:     onExit,
-			}
-
-			w.cfg.Ranking.AddPeer(peer.Addr())
-
-			w.wg.Add(1)
-			go func() {
-				defer w.wg.Done()
-				defer close(onExit)
-
-				r.Run(w.jobResults, w.quit)
-			}()
+			w.handlePeerConnected(peer)
 
 		// A new result came back.
 		case result := <-w.jobResults:
-			log.Tracef("Result for job %v received from peer %v "+
-				"(err=%v)", result.job.index,
-				result.peer.Addr(), result.err)
-
-			// Delete the job from the worker's active job, such
-			// that the slot gets opened for more work.
-			r := workers[result.peer.Addr()]
-			delete(r.activeJobs, result.job.Index())
-
-			// Get the index of this query's batch, and delete it
-			// from the map of current queries, since we don't have
-			// to track it anymore. We'll add it back if the result
-			// turns out to be an error.
-			batchNum := currentQueries[result.job.index]
-			delete(currentQueries, result.job.index)
-
-			// In case the batch is already canceled we return
-			// early.
-			batch, ok := currentBatches[batchNum]
-			if !ok {
-				log.Warnf("Query(%d) result from peer %v "+
-					"discarded with retries %d, because "+
-					"batch already canceled: %v",
-					result.job.index,
-					result.peer.Addr(),
-					result.job.tries, result.err)
-
-				continue Loop
-			}
-
-			switch {
-			// If the query ended because it was canceled, drop it.
-			case result.err == ErrJobCanceled:
-				log.Tracef("Query(%d) was canceled before "+
-					"result was available from peer %v",
-					result.job.index, result.peer.Addr())
-
-				// If this is the first job in this batch that
-				// was canceled, forward the error on the
-				// batch's error channel. We do this since a
-				// cancellation applies to the whole batch.
-				batch.errChan <- result.err
-				delete(currentBatches, batchNum)
-
-				log.Debugf("Canceled batch %v", batchNum)
-				continue Loop
-
-			// If the query ended with any other error, put it back
-			// into the work queue if it has not reached the
-			// maximum number of retries.
-			case result.err != nil:
-				// Refresh peer rank on disconnect.
-				if result.err == ErrPeerDisconnected {
-					w.cfg.Ranking.ResetRanking(
-						result.peer.Addr(),
-					)
-				} else {
-					// Punish the peer for the failed query.
-					w.cfg.Ranking.Punish(result.peer.Addr())
-				}
-
-				if !batch.noRetryMax {
-					result.job.tries++
-				}
-
-				// Check if this query has reached its maximum
-				// number of retries. If so, remove it from the
-				// batch and don't reschedule it.
-				if !batch.noRetryMax &&
-					result.job.tries >= batch.maxRetries {
-
-					log.Warnf("Query(%d) from peer %v "+
-						"failed and reached maximum "+
-						"number of retries, not "+
-						"rescheduling: %v",
-						result.job.index,
-						result.peer.Addr(), result.err)
-
-					// Return the error and cancel the
-					// batch.
-					batch.errChan <- result.err
-					delete(currentBatches, batchNum)
-
-					log.Debugf("Canceled batch %v",
-						batchNum)
-
-					continue Loop
-				}
-
-				log.Warnf("Query(%d) from peer %v failed, "+
-					"rescheduling: %v", result.job.index,
-					result.peer.Addr(), result.err)
-
-				// If it was a timeout, we dynamically increase
-				// it for the next attempt.
-				if result.err == ErrQueryTimeout {
-					newTimeout := result.job.timeout * 2
-					if newTimeout > maxQueryTimeout {
-						newTimeout = maxQueryTimeout
-					}
-					result.job.timeout = newTimeout
-				}
-
-				heap.Push(work, result.job)
-				currentQueries[result.job.index] = batchNum
-
-			// Otherwise, we got a successful result and update the
-			// status of the batch this query is a part of.
-			default:
-				// Reward the peer for the successful query.
-				w.cfg.Ranking.Reward(result.peer.Addr())
-
-				// Decrement the number of queries remaining in
-				// the batch.
-				batch.rem--
-				log.Tracef("Remaining jobs for batch "+
-					"%v: %v ", batchNum, batch.rem)
-
-				// If this was the last query in flight
-				// for this batch, we can notify that
-				// it finished, and delete it.
-				if batch.rem == 0 {
-					batch.errChan <- nil
-					delete(currentBatches, batchNum)
-
-					log.Tracef("Batch %v done",
-						batchNum)
-					continue Loop
-				}
-			}
-
-			// If the total timeout for this batch has passed,
-			// return an error.
-			select {
-			case <-batch.timeout:
-				batch.errChan <- ErrQueryTimeout
-				delete(currentBatches, batchNum)
-
-				// When deleting the particular batch
-				// number we need to make sure to cancel
-				// all queued and ongoing queryJobs
-				// to not waste resources when the batch
-				// call is already canceled.
-				if batch.cancelChan != nil {
-					close(batch.cancelChan)
-				}
-
-				log.Warnf("Query(%d) failed with "+
-					"error: %v. Timing out.",
-					result.job.index, result.err)
-
-				log.Warnf("Batch %v timed out",
-					batchNum)
-
-			default:
-			}
+			w.handleJobResult(result)
 
 		// A new batch of queries where scheduled.
 		case batch := <-w.newBatches:
-			// Add all new queries in the batch to our work queue,
-			// with priority given by the order they were
-			// scheduled.
-			log.Debugf("Adding new batch(%d) of %d queries to "+
-				"work queue", batchIndex, len(batch.requests))
-
-			// Internal cancel channel of a batch request.
-			cancelChan := make(chan struct{})
-
-			for _, q := range batch.requests {
-				heap.Push(work, &queryJob{
-					index:              queryIndex,
-					timeout:            minQueryTimeout,
-					encoding:           batch.options.encoding,
-					cancelChan:         batch.options.cancelChan,
-					internalCancelChan: cancelChan,
-					Request:            q,
-				})
-				currentQueries[queryIndex] = batchIndex
-				queryIndex++
-			}
-
-			currentBatches[batchIndex] = &batchProgress{
-				noRetryMax: batch.options.noRetryMax,
-				maxRetries: batch.options.numRetries,
-				timeout: time.After(
-					batch.options.timeout,
-				),
-				rem:        len(batch.requests),
-				errChan:    batch.errChan,
-				cancelChan: cancelChan,
-			}
-			batchIndex++
+			w.handleNewBatches(batch)
 
 		case <-w.quit:
 			return
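
For readers new to the pattern, the work queue the dispatcher drains above is a plain container/heap priority queue keyed by the per-query index: jobs are scheduled with a strictly increasing index, and a retried job keeps its original index, so it goes back to the front of the queue rather than the back. The following standalone sketch illustrates that pattern with a hypothetical job type and jobQueue heap; it is a minimal illustration only, not the package's queryJob or workQueue implementation.

package main

import (
	"container/heap"
	"fmt"
)

// job is a stand-in for a queued query. A lower index means the job was
// scheduled earlier and therefore has higher priority.
type job struct {
	index uint64
}

// jobQueue implements heap.Interface, ordered by ascending job index.
type jobQueue []*job

func (q jobQueue) Len() int           { return len(q) }
func (q jobQueue) Less(i, j int) bool { return q[i].index < q[j].index }
func (q jobQueue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }

func (q *jobQueue) Push(x interface{}) { *q = append(*q, x.(*job)) }

func (q *jobQueue) Pop() interface{} {
	old := *q
	n := len(old)
	item := old[n-1]
	*q = old[:n-1]

	return item
}

// Peek returns the highest priority job without removing it, mirroring how
// the dispatcher inspects the next job before handing it to a worker.
func (q jobQueue) Peek() *job { return q[0] }

func main() {
	q := &jobQueue{}
	heap.Init(q)

	// Schedule three jobs; the index doubles as the priority.
	for i := uint64(0); i < 3; i++ {
		heap.Push(q, &job{index: i})
	}

	// Simulate a failed job being rescheduled: it keeps its original
	// index, so it returns to the front of the queue.
	failed := heap.Pop(q).(*job)
	heap.Push(q, failed)

	// Jobs are dispatched in index order (0, 1, 2) despite the retry.
	for q.Len() > 0 {
		fmt.Println("dispatching job", heap.Pop(q).(*job).index)
	}
}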