From f7a5681591afd4cf8859b4bc769cfa0236d20382 Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Mon, 8 Nov 2021 08:22:28 +0800 Subject: [PATCH] batch_test: fix error --- pkg/orchestrator/batch.go | 3 ++- pkg/orchestrator/batch_test.go | 48 +++++++++++++++------------------- 2 files changed, 23 insertions(+), 28 deletions(-) diff --git a/pkg/orchestrator/batch.go b/pkg/orchestrator/batch.go index 5e5fc78963f..8b953c8a228 100644 --- a/pkg/orchestrator/batch.go +++ b/pkg/orchestrator/batch.go @@ -19,7 +19,7 @@ import ( ) const ( - maxBatchPatchSize = 96 + maxBatchPatchSize = 64 maxBatchResponseSize = 64 ) @@ -52,6 +52,7 @@ BATCH: select { case response := <-watchCh: if err := response.Err(); err != nil { + // we should always return revision return nil, revision, errors.Trace(err) } if revision >= response.Header.GetRevision() { diff --git a/pkg/orchestrator/batch_test.go b/pkg/orchestrator/batch_test.go index 352b422128c..513f85d59ec 100644 --- a/pkg/orchestrator/batch_test.go +++ b/pkg/orchestrator/batch_test.go @@ -77,23 +77,23 @@ func TestGetBatchResponse(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) defer cancel() - + // record the max batch size + maxBatchSize := 0 + // record tick times tickCounter := 0 - ticker := time.NewTicker(10 * time.Millisecond) + + ticker := time.NewTicker(20 * time.Millisecond) prefix := "/getBatch" revision := int64(0) watchCh := cli.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(revision+1)) - patchSize := 64 - + patchNum := 256 // put batch to etcd go func() { - for i := 0; i < 5; i++ { - for j := 0; j < patchSize; j++ { - time.Sleep(10 * time.Millisecond) - _, err := cli.Put(ctx, prefix+fmt.Sprintf("/key%d", j), "abc") - if err == nil || err.Error() == "etcdserver: request timed out" { - continue - } + for j := 0; j < patchNum; j++ { + time.Sleep(5 * time.Millisecond) + _, err := cli.Put(ctx, prefix+fmt.Sprintf("/key%d", j), "abc") + if err == nil || err.Error() == "etcdserver: request timed out" { + continue } } }() @@ -104,11 +104,10 @@ RUN: for { responses := make([]clientv3.WatchResponse, 0) select { - case <-ctx.Done(): - break RUN case <-ticker.C: + // when there is no more response from watchCh in 2s, break the loop if time.Since(lastReceivedEventTime) > etcdRequestProgressDuration { - cancel() + break RUN } case response := <-watchCh: err := response.Err() @@ -119,25 +118,20 @@ RUN: continue } revision = response.Header.GetRevision() - // ProgressNotify implies no new events. - if response.IsProgressNotify() { - continue - } - responses = append(responses, response) batchResponses, rev, err := getBatchResponse(watchCh, revision) + revision = rev require.Nil(t, err) responses = append(responses, batchResponses...) - revision = rev - if tickCounter == 1 { - // make sure batch successfully - require.LessOrEqual(t, 2, len(responses)) + if len(responses) > maxBatchSize { + maxBatchSize = len(responses) } tickCounter++ - // simulate time consume by reactor tick - if tickCounter == 1 { - time.Sleep(1 * time.Second) + // simulate time consumed by reactor tick + if tickCounter%32 == 0 { + time.Sleep(200 * time.Millisecond) } } } - require.Less(t, tickCounter, patchSize*5) + require.LessOrEqual(t, 2, maxBatchSize) + require.Less(t, tickCounter, patchNum) }