diff --git a/cli/src/lib.rs b/cli/src/lib.rs index eb631467a21..021fb137326 100644 --- a/cli/src/lib.rs +++ b/cli/src/lib.rs @@ -251,7 +251,7 @@ impl Iroha { }); let state = Arc::new(state); - let queue = Arc::new(Queue::from_config(config.queue)); + let queue = Arc::new(Queue::from_config(config.queue, events_sender.clone())); match Self::start_telemetry(&logger, &config).await? { TelemetryStartStatus::Started => iroha_logger::info!("Telemetry started"), TelemetryStartStatus::NotStarted => iroha_logger::warn!("Telemetry not started"), diff --git a/client/benches/tps/utils.rs b/client/benches/tps/utils.rs index d215d1ce203..3b82ca153e2 100644 --- a/client/benches/tps/utils.rs +++ b/client/benches/tps/utils.rs @@ -18,6 +18,7 @@ use iroha_client::{ prelude::*, }, }; +use iroha_data_model::events::pipeline::{BlockEventFilter, BlockStatus}; use serde::Deserialize; use test_network::*; @@ -172,9 +173,7 @@ impl MeasurerUnit { fn spawn_event_counter(&self) -> thread::JoinHandle> { let listener = self.client.clone(); let (init_sender, init_receiver) = mpsc::channel(); - let event_filter = PipelineEventFilter::new() - .for_entity(PipelineEntityKind::Block) - .for_status(PipelineStatusKind::Committed); + let event_filter = BlockEventFilter::default().for_status(BlockStatus::Applied); let blocks_expected = self.config.blocks as usize; let name = self.name; let handle = thread::spawn(move || -> Result<()> { diff --git a/client/src/client.rs b/client/src/client.rs index ce942c1752c..7dfb8a9d4c6 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -14,7 +14,10 @@ use eyre::{eyre, Result, WrapErr}; use futures_util::StreamExt; use http_default::{AsyncWebSocketStream, WebSocketStream}; pub use iroha_config::client_api::ConfigDTO; -use iroha_data_model::query::QueryOutputBox; +use iroha_data_model::{ + events::pipeline::{BlockStatus, PipelineEventBox, TransactionEventFilter, TransactionStatus}, + query::QueryOutputBox, +}; use iroha_logger::prelude::*; use iroha_telemetry::metrics::Status; use iroha_torii_const::uri as torii_uri; @@ -605,7 +608,7 @@ impl Client { let mut event_iterator = { let event_iterator_result = tokio::time::timeout_at( deadline, - self.listen_for_events_async(PipelineEventFilter::new().for_hash(hash.into())), + self.listen_for_events_async(TransactionEventFilter::default().for_hash(hash)), ) .await .map_err(Into::into) @@ -631,17 +634,34 @@ impl Client { event_iterator: &mut AsyncEventStream, hash: HashOf, ) -> Result> { + let mut block_height = None; + while let Some(event) = event_iterator.next().await { - if let Event::Pipeline(this_event) = event? { - match this_event.status() { - PipelineStatus::Validating => {} - PipelineStatus::Rejected(ref reason) => { - return Err(reason.clone().into()); + if let EventBox::Pipeline(this_event) = event? { + match this_event { + PipelineEventBox::Transaction(transaction_event) => { + match transaction_event.status() { + TransactionStatus::Queued => {} + TransactionStatus::Approved => { + block_height = transaction_event.block_height; + } + TransactionStatus::Rejected(reason) => { + return Err(reason.clone().into()); + } + TransactionStatus::Expired => return Err(eyre!("Transaction expired")), + } + } + PipelineEventBox::Block(block_event) => { + if Some(block_event.height) == block_height { + if let BlockStatus::Applied = block_event.status() { + return Ok(hash); + } + } } - PipelineStatus::Committed => return Ok(hash), } } } + Err(eyre!( "Connection dropped without `Committed` or `Rejected` event" )) @@ -904,9 +924,7 @@ impl Client { pub fn listen_for_events( &self, event_filter: impl Into, - ) -> Result>> { - let event_filter = event_filter.into(); - iroha_logger::trace!(?event_filter); + ) -> Result>> { events_api::EventIterator::new(self.events_handler(event_filter)?) } @@ -919,8 +937,6 @@ impl Client { &self, event_filter: impl Into + Send, ) -> Result { - let event_filter = event_filter.into(); - iroha_logger::trace!(?event_filter, "Async listening with"); events_api::AsyncEventStream::new(self.events_handler(event_filter)?).await } @@ -933,8 +949,11 @@ impl Client { &self, event_filter: impl Into, ) -> Result { + let event_filter = event_filter.into(); + iroha_logger::trace!(?event_filter); + events_api::flow::Init::new( - event_filter.into(), + event_filter, self.headers.clone(), self.torii_url .join(torii_uri::SUBSCRIPTION) @@ -1284,7 +1303,7 @@ pub mod events_api { pub struct Events; impl FlowEvents for Events { - type Event = crate::data_model::prelude::Event; + type Event = crate::data_model::prelude::EventBox; fn message(&self, message: Vec) -> Result { let event_socket_message = EventMessage::decode_all(&mut message.as_slice())?; diff --git a/client/src/config.rs b/client/src/config.rs index 34b7e8663c7..72bb909d8c7 100644 --- a/client/src/config.rs +++ b/client/src/config.rs @@ -9,7 +9,7 @@ use iroha_config::{ base, base::{FromEnv, StdEnv, UnwrapPartial}, }; -use iroha_crypto::prelude::*; +use iroha_crypto::KeyPair; use iroha_data_model::{prelude::*, ChainId}; use iroha_primitives::small::SmallStr; use serde::{Deserialize, Serialize}; diff --git a/client/tests/integration/asset.rs b/client/tests/integration/asset.rs index fe95e30f348..34a102afd93 100644 --- a/client/tests/integration/asset.rs +++ b/client/tests/integration/asset.rs @@ -10,6 +10,7 @@ use iroha_config::parameters::actual::Root as Config; use iroha_data_model::{ asset::{AssetId, AssetValue, AssetValueType}, isi::error::{InstructionEvaluationError, InstructionExecutionError, Mismatch, TypeError}, + transaction::error::TransactionRejectionReason, }; use serde_json::json; use test_network::*; @@ -463,17 +464,17 @@ fn fail_if_dont_satisfy_spec() { .expect_err("Should be rejected due to non integer value"); let rejection_reason = err - .downcast_ref::() - .unwrap_or_else(|| panic!("Error {err} is not PipelineRejectionReason")); + .downcast_ref::() + .unwrap_or_else(|| panic!("Error {err} is not TransactionRejectionReason")); assert_eq!( rejection_reason, - &PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation( - ValidationFail::InstructionFailed(InstructionExecutionError::Evaluate( - InstructionEvaluationError::Type(TypeError::from(Mismatch { + &TransactionRejectionReason::Validation(ValidationFail::InstructionFailed( + InstructionExecutionError::Evaluate(InstructionEvaluationError::Type( + TypeError::from(Mismatch { expected: AssetValueType::Numeric(NumericSpec::integer()), actual: AssetValueType::Numeric(NumericSpec::fractional(2)) - })) + }) )) )) ); diff --git a/client/tests/integration/domain_owner_permissions.rs b/client/tests/integration/domain_owner_permissions.rs index e0945b85f70..af78eff12ac 100644 --- a/client/tests/integration/domain_owner_permissions.rs +++ b/client/tests/integration/domain_owner_permissions.rs @@ -3,6 +3,7 @@ use iroha_client::{ crypto::KeyPair, data_model::{account::SignatureCheckCondition, prelude::*}, }; +use iroha_data_model::transaction::error::TransactionRejectionReason; use serde_json::json; use test_network::*; @@ -37,14 +38,12 @@ fn domain_owner_domain_permissions() -> Result<()> { .expect_err("Tx should fail due to permissions"); let rejection_reason = err - .downcast_ref::() - .unwrap_or_else(|| panic!("Error {err} is not PipelineRejectionReason")); + .downcast_ref::() + .unwrap_or_else(|| panic!("Error {err} is not TransactionRejectionReason")); assert!(matches!( rejection_reason, - &PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation( - ValidationFail::NotPermitted(_) - )) + &TransactionRejectionReason::Validation(ValidationFail::NotPermitted(_)) )); // "alice@wonderland" owns the domain and can register AssetDefinitions by default as domain owner diff --git a/client/tests/integration/events/pipeline.rs b/client/tests/integration/events/pipeline.rs index 30f17528219..ed9857e2f4f 100644 --- a/client/tests/integration/events/pipeline.rs +++ b/client/tests/integration/events/pipeline.rs @@ -9,6 +9,14 @@ use iroha_client::{ }, }; use iroha_config::parameters::actual::Root as Config; +use iroha_data_model::{ + events::pipeline::{ + BlockEvent, BlockEventFilter, BlockStatus, TransactionEventFilter, TransactionStatus, + }, + isi::error::InstructionExecutionError, + transaction::error::TransactionRejectionReason, + ValidationFail, +}; use test_network::*; // Needed to re-enable ignored tests. @@ -17,24 +25,28 @@ const PEER_COUNT: usize = 7; #[ignore = "ignore, more in #2851"] #[test] fn transaction_with_no_instructions_should_be_committed() -> Result<()> { - test_with_instruction_and_status_and_port(None, PipelineStatusKind::Committed, 10_250) + test_with_instruction_and_status_and_port(None, &TransactionStatus::Approved, 10_250) } #[ignore = "ignore, more in #2851"] // #[ignore = "Experiment"] #[test] fn transaction_with_fail_instruction_should_be_rejected() -> Result<()> { - let fail = Fail::new("Should be rejected".to_owned()); + let msg = "Should be rejected".to_owned(); + + let fail = Fail::new(msg.clone()); test_with_instruction_and_status_and_port( Some(fail.into()), - PipelineStatusKind::Rejected, + &TransactionStatus::Rejected(Box::new(TransactionRejectionReason::Validation( + ValidationFail::InstructionFailed(InstructionExecutionError::Fail(msg)), + ))), 10_350, ) } fn test_with_instruction_and_status_and_port( instruction: Option, - should_be: PipelineStatusKind, + should_be: &TransactionStatus, port: u16, ) -> Result<()> { let (_rt, network, client) = @@ -56,9 +68,9 @@ fn test_with_instruction_and_status_and_port( let mut handles = Vec::new(); for listener in clients { let checker = Checker { listener, hash }; - let handle_validating = checker.clone().spawn(PipelineStatusKind::Validating); + let handle_validating = checker.clone().spawn(TransactionStatus::Queued); handles.push(handle_validating); - let handle_validated = checker.spawn(should_be); + let handle_validated = checker.spawn(should_be.clone()); handles.push(handle_validated); } // When @@ -78,15 +90,14 @@ struct Checker { } impl Checker { - fn spawn(self, status_kind: PipelineStatusKind) -> JoinHandle<()> { + fn spawn(self, status_kind: TransactionStatus) -> JoinHandle<()> { thread::spawn(move || { let mut event_iterator = self .listener .listen_for_events( - PipelineEventFilter::new() - .for_entity(PipelineEntityKind::Transaction) + TransactionEventFilter::default() .for_status(status_kind) - .for_hash(*self.hash), + .for_hash(self.hash), ) .expect("Failed to create event iterator."); let event_result = event_iterator.next().expect("Stream closed"); @@ -96,13 +107,11 @@ impl Checker { } #[test] -fn committed_block_must_be_available_in_kura() { +fn applied_block_must_be_available_in_kura() { let (_rt, peer, client) = ::new().with_port(11_040).start_with_runtime(); wait_for_genesis_committed(&[client.clone()], 0); - let event_filter = PipelineEventFilter::new() - .for_entity(PipelineEntityKind::Block) - .for_status(PipelineStatusKind::Committed); + let event_filter = BlockEventFilter::default().for_status(BlockStatus::Committed); let mut event_iter = client .listen_for_events(event_filter) .expect("Failed to subscribe for events"); @@ -111,21 +120,17 @@ fn committed_block_must_be_available_in_kura() { .submit(Fail::new("Dummy instruction".to_owned())) .expect("Failed to submit transaction"); - let event = event_iter.next().expect("Block must be committed"); - let Ok(Event::Pipeline(PipelineEvent { - entity_kind: PipelineEntityKind::Block, - status: PipelineStatus::Committed, - hash, - })) = event - else { - panic!("Received unexpected event") - }; - let hash = HashOf::from_untyped_unchecked(hash); + let event: BlockEvent = event_iter + .next() + .expect("Block must be committed") + .expect("Block must be committed") + .try_into() + .expect("Received unexpected event"); peer.iroha .as_ref() .expect("Must be some") .kura - .get_block_height_by_hash(&hash) + .get_block_by_height(event.height) .expect("Block committed event was received earlier"); } diff --git a/client/tests/integration/permissions.rs b/client/tests/integration/permissions.rs index e7fea53ac18..9a4578b8660 100644 --- a/client/tests/integration/permissions.rs +++ b/client/tests/integration/permissions.rs @@ -6,7 +6,9 @@ use iroha_client::{ crypto::KeyPair, data_model::prelude::*, }; -use iroha_data_model::permission::PermissionToken; +use iroha_data_model::{ + permission::PermissionToken, transaction::error::TransactionRejectionReason, +}; use iroha_genesis::GenesisNetwork; use serde_json::json; use test_network::{PeerBuilder, *}; @@ -104,14 +106,12 @@ fn permissions_disallow_asset_transfer() { .submit_transaction_blocking(&transfer_tx) .expect_err("Transaction was not rejected."); let rejection_reason = err - .downcast_ref::() - .expect("Error {err} is not PipelineRejectionReason"); + .downcast_ref::() + .expect("Error {err} is not TransactionRejectionReason"); //Then assert!(matches!( rejection_reason, - &PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation( - ValidationFail::NotPermitted(_) - )) + &TransactionRejectionReason::Validation(ValidationFail::NotPermitted(_)) )); let alice_assets = get_assets(&iroha_client, &alice_id); assert_eq!(alice_assets, alice_start_assets); @@ -156,14 +156,12 @@ fn permissions_disallow_asset_burn() { .submit_transaction_blocking(&burn_tx) .expect_err("Transaction was not rejected."); let rejection_reason = err - .downcast_ref::() - .expect("Error {err} is not PipelineRejectionReason"); + .downcast_ref::() + .expect("Error {err} is not TransactionRejectionReason"); assert!(matches!( rejection_reason, - &PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation( - ValidationFail::NotPermitted(_) - )) + &TransactionRejectionReason::Validation(ValidationFail::NotPermitted(_)) )); let alice_assets = get_assets(&iroha_client, &alice_id); diff --git a/client/tests/integration/roles.rs b/client/tests/integration/roles.rs index 12a03f333c1..6f260e3709f 100644 --- a/client/tests/integration/roles.rs +++ b/client/tests/integration/roles.rs @@ -6,6 +6,7 @@ use iroha_client::{ crypto::KeyPair, data_model::prelude::*, }; +use iroha_data_model::transaction::error::TransactionRejectionReason; use serde_json::json; use test_network::*; @@ -164,14 +165,12 @@ fn role_with_invalid_permissions_is_not_accepted() -> Result<()> { .expect_err("Submitting role with invalid permission token should fail"); let rejection_reason = err - .downcast_ref::() - .unwrap_or_else(|| panic!("Error {err} is not PipelineRejectionReason")); + .downcast_ref::() + .unwrap_or_else(|| panic!("Error {err} is not TransactionRejectionReason")); assert!(matches!( rejection_reason, - &PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation( - ValidationFail::NotPermitted(_) - )) + &TransactionRejectionReason::Validation(ValidationFail::NotPermitted(_)) )); Ok(()) diff --git a/client/tests/integration/triggers/time_trigger.rs b/client/tests/integration/triggers/time_trigger.rs index 1f29a0d8ba9..c81b60ba4c3 100644 --- a/client/tests/integration/triggers/time_trigger.rs +++ b/client/tests/integration/triggers/time_trigger.rs @@ -6,6 +6,7 @@ use iroha_client::{ data_model::{prelude::*, transaction::WasmSmartContract}, }; use iroha_config::parameters::defaults::chain_wide::DEFAULT_CONSENSUS_ESTIMATION; +use iroha_data_model::events::pipeline::{BlockEventFilter, BlockStatus}; use iroha_logger::info; use test_network::*; @@ -272,10 +273,8 @@ fn mint_nft_for_every_user_every_1_sec() -> Result<()> { /// Get block committed event listener fn get_block_committed_event_listener( client: &Client, -) -> Result>> { - let block_filter = PipelineEventFilter::new() - .for_entity(PipelineEntityKind::Block) - .for_status(PipelineStatusKind::Committed); +) -> Result>> { + let block_filter = BlockEventFilter::default().for_status(BlockStatus::Committed); client.listen_for_events(block_filter) } @@ -292,7 +291,7 @@ fn get_asset_value(client: &mut Client, asset_id: AssetId) -> Numeric { /// Submit some sample ISIs to create new blocks fn submit_sample_isi_on_every_block_commit( - block_committed_event_listener: impl Iterator>, + block_committed_event_listener: impl Iterator>, test_client: &mut Client, account_id: &AccountId, timeout: Duration, diff --git a/client_cli/src/main.rs b/client_cli/src/main.rs index 807d504a280..5173858371f 100644 --- a/client_cli/src/main.rs +++ b/client_cli/src/main.rs @@ -249,13 +249,17 @@ mod filter { mod events { + use iroha_client::data_model::events::pipeline::{BlockEventFilter, TransactionEventFilter}; + use super::*; /// Get event stream from iroha peer #[derive(clap::Subcommand, Debug, Clone, Copy)] pub enum Args { - /// Gets pipeline events - Pipeline, + /// Gets block pipeline events + BlockPipeline, + /// Gets transaction pipeline events + TransactionPipeline, /// Gets data events Data, /// Get execute trigger events @@ -267,7 +271,8 @@ mod events { impl RunArgs for Args { fn run(self, context: &mut dyn RunContext) -> Result<()> { match self { - Args::Pipeline => listen(PipelineEventFilter::new(), context), + Args::TransactionPipeline => listen(TransactionEventFilter::default(), context), + Args::BlockPipeline => listen(BlockEventFilter::default(), context), Args::Data => listen(DataEventFilter::Any, context), Args::ExecuteTrigger => listen(ExecuteTriggerEventFilter::new(), context), Args::TriggerCompleted => listen(TriggerCompletedEventFilter::new(), context), diff --git a/configs/swarm/executor.wasm b/configs/swarm/executor.wasm index fb05db1652e..e5562d19600 100644 Binary files a/configs/swarm/executor.wasm and b/configs/swarm/executor.wasm differ diff --git a/core/benches/blocks/apply_blocks.rs b/core/benches/blocks/apply_blocks.rs index f85921695d5..104f6728dca 100644 --- a/core/benches/blocks/apply_blocks.rs +++ b/core/benches/blocks/apply_blocks.rs @@ -39,7 +39,7 @@ impl StateApplyBlocks { let state = build_state(rt, &account_id, &key_pair); instructions .into_iter() - .map(|instructions| -> Result<_> { + .map(|instructions| { let mut state_block = state.block(); let block = create_block( &mut state_block, @@ -47,11 +47,11 @@ impl StateApplyBlocks { account_id.clone(), &key_pair, ); - state_block.apply_without_execution(&block)?; + let _wsv_events = state_block.apply_without_execution(&block); state_block.commit(); - Ok(block) + block }) - .collect::, _>>()? + .collect::>() }; Ok(Self { state, blocks }) diff --git a/core/benches/blocks/common.rs b/core/benches/blocks/common.rs index e4070b458c5..d88514f7c9f 100644 --- a/core/benches/blocks/common.rs +++ b/core/benches/blocks/common.rs @@ -42,7 +42,9 @@ pub fn create_block( ) .chain(0, state) .sign(key_pair) + .unpack(|_| {}) .commit(&topology) + .unpack(|_| {}) .unwrap(); // Verify that transactions are valid diff --git a/core/benches/blocks/validate_blocks.rs b/core/benches/blocks/validate_blocks.rs index 3390d7aaebe..6aa027d5f65 100644 --- a/core/benches/blocks/validate_blocks.rs +++ b/core/benches/blocks/validate_blocks.rs @@ -1,4 +1,3 @@ -use eyre::Result; use iroha_core::{prelude::*, state::State}; use iroha_data_model::{isi::InstructionBox, prelude::*}; @@ -21,11 +20,11 @@ impl StateValidateBlocks { /// - Failed to parse [`AccountId`] /// - Failed to generate [`KeyPair`] /// - Failed to create instructions for block - pub fn setup(rt: &tokio::runtime::Handle) -> Result { + pub fn setup(rt: &tokio::runtime::Handle) -> Self { let domains = 100; let accounts_per_domain = 1000; let assets_per_domain = 1000; - let account_id: AccountId = "alice@wonderland".parse()?; + let account_id: AccountId = "alice@wonderland".parse().unwrap(); let key_pair = KeyPair::random(); let state = build_state(rt, &account_id, &key_pair); @@ -38,12 +37,12 @@ impl StateValidateBlocks { .into_iter() .collect::>(); - Ok(Self { + Self { state, instructions, key_pair, account_id, - }) + } } /// Run benchmark body. @@ -61,7 +60,7 @@ impl StateValidateBlocks { key_pair, account_id, }: Self, - ) -> Result<()> { + ) { for (instructions, i) in instructions.into_iter().zip(1..) { let mut state_block = state.block(); let block = create_block( @@ -70,11 +69,9 @@ impl StateValidateBlocks { account_id.clone(), &key_pair, ); - state_block.apply_without_execution(&block)?; + let _wsv_events = state_block.apply_without_execution(&block); assert_eq!(state_block.height(), i); state_block.commit(); } - - Ok(()) } } diff --git a/core/benches/blocks/validate_blocks_benchmark.rs b/core/benches/blocks/validate_blocks_benchmark.rs index 454e07e3f4c..c3592b506f2 100644 --- a/core/benches/blocks/validate_blocks_benchmark.rs +++ b/core/benches/blocks/validate_blocks_benchmark.rs @@ -15,10 +15,8 @@ fn validate_blocks(c: &mut Criterion) { group.significance_level(0.1).sample_size(10); group.bench_function("validate_blocks", |b| { b.iter_batched( - || StateValidateBlocks::setup(rt.handle()).expect("Failed to setup benchmark"), - |bench| { - StateValidateBlocks::measure(bench).expect("Failed to execute benchmark"); - }, + || StateValidateBlocks::setup(rt.handle()), + StateValidateBlocks::measure, criterion::BatchSize::SmallInput, ); }); diff --git a/core/benches/blocks/validate_blocks_oneshot.rs b/core/benches/blocks/validate_blocks_oneshot.rs index 118ce739b99..8c8b20b1343 100644 --- a/core/benches/blocks/validate_blocks_oneshot.rs +++ b/core/benches/blocks/validate_blocks_oneshot.rs @@ -20,6 +20,6 @@ fn main() { } iroha_logger::test_logger(); iroha_logger::info!("Starting..."); - let bench = StateValidateBlocks::setup(rt.handle()).expect("Failed to setup benchmark"); - StateValidateBlocks::measure(bench).expect("Failed to execute bnechmark"); + let bench = StateValidateBlocks::setup(rt.handle()); + StateValidateBlocks::measure(bench); } diff --git a/core/benches/kura.rs b/core/benches/kura.rs index 06f78dcfc9b..521e242f60e 100644 --- a/core/benches/kura.rs +++ b/core/benches/kura.rs @@ -56,6 +56,7 @@ async fn measure_block_size_for_n_executors(n_executors: u32) { BlockBuilder::new(vec![tx], topology, Vec::new()) .chain(0, &mut state_block) .sign(&KeyPair::random()) + .unpack(|_| {}) }; for _ in 1..n_executors { diff --git a/core/benches/validation.rs b/core/benches/validation.rs index 8aff8c01ce0..d7e5459f090 100644 --- a/core/benches/validation.rs +++ b/core/benches/validation.rs @@ -186,7 +186,7 @@ fn sign_blocks(criterion: &mut Criterion) { b.iter_batched( || block.clone(), |block| { - let _: ValidBlock = block.sign(&key_pair); + let _: ValidBlock = block.sign(&key_pair).unpack(|_| {}); count += 1; }, BatchSize::SmallInput, diff --git a/core/src/block.rs b/core/src/block.rs index 4a6f210502e..9d90862cc8a 100644 --- a/core/src/block.rs +++ b/core/src/block.rs @@ -18,6 +18,7 @@ use iroha_genesis::GenesisTransaction; use iroha_primitives::unique_vec::UniqueVec; use thiserror::Error; +pub(crate) use self::event::WithEvents; pub use self::{chained::Chained, commit::CommittedBlock, valid::ValidBlock}; use crate::{prelude::*, sumeragi::network_topology::Topology, tx::AcceptTransactionFail}; @@ -110,7 +111,7 @@ mod pending { /// Transaction will be validated when block is chained. transactions: Vec, /// Event recommendations for use in triggers and off-chain work - event_recommendations: Vec, + event_recommendations: Vec, } impl BlockBuilder { @@ -123,7 +124,7 @@ mod pending { pub fn new( transactions: Vec, commit_topology: Topology, - event_recommendations: Vec, + event_recommendations: Vec, ) -> Self { assert!(!transactions.is_empty(), "Empty block created"); @@ -222,16 +223,16 @@ mod chained { impl BlockBuilder { /// Sign this block and get [`SignedBlock`]. - pub fn sign(self, key_pair: &KeyPair) -> ValidBlock { + pub fn sign(self, key_pair: &KeyPair) -> WithEvents { let signature = SignatureOf::new(key_pair, &self.0 .0); - ValidBlock( + WithEvents::new(ValidBlock( SignedBlockV1 { payload: self.0 .0, signatures: SignaturesOf::from(signature), } .into(), - ) + )) } } } @@ -245,7 +246,7 @@ mod valid { /// Block that was validated and accepted #[derive(Debug, Clone)] #[repr(transparent)] - pub struct ValidBlock(pub(crate) SignedBlock); + pub struct ValidBlock(pub(super) SignedBlock); impl ValidBlock { /// Validate a block against the current state of the world. @@ -264,7 +265,7 @@ mod valid { topology: &Topology, expected_chain_id: &ChainId, state_block: &mut StateBlock<'_>, - ) -> Result { + ) -> WithEvents> { if !block.header().is_genesis() { let actual_commit_topology = block.commit_topology(); let expected_commit_topology = &topology.ordered_peers; @@ -272,20 +273,23 @@ mod valid { if actual_commit_topology != expected_commit_topology { let actual_commit_topology = actual_commit_topology.clone(); - return Err(( + return WithEvents::new(Err(( block, BlockValidationError::TopologyMismatch { expected: expected_commit_topology.clone(), actual: actual_commit_topology, }, - )); + ))); } if topology .filter_signatures_by_roles(&[Role::Leader], block.signatures()) .is_empty() { - return Err((block, SignatureVerificationError::LeaderMissing.into())); + return WithEvents::new(Err(( + block, + SignatureVerificationError::LeaderMissing.into(), + ))); } } @@ -293,48 +297,51 @@ mod valid { let actual_height = block.header().height; if expected_block_height != actual_height { - return Err(( + return WithEvents::new(Err(( block, BlockValidationError::LatestBlockHeightMismatch { expected: expected_block_height, actual: actual_height, }, - )); + ))); } let expected_previous_block_hash = state_block.latest_block_hash(); let actual_block_hash = block.header().previous_block_hash; if expected_previous_block_hash != actual_block_hash { - return Err(( + return WithEvents::new(Err(( block, BlockValidationError::LatestBlockHashMismatch { expected: expected_previous_block_hash, actual: actual_block_hash, }, - )); + ))); } if block .transactions() .any(|tx| state_block.has_transaction(tx.as_ref().hash())) { - return Err((block, BlockValidationError::HasCommittedTransactions)); + return WithEvents::new(Err(( + block, + BlockValidationError::HasCommittedTransactions, + ))); } if let Err(error) = Self::validate_transactions(&block, expected_chain_id, state_block) { - return Err((block, error.into())); + return WithEvents::new(Err((block, error.into()))); } let SignedBlock::V1(block) = block; - Ok(ValidBlock( + WithEvents::new(Ok(ValidBlock( SignedBlockV1 { payload: block.payload, signatures: block.signatures, } .into(), - )) + ))) } fn validate_transactions( @@ -379,24 +386,33 @@ mod valid { /// /// - Not enough signatures /// - Not signed by proxy tail - pub(crate) fn commit_with_signatures( + pub fn commit_with_signatures( mut self, topology: &Topology, signatures: SignaturesOf, - ) -> Result { + ) -> WithEvents> { if topology .filter_signatures_by_roles(&[Role::Leader], &signatures) .is_empty() { - return Err((self, SignatureVerificationError::LeaderMissing.into())); + return WithEvents::new(Err(( + self, + SignatureVerificationError::LeaderMissing.into(), + ))); } if !self.as_ref().signatures().is_subset(&signatures) { - return Err((self, SignatureVerificationError::SignatureMissing.into())); + return WithEvents::new(Err(( + self, + SignatureVerificationError::SignatureMissing.into(), + ))); } if !self.0.replace_signatures(signatures) { - return Err((self, SignatureVerificationError::UnknownSignature.into())); + return WithEvents::new(Err(( + self, + SignatureVerificationError::UnknownSignature.into(), + ))); } self.commit(topology) @@ -411,19 +427,19 @@ mod valid { pub fn commit( self, topology: &Topology, - ) -> Result { + ) -> WithEvents> { if !self.0.header().is_genesis() { if let Err(err) = self.verify_signatures(topology) { - return Err((self, err.into())); + return WithEvents::new(Err((self, err.into()))); } } - Ok(CommittedBlock(self)) + WithEvents::new(Ok(CommittedBlock(self))) } /// Add additional signatures for [`Self`]. #[must_use] - pub fn sign(self, key_pair: &KeyPair) -> Self { + pub fn sign(self, key_pair: &KeyPair) -> ValidBlock { ValidBlock(self.0.sign(key_pair)) } @@ -458,6 +474,7 @@ mod valid { event_recommendations: Vec::new(), })) .sign(&KeyPair::random()) + .unpack(|_| {}) } /// Check if block's signatures meet requirements for given topology. @@ -628,31 +645,7 @@ mod commit { /// Represents a block accepted by consensus. /// Every [`Self`] will have a different height. #[derive(Debug, Clone)] - pub struct CommittedBlock(pub(crate) ValidBlock); - - impl CommittedBlock { - pub(crate) fn produce_events(&self) -> Vec { - let tx = self.as_ref().transactions().map(|tx| { - let status = tx.error.as_ref().map_or_else( - || PipelineStatus::Committed, - |error| PipelineStatus::Rejected(error.clone().into()), - ); - - PipelineEvent { - entity_kind: PipelineEntityKind::Transaction, - status, - hash: tx.as_ref().hash().into(), - } - }); - let current_block = core::iter::once(PipelineEvent { - entity_kind: PipelineEntityKind::Block, - status: PipelineStatus::Committed, - hash: self.as_ref().hash().into(), - }); - - tx.chain(current_block).collect() - } - } + pub struct CommittedBlock(pub(super) ValidBlock); impl From for ValidBlock { fn from(source: CommittedBlock) -> Self { @@ -666,12 +659,105 @@ mod commit { } } - // Invariants of [`CommittedBlock`] can't be violated through immutable reference impl AsRef for CommittedBlock { fn as_ref(&self) -> &SignedBlock { &self.0 .0 } } + + #[cfg(test)] + impl AsMut for CommittedBlock { + fn as_mut(&mut self) -> &mut SignedBlock { + &mut self.0 .0 + } + } +} + +mod event { + use super::*; + + pub trait EventProducer { + fn produce_events(&self) -> impl Iterator; + } + + #[derive(Debug)] + #[must_use] + pub struct WithEvents(B); + + impl WithEvents { + pub(super) fn new(source: B) -> Self { + Self(source) + } + } + + impl WithEvents> { + pub fn unpack(self, f: F) -> Result { + match self.0 { + Ok(ok) => Ok(WithEvents(ok).unpack(f)), + Err(err) => Err(WithEvents(err).unpack(f)), + } + } + } + impl WithEvents { + pub fn unpack(self, f: F) -> B { + self.0.produce_events().for_each(f); + self.0 + } + } + + impl WithEvents<(B, E)> { + pub(crate) fn unpack(self, f: F) -> (B, E) { + self.0 .1.produce_events().for_each(f); + self.0 + } + } + + impl EventProducer for ValidBlock { + fn produce_events(&self) -> impl Iterator { + let block_height = self.as_ref().header().height; + + let tx_events = self.as_ref().transactions().map(move |tx| { + let status = tx.error.as_ref().map_or_else( + || TransactionStatus::Approved, + |error| TransactionStatus::Rejected(error.clone().into()), + ); + + TransactionEvent { + block_height: Some(block_height), + hash: tx.as_ref().hash(), + status, + } + }); + + let block_event = core::iter::once(BlockEvent { + height: block_height, + status: BlockStatus::Approved, + }); + + tx_events + .map(PipelineEventBox::from) + .chain(block_event.map(Into::into)) + } + } + + impl EventProducer for CommittedBlock { + fn produce_events(&self) -> impl Iterator { + let block_height = self.as_ref().header().height; + + let block_event = core::iter::once(BlockEvent { + height: block_height, + status: BlockStatus::Committed, + }); + + block_event.map(Into::into) + } + } + + impl EventProducer for BlockValidationError { + fn produce_events(&self) -> impl Iterator { + core::iter::empty() + } + } } #[cfg(test)] @@ -690,7 +776,11 @@ mod tests { pub fn committed_and_valid_block_hashes_are_equal() { let valid_block = ValidBlock::new_dummy(); let topology = Topology::new(UniqueVec::new()); - let committed_block = valid_block.clone().commit(&topology).unwrap(); + let committed_block = valid_block + .clone() + .commit(&topology) + .unpack(|_| {}) + .unwrap(); assert_eq!( valid_block.0.hash_of_payload(), @@ -733,13 +823,26 @@ mod tests { let topology = Topology::new(UniqueVec::new()); let valid_block = BlockBuilder::new(transactions, topology, Vec::new()) .chain(0, &mut state_block) - .sign(&alice_keys); + .sign(&alice_keys) + .unpack(|_| {}); // The first transaction should be confirmed - assert!(valid_block.0.transactions().next().unwrap().error.is_none()); + assert!(valid_block + .as_ref() + .transactions() + .next() + .unwrap() + .error + .is_none()); // The second transaction should be rejected - assert!(valid_block.0.transactions().nth(1).unwrap().error.is_some()); + assert!(valid_block + .as_ref() + .transactions() + .nth(1) + .unwrap() + .error + .is_some()); } #[tokio::test] @@ -795,13 +898,26 @@ mod tests { let topology = Topology::new(UniqueVec::new()); let valid_block = BlockBuilder::new(transactions, topology, Vec::new()) .chain(0, &mut state_block) - .sign(&alice_keys); + .sign(&alice_keys) + .unpack(|_| {}); // The first transaction should fail - assert!(valid_block.0.transactions().next().unwrap().error.is_some()); + assert!(valid_block + .as_ref() + .transactions() + .next() + .unwrap() + .error + .is_some()); // The third transaction should succeed - assert!(valid_block.0.transactions().nth(2).unwrap().error.is_none()); + assert!(valid_block + .as_ref() + .transactions() + .nth(2) + .unwrap() + .error + .is_none()); } #[tokio::test] @@ -852,17 +968,30 @@ mod tests { let topology = Topology::new(UniqueVec::new()); let valid_block = BlockBuilder::new(transactions, topology, Vec::new()) .chain(0, &mut state_block) - .sign(&alice_keys); + .sign(&alice_keys) + .unpack(|_| {}); // The first transaction should be rejected assert!( - valid_block.0.transactions().next().unwrap().error.is_some(), + valid_block + .as_ref() + .transactions() + .next() + .unwrap() + .error + .is_some(), "The first transaction should be rejected, as it contains `Fail`." ); // The second transaction should be accepted assert!( - valid_block.0.transactions().nth(1).unwrap().error.is_none(), + valid_block + .as_ref() + .transactions() + .nth(1) + .unwrap() + .error + .is_none(), "The second transaction should be accepted." ); } diff --git a/core/src/lib.rs b/core/src/lib.rs index ab0b9be0d6b..06a0bd4103f 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -18,7 +18,7 @@ use core::time::Duration; use gossiper::TransactionGossip; use indexmap::IndexSet; -use iroha_data_model::prelude::*; +use iroha_data_model::{events::EventBox, prelude::*}; use iroha_primitives::unique_vec::UniqueVec; use parity_scale_codec::{Decode, Encode}; use tokio::sync::broadcast; @@ -41,8 +41,8 @@ pub type PeersIds = UniqueVec; /// Parameters set. pub type Parameters = IndexSet; -/// Type of `Sender` which should be used for channels of `Event` messages. -pub type EventsSender = broadcast::Sender; +/// Type of `Sender` which should be used for channels of `Event` messages. +pub type EventsSender = broadcast::Sender; /// The network message #[derive(Clone, Debug, Encode, Decode)] diff --git a/core/src/queue.rs b/core/src/queue.rs index d463a655a4c..4d69fc46722 100644 --- a/core/src/queue.rs +++ b/core/src/queue.rs @@ -8,13 +8,17 @@ use eyre::Result; use indexmap::IndexSet; use iroha_config::parameters::actual::Queue as Config; use iroha_crypto::HashOf; -use iroha_data_model::{account::AccountId, transaction::prelude::*}; +use iroha_data_model::{ + account::AccountId, + events::pipeline::{TransactionEvent, TransactionStatus}, + transaction::prelude::*, +}; use iroha_logger::{trace, warn}; use iroha_primitives::must_use::MustUse; use rand::seq::IteratorRandom; use thiserror::Error; -use crate::prelude::*; +use crate::{prelude::*, EventsSender}; impl AcceptedTransaction { // TODO: We should have another type of transaction like `CheckedTransaction` in the type system? @@ -48,6 +52,7 @@ impl AcceptedTransaction { /// Multiple producers, single consumer #[derive(Debug)] pub struct Queue { + events_sender: EventsSender, /// The queue for transactions tx_hashes: ArrayQueue>, /// [`AcceptedTransaction`]s addressed by `Hash` @@ -96,8 +101,9 @@ pub struct Failure { impl Queue { /// Makes queue from configuration - pub fn from_config(cfg: Config) -> Self { + pub fn from_config(cfg: Config, events_sender: EventsSender) -> Self { Self { + events_sender, tx_hashes: ArrayQueue::new(cfg.capacity.get()), accepted_txs: DashMap::new(), txs_per_user: DashMap::new(), @@ -226,6 +232,14 @@ impl Queue { err: Error::Full, } })?; + let _ = self.events_sender.send( + TransactionEvent { + hash, + block_height: None, + status: TransactionStatus::Queued, + } + .into(), + ); trace!("Transaction queue length = {}", self.tx_hashes.len(),); Ok(()) } @@ -285,7 +299,6 @@ impl Queue { state_view, max_txs_in_block, &mut transactions, - &mut Vec::new(), ); transactions } @@ -298,17 +311,16 @@ impl Queue { state_view: &StateView, max_txs_in_block: usize, transactions: &mut Vec, - expired_transactions: &mut Vec, ) { if transactions.len() >= max_txs_in_block { return; } let mut seen_queue = Vec::new(); - let mut expired_transactions_queue = Vec::new(); + let mut expired_transactions = Vec::new(); let txs_from_queue = core::iter::from_fn(|| { - self.pop_from_queue(&mut seen_queue, state_view, &mut expired_transactions_queue) + self.pop_from_queue(&mut seen_queue, state_view, &mut expired_transactions) }); let transactions_hashes: IndexSet> = @@ -322,7 +334,17 @@ impl Queue { .into_iter() .try_for_each(|hash| self.tx_hashes.push(hash)) .expect("Exceeded the number of transactions pending"); - expired_transactions.extend(expired_transactions_queue); + + expired_transactions + .into_iter() + .map(|tx| TransactionEvent { + hash: tx.as_ref().hash(), + block_height: None, + status: TransactionStatus::Expired, + }) + .for_each(|e| { + let _ = self.events_sender.send(e.into()); + }); } /// Check that the user adhered to the maximum transaction per user limit and increment their transaction count. @@ -381,6 +403,21 @@ pub mod tests { PeersIds, }; + impl Queue { + pub fn test(cfg: Config) -> Self { + Self { + events_sender: tokio::sync::broadcast::Sender::new(1), + tx_hashes: ArrayQueue::new(cfg.capacity.get()), + accepted_txs: DashMap::new(), + txs_per_user: DashMap::new(), + capacity: cfg.capacity, + capacity_per_user: cfg.capacity_per_user, + tx_time_to_live: cfg.transaction_time_to_live, + future_threshold: cfg.future_threshold, + } + } + } + fn accepted_tx(account_id: &str, key: &KeyPair) -> AcceptedTransaction { let chain_id = ChainId::from("0"); @@ -437,7 +474,7 @@ pub mod tests { )); let state_view = state.view(); - let queue = Queue::from_config(config_factory()); + let queue = Queue::test(config_factory()); queue .push(accepted_tx("alice@wonderland", &key_pair), &state_view) @@ -458,7 +495,7 @@ pub mod tests { )); let state_view = state.view(); - let queue = Queue::from_config(Config { + let queue = Queue::test(Config { transaction_time_to_live: Duration::from_secs(100), capacity, ..Config::default() @@ -504,7 +541,7 @@ pub mod tests { }; let state_view = state.view(); - let queue = Queue::from_config(config_factory()); + let queue = Queue::test(config_factory()); let instructions: [InstructionBox; 0] = []; let tx = TransactionBuilder::new(chain_id.clone(), "alice@wonderland".parse().expect("Valid")) @@ -560,7 +597,7 @@ pub mod tests { query_handle, )); let state_view = state.view(); - let queue = Queue::from_config(Config { + let queue = Queue::test(Config { transaction_time_to_live: Duration::from_secs(100), ..config_factory() }); @@ -590,7 +627,7 @@ pub mod tests { state_block.transactions.insert(tx.as_ref().hash(), 1); state_block.commit(); let state_view = state.view(); - let queue = Queue::from_config(config_factory()); + let queue = Queue::test(config_factory()); assert!(matches!( queue.push(tx, &state_view), Err(Failure { @@ -613,7 +650,7 @@ pub mod tests { query_handle, ); let tx = accepted_tx("alice@wonderland", &alice_key); - let queue = Queue::from_config(config_factory()); + let queue = Queue::test(config_factory()); queue.push(tx.clone(), &state.view()).unwrap(); let mut state_block = state.block(); state_block.transactions.insert(tx.as_ref().hash(), 1); @@ -639,7 +676,7 @@ pub mod tests { query_handle, )); let state_view = state.view(); - let queue = Queue::from_config(Config { + let queue = Queue::test(Config { transaction_time_to_live: Duration::from_millis(300), ..config_factory() }); @@ -687,7 +724,7 @@ pub mod tests { query_handle, )); let state_view = state.view(); - let queue = Queue::from_config(config_factory()); + let queue = Queue::test(config_factory()); queue .push(accepted_tx("alice@wonderland", &alice_key), &state_view) .expect("Failed to push tx into queue"); @@ -722,7 +759,9 @@ pub mod tests { query_handle, )); let state_view = state.view(); - let queue = Queue::from_config(config_factory()); + let mut queue = Queue::test(config_factory()); + let (event_sender, mut event_receiver) = tokio::sync::broadcast::channel(1); + queue.events_sender = event_sender; let instructions = [Fail { message: "expired".to_owned(), }]; @@ -737,18 +776,26 @@ pub mod tests { max_instruction_number: 4096, max_wasm_size_bytes: 0, }; + let tx_hash = tx.hash(); let tx = AcceptedTransaction::accept(tx, &chain_id, &limits) .expect("Failed to accept Transaction."); queue .push(tx.clone(), &state_view) .expect("Failed to push tx into queue"); let mut txs = Vec::new(); - let mut expired_txs = Vec::new(); thread::sleep(Duration::from_millis(TTL_MS)); - queue.get_transactions_for_block(&state_view, max_txs_in_block, &mut txs, &mut expired_txs); + queue.get_transactions_for_block(&state_view, max_txs_in_block, &mut txs); assert!(txs.is_empty()); - assert_eq!(expired_txs.len(), 1); - assert_eq!(expired_txs[0], tx); + + assert_eq!( + event_receiver.recv().await.unwrap(), + TransactionEvent { + hash: tx_hash, + block_height: None, + status: TransactionStatus::Expired, + } + .into() + ) } #[test] @@ -763,7 +810,7 @@ pub mod tests { query_handle, )); - let queue = Arc::new(Queue::from_config(Config { + let queue = Arc::new(Queue::test(Config { transaction_time_to_live: Duration::from_secs(100), capacity: 100_000_000.try_into().unwrap(), ..Config::default() @@ -837,7 +884,7 @@ pub mod tests { )); let state_view = state.view(); - let queue = Queue::from_config(Config { + let queue = Queue::test(Config { future_threshold, ..Config::default() }); @@ -898,7 +945,7 @@ pub mod tests { let query_handle = LiveQueryStore::test().start(); let state = State::new(world, kura, query_handle); - let queue = Queue::from_config(Config { + let queue = Queue::test(Config { transaction_time_to_live: Duration::from_secs(100), capacity: 100.try_into().unwrap(), capacity_per_user: 1.try_into().unwrap(), diff --git a/core/src/smartcontracts/isi/query.rs b/core/src/smartcontracts/isi/query.rs index 1b8f8715ad8..e74c18ee217 100644 --- a/core/src/smartcontracts/isi/query.rs +++ b/core/src/smartcontracts/isi/query.rs @@ -316,7 +316,9 @@ mod tests { let first_block = BlockBuilder::new(transactions.clone(), topology.clone(), Vec::new()) .chain(0, &mut state_block) .sign(&ALICE_KEYS) + .unpack(|_| {}) .commit(&topology) + .unpack(|_| {}) .expect("Block is valid"); state_block.apply(&first_block)?; @@ -326,7 +328,9 @@ mod tests { let block = BlockBuilder::new(transactions.clone(), topology.clone(), Vec::new()) .chain(0, &mut state_block) .sign(&ALICE_KEYS) + .unpack(|_| {}) .commit(&topology) + .unpack(|_| {}) .expect("Block is valid"); state_block.apply(&block)?; @@ -466,7 +470,9 @@ mod tests { let vcb = BlockBuilder::new(vec![va_tx.clone()], topology.clone(), Vec::new()) .chain(0, &mut state_block) .sign(&ALICE_KEYS) + .unpack(|_| {}) .commit(&topology) + .unpack(|_| {}) .expect("Block is valid"); state_block.apply(&vcb)?; diff --git a/core/src/smartcontracts/isi/triggers/set.rs b/core/src/smartcontracts/isi/triggers/set.rs index d7bfca0b769..63d7732e92b 100644 --- a/core/src/smartcontracts/isi/triggers/set.rs +++ b/core/src/smartcontracts/isi/triggers/set.rs @@ -58,8 +58,8 @@ type WasmSmartContractMap = IndexMap, (WasmSmartContra pub struct Set { /// Triggers using [`DataEventFilter`] data_triggers: IndexMap>, - /// Triggers using [`PipelineEventFilter`] - pipeline_triggers: IndexMap>, + /// Triggers using [`PipelineEventFilterBox`] + pipeline_triggers: IndexMap>, /// Triggers using [`TimeEventFilter`] time_triggers: IndexMap>, /// Triggers using [`ExecuteTriggerEventFilter`] @@ -70,7 +70,7 @@ pub struct Set { original_contracts: WasmSmartContractMap, /// List of actions that should be triggered by events provided by `handle_*` methods. /// Vector is used to save the exact triggers order. - matched_ids: Vec<(Event, TriggerId)>, + matched_ids: Vec<(EventBox, TriggerId)>, } /// Helper struct for serializing triggers. @@ -177,7 +177,7 @@ impl<'de> DeserializeSeed<'de> for WasmSeed<'_, Set> { "pipeline_triggers" => { let triggers: IndexMap< TriggerId, - SpecializedAction, + SpecializedAction, > = map.next_value()?; for (id, action) in triggers { set.add_pipeline_trigger( @@ -259,7 +259,7 @@ impl Set { }) } - /// Add trigger with [`PipelineEventFilter`] + /// Add trigger with [`PipelineEventFilterBox`] /// /// Return `false` if a trigger with given id already exists /// @@ -270,7 +270,7 @@ impl Set { pub fn add_pipeline_trigger( &mut self, engine: &wasmtime::Engine, - trigger: SpecializedTrigger, + trigger: SpecializedTrigger, ) -> Result { self.add_to(engine, trigger, TriggeringEventType::Pipeline, |me| { &mut me.pipeline_triggers @@ -721,18 +721,6 @@ impl Set { }; } - /// Handle [`PipelineEvent`]. - /// - /// Find all actions that are triggered by `event` and store them. - /// These actions are inspected in the next [`Set::inspect_matched()`] call. - // Passing by value to follow other `handle_` methods interface - #[allow(clippy::needless_pass_by_value)] - pub fn handle_pipeline_event(&mut self, event: PipelineEvent) { - self.pipeline_triggers.iter().for_each(|entry| { - Self::match_and_insert_trigger(&mut self.matched_ids, event.clone(), entry) - }); - } - /// Handle [`TimeEvent`]. /// /// Find all actions that are triggered by `event` and store them. @@ -747,7 +735,7 @@ impl Set { continue; } - let ids = core::iter::repeat_with(|| (Event::Time(event), id.clone())).take( + let ids = core::iter::repeat_with(|| (EventBox::Time(event), id.clone())).take( count .try_into() .expect("`u32` should always fit in `usize`"), @@ -761,8 +749,8 @@ impl Set { /// Skips insertion: /// - If the action's filter doesn't match an event /// - If the action's repeats count equals to 0 - fn match_and_insert_trigger, F: EventFilter>( - matched_ids: &mut Vec<(Event, TriggerId)>, + fn match_and_insert_trigger, F: EventFilter>( + matched_ids: &mut Vec<(EventBox, TriggerId)>, event: E, (id, action): (&TriggerId, &LoadedAction), ) { @@ -825,7 +813,7 @@ impl Set { } /// Extract `matched_id` - pub fn extract_matched_ids(&mut self) -> Vec<(Event, TriggerId)> { + pub fn extract_matched_ids(&mut self) -> Vec<(EventBox, TriggerId)> { core::mem::take(&mut self.matched_ids) } } diff --git a/core/src/smartcontracts/isi/triggers/specialized.rs b/core/src/smartcontracts/isi/triggers/specialized.rs index 09e898b126d..24aa7b34500 100644 --- a/core/src/smartcontracts/isi/triggers/specialized.rs +++ b/core/src/smartcontracts/isi/triggers/specialized.rs @@ -103,7 +103,7 @@ macro_rules! impl_try_from_box { impl_try_from_box! { Data => DataEventFilter, - Pipeline => PipelineEventFilter, + Pipeline => PipelineEventFilterBox, Time => TimeEventFilter, ExecuteTrigger => ExecuteTriggerEventFilter, } @@ -228,7 +228,7 @@ mod tests { .unwrap() } TriggeringEventFilterBox::Pipeline(_) => { - SpecializedTrigger::::try_from(boxed) + SpecializedTrigger::::try_from(boxed) .map(|_| ()) .unwrap() } diff --git a/core/src/smartcontracts/wasm.rs b/core/src/smartcontracts/wasm.rs index dd8df4bd163..25f27e25675 100644 --- a/core/src/smartcontracts/wasm.rs +++ b/core/src/smartcontracts/wasm.rs @@ -465,7 +465,7 @@ pub mod state { #[derive(Constructor)] pub struct Trigger { /// Event which activated this trigger - pub(in super::super) triggering_event: Event, + pub(in super::super) triggering_event: EventBox, } pub mod executor { @@ -977,7 +977,7 @@ impl<'wrld, 'block: 'wrld, 'state: 'block> Runtime Result<()> { let span = wasm_log_span!("Trigger execution", %id, %authority); let state = state::Trigger::new( diff --git a/core/src/state.rs b/core/src/state.rs index b9291530cbf..a018c7d2217 100644 --- a/core/src/state.rs +++ b/core/src/state.rs @@ -7,7 +7,12 @@ use iroha_crypto::HashOf; use iroha_data_model::{ account::AccountId, block::SignedBlock, - events::trigger_completed::{TriggerCompletedEvent, TriggerCompletedOutcome}, + events::{ + pipeline::BlockEvent, + time::TimeEvent, + trigger_completed::{TriggerCompletedEvent, TriggerCompletedOutcome}, + EventBox, + }, isi::error::{InstructionExecutionError as Error, MathError}, parameter::{Parameter, ParameterValueBox}, permission::{PermissionTokenSchema, Permissions}, @@ -95,7 +100,7 @@ pub struct WorldBlock<'world> { /// Runtime Executor pub(crate) executor: CellBlock<'world, Executor>, /// Events produced during execution of block - pub(crate) events_buffer: Vec, + events_buffer: Vec, } /// Struct for single transaction's aggregated changes @@ -126,7 +131,7 @@ pub struct WorldTransaction<'block, 'world> { /// Wrapper for event's buffer to apply transaction rollback struct TransactionEventBuffer<'block> { /// Events produced during execution of block - events_buffer: &'block mut Vec, + events_buffer: &'block mut Vec, /// Number of events produced during execution current transaction events_created_in_transaction: usize, } @@ -285,7 +290,7 @@ impl World { } } - /// Create struct to apply block's changes while reverting changes made in the latest block + /// Create struct to apply block's changes while reverting changes made in the latest block pub fn block_and_revert(&self) -> WorldBlock { WorldBlock { parameters: self.parameters.block_and_revert(), @@ -895,14 +900,14 @@ impl WorldTransaction<'_, '_> { } impl TransactionEventBuffer<'_> { - fn push(&mut self, event: Event) { + fn push(&mut self, event: EventBox) { self.events_created_in_transaction += 1; self.events_buffer.push(event); } } -impl Extend for TransactionEventBuffer<'_> { - fn extend>(&mut self, iter: T) { +impl Extend for TransactionEventBuffer<'_> { + fn extend>(&mut self, iter: T) { let len_before = self.events_buffer.len(); self.events_buffer.extend(iter); let len_after = self.events_buffer.len(); @@ -1183,13 +1188,10 @@ impl<'state> StateBlock<'state> { deprecated(note = "This function is to be used in testing only. ") )] #[iroha_logger::log(skip_all, fields(block_height))] - pub fn apply(&mut self, block: &CommittedBlock) -> Result<()> { + pub fn apply(&mut self, block: &CommittedBlock) -> Result> { self.execute_transactions(block)?; debug!("All block transactions successfully executed"); - - self.apply_without_execution(block)?; - - Ok(()) + Ok(self.apply_without_execution(block)) } /// Execute `block` transactions and store their hashes as well as @@ -1217,12 +1219,12 @@ impl<'state> StateBlock<'state> { /// Apply transactions without actually executing them. /// It's assumed that block's transaction was already executed (as part of validation for example). #[iroha_logger::log(skip_all, fields(block_height = block.as_ref().header().height))] - pub fn apply_without_execution(&mut self, block: &CommittedBlock) -> Result<()> { + pub fn apply_without_execution(&mut self, block: &CommittedBlock) -> Vec { let block_hash = block.as_ref().hash(); trace!(%block_hash, "Applying block"); let time_event = self.create_time_event(block); - self.world.events_buffer.push(Event::Time(time_event)); + self.world.events_buffer.push(time_event.into()); let block_height = block.as_ref().header().height; block @@ -1248,8 +1250,14 @@ impl<'state> StateBlock<'state> { self.block_hashes.push(block_hash); self.apply_parameters(); - - Ok(()) + self.world.events_buffer.push( + BlockEvent { + height: block_height, + status: BlockStatus::Applied, + } + .into(), + ); + core::mem::take(&mut self.world.events_buffer) } /// Create time event using previous and current blocks @@ -1388,7 +1396,7 @@ impl StateTransaction<'_, '_> { &mut self, id: &TriggerId, action: &dyn LoadedActionTrait, - event: Event, + event: EventBox, ) -> Result<()> { use triggers::set::LoadedExecutable::*; let authority = action.authority(); @@ -1751,7 +1759,7 @@ mod tests { /// Used to inject faulty payload for testing fn payload_mut(block: &mut CommittedBlock) -> &mut BlockPayload { - let SignedBlock::V1(signed) = &mut block.0 .0; + let SignedBlock::V1(signed) = block.as_mut(); &mut signed.payload } @@ -1760,7 +1768,10 @@ mod tests { const BLOCK_CNT: usize = 10; let topology = Topology::new(UniqueVec::new()); - let block = ValidBlock::new_dummy().commit(&topology).unwrap(); + let block = ValidBlock::new_dummy() + .commit(&topology) + .unpack(|_| {}) + .unwrap(); let kura = Kura::blank_kura_for_testing(); let query_handle = LiveQueryStore::test().start(); let state = State::new(World::default(), kura, query_handle); @@ -1788,7 +1799,10 @@ mod tests { const BLOCK_CNT: usize = 10; let topology = Topology::new(UniqueVec::new()); - let block = ValidBlock::new_dummy().commit(&topology).unwrap(); + let block = ValidBlock::new_dummy() + .commit(&topology) + .unpack(|_| {}) + .unwrap(); let kura = Kura::blank_kura_for_testing(); let query_handle = LiveQueryStore::test().start(); let state = State::new(World::default(), kura.clone(), query_handle); diff --git a/core/src/sumeragi/main_loop.rs b/core/src/sumeragi/main_loop.rs index 13bb94bb01a..8e8fcde1b3c 100644 --- a/core/src/sumeragi/main_loop.rs +++ b/core/src/sumeragi/main_loop.rs @@ -2,10 +2,7 @@ use std::sync::mpsc; use iroha_crypto::HashOf; -use iroha_data_model::{ - block::*, events::pipeline::PipelineEvent, peer::PeerId, - transaction::error::TransactionRejectionReason, -}; +use iroha_data_model::{block::*, events::pipeline::PipelineEventBox, peer::PeerId}; use iroha_p2p::UpdateTopology; use tracing::{span, Level}; @@ -82,17 +79,19 @@ impl Sumeragi { #[allow(clippy::needless_pass_by_value, single_use_lifetimes)] // TODO: uncomment when anonymous lifetimes are stable fn broadcast_packet_to<'peer_id>( &self, - msg: BlockMessage, + msg: impl Into, ids: impl IntoIterator + Send, ) { + let msg = msg.into(); + for peer_id in ids { self.post_packet_to(msg.clone(), peer_id); } } - fn broadcast_packet(&self, msg: BlockMessage) { + fn broadcast_packet(&self, msg: impl Into) { let broadcast = iroha_p2p::Broadcast { - data: NetworkMessage::SumeragiBlock(Box::new(msg)), + data: NetworkMessage::SumeragiBlock(Box::new(msg.into())), }; self.network.broadcast(broadcast); } @@ -116,17 +115,8 @@ impl Sumeragi { self.block_time + self.commit_time } - fn send_events(&self, events: impl IntoIterator>) { - let addr = &self.peer_id.address; - - if self.events_sender.receiver_count() > 0 { - for event in events { - self.events_sender - .send(event.into()) - .map_err(|err| warn!(%addr, ?err, "Event not sent")) - .unwrap_or(0); - } - } + fn send_event(&self, event: impl Into) { + let _ = self.events_sender.send(event.into()); } fn receive_network_packet( @@ -239,13 +229,15 @@ impl Sumeragi { &self.chain_id, &mut state_block, ) + .unpack(|e| self.send_event(e)) .and_then(|block| { block .commit(&self.current_topology) + .unpack(|e| self.send_event(e)) .map_err(|(block, error)| (block.into(), error)) }) { Ok(block) => block, - Err((_, error)) => { + Err(error) => { error!(?error, "Received invalid genesis block"); continue; } @@ -280,12 +272,14 @@ impl Sumeragi { let mut state_block = state.block(); let genesis = BlockBuilder::new(transactions, self.current_topology.clone(), vec![]) .chain(0, &mut state_block) - .sign(&self.key_pair); + .sign(&self.key_pair) + .unpack(|e| self.send_event(e)); - let genesis_msg = BlockCreated::from(genesis.clone()).into(); + let genesis_msg = BlockCreated::from(genesis.clone()); let genesis = genesis .commit(&self.current_topology) + .unpack(|e| self.send_event(e)) .expect("Genesis invalid"); assert!( @@ -324,19 +318,13 @@ impl Sumeragi { "{}", Strategy::LOG_MESSAGE, ); - state_block - .apply_without_execution(&block) - .expect("Failed to apply block on state. Bailing."); - - let state_events = core::mem::take(&mut state_block.world.events_buffer); - self.send_events(state_events); + let state_events = state_block.apply_without_execution(&block); let new_topology = Topology::recreate_topology( block.as_ref(), 0, state_block.world.peers().cloned().collect(), ); - let events = block.produce_events(); // https://github.com/hyperledger/iroha/issues/3396 // Kura should store the block only upon successful application to the internal state to avoid storing a corrupted block. @@ -346,6 +334,7 @@ impl Sumeragi { // Parameters are updated before updating public copy of sumeragi self.update_params(&state_block); self.cache_transaction(&state_block); + self.current_topology = new_topology; self.connect_peers(&self.current_topology); @@ -353,7 +342,7 @@ impl Sumeragi { state_block.commit(); // NOTE: This sends "Block committed" event, // so it should be done AFTER public facing state update - self.send_events(events); + state_events.into_iter().for_each(|e| self.send_event(e)); } fn update_params(&mut self, state_block: &StateBlock<'_>) { @@ -391,9 +380,11 @@ impl Sumeragi { trace!(%addr, %role, block_hash=%block_hash, "Block received, voting..."); let mut state_block = state.block(); - let block = match ValidBlock::validate(block, topology, &self.chain_id, &mut state_block) { + let block = match ValidBlock::validate(block, topology, &self.chain_id, &mut state_block) + .unpack(|e| self.send_event(e)) + { Ok(block) => block, - Err((_, error)) => { + Err(error) => { warn!(%addr, %role, ?error, "Block validation failed"); return None; } @@ -438,9 +429,9 @@ impl Sumeragi { // Release writer before handling block sync let _ = voting_block.take(); - match handle_block_sync(&self.chain_id, block, state) { + match handle_block_sync(&self.chain_id, block, state, &|e| self.send_event(e)) { Ok(BlockSyncOk::CommitBlock(block, state_block)) => { - self.commit_block(block, state_block) + self.commit_block(block, state_block); } Ok(BlockSyncOk::ReplaceTopBlock(block, state_block)) => { warn!( @@ -502,6 +493,7 @@ impl Sumeragi { match voted_block .block .commit_with_signatures(current_topology, signatures) + .unpack(|e| self.send_event(e)) { Ok(committed_block) => { self.commit_block(committed_block, voted_block.state_block) @@ -509,7 +501,7 @@ impl Sumeragi { Err((_, error)) => { error!(%addr, %role, %hash, ?error, "Block failed to be committed") } - }; + } } else { error!( %addr, %role, committed_block_hash=%hash, %voting_block_hash, @@ -533,8 +525,7 @@ impl Sumeragi { { let block_hash = v_block.block.as_ref().hash_of_payload(); - let msg = BlockSigned::from(v_block.block.clone()).into(); - + let msg = BlockSigned::from(v_block.block.clone()); self.broadcast_packet_to(msg, [current_topology.proxy_tail()]); info!(%addr, %block_hash, "Block validated, signed and forwarded"); @@ -554,7 +545,7 @@ impl Sumeragi { let block_hash = v_block.block.as_ref().hash(); self.broadcast_packet_to( - BlockSigned::from(v_block.block.clone()).into(), + BlockSigned::from(v_block.block.clone()), [current_topology.proxy_tail()], ); info!(%addr, %block_hash, "Block validated, signed and forwarded"); @@ -641,7 +632,8 @@ impl Sumeragi { event_recommendations, ) .chain(current_view_change_index, &mut state_block) - .sign(&self.key_pair); + .sign(&self.key_pair) + .unpack(|e| self.send_event(e)); let created_in = create_block_start_time.elapsed(); if let Some(current_topology) = current_topology.is_consensus_required() { @@ -652,22 +644,23 @@ impl Sumeragi { } *voting_block = Some(VotingBlock::new(new_block.clone(), state_block)); - let msg = BlockCreated::from(new_block).into(); + let msg = BlockCreated::from(new_block); if current_view_change_index >= 1 { self.broadcast_packet(msg); } else { self.broadcast_packet_to(msg, current_topology.voting_peers()); } } else { - match new_block.commit(current_topology) { + match new_block + .commit(current_topology) + .unpack(|e| self.send_event(e)) + { Ok(committed_block) => { - self.broadcast_packet( - BlockCommitted::from(committed_block.clone()).into(), - ); + self.broadcast_packet(BlockCommitted::from(&committed_block)); self.commit_block(committed_block, state_block); } - Err((_, error)) => error!(%addr, role=%Role::Leader, ?error), - } + Err(error) => error!(%addr, role=%Role::Leader, ?error), + }; } } } @@ -677,12 +670,15 @@ impl Sumeragi { let voted_at = voted_block.voted_at; let state_block = voted_block.state_block; - match voted_block.block.commit(current_topology) { + match voted_block + .block + .commit(current_topology) + .unpack(|e| self.send_event(e)) + { Ok(committed_block) => { info!(voting_block_hash = %committed_block.as_ref().hash(), "Block reached required number of votes"); - let msg = BlockCommitted::from(committed_block.clone()).into(); - + let msg = BlockCommitted::from(&committed_block); let current_topology = current_topology .is_consensus_required() .expect("Peer has `ProxyTail` role, which mean that current topology require consensus"); @@ -863,14 +859,11 @@ pub(crate) fn run( expired }); - let mut expired_transactions = Vec::new(); sumeragi.queue.get_transactions_for_block( &state_view, sumeragi.max_txs_in_block, &mut sumeragi.transaction_cache, - &mut expired_transactions, ); - sumeragi.send_events(expired_transactions.iter().map(expired_event)); let current_view_change_index = sumeragi .prune_view_change_proofs_and_calculate_current_index( @@ -1001,18 +994,6 @@ fn add_signatures( } } -/// Create expired pipeline event for the given transaction. -fn expired_event(txn: &AcceptedTransaction) -> Event { - PipelineEvent { - entity_kind: PipelineEntityKind::Transaction, - status: PipelineStatus::Rejected(PipelineRejectionReason::Transaction( - TransactionRejectionReason::Expired, - )), - hash: txn.as_ref().hash().into(), - } - .into() -} - /// Type enumerating early return types to reduce cyclomatic /// complexity of the main loop items and allow direct short /// circuiting with the `?` operator. Candidate for `impl @@ -1092,10 +1073,11 @@ enum BlockSyncError { }, } -fn handle_block_sync<'state>( +fn handle_block_sync<'state, F: Fn(PipelineEventBox)>( chain_id: &ChainId, block: SignedBlock, state: &'state State, + handle_events: &F, ) -> Result, (SignedBlock, BlockSyncError)> { let block_height = block.header().height; let state_height = state.view().height(); @@ -1111,9 +1093,11 @@ fn handle_block_sync<'state>( Topology::recreate_topology(&last_committed_block, view_change_index, new_peers) }; ValidBlock::validate(block, &topology, chain_id, &mut state_block) + .unpack(handle_events) .and_then(|block| { block .commit(&topology) + .unpack(handle_events) .map_err(|(block, err)| (block.into(), err)) }) .map(|block| BlockSyncOk::CommitBlock(block, state_block)) @@ -1144,9 +1128,11 @@ fn handle_block_sync<'state>( Topology::recreate_topology(&last_committed_block, view_change_index, new_peers) }; ValidBlock::validate(block, &topology, chain_id, &mut state_block) + .unpack(handle_events) .and_then(|block| { block .commit(&topology) + .unpack(handle_events) .map_err(|(block, err)| (block.into(), err)) }) .map_err(|(block, error)| (block, BlockSyncError::SoftForkBlockNotValid(error))) @@ -1214,9 +1200,13 @@ mod tests { // Creating a block of two identical transactions and validating it let block = BlockBuilder::new(vec![tx.clone(), tx], topology.clone(), Vec::new()) .chain(0, &mut state_block) - .sign(leader_key_pair); + .sign(leader_key_pair) + .unpack(|_| {}); - let genesis = block.commit(topology).expect("Block is valid"); + let genesis = block + .commit(topology) + .unpack(|_| {}) + .expect("Block is valid"); state_block.apply(&genesis).expect("Failed to apply block"); state_block.commit(); kura.store_block(genesis); @@ -1256,6 +1246,7 @@ mod tests { BlockBuilder::new(vec![tx1, tx2], topology.clone(), Vec::new()) .chain(0, &mut state_block) .sign(leader_key_pair) + .unpack(|_| {}) }; (state, kura, block.into()) @@ -1276,7 +1267,7 @@ mod tests { // Malform block to make it invalid payload_mut(&mut block).commit_topology.clear(); - let result = handle_block_sync(&chain_id, block, &state); + let result = handle_block_sync(&chain_id, block, &state, &|_| {}); assert!(matches!(result, Err((_, BlockSyncError::BlockNotValid(_))))) } @@ -1292,12 +1283,14 @@ mod tests { let (state, kura, mut block) = create_data_for_test(&chain_id, &topology, &leader_key_pair); let mut state_block = state.block(); - let validated_block = - ValidBlock::validate(block.clone(), &topology, &chain_id, &mut state_block).unwrap(); - let committed_block = validated_block.commit(&topology).expect("Block is valid"); - state_block - .apply_without_execution(&committed_block) - .expect("Failed to apply block"); + let committed_block = + ValidBlock::validate(block.clone(), &topology, &chain_id, &mut state_block) + .unpack(|_| {}) + .unwrap() + .commit(&topology) + .unpack(|_| {}) + .expect("Block is valid"); + let _wsv_events = state_block.apply_without_execution(&committed_block); state_block.commit(); kura.store_block(committed_block); @@ -1305,7 +1298,7 @@ mod tests { payload_mut(&mut block).commit_topology.clear(); payload_mut(&mut block).header.view_change_index = 1; - let result = handle_block_sync(&chain_id, block, &state); + let result = handle_block_sync(&chain_id, block, &state, &|_| {}); assert!(matches!( result, Err((_, BlockSyncError::SoftForkBlockNotValid(_))) @@ -1324,7 +1317,7 @@ mod tests { // Change block height payload_mut(&mut block).header.height = 42; - let result = handle_block_sync(&chain_id, block, &state); + let result = handle_block_sync(&chain_id, block, &state, &|_| {}); assert!(matches!( result, Err(( @@ -1348,7 +1341,7 @@ mod tests { leader_key_pair.public_key().clone(), )]); let (state, _, block) = create_data_for_test(&chain_id, &topology, &leader_key_pair); - let result = handle_block_sync(&chain_id, block, &state); + let result = handle_block_sync(&chain_id, block, &state, &|_| {}); assert!(matches!(result, Ok(BlockSyncOk::CommitBlock(_, _)))) } @@ -1364,12 +1357,14 @@ mod tests { let (state, kura, mut block) = create_data_for_test(&chain_id, &topology, &leader_key_pair); let mut state_block = state.block(); - let validated_block = - ValidBlock::validate(block.clone(), &topology, &chain_id, &mut state_block).unwrap(); - let committed_block = validated_block.commit(&topology).expect("Block is valid"); - state_block - .apply_without_execution(&committed_block) - .expect("Failed to apply block"); + let committed_block = + ValidBlock::validate(block.clone(), &topology, &chain_id, &mut state_block) + .unpack(|_| {}) + .unwrap() + .commit(&topology) + .unpack(|_| {}) + .expect("Block is valid"); + let _wsv_events = state_block.apply_without_execution(&committed_block); state_block.commit(); kura.store_block(committed_block); @@ -1378,7 +1373,7 @@ mod tests { // Increase block view change index payload_mut(&mut block).header.view_change_index = 42; - let result = handle_block_sync(&chain_id, block, &state); + let result = handle_block_sync(&chain_id, block, &state, &|_| {}); assert!(matches!(result, Ok(BlockSyncOk::ReplaceTopBlock(_, _)))) } @@ -1397,12 +1392,14 @@ mod tests { payload_mut(&mut block).header.view_change_index = 42; let mut state_block = state.block(); - let validated_block = - ValidBlock::validate(block.clone(), &topology, &chain_id, &mut state_block).unwrap(); - let committed_block = validated_block.commit(&topology).expect("Block is valid"); - state_block - .apply_without_execution(&committed_block) - .expect("Failed to apply block"); + let committed_block = + ValidBlock::validate(block.clone(), &topology, &chain_id, &mut state_block) + .unpack(|_| {}) + .unwrap() + .commit(&topology) + .unpack(|_| {}) + .expect("Block is valid"); + let _wsv_events = state_block.apply_without_execution(&committed_block); state_block.commit(); kura.store_block(committed_block); assert_eq!(state.view().latest_block_view_change_index(), 42); @@ -1410,7 +1407,7 @@ mod tests { // Decrease block view change index back payload_mut(&mut block).header.view_change_index = 0; - let result = handle_block_sync(&chain_id, block, &state); + let result = handle_block_sync(&chain_id, block, &state, &|_| {}); assert!(matches!( result, Err(( @@ -1437,7 +1434,7 @@ mod tests { payload_mut(&mut block).header.view_change_index = 42; payload_mut(&mut block).header.height = 1; - let result = handle_block_sync(&chain_id, block, &state); + let result = handle_block_sync(&chain_id, block, &state, &|_| {}); assert!(matches!( result, Err(( diff --git a/core/src/sumeragi/message.rs b/core/src/sumeragi/message.rs index b0a80207072..95caaf5c0f5 100644 --- a/core/src/sumeragi/message.rs +++ b/core/src/sumeragi/message.rs @@ -84,8 +84,8 @@ pub struct BlockCommitted { pub signatures: SignaturesOf, } -impl From for BlockCommitted { - fn from(block: CommittedBlock) -> Self { +impl From<&CommittedBlock> for BlockCommitted { + fn from(block: &CommittedBlock) -> Self { let block_hash = block.as_ref().hash_of_payload(); let block_signatures = block.as_ref().signatures().clone(); diff --git a/core/src/sumeragi/mod.rs b/core/src/sumeragi/mod.rs index 1e10895b992..238b9647de9 100644 --- a/core/src/sumeragi/mod.rs +++ b/core/src/sumeragi/mod.rs @@ -193,24 +193,32 @@ impl SumeragiHandle { chain_id: &ChainId, block: &SignedBlock, state_block: &mut StateBlock<'_>, + events_sender: &EventsSender, mut current_topology: Topology, ) -> Topology { // NOTE: topology need to be updated up to block's view_change_index current_topology.rotate_all_n(block.header().view_change_index); let block = ValidBlock::validate(block.clone(), ¤t_topology, chain_id, state_block) - .expect("Kura blocks should be valid") + .unpack(|e| { + let _ = events_sender.send(e.into()); + }) + .expect("Kura: Invalid block") .commit(¤t_topology) - .expect("Kura blocks should be valid"); + .unpack(|e| { + let _ = events_sender.send(e.into()); + }) + .expect("Kura: Invalid block"); if block.as_ref().header().is_genesis() { *state_block.world.trusted_peers_ids = block.as_ref().commit_topology().clone(); } - state_block.apply_without_execution(&block).expect( - "Block application in init should not fail. \ - Blocks loaded from kura assumed to be valid", - ); + state_block.apply_without_execution(&block) + .into_iter() + .for_each(|e| { + let _ = events_sender.send(e); + }); Topology::recreate_topology( block.as_ref(), @@ -278,6 +286,7 @@ impl SumeragiHandle { &common_config.chain_id, &block, &mut state_block, + &events_sender, current_topology, ); state_block.commit(); @@ -356,16 +365,21 @@ pub const PEERS_CONNECT_INTERVAL: Duration = Duration::from_secs(1); pub const TELEMETRY_INTERVAL: Duration = Duration::from_secs(5); /// Structure represents a block that is currently in discussion. -#[non_exhaustive] pub struct VotingBlock<'state> { + /// Valid Block + block: ValidBlock, /// At what time has this peer voted for this block pub voted_at: Instant, - /// Valid Block - pub block: ValidBlock, /// [`WorldState`] after applying transactions to it but before it was committed pub state_block: StateBlock<'state>, } +impl AsRef for VotingBlock<'_> { + fn as_ref(&self) -> &ValidBlock { + &self.block + } +} + impl VotingBlock<'_> { /// Construct new `VotingBlock` with current time. pub fn new(block: ValidBlock, state_block: StateBlock<'_>) -> VotingBlock { @@ -382,8 +396,8 @@ impl VotingBlock<'_> { voted_at: Instant, ) -> VotingBlock { VotingBlock { - voted_at, block, + voted_at, state_block, } } diff --git a/core/test_network/src/lib.rs b/core/test_network/src/lib.rs index 012f475eda0..b7dc0e49552 100644 --- a/core/test_network/src/lib.rs +++ b/core/test_network/src/lib.rs @@ -14,7 +14,7 @@ use iroha_client::{ }; use iroha_config::parameters::actual::Root as Config; pub use iroha_core::state::StateReadOnly; -use iroha_crypto::prelude::*; +use iroha_crypto::KeyPair; use iroha_data_model::{query::QueryOutputBox, ChainId}; use iroha_genesis::{GenesisNetwork, RawGenesisBlockFile}; use iroha_logger::InstrumentFutures; @@ -54,11 +54,11 @@ pub fn get_chain_id() -> ChainId { /// Get a standardised key-pair from the hard-coded literals. pub fn get_key_pair() -> KeyPair { KeyPair::new( - PublicKey::from_str( + iroha_crypto::PublicKey::from_str( "ed01207233BFC89DCBD68C19FDE6CE6158225298EC1131B6A130D1AEB454C1AB5183C0", ).unwrap(), - PrivateKey::from_hex( - Algorithm::Ed25519, + iroha_crypto::PrivateKey::from_hex( + iroha_crypto::Algorithm::Ed25519, "9AC47ABF59B356E0BD7DCBBBB4DEC080E302156A48CA907E47CB6AEA1D32719E7233BFC89DCBD68C19FDE6CE6158225298EC1131B6A130D1AEB454C1AB5183C0" ).unwrap() ).unwrap() @@ -689,7 +689,7 @@ pub trait TestClient: Sized { fn test_with_account(api_url: &SocketAddr, keys: KeyPair, account_id: &AccountId) -> Self; /// Loop for events with filter and handler function - fn for_each_event(self, event_filter: impl Into, f: impl Fn(Result)); + fn for_each_event(self, event_filter: impl Into, f: impl Fn(Result)); /// Submit instruction with polling /// @@ -828,7 +828,7 @@ impl TestClient for Client { Client::new(config) } - fn for_each_event(self, event_filter: impl Into, f: impl Fn(Result)) { + fn for_each_event(self, event_filter: impl Into, f: impl Fn(Result)) { for event_result in self .listen_for_events(event_filter) .expect("Failed to create event iterator.") diff --git a/crypto/src/lib.rs b/crypto/src/lib.rs index f1662780479..aafd7868459 100755 --- a/crypto/src/lib.rs +++ b/crypto/src/lib.rs @@ -27,8 +27,6 @@ use alloc::{ }; use core::{borrow::Borrow, fmt, str::FromStr}; -#[cfg(feature = "base64")] -pub use base64; #[cfg(not(feature = "ffi_import"))] pub use blake2; use derive_more::Display; @@ -857,11 +855,6 @@ mod ffi { pub(crate) use ffi_item; } -/// The prelude re-exports most commonly used items from this crate. -pub mod prelude { - pub use super::{Algorithm, Hash, KeyPair, PrivateKey, PublicKey, Signature}; -} - #[cfg(test)] mod tests { use parity_scale_codec::{Decode, Encode}; diff --git a/data_model/src/account.rs b/data_model/src/account.rs index 2383bdc21ac..ce3f9582770 100644 --- a/data_model/src/account.rs +++ b/data_model/src/account.rs @@ -431,6 +431,8 @@ pub mod prelude { #[cfg(test)] mod tests { + #[cfg(not(feature = "std"))] + use alloc::{vec, vec::Vec}; use core::cmp::Ordering; use iroha_crypto::{KeyPair, PublicKey}; diff --git a/data_model/src/block.rs b/data_model/src/block.rs index 93ce5bec045..69ac0bae298 100644 --- a/data_model/src/block.rs +++ b/data_model/src/block.rs @@ -98,7 +98,7 @@ pub mod model { pub transactions: Vec, /// Event recommendations. #[getset(skip)] // NOTE: Unused ATM - pub event_recommendations: Vec, + pub event_recommendations: Vec, } /// Signed block diff --git a/data_model/src/events/data/filters.rs b/data_model/src/events/data/filters.rs index 4edc08c828e..92743725aaf 100644 --- a/data_model/src/events/data/filters.rs +++ b/data_model/src/events/data/filters.rs @@ -705,7 +705,6 @@ impl EventFilter for DataEventFilter { (DataEvent::Peer(event), Peer(filter)) => filter.matches(event), (DataEvent::Trigger(event), Trigger(filter)) => filter.matches(event), (DataEvent::Role(event), Role(filter)) => filter.matches(event), - (DataEvent::PermissionToken(_), PermissionTokenSchemaUpdate) => true, (DataEvent::Configuration(event), Configuration(filter)) => filter.matches(event), (DataEvent::Executor(event), Executor(filter)) => filter.matches(event), diff --git a/data_model/src/events/mod.rs b/data_model/src/events/mod.rs index 94c003526bc..b7e9c2abbe0 100644 --- a/data_model/src/events/mod.rs +++ b/data_model/src/events/mod.rs @@ -7,9 +7,11 @@ use iroha_data_model_derive::model; use iroha_macro::FromVariant; use iroha_schema::IntoSchema; use parity_scale_codec::{Decode, Encode}; +use pipeline::{BlockEvent, TransactionEvent}; use serde::{Deserialize, Serialize}; pub use self::model::*; +use self::pipeline::{BlockEventFilter, TransactionEventFilter}; pub mod data; pub mod execute_trigger; @@ -37,9 +39,9 @@ pub mod model { IntoSchema, )] #[ffi_type] - pub enum Event { + pub enum EventBox { /// Pipeline event. - Pipeline(pipeline::PipelineEvent), + Pipeline(pipeline::PipelineEventBox), /// Data event. Data(data::DataEvent), /// Time event. @@ -85,7 +87,7 @@ pub mod model { #[ffi_type(opaque)] pub enum EventFilterBox { /// Listen to pipeline events with filter. - Pipeline(pipeline::PipelineEventFilter), + Pipeline(pipeline::PipelineEventFilterBox), /// Listen to data events with filter. Data(data::DataEventFilter), /// Listen to time events with filter. @@ -116,7 +118,7 @@ pub mod model { #[ffi_type(opaque)] pub enum TriggeringEventFilterBox { /// Listen to pipeline events with filter. - Pipeline(pipeline::PipelineEventFilter), + Pipeline(pipeline::PipelineEventFilterBox), /// Listen to data events with filter. Data(data::DataEventFilter), /// Listen to time events with filter. @@ -126,6 +128,62 @@ pub mod model { } } +impl From for EventBox { + fn from(source: TransactionEvent) -> Self { + Self::Pipeline(source.into()) + } +} + +impl From for EventBox { + fn from(source: BlockEvent) -> Self { + Self::Pipeline(source.into()) + } +} + +impl From for EventFilterBox { + fn from(source: TransactionEventFilter) -> Self { + Self::Pipeline(source.into()) + } +} + +impl From for EventFilterBox { + fn from(source: BlockEventFilter) -> Self { + Self::Pipeline(source.into()) + } +} + +impl TryFrom for TransactionEvent { + type Error = iroha_macro::error::ErrorTryFromEnum; + + fn try_from(event: EventBox) -> Result { + use iroha_macro::error::ErrorTryFromEnum; + + let EventBox::Pipeline(pipeline_event) = event else { + return Err(ErrorTryFromEnum::default()); + }; + + pipeline_event + .try_into() + .map_err(|_| ErrorTryFromEnum::default()) + } +} + +impl TryFrom for BlockEvent { + type Error = iroha_macro::error::ErrorTryFromEnum; + + fn try_from(event: EventBox) -> Result { + use iroha_macro::error::ErrorTryFromEnum; + + let EventBox::Pipeline(pipeline_event) = event else { + return Err(ErrorTryFromEnum::default()); + }; + + pipeline_event + .try_into() + .map_err(|_| ErrorTryFromEnum::default()) + } +} + /// Trait for filters #[cfg(feature = "transparent_api")] pub trait EventFilter { @@ -156,25 +214,27 @@ pub trait EventFilter { #[cfg(feature = "transparent_api")] impl EventFilter for EventFilterBox { - type Event = Event; + type Event = EventBox; /// Apply filter to event. - fn matches(&self, event: &Event) -> bool { + fn matches(&self, event: &EventBox) -> bool { match (event, self) { - (Event::Pipeline(event), Self::Pipeline(filter)) => filter.matches(event), - (Event::Data(event), Self::Data(filter)) => filter.matches(event), - (Event::Time(event), Self::Time(filter)) => filter.matches(event), - (Event::ExecuteTrigger(event), Self::ExecuteTrigger(filter)) => filter.matches(event), - (Event::TriggerCompleted(event), Self::TriggerCompleted(filter)) => { + (EventBox::Pipeline(event), Self::Pipeline(filter)) => filter.matches(event), + (EventBox::Data(event), Self::Data(filter)) => filter.matches(event), + (EventBox::Time(event), Self::Time(filter)) => filter.matches(event), + (EventBox::ExecuteTrigger(event), Self::ExecuteTrigger(filter)) => { + filter.matches(event) + } + (EventBox::TriggerCompleted(event), Self::TriggerCompleted(filter)) => { filter.matches(event) } // Fail to compile in case when new variant to event or filter is added ( - Event::Pipeline(_) - | Event::Data(_) - | Event::Time(_) - | Event::ExecuteTrigger(_) - | Event::TriggerCompleted(_), + EventBox::Pipeline(_) + | EventBox::Data(_) + | EventBox::Time(_) + | EventBox::ExecuteTrigger(_) + | EventBox::TriggerCompleted(_), Self::Pipeline(_) | Self::Data(_) | Self::Time(_) @@ -187,22 +247,24 @@ impl EventFilter for EventFilterBox { #[cfg(feature = "transparent_api")] impl EventFilter for TriggeringEventFilterBox { - type Event = Event; + type Event = EventBox; /// Apply filter to event. - fn matches(&self, event: &Event) -> bool { + fn matches(&self, event: &EventBox) -> bool { match (event, self) { - (Event::Pipeline(event), Self::Pipeline(filter)) => filter.matches(event), - (Event::Data(event), Self::Data(filter)) => filter.matches(event), - (Event::Time(event), Self::Time(filter)) => filter.matches(event), - (Event::ExecuteTrigger(event), Self::ExecuteTrigger(filter)) => filter.matches(event), + (EventBox::Pipeline(event), Self::Pipeline(filter)) => filter.matches(event), + (EventBox::Data(event), Self::Data(filter)) => filter.matches(event), + (EventBox::Time(event), Self::Time(filter)) => filter.matches(event), + (EventBox::ExecuteTrigger(event), Self::ExecuteTrigger(filter)) => { + filter.matches(event) + } // Fail to compile in case when new variant to event or filter is added ( - Event::Pipeline(_) - | Event::Data(_) - | Event::Time(_) - | Event::ExecuteTrigger(_) - | Event::TriggerCompleted(_), + EventBox::Pipeline(_) + | EventBox::Data(_) + | EventBox::Time(_) + | EventBox::ExecuteTrigger(_) + | EventBox::TriggerCompleted(_), Self::Pipeline(_) | Self::Data(_) | Self::Time(_) | Self::ExecuteTrigger(_), ) => false, } @@ -279,7 +341,7 @@ pub mod stream { /// Event sent by the peer. #[derive(Debug, Clone, Decode, Encode, IntoSchema)] #[repr(transparent)] - pub struct EventMessage(pub Event); + pub struct EventMessage(pub EventBox); /// Message sent by the stream consumer. /// Request sent by the client to subscribe to events. @@ -288,7 +350,7 @@ pub mod stream { pub struct EventSubscriptionRequest(pub EventFilterBox); } - impl From for Event { + impl From for EventBox { fn from(source: EventMessage) -> Self { source.0 } @@ -303,7 +365,7 @@ pub mod prelude { pub use super::EventFilter; pub use super::{ data::prelude::*, execute_trigger::prelude::*, pipeline::prelude::*, time::prelude::*, - trigger_completed::prelude::*, Event, EventFilterBox, TriggeringEventFilterBox, + trigger_completed::prelude::*, EventBox, EventFilterBox, TriggeringEventFilterBox, TriggeringEventType, }; } diff --git a/data_model/src/events/pipeline.rs b/data_model/src/events/pipeline.rs index 5d3b962144f..69bd880c00f 100644 --- a/data_model/src/events/pipeline.rs +++ b/data_model/src/events/pipeline.rs @@ -1,59 +1,52 @@ //! Pipeline events. #[cfg(not(feature = "std"))] -use alloc::{format, string::String, vec::Vec}; +use alloc::{boxed::Box, format, string::String, vec::Vec}; -use getset::Getters; -use iroha_crypto::Hash; +use iroha_crypto::HashOf; use iroha_data_model_derive::model; use iroha_macro::FromVariant; use iroha_schema::IntoSchema; use parity_scale_codec::{Decode, Encode}; use serde::{Deserialize, Serialize}; -use strum::EnumDiscriminants; pub use self::model::*; +use crate::transaction::SignedTransaction; #[model] pub mod model { + use getset::Getters; + use super::*; - /// [`Event`] filter. #[derive( Debug, Clone, - Copy, PartialEq, Eq, PartialOrd, Ord, - Default, - Getters, + FromVariant, Decode, Encode, - Serialize, Deserialize, + Serialize, IntoSchema, )] - pub struct PipelineEventFilter { - /// If `Some::`, filter by the [`EntityKind`]. If `None`, accept all the [`EntityKind`]. - pub(super) entity_kind: Option, - /// If `Some::`, filter by the [`StatusKind`]. If `None`, accept all the [`StatusKind`]. - pub(super) status_kind: Option, - /// If `Some::`, filter by the [`struct@Hash`]. If `None`, accept all the [`struct@Hash`]. - // TODO: Can we make hash typed like HashOf? - pub(super) hash: Option, + #[ffi_type(opaque)] + pub enum PipelineEventBox { + Transaction(TransactionEvent), + Block(BlockEvent), } - /// The kind of the pipeline entity. #[derive( Debug, Clone, - Copy, PartialEq, Eq, PartialOrd, Ord, + Getters, Decode, Encode, Deserialize, @@ -61,15 +54,12 @@ pub mod model { IntoSchema, )] #[ffi_type] - #[repr(u8)] - pub enum PipelineEntityKind { - /// Block - Block, - /// Transaction - Transaction, + #[getset(get = "pub")] + pub struct BlockEvent { + pub height: u64, + pub status: BlockStatus, } - /// Strongly-typed [`Event`] that tells the receiver the kind and the hash of the changed entity as well as its [`Status`]. #[derive( Debug, Clone, @@ -84,18 +74,15 @@ pub mod model { Serialize, IntoSchema, )] - #[getset(get = "pub")] #[ffi_type] - pub struct PipelineEvent { - /// [`EntityKind`] of the entity that caused this [`Event`]. - pub entity_kind: PipelineEntityKind, - /// [`Status`] of the entity that caused this [`Event`]. - pub status: PipelineStatus, - /// [`struct@Hash`] of the entity that caused this [`Event`]. - pub hash: Hash, + #[getset(get = "pub")] + pub struct TransactionEvent { + pub hash: HashOf, + pub block_height: Option, + pub status: TransactionStatus, } - /// [`Status`] of the entity. + /// Report of block's status in the pipeline #[derive( Debug, Clone, @@ -103,129 +90,218 @@ pub mod model { Eq, PartialOrd, Ord, - FromVariant, - EnumDiscriminants, Decode, Encode, + Deserialize, Serialize, + IntoSchema, + )] + #[ffi_type(opaque)] + pub enum BlockStatus { + /// Block was approved to participate in consensus + Approved, + /// Block was rejected by consensus + Rejected(crate::block::error::BlockRejectionReason), + /// Block has passed consensus successfully + Committed, + /// Changes have been reflected in the WSV + Applied, + } + + /// Report of transaction's status in the pipeline + #[derive( + Debug, + Clone, + PartialEq, + Eq, + PartialOrd, + Ord, + Decode, + Encode, Deserialize, + Serialize, IntoSchema, )] - #[strum_discriminants( - name(PipelineStatusKind), - derive(PartialOrd, Ord, Decode, Encode, Deserialize, Serialize, IntoSchema,) + #[ffi_type(opaque)] + pub enum TransactionStatus { + /// Transaction was received and enqueued + Queued, + /// Transaction was dropped(not stored in a block) + Expired, + /// Transaction was stored in the block as valid + Approved, + /// Transaction was stored in the block as invalid + Rejected(Box), + } + + #[derive( + Debug, + Clone, + PartialEq, + Eq, + PartialOrd, + Ord, + FromVariant, + Decode, + Encode, + Deserialize, + Serialize, + IntoSchema, )] #[ffi_type] - pub enum PipelineStatus { - /// Entity has been seen in the blockchain but has not passed validation. - Validating, - /// Entity was rejected during validation. - Rejected(PipelineRejectionReason), - /// Entity has passed validation. - Committed, + pub enum PipelineEventFilterBox { + Transaction(TransactionEventFilter), + Block(BlockEventFilter), } - /// The reason for rejecting pipeline entity such as transaction or block. #[derive( Debug, - displaydoc::Display, Clone, PartialEq, Eq, PartialOrd, Ord, - FromVariant, + Default, + Getters, + Decode, + Encode, + Deserialize, + Serialize, + IntoSchema, + )] + #[ffi_type] + #[getset(get = "pub")] + pub struct BlockEventFilter { + pub height: Option, + pub status: Option, + } + + #[derive( + Debug, + Clone, + PartialEq, + Eq, + PartialOrd, + Ord, + Default, + Getters, Decode, Encode, Deserialize, Serialize, IntoSchema, )] - #[cfg_attr(feature = "std", derive(thiserror::Error))] #[ffi_type] - pub enum PipelineRejectionReason { - /// Block was rejected - Block(#[cfg_attr(feature = "std", source)] crate::block::error::BlockRejectionReason), - /// Transaction was rejected - Transaction( - #[cfg_attr(feature = "std", source)] - crate::transaction::error::TransactionRejectionReason, - ), + #[getset(get = "pub")] + pub struct TransactionEventFilter { + pub hash: Option>, + #[getset(skip)] + pub block_height: Option>, + pub status: Option, } } -impl PipelineEventFilter { - /// Creates a new [`PipelineEventFilter`] accepting all [`PipelineEvent`]s +impl BlockEventFilter { + /// Match only block with the given height #[must_use] - #[inline] - pub const fn new() -> Self { - Self { - status_kind: None, - entity_kind: None, - hash: None, - } + pub fn for_height(mut self, height: u64) -> Self { + self.height = Some(height); + self } - /// Modifies a [`PipelineEventFilter`] to accept only [`PipelineEvent`]s originating from a specific entity kind (block/transaction). + /// Match only block with the given status #[must_use] - #[inline] - pub const fn for_entity(mut self, entity_kind: PipelineEntityKind) -> Self { - self.entity_kind = Some(entity_kind); + pub fn for_status(mut self, status: BlockStatus) -> Self { + self.status = Some(status); self } +} - /// Modifies a [`PipelineEventFilter`] to accept only [`PipelineEvent`]s with a specific status. +impl TransactionEventFilter { + /// Get height of the block filter is set to track + pub fn block_height(&self) -> Option> { + self.block_height + } + + /// Match only transactions with the given block height #[must_use] - #[inline] - pub const fn for_status(mut self, status_kind: PipelineStatusKind) -> Self { - self.status_kind = Some(status_kind); + pub fn for_block_height(mut self, block_height: Option) -> Self { + self.block_height = Some(block_height); self } - /// Modifies a [`PipelineEventFilter`] to accept only [`PipelineEvent`]s originating from an entity with specified hash. + /// Match only transactions with the given hash #[must_use] - #[inline] - pub const fn for_hash(mut self, hash: Hash) -> Self { + pub fn for_hash(mut self, hash: HashOf) -> Self { self.hash = Some(hash); self } - #[inline] - #[cfg(feature = "transparent_api")] + /// Match only transactions with the given status + #[must_use] + pub fn for_status(mut self, status: TransactionStatus) -> Self { + self.status = Some(status); + self + } +} + +#[cfg(feature = "transparent_api")] +impl TransactionEventFilter { fn field_matches(filter: Option<&T>, event: &T) -> bool { filter.map_or(true, |field| field == event) } } #[cfg(feature = "transparent_api")] -impl super::EventFilter for PipelineEventFilter { - type Event = PipelineEvent; - - /// Check if `self` accepts the `event`. - #[inline] - fn matches(&self, event: &PipelineEvent) -> bool { - [ - Self::field_matches(self.entity_kind.as_ref(), &event.entity_kind), - Self::field_matches(self.status_kind.as_ref(), &event.status.kind()), - Self::field_matches(self.hash.as_ref(), &event.hash), - ] - .into_iter() - .all(core::convert::identity) +impl BlockEventFilter { + fn field_matches(filter: Option<&T>, event: &T) -> bool { + filter.map_or(true, |field| field == event) } } #[cfg(feature = "transparent_api")] -impl PipelineStatus { - fn kind(&self) -> PipelineStatusKind { - PipelineStatusKind::from(self) +impl super::EventFilter for PipelineEventFilterBox { + type Event = PipelineEventBox; + + /// Check if `self` accepts the `event`. + #[inline] + fn matches(&self, event: &PipelineEventBox) -> bool { + match (self, event) { + (Self::Block(block_filter), PipelineEventBox::Block(block_event)) => [ + BlockEventFilter::field_matches(block_filter.height.as_ref(), &block_event.height), + BlockEventFilter::field_matches(block_filter.status.as_ref(), &block_event.status), + ] + .into_iter() + .all(core::convert::identity), + ( + Self::Transaction(transaction_filter), + PipelineEventBox::Transaction(transaction_event), + ) => [ + TransactionEventFilter::field_matches( + transaction_filter.hash.as_ref(), + &transaction_event.hash, + ), + TransactionEventFilter::field_matches( + transaction_filter.block_height.as_ref(), + &transaction_event.block_height, + ), + TransactionEventFilter::field_matches( + transaction_filter.status.as_ref(), + &transaction_event.status, + ), + ] + .into_iter() + .all(core::convert::identity), + _ => false, + } } } /// Exports common structs and enums from this module. pub mod prelude { pub use super::{ - PipelineEntityKind, PipelineEvent, PipelineEventFilter, PipelineRejectionReason, - PipelineStatus, PipelineStatusKind, + BlockEvent, BlockStatus, PipelineEventBox, PipelineEventFilterBox, TransactionEvent, + TransactionStatus, }; } @@ -235,94 +311,108 @@ mod tests { #[cfg(not(feature = "std"))] use alloc::{string::ToString as _, vec, vec::Vec}; - use super::{super::EventFilter, PipelineRejectionReason::*, *}; + use iroha_crypto::Hash; + + use super::{super::EventFilter, *}; use crate::{transaction::error::TransactionRejectionReason::*, ValidationFail}; #[test] fn events_are_correctly_filtered() { let events = vec![ - PipelineEvent { - entity_kind: PipelineEntityKind::Transaction, - status: PipelineStatus::Validating, - hash: Hash::prehashed([0_u8; Hash::LENGTH]), - }, - PipelineEvent { - entity_kind: PipelineEntityKind::Transaction, - status: PipelineStatus::Rejected(Transaction(Validation( + TransactionEvent { + hash: HashOf::from_untyped_unchecked(Hash::prehashed([0_u8; Hash::LENGTH])), + block_height: None, + status: TransactionStatus::Queued, + } + .into(), + TransactionEvent { + hash: HashOf::from_untyped_unchecked(Hash::prehashed([0_u8; Hash::LENGTH])), + block_height: Some(3), + status: TransactionStatus::Rejected(Box::new(Validation( ValidationFail::TooComplex, ))), - hash: Hash::prehashed([0_u8; Hash::LENGTH]), - }, - PipelineEvent { - entity_kind: PipelineEntityKind::Transaction, - status: PipelineStatus::Committed, - hash: Hash::prehashed([2_u8; Hash::LENGTH]), - }, - PipelineEvent { - entity_kind: PipelineEntityKind::Block, - status: PipelineStatus::Committed, - hash: Hash::prehashed([2_u8; Hash::LENGTH]), - }, + } + .into(), + TransactionEvent { + hash: HashOf::from_untyped_unchecked(Hash::prehashed([2_u8; Hash::LENGTH])), + block_height: None, + status: TransactionStatus::Approved, + } + .into(), + BlockEvent { + height: 7, + status: BlockStatus::Committed, + } + .into(), ]; + assert_eq!( + events + .iter() + .filter(|&event| { + let filter: PipelineEventFilterBox = TransactionEventFilter::default() + .for_hash(HashOf::from_untyped_unchecked(Hash::prehashed( + [0_u8; Hash::LENGTH], + ))) + .into(); + + filter.matches(event) + }) + .cloned() + .collect::>(), vec![ - PipelineEvent { - entity_kind: PipelineEntityKind::Transaction, - status: PipelineStatus::Validating, - hash: Hash::prehashed([0_u8; Hash::LENGTH]), - }, - PipelineEvent { - entity_kind: PipelineEntityKind::Transaction, - status: PipelineStatus::Rejected(Transaction(Validation( + TransactionEvent { + hash: HashOf::from_untyped_unchecked(Hash::prehashed([0_u8; Hash::LENGTH])), + block_height: None, + status: TransactionStatus::Queued, + } + .into(), + TransactionEvent { + hash: HashOf::from_untyped_unchecked(Hash::prehashed([0_u8; Hash::LENGTH])), + block_height: Some(3), + status: TransactionStatus::Rejected(Box::new(Validation( ValidationFail::TooComplex, ))), - hash: Hash::prehashed([0_u8; Hash::LENGTH]), - }, + } + .into(), ], - events - .iter() - .filter(|&event| PipelineEventFilter::new() - .for_hash(Hash::prehashed([0_u8; Hash::LENGTH])) - .matches(event)) - .cloned() - .collect::>() ); + assert_eq!( - vec![PipelineEvent { - entity_kind: PipelineEntityKind::Block, - status: PipelineStatus::Committed, - hash: Hash::prehashed([2_u8; Hash::LENGTH]), - }], events .iter() - .filter(|&event| PipelineEventFilter::new() - .for_entity(PipelineEntityKind::Block) - .matches(event)) + .filter(|&event| { + let filter: PipelineEventFilterBox = BlockEventFilter::default().into(); + filter.matches(event) + }) .cloned() - .collect::>() + .collect::>(), + vec![BlockEvent { + status: BlockStatus::Committed, + height: 2 + } + .into()], ); assert_eq!( - vec![PipelineEvent { - entity_kind: PipelineEntityKind::Transaction, - status: PipelineStatus::Committed, - hash: Hash::prehashed([2_u8; Hash::LENGTH]), - }], events .iter() - .filter(|&event| PipelineEventFilter::new() - .for_entity(PipelineEntityKind::Transaction) - .for_hash(Hash::prehashed([2_u8; Hash::LENGTH])) - .matches(event)) + .filter(|&event| { + let filter: PipelineEventFilterBox = TransactionEventFilter::default() + .for_hash(HashOf::from_untyped_unchecked(Hash::prehashed( + [2_u8; Hash::LENGTH], + ))) + .into(); + + filter.matches(event) + }) .cloned() - .collect::>() + .collect::>(), + vec![TransactionEvent { + hash: HashOf::from_untyped_unchecked(Hash::prehashed([0_u8; Hash::LENGTH])), + block_height: None, + status: TransactionStatus::Approved, + } + .into()], ); - assert_eq!( - events, - events - .iter() - .filter(|&event| PipelineEventFilter::new().matches(event)) - .cloned() - .collect::>() - ) } } diff --git a/data_model/src/smart_contract.rs b/data_model/src/smart_contract.rs index 379da0585d9..5d51c2f89d3 100644 --- a/data_model/src/smart_contract.rs +++ b/data_model/src/smart_contract.rs @@ -20,7 +20,7 @@ pub mod payloads { /// Trigger owner who registered the trigger pub owner: AccountId, /// Event which triggered the execution - pub event: Event, + pub event: EventBox, } /// Payload for migrate entrypoint diff --git a/data_model/src/transaction.rs b/data_model/src/transaction.rs index fbfded831ab..dea247777dd 100644 --- a/data_model/src/transaction.rs +++ b/data_model/src/transaction.rs @@ -565,8 +565,6 @@ pub mod error { InstructionExecution(#[cfg_attr(feature = "std", source)] InstructionExecutionFail), /// Failure in WebAssembly execution WasmExecution(#[cfg_attr(feature = "std", source)] WasmExecutionFail), - /// Transaction rejected due to being expired - Expired, } } @@ -745,6 +743,9 @@ pub mod prelude { #[cfg(test)] mod tests { + #[cfg(not(feature = "std"))] + use alloc::vec; + use super::*; #[test] diff --git a/data_model/src/trigger.rs b/data_model/src/trigger.rs index e65cf5f4a96..9a739cfead1 100644 --- a/data_model/src/trigger.rs +++ b/data_model/src/trigger.rs @@ -237,21 +237,21 @@ pub mod action { impl PartialOrd for Action { fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + + impl Ord for Action { + fn cmp(&self, other: &Self) -> cmp::Ordering { // Exclude the executable. When debugging and replacing // the trigger, its position in Hash and Tree maps should // not change depending on the content. match self.repeats.cmp(&other.repeats) { cmp::Ordering::Equal => {} - ord => return Some(ord), + ord => return ord, } - Some(self.authority.cmp(&other.authority)) - } - } - impl Ord for Action { - fn cmp(&self, other: &Self) -> cmp::Ordering { - self.partial_cmp(other) - .expect("`PartialCmp::partial_cmp()` for `Action` should never return `None`") + self.authority.cmp(&other.authority) } } diff --git a/docs/source/references/schema.json b/docs/source/references/schema.json index b6d93a29fdc..9d8ea726755 100644 --- a/docs/source/references/schema.json +++ b/docs/source/references/schema.json @@ -480,6 +480,30 @@ } ] }, + "BlockEvent": { + "Struct": [ + { + "name": "height", + "type": "u64" + }, + { + "name": "status", + "type": "BlockStatus" + } + ] + }, + "BlockEventFilter": { + "Struct": [ + { + "name": "height", + "type": "Option" + }, + { + "name": "status", + "type": "Option" + } + ] + }, "BlockHeader": { "Struct": [ { @@ -525,7 +549,7 @@ }, { "name": "event_recommendations", - "type": "Vec" + "type": "Vec" } ] }, @@ -537,6 +561,27 @@ } ] }, + "BlockStatus": { + "Enum": [ + { + "tag": "Approved", + "discriminant": 0 + }, + { + "tag": "Rejected", + "discriminant": 1, + "type": "BlockRejectionReason" + }, + { + "tag": "Committed", + "discriminant": 2 + }, + { + "tag": "Applied", + "discriminant": 3 + } + ] + }, "BlockSubscriptionRequest": "NonZero", "Burn": { "Struct": [ @@ -857,12 +902,12 @@ "u32" ] }, - "Event": { + "EventBox": { "Enum": [ { "tag": "Pipeline", "discriminant": 0, - "type": "PipelineEvent" + "type": "PipelineEventBox" }, { "tag": "Data", @@ -891,7 +936,7 @@ { "tag": "Pipeline", "discriminant": 0, - "type": "PipelineEventFilter" + "type": "PipelineEventFilterBox" }, { "tag": "Data", @@ -915,7 +960,7 @@ } ] }, - "EventMessage": "Event", + "EventMessage": "EventBox", "EventSubscriptionRequest": "EventFilterBox", "Executable": { "Enum": [ @@ -2229,21 +2274,24 @@ "Option": { "Option": "AssetId" }, + "Option": { + "Option": "BlockStatus" + }, "Option": { "Option": "DomainId" }, "Option": { "Option": "Duration" }, - "Option": { - "Option": "Hash" - }, "Option>>": { "Option": "HashOf>" }, "Option>": { "Option": "HashOf" }, + "Option>": { + "Option": "HashOf" + }, "Option": { "Option": "IpfsPath" }, @@ -2253,18 +2301,15 @@ "Option>": { "Option": "NonZero" }, + "Option>": { + "Option": "Option" + }, "Option": { "Option": "ParameterId" }, "Option": { "Option": "PeerId" }, - "Option": { - "Option": "PipelineEntityKind" - }, - "Option": { - "Option": "PipelineStatusKind" - }, "Option": { "Option": "RoleId" }, @@ -2277,6 +2322,9 @@ "Option": { "Option": "TransactionRejectionReason" }, + "Option": { + "Option": "TransactionStatus" + }, "Option": { "Option": "TriggerCompletedOutcomeType" }, @@ -2286,6 +2334,9 @@ "Option": { "Option": "u32" }, + "Option": { + "Option": "u64" + }, "Parameter": { "Struct": [ { @@ -2413,94 +2464,31 @@ } ] }, - "PipelineEntityKind": { + "PipelineEventBox": { "Enum": [ - { - "tag": "Block", - "discriminant": 0 - }, { "tag": "Transaction", - "discriminant": 1 - } - ] - }, - "PipelineEvent": { - "Struct": [ - { - "name": "entity_kind", - "type": "PipelineEntityKind" - }, - { - "name": "status", - "type": "PipelineStatus" - }, - { - "name": "hash", - "type": "Hash" - } - ] - }, - "PipelineEventFilter": { - "Struct": [ - { - "name": "entity_kind", - "type": "Option" - }, - { - "name": "status_kind", - "type": "Option" - }, - { - "name": "hash", - "type": "Option" - } - ] - }, - "PipelineRejectionReason": { - "Enum": [ - { - "tag": "Block", "discriminant": 0, - "type": "BlockRejectionReason" + "type": "TransactionEvent" }, { - "tag": "Transaction", + "tag": "Block", "discriminant": 1, - "type": "TransactionRejectionReason" + "type": "BlockEvent" } ] }, - "PipelineStatus": { + "PipelineEventFilterBox": { "Enum": [ { - "tag": "Validating", - "discriminant": 0 + "tag": "Transaction", + "discriminant": 0, + "type": "TransactionEventFilter" }, { - "tag": "Rejected", + "tag": "Block", "discriminant": 1, - "type": "PipelineRejectionReason" - }, - { - "tag": "Committed", - "discriminant": 2 - } - ] - }, - "PipelineStatusKind": { - "Enum": [ - { - "tag": "Validating", - "discriminant": 0 - }, - { - "tag": "Rejected", - "discriminant": 1 - }, - { - "tag": "Committed", - "discriminant": 2 + "type": "BlockEventFilter" } ] }, @@ -3574,6 +3562,38 @@ } ] }, + "TransactionEvent": { + "Struct": [ + { + "name": "hash", + "type": "HashOf" + }, + { + "name": "block_height", + "type": "Option" + }, + { + "name": "status", + "type": "TransactionStatus" + } + ] + }, + "TransactionEventFilter": { + "Struct": [ + { + "name": "hash", + "type": "Option>" + }, + { + "name": "block_height", + "type": "Option>" + }, + { + "name": "status", + "type": "Option" + } + ] + }, "TransactionLimitError": { "Struct": [ { @@ -3664,10 +3684,27 @@ "tag": "WasmExecution", "discriminant": 4, "type": "WasmExecutionFail" + } + ] + }, + "TransactionStatus": { + "Enum": [ + { + "tag": "Queued", + "discriminant": 0 }, { "tag": "Expired", - "discriminant": 5 + "discriminant": 1 + }, + { + "tag": "Approved", + "discriminant": 2 + }, + { + "tag": "Rejected", + "discriminant": 3, + "type": "TransactionRejectionReason" } ] }, @@ -3893,7 +3930,7 @@ { "tag": "Pipeline", "discriminant": 0, - "type": "PipelineEventFilter" + "type": "PipelineEventFilterBox" }, { "tag": "Data", @@ -4061,8 +4098,8 @@ } ] }, - "Vec": { - "Vec": "Event" + "Vec": { + "Vec": "EventBox" }, "Vec>": { "Vec": "GenericPredicateBox" diff --git a/schema/gen/src/lib.rs b/schema/gen/src/lib.rs index ebfb956f436..9d8969dbc87 100644 --- a/schema/gen/src/lib.rs +++ b/schema/gen/src/lib.rs @@ -96,13 +96,17 @@ types!( BTreeSet>, BatchedResponse, BatchedResponseV1, + BlockEvent, + BlockEventFilter, BlockHeader, BlockMessage, BlockPayload, BlockRejectionReason, + BlockStatus, BlockSubscriptionRequest, Box>, Box, + Box, Burn, Burn, Burn, @@ -124,7 +128,7 @@ types!( DomainId, DomainOwnerChanged, Duration, - Event, + EventBox, EventMessage, EventSubscriptionRequest, Executable, @@ -232,25 +236,27 @@ types!( Numeric, NumericSpec, Option, + Option, Option, Option, Option, + Option, Option, Option, - Option, Option>>, Option>, + Option>, Option, Option, Option, + Option>, Option, Option, - Option, - Option, Option, Option, Option, Option, + Option, Option, Option, Parameter, @@ -265,12 +271,8 @@ types!( PermissionToken, PermissionTokenSchema, PermissionTokenSchemaUpdateEvent, - PipelineEntityKind, - PipelineEvent, - PipelineEventFilter, - PipelineRejectionReason, - PipelineStatus, - PipelineStatusKind, + PipelineEventBox, + PipelineEventFilterBox, PredicateBox, PublicKey, QueryBox, @@ -338,12 +340,15 @@ types!( TimeEventFilter, TimeInterval, TimeSchedule, + TransactionEvent, + TransactionEventFilter, TransactionLimitError, TransactionLimits, TransactionPayload, TransactionQueryOutput, TransactionRejectionReason, TransactionValue, + TransactionStatus, Transfer, Transfer, Transfer, @@ -372,7 +377,7 @@ types!( UnregisterBox, Upgrade, ValidationFail, - Vec, + Vec, Vec, Vec, Vec, @@ -412,6 +417,7 @@ mod tests { BlockHeader, BlockPayload, SignedBlock, SignedBlockV1, }, domain::NewDomain, + events::pipeline::{BlockEventFilter, TransactionEventFilter}, executor::Executor, ipfs::IpfsPath, isi::{ diff --git a/tools/parity_scale_decoder/src/main.rs b/tools/parity_scale_decoder/src/main.rs index 9bf3a824693..705d9e9f43c 100644 --- a/tools/parity_scale_decoder/src/main.rs +++ b/tools/parity_scale_decoder/src/main.rs @@ -21,6 +21,7 @@ use iroha_data_model::{ BlockHeader, BlockPayload, SignedBlock, SignedBlockV1, }, domain::NewDomain, + events::pipeline::{BlockEventFilter, TransactionEventFilter}, executor::Executor, ipfs::IpfsPath, isi::{ diff --git a/torii/src/event.rs b/torii/src/event.rs index 873f81d91ec..226ca8cd4a1 100644 --- a/torii/src/event.rs +++ b/torii/src/event.rs @@ -63,7 +63,7 @@ impl Consumer { /// # Errors /// Can fail due to timeout or sending event. Also receiving might fail #[iroha_futures::telemetry_future] - pub async fn consume(&mut self, event: Event) -> Result<()> { + pub async fn consume(&mut self, event: EventBox) -> Result<()> { if !self.filter.matches(&event) { return Ok(()); }