From fe7b0d81f7e796ca098534bffb510777c1d0edfd Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Wed, 11 Sep 2019 14:23:04 +0200 Subject: [PATCH] runtime: Verify read/write sets during transaction execution --- client/src/transaction/snapshot.rs | 4 +- runtime/src/dispatcher.rs | 20 ++- runtime/src/storage/context.rs | 34 +++-- runtime/src/storage/mkvs/mod.rs | 2 +- runtime/src/storage/mkvs/urkel/tree/mkvs.rs | 4 +- runtime/src/transaction/dispatcher.rs | 159 ++++++++++++++++++-- runtime/src/transaction/rwset.rs | 148 +++++++++++++++++- tests/clients/simple-keyvalue/src/main.rs | 2 +- tests/runtimes/simple-keyvalue/src/main.rs | 2 +- 9 files changed, 328 insertions(+), 47 deletions(-) diff --git a/client/src/transaction/snapshot.rs b/client/src/transaction/snapshot.rs index 817fe16b470..6feb43b4f6c 100644 --- a/client/src/transaction/snapshot.rs +++ b/client/src/transaction/snapshot.rs @@ -110,8 +110,8 @@ impl BlockSnapshot { } impl MKVS for BlockSnapshot { - fn get(&self, ctx: Context, key: &[u8]) -> Option> { - MKVS::get(&self.mkvs, ctx, key) + fn get(&mut self, ctx: Context, key: &[u8]) -> Option> { + MKVS::get(&mut self.mkvs, ctx, key) } fn insert(&mut self, _ctx: Context, _key: &[u8], _value: &[u8]) -> Option> { diff --git a/runtime/src/dispatcher.rs b/runtime/src/dispatcher.rs index 7519d2fa633..0e2da070347 100644 --- a/runtime/src/dispatcher.rs +++ b/runtime/src/dispatcher.rs @@ -36,10 +36,12 @@ use crate::{ }, UrkelTree, }, - StorageContext, + KeyValue, StorageContext, }, transaction::{ - dispatcher::Dispatcher as TxnDispatcher, tree::Tree as TxnTree, types::TxnBatch, + dispatcher::{DispatchStorageContext, Dispatcher as TxnDispatcher}, + tree::Tree as TxnTree, + types::TxnBatch, Context as TxnContext, }, types::{Body, ComputedBatch}, @@ -267,15 +269,19 @@ impl Dispatcher { }, ); - let untrusted_local = Arc::new(ProtocolUntrustedLocalStorage::new( + let untrusted_local: Arc = Arc::new(ProtocolUntrustedLocalStorage::new( Context::create_child(&ctx), 
protocol.clone(), )); let txn_ctx = TxnContext::new(ctx.clone(), &block.header, check_only); - let (mut outputs, mut tags) = - StorageContext::enter(&mut cache.mkvs, untrusted_local.clone(), || { - txn_dispatcher.dispatch_batch(&inputs, txn_ctx) - }); + let (mut outputs, mut tags) = txn_dispatcher.dispatch_batch( + DispatchStorageContext { + mkvs: &mut cache.mkvs, + untrusted_local: &untrusted_local, + }, + &inputs, + txn_ctx, + ); if check_only { debug!(self.logger, "Transaction batch check complete"); diff --git a/runtime/src/storage/context.rs b/runtime/src/storage/context.rs index 61f2e610f0b..a0f22215839 100644 --- a/runtime/src/storage/context.rs +++ b/runtime/src/storage/context.rs @@ -1,8 +1,8 @@ //! Thread-local storage context. //! -//! The storage context is a convenient way to share CAS and MKVS +//! The storage context is a convenient way to share MKVS //! implementations across the current thread. -use std::{cell::RefCell, sync::Arc}; +use std::{cell::RefCell, mem::transmute, sync::Arc}; use super::{KeyValue, MKVS}; @@ -15,29 +15,30 @@ thread_local! { static CTX: RefCell> = RefCell::new(None); } -struct CtxGuard; +struct CtxGuard { + old_ctx: Option, +} impl CtxGuard { - fn new(mkvs: &mut M, untrusted_local: Arc) -> Self - where - M: MKVS + 'static, - { - CTX.with(|ctx| { - assert!(ctx.borrow().is_none(), "nested enter is not allowed"); + fn new(mkvs: &mut dyn MKVS, untrusted_local: Arc) -> Self { + let old_ctx = CTX.with(|ctx| { ctx.borrow_mut().replace(Ctx { - mkvs, + // Need to fake the 'static lifetime on the trait. This is fine as we know + // that the pointer can only actually be used while the StorageContext is + // live. 
+ mkvs: unsafe { transmute::<&mut dyn MKVS, &'static mut dyn MKVS>(mkvs) }, untrusted_local, - }); + }) }); - CtxGuard + CtxGuard { old_ctx } } } impl Drop for CtxGuard { fn drop(&mut self) { CTX.with(|local| { - drop(local.borrow_mut().take()); + drop(local.replace(self.old_ctx.take())); }); } } @@ -47,9 +48,8 @@ pub struct StorageContext; impl StorageContext { /// Enter the storage context. - pub fn enter(mkvs: &mut M, untrusted_local: Arc, f: F) -> R + pub fn enter(mkvs: &mut dyn MKVS, untrusted_local: Arc, f: F) -> R where - M: MKVS + 'static, F: FnOnce() -> R, { let _guard = CtxGuard::new(mkvs, untrusted_local); @@ -69,8 +69,10 @@ impl StorageContext { let ctx = ctx.borrow(); let ctx_ref = ctx.as_ref().expect("must only be called while entered"); let mkvs_ref = unsafe { ctx_ref.mkvs.as_mut().expect("pointer is never null") }; + let untrusted_local_ref = ctx_ref.untrusted_local.clone(); + drop(ctx); - f(mkvs_ref, &ctx_ref.untrusted_local) + f(mkvs_ref, &untrusted_local_ref) }) } } diff --git a/runtime/src/storage/mkvs/mod.rs b/runtime/src/storage/mkvs/mod.rs index 7a9615ea156..384c255b16d 100644 --- a/runtime/src/storage/mkvs/mod.rs +++ b/runtime/src/storage/mkvs/mod.rs @@ -109,7 +109,7 @@ impl From> for Prefix { /// Merklized key-value store. pub trait MKVS: Send + Sync { /// Fetch entry with given key. - fn get(&self, ctx: Context, key: &[u8]) -> Option>; + fn get(&mut self, ctx: Context, key: &[u8]) -> Option>; /// Update entry with given key. /// diff --git a/runtime/src/storage/mkvs/urkel/tree/mkvs.rs b/runtime/src/storage/mkvs/urkel/tree/mkvs.rs index 7fd2a2e3f68..8f6a16e4306 100644 --- a/runtime/src/storage/mkvs/urkel/tree/mkvs.rs +++ b/runtime/src/storage/mkvs/urkel/tree/mkvs.rs @@ -12,9 +12,9 @@ unsafe impl Sync for UrkelTree {} // TODO: We should likely change the MKVS interface to propagate errors instead of unwrapping. 
impl MKVS for UrkelTree { - fn get(&self, ctx: Context, key: &[u8]) -> Option> { + fn get(&mut self, ctx: Context, key: &[u8]) -> Option> { let _lock = self.lock.lock().unwrap(); - self.get(ctx, key).unwrap() + UrkelTree::get(self, ctx, key).unwrap() } fn insert(&mut self, ctx: Context, key: &[u8], value: &[u8]) -> Option> { diff --git a/runtime/src/transaction/dispatcher.rs b/runtime/src/transaction/dispatcher.rs index 93017e80bdc..32f53cf924a 100644 --- a/runtime/src/transaction/dispatcher.rs +++ b/runtime/src/transaction/dispatcher.rs @@ -1,7 +1,8 @@ //! Runtime transaction batch dispatcher. -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; use failure::{Fallible, ResultExt}; +use io_context::Context as IoContext; use serde::{de::DeserializeOwned, Serialize}; use super::{ @@ -9,7 +10,10 @@ use super::{ tags::Tags, types::{TxnBatch, TxnCall, TxnOutput}, }; -use crate::common::{cbor, crypto::hash::Hash}; +use crate::{ + common::{cbor, crypto::hash::Hash}, + storage::{context::StorageContext, mkvs::MKVS, KeyValue}, +}; /// Dispatch error. #[derive(Debug, Fail)] @@ -18,6 +22,14 @@ enum DispatchError { MethodNotFound { method: String }, } +/// Storage context configuration to be used during transaction dispatch. +pub struct DispatchStorageContext<'a> { + /// Reference to the MKVS backend. + pub mkvs: &'a mut dyn MKVS, + /// Reference to the untrusted local key/value store. + pub untrusted_local: &'a Arc, +} + /// Error indicating that performing a transaction check was successful. #[derive(Debug, Fail)] #[fail(display = "transaction check successful")] @@ -101,7 +113,12 @@ pub trait MethodHandlerDispatch { fn get_descriptor(&self) -> &MethodDescriptor; /// Dispatches the given raw call. 
- fn dispatch(&self, call: TxnCall, ctx: &mut Context) -> Fallible; + fn dispatch( + &self, + storage: &mut DispatchStorageContext, + call: TxnCall, + ctx: &mut Context, + ) -> Fallible; } struct MethodHandlerDispatchImpl { @@ -120,11 +137,36 @@ where &self.descriptor } - fn dispatch(&self, call: TxnCall, ctx: &mut Context) -> Fallible { + fn dispatch( + &self, + storage: &mut DispatchStorageContext, + call: TxnCall, + ctx: &mut Context, + ) -> Fallible { + let predicted_rw_set = call.predicted_rw_set; let call = cbor::from_value(call.args).context("unable to parse call arguments")?; - let response = self.handler.handle(&call, ctx)?; - Ok(cbor::to_value(response)) + let response = if predicted_rw_set.is_empty() { + // If the predicted read/write set is empty, there is no need to do any + // verification. It is very important that this notion of an empty read + // write set is understood by the transaction scheduler in the same way. + StorageContext::enter(storage.mkvs, storage.untrusted_local.clone(), || { + self.handler.handle(&call, ctx) + }) + } else { + // Wrap current storage context into a read-write set verifier. + let mut verifier = predicted_rw_set.into_verifier(storage.mkvs); + let response = + StorageContext::enter(&mut verifier, storage.untrusted_local.clone(), || { + self.handler.handle(&call, ctx) + }); + + // Try to commit. + verifier.commit(IoContext::create_child(&ctx.io_ctx))?; + response + }; + + Ok(cbor::to_value(response?)) } } @@ -156,8 +198,13 @@ impl Method { } /// Dispatch method call. - pub fn dispatch(&self, call: TxnCall, ctx: &mut Context) -> Fallible { - self.dispatcher.dispatch(call, ctx) + pub fn dispatch( + &self, + storage: &mut DispatchStorageContext, + call: TxnCall, + ctx: &mut Context, + ) -> Fallible { + self.dispatcher.dispatch(storage, call, ctx) } } @@ -217,7 +264,12 @@ impl Dispatcher { } /// Dispatches a batch of runtime requests. 
- pub fn dispatch_batch(&self, batch: &TxnBatch, mut ctx: Context) -> (TxnBatch, Vec) { + pub fn dispatch_batch( + &self, + mut storage: DispatchStorageContext, + batch: &TxnBatch, + mut ctx: Context, + ) -> (TxnBatch, Vec) { if let Some(ref ctx_init) = self.ctx_initializer { ctx_init.init(&mut ctx); } @@ -233,7 +285,7 @@ impl Dispatcher { .iter() .map(|call| { ctx.start_transaction(); - self.dispatch(call, &mut ctx) + self.dispatch(&mut storage, call, &mut ctx) }) .collect(), ); @@ -247,8 +299,13 @@ impl Dispatcher { } /// Dispatches a raw runtime invocation request. - pub fn dispatch(&self, call: &Vec, ctx: &mut Context) -> Vec { - let rsp = match self.dispatch_fallible(call, ctx) { + pub fn dispatch( + &self, + storage: &mut DispatchStorageContext, + call: &Vec, + ctx: &mut Context, + ) -> Vec { + let rsp = match self.dispatch_fallible(storage, call, ctx) { Ok(response) => TxnOutput::Success(response), Err(error) => { if let Some(check_msg) = error.downcast_ref::() { @@ -262,11 +319,16 @@ impl Dispatcher { cbor::to_vec(&rsp) } - fn dispatch_fallible(&self, call: &Vec, ctx: &mut Context) -> Fallible { + fn dispatch_fallible( + &self, + storage: &mut DispatchStorageContext, + call: &Vec, + ctx: &mut Context, + ) -> Fallible { let call: TxnCall = cbor::from_slice(call).context("unable to parse call")?; match self.methods.get(&call.method) { - Some(dispatcher) => dispatcher.dispatch(call, ctx), + Some(dispatcher) => dispatcher.dispatch(storage, call, ctx), None => Err(DispatchError::MethodNotFound { method: call.method, } @@ -284,10 +346,23 @@ impl Dispatcher { #[cfg(test)] mod tests { + use std::sync::Arc; + + use failure::Fallible; use io_context::Context as IoContext; use serde_derive::{Deserialize, Serialize}; - use crate::common::{cbor, roothash::Header}; + use crate::{ + common::{ + cbor, + crypto::hash::Hash, + roothash::{Header, Namespace}, + }, + storage::{ + mkvs::{Prefix, WriteLog, MKVS}, + KeyValue, + }, + }; use super::*; @@ -317,6 +392,49 @@ mod tests 
{ )); } + struct Dummy; + + impl MKVS for Dummy { + fn get(&mut self, _ctx: IoContext, _key: &[u8]) -> Option> { + unimplemented!(); + } + + fn insert(&mut self, _ctx: IoContext, _key: &[u8], _value: &[u8]) -> Option> { + unimplemented!(); + } + + fn remove(&mut self, _ctx: IoContext, _key: &[u8]) -> Option> { + unimplemented!(); + } + + fn prefetch_prefixes(&self, _ctx: IoContext, _prefixes: &Vec, _limit: u16) { + unimplemented!(); + } + + fn commit( + &mut self, + _ctx: IoContext, + _namespace: Namespace, + _round: u64, + ) -> Fallible<(WriteLog, Hash)> { + unimplemented!(); + } + + fn rollback(&mut self) { + unimplemented!(); + } + } + + impl KeyValue for Dummy { + fn get(&self, _key: Vec) -> Fallible> { + unimplemented!(); + } + + fn insert(&self, _key: Vec, _value: Vec) -> Fallible<()> { + unimplemented!(); + } + } + #[test] fn test_dispatcher() { let mut dispatcher = Dispatcher::new(); @@ -340,7 +458,16 @@ mod tests { let mut ctx = Context::new(IoContext::background().freeze(), &header, false); // Call runtime. - let result = dispatcher.dispatch(&call_encoded, &mut ctx); + let mut dummy_mkvs = Dummy; + let dummy_kv: Arc = Arc::new(Dummy); + let result = dispatcher.dispatch( + &mut DispatchStorageContext { + mkvs: &mut dummy_mkvs, + untrusted_local: &dummy_kv, + }, + &call_encoded, + &mut ctx, + ); // Decode result. let result_decoded: TxnOutput = cbor::from_slice(&result).unwrap(); diff --git a/runtime/src/transaction/rwset.rs b/runtime/src/transaction/rwset.rs index 02bc3070e12..562f8958e08 100644 --- a/runtime/src/transaction/rwset.rs +++ b/runtime/src/transaction/rwset.rs @@ -1,10 +1,30 @@ //! Read/write set. 
+use std::{ + borrow::Borrow, + collections::{HashMap, HashSet}, + iter::FromIterator, +}; + +use failure::Fallible; +use io_context::Context; use serde_bytes; use serde_derive::{Deserialize, Serialize}; +use crate::{ + common::{crypto::hash::Hash, roothash::Namespace}, + storage::mkvs::{Prefix, WriteLog, MKVS}, +}; + +/// Read-write set validation error. +#[derive(Debug, Fail)] +enum ReadWriteSetValidationError { + #[fail(display = "predicted read/write set differs from actual")] + Misprediction, +} + /// A coarsened key prefix that represents any key that starts with /// this prefix. -#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, Default, Eq, Hash, PartialEq, Serialize, Deserialize)] pub struct CoarsenedKey(#[serde(with = "serde_bytes")] pub Vec); impl AsRef<[u8]> for CoarsenedKey { @@ -25,6 +45,13 @@ impl From> for CoarsenedKey { } } +impl Borrow<[u8]> for CoarsenedKey { + #[inline] + fn borrow(&self) -> &[u8] { + self.as_ref() + } +} + /// A set of coarsened keys. pub type CoarsenedSet = Vec; @@ -39,6 +66,125 @@ pub struct ReadWriteSet { pub write_set: CoarsenedSet, } +impl ReadWriteSet { + /// Returns true if this read/write set contains no elements. + pub fn is_empty(&self) -> bool { + self.granularity == 0 && self.read_set.is_empty() && self.write_set.is_empty() + } + + /// Turns this read-write set into a verifier. + pub fn into_verifier(self, mkvs: &mut dyn MKVS) -> Verifier { + Verifier::new(self, mkvs) + } +} + +/// A read/write set verifier makes sure that all MKVS queries conform to +/// the provided read-write set. +/// +/// To do this it wraps an existing MKVS implementation and intercepts all +/// read and write operations. The wrapped MKVS is always the one from the +/// current storage context. 
+pub struct Verifier<'a> { + mkvs: &'a mut dyn MKVS, + granularity: usize, + read_set: HashSet, + write_set: HashSet, + updates: HashMap, Option>>, + valid: bool, +} + +impl<'a> Verifier<'a> { + /// Create a new verifier. + pub fn new(rw_set: ReadWriteSet, mkvs: &'a mut dyn MKVS) -> Self { + Self { + mkvs, + granularity: rw_set.granularity as usize, + read_set: HashSet::from_iter(rw_set.read_set.into_iter()), + write_set: HashSet::from_iter(rw_set.write_set.into_iter()), + updates: HashMap::new(), + valid: true, + } + } + + /// Commit updates to the wrapped MKVS storage backends in case read + /// write set matched predictions. + /// + /// In case of mismatch this will return an error and not commit any + /// updates. + pub fn commit(self, ctx: Context) -> Fallible<()> { + if !self.valid { + return Err(ReadWriteSetValidationError::Misprediction.into()); + } + + // Apply updates to backing store if read-write set matches. + let ctx = ctx.freeze(); + for (key, value) in self.updates { + let ctx = Context::create_child(&ctx); + match value { + Some(value) => self.mkvs.insert(ctx, &key, &value), + None => self.mkvs.remove(ctx, &key), + }; + } + + Ok(()) + } + + fn check_read(&mut self, key: &[u8]) { + if !self.read_set.contains(&key[0..self.granularity]) { + self.valid = false; + } + } + + fn check_write(&mut self, key: &[u8]) { + if !self.write_set.contains(&key[0..self.granularity]) { + self.valid = false; + } + } +} + +impl<'a> MKVS for Verifier<'a> { + fn get(&mut self, ctx: Context, key: &[u8]) -> Option> { + self.check_read(key); + + // Check local update map first. 
+ if let Some(value) = self.updates.get(key) { + return value.clone(); + } + + self.mkvs.get(ctx, key) + } + + fn insert(&mut self, ctx: Context, key: &[u8], value: &[u8]) -> Option> { + self.check_write(key); + self.updates.insert(key.to_owned(), Some(value.to_owned())); + self.mkvs.insert(ctx, key, value) + } + + fn remove(&mut self, ctx: Context, key: &[u8]) -> Option> { + self.check_write(key); + self.updates.insert(key.to_owned(), None); + self.mkvs.remove(ctx, key) + } + + fn prefetch_prefixes(&self, ctx: Context, prefixes: &Vec, limit: u16) { + // Pass-through as it has no effect on either reads or writes. + self.mkvs.prefetch_prefixes(ctx, prefixes, limit) + } + + fn commit( + &mut self, + _ctx: Context, + _namespace: Namespace, + _round: u64, + ) -> Fallible<(WriteLog, Hash)> { + panic!("should not call commit on read/write set verification wrapper"); + } + + fn rollback(&mut self) { + panic!("should not call rollback on read/write set verification wrapper"); + } +} + #[cfg(test)] mod test { use crate::common::cbor; diff --git a/tests/clients/simple-keyvalue/src/main.rs b/tests/clients/simple-keyvalue/src/main.rs index 357b84a81df..edbe3b9b5c6 100644 --- a/tests/clients/simple-keyvalue/src/main.rs +++ b/tests/clients/simple-keyvalue/src/main.rs @@ -120,7 +120,7 @@ fn main() { // Test get_latest_block call. println!("Getting latest block..."); - let snapshot = rt + let mut snapshot = rt .block_on(kv_client.txn_client().get_latest_block()) .expect("get latest block snapshot"); println!("Retrieved block: {:?}", snapshot.block); diff --git a/tests/runtimes/simple-keyvalue/src/main.rs b/tests/runtimes/simple-keyvalue/src/main.rs index f466d87f7b2..14fc404e4cc 100644 --- a/tests/runtimes/simple-keyvalue/src/main.rs +++ b/tests/runtimes/simple-keyvalue/src/main.rs @@ -177,7 +177,7 @@ impl EncryptionContext { } /// Get encrypted MKVS entry. 
- pub fn get(&self, mkvs: &dyn MKVS, ctx: IoContext, key: &[u8]) -> Option> { + pub fn get(&self, mkvs: &mut dyn MKVS, ctx: IoContext, key: &[u8]) -> Option> { let key = self.derive_encrypted_key(key); let ciphertext = match mkvs.get(ctx, &key) { Some(ciphertext) => ciphertext,