From 7f77d1a860ea19d9740d5ccd8a0218a6e1115378 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 22 Dec 2022 13:23:57 -0500 Subject: [PATCH] kv: integrate raft async storage writes Fixes #17500. Waiting on github.com/cockroachdb/pebble/pull/2117. This commit integrates with the `AsyncStorageWrites` functionality that we added to Raft in github.com/etcd-io/raft/pull/8. \## Approach The commit makes the minimal changes needed to integrate with async storage writes and pull fsyncs out of the raft state machine loop. It does not make an effort to extract the non-durable portion of raft log writes or raft log application onto separate goroutine pools, as was described in #17500. Those changes will also be impactful, but they're non trivial and bump into a pipelining vs. batching trade-off, so they are left as future work items (TODO(nvanbenschoten): open new issues). With this change, asynchronous Raft log syncs are enabled by the new `DB.ApplyNoSyncWait` Pebble API introduced in github.com/cockroachdb/pebble/pull/2117. The `handleRaftReady` state machine loop continues to initiate Raft log writes, but it uses the Pebble API to offload waiting on durability to a separate goroutine. This separate goroutine then sends the corresponding `MsgStorageAppend`'s response messages where they need to go (locally and/or to the Raft leader) when the fsync completes. The async storage writes functionality in Raft makes this all safe. \## Benchmark Results The result of this change is reduced interference between Raft proposals. As a result, it reduces end-to-end commit latency. github.com/etcd-io/raft/pull/8 presented a collection of benchmark results captured from integrating async storage writes with rafttoy. When integrated into CockroachDB, we see similar improvements to average and tail latency. However, it doesn't provide the throughput improvements at the top end because log appends and state machine application have not yet been extracted into separate goroutine pools, which would facilitate increased opportunity for batching. TODO: add images ---- Release note (performance improvement): The Raft proposal pipeline has been optimized to reduce interference between Raft proposals. This improves average and tail write latency at high concurrency. --- pkg/kv/kvserver/logstore/BUILD.bazel | 5 + pkg/kv/kvserver/logstore/logstore.go | 150 +++-- .../kvserver/logstore/logstore_bench_test.go | 20 +- pkg/kv/kvserver/logstore/sync_waiter.go | 109 ++++ pkg/kv/kvserver/logstore/sync_waiter_test.go | 68 +++ pkg/kv/kvserver/replica.go | 13 + pkg/kv/kvserver/replica_raft.go | 522 +++++++++++------- pkg/kv/kvserver/replica_test.go | 93 ---- pkg/kv/kvserver/store.go | 4 + pkg/kv/kvserver/store_raft.go | 2 + 10 files changed, 630 insertions(+), 356 deletions(-) create mode 100644 pkg/kv/kvserver/logstore/sync_waiter.go create mode 100644 pkg/kv/kvserver/logstore/sync_waiter_test.go diff --git a/pkg/kv/kvserver/logstore/BUILD.bazel b/pkg/kv/kvserver/logstore/BUILD.bazel index e3cb78453749..7e29b6803e70 100644 --- a/pkg/kv/kvserver/logstore/BUILD.bazel +++ b/pkg/kv/kvserver/logstore/BUILD.bazel @@ -8,6 +8,7 @@ go_library( "sideload.go", "sideload_disk.go", "stateloader.go", + "sync_waiter.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore", visibility = ["//visibility:public"], @@ -28,9 +29,11 @@ go_library( "//pkg/util/log", "//pkg/util/metric", "//pkg/util/protoutil", + "//pkg/util/stop", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_errors//oserror", + "@com_github_cockroachdb_pebble//record", "@com_github_cockroachdb_redact//:redact", "@io_etcd_go_raft_v3//:raft", "@io_etcd_go_raft_v3//raftpb", @@ -43,6 +46,7 @@ go_test( srcs = [ "logstore_bench_test.go", "sideload_test.go", + "sync_waiter_test.go", ], args = ["-test.timeout=295s"], embed = [":logstore"], @@ -60,6 +64,7 @@ go_test( "//pkg/util/log", "//pkg/util/metric", "//pkg/util/protoutil", + "//pkg/util/stop", "//pkg/util/tracing", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_errors//oserror", diff --git a/pkg/kv/kvserver/logstore/logstore.go b/pkg/kv/kvserver/logstore/logstore.go index f41bd8402002..75536580bfd2 100644 --- a/pkg/kv/kvserver/logstore/logstore.go +++ b/pkg/kv/kvserver/logstore/logstore.go @@ -13,6 +13,7 @@ package logstore import ( "context" + "fmt" "sync" "time" @@ -44,25 +45,24 @@ var disableSyncRaftLog = settings.RegisterBoolSetting( envutil.EnvOrDefaultBool("COCKROACH_DISABLE_RAFT_LOG_SYNCHRONIZATION_UNSAFE", false), ) -// Ready contains the log entries and state to be saved to stable storage. This -// is a subset of raft.Ready relevant to log storage. All fields are read-only. -type Ready struct { - // The current state of a replica to be saved to stable storage. Empty if - // there is no update. - raftpb.HardState +var enableNonBlockingRaftLogSync = settings.RegisterBoolSetting( + settings.TenantWritable, + "kv.raft_log.non_blocking_synchronization.enabled", + "set to true to enable non-blocking synchronization on Raft log writes to "+ + "persistent storage. Setting to true does not risk data loss or data corruption "+ + "on server crashes, but can reduce write latency.", + envutil.EnvOrDefaultBool("COCKROACH_ENABLE_RAFT_LOG_NON_BLOCKING_SYNCHRONIZATION", true), +) - // Entries specifies entries to be saved to stable storage. Empty if there is - // no update. - Entries []raftpb.Entry +// MsgStorageAppend is a raftpb.Message with type MsgStorageAppend. +type MsgStorageAppend raftpb.Message - // MustSync indicates whether the HardState and Entries must be synchronously - // written to disk, or if an asynchronous write is permissible. - MustSync bool -} - -// MakeReady constructs a Ready struct from raft.Ready. -func MakeReady(from raft.Ready) Ready { - return Ready{HardState: from.HardState, Entries: from.Entries, MustSync: from.MustSync} +// MakeMsgStorageAppend constructs a MsgStorageAppend from a raftpb.Message. +func MakeMsgStorageAppend(m raftpb.Message) MsgStorageAppend { + if m.Type != raftpb.MsgStorageAppend { + panic(fmt.Sprintf("unexpected message type %s", m.Type)) + } + return MsgStorageAppend(m) } // RaftState stores information about the last entry and the size of the log. @@ -87,6 +87,8 @@ type AppendStats struct { PebbleBytes int64 Sync bool + // If true, PebbleEnd-PebbleBegin does not include the sync time. + NonBlocking bool } // Metrics contains metrics specific to the log storage. @@ -100,37 +102,67 @@ type LogStore struct { Engine storage.Engine Sideload SideloadStorage StateLoader StateLoader + SyncWaiter *SyncWaiterLoop EntryCache *raftentry.Cache Settings *cluster.Settings Metrics Metrics } +// SyncCallback is a callback that is notified when a raft log write has been +// durably committed to disk. The function is handed the response messages that +// are associated with the MsgStorageAppend that triggered the fsync. +type SyncCallback interface { + OnLogSync(context.Context, []raftpb.Message) +} + func newStoreEntriesBatch(eng storage.Engine) storage.Batch { // Use an unindexed batch because we don't need to read our writes, and // it is more efficient. return eng.NewUnindexedBatch(false /* writeOnly */) } -// StoreEntries persists newly appended Raft log Entries to the log storage. +// StoreEntries persists newly appended Raft log Entries to the log storage, +// then calls the provided callback with the input's response messages (if any) +// once the entries are durable. The durable log write may or may not be +// blocking (and therefore the callback may or may not be called synchronously), +// depending on the kv.raft_log.non_blocking_synchronization.enabled cluster +// setting. Either way, the effects of the log append will be immediately +// visible readers of the Engine. +// // Accepts the state of the log before the operation, returns the state after. // Persists HardState atomically with, or strictly after Entries. func (s *LogStore) StoreEntries( - ctx context.Context, state RaftState, rd Ready, stats *AppendStats, + ctx context.Context, state RaftState, m MsgStorageAppend, cb SyncCallback, stats *AppendStats, ) (RaftState, error) { batch := newStoreEntriesBatch(s.Engine) - defer batch.Close() - return s.storeEntriesAndCommitBatch(ctx, state, rd, stats, batch) + return s.storeEntriesAndCommitBatch(ctx, state, m, cb, stats, batch) } +// storeEntriesAndCommitBatch is like StoreEntries, but it accepts a +// storage.Batch, which it takes responsibility for committing and closing. func (s *LogStore) storeEntriesAndCommitBatch( - ctx context.Context, state RaftState, rd Ready, stats *AppendStats, batch storage.Batch, + ctx context.Context, + state RaftState, + m MsgStorageAppend, + cb SyncCallback, + stats *AppendStats, + batch storage.Batch, ) (RaftState, error) { + // Before returning, Close the batch if we haven't handed ownership of it to a + // SyncWaiterLoop. If batch == nil, SyncWaiterLoop is responsible for closing + // it once the in-progress disk writes complete. + defer func() { + if batch != nil { + defer batch.Close() + } + }() + prevLastIndex := state.LastIndex - if len(rd.Entries) > 0 { + if len(m.Entries) > 0 { stats.Begin = timeutil.Now() // All of the entries are appended to distinct keys, returning a new // last index. - thinEntries, numSideloaded, sideLoadedEntriesSize, otherEntriesSize, err := MaybeSideloadEntries(ctx, rd.Entries, s.Sideload) + thinEntries, numSideloaded, sideLoadedEntriesSize, otherEntriesSize, err := MaybeSideloadEntries(ctx, m.Entries, s.Sideload) if err != nil { const expl = "during sideloading" return RaftState{}, errors.Wrap(err, expl) @@ -149,16 +181,21 @@ func (s *LogStore) storeEntriesAndCommitBatch( stats.End = timeutil.Now() } - if !raft.IsEmptyHardState(rd.HardState) { + hs := raftpb.HardState{ + Term: m.Term, + Vote: m.Vote, + Commit: m.Commit, + } + if !raft.IsEmptyHardState(hs) { // NB: Note that without additional safeguards, it's incorrect to write - // the HardState before appending rd.Entries. When catching up, a follower + // the HardState before appending m.Entries. When catching up, a follower // will receive Entries that are immediately Committed in the same // Ready. If we persist the HardState but happen to lose the Entries, // assertions can be tripped. // // We have both in the same batch, so there's no problem. If that ever // changes, we must write and sync the Entries before the HardState. - if err := s.StateLoader.SetHardState(ctx, batch, rd.HardState); err != nil { + if err := s.StateLoader.SetHardState(ctx, batch, hs); err != nil { const expl = "during setHardState" return RaftState{}, errors.Wrap(err, expl) } @@ -168,9 +205,9 @@ func (s *LogStore) storeEntriesAndCommitBatch( // // Note that the data is visible to other goroutines before it is synced to // disk. This is fine. The important constraints are that these syncs happen - // before Raft messages are sent and before the call to RawNode.Advance. Our - // regular locking is sufficient for this and if other goroutines can see the - // data early, that's fine. In particular, snapshots are not a problem (I + // before the MsgStorageAppend's responses are delivered back to the RawNode. + // Our regular locking is sufficient for this and if other goroutines can see + // the data early, that's fine. In particular, snapshots are not a problem (I // think they're the only thing that might access log entries or HardState // from other goroutines). Snapshots do not include either the HardState or // uncommitted log entries, and even if they did include log entries that @@ -183,25 +220,54 @@ func (s *LogStore) storeEntriesAndCommitBatch( // (Replica), so this comment might need to move. stats.PebbleBegin = timeutil.Now() stats.PebbleBytes = int64(batch.Len()) - sync := rd.MustSync && !disableSyncRaftLog.Get(&s.Settings.SV) - if err := batch.Commit(sync); err != nil { - const expl = "while committing batch" - return RaftState{}, errors.Wrap(err, expl) + mustSync := len(m.Responses) > 0 + sync := mustSync && !disableSyncRaftLog.Get(&s.Settings.SV) + nonBlockingSync := sync && enableNonBlockingRaftLogSync.Get(&s.Settings.SV) + if nonBlockingSync { + // If non-blocking synchronization is enabled, apply the batched updates to + // the engine and initiate a synchronous disk write, but don't wait for the + // write to complete. Instead, enqueue that waiting on the SyncWaiterLoop, + // who will signal the callback when the write completes. + if err := batch.CommitNoSyncWait(); err != nil { + const expl = "while committing batch without sync wait" + return RaftState{}, errors.Wrap(err, expl) + } + stats.PebbleEnd = timeutil.Now() + s.SyncWaiter.enqueue(ctx, batch, func() { + // NOTE: run on the SyncWaiterLoop goroutine. + logCommitEnd := timeutil.Now() + s.Metrics.RaftLogCommitLatency.RecordValue(logCommitEnd.Sub(stats.PebbleBegin).Nanoseconds()) + cb.OnLogSync(ctx, m.Responses) + }) + // Do not Close batch on return. Will be Closed by SyncWaiterLoop. + batch = nil + } else { + if err := batch.Commit(sync); err != nil { + const expl = "while committing batch" + return RaftState{}, errors.Wrap(err, expl) + } + stats.PebbleEnd = timeutil.Now() + if mustSync { + logCommitEnd := stats.PebbleEnd + s.Metrics.RaftLogCommitLatency.RecordValue(logCommitEnd.Sub(stats.PebbleBegin).Nanoseconds()) + cb.OnLogSync(ctx, m.Responses) + } } stats.Sync = sync - stats.PebbleEnd = timeutil.Now() - if rd.MustSync { - s.Metrics.RaftLogCommitLatency.RecordValue(stats.PebbleEnd.Sub(stats.PebbleBegin).Nanoseconds()) - } + stats.NonBlocking = nonBlockingSync - if len(rd.Entries) > 0 { + // TODO BEFORE MERGE: how does this work with CommitNoSyncWait()? Do we need + // to wait for the sync to finish before purging sideloaded entries? We don't + // end up with log entries without their corresponding sideloaded SSTables. + // What do we do with stats? + if len(m.Entries) > 0 { // We may have just overwritten parts of the log which contain // sideloaded SSTables from a previous term (and perhaps discarded some // entries that we didn't overwrite). Remove any such leftover on-disk // payloads (we can do that now because we've committed the deletion // just above). - firstPurge := rd.Entries[0].Index // first new entry written - purgeTerm := rd.Entries[0].Term - 1 + firstPurge := m.Entries[0].Index // first new entry written + purgeTerm := m.Entries[0].Term - 1 lastPurge := prevLastIndex // old end of the log, include in deletion purgedSize, err := maybePurgeSideloaded(ctx, s.Sideload, firstPurge, lastPurge, purgeTerm) if err != nil { @@ -217,7 +283,7 @@ func (s *LogStore) storeEntriesAndCommitBatch( // Update raft log entry cache. We clear any older, uncommitted log entries // and cache the latest ones. - s.EntryCache.Add(s.RangeID, rd.Entries, true /* truncate */) + s.EntryCache.Add(s.RangeID, m.Entries, true /* truncate */) return state, nil } diff --git a/pkg/kv/kvserver/logstore/logstore_bench_test.go b/pkg/kv/kvserver/logstore/logstore_bench_test.go index ca53dca803c2..96df255b4578 100644 --- a/pkg/kv/kvserver/logstore/logstore_bench_test.go +++ b/pkg/kv/kvserver/logstore/logstore_bench_test.go @@ -36,6 +36,10 @@ func (b *discardBatch) Commit(bool) error { return nil } +type noopSyncCallback struct{} + +func (noopSyncCallback) OnLogSync(context.Context, []raftpb.Message) {} + func BenchmarkLogStore_StoreEntries(b *testing.B) { defer log.Scope(b).Close(b) const kb = 1 << 10 @@ -48,23 +52,25 @@ func BenchmarkLogStore_StoreEntries(b *testing.B) { } func runBenchmarkLogStore_StoreEntries(b *testing.B, bytes int64) { + ctx := context.Background() const tenMB = 10 * 1 << 20 ec := raftentry.NewCache(tenMB) const rangeID = 1 eng := storage.NewDefaultInMemForTesting() defer eng.Close() + st := cluster.MakeTestingClusterSettings() + enableNonBlockingRaftLogSync.Override(ctx, &st.SV, false) s := LogStore{ RangeID: rangeID, Engine: eng, StateLoader: NewStateLoader(rangeID), EntryCache: ec, - Settings: cluster.MakeTestingClusterSettings(), + Settings: st, Metrics: Metrics{ RaftLogCommitLatency: metric.NewHistogram(metric.Metadata{}, 10*time.Second, metric.IOLatencyBuckets), }, } - ctx := context.Background() rs := RaftState{ LastTerm: 1, ByteSize: 0, @@ -89,17 +95,13 @@ func runBenchmarkLogStore_StoreEntries(b *testing.B, bytes int64) { batch := &discardBatch{} for i := 0; i < b.N; i++ { batch.Batch = newStoreEntriesBatch(eng) - rd := Ready{ - HardState: raftpb.HardState{}, - Entries: ents, - MustSync: true, - } + m := MsgStorageAppend{Entries: ents} + cb := noopSyncCallback{} var err error - rs, err = s.storeEntriesAndCommitBatch(ctx, rs, rd, stats, batch) + rs, err = s.storeEntriesAndCommitBatch(ctx, rs, m, cb, stats, batch) if err != nil { b.Fatal(err) } - batch.Batch.Close() ents[0].Index++ } require.EqualValues(b, b.N, rs.LastIndex) diff --git a/pkg/kv/kvserver/logstore/sync_waiter.go b/pkg/kv/kvserver/logstore/sync_waiter.go new file mode 100644 index 000000000000..b35e483eef81 --- /dev/null +++ b/pkg/kv/kvserver/logstore/sync_waiter.go @@ -0,0 +1,109 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package logstore + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/pebble/record" +) + +// syncWaiter is capable of waiting for a disk write to be durably committed. +type syncWaiter interface { + // SyncWait waits for the write to be durable. + SyncWait() error + // Close closes the syncWaiter and releases associated resources. + // Must be called after SyncWait returns. + Close() +} + +var _ syncWaiter = storage.Batch(nil) + +// SyncWaiterLoop waits on a sequence of in-progress disk writes, notifying +// callbacks when their corresponding disk writes have completed. +type SyncWaiterLoop struct { + q chan syncBatch + stopped chan struct{} +} + +type syncBatch struct { + wg syncWaiter + cb func() +} + +// NewSyncWaiterLoop constructs a SyncWaiterLoop. It must be Started before use. +func NewSyncWaiterLoop() *SyncWaiterLoop { + return &SyncWaiterLoop{ + // We size the waiter loop's queue to the same size as Pebble's sync + // concurrency. This is the maximum number of pending syncWaiter's that + // pebble allows. + q: make(chan syncBatch, record.SyncConcurrency), + stopped: make(chan struct{}), + } +} + +// Start launches the loop. +func (w *SyncWaiterLoop) Start(ctx context.Context, stopper *stop.Stopper) { + _ = stopper.RunAsyncTaskEx(ctx, + stop.TaskOpts{ + TaskName: "raft-logstore-sync-waiter-loop", + // This task doesn't reference a parent because it runs for the server's + // lifetime. + SpanOpt: stop.SterileRootSpan, + }, + func(ctx context.Context) { + w.waitLoop(ctx, stopper) + }) +} + +// waitLoop pulls off the SyncWaiterLoop's queue. For each syncWaiter, it waits +// for the sync to complete and then calls the associated callback. +func (w *SyncWaiterLoop) waitLoop(ctx context.Context, stopper *stop.Stopper) { + defer close(w.stopped) + for { + select { + case w := <-w.q: + if err := w.wg.SyncWait(); err != nil { + log.Fatalf(ctx, "SyncWait error: %+v", err) + } + w.wg.Close() + w.cb() + case <-stopper.ShouldQuiesce(): + return + } + } +} + +// enqueue registers the syncWaiter with the SyncWaiterLoop. The provided +// callback will be called once the syncWaiter's associated disk write has been +// durably committed. +// +// The syncWaiter will be Closed after its SyncWait method completes. It must +// not be Closed by the caller. +// +// If the SyncWaiterLoop has already been stopped, the callback will never be +// called. +func (w *SyncWaiterLoop) enqueue(ctx context.Context, wg syncWaiter, cb func()) { + b := syncBatch{wg, cb} + select { + case w.q <- b: + case <-w.stopped: + default: + log.Warningf(ctx, "SyncWaiterLoop.enqueue blocking due to insufficient channel capacity") + select { + case w.q <- b: + case <-w.stopped: + } + } +} diff --git a/pkg/kv/kvserver/logstore/sync_waiter_test.go b/pkg/kv/kvserver/logstore/sync_waiter_test.go new file mode 100644 index 000000000000..f342591daebc --- /dev/null +++ b/pkg/kv/kvserver/logstore/sync_waiter_test.go @@ -0,0 +1,68 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package logstore + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" +) + +func TestSyncWaiterLoop(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + stopper := stop.NewStopper() + w := NewSyncWaiterLoop() + w.Start(ctx, stopper) + + // Enqueue a waiter while the loop is running. + c := make(chan struct{}) + wg1 := make(chanSyncWaiter) + w.enqueue(ctx, wg1, func() { close(c) }) + + // Callback is not called before SyncWait completes. + select { + case <-c: + t.Fatal("callback unexpectedly called before SyncWait") + case <-time.After(5 * time.Millisecond): + } + + // Callback is called after SyncWait completes. + close(wg1) + <-c + + // Enqueue a waiter once the loop is stopped. Enqueuing should not block. + // NB: stopper.Stop waits for the waitLoop to exit. + stopper.Stop(ctx) + wg2 := make(chanSyncWaiter) + w.enqueue(ctx, wg2, func() { t.Fatalf("callback unexpectedly called") }) + + // Callback should not be called, even after SyncWait completes. + time.Sleep(5 * time.Millisecond) // give time to catch bugs + close(wg2) + time.Sleep(5 * time.Millisecond) // give time to catch bugs +} + +// chanSyncWaiter implements the syncWaiter interface. +type chanSyncWaiter chan struct{} + +func (c chanSyncWaiter) SyncWait() error { + <-c + return nil +} + +func (c chanSyncWaiter) Close() {} diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 276b2841be88..73ab6aa6be7f 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -54,6 +54,7 @@ import ( "github.com/cockroachdb/redact" "github.com/kr/pretty" "go.etcd.io/raft/v3" + "go.etcd.io/raft/v3/raftpb" ) const ( @@ -250,6 +251,18 @@ type Replica struct { decoder replicaDecoder } + // localMsgs contains a collection of raftpb.Message that target the local + // RawNode. They are to be delivered on the next iteration of handleRaftReady. + // + // Locking notes: + // - Replica.localMsgs must be held to append messages to active. + // - Replica.raftMu and Replica.localMsgs must both be held to switch slices. + // - Replica.raftMu < Replica.localMsgs + localMsgs struct { + syncutil.Mutex + active, recycled []raftpb.Message + } + // The last seen replica descriptors from incoming Raft messages. These are // stored so that the replica still knows the replica descriptors for itself // and for its message recipients in the circumstances when its RangeDescriptor diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 61fef0b4bd1d..03a4b45e493e 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -597,7 +597,11 @@ func (s handleRaftReadyStats) SafeFormat(p redact.SafePrinter, _ rune) { { var sync redact.SafeString if s.append.Sync { - sync = "-sync" + if s.append.NonBlocking { + sync = "-non-blocking-sync" + } else { + sync = "-sync" + } } p.Printf("raft ready handling: %.2fs [append=%.2fs, apply=%.2fs, commit-batch%s=%.2fs", dTotal.Seconds(), dAppend.Seconds(), dApply.Seconds(), sync, dPebble.Seconds()) @@ -612,6 +616,9 @@ func (s handleRaftReadyStats) SafeFormat(p redact.SafePrinter, _ rune) { ) if s.append.Sync { p.SafeString(" sync") + if s.append.NonBlocking { + p.SafeString("(non-blocking)") + } } p.SafeString(" [") @@ -693,7 +700,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked( } var hasReady bool - var rd raft.Ready + var softState *raft.SoftState + var outboundMsgs []raftpb.Message + var msgStorageAppend, msgStorageApply raftpb.Message r.mu.Lock() state := logstore.RaftState{ // used for append below LastIndex: r.mu.lastIndex, @@ -703,12 +712,18 @@ func (r *Replica) handleRaftReadyRaftMuLocked( leaderID := r.mu.leaderID lastLeaderID := leaderID err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) { + r.deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked(ctx, raftGroup) + numFlushed, err := r.mu.proposalBuf.FlushLockedWithRaftGroup(ctx, raftGroup) if err != nil { return false, err } if hasReady = raftGroup.HasReady(); hasReady { - rd = raftGroup.Ready() + syncRd := raftGroup.Ready() + logRaftReady(ctx, syncRd) + asyncRd := makeAsyncReady(syncRd) + softState = asyncRd.SoftState + outboundMsgs, msgStorageAppend, msgStorageApply = splitLocalStorageMsgs(asyncRd.Messages) } // We unquiesce if we have a Ready (= there's work to do). We also have // to unquiesce if we just flushed some proposals but there isn't a @@ -725,7 +740,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( unquiesceAndWakeLeader := hasReady || numFlushed > 0 || len(r.mu.proposals) > 0 return unquiesceAndWakeLeader, nil }) - r.mu.applyingEntries = len(rd.CommittedEntries) > 0 + r.mu.applyingEntries = hasMsg(msgStorageApply) pausedFollowers := r.mu.pausedFollowers r.mu.Unlock() if errors.Is(err, errRemoved) { @@ -747,10 +762,8 @@ func (r *Replica) handleRaftReadyRaftMuLocked( return stats, nil } - logRaftReady(ctx, rd) - refreshReason := noReason - if rd.SoftState != nil && leaderID != roachpb.ReplicaID(rd.SoftState.Lead) { + if softState != nil && leaderID != roachpb.ReplicaID(softState.Lead) { // Refresh pending commands if the Raft leader has changed. This is usually // the first indication we have of a new leader on a restarted node. // @@ -759,17 +772,68 @@ func (r *Replica) handleRaftReadyRaftMuLocked( // indicating a newly elected leader or a conf change. Replay protection // prevents any corruption, so the waste is only a performance issue. if log.V(3) { - log.Infof(ctx, "raft leader changed: %d -> %d", leaderID, rd.SoftState.Lead) + log.Infof(ctx, "raft leader changed: %d -> %d", leaderID, softState.Lead) } if !r.store.TestingKnobs().DisableRefreshReasonNewLeader { refreshReason = reasonNewLeader } - leaderID = roachpb.ReplicaID(rd.SoftState.Lead) + leaderID = roachpb.ReplicaID(softState.Lead) } - if inSnap.Desc != nil { - if !raft.IsEmptySnap(rd.Snapshot) { - snapUUID, err := uuid.FromBytes(rd.Snapshot.Data) + r.traceMessageSends(outboundMsgs, "sending messages") + r.sendRaftMessages(ctx, outboundMsgs, pausedFollowers) + + // If the ready struct includes entries that have been committed, these + // entries will be applied to the Replica's replicated state machine down + // below, after appending new entries to the raft log and sending messages + // to peers. However, the process of appending new entries to the raft log + // and then applying committed entries to the state machine can take some + // time - and these entries are already durably committed. If they have + // clients waiting on them, we'd like to acknowledge their success as soon + // as possible. To facilitate this, we take a quick pass over the committed + // entries and acknowledge as many as we can trivially prove will not be + // rejected beneath raft. + // + // Note that the CommittedEntries slice may contain entries that are also in + // the Entries slice (to be appended in this ready pass). This can happen when + // a follower is being caught up on committed commands. We could acknowledge + // these commands early even though they aren't durably in the local raft log + // yet (since they're committed via a quorum elsewhere), but we chose to be + // conservative and avoid it by passing the last Ready cycle's `lastIndex` for + // the maxIndex argument to AckCommittedEntriesBeforeApplication. + // + // TODO(nvanbenschoten): this is less important with async storage writes. + // Consider getting rid of it. + sm := r.getStateMachine() + dec := r.getDecoder() + var appTask apply.Task + if hasMsg(msgStorageApply) { + appTask = apply.MakeTask(sm, dec) + appTask.SetMaxBatchSize(r.store.TestingKnobs().MaxApplicationBatchSize) + defer appTask.Close() + if err := appTask.Decode(ctx, msgStorageApply.Entries); err != nil { + return stats, err + } + if knobs := r.store.TestingKnobs(); knobs == nil || !knobs.DisableCanAckBeforeApplication { + if err := appTask.AckCommittedEntriesBeforeApplication(ctx, state.LastIndex); err != nil { + return stats, err + } + } + } + + if hasMsg(msgStorageAppend) { + if msgStorageAppend.Snapshot != nil { + if inSnap.Desc == nil { + // If we didn't expect Raft to have a snapshot but it has one + // regardless, that is unexpected and indicates a programming + // error. + return stats, errors.AssertionFailedf( + "have inSnap=nil, but raft has a snapshot %s", + raft.DescribeSnapshot(*msgStorageAppend.Snapshot), + ) + } + + snapUUID, err := uuid.FromBytes(msgStorageAppend.Snapshot.Data) if err != nil { return stats, errors.Wrap(err, "invalid snapshot id") } @@ -780,6 +844,16 @@ func (r *Replica) handleRaftReadyRaftMuLocked( log.Fatalf(ctx, "incoming snapshot id doesn't match raft snapshot id: %s != %s", snapUUID, inSnap.SnapUUID) } + snap := *msgStorageAppend.Snapshot + hs := raftpb.HardState{ + Term: msgStorageAppend.Term, + Vote: msgStorageAppend.Vote, + Commit: msgStorageAppend.Commit, + } + if len(msgStorageAppend.Entries) != 0 { + log.Fatalf(ctx, "found Entries in MsgStorageAppend with non-empty Snapshot") + } + // Applying this snapshot may require us to subsume one or more of our right // neighbors. This occurs if this replica is informed about the merges via a // Raft snapshot instead of a MsgApp containing the merge commits, e.g., @@ -789,7 +863,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( defer releaseMergeLock() stats.tSnapBegin = timeutil.Now() - if err := r.applySnapshot(ctx, inSnap, rd.Snapshot, rd.HardState, subsumedRepls); err != nil { + if err := r.applySnapshot(ctx, inSnap, snap, hs, subsumedRepls); err != nil { return stats, errors.Wrap(err, "while applying snapshot") } stats.tSnapEnd = timeutil.Now() @@ -816,129 +890,36 @@ func (r *Replica) handleRaftReadyRaftMuLocked( refreshReason == noReason { refreshReason = reasonSnapshotApplied } - } - } else if !raft.IsEmptySnap(rd.Snapshot) { - // If we didn't expect Raft to have a snapshot but it has one - // regardless, that is unexpected and indicates a programming - // error. - return stats, errors.AssertionFailedf( - "have inSnap=nil, but raft has a snapshot %s", - raft.DescribeSnapshot(rd.Snapshot), - ) - } - - // If the ready struct includes entries that have been committed, these - // entries will be applied to the Replica's replicated state machine down - // below, after appending new entries to the raft log and sending messages - // to peers. However, the process of appending new entries to the raft log - // and then applying committed entries to the state machine can take some - // time - and these entries are already durably committed. If they have - // clients waiting on them, we'd like to acknowledge their success as soon - // as possible. To facilitate this, we take a quick pass over the committed - // entries and acknowledge as many as we can trivially prove will not be - // rejected beneath raft. - // - // Note that the CommittedEntries slice may contain entries that are also in - // the Entries slice (to be appended in this ready pass). This can happen when - // a follower is being caught up on committed commands. We could acknowledge - // these commands early even though they aren't durably in the local raft log - // yet (since they're committed via a quorum elsewhere), but we chose to be - // conservative and avoid it by passing the last Ready cycle's `lastIndex` for - // the maxIndex argument to AckCommittedEntriesBeforeApplication. - sm := r.getStateMachine() - dec := r.getDecoder() - appTask := apply.MakeTask(sm, dec) - appTask.SetMaxBatchSize(r.store.TestingKnobs().MaxApplicationBatchSize) - defer appTask.Close() - if err := appTask.Decode(ctx, rd.CommittedEntries); err != nil { - return stats, err - } - if knobs := r.store.TestingKnobs(); knobs == nil || !knobs.DisableCanAckBeforeApplication { - if err := appTask.AckCommittedEntriesBeforeApplication(ctx, state.LastIndex); err != nil { - return stats, err - } - } - // Separate the MsgApp messages from all other Raft message types so that we - // can take advantage of the optimization discussed in the Raft thesis under - // the section: `10.2.1 Writing to the leader’s disk in parallel`. The - // optimization suggests that instead of a leader writing new log entries to - // disk before replicating them to its followers, the leader can instead - // write the entries to disk in parallel with replicating to its followers - // and them writing to their disks. - // - // Here, we invoke this optimization by: - // 1. sending all MsgApps. - // 2. syncing all entries and Raft state to disk. - // 3. sending all other messages. - // - // Since this is all handled in handleRaftReadyRaftMuLocked, we're assured - // that even though we may sync new entries to disk after sending them in - // MsgApps to followers, we'll always have them synced to disk before we - // process followers' MsgAppResps for the corresponding entries because - // Ready processing is sequential (and because a restart of the leader would - // prevent the MsgAppResp from being handled by it). This is important - // because it makes sure that the leader always has all of the entries in - // the log for its term, which is required in etcd/raft for technical - // reasons[1]. - // - // MsgApps are also used to inform followers of committed entries through - // the Commit index that they contain. Due to the optimization described - // above, a Commit index may be sent out to a follower before it is - // persisted on the leader. This is safe because the Commit index can be - // treated as volatile state, as is supported by raft.MustSync[2]. - // Additionally, the Commit index can never refer to entries from the - // current Ready (due to the MsgAppResp argument above) except in - // single-node groups, in which as a result we have to be careful to not - // persist a Commit index without the entries its commit index might refer - // to (see the HardState update below for details). - // - // [1]: the Raft thesis states that this can be made safe: - // - // > The leader may even commit an entry before it has been written to its - // > own disk, if a majority of followers have written it to their disks; - // > this is still safe. - // - // [2]: Raft thesis section: `3.8 Persisted state and server restarts`: - // - // > Other state variables are safe to lose on a restart, as they can all be - // > recreated. The most interesting example is the commit index, which can - // > safely be reinitialized to zero on a restart. - // - // Note that this will change when joint quorums are implemented, at which - // point we have to introduce coupling between the Commit index and - // persisted config changes, and also require some commit indexes to be - // durably synced. - // See: - // https://github.com/etcd-io/etcd/issues/7625#issuecomment-489232411 - - msgApps, otherMsgs := splitMsgApps(rd.Messages) - r.traceMessageSends(msgApps, "sending msgApp") - r.sendRaftMessages(ctx, msgApps, pausedFollowers) - - // TODO(pavelkalinnikov): find a way to move it to storeEntries. - if !raft.IsEmptyHardState(rd.HardState) { - if !r.IsInitialized() && rd.HardState.Commit != 0 { - log.Fatalf(ctx, "setting non-zero HardState.Commit on uninitialized replica %s. HS=%+v", r, rd.HardState) + // Send MsgStorageAppend's responses. + r.sendRaftMessages(ctx, msgStorageAppend.Responses, nil) + } else { + // TODO(pavelkalinnikov): find a way to move it to storeEntries. + if msgStorageAppend.Commit != 0 && !r.IsInitialized() { + log.Fatalf(ctx, "setting non-zero HardState.Commit on uninitialized replica %s", r) + } + // TODO(pavelkalinnikov): construct and store this in Replica. + // TODO(pavelkalinnikov): fields like raftEntryCache are the same across all + // ranges, so can be passed to LogStore methods instead of being stored in it. + s := logstore.LogStore{ + RangeID: r.RangeID, + Engine: r.store.engine, + Sideload: r.raftMu.sideloaded, + StateLoader: r.raftMu.stateLoader.StateLoader, + SyncWaiter: r.store.syncWaiter, + EntryCache: r.store.raftEntryCache, + Settings: r.store.cfg.Settings, + Metrics: logstore.Metrics{ + RaftLogCommitLatency: r.store.metrics.RaftLogCommitLatency, + }, + } + m := logstore.MakeMsgStorageAppend(msgStorageAppend) + cb := (*replicaSyncCallback)(r) + if state, err = s.StoreEntries(ctx, state, m, cb, &stats.append); err != nil { + return stats, errors.Wrap(err, "while storing log entries") + } } } - // TODO(pavelkalinnikov): construct and store this in Replica. - // TODO(pavelkalinnikov): fields like raftEntryCache are the same across all - // ranges, so can be passed to LogStore methods instead of being stored in it. - s := logstore.LogStore{ - RangeID: r.RangeID, - Engine: r.store.engine, - Sideload: r.raftMu.sideloaded, - StateLoader: r.raftMu.stateLoader.StateLoader, - EntryCache: r.store.raftEntryCache, - Settings: r.store.cfg.Settings, - Metrics: logstore.Metrics{ - RaftLogCommitLatency: r.store.metrics.RaftLogCommitLatency, - }, - } - if state, err = s.StoreEntries(ctx, state, logstore.MakeReady(rd), &stats.append); err != nil { - return stats, errors.Wrap(err, "while storing log entries") - } // Update protected state - last index, last term, raft log size, and raft // leader ID. @@ -963,11 +944,10 @@ func (r *Replica) handleRaftReadyRaftMuLocked( r.store.replicateQueue.MaybeAddAsync(ctx, r, r.store.Clock().NowAsClockTimestamp()) } - r.sendRaftMessages(ctx, otherMsgs, nil /* blocked */) - r.traceEntries(rd.CommittedEntries, "committed, before applying any entries") - stats.tApplicationBegin = timeutil.Now() - if len(rd.CommittedEntries) > 0 { + if hasMsg(msgStorageApply) { + r.traceEntries(msgStorageApply.Entries, "committed, before applying any entries") + err := appTask.ApplyCommittedEntries(ctx) stats.apply = sm.moveStats() if err != nil { @@ -1002,11 +982,14 @@ func (r *Replica) handleRaftReadyRaftMuLocked( refreshReason = reasonNewLeaderOrConfigChange } } + + // Send MsgStorageApply's responses. + r.sendRaftMessages(ctx, msgStorageApply.Responses, nil) } stats.tApplicationEnd = timeutil.Now() applicationElapsed := stats.tApplicationEnd.Sub(stats.tApplicationBegin).Nanoseconds() r.store.metrics.RaftApplyCommittedLatency.RecordValue(applicationElapsed) - r.store.metrics.RaftCommandsApplied.Inc(int64(len(rd.CommittedEntries))) + r.store.metrics.RaftCommandsApplied.Inc(int64(len(msgStorageApply.Entries))) if r.store.TestingKnobs().EnableUnconditionalRefreshesInRaftReady { refreshReason = reasonNewLeaderOrConfigChange } @@ -1023,7 +1006,8 @@ func (r *Replica) handleRaftReadyRaftMuLocked( r.mu.Lock() err = r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) { - raftGroup.Advance(rd) + r.deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked(ctx, raftGroup) + if stats.apply.numConfChangeEntries > 0 { // If the raft leader got removed, campaign the first remaining voter. // @@ -1063,18 +1047,71 @@ func (r *Replica) handleRaftReadyRaftMuLocked( return stats, nil } -// splitMsgApps splits the Raft message slice into two slices, one containing -// MsgApps and one containing all other message types. Each slice retains the -// relative ordering between messages in the original slice. -func splitMsgApps(msgs []raftpb.Message) (msgApps, otherMsgs []raftpb.Message) { - splitIdx := 0 - for i, msg := range msgs { - if msg.Type == raftpb.MsgApp { - msgs[i], msgs[splitIdx] = msgs[splitIdx], msgs[i] - splitIdx++ +// asyncReady encapsulates the messages that are ready to be sent to other peers +// or to be sent to local storage routines when async storage writes are enabled. +// All fields in asyncReady are read-only. +// TODO(nvanbenschoten): move this into go.etcd.io/raft. +type asyncReady struct { + // The current volatile state of a Node. + // SoftState will be nil if there is no update. + // It is not required to consume or store SoftState. + *raft.SoftState + + // ReadStates can be used for node to serve linearizable read requests locally + // when its applied index is greater than the index in ReadState. + // Note that the readState will be returned when raft receives msgReadIndex. + // The returned is only valid for the request that requested to read. + ReadStates []raft.ReadState + + // Messages specifies outbound messages to other peers and to local storage + // threads. These messages can be sent in any order. + // + // If it contains a MsgSnap message, the application MUST report back to raft + // when the snapshot has been received or has failed by calling ReportSnapshot. + Messages []raftpb.Message +} + +// makeAsyncReady constructs an asyncReady from the provided Ready. +func makeAsyncReady(rd raft.Ready) asyncReady { + return asyncReady{ + SoftState: rd.SoftState, + ReadStates: rd.ReadStates, + Messages: rd.Messages, + } +} + +// hasMsg returns whether the provided raftpb.Message is present. +// It serves as a poor man's Optional[raftpb.Message]. +func hasMsg(m raftpb.Message) bool { return m.Type != 0 } + +// splitLocalStorageMsgs filters out local storage messages from the provided +// message slice and returns them separately. +func splitLocalStorageMsgs( + msgs []raftpb.Message, +) (otherMsgs []raftpb.Message, msgStorageAppend, msgStorageApply raftpb.Message) { + for i := len(msgs) - 1; i >= 0; i-- { + switch msgs[i].Type { + case raftpb.MsgStorageAppend: + if hasMsg(msgStorageAppend) { + panic("two MsgStorageAppend") + } + msgStorageAppend = msgs[i] + case raftpb.MsgStorageApply: + if hasMsg(msgStorageApply) { + panic("two MsgStorageApply") + } + msgStorageApply = msgs[i] + default: + // Local storage messages will always be at the end of the messages slice, + // so we can terminate iteration as soon as we reach any other message + // type. This is leaking an implementation detail from etcd/raft which may + // not always hold, but while it does, we use it for convenience and + // assert against it changing in sendRaftMessages. + return msgs[:i+1], msgStorageAppend, msgStorageApply } } - return msgs[:splitIdx], msgs[splitIdx:] + // Only local storage messages. + return nil, msgStorageAppend, msgStorageApply } // maybeFatalOnRaftReadyErr will fatal if err is neither nil nor @@ -1398,87 +1435,148 @@ func (r *Replica) maybeCoalesceHeartbeat( return true } +// replicaSyncCallback implements the logstore.SyncCallback interface. +type replicaSyncCallback Replica + +func (r *replicaSyncCallback) OnLogSync(ctx context.Context, msgs []raftpb.Message) { + // Send MsgStorageAppend's responses. + (*Replica)(r).sendRaftMessages(ctx, msgs, nil /* blocked */) +} + func (r *Replica) sendRaftMessages( ctx context.Context, messages []raftpb.Message, blocked map[roachpb.ReplicaID]struct{}, ) { var lastAppResp raftpb.Message for _, message := range messages { - _, drop := blocked[roachpb.ReplicaID(message.To)] - if drop { - r.store.Metrics().RaftPausedFollowerDroppedMsgs.Inc(1) - } - switch message.Type { - case raftpb.MsgApp: - if util.RaceEnabled { - // Iterate over the entries to assert that all sideloaded commands - // are already inlined. replicaRaftStorage.Entries already performs - // the sideload inlining for stable entries and raft.unstable always - // contain fat entries. Since these are the only two sources that - // raft.sendAppend gathers entries from to populate MsgApps, we - // should never see thin entries here. + switch message.To { + case raft.LocalAppendThread: + // To local append thread. + panic("unsupported, currently processed inline on raft scheduler goroutine") + case raft.LocalApplyThread: + // To local apply thread. + panic("unsupported, currently processed inline on raft scheduler goroutine") + case uint64(r.ReplicaID()): + // To local raft state machine. + r.sendLocalRaftMsg(message) + default: + _, drop := blocked[roachpb.ReplicaID(message.To)] + if drop { + r.store.Metrics().RaftPausedFollowerDroppedMsgs.Inc(1) + } + switch message.Type { + case raftpb.MsgApp: + if util.RaceEnabled { + // Iterate over the entries to assert that all sideloaded commands + // are already inlined. replicaRaftStorage.Entries already performs + // the sideload inlining for stable entries and raft.unstable always + // contain fat entries. Since these are the only two sources that + // raft.sendAppend gathers entries from to populate MsgApps, we + // should never see thin entries here. + // + // Also assert that the log term only ever increases (most of the + // time it stays constant, as term changes are rare), and that + // the index increases by exactly one with each entry. + // + // This assertion came out of #61990. + prevTerm := message.LogTerm // term of entry preceding the append + prevIndex := message.Index // index of entry preceding the append + for j := range message.Entries { + ent := &message.Entries[j] + logstore.AssertSideloadedRaftCommandInlined(ctx, ent) + + if prevIndex+1 != ent.Index { + log.Fatalf(ctx, + "index gap in outgoing MsgApp: idx %d followed by %d", + prevIndex, ent.Index, + ) + } + prevIndex = ent.Index + if prevTerm > ent.Term { + log.Fatalf(ctx, + "term regression in outgoing MsgApp: idx %d at term=%d "+ + "appended with logterm=%d", + ent.Index, ent.Term, message.LogTerm, + ) + } + prevTerm = ent.Term + } + } + + case raftpb.MsgAppResp: + // A successful (non-reject) MsgAppResp contains one piece of + // information: the highest log index. Raft currently queues up + // one MsgAppResp per incoming MsgApp, and we may process + // multiple messages in one handleRaftReady call (because + // multiple messages may arrive while we're blocked syncing to + // disk). If we get redundant MsgAppResps, drop all but the + // last (we've seen that too many MsgAppResps can overflow + // message queues on the receiving side). // - // Also assert that the log term only ever increases (most of the - // time it stays constant, as term changes are rare), and that - // the index increases by exactly one with each entry. + // Note that this reorders the chosen MsgAppResp relative to + // other messages (including any MsgAppResps with the Reject flag), + // but raft is fine with this reordering. // - // This assertion came out of #61990. - prevTerm := message.LogTerm // term of entry preceding the append - prevIndex := message.Index // index of entry preceding the append - for j := range message.Entries { - ent := &message.Entries[j] - logstore.AssertSideloadedRaftCommandInlined(ctx, ent) - - if prevIndex+1 != ent.Index { - log.Fatalf(ctx, - "index gap in outgoing MsgApp: idx %d followed by %d", - prevIndex, ent.Index, - ) - } - prevIndex = ent.Index - if prevTerm > ent.Term { - log.Fatalf(ctx, - "term regression in outgoing MsgApp: idx %d at term=%d "+ - "appended with logterm=%d", - ent.Index, ent.Term, message.LogTerm, - ) - } - prevTerm = ent.Term + // TODO(bdarnell): Consider pushing this optimization into etcd/raft. + // Similar optimizations may be possible for other message types, + // although MsgAppResp is the only one that has been seen as a + // problem in practice. + if !message.Reject && message.Index > lastAppResp.Index { + lastAppResp = message + drop = true } } - case raftpb.MsgAppResp: - // A successful (non-reject) MsgAppResp contains one piece of - // information: the highest log index. Raft currently queues up - // one MsgAppResp per incoming MsgApp, and we may process - // multiple messages in one handleRaftReady call (because - // multiple messages may arrive while we're blocked syncing to - // disk). If we get redundant MsgAppResps, drop all but the - // last (we've seen that too many MsgAppResps can overflow - // message queues on the receiving side). - // - // Note that this reorders the chosen MsgAppResp relative to - // other messages (including any MsgAppResps with the Reject flag), - // but raft is fine with this reordering. - // - // TODO(bdarnell): Consider pushing this optimization into etcd/raft. - // Similar optimizations may be possible for other message types, - // although MsgAppResp is the only one that has been seen as a - // problem in practice. - if !message.Reject && message.Index > lastAppResp.Index { - lastAppResp = message - drop = true + if !drop { + r.sendRaftMessage(ctx, message) } } - - if !drop { - r.sendRaftMessage(ctx, message) - } } if lastAppResp.Index > 0 { r.sendRaftMessage(ctx, lastAppResp) } } +// sendLocalRaftMsg sends a message to the local raft state machine. +func (r *Replica) sendLocalRaftMsg(msg raftpb.Message) { + if msg.To != uint64(r.ReplicaID()) { + panic("incorrect message target") + } + r.localMsgs.Lock() + wasEmpty := len(r.localMsgs.active) == 0 + r.localMsgs.active = append(r.localMsgs.active, msg) + r.localMsgs.Unlock() + if wasEmpty { + r.store.enqueueRaftUpdateCheck(r.RangeID) + } +} + +// deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked delivers local messages to +// the provided raw node. +func (r *Replica) deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked( + ctx context.Context, raftGroup *raft.RawNode, +) { + r.raftMu.AssertHeld() + r.mu.AssertHeld() + r.localMsgs.Lock() + localMsgs := r.localMsgs.active + r.localMsgs.active, r.localMsgs.recycled = r.localMsgs.recycled, r.localMsgs.active[:0] + // Don't recycle large slices. + if cap(r.localMsgs.recycled) > 16 { + r.localMsgs.recycled = nil + } + r.localMsgs.Unlock() + + for i, m := range localMsgs { + if err := raftGroup.Step(m); err != nil { + log.Fatalf(ctx, "unexpected error stepping local raft message [%s]: %v", + raftDescribeMessage(m, raftEntryFormatter), err) + } + // NB: we can reset messages in the r.localMsgs.recycled slice without holding the + // localMsgs mutex because we're holding raftMu and + localMsgs[i].Reset() // for GC + } +} + // sendRaftMessage sends a Raft message. // // When calling this method, the raftMu may be held, but it does not need to be. diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 3fdae36f5b25..6564793baacc 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -9882,99 +9882,6 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) { } } -func TestSplitMsgApps(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - msgApp := func(idx uint64) raftpb.Message { - return raftpb.Message{Index: idx, Type: raftpb.MsgApp} - } - otherMsg := func(idx uint64) raftpb.Message { - return raftpb.Message{Index: idx, Type: raftpb.MsgVote} - } - formatMsgs := func(msgs []raftpb.Message) string { - strs := make([]string, len(msgs)) - for i, msg := range msgs { - strs[i] = fmt.Sprintf("{%s:%d}", msg.Type, msg.Index) - } - return fmt.Sprint(strs) - } - - testCases := []struct { - msgsIn, msgAppsOut, otherMsgsOut []raftpb.Message - }{ - // No msgs. - { - msgsIn: []raftpb.Message{}, - msgAppsOut: []raftpb.Message{}, - otherMsgsOut: []raftpb.Message{}, - }, - // Only msgApps. - { - msgsIn: []raftpb.Message{msgApp(1)}, - msgAppsOut: []raftpb.Message{msgApp(1)}, - otherMsgsOut: []raftpb.Message{}, - }, - { - msgsIn: []raftpb.Message{msgApp(1), msgApp(2)}, - msgAppsOut: []raftpb.Message{msgApp(1), msgApp(2)}, - otherMsgsOut: []raftpb.Message{}, - }, - { - msgsIn: []raftpb.Message{msgApp(2), msgApp(1)}, - msgAppsOut: []raftpb.Message{msgApp(2), msgApp(1)}, - otherMsgsOut: []raftpb.Message{}, - }, - // Only otherMsgs. - { - msgsIn: []raftpb.Message{otherMsg(1)}, - msgAppsOut: []raftpb.Message{}, - otherMsgsOut: []raftpb.Message{otherMsg(1)}, - }, - { - msgsIn: []raftpb.Message{otherMsg(1), otherMsg(2)}, - msgAppsOut: []raftpb.Message{}, - otherMsgsOut: []raftpb.Message{otherMsg(1), otherMsg(2)}, - }, - { - msgsIn: []raftpb.Message{otherMsg(2), otherMsg(1)}, - msgAppsOut: []raftpb.Message{}, - otherMsgsOut: []raftpb.Message{otherMsg(2), otherMsg(1)}, - }, - // Mixed msgApps and otherMsgs. - { - msgsIn: []raftpb.Message{msgApp(1), otherMsg(2)}, - msgAppsOut: []raftpb.Message{msgApp(1)}, - otherMsgsOut: []raftpb.Message{otherMsg(2)}, - }, - { - msgsIn: []raftpb.Message{otherMsg(1), msgApp(2)}, - msgAppsOut: []raftpb.Message{msgApp(2)}, - otherMsgsOut: []raftpb.Message{otherMsg(1)}, - }, - { - msgsIn: []raftpb.Message{msgApp(1), otherMsg(2), msgApp(3)}, - msgAppsOut: []raftpb.Message{msgApp(1), msgApp(3)}, - otherMsgsOut: []raftpb.Message{otherMsg(2)}, - }, - { - msgsIn: []raftpb.Message{otherMsg(1), msgApp(2), otherMsg(3)}, - msgAppsOut: []raftpb.Message{msgApp(2)}, - otherMsgsOut: []raftpb.Message{otherMsg(1), otherMsg(3)}, - }, - } - for _, c := range testCases { - inStr := formatMsgs(c.msgsIn) - t.Run(inStr, func(t *testing.T) { - msgAppsRes, otherMsgsRes := splitMsgApps(c.msgsIn) - if !reflect.DeepEqual(msgAppsRes, c.msgAppsOut) || !reflect.DeepEqual(otherMsgsRes, c.otherMsgsOut) { - t.Errorf("expected splitMsgApps(%s)=%s/%s, found %s/%s", inStr, formatMsgs(c.msgAppsOut), - formatMsgs(c.otherMsgsOut), formatMsgs(msgAppsRes), formatMsgs(otherMsgsRes)) - } - }) - } -} - type testQuiescer struct { desc roachpb.RangeDescriptor numProposals int diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 604ba7f98921..e12661d1a236 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -286,6 +286,7 @@ func newRaftConfig( return &raft.Config{ ID: id, Applied: appliedIndex, + AsyncStorageWrites: true, ElectionTick: storeCfg.RaftElectionTimeoutTicks, HeartbeatTick: storeCfg.RaftHeartbeatIntervalTicks, MaxUncommittedEntriesSize: storeCfg.RaftMaxUncommittedEntriesSize, @@ -769,6 +770,7 @@ type Store struct { metrics *StoreMetrics intentResolver *intentresolver.IntentResolver recoveryMgr txnrecovery.Manager + syncWaiter *logstore.SyncWaiterLoop raftEntryCache *raftentry.Cache limiters batcheval.Limiters txnWaitMetrics *txnwait.Metrics @@ -1291,6 +1293,8 @@ func NewStore( s.scheduler = newRaftScheduler(cfg.AmbientCtx, s.metrics, s, storeSchedulerConcurrency, cfg.RaftElectionTimeoutTicks) + s.syncWaiter = logstore.NewSyncWaiterLoop() + s.raftEntryCache = raftentry.NewCache(cfg.RaftEntryCacheSize) s.metrics.registry.AddMetricStruct(s.raftEntryCache.Metrics()) diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 60d543d71685..90c93262c610 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -724,6 +724,8 @@ func (s *Store) processRaft(ctx context.Context) { s.cfg.Transport.Stop(s.StoreID()) })) + s.syncWaiter.Start(ctx, s.stopper) + // We'll want to cancel all in-flight proposals. Proposals embed tracing // spans in them, and we don't want to be leaking any. s.stopper.AddCloser(stop.CloserFn(func() {