From 28f2c391a888974b351ef8cd0f5805ab1fee07b2 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Mon, 25 Jun 2018 16:15:42 +0300 Subject: [PATCH 1/3] use db in light clients --- polkadot/service/src/components.rs | 14 +- polkadot/service/src/lib.rs | 2 +- substrate/client/db/src/lib.rs | 164 ++++--------- substrate/client/db/src/light.rs | 250 +++++++++++++++++++ substrate/client/db/src/utils.rs | 167 +++++++++++++ substrate/client/src/blockchain.rs | 16 +- substrate/client/src/call_executor.rs | 119 +-------- substrate/client/src/client.rs | 2 +- substrate/client/src/error.rs | 10 +- substrate/client/src/in_mem.rs | 71 +++--- substrate/client/src/lib.rs | 4 +- substrate/client/src/light.rs | 256 -------------------- substrate/client/src/light/backend.rs | 183 ++++++++++++++ substrate/client/src/light/blockchain.rs | 95 ++++++++ substrate/client/src/light/call_executor.rs | 143 +++++++++++ substrate/client/src/light/fetcher.rs | 90 +++++++ substrate/client/src/light/mod.rs | 75 ++++++ substrate/network/src/lib.rs | 2 +- substrate/network/src/on_demand.rs | 167 +++++++------ substrate/network/src/protocol.rs | 14 +- substrate/network/src/service.rs | 2 +- substrate/network/src/test/sync.rs | 2 +- substrate/state-machine/src/lib.rs | 10 +- 23 files changed, 1233 insertions(+), 625 deletions(-) create mode 100644 substrate/client/db/src/light.rs create mode 100644 substrate/client/db/src/utils.rs delete mode 100644 substrate/client/src/light.rs create mode 100644 substrate/client/src/light/backend.rs create mode 100644 substrate/client/src/light/blockchain.rs create mode 100644 substrate/client/src/light/call_executor.rs create mode 100644 substrate/client/src/light/fetcher.rs create mode 100644 substrate/client/src/light/mod.rs diff --git a/polkadot/service/src/components.rs b/polkadot/service/src/components.rs index e8274f4dfccc9..7977291416195 100644 --- a/polkadot/service/src/components.rs +++ b/polkadot/service/src/components.rs @@ -117,15 +117,19 @@ impl Components for FullComponents { pub struct LightComponents; impl Components for LightComponents { - type Backend = client::light::Backend; + type Backend = client::light::backend::Backend, network::OnDemand>>; type Api = polkadot_api::light::RemotePolkadotApiWrapper; - type Executor = client::RemoteCallExecutor, network::OnDemand>>; + type Executor = client::light::call_executor::RemoteCallExecutor< + client::light::blockchain::Blockchain, network::OnDemand>>, + network::OnDemand>>; - fn build_client(&self, _settings: client_db::DatabaseSettings, executor: CodeExecutor, genesis_storage: MakeStorage) + fn build_client(&self, db_settings: client_db::DatabaseSettings, executor: CodeExecutor, genesis_storage: MakeStorage) -> Result<(Arc>, Option>>>), error::Error> { - let client_backend = client::light::new_light_backend(); - let fetch_checker = Arc::new(client::light::new_fetch_checker(client_backend.clone(), executor)); + let db_storage = client_db::light::LightStorage::new(db_settings)?; + let light_blockchain = client::light::new_light_blockchain(db_storage); + let fetch_checker = Arc::new(client::light::new_fetch_checker(light_blockchain.clone(), executor)); let fetcher = Arc::new(network::OnDemand::new(fetch_checker)); + let client_backend = client::light::new_light_backend(light_blockchain, fetcher.clone()); let client = client::light::new_light(client_backend, fetcher.clone(), genesis_storage)?; Ok((Arc::new(client), Some(fetcher))) } diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index 75ddf1ffbee7d..c3a7e79e028ec 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -135,7 +135,7 @@ impl Service }, network_config: config.network, chain: client.clone(), - on_demand: on_demand.clone().map(|d| d as Arc), + on_demand: on_demand.clone().map(|d| d as Arc>), transaction_pool: transaction_pool_adapter, }; let network = network::Service::new(network_params)?; diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs index f222df681fc03..5f0136879bbfe 100644 --- a/substrate/client/db/src/lib.rs +++ b/substrate/client/db/src/lib.rs @@ -34,12 +34,14 @@ extern crate log; #[cfg(test)] extern crate kvdb_memorydb; +pub mod light; + +mod utils; + use std::sync::Arc; use std::path::PathBuf; use codec::Slicable; -use hashdb::DBValue; -use kvdb_rocksdb::{Database, DatabaseConfig}; use kvdb::{KeyValueDB, DBTransaction}; use memorydb::MemoryDB; use parking_lot::RwLock; @@ -49,6 +51,7 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, Hashing use runtime_primitives::BuildStorage; use state_machine::backend::Backend as StateBackend; use state_machine::CodeExecutor; +use utils::{Meta, db_err, meta_keys, number_to_db_key, open_database, read_db, read_id, read_meta}; /// DB-backed patricia trie state, transaction type is an overlay of changes to commit. pub type DbState = state_machine::TrieBackend; @@ -74,7 +77,7 @@ pub fn new_client( E: CodeExecutor, S: BuildStorage, { - let backend = Arc::new(Backend::new(&settings)?); + let backend = Arc::new(Backend::new(settings)?); let executor = client::LocalCallExecutor::new(backend.clone(), executor); Ok(client::Client::new(backend, executor, genesis_storage)?) } @@ -86,11 +89,6 @@ mod columns { pub const HEADER: Option = Some(3); pub const BODY: Option = Some(4); pub const JUSTIFICATION: Option = Some(5); - pub const NUM_COLUMNS: u32 = 6; -} - -mod meta { - pub const BEST_BLOCK: &[u8; 4] = b"best"; } struct PendingBlock { @@ -100,37 +98,6 @@ struct PendingBlock { is_best: bool, } -#[derive(Clone)] -struct Meta { - best_hash: H, - best_number: N, - genesis_hash: H, -} - -type BlockKey = [u8; 4]; - -// Little endian -fn number_to_db_key(n: N) -> BlockKey where N: As { - let n: u32 = n.as_(); - - [ - (n >> 24) as u8, - ((n >> 16) & 0xff) as u8, - ((n >> 8) & 0xff) as u8, - (n & 0xff) as u8 - ] -} - -// Maps database error to client error -fn db_err(err: kvdb::Error) -> client::error::Error { - use std::error::Error; - match err.kind() { - &kvdb::ErrorKind::Io(ref err) => client::error::ErrorKind::Backend(err.description().into()).into(), - &kvdb::ErrorKind::Msg(ref m) => client::error::ErrorKind::Backend(m.clone()).into(), - _ => client::error::ErrorKind::Backend("Unknown backend error".into()).into(), - } -} - /// Block database pub struct BlockchainDb { db: Arc, @@ -138,59 +105,14 @@ pub struct BlockchainDb { } impl BlockchainDb where ::Number: As { - fn id(&self, id: BlockId) -> Result, client::error::Error> { - match id { - BlockId::Hash(h) => { - { - let meta = self.meta.read(); - if meta.best_hash == h { - return Ok(Some(number_to_db_key(meta.best_number))); - } - } - self.db.get(columns::BLOCK_INDEX, h.as_ref()).map(|v| v.map(|v| { - let mut key: [u8; 4] = [0; 4]; - key.copy_from_slice(&v); - key - })).map_err(db_err) - }, - BlockId::Number(n) => Ok(Some(number_to_db_key(n))), - } - } - fn new(db: Arc) -> Result { - let (best_hash, best_number) = if let Some(Some(header)) = db.get(columns::META, meta::BEST_BLOCK).and_then(|id| - match id { - Some(id) => db.get(columns::HEADER, &id).map(|h| h.map(|b| Block::Header::decode(&mut &b[..]))), - None => Ok(None), - }).map_err(db_err)? - { - let hash = header.hash(); - debug!("DB Opened blockchain db, best {:?} ({})", hash, header.number()); - (hash, header.number().clone()) - } else { - (Default::default(), Zero::zero()) - }; - let genesis_hash = db.get(columns::HEADER, &number_to_db_key(::Number::zero())).map_err(db_err)? - .map(|b| HashingFor::::hash(&b)).unwrap_or_default().into(); - + let meta = read_meta::(&*db, columns::HEADER)?; Ok(BlockchainDb { db, - meta: RwLock::new(Meta { - best_hash, - best_number, - genesis_hash, - }) + meta: RwLock::new(meta) }) } - fn read_db(&self, id: BlockId, column: Option) -> Result, client::error::Error> { - self.id(id).and_then(|key| - match key { - Some(key) => self.db.get(column, &key).map_err(db_err), - None => Ok(None), - }) - } - fn update_meta(&self, hash: Block::Hash, number: ::Number, is_best: bool) { if is_best { let mut meta = self.meta.write(); @@ -203,9 +125,9 @@ impl BlockchainDb where ::Number } } -impl client::blockchain::Backend for BlockchainDb where ::Number: As { +impl client::blockchain::HeaderBackend for BlockchainDb where ::Number: As { fn header(&self, id: BlockId) -> Result, client::error::Error> { - match self.read_db(id, columns::HEADER)? { + match read_db(&*self.db, columns::BLOCK_INDEX, columns::HEADER, id)? { Some(header) => match Block::Header::decode(&mut &header[..]) { Some(header) => Ok(Some(header)), None => return Err(client::error::ErrorKind::Backend("Error decoding header".into()).into()), @@ -214,26 +136,6 @@ impl client::blockchain::Backend for BlockchainDb w } } - fn body(&self, id: BlockId) -> Result>, client::error::Error> { - match self.read_db(id, columns::BODY)? { - Some(body) => match Slicable::decode(&mut &body[..]) { - Some(body) => Ok(Some(body)), - None => return Err(client::error::ErrorKind::Backend("Error decoding body".into()).into()), - } - None => Ok(None), - } - } - - fn justification(&self, id: BlockId) -> Result>, client::error::Error> { - match self.read_db(id, columns::JUSTIFICATION)? { - Some(justification) => match Slicable::decode(&mut &justification[..]) { - Some(justification) => Ok(Some(justification)), - None => return Err(client::error::ErrorKind::Backend("Error decoding justification".into()).into()), - } - None => Ok(None), - } - } - fn info(&self) -> Result, client::error::Error> { let meta = self.meta.read(); Ok(client::blockchain::Info { @@ -245,7 +147,7 @@ impl client::blockchain::Backend for BlockchainDb w fn status(&self, id: BlockId) -> Result { let exists = match id { - BlockId::Hash(_) => self.id(id)?.is_some(), + BlockId::Hash(_) => read_id(&*self.db, columns::BLOCK_INDEX, id)?.is_some(), BlockId::Number(n) => n <= self.meta.read().best_number, }; match exists { @@ -255,12 +157,34 @@ impl client::blockchain::Backend for BlockchainDb w } fn hash(&self, number: ::Number) -> Result, client::error::Error> { - self.read_db(BlockId::Number(number), columns::HEADER).map(|x| + read_db::(&*self.db, columns::BLOCK_INDEX, columns::HEADER, BlockId::Number(number)).map(|x| x.map(|raw| HashingFor::::hash(&raw[..])).map(Into::into) ) } } +impl client::blockchain::Backend for BlockchainDb where ::Number: As { + fn body(&self, id: BlockId) -> Result>, client::error::Error> { + match read_db(&*self.db, columns::BLOCK_INDEX, columns::BODY, id)? { + Some(body) => match Slicable::decode(&mut &body[..]) { + Some(body) => Ok(Some(body)), + None => return Err(client::error::ErrorKind::Backend("Error decoding body".into()).into()), + } + None => Ok(None), + } + } + + fn justification(&self, id: BlockId) -> Result>, client::error::Error> { + match read_db(&*self.db, columns::BLOCK_INDEX, columns::JUSTIFICATION, id)? { + Some(justification) => match Slicable::decode(&mut &justification[..]) { + Some(justification) => Ok(Some(justification)), + None => return Err(client::error::ErrorKind::Backend("Error decoding justification".into()).into()), + } + None => Ok(None), + } + } +} + /// Database transaction pub struct BlockImportOperation { old_state: DbState, @@ -309,19 +233,17 @@ pub struct Backend { impl Backend where ::Number: As { /// Create a new instance of database backend. - pub fn new(config: &DatabaseSettings) -> Result { - let mut db_config = DatabaseConfig::with_columns(Some(columns::NUM_COLUMNS)); - db_config.memory_budget = config.cache_size; - db_config.wal = true; - let path = config.path.to_str().ok_or_else(|| client::error::ErrorKind::Backend("Invalid database path".into()))?; - let db = Arc::new(Database::open(&db_config, &path).map_err(db_err)?); + pub fn new(config: DatabaseSettings) -> Result { + let db = open_database(config, "full")?; Backend::from_kvdb(db as Arc<_>, true) } #[cfg(test)] fn new_test() -> Self { - let db = Arc::new(::kvdb_memorydb::create(columns::NUM_COLUMNS)); + use utils::NUM_COLUMNS; + + let db = Arc::new(::kvdb_memorydb::create(NUM_COLUMNS)); Backend::from_kvdb(db as Arc<_>, false).expect("failed to create test-db") } @@ -359,7 +281,7 @@ impl client::backend::Backend for Backend where if let Some(pending_block) = operation.pending_block { let hash = pending_block.header.hash(); let number = pending_block.header.number().clone(); - let key = number_to_db_key(pending_block.header.number().clone()); + let key = number_to_db_key(number.clone()); transaction.put(columns::HEADER, &key, &pending_block.header.encode()); if let Some(body) = pending_block.body { transaction.put(columns::BODY, &key, &body.encode()); @@ -369,7 +291,7 @@ impl client::backend::Backend for Backend where } transaction.put(columns::BLOCK_INDEX, hash.as_ref(), &key); if pending_block.is_best { - transaction.put(columns::META, meta::BEST_BLOCK, &key); + transaction.put(columns::META, meta_keys::BEST_BLOCK, &key); } for (key, (val, rc)) in operation.updates.drain() { if rc > 0 { @@ -390,7 +312,7 @@ impl client::backend::Backend for Backend where } fn state_at(&self, block: BlockId) -> Result { - use client::blockchain::Backend as BcBackend; + use client::blockchain::HeaderBackend as BcHeaderBackend; // special case for genesis initialization match block { @@ -417,7 +339,7 @@ mod tests { use super::*; use client::backend::Backend as BTrait; use client::backend::BlockImportOperation as Op; - use client::blockchain::Backend as BCTrait; + use client::blockchain::HeaderBackend as BlockchainHeaderBackend; use runtime_primitives::testing::{Header, Block as RawBlock}; type Block = RawBlock; diff --git a/substrate/client/db/src/light.rs b/substrate/client/db/src/light.rs new file mode 100644 index 0000000000000..905d962a07815 --- /dev/null +++ b/substrate/client/db/src/light.rs @@ -0,0 +1,250 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! RocksDB-based light client blockchain storage. + +use std::sync::Arc; +use parking_lot::RwLock; + +use kvdb::{KeyValueDB, DBTransaction}; + +use client::blockchain::{BlockStatus, HeaderBackend as BlockchainHeaderBackend, + Info as BlockchainInfo}; +use client::error::{ErrorKind as ClientErrorKind, Result as ClientResult}; +use client::light::blockchain::Storage as LightBlockchainStorage; +use codec::Slicable; +use primitives::AuthorityId; +use runtime_primitives::generic::BlockId; +use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, Hashing, HashingFor, Zero}; +use utils::{meta_keys, Meta, db_err, number_to_db_key, open_database, read_db, read_id, read_meta}; +use DatabaseSettings; + +pub(crate) mod columns { + pub const META: Option = ::utils::COLUMN_META; + pub const BLOCK_INDEX: Option = Some(1); + pub const HEADER: Option = Some(2); +} + +/// Light blockchain storage. Stores most recent headers + CHTs for older headers. +pub struct LightStorage { + db: Arc, + meta: RwLock::Header as HeaderT>::Number, Block::Hash>>, +} + +#[derive(Clone, PartialEq, Debug)] +struct BestAuthorities { + /// first block, when this set became actual + valid_from: N, + /// None means that we do not know the set starting from `valid_from` block + authorities: Option>, +} + +impl LightStorage + where + Block: BlockT, + <::Header as HeaderT>::Number: As, +{ + /// Create new storage with given settings. + pub fn new(config: DatabaseSettings) -> ClientResult { + let db = open_database(config, "light")?; + + Self::from_kvdb(db as Arc<_>) + } + + #[cfg(test)] + pub(crate) fn new_test() -> Self { + use utils::NUM_COLUMNS; + + let db = Arc::new(::kvdb_memorydb::create(NUM_COLUMNS)); + + Self::from_kvdb(db as Arc<_>).expect("failed to create test-db") + } + + fn from_kvdb(db: Arc) -> ClientResult { + let meta = RwLock::new(read_meta::(&*db, columns::HEADER)?); + + Ok(LightStorage { + db, + meta, + }) + } + + fn update_meta(&self, hash: Block::Hash, number: <::Header as HeaderT>::Number, is_best: bool) { + if is_best { + let mut meta = self.meta.write(); + if number == <::Header as HeaderT>::Number::zero() { + meta.genesis_hash = hash; + } + + meta.best_number = number; + meta.best_hash = hash; + } + } +} + +impl BlockchainHeaderBackend for LightStorage + where + Block: BlockT, + <::Header as HeaderT>::Number: As, +{ + fn header(&self, id: BlockId) -> ClientResult> { + match read_db(&*self.db, columns::BLOCK_INDEX, columns::HEADER, id)? { + Some(header) => match Block::Header::decode(&mut &header[..]) { + Some(header) => Ok(Some(header)), + None => return Err(ClientErrorKind::Backend("Error decoding header".into()).into()), + } + None => Ok(None), + } + } + + fn info(&self) -> ClientResult> { + let meta = self.meta.read(); + Ok(BlockchainInfo { + best_hash: meta.best_hash, + best_number: meta.best_number, + genesis_hash: meta.genesis_hash, + }) + } + + fn status(&self, id: BlockId) -> ClientResult { + let exists = match id { + BlockId::Hash(_) => read_id(&*self.db, columns::BLOCK_INDEX, id)?.is_some(), + BlockId::Number(n) => n <= self.meta.read().best_number, + }; + match exists { + true => Ok(BlockStatus::InChain), + false => Ok(BlockStatus::Unknown), + } + } + + fn hash(&self, number: <::Header as HeaderT>::Number) -> ClientResult> { + read_db::(&*self.db, columns::BLOCK_INDEX, columns::HEADER, BlockId::Number(number)).map(|x| + x.map(|raw| HashingFor::::hash(&raw[..])).map(Into::into) + ) + } +} + +impl LightBlockchainStorage for LightStorage + where + Block: BlockT, + <::Header as HeaderT>::Number: As, + ::Hash: From<[u8; 32]> + Into<[u8; 32]>, +{ + fn import_header(&self, is_new_best: bool, header: Block::Header) -> ClientResult<()> { + let mut transaction = DBTransaction::new(); + + let hash = header.hash(); + let number = *header.number(); + let key = number_to_db_key(number); + + transaction.put(columns::HEADER, &key, &header.encode()); + transaction.put(columns::BLOCK_INDEX, hash.as_ref(), &key); + + if is_new_best { + transaction.put(columns::META, meta_keys::BEST_BLOCK, &key); + } + + debug!("Light DB Commit {:?} ({})", hash, number); + self.db.write(transaction).map_err(db_err)?; + self.update_meta(hash, number, is_new_best); + + Ok(()) + } +} + +#[cfg(test)] +pub(crate) mod tests { + use runtime_primitives::testing::{H256 as Hash, Header, Block as RawBlock}; + use super::*; + + type Block = RawBlock; + + pub fn insert_block(db: &LightStorage, parent: &Hash, number: u32) -> Hash { + let header = Header { + number: number.into(), + parent_hash: *parent, + state_root: Default::default(), + digest: Default::default(), + extrinsics_root: Default::default(), + }; + + let hash = header.hash(); + db.import_header(true, header).unwrap(); + hash + } + + #[test] + fn returns_known_header() { + let db = LightStorage::new_test(); + let known_hash = insert_block(&db, &Default::default(), 0); + let header_by_hash = db.header(BlockId::Hash(known_hash)).unwrap().unwrap(); + let header_by_number = db.header(BlockId::Number(0)).unwrap().unwrap(); + assert_eq!(header_by_hash, header_by_number); + } + + #[test] + fn does_not_return_unknown_header() { + let db = LightStorage::::new_test(); + assert!(db.header(BlockId::Hash(1.into())).unwrap().is_none()); + assert!(db.header(BlockId::Number(0)).unwrap().is_none()); + } + + #[test] + fn returns_info() { + let db = LightStorage::new_test(); + let genesis_hash = insert_block(&db, &Default::default(), 0); + let info = db.info().unwrap(); + assert_eq!(info.best_hash, genesis_hash); + assert_eq!(info.best_number, 0); + assert_eq!(info.genesis_hash, genesis_hash); + let best_hash = insert_block(&db, &genesis_hash, 1); + let info = db.info().unwrap(); + assert_eq!(info.best_hash, best_hash); + assert_eq!(info.best_number, 1); + assert_eq!(info.genesis_hash, genesis_hash); + } + + #[test] + fn returns_block_status() { + let db = LightStorage::new_test(); + let genesis_hash = insert_block(&db, &Default::default(), 0); + assert_eq!(db.status(BlockId::Hash(genesis_hash)).unwrap(), BlockStatus::InChain); + assert_eq!(db.status(BlockId::Number(0)).unwrap(), BlockStatus::InChain); + assert_eq!(db.status(BlockId::Hash(1.into())).unwrap(), BlockStatus::Unknown); + assert_eq!(db.status(BlockId::Number(1)).unwrap(), BlockStatus::Unknown); + } + + #[test] + fn returns_block_hash() { + let db = LightStorage::new_test(); + let genesis_hash = insert_block(&db, &Default::default(), 0); + assert_eq!(db.hash(0).unwrap(), Some(genesis_hash)); + assert_eq!(db.hash(1).unwrap(), None); + } + + #[test] + fn import_header_works() { + let db = LightStorage::new_test(); + + let genesis_hash = insert_block(&db, &Default::default(), 0); + assert_eq!(db.db.iter(columns::HEADER).count(), 1); + assert_eq!(db.db.iter(columns::BLOCK_INDEX).count(), 1); + + let _ = insert_block(&db, &genesis_hash, 1); + assert_eq!(db.db.iter(columns::HEADER).count(), 2); + assert_eq!(db.db.iter(columns::BLOCK_INDEX).count(), 2); + } +} diff --git a/substrate/client/db/src/utils.rs b/substrate/client/db/src/utils.rs new file mode 100644 index 0000000000000..c6b9bd121c4a5 --- /dev/null +++ b/substrate/client/db/src/utils.rs @@ -0,0 +1,167 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Db-based backend utility structures and functions, used by both +//! full and light storages. + +use std::sync::Arc; + +use kvdb::{self, KeyValueDB, DBTransaction}; +use kvdb_rocksdb::{Database, DatabaseConfig}; + +use client; +use codec::Slicable; +use hashdb::DBValue; +use runtime_primitives::generic::BlockId; +use runtime_primitives::traits::{As, Block as BlockT, Header as HeaderT, Hashing, HashingFor, Zero}; +use DatabaseSettings; + +/// Number of columns in the db. Must be the same for both full && light dbs. +/// Otherwise RocksDb will fail to open database && check its type. +pub const NUM_COLUMNS: u32 = 6; +/// Meta column. Thes set of keys in the column is shared by full && light storages. +pub const COLUMN_META: Option = Some(0); + +/// Keys of entries in COLUMN_META. +pub mod meta_keys { + /// Type of storage (full or light). + pub const TYPE: &[u8; 4] = b"type"; + /// Best block key. + pub const BEST_BLOCK: &[u8; 4] = b"best"; +} + +/// Database metadata. +pub struct Meta { + /// Hash of the best known block. + pub best_hash: H, + /// Number of the best known block. + pub best_number: N, + /// Hash of the genesis block. + pub genesis_hash: H, +} + +/// Type of block key in the database (LE block number). +pub type BlockKey = [u8; 4]; + +/// Convert block number into key (LE representation). +pub fn number_to_db_key(n: N) -> BlockKey where N: As { + let n: u32 = n.as_(); + + [ + (n >> 24) as u8, + ((n >> 16) & 0xff) as u8, + ((n >> 8) & 0xff) as u8, + (n & 0xff) as u8 + ] +} + +/// Maps database error to client error +pub fn db_err(err: kvdb::Error) -> client::error::Error { + use std::error::Error; + match err.kind() { + &kvdb::ErrorKind::Io(ref err) => client::error::ErrorKind::Backend(err.description().into()).into(), + &kvdb::ErrorKind::Msg(ref m) => client::error::ErrorKind::Backend(m.clone()).into(), + _ => client::error::ErrorKind::Backend("Unknown backend error".into()).into(), + } +} + +/// Open RocksDB database. +pub fn open_database(config: DatabaseSettings, db_type: &str) -> client::error::Result> { + let mut db_config = DatabaseConfig::with_columns(Some(NUM_COLUMNS)); + db_config.memory_budget = config.cache_size; + db_config.wal = true; + let path = config.path.to_str().ok_or_else(|| client::error::ErrorKind::Backend("Invalid database path".into()))?; + let db = Database::open(&db_config, &path).map_err(db_err)?; + + // check database type + match db.get(COLUMN_META, meta_keys::TYPE).map_err(db_err)? { + Some(stored_type) => { + if db_type.as_bytes() != &*stored_type { + return Err(client::error::ErrorKind::Backend( + format!("Unexpected database type. Expected: {}", db_type)).into()); + } + }, + None => { + let mut transaction = DBTransaction::new(); + transaction.put(COLUMN_META, meta_keys::TYPE, db_type.as_bytes()); + db.write(transaction).map_err(db_err)?; + }, + } + + Ok(Arc::new(db)) +} + +/// Convert block id to block key, reading number from db if required. +pub fn read_id(db: &KeyValueDB, col_index: Option, id: BlockId) -> Result, client::error::Error> + where + Block: BlockT, + <::Header as HeaderT>::Number: As, +{ + match id { + BlockId::Hash(h) => db.get(col_index, h.as_ref()) + .map(|v| v.map(|v| { + let mut key: [u8; 4] = [0; 4]; + key.copy_from_slice(&v); + key + })).map_err(db_err), + BlockId::Number(n) => Ok(Some(number_to_db_key(n))), + } +} + +/// Read database column entry for the given block. +pub fn read_db(db: &KeyValueDB, col_index: Option, col: Option, id: BlockId) -> client::error::Result> + where + Block: BlockT, + <::Header as HeaderT>::Number: As, +{ + read_id(db, col_index, id).and_then(|key| match key { + Some(key) => db.get(col, &key).map_err(db_err), + None => Ok(None), + }) +} + +/// Read meta from the database. +pub fn read_meta(db: &KeyValueDB, col_header: Option) -> Result::Header as HeaderT>::Number, Block::Hash>, client::error::Error> + where + Block: BlockT, + <::Header as HeaderT>::Number: As, +{ + let genesis_number = <::Header as HeaderT>::Number::zero(); + let (best_hash, best_number) = if let Some(Some(header)) = db.get(COLUMN_META, meta_keys::BEST_BLOCK).and_then(|id| + match id { + Some(id) => db.get(col_header, &id).map(|h| h.map(|b| Block::Header::decode(&mut &b[..]))), + None => Ok(None), + }).map_err(db_err)? + { + let hash = header.hash(); + debug!("DB Opened blockchain db, best {:?} ({})", hash, header.number()); + (hash, *header.number()) + } else { + (Default::default(), genesis_number) + }; + + let genesis_hash = db.get(col_header, &number_to_db_key(genesis_number)) + .map_err(db_err)? + .map(|raw| HashingFor::::hash(&raw[..])) + .unwrap_or_default() + .into(); + + Ok(Meta { + best_hash, + best_number, + genesis_hash, + }) +} diff --git a/substrate/client/src/blockchain.rs b/substrate/client/src/blockchain.rs index 73c7107b30ee2..18aed482b8c4d 100644 --- a/substrate/client/src/blockchain.rs +++ b/substrate/client/src/blockchain.rs @@ -22,14 +22,10 @@ use runtime_primitives::bft::Justification; use error::Result; -/// Blockchain database backend. Does not perform any validation. -pub trait Backend: Send + Sync { +/// Blockchain database header backend. Does not perform any validation. +pub trait HeaderBackend: Send + Sync { /// Get block header. Returns `None` if block is not found. fn header(&self, id: BlockId) -> Result::Header>>; - /// Get block body. Returns `None` if block is not found. - fn body(&self, id: BlockId) -> Result::Extrinsic>>>; - /// Get block justification. Returns `None` if justification does not exist. - fn justification(&self, id: BlockId) -> Result>>; /// Get blockchain info. fn info(&self) -> Result>; /// Get block status. @@ -38,6 +34,14 @@ pub trait Backend: Send + Sync { fn hash(&self, number: <::Header as HeaderT>::Number) -> Result::Header as HeaderT>::Hash>>; } +/// Blockchain database backend. Does not perform any validation. +pub trait Backend: HeaderBackend { + /// Get block body. Returns `None` if block is not found. + fn body(&self, id: BlockId) -> Result::Extrinsic>>>; + /// Get block justification. Returns `None` if justification does not exist. + fn justification(&self, id: BlockId) -> Result>>; +} + /// Block import outcome pub enum ImportResult { /// Imported successfully. diff --git a/substrate/client/src/call_executor.rs b/substrate/client/src/call_executor.rs index cd67ec4d97722..b0d2c2707cc2e 100644 --- a/substrate/client/src/call_executor.rs +++ b/substrate/client/src/call_executor.rs @@ -15,18 +15,15 @@ // along with Polkadot. If not, see . use std::sync::Arc; -use futures::{IntoFuture, Future}; use runtime_primitives::generic::BlockId; -use runtime_primitives::traits::{Block as BlockT, Header as HeaderT}; +use runtime_primitives::traits::Block as BlockT; use state_machine::{self, OverlayedChanges, Backend as StateBackend, CodeExecutor}; use backend; -use blockchain::Backend as ChainBackend; use error; -use light::{Fetcher, RemoteCallRequest}; /// Information regarding the result of a call. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct CallResult { /// The data that was returned from the call. pub return_data: Vec, @@ -62,13 +59,6 @@ pub struct LocalCallExecutor { executor: E, } -/// Call executor that executes methods on remote node, querying execution proof -/// and checking proof by re-executing locally. -pub struct RemoteCallExecutor { - backend: Arc, - fetcher: Arc, -} - impl LocalCallExecutor { /// Creates new instance of local call executor. pub fn new(backend: Arc, executor: E) -> Self { @@ -111,7 +101,7 @@ impl CallExecutor for LocalCallExecutor } fn prove_at_state(&self, state: S, changes: &mut OverlayedChanges, method: &str, call_data: &[u8]) -> Result<(Vec, Vec>), error::Error> { - state_machine::prove( + state_machine::prove_execution( state, changes, &self.executor, @@ -121,105 +111,4 @@ impl CallExecutor for LocalCallExecutor .map(|(result, proof, _)| (result, proof)) .map_err(Into::into) } -} - -impl RemoteCallExecutor { - /// Creates new instance of remote call executor. - pub fn new(backend: Arc, fetcher: Arc) -> Self { - RemoteCallExecutor { backend, fetcher } - } -} - -impl CallExecutor for RemoteCallExecutor - where - B: backend::RemoteBackend, - F: Fetcher, - Block: BlockT, - error::Error: From<<>::State as StateBackend>::Error>, -{ - type Error = error::Error; - - fn call(&self, id: &BlockId, method: &str, call_data: &[u8]) -> error::Result { - let block_hash = match *id { - BlockId::Hash(hash) => hash, - BlockId::Number(number) => self.backend.blockchain().hash(number)? - .ok_or_else(|| error::ErrorKind::UnknownBlock(format!("{}", number)))?, - }; - - self.fetcher.remote_call(RemoteCallRequest { - block: block_hash, - method: method.into(), - call_data: call_data.to_vec(), - }).into_future().wait() - } - - fn call_at_state(&self, _state: &S, _changes: &mut OverlayedChanges, _method: &str, _call_data: &[u8]) -> error::Result<(Vec, S::Transaction)> { - Err(error::ErrorKind::NotAvailableOnLightClient.into()) - } - - fn prove_at_state(&self, _state: S, _changes: &mut OverlayedChanges, _method: &str, _call_data: &[u8]) -> Result<(Vec, Vec>), error::Error> { - Err(error::ErrorKind::NotAvailableOnLightClient.into()) - } -} - -/// Check remote execution proof using given backend. -pub fn check_execution_proof(backend: &B, executor: &E, request: &RemoteCallRequest, remote_proof: Vec>) -> Result - where - B: backend::RemoteBackend, - E: CodeExecutor, - Block: BlockT, - <::Header as HeaderT>::Hash: Into<[u8; 32]>, // TODO: remove when patricia_trie generic. - error::Error: From<<>::State as StateBackend>::Error>, -{ - let local_header = backend.blockchain().header(BlockId::Hash(request.block))?; - let local_header = local_header.ok_or_else(|| error::ErrorKind::UnknownBlock(format!("{}", request.block)))?; - let local_state_root = local_header.state_root().clone(); - do_check_execution_proof(local_state_root, executor, request, remote_proof) -} - -/// Check remote execution proof using given state root. -fn do_check_execution_proof(local_state_root: H, executor: &E, request: &RemoteCallRequest, remote_proof: Vec>) -> Result - where - E: CodeExecutor, - H: Into<[u8; 32]>, // TODO: remove when patricia_trie generic. -{ - let mut changes = OverlayedChanges::default(); - let (local_result, _) = state_machine::proof_check( - local_state_root.into(), - remote_proof, - &mut changes, - executor, - &request.method, - &request.call_data)?; - - Ok(CallResult { return_data: local_result, changes }) -} - -#[cfg(test)] -mod tests { - use runtime_primitives::generic::BlockId; - use state_machine::Backend; - use test_client; - use light::RemoteCallRequest; - use super::do_check_execution_proof; - - #[test] - fn execution_proof_is_generated_and_checked() { - // prepare remote client - let remote_client = test_client::new(); - let remote_block_id = BlockId::Number(0); - let remote_block_storage_root = remote_client.state_at(&remote_block_id) - .unwrap().storage_root(::std::iter::empty()).0; - - // 'fetch' execution proof from remote node - let remote_execution_proof = remote_client.execution_proof(&remote_block_id, "authorities", &[]).unwrap().1; - - // check remote execution proof locally - let local_executor = test_client::NativeExecutor::new(); - do_check_execution_proof(remote_block_storage_root, &local_executor, &RemoteCallRequest { - block: Default::default(), - method: "authorities".into(), - call_data: vec![], - }, remote_execution_proof).unwrap(); - } -} +} \ No newline at end of file diff --git a/substrate/client/src/client.rs b/substrate/client/src/client.rs index 6604b3225dbee..04c247b434d6e 100644 --- a/substrate/client/src/client.rs +++ b/substrate/client/src/client.rs @@ -28,7 +28,7 @@ use codec::Slicable; use state_machine::{self, Ext, OverlayedChanges, Backend as StateBackend, CodeExecutor}; use backend::{self, BlockImportOperation}; -use blockchain::{self, Info as ChainInfo, Backend as ChainBackend}; +use blockchain::{self, Info as ChainInfo, Backend as ChainBackend, HeaderBackend as ChainHeaderBackend}; use call_executor::{CallExecutor, LocalCallExecutor}; use {error, in_mem, block_builder, runtime_io, bft, genesis}; diff --git a/substrate/client/src/error.rs b/substrate/client/src/error.rs index 7db181deae670..9ee6e7639fea9 100644 --- a/substrate/client/src/error.rs +++ b/substrate/client/src/error.rs @@ -88,17 +88,23 @@ error_chain! { display("This method is not currently available when running in light client mode"), } - /// Invalid remote proof. + /// Invalid remote execution proof. InvalidExecutionProof { description("invalid execution proof"), display("Remote node has responded with invalid execution proof"), } - /// Invalid remote proof. + /// Remote fetch has been cancelled. RemoteFetchCancelled { description("remote fetch cancelled"), display("Remote data fetch has been cancelled"), } + + /// Remote fetch has been failed. + RemoteFetchFailed { + description("remote fetch failed"), + display("Remote data fetch has been failed"), + } } } diff --git a/substrate/client/src/in_mem.rs b/substrate/client/src/in_mem.rs index 9bbcf01b6e905..29d74ef466c30 100644 --- a/substrate/client/src/in_mem.rs +++ b/substrate/client/src/in_mem.rs @@ -17,9 +17,11 @@ //! In memory client backend use std::collections::HashMap; +use std::sync::Arc; use parking_lot::RwLock; use error; use backend; +use light; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Zero}; use runtime_primitives::bft::Justification; @@ -86,16 +88,9 @@ struct BlockchainStorage { } /// In-memory blockchain. Supports concurrent reads. +#[derive(Clone)] pub struct Blockchain { - storage: RwLock>, -} - -impl Clone for Blockchain { - fn clone(&self) -> Self { - Blockchain { - storage: RwLock::new(self.storage.read().clone()), - } - } + storage: Arc>>, } impl Blockchain { @@ -108,16 +103,17 @@ impl Blockchain { } /// Create new in-memory blockchain storage. - pub fn new() -> Self { + pub fn new() -> Blockchain { + let storage = Arc::new(RwLock::new( + BlockchainStorage { + blocks: HashMap::new(), + hashes: HashMap::new(), + best_hash: Default::default(), + best_number: Zero::zero(), + genesis_hash: Default::default(), + })); Blockchain { - storage: RwLock::new( - BlockchainStorage { - blocks: HashMap::new(), - hashes: HashMap::new(), - best_hash: Default::default(), - best_number: Zero::zero(), - genesis_hash: Default::default(), - }) + storage: storage.clone(), } } @@ -159,26 +155,13 @@ impl Blockchain { } } -impl blockchain::Backend for Blockchain { +impl blockchain::HeaderBackend for Blockchain { fn header(&self, id: BlockId) -> error::Result::Header>> { Ok(self.id(id).and_then(|hash| { self.storage.read().blocks.get(&hash).map(|b| b.header().clone()) })) } - fn body(&self, id: BlockId) -> error::Result::Extrinsic>>> { - Ok(self.id(id).and_then(|hash| { - self.storage.read().blocks.get(&hash) - .and_then(|b| b.extrinsics().map(|x| x.to_vec())) - })) - } - - fn justification(&self, id: BlockId) -> error::Result>> { - Ok(self.id(id).and_then(|hash| self.storage.read().blocks.get(&hash).and_then(|b| - b.justification().map(|x| x.clone())) - )) - } - fn info(&self) -> error::Result> { let storage = self.storage.read(); Ok(blockchain::Info { @@ -200,6 +183,30 @@ impl blockchain::Backend for Blockchain { } } + +impl blockchain::Backend for Blockchain { + fn body(&self, id: BlockId) -> error::Result::Extrinsic>>> { + Ok(self.id(id).and_then(|hash| { + self.storage.read().blocks.get(&hash) + .and_then(|b| b.extrinsics().map(|x| x.to_vec())) + })) + } + + fn justification(&self, id: BlockId) -> error::Result>> { + Ok(self.id(id).and_then(|hash| self.storage.read().blocks.get(&hash).and_then(|b| + b.justification().map(|x| x.clone())) + )) + } +} + +impl light::blockchain::Storage for Blockchain { + fn import_header(&self, is_new_best: bool, header: Block::Header) -> error::Result<()> { + let hash = header.hash(); + self.insert(hash, header, None, None, is_new_best); + Ok(()) + } +} + /// In-memory operation. pub struct BlockImportOperation { pending_block: Option>, diff --git a/substrate/client/src/lib.rs b/substrate/client/src/lib.rs index 1e69b2faa869b..7c84e9f3614d8 100644 --- a/substrate/client/src/lib.rs +++ b/substrate/client/src/lib.rs @@ -56,6 +56,4 @@ pub use client::{ ImportResult, }; pub use blockchain::Info as ChainInfo; -pub use call_executor::{ - CallResult, CallExecutor, LocalCallExecutor, RemoteCallExecutor, -}; +pub use call_executor::{CallResult, CallExecutor, LocalCallExecutor}; diff --git a/substrate/client/src/light.rs b/substrate/client/src/light.rs deleted file mode 100644 index 3055d7c163606..0000000000000 --- a/substrate/client/src/light.rs +++ /dev/null @@ -1,256 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! Light client backend. Only stores headers and justifications of blocks. -//! Everything else is requested from full nodes on demand. - -use std::sync::Arc; -use futures::future::IntoFuture; -use state_machine::{CodeExecutor, TryIntoTrieBackend as TryIntoStateTrieBackend, - TrieBackend as StateTrieBackend}; -use state_machine::backend::Backend as StateBackend; -use runtime_primitives::generic::BlockId; -use runtime_primitives::bft::Justification; -use runtime_primitives::traits::{Block as BlockT, Header as HeaderT}; -use runtime_primitives::BuildStorage; -use blockchain::{self, BlockStatus}; -use backend; -use call_executor::{CallResult, RemoteCallExecutor, check_execution_proof}; -use client::Client; -use error; -use in_mem::Blockchain as InMemBlockchain; - -/// Remote call request. -pub struct RemoteCallRequest { - /// Call at state of block referenced by given header hash. - pub block: H, - /// Method to call. - pub method: String, - /// Call data. - pub call_data: Vec, -} - -/// Light client data fetcher. Implementations of this trait must check if remote data -/// is correct (see FetchedDataChecker) and return already checked data. -pub trait Fetcher: Send + Sync { - /// Remote call result future. - type RemoteCallResult: IntoFuture; - - /// Fetch remote call result. - fn remote_call(&self, request: RemoteCallRequest) -> Self::RemoteCallResult; -} - -/// Light client remote data checker. -pub trait FetchChecker: Send + Sync { - /// Check remote method execution proof. - fn check_execution_proof(&self, request: &RemoteCallRequest, remote_proof: Vec>) -> error::Result; -} - -/// Light client backend. -pub struct Backend { - blockchain: Blockchain, -} - -/// Light client blockchain. -pub struct Blockchain { - storage: InMemBlockchain, -} - -/// Block (header and justification) import operation. -pub struct BlockImportOperation { - pending_block: Option>, -} - -/// On-demand state. -#[derive(Clone)] -pub struct OnDemandState { - /// Hash of the block, state is valid for. - _block: H, -} - -/// Remote data checker. -pub struct LightDataChecker { - /// Backend reference. - backend: Arc>, - /// Executor. - executor: E, -} - -struct PendingBlock { - header: B::Header, - justification: Option>, - is_best: bool, -} - -impl backend::Backend for Backend { - type BlockImportOperation = BlockImportOperation; - type Blockchain = Blockchain; - type State = OnDemandState; - - fn begin_operation(&self, _block: BlockId) -> error::Result { - Ok(BlockImportOperation { - pending_block: None, - }) - } - - fn commit_operation(&self, operation: Self::BlockImportOperation) -> error::Result<()> { - if let Some(pending_block) = operation.pending_block { - let hash = pending_block.header.hash(); - self.blockchain.storage.insert(hash, pending_block.header, pending_block.justification, None, pending_block.is_best); - } - Ok(()) - } - - fn blockchain(&self) -> &Blockchain { - &self.blockchain - } - - fn state_at(&self, block: BlockId) -> error::Result { - Ok(OnDemandState { - _block: self.blockchain.storage.id(block).ok_or(error::ErrorKind::UnknownBlock(format!("{:?}", block)))?, - }) - } -} - -impl backend::RemoteBackend for Backend {} - -impl backend::BlockImportOperation for BlockImportOperation { - type State = OnDemandState; - - fn state(&self) -> error::Result> { - // None means 'locally-stateless' backend - Ok(None) - } - - fn set_block_data(&mut self, header: B::Header, _body: Option>, justification: Option>, is_new_best: bool) -> error::Result<()> { - assert!(self.pending_block.is_none(), "Only one block per operation is allowed"); - self.pending_block = Some(PendingBlock { - header, - justification, - is_best: is_new_best, - }); - Ok(()) - } - - fn update_storage(&mut self, _update: ::Transaction) -> error::Result<()> { - // we're not storing anything locally => ignore changes - Ok(()) - } - - fn reset_storage, Vec)>>(&mut self, _iter: I) -> error::Result<()> { - // we're not storing anything locally => ignore changes - Ok(()) - } -} - -impl blockchain::Backend for Blockchain { - fn header(&self, id: BlockId) -> error::Result> { - self.storage.header(id) - } - - fn body(&self, _id: BlockId) -> error::Result>> { - // TODO [light]: fetch from remote node - Ok(None) - } - - fn justification(&self, id: BlockId) -> error::Result>> { - self.storage.justification(id) - } - - fn info(&self) -> error::Result> { - self.storage.info() - } - - fn status(&self, id: BlockId) -> error::Result { - self.storage.status(id) - } - - fn hash(&self, number: ::Number) -> error::Result> { - self.storage.hash(number) - } -} - -impl StateBackend for OnDemandState { - type Error = error::Error; - type Transaction = (); - - fn storage(&self, _key: &[u8]) -> Result>, Self::Error> { - // TODO [light]: fetch from remote node - Err(error::ErrorKind::NotAvailableOnLightClient.into()) - } - - fn storage_root(&self, _delta: I) -> ([u8; 32], Self::Transaction) - where I: IntoIterator, Option>)> - { - ([0; 32], ()) - } - - fn pairs(&self) -> Vec<(Vec, Vec)> { - // whole state is not available on light node - Vec::new() - } -} - -impl TryIntoStateTrieBackend for OnDemandState { - fn try_into_trie_backend(self) -> Option { - None - } -} - -impl FetchChecker for LightDataChecker - where - E: CodeExecutor, - B: BlockT, - <::Header as HeaderT>::Hash: Into<[u8; 32]>, // TODO: remove when patricia_trie generic. -{ - fn check_execution_proof(&self, request: &RemoteCallRequest, remote_proof: Vec>) -> error::Result { - check_execution_proof(&*self.backend, &self.executor, request, remote_proof) - } -} - -/// Create an instance of light client backend. -pub fn new_light_backend() -> Arc> { - let storage = InMemBlockchain::new(); - let blockchain = Blockchain { storage }; - Arc::new(Backend { blockchain }) -} - -/// Create an instance of light client. -pub fn new_light( - backend: Arc>, - fetcher: Arc, - genesis_storage: S, -) -> error::Result, RemoteCallExecutor, F>, Block>> - where - F: Fetcher, - S: BuildStorage, - Block: BlockT, -{ - let executor = RemoteCallExecutor::new(backend.clone(), fetcher); - Client::new(backend, executor, genesis_storage) -} - -/// Create an instance of fetch data checker. -pub fn new_fetch_checker( - backend: Arc>, - executor: E, -) -> LightDataChecker - where - E: CodeExecutor, - Block: BlockT, -{ - LightDataChecker { backend, executor } -} diff --git a/substrate/client/src/light/backend.rs b/substrate/client/src/light/backend.rs new file mode 100644 index 0000000000000..172b811473107 --- /dev/null +++ b/substrate/client/src/light/backend.rs @@ -0,0 +1,183 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Light client backend. Only stores headers and justifications of blocks. +//! Everything else is requested from full nodes on demand. + +use std::sync::{Arc, Weak}; + +use runtime_primitives::{bft::Justification, generic::BlockId}; +use runtime_primitives::traits::Block as BlockT; +use state_machine::{Backend as StateBackend, TrieBackend as StateTrieBackend, + TryIntoTrieBackend as TryIntoStateTrieBackend}; + +use backend::{Backend as ClientBackend, BlockImportOperation, RemoteBackend}; +use blockchain::HeaderBackend as BlockchainHeaderBackend; +use error::{Error as ClientError, ErrorKind as ClientErrorKind, Result as ClientResult}; +use light::blockchain::{Blockchain, Storage as BlockchainStorage}; +use light::fetcher::Fetcher; + +/// Light client backend. +pub struct Backend { + blockchain: Arc>, +} + +/// Ligh block (header and justification) import operation. +pub struct ImportOperation { + is_new_best: bool, + header: Option, + _phantom: ::std::marker::PhantomData, +} + +/// On-demand state. +pub struct OnDemandState { + fetcher: Weak, + block: Block::Hash, +} + +impl Backend { + /// Create new light backend. + pub fn new(blockchain: Arc>) -> Self { + Self { blockchain } + } + + /// Get shared blockchain reference. + pub fn blockchain(&self) -> &Arc> { + &self.blockchain + } +} + +impl ClientBackend for Backend where Block: BlockT, S: BlockchainStorage, F: Fetcher { + type BlockImportOperation = ImportOperation; + type Blockchain = Blockchain; + type State = OnDemandState; + + fn begin_operation(&self, _block: BlockId) -> ClientResult { + Ok(ImportOperation { + is_new_best: false, + header: None, + _phantom: Default::default(), + }) + } + + fn commit_operation(&self, operation: Self::BlockImportOperation) -> ClientResult<()> { + let header = operation.header.expect("commit is called after set_block_data; set_block_data sets header; qed"); + self.blockchain.storage().import_header(operation.is_new_best, header) + } + + fn blockchain(&self) -> &Blockchain { + &self.blockchain + } + + fn state_at(&self, block: BlockId) -> ClientResult { + let block_hash = match block { + BlockId::Hash(h) => Some(h), + BlockId::Number(n) => self.blockchain.hash(n).unwrap_or_default(), + }; + + Ok(OnDemandState { + block: block_hash.ok_or_else(|| ClientErrorKind::UnknownBlock(format!("{}", block)))?, + fetcher: self.blockchain.fetcher(), + }) + } +} + +impl RemoteBackend for Backend where Block: BlockT, S: BlockchainStorage, F: Fetcher {} + +impl BlockImportOperation for ImportOperation where Block: BlockT, F: Fetcher { + type State = OnDemandState; + + fn state(&self) -> ClientResult> { + // None means 'locally-stateless' backend + Ok(None) + } + + fn set_block_data( + &mut self, + header: Block::Header, + _body: Option>, + _justification: Option>, + is_new_best: bool + ) -> ClientResult<()> { + self.is_new_best = is_new_best; + self.header = Some(header); + Ok(()) + } + + fn update_storage(&mut self, _update: ::Transaction) -> ClientResult<()> { + // we're not storing anything locally => ignore changes + Ok(()) + } + + fn reset_storage, Vec)>>(&mut self, _iter: I) -> ClientResult<()> { + // we're not storing anything locally => ignore changes + Ok(()) + } +} + +impl Clone for OnDemandState { + fn clone(&self) -> Self { + OnDemandState { + fetcher: self.fetcher.clone(), + block: self.block, + } + } +} + +impl StateBackend for OnDemandState where Block: BlockT, F: Fetcher { + type Error = ClientError; + type Transaction = (); + + fn storage(&self, _key: &[u8]) -> ClientResult>> { + Err(ClientErrorKind::NotAvailableOnLightClient.into()) // TODO: fetch from remote node + } + + fn storage_root(&self, _delta: I) -> ([u8; 32], Self::Transaction) + where I: IntoIterator, Option>)> { + ([0; 32], ()) + } + + fn pairs(&self) -> Vec<(Vec, Vec)> { + // whole state is not available on light node + Vec::new() + } +} + +impl TryIntoStateTrieBackend for OnDemandState where Block: BlockT, F: Fetcher { + fn try_into_trie_backend(self) -> Option { + None + } +} + +#[cfg(test)] +pub mod tests { + use futures::future::{ok, FutureResult}; + use parking_lot::Mutex; + use call_executor::CallResult; + use error::Error as ClientError; + use test_client::runtime::{Hash, Block}; + use light::fetcher::{Fetcher, RemoteCallRequest}; + + pub type OkCallFetcher = Mutex; + + impl Fetcher for OkCallFetcher { + type RemoteCallResult = FutureResult; + + fn remote_call(&self, _request: RemoteCallRequest) -> Self::RemoteCallResult { + ok((*self.lock()).clone()) + } + } +} diff --git a/substrate/client/src/light/blockchain.rs b/substrate/client/src/light/blockchain.rs new file mode 100644 index 0000000000000..9655f91baa616 --- /dev/null +++ b/substrate/client/src/light/blockchain.rs @@ -0,0 +1,95 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Light client blockchin backend. Only stores headers and justifications of recent +//! blocks. CHT roots are stored for headers of ancient blocks. + +use std::sync::Weak; +use parking_lot::Mutex; + +use runtime_primitives::{bft::Justification, generic::BlockId}; +use runtime_primitives::traits::{Block as BlockT, Header as HeaderT}; + +use blockchain::{Backend as BlockchainBackend, BlockStatus, + HeaderBackend as BlockchainHeaderBackend, Info as BlockchainInfo}; +use error::Result as ClientResult; +use light::fetcher::Fetcher; + +/// Light client blockchain storage. +pub trait Storage: BlockchainHeaderBackend { + /// Store new header. + fn import_header(&self, is_new_best: bool, header: Block::Header) -> ClientResult<()>; +} + +/// Light client blockchain. +pub struct Blockchain { + fetcher: Mutex>, + storage: S, +} + +impl Blockchain { + /// Create new light blockchain backed with given storage. + pub fn new(storage: S) -> Self { + Self { + fetcher: Mutex::new(Default::default()), + storage, + } + } + + /// Sets fetcher reference. + pub fn set_fetcher(&self, fetcher: Weak) { + *self.fetcher.lock() = fetcher; + } + + /// Get fetcher weak reference. + pub fn fetcher(&self) -> Weak { + self.fetcher.lock().clone() + } + + /// Get storage reference. + pub fn storage(&self) -> &S { + &self.storage + } +} + +impl BlockchainHeaderBackend for Blockchain where Block: BlockT, S: Storage, F: Fetcher { + fn header(&self, id: BlockId) -> ClientResult> { + self.storage.header(id) + } + + fn info(&self) -> ClientResult> { + self.storage.info() + } + + fn status(&self, id: BlockId) -> ClientResult { + self.storage.status(id) + } + + fn hash(&self, number: <::Header as HeaderT>::Number) -> ClientResult> { + self.storage.hash(number) + } +} + +impl BlockchainBackend for Blockchain where Block: BlockT, S: Storage, F: Fetcher { + fn body(&self, _id: BlockId) -> ClientResult>> { + // TODO [light]: fetch from remote node + Ok(None) + } + + fn justification(&self, _id: BlockId) -> ClientResult>> { + Ok(None) + } +} diff --git a/substrate/client/src/light/call_executor.rs b/substrate/client/src/light/call_executor.rs new file mode 100644 index 0000000000000..330781667b1fe --- /dev/null +++ b/substrate/client/src/light/call_executor.rs @@ -0,0 +1,143 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Light client call exector. Executes methods on remote full nodes, fetching +//! execution proof and checking it locally. + +use std::sync::Arc; +use futures::{IntoFuture, Future}; + +use runtime_primitives::generic::BlockId; +use runtime_primitives::traits::{Block as BlockT, Header as HeaderT}; +use state_machine::{Backend as StateBackend, CodeExecutor, OverlayedChanges, execution_proof_check}; + +use blockchain::Backend as ChainBackend; +use call_executor::{CallExecutor, CallResult}; +use error::{Error as ClientError, ErrorKind as ClientErrorKind, Result as ClientResult}; +use light::fetcher::{Fetcher, RemoteCallRequest}; + +/// Call executor that executes methods on remote node, querying execution proof +/// and checking proof by re-executing locally. +pub struct RemoteCallExecutor { + blockchain: Arc, + fetcher: Arc, +} + +impl RemoteCallExecutor { + /// Creates new instance of remote call executor. + pub fn new(blockchain: Arc, fetcher: Arc) -> Self { + RemoteCallExecutor { blockchain, fetcher } + } +} + +impl CallExecutor for RemoteCallExecutor + where + Block: BlockT, + B: ChainBackend, + F: Fetcher, +{ + type Error = ClientError; + + fn call(&self, id: &BlockId, method: &str, call_data: &[u8]) -> ClientResult { + let block_hash = match *id { + BlockId::Hash(hash) => hash, + BlockId::Number(number) => self.blockchain.hash(number)? + .ok_or_else(|| ClientErrorKind::UnknownBlock(format!("{}", number)))?, + }; + + self.fetcher.remote_call(RemoteCallRequest { + block: block_hash.clone(), + method: method.into(), + call_data: call_data.to_vec(), + }).into_future().wait() + } + + fn call_at_state(&self, _state: &S, _changes: &mut OverlayedChanges, _method: &str, _call_data: &[u8]) -> ClientResult<(Vec, S::Transaction)> { + Err(ClientErrorKind::NotAvailableOnLightClient.into()) + } + + fn prove_at_state(&self, _state: S, _changes: &mut OverlayedChanges, _method: &str, _call_data: &[u8]) -> ClientResult<(Vec, Vec>)> { + Err(ClientErrorKind::NotAvailableOnLightClient.into()) + } +} + +/// Check remote execution proof using given backend. +pub fn check_execution_proof( + blockchain: &B, + executor: &E, + request: &RemoteCallRequest, + remote_proof: Vec> +) -> ClientResult + where + Block: BlockT, + ::Hash: Into<[u8; 32]>, // TODO: remove when patricia_trie generic. + B: ChainBackend, + E: CodeExecutor, +{ + let local_header = blockchain.header(BlockId::Hash(request.block))?; + let local_header = local_header.ok_or_else(|| ClientErrorKind::UnknownBlock(format!("{}", request.block)))?; + let local_state_root = *local_header.state_root(); + do_check_execution_proof(local_state_root.into(), executor, request, remote_proof) +} + +/// Check remote execution proof using given state root. +fn do_check_execution_proof( + local_state_root: [u8; 32], + executor: &E, + request: &RemoteCallRequest, + remote_proof: Vec>, +) -> ClientResult + where + Hash: ::std::fmt::Display, + E: CodeExecutor, +{ + let mut changes = OverlayedChanges::default(); + let (local_result, _) = execution_proof_check( + local_state_root, + remote_proof, + &mut changes, + executor, + &request.method, + &request.call_data)?; + + Ok(CallResult { return_data: local_result, changes }) +} + +#[cfg(test)] +mod tests { + use test_client; + use super::*; + + #[test] + fn execution_proof_is_generated_and_checked() { + // prepare remote client + let remote_client = test_client::new(); + let remote_block_id = BlockId::Number(0); + let remote_block_storage_root = remote_client.state_at(&remote_block_id) + .unwrap().storage_root(::std::iter::empty()).0; + + // 'fetch' execution proof from remote node + let remote_execution_proof = remote_client.execution_proof(&remote_block_id, "authorities", &[]).unwrap().1; + + // check remote execution proof locally + let local_executor = test_client::NativeExecutor::new(); + do_check_execution_proof(remote_block_storage_root.into(), &local_executor, &RemoteCallRequest { + block: test_client::runtime::Hash::default(), + method: "authorities".into(), + call_data: vec![], + }, remote_execution_proof).unwrap(); + } +} diff --git a/substrate/client/src/light/fetcher.rs b/substrate/client/src/light/fetcher.rs new file mode 100644 index 0000000000000..3527203b3fdd8 --- /dev/null +++ b/substrate/client/src/light/fetcher.rs @@ -0,0 +1,90 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Light client data fetcher. Fetches requested data from remote full nodes. + +use std::sync::Arc; +use futures::IntoFuture; + +use runtime_primitives::traits::{As, Block as BlockT, Header as HeaderT}; +use state_machine::CodeExecutor; + +use call_executor::CallResult; +use error::{Error as ClientError, Result as ClientResult}; +use light::blockchain::{Blockchain, Storage as BlockchainStorage}; +use light::call_executor::check_execution_proof; + +/// Remote call request. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct RemoteCallRequest { + /// Call at state of given block. + pub block: Hash, + /// Method to call. + pub method: String, + /// Call data. + pub call_data: Vec, +} + +/// Light client data fetcher. Implementations of this trait must check if remote data +/// is correct (see FetchedDataChecker) and return already checked data. +pub trait Fetcher: Send + Sync { + /// Remote call result future. + type RemoteCallResult: IntoFuture; + + /// Fetch remote call result. + fn remote_call(&self, request: RemoteCallRequest) -> Self::RemoteCallResult; +} + +/// Light client remote data checker. +pub trait FetchChecker: Send + Sync { + /// Check remote method execution proof. + fn check_execution_proof(&self, request: &RemoteCallRequest, remote_proof: Vec>) -> ClientResult; +} + +/// Remote data checker. +pub struct LightDataChecker { + blockchain: Arc>, + executor: E, +} + +impl LightDataChecker { + /// Create new light data checker. + pub fn new(blockchain: Arc>, executor: E) -> Self { + Self { + blockchain, + executor, + } + } + + /// Get blockchain reference. + pub fn blockchain(&self) -> &Arc> { + &self.blockchain + } +} + +impl FetchChecker for LightDataChecker + where + Block: BlockT, + ::Hash: From<[u8; 32]> + Into<[u8; 32]>, // TODO: remove when patricia_trie generic. + <::Header as HeaderT>::Number: As, + S: BlockchainStorage, + E: CodeExecutor, + F: Fetcher, +{ + fn check_execution_proof(&self, request: &RemoteCallRequest, remote_proof: Vec>) -> ClientResult { + check_execution_proof(&*self.blockchain, &self.executor, request, remote_proof) + } +} diff --git a/substrate/client/src/light/mod.rs b/substrate/client/src/light/mod.rs new file mode 100644 index 0000000000000..d55ab5f3ac8de --- /dev/null +++ b/substrate/client/src/light/mod.rs @@ -0,0 +1,75 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Light client components. + +pub mod backend; +pub mod blockchain; +pub mod call_executor; +pub mod fetcher; + +use std::sync::Arc; + +use runtime_primitives::BuildStorage; +use runtime_primitives::traits::Block as BlockT; +use state_machine::CodeExecutor; + +use client::Client; +use error::Result as ClientResult; +use light::backend::Backend; +use light::blockchain::{Blockchain, Storage as BlockchainStorage}; +use light::call_executor::RemoteCallExecutor; +use light::fetcher::{Fetcher, LightDataChecker}; + +/// Create an instance of light client blockchain backend. +pub fn new_light_blockchain, F>(storage: S) -> Arc> { + Arc::new(Blockchain::new(storage)) +} + +/// Create an instance of light client backend. +pub fn new_light_backend, F: Fetcher>(blockchain: Arc>, fetcher: Arc) -> Arc> { + blockchain.set_fetcher(Arc::downgrade(&fetcher)); + Arc::new(Backend::new(blockchain)) +} + +/// Create an instance of light client. +pub fn new_light( + backend: Arc>, + fetcher: Arc, + genesis_storage: GS, +) -> ClientResult, RemoteCallExecutor, F>, B>> + where + B: BlockT, + S: BlockchainStorage, + F: Fetcher, + GS: BuildStorage, +{ + let executor = RemoteCallExecutor::new(backend.blockchain().clone(), fetcher); + Client::new(backend, executor, genesis_storage) +} + +/// Create an instance of fetch data checker. +pub fn new_fetch_checker( + blockchain: Arc>, + executor: E, +) -> LightDataChecker + where + B: BlockT, + S: BlockchainStorage, + E: CodeExecutor, +{ + LightDataChecker::new(blockchain, executor) +} diff --git a/substrate/network/src/lib.rs b/substrate/network/src/lib.rs index 15779ea6522cd..d15cc4855d295 100644 --- a/substrate/network/src/lib.rs +++ b/substrate/network/src/lib.rs @@ -68,4 +68,4 @@ pub use network::{NonReservedPeerMode, NetworkConfiguration, ConnectionFilter, C pub use message::{generic as generic_message, BftMessage, LocalizedBftMessage, ConsensusVote, SignedConsensusVote, SignedConsensusMessage, SignedConsensusProposal}; pub use error::Error; pub use config::{Role, ProtocolConfig}; -pub use on_demand::{OnDemand, OnDemandService, Response as OnDemandResponse}; +pub use on_demand::{OnDemand, OnDemandService, RemoteCallResponse}; diff --git a/substrate/network/src/on_demand.rs b/substrate/network/src/on_demand.rs index 67c5adf923da0..a6b225d917e30 100644 --- a/substrate/network/src/on_demand.rs +++ b/substrate/network/src/on_demand.rs @@ -19,13 +19,13 @@ use std::collections::VecDeque; use std::sync::{Arc, Weak}; use std::time::{Instant, Duration}; -use futures::{Future, Poll}; +use futures::{Async, Future, Poll}; use futures::sync::oneshot::{channel, Receiver, Sender}; use linked_hash_map::LinkedHashMap; use linked_hash_map::Entry; use parking_lot::Mutex; use client; -use client::light::{Fetcher, FetchChecker, RemoteCallRequest}; +use client::light::fetcher::{Fetcher, FetchChecker, RemoteCallRequest}; use io::SyncIo; use message; use network::PeerId; @@ -36,7 +36,7 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT}; const REQUEST_TIMEOUT: Duration = Duration::from_secs(15); /// On-demand service API. -pub trait OnDemandService: Send + Sync { +pub trait OnDemandService: Send + Sync { /// When new node is connected. fn on_connect(&self, peer: PeerId, role: service::Role); @@ -46,8 +46,8 @@ pub trait OnDemandService: Send + Sync { /// Maintain peers requests. fn maintain_peers(&self, io: &mut SyncIo); - /// When response is received from remote node. - fn on_remote_response(&self, io: &mut SyncIo, peer: PeerId, response: message::RemoteCallResponse); + /// When call response is received from remote node. + fn on_remote_call_response(&self, io: &mut SyncIo, peer: PeerId, response: message::RemoteCallResponse); } /// On-demand requests service. Dispatches requests to appropriate peers. @@ -56,34 +56,47 @@ pub struct OnDemand> { checker: Arc>, } -/// On-demand response. -pub struct Response { - receiver: Receiver, +/// On-demand remote call response. +pub struct RemoteCallResponse { + receiver: Receiver>, } #[derive(Default)] struct OnDemandCore> { service: Weak, next_request_id: u64, - pending_requests: VecDeque>, - active_peers: LinkedHashMap>, + pending_requests: VecDeque>, + active_peers: LinkedHashMap>, idle_peers: VecDeque, } -struct Request { +struct Request { id: u64, timestamp: Instant, - sender: Sender, - request: RemoteCallRequest, + data: RequestData, } -impl Future for Response { +enum RequestData { + RemoteCall(RemoteCallRequest, Sender>), +} + +enum Accept { + Ok, + CheckFailed(client::error::Error, RequestData), +} + +impl Future for RemoteCallResponse { type Item = client::CallResult; type Error = client::error::Error; fn poll(&mut self) -> Poll { self.receiver.poll() .map_err(|_| client::error::ErrorKind::RemoteFetchCancelled.into()) + .and_then(|r| match r { + Async::Ready(Ok(ready)) => Ok(Async::Ready(ready)), + Async::Ready(Err(error)) => Err(error), + Async::NotReady => Ok(Async::NotReady), + }) } } @@ -110,24 +123,47 @@ impl OnDemand where self.core.lock().service = service; } - /// Execute method call on remote node, returning execution result and proof. - pub fn remote_call(&self, request: RemoteCallRequest) -> Response { - let (sender, receiver) = channel(); - let result = Response { - receiver: receiver, + /// Schedule && dispatch all scheduled requests. + fn schedule_request(&self, data: RequestData, result: R) -> R { + let mut core = self.core.lock(); + core.insert(data); + core.dispatch(); + result + } + + /// Try to accept response from given peer. + fn accept_response) -> Accept>(&self, rtype: &str, io: &mut SyncIo, peer: PeerId, request_id: u64, try_accept: F) { + let mut core = self.core.lock(); + let request = match core.remove(peer, request_id) { + Some(request) => request, + None => { + trace!(target: "sync", "Invalid remote {} response from peer {}", rtype, peer); + io.disconnect_peer(peer); + core.remove_peer(peer); + return; + }, + }; + + let retry_request_data = match try_accept(request) { + Accept::Ok => None, + Accept::CheckFailed(error, retry_request_data) => { + trace!(target: "sync", "Failed to check remote {} response from peer {}: {}", rtype, peer, error); + + io.disconnect_peer(peer); + core.remove_peer(peer); + Some(retry_request_data) + }, }; - { - let mut core = self.core.lock(); - core.insert(sender, request); - core.dispatch(); + if let Some(request_data) = retry_request_data { + core.insert(request_data); } - result + core.dispatch(); } } -impl OnDemandService for OnDemand where +impl OnDemandService for OnDemand where B: BlockT, E: service::ExecuteInContext, B::Header: HeaderT, @@ -157,29 +193,17 @@ impl OnDemandService for OnDemand where core.dispatch(); } - fn on_remote_response(&self, io: &mut SyncIo, peer: PeerId, response: message::RemoteCallResponse) { - let mut core = self.core.lock(); - match core.remove(peer, response.id) { - Some(request) => match self.checker.check_execution_proof(&request.request, response.proof) { + fn on_remote_call_response(&self, io: &mut SyncIo, peer: PeerId, response: message::RemoteCallResponse) { + self.accept_response("call", io, peer, response.id, |request| match request.data { + RequestData::RemoteCall(request, sender) => match self.checker.check_execution_proof(&request, response.proof) { Ok(response) => { // we do not bother if receiver has been dropped already - let _ = request.sender.send(response); - }, - Err(error) => { - trace!(target: "sync", "Failed to check remote response from peer {}: {}", peer, error); - io.disconnect_peer(peer); - core.remove_peer(peer); - core.insert(request.sender, request.request); + let _ = sender.send(Ok(response)); + Accept::Ok }, + Err(error) => Accept::CheckFailed(error, RequestData::RemoteCall(request, sender)), }, - None => { - trace!(target: "sync", "Invalid remote response from peer {}", peer); - io.disconnect_peer(peer); - core.remove_peer(peer); - }, - } - - core.dispatch(); + }) } } @@ -188,10 +212,12 @@ impl Fetcher for OnDemand where E: service::ExecuteInContext, B::Header: HeaderT, { - type RemoteCallResult = Response; + type RemoteCallResult = RemoteCallResponse; fn remote_call(&self, request: RemoteCallRequest) -> Self::RemoteCallResult { - OnDemand::remote_call(self, request) + let (sender, receiver) = channel(); + self.schedule_request(RequestData::RemoteCall(request, sender), + RemoteCallResponse { receiver }) } } @@ -230,19 +256,18 @@ impl OnDemandCore where } } - pub fn insert(&mut self, sender: Sender, request: RemoteCallRequest) { + pub fn insert(&mut self, data: RequestData) { let request_id = self.next_request_id; self.next_request_id += 1; self.pending_requests.push_back(Request { id: request_id, timestamp: Instant::now(), - sender, - request, + data, }); } - pub fn remove(&mut self, peer: PeerId, id: u64) -> Option> { + pub fn remove(&mut self, peer: PeerId, id: u64) -> Option> { match self.active_peers.entry(peer) { Entry::Occupied(entry) => match entry.get().id == id { true => { @@ -272,20 +297,26 @@ impl OnDemandCore where trace!(target: "sync", "Dispatching remote request {} to peer {}", request.id, peer); service.execute_in_context(|ctx, protocol| { - let message = message::RemoteCallRequest { - id: request.id, - block: request.request.block, - method: request.request.method.clone(), - data: request.request.call_data.clone(), - }; - - protocol.send_message(ctx, peer, message::generic::Message::RemoteCallRequest(message)) + protocol.send_message(ctx, peer, request.message()) }); self.active_peers.insert(peer, request); } } } +impl Request { + pub fn message(&self) -> message::Message { + match self.data { + RequestData::RemoteCall(ref data, _) => message::generic::Message::RemoteCallRequest(message::RemoteCallRequest { + id: self.id, + block: data.block, + method: data.method.clone(), + data: data.call_data.clone(), + }), + } + } +} + #[cfg(test)] mod tests { use std::collections::VecDeque; @@ -294,7 +325,7 @@ mod tests { use futures::Future; use parking_lot::RwLock; use client; - use client::light::{FetchChecker, RemoteCallRequest}; + use client::light::fetcher::{Fetcher, FetchChecker, RemoteCallRequest}; use io::NetSyncIo; use message; use network::PeerId; @@ -335,8 +366,8 @@ mod tests { core.idle_peers.len() + core.active_peers.len() } - fn receive_response(on_demand: &OnDemand, network: &mut TestIo, peer: PeerId, id: message::RequestId) { - on_demand.on_remote_response(network, peer, message::RemoteCallResponse { + fn receive_call_response(on_demand: &OnDemand, network: &mut TestIo, peer: PeerId, id: message::RequestId) { + on_demand.on_remote_call_response(network, peer, message::RemoteCallResponse { id: id, proof: vec![vec![2]], }); @@ -391,7 +422,7 @@ mod tests { on_demand.on_connect(0, Role::FULL); on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), call_data: vec![] }); - receive_response(&*on_demand, &mut network, 0, 1); + receive_call_response(&*on_demand, &mut network, 0, 1); assert!(network.to_disconnect.contains(&0)); assert_eq!(on_demand.core.lock().pending_requests.len(), 1); } @@ -401,10 +432,10 @@ mod tests { let (_x, on_demand) = dummy(false); let queue = RwLock::new(VecDeque::new()); let mut network = TestIo::new(&queue, None); - on_demand.on_connect(0, Role::FULL); - on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), call_data: vec![] }); - receive_response(&*on_demand, &mut network, 0, 0); + + on_demand.on_connect(0, Role::FULL); + receive_call_response(&*on_demand, &mut network, 0, 0); assert!(network.to_disconnect.contains(&0)); assert_eq!(on_demand.core.lock().pending_requests.len(), 1); } @@ -416,7 +447,7 @@ mod tests { let mut network = TestIo::new(&queue, None); on_demand.on_connect(0, Role::FULL); - receive_response(&*on_demand, &mut network, 0, 0); + receive_call_response(&*on_demand, &mut network, 0, 0); assert!(network.to_disconnect.contains(&0)); } @@ -433,7 +464,7 @@ mod tests { assert_eq!(result.return_data, vec![42]); }); - receive_response(&*on_demand, &mut network, 0, 0); + receive_call_response(&*on_demand, &mut network, 0, 0); thread.join().unwrap(); } -} +} \ No newline at end of file diff --git a/substrate/network/src/protocol.rs b/substrate/network/src/protocol.rs index 6113ab735730e..6d23f1c7bd864 100644 --- a/substrate/network/src/protocol.rs +++ b/substrate/network/src/protocol.rs @@ -45,7 +45,7 @@ const MAX_BLOCK_DATA_RESPONSE: u32 = 128; pub struct Protocol { config: ProtocolConfig, chain: Arc>, - on_demand: Option>, + on_demand: Option>>, genesis_hash: B::Hash, sync: RwLock>, consensus: Mutex>, @@ -108,7 +108,7 @@ impl Protocol where pub fn new( config: ProtocolConfig, chain: Arc>, - on_demand: Option>, + on_demand: Option>>, transaction_pool: Arc> ) -> error::Result { let info = chain.info()?; @@ -182,7 +182,7 @@ impl Protocol where GenericMessage::BftMessage(m) => self.on_bft_message(io, peer_id, m, HashingFor::::hash(data)), GenericMessage::Transactions(m) => self.on_transactions(io, peer_id, m), GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(io, peer_id, request), - GenericMessage::RemoteCallResponse(response) => self.on_remote_call_response(io, peer_id, response) + GenericMessage::RemoteCallResponse(response) => self.on_remote_call_response(io, peer_id, response), } } @@ -512,11 +512,11 @@ impl Protocol where } fn on_remote_call_request(&self, io: &mut SyncIo, peer_id: PeerId, request: message::RemoteCallRequest) { - trace!(target: "sync", "Remote request {} from {} ({} at {})", request.id, peer_id, request.method, request.block); + trace!(target: "sync", "Remote call request {} from {} ({} at {})", request.id, peer_id, request.method, request.block); let proof = match self.chain.execution_proof(&request.block, &request.method, &request.data) { Ok((_, proof)) => proof, Err(error) => { - trace!(target: "sync", "Remote request {} from {} ({} at {}) failed with: {}", + trace!(target: "sync", "Remote call request {} from {} ({} at {}) failed with: {}", request.id, peer_id, request.method, request.block, error); Default::default() }, @@ -528,8 +528,8 @@ impl Protocol where } fn on_remote_call_response(&self, io: &mut SyncIo, peer_id: PeerId, response: message::RemoteCallResponse) { - trace!(target: "sync", "Remote response {} from {}", response.id, peer_id); - self.on_demand.as_ref().map(|s| s.on_remote_response(io, peer_id, response)); + trace!(target: "sync", "Remote call response {} from {}", response.id, peer_id); + self.on_demand.as_ref().map(|s| s.on_remote_call_response(io, peer_id, response)); } pub fn chain(&self) -> &Client { diff --git a/substrate/network/src/service.rs b/substrate/network/src/service.rs index 06af3c52cc7ad..276307427731a 100644 --- a/substrate/network/src/service.rs +++ b/substrate/network/src/service.rs @@ -133,7 +133,7 @@ pub struct Params { /// Polkadot relay chain access point. pub chain: Arc>, /// On-demand service reference. - pub on_demand: Option>, + pub on_demand: Option>>, /// Transaction pool. pub transaction_pool: Arc>, } diff --git a/substrate/network/src/test/sync.rs b/substrate/network/src/test/sync.rs index dfab41a5e1030..d67d530cce935 100644 --- a/substrate/network/src/test/sync.rs +++ b/substrate/network/src/test/sync.rs @@ -15,7 +15,7 @@ // along with Polkadot. If not, see . use client::backend::Backend; -use client::blockchain::Backend as BlockchainBackend; +use client::blockchain::HeaderBackend as BlockchainHeaderBackend; use sync::SyncState; use {Role}; use super::*; diff --git a/substrate/state-machine/src/lib.rs b/substrate/state-machine/src/lib.rs index af643d78971cc..9cc231f762b3b 100644 --- a/substrate/state-machine/src/lib.rs +++ b/substrate/state-machine/src/lib.rs @@ -210,7 +210,7 @@ pub fn execute( /// /// Note: changes to code will be in place if this call is made again. For running partial /// blocks (e.g. a transaction at a time), ensure a different method is used. -pub fn prove( +pub fn prove_execution( backend: B, overlay: &mut OverlayedChanges, exec: &Exec, @@ -227,7 +227,7 @@ pub fn prove( } /// Check execution proof, generated by `prove` call. -pub fn proof_check( +pub fn execution_proof_check( root: [u8; 32], proof: Vec>, overlay: &mut OverlayedChanges, @@ -328,15 +328,15 @@ mod tests { } #[test] - fn prove_and_proof_check_works() { + fn prove_execution_and_proof_check_works() { // fetch execution proof from 'remote' full node let remote_backend = trie_backend::tests::test_trie(); let remote_root = remote_backend.storage_root(::std::iter::empty()).0; - let (remote_result, remote_proof, _) = prove(remote_backend, + let (remote_result, remote_proof, _) = prove_execution(remote_backend, &mut Default::default(), &DummyCodeExecutor, "test", &[]).unwrap(); // check proof locally - let (local_result, _) = proof_check(remote_root, remote_proof, + let (local_result, _) = execution_proof_check(remote_root, remote_proof, &mut Default::default(), &DummyCodeExecutor, "test", &[]).unwrap(); // check that both results are correct From 142f1fe68fdef2f16cd49aeb9724be5079dec248 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Thu, 28 Jun 2018 16:37:05 +0300 Subject: [PATCH 2/3] fixed comment --- substrate/state-machine/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/state-machine/src/lib.rs b/substrate/state-machine/src/lib.rs index 9cc231f762b3b..f5e1263a84172 100644 --- a/substrate/state-machine/src/lib.rs +++ b/substrate/state-machine/src/lib.rs @@ -226,7 +226,7 @@ pub fn prove_execution( Ok((result, proof, transaction)) } -/// Check execution proof, generated by `prove` call. +/// Check execution proof, generated by `prove_execution` call. pub fn execution_proof_check( root: [u8; 32], proof: Vec>, From 68b08e242e37624d7edaee8aa82faa092fe4351a Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Thu, 28 Jun 2018 17:55:51 +0300 Subject: [PATCH 3/3] fixed grumbles --- substrate/client/src/in_mem.rs | 2 +- substrate/client/src/light/backend.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/substrate/client/src/in_mem.rs b/substrate/client/src/in_mem.rs index 29d74ef466c30..0473c1fa6ae07 100644 --- a/substrate/client/src/in_mem.rs +++ b/substrate/client/src/in_mem.rs @@ -113,7 +113,7 @@ impl Blockchain { genesis_hash: Default::default(), })); Blockchain { - storage: storage.clone(), + storage: storage, } } diff --git a/substrate/client/src/light/backend.rs b/substrate/client/src/light/backend.rs index 172b811473107..dce924f6ded48 100644 --- a/substrate/client/src/light/backend.rs +++ b/substrate/client/src/light/backend.rs @@ -35,7 +35,7 @@ pub struct Backend { blockchain: Arc>, } -/// Ligh block (header and justification) import operation. +/// Light block (header and justification) import operation. pub struct ImportOperation { is_new_best: bool, header: Option,