Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

pow: replace the thread-base mining loop with a future-based mining worker #7060

Merged
16 commits merged into from
Sep 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions client/consensus/pow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ sp-consensus-pow = { version = "0.8.0-rc6", path = "../../../primitives/consensu
sp-consensus = { version = "0.8.0-rc6", path = "../../../primitives/consensus/common" }
log = "0.4.8"
futures = { version = "0.3.1", features = ["compat"] }
futures-timer = "3.0.1"
parking_lot = "0.10.0"
sp-timestamp = { version = "2.0.0-rc6", path = "../../../primitives/timestamp" }
derive_more = "0.99.2"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus", version = "0.8.0-rc6"}
309 changes: 141 additions & 168 deletions client/consensus/pow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,17 @@
//! as the storage, but it is not recommended as it won't work well with light
//! clients.
use std::sync::Arc;
use std::any::Any;
use std::borrow::Cow;
use std::thread;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::cmp::Ordering;
use sc_client_api::{BlockOf, backend::AuxStore};
mod worker;

pub use crate::worker::{MiningWorker, MiningMetadata, MiningBuild};

use std::{
sync::Arc, any::Any, borrow::Cow, collections::HashMap, marker::PhantomData,
cmp::Ordering, time::Duration,
};
use futures::{prelude::*, future::Either};
use parking_lot::Mutex;
use sc_client_api::{BlockOf, backend::AuxStore, BlockchainEvents};
use sp_blockchain::{HeaderBackend, ProvideCache, well_known_cache_keys::Id as CacheKeyId};
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use sp_runtime::{Justification, RuntimeString};
Expand All @@ -61,6 +64,8 @@ use sc_client_api;
use log::*;
use sp_timestamp::{InherentError as TIError, TimestampInherentData};

use crate::worker::UntilImportedOrTimeout;

#[derive(derive_more::Display, Debug)]
pub enum Error<B: BlockT> {
#[display(fmt = "Header uses the wrong engine {:?}", _0)]
Expand Down Expand Up @@ -193,15 +198,6 @@ pub trait PowAlgorithm<B: BlockT> {
seal: &Seal,
difficulty: Self::Difficulty,
) -> Result<bool, Error<B>>;
/// Mine a seal that satisfies the given difficulty.
fn mine(
&self,
parent: &BlockId<B>,
pre_hash: &B::Hash,
pre_digest: Option<&[u8]>,
difficulty: Self::Difficulty,
round: u32,
) -> Result<Option<Seal>, Error<B>>;
}

/// A block importer for PoW.
Expand Down Expand Up @@ -534,194 +530,171 @@ pub fn import_queue<B, Transaction, Algorithm>(
))
}

/// Start the background mining thread for PoW. Note that because PoW mining
/// is CPU-intensive, it is not possible to use an async future to define this.
/// However, it's not recommended to use background threads in the rest of the
/// codebase.
/// Start the mining worker for PoW. This function provides the necessary helper functions that can
/// be used to implement a miner. However, it does not do the CPU-intensive mining itself.
///
/// Two values are returned -- a worker, which contains functions that allows querying the current
/// mining metadata and submitting mined blocks, and a future, which must be polled to fill in
/// information in the worker.
///
/// `pre_runtime` is a parameter that allows a custom additional pre-runtime
/// digest to be inserted for blocks being built. This can encode authorship
/// information, or just be a graffiti. `round` is for number of rounds the
sorpaas marked this conversation as resolved.
Show resolved Hide resolved
/// CPU miner runs each time. This parameter should be tweaked so that each
/// mining round is within sub-second time.
pub fn start_mine<B: BlockT, C, Algorithm, E, SO, S, CAW>(
mut block_import: BoxBlockImport<B, sp_api::TransactionFor<C, B>>,
/// `pre_runtime` is a parameter that allows a custom additional pre-runtime digest to be inserted
/// for blocks being built. This can encode authorship information, or just be a graffiti.
pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, CAW>(
block_import: BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>,
client: Arc<C>,
select_chain: S,
algorithm: Algorithm,
mut env: E,
pre_runtime: Option<Vec<u8>>,
round: u32,
mut sync_oracle: SO,
build_time: std::time::Duration,
select_chain: Option<S>,
pre_runtime: Option<Vec<u8>>,
inherent_data_providers: sp_inherents::InherentDataProviders,
timeout: Duration,
build_time: Duration,
can_author_with: CAW,
) where
C: HeaderBackend<B> + AuxStore + ProvideRuntimeApi<B> + 'static,
Algorithm: PowAlgorithm<B> + Send + Sync + 'static,
E: Environment<B> + Send + Sync + 'static,
) -> (Arc<Mutex<MiningWorker<Block, Algorithm, C>>>, impl Future<Output = ()>) where
Block: BlockT,
C: ProvideRuntimeApi<Block> + BlockchainEvents<Block> + 'static,
S: SelectChain<Block> + 'static,
Algorithm: PowAlgorithm<Block> + Clone,
Algorithm::Difficulty: 'static,
E: Environment<Block> + Send + Sync + 'static,
E::Error: std::fmt::Debug,
E::Proposer: Proposer<B, Transaction = sp_api::TransactionFor<C, B>>,
SO: SyncOracle + Send + Sync + 'static,
S: SelectChain<B> + 'static,
CAW: CanAuthorWith<B> + Send + 'static,
E::Proposer: Proposer<Block, Transaction = sp_api::TransactionFor<C, Block>>,
SO: SyncOracle + Clone + Send + Sync + 'static,
CAW: CanAuthorWith<Block> + Clone + Send + 'static,
{
if let Err(_) = register_pow_inherent_data_provider(&inherent_data_providers) {
warn!("Registering inherent data provider for timestamp failed");
}

thread::spawn(move || {
loop {
match mine_loop(
&mut block_import,
client.as_ref(),
&algorithm,
&mut env,
pre_runtime.as_ref(),
round,
&mut sync_oracle,
build_time.clone(),
select_chain.as_ref(),
&inherent_data_providers,
&can_author_with,
) {
Ok(()) => (),
Err(e) => error!(
"Mining block failed with {:?}. Sleep for 1 second before restarting...",
e
),
}
std::thread::sleep(std::time::Duration::new(1, 0));
}
});
}
let timer = UntilImportedOrTimeout::new(client.import_notification_stream(), timeout);
let worker = Arc::new(Mutex::new(MiningWorker::<Block, Algorithm, C> {
build: None,
algorithm: algorithm.clone(),
block_import,
}));
let worker_ret = worker.clone();

let task = timer.for_each(move |()| {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you use a async block here, you don't need the Either and probably make the code much easier to follow.

Copy link
Member Author

@sorpaas sorpaas Sep 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had some lifetime issues, but the main problem removing the Either would be the additional clones. For example, if I change the whole block into async, I'd have to clone sync_oracle, inherent_data_providers, algorithm and some other variables. With the current design they don't have to be cloned at all.

let worker = worker.clone();

fn mine_loop<B: BlockT, C, Algorithm, E, SO, S, CAW>(
block_import: &mut BoxBlockImport<B, sp_api::TransactionFor<C, B>>,
client: &C,
algorithm: &Algorithm,
env: &mut E,
pre_runtime: Option<&Vec<u8>>,
round: u32,
sync_oracle: &mut SO,
build_time: std::time::Duration,
select_chain: Option<&S>,
inherent_data_providers: &sp_inherents::InherentDataProviders,
can_author_with: &CAW,
) -> Result<(), Error<B>> where
C: HeaderBackend<B> + AuxStore + ProvideRuntimeApi<B>,
Algorithm: PowAlgorithm<B>,
Algorithm::Difficulty: 'static,
E: Environment<B>,
E::Proposer: Proposer<B, Transaction = sp_api::TransactionFor<C, B>>,
E::Error: std::fmt::Debug,
SO: SyncOracle,
S: SelectChain<B>,
sp_api::TransactionFor<C, B>: 'static,
CAW: CanAuthorWith<B>,
{
'outer: loop {
if sync_oracle.is_major_syncing() {
debug!(target: "pow", "Skipping proposal due to sync.");
std::thread::sleep(std::time::Duration::new(1, 0));
continue 'outer
worker.lock().on_major_syncing();
return Either::Left(future::ready(()))
}

let (best_hash, best_header) = match select_chain {
Some(select_chain) => {
let header = select_chain.best_chain()
.map_err(Error::BestHeaderSelectChain)?;
let hash = header.hash();
(hash, header)
},
None => {
let hash = client.info().best_hash;
let header = client.header(BlockId::Hash(hash))
.map_err(Error::BestHeader)?
.ok_or(Error::NoBestHeader)?;
(hash, header)
let best_header = match select_chain.best_chain() {
Ok(x) => x,
Err(err) => {
warn!(
target: "pow",
"Unable to pull new block for authoring. \
Select best chain error: {:?}",
err
);
return Either::Left(future::ready(()))
},
};
let best_hash = best_header.hash();

if let Err(err) = can_author_with.can_author_with(&BlockId::Hash(best_hash)) {
warn!(
target: "pow",
"Skipping proposal `can_author_with` returned: {} \
Probably a node update is required!",
Probably a node update is required!",
err,
);
std::thread::sleep(std::time::Duration::from_secs(1));
continue 'outer
return Either::Left(future::ready(()))
}

let proposer = futures::executor::block_on(env.init(&best_header))
.map_err(|e| Error::Environment(format!("{:?}", e)))?;

let inherent_data = inherent_data_providers
.create_inherent_data().map_err(Error::CreateInherents)?;
let mut inherent_digest = Digest::default();
if let Some(pre_runtime) = &pre_runtime {
inherent_digest.push(DigestItem::PreRuntime(POW_ENGINE_ID, pre_runtime.to_vec()));
if worker.lock().best_hash() == Some(best_hash) {
return Either::Left(future::ready(()))
}
let proposal = futures::executor::block_on(proposer.propose(
inherent_data,
inherent_digest,
build_time.clone(),
RecordProof::No,
)).map_err(|e| Error::BlockProposingError(format!("{:?}", e)))?;

let (header, body) = proposal.block.deconstruct();
let (difficulty, seal) = {
let difficulty = algorithm.difficulty(best_hash)?;

loop {
let seal = algorithm.mine(
&BlockId::Hash(best_hash),
&header.hash(),
pre_runtime.map(|v| &v[..]),
difficulty,
round,
)?;

if let Some(seal) = seal {
break (difficulty, seal)
}

if best_hash != client.info().best_hash {
continue 'outer
}
}
// The worker is locked for the duration of the whole proposing period. Within this period,
// the mining target is outdated and useless anyway.

let difficulty = match algorithm.difficulty(best_hash) {
Ok(x) => x,
Err(err) => {
warn!(
target: "pow",
"Unable to propose new block for authoring. \
Fetch difficulty failed: {:?}",
err,
);
return Either::Left(future::ready(()))
},
};

log::info!("✅ Successfully mined block: {}", best_hash);

let (hash, seal) = {
let seal = DigestItem::Seal(POW_ENGINE_ID, seal);
let mut header = header.clone();
header.digest_mut().push(seal);
let hash = header.hash();
let seal = header.digest_mut().pop()
.expect("Pushed one seal above; length greater than zero; qed");
(hash, seal)
let awaiting_proposer = env.init(&best_header);
let inherent_data = match inherent_data_providers.create_inherent_data() {
Ok(x) => x,
Err(err) => {
warn!(
target: "pow",
"Unable to propose new block for authoring. \
Creating inherent data failed: {:?}",
err,
);
return Either::Left(future::ready(()))
},
};
let mut inherent_digest = Digest::<Block::Hash>::default();
if let Some(pre_runtime) = &pre_runtime {
inherent_digest.push(DigestItem::PreRuntime(POW_ENGINE_ID, pre_runtime.to_vec()));
}

let intermediate = PowIntermediate::<Algorithm::Difficulty> {
difficulty: Some(difficulty),
};
let pre_runtime = pre_runtime.clone();

Either::Right(async move {
let proposer = match awaiting_proposer.await {
Ok(x) => x,
Err(err) => {
warn!(
target: "pow",
"Unable to propose new block for authoring. \
Creating proposer failed: {:?}",
err,
);
return
},
};

let proposal = match proposer.propose(
inherent_data,
inherent_digest,
build_time.clone(),
RecordProof::No,
).await {
Ok(x) => x,
Err(err) => {
warn!(
target: "pow",
"Unable to propose new block for authoring. \
Creating proposal failed: {:?}",
err,
);
return
},
};

let build = MiningBuild::<Block, Algorithm, C> {
metadata: MiningMetadata {
best_hash,
pre_hash: proposal.block.header().hash(),
pre_runtime: pre_runtime.clone(),
difficulty,
},
proposal,
};

let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
import_block.post_digests.push(seal);
import_block.body = Some(body);
import_block.storage_changes = Some(proposal.storage_changes);
import_block.intermediates.insert(
Cow::from(INTERMEDIATE_KEY),
Box::new(intermediate) as Box<dyn Any>
);
import_block.post_hash = Some(hash);
worker.lock().on_build(build);
})
});

block_import.import_block(import_block, HashMap::default())
.map_err(|e| Error::BlockBuiltError(best_hash, e))?;
}
(worker_ret, task)
}

/// Find PoW pre-runtime.
Expand Down
Loading