diff --git a/Cargo.lock b/Cargo.lock index e80ba143d3de8..33e037b7c6444 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6642,8 +6642,10 @@ version = "0.8.0-rc6" dependencies = [ "derive_more", "futures 0.3.5", + "futures-timer 3.0.2", "log", "parity-scale-codec", + "parking_lot 0.10.2", "sc-client-api", "sp-api", "sp-block-builder", diff --git a/client/consensus/pow/Cargo.toml b/client/consensus/pow/Cargo.toml index 993502972f2d0..9e97052373adc 100644 --- a/client/consensus/pow/Cargo.toml +++ b/client/consensus/pow/Cargo.toml @@ -24,6 +24,8 @@ sp-consensus-pow = { version = "0.8.0-rc6", path = "../../../primitives/consensu sp-consensus = { version = "0.8.0-rc6", path = "../../../primitives/consensus/common" } log = "0.4.8" futures = { version = "0.3.1", features = ["compat"] } +futures-timer = "3.0.1" +parking_lot = "0.10.0" sp-timestamp = { version = "2.0.0-rc6", path = "../../../primitives/timestamp" } derive_more = "0.99.2" prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus", version = "0.8.0-rc6"} diff --git a/client/consensus/pow/src/lib.rs b/client/consensus/pow/src/lib.rs index 70a7bb47873f6..b73b9aa91f802 100644 --- a/client/consensus/pow/src/lib.rs +++ b/client/consensus/pow/src/lib.rs @@ -31,14 +31,17 @@ //! as the storage, but it is not recommended as it won't work well with light //! clients. -use std::sync::Arc; -use std::any::Any; -use std::borrow::Cow; -use std::thread; -use std::collections::HashMap; -use std::marker::PhantomData; -use std::cmp::Ordering; -use sc_client_api::{BlockOf, backend::AuxStore}; +mod worker; + +pub use crate::worker::{MiningWorker, MiningMetadata, MiningBuild}; + +use std::{ + sync::Arc, any::Any, borrow::Cow, collections::HashMap, marker::PhantomData, + cmp::Ordering, time::Duration, +}; +use futures::{prelude::*, future::Either}; +use parking_lot::Mutex; +use sc_client_api::{BlockOf, backend::AuxStore, BlockchainEvents}; use sp_blockchain::{HeaderBackend, ProvideCache, well_known_cache_keys::Id as CacheKeyId}; use sp_block_builder::BlockBuilder as BlockBuilderApi; use sp_runtime::{Justification, RuntimeString}; @@ -61,6 +64,8 @@ use sc_client_api; use log::*; use sp_timestamp::{InherentError as TIError, TimestampInherentData}; +use crate::worker::UntilImportedOrTimeout; + #[derive(derive_more::Display, Debug)] pub enum Error { #[display(fmt = "Header uses the wrong engine {:?}", _0)] @@ -193,15 +198,6 @@ pub trait PowAlgorithm { seal: &Seal, difficulty: Self::Difficulty, ) -> Result>; - /// Mine a seal that satisfies the given difficulty. - fn mine( - &self, - parent: &BlockId, - pre_hash: &B::Hash, - pre_digest: Option<&[u8]>, - difficulty: Self::Difficulty, - round: u32, - ) -> Result, Error>; } /// A block importer for PoW. @@ -534,194 +530,171 @@ pub fn import_queue( )) } -/// Start the background mining thread for PoW. Note that because PoW mining -/// is CPU-intensive, it is not possible to use an async future to define this. -/// However, it's not recommended to use background threads in the rest of the -/// codebase. +/// Start the mining worker for PoW. This function provides the necessary helper functions that can +/// be used to implement a miner. However, it does not do the CPU-intensive mining itself. +/// +/// Two values are returned -- a worker, which contains functions that allows querying the current +/// mining metadata and submitting mined blocks, and a future, which must be polled to fill in +/// information in the worker. /// -/// `pre_runtime` is a parameter that allows a custom additional pre-runtime -/// digest to be inserted for blocks being built. This can encode authorship -/// information, or just be a graffiti. `round` is for number of rounds the -/// CPU miner runs each time. This parameter should be tweaked so that each -/// mining round is within sub-second time. -pub fn start_mine( - mut block_import: BoxBlockImport>, +/// `pre_runtime` is a parameter that allows a custom additional pre-runtime digest to be inserted +/// for blocks being built. This can encode authorship information, or just be a graffiti. +pub fn start_mining_worker( + block_import: BoxBlockImport>, client: Arc, + select_chain: S, algorithm: Algorithm, mut env: E, - pre_runtime: Option>, - round: u32, mut sync_oracle: SO, - build_time: std::time::Duration, - select_chain: Option, + pre_runtime: Option>, inherent_data_providers: sp_inherents::InherentDataProviders, + timeout: Duration, + build_time: Duration, can_author_with: CAW, -) where - C: HeaderBackend + AuxStore + ProvideRuntimeApi + 'static, - Algorithm: PowAlgorithm + Send + Sync + 'static, - E: Environment + Send + Sync + 'static, +) -> (Arc>>, impl Future) where + Block: BlockT, + C: ProvideRuntimeApi + BlockchainEvents + 'static, + S: SelectChain + 'static, + Algorithm: PowAlgorithm + Clone, + Algorithm::Difficulty: 'static, + E: Environment + Send + Sync + 'static, E::Error: std::fmt::Debug, - E::Proposer: Proposer>, - SO: SyncOracle + Send + Sync + 'static, - S: SelectChain + 'static, - CAW: CanAuthorWith + Send + 'static, + E::Proposer: Proposer>, + SO: SyncOracle + Clone + Send + Sync + 'static, + CAW: CanAuthorWith + Clone + Send + 'static, { if let Err(_) = register_pow_inherent_data_provider(&inherent_data_providers) { warn!("Registering inherent data provider for timestamp failed"); } - thread::spawn(move || { - loop { - match mine_loop( - &mut block_import, - client.as_ref(), - &algorithm, - &mut env, - pre_runtime.as_ref(), - round, - &mut sync_oracle, - build_time.clone(), - select_chain.as_ref(), - &inherent_data_providers, - &can_author_with, - ) { - Ok(()) => (), - Err(e) => error!( - "Mining block failed with {:?}. Sleep for 1 second before restarting...", - e - ), - } - std::thread::sleep(std::time::Duration::new(1, 0)); - } - }); -} + let timer = UntilImportedOrTimeout::new(client.import_notification_stream(), timeout); + let worker = Arc::new(Mutex::new(MiningWorker:: { + build: None, + algorithm: algorithm.clone(), + block_import, + })); + let worker_ret = worker.clone(); + + let task = timer.for_each(move |()| { + let worker = worker.clone(); -fn mine_loop( - block_import: &mut BoxBlockImport>, - client: &C, - algorithm: &Algorithm, - env: &mut E, - pre_runtime: Option<&Vec>, - round: u32, - sync_oracle: &mut SO, - build_time: std::time::Duration, - select_chain: Option<&S>, - inherent_data_providers: &sp_inherents::InherentDataProviders, - can_author_with: &CAW, -) -> Result<(), Error> where - C: HeaderBackend + AuxStore + ProvideRuntimeApi, - Algorithm: PowAlgorithm, - Algorithm::Difficulty: 'static, - E: Environment, - E::Proposer: Proposer>, - E::Error: std::fmt::Debug, - SO: SyncOracle, - S: SelectChain, - sp_api::TransactionFor: 'static, - CAW: CanAuthorWith, -{ - 'outer: loop { if sync_oracle.is_major_syncing() { debug!(target: "pow", "Skipping proposal due to sync."); - std::thread::sleep(std::time::Duration::new(1, 0)); - continue 'outer + worker.lock().on_major_syncing(); + return Either::Left(future::ready(())) } - let (best_hash, best_header) = match select_chain { - Some(select_chain) => { - let header = select_chain.best_chain() - .map_err(Error::BestHeaderSelectChain)?; - let hash = header.hash(); - (hash, header) - }, - None => { - let hash = client.info().best_hash; - let header = client.header(BlockId::Hash(hash)) - .map_err(Error::BestHeader)? - .ok_or(Error::NoBestHeader)?; - (hash, header) + let best_header = match select_chain.best_chain() { + Ok(x) => x, + Err(err) => { + warn!( + target: "pow", + "Unable to pull new block for authoring. \ + Select best chain error: {:?}", + err + ); + return Either::Left(future::ready(())) }, }; + let best_hash = best_header.hash(); if let Err(err) = can_author_with.can_author_with(&BlockId::Hash(best_hash)) { warn!( target: "pow", "Skipping proposal `can_author_with` returned: {} \ - Probably a node update is required!", + Probably a node update is required!", err, ); - std::thread::sleep(std::time::Duration::from_secs(1)); - continue 'outer + return Either::Left(future::ready(())) } - let proposer = futures::executor::block_on(env.init(&best_header)) - .map_err(|e| Error::Environment(format!("{:?}", e)))?; - - let inherent_data = inherent_data_providers - .create_inherent_data().map_err(Error::CreateInherents)?; - let mut inherent_digest = Digest::default(); - if let Some(pre_runtime) = &pre_runtime { - inherent_digest.push(DigestItem::PreRuntime(POW_ENGINE_ID, pre_runtime.to_vec())); + if worker.lock().best_hash() == Some(best_hash) { + return Either::Left(future::ready(())) } - let proposal = futures::executor::block_on(proposer.propose( - inherent_data, - inherent_digest, - build_time.clone(), - RecordProof::No, - )).map_err(|e| Error::BlockProposingError(format!("{:?}", e)))?; - - let (header, body) = proposal.block.deconstruct(); - let (difficulty, seal) = { - let difficulty = algorithm.difficulty(best_hash)?; - - loop { - let seal = algorithm.mine( - &BlockId::Hash(best_hash), - &header.hash(), - pre_runtime.map(|v| &v[..]), - difficulty, - round, - )?; - - if let Some(seal) = seal { - break (difficulty, seal) - } - if best_hash != client.info().best_hash { - continue 'outer - } - } + // The worker is locked for the duration of the whole proposing period. Within this period, + // the mining target is outdated and useless anyway. + + let difficulty = match algorithm.difficulty(best_hash) { + Ok(x) => x, + Err(err) => { + warn!( + target: "pow", + "Unable to propose new block for authoring. \ + Fetch difficulty failed: {:?}", + err, + ); + return Either::Left(future::ready(())) + }, }; - log::info!("✅ Successfully mined block: {}", best_hash); - - let (hash, seal) = { - let seal = DigestItem::Seal(POW_ENGINE_ID, seal); - let mut header = header.clone(); - header.digest_mut().push(seal); - let hash = header.hash(); - let seal = header.digest_mut().pop() - .expect("Pushed one seal above; length greater than zero; qed"); - (hash, seal) + let awaiting_proposer = env.init(&best_header); + let inherent_data = match inherent_data_providers.create_inherent_data() { + Ok(x) => x, + Err(err) => { + warn!( + target: "pow", + "Unable to propose new block for authoring. \ + Creating inherent data failed: {:?}", + err, + ); + return Either::Left(future::ready(())) + }, }; + let mut inherent_digest = Digest::::default(); + if let Some(pre_runtime) = &pre_runtime { + inherent_digest.push(DigestItem::PreRuntime(POW_ENGINE_ID, pre_runtime.to_vec())); + } - let intermediate = PowIntermediate:: { - difficulty: Some(difficulty), - }; + let pre_runtime = pre_runtime.clone(); + + Either::Right(async move { + let proposer = match awaiting_proposer.await { + Ok(x) => x, + Err(err) => { + warn!( + target: "pow", + "Unable to propose new block for authoring. \ + Creating proposer failed: {:?}", + err, + ); + return + }, + }; + + let proposal = match proposer.propose( + inherent_data, + inherent_digest, + build_time.clone(), + RecordProof::No, + ).await { + Ok(x) => x, + Err(err) => { + warn!( + target: "pow", + "Unable to propose new block for authoring. \ + Creating proposal failed: {:?}", + err, + ); + return + }, + }; + + let build = MiningBuild:: { + metadata: MiningMetadata { + best_hash, + pre_hash: proposal.block.header().hash(), + pre_runtime: pre_runtime.clone(), + difficulty, + }, + proposal, + }; - let mut import_block = BlockImportParams::new(BlockOrigin::Own, header); - import_block.post_digests.push(seal); - import_block.body = Some(body); - import_block.storage_changes = Some(proposal.storage_changes); - import_block.intermediates.insert( - Cow::from(INTERMEDIATE_KEY), - Box::new(intermediate) as Box - ); - import_block.post_hash = Some(hash); + worker.lock().on_build(build); + }) + }); - block_import.import_block(import_block, HashMap::default()) - .map_err(|e| Error::BlockBuiltError(best_hash, e))?; - } + (worker_ret, task) } /// Find PoW pre-runtime. diff --git a/client/consensus/pow/src/worker.rs b/client/consensus/pow/src/worker.rs new file mode 100644 index 0000000000000..4ed863dcd9ed9 --- /dev/null +++ b/client/consensus/pow/src/worker.rs @@ -0,0 +1,213 @@ +// This file is part of Substrate. + +// Copyright (C) 2020 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use std::{pin::Pin, time::Duration, collections::HashMap, any::Any, borrow::Cow}; +use sc_client_api::ImportNotifications; +use sp_runtime::{DigestItem, traits::Block as BlockT, generic::BlockId}; +use sp_consensus::{Proposal, BlockOrigin, BlockImportParams, import_queue::BoxBlockImport}; +use futures::{prelude::*, task::{Context, Poll}}; +use futures_timer::Delay; +use log::*; + +use crate::{INTERMEDIATE_KEY, POW_ENGINE_ID, Seal, PowAlgorithm, PowIntermediate}; + +/// Mining metadata. This is the information needed to start an actual mining loop. +#[derive(Clone, Eq, PartialEq)] +pub struct MiningMetadata { + /// Currently known best hash which the pre-hash is built on. + pub best_hash: H, + /// Mining pre-hash. + pub pre_hash: H, + /// Pre-runtime digest item. + pub pre_runtime: Option>, + /// Mining target difficulty. + pub difficulty: D, +} + +/// A build of mining, containing the metadata and the block proposal. +pub struct MiningBuild, C: sp_api::ProvideRuntimeApi> { + /// Mining metadata. + pub metadata: MiningMetadata, + /// Mining proposal. + pub proposal: Proposal>, +} + +/// Mining worker that exposes structs to query the current mining build and submit mined blocks. +pub struct MiningWorker, C: sp_api::ProvideRuntimeApi> { + pub(crate) build: Option>, + pub(crate) algorithm: Algorithm, + pub(crate) block_import: BoxBlockImport>, +} + +impl MiningWorker where + Block: BlockT, + C: sp_api::ProvideRuntimeApi, + Algorithm: PowAlgorithm, + Algorithm::Difficulty: 'static, +{ + /// Get the current best hash. `None` if the worker has just started or the client is doing + /// major syncing. + pub fn best_hash(&self) -> Option { + self.build.as_ref().map(|b| b.metadata.best_hash) + } + + pub(crate) fn on_major_syncing(&mut self) { + self.build = None; + } + + pub(crate) fn on_build( + &mut self, + build: MiningBuild, + ) { + self.build = Some(build); + } + + /// Get a copy of the current mining metadata, if available. + pub fn metadata(&self) -> Option> { + self.build.as_ref().map(|b| b.metadata.clone()) + } + + /// Submit a mined seal. The seal will be validated again. Returns true if the submission is + /// successful. + pub fn submit(&mut self, seal: Seal) -> bool { + if let Some(build) = self.build.take() { + match self.algorithm.verify( + &BlockId::Hash(build.metadata.best_hash), + &build.metadata.pre_hash, + build.metadata.pre_runtime.as_ref().map(|v| &v[..]), + &seal, + build.metadata.difficulty, + ) { + Ok(true) => (), + Ok(false) => { + warn!( + target: "pow", + "Unable to import mined block: seal is invalid", + ); + return false + }, + Err(err) => { + warn!( + target: "pow", + "Unable to import mined block: {:?}", + err, + ); + return false + }, + } + + let seal = DigestItem::Seal(POW_ENGINE_ID, seal); + let (header, body) = build.proposal.block.deconstruct(); + + let mut import_block = BlockImportParams::new(BlockOrigin::Own, header); + import_block.post_digests.push(seal); + import_block.body = Some(body); + import_block.storage_changes = Some(build.proposal.storage_changes); + + let intermediate = PowIntermediate:: { + difficulty: Some(build.metadata.difficulty), + }; + + import_block.intermediates.insert( + Cow::from(INTERMEDIATE_KEY), + Box::new(intermediate) as Box + ); + + match self.block_import.import_block(import_block, HashMap::default()) { + Ok(_) => { + info!( + target: "pow", + "✅ Successfully mined block on top of: {}", + build.metadata.best_hash + ); + true + }, + Err(err) => { + warn!( + target: "pow", + "Unable to import mined block: {:?}", + err, + ); + false + }, + } + } else { + warn!( + target: "pow", + "Unable to import mined block: build does not exist", + ); + false + } + } +} + +/// A stream that waits for a block import or timeout. +pub struct UntilImportedOrTimeout { + import_notifications: ImportNotifications, + timeout: Duration, + inner_delay: Option, +} + +impl UntilImportedOrTimeout { + /// Create a new stream using the given import notification and timeout duration. + pub fn new( + import_notifications: ImportNotifications, + timeout: Duration, + ) -> Self { + Self { + import_notifications, + timeout, + inner_delay: None, + } + } +} + +impl Stream for UntilImportedOrTimeout { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let mut fire = false; + + loop { + match Stream::poll_next(Pin::new(&mut self.import_notifications), cx) { + Poll::Pending => break, + Poll::Ready(Some(_)) => { + fire = true; + }, + Poll::Ready(None) => return Poll::Ready(None), + } + } + + let timeout = self.timeout.clone(); + let inner_delay = self.inner_delay.get_or_insert_with(|| Delay::new(timeout)); + + match Future::poll(Pin::new(inner_delay), cx) { + Poll::Pending => (), + Poll::Ready(()) => { + fire = true; + }, + } + + if fire { + self.inner_delay = None; + Poll::Ready(Some(())) + } else { + Poll::Pending + } + } +}