
raft: support asynchronous storage writes #8

Merged · 13 commits into etcd-io:main from nvanbenschoten/asyncRaftLogMsg · Dec 21, 2022

Conversation

nvanbenschoten (Contributor)

Fixes etcd-io/etcd#12257.

NOTE: this PR was originally presented in etcd-io/etcd#14627 before etcd/raft was extracted into a separate repository (see etcd-io/etcd#14713). Review comments from the original PR have been addressed.

This change adds opt-in support to raft to perform local storage writes asynchronously from the raft state machine handling loop.

Summary

A new AsyncStorageWrites configuration instructs the raft node to write to its local storage (raft log and state machine) using a request/response message passing interface instead of the default Ready/Advance function call interface. Local storage messages can be pipelined and processed asynchronously (with respect to Ready iteration), facilitating reduced interference between Raft proposals and increased batching of log appends and state machine application. As a result, use of asynchronous storage writes can reduce end-to-end commit latency and increase maximum throughput.

When AsyncStorageWrites is enabled, the Ready.Message slice will include new MsgStorageAppend and MsgStorageApply messages. The messages will target a LocalAppendThread and a LocalApplyThread, respectively. Messages to the same target must be reliably processed in order. In other words, they can't be dropped (like messages over the network) and those targeted at the same thread can't be reordered. Messages to different targets can be processed in any order.

MsgStorageAppend carries Raft log entries to append, election votes to persist, and snapshots to apply. All writes performed in response to a MsgStorageAppend are expected to be durable. The message assumes the role of the Entries, HardState, and Snapshot fields in Ready.

MsgStorageApply carries committed entries to apply. The message assumes the role of the CommittedEntries field in Ready.

Local messages each carry one or more response messages which should be delivered after the corresponding storage write has been completed. These responses may target the same node or may target other nodes. The storage threads are not responsible for understanding the response messages, only for delivering them to the correct target after performing the storage write.
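For orientation, opting in is a single flag on the existing Config struct. The sketch below is illustrative; apart from AsyncStorageWrites, the field values shown are ordinary example defaults rather than anything this change prescribes:

```go
// Minimal sketch of opting in to asynchronous storage writes. Only the
// AsyncStorageWrites field is specific to this change; the other Config
// values are illustrative defaults.
storage := raft.NewMemoryStorage()
c := &raft.Config{
	ID:                 0x01,
	ElectionTick:       10,
	HeartbeatTick:      1,
	Storage:            storage,
	MaxSizePerMsg:      4096,
	MaxInflightMsgs:    256,
	AsyncStorageWrites: true, // opt in to the request/response storage interface
}
n := raft.StartNode(c, []raft.Peer{{ID: 0x01}})
defer n.Stop()
```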

Design Considerations

  • There must be no regression for existing users that do not enable AsyncStorageWrites. For instance, CommittedEntries must not wait on unstable entries to be stabilized in cases where a follower is given committed entries in a MsgApp.
  • Asynchronous storage work should use a message passing interface, like the rest of this library.
  • The Raft leader and followers should behave symmetrically. Both should be able to use asynchronous storage writes for log appends and entry application.
  • The LocalAppendThread on a follower should be able to send MsgAppResp messages directly to the leader without passing back through the raft state machine handling loop.
  • The unstable log should remain true to its name. It should hold entries until they are stable and should not rely on an intermediate reliable cache.
  • Pseudo-targets should be assigned to messages that target the local storage systems to denote required ordering guarantees.
  • Code should be maximally unified across AsyncStorageWrites=false and AsyncStorageWrites=true. AsyncStorageWrites=false should be a special case of AsyncStorageWrites=true where the library hides the possibility of asynchrony.
  • It should be possible to apply snapshots asynchronously, even though a snapshot touches both the Raft log state and the state machine. The library should make this easy for users to handle by delaying all committed entries until after the snapshot has applied, so snapshot application can be handled by 1) flushing the apply thread, 2) sending the MsgStorageAppend that contains a snapshot to the LocalAppendThread to be applied.

Usage

When asynchronous storage writes are enabled, the responsibility of code using the library is different from what is presented in raft/doc.go (which has been updated to include a section about async storage writes). Users still read from the Node.Ready() channel. However, they process the updates it contains in a different manner. Users no longer consult the HardState, Entries, and Snapshot fields (steps 1 and 3 in doc.go). They also no longer call Node.Advance() to indicate that they have processed all entries in the Ready (step 4 in doc.go). Instead, all local storage operations are also communicated through messages present in the Ready.Message slice.

The local storage messages come in two flavors. The first flavor is log append messages, which target a LocalAppendThread and carry Entries, HardState, and a Snapshot. The second flavor is entry application messages, which target a LocalApplyThread and carry CommittedEntries. Messages to the same target must be reliably processed in order. Messages to different targets can be processed in any order. Each local storage message carries a slice of response messages that must be delivered after the corresponding storage write has been completed.

With Asynchronous Storage Writes enabled, the total state machine handling loop will look something like this:

for {
	select {
	case <-s.Ticker:
		n.Tick()
	case rd := <-s.Node.Ready():
		for _, m := range rd.Messages {
			switch m.To {
			case raft.LocalAppendThread:
				toAppend <- m
			case raft.LocalApplyThread:
				toApply <- m
			default:
				sendOverNetwork(m)
			}
		}
	case <-s.done:
		return
	}
}

Usage of Asynchronous Storage Writes will typically also contain a pair of storage handler threads, one for log writes (append) and one for entry application to the local state machine (apply). Those will look something like:

// append thread
go func() {
	for {
		select {
		case m := <-toAppend:
			saveToStorage(m.State, m.Entries, m.Snapshot)
			send(m.Responses)
		case <-s.done:
			return
		}
	}
}()

// apply thread
go func() {
	for {
		select {
		case m := <-toApply:
			for _, entry := range m.CommittedEntries {
				process(entry)
				if entry.Type == raftpb.EntryConfChange {
					var cc raftpb.ConfChange
					cc.Unmarshal(entry.Data)
					s.Node.ApplyConfChange(cc)
				}
			}
			send(m.Responses)
		case <-s.done:
			return
		}
	}
}()

Compatibility

The library remains backwards compatible with existing users and the change does not introduce any breaking changes. Users that do not set AsyncStorageWrites to true in the Config struct will not notice a difference with this change. This is despite the fact that the existing "synchronous storage writes" interface was adapted to share a majority of the same code. For instance, Node.Advance has been adapted to transparently acknowledge an asynchronous log append attempt and an asynchronous state machine application attempt, internally using the same message passing mechanism introduced in this change.

The change has no cross-version compatibility concerns. All changes are local to a process and nodes using asynchronous storage writes appear to behave no differently from the outside. Clusters are free to mix nodes running with and without asynchronous storage writes.

Performance

The bulk of the performance evaluation of this functionality thus far has been done with rafttoy, a benchmarking harness developed to experiment with Raft proposal pipeline optimization. The harness can be used to run single-node benchmarks or multi-node benchmarks. It supports pluggable raft logs, storage engines, network transports, and pipeline implementations.

To evaluate this change, we fixed the raft log (etcd/wal), storage engine (pebble), and network transport (grpc). We then built (nvanbenschoten/rafttoy#3) a pipeline implementation on top of the new asynchronous storage writes functionality and compared it against two other pipeline implementations.

The three pipeline implementations we compared were:

  • basic (P1): baseline stock raft usage, similar to the code in doc.go
  • parallel append + early ack (P2): CockroachDB's current pipeline, which includes two significant variations on the basic pipeline. The first is that it sends MsgApp messages to followers before writing to the local Raft log (see commit for explanation), allowing log appends to occur in parallel across replicas. The second is that it acknowledges committed log entries before applying them (see commit for explanation).
  • async append + async apply + early ack (P3): A pipeline using asynchronous storage writes with a separate append thread and a separate apply thread. Also uses the same early acknowledgement optimization from above to ack committed entries before handing them to the apply thread.

All testing was performed on a 3-node AWS cluster of m5.4xlarge instances with gp3 EBS volumes (16000 IOPS, 1GB/s throughput). The testing used an open-loop workload to increase the rate of new raft proposals until a saturation point was reached.

[Figure: Throughput vs. latency of Raft proposal pipeline implementations]

The comparison demonstrates two different benefits of asynchronous storage writes.

The first is that it reduces end-to-end latency of proposals by 20-25%. For instance, when serving 16MB/s of write traffic, P1's average latency was 13.2ms, P2's average latency was 7.3ms, and P3's average latency was 5.2ms. This is a reduction in average latency of 29% from the optimized pipeline that does not use asynchronous storage writes. This matches the expectations outlined in cockroachdb/cockroach#17500.

The second is that it increases the maximum throughput at saturation. In this test, P1 and P2 topped out at 30MB/s, while P3 could push up to 52MB/s, an increase in maximum throughput of 73%. This is because asynchronous storage writes can improve batching for both log appends and log application. In this experiment, we saw the average append batch size under saturation increase from 928 to 1542, which is a similar ratio to the increase in peak throughput. We see a similar difference for apply batch sizes.

An additional performance comparison was presented in etcd-io/etcd#14627 (comment).

cc. @tbg @bdarnell

This commit introduces an intermediate state that delays the
acknowledgement of a node's self-vote during an election until
that vote has been durably persisted (i.e. on the next call to
Advance). This change can be viewed as the election counterpart
to #14413.

This is an intermediate state that limits code movement for the
rest of the async storage writes change.

Signed-off-by: Nathan VanBenschoten <[email protected]>
This commit adds a mechanism to the unstable struct to track "in-progress"
log writes that are not yet stable. In-progress writes are still tracked
in the unstable struct, which is necessary because they are not yet
guaranteed to be present in `Storage` implementations. However, they are
not included in the Entries field of future Ready structs, which avoids
redundant raft log writes.

The commit also does the same thing for the optional unstable snapshots.

For now, entries and snapshots are immediately considered stable by
`raft.advance`. A future commit will make it possible to accept multiple
Ready structs without immediately stabilizing entries and snapshots from
earlier Ready structs.

This all works towards async Raft log writes, where log writes are
decoupled from Ready iterations.
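Roughly, the bookkeeping looks like the sketch below; the field names are assumptions about the library's internal unstable struct, included only to illustrate the idea:

```go
// Hedged sketch of the in-progress tracking described above; names and
// details are assumptions, not the exact implementation.
type unstable struct {
	snapshot *raftpb.Snapshot
	entries  []raftpb.Entry
	offset   uint64 // index of entries[0]

	// offsetInProgress marks the prefix of entries already handed to the append
	// thread: entries[:offsetInProgress-offset] are still unstable (not yet
	// guaranteed to be in Storage) but are excluded from the Entries field of
	// future Ready structs.
	offsetInProgress uint64
	// snapshotInProgress is true once the snapshot has been handed off for
	// persistence but is not yet considered stable.
	snapshotInProgress bool
}
```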

Signed-off-by: Nathan VanBenschoten <[email protected]>
This commit adds a mechanism to the raft log struct to track in-progress
log application that has not yet completed.

For now, committed entries are immediately considered applied by
`raft.advance`. A future commit will make it possible to accept multiple
Ready structs without immediately applying committed entries from earlier
Ready structs.

This all works towards async Raft log writes, where log application is
decoupled from Ready iterations.

Signed-off-by: Nathan VanBenschoten <[email protected]>
This allows callers to configure whether they want to allow entries that
are not already in stable storage to be returned from the method. This
will be used in a future commit.

Signed-off-by: Nathan VanBenschoten <[email protected]>
This commit adds new proto fields and message types for the upcoming
async storage writes functionality. These proto changes are not yet
used.

Signed-off-by: Nathan VanBenschoten <[email protected]>
@nvanbenschoten force-pushed the nvanbenschoten/asyncRaftLogMsg branch from a7c1bb0 to b2158f0 on December 13, 2022 02:42
@tbg self-requested a review on December 16, 2022 11:36
@@ -194,14 +194,22 @@ func MustSync(st, prevst pb.HardState, entsnum int) bool {
return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
}

func needStorageAppend(rd Ready, haveMsgsAfterAppend bool) bool {
func needStorageAppendMsg(r *raft, rd Ready) bool {

tbg (Collaborator) commented:

This isn't the right place for this comment but I didn't want to post it in the main thread (no "Resolve" feature there):

I wonder if there's a simple UX improvement we could make to ready handling when using async writes. Right now, there are a bunch of fields that are unused in that case:

  • MustSync
  • CommittedEntries
  • Entries
  • Snapshot
  • any others?

What about a helper method

func AsyncStorageWritesReady(rd Ready) AsyncReady {
  // ...
}

that basically returns only the things you need to care about as a user of the async interface? This would also provide a convenient place to anchor documentation without having to interleave the two different usage patterns.
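
For concreteness, such a helper might look something like the following (the field set is a hypothetical sketch, not something defined by this PR):

```go
// Hypothetical sketch of the AsyncReady idea; the fields listed are an
// assumption about what an async-writes user still needs from Ready.
type AsyncReady struct {
	SoftState  *raft.SoftState  // volatile leader/state info, if it changed
	ReadStates []raft.ReadState // completed read-only request states
	Messages   []raftpb.Message // outbound messages, incl. MsgStorageAppend/MsgStorageApply
}

func AsyncStorageWritesReady(rd raft.Ready) AsyncReady {
	return AsyncReady{
		SoftState:  rd.SoftState,
		ReadStates: rd.ReadStates,
		Messages:   rd.Messages,
	}
}
```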


nvanbenschoten (Contributor, Author) replied:

I like this idea. Let me try it out and see how it looks.

tbg commented Dec 21, 2022

Getting close! No substantial comments.
I'd classify my comment about an AsyncReady struct as semi-substantial. I wouldn't want it to necessarily block the merge here, but I think there is a significant improvement to be made in how accessible this new functionality is to other adopters. So I consider it important that we discuss it and follow it to its conclusion in a timely manner.

Fixes #12257.

This change adds opt-in support to raft to perform local storage writes
asynchronously from the raft state machine handling loop.

A new AsyncStorageWrites configuration instructs the raft node to write to its
local storage (raft log and state machine) using a request/response message
passing interface instead of the default `Ready`/`Advance` function call
interface. Local storage messages can be pipelined and processed asynchronously
(with respect to `Ready` iteration), facilitating reduced interference between
Raft proposals and increased batching of log appends and state machine
application. As a result, use of asynchronous storage writes can reduce
end-to-end commit latency and increase maximum throughput.

When AsyncStorageWrites is enabled, the `Ready.Message` slice will include new
`MsgStorageAppend` and `MsgStorageApply` messages. The messages will target a
`LocalAppendThread` and a `LocalApplyThread`, respectively. Messages to the same
target must be reliably processed in order. In other words, they can't be
dropped (like messages over the network) and those targeted at the same thread
can't be reordered. Messages to different targets can be processed in any order.

`MsgStorageAppend` carries Raft log entries to append, election votes to persist,
and snapshots to apply. All writes performed in response to a `MsgStorageAppend`
are expected to be durable. The message assumes the role of the Entries,
HardState, and Snapshot fields in Ready.

`MsgStorageApply` carries committed entries to apply. The message assumes
the role of the CommittedEntries field in Ready.

Local messages each carry one or more response messages which should be
delivered after the corresponding storage write has been completed. These
responses may target the same node or may target other nodes. The storage
threads are not responsible for understanding the response messages, only
for delivering them to the correct target after performing the storage
write.

## Design Considerations

- There must be no regression for existing users that do not enable `AsyncStorageWrites`.
  For instance, CommittedEntries must not wait on unstable entries to be stabilized in
  cases where a follower is given committed entries in a MsgApp.
- Asynchronous storage work should use a message passing interface, like the
  rest of this library.
- The Raft leader and followers should behave symmetrically. Both should be able
  to use asynchronous storage writes for log appends and entry application.
- The LocalAppendThread on a follower should be able to send MsgAppResp messages
  directly to the leader without passing back through the raft state machine
  handling loop.
- The `unstable` log should remain true to its name. It should hold entries
  until they are stable and should not rely on an intermediate reliable cache.
- Pseudo-targets should be assigned to messages that target the local storage
  systems to denote required ordering guarantees.
- Code should be maximally unified across `AsyncStorageWrites=false` and
  `AsyncStorageWrites=true`. `AsyncStorageWrites=false` should be a special case of
  `AsyncStorageWrites=true` where the library hides the possibility of asynchrony.
- It should be possible to apply snapshots asynchronously, even though a
  snapshot touches both the Raft log state and the state machine. The library
  should make this easy for users to handle by delaying all committed entries
  until after the snapshot has applied, so snapshot application can be handled
  by 1) flushing the apply thread, 2) sending the `MsgStorageAppend` that contains
  a snapshot to the `LocalAppendThread` to be applied.

## Usage

When asynchronous storage writes are enabled, the responsibility of code using
the library is different from what is presented in raft/doc.go (which has been
updated to include a section about async storage writes). Users still read from
the Node.Ready() channel. However, they process the updates it contains in a
different manner. Users no longer consult the HardState, Entries, and Snapshot
fields (steps 1 and 3 in doc.go). They also no longer call Node.Advance() to
indicate that they have processed all entries in the Ready (step 4 in doc.go).
Instead, all local storage operations are also communicated through messages
present in the Ready.Message slice.

The local storage messages come in two flavors. The first flavor is log append
messages, which target a LocalAppendThread and carry Entries, HardState, and a
Snapshot. The second flavor is entry application messages, which target a
LocalApplyThread and carry CommittedEntries. Messages to the same target must be
reliably processed in order. Messages to different targets can be processed in
any order. Each local storage message carries a slice of response messages that
must be delivered after the corresponding storage write has been completed.

With Asynchronous Storage Writes enabled, the total state machine handling loop
will look something like this:

```go
for {
	select {
	case <-s.Ticker:
		n.Tick()
	case rd := <-s.Node.Ready():
		for _, m := range rd.Messages {
			switch m.To {
			case raft.LocalAppendThread:
				toAppend <- m
			case raft.LocalApplyThread:
				toApply <- m
			default:
				sendOverNetwork(m)
			}
		}
	case <-s.done:
		return
	}
}
```

Usage of Asynchronous Storage Writes will typically also contain a pair of
storage handler threads, one for log writes (append) and one for entry
application to the local state machine (apply). Those will look something like:

```go
// append thread
go func() {
	for {
		select {
		case m := <-toAppend:
			saveToStorage(m.State, m.Entries, m.Snapshot)
			send(m.Responses)
		case <-s.done:
			return
		}
	}
}()

// apply thread
go func() {
	for {
		select {
		case m := <-toApply:
			for _, entry := range m.CommittedEntries {
				process(entry)
				if entry.Type == raftpb.EntryConfChange {
					var cc raftpb.ConfChange
					cc.Unmarshal(entry.Data)
					s.Node.ApplyConfChange(cc)
				}
			}
			send(m.Responses)
		case <-s.done:
			return
		}
	}
}()
```

## Compatibility

The library remains backwards compatible with existing users and the change does
not introduce any breaking changes. Users that do not set `AsyncStorageWrites`
to true in the `Config` struct will not notice a difference with this change.
This is despite the fact that the existing "synchronous storage writes"
interface was adapted to share a majority of the same code. For instance,
`Node.Advance` has been adapted to transparently acknowledge an asynchronous log
append attempt and an asynchronous state machine application attempt, internally
using the same message passing mechanism introduced in this change.

The change has no cross-version compatibility concerns. All changes are local to
a process and nodes using asynchronous storage writes appear to behave no
differently from the outside. Clusters are free to mix nodes running with and
without asynchronous storage writes.

## Performance

The bulk of the performance evaluation of this functionality thus far has been
done with [rafttoy](https://github.com/nvanbenschoten/rafttoy), a benchmarking
harness developed to experiment with Raft proposal pipeline optimization. The
harness can be used to run single-node benchmarks or multi-node benchmarks. It
supports pluggable raft logs, storage engines, network transports, and pipeline
implementations.

To evaluate this change, we fixed the raft log (`etcd/wal`), storage engine
(`pebble`), and network transport (`grpc`). We then built (nvanbenschoten/rafttoy#3)
a pipeline implementation on top of the new asynchronous storage writes
functionality and compared it against two other pipeline implementations.

The three pipeline implementations we compared were:
- **basic** (P1): baseline stock raft usage, similar to the code in `doc.go`
- **parallel append + early ack** (P2): CockroachDB's current pipeline, which includes
  two significant variations to the basic pipeline. The first is that it sends
  MsgApp messages to followers before writing to the local Raft log (see [commit](cockroachdb/cockroach@b67eb69)
  for explanation), allowing log appends to occur in parallel across replicas.
  The second is that it acknowledges committed log entries before applying them
  (see [commit](cockroachdb/cockroach@87aaea7)
  for explanation).
- **async append + async apply + early ack** (P3): A pipeline using asynchronous storage
  writes with a separate append thread and a separate apply thread. Also uses the same
  early acknowledgement optimization from above to ack committed entries before handing
  them to the apply thread.

All testing was performed on a 3-node AWS cluster of m5.4xlarge instances with
gp3 EBS volumes (16000 IOPS, 1GB/s throughput).

![Throughput vs latency of Raft proposal pipeline implementations](https://user-images.githubusercontent.com/5438456/197925200-11352c09-569b-460c-ae42-effbf407c4e5.svg)

The comparison demonstrates two different benefits of asynchronous storage
writes.

The first is that it reduces end-to-end latency of proposals by 20-25%. For
instance, when serving 16MB/s of write traffic, P1's average latency was 13.2ms,
P2's average latency was 7.3ms, and P3's average latency was 5.24ms. This is a
reduction in average latency of 28% from the optimized pipeline that does not
use asynchronous storage writes. This matches expectations outlined in
cockroachdb/cockroach#17500.

The second is that it increases the maximum throughput at saturation. This is
because asynchronous storage writes can improve batching for both log appends
and log application. In this experiment, we saw the average append batch size
under saturation increase from 928 to 1542, which is a similar ratio to the
increase in peak throughput. We see a similar difference for apply batch sizes.

There is more benchmarking to do. For instance, we'll need to thoroughly verify
that this change does not negatively impact the performance of users of this
library that do not use asynchronous storage writes.

Signed-off-by: Nathan VanBenschoten <[email protected]>
This commit makes it more clear that the asyncStorageWrites handling is
entirely local to RawNode and that the raft object always operates in
"async storage" mode.

Signed-off-by: Nathan VanBenschoten <[email protected]>
This commit adds a new data-driven test the reproduces a scenario
similar to the one described in newStorageAppendRespMsg, exercising a
few interesting interactions between asynchronous storage writes, term
changes, and log truncation.

Signed-off-by: Nathan VanBenschoten <[email protected]>
Pure code movement.

Eliminates asyncStorageWrites handling in node.go.

Signed-off-by: Nathan VanBenschoten <[email protected]>
This commit removes certain cases where `MsgStorageAppendResp` messages
were attached as responses to a `MsgStorageAppend` message, even when
the response contained no useful information. The most common case where
this comes up is when the HardState changes but no new entries are
appended to the log.

Avoiding the response in these cases eliminates useless work.

Additionally, if the HardState does not include a new vote and only
includes a new Commit, then there will be no response messages on the
`MsgStorageAppend`. Users of this library can use this condition to
determine when an fsync is not necessary, similar to how they previously
used the `Ready.MustSync` flag.
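
A minimal sketch of that check, reusing the shape of the append thread from the usage example (the helpers and the sync flag are illustrative, not library API):

```go
// Hedged sketch: only the len(m.Responses) > 0 condition comes from this
// commit. save and send stand in for the user's storage and transport
// helpers; the sync flag is for illustration.
func handleAppend(m raftpb.Message,
	save func(m raftpb.Message, sync bool),
	send func(msgs []raftpb.Message)) {
	mustSync := len(m.Responses) > 0 // no responses attached => fsync can be deferred
	save(m, mustSync)
	send(m.Responses)
}
```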

Signed-off-by: Nathan VanBenschoten <[email protected]>
This avoids a call to stable `Storage`. It turns a regression in firstIndex/op
from 2 to 3 (or 5 to 7) into an improvement from 2 to 1 (or 5 to 3).

```
name                     old firstIndex/op  new firstIndex/op  delta
RawNode/single-voter-10          3.00 ± 0%          1.00 ± 0%  -66.67%  (p=0.000 n=10+10)
RawNode/two-voters-10            7.00 ± 0%          3.00 ± 0%  -57.14%  (p=0.000 n=10+10)
```

Signed-off-by: Nathan VanBenschoten <[email protected]>
This commit fixes the interactions between commit entry pagination and async
storage writes. The pagination now properly applies across multiple Ready
structs, acting as a limit on outstanding committed entries that have yet to be
acked through a MsgStorageApplyResp message.

The commit also resolves an abuse of the LogTerm field in MsgStorageApply{Resp}.

Signed-off-by: Nathan VanBenschoten <[email protected]>
@nvanbenschoten force-pushed the nvanbenschoten/asyncRaftLogMsg branch 2 times, most recently from 71d9161 to 280607a on December 21, 2022 17:02
This commit replaces the HardState field in Message with a Vote. For
MsgStorageAppends, the term, vote, and commit fields will either all be
set (to facilitate the construction of a HardState) if any of the fields
have changed or will all be unset if none of the fields have changed.

Signed-off-by: Nathan VanBenschoten <[email protected]>
@nvanbenschoten force-pushed the nvanbenschoten/asyncRaftLogMsg branch from 280607a to 09c91d8 on December 21, 2022 17:07

tbg commented Dec 21, 2022

Can't review the diffs (no fixups?) but I looked at the one last new commit, still LGTM.

You may want to merge this and pursue #8 (comment) separately.


tbg commented Dec 21, 2022

Merging with @nvanbenschoten's consent.

@tbg merged commit 65a0bf3 into etcd-io:main on Dec 21, 2022
adityamaru pushed a commit to adityamaru/cockroach that referenced this pull request Dec 22, 2022
This picks up etcd-io/raft#8.

However, it doesn't enable the `AsyncStorageWrites` configuration, so we do
not expect any impact from the change. Let's hope there are no surprises.

Release note: None
Epic: CRDB-6037
@nvanbenschoten deleted the nvanbenschoten/asyncRaftLogMsg branch on December 22, 2022 04:54
nvanbenschoten added a commit to nvanbenschoten/cockroach that referenced this pull request Jan 6, 2023
Fixes cockroachdb#17500.
Waiting on github.com/cockroachdb/pebble/pull/2117.

This commit integrates with the `AsyncStorageWrites` functionality that
we added to Raft in github.com/etcd-io/raft/pull/8.

## Approach

The commit makes the minimal changes needed to integrate with async
storage writes and pull fsyncs out of the raft state machine loop. It
does not make an effort to extract the non-durable portion of raft log
writes or raft log application onto separate goroutine pools, as was
described in cockroachdb#17500. Those changes will also be impactful, but they're
non-trivial and bump into a pipelining vs. batching trade-off, so they
are left as future work items (TODO(nvanbenschoten): open new issues).

With this change, asynchronous Raft log syncs are enabled by the new
`DB.ApplyNoSyncWait` Pebble API introduced in github.com/cockroachdb/pebble/pull/2117.
The `handleRaftReady` state machine loop continues to initiate Raft log
writes, but it uses the Pebble API to offload waiting on durability to a
separate goroutine. This separate goroutine then sends the corresponding
`MsgStorageAppend`'s response messages where they need to go (locally
and/or to the Raft leader) when the fsync completes. The async storage
writes functionality in Raft makes this all safe.

## Benchmark Results

The result of this change is reduced interference between Raft
proposals. As a result, it reduces end-to-end commit latency.

github.com/etcd-io/raft/pull/8 presented a collection of benchmark
results captured from integrating async storage writes with rafttoy.

When integrated into CockroachDB, we see similar improvements to average
and tail latency. However, it doesn't provide the throughput
improvements at the top end because log appends and state machine
application have not yet been extracted into separate goroutine pools,
which would facilitate increased opportunity for batching.

TODO: add images

----

Release note (performance improvement): The Raft proposal pipeline
has been optimized to reduce interference between Raft proposals.
This improves average and tail write latency at high concurrency.
nvanbenschoten added a commit to nvanbenschoten/cockroach that referenced this pull request Jan 31, 2023
nvanbenschoten added a commit to nvanbenschoten/cockroach that referenced this pull request Feb 2, 2023
craig bot pushed a commit to cockroachdb/cockroach that referenced this pull request Feb 3, 2023
94165: kv: integrate raft async storage writes r=nvanbenschoten a=nvanbenschoten

Fixes #17500.
Epic: CRDB-22644

This commit integrates with the `AsyncStorageWrites` functionality that we added to Raft in etcd-io/raft/pull/8. 

## Approach

The commit makes the minimal changes needed to integrate with async storage writes and pull fsyncs out of the raft state machine loop. It does not make an effort to extract the non-durable portion of raft log writes or raft log application onto separate goroutine pools, as was described in #17500. Those changes will also be impactful, but they're non-trivial and bump into a pipelining vs. batching trade-off, so they are left as future work items. See #94853 and #94854.

With this change, asynchronous Raft log syncs are enabled by the new `DB.ApplyNoSyncWait` Pebble API introduced in cockroachdb/pebble/pull/2117.  The `handleRaftReady` state machine loop continues to initiate Raft log writes, but it uses the Pebble API to offload waiting on durability to a separate goroutine. This separate goroutine then sends the corresponding `MsgStorageAppend`'s response messages where they need to go (locally and/or to the Raft leader) when the fsync completes. The async storage writes functionality in Raft makes this all safe.
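
A rough sketch of that offload pattern is below. The Pebble calls come from the referenced cockroachdb/pebble PR; the exact signatures and the surrounding helpers are assumptions for illustration, not the CockroachDB implementation:

```go
// Hedged sketch of pulling the fsync wait out of the raft state machine loop.
func appendAndAckAsync(
	db *pebble.DB,
	m raftpb.Message, // a MsgStorageAppend taken from Ready.Messages
	stage func(b *pebble.Batch, m raftpb.Message) error, // stage entries + HardState into the batch
	deliver func(msgs []raftpb.Message), // send responses locally and/or to the leader
) error {
	b := db.NewBatch()
	if err := stage(b, m); err != nil {
		return err
	}
	// Queue a synced WAL write, but do not block on the fsync here.
	if err := db.ApplyNoSyncWait(b, pebble.Sync); err != nil {
		return err
	}
	go func() {
		defer b.Close()
		if err := b.SyncWait(); err != nil { // wait for durability off the raft loop
			panic(err)
		}
		deliver(m.Responses) // MsgStorageAppend responses
	}()
	return nil
}
```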

## Benchmark Results

The result of this change is reduced interference between Raft proposals. As a result, it reduces end-to-end commit latency.

etcd-io/raft/pull/8 presented a collection of benchmark results captured from integrating async storage writes with rafttoy.

When integrated into CockroachDB, we see similar improvements to average and tail latency. However, it doesn't provide the throughput improvements at the top end because log appends and state machine application have not yet been extracted into separate goroutine pools, which would facilitate an increased opportunity for batching.

To visualize the impact on latency, consider the following test. The experiment uses a 3-node GCP cluster with n2-standard-32 instances spread across three availability zones. It runs kv0 (write-only) against the cluster with 64-byte values. It then ramps up concurrency to compare throughput vs. average and tail latency.

_NOTE: log scales on x and y axes_

![Throughput vs  average latency of write-only workload](https://user-images.githubusercontent.com/5438456/209210719-bec842f6-1093-48cd-8be7-05a3d79c2a71.svg)

![Throughput vs  tail latency of write-only workload](https://user-images.githubusercontent.com/5438456/209210777-670a4d25-9516-41a2-b7e7-86b402004536.svg)

Async storage writes impacts latency by different amounts at different throughputs, ranging from an improvement of 20% to 40% when the system is "well utilized". However, it increases latency by 5% to 10% when the system is over-saturated and CPU bound, presumably because of the extra goroutine handoff to the log append fsync callback, which will be impacted by elevated goroutine scheduling latency.

| Throughput (B/s) | Throughput (qps) | Avg. Latency Δ | p99 Latency Δ |
| ---------------- | ---------------- | -------------- | ------------- |
| 63  KB/s         | 1,000            | -10.5%         | -8.8%         |
| 125 KB/s         | 2,000            | -7.1%          | -10.4%        |
| 250 KB/s         | 4,000            | -20%           | -11.2%        |
| 500 KB/s         | 8,000            | -16.6%         | -25.3%        |
| 1 MB/s           | 16,000           | -30.8%         | -44.0%        |
| 2 MB/s           | 32,000           | -38.2%         | -30.9%        |
| 4 MB/s           | 64,000           | 5.9%           | 9.4%          |

### Other benchmark results
```bash
name                   old ops/s    new ops/s    delta
# 50% read, 50% update
ycsb/A/nodes=3          16.0k ± 5%   16.2k ± 4%     ~     (p=0.353 n=10+10)
ycsb/A/nodes=3/cpu=32   28.7k ± 5%   33.8k ± 2%  +17.57%  (p=0.000 n=9+9)
# 95% read, 5% update
ycsb/B/nodes=3          29.9k ± 3%   30.2k ± 3%     ~     (p=0.278 n=9+10)
ycsb/B/nodes=3/cpu=32    101k ± 1%    100k ± 3%     ~     (p=0.274 n=8+10)
# 100% read
ycsb/C/nodes=3          40.4k ± 3%   40.0k ± 3%     ~     (p=0.190 n=10+10)
ycsb/C/nodes=3/cpu=32    135k ± 1%    137k ± 1%   +0.87%  (p=0.011 n=9+9)
# 95% read, 5% insert
ycsb/D/nodes=3          33.6k ± 3%   33.8k ± 3%     ~     (p=0.315 n=10+10)
ycsb/D/nodes=3/cpu=32    108k ± 1%    106k ± 6%     ~     (p=0.739 n=10+10)
# 95% scan, 5% insert
ycsb/E/nodes=3          3.79k ± 1%   3.73k ± 1%   -1.42%  (p=0.000 n=9+9)
ycsb/E/nodes=3/cpu=32   6.31k ± 5%   6.48k ± 6%     ~     (p=0.123 n=10+10)
# 50% read, 50% read-modify-write
ycsb/F/nodes=3          7.68k ± 2%   7.99k ± 2%   +4.11%  (p=0.000 n=10+10)
ycsb/F/nodes=3/cpu=32   15.6k ± 4%   18.1k ± 3%  +16.14%  (p=0.000 n=8+10)

name                   old avg(ms)  new avg(ms)  delta
ycsb/A/nodes=3           6.01 ± 5%    5.95 ± 4%     ~     (p=0.460 n=10+10)
ycsb/A/nodes=3/cpu=32    5.01 ± 4%    4.25 ± 4%  -15.19%  (p=0.000 n=9+10)
ycsb/B/nodes=3           4.80 ± 0%    4.77 ± 4%     ~     (p=0.586 n=7+10)
ycsb/B/nodes=3/cpu=32    1.90 ± 0%    1.90 ± 0%     ~     (all equal)
ycsb/C/nodes=3           3.56 ± 2%    3.61 ± 3%     ~     (p=0.180 n=10+10)
ycsb/C/nodes=3/cpu=32    1.40 ± 0%    1.40 ± 0%     ~     (all equal)
ycsb/D/nodes=3           2.87 ± 2%    2.85 ± 2%     ~     (p=0.650 n=10+10)
ycsb/D/nodes=3/cpu=32    1.30 ± 0%    1.34 ± 4%     ~     (p=0.087 n=10+10)
ycsb/E/nodes=3           25.3 ± 0%    25.7 ± 1%   +1.38%  (p=0.000 n=8+8)
ycsb/E/nodes=3/cpu=32    22.9 ± 5%    22.2 ± 6%     ~     (p=0.109 n=10+10)
ycsb/F/nodes=3           12.5 ± 2%    12.0 ± 1%   -3.72%  (p=0.000 n=10+9)
ycsb/F/nodes=3/cpu=32    9.27 ± 4%    7.98 ± 3%  -13.96%  (p=0.000 n=8+10)

name                   old p99(ms)  new p99(ms)  delta
ycsb/A/nodes=3           45.7 ±15%    35.7 ± 6%  -21.90%  (p=0.000 n=10+8)
ycsb/A/nodes=3/cpu=32    67.6 ±13%    55.3 ± 5%  -18.10%  (p=0.000 n=9+10)
ycsb/B/nodes=3           30.5 ±24%    29.4 ± 7%     ~     (p=0.589 n=10+10)
ycsb/B/nodes=3/cpu=32    12.8 ± 2%    13.3 ± 7%     ~     (p=0.052 n=10+8)
ycsb/C/nodes=3           14.0 ± 3%    14.2 ± 0%     ~     (p=0.294 n=10+8)
ycsb/C/nodes=3/cpu=32    5.80 ± 0%    5.70 ± 5%     ~     (p=0.233 n=7+10)
ycsb/D/nodes=3           12.4 ± 2%    11.7 ± 3%   -5.32%  (p=0.001 n=10+10)
ycsb/D/nodes=3/cpu=32    6.30 ± 0%    5.96 ± 6%   -5.40%  (p=0.001 n=10+10)
ycsb/E/nodes=3           81.0 ± 4%    83.9 ± 0%   +3.63%  (p=0.012 n=10+7)
ycsb/E/nodes=3/cpu=32     139 ±19%     119 ±12%  -14.46%  (p=0.021 n=10+10)
ycsb/F/nodes=3            122 ±17%     103 ±10%  -15.48%  (p=0.002 n=10+8)
ycsb/F/nodes=3/cpu=32     146 ±20%     133 ± 7%   -8.89%  (p=0.025 n=10+10)
```

The way to interpret these results is that async raft storage writes reduce latency and, as a result of the closed-loop nature of the workload, also increase throughput for the YCSB variants that perform writes and aren't already CPU-saturated. Variants that are read-only are unaffected. Variants that are CPU-saturated do not benefit from the change because they are already bottlenecked on CPU resources and cannot push any more load (see above).

----

Release note (performance improvement): The Raft proposal pipeline has been optimized to reduce interference between Raft proposals. This improves average and tail write latency at high concurrency.

96458: sql: fixes statement contention count metric r=j82w a=j82w

Fixes a bug introduced in #94750 where the metric
was counting transactions that hit contention events instead of statements.

closes: #96429

Release note: none

Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: j82w <[email protected]>
@ahrtr added this to the v3.6.0 milestone on May 31, 2023
Comment on lines +160 to +170
	if rn.asyncStorageWrites {
		// If async storage writes are enabled, enqueue messages to
		// local storage threads, where applicable.
		if needStorageAppendMsg(r, rd) {
			m := newStorageAppendMsg(r, rd)
			rd.Messages = append(rd.Messages, m)
		}
		if needStorageApplyMsg(rd) {
			m := newStorageApplyMsg(r, rd)
			rd.Messages = append(rd.Messages, m)
		}

Member comment:

When async writes are enabled, you get rd.Entries, rd.HardState, and rd.Snapshot encapsulated in the MsgStorageAppend message. But you do not clear the fields, so do you expect the application to ignore them and only process the MsgStorageAppend message?

The same for the MsgStorageApply message.
