diff --git a/tests/functional/tester/case.go b/tests/functional/tester/case.go
index dbe41aab93e..678536a0556 100644
--- a/tests/functional/tester/case.go
+++ b/tests/functional/tester/case.go
@@ -101,17 +101,6 @@ func (c *caseFollower) Recover(clus *Cluster) error {
 	return c.recoverMember(clus, c.last)
 }
 
-func (c *caseFollower) Desc() string {
-	if c.desc != "" {
-		return c.desc
-	}
-	return c.rpcpbCase.String()
-}
-
-func (c *caseFollower) TestCase() rpcpb.Case {
-	return c.rpcpbCase
-}
-
 type caseLeader struct {
 	caseByFunc
 	last int
@@ -139,10 +128,6 @@ func (c *caseLeader) Recover(clus *Cluster) error {
 	return c.recoverMember(clus, c.last)
 }
 
-func (c *caseLeader) TestCase() rpcpb.Case {
-	return c.rpcpbCase
-}
-
 type caseQuorum struct {
 	caseByFunc
 	injected map[int]struct{}
@@ -167,17 +152,6 @@ func (c *caseQuorum) Recover(clus *Cluster) error {
 	return nil
 }
 
-func (c *caseQuorum) Desc() string {
-	if c.desc != "" {
-		return c.desc
-	}
-	return c.rpcpbCase.String()
-}
-
-func (c *caseQuorum) TestCase() rpcpb.Case {
-	return c.rpcpbCase
-}
-
 func pickQuorum(size int) (picked map[int]struct{}) {
 	picked = make(map[int]struct{})
 	r := rand.New(rand.NewSource(time.Now().UnixNano()))
@@ -230,15 +204,15 @@ type caseUntilSnapshot struct {
 // all delay failure cases except the ones failing with latency
 // greater than election timeout (trigger leader election and
 // cluster keeps operating anyways)
-var slowCases = map[rpcpb.Case]bool{
-	rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER:                        true,
-	rpcpb.Case_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT:        true,
-	rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT: true,
-	rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_LEADER:                              true,
-	rpcpb.Case_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT:              true,
-	rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT:       true,
-	rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM:                              true,
-	rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_ALL:                                 true,
+var slowCases = map[rpcpb.Case]struct{}{
+	rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER:                        {},
+	rpcpb.Case_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT:        {},
+	rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT: {},
+	rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_LEADER:                              {},
+	rpcpb.Case_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT:              {},
+	rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT:       {},
+	rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM:                              {},
+	rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_ALL:                                 {},
 }
 
 func (c *caseUntilSnapshot) Inject(clus *Cluster) error {
@@ -268,7 +242,7 @@ func (c *caseUntilSnapshot) Inject(clus *Cluster) error {
 	// healthy cluster could accept 1000 req/sec at least.
 	// 3x time to trigger snapshot.
 	retries := int(snapshotCount) / 1000 * 3
-	if v, ok := slowCases[c.TestCase()]; v && ok {
+	if _, ok := slowCases[c.TestCase()]; ok {
 		// slow network takes more retries
 		retries *= 5
 	}
diff --git a/tests/functional/tester/case_external.go b/tests/functional/tester/case_external.go
index cf4ee10bf2f..69cc07e01b1 100644
--- a/tests/functional/tester/case_external.go
+++ b/tests/functional/tester/case_external.go
@@ -22,8 +22,6 @@ import (
 )
 
 type caseExternal struct {
-	Case
-
 	desc      string
 	rpcpbCase rpcpb.Case
 
diff --git a/tests/functional/tester/cluster.go b/tests/functional/tester/cluster.go
index 9b28748c393..de63309924d 100644
--- a/tests/functional/tester/cluster.go
+++ b/tests/functional/tester/cluster.go
@@ -43,10 +43,9 @@ import (
 type Cluster struct {
 	lg *zap.Logger
 
-	agentConns    []*grpc.ClientConn
-	agentClients  []rpcpb.TransportClient
-	agentStreams  []rpcpb.Transport_TransportClient
-	agentRequests []*rpcpb.Request
+	agentConns   []*grpc.ClientConn
+	agentClients []rpcpb.TransportClient
+	agentStreams []rpcpb.Transport_TransportClient
 
 	testerHTTPServer *http.Server
 
@@ -80,7 +79,6 @@ func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
 	clus.agentConns = make([]*grpc.ClientConn, len(clus.Members))
 	clus.agentClients = make([]rpcpb.TransportClient, len(clus.Members))
 	clus.agentStreams = make([]rpcpb.Transport_TransportClient, len(clus.Members))
-	clus.agentRequests = make([]*rpcpb.Request, len(clus.Members))
 	clus.cases = make([]Case, 0)
 
 	lg.Info("creating members")
@@ -260,16 +258,16 @@ func (clus *Cluster) updateCases() {
 			fpFailures, fperr := failpointFailures(clus)
 			if len(fpFailures) == 0 {
 				clus.lg.Info("no failpoints found!", zap.Error(fperr))
+			} else {
+				clus.cases = append(clus.cases, fpFailures...)
 			}
-			clus.cases = append(clus.cases,
-				fpFailures...)
 		case "FAILPOINTS_WITH_DISK_IO_LATENCY":
 			fpFailures, fperr := failpointDiskIOFailures(clus)
 			if len(fpFailures) == 0 {
 				clus.lg.Info("no failpoints found!", zap.Error(fperr))
+			} else {
+				clus.cases = append(clus.cases, fpFailures...)
 			}
-			clus.cases = append(clus.cases,
-				fpFailures...)
 		}
 	}
 }
@@ -446,13 +444,13 @@ func (clus *Cluster) sendOp(idx int, op rpcpb.Operation) error {
 func (clus *Cluster) sendOpWithResp(idx int, op rpcpb.Operation) (*rpcpb.Response, error) {
 	// maintain the initial member object
 	// throughout the test time
-	clus.agentRequests[idx] = &rpcpb.Request{
+	req := &rpcpb.Request{
 		Operation: op,
 		Member:    clus.Members[idx],
 		Tester:    clus.Tester,
 	}
 
-	err := clus.agentStreams[idx].Send(clus.agentRequests[idx])
+	err := clus.agentStreams[idx].Send(req)
 	clus.lg.Info(
 		"sent request",
 		zap.String("operation", op.String()),
diff --git a/tests/functional/tester/stresser_lease.go b/tests/functional/tester/stresser_lease.go
index 9f28c86eb8a..42d1d0f6b1b 100644
--- a/tests/functional/tester/stresser_lease.go
+++ b/tests/functional/tester/stresser_lease.go
@@ -54,7 +54,9 @@ type leaseStresser struct {
 	aliveLeases              *atomicLeases
 	alivedLeasesWithShortTTL *atomicLeases
 	revokedLeases            *atomicLeases
-	shortLivedLeases         *atomicLeases
+	// The tester doesn't keep alive the shortLivedLeases,
+	// so they will expire after the TTL.
+	shortLivedLeases *atomicLeases
 
 	runWg   sync.WaitGroup
 	aliveWg sync.WaitGroup
@@ -188,18 +190,16 @@ func (ls *leaseStresser) run() {
 }
 
 func (ls *leaseStresser) restartKeepAlives() {
-	for leaseID := range ls.aliveLeases.getLeasesMap() {
-		ls.aliveWg.Add(1)
-		go func(id int64) {
-			ls.keepLeaseAlive(id)
-		}(leaseID)
-	}
-	for leaseID := range ls.alivedLeasesWithShortTTL.getLeasesMap() {
-		ls.aliveWg.Add(1)
-		go func(id int64) {
-			ls.keepLeaseAlive(id)
-		}(leaseID)
+	f := func(leases *atomicLeases) {
+		for leaseID := range leases.getLeasesMap() {
+			ls.aliveWg.Add(1)
+			go func(id int64) {
+				ls.keepLeaseAlive(id)
+			}(leaseID)
+		}
 	}
+	f(ls.aliveLeases)
+	f(ls.alivedLeasesWithShortTTL)
 }
 
 func (ls *leaseStresser) createLeases() {
@@ -349,7 +349,17 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 	defer ls.aliveWg.Done()
 	ctx, cancel := context.WithCancel(ls.ctx)
 	stream, err := ls.cli.KeepAlive(ctx, clientv3.LeaseID(leaseID))
-	defer func() { cancel() }()
+	if err != nil {
+		ls.lg.Error(
+			"keepLeaseAlive lease creates stream error",
+			zap.String("stress-type", ls.stype.String()),
+			zap.String("endpoint", ls.m.EtcdClientEndpoint),
+			zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
+			zap.Error(err),
+		)
+	}
+	defer cancel()
+
 	for {
 		select {
 		case <-time.After(500 * time.Millisecond):
@@ -361,11 +371,17 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 				zap.Error(ls.ctx.Err()),
 			)
-			// it is possible that lease expires at invariant checking phase but not at keepLeaseAlive() phase.
-			// this scenario is possible when alive lease is just about to expire when keepLeaseAlive() exists and expires at invariant checking phase.
-			// to circumvent that scenario, we check each lease before keepalive loop exist to see if it has been renewed in last TTL/2 duration.
-			// if it is renewed, this means that invariant checking have at least ttl/2 time before lease expires which is long enough for the checking to finish.
-			// if it is not renewed, we remove the lease from the alive map so that the lease doesn't expire during invariant checking
+			// It is possible that a lease expires at the invariant checking
+			// phase but not at the keepLeaseAlive() phase. This can happen
+			// when an alive lease is just about to expire as keepLeaseAlive()
+			// exits and then expires at the invariant checking phase. To
+			// circumvent this scenario, we check each lease before the
+			// keepalive loop exits to see if it has been renewed in the last
+			// TTL/2 duration. If it has, invariant checking has at least
+			// TTL/2 time before the lease expires, which is long enough for
+			// the checking to finish. If it has not, we remove the lease
+			// from the alive map so that it doesn't expire during invariant
+			// checking.
 			renewTime, ok := ls.aliveLeases.read(leaseID)
 			if ok && renewTime.Add(defaultTTL/2*time.Second).Before(time.Now()) {
 				ls.aliveLeases.remove(leaseID)
@@ -379,31 +395,6 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
 			return
 		}
 
-		if err != nil {
-			ls.lg.Debug(
-				"keepLeaseAlive lease creates stream error",
-				zap.String("stress-type", ls.stype.String()),
-				zap.String("endpoint", ls.m.EtcdClientEndpoint),
-				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
-				zap.Error(err),
-			)
-			cancel()
-			ctx, cancel = context.WithCancel(ls.ctx)
-			stream, err = ls.cli.KeepAlive(ctx, clientv3.LeaseID(leaseID))
-			cancel()
-			continue
-		}
-		if err != nil {
-			ls.lg.Debug(
-				"keepLeaseAlive failed to receive lease keepalive response",
-				zap.String("stress-type", ls.stype.String()),
-				zap.String("endpoint", ls.m.EtcdClientEndpoint),
-				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
-				zap.Error(err),
-			)
-			continue
-		}
-
 		ls.lg.Debug(
 			"keepLeaseAlive waiting on lease stream",
 			zap.String("stress-type", ls.stype.String()),
diff --git a/tests/functional/tester/utils.go b/tests/functional/tester/utils.go
index 74e34146d53..4403ff34663 100644
--- a/tests/functional/tester/utils.go
+++ b/tests/functional/tester/utils.go
@@ -15,7 +15,7 @@
 package tester
 
 import (
-	"fmt"
+	"errors"
 	"math/rand"
 	"net"
 	"net/url"
@@ -67,7 +67,7 @@ func errsToError(errs []error) error {
 	for i, err := range errs {
 		stringArr[i] = err.Error()
 	}
-	return fmt.Errorf(strings.Join(stringArr, ", "))
+	return errors.New(strings.Join(stringArr, ", "))
 }
 
 func randBytes(size int) []byte {
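
A note on the `slowCases` change in `case.go` above: switching the value type from `bool` to `struct{}` turns the map into a plain set (the values occupy no space), which is why the lookup can drop the redundant `v && ok` check. Below is a minimal standalone sketch of the idiom; the `Case` type and case names are illustrative stand-ins, not the real `rpcpb.Case` enum.

```go
package main

import "fmt"

// Case stands in for rpcpb.Case; the real type is a protobuf enum.
type Case string

// slowCases is a set: struct{} values carry no data, so only membership matters.
var slowCases = map[Case]struct{}{
	"RANDOM_DELAY_PEER_PORT_TX_RX_LEADER": {},
	"RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM": {},
}

// retriesFor mirrors the retry scaling in caseUntilSnapshot.Inject:
// slow (delay-injecting) cases get 5x the retry budget.
func retriesFor(c Case, base int) int {
	retries := base
	// With struct{} values there is nothing to test but presence,
	// so `_, ok` replaces the old `v, ok := ...; v && ok` pattern.
	if _, ok := slowCases[c]; ok {
		retries *= 5
	}
	return retries
}

func main() {
	fmt.Println(retriesFor("RANDOM_DELAY_PEER_PORT_TX_RX_LEADER", 30)) // 150
	fmt.Println(retriesFor("SIGTERM_ONE_FOLLOWER", 30))                // 30
}
```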
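Similarly, the `utils.go` hunk replaces `fmt.Errorf(strings.Join(...))` with `errors.New(...)`. The joined error text is not a format string, so a literal `%` in a member's error message would previously have been misread as a formatting verb (and `go vet` flags non-constant format strings). A small sketch of the corrected helper follows; the nil-on-empty guard and the `main` wrapper are assumptions for demonstration only.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errsToError mirrors the tester helper: it flattens a slice of errors into
// one error without treating the combined text as a format string.
func errsToError(errs []error) error {
	if len(errs) == 0 {
		return nil
	}
	stringArr := make([]string, len(errs))
	for i, err := range errs {
		stringArr[i] = err.Error()
	}
	return errors.New(strings.Join(stringArr, ", "))
}

func main() {
	errs := []error{
		errors.New("disk usage at 95%"), // a literal '%' would trip up fmt.Errorf
		errors.New("connection refused"),
	}
	fmt.Println(errsToError(errs))
}
```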
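Finally, the rewrapped `keepLeaseAlive` comment encodes a simple timing rule: when the keepalive loop exits, a lease stays in the alive map only if it was renewed within the last TTL/2, which leaves invariant checking at least TTL/2 before expiry. A tiny sketch of that check in isolation; `defaultTTL = 120` seconds is an assumption chosen to match the tester's long-lived lease TTL.

```go
package main

import (
	"fmt"
	"time"
)

// defaultTTL is assumed to be 120 seconds, as for the stresser's long-lived leases.
const defaultTTL = 120

// shouldDropFromAlive reports whether a lease should be removed from the
// alive map when the keepalive loop exits: if the last renewal is older
// than TTL/2, invariant checking may not finish before the lease expires.
func shouldDropFromAlive(renewTime, now time.Time) bool {
	return renewTime.Add(defaultTTL / 2 * time.Second).Before(now)
}

func main() {
	now := time.Now()
	fmt.Println(shouldDropFromAlive(now.Add(-90*time.Second), now)) // true: stale, drop it
	fmt.Println(shouldDropFromAlive(now.Add(-10*time.Second), now)) // false: recently renewed
}
```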