From cd7753539c235c5e845731fd88e60d3bbc779951 Mon Sep 17 00:00:00 2001 From: maria jose Date: Fri, 28 Jul 2023 20:05:03 -0400 Subject: [PATCH] end2end: update sendVotes to avoid infinite for loop, add the error when retry count reached the max allowed check the errs in each test that calls sendVotes --- cmd/end2endtest/ballot.go | 5 ++- cmd/end2endtest/censusize.go | 5 ++- cmd/end2endtest/csp.go | 6 +++- cmd/end2endtest/dynamicensus.go | 12 +++++-- cmd/end2endtest/encrypted.go | 6 +++- cmd/end2endtest/helpers.go | 57 +++++++++++++++++++++++---------- cmd/end2endtest/overwrite.go | 5 ++- cmd/end2endtest/plaintext.go | 6 +++- cmd/end2endtest/zkweighted.go | 6 +++- 9 files changed, 82 insertions(+), 26 deletions(-) diff --git a/cmd/end2endtest/ballot.go b/cmd/end2endtest/ballot.go index 3ce9761da..0efcb273a 100644 --- a/cmd/end2endtest/ballot.go +++ b/cmd/end2endtest/ballot.go @@ -85,7 +85,10 @@ func (t *E2EBallotElection) Run() error { VoterAccount: acct, }) } - t.sendVotes(votes) + errs := t.sendVotes(votes) + if len(errs) > 0 { + return fmt.Errorf("error in sendVotes %+v", errs) + } log.Infow("votes submitted successfully", "n", len(t.voterAccounts), "time", time.Since(startTime), diff --git a/cmd/end2endtest/censusize.go b/cmd/end2endtest/censusize.go index 48fc60e28..78ffc25d5 100644 --- a/cmd/end2endtest/censusize.go +++ b/cmd/end2endtest/censusize.go @@ -71,7 +71,10 @@ func (t *E2EMaxCensusSizeElection) Run() error { VoterAccount: acct, }) } - t.sendVotes(votes) + errs := t.sendVotes(votes) + if len(errs) > 0 { + return fmt.Errorf("error in sendVotes %+v", errs) + } log.Infow("votes submitted successfully", "n", len(t.voterAccounts[1:]), "time", time.Since(startTime), diff --git a/cmd/end2endtest/csp.go b/cmd/end2endtest/csp.go index 19883d49d..946c35841 100644 --- a/cmd/end2endtest/csp.go +++ b/cmd/end2endtest/csp.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "os" "time" @@ -75,7 +76,10 @@ func (t *E2ECSPElection) Run() error { VoterAccount: acct, }) } - t.sendVotes(votes) + errs := t.sendVotes(votes) + if len(errs) > 0 { + return fmt.Errorf("error in sendVotes %+v", errs) + } log.Infow("votes submitted successfully", "n", c.nvotes, "time", time.Since(startTime), diff --git a/cmd/end2endtest/dynamicensus.go b/cmd/end2endtest/dynamicensus.go index d5b4b615a..644c6f310 100644 --- a/cmd/end2endtest/dynamicensus.go +++ b/cmd/end2endtest/dynamicensus.go @@ -134,7 +134,11 @@ func (t *E2EDynamicensusElection) Run() error { VoterAccount: acct, }) } - t.elections[0].sendVotes(votes) + errs := t.elections[0].sendVotes(votes) + if len(errs) > 0 { + errCh <- fmt.Errorf("error from electionID: %s, %+v", electionID, errs) + return + } log.Infow("votes submitted successfully", "n", len(t.elections[0].voterAccounts[1:]), "time", time.Since(startTime), @@ -252,7 +256,11 @@ func (t *E2EDynamicensusElection) Run() error { VoterAccount: acct, }) } - t.elections[1].sendVotes(votes) + errs := t.elections[1].sendVotes(votes) + if len(errs) > 0 { + errCh <- fmt.Errorf("error from electionID: %s, %+v", electionID, errs) + return + } log.Infow("votes submitted successfully", "n", len(t.elections[1].voterAccounts[1:]), "time", time.Since(startTime), diff --git a/cmd/end2endtest/encrypted.go b/cmd/end2endtest/encrypted.go index 526a7dc01..d9b0aec1f 100644 --- a/cmd/end2endtest/encrypted.go +++ b/cmd/end2endtest/encrypted.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "os" "time" @@ -72,7 +73,10 @@ func (t *E2EEncryptedElection) Run() error { Keys: keys, }) } - t.sendVotes(votes) + errs := t.sendVotes(votes) + if len(errs) > 0 { + return fmt.Errorf("error in sendVotes %+v", errs) + } log.Infow("votes submitted successfully", "n", len(t.voterAccounts), "time", time.Since(startTime), diff --git a/cmd/end2endtest/helpers.go b/cmd/end2endtest/helpers.go index e9abb8365..e1d3a6ca3 100644 --- a/cmd/end2endtest/helpers.go +++ b/cmd/end2endtest/helpers.go @@ -26,6 +26,7 @@ const ( nextBlock = "nextBlock" sameBlock = "sameBlock" defaultWeight = 10 + retriesSend = retries / 2 ) type ballotData struct { @@ -474,11 +475,10 @@ func (t *e2eElection) overwriteVote(choices []int, indexAcct int, waitType strin Choices: []int{choices[i]}}}) for _, err := range errs { // check the error expected for overwrite with waitUntilNextBlock - if strings.Contains(err.Error(), "overwrite count reached") { - log.Debug("error expected: ", err.Error()) - } else { + if !strings.Contains(err.Error(), "overwrite count reached") { return fmt.Errorf("unexpected overwrite error: %w", err) } + log.Debug("error expected: ", err.Error()) } switch waitType { case sameBlock: @@ -561,11 +561,16 @@ func ballotVotes(b ballotData, nvotes int) ([][]int, [][]*types.BigInt) { // sendVotes sends a batch of votes concurrently // (number of goroutines defined in t.config.parallelCount) -func (t *e2eElection) sendVotes(votes []*apiclient.VoteData) (errs map[int]error) { - errs = make(map[int]error) - var timeouts atomic.Uint32 +// sendVotes sends a batch of votes concurrently +// (number of goroutines defined in t.config.parallelCount) +func (t *e2eElection) sendVotes(votes []*apiclient.VoteData) map[int]error { + var errs = make(map[int]error) + // used to avoid infinite for loop + var timeoutsRetry, mempoolRetry, warnRetry atomic.Uint32 var wg sync.WaitGroup var queues []map[int]*apiclient.VoteData + var mutex sync.Mutex + for p := 0; p < t.config.parallelCount; p++ { queues = append(queues, make(map[int]*apiclient.VoteData, len(votes))) } @@ -583,31 +588,49 @@ func (t *e2eElection) sendVotes(votes []*apiclient.VoteData) (errs map[int]error for len(queue) > 0 { log.Infow("thread sending votes", "queue", len(queue)) for i, vote := range queue { - accPrivKey := vote.VoterAccount.PrivateKey() - voterApi := t.api.Clone(accPrivKey.String()) - _, err := voterApi.Vote(vote) + _, err := t.api.Vote(vote) switch { case err == nil: delete(queue, i) case errors.Is(err, context.DeadlineExceeded) || os.IsTimeout(err): // if the context deadline is reached, no need to print it, just retry - timeouts.Add(1) + if timeoutsRetry.Load() > retriesSend { + mutex.Lock() + errs[i] = err + mutex.Unlock() + return + } + timeoutsRetry.Add(1) case strings.Contains(err.Error(), "mempool is full"): log.Warn(err) // wait and retry - _ = voterApi.WaitUntilNextBlock() - case strings.Contains(err.Error(), "already exists") || - strings.Contains(err.Error(), "overwrite count reached"): + waitErr := t.api.WaitUntilNextBlock() + if waitErr != nil { + if mempoolRetry.Load() > retriesSend { + mutex.Lock() + errs[i] = err + mutex.Unlock() + return + } + mempoolRetry.Add(1) + } + case strings.Contains(err.Error(), "already exists"): // don't retry delete(queue, i) + mutex.Lock() errs[i] = err - case strings.Contains(err.Error(), "expired sik root"): - delete(queue, i) - errs[i] = err + mutex.Unlock() default: + if warnRetry.Load() > retriesSend { + mutex.Lock() + errs[i] = err + mutex.Unlock() + return + } // any other error, print it and wait a bit log.Warn(err) time.Sleep(100 * time.Millisecond) + warnRetry.Add(1) } } } @@ -615,7 +638,7 @@ func (t *e2eElection) sendVotes(votes []*apiclient.VoteData) (errs map[int]error } wg.Wait() log.Infow("sent votes", - "n", len(votes), "timeouts", timeouts.Load(), "failed", len(errs)) + "n", len(votes), "timeouts", timeoutsRetry.Load(), "errors", len(errs)) return errs } diff --git a/cmd/end2endtest/overwrite.go b/cmd/end2endtest/overwrite.go index c9adb21c8..e4a5bd81f 100644 --- a/cmd/end2endtest/overwrite.go +++ b/cmd/end2endtest/overwrite.go @@ -67,7 +67,10 @@ func (t *E2EOverwriteElection) Run() error { VoterAccount: acct, }) } - t.sendVotes(votes) + errs := t.sendVotes(votes) + if len(errs) > 0 { + return fmt.Errorf("error in sendVotes %+v", errs) + } log.Infow("votes submitted successfully", "n", c.nvotes, "time", time.Since(startTime), diff --git a/cmd/end2endtest/plaintext.go b/cmd/end2endtest/plaintext.go index 2271fd8e5..6b2679d2a 100644 --- a/cmd/end2endtest/plaintext.go +++ b/cmd/end2endtest/plaintext.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "os" "time" @@ -64,7 +65,10 @@ func (t *E2EPlaintextElection) Run() error { VoterAccount: acct, }) } - t.sendVotes(votes) + errs := t.sendVotes(votes) + if len(errs) > 0 { + return fmt.Errorf("error in sendVotes %+v", errs) + } log.Infow("votes submitted successfully", "n", c.nvotes, "time", time.Since(startTime), diff --git a/cmd/end2endtest/zkweighted.go b/cmd/end2endtest/zkweighted.go index 7487b232d..94d9ef00a 100644 --- a/cmd/end2endtest/zkweighted.go +++ b/cmd/end2endtest/zkweighted.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "math/big" "os" "time" @@ -65,7 +66,10 @@ func (t *E2EAnonElection) Run() error { VoteWeight: big.NewInt(defaultWeight / 2), }) } - t.sendVotes(votes) + errs := t.sendVotes(votes) + if len(errs) > 0 { + return fmt.Errorf("error in sendVotes %+v", errs) + } log.Infow("votes submitted successfully", "n", len(t.voterAccounts), "time", time.Since(startTime),