Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

pow: add Version for quick-check of metadata state and refactor lock handling #9698

Merged
6 commits merged into from
Oct 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/consensus/common/src/block_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl ImportResult {
&self,
hash: &B::Hash,
number: NumberFor<B>,
justification_sync_link: &mut dyn JustificationSyncLink<B>,
justification_sync_link: &dyn JustificationSyncLink<B>,
) where
B: BlockT,
{
Expand Down
18 changes: 6 additions & 12 deletions client/consensus/pow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,12 @@

mod worker;

pub use crate::worker::{MiningBuild, MiningMetadata, MiningWorker};
pub use crate::worker::{MiningBuild, MiningHandle, MiningMetadata};

use crate::worker::UntilImportedOrTimeout;
use codec::{Decode, Encode};
use futures::{Future, StreamExt};
use log::*;
use parking_lot::Mutex;
use prometheus_endpoint::Registry;
use sc_client_api::{self, backend::AuxStore, BlockOf, BlockchainEvents};
use sc_consensus::{
Expand Down Expand Up @@ -525,7 +524,7 @@ pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, L, CIDP, CAW>(
build_time: Duration,
can_author_with: CAW,
) -> (
Arc<Mutex<MiningWorker<Block, Algorithm, C, L, <E::Proposer as Proposer<Block>>::Proof>>>,
MiningHandle<Block, Algorithm, C, L, <E::Proposer as Proposer<Block>>::Proof>,
impl Future<Output = ()>,
)
where
Expand All @@ -543,12 +542,7 @@ where
CAW: CanAuthorWith<Block> + Clone + Send + 'static,
{
let mut timer = UntilImportedOrTimeout::new(client.import_notification_stream(), timeout);
let worker = Arc::new(Mutex::new(MiningWorker {
build: None,
algorithm: algorithm.clone(),
block_import,
justification_sync_link,
}));
let worker = MiningHandle::new(algorithm.clone(), block_import, justification_sync_link);
let worker_ret = worker.clone();

let task = async move {
Expand All @@ -559,7 +553,7 @@ where

if sync_oracle.is_major_syncing() {
debug!(target: "pow", "Skipping proposal due to sync.");
worker.lock().on_major_syncing();
worker.on_major_syncing();
continue
}

Expand Down Expand Up @@ -587,7 +581,7 @@ where
continue
}

if worker.lock().best_hash() == Some(best_hash) {
if worker.best_hash() == Some(best_hash) {
continue
}

Expand Down Expand Up @@ -682,7 +676,7 @@ where
proposal,
};

worker.lock().on_build(build);
worker.on_build(build);
}
};

Expand Down
210 changes: 146 additions & 64 deletions client/consensus/pow/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use futures::{
};
use futures_timer::Delay;
use log::*;
use parking_lot::Mutex;
use sc_client_api::ImportNotifications;
use sc_consensus::{BlockImportParams, BoxBlockImport, StateAction, StorageChanges};
use sp_consensus::{BlockOrigin, Proposal};
Expand All @@ -30,7 +31,16 @@ use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT},
DigestItem,
};
use std::{borrow::Cow, collections::HashMap, pin::Pin, time::Duration};
use std::{
borrow::Cow,
collections::HashMap,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};

use crate::{PowAlgorithm, PowIntermediate, Seal, INTERMEDIATE_KEY, POW_ENGINE_ID};

Expand Down Expand Up @@ -60,21 +70,26 @@ pub struct MiningBuild<
pub proposal: Proposal<Block, sp_api::TransactionFor<C, Block>, Proof>,
}

/// Version of the mining worker.
#[derive(Eq, PartialEq, Clone, Copy)]
pub struct Version(usize);

/// Mining worker that exposes structs to query the current mining build and submit mined blocks.
pub struct MiningWorker<
pub struct MiningHandle<
Block: BlockT,
Algorithm: PowAlgorithm<Block>,
C: sp_api::ProvideRuntimeApi<Block>,
L: sc_consensus::JustificationSyncLink<Block>,
Proof,
> {
pub(crate) build: Option<MiningBuild<Block, Algorithm, C, Proof>>,
pub(crate) algorithm: Algorithm,
pub(crate) block_import: BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>,
pub(crate) justification_sync_link: L,
version: Arc<AtomicUsize>,
algorithm: Arc<Algorithm>,
justification_sync_link: Arc<L>,
build: Arc<Mutex<Option<MiningBuild<Block, Algorithm, C, Proof>>>>,
block_import: Arc<Mutex<BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>>>,
}

impl<Block, Algorithm, C, L, Proof> MiningWorker<Block, Algorithm, C, L, Proof>
impl<Block, Algorithm, C, L, Proof> MiningHandle<Block, Algorithm, C, L, Proof>
where
Block: BlockT,
C: sp_api::ProvideRuntimeApi<Block>,
Expand All @@ -83,35 +98,65 @@ where
L: sc_consensus::JustificationSyncLink<Block>,
sp_api::TransactionFor<C, Block>: Send + 'static,
{
/// Get the current best hash. `None` if the worker has just started or the client is doing
/// major syncing.
pub fn best_hash(&self) -> Option<Block::Hash> {
self.build.as_ref().map(|b| b.metadata.best_hash)
fn increment_version(&self) {
self.version.fetch_add(1, Ordering::SeqCst);
}

pub(crate) fn on_major_syncing(&mut self) {
self.build = None;
pub(crate) fn new(
algorithm: Algorithm,
block_import: BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>,
justification_sync_link: L,
) -> Self {
Self {
version: Arc::new(AtomicUsize::new(0)),
algorithm: Arc::new(algorithm),
justification_sync_link: Arc::new(justification_sync_link),
build: Arc::new(Mutex::new(None)),
block_import: Arc::new(Mutex::new(block_import)),
}
}

pub(crate) fn on_build(&mut self, build: MiningBuild<Block, Algorithm, C, Proof>) {
self.build = Some(build);
pub(crate) fn on_major_syncing(&self) {
let mut build = self.build.lock();
*build = None;
self.increment_version();
}

pub(crate) fn on_build(&self, value: MiningBuild<Block, Algorithm, C, Proof>) {
let mut build = self.build.lock();
*build = Some(value);
self.increment_version();
}

/// Get the version of the mining worker.
///
/// This returns type `Version` which can only compare equality. If `Version` is unchanged, then
/// it can be certain that `best_hash` and `metadata` were not changed.
pub fn version(&self) -> Version {
Version(self.version.load(Ordering::SeqCst))
}

/// Get the current best hash. `None` if the worker has just started or the client is doing
/// major syncing.
pub fn best_hash(&self) -> Option<Block::Hash> {
self.build.lock().as_ref().map(|b| b.metadata.best_hash)
}

/// Get a copy of the current mining metadata, if available.
pub fn metadata(&self) -> Option<MiningMetadata<Block::Hash, Algorithm::Difficulty>> {
self.build.as_ref().map(|b| b.metadata.clone())
self.build.lock().as_ref().map(|b| b.metadata.clone())
}

/// Submit a mined seal. The seal will be validated again. Returns true if the submission is
/// successful.
pub async fn submit(&mut self, seal: Seal) -> bool {
if let Some(build) = self.build.take() {
pub async fn submit(&self, seal: Seal) -> bool {
if let Some(metadata) = self.metadata() {
match self.algorithm.verify(
&BlockId::Hash(build.metadata.best_hash),
&build.metadata.pre_hash,
build.metadata.pre_runtime.as_ref().map(|v| &v[..]),
&BlockId::Hash(metadata.best_hash),
&metadata.pre_hash,
metadata.pre_runtime.as_ref().map(|v| &v[..]),
&seal,
build.metadata.difficulty,
metadata.difficulty,
) {
Ok(true) => (),
Ok(false) => {
Expand All @@ -130,55 +175,92 @@ where
return false
},
}
} else {
warn!(
target: "pow",
"Unable to import mined block: metadata does not exist",
);
return false
}

let seal = DigestItem::Seal(POW_ENGINE_ID, seal);
let (header, body) = build.proposal.block.deconstruct();

let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
import_block.post_digests.push(seal);
import_block.body = Some(body);
import_block.state_action =
StateAction::ApplyChanges(StorageChanges::Changes(build.proposal.storage_changes));

let intermediate = PowIntermediate::<Algorithm::Difficulty> {
difficulty: Some(build.metadata.difficulty),
};

import_block
.intermediates
.insert(Cow::from(INTERMEDIATE_KEY), Box::new(intermediate) as Box<_>);

let header = import_block.post_header();
match self.block_import.import_block(import_block, HashMap::default()).await {
Ok(res) => {
res.handle_justification(
&header.hash(),
*header.number(),
&mut self.justification_sync_link,
);

info!(
target: "pow",
"✅ Successfully mined block on top of: {}",
build.metadata.best_hash
);
true
},
Err(err) => {
warn!(
target: "pow",
"Unable to import mined block: {:?}",
err,
);
false
},
let build = if let Some(build) = {
let mut build = self.build.lock();
let value = build.take();
if value.is_some() {
self.increment_version();
}
value
} {
build
} else {
warn!(
target: "pow",
"Unable to import mined block: build does not exist",
);
false
return false
};

let seal = DigestItem::Seal(POW_ENGINE_ID, seal);
let (header, body) = build.proposal.block.deconstruct();

let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
import_block.post_digests.push(seal);
import_block.body = Some(body);
import_block.state_action =
StateAction::ApplyChanges(StorageChanges::Changes(build.proposal.storage_changes));

let intermediate = PowIntermediate::<Algorithm::Difficulty> {
difficulty: Some(build.metadata.difficulty),
};

import_block
.intermediates
.insert(Cow::from(INTERMEDIATE_KEY), Box::new(intermediate) as Box<_>);

let header = import_block.post_header();
let mut block_import = self.block_import.lock();

match block_import.import_block(import_block, HashMap::default()).await {
Ok(res) => {
res.handle_justification(
&header.hash(),
*header.number(),
&self.justification_sync_link,
);

info!(
target: "pow",
"✅ Successfully mined block on top of: {}",
build.metadata.best_hash
);
true
},
Err(err) => {
warn!(
target: "pow",
"Unable to import mined block: {:?}",
err,
);
false
},
}
}
}

impl<Block, Algorithm, C, L, Proof> Clone for MiningHandle<Block, Algorithm, C, L, Proof>
where
Block: BlockT,
Algorithm: PowAlgorithm<Block>,
C: sp_api::ProvideRuntimeApi<Block>,
L: sc_consensus::JustificationSyncLink<Block>,
{
fn clone(&self) -> Self {
Self {
version: self.version.clone(),
algorithm: self.algorithm.clone(),
justification_sync_link: self.justification_sync_link.clone(),
build: self.build.clone(),
block_import: self.block_import.clone(),
}
}
}
Expand Down