Skip to content

Commit

Permalink
feat: add feasibility_check on mined solution (#483)
Browse files Browse the repository at this point in the history
* feat: add `feasibility_check` on mined solution

* update substrate

* update metadata

* fix miner tests

* update staking miner playground

* remove noise

* simplify code, remove DRY code

* Update src/epm.rs
  • Loading branch information
niklasad1 authored Mar 31, 2023
1 parent b1a9780 commit 96e7798
Show file tree
Hide file tree
Showing 20 changed files with 3,228 additions and 1,598 deletions.
998 changes: 692 additions & 306 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 9 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ tokio = { version = "1.27", features = ["macros", "rt-multi-thread", "sync", "si
pin-project-lite = "0.2"

# subxt
subxt = "0.27.1"
subxt = { git = "https://github.com/paritytech/subxt" }
scale-value = "0.6.0"

# substrate
frame-election-provider-support = "16.0.0"
pallet-election-provider-multi-phase = "15.0.0"
sp-npos-elections = "14.0.0"
frame-support = "16.0.0"
sp-core = "16.0.0"
sp-runtime = "18.0.0"
sp-io = { git = "https://github.com/paritytech/substrate" }
frame-system = { git = "https://github.com/paritytech/substrate" }
frame-election-provider-support = { git = "https://github.com/paritytech/substrate" }
pallet-election-provider-multi-phase = { git = "https://github.com/paritytech/substrate" }
sp-npos-elections = { git = "https://github.com/paritytech/substrate" }
frame-support = { git = "https://github.com/paritytech/substrate" }
sp-version = { git = "https://github.com/paritytech/substrate" }
sp-runtime = { git = "https://github.com/paritytech/substrate" }

# prometheus
prometheus = "0.13"
Expand All @@ -38,7 +40,6 @@ once_cell = "1.17"
[dev-dependencies]
assert_cmd = "2.0"
sp-storage = "11.0.0"
sp-keyring = "18.0.0"
regex = "1"

[features]
Expand Down
Binary file modified artifacts/metadata.scale
Binary file not shown.
9 changes: 8 additions & 1 deletion src/dry_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,15 @@ where

log::info!(target: LOG_TARGET, "Loaded account {}, {:?}", signer, account_info);

let round = api
.storage()
.at(config.at)
.await?
.fetch_or_default(&runtime::storage().election_provider_multi_phase().round())
.await?;

let (solution, score, _size) =
epm::fetch_snapshot_and_mine_solution::<T>(&api, config.at, config.solver).await?;
epm::fetch_snapshot_and_mine_solution::<T>(&api, config.at, config.solver, round).await?;

let round = api
.storage()
Expand Down
113 changes: 69 additions & 44 deletions src/epm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@
use crate::{helpers::RuntimeDispatchInfo, opt::Solver, prelude::*, static_types};
use codec::{Decode, Encode};
use frame_election_provider_support::{NposSolution, PhragMMS, SequentialPhragmen};
use frame_support::{weights::Weight, BoundedVec};
use frame_support::weights::Weight;
use pallet_election_provider_multi_phase::{RawSolution, SolutionOf, SolutionOrSnapshotSize};
use runtime::runtime_types::pallet_election_provider_multi_phase::RoundSnapshot;
use scale_info::{PortableRegistry, TypeInfo};
use scale_value::scale::{decode_as_type, TypeId};
use sp_core::Bytes;
Expand All @@ -30,6 +29,11 @@ use subxt::{dynamic::Value, rpc::rpc_params, tx::DynamicTxPayload};

const EPM_PALLET_NAME: &str = "ElectionProviderMultiPhase";

type MinerVoterOf =
frame_election_provider_support::Voter<AccountId, crate::static_types::MaxVotesPerVoter>;

type RoundSnapshot = pallet_election_provider_multi_phase::RoundSnapshot<AccountId, MinerVoterOf>;

#[derive(Copy, Clone, Debug)]
struct EpmConstant {
epm: &'static str,
Expand Down Expand Up @@ -57,33 +61,26 @@ pub(crate) async fn update_metadata_constants(api: &SubxtClient) -> Result<(), E
const SIGNED_MAX_WEIGHT: EpmConstant = EpmConstant::new("SignedMaxWeight");
const MAX_LENGTH: EpmConstant = EpmConstant::new("MinerMaxLength");
const MAX_VOTES_PER_VOTER: EpmConstant = EpmConstant::new("MinerMaxVotesPerVoter");
const MAX_WINNERS: EpmConstant = EpmConstant::new("MinerMaxWinners");

fn log_metadata(metadata: EpmConstant, val: impl std::fmt::Display) {
log::trace!(target: LOG_TARGET, "updating metadata constant `{metadata}`: {val}",);
}

let max_weight = read_constant::<Weight>(api, SIGNED_MAX_WEIGHT)?;
let max_length: u32 = read_constant(api, MAX_LENGTH)?;
let max_votes_per_voter: u32 = read_constant(api, MAX_VOTES_PER_VOTER)?;
let max_winners: u32 = read_constant(api, MAX_WINNERS)?;

log::trace!(
target: LOG_TARGET,
"updating metadata constant `{}`: {}",
SIGNED_MAX_WEIGHT.to_string(),
max_weight.ref_time()
);
log::trace!(
target: LOG_TARGET,
"updating metadata constant `{}`: {}",
MAX_LENGTH.to_string(),
max_length
);
log::trace!(
target: LOG_TARGET,
"updating metadata constant `{}`: {}",
MAX_VOTES_PER_VOTER.to_string(),
max_votes_per_voter
);
log_metadata(SIGNED_MAX_WEIGHT, max_weight);
log_metadata(MAX_LENGTH, max_length);
log_metadata(MAX_VOTES_PER_VOTER, max_votes_per_voter);
log_metadata(MAX_WINNERS, max_winners);

static_types::MaxWeight::set(max_weight);
static_types::MaxLength::set(max_length);
static_types::MaxVotesPerVoter::set(max_votes_per_voter);
static_types::MaxWinners::set(max_winners);

Ok(())
}
Expand Down Expand Up @@ -150,12 +147,28 @@ pub async fn signed_submission_at<S: NposSolution + Decode + TypeInfo + 'static>
}
}

/// Helper to the signed submissions at the block `at`.
pub async fn snapshot_at(at: Option<Hash>, api: &SubxtClient) -> Result<RoundSnapshot, Error> {
let empty = Vec::<Value>::new();
let addr = subxt::dynamic::storage(EPM_PALLET_NAME, "Snapshot", empty);

match api.storage().at(at).await?.fetch(&addr).await {
Ok(Some(val)) => {
let snapshot = Decode::decode(&mut val.encoded())?;
Ok(snapshot)
},
Ok(None) => Err(Error::EmptySnapshot),
Err(err) => Err(err.into()),
}
}

/// Helper to fetch snapshot data via RPC
/// and compute an NPos solution via [`pallet_election_provider_multi_phase`].
pub async fn fetch_snapshot_and_mine_solution<T>(
api: &SubxtClient,
hash: Option<Hash>,
solver: Solver,
round: u32,
) -> Result<(SolutionOf<T>, ElectionScore, SolutionOrSnapshotSize), Error>
where
T: MinerConfig<AccountId = AccountId, MaxVotesPerVoter = static_types::MaxVotesPerVoter>
Expand All @@ -164,31 +177,24 @@ where
+ 'static,
T::Solution: Send,
{
let RoundSnapshot { voters, targets } = api
let snapshot = snapshot_at(hash, &api).await?;
let desired_targets = api
.storage()
.at(hash)
.await?
.fetch(&runtime::storage().election_provider_multi_phase().snapshot())
.fetch(&runtime::storage().election_provider_multi_phase().desired_targets())
.await?
.unwrap_or_default();
.expect("Snapshot is non-empty; `desired_target` should exist; qed");

let desired_targets = api
let minimum_untrusted_score = api
.storage()
.at(hash)
.await?
.fetch(&runtime::storage().election_provider_multi_phase().desired_targets())
.await?
.unwrap_or_default();

let voters: Vec<_> = voters
.into_iter()
.map(|(a, b, mut c)| {
let mut bounded_vec: BoundedVec<AccountId, static_types::MaxVotesPerVoter> = BoundedVec::default();
// If this fails just crash the task.
bounded_vec.try_append(&mut c.0).unwrap_or_else(|_| panic!("BoundedVec capacity: {} failed; `MinerConfig::MaxVotesPerVoter` is different from the chain data; this is a bug please file an issue", static_types::MaxVotesPerVoter::get()));
(a, b, bounded_vec)
})
.collect();
.fetch(&runtime::storage().election_provider_multi_phase().minimum_untrusted_score())
.await?;

let voters = snapshot.voters.clone();
let targets = snapshot.targets.clone();

let blocking_task = tokio::task::spawn_blocking(move || match solver {
Solver::SeqPhragmen { iterations } => {
Expand All @@ -209,9 +215,24 @@ where
.await;

match blocking_task {
Ok(Ok(res)) => Ok(res),
Ok(Ok((solution, score, solution_or_snapshot))) => {
match Miner::<T>::feasibility_check(
RawSolution { solution: solution.clone(), score, round },
pallet_election_provider_multi_phase::ElectionCompute::Signed,
desired_targets,
snapshot,
round,
minimum_untrusted_score,
) {
Ok(_) => Ok((solution, score, solution_or_snapshot)),
Err(e) => {
log::error!(target: LOG_TARGET, "Solution feasibility error {:?}", e);
Err(Error::Feasibility(format!("{:?}", e)))
},
}
},
Ok(Err(err)) => Err(Error::Other(format!("{:?}", err))),
Err(err) => Err(Error::Other(format!("{:?}", err))),
Err(err) => Err(err.into()),
}
}

Expand Down Expand Up @@ -278,14 +299,18 @@ pub async fn runtime_api_solution_weight<S: Encode + NposSolution + TypeInfo + '
}

/// Helper to mock the votes based on `voters` and `desired_targets`.
pub fn mock_votes(voters: u32, desired_targets: u16) -> Vec<(u32, u16)> {
assert!(voters >= desired_targets as u32);
(0..voters).zip((0..desired_targets).cycle()).collect()
pub fn mock_votes(voters: u32, desired_targets: u16) -> Option<Vec<(u32, u16)>> {
if voters >= desired_targets as u32 {
Some((0..voters).zip((0..desired_targets).cycle()).collect())
} else {
None
}
}

#[cfg(test)]
#[test]
fn mock_votes_works() {
assert_eq!(mock_votes(3, 2), vec![(0, 0), (1, 1), (2, 0)]);
assert_eq!(mock_votes(3, 3), vec![(0, 0), (1, 1), (2, 2)]);
assert_eq!(mock_votes(3, 2), Some(vec![(0, 0), (1, 1), (2, 0)]));
assert_eq!(mock_votes(3, 3), Some(vec![(0, 0), (1, 1), (2, 2)]));
assert_eq!(mock_votes(2, 3), None);
}
8 changes: 8 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use crate::prelude::*;

#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Failed to parse log directive: `{0}´")]
Expand Down Expand Up @@ -48,4 +50,10 @@ pub enum Error {
SubscriptionClosed,
#[error("Dynamic transaction error: {0}")]
DynamicTransaction(String),
#[error("Feasibility error: {0}")]
Feasibility(String),
#[error("{0}")]
JoinError(#[from] tokio::task::JoinError),
#[error("Empty snapshot")]
EmptySnapshot,
}
65 changes: 65 additions & 0 deletions src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use crate::{error::Error, prelude::*};
use codec::Decode;
use frame_support::weights::Weight;
use jsonrpsee::{core::Error as JsonRpseeError, types::error::CallError};
use pin_project_lite::pin_project;
use serde::Deserialize;
use std::{
Expand All @@ -24,6 +26,7 @@ use std::{
task::{Context, Poll},
time::{Duration, Instant},
};
use subxt::error::{Error as SubxtError, RpcError};

pin_project! {
pub struct Timed<Fut>
Expand Down Expand Up @@ -73,3 +76,65 @@ pub struct RuntimeDispatchInfo {
/// Weight of this dispatch.
pub weight: Weight,
}

pub fn kill_main_task_if_critical_err(tx: &tokio::sync::mpsc::UnboundedSender<Error>, err: Error) {
match err {
Error::AlreadySubmitted |
Error::BetterScoreExist |
Error::IncorrectPhase |
Error::TransactionRejected(_) |
Error::JoinError(_) |
Error::Feasibility(_) |
Error::EmptySnapshot |
Error::SubscriptionClosed => {},
Error::Subxt(SubxtError::Rpc(rpc_err)) => {
log::debug!(target: LOG_TARGET, "rpc error: {:?}", rpc_err);

match rpc_err {
RpcError::ClientError(e) => {
let jsonrpsee_err = match e.downcast::<JsonRpseeError>() {
Ok(e) => *e,
Err(_) => {
let _ = tx.send(Error::Other(
"Failed to downcast RPC error; this is a bug please file an issue"
.to_string(),
));
return
},
};

match jsonrpsee_err {
JsonRpseeError::Call(CallError::Custom(e)) => {
const BAD_EXTRINSIC_FORMAT: i32 = 1001;
const VERIFICATION_ERROR: i32 = 1002;
use jsonrpsee::types::error::ErrorCode;

// Check if the transaction gets fatal errors from the `author` RPC.
// It's possible to get other errors such as outdated nonce and similar
// but then it should be possible to try again in the next block or round.
if e.code() == BAD_EXTRINSIC_FORMAT ||
e.code() == VERIFICATION_ERROR || e.code() ==
ErrorCode::MethodNotFound.code()
{
let _ = tx.send(Error::Subxt(SubxtError::Rpc(
RpcError::ClientError(Box::new(CallError::Custom(e))),
)));
}
},
JsonRpseeError::Call(CallError::Failed(_)) => {},
JsonRpseeError::RequestTimeout => {},
err => {
let _ = tx.send(Error::Subxt(SubxtError::Rpc(RpcError::ClientError(
Box::new(err),
))));
},
}
},
RpcError::SubscriptionDropped => (),
}
},
err => {
let _ = tx.send(err);
},
}
}
Loading

0 comments on commit 96e7798

Please sign in to comment.