diff --git a/pkg/kv/kvserver/asim/queue/BUILD.bazel b/pkg/kv/kvserver/asim/queue/BUILD.bazel
index d2e10db21446..bb31212acd60 100644
--- a/pkg/kv/kvserver/asim/queue/BUILD.bazel
+++ b/pkg/kv/kvserver/asim/queue/BUILD.bazel
@@ -4,6 +4,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 go_library(
     name = "queue",
     srcs = [
+        "allocator_replica.go",
         "pacer.go",
         "queue.go",
         "replicate_queue.go",
@@ -13,11 +14,18 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//pkg/kv/kvpb",
+        "//pkg/kv/kvserver/allocator",
         "//pkg/kv/kvserver/allocator/allocatorimpl",
+        "//pkg/kv/kvserver/allocator/plan",
         "//pkg/kv/kvserver/allocator/storepool",
         "//pkg/kv/kvserver/asim/state",
+        "//pkg/kv/kvserver/constraint",
+        "//pkg/kv/kvserver/kvserverpb",
         "//pkg/roachpb",
+        "//pkg/util/hlc",
         "//pkg/util/log",
+        "//pkg/util/timeutil",
+        "@io_etcd_go_raft_v3//:raft",
     ],
 )
 
@@ -35,6 +43,7 @@ go_test(
         "//pkg/kv/kvserver/asim/gossip",
         "//pkg/kv/kvserver/asim/state",
         "//pkg/kv/kvserver/asim/workload",
+        "//pkg/kv/kvserver/liveness/livenesspb",
         "//pkg/roachpb",
         "@com_github_stretchr_testify//require",
     ],
diff --git a/pkg/kv/kvserver/asim/queue/allocator_replica.go b/pkg/kv/kvserver/asim/queue/allocator_replica.go
new file mode 100644
index 000000000000..7269700198b8
--- /dev/null
+++ b/pkg/kv/kvserver/asim/queue/allocator_replica.go
@@ -0,0 +1,165 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package queue
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/plan"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/constraint"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"go.etcd.io/raft/v3"
+)
+
+// SimulatorReplica is a replica that is being tracked as a potential candidate
+// for rebalancing activities. It maintains a set of methods that enable
+// querying its state and processing a rebalancing action if taken.
+type SimulatorReplica struct {
+	rng   state.Range
+	repl  state.Replica
+	usage allocator.RangeUsageInfo
+	state state.State
+}
+
+var _ plan.AllocatorReplica = &SimulatorReplica{}
+
+// NewSimulatorReplica returns a new SimulatorReplica which implements the
+// plan.AllocatorReplica interface used for replication planning.
+func NewSimulatorReplica(repl state.Replica, s state.State) *SimulatorReplica {
+	rng, ok := s.Range(repl.Range())
+	if !ok {
+		return nil
+	}
+	sr := &SimulatorReplica{
+		rng:   rng,
+		repl:  repl,
+		usage: s.RangeUsageInfo(repl.Range(), repl.StoreID()),
+		state: s,
+	}
+	return sr
+}
+
+func (sr *SimulatorReplica) HasCorrectLeaseType(lease roachpb.Lease) bool {
+	return true
+}
+
+// LeaseStatusAt returns the status of the current lease for the
+// timestamp given.
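+//
+// For illustration only, a caller might consume the result roughly as
+// follows (a sketch; `now` is assumed to come from the simulated clock):
+//
+//	st := sr.LeaseStatusAt(ctx, now)
+//	if st.IsValid() && st.OwnedBy(sr.StoreID()) {
+//		// This store holds a valid lease at `now`.
+//	}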
+//
+// Common operations to perform on the resulting status are to check if
+// it is valid using the IsValid method and to check whether the lease
+// is held locally using the OwnedBy method.
+//
+// Note that this method does not check to see if a transfer is pending,
+// but returns the status of the current lease and ownership at the
+// specified point in time.
+func (sr *SimulatorReplica) LeaseStatusAt(
+	ctx context.Context, now hlc.ClockTimestamp,
+) kvserverpb.LeaseStatus {
+	return kvserverpb.LeaseStatus{
+		Lease: roachpb.Lease{
+			Replica: sr.repl.Descriptor(),
+		},
+		State: kvserverpb.LeaseState_VALID,
+	}
+}
+
+// LeaseViolatesPreferences checks whether the current replica owns the lease
+// and whether it violates the lease preferences defined in the span config.
+// If there is an error or no preferences are defined, it returns false and
+// considers the lease to be in conformance.
+func (sr *SimulatorReplica) LeaseViolatesPreferences(context.Context) bool {
+	descs := sr.state.StoreDescriptors(true /* useCached */, sr.repl.StoreID())
+	if len(descs) != 1 {
+		panic(fmt.Sprintf("programming error: cannot get store descriptor for store %d", sr.repl.StoreID()))
+	}
+	storeDesc := descs[0]
+	_, conf := sr.DescAndSpanConfig()
+
+	if len(conf.LeasePreferences) == 0 {
+		return false
+	}
+	for _, preference := range conf.LeasePreferences {
+		if constraint.ConjunctionsCheck(storeDesc, preference.Constraints) {
+			return false
+		}
+	}
+	// We have at least one preference set up, but we don't satisfy any.
+	return true
+}
+
+func (sr *SimulatorReplica) LastReplicaAdded() (roachpb.ReplicaID, time.Time) {
+	// We return a hack here, using the next replica ID from the descriptor and
+	// the current time. This is used when removing a replica to provide a grace
+	// period for new replicas. The corresponding code in plan.findRemoveVoter
+	// uses the system wall clock and we avoid that code path for now by always
+	// finding a remove voter without retries.
+	// TODO(kvoli): Record the actual time the last replica was added and rip out
+	// the timeutil.Now() usage in plan.findRemoveVoter, instead passing in now()
+	// so it maps to the simulated time.
+	return sr.Desc().NextReplicaID - 1, timeutil.Now()
+}
+
+// OwnsValidLease returns whether this replica is the current valid
+// leaseholder.
+func (sr *SimulatorReplica) OwnsValidLease(context.Context, hlc.ClockTimestamp) bool {
+	return sr.repl.HoldsLease()
+}
+
+// StoreID returns the Replica's StoreID.
+func (sr *SimulatorReplica) StoreID() roachpb.StoreID {
+	return roachpb.StoreID(sr.repl.StoreID())
+}
+
+// GetRangeID returns the Range ID.
+func (sr *SimulatorReplica) GetRangeID() roachpb.RangeID {
+	return roachpb.RangeID(sr.repl.Range())
+}
+
+// RaftStatus returns the current raft status of the replica. It returns
+// nil if the Raft group has not been initialized yet.
+func (sr *SimulatorReplica) RaftStatus() *raft.Status {
+	return sr.state.RaftStatus(sr.rng.RangeID(), sr.repl.StoreID())
+}
+
+// GetFirstIndex returns the index of the first entry in the replica's Raft
+// log.
+func (sr *SimulatorReplica) GetFirstIndex() kvpb.RaftIndex {
+	// TODO(kvoli): We always return 2 here as RaftStatus is unimplemented. When
+	// it is implemented, this may become variable.
+	return 2
+}
+
+// DescAndSpanConfig returns the authoritative range descriptor as well
+// as the span config for the replica.
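+//
+// A minimal usage sketch (illustrative only, not part of the change):
+//
+//	desc, conf := sr.DescAndSpanConfig()
+//	_ = desc.Replicas().VoterDescriptors() // current voters
+//	_ = conf.NumVoters                     // desired voter count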
+func (sr *SimulatorReplica) DescAndSpanConfig() (*roachpb.RangeDescriptor, roachpb.SpanConfig) { + return sr.rng.Descriptor(), sr.rng.SpanConfig() +} + +// Desc returns the authoritative range descriptor, acquiring a replica lock in +// the process. +func (sr *SimulatorReplica) Desc() *roachpb.RangeDescriptor { + return sr.rng.Descriptor() +} + +// RangeUsageInfo returns usage information (sizes and traffic) needed by +// the allocator to make rebalancing decisions for a given range. +func (sr *SimulatorReplica) RangeUsageInfo() allocator.RangeUsageInfo { + return sr.usage +} diff --git a/pkg/kv/kvserver/asim/queue/queue.go b/pkg/kv/kvserver/asim/queue/queue.go index d5e6f2b78efc..67ca66795cf6 100644 --- a/pkg/kv/kvserver/asim/queue/queue.go +++ b/pkg/kv/kvserver/asim/queue/queue.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/log" ) // RangeQueue presents an interface to interact with a single consumer @@ -101,6 +102,7 @@ func (pq *priorityQueue) Pop() interface{} { // baseQueue is an implementation of the ReplicateQueue interface. type baseQueue struct { + log.AmbientContext priorityQueue storeID state.StoreID stateChanger state.Changer diff --git a/pkg/kv/kvserver/asim/queue/replicate_queue.go b/pkg/kv/kvserver/asim/queue/replicate_queue.go index e8329c02eb79..b499ac8a6177 100644 --- a/pkg/kv/kvserver/asim/queue/replicate_queue.go +++ b/pkg/kv/kvserver/asim/queue/replicate_queue.go @@ -13,21 +13,23 @@ package queue import ( "container/heap" "context" + "fmt" "time" - "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/plan" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" ) type replicateQueue struct { baseQueue - allocator allocatorimpl.Allocator - storePool storepool.AllocatorStorePool - delay func(rangeSize int64, add bool) time.Duration + planner plan.ReplicationPlanner + clock *hlc.Clock + delay func(rangeSize int64, add bool) time.Duration } // NewReplicateQueue returns a new replicate queue. @@ -39,32 +41,47 @@ func NewReplicateQueue( storePool storepool.AllocatorStorePool, start time.Time, ) RangeQueue { - return &replicateQueue{ + rq := replicateQueue{ baseQueue: baseQueue{ - priorityQueue: priorityQueue{items: make([]*replicaItem, 0, 1)}, - storeID: storeID, - stateChanger: stateChanger, - next: start, + AmbientContext: log.MakeTestingAmbientCtxWithNewTracer(), + priorityQueue: priorityQueue{items: make([]*replicaItem, 0, 1)}, + storeID: storeID, + stateChanger: stateChanger, + next: start, }, - delay: delay, - allocator: allocator, - storePool: storePool, + delay: delay, + planner: plan.NewReplicaPlanner( + allocator, storePool, plan.ReplicaPlannerTestingKnobs{}), + clock: storePool.Clock(), } + rq.AddLogTag("rq", nil) + return &rq +} + +func simCanTransferleaseFrom(ctx context.Context, repl plan.LeaseCheckReplica) bool { + return true } // MaybeAdd proposes a replica for inclusion into the ReplicateQueue, if it // meets the criteria it is enqueued. The criteria is currently if the // allocator returns a non-noop, then the replica is added. 
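+// For illustration, the planner-gated enqueue check at the heart of the new
+// MaybeAdd is roughly the following (a sketch; the replicaItem field names
+// are assumed from queue.go):
+//
+//	if should, priority := rq.planner.ShouldPlanChange(
+//		ctx, rq.clock.NowAsClockTimestamp(), repl, simCanTransferleaseFrom,
+//	); should {
+//		heap.Push(rq, &replicaItem{rangeID: repl.GetRangeID(), priority: priority})
+//	}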
-func (rq *replicateQueue) MaybeAdd(
-	ctx context.Context, replica state.Replica, state state.State,
-) bool {
-	rng, ok := state.Range(replica.Range())
-	if !ok {
-		return false
-	}
+func (rq *replicateQueue) MaybeAdd(ctx context.Context, replica state.Replica, s state.State) bool {
+	repl := NewSimulatorReplica(replica, s)
+	rq.AddLogTag("r", repl.repl.Descriptor())
+	ctx = rq.AnnotateCtx(ctx)
+
+	_, config := repl.DescAndSpanConfig()
+	log.VEventf(ctx, 1, "maybe add replica=%s, config=%s",
+		repl.repl.Descriptor(), &config)
 
-	action, priority := rq.allocator.ComputeAction(ctx, rq.storePool, rng.SpanConfig(), rng.Descriptor())
-	if action == allocatorimpl.AllocatorNoop {
+	shouldPlanChange, priority := rq.planner.ShouldPlanChange(
+		ctx,
+		rq.clock.NowAsClockTimestamp(),
+		repl,
+		simCanTransferleaseFrom,
+	)
+
+	if !shouldPlanChange {
 		return false
 	}
 
@@ -84,9 +101,9 @@ func (rq *replicateQueue) MaybeAdd(
 // on the action taken. Replicas in the queue are processed in order of
 // priority, then in FIFO order on ties. The Tick function currently only
 // supports processing ConsiderRebalance actions on replicas.
-// TODO(kvoli,lidorcarmel): Support taking additional actions, beyond consider
-// rebalance.
 func (rq *replicateQueue) Tick(ctx context.Context, tick time.Time, s state.State) {
+	rq.AddLogTag("tick", tick)
+	ctx = rq.ResetAndAnnotateCtx(ctx)
 	if rq.lastTick.After(rq.next) {
 		rq.next = rq.lastTick
 	}
@@ -99,64 +116,72 @@ func (rq *replicateQueue) Tick(ctx context.Context, tick time.Time, s state.Stat
 		rng, ok := s.Range(state.RangeID(item.rangeID))
 		if !ok {
-			return
+			panic("range missing which is unexpected")
 		}
 
-		action, _ := rq.allocator.ComputeAction(ctx, rq.storePool, rng.SpanConfig(), rng.Descriptor())
+		replica, ok := rng.Replica(rq.storeID)
+		if !ok {
+			// The replica may have been removed from the store by another change
+			// (store rebalancer). In which case, we just ignore it and proceed.
+			continue
+		}
 
-		switch action {
-		case allocatorimpl.AllocatorConsiderRebalance:
-			rq.considerRebalance(ctx, rq.next, rng, s)
-		case allocatorimpl.AllocatorNoop:
-			return
-		default:
-			log.Infof(ctx, "s%d: allocator action %s for range %s is unsupported by the simulator "+
-				"replicate queue, ignoring.", rq.storeID, action, rng)
-			return
+		repl := NewSimulatorReplica(replica, s)
+		change, err := rq.planner.PlanOneChange(ctx, repl, simCanTransferleaseFrom, false /* scatter */)
+		if err != nil {
+			log.Errorf(ctx, "error planning change %s", err.Error())
+			continue
 		}
+
+		log.VEventf(ctx, 1, "conf=%+v", rng.SpanConfig())
+
+		rq.applyChange(ctx, change, rng, tick)
 	}
 
 	rq.lastTick = tick
 }
 
-// considerRebalance simulates the logic of the replicate queue when given a
-// considerRebalance action. It will first ask the allocator for add and remove
-// targets for a range. It will then enqueue the replica change into the state
-// changer and update the time to process the next replica, with the completion
-// time returned.
-func (rq *replicateQueue) considerRebalance(
-	ctx context.Context, tick time.Time, rng state.Range, s state.State,
+// applyChange applies a range allocation change. It is responsible only for
+// the application of the change; planning is left to the caller.
+//
+// TODO(kvoli): Currently applyChange is only called by the replicate queue. It
+// is desirable to funnel all allocation changes via one function. Move this
+// application phase onto a separate struct that will be used by both the
+// replicate queue and the store rebalancer and specifically for operations
+// rather than changes.
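+//
+// For example (hypothetical target store), a lease-transfer plan produced by
+// the planner would be applied as:
+//
+//	change := plan.ReplicateChange{
+//		Replica: repl,
+//		Op:      plan.AllocationTransferLeaseOp{Target: 2},
+//	}
+//	rq.applyChange(ctx, change, rng, tick)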
+func (rq *replicateQueue) applyChange( + ctx context.Context, change plan.ReplicateChange, rng state.Range, tick time.Time, ) { - add, remove, _, ok := rq.allocator.RebalanceVoter( - ctx, - rq.storePool, - rng.SpanConfig(), - nil, /* raftStatus */ - rng.Descriptor().Replicas().VoterDescriptors(), - rng.Descriptor().Replicas().NonVoterDescriptors(), - s.RangeUsageInfo(rng.RangeID(), rq.storeID), - storepool.StoreFilterNone, - rq.allocator.ScorerOptions(ctx), - ) - - // We were unable to find a rebalance target for the range. - if !ok { + var stateChange state.Change + switch op := change.Op.(type) { + case plan.AllocationNoop: + // Nothing to do. return + case plan.AllocationFinalizeAtomicReplicationOp: + panic("unimplemented finalize atomic replication op") + case plan.AllocationTransferLeaseOp: + stateChange = &state.LeaseTransferChange{ + RangeID: state.RangeID(change.Replica.GetRangeID()), + TransferTarget: state.StoreID(op.Target), + Author: rq.storeID, + Wait: rq.delay(0, false), + } + case plan.AllocationChangeReplicasOp: + log.VEventf(ctx, 1, "pushing state change for range=%s, details=%s", rng, op.Details) + stateChange = &state.ReplicaChange{ + RangeID: state.RangeID(change.Replica.GetRangeID()), + Changes: op.Chgs, + Author: rq.storeID, + Wait: rq.delay(rng.Size(), true), + } + default: + panic(fmt.Sprintf("Unknown operation %+v, unable to apply replicate queue change", op)) } - // Enqueue the change to be processed and update when the next replica - // processing can occur with the completion time of the change. - // NB: This limits concurrency to at most one change at a time per - // ReplicateQueue. - change := state.ReplicaChange{ - RangeID: state.RangeID(rng.Descriptor().RangeID), - Changes: append( - kvpb.MakeReplicationChanges(roachpb.ADD_VOTER, add), - kvpb.MakeReplicationChanges(roachpb.REMOVE_VOTER, remove)...), - Wait: rq.delay(rng.Size(), true), - Author: rq.storeID, - } - if completeAt, ok := rq.stateChanger.Push(tick, &change); ok { + if completeAt, ok := rq.stateChanger.Push(tick, stateChange); ok { rq.next = completeAt + log.VEventf(ctx, 1, "pushing state change succeeded, complete at %s (cur %s)", completeAt, tick) + } else { + log.VEventf(ctx, 1, "pushing state change failed") } } diff --git a/pkg/kv/kvserver/asim/queue/replicate_queue_test.go b/pkg/kv/kvserver/asim/queue/replicate_queue_test.go index cb59d061f5aa..0eb7c3c7db6d 100644 --- a/pkg/kv/kvserver/asim/queue/replicate_queue_test.go +++ b/pkg/kv/kvserver/asim/queue/replicate_queue_test.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/gossip" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/stretchr/testify/require" ) @@ -67,33 +68,66 @@ func TestReplicateQueue(t *testing.T) { testSettings.StateExchangeInterval = 5 * time.Second testSettings.ReplicaChangeBaseDelay = 5 * time.Second - getReplCounts := func(s state.State) map[int]int { - storeView := make(map[int]int) + getReplLeaseCounts := func(s state.State) (map[int]int, map[int]int) { + storeReplView := make(map[int]int) + storeLeaseView := make(map[int]int) stores := s.Stores() storeIDs := make([]state.StoreID, len(stores)) for i, store := range stores { storeIDs[i] = store.StoreID() } for _, desc := range s.StoreDescriptors(false /* cached */, storeIDs...) 
{ - storeView[int(desc.StoreID)] = int(desc.Capacity.RangeCount) + storeReplView[int(desc.StoreID)] = int(desc.Capacity.RangeCount) + storeLeaseView[int(desc.StoreID)] = int(desc.Capacity.LeaseCount) } - return storeView + return storeReplView, storeLeaseView } - testingState := func(replicaCounts map[state.StoreID]int, replicationFactor int32) state.State { - s := state.NewStateWithReplCounts(replicaCounts, 2 /* replsPerRange */, 1000 /* keyspace */, testSettings) - spanConfig := roachpb.SpanConfig{NumVoters: replicationFactor, NumReplicas: replicationFactor} + singleLocality := func(k, v string) roachpb.Locality { + return roachpb.Locality{Tiers: []roachpb.Tier{{Key: k, Value: v}}} + } + + constraint := func(k, v string) roachpb.Constraint { + return roachpb.Constraint{ + Type: roachpb.Constraint_REQUIRED, + Key: k, + Value: v, + } + } + + leasePreference := func(constraints ...roachpb.Constraint) roachpb.LeasePreference { + preference := roachpb.LeasePreference{ + Constraints: make([]roachpb.Constraint, len(constraints)), + } + copy(preference.Constraints, constraints) + return preference + } + + conjunctionConstraint := func(k, v string, numReplicas int32) roachpb.ConstraintsConjunction { + return roachpb.ConstraintsConjunction{ + NumReplicas: numReplicas, + Constraints: []roachpb.Constraint{constraint(k, v)}, + } + } + + testingState := func(replicaCounts map[state.StoreID]int, spanConfig roachpb.SpanConfig, initialRF int) state.State { + s := state.NewStateWithReplCounts(replicaCounts, initialRF, 1000 /* keyspace */, testSettings) for _, r := range s.Ranges() { s.SetSpanConfigForRange(r.RangeID(), spanConfig) + s.TransferLease(r.RangeID(), testingStore) } return s } testCases := []struct { - desc string - replicaCounts map[state.StoreID]int - ticks []int64 - expected map[int64]map[int]int + desc string + replicaCounts map[state.StoreID]int + spanConfig roachpb.SpanConfig + initialRF int + nonLiveNodes map[state.NodeID]livenesspb.NodeLivenessStatus + nodeLocalities map[state.NodeID]roachpb.Locality + ticks []int64 + expectedReplCounts, expectedLeaseCounts map[int64]map[int]int }{ { // NB: Expect no action, range counts are balanced. 
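For reference, a single case in the updated table has roughly this shape (a
sketch mirroring the dead-voter case added in the hunks below):

	{
		desc:          "replace dead voter",
		replicaCounts: map[state.StoreID]int{1: 1, 2: 1, 3: 1, 4: 0},
		initialRF:     3,
		spanConfig:    roachpb.SpanConfig{NumReplicas: 3, NumVoters: 3},
		nonLiveNodes: map[state.NodeID]livenesspb.NodeLivenessStatus{
			3: livenesspb.NodeLivenessStatus_DEAD,
		},
		ticks: []int64{5, 10, 15},
		expectedReplCounts: map[int64]map[int]int{
			15: {1: 1, 2: 1, 3: 0, 4: 1},
		},
	},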
@@ -102,7 +136,7 @@ func TestReplicateQueue(t *testing.T) {
 				1: 10, 2: 10,
 			},
 			ticks: []int64{5, 10, 15},
-			expected: map[int64]map[int]int{
+			expectedReplCounts: map[int64]map[int]int{
 				5:  {1: 10, 2: 10},
 				10: {1: 10, 2: 10},
 				15: {1: 10, 2: 10},
@@ -116,18 +150,219 @@ func TestReplicateQueue(t *testing.T) {
 			replicaCounts: map[state.StoreID]int{
 				1: 10, 2: 10, 3: 0,
 			},
+			ticks: []int64{5, 10, 15, 20},
+			expectedReplCounts: map[int64]map[int]int{
+				5:  {1: 10, 2: 10, 3: 0},
+				10: {1: 10, 2: 10, 3: 0},
+				15: {1: 10, 2: 9, 3: 1},
+				20: {1: 9, 2: 9, 3: 2},
+			},
+		},
+		{
+			desc: "up-replicate RF=3 -> RF=5",
+			replicaCounts: map[state.StoreID]int{
+				1: 1, 2: 1, 3: 1, 4: 0, 5: 0,
+			},
+			initialRF: 3,
+			spanConfig: roachpb.SpanConfig{
+				NumReplicas: 5,
+				NumVoters:   5,
+			},
+			ticks: []int64{5, 10, 15, 20},
+			expectedReplCounts: map[int64]map[int]int{
+				5:  {1: 1, 2: 1, 3: 1, 4: 0, 5: 0},
+				10: {1: 1, 2: 1, 3: 1, 4: 0, 5: 0},
+				15: {1: 1, 2: 1, 3: 1, 4: 0, 5: 1},
+				20: {1: 1, 2: 1, 3: 1, 4: 1, 5: 1},
+			},
+		},
+		{
+			desc: "up-replicate RF=3 -> RF=5 +2 non-voters",
+			replicaCounts: map[state.StoreID]int{
+				1: 1, 2: 1, 3: 1, 4: 0, 5: 0,
+			},
+			initialRF: 3,
+			spanConfig: roachpb.SpanConfig{
+				NumReplicas: 5,
+				NumVoters:   3,
+			},
+			ticks: []int64{5, 10, 15, 20},
+			expectedReplCounts: map[int64]map[int]int{
+				5:  {1: 1, 2: 1, 3: 1, 4: 0, 5: 0},
+				10: {1: 1, 2: 1, 3: 1, 4: 0, 5: 0},
+				15: {1: 1, 2: 1, 3: 1, 4: 0, 5: 1},
+				20: {1: 1, 2: 1, 3: 1, 4: 1, 5: 1},
+			},
+		},
+		{
+			desc: "down-replicate RF=5 -> RF=3",
+			replicaCounts: map[state.StoreID]int{
+				1: 1, 2: 1, 3: 1, 4: 1, 5: 1,
+			},
+			initialRF: 5,
+			spanConfig: roachpb.SpanConfig{
+				NumReplicas: 3,
+				NumVoters:   3,
+			},
+			ticks: []int64{5, 10, 15, 20},
+			expectedReplCounts: map[int64]map[int]int{
+				5:  {1: 1, 2: 1, 3: 1, 4: 1, 5: 1},
+				10: {1: 1, 2: 1, 3: 1, 4: 1, 5: 1},
+				15: {1: 1, 2: 1, 3: 1, 4: 0, 5: 1},
+				20: {1: 1, 2: 0, 3: 1, 4: 0, 5: 1},
+			},
+		},
+		{
+			desc: "replace dead voter",
+			replicaCounts: map[state.StoreID]int{
+				1: 1, 2: 1, 3: 1, 4: 0,
+			},
+			initialRF: 3,
+			spanConfig: roachpb.SpanConfig{
+				NumReplicas: 3,
+				NumVoters:   3,
+			},
+			nonLiveNodes: map[state.NodeID]livenesspb.NodeLivenessStatus{
+				3: livenesspb.NodeLivenessStatus_DEAD},
 			ticks: []int64{5, 10, 15},
-			expected: map[int64]map[int]int{
+			expectedReplCounts: map[int64]map[int]int{
+				5:  {1: 1, 2: 1, 3: 1, 4: 0},
+				10: {1: 1, 2: 1, 3: 1, 4: 0},
+				15: {1: 1, 2: 1, 3: 0, 4: 1},
+			},
+		},
+		{
+			desc: "replace decommissioning voters",
+			replicaCounts: map[state.StoreID]int{
+				1: 10, 2: 10, 3: 10, 4: 0,
+			},
+			initialRF: 3,
+			spanConfig: roachpb.SpanConfig{
+				NumReplicas: 3,
+				NumVoters:   3,
+			},
+			nonLiveNodes: map[state.NodeID]livenesspb.NodeLivenessStatus{
+				3: livenesspb.NodeLivenessStatus_DECOMMISSIONING},
+			ticks: []int64{5, 10, 15, 20, 25},
+			expectedReplCounts: map[int64]map[int]int{
+				5:  {1: 10, 2: 10, 3: 10, 4: 0},
+				10: {1: 10, 2: 10, 3: 10, 4: 0},
+				15: {1: 10, 2: 10, 3: 9, 4: 1},
+				20: {1: 10, 2: 10, 3: 8, 4: 2},
+				25: {1: 10, 2: 10, 3: 7, 4: 3},
+			},
+		},
+		{
+			// There are currently 10 voters in each of regions a and b. The span
+			// config specifies that one replica should be in region a and one in
+			// region c. b is not a valid region, so replicas should move from b to c.
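+			//
+			// Schematically, the constraints below amount to (an illustrative
+			// rendering):
+			//
+			//	{+region=a: 1}, {+region=c: 1}
+			//
+			// so one voter must sit in region a and one in region c.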
+ desc: "handle span config constraints", + replicaCounts: map[state.StoreID]int{ + 1: 10, 2: 10, 3: 0, + }, + initialRF: 2, + spanConfig: roachpb.SpanConfig{ + NumReplicas: 2, + NumVoters: 2, + Constraints: []roachpb.ConstraintsConjunction{ + conjunctionConstraint("region", "a", 1), + conjunctionConstraint("region", "c", 1), + }, + }, + nodeLocalities: map[state.NodeID]roachpb.Locality{ + 1: singleLocality("region", "a"), + 2: singleLocality("region", "b"), + 3: singleLocality("region", "c"), + }, + ticks: []int64{5, 10, 15, 20, 25}, + expectedReplCounts: map[int64]map[int]int{ 5: {1: 10, 2: 10, 3: 0}, - 10: {1: 10, 2: 9, 3: 1}, - 15: {1: 10, 2: 8, 3: 2}, + 10: {1: 10, 2: 10, 3: 0}, + 15: {1: 10, 2: 9, 3: 1}, + 20: {1: 10, 2: 8, 3: 2}, + 25: {1: 10, 2: 7, 3: 3}, + }, + }, + { + desc: "handle lease preferences", + replicaCounts: map[state.StoreID]int{ + 1: 10, 2: 10, + }, + initialRF: 2, + spanConfig: roachpb.SpanConfig{ + NumReplicas: 2, + NumVoters: 2, + LeasePreferences: []roachpb.LeasePreference{ + leasePreference(constraint("region", "b")), + }, + }, + nodeLocalities: map[state.NodeID]roachpb.Locality{ + 1: singleLocality("region", "a"), + 2: singleLocality("region", "b"), + }, + ticks: []int64{5, 10, 15, 20, 25}, + expectedReplCounts: map[int64]map[int]int{ + 5: {1: 10, 2: 10}, + 10: {1: 10, 2: 10}, + 15: {1: 10, 2: 10}, + 20: {1: 10, 2: 10}, + 25: {1: 10, 2: 10}, + }, + expectedLeaseCounts: map[int64]map[int]int{ + 5: {1: 10, 2: 0}, + 10: {1: 10, 2: 0}, + 15: {1: 9, 2: 1}, + 20: {1: 8, 2: 2}, + 25: {1: 7, 2: 3}, + }, + }, + { + desc: "don't transfer leases to draining store", + replicaCounts: map[state.StoreID]int{ + 1: 10, 2: 10, + }, + initialRF: 2, + spanConfig: roachpb.SpanConfig{ + NumReplicas: 2, + NumVoters: 2, + LeasePreferences: []roachpb.LeasePreference{ + leasePreference(constraint("region", "b")), + }, + }, + nodeLocalities: map[state.NodeID]roachpb.Locality{ + 1: singleLocality("region", "a"), + 2: singleLocality("region", "b"), + }, + nonLiveNodes: map[state.NodeID]livenesspb.NodeLivenessStatus{ + 2: livenesspb.NodeLivenessStatus_DRAINING}, + ticks: []int64{5, 10, 15}, + expectedReplCounts: map[int64]map[int]int{ + 5: {1: 10, 2: 10}, + 10: {1: 10, 2: 10}, + 15: {1: 10, 2: 10}, + }, + expectedLeaseCounts: map[int64]map[int]int{ + 5: {1: 10, 2: 0}, + 10: {1: 10, 2: 0}, + 15: {1: 10, 2: 0}, }, }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - s := testingState(tc.replicaCounts, 2 /* replication factor */) + spanConfig := tc.spanConfig + initialRF := tc.initialRF + // Default to RF=2 for simplicity when not specified. Although this is + // fairly unrealistic. + if spanConfig.NumReplicas == 0 { + spanConfig = roachpb.SpanConfig{NumVoters: 2, NumReplicas: 2} + } + if initialRF == 0 { + initialRF = 2 + } + + s := testingState(tc.replicaCounts, spanConfig, initialRF) changer := state.NewReplicaChanger() store, _ := s.Store(testingStore) rq := NewReplicateQueue( @@ -140,7 +375,16 @@ func TestReplicateQueue(t *testing.T) { ) s.TickClock(start) - results := make(map[int64]map[int]int) + for nodeID, livenessStatus := range tc.nonLiveNodes { + s.SetNodeLiveness(nodeID, livenessStatus) + } + + for nodeID, locality := range tc.nodeLocalities { + s.SetNodeLocality(nodeID, locality) + } + + replCountResults := make(map[int64]map[int]int) + leaseCountResults := make(map[int64]map[int]int) // Initialize the store pool information. 
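+			// A condensed sketch of the per-tick evaluation loop that follows
+			// (illustrative; it mirrors the statements below):
+			//
+			//	s.TickClock(now)         // advance simulated time
+			//	changer.Tick(now, s)     // complete pending replica/lease changes
+			//	gossip.Tick(ctx, now, s) // refresh each store's view of the cluster
+			//	rq.Tick(ctx, now, s)     // process queued replicas on s1
+			//
+			// where now = state.OffsetTick(start, tick).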
gossip := gossip.NewGossip(s, testSettings) gossip.Tick(ctx, start, s) @@ -153,6 +397,9 @@ func TestReplicateQueue(t *testing.T) { // considering stores as dead. s.TickClock(state.OffsetTick(start, tick)) + // Tick state updates that are queued for completion. + changer.Tick(state.OffsetTick(start, tick), s) + // Update the store's view of the cluster, we update all stores // but only care about s1's view. gossip.Tick(ctx, state.OffsetTick(start, tick), s) @@ -161,16 +408,22 @@ func TestReplicateQueue(t *testing.T) { // considering rebalance. rq.Tick(ctx, state.OffsetTick(start, tick), s) - // Tick state updates that are queued for completion. - changer.Tick(state.OffsetTick(start, tick), s) - + if nextRepl == len(repls) { + repls = s.Replicas(store.StoreID()) + nextRepl = 0 + } // Add a new repl to the replicate queue. rq.MaybeAdd(ctx, repls[nextRepl], s) - nextRepl++ - results[tick] = getReplCounts(s) + nextRepl++ + replCounts, leaseCounts := getReplLeaseCounts(s) + replCountResults[tick] = replCounts + leaseCountResults[tick] = leaseCounts + } + require.Equal(t, tc.expectedReplCounts, replCountResults) + if len(tc.expectedLeaseCounts) > 0 { + require.Equal(t, tc.expectedLeaseCounts, leaseCountResults) } - require.Equal(t, tc.expected, results) }) } } diff --git a/pkg/kv/kvserver/asim/state/BUILD.bazel b/pkg/kv/kvserver/asim/state/BUILD.bazel index 5bc27190e3df..9ca689d2e1be 100644 --- a/pkg/kv/kvserver/asim/state/BUILD.bazel +++ b/pkg/kv/kvserver/asim/state/BUILD.bazel @@ -56,6 +56,7 @@ go_test( "//pkg/kv/kvpb", "//pkg/kv/kvserver/asim/config", "//pkg/kv/kvserver/asim/workload", + "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/kv/kvserver/load", "//pkg/roachpb", "@com_github_stretchr_testify//require", diff --git a/pkg/kv/kvserver/asim/tests/testdata/example_rebalancing b/pkg/kv/kvserver/asim/tests/testdata/example_rebalancing index b6433008a69a..d26af0187a23 100644 --- a/pkg/kv/kvserver/asim/tests/testdata/example_rebalancing +++ b/pkg/kv/kvserver/asim/tests/testdata/example_rebalancing @@ -41,22 +41,22 @@ plot stat=qps sample=2 ---- ---- - 4023 ┤ ╭────────────────────────╮ - 3755 ┤ │ │ - 3487 ┤ │ │ - 3219 ┤ │ │ - 2951 ┤ │ │ - 2682 ┤ │ │ - 2414 ┤ │ ╰╮ - 2146 ┤ │ │ - 1878 ┤ │ ╭─────────────────────╮│ - 1609 ┤ │ │ ││ - 1341 ┤ │ │ ││ - 1073 ┤ │╭──────────────────────────╭───────────────────────────────────────────────── - 805 ┤ │││ ╭╯ - 536 ┤ │││ ││ - 268 ┤ │││ ││ - 0 ┼────────────────────────────╯╯ + 5021 ┤ ╭╮ + 4686 ┤ ││ + 4351 ┤ │╰╮ + 4017 ┤ │ ╰──────────────────────╮ + 3682 ┤ │ │ + 3347 ┤ │ │ + 3013 ┤ │ │ + 2678 ┤ │ │ + 2343 ┤ │ ╰╮ + 2008 ┤ │ │ ╭───────────────────────╮ + 1674 ┤ │ │╭╯ │ + 1339 ┤ │ ││ │ + 1004 ┤ │ ╭────────────────────────╯╭───────────────────────────────────────────────── + 669 ┤ │╭│ │ │ │ + 335 ┤ │││ │ │ │ + 0 ┼─────────────────────────────╯─────────────────────────╯ qps ---- ---- @@ -74,12 +74,12 @@ eval duration=5m samples=2 seed=42 ---- failed assertion sample 2 balance stat=qps threshold=1.15 ticks=6 - max/mean=3.50 tick=0 - max/mean=3.50 tick=1 - max/mean=3.50 tick=2 - max/mean=3.50 tick=3 - max/mean=3.50 tick=4 - max/mean=3.50 tick=5 + max/mean=2.00 tick=0 + max/mean=2.00 tick=1 + max/mean=2.00 tick=2 + max/mean=2.00 tick=3 + max/mean=2.00 tick=4 + max/mean=2.00 tick=5 # To investigate further, plot the QPS output again. 
The plotted output matches # expectations given the assertion failed, The system exhibits thrashing as @@ -91,19 +91,19 @@ plot stat=qps sample=3 7000 ┤ ╭───╮ 6533 ┤ │ │ 6067 ┤ │ │ - 5600 ┤ │ │ - 5133 ┤ │ ╰╮ - 4667 ┤ │ │ - 4200 ┤ │ │ ╭──╮ - 3733 ┤ │ │ │ │ - 3267 ┤ │ │ │ │ - 2800 ┤ │ │ ╭─────╮ │ ╰╮ ╭───────╮ - 2333 ┤ │ │ │ │ │ │ │ ╰╮ - 1867 ┤ │ ╰╭╭────╮─╮╭──────────────╮│╭──────────────╮ - 1400 ┤ │ ││╯ │ ││ │╭╯ ╰╮│ │ - 933 ┤ │ ╭╭╯ ╰╮╭╯────╯│ ╰╭───────────────────────────────────────────── - 467 ┤ │ ││ ││╮ ││ │ │ - 0 ┼─────────────────────────────────╯───────────────╯ + 5600 ┤ │ ╰╮ + 5133 ┤ │ │ ╭───╮ + 4667 ┤ │ │ │ │ + 4200 ┤ │ │ ╭───╮ ╭───╮ │ │ + 3733 ┤ │ │ │ │ │ │ │ │ + 3267 ┤ │ │ │ │ │ │ │ │ + 2800 ┤ │ │╭╭─────╮╭╯ │ ╭╯ ╰╮ ╭──╮ ╭─────╮ + 2333 ┤ │ │││ │││ │ │ │╭╯ │ │ │ + 1867 ┤ │ ╰││ │╰╮╭───╮╮│ ││╭──╰╮╭───────────╮╮╭─────────────╮ + 1400 ┤ │ ││ ││╭╯ │││ │││ ╭╯│ │ │││ │ + 933 ┤ │ ╭╭╯ ╰╮│╭────────────────────────────────────────────────────────────── + 467 ┤ │ ││ │││ ││ ││││ ││ │ ││ │ + 0 ┼────────────────╯────────────────────────────────╯───────────────╯ qps ---- ---- @@ -115,22 +115,22 @@ plot stat=replica_moves sample=3 ---- ---- - 14.00 ┤ ╭───────────────────────────────────────────────────────── - 13.07 ┤ │ - 12.13 ┤ │ - 11.20 ┤ ╭╯ - 10.27 ┤ │ - 9.33 ┤ ╭────╯ ╭────────────────────────────── - 8.40 ┤ ╭───────╭────────────────────────────────╯ - 7.47 ┤ │ ╭╯ - 6.53 ┤ │ │ - 5.60 ┤ ╭╯ ╭╯ - 4.67 ┤ │ │ ╭─────────────────────────────────────────────────────────────── - 3.73 ┤ │ │╭──────────────────────────────────────────────────────────────── - 2.80 ┤ │╭─────╭╯╯ ╭──────────────────────────────────────────────────────── - 1.87 ┤ ╭╯│ ╭───╯╯ ╭─╯──────────────────────────────────────────────────────── - 0.93 ┤ │╭╭─╯───╯ ╭────╯ - 0.00 ┼─────────────────────────────────────────────────────────────────────────────── + 22.00 ┤ ╭────────────────────────────────────── + 20.53 ┤ ╭────╭─────────────────────────────────────── + 19.07 ┤ ╭╯ │ + 17.60 ┤ ╭─────╭──────────╯ + 16.13 ┤ ╭╯ │ + 14.67 ┤ ╭╯ ╭╯ ╭──────────────────────────────────────────── + 13.20 ┤ │ │ │ + 11.73 ┤ │ │ ╭╯ + 10.27 ┤ ╭─────╭──────╯ ╭───╯ + 8.80 ┤ ╭╭─────╯ ╭╯ + 7.33 ┤ ╭─────╭╯╭────────────╯ ╭───────╭────────────────────────────── + 5.87 ┤ ╭╯ │╭╯ ╭────────╯ + 4.40 ┤ │ ╭───╭╯ ╭─────╭──────────╯ ╭───────────────────────────── + 2.93 ┤ ╭╯ ╭╯ ╭╯ ╭─╯ ╭╯ ╭─╯ + 1.47 ┤ │╭─╭───╯ ╭╭──────╯───────────────────╯╭────────────────────────────── + 0.00 ┼────────────────────╯───────────────────────────╯ replica_moves ---- ----