diff --git a/.changelog/3499.bugfix.1.md b/.changelog/3499.bugfix.1.md new file mode 100644 index 00000000000..888af396a1c --- /dev/null +++ b/.changelog/3499.bugfix.1.md @@ -0,0 +1 @@ +runtime: Remove incorrect Sync impl on mkvs::Tree diff --git a/.changelog/3499.bugfix.2.md b/.changelog/3499.bugfix.2.md new file mode 100644 index 00000000000..9dca3e4542c --- /dev/null +++ b/.changelog/3499.bugfix.2.md @@ -0,0 +1 @@ +go/storage/mkvs: Fix edge case in overlay iterator diff --git a/.changelog/3499.feature.1.md b/.changelog/3499.feature.1.md new file mode 100644 index 00000000000..a44f2820fd3 --- /dev/null +++ b/.changelog/3499.feature.1.md @@ -0,0 +1 @@ +runtime: Add `mkvs::Iterator` trait and `MKVS::iter` trait method diff --git a/.changelog/3499.feature.2.md b/.changelog/3499.feature.2.md new file mode 100644 index 00000000000..43098b20758 --- /dev/null +++ b/.changelog/3499.feature.2.md @@ -0,0 +1 @@ +runtime: Add mkvs::OverlayTree similar to its Go counterpart diff --git a/.changelog/3499.feature.3.md b/.changelog/3499.feature.3.md new file mode 100644 index 00000000000..d1d5d5443d9 --- /dev/null +++ b/.changelog/3499.feature.3.md @@ -0,0 +1 @@ +go/storage/mkvs: Generate proofs even if Tree.Position is not found diff --git a/client/src/transaction/snapshot.rs b/client/src/transaction/snapshot.rs index 28e568b4c95..16565889691 100644 --- a/client/src/transaction/snapshot.rs +++ b/client/src/transaction/snapshot.rs @@ -1,5 +1,5 @@ //! A block snapshot. -use std::any::Any; +use std::{any::Any, sync::Mutex}; use anyhow::{Context as AnyContext, Result}; use grpcio::CallOption; @@ -11,7 +11,7 @@ use oasis_core_runtime::{ roothash::{Block, Namespace}, }, storage::{ - mkvs::{sync::*, Prefix, Root, Tree, WriteLog}, + mkvs::{sync::*, Iterator, Prefix, Root, Tree, WriteLog}, MKVS, }, transaction::types::{TxnCall, TxnOutput}, @@ -57,7 +57,7 @@ pub struct BlockSnapshot { pub block_hash: Hash, read_syncer: RemoteReadSync, - mkvs: Tree, + mkvs: Mutex, } impl Clone for BlockSnapshot { @@ -77,7 +77,7 @@ impl Clone for BlockSnapshot { block, block_hash, read_syncer, - mkvs, + mkvs: Mutex::new(mkvs), } } } @@ -97,18 +97,20 @@ impl BlockSnapshot { block_hash: block.header.encoded_hash(), block, read_syncer, - mkvs, + mkvs: Mutex::new(mkvs), } } } impl MKVS for BlockSnapshot { fn get(&self, ctx: Context, key: &[u8]) -> Option> { - MKVS::get(&self.mkvs, ctx, key) + let mkvs = self.mkvs.lock().unwrap(); + mkvs.get(ctx, key).unwrap() } fn cache_contains_key(&self, ctx: Context, key: &[u8]) -> bool { - MKVS::cache_contains_key(&self.mkvs, ctx, key) + let mkvs = self.mkvs.lock().unwrap(); + mkvs.cache_contains_key(ctx, key) } fn insert(&mut self, _ctx: Context, _key: &[u8], _value: &[u8]) -> Option> { @@ -120,7 +122,12 @@ impl MKVS for BlockSnapshot { } fn prefetch_prefixes(&self, ctx: Context, prefixes: &Vec, limit: u16) { - MKVS::prefetch_prefixes(&self.mkvs, ctx, prefixes, limit) + let mkvs = self.mkvs.lock().unwrap(); + mkvs.prefetch_prefixes(ctx, prefixes, limit).unwrap() + } + + fn iter(&self, _ctx: Context) -> Box { + unimplemented!("block snapshot doesn't support iterators"); } fn commit( @@ -131,10 +138,6 @@ impl MKVS for BlockSnapshot { ) -> Result<(WriteLog, Hash)> { unimplemented!("block snapshot is read-only"); } - - fn rollback(&mut self) { - unimplemented!("block snapshot is read-only"); - } } #[derive(Clone)] diff --git a/go/storage/mkvs/iterator.go b/go/storage/mkvs/iterator.go index fb80f48a974..4bbde1b90ed 100644 --- a/go/storage/mkvs/iterator.go +++ b/go/storage/mkvs/iterator.go @@ -150,7 +150,7 @@ func IteratorPrefetch(prefetch uint16) IteratorOption { // visited nodes. func WithProof(root hash.Hash) IteratorOption { return func(it Iterator) { - it.(*treeIterator).proofBuilder = syncer.NewProofBuilder(root) + it.(*treeIterator).proofBuilder = syncer.NewProofBuilder(root, root) } } @@ -251,10 +251,7 @@ func (it *treeIterator) doNext(ptr *node.Pointer, bitDepth node.Depth, path, key // Include nodes in proof if we have a proof builder. if pb := it.proofBuilder; pb != nil && ptr != nil { - proofRoot := pb.GetRoot() - if pb.HasRoot() || proofRoot.Equal(&ptr.Hash) { - pb.Include(nd) - } + pb.Include(nd) } switch n := nd.(type) { diff --git a/go/storage/mkvs/lookup.go b/go/storage/mkvs/lookup.go index 02b95c9fe00..e22e45ea550 100644 --- a/go/storage/mkvs/lookup.go +++ b/go/storage/mkvs/lookup.go @@ -48,7 +48,7 @@ func (t *tree) SyncGet(ctx context.Context, request *syncer.GetRequest) (*syncer // Remember where the path from root to target node ends (will end). t.cache.markPosition() - pb := syncer.NewProofBuilder(request.Tree.Position) + pb := syncer.NewProofBuilder(request.Tree.Root.Hash, request.Tree.Position) opts := doGetOptions{ proofBuilder: pb, includeSiblings: request.IncludeSiblings, @@ -108,10 +108,7 @@ func (t *tree) doGet( // Include nodes in proof if we have a proof builder. if pb := opts.proofBuilder; pb != nil && ptr != nil { - proofRoot := pb.GetRoot() - if pb.HasRoot() || proofRoot.Equal(&ptr.Hash) { - pb.Include(nd) - } + pb.Include(nd) } // This may be used to only include the given node in a proof and not diff --git a/go/storage/mkvs/overlay.go b/go/storage/mkvs/overlay.go index efad1ca62dc..47d71305367 100644 --- a/go/storage/mkvs/overlay.go +++ b/go/storage/mkvs/overlay.go @@ -111,6 +111,7 @@ func (o *treeOverlay) Commit(ctx context.Context) error { return err } } + o.dirty = make(map[string]bool) return nil } @@ -169,7 +170,7 @@ func (it *treeOverlayIterator) Seek(key node.Key) { } func (it *treeOverlayIterator) Next() { - if !it.overlay.Valid() || it.inner.Key().Compare(it.overlay.Key()) <= 0 { + if !it.overlay.Valid() || (it.inner.Valid() && it.inner.Key().Compare(it.overlay.Key()) <= 0) { // Key of inner iterator is smaller or equal than the key of the overlay iterator. it.inner.Next() } else { diff --git a/go/storage/mkvs/overlay_test.go b/go/storage/mkvs/overlay_test.go index 9c122b5741b..bda65953679 100644 --- a/go/storage/mkvs/overlay_test.go +++ b/go/storage/mkvs/overlay_test.go @@ -12,12 +12,9 @@ import ( func TestOverlay(t *testing.T) { require := require.New(t) - ctx := context.Background() - tree := New(nil, nil) - defer tree.Close() - // Insert some items. + // Generate some items. items := writelog.WriteLog{ writelog.LogEntry{Key: []byte("key"), Value: []byte("first")}, writelog.LogEntry{Key: []byte("key 1"), Value: []byte("one")}, @@ -40,11 +37,30 @@ func TestOverlay(t *testing.T) { {seek: node.Key("key A"), pos: -1}, } + tree := New(nil, nil) + defer tree.Close() + + // Create an overlay over an empty tree and insert some items into the overlay. + overlay := NewOverlay(tree) + for _, item := range items { + err := overlay.Insert(ctx, item.Key, item.Value) + require.NoError(err, "Insert") + } + + // Test that an overlay-only iterator works correctly. + t.Run("OnlyOverlay/Iterator", func(t *testing.T) { + it := overlay.NewIterator(ctx) + defer it.Close() + + testIterator(t, items, it, tests) + }) + + // Insert some items into the underlying tree. err := tree.ApplyWriteLog(ctx, writelog.NewStaticIterator(items)) require.NoError(err, "ApplyWriteLog") // Create an overlay. - overlay := NewOverlay(tree) + overlay = NewOverlay(tree) // Test that all keys can be fetched from an empty overlay. t.Run("EmptyOverlay/Get", func(t *testing.T) { diff --git a/go/storage/mkvs/syncer/proof.go b/go/storage/mkvs/syncer/proof.go index 9420370b7ae..3f0466ffe99 100644 --- a/go/storage/mkvs/syncer/proof.go +++ b/go/storage/mkvs/syncer/proof.go @@ -34,14 +34,16 @@ type proofNode struct { // ProofBuilder is a Merkle proof builder. type ProofBuilder struct { root hash.Hash + subtree hash.Hash included map[hash.Hash]*proofNode size uint64 } // NewProofBuilder creates a new Merkle proof builder for the given root. -func NewProofBuilder(root hash.Hash) *ProofBuilder { +func NewProofBuilder(root, subtree hash.Hash) *ProofBuilder { return &ProofBuilder{ root: root, + subtree: subtree, included: make(map[hash.Hash]*proofNode), } } @@ -94,14 +96,14 @@ func (b *ProofBuilder) Include(n node.Node) { b.size += 1 + uint64(len(pn.serialized)) } -// HasRoot returns true if the root node has already been included. -func (b *ProofBuilder) HasRoot() bool { - return b.included[b.root] != nil +// HasSubtree returns true if the subtree root node has already been included. +func (b *ProofBuilder) HasSubtreeRoot() bool { + return b.included[b.subtree] != nil } -// GetRoot returns the root hash for this proof. -func (b *ProofBuilder) GetRoot() hash.Hash { - return b.root +// GetSubtree returns the subtree root hash for this proof. +func (b *ProofBuilder) GetSubtreeRoot() hash.Hash { + return b.subtree } // Size returns the current size of this proof. @@ -111,10 +113,17 @@ func (b *ProofBuilder) Size() uint64 { // Build tries to build the proof. func (b *ProofBuilder) Build(ctx context.Context) (*Proof, error) { - proof := Proof{ - UntrustedRoot: b.root, + var proof Proof + switch b.HasSubtreeRoot() { + case true: + // A partial proof for the subtree is available, include that. + proof.UntrustedRoot = b.subtree + case false: + // No partial proof available, we need to use the tree root. + proof.UntrustedRoot = b.root } - if err := b.build(ctx, &proof, b.root); err != nil { + + if err := b.build(ctx, &proof, proof.UntrustedRoot); err != nil { return nil, err } return &proof, nil diff --git a/go/storage/mkvs/syncer_test.go b/go/storage/mkvs/syncer_test.go index 27830f5014f..e928a2fabfe 100644 --- a/go/storage/mkvs/syncer_test.go +++ b/go/storage/mkvs/syncer_test.go @@ -31,9 +31,9 @@ func TestProof(t *testing.T) { require.NoError(err, "Commit") // Create a Merkle proof, starting at the root node. - builder := syncer.NewProofBuilder(rootHash) - require.False(builder.HasRoot(), "HasRoot should return false") - require.EqualValues(rootHash, builder.GetRoot(), "GetRoot should return correct root") + builder := syncer.NewProofBuilder(rootHash, rootHash) + require.False(builder.HasSubtreeRoot(), "HasSubtreeRoot should return false") + require.EqualValues(rootHash, builder.GetSubtreeRoot(), "GetSubtreeRoot should return correct root") rootOnlyProof, err := builder.Build(ctx) require.NoError(err, "Build should not fail without a root present") @@ -44,7 +44,7 @@ func TestProof(t *testing.T) { // Include root node. rootNode := tree.cache.pendingRoot.Node builder.Include(rootNode) - require.True(builder.HasRoot(), "HasRoot should return true after root included") + require.True(builder.HasSubtreeRoot(), "HasRoot should return true after root included") proof, err := builder.Build(ctx) require.NoError(err, "Build should not fail") @@ -107,7 +107,7 @@ func TestProof(t *testing.T) { // Empty root proof should verify. var emptyHash hash.Hash emptyHash.Empty() - builder = syncer.NewProofBuilder(emptyHash) + builder = syncer.NewProofBuilder(emptyHash, emptyHash) emptyRootProof, err := builder.Build(ctx) require.NoError(err, "Build should not fail for an empty root") emptyRootPtr, err := pv.VerifyProof(ctx, emptyHash, emptyRootProof) diff --git a/go/storage/mkvs/tree_test.go b/go/storage/mkvs/tree_test.go index 78a30422d73..dd4e4dacf46 100644 --- a/go/storage/mkvs/tree_test.go +++ b/go/storage/mkvs/tree_test.go @@ -1634,6 +1634,46 @@ func testPruneLoneRootsShared2(t *testing.T, ndb db.NodeDB, factory NodeDBFactor require.NoError(t, it.Err(), "tree should still be consistent") } +func testPruneLoneRootsShared3(t *testing.T, ndb db.NodeDB, factory NodeDBFactory) { + require := require.New(t) + ctx := context.Background() + + // Create a root in version 0. + tree := New(nil, ndb) + err := tree.Insert(ctx, []byte("foo"), []byte("bar")) + require.NoError(err, "Insert") + _, _, err = tree.Commit(ctx, testNs, 0) + require.NoError(err, "Commit") + + // Create another root in version 0. + tree = New(nil, ndb) + err = tree.Insert(ctx, []byte("moo"), []byte("goo")) + require.NoError(err, "Insert") + _, rootHashR0_2, err := tree.Commit(ctx, testNs, 0) + require.NoError(err, "Commit") + + // Create the same root as the first root in version 1. + tree = New(nil, ndb) + err = tree.Insert(ctx, []byte("foo"), []byte("bar")) + require.NoError(err, "Insert") + _, rootHashR1_1, err := tree.Commit(ctx, testNs, 1) + require.NoError(err, "Commit") + + // Finalize version 0 with the second root. + err = ndb.Finalize(ctx, 0, []hash.Hash{rootHashR0_2}) + require.NoError(err, "Finalize") + + // Make sure that the first root in version 1 is still valid. + tree = NewWithRoot(nil, ndb, node.Root{ + Namespace: testNs, + Version: 1, + Hash: rootHashR1_1, + }) + value, err := tree.Get(ctx, []byte("foo")) + require.NoError(err, "Get") + require.EqualValues([]byte("bar"), value) +} + func testPruneLoneRoots(t *testing.T, ndb db.NodeDB, factory NodeDBFactory) { ctx := context.Background() @@ -2064,6 +2104,7 @@ func testBackend( {"PruneLoneRoots", testPruneLoneRoots}, {"PruneLoneRootsShared", testPruneLoneRootsShared}, {"PruneLoneRootsShared2", testPruneLoneRootsShared2}, + {"PruneLoneRootsShared3", testPruneLoneRootsShared3}, {"PruneForkedRoots", testPruneForkedRoots}, {"SpecialCase1", testSpecialCase1}, {"SpecialCase2", testSpecialCase2}, diff --git a/runtime/src/dispatcher.rs b/runtime/src/dispatcher.rs index 1380cad84ea..cfd4f368d1c 100644 --- a/runtime/src/dispatcher.rs +++ b/runtime/src/dispatcher.rs @@ -35,7 +35,7 @@ use crate::{ storage::{ mkvs::{ sync::{HostReadSyncer, NoopReadSyncer}, - Root, Tree, + OverlayTree, Root, Tree, }, StorageContext, }, @@ -321,7 +321,8 @@ impl Dispatcher { protocol.clone(), )); let txn_ctx = TxnContext::new(ctx.clone(), &block.header, check_only); - match StorageContext::enter(&mut cache.mkvs, untrusted_local.clone(), || { + let mut overlay = OverlayTree::new(&mut cache.mkvs); + match StorageContext::enter(&mut overlay, untrusted_local.clone(), || { txn_dispatcher.dispatch_batch(&inputs, txn_ctx) }) { Err(error) => { @@ -347,14 +348,14 @@ impl Dispatcher { .unwrap(); } else { // Finalize state. - let (state_write_log, new_state_root) = cache - .mkvs - .commit( + let (state_write_log, new_state_root) = overlay + .commit_both( Context::create_child(&ctx), block.header.namespace, block.header.round + 1, ) .expect("state commit must succeed"); + txn_dispatcher.finalize(new_state_root); cache.commit(block.header.round + 1, new_state_root); @@ -503,13 +504,14 @@ impl Dispatcher { // Request, dispatch. let ctx = ctx.freeze(); let mut mkvs = Tree::make().new(Box::new(NoopReadSyncer)); + let mut overlay = OverlayTree::new(&mut mkvs); let untrusted_local = Arc::new(ProtocolUntrustedLocalStorage::new( Context::create_child(&ctx), protocol.clone(), )); let rpc_ctx = RpcContext::new(ctx.clone(), self.rak.clone(), session_info); let response = - StorageContext::enter(&mut mkvs, untrusted_local.clone(), || { + StorageContext::enter(&mut overlay, untrusted_local.clone(), || { rpc_dispatcher.dispatch(req, rpc_ctx) }); let response = RpcMessage::Response(response); @@ -584,12 +586,13 @@ impl Dispatcher { // Request, dispatch. let ctx = ctx.freeze(); let mut mkvs = Tree::make().new(Box::new(NoopReadSyncer)); + let mut overlay = OverlayTree::new(&mut mkvs); let untrusted_local = Arc::new(ProtocolUntrustedLocalStorage::new( Context::create_child(&ctx), protocol.clone(), )); let rpc_ctx = RpcContext::new(ctx.clone(), self.rak.clone(), None); - let response = StorageContext::enter(&mut mkvs, untrusted_local.clone(), || { + let response = StorageContext::enter(&mut overlay, untrusted_local.clone(), || { rpc_dispatcher.dispatch_local(req, rpc_ctx) }); let response = RpcMessage::Response(response); diff --git a/runtime/src/storage/context.rs b/runtime/src/storage/context.rs index 61f2e610f0b..2273b22f925 100644 --- a/runtime/src/storage/context.rs +++ b/runtime/src/storage/context.rs @@ -20,8 +20,12 @@ struct CtxGuard; impl CtxGuard { fn new(mkvs: &mut M, untrusted_local: Arc) -> Self where - M: MKVS + 'static, + M: MKVS, { + // This is safe because the reference is only valid within StorageContext::enter within + // the same thread. + let mkvs = unsafe { std::mem::transmute::<&mut dyn MKVS, &mut (dyn MKVS + 'static)>(mkvs) }; + CTX.with(|ctx| { assert!(ctx.borrow().is_none(), "nested enter is not allowed"); ctx.borrow_mut().replace(Ctx { @@ -49,7 +53,7 @@ impl StorageContext { /// Enter the storage context. pub fn enter(mkvs: &mut M, untrusted_local: Arc, f: F) -> R where - M: MKVS + 'static, + M: MKVS, F: FnOnce() -> R, { let _guard = CtxGuard::new(mkvs, untrusted_local); @@ -68,6 +72,8 @@ impl StorageContext { CTX.with(|ctx| { let ctx = ctx.borrow(); let ctx_ref = ctx.as_ref().expect("must only be called while entered"); + // This is safe because the reference is only valid within StorageContext::enter within + // the same thread. let mkvs_ref = unsafe { ctx_ref.mkvs.as_mut().expect("pointer is never null") }; f(mkvs_ref, &ctx_ref.untrusted_local) diff --git a/runtime/src/storage/mkvs/mod.rs b/runtime/src/storage/mkvs/mod.rs index 403bd6c57f3..ce0911e76dc 100644 --- a/runtime/src/storage/mkvs/mod.rs +++ b/runtime/src/storage/mkvs/mod.rs @@ -1,7 +1,10 @@ //! Merklized key-value store. -use std::ops::{Deref, DerefMut}; +use std::{ + iter, + ops::{Deref, DerefMut}, +}; -use anyhow::Result; +use anyhow::{Error, Result}; use base64; use io_context::Context; use serde::{self, ser::SerializeSeq, Deserialize, Serialize, Serializer}; @@ -19,7 +22,7 @@ pub mod sync; #[cfg(test)] mod tests; -pub use tree::{Depth, Key, NodeBox, Root, Tree}; +pub use tree::{Depth, Key, NodeBox, OverlayTree, Root, Tree}; /// The type of entry in the log. #[derive(Copy, Clone, Debug, Eq, PartialEq)] @@ -115,7 +118,7 @@ impl From> for Prefix { } /// Merklized key-value store. -pub trait MKVS: Send + Sync { +pub trait MKVS { /// Fetch entry with given key. fn get(&self, ctx: Context, key: &[u8]) -> Option>; @@ -143,6 +146,9 @@ pub trait MKVS: Send + Sync { /// Populate the in-memory tree with nodes for keys starting with given prefixes. fn prefetch_prefixes(&self, ctx: Context, prefixes: &Vec, limit: u16); + /// Returns an iterator over the tree. + fn iter(&self, ctx: Context) -> Box; + /// Commit all database changes to the underlying store. fn commit( &mut self, @@ -150,9 +156,99 @@ pub trait MKVS: Send + Sync { namespace: Namespace, version: u64, ) -> Result<(WriteLog, Hash)>; +} + +/// Merklized key-value store where methods return errors instead of panicking. +pub trait FallibleMKVS { + /// Fetch entry with given key. + fn get(&self, ctx: Context, key: &[u8]) -> Result>>; + + /// Check if the local MKVS cache contains the given key. + /// + /// While get can be used to check if the MKVS as a whole contains + /// a given key, this function specifically guarantees that no remote + /// syncing will be invoked, only checking the local cache. + fn cache_contains_key(&self, ctx: Context, key: &[u8]) -> bool; + + /// Update entry with given key. + /// + /// If the database did not have this key present, [`None`] is returned. + /// + /// If the database did have this key present, the value is updated, and the old value is + /// returned. + /// + /// [`None`]: std::option::Option + fn insert(&mut self, ctx: Context, key: &[u8], value: &[u8]) -> Result>>; + + /// Remove entry with given key, returning the value at the key if the key was previously + /// in the database. + fn remove(&mut self, ctx: Context, key: &[u8]) -> Result>>; + + /// Populate the in-memory tree with nodes for keys starting with given prefixes. + fn prefetch_prefixes(&self, ctx: Context, prefixes: &Vec, limit: u16) -> Result<()>; + + /// Returns an iterator over the tree. + fn iter(&self, ctx: Context) -> Box; - /// Rollback any pending changes. - fn rollback(&mut self); + /// Commit all database changes to the underlying store. + fn commit(&mut self, ctx: Context, namespace: Namespace, version: u64) -> Result; +} + +/// An MKVS iterator. +pub trait Iterator: iter::Iterator, Vec)> { + /// Sets the number of next elements to prefetch. + fn set_prefetch(&mut self, prefetch: usize); + + /// Return whether the iterator is valid. + fn is_valid(&self) -> bool; + + /// Return the error that occurred during iteration if any. + fn error(&self) -> &Option; + + /// Moves the iterator to the first key in the tree. + fn rewind(&mut self); + + /// Moves the iterator either at the given key or at the next larger key. + fn seek(&mut self, key: &[u8]); + + /// The key under the iterator. + fn get_key(&self) -> &Option; + + /// The value under the iterator. + fn get_value(&self) -> &Option>; + + /// Advance the iterator to the next key. + fn next(&mut self); +} + +impl FallibleMKVS for &mut T { + fn get(&self, ctx: Context, key: &[u8]) -> Result>> { + T::get(self, ctx, key) + } + + fn cache_contains_key(&self, ctx: Context, key: &[u8]) -> bool { + T::cache_contains_key(self, ctx, key) + } + + fn insert(&mut self, ctx: Context, key: &[u8], value: &[u8]) -> Result>> { + T::insert(self, ctx, key, value) + } + + fn remove(&mut self, ctx: Context, key: &[u8]) -> Result>> { + T::remove(self, ctx, key) + } + + fn prefetch_prefixes(&self, ctx: Context, prefixes: &Vec, limit: u16) -> Result<()> { + T::prefetch_prefixes(self, ctx, prefixes, limit) + } + + fn iter(&self, ctx: Context) -> Box { + T::iter(self, ctx) + } + + fn commit(&mut self, ctx: Context, namespace: Namespace, version: u64) -> Result { + T::commit(self, ctx, namespace, version) + } } #[cfg(test)] diff --git a/runtime/src/storage/mkvs/sync/test.rs b/runtime/src/storage/mkvs/sync/test.rs index f45b068b824..c0a2a97cc75 100644 --- a/runtime/src/storage/mkvs/sync/test.rs +++ b/runtime/src/storage/mkvs/sync/test.rs @@ -36,7 +36,7 @@ fn test_nil_pointers() { // Verify at least one null pointer somewhere. //println!("full tree: {:#?}", tree); - let (_, root) = + let root = Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); server.apply(&write_log, root, Default::default(), 0); diff --git a/runtime/src/storage/mkvs/tree/commit.rs b/runtime/src/storage/mkvs/tree/commit.rs index 242db3f0c35..15e2d341078 100644 --- a/runtime/src/storage/mkvs/tree/commit.rs +++ b/runtime/src/storage/mkvs/tree/commit.rs @@ -5,18 +5,13 @@ use io_context::Context; use crate::{ common::{crypto::hash::Hash, roothash::Namespace}, - storage::mkvs::{cache::*, tree::*, LogEntry, WriteLog}, + storage::mkvs::{cache::*, tree::*}, }; impl Tree { /// Commit tree updates to the underlying database and return /// the write log and new merkle root. - pub fn commit( - &mut self, - ctx: Context, - namespace: Namespace, - version: u64, - ) -> Result<(WriteLog, Hash)> { + pub fn commit(&mut self, ctx: Context, namespace: Namespace, version: u64) -> Result { let ctx = ctx.freeze(); let mut update_list: UpdateList = UpdateList::new(); let pending_root = self.cache.borrow().get_pending_root(); @@ -24,26 +19,13 @@ impl Tree { update_list.commit(&mut self.cache.borrow_mut()); - let mut log: WriteLog = Vec::new(); - for (_, entry) in self.pending_write_log.iter() { - // Skip all entries that do not exist after all the updates and - // did not exist before. - if entry.value.is_none() && !entry.existed { - continue; - } - log.push(LogEntry { - key: entry.key.clone(), - value: entry.value.clone(), - }); - } - self.pending_write_log.clear(); self.cache.borrow_mut().set_sync_root(Root { namespace, version, hash: new_hash, }); - Ok((log, new_hash)) + Ok(new_hash) } } diff --git a/runtime/src/storage/mkvs/tree/insert.rs b/runtime/src/storage/mkvs/tree/insert.rs index ec1c3cc8d94..55427236f65 100644 --- a/runtime/src/storage/mkvs/tree/insert.rs +++ b/runtime/src/storage/mkvs/tree/insert.rs @@ -20,22 +20,6 @@ impl Tree { let (new_root, old_val) = self._insert(&ctx, pending_root, 0, &boxed_key, boxed_val.clone(), 0)?; - let existed = old_val != None; - match self.pending_write_log.get_mut(&boxed_key) { - None => { - self.pending_write_log.insert( - boxed_key, - PendingLogEntry { - key: key.to_vec(), - value: Some(boxed_val.clone()), - existed: existed, - }, - ); - } - Some(ref mut entry) => { - entry.value = Some(boxed_val.clone()); - } - }; self.cache.borrow_mut().set_pending_root(new_root.clone()); Ok(old_val) diff --git a/runtime/src/storage/mkvs/tree/iterator.rs b/runtime/src/storage/mkvs/tree/iterator.rs index 0d4b7b29ed6..43aa1302ab2 100644 --- a/runtime/src/storage/mkvs/tree/iterator.rs +++ b/runtime/src/storage/mkvs/tree/iterator.rs @@ -4,7 +4,7 @@ use std::{collections::VecDeque, fmt, iter::Iterator, mem::replace, sync::Arc}; use anyhow::{Error, Result}; use io_context::Context; -use crate::storage::mkvs::{cache::*, sync::*, tree::*}; +use crate::storage::mkvs::{self, cache::*, sync::*, tree::*}; pub(super) struct FetcherSyncIterate<'a> { key: &'a Key, @@ -93,53 +93,12 @@ impl<'tree> TreeIterator<'tree> { } } - /// Sets the number of next elements to prefetch. - pub fn set_prefetch(&mut self, prefetch: usize) { - self.prefetch = prefetch; - } - fn reset(&mut self) { self.pos.clear(); self.key = None; self.value = None; } - /// Return whether the iterator is valid. - pub fn is_valid(&self) -> bool { - self.key.is_some() - } - - /// Return the error that occurred during iteration if any. - pub fn error(&self) -> &Option { - &self.error - } - - /// Move the iterator to the first key in the tree. - pub fn rewind(&mut self) { - self.seek(&[]) - } - - /// Moves the iterator either at the given key or at the next larger - /// key. - pub fn seek(&mut self, key: &[u8]) { - if self.error.is_some() { - return; - } - - self.reset(); - let pending_root = self.tree.cache.borrow().get_pending_root(); - if let Err(error) = self._next( - pending_root, - 0, - Key::new(), - key.to_vec(), - VisitState::Before, - ) { - self.error = Some(error); - self.reset(); - } - } - fn next(&mut self) { if self.error.is_some() { return; @@ -329,32 +288,89 @@ impl<'tree> Iterator for TreeIterator<'tree> { type Item = (Vec, Vec); fn next(&mut self) -> Option { + use mkvs::Iterator; + if !self.is_valid() { return None; } let key = self.key.as_ref().expect("iterator is valid").clone(); let value = self.value.as_ref().expect("iterator is valid").clone(); - self.next(); + TreeIterator::next(self); Some((key, value)) } } +impl<'tree> mkvs::Iterator for TreeIterator<'tree> { + fn set_prefetch(&mut self, prefetch: usize) { + self.prefetch = prefetch; + } + + fn is_valid(&self) -> bool { + self.key.is_some() + } + + fn error(&self) -> &Option { + &self.error + } + + fn rewind(&mut self) { + self.seek(&[]) + } + + fn seek(&mut self, key: &[u8]) { + if self.error.is_some() { + return; + } + + self.reset(); + let pending_root = self.tree.cache.borrow().get_pending_root(); + if let Err(error) = self._next( + pending_root, + 0, + Key::new(), + key.to_vec(), + VisitState::Before, + ) { + self.error = Some(error); + self.reset(); + } + } + + fn get_key(&self) -> &Option { + &self.key + } + + fn get_value(&self) -> &Option> { + &self.value + } + + fn next(&mut self) { + TreeIterator::next(self) + } +} + impl Tree { - /// Returns an iterator over the tree. + /// Return an iterator over the tree. pub fn iter(&self, ctx: Context) -> TreeIterator { TreeIterator::new(ctx, self) } } #[cfg(test)] -mod test { +pub(super) mod test { + use std::iter; + use io_context::Context; use rustc_hex::FromHex; use super::{tree_test::generate_key_value_pairs_ex, *}; - use crate::storage::mkvs::interop::{Driver, ProtocolServer}; + use crate::storage::mkvs::{ + self, + interop::{Driver, ProtocolServer}, + Iterator, + }; #[test] fn test_iterator() { @@ -411,8 +427,17 @@ mod test { test_iterator_with(&items, it, &tests); // Remote. - let (write_log, hash) = - Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); + let hash = tree + .commit(Context::background(), Default::default(), 0) + .expect("commit"); + let write_log = items + .iter() + .cloned() + .map(|(key, value)| mkvs::LogEntry { + key, + value: Some(value), + }) + .collect(); server.apply(&write_log, hash, Default::default(), 0); let remote_tree = Tree::make() @@ -522,7 +547,7 @@ mod test { .unwrap(); it.seek(&missing_key); assert!(it.is_valid(), "iterator should be valid"); - let item = Iterator::next(&mut it); + let item = iter::Iterator::next(&mut it); assert_eq!( Some((items[0].0.clone(), b"value".to_vec())), item, @@ -534,9 +559,11 @@ mod test { fn test_iterator_eviction() { let server = ProtocolServer::new(); - let mut tree = Tree::make() - .with_capacity(0, 0) - .new(Box::new(NoopReadSyncer)); + let mut tree = OverlayTree::new( + Tree::make() + .with_capacity(0, 0) + .new(Box::new(NoopReadSyncer)), + ); let (keys, values) = generate_key_value_pairs_ex("T".to_owned(), 100); let items: Vec<(Vec, Vec)> = keys.into_iter().zip(values.into_iter()).collect(); @@ -544,8 +571,9 @@ mod test { tree.insert(Context::background(), &key, &value).unwrap(); } - let (write_log, hash) = - Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); + let (write_log, hash) = tree + .commit_both(Context::background(), Default::default(), 0) + .expect("commit"); server.apply(&write_log, hash, Default::default(), 0); // Create a remote tree with limited cache capacity so that nodes will @@ -576,9 +604,9 @@ mod test { assert_eq!(2, stats.sync_iterate_count, "sync_iterate_count"); } - fn test_iterator_with( + pub(in super::super) fn test_iterator_with( items: &Vec<(Vec, Vec)>, - mut it: TreeIterator, + mut it: I, tests: &Vec<(Vec, isize)>, ) { // Iterate through the whole tree. @@ -605,7 +633,7 @@ mod test { } for expected in &items[*pos as usize..] { - let item = Iterator::next(&mut it); + let item = iter::Iterator::next(&mut it); assert_eq!( Some(expected.clone()), item, diff --git a/runtime/src/storage/mkvs/tree/lookup.rs b/runtime/src/storage/mkvs/tree/lookup.rs index c75b6f6dd99..abbfff72881 100644 --- a/runtime/src/storage/mkvs/tree/lookup.rs +++ b/runtime/src/storage/mkvs/tree/lookup.rs @@ -62,11 +62,6 @@ impl Tree { let boxed_key = key.to_vec(); let pending_root = self.cache.borrow().get_pending_root(); - // If the key has been modified locally, no need to perform any lookups. - if let Some(PendingLogEntry { ref value, .. }) = self.pending_write_log.get(&boxed_key) { - return Ok(value.clone()); - } - // Remember where the path from root to target node ends (will end). self.cache.borrow_mut().mark_position(); diff --git a/runtime/src/storage/mkvs/tree/mkvs.rs b/runtime/src/storage/mkvs/tree/mkvs.rs deleted file mode 100644 index e2ae4a618c1..00000000000 --- a/runtime/src/storage/mkvs/tree/mkvs.rs +++ /dev/null @@ -1,59 +0,0 @@ -use anyhow::Result; -use io_context::Context; - -use crate::{ - common::{crypto::hash::Hash, roothash::Namespace}, - storage::mkvs::{tree::*, Prefix, WriteLog, MKVS}, -}; - -unsafe impl Send for Tree {} -unsafe impl Sync for Tree {} - -// TODO: We should likely change the MKVS interface to propagate errors instead of unwrapping. - -impl MKVS for Tree { - fn get(&self, ctx: Context, key: &[u8]) -> Option> { - let _lock = self.lock.lock().unwrap(); - self.get(ctx, key).unwrap() - } - - fn cache_contains_key(&self, ctx: Context, key: &[u8]) -> bool { - let _lock = self.lock.lock().unwrap(); - self.cache_contains_key(ctx, key) - } - - fn insert(&mut self, ctx: Context, key: &[u8], value: &[u8]) -> Option> { - let lock = self.lock.clone(); - let _guard = lock.lock().unwrap(); - self.insert(ctx, key, value).unwrap() - } - - fn remove(&mut self, ctx: Context, key: &[u8]) -> Option> { - let lock = self.lock.clone(); - let _guard = lock.lock().unwrap(); - self.remove(ctx, key).unwrap() - } - - fn prefetch_prefixes(&self, ctx: Context, prefixes: &Vec, limit: u16) { - let lock = self.lock.clone(); - let _guard = lock.lock().unwrap(); - self.prefetch_prefixes(ctx, prefixes, limit).unwrap() - } - - fn commit( - &mut self, - ctx: Context, - namespace: Namespace, - version: u64, - ) -> Result<(WriteLog, Hash)> { - let lock = self.lock.clone(); - let _guard = lock.lock().unwrap(); - Tree::commit(self, ctx, namespace, version) - } - - fn rollback(&mut self) { - let lock = self.lock.clone(); - let _guard = lock.lock().unwrap(); - self.pending_write_log.clear(); - } -} diff --git a/runtime/src/storage/mkvs/tree/mod.rs b/runtime/src/storage/mkvs/tree/mod.rs index 7ba6b695612..fa28c3fb312 100644 --- a/runtime/src/storage/mkvs/tree/mod.rs +++ b/runtime/src/storage/mkvs/tree/mod.rs @@ -7,8 +7,8 @@ mod insert; mod iterator; mod lookup; mod marshal; -mod mkvs; mod node; +mod overlay; mod prefetch; mod remove; mod tree; @@ -18,6 +18,7 @@ pub use errors::*; pub use insert::*; pub use iterator::*; pub use node::*; +pub use overlay::*; pub use remove::*; pub use tree::*; diff --git a/runtime/src/storage/mkvs/tree/overlay.rs b/runtime/src/storage/mkvs/tree/overlay.rs new file mode 100644 index 00000000000..8a93e602b11 --- /dev/null +++ b/runtime/src/storage/mkvs/tree/overlay.rs @@ -0,0 +1,438 @@ +use std::{ + collections::{btree_map, BTreeMap, HashSet}, + iter::{Iterator, Peekable}, +}; + +use anyhow::{Error, Result}; +use io_context::Context; + +use crate::{ + common::{crypto::hash::Hash, roothash::Namespace}, + storage::mkvs::{self, tree::*}, +}; + +/// A key-value tree overlay that holds all updates in memory and only commits them if requested. +/// This can be used to create snapshots that can be discarded. +/// +/// While updates (inserts, removes) are stored in the overlay, reads are not cached in the overlay +/// as the inner tree has its own cache and double caching makes less sense. +pub struct OverlayTree { + inner: T, + overlay: BTreeMap, Vec>, + dirty: HashSet>, +} + +impl OverlayTree { + /// Create a new overlay tree. + pub fn new(inner: T) -> Self { + Self { + inner, + overlay: BTreeMap::new(), + dirty: HashSet::new(), + } + } + + /// Get an existing key. + pub fn get(&self, ctx: Context, key: &[u8]) -> Result>> { + // For dirty values, check the overlay. + if self.dirty.contains(key) { + return Ok(self.overlay.get(key).map(|v| v.clone())); + } + + // Otherwise fetch from inner tree. + self.inner.get(ctx, key) + } + + /// Insert a key/value pair into the tree. + pub fn insert(&mut self, ctx: Context, key: &[u8], value: &[u8]) -> Result>> { + let previous = self.get(ctx, key)?; + + self.overlay.insert(key.to_owned(), value.to_owned()); + self.dirty.insert(key.to_owned()); + + Ok(previous) + } + + /// Remove entry with given key, returning the value at the key if the key was previously + /// in the database. + pub fn remove(&mut self, ctx: Context, key: &[u8]) -> Result>> { + // For dirty values, remove from the overlay. + if self.dirty.contains(key) { + return Ok(self.overlay.remove(key).map(|v| v.clone())); + } + + let value = self.inner.get(ctx, key)?; + + // Do not treat a value as dirty if it was not dirty before and did not exist in the inner tree. + if value.is_some() { + self.dirty.insert(key.to_owned()); + } + Ok(value) + } + + /// Return an iterator over the tree. + pub fn iter(&self, ctx: Context) -> OverlayTreeIterator { + OverlayTreeIterator::new(ctx, self) + } + + /// Commit any modifications to the underlying tree. + pub fn commit(&mut self, ctx: Context) -> Result { + let ctx = ctx.freeze(); + let mut log: mkvs::WriteLog = Vec::new(); + + // Insert all items present in the overlay. + for (key, value) in &self.overlay { + self.inner + .insert(Context::create_child(&ctx), &key, &value)?; + self.dirty.remove(key); + + log.push(mkvs::LogEntry { + key: key.clone(), + value: Some(value.clone()), + }); + } + self.overlay.clear(); + + // Any remaining dirty items must have been removed. + for key in &self.dirty { + self.inner.remove(Context::create_child(&ctx), key)?; + + log.push(mkvs::LogEntry { + key: key.clone(), + value: None, + }); + } + self.dirty.clear(); + + Ok(log) + } + + /// Commit any modifications to the underlying tree and then immediately commit the underlying + /// tree, returning the new root hash. + pub fn commit_both( + &mut self, + ctx: Context, + namespace: Namespace, + version: u64, + ) -> Result<(mkvs::WriteLog, Hash)> { + let ctx = ctx.freeze(); + // First commit modifications to the underlying tree. + let write_log = self.commit(Context::create_child(&ctx))?; + // Then commit the underlying tree. + let root_hash = self + .inner + .commit(Context::create_child(&ctx), namespace, version)?; + + Ok((write_log, root_hash)) + } +} + +/// An iterator over the `OverlayTree`. +pub struct OverlayTreeIterator<'tree, T: mkvs::FallibleMKVS> { + tree: &'tree OverlayTree, + + inner: Box, + overlay: Peekable, Vec>>, + overlay_valid: bool, + + key: Option>, + value: Option>, +} + +impl<'tree, T: mkvs::FallibleMKVS> OverlayTreeIterator<'tree, T> { + fn new(ctx: Context, tree: &'tree OverlayTree) -> Self { + Self { + tree, + inner: tree.inner.iter(ctx), + overlay: tree.overlay.range(vec![]..).peekable(), + overlay_valid: true, + key: None, + value: None, + } + } + + fn update_iterator_position(&mut self) { + // Skip over any dirty entries from the inner iterator. + loop { + if !self.inner.is_valid() + || !self + .tree + .dirty + .contains(self.inner.get_key().as_ref().expect("inner.is_valid")) + { + break; + } + self.inner.next(); + } + + let i_key = self.inner.get_key(); + let o_item = self.overlay.peek(); + self.overlay_valid = o_item.is_some(); + + if self.inner.is_valid() + && (!self.overlay_valid + || i_key.as_ref().expect("inner.is_valid") < o_item.expect("overlay_valid").0) + { + // Key of inner iterator is smaller than the key of the overlay iterator. + self.key = i_key.clone(); + self.value = self.inner.get_value().clone(); + } else if self.overlay_valid { + // Key of overlay iterator is smaller than or equal to the key of the inner iterator. + let (o_key, o_value) = o_item.expect("overlay_valid"); + self.key = Some(o_key.to_vec()); + self.value = Some(o_value.to_vec()); + } else { + // Both iterators are invalid. + self.key = None; + self.value = None; + } + } + + fn next(&mut self) { + if !self.overlay_valid + || (self.inner.is_valid() + && self.inner.get_key().as_ref().expect("inner.is_valid") + <= self.overlay.peek().expect("overlay_valid").0) + { + // Key of inner iterator is smaller or equal than the key of the overlay iterator. + self.inner.next(); + } else { + // Key of inner iterator is greater than the key of the overlay iterator. + self.overlay.next(); + } + + self.update_iterator_position(); + } +} + +impl<'tree, T: mkvs::FallibleMKVS> Iterator for OverlayTreeIterator<'tree, T> { + type Item = (Vec, Vec); + + fn next(&mut self) -> Option { + use mkvs::Iterator; + + if !self.is_valid() { + return None; + } + + let key = self.key.as_ref().expect("iterator is valid").clone(); + let value = self.value.as_ref().expect("iterator is valid").clone(); + OverlayTreeIterator::next(self); + + Some((key, value)) + } +} + +impl<'tree, T: mkvs::FallibleMKVS> mkvs::Iterator for OverlayTreeIterator<'tree, T> { + fn set_prefetch(&mut self, prefetch: usize) { + self.inner.set_prefetch(prefetch) + } + + fn is_valid(&self) -> bool { + // If either iterator is valid, the merged iterator is valid. + self.inner.is_valid() || self.overlay_valid + } + + fn error(&self) -> &Option { + self.inner.error() + } + + fn rewind(&mut self) { + self.seek(&[]); + } + + fn seek(&mut self, key: &[u8]) { + self.inner.seek(key); + self.overlay = self.tree.overlay.range(key.to_vec()..).peekable(); + + self.update_iterator_position(); + } + + fn get_key(&self) -> &Option { + &self.key + } + + fn get_value(&self) -> &Option> { + &self.value + } + + fn next(&mut self) { + OverlayTreeIterator::next(self) + } +} + +impl mkvs::MKVS for OverlayTree { + fn get(&self, ctx: Context, key: &[u8]) -> Option> { + self.get(ctx, key).unwrap() + } + + fn cache_contains_key(&self, ctx: Context, key: &[u8]) -> bool { + // For dirty values, check the overlay. + if self.dirty.contains(key) { + return self.overlay.contains_key(key); + } + self.inner.cache_contains_key(ctx, key) + } + + fn insert(&mut self, ctx: Context, key: &[u8], value: &[u8]) -> Option> { + self.insert(ctx, key, value).unwrap() + } + + fn remove(&mut self, ctx: Context, key: &[u8]) -> Option> { + self.remove(ctx, key).unwrap() + } + + fn prefetch_prefixes(&self, ctx: Context, prefixes: &Vec, limit: u16) { + self.inner.prefetch_prefixes(ctx, prefixes, limit).unwrap() + } + + fn iter(&self, ctx: Context) -> Box { + Box::new(self.iter(ctx)) + } + + fn commit( + &mut self, + ctx: Context, + namespace: Namespace, + version: u64, + ) -> Result<(mkvs::WriteLog, Hash)> { + self.commit_both(ctx, namespace, version) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::storage::mkvs::{sync::NoopReadSyncer, tree::iterator::test::test_iterator_with}; + + #[test] + fn test_overlay() { + let mut tree = Tree::make().new(Box::new(NoopReadSyncer)); + + // Generate some items. + let items = vec![ + (b"key".to_vec(), b"first".to_vec()), + (b"key 1".to_vec(), b"one".to_vec()), + (b"key 2".to_vec(), b"two".to_vec()), + (b"key 5".to_vec(), b"five".to_vec()), + (b"key 8".to_vec(), b"eight".to_vec()), + (b"key 9".to_vec(), b"nine".to_vec()), + ]; + + let tests = vec![ + (b"k".to_vec(), 0), + (b"key 1".to_vec(), 1), + (b"key 3".to_vec(), 3), + (b"key 4".to_vec(), 3), + (b"key 5".to_vec(), 3), + (b"key 6".to_vec(), 4), + (b"key 7".to_vec(), 4), + (b"key 8".to_vec(), 4), + (b"key 9".to_vec(), 5), + (b"key A".to_vec(), -1), + ]; + + // Create an overlay over an empty tree and insert some items into the overlay. + let mut overlay = OverlayTree::new(&mut tree); + for (key, value) in items.iter() { + overlay.insert(Context::background(), key, value).unwrap(); + } + + // Test that an overlay-only iterator works correctly. + let it = overlay.iter(Context::background()); + test_iterator_with(&items, it, &tests); + + // Insert some items into the underlying tree. + for (key, value) in items.iter() { + tree.insert(Context::background(), key, value).unwrap(); + } + + // Create a tree pointer so we can unsafely peek into the tree later. + let tree_ref = &tree as *const Tree; + + // Create an overlay. + let mut overlay = OverlayTree::new(&mut tree); + + // Test that all keys can be fetched from an empty overlay. + for (k, expected_v) in &items { + let v = overlay.get(Context::background(), &k).unwrap(); + assert_eq!(v.as_ref(), Some(expected_v)); + } + + // Test that merged iterator works correctly on an empty overlay (it should behave exactly + // the same as for the inner tree). + let it = overlay.iter(Context::background()); + test_iterator_with(&items, it, &tests); + + // Add some updates to the overlay. + overlay.remove(Context::background(), b"key 2").unwrap(); + overlay + .insert(Context::background(), b"key 7", b"seven") + .unwrap(); + overlay.remove(Context::background(), b"key 5").unwrap(); + overlay + .insert(Context::background(), b"key 5", b"fivey") + .unwrap(); + + // Make sure updates did not propagate to the inner tree. + // NOTE: This is unsafe as we are otherwise not allowed to reference the inner tree. + unsafe { + let tree_ref = &*tree_ref; + + let value = tree_ref.get(Context::background(), b"key 2").unwrap(); + assert_eq!( + value, + Some(b"two".to_vec()), + "value in inner tree should be unchanged" + ); + let value = tree_ref.get(Context::background(), b"key 7").unwrap(); + assert_eq!(value, None, "value should not exist in inner tree"); + } + + // State of overlay after updates. + let items = vec![ + (b"key".to_vec(), b"first".to_vec()), + (b"key 1".to_vec(), b"one".to_vec()), + (b"key 5".to_vec(), b"fivey".to_vec()), + (b"key 7".to_vec(), b"seven".to_vec()), + (b"key 8".to_vec(), b"eight".to_vec()), + (b"key 9".to_vec(), b"nine".to_vec()), + ]; + + let tests = vec![ + (b"k".to_vec(), 0), + (b"key 1".to_vec(), 1), + (b"key 3".to_vec(), 2), + (b"key 4".to_vec(), 2), + (b"key 5".to_vec(), 2), + (b"key 6".to_vec(), 3), + (b"key 7".to_vec(), 3), + (b"key 8".to_vec(), 4), + (b"key 9".to_vec(), 5), + (b"key A".to_vec(), -1), + ]; + + // Test that all keys can be fetched from an updated overlay. + for (k, expected_v) in &items { + let v = overlay.get(Context::background(), &k).unwrap(); + assert_eq!(v.as_ref(), Some(expected_v)); + } + + // Make sure that merged overlay iterator works. + let it = overlay.iter(Context::background()); + test_iterator_with(&items, it, &tests); + + // Commit the overlay. + overlay.commit(Context::background()).unwrap(); + + // Test that all keys can be fetched from an updated tree. + for (k, expected_v) in &items { + let v = tree.get(Context::background(), &k).unwrap(); + assert_eq!(v.as_ref(), Some(expected_v)); + } + + // Make sure that the updated tree is correct. + let it = tree.iter(Context::background()); + test_iterator_with(&items, it, &tests); + } +} diff --git a/runtime/src/storage/mkvs/tree/remove.rs b/runtime/src/storage/mkvs/tree/remove.rs index 33f5160572d..0ccabc59f05 100644 --- a/runtime/src/storage/mkvs/tree/remove.rs +++ b/runtime/src/storage/mkvs/tree/remove.rs @@ -8,36 +8,17 @@ use crate::storage::mkvs::{cache::*, tree::*}; use super::lookup::FetcherSyncGet; impl Tree { - /// Remove a key from the tree and return true if the tree was modified. + /// Remove entry with given key, returning the value at the key if the key was previously + /// in the database. pub fn remove(&mut self, ctx: Context, key: &[u8]) -> Result>> { let ctx = ctx.freeze(); let boxed_key = key.to_vec(); let pending_root = self.cache.borrow().get_pending_root(); - // If the key has already been removed locally, don't try to remove it again. - if let Some(PendingLogEntry { value: None, .. }) = self.pending_write_log.get(&boxed_key) { - return Ok(None); - } - // Remember where the path from root to target node ends (will end). self.cache.borrow_mut().mark_position(); - let (new_root, changed, old_val) = self._remove(&ctx, pending_root, 0, &boxed_key, 0)?; - match self.pending_write_log.get_mut(&boxed_key) { - None => { - self.pending_write_log.insert( - boxed_key.clone(), - PendingLogEntry { - key: boxed_key, - value: None, - existed: changed, - }, - ); - } - Some(ref mut entry) => { - entry.value = None; - } - }; + let (new_root, _, old_val) = self._remove(&ctx, pending_root, 0, &boxed_key, 0)?; self.cache.borrow_mut().set_pending_root(new_root); Ok(old_val) diff --git a/runtime/src/storage/mkvs/tree/tree.rs b/runtime/src/storage/mkvs/tree/tree.rs index ed4f0221b2f..bde5d5e969a 100644 --- a/runtime/src/storage/mkvs/tree/tree.rs +++ b/runtime/src/storage/mkvs/tree/tree.rs @@ -1,18 +1,12 @@ -use std::{ - cell::RefCell, - collections::BTreeMap, - fmt, - rc::Rc, - sync::{Arc, Mutex}, -}; +use std::{cell::RefCell, fmt, rc::Rc}; -use crate::storage::mkvs::{cache::*, sync::*, tree::*}; +use anyhow::Result; +use io_context::Context; -pub struct PendingLogEntry { - pub key: Vec, - pub value: Option>, - pub existed: bool, -} +use crate::{ + common::{crypto::hash::Hash, roothash::Namespace}, + storage::mkvs::{self, cache::*, sync::*, tree::*}, +}; /// A container for the parameters used to construct a new MKVS tree instance. pub struct Options { @@ -52,10 +46,11 @@ impl Options { /// A patricia tree-based MKVS implementation. pub struct Tree { pub(crate) cache: RefCell>, - pub(crate) pending_write_log: BTreeMap, - pub(crate) lock: Arc>, } +// Tree is Send as long as ownership of internal Rcs cannot leak out via any of its methods. +unsafe impl Send for Tree {} + impl Tree { /// Construct a new tree instance using the given read syncer and options struct. pub fn new(read_syncer: Box, opts: &Options) -> Tree { @@ -65,8 +60,6 @@ impl Tree { opts.value_capacity, read_syncer, )), - pending_write_log: BTreeMap::new(), - lock: Arc::new(Mutex::new(0)), }; if let Some(root) = opts.root { @@ -98,3 +91,38 @@ impl fmt::Debug for Tree { self.cache.borrow().get_pending_root().fmt(f) } } + +impl mkvs::FallibleMKVS for Tree { + fn get(&self, ctx: Context, key: &[u8]) -> Result>> { + Tree::get(self, ctx, key) + } + + fn cache_contains_key(&self, ctx: Context, key: &[u8]) -> bool { + Tree::cache_contains_key(self, ctx, key) + } + + fn insert(&mut self, ctx: Context, key: &[u8], value: &[u8]) -> Result>> { + Tree::insert(self, ctx, key, value) + } + + fn remove(&mut self, ctx: Context, key: &[u8]) -> Result>> { + Tree::remove(self, ctx, key) + } + + fn prefetch_prefixes( + &self, + ctx: Context, + prefixes: &Vec, + limit: u16, + ) -> Result<()> { + Tree::prefetch_prefixes(self, ctx, prefixes, limit) + } + + fn iter(&self, ctx: Context) -> Box { + Box::new(Tree::iter(self, ctx)) + } + + fn commit(&mut self, ctx: Context, namespace: Namespace, version: u64) -> Result { + Tree::commit(self, ctx, namespace, version) + } +} diff --git a/runtime/src/storage/mkvs/tree/tree_test.rs b/runtime/src/storage/mkvs/tree/tree_test.rs index 0ccc4526c34..f6bc20a8539 100644 --- a/runtime/src/storage/mkvs/tree/tree_test.rs +++ b/runtime/src/storage/mkvs/tree/tree_test.rs @@ -1,6 +1,6 @@ use io_context::Context; use serde_json; -use std::{collections::HashSet, fs::File, io::BufReader, iter::FromIterator, path::Path}; +use std::{collections::HashSet, fs::File, io::BufReader, iter, iter::FromIterator, path::Path}; use crate::{ common::crypto::hash::Hash, @@ -10,7 +10,7 @@ use crate::{ sync::*, tests, tree::*, - LogEntry, LogEntryKind, WriteLog, + Iterator, LogEntry, LogEntryKind, WriteLog, MKVS, }, }; @@ -51,7 +51,7 @@ fn generate_long_key_value_pairs() -> (Vec>, Vec>) { #[test] fn test_basic() { - let mut tree = Tree::make().new(Box::new(NoopReadSyncer)); + let mut tree = OverlayTree::new(Tree::make().new(Box::new(NoopReadSyncer))); let key_zero = b"foo"; let value_zero = b"bar"; @@ -101,8 +101,9 @@ fn test_basic() { .expect("get_some"); assert_eq!(value.as_slice(), value_zero); - let (log, hash) = - Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); + let (log, hash) = tree + .commit_both(Context::background(), Default::default(), 0) + .expect("commit"); assert_eq!( format!("{:?}", hash), "68e0c95d0dcb3a4ace95d1a64b8d7bb1dd08e3708abdca4068c1ccf32b7076d4" @@ -229,8 +230,9 @@ fn test_basic() { ); // Tree now has key_zero and key_one and should hash as if the mangling didn't happen. - let (log, hash) = - Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); + let (log, hash) = tree + .commit_both(Context::background(), Default::default(), 0) + .expect("commit"); assert_eq!( format!("{:?}", hash), "821d13489eae34debd85117823058a143ee3c534e91828a0db8d48ecb2128b8c" @@ -264,8 +266,9 @@ fn test_basic() { .is_none() ); - let (log, hash) = - Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); + let (log, hash) = tree + .commit_both(Context::background(), Default::default(), 0) + .expect("commit"); assert_eq!( format!("{:?}", hash), "68e0c95d0dcb3a4ace95d1a64b8d7bb1dd08e3708abdca4068c1ccf32b7076d4" @@ -306,7 +309,7 @@ fn test_long_keys() { ) .expect("insert"); - let (_, hash) = + let hash = Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); roots.push(hash); } @@ -339,7 +342,7 @@ fn test_long_keys() { .expect("get") ); - let (_, hash) = + let hash = Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); assert_eq!(hash, roots[i - 1]); } @@ -347,7 +350,7 @@ fn test_long_keys() { tree.remove(Context::background(), keys[0].as_slice()) .expect("remove"); - let (_, hash) = + let hash = Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); assert_eq!(hash, Hash::empty_hash()); } @@ -436,7 +439,7 @@ fn test_empty_keys() { test_empty_key(&mut tree); test_zeroth_discriminator_bit(&mut tree); - let (_, hash) = + let hash = Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); roots.push(hash); } @@ -462,7 +465,7 @@ fn test_empty_keys() { test_empty_key(&mut tree); test_zeroth_discriminator_bit(&mut tree); - let (_, hash) = + let hash = Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); assert_eq!(hash, roots[i - 1]); } @@ -473,7 +476,7 @@ fn test_empty_keys() { test_empty_key(&mut tree); test_zeroth_discriminator_bit(&mut tree); - let (_, hash) = + let hash = Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); assert_eq!(hash, Hash::empty_hash()); } @@ -498,7 +501,7 @@ fn test_insert_commit_batch() { assert_eq!(values[i], value.as_slice()); } - let (_, hash) = + let hash = Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); assert_eq!(format!("{:?}", hash), ALL_ITEMS_ROOT); } @@ -527,7 +530,7 @@ fn test_insert_commit_each() { Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); } - let (_, hash) = + let hash = Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); assert_eq!(format!("{:?}", hash), ALL_ITEMS_ROOT); } @@ -563,7 +566,7 @@ fn test_remove() { .expect("get_some"); assert_eq!(values[i], value.as_slice()); - let (_, hash) = + let hash = Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); roots.push(hash); } @@ -588,14 +591,14 @@ fn test_remove() { .expect("get") ); - let (_, hash) = + let hash = Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); assert_eq!(hash, roots[i - 1]); } tree.remove(Context::background(), keys[0].as_slice()) .expect("remove"); - let (_, hash) = + let hash = Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); assert_eq!(hash, Hash::empty_hash()); @@ -622,7 +625,7 @@ fn test_remove() { .expect("get_some"); assert_eq!(values[i], value.as_slice()); - let (_, _) = + let _ = Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); } @@ -644,11 +647,11 @@ fn test_remove() { .expect("get") ); - let (_, _) = + let _ = Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); } - let (_, hash) = + let hash = Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); assert_eq!(hash, Hash::empty_hash()); } @@ -657,9 +660,11 @@ fn test_remove() { fn test_syncer_basic() { let server = ProtocolServer::new(); - let mut tree = Tree::make() - .with_capacity(0, 0) - .new(Box::new(NoopReadSyncer)); + let mut tree = OverlayTree::new( + Tree::make() + .with_capacity(0, 0) + .new(Box::new(NoopReadSyncer)), + ); let (keys, values) = generate_key_value_pairs(); for i in 0..keys.len() { @@ -671,8 +676,9 @@ fn test_syncer_basic() { .expect("insert"); } - let (write_log, hash) = - Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); + let (write_log, hash) = tree + .commit_both(Context::background(), Default::default(), 0) + .expect("commit"); assert_eq!(format!("{:?}", hash), ALL_ITEMS_ROOT); server.apply(&write_log, hash, Default::default(), 0); @@ -712,9 +718,11 @@ fn test_syncer_basic() { fn test_syncer_remove() { let server = ProtocolServer::new(); - let mut tree = Tree::make() - .with_capacity(0, 0) - .new(Box::new(NoopReadSyncer)); + let mut tree = OverlayTree::new( + Tree::make() + .with_capacity(0, 0) + .new(Box::new(NoopReadSyncer)), + ); let mut roots: Vec = Vec::new(); let mut write_log = WriteLog::new(); @@ -727,8 +735,9 @@ fn test_syncer_remove() { ) .expect("insert"); - let (mut wl, hash) = - Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); + let (mut wl, hash) = tree + .commit_both(Context::background(), Default::default(), 0) + .expect("commit"); roots.push(hash); write_log.append(&mut wl); } @@ -751,7 +760,7 @@ fn test_syncer_remove() { .expect("remove"); } - let (_, hash) = Tree::commit( + let hash = Tree::commit( &mut remote_tree, Context::background(), Default::default(), @@ -775,9 +784,11 @@ fn test_syncer_remove() { fn test_syncer_insert() { let server = ProtocolServer::new(); - let mut tree = Tree::make() - .with_capacity(0, 0) - .new(Box::new(NoopReadSyncer)); + let mut tree = OverlayTree::new( + Tree::make() + .with_capacity(0, 0) + .new(Box::new(NoopReadSyncer)), + ); let (keys, values) = generate_key_value_pairs(); for i in 0..keys.len() { @@ -789,8 +800,9 @@ fn test_syncer_insert() { .expect("insert"); } - let (write_log, hash) = - Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); + let (write_log, hash) = tree + .commit_both(Context::background(), Default::default(), 0) + .expect("commit"); server.apply(&write_log, hash, Default::default(), 0); let stats = StatsCollector::new(server.read_sync()); @@ -827,9 +839,11 @@ fn test_syncer_insert() { fn test_syncer_writelog_remove() { let server = ProtocolServer::new(); - let mut tree = Tree::make() - .with_capacity(0, 0) - .new(Box::new(NoopReadSyncer)); + let mut tree = OverlayTree::new( + Tree::make() + .with_capacity(0, 0) + .new(Box::new(NoopReadSyncer)), + ); let (keys, values) = generate_key_value_pairs(); for i in 0..keys.len() { @@ -841,16 +855,18 @@ fn test_syncer_writelog_remove() { .expect("insert"); } - let (write_log, hash) = - Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); + let (write_log, hash) = tree + .commit_both(Context::background(), Default::default(), 0) + .expect("commit"); server.apply(&write_log, hash, Default::default(), 0); tree.remove(Context::background(), keys[0].as_slice()) .expect("remove"); let previous_hash = hash; - let (write_log, hash) = - Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); + let (write_log, hash) = tree + .commit_both(Context::background(), Default::default(), 0) + .expect("commit"); // Submit the write log to the protocol server. This will fail in case the server interprets the // write log differently. server.apply_existing(&write_log, previous_hash, hash, Default::default(), 0); @@ -860,9 +876,11 @@ fn test_syncer_writelog_remove() { fn test_syncer_prefetch_prefixes() { let server = ProtocolServer::new(); - let mut tree = Tree::make() - .with_capacity(0, 0) - .new(Box::new(NoopReadSyncer)); + let mut tree = OverlayTree::new( + Tree::make() + .with_capacity(0, 0) + .new(Box::new(NoopReadSyncer)), + ); let (keys, values) = generate_key_value_pairs(); for i in 0..keys.len() { @@ -874,8 +892,9 @@ fn test_syncer_prefetch_prefixes() { .expect("insert"); } - let (write_log, hash) = - Tree::commit(&mut tree, Context::background(), Default::default(), 0).expect("commit"); + let (write_log, hash) = tree + .commit_both(Context::background(), Default::default(), 0) + .expect("commit"); server.apply(&write_log, hash, Default::default(), 0); let stats = StatsCollector::new(server.read_sync()); @@ -997,12 +1016,14 @@ fn test_special_case_from_json(fixture: &'static str) { let mut tree = Tree::make() .with_capacity(0, 0) .new(Box::new(NoopReadSyncer)); + let mut overlay = OverlayTree::new(&mut tree); let mut remote_tree: Option = None; let mut root = Hash::empty_hash(); - let mut commit_remote = |tree: &mut Tree, remote_tree: &mut Option| { - let (write_log, hash) = - Tree::commit(tree, Context::background(), Default::default(), 0).expect("commit"); + let mut commit_remote = |tree: &mut OverlayTree<_>, remote_tree: &mut Option| { + let (write_log, hash) = tree + .commit_both(Context::background(), Default::default(), 0) + .expect("commit"); server.apply_existing(&write_log, root, hash, Default::default(), 0); remote_tree.replace( @@ -1029,34 +1050,36 @@ fn test_special_case_from_json(fixture: &'static str) { .expect("insert"); } - tree.insert(Context::background(), &key, &value) + overlay + .insert(Context::background(), &key, &value) .expect("insert"); - commit_remote(&mut tree, &mut remote_tree); + commit_remote(&mut overlay, &mut remote_tree); } tests::OpKind::Remove => { let key = op.key.unwrap(); if let Some(ref mut remote_tree) = remote_tree { - remote_tree - .remove(Context::background(), &key) - .expect("remove"); - let value = remote_tree + // If we want a mutable remote synced tree, we must use an overlay. + let mut overlay = OverlayTree::new(remote_tree); + + overlay.remove(Context::background(), &key).expect("remove"); + let value = overlay .get(Context::background(), &key) .expect("get (after remove)"); assert!(value.is_none(), "get (after remove) should return None"); } - tree.remove(Context::background(), &key).expect("remove"); - let value = tree + overlay.remove(Context::background(), &key).expect("remove"); + let value = overlay .get(Context::background(), &key) .expect("get (after remove)"); assert!(value.is_none(), "get (after remove) should return None"); - commit_remote(&mut tree, &mut remote_tree); + commit_remote(&mut overlay, &mut remote_tree); } tests::OpKind::Get => { - let value = tree + let value = overlay .get(Context::background(), &op.key.unwrap()) .expect("get"); assert_eq!(value, op.value, "get should return the correct value"); @@ -1071,7 +1094,7 @@ fn test_special_case_from_json(fixture: &'static str) { it.seek(&key); assert!(it.error().is_none(), "seek"); - let item = Iterator::next(&mut it); + let item = iter::Iterator::next(&mut it); assert_eq!( expected_key, item.as_ref().map(|p| &p.0), @@ -1084,11 +1107,11 @@ fn test_special_case_from_json(fixture: &'static str) { ); } - let mut it = tree.iter(Context::background()); + let mut it = overlay.iter(Context::background()); it.seek(&key); assert!(it.error().is_none(), "seek"); - let item = Iterator::next(&mut it); + let item = iter::Iterator::next(&mut it); assert_eq!( expected_key, item.as_ref().map(|p| &p.0), diff --git a/runtime/src/transaction/tree.rs b/runtime/src/transaction/tree.rs index 0b0811d37dd..e316f961d90 100644 --- a/runtime/src/transaction/tree.rs +++ b/runtime/src/transaction/tree.rs @@ -149,7 +149,7 @@ impl serde::Serialize for OutputArtifacts { /// A Merkle tree containing transaction artifacts. pub struct Tree { io_root: Root, - tree: mkvs::Tree, + tree: mkvs::OverlayTree, } impl Tree { @@ -157,7 +157,7 @@ impl Tree { pub fn new(read_syncer: Box, io_root: Root) -> Self { Self { io_root, - tree: mkvs::Tree::make().with_root(io_root).new(read_syncer), + tree: mkvs::OverlayTree::new(mkvs::Tree::make().with_root(io_root).new(read_syncer)), } } @@ -222,7 +222,7 @@ impl Tree { /// log and root hash. pub fn commit(&mut self, ctx: Context) -> Result<(WriteLog, Hash)> { self.tree - .commit(ctx, self.io_root.namespace, self.io_root.version) + .commit_both(ctx, self.io_root.namespace, self.io_root.version) } }