Skip to content

Commit

Permalink
test(datastore): improve concurrency of LQ test (#6610)
Browse files Browse the repository at this point in the history
  • Loading branch information
telpirion authored Sep 6, 2022
1 parent 97aafa4 commit b7868af
Showing 1 changed file with 16 additions and 17 deletions.
33 changes: 16 additions & 17 deletions datastore/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"reflect"
"sort"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -584,9 +583,6 @@ func TestIntegration_LargeQuery(t *testing.T) {

q := NewQuery("SQChild").Ancestor(parent).Filter("T=", now).Order("I")

// Wait group to allow us to run query tests in parallel below.
var wg sync.WaitGroup

// Check we get the expected count and results for various limits/offsets.
queryTests := []struct {
limit, offset, want int
Expand All @@ -607,12 +603,13 @@ func TestIntegration_LargeQuery(t *testing.T) {
{limit: 1000, offset: 100, want: n - 100},
{limit: 500, offset: 500, want: n - 500},
}
for _, tt := range queryTests {
for i, tt := range queryTests {
q := q.Limit(tt.limit).Offset(tt.offset)
wg.Add(1)
limit := tt.limit
offset := tt.offset
want := tt.want

go func(limit, offset, want int) {
defer wg.Done()
t.Run(fmt.Sprintf("queryTest_%d", i), func(t *testing.T) {
// Check Count returns the expected number of results.
count, err := client.Count(ctx, q)
if err != nil {
Expand All @@ -638,10 +635,11 @@ func TestIntegration_LargeQuery(t *testing.T) {
break
}
}
}(tt.limit, tt.offset, tt.want)
t.Parallel()
})
}

// Also check iterator cursor behaviour.
// Also check iterator cursor behavior.
cursorTests := []struct {
limit, offset int // Query limit and offset.
count int // The number of times to call "next"
Expand All @@ -662,12 +660,13 @@ func TestIntegration_LargeQuery(t *testing.T) {
{count: 200, offset: 500, limit: -1, want: 700},
{count: 200, offset: 1000, limit: -1, want: -1}, // No more results.
}
for _, tt := range cursorTests {
wg.Add(1)

go func(count, limit, offset, want int) {
defer wg.Done()
for i, tt := range cursorTests {
count := tt.count
limit := tt.limit
offset := tt.offset
want := tt.want

t.Run(fmt.Sprintf("cursorTest_%d", i), func(t *testing.T) {
ctx := context.WithValue(ctx, ckey{}, fmt.Sprintf("c=%d,l=%d,o=%d", count, limit, offset))
// Run iterator through count calls to Next.
it := client.Run(ctx, q.Limit(limit).Offset(offset).KeysOnly())
Expand Down Expand Up @@ -703,9 +702,9 @@ func TestIntegration_LargeQuery(t *testing.T) {
case entity.I != want:
t.Errorf("count=%d, limit=%d, offset=%d: got.I = %d, want %d", count, limit, offset, entity.I, want)
}
}(tt.count, tt.limit, tt.offset, tt.want)
t.Parallel()
})
}
wg.Wait()
}

func TestIntegration_EventualConsistency(t *testing.T) {
Expand Down

0 comments on commit b7868af

Please sign in to comment.