Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: misc rac2 race conditions fixes #131108

Merged
merged 2 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 8 additions & 16 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,6 @@ type RangeController interface {
// TODO(pav-kv): This interface a placeholder for the interface containing raft
// methods. Replace this as part of #128019.
type RaftInterface interface {
// FollowerStateRaftMuLocked returns the current state of a follower. The
// value of Match, Next are populated iff in StateReplicate. All entries >=
// Next have not had MsgApps constructed during the lifetime of this
// StateReplicate (they may have been constructed previously).
//
// When a follower transitions from {StateProbe,StateSnapshot} =>
// StateReplicate, we start trying to send MsgApps. We should
// notice such transitions both in HandleRaftEvent and
// SetReplicasRaftMuLocked.
//
// Requires Replica.raftMu to be held, Replica.mu is not held.
FollowerStateRaftMuLocked(roachpb.ReplicaID) FollowerStateInfo
// SendPingRaftMuLocked sends a MsgApp ping to the given raft peer if there
// wasn't a recent MsgApp to this peer. The message is added to raft's message
// queue, and will be extracted and sent during the next Ready processing.
Expand All @@ -116,7 +104,7 @@ type RaftInterface interface {
SendPingRaftMuLocked(roachpb.ReplicaID) bool
}

type FollowerStateInfo struct {
type ReplicaStateInfo struct {
State tracker.StateType

// Remaining only populated in StateReplicate.
Expand Down Expand Up @@ -160,6 +148,11 @@ type RaftEvent struct {
// A key can map to an empty slice, in order to reuse already allocated
// slice memory.
MsgApps map[roachpb.ReplicaID][]raftpb.Message
// ReplicasStateInfo contains the state of all replicas. This is used to
// determine if the state of a replica has changed, and if so, to update the
// flow control state. It also informs the RangeController of a replica's
// Match and Next.
ReplicasStateInfo map[roachpb.ReplicaID]ReplicaStateInfo
}

// RaftEventFromMsgStorageAppendAndMsgApps constructs a RaftEvent from the
Expand Down Expand Up @@ -460,8 +453,7 @@ retry:
func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e RaftEvent) error {
shouldWaitChange := false
for r, rs := range rc.replicaMap {
info := rc.opts.RaftInterface.FollowerStateRaftMuLocked(r)
shouldWaitChange = rs.handleReadyState(ctx, info) || shouldWaitChange
shouldWaitChange = rs.handleReadyState(ctx, e.ReplicasStateInfo[r]) || shouldWaitChange
}
// If there was a quorum change, update the voter sets, triggering the
// refresh channel for any requests waiting for eval tokens.
Expand Down Expand Up @@ -824,7 +816,7 @@ func (rs *replicaState) handleReadyEntries(ctx context.Context, entries []entryF
// provided follower state information. If the state changes in a way that
// affects requests waiting for evaluation, returns true.
func (rs *replicaState) handleReadyState(
ctx context.Context, info FollowerStateInfo,
ctx context.Context, info ReplicaStateInfo,
) (shouldWaitChange bool) {
switch info.State {
case tracker.StateProbe:
Expand Down
30 changes: 18 additions & 12 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,6 @@ func (s *testingRCState) getOrInitRange(t *testing.T, r testingRange) *testingRC
TenantID: r.tenantID,
LocalReplicaID: r.localReplicaID,
SSTokenCounter: s.ssTokenCounter,
RaftInterface: testRC,
Clock: s.clock,
CloseTimerScheduler: s.probeToCloseScheduler,
EvalWaitMetrics: s.evalMetrics,
Expand All @@ -258,7 +257,7 @@ func (s *testingRCState) getOrInitRange(t *testing.T, r testingRange) *testingRC
s.maybeSetInitialTokens(r)
// Send through an empty raft event to trigger creating necessary replica
// send streams for the range.
require.NoError(t, testRC.rc.HandleRaftEventRaftMuLocked(s.testCtx, RaftEvent{}))
require.NoError(t, testRC.rc.HandleRaftEventRaftMuLocked(s.testCtx, testRC.makeRaftEventWithReplicasState()))
return testRC
}

Expand Down Expand Up @@ -294,15 +293,21 @@ type testingRCRange struct {
}
}

func (r *testingRCRange) FollowerStateRaftMuLocked(replicaID roachpb.ReplicaID) FollowerStateInfo {
func (r *testingRCRange) makeRaftEventWithReplicasState() RaftEvent {
return RaftEvent{
ReplicasStateInfo: r.replicasStateInfo(),
}
}

func (r *testingRCRange) replicasStateInfo() map[roachpb.ReplicaID]ReplicaStateInfo {
r.mu.Lock()
defer r.mu.Unlock()

replica, ok := r.mu.r.replicaSet[replicaID]
if !ok {
return FollowerStateInfo{}
replicasStateInfo := map[roachpb.ReplicaID]ReplicaStateInfo{}
for _, replica := range r.mu.r.replicaSet {
replicasStateInfo[replica.desc.ReplicaID] = replica.info
}
return replica.info
return replicasStateInfo
}

func (r *testingRCRange) SendPingRaftMuLocked(roachpb.ReplicaID) bool {
Expand Down Expand Up @@ -386,7 +391,7 @@ const invalidTrackerState = tracker.StateSnapshot + 1

type testingReplica struct {
desc roachpb.ReplicaDescriptor
info FollowerStateInfo
info ReplicaStateInfo
}

func scanRanges(t *testing.T, input string) []testingRange {
Expand Down Expand Up @@ -503,7 +508,7 @@ func scanReplica(t *testing.T, line string) testingReplica {
ReplicaID: roachpb.ReplicaID(replicaID),
Type: replicaType,
},
info: FollowerStateInfo{State: state},
info: ReplicaStateInfo{State: state},
}
}

Expand Down Expand Up @@ -597,7 +602,7 @@ func (t *testingProbeToCloseTimerScheduler) ScheduleSendStreamCloseRaftMuLocked(
}
timer.MarkRead()
require.NoError(t.state.t,
t.state.ranges[rangeID].rc.HandleRaftEventRaftMuLocked(ctx, RaftEvent{}))
t.state.ranges[rangeID].rc.HandleRaftEventRaftMuLocked(ctx, t.state.ranges[rangeID].makeRaftEventWithReplicasState()))
}()
}

Expand Down Expand Up @@ -791,7 +796,7 @@ func TestRangeController(t *testing.T) {
require.NoError(t, testRC.rc.SetReplicasRaftMuLocked(ctx, r.replicas()))
// Send an empty raft event in order to trigger any potential
// connectedState changes.
require.NoError(t, testRC.rc.HandleRaftEventRaftMuLocked(ctx, RaftEvent{}))
require.NoError(t, testRC.rc.HandleRaftEventRaftMuLocked(ctx, testRC.makeRaftEventWithReplicasState()))
}
// Sleep for a bit to allow any timers to fire.
time.Sleep(20 * time.Millisecond)
Expand Down Expand Up @@ -877,7 +882,8 @@ func TestRangeController(t *testing.T) {

propRangeEntries := func() {
event := RaftEvent{
Entries: make([]raftpb.Entry, len(buf)),
Entries: make([]raftpb.Entry, len(buf)),
ReplicasStateInfo: state.ranges[lastRangeID].replicasStateInfo(),
}
for i, state := range buf {
event.Entries[i] = testingCreateEntry(t, state)
Expand Down
11 changes: 6 additions & 5 deletions pkg/kv/kvserver/kvflowcontrol/rac2/simulation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,8 @@ func (r *testingRCRange) testingDeductTokens(
r.mu.quorumPosition.Index++

require.NoError(t, r.rc.HandleRaftEventRaftMuLocked(ctx, RaftEvent{
Term: r.mu.quorumPosition.Term,
ReplicasStateInfo: r.replicasStateInfo(),
Term: r.mu.quorumPosition.Term,
Entries: []raftpb.Entry{testingCreateEntry(t, entryInfo{
term: r.mu.quorumPosition.Term,
index: r.mu.quorumPosition.Index,
Expand Down Expand Up @@ -1161,7 +1162,7 @@ func (r *testingRCRange) testingReturnTokens(
repl.info.Next = r.mu.quorumPosition.Index + 1
r.mu.r.replicaSet[rid] = repl
r.rc.AdmitRaftMuLocked(ctx, rs.desc.ReplicaID, av)
require.NoError(t, r.rc.HandleRaftEventRaftMuLocked(ctx, RaftEvent{}))
require.NoError(t, r.rc.HandleRaftEventRaftMuLocked(ctx, r.makeRaftEventWithReplicasState()))
}
}

Expand Down Expand Up @@ -1198,15 +1199,15 @@ func (r *testingRCRange) testingConnectStream(
ReplicaID: roachpb.ReplicaID(stream.StoreID),
Type: roachpb.VOTER_FULL,
},
info: FollowerStateInfo{
info: ReplicaStateInfo{
State: tracker.StateReplicate,
Match: position.Index,
Next: position.Index + 1,
},
}
// Send an empty raft event in order to trigger any state changes.
require.NoError(t, r.rc.SetReplicasRaftMuLocked(ctx, r.mu.r.replicas()))
require.NoError(t, r.rc.HandleRaftEventRaftMuLocked(ctx, RaftEvent{}))
require.NoError(t, r.rc.HandleRaftEventRaftMuLocked(ctx, r.makeRaftEventWithReplicasState()))
}

// testingDisconnectStream changes the tracker state of a given stream's
Expand All @@ -1223,7 +1224,7 @@ func (r *testingRCRange) testingDisconnectStream(
rs.info.State = tracker.StateSnapshot
r.mu.r.replicaSet[rid] = rs
// Send an empty raft event in order to trigger any state changes.
require.NoError(t, r.rc.HandleRaftEventRaftMuLocked(ctx, RaftEvent{}))
require.NoError(t, r.rc.HandleRaftEventRaftMuLocked(ctx, r.makeRaftEventWithReplicasState()))
}

// testingString returns a string representation of the tracker state for use
Expand Down
44 changes: 36 additions & 8 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ type RaftScheduler interface {
// reads Raft state at various points while holding raftMu, and expects those
// various reads to be mutually consistent.
type RaftNode interface {
// RaftInterface is an interface that abstracts the raft.RawNode for use in
// the RangeController.
rac2.RaftInterface
// TermLocked returns the current term of this replica.
TermLocked() uint64
Expand All @@ -114,6 +112,19 @@ type RaftNode interface {
// NB: NextUnstableIndex can regress when the node accepts appends or
// snapshots from a newer leader.
NextUnstableIndexLocked() uint64
// ReplicasStateLocked returns the current status state of all replicas.
// RACv2 uses the Match and Next indices only for replicas in StateReplicate.
// All entries >= Next have not had MsgApps constructed during the lifetime
// of this StateReplicate (they may have been constructed previously).
//
// When a follower transitions from {StateProbe,StateSnapshot} =>
// StateReplicate, we start trying to send MsgApps. We should notice such
// transitions both in rac2.HandleRaftEventRaftMuLocked and
// rac2.SetReplicasRaftMuLocked.
//
// infoMap is an in-out parameter. It is expected to be empty, and is
// populated with the ReplicaStateInfos for all replicas.
ReplicasStateLocked(infoMap map[roachpb.ReplicaID]rac2.ReplicaStateInfo)
}

// AdmittedPiggybacker is used to enqueue admitted vector messages addressed to
Expand Down Expand Up @@ -423,6 +434,10 @@ type processorImpl struct {
// leaseholderID is the currently known leaseholder replica.
leaseholderID roachpb.ReplicaID

// scratchInfoMap is used as a pre-allocated in-out parameter for calling
// ReplicasStateLocked when constructing a rac2.RaftEvent.
scratchInfoMap map[roachpb.ReplicaID]rac2.ReplicaStateInfo

// State at a follower.
follower struct {
// isLeaderUsingV2Protocol is true when the leaderID indicated that it's
Expand Down Expand Up @@ -511,6 +526,7 @@ func NewProcessor(opts ProcessorOptions) Processor {
opts: opts,
enabledWhenLeader: opts.EnabledWhenLeaderLevel,
v1EncodingPriorityMismatch: log.Every(time.Minute),
scratchInfoMap: make(map[roachpb.ReplicaID]rac2.ReplicaStateInfo),
}
}

Expand Down Expand Up @@ -775,6 +791,10 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.
log.Fatal(ctx, "RaftNode is not initialized")
return
}

// We will use the scratchInfoMap to get the latest state of the replicas and
// construct the RaftEvent. Ensure that it is empty before we start.
clear(p.scratchInfoMap)
// NB: we need to call makeStateConsistentRaftMuLocked even if
// NotEnabledWhenLeader, since this replica could be a follower and the leader
// may switch to v2.
Expand All @@ -790,6 +810,7 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.
leaderID = p.replMu.raftNode.LeaderLocked()
leaseholderID = p.opts.Replica.LeaseholderMuRLocked()
term = p.replMu.raftNode.TermLocked()
p.replMu.raftNode.ReplicasStateLocked(p.scratchInfoMap)
}()
if len(e.Entries) > 0 {
nextUnstableIndex = e.Entries[0].Index
Expand All @@ -803,6 +824,7 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.
// NB: since we've registered the latest log/snapshot write (if any) above,
// our admitted vector is likely consistent with the latest leader term.
p.maybeSendAdmittedRaftMuLocked(ctx)
e.ReplicasStateInfo = p.scratchInfoMap
if rc := p.leader.rc; rc != nil {
if knobs := p.opts.Knobs; knobs == nil || !knobs.UseOnlyForScratchRanges ||
p.opts.Replica.IsScratchRange() {
Expand Down Expand Up @@ -1018,18 +1040,24 @@ func (p *processorImpl) ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx conte
if p.destroyed {
return
}
// updates is left unset (so empty unallocated map) or refers to the map in
// p.leader.scratch, so can be read and written without holding
// pendingAdmittedMu.
var updates map[roachpb.ReplicaID]rac2.AdmittedVector
// Swap the updates map with the empty scratch. This is an optimization to
// minimize the time we hold the pendingAdmittedMu lock.
func() {
// Swap the pendingAdmittedMu.updates map with the empty scratch if
// non-empty. This is an optimization to minimize the time we hold the
// pendingAdmittedMu lock.
if updatesEmpty := func() bool {
pav-kv marked this conversation as resolved.
Show resolved Hide resolved
p.leader.pendingAdmittedMu.Lock()
defer p.leader.pendingAdmittedMu.Unlock()
if updates = p.leader.pendingAdmittedMu.updates; len(updates) != 0 {
if len(p.leader.pendingAdmittedMu.updates) > 0 {
updates = p.leader.pendingAdmittedMu.updates
p.leader.pendingAdmittedMu.updates = p.leader.scratch
p.leader.scratch = updates
return false
}
}()
if len(updates) == 0 {
return true
}(); updatesEmpty {
return
}
for replicaID, state := range updates {
Expand Down
10 changes: 3 additions & 7 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,9 @@ func (rn *testRaftNode) NextUnstableIndexLocked() uint64 {
return rn.nextUnstableIndex
}

func (rn *testRaftNode) FollowerStateRaftMuLocked(
replicaID roachpb.ReplicaID,
) rac2.FollowerStateInfo {
rn.r.mu.AssertHeld()
fmt.Fprintf(rn.b, " RaftNode.FollowerStateRaftMuLocked(%v)\n", replicaID)
// TODO(kvoli,sumeerbhola): implement.
return rac2.FollowerStateInfo{}
func (rn *testRaftNode) ReplicasStateLocked(_ map[roachpb.ReplicaID]rac2.ReplicaStateInfo) {
rn.r.mu.AssertRHeld()
fmt.Fprint(rn.b, " RaftNode.ReplicasStateLocked\n")
}

func (rn *testRaftNode) SendPingRaftMuLocked(to roachpb.ReplicaID) bool {
Expand Down
18 changes: 7 additions & 11 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,16 @@ func (rn raftNodeForRACv2) NextUnstableIndexLocked() uint64 {
return rn.NextUnstableIndex()
}

func (rn raftNodeForRACv2) FollowerStateRaftMuLocked(
replicaID roachpb.ReplicaID,
) rac2.FollowerStateInfo {
// TODO(pav-kv): this is a temporary implementation.
status := rn.Status()
if progress, ok := status.Progress[raftpb.PeerID(replicaID)]; ok {
return rac2.FollowerStateInfo{
State: progress.State,
func (rn raftNodeForRACv2) ReplicasStateLocked(
infoMap map[roachpb.ReplicaID]rac2.ReplicaStateInfo,
) {
rn.WithProgress(func(peerID raftpb.PeerID, _ raft.ProgressType, progress tracker.Progress) {
infoMap[roachpb.ReplicaID(peerID)] = rac2.ReplicaStateInfo{
Match: progress.Match,
Next: progress.Next,
State: progress.State,
}
}

return rac2.FollowerStateInfo{State: tracker.StateProbe}
})
}

func (rn raftNodeForRACv2) SendPingRaftMuLocked(to roachpb.ReplicaID) bool {
Expand Down
Loading
Loading