Skip to content

Commit

Permalink
query: Add unit test for the batch time-out.
Browse files Browse the repository at this point in the history
  • Loading branch information
ziggie1984 committed Apr 3, 2024
1 parent 1ef869f commit 0cdeb8f
Showing 1 changed file with 152 additions and 0 deletions.
152 changes: 152 additions & 0 deletions query/workmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package query
import (
"fmt"
"sort"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -479,3 +480,154 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) {
}
}
}

// queryJobWithWorkerIndex is used to know which worker was used for the
// corresponding job request to signal the result back to the right worker.
type queryJobWithWorkerIndex struct {
worker int
job *queryJob
}

// mergeWorkChannels is used to merge the channels of single works into one
// to have more control of the concurrency.
func mergeWorkChannels(workers []*mockWorker) <-chan queryJobWithWorkerIndex {
var wg sync.WaitGroup
merged := make(chan queryJobWithWorkerIndex)

// Function to copy data from each input channel to the merged channel
readFromWorker := func(input <-chan *queryJob, worker int) {
defer wg.Done()
for {
value, ok := <-input
if !ok {
// Channel is closed, exit the loop
return
}
merged <- queryJobWithWorkerIndex{
worker: worker,
job: value,
}
}
}

// Start a goroutine for each input channel
wg.Add(len(workers))
for i, work := range workers {
go readFromWorker(work.nextJob, i)
}

// Wait for all copying to be done, then close the merged channel
go func() {
wg.Wait()
close(merged)
}()

return merged
}

// TestWorkManagerTimeOutBatch tests that as soon as a batch times-out all the
// ongoing jobQueries already registered with workers and also the queued ones
// are canceled.
func TestWorkManagerTimeOutBatch(t *testing.T) {
const numQueries = 100
const numWorkers = 10

// Start the workDispatcher goroutine.
wm, workers := startWorkManager(t, numWorkers)

// mergeChan is the channel which receives all the jobQueries
// sequentially which are sent to the registered workers.
mergeChan := mergeWorkChannels(workers)

// activeQueries are the queries currently registered with the workers.
var activeQueries []queryJobWithWorkerIndex

// Schedule a batch of queries.
var queries []*Request
for i := 0; i < numQueries; i++ {
q := &Request{}
queries = append(queries, q)
}

// Send the query, and include a channel to cancel the batch.
cancelChan := make(chan struct{})
// We will timeout the batch to simulate a slow peer connection and make
// sure we cancel all ongoing queries including the ones which are still
// queued up.
errChan := wm.Query(queries, Cancel(cancelChan), Timeout(1*time.Second))

// Send a query to every active worker.
for i := 0; i < numWorkers; i++ {
select {
case jobQuery := <-mergeChan:
activeQueries = append(activeQueries, jobQuery)
case <-errChan:
t.Fatalf("did not expect on errChan")
case <-time.After(5 * time.Second):
t.Fatalf("next job not received")
}
}

// We wait before we send the result for one query to exceed the timeout
// of the batch.
time.Sleep(2 * time.Second)

// We need to signal a result for one of the active workers so that
// the batch timeout is checked.
workerIndex := activeQueries[0].worker
workers[workerIndex].results <- &jobResult{
job: activeQueries[0].job,
err: nil,
}

// We expect the cancelChan to be closed for this batch.
select {
case <-cancelChan:
case <-time.After(time.Second):
t.Fatalf("expected for the cancelChan to close")
}

// As soon as the batch times-out an error is sent via the errChan.
select {
case err := <-errChan:
require.ErrorIs(t, err, ErrQueryTimeout)
case <-time.After(time.Second):
t.Fatalf("expected for the errChan to signal")
}

// The cancelChan got closed, this happens when the batch times-out.
// So all the ongoing queries are canceled as well.
for i := 1; i < numWorkers; i++ {
job := activeQueries[i].job
select {
case <-job.cancelChan:
workers[i].results <- &jobResult{
job: job,
err: nil,
}
case <-time.After(time.Second):
t.Fatalf("next job not received")
}
}

// Make also sure that all the queued queries for this batch are
// canceled as well.
for i := numWorkers; i < numQueries; i++ {
select {
case res := <-mergeChan:
job := res.job
workerIndex := res.worker
select {
case <-job.cancelChan:
workers[workerIndex].results <- &jobResult{
job: job,
err: nil,
}
case <-time.After(time.Second):
t.Fatalf("next job ??? not received")
}
case <-time.After(time.Second):
t.Fatalf("next job not received")
}
}
}

0 comments on commit 0cdeb8f

Please sign in to comment.