diff --git a/pkg/cmd/roachtest/tests/failover.go b/pkg/cmd/roachtest/tests/failover.go
index 40f21fdaab03..57a06c471872 100644
--- a/pkg/cmd/roachtest/tests/failover.go
+++ b/pkg/cmd/roachtest/tests/failover.go
@@ -14,8 +14,6 @@ import (
 	"context"
 	gosql "database/sql"
 	"fmt"
-	"math/rand"
-	"strings"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
@@ -37,13 +35,33 @@ func registerFailover(r registry.Registry) {
 			suffix = "/lease=expiration"
 		}
 
+		r.Add(registry.TestSpec{
+			Name:    "failover/partial/lease-gateway" + suffix,
+			Owner:   registry.OwnerKV,
+			Timeout: 30 * time.Minute,
+			Cluster: r.MakeClusterSpec(8, spec.CPU(4)),
+			Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+				runFailoverPartialLeaseGateway(ctx, t, c, expirationLeases)
+			},
+		})
+
+		r.Add(registry.TestSpec{
+			Name:    "failover/partial/lease-leader" + suffix,
+			Owner:   registry.OwnerKV,
+			Timeout: 30 * time.Minute,
+			Cluster: r.MakeClusterSpec(7, spec.CPU(4)),
+			Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+				runFailoverPartialLeaseLeader(ctx, t, c, expirationLeases)
+			},
+		})
+
 		r.Add(registry.TestSpec{
 			Name:    "failover/partial/lease-liveness" + suffix,
 			Owner:   registry.OwnerKV,
 			Timeout: 30 * time.Minute,
-			Cluster: r.MakeClusterSpec(6, spec.CPU(4)),
+			Cluster: r.MakeClusterSpec(8, spec.CPU(4)),
 			Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
-				runDisconnect(ctx, t, c, expirationLeases)
+				runFailoverPartialLeaseLiveness(ctx, t, c, expirationLeases)
 			},
 		})
 
@@ -96,27 +114,357 @@ func registerFailover(r registry.Registry) {
 	}
 }
 
-func randSleep(ctx context.Context, rng *rand.Rand, max time.Duration) {
-	randTimer := time.After(randutil.RandDuration(rng, max))
-	select {
-	case <-randTimer:
-	case <-ctx.Done():
+// runFailoverPartialLeaseGateway tests a partial network partition between a
+// SQL gateway and a user range leaseholder. Requests must be routed via other
+// nodes in order to be served.
+//
+// Cluster topology:
+//
+// n1-n3: system ranges and user ranges (2/5 replicas)
+// n4-n5: user range leaseholders (2/5 replicas)
+// n6-n7: SQL gateways and 1 user replica (1/5 replicas)
+//
+// 5 user range replicas will be placed on n2-n6, with leases on n4. A partial
+// partition will be introduced between n4,n5 and n6,n7, both fully and
+// individually. This corresponds to the case where we have three data centers
+// with a broken network link between one pair. For example:
+//
+//	        n1-n3 (2 replicas, liveness)
+//	           A
+//	          / \
+//	         /   \
+//	n4-n5 B --x-- C n6-n7  <--- n8 (workload)
+//	(2 replicas, leases)   (1 replica, SQL gateways)
+//
+// Once follower routing is implemented, this tests the following scenarios:
+//
+// - Routes via followers in each of A, B, and C when possible.
+// - Skips follower replica on local node that can't reach leaseholder (n6).
+// - Skips follower replica in C that can't reach leaseholder (n7 via n6).
+// - Skips follower replica in B that's unreachable (n5).
+//
+// We run a kv50 workload on SQL gateways and collect pMax latency for graphing.
+func runFailoverPartialLeaseGateway(
+	ctx context.Context, t test.Test, c cluster.Cluster, expLeases bool,
+) {
+	require.Equal(t, 8, c.Spec().NodeCount)
+
+	rng, _ := randutil.NewTestRand()
+
+	// Create cluster.
+	opts := option.DefaultStartOpts()
+	settings := install.MakeClusterSettings()
+
+	failer := makeFailer(t, c, failureModeBlackhole, opts, settings).(partialFailer)
+	failer.Setup(ctx)
+	defer failer.Cleanup(ctx)
+
+	c.Put(ctx, t.Cockroach(), "./cockroach")
+	c.Start(ctx, t.L(), opts, settings, c.Range(1, 7))
+
+	conn := c.Conn(ctx, t.L(), 1)
+	defer conn.Close()
+
+	_, err := conn.ExecContext(ctx, `SET CLUSTER SETTING kv.expiration_leases_only.enabled = $1`,
+		expLeases)
+	require.NoError(t, err)
+
+	// Place all ranges on n1-n3 to start with.
+	configureAllZones(t, ctx, conn, zoneConfig{replicas: 3, onlyNodes: []int{1, 2, 3}})
+
+	// Wait for upreplication.
+	require.NoError(t, WaitFor3XReplication(ctx, t, conn))
+
+	// Create the kv database with 5 replicas on n2-n6, and leases on n4.
+	t.Status("creating workload database")
+	_, err = conn.ExecContext(ctx, `CREATE DATABASE kv`)
+	require.NoError(t, err)
+	configureZone(t, ctx, conn, `DATABASE kv`, zoneConfig{
+		replicas: 5, onlyNodes: []int{2, 3, 4, 5, 6}, leaseNode: 4})
+
+	c.Run(ctx, c.Node(6), `./cockroach workload init kv --splits 1000 {pgurl:1}`)
+
+	// Wait for the KV table to upreplicate.
+	waitForUpreplication(t, ctx, conn, `database_name = 'kv'`, 5)
+
+	// The replicate queue takes forever to move the ranges, so we do it
+	// ourselves. Precreating the database/range and moving it to the correct
+	// nodes first is not sufficient, since workload will spread the ranges across
+	// all nodes regardless.
+	relocateRanges(t, ctx, conn, `database_name = 'kv'`, []int{1, 7}, []int{2, 3, 4, 5, 6})
+	relocateRanges(t, ctx, conn, `database_name != 'kv'`, []int{4, 5, 6, 7}, []int{1, 2, 3})
+	relocateLeases(t, ctx, conn, `database_name = 'kv'`, 4)
+
+	// Start workload on n8 using n6-n7 as gateways.
+	t.Status("running workload")
+	m := c.NewMonitor(ctx, c.Range(1, 7))
+	m.Go(func(ctx context.Context) error {
+		c.Run(ctx, c.Node(8), `./cockroach workload run kv --read-percent 50 `+
+			`--duration 20m --concurrency 256 --max-rate 2048 --timeout 1m --tolerate-errors `+
+			`--histograms=`+t.PerfArtifactsDir()+`/stats.json `+
+			`{pgurl:6-7}`)
+		return nil
+	})
+
+	// Start a worker to fail and recover partial partitions between n4,n5
+	// (leases) and n6,n7 (gateways), both fully and individually, for 3 cycles.
+	// Leases are only placed on n4.
+	failer.Ready(ctx, m)
+	m.Go(func(ctx context.Context) error {
+		var raftCfg base.RaftConfig
+		raftCfg.SetDefaults()
+
+		ticker := time.NewTicker(time.Minute)
+		defer ticker.Stop()
+
+		for i := 0; i < 3; i++ {
+			testcases := []struct {
+				nodes []int
+				peers []int
+			}{
+				// Fully partition leases from gateways, must route via n1-n3. In
+				// addition to n4 leaseholder being unreachable, follower on n5 is
+				// unreachable, and follower replica on n6 can't reach leaseholder.
+				{[]int{6, 7}, []int{4, 5}},
+				// Partition n6 (gateway with local follower) from n4 (leaseholder).
+				// Local follower replica can't reach leaseholder.
+				{[]int{6}, []int{4}},
+				// Partition n7 (gateway) from n4 (leaseholder).
+				{[]int{7}, []int{4}},
+			}
+			for _, tc := range testcases {
+				select {
+				case <-ticker.C:
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+
+				randTimer := time.After(randutil.RandDuration(rng, raftCfg.RangeLeaseRenewalDuration()))
+
+				// Ranges and leases may occasionally escape their constraints. Move
+				// them to where they should be.
+				relocateRanges(t, ctx, conn, `database_name = 'kv'`, []int{1, 7}, []int{2, 3, 4, 5, 6})
+				relocateRanges(t, ctx, conn, `database_name != 'kv'`, []int{4, 5, 6, 7}, []int{1, 2, 3})
+				relocateLeases(t, ctx, conn, `database_name = 'kv'`, 4)
+
+				// Randomly sleep up to the lease renewal interval, to vary the time
+				// between the last lease renewal and the failure. We start the timer
+				// before the range relocation above to run them concurrently.
+				select {
+				case <-randTimer:
+				case <-ctx.Done():
+				}
+
+				for _, node := range tc.nodes {
+					t.Status(fmt.Sprintf("failing n%d (blackhole lease/gateway)", node))
+					failer.FailPartial(ctx, node, tc.peers)
+				}
+
+				select {
+				case <-ticker.C:
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+
+				for _, node := range tc.nodes {
+					t.Status(fmt.Sprintf("recovering n%d (blackhole lease/gateway)", node))
+					failer.Recover(ctx, node)
+				}
+			}
+		}
+		return nil
+	})
+	m.Wait()
+}
+
+// runFailoverPartialLeaseLeader tests a partial network partition between
+// leaseholders and Raft leaders. This will prevent the leaseholder from making
+// Raft proposals, but it can still hold onto leases as long as it can heartbeat
+// liveness.
+//
+// Cluster topology:
+//
+// n1-n3: system and liveness ranges, SQL gateway
+// n4-n6: user ranges
+//
+// The cluster runs with COCKROACH_DISABLE_LEADER_FOLLOWS_LEASEHOLDER, which
+// will place Raft leaders and leases independently of each other. We can then
+// assume that some number of user ranges will randomly have split leader/lease,
+// and simply create partial partitions between each of n4-n6 in sequence.
+//
+// We run a kv50 workload on SQL gateways and collect pMax latency for graphing.
+func runFailoverPartialLeaseLeader(
+	ctx context.Context, t test.Test, c cluster.Cluster, expLeases bool,
+) {
+	require.Equal(t, 7, c.Spec().NodeCount)
+
+	rng, _ := randutil.NewTestRand()
+
+	// Create cluster, disabling leader/leaseholder colocation. We only start
+	// n1-n3, to precisely place system ranges, since we'll have to disable the
+	// replicate queue shortly.
+	opts := option.DefaultStartOpts()
+	settings := install.MakeClusterSettings()
+	settings.Env = append(settings.Env, "COCKROACH_DISABLE_LEADER_FOLLOWS_LEASEHOLDER=true")
+
+	failer := makeFailer(t, c, failureModeBlackhole, opts, settings).(partialFailer)
+	failer.Setup(ctx)
+	defer failer.Cleanup(ctx)
+
+	c.Put(ctx, t.Cockroach(), "./cockroach")
+	c.Start(ctx, t.L(), opts, settings, c.Range(1, 3))
+
+	conn := c.Conn(ctx, t.L(), 1)
+	defer conn.Close()
+
+	_, err := conn.ExecContext(ctx, `SET CLUSTER SETTING kv.expiration_leases_only.enabled = $1`,
+		expLeases)
+	require.NoError(t, err)
+
+	// Place all ranges on n1-n3 to start with, and wait for upreplication.
+	configureAllZones(t, ctx, conn, zoneConfig{replicas: 3, onlyNodes: []int{1, 2, 3}})
+	require.NoError(t, WaitFor3XReplication(ctx, t, conn))
+
+	// Disable the replicate queue. It can otherwise end up with stuck
+	// overreplicated ranges during rebalancing, because downreplication requires
+	// the Raft leader to be colocated with the leaseholder.
+	_, err = conn.ExecContext(ctx, `SET CLUSTER SETTING kv.replicate_queue.enabled = false`)
+	require.NoError(t, err)
+
+	// Now that system ranges are properly placed on n1-n3, start n4-n6.
+	c.Start(ctx, t.L(), opts, settings, c.Range(4, 6))
+
+	// Create the kv database on n4-n6.
+	t.Status("creating workload database")
+	_, err = conn.ExecContext(ctx, `CREATE DATABASE kv`)
+	require.NoError(t, err)
+	configureZone(t, ctx, conn, `DATABASE kv`, zoneConfig{replicas: 3, onlyNodes: []int{4, 5, 6}})
+
+	c.Run(ctx, c.Node(6), `./cockroach workload init kv --splits 1000 {pgurl:1}`)
+
+	// Move ranges to the appropriate nodes. Precreating the database/range and
+	// moving it to the correct nodes first is not sufficient, since workload will
+	// spread the ranges across all nodes regardless.
+	relocateRanges(t, ctx, conn, `database_name = 'kv'`, []int{1, 2, 3}, []int{4, 5, 6})
+	relocateRanges(t, ctx, conn, `database_name != 'kv'`, []int{4, 5, 6}, []int{1, 2, 3})
+
+	// Check that we have a few split leaders/leaseholders on n4-n6. We give
+	// it a few seconds, since metrics are updated every 10 seconds.
+	for i := 0; ; i++ {
+		var count float64
+		for _, node := range []int{4, 5, 6} {
+			count += nodeMetric(ctx, t, c, node, "replicas.leaders_not_leaseholders")
+		}
+		t.Status(fmt.Sprintf("%.0f split leaders/leaseholders", count))
+		if count >= 3 {
+			break
+		} else if i >= 10 {
+			t.Fatalf("timed out waiting for 3 split leaders/leaseholders")
+		}
+		time.Sleep(time.Second)
 	}
+
+	// Start workload on n7 using n1-n3 as gateways.
+	t.Status("running workload")
+	m := c.NewMonitor(ctx, c.Range(1, 6))
+	m.Go(func(ctx context.Context) error {
+		c.Run(ctx, c.Node(7), `./cockroach workload run kv --read-percent 50 `+
+			`--duration 20m --concurrency 256 --max-rate 2048 --timeout 1m --tolerate-errors `+
+			`--histograms=`+t.PerfArtifactsDir()+`/stats.json `+
+			`{pgurl:1-3}`)
+		return nil
+	})
+
+	// Start a worker to fail and recover partial partitions between each pair of
+	// n4-n6 for 3 cycles (9 failures total).
+	failer.Ready(ctx, m)
+	m.Go(func(ctx context.Context) error {
+		var raftCfg base.RaftConfig
+		raftCfg.SetDefaults()
+
+		ticker := time.NewTicker(time.Minute)
+		defer ticker.Stop()
+
+		for i := 0; i < 3; i++ {
+			for _, node := range []int{4, 5, 6} {
+				select {
+				case <-ticker.C:
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+
+				randTimer := time.After(randutil.RandDuration(rng, raftCfg.RangeLeaseRenewalDuration()))
+
+				// Ranges may occasionally escape their constraints. Move them to where
+				// they should be.
+				relocateRanges(t, ctx, conn, `database_name = 'kv'`, []int{1, 2, 3}, []int{4, 5, 6})
+				relocateRanges(t, ctx, conn, `database_name != 'kv'`, []int{4, 5, 6}, []int{1, 2, 3})
+
+				// Randomly sleep up to the lease renewal interval, to vary the time
+				// between the last lease renewal and the failure. We start the timer
+				// before the range relocation above to run them concurrently.
+				select {
+				case <-randTimer:
+				case <-ctx.Done():
+				}
+
+				t.Status(fmt.Sprintf("failing n%d (blackhole lease/leader)", node))
+				nextNode := node + 1
+				if nextNode > 6 {
+					nextNode = 4
+				}
+				failer.FailPartial(ctx, node, []int{nextNode})
+
+				select {
+				case <-ticker.C:
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+
+				t.Status(fmt.Sprintf("recovering n%d (blackhole lease/leader)", node))
+				failer.Recover(ctx, node)
+			}
+		}
+		return nil
+	})
+	m.Wait()
 }
 
-// 5 nodes fully connected. Break the connection between a pair of nodes 4 and 5
-// while running a workload against nodes 1 through 3. Before each disconnect,
-// move all the leases to nodes 4 and 5 in a different pattern.
-func runDisconnect(ctx context.Context, t test.Test, c cluster.Cluster, expLeases bool) {
-	require.Equal(t, 6, c.Spec().NodeCount)
+// runFailoverPartialLeaseLiveness tests a partial network partition between a
+// leaseholder and node liveness. With epoch leases we would normally expect
+// this to recover shortly, since the node can't heartbeat its liveness record
+// and thus its leases will expire. However, it will maintain Raft leadership,
+// and we prevent non-leaders from acquiring leases, which can prevent the lease
+// from moving unless we explicitly handle this. See also:
+// https://github.com/cockroachdb/cockroach/pull/87244.
+//
+// Cluster topology:
+//
+// n1-n3: system ranges and SQL gateways
+// n4: liveness leaseholder
+// n5-n7: user ranges
+//
+// A partial blackhole network partition is triggered between n4 and each of
+// n5-n7 sequentially, 3 times per node for a total of 9 times. A kv50 workload
+// is running against SQL gateways on n1-n3, and we collect the pMax latency for
+// graphing.
+func runFailoverPartialLeaseLiveness(
+	ctx context.Context, t test.Test, c cluster.Cluster, expLeases bool,
+) {
+	require.Equal(t, 8, c.Spec().NodeCount)
 
 	rng, _ := randutil.NewTestRand()
 
+	// Create cluster.
 	opts := option.DefaultStartOpts()
 	settings := install.MakeClusterSettings()
 
+	failer := makeFailer(t, c, failureModeBlackhole, opts, settings).(partialFailer)
+	failer.Setup(ctx)
+	defer failer.Cleanup(ctx)
+
 	c.Put(ctx, t.Cockroach(), "./cockroach")
-	c.Start(ctx, t.L(), opts, settings, c.Range(1, 5))
+	c.Start(ctx, t.L(), opts, settings, c.Range(1, 7))
 
 	conn := c.Conn(ctx, t.L(), 1)
 	defer conn.Close()
@@ -125,34 +473,45 @@ func runDisconnect(ctx context.Context, t test.Test, c cluster.Cluster, expLease
 		expLeases)
 	require.NoError(t, err)
 
-	constrainAllConfig(t, ctx, conn, 3, []int{4, 5}, 0)
-	constrainConfig(t, ctx, conn, `RANGE liveness`, 3, []int{3, 5}, 4)
+	// Place all ranges on n1-n3, and an extra liveness leaseholder replica on n4.
+	configureAllZones(t, ctx, conn, zoneConfig{replicas: 3, onlyNodes: []int{1, 2, 3}})
+	configureZone(t, ctx, conn, `RANGE liveness`, zoneConfig{
+		replicas: 4, onlyNodes: []int{1, 2, 3, 4}, leaseNode: 4})
 
+	// Wait for upreplication.
 	require.NoError(t, WaitFor3XReplication(ctx, t, conn))
 
+	// Create the kv database on n5-n7.
 	t.Status("creating workload database")
 	_, err = conn.ExecContext(ctx, `CREATE DATABASE kv`)
 	require.NoError(t, err)
-	constrainConfig(t, ctx, conn, `DATABASE kv`, 3, []int{2, 3, 5}, 0)
+	configureZone(t, ctx, conn, `DATABASE kv`, zoneConfig{replicas: 3, onlyNodes: []int{5, 6, 7}})
+
+	c.Run(ctx, c.Node(6), `./cockroach workload init kv --splits 1000 {pgurl:1}`)
-	c.Run(ctx, c.Node(6), `./cockroach workload init kv --splits 100 {pgurl:1}`)
 
+	// The replicate queue takes forever to move the ranges, so we do it
+	// ourselves. Precreating the database/range and moving it to the correct
+	// nodes first is not sufficient, since workload will spread the ranges across
+	// all nodes regardless.
+	relocateRanges(t, ctx, conn, `database_name = 'kv'`, []int{1, 2, 3, 4}, []int{5, 6, 7})
+	relocateRanges(t, ctx, conn, `database_name != 'kv'`, []int{5, 6, 7}, []int{1, 2, 3, 4})
+	relocateRanges(t, ctx, conn, `range_id != 2`, []int{4}, []int{1, 2, 3})
 
-	// Start workload on n6 using nodes 1-3 (not part of partition). We could
-	// additionally test the behavior of running SQL against nodes 4-5 however
-	// that complicates the analysis as we want to focus on KV behavior.
+	// Start workload on n8 using n1-n3 as gateways (not partitioned).
 	t.Status("running workload")
-	m := c.NewMonitor(ctx, c.Range(1, 3))
+	m := c.NewMonitor(ctx, c.Range(1, 7))
 	m.Go(func(ctx context.Context) error {
-		c.Run(ctx, c.Node(6), `./cockroach workload run kv --read-percent 50 `+
-			`--duration 10m --concurrency 256 --max-rate 2048 --timeout 1m --tolerate-errors `+
+		c.Run(ctx, c.Node(8), `./cockroach workload run kv --read-percent 50 `+
+			`--duration 20m --concurrency 256 --max-rate 2048 --timeout 1m --tolerate-errors `+
 			`--histograms=`+t.PerfArtifactsDir()+`/stats.json `+
 			`{pgurl:1-3}`)
 		return nil
 	})
 
-	// Make sure we don't leave an outage if this test fails midway.
-	defer Cleanup(t, c, ctx)
-	// Start and stop partial between nodes 4 and 5 every 30 seconds.
+	// Start a worker to fail and recover partial partitions between n4 (liveness)
+	// and workload leaseholders n5-n7 for 1 minute each, 3 times per node for 9
+	// times total.
+	failer.Ready(ctx, m)
 	m.Go(func(ctx context.Context) error {
 		var raftCfg base.RaftConfig
 		raftCfg.SetDefaults()
@@ -160,30 +519,43 @@ func runDisconnect(ctx context.Context, t test.Test, c cluster.Cluster, expLease
 		ticker := time.NewTicker(time.Minute)
 		defer ticker.Stop()
 
-		// All the system ranges will be on all the nodes, so they will all move,
-		// plus many of the non-system ranges.
-		for i := 0; i < 9; i++ {
-			t.Status("Moving ranges to nodes 4 and 5 before partition", i)
-			relocateLeases(t, ctx, conn, `range_id = 2`, 4)
-			relocateLeases(t, ctx, conn, `voting_replicas @> ARRAY[5] AND range_id != 2`, 5)
+		for i := 0; i < 3; i++ {
+			for _, node := range []int{5, 6, 7} {
+				select {
+				case <-ticker.C:
+				case <-ctx.Done():
+					return ctx.Err()
+				}
 
-			// Randomly sleep up to the lease renewal interval, to vary the time
-			// between the last lease renewal and the failure.
-			randSleep(ctx, rng, raftCfg.RangeLeaseRenewalDuration())
+				randTimer := time.After(randutil.RandDuration(rng, raftCfg.RangeLeaseRenewalDuration()))
 
-			t.Status("disconnecting n4 and n5")
-			Disconnect(t, c, ctx, []int{4, 5})
+				// Ranges and leases may occasionally escape their constraints. Move
+				// them to where they should be.
+				relocateRanges(t, ctx, conn, `database_name = 'kv'`, []int{1, 2, 3, 4}, []int{5, 6, 7})
+				relocateRanges(t, ctx, conn, `database_name != 'kv'`, []int{node}, []int{1, 2, 3})
+				relocateRanges(t, ctx, conn, `range_id = 2`, []int{5, 6, 7}, []int{1, 2, 3, 4})
+				relocateLeases(t, ctx, conn, `range_id = 2`, 4)
 
-			select {
-			case <-ticker.C:
-			case <-ctx.Done():
-				return ctx.Err()
-			}
+				// Randomly sleep up to the lease renewal interval, to vary the time
+				// between the last lease renewal and the failure. We start the timer
+				// before the range relocation above to run them concurrently.
+				select {
+				case <-randTimer:
+				case <-ctx.Done():
+				}
 
-			qps := measureQPS(ctx, t, conn, 5*time.Second)
-			t.Status("Node 1 QPS after waiting is: ", qps, " recovering nodes")
+				t.Status(fmt.Sprintf("failing n%d (blackhole lease/liveness)", node))
+				failer.FailPartial(ctx, node, []int{4})
 
-			Cleanup(t, c, ctx)
+				select {
+				case <-ticker.C:
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+
+				t.Status(fmt.Sprintf("recovering n%d (blackhole lease/liveness)", node))
+				failer.Recover(ctx, node)
+			}
 		}
 		return nil
 	})
@@ -246,7 +618,7 @@ func runFailoverNonSystem(
 	require.NoError(t, err)
 
 	// Constrain all existing zone configs to n1-n3.
-	constrainAllConfig(t, ctx, conn, 3, []int{4, 5, 6}, 0)
+	configureAllZones(t, ctx, conn, zoneConfig{replicas: 3, onlyNodes: []int{1, 2, 3}})
 
 	// Wait for upreplication.
 	require.NoError(t, WaitFor3XReplication(ctx, t, conn))
@@ -256,7 +628,7 @@ func runFailoverNonSystem(
 	t.Status("creating workload database")
 	_, err = conn.ExecContext(ctx, `CREATE DATABASE kv`)
 	require.NoError(t, err)
-	constrainConfig(t, ctx, conn, `DATABASE kv`, 3, []int{1, 2, 3}, 0)
+	configureZone(t, ctx, conn, `DATABASE kv`, zoneConfig{replicas: 3, onlyNodes: []int{4, 5, 6}})
 	c.Run(ctx, c.Node(7), `./cockroach workload init kv --splits 1000 {pgurl:1}`)
 
 	// The replicate queue takes forever to move the kv ranges from n1-n3 to
@@ -390,10 +762,10 @@ func runFailoverLiveness(
 	require.NoError(t, err)
 
 	// Constrain all existing zone configs to n1-n3.
-	constrainAllConfig(t, ctx, conn, 3, []int{4}, 0)
+	configureAllZones(t, ctx, conn, zoneConfig{replicas: 3, onlyNodes: []int{1, 2, 3}})
 
 	// Constrain the liveness range to n1-n4, with leaseholder preference on n4.
-	constrainConfig(t, ctx, conn, `RANGE liveness`, 4, nil, 4)
+	configureZone(t, ctx, conn, `RANGE liveness`, zoneConfig{replicas: 4, leaseNode: 4})
 	require.NoError(t, err)
 
 	// Wait for upreplication.
@@ -404,7 +776,7 @@ func runFailoverLiveness(
 	t.Status("creating workload database")
 	_, err = conn.ExecContext(ctx, `CREATE DATABASE kv`)
 	require.NoError(t, err)
-	constrainConfig(t, ctx, conn, `DATABASE kv`, 3, []int{4}, 0)
+	configureZone(t, ctx, conn, `DATABASE kv`, zoneConfig{replicas: 3, onlyNodes: []int{1, 2, 3}})
 	c.Run(ctx, c.Node(5), `./cockroach workload init kv --splits 1000 {pgurl:1}`)
 
 	// The replicate queue takes forever to move the other ranges off of n4 so we
@@ -534,8 +906,8 @@ func runFailoverSystemNonLiveness(
 
 	// Constrain all existing zone configs to n4-n6, except liveness which is
 	// constrained to n1-n3.
-	constrainAllConfig(t, ctx, conn, 3, []int{1, 2, 3}, 0)
-	constrainConfig(t, ctx, conn, `RANGE liveness`, 3, []int{4, 5, 6}, 0)
+	configureAllZones(t, ctx, conn, zoneConfig{replicas: 3, onlyNodes: []int{4, 5, 6}})
+	configureZone(t, ctx, conn, `RANGE liveness`, zoneConfig{replicas: 3, onlyNodes: []int{1, 2, 3}})
 	require.NoError(t, err)
 
 	// Wait for upreplication.
@@ -546,7 +918,7 @@ func runFailoverSystemNonLiveness(
 	t.Status("creating workload database")
 	_, err = conn.ExecContext(ctx, `CREATE DATABASE kv`)
 	require.NoError(t, err)
-	constrainConfig(t, ctx, conn, `DATABASE kv`, 3, []int{4, 5, 6}, 0)
+	configureZone(t, ctx, conn, `DATABASE kv`, zoneConfig{replicas: 3, onlyNodes: []int{1, 2, 3}})
 	c.Run(ctx, c.Node(7), `./cockroach workload init kv --splits 1000 {pgurl:1}`)
 
 	// The replicate queue takes forever to move the kv ranges from n4-n6 to
@@ -694,38 +1066,6 @@ func makeFailer(
 	}
 }
 
-// Disconnect takes a set of nodes and each nodes internal ips. It disconnects
-// each node from all the others in the list.
-func Disconnect(t test.Test, c cluster.Cluster, ctx context.Context, nodes []int) {
-	if c.IsLocal() {
-		t.Status("skipping iptables disconnect on local cluster")
-		return
-	}
-
-	ips, err := c.InternalIP(ctx, t.L(), nodes)
-	require.NoError(t, err)
-
-	// disconnect each node from every other passed in node.
-	for n := 0; n < len(nodes); n++ {
-		for ip := 0; ip < len(ips); ip++ {
-			if n != ip {
-				c.Run(ctx, c.Node(nodes[n]), `sudo iptables -A INPUT -s `+ips[ip]+` -j DROP`)
-				c.Run(ctx, c.Node(nodes[n]), `sudo iptables -A OUTPUT -d `+ips[ip]+` -j DROP`)
-			}
-		}
-	}
-}
-
-// Cleanup takes a set of nodes and each nodes internal ips. It disconnects
-// each node from all the others in the list.
-func Cleanup(t test.Test, c cluster.Cluster, ctx context.Context) {
-	if c.IsLocal() {
-		t.Status("skipping iptables cleanup on local cluster")
-		return
-	}
-	c.Run(ctx, c.All(), `sudo iptables -F`)
-}
-
 // failer fails and recovers a given node in some particular way.
 type failer interface {
 	// Setup prepares the failer. It is called before the cluster is started.
@@ -745,6 +1085,14 @@ type failer interface {
 	Recover(ctx context.Context, nodeID int)
 }
 
+// partialFailer supports partial failures between specific node pairs.
+type partialFailer interface {
+	failer
+
+	// FailPartial fails the node for the given peers.
+	FailPartial(ctx context.Context, nodeID int, peerIDs []int)
+}
+
 // blackholeFailer causes a network failure where TCP/IP packets to/from port
 // 26257 are dropped, causing network hangs and timeouts.
 //
@@ -763,7 +1111,7 @@ func (f *blackholeFailer) Ready(_ context.Context, _ cluster.Monitor) {}
 
 func (f *blackholeFailer) Cleanup(ctx context.Context) {
 	if f.c.IsLocal() {
-		f.t.Status("skipping iptables cleanup on local cluster")
+		f.t.Status("skipping blackhole cleanup on local cluster")
 		return
 	}
 	f.c.Run(ctx, f.c.All(), `sudo iptables -F`)
@@ -771,21 +1119,25 @@ func (f *blackholeFailer) Cleanup(ctx context.Context) {
 
 func (f *blackholeFailer) Fail(ctx context.Context, nodeID int) {
 	if f.c.IsLocal() {
-		f.t.Status("skipping fail on local cluster")
+		f.t.Status("skipping blackhole failure on local cluster")
 		return
 	}
 
-	// When dropping both input and output, we use multiport to block traffic both
-	// to port 26257 and from port 26257 on either side of the connection, to
-	// avoid any spurious packets from making it through.
+	// When dropping both input and output, make sure we drop packets in both
+	// directions for both the inbound and outbound TCP connections, such that we
+	// get a proper black hole. Only dropping one direction for both of INPUT and
+	// OUTPUT will still let e.g. TCP retransmits through, which may affect the
+	// TCP stack behavior and is not representative of real network outages.
 	//
-	// We don't do this when only blocking in one direction, because e.g. in the
-	// input case we don't want inbound connections to work (INPUT to 26257), but
-	// we do want responses for outbound connections to work (INPUT from 26257).
+	// For the asymmetric partitions, only drop packets in one direction since
+	// this is representative of accidental firewall rules we've seen cause such
+	// outages in the wild.
 	if f.input && f.output {
-		f.c.Run(ctx, f.c.Node(nodeID),
-			`sudo iptables -A INPUT -m multiport -p tcp --ports 26257 -j DROP`)
-		f.c.Run(ctx, f.c.Node(nodeID),
-			`sudo iptables -A OUTPUT -m multiport -p tcp --ports 26257 -j DROP`)
+		// Inbound TCP connections, both received and sent packets.
+		f.c.Run(ctx, f.c.Node(nodeID), `sudo iptables -A INPUT -p tcp --dport 26257 -j DROP`)
+		f.c.Run(ctx, f.c.Node(nodeID), `sudo iptables -A OUTPUT -p tcp --sport 26257 -j DROP`)
+		// Outbound TCP connections, both sent and received packets.
+		f.c.Run(ctx, f.c.Node(nodeID), `sudo iptables -A OUTPUT -p tcp --dport 26257 -j DROP`)
+		f.c.Run(ctx, f.c.Node(nodeID), `sudo iptables -A INPUT -p tcp --sport 26257 -j DROP`)
 	} else if f.input {
 		f.c.Run(ctx, f.c.Node(nodeID), `sudo iptables -A INPUT -p tcp --dport 26257 -j DROP`)
 	} else if f.output {
@@ -793,9 +1145,50 @@ func (f *blackholeFailer) Fail(ctx context.Context, nodeID int) {
 	}
 }
 
+// FailPartial creates a partial blackhole failure between the given node and
+// peers.
+func (f *blackholeFailer) FailPartial(ctx context.Context, nodeID int, peerIDs []int) {
+	if f.c.IsLocal() {
+		f.t.Status("skipping blackhole failure on local cluster")
+		return
+	}
+	peerIPs, err := f.c.InternalIP(ctx, f.t.L(), peerIDs)
+	require.NoError(f.t, err)
+
+	for _, peerIP := range peerIPs {
+		// When dropping both input and output, make sure we drop packets in both
+		// directions for both the inbound and outbound TCP connections, such that
+		// we get a proper black hole. Only dropping one direction for both of INPUT
+		// and OUTPUT will still let e.g. TCP retransmits through, which may affect
+		// TCP stack behavior and is not representative of real network outages.
+		//
+		// For the asymmetric partitions, only drop packets in one direction since
+		// this is representative of accidental firewall rules we've seen cause such
+		// outages in the wild.
+		if f.input && f.output {
+			// Inbound TCP connections, both received and sent packets.
+			f.c.Run(ctx, f.c.Node(nodeID), fmt.Sprintf(
+				`sudo iptables -A INPUT -p tcp -s %s --dport 26257 -j DROP`, peerIP))
+			f.c.Run(ctx, f.c.Node(nodeID), fmt.Sprintf(
+				`sudo iptables -A OUTPUT -p tcp -d %s --sport 26257 -j DROP`, peerIP))
+			// Outbound TCP connections, both sent and received packets.
+			f.c.Run(ctx, f.c.Node(nodeID), fmt.Sprintf(
+				`sudo iptables -A OUTPUT -p tcp -d %s --dport 26257 -j DROP`, peerIP))
+			f.c.Run(ctx, f.c.Node(nodeID), fmt.Sprintf(
+				`sudo iptables -A INPUT -p tcp -s %s --sport 26257 -j DROP`, peerIP))
+		} else if f.input {
+			f.c.Run(ctx, f.c.Node(nodeID), fmt.Sprintf(
+				`sudo iptables -A INPUT -p tcp -s %s --dport 26257 -j DROP`, peerIP))
+		} else if f.output {
+			f.c.Run(ctx, f.c.Node(nodeID), fmt.Sprintf(
+				`sudo iptables -A OUTPUT -p tcp -d %s --dport 26257 -j DROP`, peerIP))
+		}
+	}
+}
+
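For illustration (a hypothetical invocation; the peer IP below is made up, not taken from the diff), partially failing n5 against the liveness leaseholder n4 with both input and output dropped appends four DROP rules on n5, scoped to n4's internal IP only:

	// Sketch of FailPartial as used by the lease-liveness test above; 10.0.0.4
	// is an assumed internal IP for n4.
	failer.FailPartial(ctx, 5, []int{4})
	// On n5 this runs, for peer IP 10.0.0.4:
	//   sudo iptables -A INPUT  -p tcp -s 10.0.0.4 --dport 26257 -j DROP
	//   sudo iptables -A OUTPUT -p tcp -d 10.0.0.4 --sport 26257 -j DROP
	//   sudo iptables -A OUTPUT -p tcp -d 10.0.0.4 --dport 26257 -j DROP
	//   sudo iptables -A INPUT  -p tcp -s 10.0.0.4 --sport 26257 -j DROP
	// A later Recover(ctx, 5) clears the rules on n5 with `sudo iptables -F`.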
 func (f *blackholeFailer) Recover(ctx context.Context, nodeID int) {
 	if f.c.IsLocal() {
-		f.t.Status("skipping iptables recover on local cluster")
+		f.t.Status("skipping blackhole recovery on local cluster")
 		return
 	}
 	f.c.Run(ctx, f.c.Node(nodeID), `sudo iptables -F`)
@@ -890,6 +1283,32 @@ func (f *pauseFailer) Recover(ctx context.Context, nodeID int) {
 	f.c.Signal(ctx, f.t.L(), 18, f.c.Node(nodeID)) // SIGCONT
 }
 
+// waitForUpreplication waits for upreplication of ranges that satisfy the
+// given predicate (using SHOW RANGES).
+//
+// TODO(erikgrinaker): move this into WaitForReplication() when it can use SHOW
+// RANGES, i.e. when it's no longer needed in mixed-version tests with older
+// versions that don't have SHOW RANGES.
+func waitForUpreplication(
+	t test.Test, ctx context.Context, conn *gosql.DB, predicate string, replicationFactor int,
+) {
+	var count int
+	where := fmt.Sprintf("WHERE array_length(replicas, 1) < %d", replicationFactor)
+	if predicate != "" {
+		where += fmt.Sprintf(" AND (%s)", predicate)
+	}
+	for {
+		require.NoError(t, conn.QueryRowContext(ctx,
+			`SELECT count(distinct range_id) FROM [SHOW CLUSTER RANGES WITH TABLES, DETAILS] `+where).
+			Scan(&count))
+		if count == 0 {
+			break
+		}
+		t.Status(fmt.Sprintf("waiting for %d ranges to upreplicate (%s)", count, predicate))
+		time.Sleep(time.Second)
+	}
+}
+
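For example, the lease-gateway test's call waitForUpreplication(t, ctx, conn, `database_name = 'kv'`, 5) polls a query of the following shape (shown here for clarity) until it returns zero:

	// Query issued by the call above:
	//   SELECT count(distinct range_id)
	//   FROM [SHOW CLUSTER RANGES WITH TABLES, DETAILS]
	//   WHERE array_length(replicas, 1) < 5 AND (database_name = 'kv')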
 // relocateRanges relocates all ranges matching the given predicate from a set
 // of nodes to a different set of nodes. Moves are attempted sequentially from
 // each source onto each target, and errors are retried indefinitely.
@@ -945,51 +1364,75 @@ func relocateLeases(t test.Test, ctx context.Context, conn *gosql.DB, predicate
 	}
 }
 
-// constrainConfig will alter the zone config for the target to specify the
-// number of nodes the target can be on, the replicas it is prevented from being
-// on and an optional leaseholder.
-func constrainConfig(
-	t test.Test,
-	ctx context.Context,
-	conn *gosql.DB,
-	target string,
-	numNodes int,
-	constrainedReplicas []int,
-	lease int,
+type zoneConfig struct {
+	replicas  int
+	onlyNodes []int
+	leaseNode int
+}
+
+// configureZone sets the zone config for the given target.
+func configureZone(
+	t test.Test, ctx context.Context, conn *gosql.DB, target string, cfg zoneConfig,
 ) {
-	replica := make([]string, len(constrainedReplicas))
-	for i, n := range constrainedReplicas {
-		replica[i] = fmt.Sprintf("-node%d", n)
+	require.NotZero(t, cfg.replicas, "num_replicas must be > 0")
+
+	// If onlyNodes is given, invert the constraint and specify which nodes are
+	// prohibited. Otherwise, the allocator may leave replicas outside of the
+	// specified nodes.
+	var constraintsString string
+	if len(cfg.onlyNodes) > 0 {
+		nodeCount := t.Spec().(*registry.TestSpec).Cluster.NodeCount - 1 // subtract worker node
+		included := map[int]bool{}
+		for _, nodeID := range cfg.onlyNodes {
+			included[nodeID] = true
+		}
+		excluded := []int{}
+		for nodeID := 1; nodeID <= nodeCount; nodeID++ {
+			if !included[nodeID] {
+				excluded = append(excluded, nodeID)
+			}
+		}
+		for _, nodeID := range excluded {
+			if len(constraintsString) > 0 {
+				constraintsString += ","
+			}
+			constraintsString += fmt.Sprintf("-node%d", nodeID)
+		}
 	}
-	replicaStr := fmt.Sprintf(`'[%s]'`, strings.Join(replica, ","))
-	leaseStr := ""
-	if lease > 0 {
-		leaseStr = fmt.Sprintf(`[+node%d]`, lease)
+	var leaseString string
+	if cfg.leaseNode > 0 {
+		leaseString += fmt.Sprintf("[+node%d]", cfg.leaseNode)
 	}
-	str :=
-		fmt.Sprintf(
-			`ALTER %s CONFIGURE ZONE USING num_replicas = %d, constraints = %s, lease_preferences = '[%s]'`,
-			target, numNodes, replicaStr, leaseStr)
-	_, err := conn.ExecContext(ctx, str)
-	t.Status(str)
+	query := fmt.Sprintf(
+		`ALTER %s CONFIGURE ZONE USING num_replicas = %d, constraints = '[%s]', lease_preferences = '[%s]'`,
+		target, cfg.replicas, constraintsString, leaseString)
+	t.Status(query)
+	_, err := conn.ExecContext(ctx, query)
 	require.NoError(t, err)
 }
 
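As a concrete example, the lease-gateway test's kv zone config above (5 replicas on n2-n6 with leases on n4, on an 8-node spec whose last node only runs the workload) expands as follows; this only restates what the helper produces:

	// From runFailoverPartialLeaseGateway: NodeCount-1 = 7 CRDB nodes, so
	// onlyNodes {2,3,4,5,6} inverts to the exclusion constraints -node1,-node7.
	configureZone(t, ctx, conn, `DATABASE kv`, zoneConfig{
		replicas: 5, onlyNodes: []int{2, 3, 4, 5, 6}, leaseNode: 4})
	// Emits:
	//   ALTER DATABASE kv CONFIGURE ZONE USING num_replicas = 5,
	//     constraints = '[-node1,-node7]', lease_preferences = '[[+node4]]'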
-// constrainAllConfig will alter the zone config for all zone configurations to
-// specify the number of nodes the target can be on, the replicas it is
-// prevented from being on and an optional leaseholder.
-func constrainAllConfig(
-	t test.Test, ctx context.Context, conn *gosql.DB, numNodes int, replicas []int, lease int,
-) {
+// configureAllZones will set zone configuration for all targets in the
+// cluster.
+func configureAllZones(t test.Test, ctx context.Context, conn *gosql.DB, cfg zoneConfig) {
 	rows, err := conn.QueryContext(ctx, `SELECT target FROM [SHOW ALL ZONE CONFIGURATIONS]`)
 	require.NoError(t, err)
-
 	for rows.Next() {
 		var target string
 		require.NoError(t, rows.Scan(&target))
-		constrainConfig(t, ctx, conn, target, numNodes, replicas, lease)
+		configureZone(t, ctx, conn, target, cfg)
 	}
 	require.NoError(t, rows.Err())
 }
+
+// nodeMetric fetches the given metric value from the given node.
+func nodeMetric(
+	ctx context.Context, t test.Test, c cluster.Cluster, node int, metric string,
+) float64 {
+	var value float64
+	err := c.Conn(ctx, t.L(), node).QueryRowContext(
+		ctx, `SELECT value FROM crdb_internal.node_metrics WHERE name = $1`, metric).Scan(&value)
+	require.NoError(t, err)
+	return value
+}
diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go
index 06bef49699bd..f954f9496511 100644
--- a/pkg/kv/kvserver/replica_range_lease.go
+++ b/pkg/kv/kvserver/replica_range_lease.go
@@ -857,6 +857,10 @@ func (r *Replica) requestLeaseLocked(
 func (r *Replica) AdminTransferLease(
 	ctx context.Context, target roachpb.StoreID, bypassSafetyChecks bool,
 ) error {
+	if r.store.cfg.TestingKnobs.DisableLeaderFollowsLeaseholder {
+		// Ensure lease transfers still work when we don't colocate leaders and leases.
+		bypassSafetyChecks = true
+	}
 	// initTransferHelper inits a transfer if no extension is in progress.
 	// It returns a channel for waiting for the result of a pending
 	// extension (if any is in progress) and a channel for waiting for the
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 3d333701dc3d..47c66f1167f9 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -1205,6 +1205,10 @@ func (sc *StoreConfig) SetDefaults(numStores int) {
 	if sc.RaftEntryCacheSize == 0 {
 		sc.RaftEntryCacheSize = defaultRaftEntryCacheSize
 	}
+	if envutil.EnvOrDefaultBool("COCKROACH_DISABLE_LEADER_FOLLOWS_LEASEHOLDER", false) {
+		sc.TestingKnobs.DisableLeaderFollowsLeaseholder = true
+		sc.TestingKnobs.AllowLeaseRequestProposalsWhenNotLeader = true // otherwise lease requests fail
+	}
 }
 
 // GetStoreConfig exposes the config used for this store.
diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go
index 46e3806fe837..ec5a6a468120 100644
--- a/pkg/kv/kvserver/testing_knobs.go
+++ b/pkg/kv/kvserver/testing_knobs.go
@@ -214,7 +214,8 @@ type StoreTestingKnobs struct {
 	// DisableScanner disables the replica scanner.
 	DisableScanner bool
 	// DisableLeaderFollowsLeaseholder disables attempts to transfer raft
-	// leadership when it diverges from the range's leaseholder.
+	// leadership when it diverges from the range's leaseholder. This can
+	// also be set via COCKROACH_DISABLE_LEADER_FOLLOWS_LEASEHOLDER.
 	DisableLeaderFollowsLeaseholder bool
 	// DisableRefreshReasonNewLeader disables refreshing pending commands when a new
 	// leader is discovered.
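Taken together, the three new tests drive the failer through the same lifecycle; the condensed sketch below (node IDs illustrative, cluster setup and workload omitted) summarizes the pattern they all follow:

	failer := makeFailer(t, c, failureModeBlackhole, opts, settings).(partialFailer)
	failer.Setup(ctx)         // before the cluster is started
	defer failer.Cleanup(ctx) // always flush iptables rules at the end

	// ... start cluster, place ranges and leases, start workload monitor m ...

	failer.Ready(ctx, m)                 // once the cluster and workload are running
	failer.FailPartial(ctx, 5, []int{4}) // partition n5 from n4 only
	// ... let the workload run against the partition, collect pMax latency ...
	failer.Recover(ctx, 5)               // clear the partition on n5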