diff --git a/pkg/kv/kvserver/logstore/logstore.go b/pkg/kv/kvserver/logstore/logstore.go index 7336cada2e75..e6243bd236ff 100644 --- a/pkg/kv/kvserver/logstore/logstore.go +++ b/pkg/kv/kvserver/logstore/logstore.go @@ -243,19 +243,23 @@ func (s *LogStore) storeEntriesAndCommitBatch( if nonBlockingSync { // If non-blocking synchronization is enabled, apply the batched updates to // the engine and initiate a synchronous disk write, but don't wait for the - // write to complete. Instead, enqueue that waiting on the SyncWaiterLoop, - // who will signal the callback when the write completes. + // write to complete. if err := batch.CommitNoSyncWait(); err != nil { const expl = "while committing batch without sync wait" return RaftState{}, errors.Wrap(err, expl) } stats.PebbleEnd = timeutil.Now() - s.SyncWaiter.enqueue(ctx, batch, func() { - // NOTE: run on the SyncWaiterLoop goroutine. - logCommitEnd := timeutil.Now() - s.Metrics.RaftLogCommitLatency.RecordValue(logCommitEnd.Sub(stats.PebbleBegin).Nanoseconds()) - cb.OnLogSync(ctx, m.Responses) - }) + // Instead, enqueue that waiting on the SyncWaiterLoop, who will signal the + // callback when the write completes. + waiterCallback := nonBlockingSyncWaiterCallbackPool.Get().(*nonBlockingSyncWaiterCallback) + *waiterCallback = nonBlockingSyncWaiterCallback{ + ctx: ctx, + cb: cb, + msgs: m.Responses, + metrics: s.Metrics, + logCommitBegin: stats.PebbleBegin, + } + s.SyncWaiter.enqueue(ctx, batch, waiterCallback) // Do not Close batch on return. Will be Closed by SyncWaiterLoop. batch = nil } else { @@ -310,6 +314,38 @@ func (s *LogStore) storeEntriesAndCommitBatch( return state, nil } +// nonBlockingSyncWaiterCallback packages up the callback that is handed to the +// SyncWaiterLoop during a non-blocking Raft log sync. Structuring the callback +// as a struct with a method instead of an anonymous function avoids individual +// fields escaping to the heap. It also provides the opportunity to pool the +// callback. +type nonBlockingSyncWaiterCallback struct { + // Used to run SyncCallback. + ctx context.Context + cb SyncCallback + msgs []raftpb.Message + // Used to record Metrics. + metrics Metrics + logCommitBegin time.Time +} + +// run is the callback's logic. It is executed on the SyncWaiterLoop goroutine. +func (cb *nonBlockingSyncWaiterCallback) run() { + dur := timeutil.Since(cb.logCommitBegin).Nanoseconds() + cb.metrics.RaftLogCommitLatency.RecordValue(dur) + cb.cb.OnLogSync(cb.ctx, cb.msgs) + cb.release() +} + +func (cb *nonBlockingSyncWaiterCallback) release() { + *cb = nonBlockingSyncWaiterCallback{} + nonBlockingSyncWaiterCallbackPool.Put(cb) +} + +var nonBlockingSyncWaiterCallbackPool = sync.Pool{ + New: func() interface{} { return new(nonBlockingSyncWaiterCallback) }, +} + var valPool = sync.Pool{ New: func() interface{} { return &roachpb.Value{} }, } diff --git a/pkg/kv/kvserver/logstore/sync_waiter.go b/pkg/kv/kvserver/logstore/sync_waiter.go index 6b983cc9f047..31880796dfc0 100644 --- a/pkg/kv/kvserver/logstore/sync_waiter.go +++ b/pkg/kv/kvserver/logstore/sync_waiter.go @@ -31,6 +31,15 @@ type syncWaiter interface { var _ syncWaiter = storage.Batch(nil) +// syncWaiterCallback is a callback provided to a SyncWaiterLoop. +// The callback is structured as an interface instead of a closure to allow +// users to batch the callback and its inputs into a single heap object, and +// then pool the allocation of that object. +type syncWaiterCallback interface { + // run executes the callback. + run() +} + // SyncWaiterLoop waits on a sequence of in-progress disk writes, notifying // callbacks when their corresponding disk writes have completed. // Invariant: The callbacks are notified in the order that they were enqueued @@ -44,7 +53,7 @@ type SyncWaiterLoop struct { type syncBatch struct { wg syncWaiter - cb func() + cb syncWaiterCallback } // NewSyncWaiterLoop constructs a SyncWaiterLoop. It must be Started before use. @@ -87,7 +96,7 @@ func (w *SyncWaiterLoop) waitLoop(ctx context.Context, stopper *stop.Stopper) { log.Fatalf(ctx, "SyncWait error: %+v", err) } w.wg.Close() - w.cb() + w.cb.run() case <-stopper.ShouldQuiesce(): return } @@ -103,7 +112,7 @@ func (w *SyncWaiterLoop) waitLoop(ctx context.Context, stopper *stop.Stopper) { // // If the SyncWaiterLoop has already been stopped, the callback will never be // called. -func (w *SyncWaiterLoop) enqueue(ctx context.Context, wg syncWaiter, cb func()) { +func (w *SyncWaiterLoop) enqueue(ctx context.Context, wg syncWaiter, cb syncWaiterCallback) { b := syncBatch{wg, cb} select { case w.q <- b: diff --git a/pkg/kv/kvserver/logstore/sync_waiter_test.go b/pkg/kv/kvserver/logstore/sync_waiter_test.go index ffc12492933f..ff050ff3b3ad 100644 --- a/pkg/kv/kvserver/logstore/sync_waiter_test.go +++ b/pkg/kv/kvserver/logstore/sync_waiter_test.go @@ -32,7 +32,8 @@ func TestSyncWaiterLoop(t *testing.T) { // Enqueue a waiter while the loop is running. c := make(chan struct{}) wg1 := make(chanSyncWaiter) - w.enqueue(ctx, wg1, func() { close(c) }) + cb1 := funcSyncWaiterCallback(func() { close(c) }) + w.enqueue(ctx, wg1, cb1) // Callback is not called before SyncWait completes. select { @@ -49,8 +50,9 @@ func TestSyncWaiterLoop(t *testing.T) { // regardless of how many times it is called. stopper.Stop(ctx) wg2 := make(chanSyncWaiter) + cb2 := funcSyncWaiterCallback(func() { t.Fatalf("callback unexpectedly called") }) for i := 0; i < 2*cap(w.q); i++ { - w.enqueue(ctx, wg2, func() { t.Fatalf("callback unexpectedly called") }) + w.enqueue(ctx, wg2, cb2) } // Callback should not be called, even after SyncWait completes. @@ -72,7 +74,7 @@ func BenchmarkSyncWaiterLoop(b *testing.B) { // performance of operations inside the SyncWaiterLoop. wg := make(chanSyncWaiter) c := make(chan struct{}) - cb := func() { c <- struct{}{} } + cb := funcSyncWaiterCallback(func() { c <- struct{}{} }) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -91,3 +93,8 @@ func (c chanSyncWaiter) SyncWait() error { } func (c chanSyncWaiter) Close() {} + +// funcSyncWaiterCallback implements the syncWaiterCallback interface. +type funcSyncWaiterCallback func() + +func (f funcSyncWaiterCallback) run() { f() }