133060: kvserver/rangefeed: add event queue r=stevendanna a=wenyihu6

This patch adds a new data structure, eventQueue: a queue implemented as a
chunked linked list with a fixed chunk size of 4096 elements. Chunks are
pooled in a sync.Pool to reduce the number of allocations.

pushBack, popFront, and len run in constant time, while drain runs in
linear time with respect to the number of elements in the queue. This
structure is not safe for concurrent use.

This prepares for future commits, which will use the queue in the
BufferedSender to buffer events at the node level.

Part of: cockroachdb#129813
Release note: none

Co-authored-by: Steven Danna <[email protected]>
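
For illustration, here is a minimal sketch of the structure described above. The names, the element type, and the pooling details are assumptions for this sketch; the actual eventQueue added by the patch stores rangefeed events and differs in detail.

```go
package main

import (
	"fmt"
	"sync"
)

// chunkSize matches the fixed chunk size described above.
const chunkSize = 4096

// chunk is one fixed-size node of the linked list. The int element type is a
// placeholder; the real queue stores rangefeed events.
type chunk struct {
	events [chunkSize]int
	next   *chunk
}

// chunkPool recycles chunks to reduce allocations.
var chunkPool = sync.Pool{
	New: func() any { return new(chunk) },
}

// eventQueue is a FIFO queue backed by a linked list of fixed-size chunks.
// It is not safe for concurrent use.
type eventQueue struct {
	head, tail *chunk
	read       int // next index to pop within head
	write      int // next index to push within tail
	size       int
}

func newEventQueue() *eventQueue {
	c := chunkPool.Get().(*chunk)
	return &eventQueue{head: c, tail: c}
}

// pushBack appends an element in O(1), taking a chunk from the pool only when
// the tail chunk is full.
func (q *eventQueue) pushBack(e int) {
	if q.write == chunkSize {
		c := chunkPool.Get().(*chunk)
		q.tail.next = c
		q.tail = c
		q.write = 0
	}
	q.tail.events[q.write] = e
	q.write++
	q.size++
}

// popFront removes and returns the oldest element in O(1), returning
// exhausted head chunks to the pool.
func (q *eventQueue) popFront() (int, bool) {
	if q.size == 0 {
		return 0, false
	}
	if q.read == chunkSize {
		old := q.head
		q.head = old.next
		q.read = 0
		old.next = nil // a real implementation would also clear element references
		chunkPool.Put(old)
	}
	e := q.head.events[q.read]
	q.read++
	q.size--
	return e, true
}

// len returns the number of queued elements in O(1).
func (q *eventQueue) len() int { return q.size }

// drain discards all queued elements in O(n).
func (q *eventQueue) drain() {
	for _, ok := q.popFront(); ok; _, ok = q.popFront() {
	}
}

func main() {
	q := newEventQueue()
	for i := 0; i < 10000; i++ {
		q.pushBack(i)
	}
	fmt.Println(q.len()) // 10000
	q.drain()
	fmt.Println(q.len()) // 0
}
```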

133195: replica_rac2: inline Replica mutex assertions r=pav-kv,kvoli a=sumeerbhola

The Replica mutex assertions are now performed via the concrete ReplicaMutexAsserter struct instead of an interface, so the calls can be inlined (and compiled away in production builds).

`go build -gcflags "-m -m"` produces output like:

```
./processor.go:1142:46: inlining call to ReplicaMutexAsserter.RaftMuAssertHeld
./processor.go:1143:49: inlining call to ReplicaMutexAsserter.ReplicaMuAssertHeld
./processor.go:1142:46: inlining call to syncutil.(*Mutex).AssertHeld
./processor.go:1143:49: inlining call to syncutil.(*RWMutex).AssertHeld
```
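
As a standalone sketch (not the CockroachDB code) of why the change helps: a method call through an interface is virtual and cannot be inlined, while a method on a small concrete struct can be inlined, and an inlined empty body (as these assertions are in non-assertion builds) disappears entirely. All names below are made up for illustration.

```go
package main

import "sync"

// asserter mirrors the shape of ReplicaMutexAsserter: a small concrete
// struct holding pointers to the mutexes it asserts on.
type asserter struct {
	mu *sync.Mutex
}

// assertHeld stands in for syncutil's AssertHeld, which is assumed here to be
// a no-op unless assertion/deadlock-detection build tags are set.
//
// gcassert:inline
func (a asserter) assertHeld() {}

// asserterIface is what the old interface-based approach looked like.
type asserterIface interface {
	assertHeld()
}

// direct can have a.assertHeld() inlined (and eliminated, since the body is
// empty); go build -gcflags "-m -m" reports the inlining decision.
func direct(a asserter) { a.assertHeld() }

// viaInterface calls through the interface; without devirtualization the
// compiler cannot inline the dynamic call.
func viaInterface(a asserterIface) { a.assertHeld() }

func main() {
	var mu sync.Mutex
	a := asserter{mu: &mu}
	direct(a)
	viaInterface(a)
}
```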

Epic: CRDB-37515

Release note: None

Co-authored-by: Wenyi Hu <[email protected]>
Co-authored-by: sumeerbhola <[email protected]>
3 people committed Oct 23, 2024
3 parents 6949837 + 20a9d8f + be0923c commit 62803f3
Showing 9 changed files with 570 additions and 116 deletions.
14 changes: 2 additions & 12 deletions pkg/kv/kvserver/flow_control_replica_integration.go
@@ -455,19 +455,9 @@ func (n noopReplicaFlowControlIntegration) handle() (kvflowcontrol.Handle, bool)

type replicaForRACv2 Replica

var _ replica_rac2.Replica = &replicaForRACv2{}
var _ replica_rac2.ReplicaForTesting = &replicaForRACv2{}

// RaftMuAssertHeld implements replica_rac2.Replica.
func (r *replicaForRACv2) RaftMuAssertHeld() {
r.raftMu.AssertHeld()
}

// MuAssertHeld implements replica_rac2.Replica.
func (r *replicaForRACv2) MuAssertHeld() {
r.mu.AssertHeld()
}

// IsScratchRange implements replica_rac2.Replica.
// IsScratchRange implements replica_rac2.ReplicaForTesting.
func (r *replicaForRACv2) IsScratchRange() bool {
return (*Replica)(r).IsScratchRange()
}
80 changes: 52 additions & 28 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
@@ -26,17 +26,8 @@ import (
"github.com/cockroachdb/errors"
)

// Replica abstracts kvserver.Replica. It exposes internal implementation
// details of Replica, specifically the locking behavior, since it is
// essential to reason about correctness.
//
// TODO(sumeer): because the mutex assertions are hidden behind an interface,
// they are not free for production builds. Fix, and then add more assertions.
type Replica interface {
// RaftMuAssertHeld asserts that Replica.raftMu is held.
RaftMuAssertHeld()
// MuAssertHeld asserts that Replica.mu is held.
MuAssertHeld()
// ReplicaForTesting abstracts kvserver.Replica for testing.
type ReplicaForTesting interface {
// IsScratchRange returns true if this is range is a scratch range (i.e.
// overlaps with the scratch span and has a start key <=
// keys.ScratchRangeMin).
@@ -142,6 +133,37 @@ type RangeControllerFactory interface {
New(ctx context.Context, state rangeControllerInitState) rac2.RangeController
}

// ReplicaMutexAsserter must only be used to assert that mutexes are held.
// This is a concrete struct so that the assertions can be compiled away in
// production code.
type ReplicaMutexAsserter struct {
raftMu *syncutil.Mutex
replicaMu *syncutil.RWMutex
}

func MakeReplicaMutexAsserter(
raftMu *syncutil.Mutex, replicaMu *syncutil.RWMutex,
) ReplicaMutexAsserter {
return ReplicaMutexAsserter{
raftMu: raftMu,
replicaMu: replicaMu,
}
}

// RaftMuAssertHeld asserts that Replica.raftMu is held.
//
// gcassert:inline
func (rmu ReplicaMutexAsserter) RaftMuAssertHeld() {
rmu.raftMu.AssertHeld()
}

// ReplicaMuAssertHeld asserts that Replica.mu is held for writing.
//
// gcassert:inline
func (rmu ReplicaMutexAsserter) ReplicaMuAssertHeld() {
rmu.replicaMu.AssertHeld()
}

// ProcessorOptions are specified when creating a new Processor.
type ProcessorOptions struct {
// Various constant fields that are duplicated from Replica, since we
@@ -154,7 +176,8 @@ RangeID roachpb.RangeID
RangeID roachpb.RangeID
ReplicaID roachpb.ReplicaID

Replica Replica
ReplicaForTesting ReplicaForTesting
ReplicaMutexAsserter ReplicaMutexAsserter
RaftScheduler RaftScheduler
AdmittedPiggybacker AdmittedPiggybacker
ACWorkQueue ACWorkQueue
@@ -519,8 +542,8 @@ func (p *processorImpl) isLeaderUsingV2ProcLocked() bool {
func (p *processorImpl) InitRaftLocked(
ctx context.Context, rn rac2.RaftInterface, logMark rac2.LogMark,
) {
p.opts.Replica.RaftMuAssertHeld()
p.opts.Replica.MuAssertHeld()
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
p.opts.ReplicaMutexAsserter.ReplicaMuAssertHeld()
if p.desc.replicas != nil {
log.Fatalf(ctx, "initializing RaftNode after replica is initialized")
}
@@ -530,7 +553,7 @@ func (p *processorImpl) InitRaftLocked(

// OnDestroyRaftMuLocked implements Processor.
func (p *processorImpl) OnDestroyRaftMuLocked(ctx context.Context) {
p.opts.Replica.RaftMuAssertHeld()
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
p.destroyed = true
p.closeLeaderStateRaftMuLocked(ctx)
// Release some memory.
@@ -541,7 +564,7 @@ func (p *processorImpl) OnDestroyRaftMuLocked(ctx context.Context) {
func (p *processorImpl) SetEnabledWhenLeaderRaftMuLocked(
ctx context.Context, level kvflowcontrol.V2EnabledWhenLeaderLevel, state RaftNodeBasicState,
) {
p.opts.Replica.RaftMuAssertHeld()
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
if p.destroyed || p.enabledWhenLeader >= level {
return
}
@@ -572,8 +595,8 @@ func descToReplicaSet(desc *roachpb.RangeDescriptor) rac2.ReplicaSet {
func (p *processorImpl) OnDescChangedLocked(
ctx context.Context, desc *roachpb.RangeDescriptor, tenantID roachpb.TenantID,
) {
p.opts.Replica.RaftMuAssertHeld()
p.opts.Replica.MuAssertHeld()
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
p.opts.ReplicaMutexAsserter.ReplicaMuAssertHeld()
initialization := p.desc.replicas == nil
if initialization {
// Replica is initialized, in that we now have a descriptor.
@@ -754,7 +777,7 @@ func (p *processorImpl) createLeaderStateRaftMuLocked(
func (p *processorImpl) HandleRaftReadyRaftMuLocked(
ctx context.Context, state RaftNodeBasicState, e rac2.RaftEvent,
) {
p.opts.Replica.RaftMuAssertHeld()
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
// Register all snapshots / log appends without exception. If the replica is
// being destroyed, this should be a no-op, but there is no harm in
// registering the write just in case.
@@ -786,7 +809,7 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(
p.maybeSendAdmittedRaftMuLocked(ctx)
if rc := p.leader.rc; rc != nil {
if knobs := p.opts.Knobs; knobs == nil || !knobs.UseOnlyForScratchRanges ||
p.opts.Replica.IsScratchRange() {
p.opts.ReplicaForTesting.IsScratchRange() {
if err := rc.HandleRaftEventRaftMuLocked(ctx, e); err != nil {
log.Errorf(ctx, "error handling raft event: %v", err)
}
@@ -873,6 +896,7 @@ func (p *processorImpl) registerStorageAppendRaftMuLocked(ctx context.Context, e

// AdmitRaftEntriesRaftMuLocked implements Processor.
func (p *processorImpl) AdmitRaftEntriesRaftMuLocked(ctx context.Context, e rac2.RaftEvent) bool {
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
// Return false only if we're not destroyed and not using V2.
if p.destroyed || !p.isLeaderUsingV2ProcLocked() {
return p.destroyed
@@ -1003,7 +1027,7 @@ func (p *processorImpl) EnqueuePiggybackedAdmittedAtLeader(

// ProcessPiggybackedAdmittedAtLeaderRaftMuLocked implements Processor.
func (p *processorImpl) ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx context.Context) {
p.opts.Replica.RaftMuAssertHeld()
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
if p.destroyed {
return
}
@@ -1040,7 +1064,7 @@ func (p *processorImpl) ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx conte
func (p *processorImpl) SideChannelForPriorityOverrideAtFollowerRaftMuLocked(
info SideChannelInfoUsingRaftMessageRequest,
) {
p.opts.Replica.RaftMuAssertHeld()
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
if p.destroyed {
return
}
@@ -1090,7 +1114,7 @@ func (p *processorImpl) AdmittedState() rac2.AdmittedVector {
func (p *processorImpl) AdmitRaftMuLocked(
ctx context.Context, replicaID roachpb.ReplicaID, av rac2.AdmittedVector,
) {
p.opts.Replica.RaftMuAssertHeld()
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
// NB: rc is always updated while raftMu is held.
if rc := p.leader.rc; rc != nil {
rc.AdmitRaftMuLocked(ctx, replicaID, av)
@@ -1099,16 +1123,16 @@ func (p *processorImpl) AdmitRaftMuLocked(

// MaybeSendPingsRaftMuLocked implements Processor.
func (p *processorImpl) MaybeSendPingsRaftMuLocked() {
p.opts.Replica.RaftMuAssertHeld()
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
if rc := p.leader.rc; rc != nil {
rc.MaybeSendPingsRaftMuLocked()
}
}

// HoldsSendTokensLocked implements Processor.
func (p *processorImpl) HoldsSendTokensLocked() bool {
p.opts.Replica.RaftMuAssertHeld()
p.opts.Replica.MuAssertHeld()
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
p.opts.ReplicaMutexAsserter.ReplicaMuAssertHeld()
if rc := p.leader.rc; rc != nil {
return rc.HoldsSendTokensLocked()
}
@@ -1142,7 +1166,7 @@ func (p *processorImpl) AdmitForEval(
func (p *processorImpl) ProcessSchedulerEventRaftMuLocked(
ctx context.Context, mode rac2.RaftMsgAppMode, logSnapshot raft.LogSnapshot,
) {
p.opts.Replica.RaftMuAssertHeld()
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
if p.destroyed {
return
}
@@ -1153,7 +1177,7 @@ func (p *processorImpl) ProcessSchedulerEventRaftMuLocked(

// InspectRaftMuLocked implements Processor.
func (p *processorImpl) InspectRaftMuLocked(ctx context.Context) (kvflowinspectpb.Handle, bool) {
p.opts.Replica.RaftMuAssertHeld()
p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
p.leader.rcReferenceUpdateMu.RLock()
defer p.leader.rcReferenceUpdateMu.RUnlock()
if p.leader.rc == nil {
55 changes: 42 additions & 13 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
@@ -35,14 +35,13 @@ import (
)

type testReplica struct {
mu syncutil.RWMutex
raftNode *testRaftNode
b *strings.Builder

leaseholder roachpb.ReplicaID
}

var _ Replica = &testReplica{}
var _ ReplicaForTesting = &testReplica{}

func newTestReplica(b *strings.Builder) *testReplica {
return &testReplica{b: b}
@@ -57,14 +56,6 @@ func (r *testReplica) initRaft(stable rac2.LogMark) {
}
}

func (r *testReplica) RaftMuAssertHeld() {
fmt.Fprintf(r.b, " Replica.RaftMuAssertHeld\n")
}

func (r *testReplica) MuAssertHeld() {
fmt.Fprintf(r.b, " Replica.MuAssertHeld\n")
}

func (r *testReplica) IsScratchRange() bool {
return true
}
@@ -249,6 +240,28 @@ func (c *testRangeController) SendStreamStats(stats *rac2.RangeSendStreamStats)
fmt.Fprintf(c.b, " RangeController.SendStreamStats\n")
}

func makeTestMutexAsserter() ReplicaMutexAsserter {
var raftMu syncutil.Mutex
var replicaMu syncutil.RWMutex
return MakeReplicaMutexAsserter(&raftMu, &replicaMu)
}

func LockRaftMuAndReplicaMu(mu *ReplicaMutexAsserter) (unlockFunc func()) {
mu.raftMu.Lock()
mu.replicaMu.Lock()
return func() {
mu.replicaMu.Unlock()
mu.raftMu.Unlock()
}
}

func LockRaftMu(mu *ReplicaMutexAsserter) (unlockFunc func()) {
mu.raftMu.Lock()
return func() {
mu.raftMu.Unlock()
}
}

func TestProcessorBasic(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -264,6 +277,7 @@ func TestProcessorBasic(t *testing.T) {
var rcFactory testRangeControllerFactory
var p *processorImpl
tenantID := roachpb.MustMakeTenantID(4)
muAsserter := makeTestMutexAsserter()
reset := func(enabled kvflowcontrol.V2EnabledWhenLeaderLevel) {
b.Reset()
r = newTestReplica(&b)
@@ -276,7 +290,8 @@ func TestProcessorBasic(t *testing.T) {
StoreID: 2,
RangeID: 3,
ReplicaID: replicaID,
Replica: r,
ReplicaForTesting: r,
ReplicaMutexAsserter: muAsserter,
RaftScheduler: &sched,
AdmittedPiggybacker: &piggybacker,
ACWorkQueue: &q,
@@ -311,10 +326,10 @@ func TestProcessorBasic(t *testing.T) {
var mark rac2.LogMark
d.ScanArgs(t, "log-term", &mark.Term)
d.ScanArgs(t, "log-index", &mark.Index)
r.mu.Lock()
r.initRaft(mark)
unlockFunc := LockRaftMuAndReplicaMu(&muAsserter)
p.InitRaftLocked(ctx, r.raftNode, r.raftNode.mark)
r.mu.Unlock()
unlockFunc()
return builderStr()

case "set-raft-state":
@@ -357,7 +372,9 @@ func TestProcessorBasic(t *testing.T) {
return builderStr()

case "on-destroy":
unlockFunc := LockRaftMu(&muAsserter)
p.OnDestroyRaftMuLocked(ctx)
unlockFunc()
return builderStr()

case "set-enabled-level":
@@ -372,7 +389,9 @@ func TestProcessorBasic(t *testing.T) {
Leaseholder: r.leaseholder,
}
}
unlockFunc := LockRaftMu(&muAsserter)
p.SetEnabledWhenLeaderRaftMuLocked(ctx, enabledLevel, state)
unlockFunc()
return builderStr()

case "get-enabled-level":
@@ -382,7 +401,9 @@ func TestProcessorBasic(t *testing.T) {

case "on-desc-changed":
desc := parseRangeDescriptor(t, d)
unlockFunc := LockRaftMuAndReplicaMu(&muAsserter)
p.OnDescChangedLocked(ctx, &desc, tenantID)
unlockFunc()
return builderStr()

case "handle-raft-ready-and-admit":
@@ -409,6 +430,7 @@ func TestProcessorBasic(t *testing.T) {
Leaseholder: r.leaseholder,
}
}
unlockFunc := LockRaftMu(&muAsserter)
p.HandleRaftReadyRaftMuLocked(ctx, state, event)
fmt.Fprintf(&b, ".....\n")
if len(event.Entries) > 0 {
@@ -417,6 +439,7 @@ func TestProcessorBasic(t *testing.T) {
fmt.Fprintf(&b, "destroyed-or-leader-using-v2: %t\n", destroyedOrV2)
printLogTracker()
}
unlockFunc()
return builderStr()

case "enqueue-piggybacked-admitted":
@@ -440,7 +463,9 @@ func TestProcessorBasic(t *testing.T) {
return builderStr()

case "process-piggybacked-admitted":
unlockFunc := LockRaftMu(&muAsserter)
p.ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx)
unlockFunc()
return builderStr()

case "side-channel":
@@ -464,7 +489,9 @@ func TestProcessorBasic(t *testing.T) {
Last: last,
LowPriOverride: lowPriOverride,
}
unlockFunc := LockRaftMu(&muAsserter)
p.SideChannelForPriorityOverrideAtFollowerRaftMuLocked(info)
unlockFunc()
return builderStr()

case "admitted-log-entry":
@@ -502,7 +529,9 @@ func TestProcessorBasic(t *testing.T) {
return builderStr()

case "inspect":
unlockFunc := LockRaftMu(&muAsserter)
p.InspectRaftMuLocked(ctx)
unlockFunc()
return builderStr()

case "send-stream-stats":