Skip to content

Commit

Permalink
kv/logstore: avoid heap allocations around non-blocking sync waiter callback
Browse files Browse the repository at this point in the history

This commit structures the non-blocking sync callback provided to the Raft log
`SyncWaiterLoop` as a struct with a method that satisfies an interface (i.e. a
functor) instead of an anonymous function. This provides more control over the
memory layout of the callback and prevents individual fields from escaping to
the heap. The change also provides the opportunity to pool the callback to
avoid an additional heap allocation.

The change has the following effect on microbenchmarks:
```
name                                                 old time/op    new time/op    delta
ReplicaProposal/bytes=1.0_KiB,withFollower=false-10    40.1µs ± 3%    39.1µs ± 1%   -2.51%  (p=0.000 n=10+9)
ReplicaProposal/bytes=512_B,withFollower=false-10      38.2µs ± 2%    37.3µs ± 4%   -2.48%  (p=0.015 n=10+10)
ReplicaProposal/bytes=256_B,withFollower=false-10      37.1µs ± 1%    36.2µs ± 3%   -2.32%  (p=0.000 n=10+10)
ReplicaProposal/bytes=256_B,withFollower=true-10       52.2µs ± 1%    51.2µs ± 1%   -1.91%  (p=0.000 n=10+10)
ReplicaProposal/bytes=1.0_KiB,withFollower=true-10     58.5µs ± 2%    57.5µs ± 2%   -1.79%  (p=0.001 n=10+10)
ReplicaProposal/bytes=512_B,withFollower=true-10       53.8µs ± 2%    52.8µs ± 1%   -1.74%  (p=0.000 n=10+10)

name                                                 old speed      new speed      delta
ReplicaProposal/bytes=512_B,withFollower=false-10    13.4MB/s ± 2%  13.7MB/s ± 4%   +2.57%  (p=0.016 n=10+10)
ReplicaProposal/bytes=1.0_KiB,withFollower=false-10  25.5MB/s ± 2%  26.2MB/s ± 1%   +2.57%  (p=0.000 n=10+9)
ReplicaProposal/bytes=256_B,withFollower=false-10    6.91MB/s ± 1%  7.07MB/s ± 3%   +2.36%  (p=0.000 n=10+10)
ReplicaProposal/bytes=256_B,withFollower=true-10     4.90MB/s ± 1%  5.00MB/s ± 1%   +1.96%  (p=0.000 n=10+10)
ReplicaProposal/bytes=1.0_KiB,withFollower=true-10   17.5MB/s ± 2%  17.8MB/s ± 1%   +1.82%  (p=0.001 n=10+10)
ReplicaProposal/bytes=512_B,withFollower=true-10     9.52MB/s ± 2%  9.69MB/s ± 1%   +1.76%  (p=0.000 n=10+10)

name                                                 old alloc/op   new alloc/op   delta
ReplicaProposal/bytes=256_B,withFollower=false-10      14.6kB ± 0%    12.8kB ± 0%  -12.73%  (p=0.000 n=10+10)
ReplicaProposal/bytes=256_B,withFollower=true-10       35.0kB ± 1%    30.6kB ± 1%  -12.59%  (p=0.000 n=10+10)
ReplicaProposal/bytes=512_B,withFollower=true-10       42.9kB ± 0%    38.6kB ± 1%  -10.04%  (p=0.000 n=8+10)
ReplicaProposal/bytes=512_B,withFollower=false-10      18.5kB ± 2%    16.8kB ± 1%   -9.19%  (p=0.000 n=10+10)
ReplicaProposal/bytes=1.0_KiB,withFollower=true-10     60.6kB ± 1%    55.9kB ± 1%   -7.76%  (p=0.000 n=10+10)
ReplicaProposal/bytes=1.0_KiB,withFollower=false-10    27.5kB ± 2%    25.6kB ± 2%   -7.06%  (p=0.000 n=10+10)

name                                                 old allocs/op  new allocs/op  delta
ReplicaProposal/bytes=512_B,withFollower=false-10        70.0 ± 0%      61.6 ± 1%  -12.00%  (p=0.000 n=10+10)
ReplicaProposal/bytes=256_B,withFollower=false-10        69.0 ± 0%      61.0 ± 0%  -11.59%  (p=0.000 n=10+10)
ReplicaProposal/bytes=1.0_KiB,withFollower=false-10      73.0 ± 0%      65.0 ± 0%  -10.96%  (p=0.002 n=8+10)
ReplicaProposal/bytes=256_B,withFollower=true-10          179 ± 0%       161 ± 0%  -10.21%  (p=0.000 n=10+7)
ReplicaProposal/bytes=512_B,withFollower=true-10          181 ± 1%       162 ± 0%  -10.11%  (p=0.000 n=9+10)
ReplicaProposal/bytes=1.0_KiB,withFollower=true-10        186 ± 0%       168 ± 0%   -9.84%  (p=0.000 n=9+10)
```

Release note: None
Epic: None
  • Loading branch information
nvanbenschoten committed Feb 6, 2023
1 parent efd9f51 commit 894216b
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 14 deletions.
52 changes: 44 additions & 8 deletions pkg/kv/kvserver/logstore/logstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,19 +243,23 @@ func (s *LogStore) storeEntriesAndCommitBatch(
if nonBlockingSync {
// If non-blocking synchronization is enabled, apply the batched updates to
// the engine and initiate a synchronous disk write, but don't wait for the
// write to complete. Instead, enqueue that waiting on the SyncWaiterLoop,
// who will signal the callback when the write completes.
// write to complete.
if err := batch.CommitNoSyncWait(); err != nil {
const expl = "while committing batch without sync wait"
return RaftState{}, errors.Wrap(err, expl)
}
stats.PebbleEnd = timeutil.Now()
s.SyncWaiter.enqueue(ctx, batch, func() {
// NOTE: run on the SyncWaiterLoop goroutine.
logCommitEnd := timeutil.Now()
s.Metrics.RaftLogCommitLatency.RecordValue(logCommitEnd.Sub(stats.PebbleBegin).Nanoseconds())
cb.OnLogSync(ctx, m.Responses)
})
// Instead, enqueue that waiting on the SyncWaiterLoop, who will signal the
// callback when the write completes.
waiterCallback := nonBlockingSyncWaiterCallbackPool.Get().(*nonBlockingSyncWaiterCallback)
*waiterCallback = nonBlockingSyncWaiterCallback{
ctx: ctx,
cb: cb,
msgs: m.Responses,
metrics: s.Metrics,
logCommitBegin: stats.PebbleBegin,
}
s.SyncWaiter.enqueue(ctx, batch, waiterCallback)
// Do not Close batch on return. Will be Closed by SyncWaiterLoop.
batch = nil
} else {
Expand Down Expand Up @@ -310,6 +314,38 @@ func (s *LogStore) storeEntriesAndCommitBatch(
return state, nil
}

// nonBlockingSyncWaiterCallback packages up the callback that is handed to the
// SyncWaiterLoop during a non-blocking Raft log sync. Structuring the callback
// as a struct with a method instead of an anonymous function avoids individual
// fields escaping to the heap. It also provides the opportunity to pool the
// callback.
//
// Instances are obtained from nonBlockingSyncWaiterCallbackPool, fully
// overwritten before being enqueued, and zeroed and returned to the pool by
// release at the end of run.
type nonBlockingSyncWaiterCallback struct {
// Used to run SyncCallback.
// ctx is the context passed through to cb.OnLogSync.
ctx context.Context
// cb is the SyncCallback whose OnLogSync method is invoked by run.
cb SyncCallback
// msgs is passed to cb.OnLogSync; the enqueue site sets it to m.Responses.
msgs []raftpb.Message
// Used to record Metrics.
// metrics provides the RaftLogCommitLatency metric that run records into.
metrics Metrics
// logCommitBegin is the start of the interval whose duration run records;
// the enqueue site sets it to stats.PebbleBegin.
logCommitBegin time.Time
}

// run implements syncWaiterCallback. It is invoked on the SyncWaiterLoop
// goroutine: it records the time elapsed since logCommitBegin in the
// RaftLogCommitLatency metric, notifies the SyncCallback via OnLogSync, and
// finally hands the callback object back to its pool.
func (cb *nonBlockingSyncWaiterCallback) run() {
	// Measure the latency before invoking the SyncCallback so that the time
	// spent inside OnLogSync is not included in the recorded value.
	elapsed := timeutil.Since(cb.logCommitBegin)
	cb.metrics.RaftLogCommitLatency.RecordValue(elapsed.Nanoseconds())

	cb.cb.OnLogSync(cb.ctx, cb.msgs)

	// Must come last: release zeroes every field of the receiver and makes the
	// object available for reuse, so cb must not be touched afterwards.
	cb.release()
}

// release resets the callback to its zero value, dropping the references it
// holds (context, SyncCallback, messages), and then returns it to
// nonBlockingSyncWaiterCallbackPool for reuse. The receiver must not be used
// after release returns.
func (cb *nonBlockingSyncWaiterCallback) release() {
	var zero nonBlockingSyncWaiterCallback
	*cb = zero
	nonBlockingSyncWaiterCallbackPool.Put(cb)
}

// nonBlockingSyncWaiterCallbackPool recycles *nonBlockingSyncWaiterCallback
// objects. A callback is taken from the pool when a non-blocking sync is
// enqueued and is put back by (*nonBlockingSyncWaiterCallback).release.
var nonBlockingSyncWaiterCallbackPool = sync.Pool{
// New supplies a fresh zero-valued callback when Get has nothing to reuse.
New: func() interface{} { return new(nonBlockingSyncWaiterCallback) },
}

// valPool is a pool of *roachpb.Value objects; New allocates a zero-valued
// roachpb.Value when Get has nothing to reuse.
// NOTE(review): the users of this pool are outside the visible hunk — confirm
// that values are reset before being returned to the pool.
var valPool = sync.Pool{
New: func() interface{} { return &roachpb.Value{} },
}
Expand Down
15 changes: 12 additions & 3 deletions pkg/kv/kvserver/logstore/sync_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ type syncWaiter interface {

var _ syncWaiter = storage.Batch(nil)

// syncWaiterCallback is a callback provided to a SyncWaiterLoop.
// The callback is structured as an interface instead of a closure to allow
// users to batch the callback and its inputs into a single heap object, and
// then pool the allocation of that object.
//
// The SyncWaiterLoop calls run on its own goroutine after the corresponding
// syncWaiter's SyncWait has returned without error and the syncWaiter has been
// Closed.
type syncWaiterCallback interface {
// run executes the callback.
run()
}

// SyncWaiterLoop waits on a sequence of in-progress disk writes, notifying
// callbacks when their corresponding disk writes have completed.
// Invariant: The callbacks are notified in the order that they were enqueued
Expand All @@ -44,7 +53,7 @@ type SyncWaiterLoop struct {

type syncBatch struct {
wg syncWaiter
cb func()
cb syncWaiterCallback
}

// NewSyncWaiterLoop constructs a SyncWaiterLoop. It must be Started before use.
Expand Down Expand Up @@ -87,7 +96,7 @@ func (w *SyncWaiterLoop) waitLoop(ctx context.Context, stopper *stop.Stopper) {
log.Fatalf(ctx, "SyncWait error: %+v", err)
}
w.wg.Close()
w.cb()
w.cb.run()
case <-stopper.ShouldQuiesce():
return
}
Expand All @@ -103,7 +112,7 @@ func (w *SyncWaiterLoop) waitLoop(ctx context.Context, stopper *stop.Stopper) {
//
// If the SyncWaiterLoop has already been stopped, the callback will never be
// called.
func (w *SyncWaiterLoop) enqueue(ctx context.Context, wg syncWaiter, cb func()) {
func (w *SyncWaiterLoop) enqueue(ctx context.Context, wg syncWaiter, cb syncWaiterCallback) {
b := syncBatch{wg, cb}
select {
case w.q <- b:
Expand Down
13 changes: 10 additions & 3 deletions pkg/kv/kvserver/logstore/sync_waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func TestSyncWaiterLoop(t *testing.T) {
// Enqueue a waiter while the loop is running.
c := make(chan struct{})
wg1 := make(chanSyncWaiter)
w.enqueue(ctx, wg1, func() { close(c) })
cb1 := funcSyncWaiterCallback(func() { close(c) })
w.enqueue(ctx, wg1, cb1)

// Callback is not called before SyncWait completes.
select {
Expand All @@ -49,8 +50,9 @@ func TestSyncWaiterLoop(t *testing.T) {
// regardless of how many times it is called.
stopper.Stop(ctx)
wg2 := make(chanSyncWaiter)
cb2 := funcSyncWaiterCallback(func() { t.Fatalf("callback unexpectedly called") })
for i := 0; i < 2*cap(w.q); i++ {
w.enqueue(ctx, wg2, func() { t.Fatalf("callback unexpectedly called") })
w.enqueue(ctx, wg2, cb2)
}

// Callback should not be called, even after SyncWait completes.
Expand All @@ -72,7 +74,7 @@ func BenchmarkSyncWaiterLoop(b *testing.B) {
// performance of operations inside the SyncWaiterLoop.
wg := make(chanSyncWaiter)
c := make(chan struct{})
cb := func() { c <- struct{}{} }
cb := funcSyncWaiterCallback(func() { c <- struct{}{} })

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand All @@ -91,3 +93,8 @@ func (c chanSyncWaiter) SyncWait() error {
}

func (c chanSyncWaiter) Close() {}

// funcSyncWaiterCallback adapts a plain func() so that it satisfies the
// syncWaiterCallback interface. It lets tests pass closures to
// SyncWaiterLoop.enqueue.
type funcSyncWaiterCallback func()

// run invokes the wrapped function.
func (f funcSyncWaiterCallback) run() {
	f()
}

0 comments on commit 894216b

Please sign in to comment.