-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support AsyncStorageWrites in new asyncStorage pipeline impl #3
Open
nvanbenschoten
wants to merge
3
commits into
master
Choose a base branch
from
asyncRaftLogMsg
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
- Clear asyncStorage responses channel on Reset - Remove gRPC recieve message size limit (drops messages silently!) - Adress non-convergent Raft leadership acquisition - Avoid unexpected Raft leadership movement - Increase Raft entry cache size
nvanbenschoten
added a commit
to nvanbenschoten/etcd
that referenced
this pull request
Oct 26, 2022
Fixes etcd-io#12257. This change adds opt-in support to raft to perform local storage writes asynchronously from the raft state machine handling loop. A new AsyncStorageWrites configuration instructs the raft node to write to its local storage (raft log and state machine) using a request/response message passing interface instead of the default `Ready`/`Advance` function call interface. Local storage messages can be pipelined and processed asynchronously (with respect to `Ready` iteration), facilitating reduced interference between Raft proposals and increased batching of log appends and state machine application. As a result, use of asynchronous storage writes can reduce end-to-end commit latency and increase maximum throughput. When AsyncStorageWrites is enabled, the `Ready.Message` slice will include new `MsgStorageAppend` and `MsgStorageApply` messages. The messages will target a `LocalAppendThread` and a `LocalApplyThread`, respectively. Messages to the same target must be reliably processed in order. In other words, they can't be dropped (like messages over the network) and those targeted at the same thread can't be reordered. Messages to different targets can be processed in any order. `MsgStorageAppend` carries Raft log entries to append, election votes to persist, and snapshots to apply. All writes performed in response to a `MsgStorageAppend` are expected to be durable. The message assumes the role of the Entries, HardState, and Snapshot fields in Ready. `MsgStorageApply` carries committed entries to apply. The message assumes the role of the CommittedEntries field in Ready. Local messages each carry one or more response messages which should be delivered after the corresponding storage write has been completed. These responses may target the same node or may target other nodes. The storage threads are not responsible for understanding the response messages, only for delivering them to the correct target after performing the storage write. \## Design Considerations - There must be no regression for existing users that do not enable `AsyncStorageWrites`. For instance, CommittedEntries must not wait on unstable entries to be stabilized in cases where a follower is given committed entries in a MsgApp. - Asynchronous storage work should use a message passing interface, like the rest of this library. - The Raft leader and followers should behave symmetrically. Both should be able to use asynchronous storage writes for log appends and entry application. - The LocalAppendThread on a follower should be able to send MsgAppResp messages directly to the leader without passing back through the raft state machine handling loop. - The `unstable` log should remain true to its name. It should hold entries until they are stable and should not rely on an intermediate reliable cache. - Psuedo-targets should be assigned to messages that target the local storage systems to denote required ordering guarantees. - Code should be maximally unified across `AsyncStorageWrites=false` and `AsyncStorageWrites=true`. `AsyncStorageWrites=false` should be a special case of `AsyncStorageWrites=true` where the library hides the possibility of asynchrony. - It should be possible to apply snapshots asynchronously, even though a snapshot touches both the Raft log state and the state machine. The library should make this easy for users to handle by delaying all committed entries until after the snapshot has applied, so snapshot application can be handled by 1) flushing the apply thread, 2) send the `MsgStorageAppend` that contains a snapshot to the `LocalAppendThread` to be applied. \## Usage When asynchronous storage writes is enabled, the responsibility of code using the library is different from what is presented in raft/doc.go (which has been updated to include a section about async storage writes). Users still read from the Node.Ready() channel. However, they process the updates it contains in a different manner. Users no longer consult the HardState, Entries, and Snapshot fields (steps 1 and 3 in doc.go). They also no longer call Node.Advance() to indicate that they have processed all entries in the Ready (step 4 in doc.go). Instead, all local storage operations are also communicated through messages present in the Ready.Message slice. The local storage messages come in two flavors. The first flavor is log append messages, which target a LocalAppendThread and carry Entries, HardState, and a Snapshot. The second flavor is entry application messages, which target a LocalApplyThread and carry CommittedEntries. Messages to the same target must be reliably processed in order.Messages to different targets can be processed in any order. Each local storage message carries a slice of response messages that must delivered after the corresponding storage write has been completed. With Asynchronous Storage Writes enabled, the total state machine handling loop will look something like this: ```go for { select { case <-s.Ticker: n.Tick() case rd := <-s.Node.Ready(): for _, m := range rd.Messages { switch m.To { case raft.LocalAppendThread: toApply <- m case raft.LocalApplyThread: toAppend <-m default: sendOverNetwork(m) } } case <-s.done: return } } ``` Usage of Asynchronous Storage Writes will typically also contain a pair of storage handler threads, one for log writes (append) and one for entry application to the local state machine (apply). Those will look something like: ```go // append thread go func() { for { select { case m := <-toAppend: saveToStorage(m.State, m.Entries, m.Snapshot) send(m.Responses) case <-s.done: return } } } // apply thread go func() { for { select { case m := <-toApply: for _, entry := range m.CommittedEntries { process(entry) if entry.Type == raftpb.EntryConfChange { var cc raftpb.ConfChange cc.Unmarshal(entry.Data) s.Node.ApplyConfChange(cc) } } send(m.Responses) case <-s.done: return } } } ``` \## Compatibility The library remains backwards compatible with existing users and the change does not introduce any breaking changes. Users that do not set `AsyncStorageWrites` to true in the `Config` struct will not notice a difference with this change. This is despite the fact that the existing "synchronous storage writes" interface was adapted to share a majority of the same code. For instance, `Node.Advance` has been adapted to transparently acknowledge an asynchronous log append attempt and an asynchronous state machine application attempt, internally using the same message passing mechanism introduced in this change. The change has no cross-version compatibility concerns. All changes are local to a process and nodes using asynchronous storage writes appear to behave no differently from the outside. Clusters are free to mix nodes running with and without asynchronous storage writes. \## Performance The bulk of the performance evaluation of this functionality thus far has been done with [rafttoy](https://github.com/nvanbenschoten/rafttoy), a benchmarking harness developed to experiment with Raft proposal pipeline optimization. The harness can be used to run single-node benchmarks or multi-node benchmarks. It supports plugable raft logs, storage engines, network transports, and pipeline implementations. To evaluate this change, we fixed the raft log (`etcd/wal`), storage engine (`pebble`), and network transport (`grpc`). We then built (nvanbenschoten/rafttoy#3) a pipeline implementation on top of the new asynchronous storage writes functionality and compared it against two other pipeline implementations. The three pipeline implementations we compared were: - **basic** (P1): baseline stock raft usage, similar to the code in `doc.go` - **parallel append + early ack** (P2): CockroachDB's current pipeline, which includes two significant variations to the basic pipeline. The first is that it sends MsgApp messages to followers before writing to local Raft log (see [commit](cockroachdb/cockroach@b67eb69) for explanation), allowing log appends to occur in parallel across replicas. The second is that it acknowledges committed log entries before applying then (see [commit](cockroachdb/cockroach@87aaea7) for explanation). - **async append + async apply + early ack** (P3): A pipelining using asynchronous storage writes with a separate append thread and a separate apply thread. Also uses the same early acknowledgement optimization from above to ack committed entries before handing them to the apply thread. All testing was performed on a 3 node AWS cluster of m5.4xlarge instances with gp3 EBS volumes (16000 IOPS, 1GB/s throughput). ![Throughput vs latency of Raft proposal pipeline implementations](https://user-images.githubusercontent.com/5438456/197925200-11352c09-569b-460c-ae42-effbf407c4e5.svg) The comparison demonstrates two different benefits of asynchronous storage writes. The first is that it reduces end-to-end latency of proposals by 20-25%. For instance, when serving 16MB/s of write traffic, P1's average latency was 13.2ms, P2's average latency was 7.3ms, and P3's average latency was 5.24ms. This is a reduction in average latency of 28% from the optimized pipeline that does not use asynchronous storage writes. This matches expectations outlined in cockroachdb/cockroach#17500. The second is that it increases the maximum throughput at saturation. This is because asynchronous storage writes can improve batching for both log appends and log application. In this experiment, we saw the average append batch size under saturation increase from 928 to 1542, which is a similar ratio to the increase in peak throughput. We see a similar different for apply batch sizes. There is more benchmark to do. For instance, we'll need to thoroughly verify that this change does not negatively impact the performance of users of this library that do not use asynchronous storage writes. Signed-off-by: Nathan VanBenschoten <[email protected]>
nvanbenschoten
added a commit
to nvanbenschoten/etcd
that referenced
this pull request
Oct 27, 2022
Fixes etcd-io#12257. This change adds opt-in support to raft to perform local storage writes asynchronously from the raft state machine handling loop. A new AsyncStorageWrites configuration instructs the raft node to write to its local storage (raft log and state machine) using a request/response message passing interface instead of the default `Ready`/`Advance` function call interface. Local storage messages can be pipelined and processed asynchronously (with respect to `Ready` iteration), facilitating reduced interference between Raft proposals and increased batching of log appends and state machine application. As a result, use of asynchronous storage writes can reduce end-to-end commit latency and increase maximum throughput. When AsyncStorageWrites is enabled, the `Ready.Message` slice will include new `MsgStorageAppend` and `MsgStorageApply` messages. The messages will target a `LocalAppendThread` and a `LocalApplyThread`, respectively. Messages to the same target must be reliably processed in order. In other words, they can't be dropped (like messages over the network) and those targeted at the same thread can't be reordered. Messages to different targets can be processed in any order. `MsgStorageAppend` carries Raft log entries to append, election votes to persist, and snapshots to apply. All writes performed in response to a `MsgStorageAppend` are expected to be durable. The message assumes the role of the Entries, HardState, and Snapshot fields in Ready. `MsgStorageApply` carries committed entries to apply. The message assumes the role of the CommittedEntries field in Ready. Local messages each carry one or more response messages which should be delivered after the corresponding storage write has been completed. These responses may target the same node or may target other nodes. The storage threads are not responsible for understanding the response messages, only for delivering them to the correct target after performing the storage write. \## Design Considerations - There must be no regression for existing users that do not enable `AsyncStorageWrites`. For instance, CommittedEntries must not wait on unstable entries to be stabilized in cases where a follower is given committed entries in a MsgApp. - Asynchronous storage work should use a message passing interface, like the rest of this library. - The Raft leader and followers should behave symmetrically. Both should be able to use asynchronous storage writes for log appends and entry application. - The LocalAppendThread on a follower should be able to send MsgAppResp messages directly to the leader without passing back through the raft state machine handling loop. - The `unstable` log should remain true to its name. It should hold entries until they are stable and should not rely on an intermediate reliable cache. - Pseudo-targets should be assigned to messages that target the local storage systems to denote required ordering guarantees. - Code should be maximally unified across `AsyncStorageWrites=false` and `AsyncStorageWrites=true`. `AsyncStorageWrites=false` should be a special case of `AsyncStorageWrites=true` where the library hides the possibility of asynchrony. - It should be possible to apply snapshots asynchronously, even though a snapshot touches both the Raft log state and the state machine. The library should make this easy for users to handle by delaying all committed entries until after the snapshot has applied, so snapshot application can be handled by 1) flushing the apply thread, 2) sending the `MsgStorageAppend` that contains a snapshot to the `LocalAppendThread` to be applied. \## Usage When asynchronous storage writes is enabled, the responsibility of code using the library is different from what is presented in raft/doc.go (which has been updated to include a section about async storage writes). Users still read from the Node.Ready() channel. However, they process the updates it contains in a different manner. Users no longer consult the HardState, Entries, and Snapshot fields (steps 1 and 3 in doc.go). They also no longer call Node.Advance() to indicate that they have processed all entries in the Ready (step 4 in doc.go). Instead, all local storage operations are also communicated through messages present in the Ready.Message slice. The local storage messages come in two flavors. The first flavor is log append messages, which target a LocalAppendThread and carry Entries, HardState, and a Snapshot. The second flavor is entry application messages, which target a LocalApplyThread and carry CommittedEntries. Messages to the same target must be reliably processed in order. Messages to different targets can be processed in any order. Each local storage message carries a slice of response messages that must delivered after the corresponding storage write has been completed. With Asynchronous Storage Writes enabled, the total state machine handling loop will look something like this: ```go for { select { case <-s.Ticker: n.Tick() case rd := <-s.Node.Ready(): for _, m := range rd.Messages { switch m.To { case raft.LocalAppendThread: toAppend <- m case raft.LocalApplyThread: toApply <-m default: sendOverNetwork(m) } } case <-s.done: return } } ``` Usage of Asynchronous Storage Writes will typically also contain a pair of storage handler threads, one for log writes (append) and one for entry application to the local state machine (apply). Those will look something like: ```go // append thread go func() { for { select { case m := <-toAppend: saveToStorage(m.State, m.Entries, m.Snapshot) send(m.Responses) case <-s.done: return } } } // apply thread go func() { for { select { case m := <-toApply: for _, entry := range m.CommittedEntries { process(entry) if entry.Type == raftpb.EntryConfChange { var cc raftpb.ConfChange cc.Unmarshal(entry.Data) s.Node.ApplyConfChange(cc) } } send(m.Responses) case <-s.done: return } } } ``` \## Compatibility The library remains backwards compatible with existing users and the change does not introduce any breaking changes. Users that do not set `AsyncStorageWrites` to true in the `Config` struct will not notice a difference with this change. This is despite the fact that the existing "synchronous storage writes" interface was adapted to share a majority of the same code. For instance, `Node.Advance` has been adapted to transparently acknowledge an asynchronous log append attempt and an asynchronous state machine application attempt, internally using the same message passing mechanism introduced in this change. The change has no cross-version compatibility concerns. All changes are local to a process and nodes using asynchronous storage writes appear to behave no differently from the outside. Clusters are free to mix nodes running with and without asynchronous storage writes. \## Performance The bulk of the performance evaluation of this functionality thus far has been done with [rafttoy](https://github.com/nvanbenschoten/rafttoy), a benchmarking harness developed to experiment with Raft proposal pipeline optimization. The harness can be used to run single-node benchmarks or multi-node benchmarks. It supports plugable raft logs, storage engines, network transports, and pipeline implementations. To evaluate this change, we fixed the raft log (`etcd/wal`), storage engine (`pebble`), and network transport (`grpc`). We then built (nvanbenschoten/rafttoy#3) a pipeline implementation on top of the new asynchronous storage writes functionality and compared it against two other pipeline implementations. The three pipeline implementations we compared were: - **basic** (P1): baseline stock raft usage, similar to the code in `doc.go` - **parallel append + early ack** (P2): CockroachDB's current pipeline, which includes two significant variations to the basic pipeline. The first is that it sends MsgApp messages to followers before writing to local Raft log (see [commit](cockroachdb/cockroach@b67eb69) for explanation), allowing log appends to occur in parallel across replicas. The second is that it acknowledges committed log entries before applying them (see [commit](cockroachdb/cockroach@87aaea7) for explanation). - **async append + async apply + early ack** (P3): A pipelining using asynchronous storage writes with a separate append thread and a separate apply thread. Also uses the same early acknowledgement optimization from above to ack committed entries before handing them to the apply thread. All testing was performed on a 3 node AWS cluster of m5.4xlarge instances with gp3 EBS volumes (16000 IOPS, 1GB/s throughput). ![Throughput vs latency of Raft proposal pipeline implementations](https://user-images.githubusercontent.com/5438456/197925200-11352c09-569b-460c-ae42-effbf407c4e5.svg) The comparison demonstrates two different benefits of asynchronous storage writes. The first is that it reduces end-to-end latency of proposals by 20-25%. For instance, when serving 16MB/s of write traffic, P1's average latency was 13.2ms, P2's average latency was 7.3ms, and P3's average latency was 5.24ms. This is a reduction in average latency of 28% from the optimized pipeline that does not use asynchronous storage writes. This matches expectations outlined in cockroachdb/cockroach#17500. The second is that it increases the maximum throughput at saturation. This is because asynchronous storage writes can improve batching for both log appends and log application. In this experiment, we saw the average append batch size under saturation increase from 928 to 1542, which is a similar ratio to the increase in peak throughput. We see a similar difference for apply batch sizes. There is more benchmarking to do. For instance, we'll need to thoroughly verify that this change does not negatively impact the performance of users of this library that do not use asynchronous storage writes. Signed-off-by: Nathan VanBenschoten <[email protected]>
nvanbenschoten
added a commit
to nvanbenschoten/etcd
that referenced
this pull request
Oct 27, 2022
Fixes etcd-io#12257. This change adds opt-in support to raft to perform local storage writes asynchronously from the raft state machine handling loop. A new AsyncStorageWrites configuration instructs the raft node to write to its local storage (raft log and state machine) using a request/response message passing interface instead of the default `Ready`/`Advance` function call interface. Local storage messages can be pipelined and processed asynchronously (with respect to `Ready` iteration), facilitating reduced interference between Raft proposals and increased batching of log appends and state machine application. As a result, use of asynchronous storage writes can reduce end-to-end commit latency and increase maximum throughput. When AsyncStorageWrites is enabled, the `Ready.Message` slice will include new `MsgStorageAppend` and `MsgStorageApply` messages. The messages will target a `LocalAppendThread` and a `LocalApplyThread`, respectively. Messages to the same target must be reliably processed in order. In other words, they can't be dropped (like messages over the network) and those targeted at the same thread can't be reordered. Messages to different targets can be processed in any order. `MsgStorageAppend` carries Raft log entries to append, election votes to persist, and snapshots to apply. All writes performed in response to a `MsgStorageAppend` are expected to be durable. The message assumes the role of the Entries, HardState, and Snapshot fields in Ready. `MsgStorageApply` carries committed entries to apply. The message assumes the role of the CommittedEntries field in Ready. Local messages each carry one or more response messages which should be delivered after the corresponding storage write has been completed. These responses may target the same node or may target other nodes. The storage threads are not responsible for understanding the response messages, only for delivering them to the correct target after performing the storage write. \## Design Considerations - There must be no regression for existing users that do not enable `AsyncStorageWrites`. For instance, CommittedEntries must not wait on unstable entries to be stabilized in cases where a follower is given committed entries in a MsgApp. - Asynchronous storage work should use a message passing interface, like the rest of this library. - The Raft leader and followers should behave symmetrically. Both should be able to use asynchronous storage writes for log appends and entry application. - The LocalAppendThread on a follower should be able to send MsgAppResp messages directly to the leader without passing back through the raft state machine handling loop. - The `unstable` log should remain true to its name. It should hold entries until they are stable and should not rely on an intermediate reliable cache. - Pseudo-targets should be assigned to messages that target the local storage systems to denote required ordering guarantees. - Code should be maximally unified across `AsyncStorageWrites=false` and `AsyncStorageWrites=true`. `AsyncStorageWrites=false` should be a special case of `AsyncStorageWrites=true` where the library hides the possibility of asynchrony. - It should be possible to apply snapshots asynchronously, even though a snapshot touches both the Raft log state and the state machine. The library should make this easy for users to handle by delaying all committed entries until after the snapshot has applied, so snapshot application can be handled by 1) flushing the apply thread, 2) sending the `MsgStorageAppend` that contains a snapshot to the `LocalAppendThread` to be applied. \## Usage When asynchronous storage writes is enabled, the responsibility of code using the library is different from what is presented in raft/doc.go (which has been updated to include a section about async storage writes). Users still read from the Node.Ready() channel. However, they process the updates it contains in a different manner. Users no longer consult the HardState, Entries, and Snapshot fields (steps 1 and 3 in doc.go). They also no longer call Node.Advance() to indicate that they have processed all entries in the Ready (step 4 in doc.go). Instead, all local storage operations are also communicated through messages present in the Ready.Message slice. The local storage messages come in two flavors. The first flavor is log append messages, which target a LocalAppendThread and carry Entries, HardState, and a Snapshot. The second flavor is entry application messages, which target a LocalApplyThread and carry CommittedEntries. Messages to the same target must be reliably processed in order. Messages to different targets can be processed in any order. Each local storage message carries a slice of response messages that must delivered after the corresponding storage write has been completed. With Asynchronous Storage Writes enabled, the total state machine handling loop will look something like this: ```go for { select { case <-s.Ticker: n.Tick() case rd := <-s.Node.Ready(): for _, m := range rd.Messages { switch m.To { case raft.LocalAppendThread: toAppend <- m case raft.LocalApplyThread: toApply <-m default: sendOverNetwork(m) } } case <-s.done: return } } ``` Usage of Asynchronous Storage Writes will typically also contain a pair of storage handler threads, one for log writes (append) and one for entry application to the local state machine (apply). Those will look something like: ```go // append thread go func() { for { select { case m := <-toAppend: saveToStorage(m.State, m.Entries, m.Snapshot) send(m.Responses) case <-s.done: return } } } // apply thread go func() { for { select { case m := <-toApply: for _, entry := range m.CommittedEntries { process(entry) if entry.Type == raftpb.EntryConfChange { var cc raftpb.ConfChange cc.Unmarshal(entry.Data) s.Node.ApplyConfChange(cc) } } send(m.Responses) case <-s.done: return } } } ``` \## Compatibility The library remains backwards compatible with existing users and the change does not introduce any breaking changes. Users that do not set `AsyncStorageWrites` to true in the `Config` struct will not notice a difference with this change. This is despite the fact that the existing "synchronous storage writes" interface was adapted to share a majority of the same code. For instance, `Node.Advance` has been adapted to transparently acknowledge an asynchronous log append attempt and an asynchronous state machine application attempt, internally using the same message passing mechanism introduced in this change. The change has no cross-version compatibility concerns. All changes are local to a process and nodes using asynchronous storage writes appear to behave no differently from the outside. Clusters are free to mix nodes running with and without asynchronous storage writes. \## Performance The bulk of the performance evaluation of this functionality thus far has been done with [rafttoy](https://github.com/nvanbenschoten/rafttoy), a benchmarking harness developed to experiment with Raft proposal pipeline optimization. The harness can be used to run single-node benchmarks or multi-node benchmarks. It supports plugable raft logs, storage engines, network transports, and pipeline implementations. To evaluate this change, we fixed the raft log (`etcd/wal`), storage engine (`pebble`), and network transport (`grpc`). We then built (nvanbenschoten/rafttoy#3) a pipeline implementation on top of the new asynchronous storage writes functionality and compared it against two other pipeline implementations. The three pipeline implementations we compared were: - **basic** (P1): baseline stock raft usage, similar to the code in `doc.go` - **parallel append + early ack** (P2): CockroachDB's current pipeline, which includes two significant variations to the basic pipeline. The first is that it sends MsgApp messages to followers before writing to local Raft log (see [commit](cockroachdb/cockroach@b67eb69) for explanation), allowing log appends to occur in parallel across replicas. The second is that it acknowledges committed log entries before applying them (see [commit](cockroachdb/cockroach@87aaea7) for explanation). - **async append + async apply + early ack** (P3): A pipelining using asynchronous storage writes with a separate append thread and a separate apply thread. Also uses the same early acknowledgement optimization from above to ack committed entries before handing them to the apply thread. All testing was performed on a 3 node AWS cluster of m5.4xlarge instances with gp3 EBS volumes (16000 IOPS, 1GB/s throughput). ![Throughput vs latency of Raft proposal pipeline implementations](https://user-images.githubusercontent.com/5438456/197925200-11352c09-569b-460c-ae42-effbf407c4e5.svg) The comparison demonstrates two different benefits of asynchronous storage writes. The first is that it reduces end-to-end latency of proposals by 20-25%. For instance, when serving 16MB/s of write traffic, P1's average latency was 13.2ms, P2's average latency was 7.3ms, and P3's average latency was 5.24ms. This is a reduction in average latency of 28% from the optimized pipeline that does not use asynchronous storage writes. This matches expectations outlined in cockroachdb/cockroach#17500. The second is that it increases the maximum throughput at saturation. This is because asynchronous storage writes can improve batching for both log appends and log application. In this experiment, we saw the average append batch size under saturation increase from 928 to 1542, which is a similar ratio to the increase in peak throughput. We see a similar difference for apply batch sizes. There is more benchmarking to do. For instance, we'll need to thoroughly verify that this change does not negatively impact the performance of users of this library that do not use asynchronous storage writes. Signed-off-by: Nathan VanBenschoten <[email protected]>
nvanbenschoten
added a commit
to nvanbenschoten/etcd
that referenced
this pull request
Oct 27, 2022
Fixes etcd-io#12257. This change adds opt-in support to raft to perform local storage writes asynchronously from the raft state machine handling loop. A new AsyncStorageWrites configuration instructs the raft node to write to its local storage (raft log and state machine) using a request/response message passing interface instead of the default `Ready`/`Advance` function call interface. Local storage messages can be pipelined and processed asynchronously (with respect to `Ready` iteration), facilitating reduced interference between Raft proposals and increased batching of log appends and state machine application. As a result, use of asynchronous storage writes can reduce end-to-end commit latency and increase maximum throughput. When AsyncStorageWrites is enabled, the `Ready.Message` slice will include new `MsgStorageAppend` and `MsgStorageApply` messages. The messages will target a `LocalAppendThread` and a `LocalApplyThread`, respectively. Messages to the same target must be reliably processed in order. In other words, they can't be dropped (like messages over the network) and those targeted at the same thread can't be reordered. Messages to different targets can be processed in any order. `MsgStorageAppend` carries Raft log entries to append, election votes to persist, and snapshots to apply. All writes performed in response to a `MsgStorageAppend` are expected to be durable. The message assumes the role of the Entries, HardState, and Snapshot fields in Ready. `MsgStorageApply` carries committed entries to apply. The message assumes the role of the CommittedEntries field in Ready. Local messages each carry one or more response messages which should be delivered after the corresponding storage write has been completed. These responses may target the same node or may target other nodes. The storage threads are not responsible for understanding the response messages, only for delivering them to the correct target after performing the storage write. \## Design Considerations - There must be no regression for existing users that do not enable `AsyncStorageWrites`. For instance, CommittedEntries must not wait on unstable entries to be stabilized in cases where a follower is given committed entries in a MsgApp. - Asynchronous storage work should use a message passing interface, like the rest of this library. - The Raft leader and followers should behave symmetrically. Both should be able to use asynchronous storage writes for log appends and entry application. - The LocalAppendThread on a follower should be able to send MsgAppResp messages directly to the leader without passing back through the raft state machine handling loop. - The `unstable` log should remain true to its name. It should hold entries until they are stable and should not rely on an intermediate reliable cache. - Pseudo-targets should be assigned to messages that target the local storage systems to denote required ordering guarantees. - Code should be maximally unified across `AsyncStorageWrites=false` and `AsyncStorageWrites=true`. `AsyncStorageWrites=false` should be a special case of `AsyncStorageWrites=true` where the library hides the possibility of asynchrony. - It should be possible to apply snapshots asynchronously, even though a snapshot touches both the Raft log state and the state machine. The library should make this easy for users to handle by delaying all committed entries until after the snapshot has applied, so snapshot application can be handled by 1) flushing the apply thread, 2) sending the `MsgStorageAppend` that contains a snapshot to the `LocalAppendThread` to be applied. \## Usage When asynchronous storage writes is enabled, the responsibility of code using the library is different from what is presented in raft/doc.go (which has been updated to include a section about async storage writes). Users still read from the Node.Ready() channel. However, they process the updates it contains in a different manner. Users no longer consult the HardState, Entries, and Snapshot fields (steps 1 and 3 in doc.go). They also no longer call Node.Advance() to indicate that they have processed all entries in the Ready (step 4 in doc.go). Instead, all local storage operations are also communicated through messages present in the Ready.Message slice. The local storage messages come in two flavors. The first flavor is log append messages, which target a LocalAppendThread and carry Entries, HardState, and a Snapshot. The second flavor is entry application messages, which target a LocalApplyThread and carry CommittedEntries. Messages to the same target must be reliably processed in order. Messages to different targets can be processed in any order. Each local storage message carries a slice of response messages that must delivered after the corresponding storage write has been completed. With Asynchronous Storage Writes enabled, the total state machine handling loop will look something like this: ```go for { select { case <-s.Ticker: n.Tick() case rd := <-s.Node.Ready(): for _, m := range rd.Messages { switch m.To { case raft.LocalAppendThread: toAppend <- m case raft.LocalApplyThread: toApply <-m default: sendOverNetwork(m) } } case <-s.done: return } } ``` Usage of Asynchronous Storage Writes will typically also contain a pair of storage handler threads, one for log writes (append) and one for entry application to the local state machine (apply). Those will look something like: ```go // append thread go func() { for { select { case m := <-toAppend: saveToStorage(m.State, m.Entries, m.Snapshot) send(m.Responses) case <-s.done: return } } } // apply thread go func() { for { select { case m := <-toApply: for _, entry := range m.CommittedEntries { process(entry) if entry.Type == raftpb.EntryConfChange { var cc raftpb.ConfChange cc.Unmarshal(entry.Data) s.Node.ApplyConfChange(cc) } } send(m.Responses) case <-s.done: return } } } ``` \## Compatibility The library remains backwards compatible with existing users and the change does not introduce any breaking changes. Users that do not set `AsyncStorageWrites` to true in the `Config` struct will not notice a difference with this change. This is despite the fact that the existing "synchronous storage writes" interface was adapted to share a majority of the same code. For instance, `Node.Advance` has been adapted to transparently acknowledge an asynchronous log append attempt and an asynchronous state machine application attempt, internally using the same message passing mechanism introduced in this change. The change has no cross-version compatibility concerns. All changes are local to a process and nodes using asynchronous storage writes appear to behave no differently from the outside. Clusters are free to mix nodes running with and without asynchronous storage writes. \## Performance The bulk of the performance evaluation of this functionality thus far has been done with [rafttoy](https://github.com/nvanbenschoten/rafttoy), a benchmarking harness developed to experiment with Raft proposal pipeline optimization. The harness can be used to run single-node benchmarks or multi-node benchmarks. It supports plugable raft logs, storage engines, network transports, and pipeline implementations. To evaluate this change, we fixed the raft log (`etcd/wal`), storage engine (`pebble`), and network transport (`grpc`). We then built (nvanbenschoten/rafttoy#3) a pipeline implementation on top of the new asynchronous storage writes functionality and compared it against two other pipeline implementations. The three pipeline implementations we compared were: - **basic** (P1): baseline stock raft usage, similar to the code in `doc.go` - **parallel append + early ack** (P2): CockroachDB's current pipeline, which includes two significant variations to the basic pipeline. The first is that it sends MsgApp messages to followers before writing to local Raft log (see [commit](cockroachdb/cockroach@b67eb69) for explanation), allowing log appends to occur in parallel across replicas. The second is that it acknowledges committed log entries before applying them (see [commit](cockroachdb/cockroach@87aaea7) for explanation). - **async append + async apply + early ack** (P3): A pipelining using asynchronous storage writes with a separate append thread and a separate apply thread. Also uses the same early acknowledgement optimization from above to ack committed entries before handing them to the apply thread. All testing was performed on a 3 node AWS cluster of m5.4xlarge instances with gp3 EBS volumes (16000 IOPS, 1GB/s throughput). ![Throughput vs latency of Raft proposal pipeline implementations](https://user-images.githubusercontent.com/5438456/197925200-11352c09-569b-460c-ae42-effbf407c4e5.svg) The comparison demonstrates two different benefits of asynchronous storage writes. The first is that it reduces end-to-end latency of proposals by 20-25%. For instance, when serving 16MB/s of write traffic, P1's average latency was 13.2ms, P2's average latency was 7.3ms, and P3's average latency was 5.24ms. This is a reduction in average latency of 28% from the optimized pipeline that does not use asynchronous storage writes. This matches expectations outlined in cockroachdb/cockroach#17500. The second is that it increases the maximum throughput at saturation. This is because asynchronous storage writes can improve batching for both log appends and log application. In this experiment, we saw the average append batch size under saturation increase from 928 to 1542, which is a similar ratio to the increase in peak throughput. We see a similar difference for apply batch sizes. There is more benchmarking to do. For instance, we'll need to thoroughly verify that this change does not negatively impact the performance of users of this library that do not use asynchronous storage writes. Signed-off-by: Nathan VanBenschoten <[email protected]>
nvanbenschoten
added a commit
to nvanbenschoten/etcd
that referenced
this pull request
Nov 2, 2022
Fixes etcd-io#12257. This change adds opt-in support to raft to perform local storage writes asynchronously from the raft state machine handling loop. A new AsyncStorageWrites configuration instructs the raft node to write to its local storage (raft log and state machine) using a request/response message passing interface instead of the default `Ready`/`Advance` function call interface. Local storage messages can be pipelined and processed asynchronously (with respect to `Ready` iteration), facilitating reduced interference between Raft proposals and increased batching of log appends and state machine application. As a result, use of asynchronous storage writes can reduce end-to-end commit latency and increase maximum throughput. When AsyncStorageWrites is enabled, the `Ready.Message` slice will include new `MsgStorageAppend` and `MsgStorageApply` messages. The messages will target a `LocalAppendThread` and a `LocalApplyThread`, respectively. Messages to the same target must be reliably processed in order. In other words, they can't be dropped (like messages over the network) and those targeted at the same thread can't be reordered. Messages to different targets can be processed in any order. `MsgStorageAppend` carries Raft log entries to append, election votes to persist, and snapshots to apply. All writes performed in response to a `MsgStorageAppend` are expected to be durable. The message assumes the role of the Entries, HardState, and Snapshot fields in Ready. `MsgStorageApply` carries committed entries to apply. The message assumes the role of the CommittedEntries field in Ready. Local messages each carry one or more response messages which should be delivered after the corresponding storage write has been completed. These responses may target the same node or may target other nodes. The storage threads are not responsible for understanding the response messages, only for delivering them to the correct target after performing the storage write. \## Design Considerations - There must be no regression for existing users that do not enable `AsyncStorageWrites`. For instance, CommittedEntries must not wait on unstable entries to be stabilized in cases where a follower is given committed entries in a MsgApp. - Asynchronous storage work should use a message passing interface, like the rest of this library. - The Raft leader and followers should behave symmetrically. Both should be able to use asynchronous storage writes for log appends and entry application. - The LocalAppendThread on a follower should be able to send MsgAppResp messages directly to the leader without passing back through the raft state machine handling loop. - The `unstable` log should remain true to its name. It should hold entries until they are stable and should not rely on an intermediate reliable cache. - Pseudo-targets should be assigned to messages that target the local storage systems to denote required ordering guarantees. - Code should be maximally unified across `AsyncStorageWrites=false` and `AsyncStorageWrites=true`. `AsyncStorageWrites=false` should be a special case of `AsyncStorageWrites=true` where the library hides the possibility of asynchrony. - It should be possible to apply snapshots asynchronously, even though a snapshot touches both the Raft log state and the state machine. The library should make this easy for users to handle by delaying all committed entries until after the snapshot has applied, so snapshot application can be handled by 1) flushing the apply thread, 2) sending the `MsgStorageAppend` that contains a snapshot to the `LocalAppendThread` to be applied. \## Usage When asynchronous storage writes is enabled, the responsibility of code using the library is different from what is presented in raft/doc.go (which has been updated to include a section about async storage writes). Users still read from the Node.Ready() channel. However, they process the updates it contains in a different manner. Users no longer consult the HardState, Entries, and Snapshot fields (steps 1 and 3 in doc.go). They also no longer call Node.Advance() to indicate that they have processed all entries in the Ready (step 4 in doc.go). Instead, all local storage operations are also communicated through messages present in the Ready.Message slice. The local storage messages come in two flavors. The first flavor is log append messages, which target a LocalAppendThread and carry Entries, HardState, and a Snapshot. The second flavor is entry application messages, which target a LocalApplyThread and carry CommittedEntries. Messages to the same target must be reliably processed in order. Messages to different targets can be processed in any order. Each local storage message carries a slice of response messages that must delivered after the corresponding storage write has been completed. With Asynchronous Storage Writes enabled, the total state machine handling loop will look something like this: ```go for { select { case <-s.Ticker: n.Tick() case rd := <-s.Node.Ready(): for _, m := range rd.Messages { switch m.To { case raft.LocalAppendThread: toAppend <- m case raft.LocalApplyThread: toApply <-m default: sendOverNetwork(m) } } case <-s.done: return } } ``` Usage of Asynchronous Storage Writes will typically also contain a pair of storage handler threads, one for log writes (append) and one for entry application to the local state machine (apply). Those will look something like: ```go // append thread go func() { for { select { case m := <-toAppend: saveToStorage(m.State, m.Entries, m.Snapshot) send(m.Responses) case <-s.done: return } } } // apply thread go func() { for { select { case m := <-toApply: for _, entry := range m.CommittedEntries { process(entry) if entry.Type == raftpb.EntryConfChange { var cc raftpb.ConfChange cc.Unmarshal(entry.Data) s.Node.ApplyConfChange(cc) } } send(m.Responses) case <-s.done: return } } } ``` \## Compatibility The library remains backwards compatible with existing users and the change does not introduce any breaking changes. Users that do not set `AsyncStorageWrites` to true in the `Config` struct will not notice a difference with this change. This is despite the fact that the existing "synchronous storage writes" interface was adapted to share a majority of the same code. For instance, `Node.Advance` has been adapted to transparently acknowledge an asynchronous log append attempt and an asynchronous state machine application attempt, internally using the same message passing mechanism introduced in this change. The change has no cross-version compatibility concerns. All changes are local to a process and nodes using asynchronous storage writes appear to behave no differently from the outside. Clusters are free to mix nodes running with and without asynchronous storage writes. \## Performance The bulk of the performance evaluation of this functionality thus far has been done with [rafttoy](https://github.com/nvanbenschoten/rafttoy), a benchmarking harness developed to experiment with Raft proposal pipeline optimization. The harness can be used to run single-node benchmarks or multi-node benchmarks. It supports plugable raft logs, storage engines, network transports, and pipeline implementations. To evaluate this change, we fixed the raft log (`etcd/wal`), storage engine (`pebble`), and network transport (`grpc`). We then built (nvanbenschoten/rafttoy#3) a pipeline implementation on top of the new asynchronous storage writes functionality and compared it against two other pipeline implementations. The three pipeline implementations we compared were: - **basic** (P1): baseline stock raft usage, similar to the code in `doc.go` - **parallel append + early ack** (P2): CockroachDB's current pipeline, which includes two significant variations to the basic pipeline. The first is that it sends MsgApp messages to followers before writing to local Raft log (see [commit](cockroachdb/cockroach@b67eb69) for explanation), allowing log appends to occur in parallel across replicas. The second is that it acknowledges committed log entries before applying them (see [commit](cockroachdb/cockroach@87aaea7) for explanation). - **async append + async apply + early ack** (P3): A pipelining using asynchronous storage writes with a separate append thread and a separate apply thread. Also uses the same early acknowledgement optimization from above to ack committed entries before handing them to the apply thread. All testing was performed on a 3 node AWS cluster of m5.4xlarge instances with gp3 EBS volumes (16000 IOPS, 1GB/s throughput). ![Throughput vs latency of Raft proposal pipeline implementations](https://user-images.githubusercontent.com/5438456/197925200-11352c09-569b-460c-ae42-effbf407c4e5.svg) The comparison demonstrates two different benefits of asynchronous storage writes. The first is that it reduces end-to-end latency of proposals by 20-25%. For instance, when serving 16MB/s of write traffic, P1's average latency was 13.2ms, P2's average latency was 7.3ms, and P3's average latency was 5.24ms. This is a reduction in average latency of 28% from the optimized pipeline that does not use asynchronous storage writes. This matches expectations outlined in cockroachdb/cockroach#17500. The second is that it increases the maximum throughput at saturation. This is because asynchronous storage writes can improve batching for both log appends and log application. In this experiment, we saw the average append batch size under saturation increase from 928 to 1542, which is a similar ratio to the increase in peak throughput. We see a similar difference for apply batch sizes. There is more benchmarking to do. For instance, we'll need to thoroughly verify that this change does not negatively impact the performance of users of this library that do not use asynchronous storage writes. Signed-off-by: Nathan VanBenschoten <[email protected]>
nvanbenschoten
added a commit
to nvanbenschoten/etcd
that referenced
this pull request
Nov 2, 2022
Fixes etcd-io#12257. This change adds opt-in support to raft to perform local storage writes asynchronously from the raft state machine handling loop. A new AsyncStorageWrites configuration instructs the raft node to write to its local storage (raft log and state machine) using a request/response message passing interface instead of the default `Ready`/`Advance` function call interface. Local storage messages can be pipelined and processed asynchronously (with respect to `Ready` iteration), facilitating reduced interference between Raft proposals and increased batching of log appends and state machine application. As a result, use of asynchronous storage writes can reduce end-to-end commit latency and increase maximum throughput. When AsyncStorageWrites is enabled, the `Ready.Message` slice will include new `MsgStorageAppend` and `MsgStorageApply` messages. The messages will target a `LocalAppendThread` and a `LocalApplyThread`, respectively. Messages to the same target must be reliably processed in order. In other words, they can't be dropped (like messages over the network) and those targeted at the same thread can't be reordered. Messages to different targets can be processed in any order. `MsgStorageAppend` carries Raft log entries to append, election votes to persist, and snapshots to apply. All writes performed in response to a `MsgStorageAppend` are expected to be durable. The message assumes the role of the Entries, HardState, and Snapshot fields in Ready. `MsgStorageApply` carries committed entries to apply. The message assumes the role of the CommittedEntries field in Ready. Local messages each carry one or more response messages which should be delivered after the corresponding storage write has been completed. These responses may target the same node or may target other nodes. The storage threads are not responsible for understanding the response messages, only for delivering them to the correct target after performing the storage write. \## Design Considerations - There must be no regression for existing users that do not enable `AsyncStorageWrites`. For instance, CommittedEntries must not wait on unstable entries to be stabilized in cases where a follower is given committed entries in a MsgApp. - Asynchronous storage work should use a message passing interface, like the rest of this library. - The Raft leader and followers should behave symmetrically. Both should be able to use asynchronous storage writes for log appends and entry application. - The LocalAppendThread on a follower should be able to send MsgAppResp messages directly to the leader without passing back through the raft state machine handling loop. - The `unstable` log should remain true to its name. It should hold entries until they are stable and should not rely on an intermediate reliable cache. - Pseudo-targets should be assigned to messages that target the local storage systems to denote required ordering guarantees. - Code should be maximally unified across `AsyncStorageWrites=false` and `AsyncStorageWrites=true`. `AsyncStorageWrites=false` should be a special case of `AsyncStorageWrites=true` where the library hides the possibility of asynchrony. - It should be possible to apply snapshots asynchronously, even though a snapshot touches both the Raft log state and the state machine. The library should make this easy for users to handle by delaying all committed entries until after the snapshot has applied, so snapshot application can be handled by 1) flushing the apply thread, 2) sending the `MsgStorageAppend` that contains a snapshot to the `LocalAppendThread` to be applied. \## Usage When asynchronous storage writes is enabled, the responsibility of code using the library is different from what is presented in raft/doc.go (which has been updated to include a section about async storage writes). Users still read from the Node.Ready() channel. However, they process the updates it contains in a different manner. Users no longer consult the HardState, Entries, and Snapshot fields (steps 1 and 3 in doc.go). They also no longer call Node.Advance() to indicate that they have processed all entries in the Ready (step 4 in doc.go). Instead, all local storage operations are also communicated through messages present in the Ready.Message slice. The local storage messages come in two flavors. The first flavor is log append messages, which target a LocalAppendThread and carry Entries, HardState, and a Snapshot. The second flavor is entry application messages, which target a LocalApplyThread and carry CommittedEntries. Messages to the same target must be reliably processed in order. Messages to different targets can be processed in any order. Each local storage message carries a slice of response messages that must delivered after the corresponding storage write has been completed. With Asynchronous Storage Writes enabled, the total state machine handling loop will look something like this: ```go for { select { case <-s.Ticker: n.Tick() case rd := <-s.Node.Ready(): for _, m := range rd.Messages { switch m.To { case raft.LocalAppendThread: toAppend <- m case raft.LocalApplyThread: toApply <-m default: sendOverNetwork(m) } } case <-s.done: return } } ``` Usage of Asynchronous Storage Writes will typically also contain a pair of storage handler threads, one for log writes (append) and one for entry application to the local state machine (apply). Those will look something like: ```go // append thread go func() { for { select { case m := <-toAppend: saveToStorage(m.State, m.Entries, m.Snapshot) send(m.Responses) case <-s.done: return } } } // apply thread go func() { for { select { case m := <-toApply: for _, entry := range m.CommittedEntries { process(entry) if entry.Type == raftpb.EntryConfChange { var cc raftpb.ConfChange cc.Unmarshal(entry.Data) s.Node.ApplyConfChange(cc) } } send(m.Responses) case <-s.done: return } } } ``` \## Compatibility The library remains backwards compatible with existing users and the change does not introduce any breaking changes. Users that do not set `AsyncStorageWrites` to true in the `Config` struct will not notice a difference with this change. This is despite the fact that the existing "synchronous storage writes" interface was adapted to share a majority of the same code. For instance, `Node.Advance` has been adapted to transparently acknowledge an asynchronous log append attempt and an asynchronous state machine application attempt, internally using the same message passing mechanism introduced in this change. The change has no cross-version compatibility concerns. All changes are local to a process and nodes using asynchronous storage writes appear to behave no differently from the outside. Clusters are free to mix nodes running with and without asynchronous storage writes. \## Performance The bulk of the performance evaluation of this functionality thus far has been done with [rafttoy](https://github.com/nvanbenschoten/rafttoy), a benchmarking harness developed to experiment with Raft proposal pipeline optimization. The harness can be used to run single-node benchmarks or multi-node benchmarks. It supports plugable raft logs, storage engines, network transports, and pipeline implementations. To evaluate this change, we fixed the raft log (`etcd/wal`), storage engine (`pebble`), and network transport (`grpc`). We then built (nvanbenschoten/rafttoy#3) a pipeline implementation on top of the new asynchronous storage writes functionality and compared it against two other pipeline implementations. The three pipeline implementations we compared were: - **basic** (P1): baseline stock raft usage, similar to the code in `doc.go` - **parallel append + early ack** (P2): CockroachDB's current pipeline, which includes two significant variations to the basic pipeline. The first is that it sends MsgApp messages to followers before writing to local Raft log (see [commit](cockroachdb/cockroach@b67eb69) for explanation), allowing log appends to occur in parallel across replicas. The second is that it acknowledges committed log entries before applying them (see [commit](cockroachdb/cockroach@87aaea7) for explanation). - **async append + async apply + early ack** (P3): A pipelining using asynchronous storage writes with a separate append thread and a separate apply thread. Also uses the same early acknowledgement optimization from above to ack committed entries before handing them to the apply thread. All testing was performed on a 3 node AWS cluster of m5.4xlarge instances with gp3 EBS volumes (16000 IOPS, 1GB/s throughput). ![Throughput vs latency of Raft proposal pipeline implementations](https://user-images.githubusercontent.com/5438456/197925200-11352c09-569b-460c-ae42-effbf407c4e5.svg) The comparison demonstrates two different benefits of asynchronous storage writes. The first is that it reduces end-to-end latency of proposals by 20-25%. For instance, when serving 16MB/s of write traffic, P1's average latency was 13.2ms, P2's average latency was 7.3ms, and P3's average latency was 5.24ms. This is a reduction in average latency of 28% from the optimized pipeline that does not use asynchronous storage writes. This matches expectations outlined in cockroachdb/cockroach#17500. The second is that it increases the maximum throughput at saturation. This is because asynchronous storage writes can improve batching for both log appends and log application. In this experiment, we saw the average append batch size under saturation increase from 928 to 1542, which is a similar ratio to the increase in peak throughput. We see a similar difference for apply batch sizes. There is more benchmarking to do. For instance, we'll need to thoroughly verify that this change does not negatively impact the performance of users of this library that do not use asynchronous storage writes. Signed-off-by: Nathan VanBenschoten <[email protected]>
nvanbenschoten
added a commit
to nvanbenschoten/etcd
that referenced
this pull request
Dec 12, 2022
Fixes etcd-io#12257. This change adds opt-in support to raft to perform local storage writes asynchronously from the raft state machine handling loop. A new AsyncStorageWrites configuration instructs the raft node to write to its local storage (raft log and state machine) using a request/response message passing interface instead of the default `Ready`/`Advance` function call interface. Local storage messages can be pipelined and processed asynchronously (with respect to `Ready` iteration), facilitating reduced interference between Raft proposals and increased batching of log appends and state machine application. As a result, use of asynchronous storage writes can reduce end-to-end commit latency and increase maximum throughput. When AsyncStorageWrites is enabled, the `Ready.Message` slice will include new `MsgStorageAppend` and `MsgStorageApply` messages. The messages will target a `LocalAppendThread` and a `LocalApplyThread`, respectively. Messages to the same target must be reliably processed in order. In other words, they can't be dropped (like messages over the network) and those targeted at the same thread can't be reordered. Messages to different targets can be processed in any order. `MsgStorageAppend` carries Raft log entries to append, election votes to persist, and snapshots to apply. All writes performed in response to a `MsgStorageAppend` are expected to be durable. The message assumes the role of the Entries, HardState, and Snapshot fields in Ready. `MsgStorageApply` carries committed entries to apply. The message assumes the role of the CommittedEntries field in Ready. Local messages each carry one or more response messages which should be delivered after the corresponding storage write has been completed. These responses may target the same node or may target other nodes. The storage threads are not responsible for understanding the response messages, only for delivering them to the correct target after performing the storage write. \## Design Considerations - There must be no regression for existing users that do not enable `AsyncStorageWrites`. For instance, CommittedEntries must not wait on unstable entries to be stabilized in cases where a follower is given committed entries in a MsgApp. - Asynchronous storage work should use a message passing interface, like the rest of this library. - The Raft leader and followers should behave symmetrically. Both should be able to use asynchronous storage writes for log appends and entry application. - The LocalAppendThread on a follower should be able to send MsgAppResp messages directly to the leader without passing back through the raft state machine handling loop. - The `unstable` log should remain true to its name. It should hold entries until they are stable and should not rely on an intermediate reliable cache. - Pseudo-targets should be assigned to messages that target the local storage systems to denote required ordering guarantees. - Code should be maximally unified across `AsyncStorageWrites=false` and `AsyncStorageWrites=true`. `AsyncStorageWrites=false` should be a special case of `AsyncStorageWrites=true` where the library hides the possibility of asynchrony. - It should be possible to apply snapshots asynchronously, even though a snapshot touches both the Raft log state and the state machine. The library should make this easy for users to handle by delaying all committed entries until after the snapshot has applied, so snapshot application can be handled by 1) flushing the apply thread, 2) sending the `MsgStorageAppend` that contains a snapshot to the `LocalAppendThread` to be applied. \## Usage When asynchronous storage writes is enabled, the responsibility of code using the library is different from what is presented in raft/doc.go (which has been updated to include a section about async storage writes). Users still read from the Node.Ready() channel. However, they process the updates it contains in a different manner. Users no longer consult the HardState, Entries, and Snapshot fields (steps 1 and 3 in doc.go). They also no longer call Node.Advance() to indicate that they have processed all entries in the Ready (step 4 in doc.go). Instead, all local storage operations are also communicated through messages present in the Ready.Message slice. The local storage messages come in two flavors. The first flavor is log append messages, which target a LocalAppendThread and carry Entries, HardState, and a Snapshot. The second flavor is entry application messages, which target a LocalApplyThread and carry CommittedEntries. Messages to the same target must be reliably processed in order. Messages to different targets can be processed in any order. Each local storage message carries a slice of response messages that must delivered after the corresponding storage write has been completed. With Asynchronous Storage Writes enabled, the total state machine handling loop will look something like this: ```go for { select { case <-s.Ticker: n.Tick() case rd := <-s.Node.Ready(): for _, m := range rd.Messages { switch m.To { case raft.LocalAppendThread: toAppend <- m case raft.LocalApplyThread: toApply <-m default: sendOverNetwork(m) } } case <-s.done: return } } ``` Usage of Asynchronous Storage Writes will typically also contain a pair of storage handler threads, one for log writes (append) and one for entry application to the local state machine (apply). Those will look something like: ```go // append thread go func() { for { select { case m := <-toAppend: saveToStorage(m.State, m.Entries, m.Snapshot) send(m.Responses) case <-s.done: return } } } // apply thread go func() { for { select { case m := <-toApply: for _, entry := range m.CommittedEntries { process(entry) if entry.Type == raftpb.EntryConfChange { var cc raftpb.ConfChange cc.Unmarshal(entry.Data) s.Node.ApplyConfChange(cc) } } send(m.Responses) case <-s.done: return } } } ``` \## Compatibility The library remains backwards compatible with existing users and the change does not introduce any breaking changes. Users that do not set `AsyncStorageWrites` to true in the `Config` struct will not notice a difference with this change. This is despite the fact that the existing "synchronous storage writes" interface was adapted to share a majority of the same code. For instance, `Node.Advance` has been adapted to transparently acknowledge an asynchronous log append attempt and an asynchronous state machine application attempt, internally using the same message passing mechanism introduced in this change. The change has no cross-version compatibility concerns. All changes are local to a process and nodes using asynchronous storage writes appear to behave no differently from the outside. Clusters are free to mix nodes running with and without asynchronous storage writes. \## Performance The bulk of the performance evaluation of this functionality thus far has been done with [rafttoy](https://github.com/nvanbenschoten/rafttoy), a benchmarking harness developed to experiment with Raft proposal pipeline optimization. The harness can be used to run single-node benchmarks or multi-node benchmarks. It supports plugable raft logs, storage engines, network transports, and pipeline implementations. To evaluate this change, we fixed the raft log (`etcd/wal`), storage engine (`pebble`), and network transport (`grpc`). We then built (nvanbenschoten/rafttoy#3) a pipeline implementation on top of the new asynchronous storage writes functionality and compared it against two other pipeline implementations. The three pipeline implementations we compared were: - **basic** (P1): baseline stock raft usage, similar to the code in `doc.go` - **parallel append + early ack** (P2): CockroachDB's current pipeline, which includes two significant variations to the basic pipeline. The first is that it sends MsgApp messages to followers before writing to local Raft log (see [commit](cockroachdb/cockroach@b67eb69) for explanation), allowing log appends to occur in parallel across replicas. The second is that it acknowledges committed log entries before applying them (see [commit](cockroachdb/cockroach@87aaea7) for explanation). - **async append + async apply + early ack** (P3): A pipelining using asynchronous storage writes with a separate append thread and a separate apply thread. Also uses the same early acknowledgement optimization from above to ack committed entries before handing them to the apply thread. All testing was performed on a 3 node AWS cluster of m5.4xlarge instances with gp3 EBS volumes (16000 IOPS, 1GB/s throughput). ![Throughput vs latency of Raft proposal pipeline implementations](https://user-images.githubusercontent.com/5438456/197925200-11352c09-569b-460c-ae42-effbf407c4e5.svg) The comparison demonstrates two different benefits of asynchronous storage writes. The first is that it reduces end-to-end latency of proposals by 20-25%. For instance, when serving 16MB/s of write traffic, P1's average latency was 13.2ms, P2's average latency was 7.3ms, and P3's average latency was 5.24ms. This is a reduction in average latency of 28% from the optimized pipeline that does not use asynchronous storage writes. This matches expectations outlined in cockroachdb/cockroach#17500. The second is that it increases the maximum throughput at saturation. This is because asynchronous storage writes can improve batching for both log appends and log application. In this experiment, we saw the average append batch size under saturation increase from 928 to 1542, which is a similar ratio to the increase in peak throughput. We see a similar difference for apply batch sizes. There is more benchmarking to do. For instance, we'll need to thoroughly verify that this change does not negatively impact the performance of users of this library that do not use asynchronous storage writes. Signed-off-by: Nathan VanBenschoten <[email protected]>
nvanbenschoten
added a commit
to nvanbenschoten/raft
that referenced
this pull request
Dec 21, 2022
Fixes #12257. This change adds opt-in support to raft to perform local storage writes asynchronously from the raft state machine handling loop. A new AsyncStorageWrites configuration instructs the raft node to write to its local storage (raft log and state machine) using a request/response message passing interface instead of the default `Ready`/`Advance` function call interface. Local storage messages can be pipelined and processed asynchronously (with respect to `Ready` iteration), facilitating reduced interference between Raft proposals and increased batching of log appends and state machine application. As a result, use of asynchronous storage writes can reduce end-to-end commit latency and increase maximum throughput. When AsyncStorageWrites is enabled, the `Ready.Message` slice will include new `MsgStorageAppend` and `MsgStorageApply` messages. The messages will target a `LocalAppendThread` and a `LocalApplyThread`, respectively. Messages to the same target must be reliably processed in order. In other words, they can't be dropped (like messages over the network) and those targeted at the same thread can't be reordered. Messages to different targets can be processed in any order. `MsgStorageAppend` carries Raft log entries to append, election votes to persist, and snapshots to apply. All writes performed in response to a `MsgStorageAppend` are expected to be durable. The message assumes the role of the Entries, HardState, and Snapshot fields in Ready. `MsgStorageApply` carries committed entries to apply. The message assumes the role of the CommittedEntries field in Ready. Local messages each carry one or more response messages which should be delivered after the corresponding storage write has been completed. These responses may target the same node or may target other nodes. The storage threads are not responsible for understanding the response messages, only for delivering them to the correct target after performing the storage write. \## Design Considerations - There must be no regression for existing users that do not enable `AsyncStorageWrites`. For instance, CommittedEntries must not wait on unstable entries to be stabilized in cases where a follower is given committed entries in a MsgApp. - Asynchronous storage work should use a message passing interface, like the rest of this library. - The Raft leader and followers should behave symmetrically. Both should be able to use asynchronous storage writes for log appends and entry application. - The LocalAppendThread on a follower should be able to send MsgAppResp messages directly to the leader without passing back through the raft state machine handling loop. - The `unstable` log should remain true to its name. It should hold entries until they are stable and should not rely on an intermediate reliable cache. - Pseudo-targets should be assigned to messages that target the local storage systems to denote required ordering guarantees. - Code should be maximally unified across `AsyncStorageWrites=false` and `AsyncStorageWrites=true`. `AsyncStorageWrites=false` should be a special case of `AsyncStorageWrites=true` where the library hides the possibility of asynchrony. - It should be possible to apply snapshots asynchronously, even though a snapshot touches both the Raft log state and the state machine. The library should make this easy for users to handle by delaying all committed entries until after the snapshot has applied, so snapshot application can be handled by 1) flushing the apply thread, 2) sending the `MsgStorageAppend` that contains a snapshot to the `LocalAppendThread` to be applied. \## Usage When asynchronous storage writes is enabled, the responsibility of code using the library is different from what is presented in raft/doc.go (which has been updated to include a section about async storage writes). Users still read from the Node.Ready() channel. However, they process the updates it contains in a different manner. Users no longer consult the HardState, Entries, and Snapshot fields (steps 1 and 3 in doc.go). They also no longer call Node.Advance() to indicate that they have processed all entries in the Ready (step 4 in doc.go). Instead, all local storage operations are also communicated through messages present in the Ready.Message slice. The local storage messages come in two flavors. The first flavor is log append messages, which target a LocalAppendThread and carry Entries, HardState, and a Snapshot. The second flavor is entry application messages, which target a LocalApplyThread and carry CommittedEntries. Messages to the same target must be reliably processed in order. Messages to different targets can be processed in any order. Each local storage message carries a slice of response messages that must delivered after the corresponding storage write has been completed. With Asynchronous Storage Writes enabled, the total state machine handling loop will look something like this: ```go for { select { case <-s.Ticker: n.Tick() case rd := <-s.Node.Ready(): for _, m := range rd.Messages { switch m.To { case raft.LocalAppendThread: toAppend <- m case raft.LocalApplyThread: toApply <-m default: sendOverNetwork(m) } } case <-s.done: return } } ``` Usage of Asynchronous Storage Writes will typically also contain a pair of storage handler threads, one for log writes (append) and one for entry application to the local state machine (apply). Those will look something like: ```go // append thread go func() { for { select { case m := <-toAppend: saveToStorage(m.State, m.Entries, m.Snapshot) send(m.Responses) case <-s.done: return } } } // apply thread go func() { for { select { case m := <-toApply: for _, entry := range m.CommittedEntries { process(entry) if entry.Type == raftpb.EntryConfChange { var cc raftpb.ConfChange cc.Unmarshal(entry.Data) s.Node.ApplyConfChange(cc) } } send(m.Responses) case <-s.done: return } } } ``` \## Compatibility The library remains backwards compatible with existing users and the change does not introduce any breaking changes. Users that do not set `AsyncStorageWrites` to true in the `Config` struct will not notice a difference with this change. This is despite the fact that the existing "synchronous storage writes" interface was adapted to share a majority of the same code. For instance, `Node.Advance` has been adapted to transparently acknowledge an asynchronous log append attempt and an asynchronous state machine application attempt, internally using the same message passing mechanism introduced in this change. The change has no cross-version compatibility concerns. All changes are local to a process and nodes using asynchronous storage writes appear to behave no differently from the outside. Clusters are free to mix nodes running with and without asynchronous storage writes. \## Performance The bulk of the performance evaluation of this functionality thus far has been done with [rafttoy](https://github.com/nvanbenschoten/rafttoy), a benchmarking harness developed to experiment with Raft proposal pipeline optimization. The harness can be used to run single-node benchmarks or multi-node benchmarks. It supports plugable raft logs, storage engines, network transports, and pipeline implementations. To evaluate this change, we fixed the raft log (`etcd/wal`), storage engine (`pebble`), and network transport (`grpc`). We then built (nvanbenschoten/rafttoy#3) a pipeline implementation on top of the new asynchronous storage writes functionality and compared it against two other pipeline implementations. The three pipeline implementations we compared were: - **basic** (P1): baseline stock raft usage, similar to the code in `doc.go` - **parallel append + early ack** (P2): CockroachDB's current pipeline, which includes two significant variations to the basic pipeline. The first is that it sends MsgApp messages to followers before writing to local Raft log (see [commit](cockroachdb/cockroach@b67eb69) for explanation), allowing log appends to occur in parallel across replicas. The second is that it acknowledges committed log entries before applying them (see [commit](cockroachdb/cockroach@87aaea7) for explanation). - **async append + async apply + early ack** (P3): A pipelining using asynchronous storage writes with a separate append thread and a separate apply thread. Also uses the same early acknowledgement optimization from above to ack committed entries before handing them to the apply thread. All testing was performed on a 3 node AWS cluster of m5.4xlarge instances with gp3 EBS volumes (16000 IOPS, 1GB/s throughput). ![Throughput vs latency of Raft proposal pipeline implementations](https://user-images.githubusercontent.com/5438456/197925200-11352c09-569b-460c-ae42-effbf407c4e5.svg) The comparison demonstrates two different benefits of asynchronous storage writes. The first is that it reduces end-to-end latency of proposals by 20-25%. For instance, when serving 16MB/s of write traffic, P1's average latency was 13.2ms, P2's average latency was 7.3ms, and P3's average latency was 5.24ms. This is a reduction in average latency of 28% from the optimized pipeline that does not use asynchronous storage writes. This matches expectations outlined in cockroachdb/cockroach#17500. The second is that it increases the maximum throughput at saturation. This is because asynchronous storage writes can improve batching for both log appends and log application. In this experiment, we saw the average append batch size under saturation increase from 928 to 1542, which is a similar ratio to the increase in peak throughput. We see a similar difference for apply batch sizes. There is more benchmarking to do. For instance, we'll need to thoroughly verify that this change does not negatively impact the performance of users of this library that do not use asynchronous storage writes. Signed-off-by: Nathan VanBenschoten <[email protected]>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
pipeline/async_storage.go
(link) is the most interesting file to look at.