Skip to content

Commit

Permalink
batch_test: fix error
Browse files Browse the repository at this point in the history
  • Loading branch information
asddongmen committed Nov 8, 2021
1 parent b0c066b commit f7a5681
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 28 deletions.
3 changes: 2 additions & 1 deletion pkg/orchestrator/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

const (
maxBatchPatchSize = 96
maxBatchPatchSize = 64
maxBatchResponseSize = 64
)

Expand Down Expand Up @@ -52,6 +52,7 @@ BATCH:
select {
case response := <-watchCh:
if err := response.Err(); err != nil {
// we should always return revision
return nil, revision, errors.Trace(err)
}
if revision >= response.Header.GetRevision() {
Expand Down
48 changes: 21 additions & 27 deletions pkg/orchestrator/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,23 +77,23 @@ func TestGetBatchResponse(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()

// record the max batch size
maxBatchSize := 0
// record tick times
tickCounter := 0
ticker := time.NewTicker(10 * time.Millisecond)

ticker := time.NewTicker(20 * time.Millisecond)
prefix := "/getBatch"
revision := int64(0)
watchCh := cli.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(revision+1))
patchSize := 64

patchNum := 256
// put batch to etcd
go func() {
for i := 0; i < 5; i++ {
for j := 0; j < patchSize; j++ {
time.Sleep(10 * time.Millisecond)
_, err := cli.Put(ctx, prefix+fmt.Sprintf("/key%d", j), "abc")
if err == nil || err.Error() == "etcdserver: request timed out" {
continue
}
for j := 0; j < patchNum; j++ {
time.Sleep(5 * time.Millisecond)
_, err := cli.Put(ctx, prefix+fmt.Sprintf("/key%d", j), "abc")
if err == nil || err.Error() == "etcdserver: request timed out" {
continue
}
}
}()
Expand All @@ -104,11 +104,10 @@ RUN:
for {
responses := make([]clientv3.WatchResponse, 0)
select {
case <-ctx.Done():
break RUN
case <-ticker.C:
// when there is no more response from watchCh in 2s, break the loop
if time.Since(lastReceivedEventTime) > etcdRequestProgressDuration {
cancel()
break RUN
}
case response := <-watchCh:
err := response.Err()
Expand All @@ -119,25 +118,20 @@ RUN:
continue
}
revision = response.Header.GetRevision()
// ProgressNotify implies no new events.
if response.IsProgressNotify() {
continue
}
responses = append(responses, response)
batchResponses, rev, err := getBatchResponse(watchCh, revision)
revision = rev
require.Nil(t, err)
responses = append(responses, batchResponses...)
revision = rev
if tickCounter == 1 {
// make sure batch successfully
require.LessOrEqual(t, 2, len(responses))
if len(responses) > maxBatchSize {
maxBatchSize = len(responses)
}
tickCounter++
// simulate time consume by reactor tick
if tickCounter == 1 {
time.Sleep(1 * time.Second)
// simulate time consumed by reactor tick
if tickCounter%32 == 0 {
time.Sleep(200 * time.Millisecond)
}
}
}
require.Less(t, tickCounter, patchSize*5)
require.LessOrEqual(t, 2, maxBatchSize)
require.Less(t, tickCounter, patchNum)
}

0 comments on commit f7a5681

Please sign in to comment.