Skip to content

Commit

Permalink
Keep all enacted blocks notify in order (openethereum#8524)
Browse files Browse the repository at this point in the history
* Keep all enacted blocks notify in order

* Collect is unnecessary

* Update ChainNotify to use ChainRouteType

* Fix all ethcore fn defs

* Wrap the type within ChainRoute

* Fix private-tx and sync api

* Fix secret_store API

* Fix updater API

* Fix rpc api

* Fix informant api

* Eagerly cache enacted/retracted and remove contain_enacted/retracted

* Fix indent

* tests: should use full expr form for struct constructor

* Use into_enacted_retracted to further avoid copy

* typo: not a function

* rpc/tests: ChainRoute -> ChainRoute::new
  • Loading branch information
sorpaas authored and VladLupashevskyi committed May 23, 2018
1 parent f196dc4 commit e952e0f
Show file tree
Hide file tree
Showing 15 changed files with 158 additions and 98 deletions.
4 changes: 2 additions & 2 deletions ethcore/private-tx/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ use ethcore::executed::{Executed};
use transaction::{SignedTransaction, Transaction, Action, UnverifiedTransaction};
use ethcore::{contract_address as ethcore_contract_address};
use ethcore::client::{
Client, ChainNotify, ChainMessageType, ClientIoMessage, BlockId, CallContract
Client, ChainNotify, ChainRoute, ChainMessageType, ClientIoMessage, BlockId, CallContract
};
use ethcore::account_provider::AccountProvider;
use ethcore::miner::{self, Miner, MinerService};
Expand Down Expand Up @@ -668,7 +668,7 @@ fn find_account_password(passwords: &Vec<String>, account_provider: &AccountProv
}

impl ChainNotify for Provider {
fn new_blocks(&self, imported: Vec<H256>, _invalid: Vec<H256>, _enacted: Vec<H256>, _retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: Duration) {
fn new_blocks(&self, imported: Vec<H256>, _invalid: Vec<H256>, _route: ChainRoute, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: Duration) {
if !imported.is_empty() {
trace!("New blocks imported, try to prune the queue");
if let Err(err) = self.process_queue() {
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/blockchain/import_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use ethereum_types::H256;
use blockchain::block_info::{BlockInfo, BlockLocation};

/// Import route for newly inserted block.
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct ImportRoute {
/// Blocks that were invalidated by new block.
pub retracted: Vec<H256>,
Expand Down
88 changes: 86 additions & 2 deletions ethcore/src/client/chain_notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
use bytes::Bytes;
use ethereum_types::H256;
use transaction::UnverifiedTransaction;
use blockchain::ImportRoute;
use std::time::Duration;
use std::collections::HashMap;

/// Messages to broadcast via chain
pub enum ChainMessageType {
Expand All @@ -29,15 +31,97 @@ pub enum ChainMessageType {
SignedPrivateTransaction(Vec<u8>),
}

/// Route type to indicate whether it is enacted or retracted.
#[derive(Clone)]
pub enum ChainRouteType {
/// Enacted block
Enacted,
/// Retracted block
Retracted
}

/// A complete chain enacted retracted route.
#[derive(Default, Clone)]
pub struct ChainRoute {
route: Vec<(H256, ChainRouteType)>,
enacted: Vec<H256>,
retracted: Vec<H256>,
}

impl<'a> From<&'a [ImportRoute]> for ChainRoute {
fn from(import_results: &'a [ImportRoute]) -> ChainRoute {
ChainRoute::new(import_results.iter().flat_map(|route| {
route.retracted.iter().map(|h| (*h, ChainRouteType::Retracted))
.chain(route.enacted.iter().map(|h| (*h, ChainRouteType::Enacted)))
}).collect())
}
}

impl ChainRoute {
/// Create a new ChainRoute based on block hash and route type pairs.
pub fn new(route: Vec<(H256, ChainRouteType)>) -> Self {
let (enacted, retracted) = Self::to_enacted_retracted(&route);

Self { route, enacted, retracted }
}

/// Gather all non-duplicate enacted and retracted blocks.
fn to_enacted_retracted(route: &[(H256, ChainRouteType)]) -> (Vec<H256>, Vec<H256>) {
fn map_to_vec(map: Vec<(H256, bool)>) -> Vec<H256> {
map.into_iter().map(|(k, _v)| k).collect()
}

// Because we are doing multiple inserts some of the blocks that were enacted in import `k`
// could be retracted in import `k+1`. This is why to understand if after all inserts
// the block is enacted or retracted we iterate over all routes and at the end final state
// will be in the hashmap
let map = route.iter().fold(HashMap::new(), |mut map, route| {
match &route.1 {
&ChainRouteType::Enacted => {
map.insert(route.0, true);
},
&ChainRouteType::Retracted => {
map.insert(route.0, false);
},
}
map
});

// Split to enacted retracted (using hashmap value)
let (enacted, retracted) = map.into_iter().partition(|&(_k, v)| v);
// And convert tuples to keys
(map_to_vec(enacted), map_to_vec(retracted))
}

/// Consume route and return the enacted retracted form.
pub fn into_enacted_retracted(self) -> (Vec<H256>, Vec<H256>) {
(self.enacted, self.retracted)
}

/// All non-duplicate enacted blocks.
pub fn enacted(&self) -> &[H256] {
&self.enacted
}

/// All non-duplicate retracted blocks.
pub fn retracted(&self) -> &[H256] {
&self.retracted
}

/// All blocks in the route.
pub fn route(&self) -> &[(H256, ChainRouteType)] {
&self.route
}
}

/// Represents what has to be handled by actor listening to chain events
pub trait ChainNotify : Send + Sync {
/// fires when chain has new blocks.
fn new_blocks(
&self,
_imported: Vec<H256>,
_invalid: Vec<H256>,
_enacted: Vec<H256>,
_retracted: Vec<H256>,
_route: ChainRoute,
_sealed: Vec<H256>,
// Block bytes.
_proposed: Vec<Bytes>,
Expand Down
49 changes: 10 additions & 39 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use std::collections::{HashSet, HashMap, BTreeMap, BTreeSet, VecDeque};
use std::collections::{HashSet, BTreeMap, BTreeSet, VecDeque};
use std::str::FromStr;
use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
Expand Down Expand Up @@ -45,7 +45,7 @@ use client::{
use client::{
BlockId, TransactionId, UncleId, TraceId, ClientConfig, BlockChainClient,
TraceFilter, CallAnalytics, BlockImportError, Mode,
ChainNotify, PruningInfo, ProvingBlockChainClient, EngineInfo, ChainMessageType
ChainNotify, ChainRoute, PruningInfo, ProvingBlockChainClient, EngineInfo, ChainMessageType
};
use encoded;
use engines::{EthEngine, EpochTransition};
Expand Down Expand Up @@ -245,32 +245,6 @@ impl Importer {
})
}

fn calculate_enacted_retracted(&self, import_results: &[ImportRoute]) -> (Vec<H256>, Vec<H256>) {
fn map_to_vec(map: Vec<(H256, bool)>) -> Vec<H256> {
map.into_iter().map(|(k, _v)| k).collect()
}

// In ImportRoute we get all the blocks that have been enacted and retracted by single insert.
// Because we are doing multiple inserts some of the blocks that were enacted in import `k`
// could be retracted in import `k+1`. This is why to understand if after all inserts
// the block is enacted or retracted we iterate over all routes and at the end final state
// will be in the hashmap
let map = import_results.iter().fold(HashMap::new(), |mut map, route| {
for hash in &route.enacted {
map.insert(hash.clone(), true);
}
for hash in &route.retracted {
map.insert(hash.clone(), false);
}
map
});

// Split to enacted retracted (using hashmap value)
let (enacted, retracted) = map.into_iter().partition(|&(_k, v)| v);
// And convert tuples to keys
(map_to_vec(enacted), map_to_vec(retracted))
}

/// This is triggered by a message coming from a block queue when the block is ready for insertion
pub fn import_verified_blocks(&self, client: &Client) -> usize {

Expand Down Expand Up @@ -336,18 +310,17 @@ impl Importer {

{
if !imported_blocks.is_empty() && is_empty {
let (enacted, retracted) = self.calculate_enacted_retracted(&import_results);
let route = ChainRoute::from(import_results.as_ref());

if is_empty {
self.miner.chain_new_blocks(client, &imported_blocks, &invalid_blocks, &enacted, &retracted, false);
self.miner.chain_new_blocks(client, &imported_blocks, &invalid_blocks, route.enacted(), route.retracted(), false);
}

client.notify(|notify| {
notify.new_blocks(
imported_blocks.clone(),
invalid_blocks.clone(),
enacted.clone(),
retracted.clone(),
route.clone(),
Vec::new(),
proposed_blocks.clone(),
duration,
Expand Down Expand Up @@ -1421,7 +1394,7 @@ impl ImportBlock for Client {
}

fn import_block_with_receipts(&self, block_bytes: Bytes, receipts_bytes: Bytes) -> Result<H256, BlockImportError> {
let header: Header = ::rlp::Rlp::new(&block_bytes).val_at(0)?;
let header: Header = ::rlp::Rlp::new(&block_bytes).val_at(0)?;
{
// check block order
if self.chain.read().is_known(&header.hash()) {
Expand Down Expand Up @@ -2155,14 +2128,13 @@ impl ImportSealedBlock for Client {
self.state_db.write().sync_cache(&route.enacted, &route.retracted, false);
route
};
let (enacted, retracted) = self.importer.calculate_enacted_retracted(&[route]);
self.importer.miner.chain_new_blocks(self, &[h.clone()], &[], &enacted, &retracted, true);
let route = ChainRoute::from([route].as_ref());
self.importer.miner.chain_new_blocks(self, &[h.clone()], &[], route.enacted(), route.retracted(), true);
self.notify(|notify| {
notify.new_blocks(
vec![h.clone()],
vec![],
enacted.clone(),
retracted.clone(),
route.clone(),
vec![h.clone()],
vec![],
start.elapsed(),
Expand All @@ -2180,8 +2152,7 @@ impl BroadcastProposalBlock for Client {
notify.new_blocks(
vec![],
vec![],
vec![],
vec![],
ChainRoute::default(),
vec![],
vec![block.rlp_bytes()],
DURATION_ZERO,
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub use self::error::Error;
pub use self::evm_test_client::{EvmTestClient, EvmTestError, TransactResult};
pub use self::io_message::ClientIoMessage;
pub use self::test_client::{TestBlockChainClient, EachBlockWith};
pub use self::chain_notify::{ChainNotify, ChainMessageType};
pub use self::chain_notify::{ChainNotify, ChainRoute, ChainRouteType, ChainMessageType};
pub use self::traits::{
Nonce, Balance, ChainInfo, BlockInfo, ReopenBlock, PrepareOpenBlock, CallContract, TransactionInfo, RegistryInfo, ScheduleInfo, ImportSealedBlock, BroadcastProposalBlock, ImportBlock,
StateOrBlock, StateClient, Call, EngineInfo, AccountData, BlockChain, BlockProducer, SealedBlockImporter
Expand Down
10 changes: 4 additions & 6 deletions ethcore/src/snapshot/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! Watcher for snapshot-related chain events.

use parking_lot::Mutex;
use client::{BlockInfo, Client, ChainNotify, ClientIoMessage};
use client::{BlockInfo, Client, ChainNotify, ChainRoute, ClientIoMessage};
use ids::BlockId;

use io::IoChannel;
Expand Down Expand Up @@ -103,8 +103,7 @@ impl ChainNotify for Watcher {
&self,
imported: Vec<H256>,
_: Vec<H256>,
_: Vec<H256>,
_: Vec<H256>,
_: ChainRoute,
_: Vec<H256>,
_: Vec<Bytes>,
_duration: Duration)
Expand All @@ -131,7 +130,7 @@ impl ChainNotify for Watcher {
mod tests {
use super::{Broadcast, Oracle, Watcher};

use client::ChainNotify;
use client::{ChainNotify, ChainRoute};

use ethereum_types::{H256, U256};

Expand Down Expand Up @@ -174,8 +173,7 @@ mod tests {
watcher.new_blocks(
hashes,
vec![],
vec![],
vec![],
ChainRoute::default(),
vec![],
vec![],
DURATION_ZERO,
Expand Down
9 changes: 4 additions & 5 deletions ethcore/sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use network::{NetworkProtocolHandler, NetworkContext, HostInfo, PeerId, Protocol
use ethereum_types::{H256, H512, U256};
use io::{TimerToken};
use ethcore::ethstore::ethkey::Secret;
use ethcore::client::{BlockChainClient, ChainNotify, ChainMessageType};
use ethcore::client::{BlockChainClient, ChainNotify, ChainRoute, ChainMessageType};
use ethcore::snapshot::SnapshotService;
use ethcore::header::BlockNumber;
use sync_io::NetSyncIo;
Expand Down Expand Up @@ -409,8 +409,7 @@ impl ChainNotify for EthSync {
fn new_blocks(&self,
imported: Vec<H256>,
invalid: Vec<H256>,
enacted: Vec<H256>,
retracted: Vec<H256>,
route: ChainRoute,
sealed: Vec<H256>,
proposed: Vec<Bytes>,
_duration: Duration)
Expand All @@ -424,8 +423,8 @@ impl ChainNotify for EthSync {
&mut sync_io,
&imported,
&invalid,
&enacted,
&retracted,
route.enacted(),
route.retracted(),
&sealed,
&proposed);
});
Expand Down
7 changes: 4 additions & 3 deletions ethcore/sync/src/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use bytes::Bytes;
use network::{self, PeerId, ProtocolId, PacketId, SessionInfo};
use tests::snapshot::*;
use ethcore::client::{TestBlockChainClient, BlockChainClient, Client as EthcoreClient,
ClientConfig, ChainNotify, ChainMessageType, ClientIoMessage};
ClientConfig, ChainNotify, ChainRoute, ChainMessageType, ClientIoMessage};
use ethcore::header::BlockNumber;
use ethcore::snapshot::SnapshotService;
use ethcore::spec::Spec;
Expand Down Expand Up @@ -535,12 +535,13 @@ impl ChainNotify for EthPeer<EthcoreClient> {
fn new_blocks(&self,
imported: Vec<H256>,
invalid: Vec<H256>,
enacted: Vec<H256>,
retracted: Vec<H256>,
route: ChainRoute,
sealed: Vec<H256>,
proposed: Vec<Bytes>,
_duration: Duration)
{
let (enacted, retracted) = route.into_enacted_retracted();

self.new_blocks_queue.write().push_back(NewBlockMessage {
imported,
invalid,
Expand Down
4 changes: 2 additions & 2 deletions parity/informant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::time::{Instant, Duration};
use atty;
use ethcore::client::{
BlockId, BlockChainClient, ChainInfo, BlockInfo, BlockChainInfo,
BlockQueueInfo, ChainNotify, ClientReport, Client, ClientIoMessage
BlockQueueInfo, ChainNotify, ChainRoute, ClientReport, Client, ClientIoMessage
};
use ethcore::header::BlockNumber;
use ethcore::snapshot::{RestorationStatus, SnapshotService as SS};
Expand Down Expand Up @@ -351,7 +351,7 @@ impl<T: InformantData> Informant<T> {
}

impl ChainNotify for Informant<FullNodeInformantData> {
fn new_blocks(&self, imported: Vec<H256>, _invalid: Vec<H256>, _enacted: Vec<H256>, _retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, duration: Duration) {
fn new_blocks(&self, imported: Vec<H256>, _invalid: Vec<H256>, _route: ChainRoute, _sealed: Vec<H256>, _proposed: Vec<Bytes>, duration: Duration) {
let mut last_import = self.last_import.lock();
let client = &self.target.client;

Expand Down
Loading

0 comments on commit e952e0f

Please sign in to comment.