This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

DB-based light client backend #250

Merged · 4 commits · Jun 29, 2018
14 changes: 9 additions & 5 deletions polkadot/service/src/components.rs
@@ -116,15 +116,19 @@ impl Components for FullComponents {
pub struct LightComponents;

impl Components for LightComponents {
type Backend = client::light::Backend<Block>;
type Backend = client::light::backend::Backend<client_db::light::LightStorage<Block>, network::OnDemand<Block, network::Service<Block>>>;
type Api = polkadot_api::light::RemotePolkadotApiWrapper<Self::Backend, Self::Executor>;
type Executor = client::RemoteCallExecutor<client::light::Backend<Block>, network::OnDemand<Block, network::Service<Block>>>;
type Executor = client::light::call_executor::RemoteCallExecutor<
client::light::blockchain::Blockchain<client_db::light::LightStorage<Block>, network::OnDemand<Block, network::Service<Block>>>,
network::OnDemand<Block, network::Service<Block>>>;

fn build_client(&self, _settings: client_db::DatabaseSettings, executor: CodeExecutor, genesis_storage: MakeStorage)
fn build_client(&self, db_settings: client_db::DatabaseSettings, executor: CodeExecutor, genesis_storage: MakeStorage)
-> Result<(Arc<client::Client<Self::Backend, Self::Executor, Block>>, Option<Arc<network::OnDemand<Block, network::Service<Block>>>>), error::Error> {
let client_backend = client::light::new_light_backend();
let fetch_checker = Arc::new(client::light::new_fetch_checker(client_backend.clone(), executor));
let db_storage = client_db::light::LightStorage::new(db_settings)?;
let light_blockchain = client::light::new_light_blockchain(db_storage);
let fetch_checker = Arc::new(client::light::new_fetch_checker(light_blockchain.clone(), executor));
let fetcher = Arc::new(network::OnDemand::new(fetch_checker));
let client_backend = client::light::new_light_backend(light_blockchain, fetcher.clone());
let client = client::light::new_light(client_backend, fetcher.clone(), genesis_storage)?;
Ok((Arc::new(client), Some(fetcher)))
}
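For orientation, the construction order introduced in build_client can be modelled in isolation. The sketch below uses placeholder types (DbStorage, Blockchain, FetchChecker, OnDemand, LightBackend and LightClient are stand-ins, not the real substrate/polkadot types); only the wiring mirrors the diff: database-backed storage feeds a light blockchain, the fetch checker validates remote responses against it, and the on-demand fetcher is shared between the backend and the network service.

```rust
// Illustrative stand-ins only; the real types live in client, client_db and network.
use std::sync::Arc;

struct DbStorage;                                   // ~ client_db::light::LightStorage
struct Blockchain { _storage: DbStorage }           // ~ client::light::blockchain::Blockchain
struct FetchChecker { _chain: Arc<Blockchain> }     // validates on-demand responses
struct OnDemand { _checker: Arc<FetchChecker> }     // ~ network::OnDemand
struct LightBackend { _chain: Arc<Blockchain>, _fetcher: Arc<OnDemand> }
struct LightClient { _backend: LightBackend, _fetcher: Arc<OnDemand> }

fn build_light_client(storage: DbStorage) -> (LightClient, Arc<OnDemand>) {
    // 1. database-backed header storage wrapped into a blockchain cache
    let blockchain = Arc::new(Blockchain { _storage: storage });
    // 2. the proof checker verifies fetched data against locally known headers
    let checker = Arc::new(FetchChecker { _chain: blockchain.clone() });
    // 3. the on-demand fetcher is handed to the backend and (elsewhere) to the network
    let fetcher = Arc::new(OnDemand { _checker: checker });
    // 4. the backend owns the blockchain and falls back to the fetcher for missing data
    let backend = LightBackend { _chain: blockchain, _fetcher: fetcher.clone() };
    (LightClient { _backend: backend, _fetcher: fetcher.clone() }, fetcher)
}
```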
2 changes: 1 addition & 1 deletion polkadot/service/src/lib.rs
@@ -136,7 +136,7 @@ impl<Components> Service<Components>
},
network_config: config.network,
chain: client.clone(),
on_demand: on_demand.clone().map(|d| d as Arc<network::OnDemandService>),
on_demand: on_demand.clone().map(|d| d as Arc<network::OnDemandService<Block>>),
transaction_pool: transaction_pool_adapter,
};
let network = network::Service::new(network_params)?;
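The on_demand handle is coerced to a trait object exactly as before; the only change is that the OnDemandService trait is now parameterised by the block type. A minimal, self-contained sketch of that coercion pattern (trait and types here are hypothetical stand-ins):

```rust
use std::sync::Arc;

// Stand-ins for network::OnDemandService<Block> and network::OnDemand.
struct Block;
trait OnDemandService<B> {}
struct OnDemand;
impl OnDemandService<Block> for OnDemand {}

fn as_service(on_demand: Option<Arc<OnDemand>>) -> Option<Arc<dyn OnDemandService<Block>>> {
    // same shape as `on_demand.clone().map(|d| d as Arc<network::OnDemandService<Block>>)`
    on_demand.map(|d| d as Arc<dyn OnDemandService<Block>>)
}
```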
161 changes: 42 additions & 119 deletions substrate/client/db/src/lib.rs
@@ -35,11 +35,14 @@ extern crate log;
#[cfg(test)]
extern crate kvdb_memorydb;

pub mod light;

mod utils;

use std::sync::Arc;
use std::path::PathBuf;

use codec::Slicable;
use kvdb_rocksdb::{Database, DatabaseConfig};
use kvdb::{KeyValueDB, DBTransaction};
use memorydb::MemoryDB;
use parking_lot::RwLock;
@@ -50,6 +53,7 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, Hashing
use runtime_primitives::BuildStorage;
use state_machine::backend::Backend as StateBackend;
use state_machine::{CodeExecutor, TrieH256, DBValue};
use utils::{Meta, db_err, meta_keys, number_to_db_key, open_database, read_db, read_id, read_meta};
use state_db::StateDb;
pub use state_db::PruningMode;

@@ -94,11 +98,6 @@ mod columns {
pub const HEADER: Option<u32> = Some(4);
pub const BODY: Option<u32> = Some(5);
pub const JUSTIFICATION: Option<u32> = Some(6);
pub const NUM_COLUMNS: u32 = 7;
}

mod meta {
pub const BEST_BLOCK: &[u8; 4] = b"best";
}

struct PendingBlock<Block: BlockT> {
@@ -108,37 +107,6 @@ struct PendingBlock<Block: BlockT> {
is_best: bool,
}

#[derive(Clone)]
struct Meta<N, H> {
best_hash: H,
best_number: N,
genesis_hash: H,
}

type BlockKey = [u8; 4];

// Little endian
fn number_to_db_key<N>(n: N) -> BlockKey where N: As<u32> {
let n: u32 = n.as_();

[
(n >> 24) as u8,
((n >> 16) & 0xff) as u8,
((n >> 8) & 0xff) as u8,
(n & 0xff) as u8
]
}

// Maps database error to client error
fn db_err(err: kvdb::Error) -> client::error::Error {
use std::error::Error;
match err.kind() {
&kvdb::ErrorKind::Io(ref err) => client::error::ErrorKind::Backend(err.description().into()).into(),
&kvdb::ErrorKind::Msg(ref m) => client::error::ErrorKind::Backend(m.clone()).into(),
_ => client::error::ErrorKind::Backend("Unknown backend error".into()).into(),
}
}

// wrapper that implements trait required for state_db
struct StateMetaDb<'a>(&'a KeyValueDB);

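The Meta struct, number_to_db_key, db_err and the block lookup helpers removed above are not gone: per the new `mod utils;` and `use utils::{Meta, db_err, meta_keys, number_to_db_key, open_database, read_db, read_id, read_meta};` they move into a utils module shared by the full and the new light backend. Note that, despite the old "Little endian" comment, the key helper writes the most significant byte first, so the database's lexicographic key order matches block-number order. A simplified sketch (concrete u32 instead of the `N: As<u32>` bound; the exact utils layout is an assumption):

```rust
/// 4-byte block lookup key, big-endian so keys sort by block number.
type BlockKey = [u8; 4];

fn number_to_db_key(n: u32) -> BlockKey {
    [
        (n >> 24) as u8,
        ((n >> 16) & 0xff) as u8,
        ((n >> 8) & 0xff) as u8,
        (n & 0xff) as u8,
    ]
}

#[test]
fn keys_sort_by_block_number() {
    assert!(number_to_db_key(1) < number_to_db_key(256));
    assert!(number_to_db_key(256) < number_to_db_key(65_536));
}
```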
@@ -157,59 +125,14 @@ pub struct BlockchainDb<Block: BlockT> {
}

impl<Block: BlockT> BlockchainDb<Block> where <Block::Header as HeaderT>::Number: As<u32> {
fn id(&self, id: BlockId<Block>) -> Result<Option<BlockKey>, client::error::Error> {
match id {
BlockId::Hash(h) => {
{
let meta = self.meta.read();
if meta.best_hash == h {
return Ok(Some(number_to_db_key(meta.best_number)));
}
}
self.db.get(columns::BLOCK_INDEX, h.as_ref()).map(|v| v.map(|v| {
let mut key: [u8; 4] = [0; 4];
key.copy_from_slice(&v);
key
})).map_err(db_err)
},
BlockId::Number(n) => Ok(Some(number_to_db_key(n))),
}
}

fn new(db: Arc<KeyValueDB>) -> Result<Self, client::error::Error> {
let (best_hash, best_number) = if let Some(Some(header)) = db.get(columns::META, meta::BEST_BLOCK).and_then(|id|
match id {
Some(id) => db.get(columns::HEADER, &id).map(|h| h.map(|b| Block::Header::decode(&mut &b[..]))),
None => Ok(None),
}).map_err(db_err)?
{
let hash = header.hash();
debug!("DB Opened blockchain db, best {:?} ({})", hash, header.number());
(hash, header.number().clone())
} else {
(Default::default(), Zero::zero())
};
let genesis_hash = db.get(columns::HEADER, &number_to_db_key(<Block::Header as HeaderT>::Number::zero())).map_err(db_err)?
.map(|b| HashingFor::<Block>::hash(&b)).unwrap_or_default().into();

let meta = read_meta::<Block>(&*db, columns::HEADER)?;
Ok(BlockchainDb {
db,
meta: RwLock::new(Meta {
best_hash,
best_number,
genesis_hash,
})
meta: RwLock::new(meta)
})
}

fn read_db(&self, id: BlockId<Block>, column: Option<u32>) -> Result<Option<DBValue>, client::error::Error> {
self.id(id).and_then(|key|
match key {
Some(key) => self.db.get(column, &key).map_err(db_err),
None => Ok(None),
})
}

fn update_meta(&self, hash: Block::Hash, number: <Block::Header as HeaderT>::Number, is_best: bool) {
if is_best {
let mut meta = self.meta.write();
@@ -222,9 +145,9 @@ impl<Block: BlockT> BlockchainDb<Block> where <Block::Header as HeaderT>::Number
}
}

impl<Block: BlockT> client::blockchain::Backend<Block> for BlockchainDb<Block> where <Block::Header as HeaderT>::Number: As<u32> {
impl<Block: BlockT> client::blockchain::HeaderBackend<Block> for BlockchainDb<Block> where <Block::Header as HeaderT>::Number: As<u32> {
fn header(&self, id: BlockId<Block>) -> Result<Option<Block::Header>, client::error::Error> {
match self.read_db(id, columns::HEADER)? {
match read_db(&*self.db, columns::BLOCK_INDEX, columns::HEADER, id)? {
Some(header) => match Block::Header::decode(&mut &header[..]) {
Some(header) => Ok(Some(header)),
None => return Err(client::error::ErrorKind::Backend("Error decoding header".into()).into()),
Expand All @@ -233,26 +156,6 @@ impl<Block: BlockT> client::blockchain::Backend<Block> for BlockchainDb<Block> w
}
}

fn body(&self, id: BlockId<Block>) -> Result<Option<Vec<Block::Extrinsic>>, client::error::Error> {
match self.read_db(id, columns::BODY)? {
Some(body) => match Slicable::decode(&mut &body[..]) {
Some(body) => Ok(Some(body)),
None => return Err(client::error::ErrorKind::Backend("Error decoding body".into()).into()),
}
None => Ok(None),
}
}

fn justification(&self, id: BlockId<Block>) -> Result<Option<Justification<Block::Hash>>, client::error::Error> {
match self.read_db(id, columns::JUSTIFICATION)? {
Some(justification) => match Slicable::decode(&mut &justification[..]) {
Some(justification) => Ok(Some(justification)),
None => return Err(client::error::ErrorKind::Backend("Error decoding justification".into()).into()),
}
None => Ok(None),
}
}

fn info(&self) -> Result<client::blockchain::Info<Block>, client::error::Error> {
let meta = self.meta.read();
Ok(client::blockchain::Info {
Expand All @@ -264,7 +167,7 @@ impl<Block: BlockT> client::blockchain::Backend<Block> for BlockchainDb<Block> w

fn status(&self, id: BlockId<Block>) -> Result<client::blockchain::BlockStatus, client::error::Error> {
let exists = match id {
BlockId::Hash(_) => self.id(id)?.is_some(),
BlockId::Hash(_) => read_id(&*self.db, columns::BLOCK_INDEX, id)?.is_some(),
BlockId::Number(n) => n <= self.meta.read().best_number,
};
match exists {
@@ -274,12 +177,34 @@ impl<Block: BlockT> client::blockchain::Backend<Block> for BlockchainDb<Block> w
}

fn hash(&self, number: <Block::Header as HeaderT>::Number) -> Result<Option<Block::Hash>, client::error::Error> {
self.read_db(BlockId::Number(number), columns::HEADER).map(|x|
read_db::<Block>(&*self.db, columns::BLOCK_INDEX, columns::HEADER, BlockId::Number(number)).map(|x|
x.map(|raw| HashingFor::<Block>::hash(&raw[..])).map(Into::into)
)
}
}

impl<Block: BlockT> client::blockchain::Backend<Block> for BlockchainDb<Block> where <Block::Header as HeaderT>::Number: As<u32> {
fn body(&self, id: BlockId<Block>) -> Result<Option<Vec<Block::Extrinsic>>, client::error::Error> {
match read_db(&*self.db, columns::BLOCK_INDEX, columns::BODY, id)? {
Some(body) => match Slicable::decode(&mut &body[..]) {
Some(body) => Ok(Some(body)),
None => return Err(client::error::ErrorKind::Backend("Error decoding body".into()).into()),
}
None => Ok(None),
}
}

fn justification(&self, id: BlockId<Block>) -> Result<Option<Justification<Block::Hash>>, client::error::Error> {
match read_db(&*self.db, columns::BLOCK_INDEX, columns::JUSTIFICATION, id)? {
Some(justification) => match Slicable::decode(&mut &justification[..]) {
Some(justification) => Ok(Some(justification)),
None => return Err(client::error::ErrorKind::Backend("Error decoding justification".into()).into()),
}
None => Ok(None),
}
}
}

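The single blockchain::Backend impl is split in two: header-level queries (header, info, status, hash) now sit behind a HeaderBackend trait, while the full Backend trait adds body and justification on top, which is what lets a header-only light database reuse the same machinery. Roughly, the shape is as sketched below (signatures condensed; the real traits are generic over the Block type and return client::error results):

```rust
// Condensed model of the trait split; types are placeholders.
struct Header;
struct Extrinsic;
struct Justification;
struct Info { _best_number: u64 }

trait HeaderBackend {
    // everything a header-only (light) database can answer
    fn header(&self, number: u64) -> Option<Header>;
    fn info(&self) -> Info;
    fn hash(&self, number: u64) -> Option<[u8; 32]>;
}

trait BlockchainBackend: HeaderBackend {
    // full nodes additionally store block bodies and justifications
    fn body(&self, number: u64) -> Option<Vec<Extrinsic>>;
    fn justification(&self, number: u64) -> Option<Justification>;
}
```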
/// Database transaction
pub struct BlockImportOperation<Block: BlockT> {
old_state: DbState,
@@ -351,18 +276,16 @@ pub struct Backend<Block: BlockT> {
impl<Block: BlockT> Backend<Block> where <Block::Header as HeaderT>::Number: As<u32> {
/// Create a new instance of database backend.
pub fn new(config: DatabaseSettings, finalization_window: u64) -> Result<Self, client::error::Error> {
let mut db_config = DatabaseConfig::with_columns(Some(columns::NUM_COLUMNS));
db_config.memory_budget = config.cache_size;
db_config.wal = true;
let path = config.path.to_str().ok_or_else(|| client::error::ErrorKind::Backend("Invalid database path".into()))?;
let db = Arc::new(Database::open(&db_config, &path).map_err(db_err)?);
let db = open_database(&config, "full")?;

Backend::from_kvdb(db as Arc<_>, config.pruning, finalization_window)
}

#[cfg(test)]
fn new_test() -> Self {
let db = Arc::new(::kvdb_memorydb::create(columns::NUM_COLUMNS));
use utils::NUM_COLUMNS;

let db = Arc::new(::kvdb_memorydb::create(NUM_COLUMNS));

Backend::from_kvdb(db as Arc<_>, PruningMode::keep_blocks(0), 0).expect("failed to create test-db")
}
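Backend::new no longer builds the RocksDB configuration inline; it delegates to the shared utils::open_database, called with "full" here and presumably with a different tag by the light backend. A hedged sketch of such a helper, reassembled from the removed inline code against the kvdb-rocksdb API used in this tree (the real signature, error type and any per-type path handling are assumptions):

```rust
use std::{path::PathBuf, sync::Arc};
use kvdb_rocksdb::{Database, DatabaseConfig};

const NUM_COLUMNS: u32 = 7;                           // as in the removed columns module

struct DatabaseSettings { path: PathBuf, cache_size: Option<usize> }

fn open_database(config: &DatabaseSettings, _db_type: &str) -> Result<Arc<Database>, String> {
    let mut db_config = DatabaseConfig::with_columns(Some(NUM_COLUMNS));
    db_config.memory_budget = config.cache_size;      // optional rocksdb cache budget
    db_config.wal = true;                             // keep the write-ahead log enabled
    let path = config.path.to_str().ok_or("Invalid database path")?;
    Database::open(&db_config, path).map(Arc::new).map_err(|e| e.to_string())
}
```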
@@ -417,12 +340,12 @@ impl<Block: BlockT> client::backend::Backend<Block> for Backend<Block> where
}

fn commit_operation(&self, mut operation: Self::BlockImportOperation) -> Result<(), client::error::Error> {
use client::blockchain::Backend;
use client::blockchain::HeaderBackend;
let mut transaction = DBTransaction::new();
if let Some(pending_block) = operation.pending_block {
let hash = pending_block.header.hash();
let number = pending_block.header.number().clone();
let key = number_to_db_key(pending_block.header.number().clone());
let key = number_to_db_key(number.clone());
transaction.put(columns::HEADER, &key, &pending_block.header.encode());
if let Some(body) = pending_block.body {
transaction.put(columns::BODY, &key, &body.encode());
Expand All @@ -432,7 +355,7 @@ impl<Block: BlockT> client::backend::Backend<Block> for Backend<Block> where
}
transaction.put(columns::BLOCK_INDEX, hash.as_ref(), &key);
if pending_block.is_best {
transaction.put(columns::META, meta::BEST_BLOCK, &key);
transaction.put(columns::META, meta_keys::BEST_BLOCK, &key);
}
let mut changeset: state_db::ChangeSet<H256> = state_db::ChangeSet::default();
for (key, (val, rc)) in operation.updates.drain() {
Expand Down Expand Up @@ -472,7 +395,7 @@ impl<Block: BlockT> client::backend::Backend<Block> for Backend<Block> where
}

fn state_at(&self, block: BlockId<Block>) -> Result<Self::State, client::error::Error> {
use client::blockchain::Backend as BcBackend;
use client::blockchain::HeaderBackend as BcHeaderBackend;

// special case for genesis initialization
match block {
Expand All @@ -499,7 +422,7 @@ mod tests {
use super::*;
use client::backend::Backend as BTrait;
use client::backend::BlockImportOperation as Op;
use client::blockchain::Backend as BCTrait;
use client::blockchain::HeaderBackend as BlockchainHeaderBackend;
use runtime_primitives::testing::{Header, Block as RawBlock};

type Block = RawBlock<u64>;