Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Light friendly storage tracking: changes trie + extending over ranges (
Browse files Browse the repository at this point in the history
…#628)

* changes_trie

* changs_trie: continue

* changes_trie: adding tests

* fixed TODO

* removed obsolete ExtrinsicChanges

* encodable ChangesTrieConfiguration

* removed polkadot fle

* fixed grumbles

* ext_storage_changes_root returns u32

* moved changes trie root to digest

* removed commented code

* read storage values from native code

* fixed grumbles

* fixed grumbles

* missing comma
  • Loading branch information
svyatonik authored and gavofyork committed Sep 18, 2018
1 parent dd53fc7 commit 95e017d
Show file tree
Hide file tree
Showing 64 changed files with 3,126 additions and 784 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ substrate-telemetry = { path = "../telemetry" }
hashdb = "0.2.1"
patricia-trie = "0.2.1"
rlp = "0.2.4"
memorydb = "0.2.1"

[dev-dependencies]
substrate-test-client = { path = "../test-client" }
164 changes: 158 additions & 6 deletions core/client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ use parking_lot::RwLock;
use primitives::{H256, AuthorityId, Blake2Hasher, RlpCodec};
use runtime_primitives::generic::BlockId;
use runtime_primitives::bft::Justification;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, Hash, HashFor, NumberFor, Zero};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, Hash, HashFor,
NumberFor, Zero, Digest, DigestItem};
use runtime_primitives::BuildStorage;
use state_machine::backend::Backend as StateBackend;
use executor::RuntimeInfo;
Expand All @@ -70,7 +71,7 @@ pub use state_db::PruningMode;
const FINALIZATION_WINDOW: u64 = 32;

/// DB-backed patricia trie state, transaction type is an overlay of changes to commit.
pub type DbState = state_machine::TrieBackend<Blake2Hasher, RlpCodec>;
pub type DbState = state_machine::TrieBackend<Arc<state_machine::Storage<Blake2Hasher>>, Blake2Hasher, RlpCodec>;

/// Database settings.
pub struct DatabaseSettings {
Expand Down Expand Up @@ -107,6 +108,7 @@ mod columns {
pub const HEADER: Option<u32> = Some(4);
pub const BODY: Option<u32> = Some(5);
pub const JUSTIFICATION: Option<u32> = Some(6);
pub const CHANGES_TRIE: Option<u32> = Some(7);
}

struct PendingBlock<Block: BlockT> {
Expand Down Expand Up @@ -230,6 +232,7 @@ impl<Block: BlockT> client::blockchain::Backend<Block> for BlockchainDb<Block> {
pub struct BlockImportOperation<Block: BlockT, H: Hasher> {
old_state: DbState,
updates: MemoryDB<H>,
changes_trie_updates: MemoryDB<H>,
pending_block: Option<PendingBlock<Block>>,
}

Expand Down Expand Up @@ -269,6 +272,11 @@ where Block: BlockT,
self.updates = update;
Ok(())
}

fn update_changes_trie(&mut self, update: MemoryDB<Blake2Hasher>) -> Result<(), client::error::Error> {
self.changes_trie_updates = update;
Ok(())
}
}

struct StorageDb<Block: BlockT> {
Expand All @@ -292,11 +300,55 @@ impl<Block: BlockT> state_db::HashDb for StorageDb<Block> {
}
}

struct DbGenesisStorage(pub H256);

impl DbGenesisStorage {
pub fn new() -> Self {
let mut root = H256::default();
let mut mdb = MemoryDB::<Blake2Hasher>::new();
state_machine::TrieDBMut::<Blake2Hasher, RlpCodec>::new(&mut mdb, &mut root);
DbGenesisStorage(root)
}
}

impl state_machine::Storage<Blake2Hasher> for DbGenesisStorage {
fn get(&self, _key: &H256) -> Result<Option<DBValue>, String> {
Ok(None)
}
}

pub struct DbChangesTrieStorage<Block: BlockT> {
db: Arc<KeyValueDB>,
_phantom: ::std::marker::PhantomData<Block>,
}

impl<Block: BlockT> state_machine::ChangesTrieStorage<Blake2Hasher> for DbChangesTrieStorage<Block> {
fn root(&self, block: u64) -> Result<Option<H256>, String> {
Ok(read_db::<Block>(&*self.db, columns::BLOCK_INDEX, columns::HEADER, BlockId::Number(As::sa(block)))
.map_err(|err| format!("{}", err))
.and_then(|header| match header {
Some(header) => Block::Header::decode(&mut &header[..])
.ok_or_else(|| format!("Failed to parse header of block {}", block))
.map(Some),
None => Ok(None)
})?
.and_then(|header| header.digest().logs().iter()
.find(|log| log.as_changes_trie_root().is_some())
.and_then(DigestItem::as_changes_trie_root)
.map(|root| H256::from_slice(root.as_ref()))))
}

fn get(&self, key: &H256) -> Result<Option<DBValue>, String> {
self.db.get(columns::CHANGES_TRIE, &key[..])
.map_err(|err| format!("{}", err))
}
}

/// Disk backend. Keeps data in a key-value store. In archive mode, trie nodes are kept from all blocks.
/// Otherwise, trie nodes are kept only from the most recent block.
pub struct Backend<Block: BlockT> {
storage: Arc<StorageDb<Block>>,
tries_change_storage: DbChangesTrieStorage<Block>,
blockchain: BlockchainDb<Block>,
finalization_window: u64,
}
Expand All @@ -323,12 +375,17 @@ impl<Block: BlockT> Backend<Block> {
let map_e = |e: state_db::Error<io::Error>| ::client::error::Error::from(format!("State database error: {:?}", e));
let state_db: StateDb<Block::Hash, H256> = StateDb::new(pruning, &StateMetaDb(&*db)).map_err(map_e)?;
let storage_db = StorageDb {
db,
db: db.clone(),
state_db,
};
let tries_change_storage = DbChangesTrieStorage {
db,
_phantom: Default::default(),
};

Ok(Backend {
storage: Arc::new(storage_db),
tries_change_storage: tries_change_storage,
blockchain,
finalization_window,
})
Expand All @@ -350,17 +407,25 @@ fn apply_state_commit(transaction: &mut DBTransaction, commit: state_db::CommitS
}
}

fn apply_changes_trie_commit(transaction: &mut DBTransaction, mut commit: MemoryDB<Blake2Hasher>) {
for (key, (val, _)) in commit.drain() {
transaction.put(columns::CHANGES_TRIE, &key[..], &val);
}
}

impl<Block> client::backend::Backend<Block, Blake2Hasher, RlpCodec> for Backend<Block> where Block: BlockT {
type BlockImportOperation = BlockImportOperation<Block, Blake2Hasher>;
type Blockchain = BlockchainDb<Block>;
type State = DbState;
type ChangesTrieStorage = DbChangesTrieStorage<Block>;

fn begin_operation(&self, block: BlockId<Block>) -> Result<Self::BlockImportOperation, client::error::Error> {
let state = self.state_at(block)?;
Ok(BlockImportOperation {
pending_block: None,
old_state: state,
updates: MemoryDB::default(),
changes_trie_updates: MemoryDB::default(),
})
}

Expand Down Expand Up @@ -393,6 +458,7 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher, RlpCodec> for Backend<
let number_u64 = number.as_().into();
let commit = self.storage.state_db.insert_block(&hash, number_u64, &pending_block.header.parent_hash(), changeset);
apply_state_commit(&mut transaction, commit);
apply_changes_trie_commit(&mut transaction, operation.changes_trie_updates);

//finalize an older block
if number_u64 > self.finalization_window {
Expand Down Expand Up @@ -420,6 +486,10 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher, RlpCodec> for Backend<
Ok(())
}

fn changes_trie_storage(&self) -> Option<&Self::ChangesTrieStorage> {
Some(&self.tries_change_storage)
}

fn revert(&self, n: NumberFor<Block>) -> Result<NumberFor<Block>, client::error::Error> {
use client::blockchain::HeaderBackend;
let mut best = self.blockchain.info()?.best_number;
Expand Down Expand Up @@ -459,15 +529,18 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher, RlpCodec> for Backend<

// special case for genesis initialization
match block {
BlockId::Hash(h) if h == Default::default() =>
return Ok(DbState::with_storage_for_genesis(self.storage.clone())),
BlockId::Hash(h) if h == Default::default() => {
let genesis_storage = DbGenesisStorage::new();
let root = genesis_storage.0.clone();
return Ok(DbState::new(Arc::new(genesis_storage), root));
},
_ => {}
}

match self.blockchain.header(block) {
Ok(Some(ref hdr)) if !self.storage.state_db.is_pruned(hdr.number().as_()) => {
let root = H256::from_slice(hdr.state_root().as_ref());
Ok(DbState::with_storage(self.storage.clone(), root))
Ok(DbState::new(self.storage.clone(), root))
},
Err(e) => Err(e),
_ => Err(client::error::ErrorKind::UnknownBlock(format!("{:?}", block)).into()),
Expand All @@ -486,6 +559,7 @@ mod tests {
use client::backend::BlockImportOperation as Op;
use client::blockchain::HeaderBackend as BlockchainHeaderBackend;
use runtime_primitives::testing::{Header, Block as RawBlock};
use state_machine::{TrieMut, TrieDBMut, ChangesTrieStorage};

type Block = RawBlock<u64>;

Expand Down Expand Up @@ -711,4 +785,82 @@ mod tests {
assert!(backend.storage.db.get(::columns::STATE, &key.0[..]).unwrap().is_none());
}
}

#[test]
fn changes_trie_storage_works() {
let backend = Backend::<Block>::new_test(1000);

let prepare_changes = |changes: Vec<(Vec<u8>, Vec<u8>)>| {
let mut changes_root = H256::default();
let mut changes_trie_update = MemoryDB::<Blake2Hasher>::new();
{
let mut trie = TrieDBMut::<Blake2Hasher, RlpCodec>::new(
&mut changes_trie_update,
&mut changes_root
);
for (key, value) in changes {
trie.insert(&key, &value).unwrap();
}
}

(changes_root, changes_trie_update)
};

let insert_header = |number: u64, parent_hash: H256, changes: Vec<(Vec<u8>, Vec<u8>)>| {
use runtime_primitives::generic::DigestItem;
use runtime_primitives::testing::Digest;

let (changes_root, changes_trie_update) = prepare_changes(changes);
let digest = Digest {
logs: vec![
DigestItem::ChangesTrieRoot(changes_root),
],
};
let header = Header {
number,
parent_hash,
state_root: Default::default(),
digest,
extrinsics_root: Default::default(),
};
let header_hash = header.hash();

let block_id = if number == 0 {
BlockId::Hash(Default::default())
} else {
BlockId::Number(number - 1)
};
let mut op = backend.begin_operation(block_id).unwrap();
op.set_block_data(header, None, None, true).unwrap();
op.update_changes_trie(changes_trie_update).unwrap();
backend.commit_operation(op).unwrap();

header_hash
};

let check_changes = |backend: &Backend<Block>, block: u64, changes: Vec<(Vec<u8>, Vec<u8>)>| {
let (changes_root, mut changes_trie_update) = prepare_changes(changes);
assert_eq!(backend.tries_change_storage.root(block), Ok(Some(changes_root)));

for (key, (val, _)) in changes_trie_update.drain() {
assert_eq!(backend.changes_trie_storage().unwrap().get(&key), Ok(Some(val)));
}
};

let changes0 = vec![(b"key_at_0".to_vec(), b"val_at_0".to_vec())];
let changes1 = vec![
(b"key_at_1".to_vec(), b"val_at_1".to_vec()),
(b"another_key_at_1".to_vec(), b"another_val_at_1".to_vec()),
];
let changes2 = vec![(b"key_at_2".to_vec(), b"val_at_2".to_vec())];

let block0 = insert_header(0, Default::default(), changes0.clone());
let block1 = insert_header(1, block0, changes1.clone());
let _ = insert_header(2, block1, changes2.clone());

// check that the storage contains tries for all blocks
check_changes(&backend, 0, changes0);
check_changes(&backend, 1, changes1);
check_changes(&backend, 2, changes2);
}
}
2 changes: 1 addition & 1 deletion core/client/db/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use DatabaseSettings;

/// Number of columns in the db. Must be the same for both full && light dbs.
/// Otherwise RocksDb will fail to open database && check its type.
pub const NUM_COLUMNS: u32 = 7;
pub const NUM_COLUMNS: u32 = 8;
/// Meta column. The set of keys in the column is shared by full && light storages.
pub const COLUMN_META: Option<u32> = Some(0);

Expand Down
8 changes: 8 additions & 0 deletions core/client/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ use runtime_primitives::bft::Justification;
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{Block as BlockT, NumberFor};
use state_machine::backend::Backend as StateBackend;
use state_machine::ChangesTrieStorage as StateChangesTrieStorage;
use patricia_trie::NodeCodec;
use hashdb::Hasher;
use memorydb::MemoryDB;

/// Block insertion operation. Keeps hold if the inserted block state and data.
pub trait BlockImportOperation<Block, H, C>
Expand Down Expand Up @@ -53,6 +55,8 @@ where
fn update_storage(&mut self, update: <Self::State as StateBackend<H, C>>::Transaction) -> error::Result<()>;
/// Inject storage data into the database replacing any existing data.
fn reset_storage<I: Iterator<Item=(Vec<u8>, Vec<u8>)>>(&mut self, iter: I) -> error::Result<()>;
/// Inject changes trie data into the database.
fn update_changes_trie(&mut self, update: MemoryDB<H>) -> error::Result<()>;
}

/// Client backend. Manages the data layer.
Expand All @@ -75,6 +79,8 @@ where
type Blockchain: ::blockchain::Backend<Block>;
/// Associated state backend type.
type State: StateBackend<H, C>;
/// Changes trie storage.
type ChangesTrieStorage: StateChangesTrieStorage<H>;

/// Begin a new block insertion transaction with given parent block id.
/// When constructing the genesis, this is called with all-zero hash.
Expand All @@ -83,6 +89,8 @@ where
fn commit_operation(&self, transaction: Self::BlockImportOperation) -> error::Result<()>;
/// Returns reference to blockchain backend.
fn blockchain(&self) -> &Self::Blockchain;
/// Returns reference to changes trie storage.
fn changes_trie_storage(&self) -> Option<&Self::ChangesTrieStorage>;
/// Returns state backend with post-state of given block.
fn state_at(&self, block: BlockId<Block>) -> error::Result<Self::State>;
/// Attempts to revert the chain by `n` blocks. Returns the number of blocks that were
Expand Down
4 changes: 2 additions & 2 deletions core/client/src/block_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ where
/// the error. Otherwise, it will return a mutable reference to self (in order to chain).
pub fn push(&mut self, xt: <Block as BlockT>::Extrinsic) -> error::Result<()> {
match self.executor.call_at_state(&self.state, &mut self.changes, "apply_extrinsic", &xt.encode(), native_when_possible()) {
Ok((result, _)) => {
Ok((result, _, _)) => {
match ApplyResult::decode(&mut result.as_slice()) {
Some(Ok(ApplyOutcome::Success)) | Some(Ok(ApplyOutcome::Fail)) => {
self.extrinsics.push(xt);
Expand All @@ -120,7 +120,7 @@ where

/// Consume the builder to return a valid `Block` containing all pushed extrinsics.
pub fn bake(mut self) -> error::Result<Block> {
let (output, _) = self.executor.call_at_state(
let (output, _, _) = self.executor.call_at_state(
&self.state,
&mut self.changes,
"finalise_block",
Expand Down
Loading

0 comments on commit 95e017d

Please sign in to comment.