
Commit
multithreaded pow mining worker
Wizdave97 committed Aug 29, 2021
1 parent 17ce41a commit 4d5f55c
Showing 4 changed files with 206 additions and 246 deletions.
12 changes: 8 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions client/consensus/pow/Cargo.toml
@@ -31,3 +31,6 @@ parking_lot = "0.11.1"
derive_more = "0.99.2"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus", version = "0.9.0"}
async-trait = "0.1.50"
tokio = { version = "1.10.1", features = ["sync"] }
futures-lite = "1.12.0"
tokio-stream = { version = "0.1.7", features = ["sync"] }
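# The added tokio, tokio-stream and futures-lite dependencies back the watch/mpsc channels,
# their stream wrappers, and the `poll_once` used by the new mining threads in lib.rs.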
228 changes: 182 additions & 46 deletions client/consensus/pow/src/lib.rs
@@ -41,24 +41,24 @@
mod worker;

pub use crate::worker::{MiningBuild, MiningMetadata, MiningWorker};
pub use crate::worker::{MiningBuild, MiningData, MiningMetadata};

use crate::worker::UntilImportedOrTimeout;
use codec::{Decode, Encode};
use futures::{Future, StreamExt};
use futures::{executor::block_on, Future, StreamExt};
use log::*;
use parking_lot::Mutex;
use prometheus_endpoint::Registry;
use sc_client_api::{self, backend::AuxStore, BlockOf, BlockchainEvents};
use sc_consensus::{
BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxBlockImport,
BoxJustificationImport, ForkChoiceStrategy, ImportResult, Verifier,
BoxJustificationImport, ForkChoiceStrategy, ImportResult, StateAction, StorageChanges,
Verifier,
};
use sp_api::ProvideRuntimeApi;
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use sp_blockchain::{well_known_cache_keys::Id as CacheKeyId, HeaderBackend, ProvideCache};
use sp_consensus::{
CanAuthorWith, Environment, Error as ConsensusError, Proposer, SelectChain, SyncOracle,
BlockOrigin, CanAuthorWith, Environment, Error as ConsensusError, Proposer, SelectChain,
SyncOracle,
};
use sp_consensus_pow::{Seal, TotalDifficulty, POW_ENGINE_ID};
use sp_core::ExecutionContext;
@@ -69,9 +69,10 @@ use sp_runtime::{
RuntimeString,
};
use std::{
borrow::Cow, cmp::Ordering, collections::HashMap, marker::PhantomData, sync::Arc,
borrow::Cow, cmp::Ordering, collections::HashMap, hint, marker::PhantomData, sync::Arc, thread,
time::Duration,
};
use tokio_stream::wrappers::ReceiverStream;

#[derive(derive_more::Display, Debug)]
pub enum Error<B: BlockT> {
@@ -502,6 +503,7 @@ where
Ok(BasicQueue::new(verifier, block_import, justification_import, spawner, registry))
}

/// Stream over which mining threads deliver newly found seals back to the authorship task.
type SealStream = ReceiverStream<Seal>;

/// Start the mining worker for PoW. This function provides the necessary helper functions that can
/// be used to implement a miner. However, it does not do the CPU-intensive mining itself.
///
@@ -511,55 +513,78 @@
///
/// `pre_runtime` is a parameter that allows a custom additional pre-runtime digest to be inserted
/// for blocks being built. This can encode authorship information, or just be a graffiti.
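///
/// `thread_count` sets how many dedicated mining threads are spawned, and `compute` is the
/// closure each of those threads runs against the latest `MiningData` to try to produce a `Seal`.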
pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, L, CIDP, CAW>(
block_import: BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>,
pub fn start_mining_worker<B, C, S, A, E, SO, L, CIDP, CAW, M>(
mut block_import: BoxBlockImport<B, sp_api::TransactionFor<C, B>>,
client: Arc<C>,
select_chain: S,
algorithm: Algorithm,
algorithm: A,
mut env: E,
mut sync_oracle: SO,
justification_sync_link: L,
mut justification_sync_link: L,
pre_runtime: Option<Vec<u8>>,
create_inherent_data_providers: CIDP,
timeout: Duration,
build_time: Duration,
can_author_with: CAW,
) -> (
Arc<Mutex<MiningWorker<Block, Algorithm, C, L, <E::Proposer as Proposer<Block>>::Proof>>>,
impl Future<Output = ()>,
)
thread_count: u32,
compute: M,
) -> impl Future<Output = ()>
where
Block: BlockT,
C: ProvideRuntimeApi<Block> + BlockchainEvents<Block> + 'static,
S: SelectChain<Block> + 'static,
Algorithm: PowAlgorithm<Block> + Clone,
Algorithm::Difficulty: Send + 'static,
E: Environment<Block> + Send + Sync + 'static,
B: BlockT,
B::Hash: Unpin,
C: ProvideRuntimeApi<B> + BlockchainEvents<B> + 'static,
S: SelectChain<B> + 'static,
A: PowAlgorithm<B> + Clone,
A::Difficulty: Send + Sync + Unpin + 'static,
E: Environment<B> + Send + Sync + 'static,
E::Error: std::fmt::Debug,
E::Proposer: Proposer<Block, Transaction = sp_api::TransactionFor<C, Block>>,
E::Proposer: Proposer<B, Transaction = sp_api::TransactionFor<C, B>>,
SO: SyncOracle + Clone + Send + Sync + 'static,
L: sc_consensus::JustificationSyncLink<Block>,
CIDP: CreateInherentDataProviders<Block, ()>,
CAW: CanAuthorWith<Block> + Clone + Send + 'static,
L: sc_consensus::JustificationSyncLink<B>,
CIDP: CreateInherentDataProviders<B, ()>,
CAW: CanAuthorWith<B> + Clone + Send + 'static,
M: Fn(&MiningData<B::Hash, A::Difficulty>) -> Option<Seal> + Send + Copy + 'static,
{
let mut timer = UntilImportedOrTimeout::new(client.import_notification_stream(), timeout);
let worker = Arc::new(Mutex::new(MiningWorker {
build: None,
algorithm: algorithm.clone(),
block_import,
justification_sync_link,
}));
let worker_ret = worker.clone();
use futures::future::Either;

// Create an SPMC (single producer, multiple consumer) watch channel for broadcasting mining data to the worker threads
let (producer, consumer) = tokio::sync::watch::channel(None);
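// Each mining thread keeps a clone of `consumer`; a watch receiver always yields the latest value sent.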

// Channel for receiving a seal back from the mining threads
let mut seal_channel: Option<SealStream> = None;
let mut import_stream = client.import_notification_stream();
let mut build = None;
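// `build` caches the most recent proposal so it can be imported once a matching seal arrives.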

// authorship
let task = async move {
loop {
if timer.next().await.is_none() {
break
}
if let Some(mut channel) = seal_channel.take() {
let result = futures::future::select(channel.next(), import_stream.next()).await;

match result {
// We only care about two cases: a seal arriving from a mining thread, or a new block being imported.
Either::Left((Some(seal), _)) => {
if let Some(mining_build) = build.take() {
do_import_block(
seal,
mining_build,
&algorithm,
&mut block_import,
&mut justification_sync_link,
)
.await
}
}
Either::Right((Some(block), _)) => {
if matches!(block.origin, BlockOrigin::Own) {
continue;
}
}
_ => {}
}
};

if sync_oracle.is_major_syncing() {
debug!(target: "pow", "Skipping proposal due to sync.");
worker.lock().on_major_syncing();
continue
}

@@ -587,10 +612,6 @@ where
continue
}

if worker.lock().best_hash() == Some(best_hash) {
continue
}

// The worker is locked for the duration of the whole proposing period. Within this
// period, the mining target is outdated and useless anyway.

@@ -636,7 +657,7 @@
},
};

let mut inherent_digest = Digest::<Block::Hash>::default();
let mut inherent_digest = Digest::<B::Hash>::default();
if let Some(pre_runtime) = &pre_runtime {
inherent_digest.push(DigestItem::PreRuntime(POW_ENGINE_ID, pre_runtime.to_vec()));
}
@@ -672,7 +693,11 @@
},
};

let build = MiningBuild::<Block, Algorithm, C, _> {
let (sender, consumer) = tokio::sync::mpsc::channel(10);

seal_channel = Some(ReceiverStream::new(consumer));
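// The authorship task polls this stream for seals, while the `sender` half is handed to the mining threads inside `MiningData`.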

let mining_build = MiningBuild::<B, A, C, _> {
metadata: MiningMetadata {
best_hash,
pre_hash: proposal.block.header().hash(),
@@ -682,11 +707,49 @@
proposal,
};

worker.lock().on_build(build);
let _res =
producer.send(Some(MiningData { metadata: mining_build.metadata.clone(), sender }));
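// Every mining thread observes this update through its `WatchStream` and switches to the new metadata.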

build = Some(mining_build);
}
};

(worker_ret, task)
// mining
for _ in 0..thread_count {
let rx = consumer.clone();
thread::spawn(move || {
use futures_lite::future::poll_once;
let mut stream = tokio_stream::wrappers::WatchStream::new(rx);
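// Read the channel's current value (initially `None`) before entering the mining loop.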
let mut item = futures::executor::block_on(stream.next()).flatten();

block_on(async {
loop {
// Poll the stream once for a new item; if none is ready, keep running `compute`
// in a hot loop. This ensures that as soon as a new block arrives we immediately
// start building on top of it.
match poll_once(stream.next()).await {
Some(Some(new_item)) => {
item = new_item;
}
// The stream has ended; shut down this thread.
Some(None) => return,
_ => {}
}

if let Some(ref build) = item {
if let Some(seal) = compute(build) {
let _ = build.sender.send(seal).await;
}
}
// Emit a spin-loop hint (e.g. PAUSE on x86) so the CPU knows this is a busy-wait
// and can optimize accordingly.
hint::spin_loop();
}
});
});
}
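// When the authorship task completes and drops `producer`, each `WatchStream` ends and the mining threads return.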

task
}

/// Find PoW pre-runtime.
@@ -722,3 +785,76 @@ fn fetch_seal<B: BlockT>(
_ => return Err(Error::<B>::HeaderUnsealed(hash).into()),
}
}

pub async fn do_import_block<B, C, A, P, L>(
seal: Seal,
build: MiningBuild<B, A, C, P>,
algorithm: &A,
block_import: &mut BoxBlockImport<B, sp_api::TransactionFor<C, B>>,
justification_sync_link: &mut L,
) where
B: BlockT,
C: ProvideRuntimeApi<B> + BlockchainEvents<B> + 'static,
A: PowAlgorithm<B> + Clone,
A::Difficulty: Send + 'static,
L: sc_consensus::JustificationSyncLink<B>,
{
match algorithm.verify(
&BlockId::Hash(build.metadata.best_hash),
&build.metadata.pre_hash,
build.metadata.pre_runtime.as_ref().map(|v| &v[..]),
&seal,
build.metadata.difficulty,
) {
Ok(true) => (),
Ok(false) => {
	warn!(
		target: "pow",
		"Unable to import mined block: seal is invalid",
	);
	// A block with an invalid seal must not be imported.
	return
}
Err(err) => {
	warn!(
		target: "pow",
		"Unable to import mined block: {:?}",
		err,
	);
	return
}
}

let seal = DigestItem::Seal(POW_ENGINE_ID, seal);
let (header, body) = build.proposal.block.deconstruct();

let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
import_block.post_digests.push(seal);
import_block.body = Some(body);
import_block.state_action =
StateAction::ApplyChanges(StorageChanges::Changes(build.proposal.storage_changes));
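// Reuse the storage changes computed while proposing, so the block is not re-executed on import.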

let intermediate =
PowIntermediate::<A::Difficulty> { difficulty: Some(build.metadata.difficulty) };

import_block
.intermediates
.insert(Cow::from(INTERMEDIATE_KEY), Box::new(intermediate) as Box<_>);

let header = import_block.post_header();
match block_import.import_block(import_block, HashMap::default()).await {
Ok(res) => {
res.handle_justification(&header.hash(), *header.number(), justification_sync_link);

info!(
target: "pow",
"✅ Successfully mined block on top of: {}",
build.metadata.best_hash
);
}
Err(err) => {
warn!(
target: "pow",
"Unable to import mined block: {:?}",
err,
);
}
}
}
(Diff for the remaining changed file was not loaded.)
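For reference, a minimal sketch of a `compute` function a node service might pass to `start_mining_worker`, under assumptions not taken from this commit: a chain with `H256` block hashes and `U256` difficulty, public `metadata` fields on `MiningData` as constructed above, a `rand` dependency for nonces, and `pow_hash` / `meets_difficulty` as hypothetical stand-ins for a real PoW algorithm.

use codec::Encode;
use sc_consensus_pow::MiningData;
use sp_consensus_pow::Seal;
use sp_core::{H256, U256};

// Hypothetical hashing helper: blake2 over the (pre_hash, nonce) pair.
fn pow_hash(pre_hash: &H256, nonce: u64) -> H256 {
	H256::from(sp_core::hashing::blake2_256(&(pre_hash, nonce).encode()))
}

// Hypothetical difficulty check: the work is valid if hash * difficulty does not overflow 2^256.
fn meets_difficulty(work: &H256, difficulty: U256) -> bool {
	let num_hash = U256::from_big_endian(work.as_bytes());
	let (_, overflowed) = num_hash.overflowing_mul(difficulty);
	!overflowed
}

// Each mining thread calls this in a hot loop; one random nonce is tried per call, and a
// SCALE-encoded (nonce, work) pair is returned as the seal once it meets the difficulty.
fn compute(data: &MiningData<H256, U256>) -> Option<Seal> {
	let nonce: u64 = rand::random();
	let work = pow_hash(&data.metadata.pre_hash, nonce);
	if meets_difficulty(&work, data.metadata.difficulty) {
		Some((nonce, work).encode())
	} else {
		None
	}
}

A plain `fn` item satisfies the `Fn(&MiningData<_, _>) -> Option<Seal> + Send + Copy` bound, so it can be passed directly as the `compute` argument, with `thread_count` choosing how many OS threads run it.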
