Skip to content

Commit

Permalink
update subxt
Browse files Browse the repository at this point in the history
  • Loading branch information
niklasad1 committed May 19, 2023
1 parent d1c1833 commit 2306a6c
Show file tree
Hide file tree
Showing 10 changed files with 323 additions and 220 deletions.
325 changes: 186 additions & 139 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pin-project-lite = "0.2"

# subxt
subxt = { git = "https://github.com/paritytech/subxt" }
scale-value = "0.6.0"
scale-value = "0.7.0"

# substrate
sp-io = { git = "https://github.com/paritytech/substrate" }
Expand Down
32 changes: 14 additions & 18 deletions src/commands/dry_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@
use pallet_election_provider_multi_phase::RawSolution;

use crate::{epm, error::Error, opt::Solver, prelude::*, signer::Signer, static_types};
use crate::{
epm, error::Error, helpers::storage_at, opt::Solver, prelude::*, signer::Signer, static_types,
};
use clap::Parser;
use codec::Encode;

#[derive(Debug, Clone, Parser)]
#[cfg_attr(test, derive(PartialEq))]
pub struct DryRunConfig {
/// The block hash at which scraping happens. If none is provided, the latest head is used.
#[clap(long)]
pub at: Option<Hash>,
#[clap(long, default_value_t = Block::Latest)]
pub at: Block,

/// The solver algorithm to use.
#[clap(subcommand)]
Expand Down Expand Up @@ -62,10 +64,8 @@ where
+ 'static,
T::Solution: Send,
{
let round = api
.storage()
.at(config.at)
.await?
let storage = storage_at(config.at, &api).await?;
let round = storage
.fetch_or_default(&runtime::storage().election_provider_multi_phase().round())
.await?;

Expand All @@ -78,10 +78,7 @@ where
)
.await?;

let round = api
.storage()
.at(config.at)
.await?
let round = storage
.fetch(&runtime::storage().election_provider_multi_phase().round())
.await?
.unwrap_or(1);
Expand All @@ -107,10 +104,7 @@ where
// we've logged the solution above and we do nothing else.
if let Some(seed_or_path) = &config.seed_or_path {
let signer = Signer::new(seed_or_path)?;
let account_info = api
.storage()
.at(None)
.await?
let account_info = storage
.fetch(&runtime::storage().system().account(signer.account_id()))
.await?
.ok_or(Error::AccountDoesNotExists)?;
Expand All @@ -122,11 +116,13 @@ where
let xt =
api.tx()
.create_signed_with_nonce(&tx, &*signer, nonce, ExtrinsicParams::default())?;
let outcome = api.rpc().dry_run(xt.encoded(), config.at).await?;
let dry_run_bytes = api.rpc().dry_run(xt.encoded(), config.at.into()).await?;

let dry_run_result = dry_run_bytes.into_dry_run_result(&api.metadata())?;

log::info!(target: LOG_TARGET, "dry-run outcome is {:?}", outcome);
log::info!(target: LOG_TARGET, "dry-run outcome is {:?}", dry_run_result);

outcome.map_err(|e| Error::Other(format!("{e:?}")))?;
//dry_run_bytes.map_err(|e| Error::Other(format!("{e:?}")))?;
}

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions src/commands/emergency_solution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use clap::Parser;
#[cfg_attr(test, derive(PartialEq))]
pub struct EmergencySolutionConfig {
/// The block hash at which scraping happens. If none is provided, the latest head is used.
#[clap(long)]
pub at: Option<Hash>,
#[clap(long, default_value_t = Block::Latest)]
pub at: Block,

/// The solver algorithm to use.
#[clap(subcommand)]
Expand Down
59 changes: 33 additions & 26 deletions src/commands/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ use pallet_election_provider_multi_phase::{RawSolution, SolutionOf};
use sp_runtime::Perbill;
use std::{str::FromStr, sync::Arc};
use subxt::{
config::Header as _, error::RpcError, rpc::Subscription, tx::TxStatus, Error as SubxtError,
config::Header as _,
error::RpcError,
rpc::{types::DryRunResult, Subscription},
tx::TxStatus,
Error as SubxtError,
};
use tokio::sync::Mutex;

Expand Down Expand Up @@ -171,7 +175,7 @@ where
let account_info = {
let addr = runtime::storage().system().account(signer.account_id());
api.storage()
.at(None)
.at_latest()
.await?
.fetch(&addr)
.await?
Expand Down Expand Up @@ -233,7 +237,7 @@ where

let account_info = api
.storage()
.at(None)
.at_latest()
.await?
.fetch(&runtime::storage().system().account(signer.account_id()))
.await?
Expand All @@ -258,15 +262,15 @@ where
+ 'static,
T::Solution: Send,
{
let hash = at.hash();
log::trace!(target: LOG_TARGET, "new event at #{:?} ({:?})", at.number, hash);
let block_hash = at.hash();
log::trace!(target: LOG_TARGET, "new event at #{:?} ({:?})", at.number, block_hash);

// NOTE: as we try to send at each block then the nonce is used guard against
// submitting twice. Because once a solution has been accepted on chain
// the "next transaction" at a later block but with the same nonce will be rejected
let nonce = api.rpc().system_account_next_index(signer.account_id()).await?;

ensure_signed_phase(&api, hash)
ensure_signed_phase(&api, block_hash)
.inspect_err(|e| {
log::debug!(
target: LOG_TARGET,
Expand All @@ -279,8 +283,7 @@ where

let round_fut = async {
api.storage()
.at(Some(hash))
.await?
.at(block_hash)
.fetch_or_default(&runtime::storage().election_provider_multi_phase().round())
.await
};
Expand All @@ -289,7 +292,7 @@ where
.inspect_err(|e| log::error!(target: LOG_TARGET, "Mining solution failed: {:?}", e))
.await?;

ensure_no_previous_solution::<T::Solution>(&api, hash, signer.account_id())
ensure_no_previous_solution::<T::Solution>(&api, block_hash, signer.account_id())
.inspect_err(|e| {
log::debug!(
target: LOG_TARGET,
Expand All @@ -305,7 +308,7 @@ where

let (solution, score) = match epm::fetch_snapshot_and_mine_solution::<T>(
&api,
Some(hash),
Block::At(block_hash),
config.solver,
round,
None,
Expand Down Expand Up @@ -405,7 +408,7 @@ where
&api,
signer,
(solution, score, round),
hash,
block_hash,
nonce,
config.listen,
config.dry_run,
Expand All @@ -430,13 +433,13 @@ where
}

/// Ensure that now is the signed phase.
async fn ensure_signed_phase(api: &SubxtClient, hash: Hash) -> Result<(), Error> {
async fn ensure_signed_phase(api: &SubxtClient, block_hash: Hash) -> Result<(), Error> {
use pallet_election_provider_multi_phase::Phase;

let addr = runtime::storage().election_provider_multi_phase().current_phase();
let phase = api.storage().at(Some(hash)).await?.fetch(&addr).await?;
let phase = api.storage().at(block_hash).fetch(&addr).await?;

if let Some(Phase::Signed) = phase {
if let Some(Phase::Signed) = phase.map(|p| p.0) {
Ok(())
} else {
Err(Error::IncorrectPhase)
Expand All @@ -446,17 +449,17 @@ async fn ensure_signed_phase(api: &SubxtClient, hash: Hash) -> Result<(), Error>
/// Ensure that our current `us` have not submitted anything previously.
async fn ensure_no_previous_solution<T>(
api: &SubxtClient,
at: Hash,
block_hash: Hash,
us: &AccountId,
) -> Result<(), Error>
where
T: NposSolution + scale_info::TypeInfo + Decode + 'static,
{
let addr = runtime::storage().election_provider_multi_phase().signed_submission_indices();
let indices = api.storage().at(Some(at)).await?.fetch_or_default(&addr).await?;
let indices = api.storage().at(block_hash).fetch_or_default(&addr).await?;

for (_score, _, idx) in indices.0 {
let submission = epm::signed_submission_at::<T>(idx, at, api).await?;
let submission = epm::signed_submission_at::<T>(idx, Block::At(block_hash), api).await?;

if let Some(submission) = submission {
if &submission.who == us {
Expand All @@ -470,7 +473,7 @@ where

async fn ensure_solution_passes_strategy(
api: &SubxtClient,
at: Hash,
block: Hash,
score: sp_npos_elections::ElectionScore,
strategy: SubmissionStrategy,
) -> Result<(), Error> {
Expand All @@ -480,14 +483,14 @@ async fn ensure_solution_passes_strategy(
}

let addr = runtime::storage().election_provider_multi_phase().signed_submission_indices();
let indices = api.storage().at(Some(at)).await?.fetch_or_default(&addr).await?;
let indices = api.storage().at(block).fetch_or_default(&addr).await?;

log::debug!(target: LOG_TARGET, "submitted solutions: {:?}", indices.0);

if indices
.0
.last()
.map_or(true, |(best_score, _, _)| score_passes_strategy(score, *best_score, strategy))
.map_or(true, |(best_score, _, _)| score_passes_strategy(score, best_score.0, strategy))
{
Ok(())
} else {
Expand All @@ -499,7 +502,7 @@ async fn submit_and_watch_solution<T: MinerConfig + Send + Sync + 'static>(
api: &SubxtClient,
signer: Signer,
(solution, score, round): (SolutionOf<T>, sp_npos_elections::ElectionScore, u32),
hash: Hash,
block_hash: Hash,
nonce: u32,
listen: Listen,
dry_run: bool,
Expand All @@ -511,10 +514,14 @@ async fn submit_and_watch_solution<T: MinerConfig + Send + Sync + 'static>(
.create_signed_with_nonce(&tx, &*signer, nonce, ExtrinsicParams::default())?;

if dry_run {
match api.rpc().dry_run(xt.encoded(), None).await? {
Ok(()) => (),
Err(e) => return Err(Error::TransactionRejected(format!("{e:?}"))),
};
let dry_run_bytes = api.rpc().dry_run(xt.encoded(), Some(block_hash)).await?;

match dry_run_bytes.into_dry_run_result(&api.metadata())? {
DryRunResult::Success => (),
DryRunResult::DispatchError(e) => return Err(Error::TransactionRejected(e.to_string())),
DryRunResult::TransactionValidityError =>
return Err(Error::TransactionRejected("TransactionValidityError".into())),
}
}

let mut status_sub = xt.submit_and_watch().await.map_err(|e| {
Expand All @@ -529,7 +536,7 @@ async fn submit_and_watch_solution<T: MinerConfig + Send + Sync + 'static>(
log::error!(
target: LOG_TARGET,
"watch submit extrinsic at {:?} failed: {:?}",
hash,
block_hash,
err
);
return Err(err.into())
Expand Down
Loading

0 comments on commit 2306a6c

Please sign in to comment.