Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
130076: replica_rac2: use LogMark to convey semantics r=sumeerbhola a=pav-kv

Part of cockroachdb#129508

Co-authored-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
craig[bot] and pav-kv committed Sep 4, 2024
2 parents 2ec0e26 + d9c1a69 commit b74b513
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 32 deletions.
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/flow_control_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowhandle"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/admission"
Expand Down Expand Up @@ -308,9 +309,8 @@ func (ss *storesForRACv2) AdmittedLogEntry(
return
}
p.AdmittedLogEntry(ctx, replica_rac2.EntryForAdmissionCallbackState{
LeaderTerm: cbState.LeaderTerm,
Index: cbState.Pos.Index,
Priority: cbState.RaftPri,
Mark: rac2.LogMark{Term: cbState.LeaderTerm, Index: cbState.Pos.Index},
Priority: cbState.RaftPri,
})
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/kvadmission/kvadmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,10 +679,10 @@ func (n *controllerImpl) Admit(ctx context.Context, entry replica_rac2.EntryForA
Enabled: true,
RangeID: entry.RangeID,
ReplicaID: entry.ReplicaID,
LeaderTerm: entry.CallbackState.LeaderTerm,
LeaderTerm: entry.CallbackState.Mark.Term,
LogPosition: admission.LogPosition{
Term: 0, // Ignored by callback in RACv2.
Index: entry.CallbackState.Index,
Index: entry.CallbackState.Mark.Index,
},
Origin: 0,
RaftPri: entry.CallbackState.Priority,
Expand Down
22 changes: 10 additions & 12 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,8 @@ type EntryForAdmission struct {
// EntryForAdmissionCallbackState is passed to the callback when the entry is
// admitted.
type EntryForAdmissionCallbackState struct {
// TODO(pav-kv): use LogMark.
LeaderTerm uint64
Index uint64
Priority raftpb.Priority
Mark rac2.LogMark
Priority raftpb.Priority
}

// ACWorkQueue abstracts the behavior needed from admission.WorkQueue.
Expand Down Expand Up @@ -791,6 +789,7 @@ func (p *processorImpl) AdmitRaftEntriesRaftMuLocked(ctx context.Context, e rac2
if err != nil {
panic(errors.Wrap(err, "unable to decode raft command admission data: %v"))
}
mark := rac2.LogMark{Term: e.Term, Index: entry.Index}
var raftPri raftpb.Priority
if isV2Encoding {
raftPri = raftpb.Priority(meta.AdmissionPriority)
Expand All @@ -801,7 +800,7 @@ func (p *processorImpl) AdmitRaftEntriesRaftMuLocked(ctx context.Context, e rac2
p.mu.Lock()
defer p.mu.Unlock()
raftPri = p.mu.follower.lowPriOverrideState.getEffectivePriority(entry.Index, raftPri)
p.mu.waitingForAdmissionState.add(e.Term, entry.Index, raftPri)
p.mu.waitingForAdmissionState.add(mark.Term, mark.Index, raftPri)
}()
} else {
raftPri = raftpb.LowPri
Expand All @@ -814,7 +813,7 @@ func (p *processorImpl) AdmitRaftEntriesRaftMuLocked(ctx context.Context, e rac2
func() {
p.mu.Lock()
defer p.mu.Unlock()
p.mu.waitingForAdmissionState.add(e.Term, entry.Index, raftPri)
p.mu.waitingForAdmissionState.add(mark.Term, mark.Index, raftPri)
}()
}
admissionPri := rac2.RaftToAdmissionPriority(raftPri)
Expand All @@ -830,17 +829,16 @@ func (p *processorImpl) AdmitRaftEntriesRaftMuLocked(ctx context.Context, e rac2
RangeID: p.opts.RangeID,
ReplicaID: p.opts.ReplicaID,
CallbackState: EntryForAdmissionCallbackState{
LeaderTerm: e.Term,
Index: entry.Index,
Priority: raftPri,
Mark: mark,
Priority: raftPri,
},
})
if !submitted {
// Very rare. e.g. store was not found.
func() {
p.mu.Lock()
defer p.mu.Unlock()
p.mu.waitingForAdmissionState.remove(e.Term, entry.Index, raftPri)
p.mu.waitingForAdmissionState.remove(mark.Term, mark.Index, raftPri)
}()
}
}
Expand Down Expand Up @@ -920,8 +918,8 @@ func (p *processorImpl) AdmittedLogEntry(
return
}
admittedMayAdvance :=
p.mu.waitingForAdmissionState.remove(state.LeaderTerm, state.Index, state.Priority)
if !admittedMayAdvance || state.Index > p.mu.lastObservedStableIndex ||
p.mu.waitingForAdmissionState.remove(state.Mark.Term, state.Mark.Index, state.Priority)
if !admittedMayAdvance || state.Mark.Index > p.mu.lastObservedStableIndex ||
!p.isLeaderUsingV2ProcLocked() {
return
}
Expand Down
13 changes: 4 additions & 9 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,17 +436,12 @@ func TestProcessorBasic(t *testing.T) {
return builderStr()

case "admitted-log-entry":
var leaderTerm uint64
d.ScanArgs(t, "leader-term", &leaderTerm)
var index uint64
d.ScanArgs(t, "index", &index)
var cb EntryForAdmissionCallbackState
d.ScanArgs(t, "leader-term", &cb.Mark.Term)
d.ScanArgs(t, "index", &cb.Mark.Index)
var pri int
d.ScanArgs(t, "pri", &pri)
cb := EntryForAdmissionCallbackState{
LeaderTerm: leaderTerm,
Index: index,
Priority: raftpb.Priority(pri),
}
cb.Priority = raftpb.Priority(pri)
p.AdmittedLogEntry(ctx, cb)
return builderStr()

Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ HandleRaftReady:
Replica.MuUnlock
.....
AdmitRaftEntries:
ACWorkQueue.Admit({StoreID:2 TenantID:4 Priority:low-pri CreateTime:2 RequestedCount:100 Ingested:false RangeID:3 ReplicaID:5 CallbackState:{LeaderTerm:50 Index:25 Priority:LowPri}}) = true
ACWorkQueue.Admit({StoreID:2 TenantID:4 Priority:low-pri CreateTime:2 RequestedCount:100 Ingested:false RangeID:3 ReplicaID:5 CallbackState:{Mark:{Term:50 Index:25} Priority:LowPri}}) = true
destroyed-or-leader-using-v2: true

# Stable index is advanced to 25.
Expand Down Expand Up @@ -157,7 +157,7 @@ HandleRaftReady:
Replica.MuUnlock
.....
AdmitRaftEntries:
ACWorkQueue.Admit({StoreID:2 TenantID:4 Priority:user-high-pri CreateTime:2 RequestedCount:100 Ingested:false RangeID:3 ReplicaID:5 CallbackState:{LeaderTerm:50 Index:26 Priority:AboveNormalPri}}) = true
ACWorkQueue.Admit({StoreID:2 TenantID:4 Priority:user-high-pri CreateTime:2 RequestedCount:100 Ingested:false RangeID:3 ReplicaID:5 CallbackState:{Mark:{Term:50 Index:26} Priority:AboveNormalPri}}) = true
destroyed-or-leader-using-v2: true

# handleRaftReady is a noop.
Expand Down Expand Up @@ -246,7 +246,7 @@ HandleRaftReady:
Replica.MuUnlock
.....
AdmitRaftEntries:
ACWorkQueue.Admit({StoreID:2 TenantID:4 Priority:low-pri CreateTime:2 RequestedCount:100 Ingested:false RangeID:3 ReplicaID:5 CallbackState:{LeaderTerm:50 Index:27 Priority:LowPri}}) = true
ACWorkQueue.Admit({StoreID:2 TenantID:4 Priority:low-pri CreateTime:2 RequestedCount:100 Ingested:false RangeID:3 ReplicaID:5 CallbackState:{Mark:{Term:50 Index:27} Priority:LowPri}}) = true
destroyed-or-leader-using-v2: true

admitted-log-entry leader-term=50 index=27 pri=3
Expand Down Expand Up @@ -384,7 +384,7 @@ HandleRaftReady:
RangeController.HandleRaftEventRaftMuLocked([28])
.....
AdmitRaftEntries:
ACWorkQueue.Admit({StoreID:2 TenantID:4 Priority:low-pri CreateTime:2 RequestedCount:100 Ingested:false RangeID:3 ReplicaID:5 CallbackState:{LeaderTerm:52 Index:28 Priority:LowPri}}) = true
ACWorkQueue.Admit({StoreID:2 TenantID:4 Priority:low-pri CreateTime:2 RequestedCount:100 Ingested:false RangeID:3 ReplicaID:5 CallbackState:{Mark:{Term:52 Index:28} Priority:LowPri}}) = true
destroyed-or-leader-using-v2: true

# AdmitForEval returns true since there is a RangeController which admitted.
Expand Down Expand Up @@ -691,7 +691,7 @@ HandleRaftReady:
RangeController.HandleRaftEventRaftMuLocked([26])
.....
AdmitRaftEntries:
ACWorkQueue.Admit({StoreID:2 TenantID:4 Priority:low-pri CreateTime:2 RequestedCount:100 Ingested:false RangeID:3 ReplicaID:5 CallbackState:{LeaderTerm:50 Index:26 Priority:LowPri}}) = true
ACWorkQueue.Admit({StoreID:2 TenantID:4 Priority:low-pri CreateTime:2 RequestedCount:100 Ingested:false RangeID:3 ReplicaID:5 CallbackState:{Mark:{Term:50 Index:26} Priority:LowPri}}) = true
destroyed-or-leader-using-v2: true

# Entry is admitted.
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/admission/work_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2124,7 +2124,7 @@ type LogEntryAdmittedCallbackState struct {
// Pos is the position of the entry in the log.
//
// TODO(sumeer): when the RACv1 protocol is deleted, drop the Term from this
// struct.
// struct, and replace LeaderTerm/Pos.Index with a LogMark.
Pos LogPosition
// Pri is the admission priority used for admission.
Pri admissionpb.WorkPriority
Expand Down

0 comments on commit b74b513

Please sign in to comment.