replication: avoid fsync during raft log append #88442

Open
tbg opened this issue Sep 22, 2022 · 17 comments
Labels
C-enhancement Solution expected to add code/behavior + preserve backward-compat (pg compat issues are exception) T-kv KV Team

Comments

@tbg
Member

tbg commented Sep 22, 2022

Traditional quorum replication requires all log entries to be durably stored on
stable storage on a quorum of replicas before being considered committed.

In practice, for us this means fdatasync'ing raft log appends. The main downside
of this requirement is that fsync is latency-intensive: CockroachDB runs much
faster on write-heavy workloads with fsync turned off or reduced. In fact, some
customers are known to de facto do this, by running on zfs with the fsync
syscall mocked out as a noop.[^1]

Since CockroachDB ranges are usually deployed across AZs or even regions, where
correlated power failures are likely rare, it stands to reason that trading
durability for performance could be beneficial.

As explained in [^2], one can run CockroachDB "correctly" with fsync turned off
if one ensures that a node that crashes (i.e. exits in a way that may lose log
writes) does not return to the cluster (i.e. has to be wiped and re-join as new
node). This is equivalent to running with fsync turned on (though more
performant) and pretending that any crash failure is permanent.

This is unappealing due to the need to replicate a lot of data, almost all of
it redundantly. The missing piece is a mechanism that allows a power-cycled
node to return to the cluster gracefully.

To give an explicit example of why naively letting the node rejoin when it
didn't properly obey durability leads to incorrect behavior, consider three
nodes n1, n2, and n3 which form the members of some range r1.

An entry at index 100 is appended to n1 and n2 and reaches quorum. It is
committed and applied by n1. n2 power-cycles and loses index 100 (which it
previously acked). Then we are in the following state:

                 committed
                     |
                     v
log(n1) = [..., 99, 100]
log(n2) = [..., 99]
log(n3) = [..., 99]

which makes it possible for n2 and n3 to jointly elect either of them as the leader, and to subsequently replace the committed (and likely applied on n1) entry at index 100.
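The election hazard in this example can be sketched in a few lines of Go. This is a toy model, not etcd/raft code: `logState`, `upToDate`, and `canWin` are hypothetical names, and the term value 5 is made up. It only applies the standard raft rule that a voter grants its vote when the candidate's log is at least as up to date as its own.

```go
package main

import "fmt"

// logState is a simplified view of a replica's raft log: only the term and
// index of its last entry matter for the vote check.
type logState struct {
	lastTerm, lastIndex uint64
}

// upToDate reports whether candidate's log is at least as up to date as
// voter's, using the usual raft comparison (term first, then index).
func upToDate(candidate, voter logState) bool {
	if candidate.lastTerm != voter.lastTerm {
		return candidate.lastTerm > voter.lastTerm
	}
	return candidate.lastIndex >= voter.lastIndex
}

// canWin counts the votes candidate would receive from the reachable voters
// (which include the candidate itself) and compares them against a majority
// of the full group.
func canWin(candidate logState, voters []logState, groupSize int) bool {
	votes := 0
	for _, v := range voters {
		if upToDate(candidate, v) {
			votes++
		}
	}
	return votes >= groupSize/2+1
}

func main() {
	n2 := logState{lastTerm: 5, lastIndex: 99} // lost index 100 in the power cycle
	n3 := logState{lastTerm: 5, lastIndex: 99}

	// n1 is unreachable. n2 campaigns and gets votes from itself and n3.
	fmt.Println(canWin(n2, []logState{n2, n3}, 3)) // true

	// Had n2 kept index 100, it would refuse to vote for n3.
	n2Durable := logState{lastTerm: 5, lastIndex: 100}
	fmt.Println(canWin(n3, []logState{n2Durable, n3}, 3)) // false
}
```

The second call shows why durability normally prevents the problem: a follower that still holds index 100 denies its vote to any candidate that lacks it.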

To avoid this, we need to ensure that n2 doesn't vote or campaign until it is
guaranteed to have been caught up across all entries that it may, in a previous
life, have acked. A simple way to do this is to propose (i.e. ask the leader to
propose) an entry carrying a UUID and to abstain from voting until n2 has this
entry in its log. In effect, the follower "is semantically down" until it has
been caught up past what it had acked previously, but it can be brought "up"
again with the minimal amount of work possible.
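A minimal Go sketch of the marker idea, under stated assumptions: `replica`, `entry`, `restartAfterCrash`, and `mayVoteOrCampaign` are hypothetical names, the marker is a fixed string instead of a real UUID, and the step of asking the leader to propose the marker is left out.

```go
package main

import "fmt"

// entry is a simplified raft log entry. RecoveryID is non-empty only for the
// hypothetical marker entries a restarted follower asks the leader to propose.
type entry struct {
	Index      uint64
	RecoveryID string
}

// replica models only the follower state relevant to the scheme.
type replica struct {
	log        []entry
	awaitingID string // non-empty while the replica is "semantically down"
}

// restartAfterCrash is called when the replica may have lost acked entries.
// It records the marker to wait for; sending id to the leader is not modeled.
func (r *replica) restartAfterCrash(id string) {
	r.awaitingID = id
}

// appendEntries appends entries received from the leader and leaves the
// recovering state once the awaited marker shows up in the local log.
func (r *replica) appendEntries(ents ...entry) {
	for _, e := range ents {
		r.log = append(r.log, e)
		if r.awaitingID != "" && e.RecoveryID == r.awaitingID {
			r.awaitingID = ""
		}
	}
}

// mayVoteOrCampaign reports whether the replica may take part in elections.
func (r *replica) mayVoteOrCampaign() bool {
	return r.awaitingID == ""
}

func main() {
	n2 := &replica{log: []entry{{Index: 99}}}
	n2.restartAfterCrash("marker-1")
	fmt.Println(n2.mayVoteOrCampaign()) // false

	// The leader catches n2 up; the marker is ordered after the lost entry.
	n2.appendEntries(entry{Index: 100}, entry{Index: 101, RecoveryID: "marker-1"})
	fmt.Println(n2.mayVoteOrCampaign()) // true
}
```

Because the marker is proposed after the restart, it sits behind every entry the follower could have acked before the crash, so seeing it in the local log implies the follower has been caught up past that point.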

More work would be necessary to actually do this.

For one, etcd/raft is heavy on assertions tracking whether durable state has
regressed. For example, upon restarting, n2 might be contacted by n1 with an
MsgApp to append an index 101, which n1 considers possible based on what it
believes the durable state of n2 to be (it thinks n2 has index 100). Upon
receiving this message, n2 will exit with a fatal error. We would need to make
raft more accepting of loss of durability.

We may also need to be careful about command application, especially for
specialized commands such as AddSSTs, log truncation, splits, etc., though I'm
not sure there are any new complications. To be safe, we could always use a
separate raft command encoding for "non-vanilla" entries and make sure to fsync
whenever one of these enters the log. However, the semantics around configuration
changes are already very complex.[^3]
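The "fsync only for non-vanilla entries" rule could look roughly like the Go sketch below. The `entryKind` values and `mustSync` are hypothetical; CockroachDB's actual command encodings are not modeled here.

```go
package main

import "fmt"

// entryKind is a hypothetical classification of raft entries.
type entryKind int

const (
	kindVanilla entryKind = iota // ordinary replicated write
	kindConfChange
	kindSplit
	kindLogTruncation
	kindAddSST
)

// entry is a simplified raft log entry tagged with its kind.
type entry struct {
	Index uint64
	Kind  entryKind
}

// mustSync reports whether appending batch has to be followed by an fsync.
// Only batches made up entirely of vanilla entries may skip it.
func mustSync(batch []entry) bool {
	for _, e := range batch {
		if e.Kind != kindVanilla {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(mustSync([]entry{{Index: 1}, {Index: 2}}))                  // false
	fmt.Println(mustSync([]entry{{Index: 3}, {Index: 4, Kind: kindSplit}})) // true
}
```

Defaulting to "sync" for anything that is not explicitly vanilla keeps the risky path opt-in: a newly added command kind is durable unless someone decides otherwise.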

A somewhat related issue is https://github.com/etcd-io/etcd/issues/12257[^4].

There are alternatives to do this outside of raft. For example, if we had bulk replication changes, we could "re-add" the node in place, but under a new replicaID, and with a way to re-use the existing snapshot (i.e. apply it in-place). This would have the same effect, but avoid any complications at the raft layer (since we're not violating durability). Bulk replication changes are tricky, though, since currently any replication change has to update the range descriptor and in particular the meta2 copy. One relevant observation is that currently, the meta2 is identical to the range copy, but it doesn't have to be. The meta2 copy only needs to have enough information to allow CPuts as part of replication changes and to route requests; maybe there is a way to not have it include the ReplicaID, in which case we could bump the replication ID and run a replication change in a 1PC txn on the range itself.

Jira issue: CRDB-19825

Epic CRDB-40197

Footnotes

  1. though the main driver there is resilience to EBS-level slowdowns on certain write-heavy workloads on underprovisioned gp3 volumes; I'm hesitant to think of disabling raft fsync as the right solution since we are lease-based and so a disk that has slow writes will still have severe problems serving user traffic

  2.  https://github.com/cockroachdb/cockroach/issues/19784

  3. https://github.com/etcd-io/etcd/issues/11284 https://github.com/etcd-io/etcd/issues/7625 https://github.com/etcd-io/etcd/issues/12359

@tbg tbg added C-enhancement Solution expected to add code/behavior + preserve backward-compat (pg compat issues are exception) T-kv-replication labels Sep 22, 2022
@blathers-crl

blathers-crl bot commented Sep 22, 2022

cc @cockroachdb/replication

@erikgrinaker
Contributor

erikgrinaker commented Sep 22, 2022

> To avoid this, we need to ensure that n2 doesn't vote or campaign until it is guaranteed to have been caught up across all entries that it may, in a previous life, have acked. A simple way to do this is to propose (i.e. ask the leader to propose) an entry carrying a UUID and abstaining from voting until n2 has this entry in its log. In effect, the follower "is semantically down" until it has been caught up past what it had acked previously, but it can be brought "up" again with the minimal amount of work possible.

There are additional problems here around correlated failures, i.e. if we were to also lose n1 then all bets are off. Can we reliably detect this situation and surface a hard error to the operator? I would consider this a requirement, since we'd otherwise risk silent data loss.

@erikgrinaker
Contributor

CC @sean- who's also voiced interest in this.

@tbg
Member Author

tbg commented Sep 22, 2022

> There are additional problems here around correlated failures, i.e. if we were to also lose n1 then all bets are off.

Right, our availability weakens. A node counts as down in the "raft sense" until it's been caught up by the leader. If we lose a quorum simultaneously, there may not be a leader left to get these followers back.

@erikgrinaker
Contributor

> > There are additional problems here around correlated failures, i.e. if we were to also lose n1 then all bets are off.
>
> Right, our availability weakens. A node counts as down in the "raft sense" until it's been caught up by the leader. If we lose a quorum simultaneously, there may not be a leader left to get these followers back.

Sure, the availability issue seems fine. But if we lose n1 then we also lose entry 100 (in your example above). Is there any way we can know that a previously committed entry is now completely gone from all replicas? If not, we'll be at risk of silent data loss.

@tbg
Member Author

tbg commented Sep 22, 2022

What will happen in your example is that n1 and n2 are both unable to elect a leader (and they will not campaign themselves), and so the raft group will be unable to make progress. It has in effect lost quorum. So in a sense it is detecting that an entry got lost, because a peer only fully recovers once it is guaranteed to be fully up to date.

@erikgrinaker
Contributor

Right, so a replica effectively needs an existing quorum to rejoin the cluster. I think we'll also need a marker for clean shutdowns (with fsyncs) that disables this check, such that it's possible to do a cold start of the cluster. But yeah, this seems viable.
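One way such a clean-shutdown marker could work is sketched below in Go. This assumes a Unix-like filesystem where a directory can be opened and synced; the file name `CLEAN_SHUTDOWN` and the helpers `writeMarker`, `consumeMarker`, and `demo` are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

const markerName = "CLEAN_SHUTDOWN" // hypothetical file name

// syncDir makes changes to the directory's entries durable.
func syncDir(dir string) error {
	d, err := os.Open(dir)
	if err != nil {
		return err
	}
	defer d.Close()
	return d.Sync()
}

// writeMarker is called at the end of a graceful shutdown, after all
// outstanding log writes have been synced.
func writeMarker(dir string) error {
	f, err := os.Create(filepath.Join(dir, markerName))
	if err != nil {
		return err
	}
	if err := f.Sync(); err != nil {
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	return syncDir(dir)
}

// consumeMarker is called at startup. It reports whether the previous
// shutdown was clean and durably removes the marker, so that a later crash
// is not mistaken for a clean shutdown.
func consumeMarker(dir string) (bool, error) {
	err := os.Remove(filepath.Join(dir, markerName))
	if errors.Is(err, fs.ErrNotExist) {
		return false, nil // no marker: treat as a crash, stay "down" until caught up
	}
	if err != nil {
		return false, err
	}
	if err := syncDir(dir); err != nil {
		return false, err
	}
	return true, nil
}

// demo simulates three restarts: after a crash, after a clean shutdown, and
// after another crash.
func demo() ([]bool, error) {
	dir, err := os.MkdirTemp("", "store")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)
	var results []bool
	for _, shutdownCleanly := range []bool{false, true, false} {
		if shutdownCleanly {
			if err := writeMarker(dir); err != nil {
				return nil, err
			}
		}
		clean, err := consumeMarker(dir)
		if err != nil {
			return nil, err
		}
		results = append(results, clean)
	}
	return results, nil
}

func main() {
	fmt.Println(demo())
}
```

Removing the marker before the node starts accepting unsynced writes is the important ordering: if it lingered, a crash during the next run would look like a clean shutdown.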

@sumeerbhola
Collaborator

Would the gains here be minor once the async work outlined in #17500 (comment) and #87050 is done since it would remove any effect of fsync latency on throughput and minimize the effect of fsync latency on writes (since raft commit and state machine application would see the K-th lowest fsync latency where K is the quorum size)?
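The "K-th lowest fsync latency" observation can be made concrete with a small Go helper. `quorumLatency` is a hypothetical name and the latencies in `main` are made-up numbers; network round trips are ignored.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// quorumLatency returns the time at which a quorum of replicas has synced an
// entry: the K-th lowest per-replica fsync latency, where K is the quorum
// size for the given number of replicas.
func quorumLatency(perReplica ...time.Duration) time.Duration {
	if len(perReplica) == 0 {
		return 0
	}
	sorted := append([]time.Duration(nil), perReplica...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
	k := len(sorted)/2 + 1
	return sorted[k-1]
}

func main() {
	// One slow disk among three replicas does not determine commit latency.
	fmt.Println(quorumLatency(40*time.Millisecond, 2*time.Millisecond, 3*time.Millisecond)) // 3ms
}
```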

I value the thought exercise in this proposal, but now that some details have been fleshed out, I worry that this is too complicated/risky a durability/availability story.

@tbg
Member Author

tbg commented Sep 26, 2022

#17500 and #87050 contract the blast radius - if the leader is slower to fsync, that's fine as long as the followers are fast enough (there will be some increase in latency vs steady state since now we're waiting for two RPCs instead of one, but it's tolerable).

What this issue brings to the table is not waiting for any fsync at all. This should be significant no matter what, but just to be clear, I am not pushing for this as something we try to do in the near-term future (and perhaps not ever, and certainly not as a default). The issues you mentioned above are much more important.

@ajwerner
Contributor

It'd be nice to gain the benefits of not waiting for an fsync on the write path while also being able to survive simultaneous power failure to the entire cluster and to make claims about what sort of data loss might be experienced.

As Tobi noted, we'll definitely want to sync things that affect the raft group state like config changes, term changes, and well a whole host of other special things.

A thing folks have come to expect of traditional DBMS systems which do not run with fsyncs on is that if you lose power, you lose recent transactions. Importantly, in such systems, you generally lose entire transactions. If parts of transactions were to be lost the system may find itself in an invalid state with regards to referential integrity. It'd be hard to talk about.

An ideal property would be that when we lose data due to accepted, and potentially opt-in durability weakening, that we only lose whole transactions.

One approach to only lose whole transactions might be to make sure that we actually wait for the data to be flushed when making transactions explicitly committed and when resolving intents. These operations are already asynchronous with respect to the client performing the write (assuming parallel commits is in use). A negative implication is that these flush operations would be on the synchronous "contention footprint" for concurrent operations which might want to read or overwrite this newly written data. Other ideas, discussed in #22349, could also be used to make the contention footprint fully asynchronous, such that we only need to wait for one non-sync'd write before the system could allow a contending transaction to proceed. This would be a huge win.

All of the above could lead to higher throughput, lower latency, and lower IOP usage at the loss of durability in the face of black swan events. Furthermore, it would not be hard to add a mechanism for clients to opt in (or opt out) of the durability claims we already make.

None of this note is in conflict with anything in the initial proposal here. The re-joining discussion in the face of lost log entries remains equally valid.

@tbg
Member Author

tbg commented Oct 28, 2022

> Importantly, in such systems, you generally lose entire transactions. If parts of transactions were to be lost the system may find itself in an invalid state with regards to referential integrity. It'd be hard to talk about.

More precisely, you lose a suffix of transactions (if transaction N was lost, N+1, ..., are also lost). Otherwise, if you delete a row to free up a unique constraint and then insert another row that uses it, then by losing only the first txn you suddenly have a unique constraint violation. In CRDB, where there isn't a single operation order, I think this becomes "if txn N is lost, then all subsequent txns overlapping N are also lost or are aborted". And in both cases, "transaction" means "mutation".

> One approach to only lose whole transactions might be to make sure that we actually wait for the data to be flushed when making transactions explicitly committed and when resolving intents.

One annoying detail here is that we need to wait for all intents to be durable before resolving any of them. Otherwise:

  • write intent A (no fsync)
  • write intent B (no fsync)
  • move to STAGING (no fsync)
  • client gets ACK for txn
  • intent A becomes durable, resolve it
  • power cycle - intent B lost (it wasn't durable yet, lived on a separate set of machines)
  • bad place, can't commit B because it's lost, but also can't uncommit A

Instead, we need to wait:

  • [...]
  • client gets ACK for txn
  • intent A becomes durable
  • intent B becomes durable
  • now that all intents durable, mark txn record as COMMITTED - with fsync
  • power cycle can't lose any intents nor the txn record (if we didn't fsync the COMMITTED, we might end up starting to resolve intents, but then power-cycling and losing the txn record altogether, which would make it impossible to arrive at a consistent state)
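The ordering in the second list can be sketched in Go as follows. This is an illustration only: `intent` and `commitAfterDurable` are hypothetical, durability notifications are modeled as channels that get closed, and the fsync of the txn record is represented by a log line.

```go
package main

import "fmt"

// intent is a hypothetical handle for a provisional write. durable is closed
// once the range holding the intent has synced it to stable storage.
type intent struct {
	key     string
	durable chan struct{}
}

// commitAfterDurable waits until every intent is durable, then marks the txn
// record COMMITTED (with fsync), and only then resolves intents. It returns
// the steps in the order they happened.
func commitAfterDurable(intents []*intent) []string {
	var steps []string
	for _, in := range intents {
		<-in.durable // block until this intent can no longer be lost
		steps = append(steps, "intent "+in.key+" durable")
	}
	steps = append(steps, "txn record COMMITTED (fsync)")
	for _, in := range intents {
		steps = append(steps, "resolve intent "+in.key)
	}
	return steps
}

func main() {
	a := &intent{key: "A", durable: make(chan struct{})}
	b := &intent{key: "B", durable: make(chan struct{})}

	// The ranges report durability asynchronously and in arbitrary order.
	go func() {
		close(b.durable)
		close(a.durable)
	}()

	for _, s := range commitAfterDurable([]*intent{a, b}) {
		fmt.Println(s)
	}
}
```

No intent is resolved before all of them are durable and the COMMITTED record is synced, which is exactly the wait that lengthens the contention footprint.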

So the contention footprint increase will be pretty massive due to this need to wait for durability before commit, which amounts to another round-trip during intent resolution or the fsync (whichever is larger).

Let's say we try to hide this contention footprint from clients. Let's say we do this by having the txn gateway tell all nodes that have been touched by the txn that the txn is now considered committed (i.e. is STAGING and all intents were actually "written"). Nodes could then treat these intents as committed eagerly, reducing the contention footprint. But if a replica set worth of nodes power-cycles and loses an intent, everyone is seeing a partial txn anyway.

I think (worry) the end result is that the widened contention footprint described above will be near optimal if we really want to avoid anyone seeing a partial txn.

Also, I worry that it still isn't giving us what we need. You could have two txns that overlap only on their read/read-write (but not write-write) sets:

  • r1: txn1 writes, txn2 reads txn1
  • r2: txn2 writes

if r1 and r2 aren't colocated, you might well lose txn1, but commit txn2. This could certainly lose some app-level invariants. I wonder if it could also violate SQL-internal invariants, for example that a row is housed in exactly one table (where txn2 would check table1 before writing to table2).

@tbg
Member Author

tbg commented Oct 28, 2022

> Also, I worry that it still isn't giving us what we need. You could have two txns that overlap only on their read/read-write (but not write-write) sets:

I was wrong here, only durable txns can be observed - so if we lose a txn, it couldn't have been observed and so the property we want holds trivially.

The longer contention footprint seems necessary, though, and is a real bummer.

@andrewbaptist
Collaborator

TL;DR: It is possible to write the WAL using O_DIRECT on a preallocated file.

fdatasync does two things:

  1. Flushes data from the OS write cache (known as dirty blocks) to the drive.
  2. Forces the drive to flush its cache (by sending a SYNCHRONIZE_CACHE SCSI command).

Drives only accept writes in "block size" amounts (typically 4K). The purpose of the OS write cache is to allow unaligned writes to the file and flush them when it decides to (or is forced to with a fsync/fdatasync).

The difference between fsync and fdatasync is whether "non-critical" metadata is also flushed. Non-critical metadata is things like the last modified time.

A fairly small change that could be made to remove the need to call fdatasync at all is to fully manage the writing of the WAL. Specifically, the following three things need to happen:

  1. The WAL is pre-allocated (and zeroed out). Using O_DIRECT on a non-preallocated file is poorly defined behavior. The core reason is that if the underlying blocks that make up a file change, the timing of the "critical" metadata updates and the inode updates for the block addresses for the file are not coordinated.
  2. The writes would need to be done in 4K (or larger) blocks. O_DIRECT operations will fail if the size of the write is not a multiple of the block size of the underlying device.
  3. The writes are all done with the FUA bit set. This happens automatically if the file is opened with O_DSYNC. Setting the FUA bit forces the data to bypass the drive level cache.

Fortunately setting the FUA bit is a NO-OP for both AWS and Google as well as most enterprise drives. This means setting the FUA bit is free. There will be a performance cost for (cheap) drives that don't guarantee power loss protection, but this isn't any more than the cost of later sending a fdatasync. If we go forward with this, we should make some recommendations for non-cloud customers to purchase enterprise SSD drives (I don't think anyone uses CRDB on spinning rust).

If we can make these changes, then WAL writes should be very fast, and bypass the OS completely. Optionally this could be combined with AIO, however, I'm not sure that is necessary. Generally the way I think this could be done is having a "WAL buffer" in userspace. When a write that needs to go to the WAL occurs, it would go to a cache that we would control. Periodically (every 10ms?) or once there is at least one block (4k) of data, the flush thread would flush the data to disk. There is already support in the WAL for using it in a "circular buffer" fashion, so this would just use that capability.
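The userspace "WAL buffer" described above might be shaped like the Go sketch below. It is a simplification under several assumptions: `walBuffer` and `countingWriter` are hypothetical, the destination is a plain `io.Writer` rather than a file opened with O_DIRECT and O_DSYNC, buffer memory alignment is not handled, and the periodic flush timer and the record framing that lets a reader tell padding from data are left out.

```go
package main

import (
	"fmt"
	"io"
)

const blockSize = 4096 // assumed device block size

// walBuffer accumulates log records in userspace and hands the underlying
// writer only whole blocks, as block-aligned direct I/O would require.
type walBuffer struct {
	w   io.Writer
	buf []byte
}

// Append adds a record and writes out any complete blocks.
func (b *walBuffer) Append(rec []byte) error {
	b.buf = append(b.buf, rec...)
	full := len(b.buf) / blockSize * blockSize
	if full == 0 {
		return nil
	}
	if _, err := b.w.Write(b.buf[:full]); err != nil {
		return err
	}
	// Keep the partial tail for the next write.
	b.buf = append(b.buf[:0], b.buf[full:]...)
	return nil
}

// Flush zero-pads the partial block and writes it. A flush timer (for
// example every 10ms) would call this when no full block has accumulated.
func (b *walBuffer) Flush() error {
	if len(b.buf) == 0 {
		return nil
	}
	padded := make([]byte, blockSize)
	copy(padded, b.buf) // Append leaves fewer than blockSize bytes buffered
	if _, err := b.w.Write(padded); err != nil {
		return err
	}
	b.buf = b.buf[:0]
	return nil
}

// countingWriter records the size of every write it receives.
type countingWriter struct{ sizes []int }

func (c *countingWriter) Write(p []byte) (int, error) {
	c.sizes = append(c.sizes, len(p))
	return len(p), nil
}

func main() {
	cw := &countingWriter{}
	wb := &walBuffer{w: cw}
	if err := wb.Append(make([]byte, 5000)); err != nil { // one full block, 904 bytes left over
		panic(err)
	}
	if err := wb.Flush(); err != nil { // remainder padded to a full block
		panic(err)
	}
	fmt.Println(cw.sizes) // [4096 4096]
}
```

Every write that reaches the device is a multiple of the block size, which is the property the second requirement in the list above asks for.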

tbg added a commit to tbg/cockroach that referenced this issue Nov 4, 2022
Using a combination of preallocated files, aligned writes, direct I/O
and fdatasync it is supposedly[^1] possible to get fast durable writes.
This is appealing and I wanted to see if I could actually make it happen
in a toy experiment.

Bench results show that whatever I'm doing isn't producing the desired
effect of high-throughput, durable writes on either gp3 or GCE local SSD.
Just using O_DIRECT alone is enough to move throughput well below the
10mb/s threshold. It is true that O_DSYNC doesn't add a large additional
penalty, but the damage is already done at that point.
The one, maybe, exception is pd-ssd (GCE's attached storage), where we
at least get 48mb/s, though with the same pattern of O_DIRECT alone causing
the major regression.

Detailed results below.

**a) AWS gp3**

```
roachprod create -n 1 --clouds aws --aws-ebs-iops 16000 \
  --aws-ebs-throughput 1000 --aws-ebs-volume-size 500 \
  --aws-ebs-volume-type=gp3 --aws-machine-type=m5.4xlarge tobias-dio \
  --local-ssd=false
```

```
$ export HOME=/mnt/data1
$ cd $HOME
$ ./syncexp.test -test.benchtime 10240x -test.bench . -test.cpu 1
goos: linux
goarch: amd64
pkg: github.com/cockroachdb/cockroach/pkg/kv/kvserver/syncexp
cpu: Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
BenchmarkFoo/none         	   10240	      6523 ns/op	       598.8 mb/s
BenchmarkFoo/dsync        	   10240	    716752 ns/op	         5.450 mb/s
BenchmarkFoo/direct       	   10240	    694162 ns/op	         5.627 mb/s
BenchmarkFoo/dsync,direct 	   10240	    708828 ns/op	         5.511 mb/s
PASS
```

**b) gceworker local SSD**

```
$ go test -benchtime=10240x . -bench . -cpu 1
goos: linux
goarch: amd64
pkg: github.com/cockroachdb/cockroach/pkg/kv/kvserver/syncexp
cpu: Intel(R) Xeon(R) CPU @ 2.30GHz
BenchmarkFoo/none         	   10240	      6833 ns/op	       571.6 mb/s
BenchmarkFoo/dsync        	   10240	    476861 ns/op	         8.192 mb/s
BenchmarkFoo/direct       	   10240	    411426 ns/op	         9.494 mb/s
BenchmarkFoo/dsync,direct 	   10240	    498408 ns/op	         7.837 mb/s
PASS
ok  	github.com/cockroachdb/cockroach/pkg/kv/kvserver/syncexp	14.283s
```

**c) GCE pd-ssd**

```
$ ./syncexp.test -test.benchtime 10240x -test.bench . -test.cpu 1
goos: linux
goarch: amd64
pkg: github.com/cockroachdb/cockroach/pkg/kv/kvserver/syncexp
cpu: Intel(R) Xeon(R) CPU @ 2.30GHz
BenchmarkFoo/none         	   10240	      6869 ns/op	       568.5 mb/s
--- BENCH: BenchmarkFoo/none
    sync_test.go:70: initialized /mnt/data1/wal-4096.bin (4.0 KiB)
    sync_test.go:70: initialized /mnt/data1/wal-41943040.bin (40 MiB)
BenchmarkFoo/dsync        	   10240	     86123 ns/op	        45.36 mb/s
BenchmarkFoo/direct       	   10240	     80876 ns/op	        48.30 mb/s
BenchmarkFoo/dsync,direct 	   10240	     80814 ns/op	        48.34 mb/s
PASS
```

Release note: None

[^1]: cockroachdb#88442 (comment)
@tbg
Member Author

tbg commented Nov 7, 2022

^-- the above was explored a bit by @andrewbaptist in #91272. There's also this slack thread. The TL;DR (though maybe @andrewbaptist has a better one!) is that there's currently no good way of having the above "free fsync" strategy actually play out in practice.[^1] This is sort of sad; I wonder if we can finagle it somehow, but I suppose if that were possible folks would be doing it already.

Footnotes

  1. i.e. can't do something like described in https://linux-scsi.vger.kernel.narkive.com/yNnBRBPn/o-direct-and-barriers

@tbg
Member Author

tbg commented Nov 15, 2022

Just in case we need some more bolstering of the no-fsync approach from academia, reproduced below is a passage from Viewstamped Replication Revisited. What's described matches very closely what's described in this issue in spirit, except they elide log disk writes altogether. I first thought TigerBeetle had adopted this approach[^1] (which seemed a bit hardcore even for them) but it says here[^2] "primary disk plus backup's disk", and the code[^3] suggests they're doing vanilla fdatasync.


When a replica recovers after a crash it cannot participate in request processing and view changes until it has a state at least as recent as when it failed. If it could participate sooner than this, the system can fail. For example, if it forgets that it prepared some operation, this operation might then be known to fewer than a quorum of replicas even though it committed, which could cause the operation to be forgotten in a view change. If nodes record their state on disk before sending messages, a node will be able to rejoin the system as soon as it has reinitialized its state by reading from disk. The reason is that in this case a recovering node hasn’t forgotten anything it did before the crash (assuming the disk is intact). Instead it is the same as a node that has been unable to communicate for some period of time: its state is old but it hasn’t forgotten anything it did before.

However, running the protocol this way is unattractive since it adds a delay to normal case processing: the primary would need to write to disk before sending the PREPARE message, and the other replicas would need to write to disk before sending the PREPAREOK response. Furthermore, it is unnecessary to do the disk write because the state is also stored at the other replicas and can be retrieved from them, using a recovery protocol. Retrieving state will be successful provided replicas are failure independent, i.e., highly unlikely to fail at the same time. If all replicas were to fail simultaneously, state will be lost if the information on disk isn’t up to date; with failure independence a simultaneous failure is unlikely. If nodes are all in the same data center, the use of UPS’s (uninterruptible power supplies) or non-volatile memory can provide failure independence if the problem is a power failure. Placing replicas at different geographical locations can additionally avoid loss of information when there is a local problem like a fire. This section describes a recovery protocol that doesn’t require disk I/O during either normal processing or during a view change. The original VR specification used a protocol that wrote to disk during the view change but did not require writing to disk during normal case processing. When a node comes back up after a crash it sets its status to recovering and carries out the recovery protocol. While a replica’s status is recovering it does not participate in either the request processing protocol or the view change protocol. To carry out the recovery protocol, the node needs to know the configuration. It can learn this by waiting to receive messages from other group members and then fetching the configuration from one of them; alternatively this information could be stored on disk.

Footnotes

  1. https://github.com/tigerbeetledb/tigerbeetle/blob/main/docs/DESIGN.md

  2. https://youtu.be/yBBpUMR8dHw?t=63

  3. https://vscode.dev/github/tigerbeetledb/tigerbeetle/blob/a15fa7523385b826d85c6294f374314369f3913a/src/io/linux.zig#L943-L944

@nvanbenschoten
Member

> To avoid this, we need to ensure that n2 doesn't vote or campaign until it is guaranteed to have been caught up across all entries that it may, in a previous life, have acked.

This is how Kafka's replication recovery protocol works. See https://jack-vanlightly.com/blog/2023/4/24/why-apache-kafka-doesnt-need-fsync-to-be-safe.

@tbg
Member Author

tbg commented May 17, 2023

Slack thread
