From d9d9c22ee3db98eaaad0db086dcd801bf41ddd91 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Wed, 21 Jun 2023 13:00:56 +0000 Subject: [PATCH 1/4] roachtest: add lease preference tests Previously, there were no e2e tests which assert on lease preference conformance. This commit adds three `lease-preferences` tests, which assert that lease preferences are conformed to. The time taken to conform is also reported in the test logs. The first test, `.../partial-first-preference-down`, takes down one of the preferred locality nodes (1/2). The second test, `.../full-first-preference-down`, takes down both the preferred locality nodes (2/2). The third test, `.../manual-violating-transfer`, manually transfers leases to a locality which violates the configured leases preferences. Informs: #106100 Epic: none Release note: None --- pkg/cmd/roachtest/tests/BUILD.bazel | 1 + pkg/cmd/roachtest/tests/failover.go | 4 +- pkg/cmd/roachtest/tests/lease_preferences.go | 411 +++++++++++++++++++ pkg/cmd/roachtest/tests/registry.go | 1 + 4 files changed, 416 insertions(+), 1 deletion(-) create mode 100644 pkg/cmd/roachtest/tests/lease_preferences.go diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel index 37c8a3c098b3..0f55e08768f8 100644 --- a/pkg/cmd/roachtest/tests/BUILD.bazel +++ b/pkg/cmd/roachtest/tests/BUILD.bazel @@ -80,6 +80,7 @@ go_library( "kv.go", "kvbench.go", "latency_verifier.go", + "lease_preferences.go", "ledger.go", "libpq.go", "libpq_blocklist.go", diff --git a/pkg/cmd/roachtest/tests/failover.go b/pkg/cmd/roachtest/tests/failover.go index 998065fa78c3..dc47bd3da6ee 100644 --- a/pkg/cmd/roachtest/tests/failover.go +++ b/pkg/cmd/roachtest/tests/failover.go @@ -1598,7 +1598,9 @@ func nodeMetric( ctx context.Context, t test.Test, c cluster.Cluster, node int, metric string, ) float64 { var value float64 - err := c.Conn(ctx, t.L(), node).QueryRowContext( + conn := c.Conn(ctx, t.L(), node) + defer conn.Close() + err := conn.QueryRowContext( ctx, `SELECT value FROM crdb_internal.node_metrics WHERE name = $1`, metric).Scan(&value) require.NoError(t, err) return value diff --git a/pkg/cmd/roachtest/tests/lease_preferences.go b/pkg/cmd/roachtest/tests/lease_preferences.go new file mode 100644 index 000000000000..97ec23d7852c --- /dev/null +++ b/pkg/cmd/roachtest/tests/lease_preferences.go @@ -0,0 +1,411 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tests + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" + "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + "github.com/cockroachdb/cockroach/pkg/roachprod/install" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" +) + +// lease-preferences/* roachtests assert that after a related event, lease +// preferences are conformed to. 
Currently, the events being tested are +// stopping all or some of the preferred localities nodes, and manually +// transferring all leases to a violating locality. +// +// The timeout is controlled by the test timeout, and is currently 30 minutes. +// In most cases there should be no violating preferences immediately. However, +// this test chooses not to assert on this in order to reduce noise. +// +// The results are printed to the log file: +// +// 30.56s: violating(false) duration=0.00s: [n3: 0, n4: 0, n5: 0] +// less-preferred(true) duration=30.56s: [n3: 10, n4: 12, n5: 0] +// +// Where violating(true|false) indicates whether there is a lease violating a +// preference currently. less-preferred(true|false) indicates whether there is +// a lease on locality which is preferred, but not the first preference. The +// elapsed duration of violating/less-preferred is also shown. + +type leasePreferencesEventFn func(context.Context, test.Test, cluster.Cluster) + +type leasePreferencesSpec struct { + preferences string + ranges, replFactor int + eventFn leasePreferencesEventFn + checkNodes []int + waitForLessPreferred bool +} + +// makeStopNodesEventFn returns a leasePreferencesEventFn which stops the +// node targets supplied as arguments, when invoked. +func makeStopNodesEventFn(targets ...int) leasePreferencesEventFn { + return func(ctx context.Context, t test.Test, c cluster.Cluster) { + t.L().Printf( + "stopping nodes %v and waiting for lease preference conformance", + targets) + stopOpts := option.DefaultStopOpts() + stopOpts.RoachprodOpts.Sig = 9 + c.Stop(ctx, t.L(), stopOpts, c.Nodes(targets...)) + } +} + +// makeTransferLeasesEventFn returns a leasePreferencesEventFn which transfers +// every lease in the workload table to the target node, when invoked. +func makeTransferLeasesEventFn(gateway, target int) leasePreferencesEventFn { + return func(ctx context.Context, t test.Test, c cluster.Cluster) { + t.L().Printf( + "transferring leases to node %v and waiting for lease preference conformance", + target) + conn := c.Conn(ctx, t.L(), gateway) + defer conn.Close() + _, err := conn.ExecContext(ctx, fmt.Sprintf(` + ALTER RANGE RELOCATE LEASE TO %d + FOR SELECT range_id + FROM [SHOW RANGES FROM DATABASE kv WITH DETAILS] + WHERE lease_holder <> %d + `, + target, target, + )) + require.NoError(t, err) + } +} + +func registerLeasePreferences(r registry.Registry) { + r.Add(registry.TestSpec{ + // NB: This test takes down 1(/2) nodes in the most preferred locality. + // Some of the leases on the stopped node will be acquired by node's which + // are not in the preferred locality; or in a secondary preferred locality. + Name: "lease-preferences/partial-first-preference-down", + Owner: registry.OwnerKV, + Timeout: 60 * time.Minute, + // This test purposefully kills nodes. Skip the dead node post-test + // validation. + SkipPostValidations: registry.PostValidationNoDeadNodes, + Cluster: r.MakeClusterSpec(5, spec.CPU(4)), + Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { + runLeasePreferences(ctx, t, c, leasePreferencesSpec{ + preferences: `[+dc=1],[+dc=2]`, + ranges: 1000, + replFactor: 5, + checkNodes: []int{1, 3, 4, 5}, + eventFn: makeStopNodesEventFn(2 /* targets */), + waitForLessPreferred: false, + }) + }, + }) + r.Add(registry.TestSpec{ + // NB: This test takes down 2(/2) nodes in the most preferred locality. Th + // leases on the stopped node will be acquired by node's which are not in + // the most preferred locality. 
This test waits until all the leases are on + // the secondary preference. + Name: "lease-preferences/full-first-preference-down", + Owner: registry.OwnerKV, + Timeout: 60 * time.Minute, + // This test purposefully kills nodes. Skip the dead node post-test + // validation. + SkipPostValidations: registry.PostValidationNoDeadNodes, + Cluster: r.MakeClusterSpec(5, spec.CPU(4)), + Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { + runLeasePreferences(ctx, t, c, leasePreferencesSpec{ + preferences: `[+dc=1],[+dc=2]`, + ranges: 1000, + replFactor: 5, + eventFn: makeStopNodesEventFn(1, 2 /* targets */), + checkNodes: []int{3, 4, 5}, + waitForLessPreferred: false, + }) + }, + }) + r.Add(registry.TestSpec{ + // NB: This test manually transfers leases onto [+dc=3], which violates the + // lease preferences. This test then waits until all the leases are back on + // the most preferred locality. + Name: "lease-preferences/manual-violating-transfer", + Owner: registry.OwnerKV, + Timeout: 60 * time.Minute, + Cluster: r.MakeClusterSpec(5, spec.CPU(4)), + Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { + runLeasePreferences(ctx, t, c, leasePreferencesSpec{ + preferences: `[+dc=1],[+dc=2]`, + ranges: 1000, + replFactor: 5, + eventFn: makeTransferLeasesEventFn( + 5 /* gateway */, 5 /* target */), + checkNodes: []int{1, 2, 3, 4, 5}, + waitForLessPreferred: false, + }) + }, + }) +} + +func runLeasePreferences( + ctx context.Context, t test.Test, c cluster.Cluster, spec leasePreferencesSpec, +) { + // stableDuration is how long the test will wait, after satisfying the most + // preferred preference, or just some preference (when + // waitForLessPreferred=false). This duration is used to ensure the lease + // preference satisfaction is reasonably permanent. + const stableDuration = 30 * time.Second + + numNodes := c.Spec().NodeCount + allNodes := make([]int, 0, numNodes) + for i := 1; i <= numNodes; i++ { + allNodes = append(allNodes, i) + } + + // TODO(kvoli): temporary workaround for + // https://github.com/cockroachdb/cockroach/issues/105274 + settings := install.MakeClusterSettings() + settings.ClusterSettings["server.span_stats.span_batch_limit"] = "4096" + + startNodes := func(nodes ...int) { + for _, node := range nodes { + // Don't start a backup schedule because this test is timing sensitive. + opts := option.DefaultStartOpts() + opts.RoachprodOpts.ScheduleBackups = false + opts.RoachprodOpts.ExtraArgs = append(opts.RoachprodOpts.ExtraArgs, + // Set 2 nodes per DC locality: + // dc=1: n1 n2 + // dc=2: n3 n4 + // ... + // dc=N: n2N-1 n2N + fmt.Sprintf("--locality=region=fake-region,zone=fake-zone,dc=%d", (node-1)/2+1)) + c.Start(ctx, t.L(), opts, settings, c.Node(node)) + + } + } + + if c.IsLocal() { + // Reduce total number of ranges to a lower number when running locally. + // Local tests have a default time out of 5 minutes. + spec.ranges = 100 + } + + c.Put(ctx, t.Cockroach(), "./cockroach") + t.Status("starting cluster") + startNodes(allNodes...) 
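+	// NB: with the 5-node cluster spec used by these tests, the locality
+	// scheme above yields dc=1 (n1,n2), dc=2 (n3,n4) and dc=3 (n5).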
+ + conn := c.Conn(ctx, t.L(), numNodes) + defer conn.Close() + + setLeasePreferences := func(ctx context.Context, preferences string) { + _, err := conn.ExecContext(ctx, fmt.Sprintf( + `ALTER database kv CONFIGURE ZONE USING + num_replicas = %d, + num_voters = %d, + voter_constraints='[]', + lease_preferences='[%s]' + `, + spec.replFactor, spec.replFactor, spec.preferences, + )) + require.NoError(t, err) + } + + checkLeasePreferenceConformance := func(ctx context.Context) { + result, err := waitForLeasePreferences( + ctx, t, c, spec.checkNodes, spec.waitForLessPreferred, stableDuration) + require.NoError(t, err) + require.Truef(t, !result.violating(), "violating lease preferences %s", result) + if spec.waitForLessPreferred { + require.Truef(t, !result.lessPreferred(), "less preferred preferences %s", result) + } + } + + t.L().Printf("creating workload database") + _, err := conn.ExecContext(ctx, `CREATE DATABASE kv;`) + require.NoError(t, err) + + // Initially, set no lease preference and require every range to have the + // replication factor specified in the test. + t.L().Printf("setting zone configs") + configureAllZones(t, ctx, conn, zoneConfig{ + replicas: spec.replFactor, + }) + // Wait for the existing ranges (not kv) to be up-replicated. That way, + // creating the splits and waiting for up-replication on kv will be much + // quicker. + require.NoError(t, WaitForReplication(ctx, t, conn, spec.replFactor)) + c.Run(ctx, c.Node(numNodes), fmt.Sprintf( + `./cockroach workload init kv --scatter --splits %d {pgurl:%d}`, + spec.ranges, numNodes)) + // Wait for under-replicated ranges before checking lease preference + // enforcement. + require.NoError(t, WaitForReplication(ctx, t, conn, spec.replFactor)) + + t.L().Printf("setting lease preferences: %s", spec.preferences) + setLeasePreferences(ctx, spec.preferences) + t.L().Printf("waiting for initial lease preference conformance") + checkLeasePreferenceConformance(ctx) + + // Run the spec event function. The event function will move leases to + // non-conforming localities. + spec.eventFn(ctx, t, c) + // Run a full table scan to force lease acquisition, there may be unacquired + // leases if the event stopped nodes. + _, err = conn.ExecContext(ctx, `SELECT count(*) FROM kv.kv;`) + require.NoError(t, err) + + // Wait for the preference conformance with some leases in non-conforming + // localities. 
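+	// Depending on the event, leases may now be on dc=3 (violating) or only
+	// on the secondary preference dc=2 (less preferred).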
+ t.L().Printf("waiting for post-event lease preference conformance") + checkLeasePreferenceConformance(ctx) +} + +type leasePreferencesResult struct { + nodes []int + violatingCounts, lessPreferredCounts []int + violatingDuration, lessPreferredDuration time.Duration +} + +func (lpr leasePreferencesResult) violating() bool { + for i := range lpr.nodes { + if lpr.violatingCounts[i] > 0 { + return true + } + } + return false +} + +func (lpr leasePreferencesResult) lessPreferred() bool { + for i := range lpr.nodes { + if lpr.lessPreferredCounts[i] > 0 { + return true + } + } + return false +} + +func (lpr leasePreferencesResult) String() string { + var buf strings.Builder + fmt.Fprintf(&buf, + "violating(%v) duration=%.2fs: [", + lpr.violating(), lpr.violatingDuration.Seconds()) + for i := range lpr.nodes { + if i > 0 { + buf.WriteString(", ") + } + fmt.Fprintf(&buf, "n%d: %d", lpr.nodes[i], lpr.violatingCounts[i]) + } + fmt.Fprintf(&buf, + "] less-preferred(%v) duration=%.2fs: [", + lpr.lessPreferred(), lpr.lessPreferredDuration.Seconds()) + for i := range lpr.nodes { + if i > 0 { + buf.WriteString(", ") + } + fmt.Fprintf(&buf, "n%d: %d", lpr.nodes[i], lpr.lessPreferredCounts[i]) + } + buf.WriteString("]") + return buf.String() +} + +// waitForLeasePreferences waits until there are no leases violating +// preferences, or also that there are no leases on less preferred options. +// When checkViolatingOnly is true, this function only waits for leases +// violating preferences, and does not wait for leases on less preferred nodes. +func waitForLeasePreferences( + ctx context.Context, + t test.Test, + c cluster.Cluster, + nodes []int, + waitForLessPreferred bool, + stableDuration time.Duration, +) (leasePreferencesResult, error) { + // NB: We are querying metrics, expect these to be populated approximately + // every 10s. + const checkInterval = 10 * time.Second + var checkTimer timeutil.Timer + defer checkTimer.Stop() + checkTimer.Reset(checkInterval) + + // preferenceMetrics queries the crdb_internal metrics table per-node and + // returns two slices: (1) number of leases on each node which are violating + // preferences, and (2) the number of leases on each node which are less + // preferred, but satisfy a preference. + preferenceMetrics := func(ctx context.Context) (violating, lessPreferred []int) { + const violatingPreferenceMetricName = `leases.preferences.violating` + const lessPreferredMetricMetricName = `leases.preferences.less-preferred` + for _, node := range nodes { + violating = append(violating, + int(nodeMetric(ctx, t, c, node, violatingPreferenceMetricName))) + lessPreferred = append(lessPreferred, + int(nodeMetric(ctx, t, c, node, lessPreferredMetricMetricName))) + } + return violating, lessPreferred + } + + var ret leasePreferencesResult + ret.nodes = nodes + start := timeutil.Now() + for { + select { + case <-ctx.Done(): + return ret, ctx.Err() + case <-checkTimer.C: + checkTimer.Read = true + violating, lessPreferred := preferenceMetrics(ctx) + now := timeutil.Now() + sinceStart := now.Sub(start) + // Sanity check we are getting the correct number of metrics back, given + // the nodes we are checking. + require.Equal(t, len(nodes), len(violating)) + require.Equal(t, len(nodes), len(lessPreferred)) + ret.violatingCounts = violating + ret.lessPreferredCounts = lessPreferred + isViolating, isLessPreferred := ret.violating(), ret.lessPreferred() + // Record either the stable duration if the preferences are satisfied, or + // record the time since waiting began, if not. 
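+			// That is, violatingDuration and lessPreferredDuration track the
+			// elapsed time at which a violation (or less-preferred lease) was
+			// last observed, so sinceStart minus that value is how long the
+			// current conforming state has held.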
+ var violatingStable, lessPreferredStable bool + if isViolating { + ret.violatingDuration = sinceStart + } else { + violatingStable = sinceStart-ret.violatingDuration > stableDuration + } + if isLessPreferred { + ret.lessPreferredDuration = sinceStart + } else { + lessPreferredStable = sinceStart-ret.lessPreferredDuration > stableDuration + } + + // Report the status of the test every checkInterval (10s). + t.L().Printf("%.2fs: %s", sinceStart.Seconds(), ret) + stableNotViolating := !isViolating && violatingStable + stableNotLessPreferred := !isLessPreferred && lessPreferredStable + if stableNotViolating && stableNotLessPreferred { + // Every lease is on the most preferred preference and has been for at + // least stableDuration. Return early. + return ret, nil + } else if !waitForLessPreferred && stableNotViolating { + // Every lease is on some preference for at least stableDuration. We + // aren't going to wait for less preferred leases because + // waitForLessPreferred is false. + return ret, nil + } else { + // Some leases are on non-preferred localities, or there are less + // preferred leases and we are waiting for both. + t.L().Printf("not yet meeting requirements will check again in %s", + checkInterval) + } + checkTimer.Reset(checkInterval) + } + } +} diff --git a/pkg/cmd/roachtest/tests/registry.go b/pkg/cmd/roachtest/tests/registry.go index c91731ff3349..9d1779b09b35 100644 --- a/pkg/cmd/roachtest/tests/registry.go +++ b/pkg/cmd/roachtest/tests/registry.go @@ -81,6 +81,7 @@ func RegisterTests(r registry.Registry) { registerKnex(r) registerLOQRecovery(r) registerLargeRange(r) + registerLeasePreferences(r) registerLedger(r) registerLibPQ(r) registerLiquibase(r) From 6cf896f5f40f704ebc855843b4857623a6d0364b Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Fri, 21 Jul 2023 20:16:41 +0000 Subject: [PATCH 2/4] kvserver: transfer lease when acquiring lease outside preferences When a leaseholder is lost, any surviving replica may acquire the lease, even if it violates lease preferences. There are two main reasons for this: we need to elect a new Raft leader who will acquire the lease, which is agnostic to lease preferences, and there may not even be any surviving replicas that satisfy the lease preferences at all, so we don't want to keep the range unavailable while we try to figure this out (considering e.g. network timeouts can delay this for many seconds). However, after acquiring a lease, we rely on the replicate queue to transfer the lease back to a replica that conforms with the preferences, which can take several minutes. In multi-region clusters, this can cause severe latency degradation if the lease is acquired in a remote region. This patch will detect lease preference violations when a replica acquires a new lease, and eagerly enqueue it in the replicate queue for transfer (if possible). This behavior can be turned off (on by default), by the cluster setting `kv.lease.check_preferences_on_acquisition.enabled`. Epic: none Release note (bug fix): When losing a leaseholder and using lease preferences, the lease can be acquired by any other replica (regardless of lease preferences) in order to restore availability as soon as possible. The new leaseholder will now immediately check if it violates the lease preferences, and attempt to transfer the lease to a replica that satisfies the preferences if possible. 
Co-authored-by: Erik Grinaker Co-authored-by: Austen McClernon --- pkg/kv/kvserver/replica_proposal.go | 22 ++++++++++++++++++++++ pkg/kv/kvserver/replica_range_lease.go | 15 +++++++++++++++ pkg/kv/kvserver/replicate_queue.go | 7 +++++++ 3 files changed, 44 insertions(+) diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index b4ac3dae25b0..981996ee9456 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -460,6 +460,28 @@ func (r *Replica) leasePostApplyLocked( }) } + // If we acquired a new lease, and it violates the lease preferences, enqueue + // it in the replicate queue. + if leaseChangingHands && iAmTheLeaseHolder { + if LeaseCheckPreferencesOnAcquisitionEnabled.Get(&r.store.cfg.Settings.SV) { + preferenceStatus := makeLeasePreferenceStatus(st, r.store.StoreID(), r.store.Attrs(), + r.store.nodeDesc.Attrs, r.store.nodeDesc.Locality, r.mu.conf.LeasePreferences) + switch preferenceStatus { + case leasePreferencesOK, leasePreferencesLessPreferred, leasePreferencesUnknown: + // We could also enqueue the lease when we are a less preferred + // leaseholder, however the replicate queue will eventually get to it and + // we already satisfy _some_ preference. + case leasePreferencesViolating: + log.VEventf(ctx, 2, + "acquired lease violates lease preferences, enqueueing for transfer [lease=%v preferences=%v]", + newLease, r.mu.conf.LeasePreferences) + r.store.replicateQueue.AddAsync(ctx, r, replicateQueueLeasePreferencePriority) + default: + log.Fatalf(ctx, "unknown lease preferences status: %v", preferenceStatus) + } + } + } + // Inform the store of this lease. if iAmTheLeaseHolder { r.store.registerLeaseholder(ctx, r, newLease.Sequence) diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index e91c7a7ed64c..478e4ca8b01e 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -95,6 +95,21 @@ var ExpirationLeasesOnly = settings.RegisterBoolSetting( var DisableExpirationLeasesOnly = envutil.EnvOrDefaultBool( "COCKROACH_DISABLE_EXPIRATION_LEASES_ONLY", false) +// LeaseCheckPreferencesOnAcquisitionEnabled controls whether lease preferences +// are checked upon acquiring a new lease. If the new lease violates the +// configured preferences, it is enqueued in the replicate queue for +// processing. +// +// TODO(kvoli): Remove this cluster setting in 24.1, once we wish to enable +// this by default or is subsumed by another mechanism. +var LeaseCheckPreferencesOnAcquisitionEnabled = settings.RegisterBoolSetting( + settings.SystemOnly, + "kv.lease.check_preferences_on_acquisition.enabled", + "controls whether lease preferences are checked on lease acquisition, "+ + "if the new lease violates preferences, it is queued for processing", + true, +) + var leaseStatusLogLimiter = func() *log.EveryN { e := log.Every(15 * time.Second) e.ShouldLog() // waste the first shot diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 13fd8b507675..c7e65372c38b 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -58,6 +58,13 @@ const ( // in high latency clusters, and not allowing enough of a cushion can // make rebalance thrashing more likely (#17879). newReplicaGracePeriod = 5 * time.Minute + + // replicateQueueLeasePreferencePriority is the priority replicas are + // enqueued into the replicate queue with when violating lease preferences. 
+ // This priority is lower than any voter up-replication, yet higher than + // removal, non-voter addition and rebalancing. + // See allocatorimpl.AllocatorAction.Priority. + replicateQueueLeasePreferencePriority = 1001 ) // MinLeaseTransferInterval controls how frequently leases can be transferred From 76932f9b4ef27fab477a47b91adf14c6c86df7fd Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Fri, 21 Jul 2023 00:37:07 +0000 Subject: [PATCH 3/4] allocator/plan: place leases violating preferences in purgatory This patch places replicas in the replicate queue purgatory when it has a lease violating the lease preferences and it's unable to find a suitable target. This causes the replica to be retried more often. This will only trigger when replicas are eagerly enqueued (typically when we acquire a new lease that violates preferences), since we otherwise don't attempt to enqueue replicas when they don't have a valid lease transfer target. This patch also enables requeuing replicas after a successful rebalance, when the lease violates preferences. Epic: none Release note: None Co-authored-by: Erik Grinaker Co-authored-by: Austen McClernon --- .../allocator/allocatorimpl/allocator.go | 5 + pkg/kv/kvserver/allocator/base.go | 3 + pkg/kv/kvserver/replica_proposal.go | 5 +- pkg/kv/kvserver/replicate_queue.go | 76 ++++++++++-- pkg/kv/kvserver/replicate_queue_test.go | 117 ++++++++++++++++++ 5 files changed, 193 insertions(+), 13 deletions(-) diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go index 6b529503ca8a..4065e8e49475 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go @@ -2232,6 +2232,11 @@ func (a *Allocator) TransferLeaseTarget( forceDecisionWithoutStats bool, opts allocator.TransferLeaseOptions, ) roachpb.ReplicaDescriptor { + if a.knobs != nil { + if blockFn := a.knobs.BlockTransferTarget; blockFn != nil && blockFn() { + return roachpb.ReplicaDescriptor{} + } + } excludeLeaseRepl := opts.ExcludeLeaseRepl if a.leaseholderShouldMoveDueToPreferences(ctx, storePool, conf, leaseRepl, existing) || a.leaseholderShouldMoveDueToIOOverload(ctx, storePool, existing, leaseRepl.StoreID(), a.IOOverloadOptions()) { diff --git a/pkg/kv/kvserver/allocator/base.go b/pkg/kv/kvserver/allocator/base.go index ef073bf4c0ed..be9e9c991b01 100644 --- a/pkg/kv/kvserver/allocator/base.go +++ b/pkg/kv/kvserver/allocator/base.go @@ -95,6 +95,9 @@ type TestingKnobs struct { Desc() *roachpb.RangeDescriptor StoreID() roachpb.StoreID }) *raft.Status + // BlockTransferTarget can be used to block returning any transfer targets + // from TransferLeaseTarget. + BlockTransferTarget func() bool } // QPSRebalanceThreshold is much like rangeRebalanceThreshold, but for diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index 981996ee9456..43830e3431ca 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -464,8 +464,9 @@ func (r *Replica) leasePostApplyLocked( // it in the replicate queue. 
if leaseChangingHands && iAmTheLeaseHolder { if LeaseCheckPreferencesOnAcquisitionEnabled.Get(&r.store.cfg.Settings.SV) { - preferenceStatus := makeLeasePreferenceStatus(st, r.store.StoreID(), r.store.Attrs(), - r.store.nodeDesc.Attrs, r.store.nodeDesc.Locality, r.mu.conf.LeasePreferences) + preferenceStatus := makeLeasePreferenceStatus(r.leaseStatusAtRLocked(ctx, now), + r.store.StoreID(), r.store.Attrs(), r.store.nodeDesc.Attrs, + r.store.nodeDesc.Locality, r.mu.conf.LeasePreferences) switch preferenceStatus { case leasePreferencesOK, leasePreferencesLessPreferred, leasePreferencesUnknown: // We could also enqueue the lease when we are a less preferred diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index c7e65372c38b..1f2c2111d50b 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -922,15 +922,20 @@ func (rq *replicateQueue) ShouldRequeue(ctx context.Context, change ReplicateQue // time around. requeue = false - } else if change.Action == allocatorimpl.AllocatorConsiderRebalance { - // Don't requeue after a successful rebalance operation. - requeue = false - } else if change.Op.lhBeingRemoved() { // Don't requeue if the leaseholder was removed as a voter or the range // lease was transferred away. requeue = false + } else if change.Action == allocatorimpl.AllocatorConsiderRebalance && + !change.replica.leaseViolatesPreferences(ctx) { + // Don't requeue after a successful rebalance operation, when the lease + // does not violate any preferences. If the lease does violate preferences, + // the next process attempt will either find a target to transfer the lease + // or place the replica into purgatory if unable. See + // CantTransferLeaseViolatingPreferencesError. + requeue = false + } else { // Otherwise, requeue to see if there is more work to do. As the // operation succeeded and was planned for a repair action i.e. not @@ -1776,7 +1781,8 @@ func (rq *replicateQueue) considerRebalance( if !canTransferLeaseFrom(ctx, repl) { return nil, nil } - return rq.shedLeaseTarget( + var err error + op, err = rq.shedLeaseTarget( ctx, repl, desc, @@ -1786,8 +1792,19 @@ func (rq *replicateQueue) considerRebalance( ExcludeLeaseRepl: false, CheckCandidateFullness: true, }, - ), nil - + ) + if err != nil { + if scatter && errors.Is(err, CantTransferLeaseViolatingPreferencesError{}) { + // When scatter is specified, we ignore lease preference violation + // errors returned from shedLeaseTarget. These errors won't place the + // replica into purgatory because they are called outside the replicate + // queue loop, directly. + log.KvDistribution.VEventf(ctx, 3, "%v", err) + err = nil + } + return nil, err + } + return op, nil } // If we have a valid rebalance action (ok == true) and we haven't @@ -1950,22 +1967,36 @@ func (rq *replicateQueue) shedLeaseTarget( desc *roachpb.RangeDescriptor, conf roachpb.SpanConfig, opts allocator.TransferLeaseOptions, -) (op AllocationOp) { +) (op AllocationOp, _ error) { usage := RangeUsageInfoForRepl(repl) + existingVoters := desc.Replicas().VoterDescriptors() // Learner replicas aren't allowed to become the leaseholder or raft leader, // so only consider the `VoterDescriptors` replicas. 
target := rq.allocator.TransferLeaseTarget( ctx, rq.storePool, conf, - desc.Replicas().VoterDescriptors(), + existingVoters, repl, usage, false, /* forceDecisionWithoutStats */ opts, ) if target == (roachpb.ReplicaDescriptor{}) { - return nil + // If we don't find a suitable target, but we own a lease violating the + // lease preferences, and there is a more suitable target, return an error + // to place the replica in purgatory and retry sooner. This typically + // happens when we've just acquired a violating lease and we eagerly + // enqueue the replica before we've received Raft leadership, which + // prevents us from finding appropriate lease targets since we can't + // determine if any are behind. + liveVoters, _ := rq.storePool.LiveAndDeadReplicas( + existingVoters, false /* includeSuspectAndDrainingStores */) + preferred := rq.allocator.PreferredLeaseholders(rq.storePool, conf, liveVoters) + if len(preferred) > 0 && repl.leaseViolatesPreferences(ctx) { + return nil, CantTransferLeaseViolatingPreferencesError{RangeID: desc.RangeID} + } + return nil, nil } op = AllocationTransferLeaseOp{ @@ -1974,7 +2005,7 @@ func (rq *replicateQueue) shedLeaseTarget( usage: usage, bypassSafetyChecks: false, } - return op + return op, nil } // shedLease takes in a leaseholder replica, looks for a target for transferring @@ -2199,3 +2230,26 @@ func RangeUsageInfoForRepl(repl *Replica) allocator.RangeUsageInfo { }, } } + +// CantTransferLeaseViolatingPreferencesError is an error returned when a lease +// violates the lease preferences, but we couldn't find a valid target to +// transfer the lease to. It indicates that the replica should be sent to +// purgatory, to retry the transfer faster. +type CantTransferLeaseViolatingPreferencesError struct { + RangeID roachpb.RangeID +} + +var _ errors.SafeFormatter = CantTransferLeaseViolatingPreferencesError{} + +func (e CantTransferLeaseViolatingPreferencesError) Error() string { return fmt.Sprint(e) } + +func (e CantTransferLeaseViolatingPreferencesError) Format(s fmt.State, verb rune) { + errors.FormatError(e, s, verb) +} + +func (e CantTransferLeaseViolatingPreferencesError) SafeFormatError(p errors.Printer) (next error) { + p.Printf("can't transfer r%d lease violating preferences, no suitable target", e.RangeID) + return nil +} + +func (CantTransferLeaseViolatingPreferencesError) PurgatoryErrorMarker() {} diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index e0752ab801c2..bd2d9d0ea90a 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" @@ -2487,3 +2488,119 @@ func TestReplicateQueueExpirationLeasesOnly(t *testing.T) { return epochLeases > 0 && expLeases > 0 && expLeases <= initialExpLeases }, 30*time.Second, 500*time.Millisecond) } + +// TestReplicateQueueLeasePreferencePurgatoryError tests that not finding a +// lease transfer target whilst violating lease preferences, will put the +// replica in the replicate queue purgatory. 
+func TestReplicateQueueLeasePreferencePurgatoryError(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + skip.UnderRace(t) // too slow under stressrace + skip.UnderDeadlock(t) + skip.UnderShort(t) + + const initialPreferredNode = 1 + const nextPreferredNode = 2 + const numRanges = 40 + const numNodes = 3 + + var blockTransferTarget atomic.Bool + + blockTransferTargetFn := func() bool { + block := blockTransferTarget.Load() + return block + } + + knobs := base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + AllocatorKnobs: &allocator.TestingKnobs{ + BlockTransferTarget: blockTransferTargetFn, + }, + }, + } + + serverArgs := make(map[int]base.TestServerArgs, numNodes) + for i := 0; i < numNodes; i++ { + serverArgs[i] = base.TestServerArgs{ + Knobs: knobs, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{{Key: "rack", Value: fmt.Sprintf("%d", i+1)}}, + }, + } + } + + tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{ + ServerArgsPerNode: serverArgs, + }) + defer tc.Stopper().Stop(ctx) + + db := tc.Conns[0] + setLeasePreferences := func(node int) { + _, err := db.Exec(fmt.Sprintf(`ALTER TABLE t CONFIGURE ZONE USING + num_replicas=3, num_voters=3, voter_constraints='[]', lease_preferences='[[+rack=%d]]'`, + node)) + require.NoError(t, err) + } + + leaseCount := func(node int) int { + var count int + err := db.QueryRow(fmt.Sprintf( + "SELECT count(*) FROM [SHOW RANGES FROM TABLE t WITH DETAILS] WHERE lease_holder = %d", node), + ).Scan(&count) + require.NoError(t, err) + return count + } + + checkLeaseCount := func(node, expectedLeaseCount int) error { + if count := leaseCount(node); count != expectedLeaseCount { + return errors.Errorf("expected %d leases on node %d, found %d", + expectedLeaseCount, node, count) + } + return nil + } + + // Create a test table with numRanges-1 splits, to end up with numRanges + // ranges. We will use the test table ranges to assert on the purgatory lease + // preference behavior. + _, err := db.Exec("CREATE TABLE t (i int);") + require.NoError(t, err) + _, err = db.Exec( + fmt.Sprintf("INSERT INTO t(i) select generate_series(1,%d)", numRanges-1)) + require.NoError(t, err) + _, err = db.Exec("ALTER TABLE t SPLIT AT SELECT i FROM t;") + require.NoError(t, err) + require.NoError(t, tc.WaitForFullReplication()) + + store := tc.GetFirstStoreFromServer(t, 0) + // Set a preference on the initial node, then wait until all the leases for + // the test table are on that node. + setLeasePreferences(initialPreferredNode) + testutils.SucceedsSoon(t, func() error { + require.NoError(t, store.ForceReplicationScanAndProcess()) + return checkLeaseCount(initialPreferredNode, numRanges) + }) + + // Block returning transfer targets from the allocator, then update the + // preferred node. We expect that every range for the test table will end up + // in purgatory on the initially preferred node. + blockTransferTarget.Store(true) + setLeasePreferences(nextPreferredNode) + testutils.SucceedsSoon(t, func() error { + require.NoError(t, store.ForceReplicationScanAndProcess()) + if purgLen := store.ReplicateQueuePurgatoryLength(); purgLen != numRanges { + return errors.Errorf("expected %d in purgatory but got %v", numRanges, purgLen) + } + return nil + }) + + // Lastly, unblock returning transfer targets. Expect that the leases from + // the test table all move to the new preference. Note we don't force a + // replication queue scan, as the purgatory retry should handle the + // transfers. 
+ blockTransferTarget.Store(false) + testutils.SucceedsSoon(t, func() error { + return checkLeaseCount(nextPreferredNode, numRanges) + }) +} From dc05a48f7cab6baacb1116f6d58cfe9bab3cc4e4 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Mon, 31 Jul 2023 19:56:58 +0000 Subject: [PATCH 4/4] kvserver: ignore lease validity when checking lease preferences In #107507, we began eagerly enqueuing into the replicate queue, when acquiring a replica acquired a new lease which violated lease preferences. Lease preferences were only considered violated when the lease itself was valid. In #107507, we saw that it is uncommon, but possible for an invalid lease to be acquired, violate lease preferences and not be enqueued as a result. The end result was a leaseholder violating the applied lease preferences which would not be resolved until the next scanner cycle. Update the eager enqueue check to only check that the replica is the incoming leaseholder when applying the lease, and that the replica violates the applied lease preferences. The check now applies on any lease acquisition, where previously it only occurred on the leaseholder changing. Note the purgatory error introduced in #107507, still checks that the lease is valid and owned by the store before proceeding. It is a condition that the lease must be valid+owned by the store to have a change planned, so whilst it is possible the lease becomes invalid somewhere in-between planning, when the replica applies a valid lease, it will still be enqueued, so purgatory is unnecessary. Fixes: #107862 Release note: None --- pkg/kv/kvserver/replica_metrics.go | 4 +-- pkg/kv/kvserver/replica_proposal.go | 41 +++++++++++++------------- pkg/kv/kvserver/replica_range_lease.go | 38 ++++++++++++------------ 3 files changed, 42 insertions(+), 41 deletions(-) diff --git a/pkg/kv/kvserver/replica_metrics.go b/pkg/kv/kvserver/replica_metrics.go index d6bcafcc9b39..4c2f9c585449 100644 --- a/pkg/kv/kvserver/replica_metrics.go +++ b/pkg/kv/kvserver/replica_metrics.go @@ -146,8 +146,8 @@ func calcReplicaMetrics(d calcReplicaMetricsInput) ReplicaMetrics { validLeaseType = d.leaseStatus.Lease.Type() if validLeaseOwner { livenessLease = keys.NodeLivenessSpan.Overlaps(d.desc.RSpan().AsRawSpanWithNoLocals()) - switch makeLeasePreferenceStatus( - d.leaseStatus, d.storeID, d.storeAttrs, d.nodeAttrs, + switch checkStoreAgainstLeasePreferences( + d.storeID, d.storeAttrs, d.nodeAttrs, d.nodeLocality, d.conf.LeasePreferences) { case leasePreferencesViolating: violatingLeasePreferences = true diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index 43830e3431ca..9787a85348f2 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -460,26 +460,27 @@ func (r *Replica) leasePostApplyLocked( }) } - // If we acquired a new lease, and it violates the lease preferences, enqueue - // it in the replicate queue. 
- if leaseChangingHands && iAmTheLeaseHolder { - if LeaseCheckPreferencesOnAcquisitionEnabled.Get(&r.store.cfg.Settings.SV) { - preferenceStatus := makeLeasePreferenceStatus(r.leaseStatusAtRLocked(ctx, now), - r.store.StoreID(), r.store.Attrs(), r.store.nodeDesc.Attrs, - r.store.nodeDesc.Locality, r.mu.conf.LeasePreferences) - switch preferenceStatus { - case leasePreferencesOK, leasePreferencesLessPreferred, leasePreferencesUnknown: - // We could also enqueue the lease when we are a less preferred - // leaseholder, however the replicate queue will eventually get to it and - // we already satisfy _some_ preference. - case leasePreferencesViolating: - log.VEventf(ctx, 2, - "acquired lease violates lease preferences, enqueueing for transfer [lease=%v preferences=%v]", - newLease, r.mu.conf.LeasePreferences) - r.store.replicateQueue.AddAsync(ctx, r, replicateQueueLeasePreferencePriority) - default: - log.Fatalf(ctx, "unknown lease preferences status: %v", preferenceStatus) - } + // If we acquired a lease, and it violates the lease preferences, enqueue it + // in the replicate queue. NOTE: We don't check whether the lease is valid, + // it is possible that the lease being applied is invalid due to replication + // lag, or previously needing a snapshot. The replicate queue will ensure the + // lease is valid and owned by the replica before processing. + if iAmTheLeaseHolder && leaseChangingHands && + LeaseCheckPreferencesOnAcquisitionEnabled.Get(&r.store.cfg.Settings.SV) { + preferenceStatus := checkStoreAgainstLeasePreferences(r.store.StoreID(), r.store.Attrs(), + r.store.nodeDesc.Attrs, r.store.nodeDesc.Locality, r.mu.conf.LeasePreferences) + switch preferenceStatus { + case leasePreferencesOK, leasePreferencesLessPreferred: + // We could also enqueue the lease when we are a less preferred + // leaseholder, however the replicate queue will eventually get to it and + // we already satisfy _some_ preference. + case leasePreferencesViolating: + log.VEventf(ctx, 2, + "acquired lease violates lease preferences, enqueuing for transfer [lease=%v preferences=%v]", + newLease, r.mu.conf.LeasePreferences) + r.store.replicateQueue.AddAsync(ctx, r, replicateQueueLeasePreferencePriority) + default: + log.Fatalf(ctx, "unknown lease preferences status: %v", preferenceStatus) } } diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 478e4ca8b01e..95ddc3f91529 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -1525,16 +1525,14 @@ func (r *Replica) hasCorrectLeaseTypeRLocked(lease roachpb.Lease) bool { type leasePreferencesStatus int const ( - // leasePreferencesUnknown indicates the preferences status cannot be - // determined. - leasePreferencesUnknown leasePreferencesStatus = iota - // leasePreferencesViolating indicates the leaseholder does not - // satisfy any lease preference applied. + _ leasePreferencesStatus = iota + // leasePreferencesViolating indicates the checked store does not satisfy any + // lease preference applied. leasePreferencesViolating - // leasePreferencesLessPreferred indicates the leaseholder satisfies _some_ + // leasePreferencesLessPreferred indicates the checked store satisfies _some_ // preference, however not the most preferred. leasePreferencesLessPreferred - // leasePreferencesOK indicates the lease satisfies the first + // leasePreferencesOK indicates the checked store satisfies the first // preference, or no lease preferences are applied. 
leasePreferencesOK ) @@ -1546,32 +1544,34 @@ func (r *Replica) leaseViolatesPreferences(ctx context.Context) bool { storeID := r.store.StoreID() now := r.Clock().NowAsClockTimestamp() r.mu.RLock() - leaseStatus := r.leaseStatusAtRLocked(ctx, now) preferences := r.mu.conf.LeasePreferences + leaseStatus := r.leaseStatusAtRLocked(ctx, now) r.mu.RUnlock() + if !leaseStatus.IsValid() || !leaseStatus.Lease.OwnedBy(storeID) { + // We can't determine if the lease preferences are being conformed to or + // not, as the store either doesn't own the lease, or doesn't own a valid + // lease. + return false + } + storeAttrs := r.store.Attrs() nodeAttrs := r.store.nodeDesc.Attrs nodeLocality := r.store.nodeDesc.Locality - preferenceStatus := makeLeasePreferenceStatus( - leaseStatus, storeID, storeAttrs, nodeAttrs, nodeLocality, preferences) - + preferenceStatus := checkStoreAgainstLeasePreferences( + storeID, storeAttrs, nodeAttrs, nodeLocality, preferences) return preferenceStatus == leasePreferencesViolating } -func makeLeasePreferenceStatus( - leaseStatus kvserverpb.LeaseStatus, +// checkStoreAgainstLeasePreferences returns whether the given store would +// violate, be less preferred or ok, leaseholder, according the the lease +// preferences. +func checkStoreAgainstLeasePreferences( storeID roachpb.StoreID, storeAttrs, nodeAttrs roachpb.Attributes, nodeLocality roachpb.Locality, preferences []roachpb.LeasePreference, ) leasePreferencesStatus { - if !leaseStatus.IsValid() || !leaseStatus.Lease.OwnedBy(storeID) { - // We can't determine if the lease preferences are being conformed to or - // not, as the store either doesn't own the lease, or doesn't own a valid - // lease. - return leasePreferencesUnknown - } if len(preferences) == 0 { return leasePreferencesOK }