diff --git a/clients/utils/Cargo.toml b/clients/utils/Cargo.toml index 03274535211..8641b3af4dd 100644 --- a/clients/utils/Cargo.toml +++ b/clients/utils/Cargo.toml @@ -14,15 +14,23 @@ threadpool = { version = "1.7.1", optional = true } time = { version = "0.1", optional = true } histogram = { version = "0.6.8", optional = true } grpcio = { git = "https://github.com/ekiden/grpc-rs", tag = "v0.3.0-ekiden2", features = ["openssl"] } +ekiden-consensus-base = { path = "../../consensus/base", version = "0.2.0-alpha" } +ekiden-consensus-client = { path = "../../consensus/client", version = "0.2.0-alpha" } ekiden-core = { path = "../../core/common", version = "0.2.0-alpha" } +ekiden-db-trusted = { path = "../../db/trusted", version = "0.2.0-alpha" } ekiden-di = { path = "../../di", version = "0.2.0-alpha" } ekiden-epochtime = { path = "../../epochtime", version = "0.2.0-alpha" } ekiden-ethereum = { path = "../../ethereum", version = "0.2.0-alpha" } ekiden-instrumentation-prometheus = { path = "../../instrumentation/prometheus", version = "0.2.0-alpha" } ekiden-scheduler-base = { path = "../../scheduler/base", version = "0.2.0-alpha" } ekiden-scheduler-client = { path = "../../scheduler/client", version = "0.2.0-alpha" } +ekiden-storage-base = { path = "../../storage/base", version = "0.2.0-alpha" } +ekiden-storage-frontend = { path = "../../storage/frontend", version = "0.2.0-alpha" } ekiden-registry-base = { path = "../../registry/base", version = "0.2.0-alpha" } ekiden-registry-client = { path = "../../registry/client", version = "0.2.0-alpha" } ekiden-rpc-client = { path = "../../rpc/client", version = "0.2.0-alpha" } pretty_env_logger = "0.2" log = "0.4" + +[dev-dependencies] +ekiden-storage-dummy = { path = "../../storage/dummy", version = "0.2.0-alpha" } diff --git a/clients/utils/src/components.rs b/clients/utils/src/components.rs index b4bd3db8e95..ae5f69e3cf9 100644 --- a/clients/utils/src/components.rs +++ b/clients/utils/src/components.rs @@ -1,4 +1,5 @@ //! DI components for use in clients. +use ekiden_consensus_client; use ekiden_core; use ekiden_di::{Component, KnownComponents}; use ekiden_epochtime; @@ -6,6 +7,7 @@ use ekiden_ethereum; use ekiden_instrumentation_prometheus; use ekiden_registry_client; use ekiden_scheduler_client; +use ekiden_storage_frontend; /// Register known components for dependency injection. pub fn register_components(known_components: &mut KnownComponents) { @@ -26,6 +28,10 @@ pub fn register_components(known_components: &mut KnownComponents) { ekiden_scheduler_client::SchedulerClient::register(known_components); // Entity registry. ekiden_registry_client::EntityRegistryClient::register(known_components); + // Consensus. + ekiden_consensus_client::ConsensusClient::register(known_components); + // Storage. + ekiden_storage_frontend::StorageClient::register(known_components); } /// Create known component registry. diff --git a/clients/utils/src/db.rs b/clients/utils/src/db.rs new file mode 100644 index 00000000000..3a95fc679c9 --- /dev/null +++ b/clients/utils/src/db.rs @@ -0,0 +1,249 @@ +//! Read-only database access with best-effort freshness. 
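+//!
+//! A `Manager` follows the consensus block stream for a contract and remembers the latest
+//! `state_root`; `get_snapshot()` then returns a read-only `Snapshot` pinned to that root,
+//! served through a `PatriciaTrie` over the configured storage mapper.
+//!
+//! A minimal usage sketch (assumes a DI `Container` and a `contract_id` supplied by the
+//! caller, so it is not compiled as a doc test):
+//!
+//! ```ignore
+//! let manager = Manager::new_from_injected(contract_id, &mut container)?;
+//! let snapshot = manager.get_snapshot();
+//! if let Some(value) = snapshot.get(b"some-key") {
+//!     println!("value: {:?}", value);
+//! }
+//! ```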
+
+use std::sync::Arc;
+use std::sync::Mutex;
+
+use ekiden_consensus_base::ConsensusBackend;
+use ekiden_core;
+use ekiden_core::bytes::B256;
+use ekiden_core::bytes::H256;
+use ekiden_core::environment::Environment;
+use ekiden_core::error::Result;
+use ekiden_core::futures::Future;
+use ekiden_core::futures::Stream;
+use ekiden_db_trusted::patricia_trie::PatriciaTrie;
+use ekiden_db_trusted::Database;
+use ekiden_di::Container;
+use ekiden_storage_base::BackendIdentityMapper;
+use ekiden_storage_base::StorageBackend;
+use ekiden_storage_base::StorageMapper;
+
+/// An implementation of the read methods of `Database`. Represents a single fixed state.
+pub struct Snapshot {
+    /// The root hash that identifies the state in this snapshot.
+    root_hash: Option<H256>,
+    /// This handles access to the database and holds on to the storage mapper reference.
+    trie: PatriciaTrie,
+}
+
+impl Database for Snapshot {
+    fn contains_key(&self, key: &[u8]) -> bool {
+        self.get(key).is_some()
+    }
+
+    fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
+        self.trie.get(self.root_hash, key)
+    }
+
+    fn insert(&mut self, _key: &[u8], _value: &[u8]) -> Option<Vec<u8>> {
+        panic!("Can't insert into Snapshot")
+    }
+
+    fn remove(&mut self, _key: &[u8]) -> Option<Vec<u8>> {
+        panic!("Can't remove from Snapshot")
+    }
+
+    fn clear(&mut self) {
+        panic!("Can't clear Snapshot")
+    }
+}
+
+/// A holder of (i) a consensus backend and (ii) a storage mapper, the two of which it uses to
+/// create `Snapshot`s of recent (best-effort) states on demand.
+pub struct Manager {
+    /// The latest root hash that we're aware of.
+    root_hash: Arc<Mutex<Option<H256>>>,
+    /// The storage mapper that we give to snapshots.
+    mapper: Arc<StorageMapper>,
+    /// For killing our consensus follower task.
+    blocks_kill_handle: ekiden_core::futures::KillHandle,
+}
+
+impl Manager {
+    pub fn new(
+        env: &Environment,
+        contract_id: B256,
+        consensus: &ConsensusBackend,
+        mapper: Arc<StorageMapper>,
+    ) -> Self {
+        let root_hash = Arc::new(Mutex::new(None));
+        let root_hash_2 = root_hash.clone();
+        let (watch_blocks, blocks_kill_handle) = ekiden_core::futures::killable(
+            consensus.get_blocks(contract_id).for_each(move |block| {
+                let mut guard = root_hash.lock().unwrap();
+                *guard = Some(block.header.state_root);
+                Ok(())
+            }),
+        );
+        env.spawn(Box::new(watch_blocks.then(|r| {
+            match r {
+                // Block stream ended.
+                Ok(Ok(())) => {
+                    warn!("manager block stream ended");
+                }
+                // Kill handle dropped.
+                Ok(Err(_ /* ekiden_core::futures::killable::Killed */)) => {}
+                // Block stream errored.
+                Err(e) => {
+                    error!("manager block stream error: {}", e);
+                }
+            }
+            Ok(())
+        })));
+        Self {
+            root_hash: root_hash_2,
+            mapper,
+            blocks_kill_handle,
+        }
+    }
+
+    /// Make a `Manager` from an injected `ConsensusBackend` and an identity map over an injected
+    /// `StorageBackend`.
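+    ///
+    /// This resolves an `Environment`, a `ConsensusBackend` and a `StorageBackend` from the DI
+    /// container and wraps the storage backend in a `BackendIdentityMapper` before delegating
+    /// to `new`.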
+ pub fn new_from_injected(contract_id: B256, container: &mut Container) -> Result { + let env: Arc = container.inject()?; + let consensus: Arc = container.inject()?; + let storage: Arc = container.inject()?; + let mapper = Arc::new(BackendIdentityMapper::new(storage)); + Ok(Self::new( + env.as_ref(), + contract_id, + consensus.as_ref(), + mapper, + )) + } + + pub fn get_snapshot(&self) -> Snapshot { + Snapshot { + root_hash: self.root_hash.lock().unwrap().clone(), + trie: PatriciaTrie::new(self.mapper.clone()), + } + } +} + +impl Drop for Manager { + fn drop(&mut self) { + self.blocks_kill_handle.kill(); + } +} + +#[cfg(test)] +mod tests { + use std; + use std::sync::Arc; + use std::sync::Mutex; + use std::time::Duration; + + extern crate grpcio; + + use ekiden_consensus_base::backend::ConsensusBackend; + use ekiden_consensus_base::backend::Event; + use ekiden_consensus_base::block::Block; + use ekiden_consensus_base::commitment::Commitment; + use ekiden_consensus_base::commitment::Reveal; + use ekiden_consensus_base::header::Header; + use ekiden_core; + use ekiden_core::bytes::B256; + use ekiden_core::environment::GrpcEnvironment; + use ekiden_core::futures::BoxFuture; + use ekiden_core::futures::BoxStream; + use ekiden_core::futures::Stream; + use ekiden_db_trusted::patricia_trie::PatriciaTrie; + use ekiden_db_trusted::Database; + use ekiden_storage_base::mapper::BackendIdentityMapper; + extern crate ekiden_storage_dummy; + use self::ekiden_storage_dummy::DummyStorageBackend; + + /// A ConsensusBackend that adapts a simple `Block` stream. + struct MockConsensus { + blocks_rx: Mutex>>, + } + + impl ConsensusBackend for MockConsensus { + fn get_blocks(&self, _contract_id: B256) -> BoxStream { + Box::new( + self.blocks_rx + .lock() + .unwrap() + .take() + .expect("MockConsensus only supports one block stream") + .map_err(|()| unimplemented!()), + ) + } + + fn get_events(&self, _contract_id: B256) -> BoxStream { + unimplemented!() + } + + fn commit(&self, _contract_id: B256, _commitment: Commitment) -> BoxFuture<()> { + unimplemented!() + } + + fn reveal(&self, _contract_id: B256, _reveal: Reveal) -> BoxFuture<()> { + unimplemented!() + } + + fn commit_many(&self, _contract_id: B256, _commitments: Vec) -> BoxFuture<()> { + unimplemented!() + } + + fn reveal_many(&self, _contract_id: B256, _reveals: Vec) -> BoxFuture<()> { + unimplemented!() + } + } + + #[test] + fn play() { + let grpc_environment = grpcio::EnvBuilder::new().build(); + let environment = Arc::new(GrpcEnvironment::new(grpc_environment)); + let contract_id = B256::from(*b"dummy contract------------------"); + let storage = Arc::new(DummyStorageBackend::new()); + let (blocks_tx, blocks_rx) = ekiden_core::futures::sync::mpsc::unbounded(); + let consensus = Arc::new(MockConsensus { + blocks_rx: Mutex::new(Some(blocks_rx)), + }); + let mapper = Arc::new(BackendIdentityMapper::new(storage)); + let trie = PatriciaTrie::new(mapper.clone()); + let manager = super::Manager::new( + environment.as_ref(), + contract_id, + consensus.as_ref(), + mapper, + ); + + let root_hash_before = trie.insert(None, b"changeme", b"before"); + blocks_tx + .unbounded_send(Block { + header: Header { + state_root: root_hash_before, + ..Default::default() + }, + ..Default::default() + }) + .unwrap(); + // Give the manager some time to pickup the new block. + std::thread::sleep(Duration::from_millis(1000)); + + // Check that a snapshot can read data. 
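+        // The snapshot is pinned to `root_hash_before`, so it keeps returning the old value
+        // even after newer blocks arrive (re-checked at the end of this test).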
+ let snapshot_before = manager.get_snapshot(); + assert_eq!(&snapshot_before.get(b"changeme").unwrap(), b"before"); + + let root_hash_after = trie.insert(Some(root_hash_before), b"changeme", b"after"); + blocks_tx + .unbounded_send(Block { + header: Header { + state_root: root_hash_after, + ..Default::default() + }, + ..Default::default() + }) + .unwrap(); + std::thread::sleep(Duration::from_millis(1000)); + + // Check that a new snapshot has new data. + let snapshot_after = manager.get_snapshot(); + assert_eq!(&snapshot_after.get(b"changeme").unwrap(), b"after"); + + // Check that the old snapshot is still consistent. + assert_eq!(&snapshot_before.get(b"changeme").unwrap(), b"before"); + } +} diff --git a/clients/utils/src/lib.rs b/clients/utils/src/lib.rs index 82fb67b0811..30448cd6193 100644 --- a/clients/utils/src/lib.rs +++ b/clients/utils/src/lib.rs @@ -1,5 +1,6 @@ #[cfg(feature = "benchmark")] extern crate histogram; +#[macro_use] extern crate log; extern crate pretty_env_logger; #[cfg(feature = "benchmark")] @@ -7,7 +8,10 @@ extern crate threadpool; #[cfg(feature = "benchmark")] extern crate time; +extern crate ekiden_consensus_base; +extern crate ekiden_consensus_client; extern crate ekiden_core; +extern crate ekiden_db_trusted; extern crate ekiden_di; extern crate ekiden_epochtime; extern crate ekiden_ethereum; @@ -16,6 +20,8 @@ extern crate ekiden_registry_base; extern crate ekiden_registry_client; extern crate ekiden_scheduler_base; extern crate ekiden_scheduler_client; +extern crate ekiden_storage_base; +extern crate ekiden_storage_frontend; #[cfg(feature = "benchmark")] pub mod benchmark; @@ -24,3 +30,5 @@ pub mod components; #[doc(hidden)] #[macro_use] pub mod macros; + +pub mod db; diff --git a/db/trusted/src/handle.rs b/db/trusted/src/handle.rs index 3656890e8aa..0bbcf5c8914 100644 --- a/db/trusted/src/handle.rs +++ b/db/trusted/src/handle.rs @@ -5,6 +5,7 @@ use std::sync::{Mutex, MutexGuard}; use ekiden_common::bytes::H256; use ekiden_common::error::Result; use ekiden_common::hash::empty_hash; +use ekiden_storage_base::mapper::BackendIdentityMapper; #[cfg(not(target_env = "sgx"))] use ekiden_storage_dummy::DummyStorageBackend; use ekiden_storage_lru::LruCacheStorageBackend; @@ -41,12 +42,14 @@ impl DatabaseHandle { let backend = Arc::new(DummyStorageBackend::new()); #[cfg(target_env = "sgx")] let backend = Arc::new(UntrustedStorageBackend::new()); + let cached_backend = Arc::new(LruCacheStorageBackend::new( + backend, + Self::STORAGE_CACHE_SIZE, + )); + let mapper = Arc::new(BackendIdentityMapper::new(cached_backend)); DatabaseHandle { - state: PatriciaTrie::new(Arc::new(LruCacheStorageBackend::new( - backend, - Self::STORAGE_CACHE_SIZE, - ))), + state: PatriciaTrie::new(mapper), root_hash: None, } } diff --git a/db/trusted/src/patricia_trie/trie.rs b/db/trusted/src/patricia_trie/trie.rs index b0e68da7d82..1e117d84a71 100644 --- a/db/trusted/src/patricia_trie/trie.rs +++ b/db/trusted/src/patricia_trie/trie.rs @@ -604,6 +604,7 @@ mod test { use self::test::Bencher; + use ekiden_storage_base::BackendIdentityMapper; use ekiden_storage_dummy::DummyStorageBackend; use super::*; @@ -611,7 +612,8 @@ mod test { #[test] fn test_basic_ops() { let storage = Arc::new(DummyStorageBackend::new()); - let tree = PatriciaTrie::new(storage); + let mapper = Arc::new(BackendIdentityMapper::new(storage)); + let tree = PatriciaTrie::new(mapper); assert_eq!(tree.get(None, b"foo"), None); let new_root = tree.insert(None, b"foo", b"bar"); @@ -683,7 +685,8 @@ mod test { #[test] fn test_feather() 
{ let storage = Arc::new(DummyStorageBackend::new()); - let tree = PatriciaTrie::new(storage); + let mapper = Arc::new(BackendIdentityMapper::new(storage)); + let tree = PatriciaTrie::new(mapper); let mut root_hash = None; let mut key_buf = *b"\x83gStateDbhaccountsx(aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; let val = b"don't embed don't embed don't embed don't embed don't embed"; @@ -702,7 +705,8 @@ mod test { #[bench] fn bench_feather(b: &mut Bencher) { let storage = Arc::new(DummyStorageBackend::new()); - let tree = PatriciaTrie::new(storage); + let mapper = Arc::new(BackendIdentityMapper::new(storage)); + let tree = PatriciaTrie::new(mapper); let mut root_hash = None; let mut key_buf = *b"\x83gStateDbhaccountsx(aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; let val = b"don't embed don't embed don't embed don't embed don't embed"; diff --git a/ethereum/Cargo.toml b/ethereum/Cargo.toml index 6a3de388688..696774c213f 100644 --- a/ethereum/Cargo.toml +++ b/ethereum/Cargo.toml @@ -15,6 +15,7 @@ constant_time_eq = "0.1.2" ekiden-beacon-base = { path = "../beacon/base", version = "0.2.0-alpha" } ekiden-common = { path = "../common", version = "0.2.0-alpha" } ekiden-epochtime = { path = "../epochtime", version = "0.2.0-alpha" } +ekiden-instrumentation = { path = "../instrumentation", version = "0.2.0-alpha" } ekiden-instrumentation-prometheus = { path = "../instrumentation/prometheus", version = "0.2.0-alpha" } ekiden-registry-base = { path = "../registry/base", version = "0.2.0-alpha" } ekiden-registry-dummy = { path = "../registry/dummy", version = "0.2.0-alpha" } diff --git a/ethereum/contracts/ContractDeployer.sol b/ethereum/contracts/ContractDeployer.sol index f2cdfcd04f6..ab7712325fd 100644 --- a/ethereum/contracts/ContractDeployer.sol +++ b/ethereum/contracts/ContractDeployer.sol @@ -13,11 +13,11 @@ contract ContractDeployer { address public oasis_contract_registry; address public mock_contract_registry; - constructor(address oasis_addr, address mock_addr) public { + constructor(address oasis_addr, address mock_addr, address stake_addr) public { oasis_beacon = new RandomBeacon(oasis_addr); mock_beacon = new RandomBeacon(mock_addr); - oasis_entity_registry = new EntityRegistry(oasis_addr); - mock_entity_registry = new EntityRegistry(mock_addr); + oasis_entity_registry = new EntityRegistry(oasis_addr, stake_addr); + mock_entity_registry = new EntityRegistry(mock_addr, stake_addr); oasis_contract_registry = new ContractRegistry(oasis_addr); mock_contract_registry = new ContractRegistry(mock_addr); } diff --git a/ethereum/contracts/DisputeResolution.sol b/ethereum/contracts/DisputeResolution.sol new file mode 100644 index 00000000000..09cc5175250 --- /dev/null +++ b/ethereum/contracts/DisputeResolution.sol @@ -0,0 +1,475 @@ +pragma solidity ^0.4.23; + +contract DisputeResolution { + // Signature prefix (for web3.eth.sign compatibility). + bytes constant SIGNATURE_PREFIX = "\x19Ethereum Signed Message:\n32"; + + // The event emitted when a dispute is in progress. + event OnDispute( + bytes32 _batch_hash + ); + + // The event emitted when a dispute is resolved. + event OnDisputeResolution( + bytes32 _batch_hash, + bytes32 _value + ); + + // The event emitted when a transition occurs. + event OnTransition( + uint64 _serial, + bytes32 _finalized_root_hash + ); + + // DisputeResolution contract state. + enum State { + Invalid, + Optimistic, + Dispute + } + + // A single node's role in a committee. + enum Role { + Invalid, + Worker, + Backup + } + + // A single node's reveal. 
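+    // `role` is fixed when the committee transitions; `value` holds the node's revealed value
+    // for the batch under dispute (or the transition digest during signature validation) and
+    // is cleared again by `_reset_reveals()`.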
+ struct Reveal { + Role role; + bytes32 value; + } + + // A per-epoch committee. + struct Committee { + // Committee members. + address[] workers; + address[] backups; + + // Revealed values for dispute/stake resolution. + mapping(address => Reveal) reveals; + + address dispute_sender; + bytes32 dispute_batch_hash; + + // Serial number for the finalized root hashes, starting from 0. + uint64 serial; + } + + // The contract owner. + address public owner; + + // The current state. + State public state; + + // The current committee. + Committee public committee; + + // Construct a new DisputeResolution contract instance. + constructor() public { + owner = msg.sender; + } + + function() public { + revert(); + } + + modifier onlyMembersOrOwner() { + require( + (state == State.Invalid && msg.sender == owner) || + _is_member(msg.sender) + ); + _; + } + + modifier onlyWorkers() { + require(_is_worker(msg.sender)); + _; + } + + modifier onlyBackups() { + require(_is_backup(msg.sender)); + _; + } + + // Transition to a new committee. + // + // XXX: This *should* support bootstrapping from any entity rather + // than being restricted to the contract owner for ease of testing. + function transition( + address[] _workers, + address[] _backups, + bytes32 _finalized_root_hash, + bytes32[] _signatures_r, + bytes32[] _signatures_s, + bytes _signatures_v + ) external onlyMembersOrOwner() { + require(_workers.length > 0); + require(_backups.length > 0); + + // If the contract has been initialized, validate the signatures + // with the current committee, otherwise accept the initial root + // hash and committee (bootstrapping). + if (state != State.Invalid) { + // For now, attempting to transition committees mid-dispute + // resolution is invalid. + // dispute resolution state. + require(state == State.Optimistic); + + bytes32 digest = derive_transition_digest( + committee.serial, + _workers, + _backups, + _finalized_root_hash + ); + + // WARNING: The validation obliterates the dispute state + // if successful. + _validate_transition_signatures( + digest, + _signatures_r, + _signatures_s, + _signatures_v + ); + } + + _reset(true); + + // XXX: Query the registry contract to see if the new nodes + // are actually members. + + // Store the new committee. + committee.workers.length = _workers.length; + committee.backups.length = _backups.length; + for (uint i = 0; i < _workers.length; i++) { + Reveal storage r = committee.reveals[_workers[i]]; + require(r.role == Role.Invalid); + r.role = Role.Worker; + + committee.workers[i] = _workers[i]; + } + for (i = 0; i < _backups.length; i++) { + r = committee.reveals[_backups[i]]; + require(r.role == Role.Invalid); + r.role = Role.Backup; + + committee.backups[i] = _backups[i]; + } + + state = State.Optimistic; + emit OnTransition(committee.serial, _finalized_root_hash); + committee.serial++; + } + + // Submit a dispute. + function dispute( + bytes32 _batch_hash, + bytes32[] _reveals, + bytes32[] _signatures_r, + bytes32[] _signatures_s, + bytes _signatures_v + ) external onlyWorkers { + require(state == State.Optimistic); + require( + _signatures_r.length == _signatures_s.length && + _signatures_s.length == _signatures_v.length && + _signatures_v.length == _reveals.length + ); + require(_reveals.length > 0); + + // Prepare the mapping. + _reset_reveals(committee.workers, false); + + // Handle the reveals. 
+ _handle_reveals( + Role.Worker, + _batch_hash, + _reveals, + _signatures_r, + _signatures_s, + _signatures_v + ); + + // Ensure that there either was at least one node missing or + // at least one mismatch. + if (_reveals.length == committee.workers.length) { + uint matches = 0; + for (uint i = 0; i < _reveals.length; i++) { + if (_reveals[i] == _reveals[0]) { + matches = matches + 1; + } + } + require(matches < committee.workers.length); + } + + state = State.Dispute; + committee.dispute_sender = msg.sender; + committee.dispute_batch_hash = _batch_hash; + emit OnDispute(_batch_hash); + } + + // Resolve a dispute. + function resolve_dispute( + bytes32[] _reveals, + bytes32[] _signatures_r, + bytes32[] _signatures_s, + bytes _signatures_v + ) external onlyBackups { + require(state == State.Dispute); + require( + _signatures_r.length == _signatures_s.length && + _signatures_s.length == _signatures_v.length && + _signatures_v.length == _reveals.length + ); + require(_reveals.length > 0); + + // Prepare the mapping. + _reset_reveals(committee.backups, false); + + // Handle the reveals. + _handle_reveals( + Role.Backup, + committee.dispute_batch_hash, + _reveals, + _signatures_r, + _signatures_s, + _signatures_v + ); + + _resolve_dispute(); + state = State.Optimistic; + _reset(false); + } + + // Derive the digest that each node will sign when making the + // `transition()` call. + // + // This can be called over web3, but if it is required to derive + // this programatically elsewhere the digest is: + // + // Keccak256( + // serial | + // uint32(len(workers)) | workers | + // uint32(len(backups)) | backups | + // finalized_root_state + // ) + function derive_transition_digest( + uint64 _serial, + address[] _workers, + address[] _backups, + bytes32 _finalized_root_hash + ) public pure returns (bytes32 digest_) { + uint32 nr_workers = uint32(_workers.length); + uint32 nr_backups = uint32(_backups.length); + + digest_ = keccak256(abi.encodePacked( + _serial, + nr_workers, + _workers, + nr_backups, + _backups, + _finalized_root_hash + )); + } + + // Derive the digest that each node will sign when doing the commit/ + // reveal, that is also used for the `dispute()` and `resolve()` calls. 
+ // + // This can be called over web3, but if it is required to derive + // this programatically elsewhere the digest is: + // + // Keccak256(serial | batch_hash | reveal) + function derive_reveal_digest( + uint64 _serial, + bytes32 _batch_hash, + bytes32 _reveal + ) public pure returns (bytes32 digest_) { + digest_ = keccak256(abi.encodePacked( + _serial, + _batch_hash, + _reveal + )); + } + + function _is_member(address _addr) internal view returns (bool ok_) { + ok_ = _is_worker(_addr) || _is_backup(_addr); + } + + function _is_worker(address _addr) internal view returns (bool ok_) { + require(state != State.Invalid); + require(committee.workers.length > 0); + + ok_ = false; + for (uint i = 0; i < committee.workers.length; i++) { + if (_addr == committee.workers[i]) { + ok_ = true; + return; + } + } + } + + function _is_backup(address _addr) internal view returns (bool ok_) { + require(state != State.Invalid); + require(committee.workers.length > 0); + + ok_ = false; + for (uint i = 0; i < committee.backups.length; i++) { + if (_addr == committee.backups[i]) { + ok_ = true; + return; + } + } + } + + function _reset(bool _clear_role) internal { + _reset_reveals(committee.workers, _clear_role); + _reset_reveals(committee.backups, _clear_role); + committee.dispute_batch_hash = bytes32(0); + committee.dispute_sender = address(0); + } + + function _reset_reveals(address[] _addrs, bool _clear_role) internal { + for (uint i = 0; i < _addrs.length; i++) { + Reveal storage r = committee.reveals[_addrs[i]]; + if (_clear_role) { + r.role = Role.Invalid; + } + r.value = bytes32(0); + } + } + + function _validate_transition_signatures( + bytes32 _digest, + bytes32[] _signatures_r, + bytes32[] _signatures_s, + bytes _signatures_v + ) internal { + require( + _signatures_r.length == _signatures_s.length && + _signatures_s.length == _signatures_v.length + ); + require(_signatures_r.length == committee.workers.length); + + // Prepare the mapping. + _reset_reveals(committee.workers, false); + + // Validate that every expected address has signed + // the digest. + for (uint i = 0; i < _signatures_r.length; i++) { + (bool ok, address addr) = _verify_split_signature( + _signatures_r[i], + _signatures_s[i], + uint8(_signatures_v[i]), + _digest + ); + + Reveal storage r = committee.reveals[addr]; + require(ok && r.role == Role.Worker); + require(r.value == 0); + + r.value = _digest; + } + + // Clean up. + _reset_reveals(committee.workers, false); + } + + function _handle_reveals( + Role _expected_role, + bytes32 _batch_hash, + bytes32[] _reveals, + bytes32[] _signatures_r, + bytes32[] _signatures_s, + bytes _signatures_v + ) internal { + // WARNING: Caller is responsible for providing well formed + // arguments. + for (uint i = 0; i < _reveals.length; i++) { + // HACK: This should "never" happen. + require(_reveals[i] != 0); + + bytes32 digest = derive_reveal_digest( + committee.serial, + _batch_hash, + _reveals[i] + ); + + (bool ok, address addr) = _verify_split_signature( + _signatures_r[i], + _signatures_s[i], + uint8(_signatures_v[i]), + digest + ); + + Reveal storage r = committee.reveals[addr]; + require(ok && r.role == _expected_role); + require(r.value == 0); + + r.value = _reveals[i]; + } + } + + function _resolve_dispute() internal { + require(state == State.Dispute); + + // Figure out the reveal value after resolving the discrepancy. + // + // Avaliable information: + // + // * `msg.sender` -> node that called `resolve()`. + // * `committee.dispute_sender` -> Node that called `dispute()`. 
+ // * `committee.reveals` -> Every reveal value submitted to the + // `dispute()` and `resolve()` calls. + // + // nb: The state transition will happily happen if at least 1 + // reveal is present in either. + // + // `revert()`ing from here will result in the `resolve()` call + // being reverted. + // + // TODO: This would be the logical location from which to + // resolve stake. + + // XXX: For now, require that every single backup has come to + // total consensus. + bytes32 expected_value; + for (uint i = 0; i < committee.backups.length; i++) { + Reveal storage r = committee.reveals[committee.backups[i]]; + require(r.role == Role.Backup); + if (i == 0) { + require(r.value != 0); + expected_value = r.value; + } else { + // Require total consensus. + require(r.value == expected_value); + } + } + + emit OnDisputeResolution(committee.dispute_batch_hash, expected_value); + } + + // Verify a signature over a 32 byte message, and recover/return the + // signer's address iff the signature is valid. + function _verify_split_signature( + bytes32 _r, + bytes32 _s, + uint8 _v, + bytes32 _message + ) internal pure returns (bool ok_, address addr_) { + bytes32 sig_hash = keccak256(abi.encodePacked( + SIGNATURE_PREFIX, + _message + )); + // web3.eth.sign and web3.accounts.sign disagree on how `v` + // should be set. See EIP-155. + if (_v < 27) { + _v += 27; + } + if (_v != 27 && _v != 28) { + return (false, 0); + } + + return (true, ecrecover(sig_hash, _v, _r, _s)); + } +} \ No newline at end of file diff --git a/ethereum/contracts/EntityRegistry.sol b/ethereum/contracts/EntityRegistry.sol index cbf8b177d20..ad8b87dec33 100644 --- a/ethereum/contracts/EntityRegistry.sol +++ b/ethereum/contracts/EntityRegistry.sol @@ -1,6 +1,7 @@ pragma solidity ^0.4.23; import "./EpochABI.sol"; +import "./Stake.sol"; contract EntityRegistry { event Entity(address indexed _from, bytes32 indexed id, uint64 epoch); @@ -8,27 +9,56 @@ contract EntityRegistry { event Node(address indexed _from, bytes32 indexed id, uint64 epoch); EpochContract epoch_source; + Stake stake_source; - constructor(address epoch_addr) public { + address[] public nodes; + // The reverse-map into nodes, to support efficient deletion. + // Note: stores (index+1), because nodes is 0-indexed, but 0 is special. + mapping(address => uint64) node_idxs; + + constructor(address epoch_addr, address stake_addr) public { epoch_source = EpochContract(epoch_addr); + stake_source = Stake(stake_addr); } function() public { revert(); } + function is_registered(address node) public view returns (bool member_) { + member_ = (node_idxs[node] != 0); + } + function register(bytes32 id) public { + // TODO: validate stake. (uint64 epoch, , ) = epoch_source.get_epoch(uint64(block.timestamp)); emit Entity(msg.sender, id, epoch); } function deregister(bytes32 id) public { (uint64 epoch, , ) = epoch_source.get_epoch(uint64(block.timestamp)); + + // TODO: require deregistration in an epoch after registration. + // TODO: require address has no stake in jeopardy at time of dereg. + uint64 offset = node_idxs[msg.sender]; + if (offset != 0) { + nodes[offset - 1] = nodes[nodes.length - 1]; + node_idxs[nodes[offset - 1]] = offset; + delete nodes[nodes.length - 1]; + nodes.length--; + node_idxs[msg.sender] = 0; + } emit Dereg(msg.sender, id, epoch); } function registerNode(bytes32 node) public { + // TODO: validate stake. 
(uint64 epoch, , ) = epoch_source.get_epoch(uint64(block.timestamp)); + + require(node_idxs[msg.sender] == 0); + require(nodes.length < 0xFFFFFFFFFFFFFFFF); + node_idxs[msg.sender] = uint64(nodes.push(msg.sender)); + emit Node(msg.sender, node, epoch); } } diff --git a/ethereum/migrations/2_deploy_contracts.js b/ethereum/migrations/2_deploy_contracts.js index e892fad6e42..5e72469f8fc 100644 --- a/ethereum/migrations/2_deploy_contracts.js +++ b/ethereum/migrations/2_deploy_contracts.js @@ -6,6 +6,7 @@ var ContractRegistry = artifacts.require("./ContractRegistry.sol") var EntityRegistry = artifacts.require("./EntityRegistry.sol"); var UintSet = artifacts.require("./UintSet.sol"); var Stake = artifacts.require("./Stake.sol"); +var DisputeResolution = artifacts.require("./DisputeResolution"); const deploy = async function (deployer, network) { if (network == "test") { @@ -15,17 +16,24 @@ const deploy = async function (deployer, network) { // The tests only use the standard epoch timesource anyway. await deployer.deploy([OasisEpoch, MockEpoch]); await deployer.deploy(RandomBeacon, OasisEpoch.address); - await deployer.deploy(ContractRegistry, OasisEpoch.address); - await deployer.deploy(EntityRegistry, OasisEpoch.address); await deployer.deploy(UintSet); await deployer.link(UintSet, Stake); await deployer.deploy(Stake, 1, "EkidenStake", "E$"); + await deployer.deploy(ContractRegistry, OasisEpoch.address); + await deployer.deploy(EntityRegistry, OasisEpoch.address, Stake.address); + await deployer.deploy(DisputeResolution); } else { // truffle does not really support deploying more than 1 instance // of a given contract all that well yet, so this uses a nasty kludge // to deploy the RandomBeacon for each time source. await deployer.deploy([OasisEpoch, MockEpoch]); - let instance = await deployer.deploy(ContractDeployer, OasisEpoch.address, MockEpoch.address); + + // Stake + await deployer.deploy(UintSet); + await deployer.link(UintSet, Stake); + await deployer.deploy(Stake, 1000000000, "EkidenStake", "E$"); + + let instance = await deployer.deploy(ContractDeployer, OasisEpoch.address, MockEpoch.address, Stake.address); let instance_addrs = await Promise.all([ instance.oasis_beacon.call(), instance.mock_beacon.call(), @@ -34,11 +42,8 @@ const deploy = async function (deployer, network) { instance.oasis_contract_registry.call(), instance.mock_contract_registry.call() ]); + await deployer.deploy(DisputeResolution); - // Stake - await deployer.deploy(UintSet); - await deployer.link(UintSet, Stake); - await deployer.deploy(Stake, 1000000000, "EkidenStake", "E$"); // Pass all the contract addresses to truffle_deploy in the rust // side as a simple JSON formatted dictionary. @@ -51,6 +56,7 @@ const deploy = async function (deployer, network) { "ContractRegistryMock": instance_addrs[5], "MockEpoch": MockEpoch.address, "Stake": Stake.address, + "DisputeResolution": DisputeResolution.address }; console.log("CONTRACT_ADDRESSES: " + JSON.stringify(addrs)); Object.keys(addrs).forEach(function (key) { diff --git a/ethereum/src/entity_registry.rs b/ethereum/src/entity_registry.rs index 208c89a7e31..bd2f0f60596 100644 --- a/ethereum/src/entity_registry.rs +++ b/ethereum/src/entity_registry.rs @@ -141,6 +141,7 @@ where let sender = sender.clone(); let (epoch, _) = notify; let current_block = beacon.get_block_for_epoch(epoch).unwrap(); + // TODO: replace catchup of all blocks with state of contract. 
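+            // Catch up by scanning logs from `last_block` up to (but not including) the block
+            // for the new epoch.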
let filter = web3::types::FilterBuilder::default() .from_block(BlockNumber::Number(last_block)) .to_block(BlockNumber::Number(current_block - 1)) @@ -197,6 +198,7 @@ where debug!("Log Received: {:?}", log); match format!("0x{:#x}", log.topics[0]).as_str() { ENTITY_EVENT_HASH => { + measure_counter_inc!("entity_log", 1); //let eth_address = log.topics[1]; let storage_hash = log.topics[2]; storage @@ -213,6 +215,7 @@ where .into_box() } DEREG_EVENT_HASH => { + measure_counter_inc!("entity_log", -1); let storage_hash = log.topics[2]; storage .get(bytes::H256(storage_hash.0)) @@ -226,6 +229,7 @@ where .into_box() } NODE_EVENT_HASH => { + measure_counter_inc!("node_log", 1); let storage_hash = log.topics[2]; storage .get(bytes::H256(storage_hash.0)) diff --git a/ethereum/src/lib.rs b/ethereum/src/lib.rs index c8cff2310f6..edc468ca7f6 100644 --- a/ethereum/src/lib.rs +++ b/ethereum/src/lib.rs @@ -8,6 +8,8 @@ extern crate ekiden_common; #[macro_use] extern crate ekiden_di; extern crate ekiden_epochtime; +#[macro_use] +extern crate ekiden_instrumentation; extern crate ekiden_registry_base; extern crate ekiden_registry_dummy; extern crate ekiden_stake_base; diff --git a/ethereum/test/dispute_resolution.js b/ethereum/test/dispute_resolution.js new file mode 100644 index 00000000000..c636b614ef2 --- /dev/null +++ b/ethereum/test/dispute_resolution.js @@ -0,0 +1,213 @@ +const DisputeResolution = artifacts.require("DisputeResolution"); +const crypto = require("crypto"); +const truffleAssert = require("truffle-assertions"); +const util = require("util"); + +contract("Dispute Resolution test", async (accounts) => { + let empty_addr = "0x0000000000000000000000000000000000000000"; + let empty_hash = "0x0000000000000000000000000000000000000000000000000000000000000000"; + + let owner = accounts[0]; + var workers = accounts.slice(1, 5); + var backups = accounts.slice(6, 10); + + var serial = 0; + var batch_hash; + + function ethSign(addr, hash) { + let sig = web3.eth.sign(addr, hash); + let r = sig.substr(0, 66); + let s = "0x" + sig.substr(66, 64); + let v = sig.substr(130, 2); + return [r, s, v]; + } + + function toHexString(b) { + return "0x" + Buffer.from(b).toString("hex"); + } + + function randomBytes(size) { + return toHexString(crypto.randomBytes(size)); + } + + function assertTransitionOk(state, committee) { + assert.equal(state, 1, "unexpected state"); + assert.equal(committee[0], empty_addr, "invalid dispute_sender"); + assert.equal(committee[1], empty_hash, "invalid dispute_batch_hash"); + assert.equal(committee[2], serial + 1, "invalid serial"); + } + + // Bootstrap committee. + it("should accept a new committee from the owner", async () => { + let instance = await DisputeResolution.deployed(); + + // Generate a random initial root hash. + let root_hash = randomBytes(32); + + let res = await instance.transition( + workers, + backups, + root_hash, + [], [], "", // No committee, no signatures. + { from: owner } + ); + + let state = await instance.state.call(); + let committee = await instance.committee.call(); + assertTransitionOk(state, committee); + + truffleAssert.eventEmitted(res, "OnTransition", (ev) => { + return ev._serial == serial && ev._finalized_root_hash == root_hash; + }); + + serial = serial + 1; + }) + + // Optimistic transition. + it("should transition in the optimisic path", async () => { + let instance = await DisputeResolution.deployed(); + + // Generate the next root hash. + let root_hash = randomBytes(32); + + // "Generate" the new committee. 
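+        // For this test, simply swap roles: the current backups become the new workers and
+        // vice versa, with the outgoing workers signing off on the transition.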
+ let new_workers = backups; + let new_backups = workers; + + // Derive the digest that each worker would, to sign for the + // transition. + let digest = await instance.derive_transition_digest.call( + serial, + new_workers, + new_backups, + root_hash + ); + + // Sign the digest with each worker. + var sigs_r = []; + var sigs_s = []; + var sigs_v = "0x"; + for (let addr of workers) { + let sig = ethSign(addr, digest); + sigs_r.push(sig[0]); + sigs_s.push(sig[1]); + sigs_v += sig[2]; + } + + let res = await instance.transition( + new_workers, + new_backups, + root_hash, + sigs_r, + sigs_s, + sigs_v, + { from: workers[0] } + ); + + let state = await instance.state.call(); + let committee = await instance.committee.call(); + assertTransitionOk(state, committee); + + truffleAssert.eventEmitted(res, "OnTransition", (ev) => { + return ev._serial == serial && ev._finalized_root_hash == root_hash; + }); + + serial = serial + 1; + workers = new_workers; + backups = new_backups; + }); + + // Dispute. + it("should accept a dispute from a worker", async () => { + let instance = await DisputeResolution.deployed(); + + // Generate the batch hash for this dispute. + batch_hash = randomBytes(32); + + var reveals = []; + var sigs_r = []; + var sigs_s = []; + var sigs_v = "0x"; + for (let addr of workers) { + let reveal = randomBytes(32); + let digest = await instance.derive_reveal_digest.call( + serial, + batch_hash, + reveal + ); + reveals.push(reveal); + + let sig = ethSign(addr, digest); + sigs_r.push(sig[0]); + sigs_s.push(sig[1]); + sigs_v += sig[2]; + } + + let res = await instance.dispute( + batch_hash, + reveals, + sigs_r, + sigs_s, + sigs_v, + { from: workers[1] } + ); + + let state = await instance.state.call(); + assert.equal(state, 2, "unexpected state"); + let committee = await instance.committee.call(); + assert.equal(committee[0], workers[1], "invalid dispute_sender"); + assert.equal(committee[1], batch_hash, "invalid dispute_batch_hash"); + + truffleAssert.eventEmitted(res, "OnDispute", (ev) => { + return ev._batch_hash == batch_hash; + }); + }); + + // Resolve. + it("should accept a resolution from a backup", async () => { + let instance = await DisputeResolution.deployed(); + + // Generate the "correct" reveal, current contract requires + // total consensus amongst backups. + let reveal = randomBytes(32); + let digest = await instance.derive_reveal_digest.call( + serial, + batch_hash, + reveal + ); + + var reveals = []; + var sigs_r = []; + var sigs_s = []; + var sigs_v = "0x"; + for (let addr of backups) { + reveals.push(reveal); + + let sig = ethSign(addr, digest); + sigs_r.push(sig[0]); + sigs_s.push(sig[1]); + sigs_v += sig[2]; + } + + let res = await instance.resolve_dispute( + reveals, + sigs_r, + sigs_s, + sigs_v, + { from: backups[0] } + ); + + truffleAssert.eventEmitted(res, "OnDisputeResolution", (ev) => { + return ev._batch_hash == batch_hash && ev._value == reveal; + }) + + // Ensure that the contract is back to the optimistic path. + let state = await instance.state.call(); + assert.equal(state, 1, "unexpected state"); + let committee = await instance.committee.call(); + assert.equal(committee[0], empty_addr, "invalid dispute_sender"); + assert.equal(committee[1], empty_hash, "invalid dispute_batch_hash"); + }); + + // TODO: Test all the various failure cases. 
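+    // For example: transition() from a non-member, dispute() from a non-worker,
+    // resolve_dispute() from a non-backup, mismatched signature array lengths, and
+    // backups that fail to reach total consensus.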
+}) \ No newline at end of file diff --git a/instrumentation/prometheus/src/di.rs b/instrumentation/prometheus/src/di.rs index 17d7c0c33de..da122cd5cf4 100644 --- a/instrumentation/prometheus/src/di.rs +++ b/instrumentation/prometheus/src/di.rs @@ -33,7 +33,13 @@ create_component!( let interval = value_t!(args, "prometheus-push-interval", u64).unwrap_or(5); let job = value_t!(args, "prometheus-push-job-name", String).unwrap(); let instance = value_t!(args, "prometheus-push-instance-label", String).unwrap(); - push::start(environment, address, Duration::from_secs(interval), job, instance); + push::start( + environment, + address, + Duration::from_secs(interval), + job, + instance, + ); } } _ => (), diff --git a/storage/base/src/mapper.rs b/storage/base/src/mapper.rs index 83344666c8e..dc12f7fae72 100644 --- a/storage/base/src/mapper.rs +++ b/storage/base/src/mapper.rs @@ -1,4 +1,6 @@ //! Storage mapper interface. +use std::sync::Arc; + use ekiden_common::bytes::H256; use ekiden_common::error::Result; use ekiden_common::futures::{future, BoxFuture, Future}; @@ -62,9 +64,20 @@ pub trait StorageMapper: Sync + Send { } } -// Each storage backend trivially implements the storage mapper interface. -impl StorageMapper for T { +/// An "identity" mapper, which forwards operations to a shared storage backend without altering +/// the content. +pub struct BackendIdentityMapper { + backend: Arc, +} + +impl BackendIdentityMapper { + pub fn new(backend: Arc) -> Self { + Self { backend } + } +} + +impl StorageMapper for BackendIdentityMapper { fn backend(&self) -> &StorageBackend { - self + self.backend.as_ref() } } diff --git a/storage/dummy/tests/backend.rs b/storage/dummy/tests/backend.rs index 339d377cef4..c4365330781 100644 --- a/storage/dummy/tests/backend.rs +++ b/storage/dummy/tests/backend.rs @@ -27,16 +27,18 @@ fn test_dummy_backend() { #[test] fn test_dummy_storage_mapper() { + use std::sync::Arc; // Import the StorageMapper trait to use the mapper get/insert. - use ekiden_storage_base::StorageMapper; + use ekiden_storage_base::{BackendIdentityMapper, StorageMapper}; - let backend = DummyStorageBackend::new(); + let backend = Arc::new(DummyStorageBackend::new()); + let mapper = BackendIdentityMapper::new(backend); let key = hash_storage_key(b"value"); - assert!(backend.get(key).wait().is_err()); - let key_result = backend.insert(b"value".to_vec(), 10).wait().unwrap(); + assert!(mapper.get(key).wait().is_err()); + let key_result = mapper.insert(b"value".to_vec(), 10).wait().unwrap(); assert_eq!(key, key_result); - assert_eq!(backend.get(key).wait(), Ok(b"value".to_vec())); + assert_eq!(mapper.get(key).wait(), Ok(b"value".to_vec())); } #[bench]