replica_rac2: integrate admitted tracking on leader
This commit plumbs admitted vectors into the RACv2 Processor. Admitted
vectors are applied to the replicaSendStream, and release the flow
tokens held by the leader.

The admitted vectors are plumbed to replicaSendStream via three paths
(see the sketch after this list):

- The leader's own admitted vector is applied from
  HandleRaftReadyRaftMuLocked, calling into
  RangeController.AdmitRaftMuLocked directly.
- The followers' admitted vectors are, in most cases, received via the
  annotated RaftMessageRequest.AdmittedState field, which is dispatched
  from stepRaftGroupRaftMuLocked into the Processor via the
  Processor.AdmitRaftMuLocked method.
- The followers' piggybacked admitted vectors from
  RaftMessageRequestBatch are queued on the Processor via the
  EnqueuePiggybackedAdmittedAtLeader method, and are later applied by
  ProcessPiggybackedAdmittedAtLeaderRaftMuLocked from the raft
  scheduler.
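
The sketch below condenses the three paths into runnable Go. It is
illustrative only, not the actual CockroachDB code: processor,
AdmittedVector, and the plain uint64 replica IDs are simplified
stand-ins for the rac2 and roachpb types.

package main

import (
  "context"
  "fmt"
)

// AdmittedVector is a simplified stand-in for rac2.AdmittedVector: a leader
// term plus the admitted log index per raft priority.
type AdmittedVector struct {
  Term     uint64
  Admitted [4]uint64
}

// processor is a stand-in for the RACv2 Processor on the leader.
type processor struct {
  // pending buffers path 3 (piggybacked vectors) until the raft scheduler
  // runs the processing step. Keyed by follower replica ID.
  pending map[uint64]AdmittedVector
}

// admit stands in for RangeController.AdmitRaftMuLocked: paths 1 and 2 end
// up here directly, under raftMu.
func (p *processor) admit(_ context.Context, from uint64, av AdmittedVector) {
  fmt.Printf("admit: replica %d -> %+v\n", from, av)
}

// enqueuePiggybacked stands in for EnqueuePiggybackedAdmittedAtLeader (path 3).
func (p *processor) enqueuePiggybacked(from uint64, av AdmittedVector) {
  p.pending[from] = av // the real code merges, keeping the highest marks
}

// processPiggybacked stands in for ProcessPiggybackedAdmittedAtLeaderRaftMuLocked.
func (p *processor) processPiggybacked(ctx context.Context) {
  for from, av := range p.pending {
    p.admit(ctx, from, av)
  }
  clear(p.pending)
}

func main() {
  ctx := context.Background()
  p := &processor{pending: map[uint64]AdmittedVector{}}
  // Path 1: the leader's own admitted vector, applied directly.
  p.admit(ctx, 1, AdmittedVector{Term: 50, Admitted: [4]uint64{25, 25, 25, 25}})
  // Path 2: a follower's vector from an annotated RaftMessageRequest.
  p.admit(ctx, 2, AdmittedVector{Term: 50, Admitted: [4]uint64{24, 24, 24, 24}})
  // Path 3: a piggybacked vector, queued now and applied later.
  p.enqueuePiggybacked(3, AdmittedVector{Term: 50, Admitted: [4]uint64{23, 23, 23, 23}})
  p.processPiggybacked(ctx)
}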

Epic: none
Release note: none
pav-kv committed Sep 12, 2024
1 parent f9b36ba commit 253e0df
Showing 7 changed files with 86 additions and 62 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/flow_control_raft_transport_test.go
@@ -796,6 +796,6 @@ func TestFlowControlRaftTransportV2(t *testing.T) {
 type noopPiggybackedAdmittedResponseScheduler struct{}
 
 func (s noopPiggybackedAdmittedResponseScheduler) ScheduleAdmittedResponseForRangeRACv2(
-  ctx context.Context, msgs []kvflowcontrolpb.AdmittedResponseForRange,
+  context.Context, []kvflowcontrolpb.PiggybackedAdmittedState,
 ) {
 }
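
A test that wants to assert on the dispatched vectors, rather than drop
them, could use a recording variant of the same interface. This is a
sketch, not part of the commit; it assumes the context, sync, and
kvflowcontrolpb imports used elsewhere in this package.

// recordingPiggybackedAdmittedResponseScheduler records all scheduled
// piggybacked admitted states for later inspection by a test.
type recordingPiggybackedAdmittedResponseScheduler struct {
  mu   sync.Mutex // guards msgs; the transport may call from multiple goroutines
  msgs []kvflowcontrolpb.PiggybackedAdmittedState
}

func (s *recordingPiggybackedAdmittedResponseScheduler) ScheduleAdmittedResponseForRangeRACv2(
  _ context.Context, msgs []kvflowcontrolpb.PiggybackedAdmittedState,
) {
  s.mu.Lock()
  defer s.mu.Unlock()
  s.msgs = append(s.msgs, msgs...)
}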
12 changes: 6 additions & 6 deletions pkg/kv/kvserver/flow_control_stores.go
@@ -291,7 +291,7 @@ type StoresForRACv2 interface {
 // processing.
 type PiggybackedAdmittedResponseScheduler interface {
   ScheduleAdmittedResponseForRangeRACv2(
-    ctx context.Context, msgs []kvflowcontrolpb.AdmittedResponseForRange)
+    ctx context.Context, msgs []kvflowcontrolpb.PiggybackedAdmittedState)
 }
 
 func MakeStoresForRACv2(stores *Stores) StoresForRACv2 {
@@ -332,20 +332,20 @@ func (ss *storesForRACv2) lookup(
 
 // ScheduleAdmittedResponseForRangeRACv2 implements PiggybackedAdmittedResponseScheduler.
 func (ss *storesForRACv2) ScheduleAdmittedResponseForRangeRACv2(
-  ctx context.Context, msgs []kvflowcontrolpb.AdmittedResponseForRange,
+  ctx context.Context, msgs []kvflowcontrolpb.PiggybackedAdmittedState,
 ) {
   ls := (*Stores)(ss)
   for _, m := range msgs {
-    s, err := ls.GetStore(m.LeaderStoreID)
+    s, err := ls.GetStore(m.ToStoreID)
     if err != nil {
-      log.Errorf(ctx, "store %s not found", m.LeaderStoreID)
+      log.Errorf(ctx, "store %s not found", m.ToStoreID)
       continue
     }
     repl := s.GetReplicaIfExists(m.RangeID)
-    if repl == nil {
+    if repl == nil || repl.replicaID != m.ToReplicaID {
       continue
     }
-    repl.flowControlV2.EnqueuePiggybackedAdmittedAtLeader(m.Msg)
+    repl.flowControlV2.EnqueuePiggybackedAdmittedAtLeader(m.FromReplicaID, m.Admitted)
     s.scheduler.EnqueueRACv2PiggybackAdmitted(m.RangeID)
   }
 }
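
A self-contained Go sketch of the routing guard in the hunk above: each
piggybacked admitted state is addressed to a (store, range, replica)
triple, and is dropped when the store is unknown or the target replica
ID is stale. The types here are simplified stand-ins for the
kvflowcontrolpb and roachpb ones.

package main

import "fmt"

// piggybacked is a stand-in for kvflowcontrolpb.PiggybackedAdmittedState,
// with the admitted vector payload omitted.
type piggybacked struct {
  RangeID, ToStoreID, ToReplicaID, FromReplicaID uint64
}

type replica struct{ replicaID uint64 }

type store struct {
  replicas map[uint64]*replica // keyed by RangeID
}

// route returns whether the message would be enqueued; the real code logs
// unknown stores and silently skips missing or stale replicas.
func route(stores map[uint64]*store, m piggybacked) bool {
  s, ok := stores[m.ToStoreID]
  if !ok {
    return false // store not found
  }
  repl, ok := s.replicas[m.RangeID]
  if !ok || repl.replicaID != m.ToReplicaID {
    return false // no replica for the range, or the target replica ID is stale
  }
  return true // the real code enqueues (m.FromReplicaID, m.Admitted) here
}

func main() {
  stores := map[uint64]*store{
    2: {replicas: map[uint64]*replica{3: {replicaID: 5}}},
  }
  fmt.Println(route(stores, piggybacked{RangeID: 3, ToStoreID: 2, ToReplicaID: 5})) // true
  fmt.Println(route(stores, piggybacked{RangeID: 3, ToStoreID: 2, ToReplicaID: 4})) // false: stale replica
}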
92 changes: 58 additions & 34 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
@@ -353,14 +353,13 @@ type Processor interface {
   AdmitRaftEntriesRaftMuLocked(
     ctx context.Context, event rac2.RaftEvent) bool
 
-  // EnqueuePiggybackedAdmittedAtLeader is called at the leader when
-  // receiving a piggybacked MsgAppResp that can advance a follower's
-  // admitted state. The caller is responsible for scheduling on the raft
-  // scheduler, such that ProcessPiggybackedAdmittedAtLeaderRaftMuLocked
-  // gets called soon.
-  EnqueuePiggybackedAdmittedAtLeader(msg raftpb.Message)
+  // EnqueuePiggybackedAdmittedAtLeader is called at the leader when receiving a
+  // piggybacked admitted vector that can advance the given follower's admitted
+  // state. The caller is responsible for scheduling on the raft scheduler, such
+  // that ProcessPiggybackedAdmittedAtLeaderRaftMuLocked gets called soon.
+  EnqueuePiggybackedAdmittedAtLeader(roachpb.ReplicaID, kvflowcontrolpb.AdmittedState)
   // ProcessPiggybackedAdmittedAtLeaderRaftMuLocked is called to process
-  // previous enqueued piggybacked MsgAppResp. Returns true if
+  // previously enqueued piggybacked admitted vectors. Returns true if
   // HandleRaftReadyRaftMuLocked should be called.
   //
   // raftMu is held.
@@ -390,6 +389,13 @@ type Processor interface {
   // AdmittedState returns the vector of admitted log indices.
   AdmittedState() rac2.AdmittedVector
 
+  // AdmitRaftMuLocked is called to notify RACv2 about the admitted vector
+  // update on the given peer replica. This releases the associated flow tokens
+  // if the replica is known and the admitted vector covers any.
+  //
+  // raftMu is held.
+  AdmitRaftMuLocked(context.Context, roachpb.ReplicaID, rac2.AdmittedVector)
+
   // AdmitForEval is called to admit work that wants to evaluate at the
   // leaseholder.
   //
@@ -429,7 +435,14 @@ type processorImpl struct {
   // State when leader, i.e., when leaderID == opts.ReplicaID, and v2
   // protocol is enabled.
   leader struct {
-    enqueuedPiggybackedResponses map[roachpb.ReplicaID]raftpb.Message
+    // pendingAdmitted contains recently delivered admitted vectors. When this
+    // map is not empty, the range is scheduled for applying these vectors to
+    // the corresponding streams / token trackers. The map is cleared when the
+    // admitted vectors are applied.
+    //
+    // Invariant: rc == nil ==> len(pendingAdmitted) == 0.
+    // Invariant: len(pendingAdmitted) != 0 ==> the processing is scheduled.
+    pendingAdmitted map[roachpb.ReplicaID]rac2.AdmittedVector
     // Updating the rc reference requires both the enclosing mu and
     // rcReferenceUpdateMu. Code paths that want to access this
     // reference only need one of these mutexes. rcReferenceUpdateMu
@@ -676,7 +689,7 @@ func (p *processorImpl) closeLeaderStateRaftMuLockedProcLocked(ctx context.Conte
     defer p.mu.leader.rcReferenceUpdateMu.Unlock()
     p.mu.leader.rc = nil
   }()
-  p.mu.leader.enqueuedPiggybackedResponses = nil
+  p.mu.leader.pendingAdmitted = nil
   p.mu.leader.term = 0
 }

@@ -700,7 +713,7 @@ func (p *processorImpl) createLeaderStateRaftMuLockedProcLocked(
     })
   }()
   p.mu.leader.term = term
-  p.mu.leader.enqueuedPiggybackedResponses = map[roachpb.ReplicaID]raftpb.Message{}
+  p.mu.leader.pendingAdmitted = map[roachpb.ReplicaID]rac2.AdmittedVector{}
 }
 
 // HandleRaftReadyRaftMuLocked implements Processor.
@@ -744,20 +757,23 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.
     return
   }
 
-  // Piggyback admitted index advancement, if any, to the message stream going
-  // to the leader node, if we are not the leader. At the leader node, the
-  // admitted vector is read directly from the log tracker.
-  if p.mu.leader.rc == nil && p.mu.leaderNodeID != 0 {
-    // TODO(pav-kv): must make sure the leader term is the same.
-    if admitted, dirty := p.logTracker.admitted(true /* sched */); dirty {
+  if av, dirty := p.logTracker.admitted(true /* sched */); dirty {
+    if rc := p.mu.leader.rc; rc != nil {
+      // If we are the leader, notify the RangeController about our replica's
+      // new admitted vector.
+      rc.AdmitRaftMuLocked(ctx, p.opts.ReplicaID, av)
+    } else if p.mu.leaderNodeID != 0 {
+      // If the leader is known, piggyback the updated admitted vector to the
+      // message stream going to the leader node.
+      // TODO(pav-kv): must make sure the leader term is the same.
       p.opts.AdmittedPiggybacker.Add(p.mu.leaderNodeID, kvflowcontrolpb.PiggybackedAdmittedState{
         RangeID:       p.opts.RangeID,
         ToStoreID:     p.mu.leaderStoreID,
         FromReplicaID: p.opts.ReplicaID,
         ToReplicaID:   p.mu.leaderID,
         Admitted: kvflowcontrolpb.AdmittedState{
-          Term:     admitted.Term,
-          Admitted: admitted.Admitted[:],
+          Term:     av.Term,
+          Admitted: av.Admitted[:],
         }})
     }
   }
@@ -864,37 +880,34 @@ func (p *processorImpl) AdmitRaftEntriesRaftMuLocked(ctx context.Context, e rac2
 }
 
 // EnqueuePiggybackedAdmittedAtLeader implements Processor.
-func (p *processorImpl) EnqueuePiggybackedAdmittedAtLeader(msg raftpb.Message) {
-  if roachpb.ReplicaID(msg.To) != p.opts.ReplicaID {
-    // Ignore message to a stale ReplicaID.
-    return
-  }
+func (p *processorImpl) EnqueuePiggybackedAdmittedAtLeader(
+  from roachpb.ReplicaID, state kvflowcontrolpb.AdmittedState,
+) {
   p.mu.Lock()
   defer p.mu.Unlock()
   if p.mu.leader.rc == nil {
     return
   }
-  // Only need to keep the latest message from a replica.
-  p.mu.leader.enqueuedPiggybackedResponses[roachpb.ReplicaID(msg.From)] = msg
+  var admitted [raftpb.NumPriorities]uint64
+  copy(admitted[:], state.Admitted)
+  // Merge in the received admitted vector. We are only interested in the
+  // highest admitted marks. Zero value always merges in favour of the new one.
+  p.mu.leader.pendingAdmitted[from] = p.mu.leader.pendingAdmitted[from].Merge(
+    rac2.AdmittedVector{Term: state.Term, Admitted: admitted})
 }
 
 // ProcessPiggybackedAdmittedAtLeaderRaftMuLocked implements Processor.
 func (p *processorImpl) ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx context.Context) bool {
   p.opts.Replica.RaftMuAssertHeld()
   p.mu.Lock()
   defer p.mu.Unlock()
-  if p.mu.destroyed || len(p.mu.leader.enqueuedPiggybackedResponses) == 0 || p.raftMu.raftNode == nil {
+  if p.mu.destroyed || len(p.mu.leader.pendingAdmitted) == 0 {
     return false
   }
-  p.opts.Replica.MuLock()
-  defer p.opts.Replica.MuUnlock()
-  for k, m := range p.mu.leader.enqueuedPiggybackedResponses {
-    err := p.raftMu.raftNode.StepMsgAppRespForAdmittedLocked(m)
-    if err != nil {
-      log.Errorf(ctx, "%s", err)
-    }
-    delete(p.mu.leader.enqueuedPiggybackedResponses, k)
+  for replicaID, state := range p.mu.leader.pendingAdmitted {
+    p.mu.leader.rc.AdmitRaftMuLocked(ctx, replicaID, state)
   }
+  clear(p.mu.leader.pendingAdmitted)
   return true
 }

@@ -954,6 +967,17 @@ func (p *processorImpl) AdmittedState() rac2.AdmittedVector {
   return admitted
 }
 
+// AdmitRaftMuLocked implements Processor.
+func (p *processorImpl) AdmitRaftMuLocked(
+  ctx context.Context, replicaID roachpb.ReplicaID, av rac2.AdmittedVector,
+) {
+  p.opts.Replica.RaftMuAssertHeld()
+  // NB: rc is always updated while raftMu is held.
+  if rc := p.mu.leader.rc; rc != nil {
+    rc.AdmitRaftMuLocked(ctx, replicaID, av)
+  }
+}
+
 // AdmitForEval implements Processor.
 func (p *processorImpl) AdmitForEval(
   ctx context.Context, pri admissionpb.WorkPriority, ct time.Time,
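
The enqueue path above leans on rac2.AdmittedVector.Merge to keep only
the highest admitted marks per follower, which also bounds the
pendingAdmitted map regardless of message rate. Below is a minimal
self-contained sketch of that merge semantics, under the assumption
(consistent with the comment in the hunk) that a higher term wins
outright, equal terms take the element-wise maximum, and the zero value
loses to any real vector.

package main

import "fmt"

const numPriorities = 4 // stand-in for raftpb.NumPriorities

// AdmittedVector mirrors rac2.AdmittedVector: a leader term and the admitted
// log index per priority.
type AdmittedVector struct {
  Term     uint64
  Admitted [numPriorities]uint64
}

// Merge keeps the vector with the higher term; at equal terms it keeps the
// element-wise maximum. The zero value therefore always loses to any real
// vector.
func (a AdmittedVector) Merge(b AdmittedVector) AdmittedVector {
  if a.Term < b.Term {
    return b
  }
  if a.Term > b.Term {
    return a
  }
  for i := range a.Admitted {
    if b.Admitted[i] > a.Admitted[i] {
      a.Admitted[i] = b.Admitted[i]
    }
  }
  return a
}

func main() {
  // pending models processorImpl.mu.leader.pendingAdmitted: between flushes,
  // only the highest admitted marks per follower replica are retained.
  pending := map[uint64]AdmittedVector{}
  pending[25] = pending[25].Merge(AdmittedVector{Term: 50, Admitted: [numPriorities]uint64{23, 23, 23, 23}})
  pending[25] = pending[25].Merge(AdmittedVector{Term: 50, Admitted: [numPriorities]uint64{25, 24, 23, 23}})
  fmt.Println(pending[25]) // {50 [25 24 23 23]}
}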
13 changes: 3 additions & 10 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
@@ -155,10 +155,6 @@ func (rn *testRaftNode) print() {
     rn.term, rn.leader, rn.r.leaseholder, rn.mark, rn.nextUnstableIndex)
 }
 
-func msgString(msg raftpb.Message) string {
-  return fmt.Sprintf("type: %s from: %d to: %d", msg.Type.String(), msg.From, msg.To)
-}
-
 type testAdmittedPiggybacker struct {
   b *strings.Builder
 }
@@ -412,12 +408,9 @@ func TestProcessorBasic(t *testing.T) {
       var from, to uint64
       d.ScanArgs(t, "from", &from)
       d.ScanArgs(t, "to", &to)
-      msg := raftpb.Message{
-        Type: raftpb.MsgAppResp,
-        To:   raftpb.PeerID(to),
-        From: raftpb.PeerID(from),
-      }
-      p.EnqueuePiggybackedAdmittedAtLeader(msg)
+      // TODO(pav-kv): parse the admitted vector.
+      p.EnqueuePiggybackedAdmittedAtLeader(
+        roachpb.ReplicaID(from), kvflowcontrolpb.AdmittedState{})
       return builderStr()
 
     case "process-piggybacked-admitted":
13 changes: 8 additions & 5 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor
@@ -122,7 +122,7 @@ HandleRaftReady:
 AdmitRaftEntries:
 ACWorkQueue.Admit({StoreID:2 TenantID:4 Priority:low-pri CreateTime:2 RequestedCount:100 Ingested:false RangeID:3 ReplicaID:5 CallbackState:{Mark:{Term:50 Index:25} Priority:LowPri}}) = true
 destroyed-or-leader-using-v2: true
-LogTracker [+dirty]: mark:{Term:50 Index:25}, stable:23, admitted:[23 23 23 23]
+LogTracker: mark:{Term:50 Index:25}, stable:23, admitted:[23 23 23 23]
 LowPri: {Term:50 Index:25}
 
 # The leader has changed.
@@ -463,6 +463,7 @@ HandleRaftReady:
 Replica.LeaseholderMuLocked
 RaftNode.TermLocked() = 52
 Replica.MuUnlock
+RangeController.AdmitRaftMuLocked(5, {Term:52 Admitted:[26 26 26 26]})
 RangeController.HandleRaftEventRaftMuLocked([])
 .....
 
@@ -483,6 +484,7 @@ HandleRaftReady:
 Replica.LeaseholderMuLocked
 RaftNode.TermLocked() = 52
 Replica.MuUnlock
+RangeController.AdmitRaftMuLocked(5, {Term:52 Admitted:[28 28 28 28]})
 RangeController.HandleRaftEventRaftMuLocked([])
 .....
 
@@ -494,9 +496,7 @@ enqueue-piggybacked-admitted from=25 to=5
 process-piggybacked-admitted
 ----
 Replica.RaftMuAssertHeld
-Replica.MuLock
-RaftNode.StepMsgAppRespForAdmittedLocked(type: MsgAppResp from: 25 to: 5)
-Replica.MuUnlock
+RangeController.AdmitRaftMuLocked(25, {Term:0 Admitted:[0 0 0 0]})
 
 # Noop.
 process-piggybacked-admitted
@@ -511,6 +511,7 @@ enqueue-piggybacked-admitted from=25 to=6
 process-piggybacked-admitted
 ----
 Replica.RaftMuAssertHeld
+RangeController.AdmitRaftMuLocked(25, {Term:0 Admitted:[0 0 0 0]})
 
 # We are still the leader, now at a new term.
 set-raft-state term=53
@@ -739,6 +740,7 @@ HandleRaftReady:
 Replica.LeaseholderMuLocked
 RaftNode.TermLocked() = 50
 Replica.MuUnlock
+RangeController.AdmitRaftMuLocked(5, {Term:50 Admitted:[26 26 26 26]})
 RangeController.HandleRaftEventRaftMuLocked([])
 .....
 
@@ -761,7 +763,7 @@ HandleRaftReady:
 .....
 AdmitRaftEntries:
 destroyed-or-leader-using-v2: true
-LogTracker [+dirty]: mark:{Term:50 Index:27}, stable:26, admitted:[26 26 26 26]
+LogTracker: mark:{Term:50 Index:27}, stable:26, admitted:[26 26 26 26]
 
 # Everything up to 27 is admitted.
 synced-log term=50 index=27
@@ -778,6 +780,7 @@ HandleRaftReady:
 Replica.LeaseholderMuLocked
 RaftNode.TermLocked() = 50
 Replica.MuUnlock
+RangeController.AdmitRaftMuLocked(5, {Term:50 Admitted:[27 27 27 27]})
 RangeController.HandleRaftEventRaftMuLocked([])
 .....
 
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/raft_transport.go
@@ -534,14 +534,15 @@ func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer
       t.kvflowControl.mu.connectionTracker.markStoresConnected(storeIDs)
       t.kvflowControl.mu.Unlock()
       if len(batch.AdmittedStates) != 0 {
-        // TODO(pav-kv): dispatch admitted vectors to RACv2.
+        // Dispatch the admitted vectors to RACv2.
         // NB: we do this via this special path instead of using the
         // handleRaftRequest path since we don't have a full-fledged
         // RaftMessageRequest for each range (each of these responses could
         // be for a different range), and because what we need to do w.r.t.
         // queueing is much simpler (we don't need to worry about queue size
-        // since we only keep the latest message from each replica).
-        _ = t.kvflowcontrol2.piggybackedResponseScheduler.ScheduleAdmittedResponseForRangeRACv2
+        // since we only keep the highest admitted marks from each replica).
+        t.kvflowcontrol2.piggybackedResponseScheduler.
+          ScheduleAdmittedResponseForRangeRACv2(ctx, batch.AdmittedStates)
       }
       if len(batch.Requests) == 0 {
         continue
9 changes: 6 additions & 3 deletions pkg/kv/kvserver/replica_raft.go
@@ -653,9 +653,12 @@ func (r *Replica) stepRaftGroupRaftMuLocked(req *kvserverpb.RaftMessageRequest)
       }
     }
   case raftpb.MsgAppResp:
-    if req.AdmittedState.Term != 0 {
-      // TODO(pav-kv): dispatch admitted vector to RACv2 if one is attached.
-      _ = 0
+    // If there is an admitted vector annotation, pass it to RACv2 to release
+    // the flow control tokens.
+    if term := req.AdmittedState.Term; term != 0 {
+      av := rac2.AdmittedVector{Term: term}
+      copy(av.Admitted[:], req.AdmittedState.Admitted)
+      r.flowControlV2.AdmitRaftMuLocked(context.TODO(), req.FromReplica.ReplicaID, av)
     }
   }
   err := raftGroup.Step(req.Message)
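
One detail worth calling out in the hunk above: the wire form
kvflowcontrolpb.AdmittedState.Admitted is a variable-length slice, while
rac2.AdmittedVector.Admitted is a fixed-size array, so the copy
truncates or zero-fills as needed and a short or over-long wire vector
cannot index out of bounds. A self-contained sketch with stand-in types:

package main

import "fmt"

const numPriorities = 4 // stand-in for raftpb.NumPriorities

// admittedState models the wire form (kvflowcontrolpb.AdmittedState): the
// per-priority indices arrive as a variable-length slice.
type admittedState struct {
  Term     uint64
  Admitted []uint64
}

// admittedVector models the in-memory form (rac2.AdmittedVector) with a
// fixed-size array.
type admittedVector struct {
  Term     uint64
  Admitted [numPriorities]uint64
}

// toVector converts the wire form: copy stops at the shorter of the two, so
// a short wire vector zero-fills and an over-long one is truncated.
func toVector(s admittedState) admittedVector {
  av := admittedVector{Term: s.Term}
  copy(av.Admitted[:], s.Admitted)
  return av
}

func main() {
  fmt.Println(toVector(admittedState{Term: 50, Admitted: []uint64{26, 26}}))        // {50 [26 26 0 0]}
  fmt.Println(toVector(admittedState{Term: 50, Admitted: []uint64{1, 2, 3, 4, 5}})) // {50 [1 2 3 4]}
}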
