Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

[beta] Backports #8785

Merged
merged 13 commits into from
Jun 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,23 @@ windows:
paths:
- parity.zip
name: "x86_64-pc-windows-msvc_parity"
android-armv7:
stage: build
image: parity/parity-android:latest
only:
- beta
- tags
- stable
- triggers
script:
- cargo build --target=armv7-linux-androideabi
tags:
- rust-arm
allow_failure: true
artifacts:
paths:
- parity.zip
name: "armv7-linux-androideabi_parity"
docker-build:
stage: build
only:
Expand Down
195 changes: 98 additions & 97 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion ethcore/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use sync::PrivateTxHandler;
use ethcore::client::{Client, ClientConfig, ChainNotify, ClientIoMessage};
use ethcore::miner::Miner;
use ethcore::snapshot::service::{Service as SnapshotService, ServiceParams as SnapServiceParams};
use ethcore::snapshot::{RestorationStatus};
use ethcore::snapshot::{SnapshotService as _SnapshotService, RestorationStatus};
use ethcore::spec::Spec;
use ethcore::account_provider::AccountProvider;

Expand Down Expand Up @@ -168,6 +168,11 @@ impl ClientService {

/// Get a handle to the database.
pub fn db(&self) -> Arc<KeyValueDB> { self.database.clone() }

/// Shutdown the Client Service
pub fn shutdown(&self) {
self.snapshot.shutdown();
}
}

/// IO interface for the Client handler
Expand Down
64 changes: 40 additions & 24 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::collections::{HashSet, HashMap, BTreeMap, BTreeSet, VecDeque};
use std::fmt;
use std::str::FromStr;
use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use std::time::{Instant};

// util
Expand Down Expand Up @@ -208,6 +208,8 @@ pub struct Client {
queue_transactions: IoChannelQueue,
/// Ancient blocks import queue
queue_ancient_blocks: IoChannelQueue,
/// Hashes of pending ancient block wainting to be included
pending_ancient_blocks: RwLock<HashSet<H256>>,
/// Consensus messages import queue
queue_consensus_message: IoChannelQueue,

Expand Down Expand Up @@ -461,6 +463,7 @@ impl Importer {
let hash = header.hash();
let _import_lock = self.import_lock.lock();

trace!(target: "client", "Trying to import old block #{}", header.number());
{
trace_time!("import_old_block");
// verify the block, passing the chain for updating the epoch verifier.
Expand Down Expand Up @@ -741,6 +744,7 @@ impl Client {
notify: RwLock::new(Vec::new()),
queue_transactions: IoChannelQueue::new(MAX_TX_QUEUE_SIZE),
queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE),
pending_ancient_blocks: RwLock::new(HashSet::new()),
queue_consensus_message: IoChannelQueue::new(usize::max_value()),
last_hashes: RwLock::new(VecDeque::new()),
factories: factories,
Expand Down Expand Up @@ -1972,7 +1976,7 @@ impl BlockChainClient for Client {
impl IoClient for Client {
fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize) {
let len = transactions.len();
self.queue_transactions.queue(&mut self.io_channel.lock(), len, move |client| {
self.queue_transactions.queue(&mut self.io_channel.lock(), move |client| {
trace_time!("import_queued_transactions");

let txs: Vec<UnverifiedTransaction> = transactions
Expand All @@ -1996,23 +2000,32 @@ impl IoClient for Client {

{
// check block order
if self.chain.read().is_known(&header.hash()) {
if self.chain.read().is_known(&hash) {
bail!(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain));
}
let status = self.block_status(BlockId::Hash(*header.parent_hash()));
if status == BlockStatus::Unknown || status == BlockStatus::Pending {
bail!(BlockImportErrorKind::Block(BlockError::UnknownParent(*header.parent_hash())));

let parent_hash = *header.parent_hash();
let parent_pending = self.pending_ancient_blocks.read().contains(&parent_hash);
let status = self.block_status(BlockId::Hash(parent_hash));
if !parent_pending && (status == BlockStatus::Unknown || status == BlockStatus::Pending) {
bail!(BlockImportErrorKind::Block(BlockError::UnknownParent(parent_hash)));
}
}

match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), 1, move |client| {
client.importer.import_old_block(
self.pending_ancient_blocks.write().insert(hash);

trace!(target: "client", "Queuing old block #{}", header.number());
match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), move |client| {
let result = client.importer.import_old_block(
&header,
&block_bytes,
&receipts_bytes,
&**client.db.read(),
&*client.chain.read()
).map(|_| ()).unwrap_or_else(|e| {
);

client.pending_ancient_blocks.write().remove(&hash);
result.map(|_| ()).unwrap_or_else(|e| {
error!(target: "client", "Error importing ancient block: {}", e);
});
}) {
Expand All @@ -2022,7 +2035,7 @@ impl IoClient for Client {
}

fn queue_consensus_message(&self, message: Bytes) {
match self.queue_consensus_message.queue(&mut self.io_channel.lock(), 1, move |client| {
match self.queue_consensus_message.queue(&mut self.io_channel.lock(), move |client| {
if let Err(e) = client.engine().handle_message(&message) {
debug!(target: "poa", "Invalid message received: {}", e);
}
Expand Down Expand Up @@ -2131,7 +2144,7 @@ impl ImportSealedBlock for Client {
route
};
let (enacted, retracted) = self.importer.calculate_enacted_retracted(&[route]);
self.importer.miner.chain_new_blocks(self, &[h.clone()], &[], &enacted, &retracted, true);
self.importer.miner.chain_new_blocks(self, &[h.clone()], &[], &enacted, &retracted, self.engine.seals_internally().is_some());
self.notify(|notify| {
notify.new_blocks(
vec![h.clone()],
Expand Down Expand Up @@ -2433,35 +2446,38 @@ impl fmt::Display for QueueError {

/// Queue some items to be processed by IO client.
struct IoChannelQueue {
currently_queued: Arc<AtomicUsize>,
queue: Arc<Mutex<VecDeque<Box<Fn(&Client) + Send>>>>,
limit: usize,
}

impl IoChannelQueue {
pub fn new(limit: usize) -> Self {
IoChannelQueue {
currently_queued: Default::default(),
queue: Default::default(),
limit,
}
}

pub fn queue<F>(&self, channel: &mut IoChannel<ClientIoMessage>, count: usize, fun: F) -> Result<(), QueueError> where
F: Fn(&Client) + Send + Sync + 'static,
pub fn queue<F>(&self, channel: &mut IoChannel<ClientIoMessage>, fun: F) -> Result<(), QueueError>
where F: Fn(&Client) + Send + Sync + 'static
{
let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);
ensure!(queue_size < self.limit, QueueError::Full(self.limit));
{
let mut queue = self.queue.lock();
let queue_size = queue.len();
ensure!(queue_size < self.limit, QueueError::Full(self.limit));

let currently_queued = self.currently_queued.clone();
queue.push_back(Box::new(fun));
}

let queue = self.queue.clone();
let result = channel.send(ClientIoMessage::execute(move |client| {
currently_queued.fetch_sub(count, AtomicOrdering::SeqCst);
fun(client);
while let Some(fun) = queue.lock().pop_front() {
fun(client);
}
}));

match result {
Ok(_) => {
self.currently_queued.fetch_add(count, AtomicOrdering::SeqCst);
Ok(())
},
Ok(_) => Ok(()),
Err(e) => Err(QueueError::Channel(e)),
}
}
Expand Down
Loading