Skip to content

Commit

Permalink
asim: simulate replicate queue recovery changes
Browse files Browse the repository at this point in the history
Previously, it was not possible to simulate recovery actions such as
replacing a dead or decommissioning (non-)voter, because the simulator's
replicate queue only supported rebalancing. With the changes introduced in
cockroachdb#99247, it is now possible to simulate all actions returned from
the Allocator's `ComputeChange`.

This commit updates the simulator replicate queue to call
`ShouldPlanChange` and `PlanOneChange`, in the same manner that the
actual replicate queue does.

Note that the output of `example_rebalancing` has changed. This is expected:
the replicate queue now returns lease transfer changes, which it previously
did not.

Resolves: cockroachdb#90141

Release note: None
  • Loading branch information
kvoli authored and raggar committed May 23, 2023
1 parent 1de0817 commit de77ffa
Show file tree
Hide file tree
Showing 7 changed files with 597 additions and 142 deletions.
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/asim/queue/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "queue",
srcs = [
"allocator_replica.go",
"pacer.go",
"queue.go",
"replicate_queue.go",
Expand All @@ -13,11 +14,18 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/allocator",
"//pkg/kv/kvserver/allocator/allocatorimpl",
"//pkg/kv/kvserver/allocator/plan",
"//pkg/kv/kvserver/allocator/storepool",
"//pkg/kv/kvserver/asim/state",
"//pkg/kv/kvserver/constraint",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/roachpb",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/timeutil",
"@io_etcd_go_raft_v3//:raft",
],
)

Expand All @@ -35,6 +43,7 @@ go_test(
"//pkg/kv/kvserver/asim/gossip",
"//pkg/kv/kvserver/asim/state",
"//pkg/kv/kvserver/asim/workload",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
"@com_github_stretchr_testify//require",
],
Expand Down
165 changes: 165 additions & 0 deletions pkg/kv/kvserver/asim/queue/allocator_replica.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package queue

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/plan"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/constraint"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"go.etcd.io/raft/v3"
)

// SimulatorReplica is a replica that is being tracked as a potential candidate
// for rebalancing activities. It maintains a set of methods that enable
// querying its state and processing a rebalancing action if taken.
//
// It is backed entirely by simulator state rather than a real replica.
type SimulatorReplica struct {
// rng is the simulated range this replica belongs to, looked up from the
// simulator state when the SimulatorReplica is constructed.
rng state.Range
// repl is the simulated replica being wrapped.
repl state.Replica
// usage is the range usage information read from the simulator state at
// construction time.
usage allocator.RangeUsageInfo
// state is the simulator state, consulted for store descriptors and raft
// status.
state state.State
}

// Compile-time check that *SimulatorReplica implements plan.AllocatorReplica.
var _ plan.AllocatorReplica = (*SimulatorReplica)(nil)

// NewSimulatorReplica wraps repl in a SimulatorReplica, which implements the
// plan.AllocatorReplica interface used for replication planning. The range and
// its usage information are read from s at construction time.
//
// It returns nil when s has no range matching repl.Range(), so callers must
// check the result before use.
func NewSimulatorReplica(repl state.Replica, s state.State) *SimulatorReplica {
	simRange, found := s.Range(repl.Range())
	if !found {
		return nil
	}
	return &SimulatorReplica{
		rng:   simRange,
		repl:  repl,
		usage: s.RangeUsageInfo(repl.Range(), repl.StoreID()),
		state: s,
	}
}

// HasCorrectLeaseType always reports true. The lease argument is ignored.
func (sr *SimulatorReplica) HasCorrectLeaseType(_ roachpb.Lease) bool {
	return true
}

// LeaseStatusAt returns a lease status for this replica.
//
// The simulated implementation does not consult ctx or now: the returned
// status always has state VALID and names this replica's descriptor as the
// lease's replica. It does not check whether a transfer is pending.
func (sr *SimulatorReplica) LeaseStatusAt(
	ctx context.Context, now hlc.ClockTimestamp,
) kvserverpb.LeaseStatus {
	var status kvserverpb.LeaseStatus
	status.Lease.Replica = sr.repl.Descriptor()
	status.State = kvserverpb.LeaseState_VALID
	return status
}

// LeaseViolatesPreferences reports whether the store holding this replica
// fails every lease preference in the range's span config. When the span
// config defines no lease preferences, it returns false and the replica is
// considered in conformance.
//
// It panics if the simulator state does not return exactly one store
// descriptor for this replica's store.
func (sr *SimulatorReplica) LeaseViolatesPreferences(context.Context) bool {
	descs := sr.state.StoreDescriptors(true /* useCached */, sr.repl.StoreID())
	if len(descs) != 1 {
		panic(fmt.Sprintf("programming error: cannot get store descriptor for store %d", sr.repl.StoreID()))
	}
	_, conf := sr.DescAndSpanConfig()

	prefs := conf.LeasePreferences
	// With no preferences there is nothing to violate. Otherwise assume a
	// violation until one preference is found to be satisfied.
	violated := len(prefs) > 0
	for i := range prefs {
		if constraint.ConjunctionsCheck(descs[0], prefs[i].Constraints) {
			violated = false
			break
		}
	}
	return violated
}

// LastReplicaAdded returns an approximation of the most recently added
// replica's ID along with the current wall-clock time.
//
// This is a hack: the ID is derived from the descriptor's next replica ID and
// the time is timeutil.Now(). The result is used when removing a replica to
// provide a grace period for new replicas. The corresponding code in
// plan.findRemoveVoter uses the system wall clock, and that code path is
// avoided for now by always finding a remove voter without retries.
//
// TODO(kvoli): Record the actual time the last replica was added and rip out
// the timeutil.Now() usage in plan.findRemoveVoter, instead passing in now()
// so it maps to the simulated time.
func (sr *SimulatorReplica) LastReplicaAdded() (roachpb.ReplicaID, time.Time) {
	lastID := sr.Desc().NextReplicaID - 1
	return lastID, timeutil.Now()
}

// OwnsValidLease reports whether the simulated replica holds the lease. The
// context and timestamp arguments are not consulted.
func (sr *SimulatorReplica) OwnsValidLease(context.Context, hlc.ClockTimestamp) bool {
	holdsLease := sr.repl.HoldsLease()
	return holdsLease
}

// StoreID returns the ID of the store holding this replica, converted from
// the simulator's store ID type to roachpb.StoreID.
func (sr *SimulatorReplica) StoreID() roachpb.StoreID {
	simStoreID := sr.repl.StoreID()
	return roachpb.StoreID(simStoreID)
}

// GetRangeID returns the ID of the range this replica belongs to, converted
// from the simulator's range ID type to roachpb.RangeID.
func (sr *SimulatorReplica) GetRangeID() roachpb.RangeID {
	simRangeID := sr.repl.Range()
	return roachpb.RangeID(simRangeID)
}

// RaftStatus returns the raft status that the simulator state reports for
// this replica's range and store.
func (sr *SimulatorReplica) RaftStatus() *raft.Status {
	rangeID, storeID := sr.rng.RangeID(), sr.repl.StoreID()
	return sr.state.RaftStatus(rangeID, storeID)
}

// GetFirstIndex returns the index of the first entry in the replica's Raft
// log. In the simulator this is currently a fixed value.
func (sr *SimulatorReplica) GetFirstIndex() kvpb.RaftIndex {
	// TODO(kvoli): We always return 2 here as RaftStatus is unimplemented. When
	// it is implemented, this may become variable.
	const simulatedFirstIndex kvpb.RaftIndex = 2
	return simulatedFirstIndex
}

// DescAndSpanConfig returns the range descriptor and the span config of the
// simulated range backing this replica.
func (sr *SimulatorReplica) DescAndSpanConfig() (*roachpb.RangeDescriptor, roachpb.SpanConfig) {
	desc := sr.rng.Descriptor()
	conf := sr.rng.SpanConfig()
	return desc, conf
}

// Desc returns the range descriptor of the simulated range backing this
// replica.
func (sr *SimulatorReplica) Desc() *roachpb.RangeDescriptor {
	desc := sr.rng.Descriptor()
	return desc
}

// RangeUsageInfo returns usage information (sizes and traffic) needed by the
// allocator to make rebalancing decisions for this range. The value is the one
// stored when the SimulatorReplica was constructed; it is not refreshed here.
func (sr *SimulatorReplica) RangeUsageInfo() allocator.RangeUsageInfo {
	info := sr.usage
	return info
}
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/asim/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// RangeQueue presents an interface to interact with a single consumer
Expand Down Expand Up @@ -101,6 +102,7 @@ func (pq *priorityQueue) Pop() interface{} {

// baseQueue is an implementation of the ReplicateQueue interface.
type baseQueue struct {
log.AmbientContext
priorityQueue
storeID state.StoreID
stateChanger state.Changer
Expand Down
Loading

0 comments on commit de77ffa

Please sign in to comment.