Commit
functional_test: code cleanup and minor enhancements
Clean up some useless or dead code;
remove some unnecessary methods.

Signed-off-by: Benjamin Wang <[email protected]>
ahrtr committed Oct 20, 2022
1 parent e24402d commit 2aee420
Showing 5 changed files with 44 additions and 79 deletions.
46 changes: 10 additions & 36 deletions tests/functional/tester/case.go
@@ -101,17 +101,6 @@ func (c *caseFollower) Recover(clus *Cluster) error {
return c.recoverMember(clus, c.last)
}

func (c *caseFollower) Desc() string {
if c.desc != "" {
return c.desc
}
return c.rpcpbCase.String()
}

func (c *caseFollower) TestCase() rpcpb.Case {
return c.rpcpbCase
}

type caseLeader struct {
caseByFunc
last int
@@ -139,10 +128,6 @@ func (c *caseLeader) Recover(clus *Cluster) error {
return c.recoverMember(clus, c.last)
}

func (c *caseLeader) TestCase() rpcpb.Case {
return c.rpcpbCase
}

type caseQuorum struct {
caseByFunc
injected map[int]struct{}
@@ -167,17 +152,6 @@ func (c *caseQuorum) Recover(clus *Cluster) error {
return nil
}

func (c *caseQuorum) Desc() string {
if c.desc != "" {
return c.desc
}
return c.rpcpbCase.String()
}

func (c *caseQuorum) TestCase() rpcpb.Case {
return c.rpcpbCase
}

func pickQuorum(size int) (picked map[int]struct{}) {
picked = make(map[int]struct{})
r := rand.New(rand.NewSource(time.Now().UnixNano()))
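
A note on the deleted Desc and TestCase copies: caseFollower, caseLeader, and caseQuorum all embed caseByFunc, and the removed bodies are presumably identical to the ones the embedded type already provides, so Go's method promotion keeps behavior unchanged. A minimal sketch of the mechanism (illustrative names, not the tester's types):

```go
package main

import "fmt"

type base struct{ desc string }

func (b base) Desc() string { return b.desc }

// follower embeds base, so base.Desc is promoted onto follower;
// a hand-written follower.Desc with the same body is redundant.
type follower struct {
	base
	last int
}

func main() {
	f := follower{base: base{desc: "kill one follower"}, last: -1}
	fmt.Println(f.Desc()) // resolves to the promoted base.Desc
}
```
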
@@ -230,15 +204,15 @@ type caseUntilSnapshot struct {
// all delay failure cases except the ones failing with latency
// greater than election timeout (trigger leader election and
// cluster keeps operating anyways)
var slowCases = map[rpcpb.Case]bool{
rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER: true,
rpcpb.Case_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT: true,
rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT: true,
rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_LEADER: true,
rpcpb.Case_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT: true,
rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT: true,
rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM: true,
rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_ALL: true,
var slowCases = map[rpcpb.Case]struct{}{
rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER: {},
rpcpb.Case_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT: {},
rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT: {},
rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_LEADER: {},
rpcpb.Case_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT: {},
rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT: {},
rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM: {},
rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_ALL: {},
}

func (c *caseUntilSnapshot) Inject(clus *Cluster) error {
@@ -268,7 +242,7 @@ func (c *caseUntilSnapshot) Inject(clus *Cluster) error {
// healthy cluster could accept 1000 req/sec at least.
// 3x time to trigger snapshot.
retries := int(snapshotCount) / 1000 * 3
if v, ok := slowCases[c.TestCase()]; v && ok {
if _, ok := slowCases[c.TestCase()]; ok {
// slow network takes more retries
retries *= 5
}
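
The slowCases rewrite swaps map[rpcpb.Case]bool for map[rpcpb.Case]struct{}, the idiomatic Go set: the empty struct occupies zero bytes, and membership becomes a plain comma-ok lookup, which is why the `v && ok` condition in Inject collapses to just `ok`. A self-contained sketch of the pattern:

```go
package main

import "fmt"

// A set of case names; struct{} values carry no data at all.
var slow = map[string]struct{}{
	"RANDOM_DELAY_PEER_PORT_TX_RX_LEADER": {},
	"RANDOM_DELAY_PEER_PORT_TX_RX_ALL":    {},
}

func main() {
	retries := 30
	// Membership is the comma-ok idiom; there is no value to test.
	if _, ok := slow["RANDOM_DELAY_PEER_PORT_TX_RX_ALL"]; ok {
		retries *= 5 // slow network takes more retries
	}
	fmt.Println(retries) // 150
}
```
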
2 changes: 0 additions & 2 deletions tests/functional/tester/case_external.go
@@ -22,8 +22,6 @@ import (
)

type caseExternal struct {
Case

desc string
rpcpbCase rpcpb.Case

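
Dropping the embedded Case interface from caseExternal is safe because the struct evidently implements the full method set itself; beyond being dead weight, a nil embedded interface is a latent runtime panic if a promoted method is ever reached. A sketch of that hazard (hypothetical types, not the tester's):

```go
package main

import "fmt"

type Case interface{ Desc() string }

// Embedding the interface satisfies Case at compile time, but the
// field defaults to nil, so any promoted call blows up at runtime.
type withEmbed struct {
	Case
}

// Implementing the method directly needs no embedded field.
type direct struct{ desc string }

func (d direct) Desc() string { return d.desc }

func main() {
	var c Case = direct{desc: "external"}
	fmt.Println(c.Desc())

	defer func() { fmt.Println("recovered:", recover()) }()
	var w Case = withEmbed{} // embedded Case left nil
	fmt.Println(w.Desc())    // panics: nil pointer dereference
}
```
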
20 changes: 9 additions & 11 deletions tests/functional/tester/cluster.go
@@ -43,10 +43,9 @@ import (
type Cluster struct {
lg *zap.Logger

agentConns []*grpc.ClientConn
agentClients []rpcpb.TransportClient
agentStreams []rpcpb.Transport_TransportClient
agentRequests []*rpcpb.Request
agentConns []*grpc.ClientConn
agentClients []rpcpb.TransportClient
agentStreams []rpcpb.Transport_TransportClient

testerHTTPServer *http.Server

@@ -80,7 +79,6 @@ func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
clus.agentConns = make([]*grpc.ClientConn, len(clus.Members))
clus.agentClients = make([]rpcpb.TransportClient, len(clus.Members))
clus.agentStreams = make([]rpcpb.Transport_TransportClient, len(clus.Members))
clus.agentRequests = make([]*rpcpb.Request, len(clus.Members))
clus.cases = make([]Case, 0)

lg.Info("creating members")
@@ -260,16 +258,16 @@ func (clus *Cluster) updateCases() {
fpFailures, fperr := failpointFailures(clus)
if len(fpFailures) == 0 {
clus.lg.Info("no failpoints found!", zap.Error(fperr))
} else {
clus.cases = append(clus.cases, fpFailures...)
}
clus.cases = append(clus.cases,
fpFailures...)
case "FAILPOINTS_WITH_DISK_IO_LATENCY":
fpFailures, fperr := failpointDiskIOFailures(clus)
if len(fpFailures) == 0 {
clus.lg.Info("no failpoints found!", zap.Error(fperr))
} else {
clus.cases = append(clus.cases, fpFailures...)
}
clus.cases = append(clus.cases,
fpFailures...)
}
}
}
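
Moving the append into an else branch in updateCases does not change behavior — appending an empty slice is a no-op — but it makes the two outcomes mutually exclusive at a glance. A sketch of the resulting shape (collectFailpointCases is a hypothetical stand-in for failpointFailures):

```go
package main

import (
	"errors"
	"log"
)

// collectFailpointCases stands in for failpointFailures(clus).
func collectFailpointCases() ([]string, error) {
	return nil, errors.New("failpoints disabled in this build")
}

func main() {
	var all []string
	cases, err := collectFailpointCases()
	if len(cases) == 0 {
		// Log why, and skip the (previously unconditional) append.
		log.Printf("no failpoints found: %v", err)
	} else {
		all = append(all, cases...)
	}
	log.Printf("registered %d cases", len(all))
}
```
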
@@ -446,13 +444,13 @@ func (clus *Cluster) sendOp(idx int, op rpcpb.Operation) error {
func (clus *Cluster) sendOpWithResp(idx int, op rpcpb.Operation) (*rpcpb.Response, error) {
// maintain the initial member object
// throughout the test time
clus.agentRequests[idx] = &rpcpb.Request{
req := &rpcpb.Request{
Operation: op,
Member: clus.Members[idx],
Tester: clus.Tester,
}

err := clus.agentStreams[idx].Send(clus.agentRequests[idx])
err := clus.agentStreams[idx].Send(req)
clus.lg.Info(
"sent request",
zap.String("operation", op.String()),
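
With agentRequests gone, each request in sendOpWithResp is built as a local, handed to Send, and then becomes garbage; the old slice retained every member's last request for the lifetime of the cluster even though nothing ever read it back. A generic sketch of the same change (hypothetical types, not the rpcpb API):

```go
package main

import "fmt"

type request struct {
	op     string
	member string
}

// agent mirrors the shape of the fix: no requests []*request field
// keeping every message alive; each request is local to send and
// reclaimable by the GC as soon as the call returns.
type agent struct {
	member string
}

func (a *agent) send(op string) error {
	req := &request{op: op, member: a.member} // local, not retained
	fmt.Printf("sent %s to %s\n", req.op, req.member)
	return nil
}

func main() {
	a := &agent{member: "etcd-1"}
	_ = a.send("RESTART_ETCD")
}
```
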
51 changes: 23 additions & 28 deletions tests/functional/tester/stresser_lease.go
@@ -54,7 +54,9 @@ type leaseStresser struct {
aliveLeases *atomicLeases
alivedLeasesWithShortTTL *atomicLeases
revokedLeases *atomicLeases
shortLivedLeases *atomicLeases
// The tester doesn't keep alive the shortLivedLeases,
// so they will expire after the TTL.
shortLivedLeases *atomicLeases

runWg sync.WaitGroup
aliveWg sync.WaitGroup
@@ -188,18 +190,16 @@ func (ls *leaseStresser) run() {
}

func (ls *leaseStresser) restartKeepAlives() {
for leaseID := range ls.aliveLeases.getLeasesMap() {
ls.aliveWg.Add(1)
go func(id int64) {
ls.keepLeaseAlive(id)
}(leaseID)
}
for leaseID := range ls.alivedLeasesWithShortTTL.getLeasesMap() {
ls.aliveWg.Add(1)
go func(id int64) {
ls.keepLeaseAlive(id)
}(leaseID)
f := func(leases *atomicLeases) {
for leaseID := range leases.getLeasesMap() {
ls.aliveWg.Add(1)
go func(id int64) {
ls.keepLeaseAlive(id)
}(leaseID)
}
}
f(ls.aliveLeases)
f(ls.alivedLeasesWithShortTTL)
}

func (ls *leaseStresser) createLeases() {
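
restartKeepAlives now hoists the duplicated loop into a local closure and invokes it once per lease map; the lease ID is still passed as a goroutine argument so each goroutine works on its own copy rather than the loop variable. A runnable sketch of the shape:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	keepAlive := func(id int64) {
		defer wg.Done()
		fmt.Printf("keepalive %016x\n", id)
	}
	// One closure replaces two verbatim copies of the same loop.
	restart := func(leases map[int64]struct{}) {
		for id := range leases {
			wg.Add(1)
			go keepAlive(id) // id passed in, not captured
		}
	}
	alive := map[int64]struct{}{1: {}, 2: {}}
	shortTTL := map[int64]struct{}{3: {}}
	restart(alive)
	restart(shortTTL)
	wg.Wait()
}
```
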
@@ -361,11 +361,17 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(ls.ctx.Err()),
)
// it is possible that lease expires at invariant checking phase but not at keepLeaseAlive() phase.
// this scenario is possible when alive lease is just about to expire when keepLeaseAlive() exists and expires at invariant checking phase.
// to circumvent that scenario, we check each lease before keepalive loop exist to see if it has been renewed in last TTL/2 duration.
// if it is renewed, this means that invariant checking have at least ttl/2 time before lease expires which is long enough for the checking to finish.
// if it is not renewed, we remove the lease from the alive map so that the lease doesn't expire during invariant checking
// It is possible that a lease expires during the invariant
// checking phase but not during the keepLeaseAlive() phase. This
// can happen when an alive lease is just about to expire as
// keepLeaseAlive() exits and then expires during invariant
// checking. To circumvent this scenario, we check each lease
// before the keepalive loop exits to see whether it has been
// renewed within the last TTL/2 duration. If it has, invariant
// checking has at least TTL/2 before the lease expires, which is
// long enough for the checking to finish. If it has not, we
// remove the lease from the alive map so that the lease doesn't
// expire during invariant checking.
renewTime, ok := ls.aliveLeases.read(leaseID)
if ok && renewTime.Add(defaultTTL/2*time.Second).Before(time.Now()) {
ls.aliveLeases.remove(leaseID)
@@ -390,17 +396,6 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
cancel()
ctx, cancel = context.WithCancel(ls.ctx)
stream, err = ls.cli.KeepAlive(ctx, clientv3.LeaseID(leaseID))
cancel()
continue
}
if err != nil {
ls.lg.Debug(
"keepLeaseAlive failed to receive lease keepalive response",
zap.String("stress-type", ls.stype.String()),
zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(err),
)
continue
}

4 changes: 2 additions & 2 deletions tests/functional/tester/utils.go
@@ -15,7 +15,7 @@
package tester

import (
"fmt"
"errors"
"math/rand"
"net"
"net/url"
@@ -67,7 +67,7 @@ func errsToError(errs []error) error {
for i, err := range errs {
stringArr[i] = err.Error()
}
return fmt.Errorf(strings.Join(stringArr, ", "))
return errors.New(strings.Join(stringArr, ", "))
}

func randBytes(size int) []byte {
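
The utils.go change matters because fmt.Errorf treats its first argument as a format string: if any joined error message happens to contain a % verb, the output is mangled, and go vet flags the non-constant format string. errors.New passes the text through verbatim. A sketch; the errors.Join line is an aside on the Go 1.20+ alternative, not what this commit does:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	// An error whose text happens to contain a % verb.
	errs := []error{errors.New("disk 100% full"), errors.New("dial timeout")}
	msg := errs[0].Error() + ", " + errs[1].Error()

	fmt.Println(fmt.Errorf(msg)) // format string: "%" is mangled, vet complains
	fmt.Println(errors.New(msg)) // text passed through verbatim

	// Go 1.20+ alternative that also keeps errors.Is/As working:
	fmt.Println(errors.Join(errs...))
}
```
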
