Skip to content

Commit

Permalink
Merge pull request etcd-io#15845 from serathius/robustness-traffic
Browse files Browse the repository at this point in the history
test/robustness: Create dedicated traffic package
  • Loading branch information
serathius authored May 9, 2023
2 parents 1e479f8 + ad20230 commit d81d3c3
Show file tree
Hide file tree
Showing 8 changed files with 558 additions and 490 deletions.
3 changes: 2 additions & 1 deletion tests/robustness/failpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import (
)

const (
triggerTimeout = time.Minute
triggerTimeout = time.Minute
waitBetweenFailpointTriggers = time.Second
)

var (
Expand Down
109 changes: 14 additions & 95 deletions tests/robustness/linearizability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,88 +27,7 @@ import (
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/model"
)

const (
// waitBetweenFailpointTriggers
waitBetweenFailpointTriggers = time.Second
)

var (
LowTraffic = trafficConfig{
name: "LowTraffic",
minimalQPS: 100,
maximalQPS: 200,
clientCount: 8,
requestProgress: false,
traffic: etcdTraffic{
keyCount: 10,
leaseTTL: DefaultLeaseTTL,
largePutSize: 32769,
writeChoices: []choiceWeight[etcdRequestType]{
{choice: Put, weight: 45},
{choice: LargePut, weight: 5},
{choice: Delete, weight: 10},
{choice: MultiOpTxn, weight: 10},
{choice: PutWithLease, weight: 10},
{choice: LeaseRevoke, weight: 10},
{choice: CompareAndSet, weight: 10},
},
},
}
HighTraffic = trafficConfig{
name: "HighTraffic",
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
requestProgress: false,
traffic: etcdTraffic{
keyCount: 10,
largePutSize: 32769,
leaseTTL: DefaultLeaseTTL,
writeChoices: []choiceWeight[etcdRequestType]{
{choice: Put, weight: 85},
{choice: MultiOpTxn, weight: 10},
{choice: LargePut, weight: 5},
},
},
}
KubernetesTraffic = trafficConfig{
name: "Kubernetes",
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
traffic: kubernetesTraffic{
averageKeyCount: 5,
resource: "pods",
namespace: "default",
writeChoices: []choiceWeight[KubernetesRequestType]{
{choice: KubernetesUpdate, weight: 75},
{choice: KubernetesDelete, weight: 15},
{choice: KubernetesCreate, weight: 10},
},
},
}
ReqProgTraffic = trafficConfig{
name: "RequestProgressTraffic",
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
requestProgress: true,
traffic: etcdTraffic{
keyCount: 10,
largePutSize: 8196,
leaseTTL: DefaultLeaseTTL,
writeChoices: []choiceWeight[etcdRequestType]{
{choice: Put, weight: 95},
{choice: LargePut, weight: 5},
},
},
}
defaultTraffic = LowTraffic
trafficList = []trafficConfig{
LowTraffic, HighTraffic, KubernetesTraffic,
}
"go.etcd.io/etcd/tests/v3/robustness/traffic"
)

func TestRobustness(t *testing.T) {
Expand All @@ -121,12 +40,12 @@ func TestRobustness(t *testing.T) {
name string
failpoint Failpoint
config e2e.EtcdProcessClusterConfig
traffic *trafficConfig
traffic *traffic.Config
}
scenarios := []scenario{}
for _, traffic := range trafficList {
for _, traffic := range []traffic.Config{traffic.LowTraffic, traffic.HighTraffic, traffic.KubernetesTraffic} {
scenarios = append(scenarios, scenario{
name: "ClusterOfSize1/" + traffic.name,
name: "ClusterOfSize1/" + traffic.Name,
failpoint: RandomFailpoint,
traffic: &traffic,
config: *e2e.NewConfig(
Expand All @@ -149,7 +68,7 @@ func TestRobustness(t *testing.T) {
clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithSnapshotCatchUpEntries(100))
}
scenarios = append(scenarios, scenario{
name: "ClusterOfSize3/" + traffic.name,
name: "ClusterOfSize3/" + traffic.Name,
failpoint: RandomFailpoint,
traffic: &traffic,
config: *e2e.NewConfig(clusterOfSize3Options...),
Expand All @@ -174,15 +93,15 @@ func TestRobustness(t *testing.T) {
scenarios = append(scenarios, scenario{
name: "Issue13766",
failpoint: KillFailpoint,
traffic: &HighTraffic,
traffic: &traffic.HighTraffic,
config: *e2e.NewConfig(
e2e.WithSnapshotCount(100),
),
})
scenarios = append(scenarios, scenario{
name: "Issue15220",
failpoint: RandomFailpoint,
traffic: &ReqProgTraffic,
traffic: &traffic.ReqProgTraffic,
config: *e2e.NewConfig(
e2e.WithClusterSize(1),
),
Expand All @@ -191,7 +110,7 @@ func TestRobustness(t *testing.T) {
scenarios = append(scenarios, scenario{
name: "Issue15271",
failpoint: BlackholeUntilSnapshot,
traffic: &HighTraffic,
traffic: &traffic.HighTraffic,
config: *e2e.NewConfig(
e2e.WithSnapshotCount(100),
e2e.WithPeerProxy(true),
Expand All @@ -201,7 +120,7 @@ func TestRobustness(t *testing.T) {
}
for _, scenario := range scenarios {
if scenario.traffic == nil {
scenario.traffic = &defaultTraffic
scenario.traffic = &traffic.LowTraffic
}

t.Run(scenario.name, func(t *testing.T) {
Expand All @@ -218,7 +137,7 @@ func TestRobustness(t *testing.T) {
}
}

func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2e.EtcdProcessClusterConfig, traffic *trafficConfig, failpoint FailpointConfig) {
func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2e.EtcdProcessClusterConfig, traffic *traffic.Config, failpoint FailpointConfig) {
r := report{lg: lg}
var err error
r.clus, err = e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&config))
Expand All @@ -238,7 +157,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2
forcestopCluster(r.clus)

watchProgressNotifyEnabled := r.clus.Cfg.WatchProcessNotifyInterval != 0
validateWatchResponses(t, r.clus, r.responses, traffic.requestProgress || watchProgressNotifyEnabled)
validateWatchResponses(t, r.clus, r.responses, traffic.RequestProgress || watchProgressNotifyEnabled)

r.events = watchEvents(r.responses)
validateEventsMatch(t, r.events)
Expand All @@ -249,7 +168,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2
panicked = false
}

func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, traffic trafficConfig, failpoint FailpointConfig) (operations []porcupine.Operation, responses [][]watchResponse) {
func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, tCfg traffic.Config, failpoint FailpointConfig) (operations []porcupine.Operation, responses [][]watchResponse) {
g := errgroup.Group{}
finishTraffic := make(chan struct{})

Expand All @@ -262,12 +181,12 @@ func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.Et
maxRevisionChan := make(chan int64, 1)
g.Go(func() error {
defer close(maxRevisionChan)
operations = simulateTraffic(ctx, t, lg, clus, traffic, finishTraffic)
operations = traffic.SimulateTraffic(ctx, t, lg, clus, tCfg, finishTraffic)
maxRevisionChan <- operationsMaxRevision(operations)
return nil
})
g.Go(func() error {
responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, traffic.requestProgress)
responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, tCfg.RequestProgress)
return nil
})
g.Wait()
Expand Down
Loading

0 comments on commit d81d3c3

Please sign in to comment.