Skip to content

Commit

Permalink
end2end: update sendVotes to avoid infinite for loop, add the error w…
Browse files Browse the repository at this point in the history
…hen retry count reached the max allowed

check the errs in each test that calls sendVotes
  • Loading branch information
mariajdab committed Sep 5, 2023
1 parent 0c4a1a9 commit cd77535
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 26 deletions.
5 changes: 4 additions & 1 deletion cmd/end2endtest/ballot.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ func (t *E2EBallotElection) Run() error {
VoterAccount: acct,
})
}
t.sendVotes(votes)
errs := t.sendVotes(votes)
if len(errs) > 0 {
return fmt.Errorf("error in sendVotes %+v", errs)
}

log.Infow("votes submitted successfully",
"n", len(t.voterAccounts), "time", time.Since(startTime),
Expand Down
5 changes: 4 additions & 1 deletion cmd/end2endtest/censusize.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ func (t *E2EMaxCensusSizeElection) Run() error {
VoterAccount: acct,
})
}
t.sendVotes(votes)
errs := t.sendVotes(votes)
if len(errs) > 0 {
return fmt.Errorf("error in sendVotes %+v", errs)
}

log.Infow("votes submitted successfully",
"n", len(t.voterAccounts[1:]), "time", time.Since(startTime),
Expand Down
6 changes: 5 additions & 1 deletion cmd/end2endtest/csp.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"fmt"
"os"
"time"

Expand Down Expand Up @@ -75,7 +76,10 @@ func (t *E2ECSPElection) Run() error {
VoterAccount: acct,
})
}
t.sendVotes(votes)
errs := t.sendVotes(votes)
if len(errs) > 0 {
return fmt.Errorf("error in sendVotes %+v", errs)
}

log.Infow("votes submitted successfully",
"n", c.nvotes, "time", time.Since(startTime),
Expand Down
12 changes: 10 additions & 2 deletions cmd/end2endtest/dynamicensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ func (t *E2EDynamicensusElection) Run() error {
VoterAccount: acct,
})
}
t.elections[0].sendVotes(votes)
errs := t.elections[0].sendVotes(votes)
if len(errs) > 0 {
errCh <- fmt.Errorf("error from electionID: %s, %+v", electionID, errs)
return
}

log.Infow("votes submitted successfully",
"n", len(t.elections[0].voterAccounts[1:]), "time", time.Since(startTime),
Expand Down Expand Up @@ -252,7 +256,11 @@ func (t *E2EDynamicensusElection) Run() error {
VoterAccount: acct,
})
}
t.elections[1].sendVotes(votes)
errs := t.elections[1].sendVotes(votes)
if len(errs) > 0 {
errCh <- fmt.Errorf("error from electionID: %s, %+v", electionID, errs)
return
}

log.Infow("votes submitted successfully",
"n", len(t.elections[1].voterAccounts[1:]), "time", time.Since(startTime),
Expand Down
6 changes: 5 additions & 1 deletion cmd/end2endtest/encrypted.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"fmt"
"os"
"time"

Expand Down Expand Up @@ -72,7 +73,10 @@ func (t *E2EEncryptedElection) Run() error {
Keys: keys,
})
}
t.sendVotes(votes)
errs := t.sendVotes(votes)
if len(errs) > 0 {
return fmt.Errorf("error in sendVotes %+v", errs)
}

log.Infow("votes submitted successfully",
"n", len(t.voterAccounts), "time", time.Since(startTime),
Expand Down
57 changes: 40 additions & 17 deletions cmd/end2endtest/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
nextBlock = "nextBlock"
sameBlock = "sameBlock"
defaultWeight = 10
retriesSend = retries / 2
)

type ballotData struct {
Expand Down Expand Up @@ -474,11 +475,10 @@ func (t *e2eElection) overwriteVote(choices []int, indexAcct int, waitType strin
Choices: []int{choices[i]}}})
for _, err := range errs {
// check the error expected for overwrite with waitUntilNextBlock
if strings.Contains(err.Error(), "overwrite count reached") {
log.Debug("error expected: ", err.Error())
} else {
if !strings.Contains(err.Error(), "overwrite count reached") {
return fmt.Errorf("unexpected overwrite error: %w", err)
}
log.Debug("error expected: ", err.Error())
}
switch waitType {
case sameBlock:
Expand Down Expand Up @@ -561,11 +561,16 @@ func ballotVotes(b ballotData, nvotes int) ([][]int, [][]*types.BigInt) {

// sendVotes sends a batch of votes concurrently
// (number of goroutines defined in t.config.parallelCount)
func (t *e2eElection) sendVotes(votes []*apiclient.VoteData) (errs map[int]error) {
errs = make(map[int]error)
var timeouts atomic.Uint32
// sendVotes sends a batch of votes concurrently
// (number of goroutines defined in t.config.parallelCount)
func (t *e2eElection) sendVotes(votes []*apiclient.VoteData) map[int]error {
var errs = make(map[int]error)
// used to avoid infinite for loop
var timeoutsRetry, mempoolRetry, warnRetry atomic.Uint32
var wg sync.WaitGroup
var queues []map[int]*apiclient.VoteData
var mutex sync.Mutex

for p := 0; p < t.config.parallelCount; p++ {
queues = append(queues, make(map[int]*apiclient.VoteData, len(votes)))
}
Expand All @@ -583,39 +588,57 @@ func (t *e2eElection) sendVotes(votes []*apiclient.VoteData) (errs map[int]error
for len(queue) > 0 {
log.Infow("thread sending votes", "queue", len(queue))
for i, vote := range queue {
accPrivKey := vote.VoterAccount.PrivateKey()
voterApi := t.api.Clone(accPrivKey.String())
_, err := voterApi.Vote(vote)
_, err := t.api.Vote(vote)
switch {
case err == nil:
delete(queue, i)
case errors.Is(err, context.DeadlineExceeded) || os.IsTimeout(err):
// if the context deadline is reached, no need to print it, just retry
timeouts.Add(1)
if timeoutsRetry.Load() > retriesSend {
mutex.Lock()
errs[i] = err
mutex.Unlock()
return
}
timeoutsRetry.Add(1)
case strings.Contains(err.Error(), "mempool is full"):
log.Warn(err)
// wait and retry
_ = voterApi.WaitUntilNextBlock()
case strings.Contains(err.Error(), "already exists") ||
strings.Contains(err.Error(), "overwrite count reached"):
waitErr := t.api.WaitUntilNextBlock()
if waitErr != nil {
if mempoolRetry.Load() > retriesSend {
mutex.Lock()
errs[i] = err
mutex.Unlock()
return
}
mempoolRetry.Add(1)
}
case strings.Contains(err.Error(), "already exists"):
// don't retry
delete(queue, i)
mutex.Lock()
errs[i] = err
case strings.Contains(err.Error(), "expired sik root"):
delete(queue, i)
errs[i] = err
mutex.Unlock()
default:
if warnRetry.Load() > retriesSend {
mutex.Lock()
errs[i] = err
mutex.Unlock()
return
}
// any other error, print it and wait a bit
log.Warn(err)
time.Sleep(100 * time.Millisecond)
warnRetry.Add(1)
}
}
}
}(queues[p])
}
wg.Wait()
log.Infow("sent votes",
"n", len(votes), "timeouts", timeouts.Load(), "failed", len(errs))
"n", len(votes), "timeouts", timeoutsRetry.Load(), "errors", len(errs))
return errs
}

Expand Down
5 changes: 4 additions & 1 deletion cmd/end2endtest/overwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ func (t *E2EOverwriteElection) Run() error {
VoterAccount: acct,
})
}
t.sendVotes(votes)
errs := t.sendVotes(votes)
if len(errs) > 0 {
return fmt.Errorf("error in sendVotes %+v", errs)
}

log.Infow("votes submitted successfully",
"n", c.nvotes, "time", time.Since(startTime),
Expand Down
6 changes: 5 additions & 1 deletion cmd/end2endtest/plaintext.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"fmt"
"os"
"time"

Expand Down Expand Up @@ -64,7 +65,10 @@ func (t *E2EPlaintextElection) Run() error {
VoterAccount: acct,
})
}
t.sendVotes(votes)
errs := t.sendVotes(votes)
if len(errs) > 0 {
return fmt.Errorf("error in sendVotes %+v", errs)
}

log.Infow("votes submitted successfully",
"n", c.nvotes, "time", time.Since(startTime),
Expand Down
6 changes: 5 additions & 1 deletion cmd/end2endtest/zkweighted.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"fmt"
"math/big"
"os"
"time"
Expand Down Expand Up @@ -65,7 +66,10 @@ func (t *E2EAnonElection) Run() error {
VoteWeight: big.NewInt(defaultWeight / 2),
})
}
t.sendVotes(votes)
errs := t.sendVotes(votes)
if len(errs) > 0 {
return fmt.Errorf("error in sendVotes %+v", errs)
}

log.Infow("votes submitted successfully",
"n", len(t.voterAccounts), "time", time.Since(startTime),
Expand Down

0 comments on commit cd77535

Please sign in to comment.