
Storing data/coding shred data in rocksdb causes problems #16234

Open
sakridge opened this issue Mar 30, 2021 · 8 comments

@sakridge
Member

sakridge commented Mar 30, 2021

Problem

As reported in the following issue, we have been encountering problems storing data/coding shred bytestreams in RocksDB: #14586

The main issue is the long (~40 min) stalls; however, there are other, more subtle problems that make RocksDB unattractive:

  • High write IO from write amplification
  • Sometimes high insert latency even if it doesn't result in a long stall
  • Inability to delete individual dead slots without poor performance
  • Corruption on restart
  • Deletions (and corresponding tombstones) slow down scans until next compaction

Proposed Solutions

1) RocksDB Tuning / Modifications

More info ...

RocksDB has a decent amount of customization available. However, none of these knobs seemed to have much effect in solving the core issue, as noted here: #14586 (comment)

Another mitigation has been implemented to run compaction at a more regular interval, spreading the work out instead of letting it hit as one painful stall; details on that can be found here: #16697
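
As an illustration only (not the actual #16697 implementation), a background loop that manually triggers compaction on a fixed interval via the rust-rocksdb crate might look like the sketch below; the interval and the whole-range compaction are assumptions for the example.

use std::sync::Arc;
use std::{thread, time::Duration};
use rocksdb::DB;

// Hypothetical helper: spread compaction work out over time instead of
// letting it arrive as one large stall.
fn spawn_periodic_compaction(db: Arc<DB>, interval: Duration) -> thread::JoinHandle<()> {
    thread::spawn(move || loop {
        thread::sleep(interval);
        // None..None compacts the entire key range; a real implementation
        // would likely target specific column families and slot ranges.
        db.compact_range(None::<&[u8]>, None::<&[u8]>);
    })
}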

2) Use AccountsDB (for shred payloads)

More info ...

The AccountsDB architecture is another option for tackling this problem, as it also provides storage for a collection of data far too large to store completely in memory. However, as Stephen pointed out in a comment below, the memory bloat from indexing every shred could easily become excessive. Indexing would be crucial because shreds aren't guaranteed to show up "in order" (ascending index), so we need metadata (or constant deserialization) to figure out which shred is which.

Potential precursor issues with AccountsDB

  • AccountsDB has in-memory caches by default, with an async background thread that flushes to the mmaps. We may initially want to write straight to the mmaps, bypassing the cache, to avoid consistency errors. Although we may have to recover from an inconsistent state anyway, since power-off/hard-stop cases can corrupt the files.
  • AccountsDB has no data consistency checks, like CRC or hash.

3) Database via Filesystem (for shred payloads only)

Shreds could be stored on disk; either directly as a raw bytestream or in some custom file format. A similar idea was previously attempted with: #9366

Shreds are inherently uniquely identifiable/indexable from their metadata; namely, data vs. coding, slot, and index. As such, we could lay shreds out on disk with this metadata "encoded" into the path. Access is simple since we just construct a path; a file will either exist at that path or not, so we avoid some of the complex operations, such as scans, that other DBs have to contend with. Likewise, per-slot access is easy since we just operate on a complete directory. A few options:

Shreds stored in individual files
ledger_root
 | - shreds
 |    | - data
 |    |    | - slot_n
 |    |    |    | -- 0.shred
 |    |    |    | -- 1.shred
 |    |    |    | -- ...
 |    |    | - slot_n+1
 |    |    |    | -- 0.shred
 |    |    |    | -- 1.shred
 |    |    |    | -- ...
 |    |    | - ...
 |    | - coding
 |    |    | ...
Shreds stored in aggregate files to fill up a page (4 kB)

This approach reduces space amplification by reducing waste in each page. We could fit 3 entire shreds in a page, but not 4. If we have fixed shred sizes, the file containing shred idx would simply be (idx / 3) * 3 (integer division to quantize to multiples of 3).

ledger_root
 | - shreds
 |    | - data
 |    |    | - slot_n
 |    |    |    | -- 0.shred       // Shreds 0-2
 |    |    |    | -- 3.shred       // Shreds 3-5
 |    |    |    | -- ...
 |    |    | - slot_n+1
 |    |    |    | -- 0.shred
 |    |    |    | -- 3.shred
 |    |    |    | -- ...
 |    |    | - ...
 |    | - coding
 |    |    | ...
Shreds stored in aggregate files by slot

This approach reduces space amplification, and should theoretically perform better than the other options for entire-slot operations due to sequential I/O. If slots are guaranteed to be full, the offset of shred idx would simply be idx * SHRED_SIZE.

ledger_root
 | - shreds
 |    | - data
 |    |    | - slot_n.shreds
 |    |    | - slot_n+1.shreds
 |    |    | - ...
 |    | - coding
 |    |    | ...
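
To make the layouts above concrete, here is a hedged sketch of encoding the metadata into a path and of the indexing math mentioned for the aggregate options; the names (ShredType, shred_path, SHREDS_PER_PAGE) are hypothetical, not existing code.

use std::path::{Path, PathBuf};

const SHRED_SIZE: u64 = 1228;   // current maximum shred size
const SHREDS_PER_PAGE: u32 = 3; // 3 shreds fit in a 4 kB page, 4 do not

enum ShredType {
    Data,
    Coding,
}

// Encode (type, slot, index) into a path for the per-page-aggregate layout.
// The one-shred-per-file layout is the same with file_index = index.
fn shred_path(ledger_root: &Path, shred_type: ShredType, slot: u64, index: u32) -> PathBuf {
    let type_dir = match shred_type {
        ShredType::Data => "data",
        ShredType::Coding => "coding",
    };
    // Quantize the index to a multiple of 3: shreds 0-2 -> 0.shred, 3-5 -> 3.shred, ...
    let file_index = (index / SHREDS_PER_PAGE) * SHREDS_PER_PAGE;
    ledger_root
        .join("shreds")
        .join(type_dir)
        .join(format!("slot_{slot}"))
        .join(format!("{file_index}.shred"))
}

// Per-slot aggregate layout: if slots are full and shreds are a fixed size,
// the byte offset of shred `index` within slot_n.shreds is simply:
fn shred_offset(index: u64) -> u64 {
    index * SHRED_SIZE
}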

Shred Zero Padding

Regardless of which route we go, the question arises of whether to keep or strip the zero-padding that is added to shreds for transport.

  • If we store one-shred-per-file, there is no difference as we just read the entire file
  • If we store multiple-shreds-per-file, metadata would be necessary as the shreds would no longer be aligned to start at a multiple of SHRED_SIZE.

Partial Slots

In general, any slots that will be replayed must have all data shreds present. However, it could be the case that we have non-full slots that we need to store. For example, it would be perfectly valid to receive half of a slot's coding shreds and half of its data shreds, and be able to recover the full slot of data shreds. But we still want to store the data shreds, especially as we receive shreds piecewise.

  • If we store one-shred-per-file, this is a non-issue as we just won't have files for absent shreds
  • If we store multiple-shreds-per-file, we have the possibility of gaps in shreds.

Currently, all of our shreds are <= 1228 bytes in size, whereas a common SSD page size is 4 kB. As such, the performance of reading a single shred is theoretically at best 1228 / 4096 ~= 30% of the SSD's quoted specs. We hit this penalty for single-shred reads regardless; however, putting all of a slot's shreds into a single file will allow us to take advantage of sequential read speeds when performing entire entry / slot operations.

Proposal: Given the above, store shreds for an entire slot (data and coding separate) in a single file. Within this same file, keep an index that allows "random-access" of shreds, as well as the option to store non-zero-padded shreds.
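
As a hedged sketch of what such a per-slot file could contain (an in-file index mapping shred index to offset and length so unpadded shreds remain randomly accessible), with all struct and field names being hypothetical:

// Hypothetical layout of slot_n.shreds:
//   [ payload 0 | payload 1 | ... | payload k | index | index length ]
// Payloads may have their zero padding stripped; the index records where
// each payload starts and how long it is.
struct IndexEntry {
    shred_index: u32, // shred's index within the slot
    offset: u64,      // byte offset of the payload within the file
    len: u32,         // payload length (padding stripped)
}

struct SlotFileIndex {
    entries: Vec<IndexEntry>, // kept sorted by shred_index
}

impl SlotFileIndex {
    // "Random access": find where shred `idx` lives in the file, if present.
    fn locate(&self, idx: u32) -> Option<(u64, u32)> {
        self.entries
            .binary_search_by_key(&idx, |e| e.shred_index)
            .ok()
            .map(|i| (self.entries[i].offset, self.entries[i].len))
    }
}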

Read/Write Optimization via Buffering

The previous implementation of this (linked PR above) had some slowdown issues for writing when the system was under heavy load. However, that issue may have been a result of using standard Google Cloud Engine (GCE) machines with network-attached disks. Stephen A pointed out that we can secure GCE instances that have local storage (i.e. similar to a traditional SATA / PCIe connection), which removes some of the latency of the network-attached option.

If we still needed speed-up at this point, the next logical step would be adding in-memory buffering / caching. A very simple approach would just be a map that is gradually persisted to disk in the background. This might look something like this:

use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};
use dashmap::DashMap;

type Slot = u64;
type Index = u32;
type ShredPayload = Vec<u8>;

// One map entry per slot gives us better lock granularity
pub type ShredCache = DashMap<Slot, Arc<RwLock<BTreeMap<Index, ShredPayload>>>>;
  • Writes always go to cache (use the cache as a write buffer)
  • Shred reads (both single shred and multi-shred, such as entire slot) would check cache first, and then check disk
    • Single shred case is trivial
    • Multi shred case could become complicated if shreds were distributed between memory & disk; more on this in a bit.
      • Carl pointed out that using DashMap<Slot, Arc<DashMap<...>>> would allow us to grab an iterator to a slot in memory and safely iterate through the slot, even if the flushing mechanism decided that slot should be evicted from memory in the background
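
As a rough illustration of that read path (cache first, then disk), a minimal sketch follows; read_shred_from_disk is a hypothetical helper over whichever file layout is chosen.

// Hypothetical single-shred read path: try the in-memory buffer, then disk.
fn get_shred(cache: &ShredCache, slot: Slot, index: Index) -> std::io::Result<Option<ShredPayload>> {
    if let Some(slot_cache) = cache.get(&slot) {
        if let Some(payload) = slot_cache.read().unwrap().get(&index) {
            return Ok(Some(payload.clone()));
        }
    }
    // Not (or no longer) in cache; the slot may already have been flushed.
    read_shred_from_disk(slot, index)
}

// Placeholder for the on-disk lookup against the chosen file layout.
fn read_shred_from_disk(_slot: Slot, _index: Index) -> std::io::Result<Option<ShredPayload>> {
    unimplemented!("look up the shred in its slot file")
}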

Cache/Buffer Flush Policy

For both persistence and memory usage considerations, the structure needs to be flushed to disk over time. There could be several strategies for flushing a slot from cache:

  • A slot is rooted
  • A slot has reached a certain "age"
  • The entire cache has reached a pre-defined maximum space and begins running a flush strategy (simplest might be oldest first)
  • The entire cache routinely flushes on some set interval, which effectively keeps an upper bound on how large the cache can grow
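
For illustration, the triggers above could be combined into a simple predicate like the sketch below; the thresholds are made-up values, not tuned recommendations.

use std::time::{Duration, Instant};

const MAX_SLOT_AGE: Duration = Duration::from_secs(60); // assumed "age" limit
const MAX_CACHE_BYTES: usize = 1024 * 1024 * 1024;      // assumed 1 GB cap

// Decide whether a cached slot should be flushed to disk.
fn should_flush_slot(slot_is_rooted: bool, first_insert: Instant, total_cache_bytes: usize) -> bool {
    slot_is_rooted
        || first_insert.elapsed() > MAX_SLOT_AGE
        || total_cache_bytes > MAX_CACHE_BYTES
}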

Suppose a slot is picked for cache eviction, but is not yet full. If we receive more of its shreds after the flush, the slot's shreds are now split between memory and disk. This adds complexity both for reading and for combining those shreds into a single file at a later point. This case could perform poorly; however, it should not be the common case.

Proposal: Flush partial slots if the slot is picked for eviction. If shreds come in later, make entire-slot operations (i.e. iterators) support memory+disk. Additionally, add logic that handles a "merge" case where an existing shred file is combined with shreds from the cache.

Early Cleanup

The current blockstore cleanup strategy for normal slots (i.e. slots that do not yield consensus mismatch or the like) is to remove the oldest slots once the ledger bumps up against its maximum size. However, with our custom shred DB, we can more easily free up larger chunks of memory / disk when we know that a shred is no longer needed. Additionally, cleaning a slot while it is still in memory has the benefit of immediately freeing that memory as well as avoiding the disk I/O of a later flush.

Proposal: Clean up slots (shreds, plus the metadata still in Rocks) earlier than the current algorithm of waiting for the blockstore to hit capacity. Namely, clean up any unrooted slots older than the latest root.
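
A hedged sketch of that early cleanup against a per-slot file layout; slot_file_path and the rooted-slot set are hypothetical inputs, and the corresponding metadata cleanup in Rocks is omitted.

use std::collections::HashSet;
use std::fs;
use std::path::PathBuf;

// Remove the shred files of unrooted slots older than the latest root,
// since they can never be replayed.
fn cleanup_unrooted_slots(
    stored_slots: &[u64],
    latest_root: u64,
    rooted_slots: &HashSet<u64>,
    slot_file_path: impl Fn(u64) -> PathBuf, // maps a slot to its shred file
) -> std::io::Result<()> {
    for &slot in stored_slots {
        if slot < latest_root && !rooted_slots.contains(&slot) {
            let path = slot_file_path(slot);
            if path.exists() {
                fs::remove_file(path)?;
            }
        }
    }
    Ok(())
}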

Ensuring Consistency with Buffering

If the validator process gets killed or the machine gets shut off, we could encounter data inconsistency issues in the blockstore. RocksDB provides consistency between different column families with the WriteBatch API. With shred payloads outside of Rocks, we have to provide the guarantee of consistency between shred payloads and metadata ourselves.

Additionally, RocksDB uses a WAL to get some data persistence in the event of a crash without sacrificing write performance. The idea behind a WAL is to store all transactions sequentially as they come in. The RocksDB WAL is meant to be persisted immediately (or at least written to the OS buffer, which will eventually get flushed to disk even if the process dies), and provides a way to replay/recover from a process crash. This seems to be the best way for us to gain persistence without hitting disk for every operation. This guy built one in less than 125 lines of Rust code.

So, with this in mind, our order of operations would be:

  1. Write shred payloads to WAL
    • Shreds will be in OS buffer and written to disk as OS sees fit
  2. Write shreds into memory buffer structure
    • These will be pushed to disk in the background at some future point. If the process dies, we can recover from the WAL
  3. Write all other column families (ie metadata) to RocksDB

The WAL gives us the ability to recover data in the event of a process crash. After a crash, we can rewind the WAL and "re-insert" shreds; this allows us to recover metadata if the crash occurred between steps 2) and 3) above. However, shreds cannot be recovered from metadata, so the order matters.
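
For illustration, a minimal length-prefixed WAL along those lines might look like the sketch below (append shred payloads, replay them after a crash); the record framing and the reliance on the OS buffer rather than explicit fsync are assumptions for the example.

use std::fs::File;
use std::io::{self, BufReader, Read, Write};
use std::path::Path;

// Append one length-prefixed shred payload to the WAL. Writing into the OS
// buffer is enough to survive a process crash (though not a power loss).
fn wal_append(wal: &mut File, payload: &[u8]) -> io::Result<()> {
    wal.write_all(&(payload.len() as u32).to_le_bytes())?;
    wal.write_all(payload)
}

// Replay the WAL on startup, handing each recovered payload to a callback
// (e.g. "re-insert" it so the metadata can be rebuilt in RocksDB).
fn wal_replay(path: &Path, mut on_shred: impl FnMut(Vec<u8>)) -> io::Result<()> {
    let mut reader = BufReader::new(File::open(path)?);
    let mut len_buf = [0u8; 4];
    loop {
        match reader.read_exact(&mut len_buf) {
            Ok(()) => {}
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break, // clean end of log
            Err(e) => return Err(e),
        }
        let mut payload = vec![0u8; u32::from_le_bytes(len_buf) as usize];
        reader.read_exact(&mut payload)?;
        on_shred(payload);
    }
    Ok(())
}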

Open Issues / Questions

  • Similar to AccountsDB, there are no data consistency checks (CRC or hash) baked in; we'd have to implement this manually if we wanted it

Other General Considerations

  • This will be a major change in how the ledger works. How will we introduce this change? Will we need to implement side-by-side support for RocksDB and the other solution? Implement a conversion tool that pulls RocksDB shreds into the new format?
@steviez
Contributor

steviez commented May 5, 2021

For bookkeeping, a link to a previous attempt at solving this problem:
#9366

@sakridge
Member Author

sakridge commented May 7, 2021

If you have 200m shreds, that is about 650,000 slots. AccountsDB uses around 200 bytes per index in memory; 200m * 200 bytes is 40 gigabytes of memory just for the indexes.

If you index per-slot, then you have about 300x less indexing, maybe 150 MB total, which isn't bad, but there will be extra overhead of serializing/re-serializing and extracting the item from the slot if you need to read individual shreds, plus extra read/write bandwidth to access them.

Being able to page-out unused indexes is an option and would probably help the accounts use-case as well.

On the other hand, custom indexing using the file system itself to bucket shreds into slots may be even more efficient. This would be something like the previous solution #9366.

@mvines mvines added this to the The Future! milestone May 10, 2021
@steviez
Contributor

steviez commented May 28, 2021

Interesting project I stumbled across; not all of these points are applicable to the approach we may be taking, but it is still useful to review what other people have already done.
https://github.com/bytedance/terarkdb

From https://bytedance.feishu.cn/docs/doccnZmYFqHBm06BbvYgjsHHcKc#

The main reasons TerarkDB works better [than RocksDB] in some use cases are:

  • TerarkDB fixed tons of bugs in the original RocksDB.
  • TerarkDB moved lots of code out from db mutex, especially those critical code paths.
  • TerarkDB optimized background I/O patterns a lot to reduce disk stress.
  • TerarkDB optimized WAL sync strategy to reduce user thread latency.
  • TerarkDB has a better KV separation implementation.
  • TerarkDB has a lazy compaction mechanism to reduce write amplification A LOT (on heavy write workloads)
  • TerarkDB has bunch of self-developed indexing and compression algorithms that is more efficient than traditional ways.
  • ......

@steviez
Contributor

steviez commented May 29, 2021

Notes from a call with @carllin; I'll write these items up a bit more formally later, but gotta run right now (FYI @sakridge)

  • Do we care about minor forks? Do we care about keeping these around or can we chuck them?
  • Have background flush; the trigger for flushing cache
    • Roots being set OR memory size exceeding some pre-determined threshold
    • If we drop shreds but can recover with erasure, those will be filled in and can complete a slot (if possible)
    • Partial slots might not be flushed, how do we know when to evict these?
  • Iterators: handling the case where a slot may be flushed while iteration is on-going
    • Similar structure to accountsDB: DashMap<Slot, Arc<DashMap<u64, Shred>>>

@ryoqun
Member

ryoqun commented Aug 11, 2021

For the record, I think this is no longer true:

* Corruption on restart

I think this is referring to two cases:

I hope this hard-earned progress can result in a better-informed decision about continued RocksDB usage for RPC, the bankless leader's block cost, etc., even after we move data/coding shreds out.

(To be fair, I'm neutral on the idea of ditching RocksDB for shreds. I'm thinking tweaking RocksDB might be easier than re-inventing the wheel... but there is no time to prove it with actual code/tuning in the near term...)

@sakridge
Member Author

For the record, i think this is no longer true:
Corruption: block checksum mismatch: i think this is actually due to hardware problem as the upstream bug report says: (details: Rocksdb corruption issues on compaction #9009 (comment))

Yes. This does seem to be less of a problem now, although we did encounter it on another machine a couple months ago.

One aspect of this is that it would be nice to simply recover the database even if we have this single-bit corruption. The validator can throw away the corrupt data and retrieve it from the network again or from the coding shreds. I'm not sure how easy or hard RocksDB makes this. I've read that the accepted method is to open the DB in read-only mode and then copy it to a whole new DB; that doesn't sound like it would be very fast for a 500 GB DB. Possibly we can reverse-engineer the Rocks on-disk data format to get what we want, but it's probably subject to change unless they are open to some modification. Or maybe there is already a way that I don't know about. If we control the on-disk data format, it would be pretty straightforward to do.

@sakridge
Member Author

Too many RPC reads from ledger data stored in rocksdb seems to cause the validator to fall behind as well.

@ryoqun
Member

ryoqun commented Dec 13, 2021

Too many RPC reads from ledger data stored in rocksdb seems to cause the validator to fall behind as well.

I'll detail this later, but it seems that the previously-found rayon hack (SOLANA_RAYON_THREADS=8) or the crossbeam hack below alleviates the problem:

$ diff -U3 /home/ryoqun/.cargo/registry/src/github.com-1ecc6299db9ec823/crossbeam-epoch-0.9.{3,5}/src/internal.rs
--- /home/ryoqun/.cargo/registry/src/github.com-1ecc6299db9ec823/crossbeam-epoch-0.9.3/src/internal.rs  1970-01-01 09:00:00.000000000 +0900
+++ /home/ryoqun/.cargo/registry/src/github.com-1ecc6299db9ec823/crossbeam-epoch-0.9.5/src/internal.rs  2021-12-11 08:18:54.633505937 +0900
@@ -387,7 +387,7 @@
 impl Local {
     /// Number of pinnings after which a participant will execute some deferred functions from the
     /// global queue.
-    const PINNINGS_BETWEEN_COLLECT: usize = 128;
+    const PINNINGS_BETWEEN_COLLECT: usize = 128 * 128;
 
     /// Registers a new `Local` in the provided `Global`.
     pub(crate) fn register(collector: &Collector) -> LocalHandle {
