diff --git a/cli/src/lib.rs b/cli/src/lib.rs
index 9ebcce6f929..ccf07f3d48f 100644
--- a/cli/src/lib.rs
+++ b/cli/src/lib.rs
@@ -249,7 +249,7 @@ impl Iroha {
         });

-        let queue = Arc::new(Queue::from_config(config.queue));
+        let queue = Arc::new(Queue::from_config(config.queue, events_sender.clone()));

         match Self::start_telemetry(&logger, &config).await? {
             TelemetryStartStatus::Started => iroha_logger::info!("Telemetry started"),
             TelemetryStartStatus::NotStarted => iroha_logger::warn!("Telemetry not started"),
diff --git a/client/benches/tps/utils.rs b/client/benches/tps/utils.rs
index f9c452980c7..eb9bef5e75b 100644
--- a/client/benches/tps/utils.rs
+++ b/client/benches/tps/utils.rs
@@ -18,6 +18,7 @@ use iroha_client::{
         prelude::*,
     },
 };
+use iroha_data_model::events::pipeline::{BlockEventFilter, BlockStatus};
 use serde::Deserialize;
 use test_network::*;

@@ -171,9 +172,7 @@ impl MeasurerUnit {
     fn spawn_event_counter(&self) -> thread::JoinHandle<Result<()>> {
         let listener = self.client.clone();
         let (init_sender, init_receiver) = mpsc::channel();
-        let event_filter = PipelineEventFilter::new()
-            .for_entity(PipelineEntityKind::Block)
-            .for_status(PipelineStatusKind::Committed);
+        let event_filter = BlockEventFilter::default().for_status(BlockStatus::Applied);
         let blocks_expected = self.config.blocks as usize;
         let name = self.name;
         let handle = thread::spawn(move || -> Result<()> {
diff --git a/client/src/client.rs b/client/src/client.rs
index 4ec503bf9c2..ebe8ae20f7c 100644
--- a/client/src/client.rs
+++ b/client/src/client.rs
@@ -14,7 +14,10 @@ use eyre::{eyre, Result, WrapErr};
 use futures_util::StreamExt;
 use http_default::{AsyncWebSocketStream, WebSocketStream};
 pub use iroha_config::client_api::ConfigDTO;
-use iroha_data_model::query::QueryOutputBox;
+use iroha_data_model::{
+    events::pipeline::{BlockStatus, PipelineEventBox, TransactionEventFilter, TransactionStatus},
+    query::QueryOutputBox,
+};
 use iroha_logger::prelude::*;
 use iroha_telemetry::metrics::Status;
 use iroha_torii_const::uri as torii_uri;
@@ -605,7 +608,7 @@ impl Client {
         let mut event_iterator = {
             let event_iterator_result = tokio::time::timeout_at(
                 deadline,
-                self.listen_for_events_async(PipelineEventFilter::new().for_hash(hash.into())),
+                self.listen_for_events_async(TransactionEventFilter::default().for_hash(hash)),
             )
             .await
             .map_err(Into::into)
@@ -631,17 +634,34 @@ impl Client {
         event_iterator: &mut AsyncEventStream,
         hash: HashOf<SignedTransaction>,
     ) -> Result<HashOf<SignedTransaction>> {
+        let mut block_height = None;
+
         while let Some(event) = event_iterator.next().await {
-            if let Event::Pipeline(this_event) = event? {
-                match this_event.status() {
-                    PipelineStatus::Validating => {}
-                    PipelineStatus::Rejected(ref reason) => {
-                        return Err(reason.clone().into());
+            if let EventBox::Pipeline(this_event) = event? {
+                match this_event {
+                    PipelineEventBox::Transaction(transaction_event) => {
+                        match transaction_event.status() {
+                            TransactionStatus::Queued => {}
+                            TransactionStatus::Approved => {
+                                block_height = transaction_event.block_height;
+                            }
+                            TransactionStatus::Rejected(reason) => {
+                                return Err(reason.clone().into());
+                            }
+                            TransactionStatus::Expired => return Err(eyre!("Transaction expired")),
+                        }
+                    }
+                    PipelineEventBox::Block(block_event) => {
+                        if Some(block_event.height) == block_height {
+                            if let BlockStatus::Applied = block_event.status() {
+                                return Ok(hash);
+                            }
+                        }
                     }
-                    PipelineStatus::Committed => return Ok(hash),
                 }
             }
         }
+
         Err(eyre!(
             "Connection dropped without `Committed` or `Rejected` event"
         ))
@@ -904,9 +924,7 @@ impl Client {
     pub fn listen_for_events(
         &self,
         event_filter: impl Into<EventFilterBox>,
-    ) -> Result<impl Iterator<Item = Result<Event>>> {
-        let event_filter = event_filter.into();
-        iroha_logger::trace!(?event_filter);
+    ) -> Result<impl Iterator<Item = Result<EventBox>>> {
         events_api::EventIterator::new(self.events_handler(event_filter)?)
     }

@@ -919,8 +937,6 @@ impl Client {
         &self,
         event_filter: impl Into<EventFilterBox> + Send,
     ) -> Result<AsyncEventStream> {
-        let event_filter = event_filter.into();
-        iroha_logger::trace!(?event_filter, "Async listening with");
         events_api::AsyncEventStream::new(self.events_handler(event_filter)?).await
     }

@@ -933,8 +949,11 @@ impl Client {
         &self,
         event_filter: impl Into<EventFilterBox>,
     ) -> Result<events_api::flow::Init> {
+        let event_filter = event_filter.into();
+        iroha_logger::trace!(?event_filter);
+
         events_api::flow::Init::new(
-            event_filter.into(),
+            event_filter,
             self.headers.clone(),
             self.torii_url
                 .join(torii_uri::SUBSCRIPTION)
@@ -1284,7 +1303,7 @@ pub mod events_api {
         pub struct Events;

         impl FlowEvents for Events {
-            type Event = crate::data_model::prelude::Event;
+            type Event = crate::data_model::prelude::EventBox;

             fn message(&self, message: Vec<u8>) -> Result<Self::Event> {
                 let event_socket_message = EventMessage::decode_all(&mut message.as_slice())?;
diff --git a/client/tests/integration/asset.rs b/client/tests/integration/asset.rs
index fe95e30f348..34a102afd93 100644
--- a/client/tests/integration/asset.rs
+++ b/client/tests/integration/asset.rs
@@ -10,6 +10,7 @@ use iroha_config::parameters::actual::Root as Config;
 use iroha_data_model::{
     asset::{AssetId, AssetValue, AssetValueType},
     isi::error::{InstructionEvaluationError, InstructionExecutionError, Mismatch, TypeError},
+    transaction::error::TransactionRejectionReason,
 };
 use serde_json::json;
 use test_network::*;
@@ -463,17 +464,17 @@ fn fail_if_dont_satisfy_spec() {
         .expect_err("Should be rejected due to non integer value");

     let rejection_reason = err
-        .downcast_ref::<PipelineRejectionReason>()
-        .unwrap_or_else(|| panic!("Error {err} is not PipelineRejectionReason"));
+        .downcast_ref::<TransactionRejectionReason>()
+        .unwrap_or_else(|| panic!("Error {err} is not TransactionRejectionReason"));

     assert_eq!(
         rejection_reason,
-        &PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation(
-            ValidationFail::InstructionFailed(InstructionExecutionError::Evaluate(
-                InstructionEvaluationError::Type(TypeError::from(Mismatch {
+        &TransactionRejectionReason::Validation(ValidationFail::InstructionFailed(
+            InstructionExecutionError::Evaluate(InstructionEvaluationError::Type(
+                TypeError::from(Mismatch {
                     expected: AssetValueType::Numeric(NumericSpec::integer()),
                     actual: AssetValueType::Numeric(NumericSpec::fractional(2))
-                }))
+                })
             ))
         ))
     );
diff --git a/client/tests/integration/domain_owner_permissions.rs b/client/tests/integration/domain_owner_permissions.rs
index b26eb647728..a6dbeb21aad 100644
--- a/client/tests/integration/domain_owner_permissions.rs
+++ b/client/tests/integration/domain_owner_permissions.rs
@@ -3,6 +3,7 @@ use iroha_client::{
     crypto::KeyPair,
     data_model::{account::SignatureCheckCondition, prelude::*},
 };
+use iroha_data_model::transaction::error::TransactionRejectionReason;
 use serde_json::json;
 use test_network::*;

@@ -37,14 +38,12 @@ fn domain_owner_domain_permissions() -> Result<()> {
         .expect_err("Tx should fail due to permissions");

     let rejection_reason = err
-        .downcast_ref::<PipelineRejectionReason>()
-        .unwrap_or_else(|| panic!("Error {err} is not PipelineRejectionReason"));
+        .downcast_ref::<TransactionRejectionReason>()
+        .unwrap_or_else(|| panic!("Error {err} is not TransactionRejectionReason"));

     assert!(matches!(
         rejection_reason,
-        &PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation(
-            ValidationFail::NotPermitted(_)
-        ))
+        &TransactionRejectionReason::Validation(ValidationFail::NotPermitted(_))
     ));

     // "alice@wonderland" owns the domain and can register AssetDefinitions by default as domain owner
diff --git a/client/tests/integration/events/pipeline.rs b/client/tests/integration/events/pipeline.rs
index 30f17528219..ed9857e2f4f 100644
--- a/client/tests/integration/events/pipeline.rs
+++ b/client/tests/integration/events/pipeline.rs
@@ -9,6 +9,14 @@ use iroha_client::{
     },
 };
 use iroha_config::parameters::actual::Root as Config;
+use iroha_data_model::{
+    events::pipeline::{
+        BlockEvent, BlockEventFilter, BlockStatus, TransactionEventFilter, TransactionStatus,
+    },
+    isi::error::InstructionExecutionError,
+    transaction::error::TransactionRejectionReason,
+    ValidationFail,
+};
 use test_network::*;

 // Needed to re-enable ignored tests.
@@ -17,24 +25,28 @@ const PEER_COUNT: usize = 7;

 #[ignore = "ignore, more in #2851"]
 #[test]
 fn transaction_with_no_instructions_should_be_committed() -> Result<()> {
-    test_with_instruction_and_status_and_port(None, PipelineStatusKind::Committed, 10_250)
+    test_with_instruction_and_status_and_port(None, &TransactionStatus::Approved, 10_250)
 }

 #[ignore = "ignore, more in #2851"]
 // #[ignore = "Experiment"]
 #[test]
 fn transaction_with_fail_instruction_should_be_rejected() -> Result<()> {
-    let fail = Fail::new("Should be rejected".to_owned());
+    let msg = "Should be rejected".to_owned();
+
+    let fail = Fail::new(msg.clone());

     test_with_instruction_and_status_and_port(
         Some(fail.into()),
-        PipelineStatusKind::Rejected,
+        &TransactionStatus::Rejected(Box::new(TransactionRejectionReason::Validation(
+            ValidationFail::InstructionFailed(InstructionExecutionError::Fail(msg)),
+        ))),
         10_350,
     )
 }

 fn test_with_instruction_and_status_and_port(
     instruction: Option<InstructionBox>,
-    should_be: PipelineStatusKind,
+    should_be: &TransactionStatus,
     port: u16,
 ) -> Result<()> {
     let (_rt, network, client) =
@@ -56,9 +68,9 @@ fn test_with_instruction_and_status_and_port(
     let mut handles = Vec::new();
     for listener in clients {
         let checker = Checker { listener, hash };
-        let handle_validating = checker.clone().spawn(PipelineStatusKind::Validating);
+        let handle_validating = checker.clone().spawn(TransactionStatus::Queued);
         handles.push(handle_validating);
-        let handle_validated = checker.spawn(should_be);
+        let handle_validated = checker.spawn(should_be.clone());
         handles.push(handle_validated);
     }
     // When
@@ -78,15 +90,14 @@ struct Checker {
 }

 impl Checker {
-    fn spawn(self, status_kind: PipelineStatusKind) -> JoinHandle<()> {
+    fn spawn(self, status_kind: TransactionStatus) -> JoinHandle<()> {
         thread::spawn(move || {
             let mut event_iterator = self
                 .listener
                 .listen_for_events(
-                    PipelineEventFilter::new()
-                        .for_entity(PipelineEntityKind::Transaction)
+                    TransactionEventFilter::default()
                         .for_status(status_kind)
-                        .for_hash(*self.hash),
+                        .for_hash(self.hash),
                 )
                 .expect("Failed to create event iterator.");
             let event_result = event_iterator.next().expect("Stream closed");
@@ -96,13 +107,11 @@ impl Checker {
 }

 #[test]
-fn committed_block_must_be_available_in_kura() {
+fn applied_block_must_be_available_in_kura() {
     let (_rt, peer, client) = <PeerBuilder>::new().with_port(11_040).start_with_runtime();
     wait_for_genesis_committed(&[client.clone()], 0);

-    let event_filter = PipelineEventFilter::new()
-        .for_entity(PipelineEntityKind::Block)
-        .for_status(PipelineStatusKind::Committed);
+    let event_filter = BlockEventFilter::default().for_status(BlockStatus::Committed);
     let mut event_iter = client
         .listen_for_events(event_filter)
         .expect("Failed to subscribe for events");
@@ -111,21 +120,17 @@ fn applied_block_must_be_available_in_kura() {
         .submit(Fail::new("Dummy instruction".to_owned()))
         .expect("Failed to submit transaction");

-    let event = event_iter.next().expect("Block must be committed");
-    let Ok(Event::Pipeline(PipelineEvent {
-        entity_kind: PipelineEntityKind::Block,
-        status: PipelineStatus::Committed,
-        hash,
-    })) = event
-    else {
-        panic!("Received unexpected event")
-    };
-    let hash = HashOf::from_untyped_unchecked(hash);
+    let event: BlockEvent = event_iter
+        .next()
+        .expect("Block must be committed")
+        .expect("Block must be committed")
+        .try_into()
+        .expect("Received unexpected event");

     peer.iroha
         .as_ref()
         .expect("Must be some")
         .kura
-        .get_block_height_by_hash(&hash)
+        .get_block_by_height(event.height)
         .expect("Block committed event was received earlier");
 }
diff --git a/client/tests/integration/permissions.rs b/client/tests/integration/permissions.rs
index e7fea53ac18..9a4578b8660 100644
--- a/client/tests/integration/permissions.rs
+++ b/client/tests/integration/permissions.rs
@@ -6,7 +6,9 @@ use iroha_client::{
     crypto::KeyPair,
     data_model::prelude::*,
 };
-use iroha_data_model::permission::PermissionToken;
+use iroha_data_model::{
+    permission::PermissionToken, transaction::error::TransactionRejectionReason,
+};
 use iroha_genesis::GenesisNetwork;
 use serde_json::json;
 use test_network::{PeerBuilder, *};
@@ -104,14 +106,12 @@ fn permissions_disallow_asset_transfer() {
         .submit_transaction_blocking(&transfer_tx)
         .expect_err("Transaction was not rejected.");
     let rejection_reason = err
-        .downcast_ref::<PipelineRejectionReason>()
-        .expect("Error {err} is not PipelineRejectionReason");
+        .downcast_ref::<TransactionRejectionReason>()
+        .expect("Error {err} is not TransactionRejectionReason");
     //Then
     assert!(matches!(
         rejection_reason,
-        &PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation(
-            ValidationFail::NotPermitted(_)
-        ))
+        &TransactionRejectionReason::Validation(ValidationFail::NotPermitted(_))
     ));
     let alice_assets = get_assets(&iroha_client, &alice_id);
     assert_eq!(alice_assets, alice_start_assets);
@@ -156,14 +156,12 @@ fn permissions_disallow_asset_burn() {
         .submit_transaction_blocking(&burn_tx)
         .expect_err("Transaction was not rejected.");
     let rejection_reason = err
-        .downcast_ref::<PipelineRejectionReason>()
-        .expect("Error {err} is not PipelineRejectionReason");
+        .downcast_ref::<TransactionRejectionReason>()
+        .expect("Error {err} is not TransactionRejectionReason");
     assert!(matches!(
         rejection_reason,
-        &PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation(
-            ValidationFail::NotPermitted(_)
-        ))
+        &TransactionRejectionReason::Validation(ValidationFail::NotPermitted(_))
     ));
     let alice_assets = get_assets(&iroha_client, &alice_id);
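All of the test updates above follow one pattern: a rejected transaction now surfaces directly as a `TransactionRejectionReason` instead of a `PipelineRejectionReason` wrapping one. A minimal sketch of the new matching style, assuming a configured `Client`; the helper name and instruction are hypothetical:

```rust
use iroha_client::{client::Client, data_model::prelude::*};
use iroha_data_model::{transaction::error::TransactionRejectionReason, ValidationFail};

// Hypothetical helper: submit a failing instruction and inspect the rejection.
fn assert_rejected(client: &Client) {
    let err = client
        .submit_blocking(Fail::new("rejected on purpose".to_owned()))
        .expect_err("transaction should be rejected");

    // One less layer to peel off: the error downcasts straight to
    // `TransactionRejectionReason`, with no `PipelineRejectionReason::Transaction(..)`.
    let reason = err
        .downcast_ref::<TransactionRejectionReason>()
        .expect("Error is not TransactionRejectionReason");
    assert!(matches!(
        reason,
        &TransactionRejectionReason::Validation(ValidationFail::InstructionFailed(_))
    ));
}
```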
diff --git a/client/tests/integration/roles.rs b/client/tests/integration/roles.rs
index 12a03f333c1..6f260e3709f 100644
--- a/client/tests/integration/roles.rs
+++ b/client/tests/integration/roles.rs
@@ -6,6 +6,7 @@ use iroha_client::{
     crypto::KeyPair,
     data_model::prelude::*,
 };
+use iroha_data_model::transaction::error::TransactionRejectionReason;
 use serde_json::json;
 use test_network::*;

@@ -164,14 +165,12 @@ fn role_with_invalid_permissions_is_not_accepted() -> Result<()> {
         .expect_err("Submitting role with invalid permission token should fail");

     let rejection_reason = err
-        .downcast_ref::<PipelineRejectionReason>()
-        .unwrap_or_else(|| panic!("Error {err} is not PipelineRejectionReason"));
+        .downcast_ref::<TransactionRejectionReason>()
+        .unwrap_or_else(|| panic!("Error {err} is not TransactionRejectionReason"));

     assert!(matches!(
         rejection_reason,
-        &PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation(
-            ValidationFail::NotPermitted(_)
-        ))
+        &TransactionRejectionReason::Validation(ValidationFail::NotPermitted(_))
     ));

     Ok(())
diff --git a/client/tests/integration/triggers/time_trigger.rs b/client/tests/integration/triggers/time_trigger.rs
index 06042776200..b11a0f8f496 100644
--- a/client/tests/integration/triggers/time_trigger.rs
+++ b/client/tests/integration/triggers/time_trigger.rs
@@ -6,6 +6,7 @@ use iroha_client::{
     data_model::{prelude::*, transaction::WasmSmartContract},
 };
 use iroha_config::parameters::defaults::chain_wide::DEFAULT_CONSENSUS_ESTIMATION;
+use iroha_data_model::events::pipeline::{BlockEventFilter, BlockStatus};
 use iroha_logger::info;
 use test_network::*;

@@ -272,10 +273,8 @@ fn mint_nft_for_every_user_every_1_sec() -> Result<()> {
 /// Get block committed event listener
 fn get_block_committed_event_listener(
     client: &Client,
-) -> Result<impl Iterator<Item = Result<Event>>> {
-    let block_filter = PipelineEventFilter::new()
-        .for_entity(PipelineEntityKind::Block)
-        .for_status(PipelineStatusKind::Committed);
+) -> Result<impl Iterator<Item = Result<EventBox>>> {
+    let block_filter = BlockEventFilter::default().for_status(BlockStatus::Committed);
     client.listen_for_events(block_filter)
 }

@@ -292,7 +291,7 @@ fn get_asset_value(client: &mut Client, asset_id: AssetId) -> Numeric {

 /// Submit some sample ISIs to create new blocks
 fn submit_sample_isi_on_every_block_commit(
-    block_committed_event_listener: impl Iterator<Item = Result<Event>>,
+    block_committed_event_listener: impl Iterator<Item = Result<EventBox>>,
     test_client: &mut Client,
     account_id: &AccountId,
     timeout: Duration,
diff --git a/client_cli/src/main.rs b/client_cli/src/main.rs
index 807d504a280..5173858371f 100644
--- a/client_cli/src/main.rs
+++ b/client_cli/src/main.rs
@@ -249,13 +249,17 @@ mod filter {

 mod events {
+    use iroha_client::data_model::events::pipeline::{BlockEventFilter, TransactionEventFilter};
+
     use super::*;

     /// Get event stream from iroha peer
     #[derive(clap::Subcommand, Debug, Clone, Copy)]
     pub enum Args {
-        /// Gets pipeline events
-        Pipeline,
+        /// Gets block pipeline events
+        BlockPipeline,
+        /// Gets transaction pipeline events
+        TransactionPipeline,
         /// Gets data events
         Data,
         /// Get execute trigger events
@@ -267,7 +271,8 @@ mod events {
     impl RunArgs for Args {
         fn run(self, context: &mut dyn RunContext) -> Result<()> {
             match self {
-                Args::Pipeline => listen(PipelineEventFilter::new(), context),
+                Args::TransactionPipeline => listen(TransactionEventFilter::default(), context),
+                Args::BlockPipeline => listen(BlockEventFilter::default(), context),
                 Args::Data => listen(DataEventFilter::Any, context),
                 Args::ExecuteTrigger => listen(ExecuteTriggerEventFilter::new(), context),
                 Args::TriggerCompleted => listen(TriggerCompletedEventFilter::new(), context),
diff --git a/core/benches/blocks/apply_blocks.rs b/core/benches/blocks/apply_blocks.rs
index c16c16fd8cb..d223939160d 100644
--- a/core/benches/blocks/apply_blocks.rs
+++ b/core/benches/blocks/apply_blocks.rs
@@ -41,9 +41,10 @@ impl WsvApplyBlocks {
             .into_iter()
             .map(|instructions| {
                 let block = create_block(&mut wsv, instructions, account_id.clone(), &key_pair);
-                wsv.apply_without_execution(&block).map(|()| block)
+                let _wsv_events = wsv.apply_without_execution(&block);
+                block
             })
-            .collect::<Result<Vec<_>, _>>()?
+            .collect::<Vec<_>>()
         };

         Ok(Self { wsv, blocks })
diff --git a/core/benches/blocks/common.rs b/core/benches/blocks/common.rs
index ef489686047..bb9a2fcefef 100644
--- a/core/benches/blocks/common.rs
+++ b/core/benches/blocks/common.rs
@@ -42,7 +42,9 @@ pub fn create_block(
     )
     .chain(0, wsv)
     .sign(key_pair)
+    .unpack(|_| {})
     .commit(&topology)
+    .unpack(|_| {})
     .unwrap();

     // Verify that transactions are valid
diff --git a/core/benches/blocks/validate_blocks.rs b/core/benches/blocks/validate_blocks.rs
index f7a5e40b1b0..664211f4b0c 100644
--- a/core/benches/blocks/validate_blocks.rs
+++ b/core/benches/blocks/validate_blocks.rs
@@ -62,7 +62,7 @@ impl WsvValidateBlocks {
             key_pair,
             account_id,
         }: Self,
-    ) -> Result<()> {
+    ) {
         let mut finalized_wsv = wsv;
         let mut wsv = finalized_wsv.clone();

@@ -70,11 +70,9 @@ impl WsvValidateBlocks {
         for (instructions, i) in instructions.into_iter().zip(1..) {
             finalized_wsv = wsv.clone();
             let block = create_block(&mut wsv, instructions, account_id.clone(), &key_pair);
-            wsv.apply_without_execution(&block)?;
+            let _wsv_events = wsv.apply_without_execution(&block);
             assert_eq!(wsv.height(), i);
             assert_eq!(wsv.height(), finalized_wsv.height() + 1);
         }
-
-        Ok(())
     }
 }
diff --git a/core/benches/blocks/validate_blocks_benchmark.rs b/core/benches/blocks/validate_blocks_benchmark.rs
index 548c1bbb0eb..f48e900eb45 100644
--- a/core/benches/blocks/validate_blocks_benchmark.rs
+++ b/core/benches/blocks/validate_blocks_benchmark.rs
@@ -15,10 +15,7 @@ fn validate_blocks(c: &mut Criterion) {
     let mut group = c.benchmark_group("validate_blocks");
     group.significance_level(0.1).sample_size(10);
     group.bench_function("validate_blocks", |b| {
-        b.iter(|| {
-            WsvValidateBlocks::measure(black_box(bench.clone()))
-                .expect("Failed to execute benchmark");
-        });
+        b.iter(|| WsvValidateBlocks::measure(black_box(bench.clone())));
     });
     group.finish();
 }
diff --git a/core/benches/blocks/validate_blocks_oneshot.rs b/core/benches/blocks/validate_blocks_oneshot.rs
index abfc09f7e7b..02279f10306 100644
--- a/core/benches/blocks/validate_blocks_oneshot.rs
+++ b/core/benches/blocks/validate_blocks_oneshot.rs
@@ -21,5 +21,5 @@ fn main() {
     iroha_logger::test_logger();
     iroha_logger::info!("Starting...");
     let bench = WsvValidateBlocks::setup(rt.handle()).expect("Failed to setup benchmark");
-    WsvValidateBlocks::measure(bench).expect("Failed to execute bnechmark");
+    WsvValidateBlocks::measure(bench);
 }
diff --git a/core/benches/kura.rs b/core/benches/kura.rs
index e8e0e6b75c5..8de71843236 100644
--- a/core/benches/kura.rs
+++ b/core/benches/kura.rs
@@ -53,7 +53,8 @@ async fn measure_block_size_for_n_executors(n_executors: u32) {
     let topology = Topology::new(UniqueVec::new());
     let mut block = BlockBuilder::new(vec![tx], topology, Vec::new())
         .chain(0, &mut wsv)
-        .sign(&KeyPair::random());
+        .sign(&KeyPair::random())
+        .unpack(|_| {});

     for _ in 1..n_executors {
         block = block.sign(&KeyPair::random());
diff --git a/core/benches/validation.rs b/core/benches/validation.rs
index 814e565fce0..ecab3463f66 100644
--- a/core/benches/validation.rs
+++ b/core/benches/validation.rs
@@ -180,7 +180,7 @@ fn sign_blocks(criterion: &mut Criterion) {
         b.iter_batched(
             || block.clone(),
             |block| {
-                let _: ValidBlock = block.sign(&key_pair);
+                let _: ValidBlock = block.sign(&key_pair).unpack(|_| {});
                 count += 1;
             },
             BatchSize::SmallInput,
diff --git a/core/src/block.rs b/core/src/block.rs
index 7a044a6abdc..159b5714c8e 100644
--- a/core/src/block.rs
+++ b/core/src/block.rs
@@ -18,6 +18,7 @@ use iroha_genesis::GenesisTransaction;
 use iroha_primitives::unique_vec::UniqueVec;
 use thiserror::Error;

+pub(crate) use self::event::WithEvents;
 pub use self::{chained::Chained, commit::CommittedBlock, valid::ValidBlock};

 use crate::{prelude::*, sumeragi::network_topology::Topology, tx::AcceptTransactionFail};
@@ -109,7 +110,7 @@ mod pending {
         /// Transaction will be validated when block is chained.
         transactions: Vec<AcceptedTransaction>,
         /// Event recommendations for use in triggers and off-chain work
-        event_recommendations: Vec<Event>,
+        event_recommendations: Vec<EventBox>,
     }

     impl BlockBuilder<Pending> {
@@ -122,7 +123,7 @@ mod pending {
         pub fn new(
             transactions: Vec<AcceptedTransaction>,
             commit_topology: Topology,
-            event_recommendations: Vec<Event>,
+            event_recommendations: Vec<EventBox>,
         ) -> Self {
             assert!(!transactions.is_empty(), "Empty block created");
@@ -219,16 +220,16 @@ mod chained {
     impl BlockBuilder<Chained> {
         /// Sign this block and get [`SignedBlock`].
-        pub fn sign(self, key_pair: &KeyPair) -> ValidBlock {
+        pub fn sign(self, key_pair: &KeyPair) -> WithEvents<ValidBlock> {
             let signature = SignatureOf::new(key_pair, &self.0 .0);

-            ValidBlock(
+            WithEvents::new(ValidBlock(
                 SignedBlockV1 {
                     payload: self.0 .0,
                     signatures: SignaturesOf::from(signature),
                 }
                 .into(),
-            )
+            ))
         }
     }
 }
@@ -242,7 +243,7 @@ mod valid {
     /// Block that was validated and accepted
     #[derive(Debug, Clone)]
     #[repr(transparent)]
-    pub struct ValidBlock(pub(crate) SignedBlock);
+    pub struct ValidBlock(pub(super) SignedBlock);

     impl ValidBlock {
         /// Validate a block against the current state of the world.
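Before the `ValidBlock` internals, it is worth seeing what the new `WithEvents` wrapper does to every call site: `sign` and `commit` no longer hand back the block directly, so the caller must drain the pipeline events first. A sketch of the intended call shape under this diff, assuming `transactions`, `topology`, `wsv` and `key_pair` are in scope as in the tests further down:

```rust
let mut events: Vec<PipelineEventBox> = Vec::new();

let committed = BlockBuilder::new(transactions, topology.clone(), Vec::new())
    .chain(0, &mut wsv)
    .sign(&key_pair)            // -> WithEvents<ValidBlock>
    .unpack(|e| events.push(e)) // emit the events, get the ValidBlock back
    .commit(&topology)          // -> WithEvents<Result<CommittedBlock, _>>
    .unpack(|e| events.push(e)) // events are emitted even on the error path
    .expect("block is valid");
```

Call sites that do not care about the events pass `|_| {}`, which is exactly what the benches and tests in this diff do.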
@@ -261,7 +262,7 @@ mod valid {
             topology: &Topology,
             expected_chain_id: &ChainId,
             wsv: &mut WorldStateView,
-        ) -> Result<ValidBlock, (SignedBlock, BlockValidationError)> {
+        ) -> WithEvents<Result<ValidBlock, (SignedBlock, BlockValidationError)>> {
             if !block.header().is_genesis() {
                 let actual_commit_topology = block.commit_topology();
                 let expected_commit_topology = &topology.ordered_peers;
@@ -269,20 +270,23 @@ mod valid {
                 if actual_commit_topology != expected_commit_topology {
                     let actual_commit_topology = actual_commit_topology.clone();
-                    return Err((
+                    return WithEvents::new(Err((
                         block,
                         BlockValidationError::TopologyMismatch {
                             expected: expected_commit_topology.clone(),
                             actual: actual_commit_topology,
                         },
-                    ));
+                    )));
                 }

                 if topology
                     .filter_signatures_by_roles(&[Role::Leader], block.signatures())
                     .is_empty()
                 {
-                    return Err((block, SignatureVerificationError::LeaderMissing.into()));
+                    return WithEvents::new(Err((
+                        block,
+                        SignatureVerificationError::LeaderMissing.into(),
+                    )));
                 }
             }

@@ -290,47 +294,50 @@ mod valid {
             let actual_height = block.header().height;

             if expected_block_height != actual_height {
-                return Err((
+                return WithEvents::new(Err((
                     block,
                     BlockValidationError::LatestBlockHeightMismatch {
                         expected: expected_block_height,
                         actual: actual_height,
                     },
-                ));
+                )));
             }

             let expected_previous_block_hash = wsv.latest_block_hash();
             let actual_block_hash = block.header().previous_block_hash;

             if expected_previous_block_hash != actual_block_hash {
-                return Err((
+                return WithEvents::new(Err((
                     block,
                     BlockValidationError::LatestBlockHashMismatch {
                         expected: expected_previous_block_hash,
                         actual: actual_block_hash,
                     },
-                ));
+                )));
             }

             if block
                 .transactions()
                 .any(|tx| wsv.has_transaction(tx.as_ref().hash()))
             {
-                return Err((block, BlockValidationError::HasCommittedTransactions));
+                return WithEvents::new(Err((
+                    block,
+                    BlockValidationError::HasCommittedTransactions,
+                )));
             }

             if let Err(error) = Self::validate_transactions(&block, expected_chain_id, wsv) {
-                return Err((block, error.into()));
+                return WithEvents::new(Err((block, error.into())));
             }

             let SignedBlock::V1(block) = block;
-            Ok(ValidBlock(
+            WithEvents::new(Ok(ValidBlock(
                 SignedBlockV1 {
                     payload: block.payload,
                     signatures: block.signatures,
                 }
                 .into(),
-            ))
+            )))
         }

         fn validate_transactions(
@@ -375,24 +382,33 @@ mod valid {
         ///
         /// - Not enough signatures
         /// - Not signed by proxy tail
-        pub(crate) fn commit_with_signatures(
+        pub fn commit_with_signatures(
             mut self,
             topology: &Topology,
             signatures: SignaturesOf<BlockPayload>,
-        ) -> Result<CommittedBlock, (ValidBlock, BlockValidationError)> {
+        ) -> WithEvents<Result<CommittedBlock, (ValidBlock, BlockValidationError)>> {
             if topology
                 .filter_signatures_by_roles(&[Role::Leader], &signatures)
                 .is_empty()
             {
-                return Err((self, SignatureVerificationError::LeaderMissing.into()));
+                return WithEvents::new(Err((
+                    self,
+                    SignatureVerificationError::LeaderMissing.into(),
+                )));
             }

             if !self.as_ref().signatures().is_subset(&signatures) {
-                return Err((self, SignatureVerificationError::SignatureMissing.into()));
+                return WithEvents::new(Err((
+                    self,
+                    SignatureVerificationError::SignatureMissing.into(),
+                )));
             }

             if !self.0.replace_signatures(signatures) {
-                return Err((self, SignatureVerificationError::UnknownSignature.into()));
+                return WithEvents::new(Err((
+                    self,
+                    SignatureVerificationError::UnknownSignature.into(),
+                )));
             }

             self.commit(topology)
@@ -407,19 +423,19 @@ mod valid {
         pub fn commit(
             self,
             topology: &Topology,
-        ) -> Result<CommittedBlock, (ValidBlock, BlockValidationError)> {
+        ) -> WithEvents<Result<CommittedBlock, (ValidBlock, BlockValidationError)>> {
             if !self.0.header().is_genesis() {
                 if let Err(err) = self.verify_signatures(topology) {
-                    return Err((self, err.into()));
+                    return WithEvents::new(Err((self, err.into())));
                 }
             }

-            Ok(CommittedBlock(self))
+            WithEvents::new(Ok(CommittedBlock(self)))
         }

         /// Add additional signatures for [`Self`].
         #[must_use]
-        pub fn sign(self, key_pair: &KeyPair) -> Self {
+        pub fn sign(self, key_pair: &KeyPair) -> ValidBlock {
             ValidBlock(self.0.sign(key_pair))
         }

@@ -454,6 +470,7 @@ mod valid {
                 event_recommendations: Vec::new(),
             }))
             .sign(&KeyPair::random())
+            .unpack(|_| {})
         }

         /// Check if block's signatures meet requirements for given topology.
@@ -624,31 +641,7 @@ mod commit {
     /// Represents a block accepted by consensus.
     /// Every [`Self`] will have a different height.
     #[derive(Debug, Clone)]
-    pub struct CommittedBlock(pub(crate) ValidBlock);
-
-    impl CommittedBlock {
-        pub(crate) fn produce_events(&self) -> Vec<PipelineEvent> {
-            let tx = self.as_ref().transactions().map(|tx| {
-                let status = tx.error.as_ref().map_or_else(
-                    || PipelineStatus::Committed,
-                    |error| PipelineStatus::Rejected(error.clone().into()),
-                );
-
-                PipelineEvent {
-                    entity_kind: PipelineEntityKind::Transaction,
-                    status,
-                    hash: tx.as_ref().hash().into(),
-                }
-            });
-            let current_block = core::iter::once(PipelineEvent {
-                entity_kind: PipelineEntityKind::Block,
-                status: PipelineStatus::Committed,
-                hash: self.as_ref().hash().into(),
-            });
-
-            tx.chain(current_block).collect()
-        }
-    }
+    pub struct CommittedBlock(pub(super) ValidBlock);

     impl From<CommittedBlock> for ValidBlock {
         fn from(source: CommittedBlock) -> Self {
@@ -662,12 +655,105 @@ mod commit {
         }
     }

-    // Invariants of [`CommittedBlock`] can't be violated through immutable reference
     impl AsRef<SignedBlock> for CommittedBlock {
         fn as_ref(&self) -> &SignedBlock {
             &self.0 .0
         }
     }
+
+    #[cfg(test)]
+    impl AsMut<SignedBlock> for CommittedBlock {
+        fn as_mut(&mut self) -> &mut SignedBlock {
+            &mut self.0 .0
+        }
+    }
+}
+
+mod event {
+    use super::*;
+
+    pub trait EventProducer {
+        fn produce_events(&self) -> impl Iterator<Item = PipelineEventBox>;
+    }
+
+    #[derive(Debug)]
+    #[must_use]
+    pub struct WithEvents<B>(B);
+
+    impl<B> WithEvents<B> {
+        pub(super) fn new(source: B) -> Self {
+            Self(source)
+        }
+    }
+
+    impl<B: EventProducer, E: EventProducer> WithEvents<Result<B, E>> {
+        pub fn unpack<F: Fn(PipelineEventBox)>(self, f: F) -> Result<B, E> {
+            match self.0 {
+                Ok(ok) => Ok(WithEvents(ok).unpack(f)),
+                Err(err) => Err(WithEvents(err).unpack(f)),
+            }
+        }
+    }
+    impl<B: EventProducer> WithEvents<B> {
+        pub fn unpack<F: Fn(PipelineEventBox)>(self, f: F) -> B {
+            self.0.produce_events().for_each(f);
+            self.0
+        }
+    }
+
+    impl<B, E: EventProducer> WithEvents<(B, E)> {
+        pub(crate) fn unpack<F: Fn(PipelineEventBox)>(self, f: F) -> (B, E) {
+            self.0 .1.produce_events().for_each(f);
+            self.0
+        }
+    }
+
+    impl EventProducer for ValidBlock {
+        fn produce_events(&self) -> impl Iterator<Item = PipelineEventBox> {
+            let block_height = self.as_ref().header().height;
+
+            let tx_events = self.as_ref().transactions().map(move |tx| {
+                let status = tx.error.as_ref().map_or_else(
+                    || TransactionStatus::Approved,
+                    |error| TransactionStatus::Rejected(error.clone().into()),
+                );
+
+                TransactionEvent {
+                    block_height: Some(block_height),
+                    hash: tx.as_ref().hash(),
+                    status,
+                }
+            });
+
+            let block_event = core::iter::once(BlockEvent {
+                height: block_height,
+                status: BlockStatus::Approved,
+            });
+
+            tx_events
+                .map(PipelineEventBox::from)
+                .chain(block_event.map(Into::into))
+        }
+    }
+
+    impl EventProducer for CommittedBlock {
+        fn produce_events(&self) -> impl Iterator<Item = PipelineEventBox> {
+            let block_height = self.as_ref().header().height;
+
+            let block_event = core::iter::once(BlockEvent {
+                height: block_height,
+                status: BlockStatus::Committed,
+            });
+
+            block_event.map(Into::into)
+        }
+    }
+
+    impl EventProducer for BlockValidationError {
+        fn produce_events(&self) -> impl Iterator<Item = PipelineEventBox> {
+            core::iter::empty()
+        }
+    }
 }

 #[cfg(test)]
@@ -683,7 +769,11 @@ mod tests {
     pub fn committed_and_valid_block_hashes_are_equal() {
         let valid_block = ValidBlock::new_dummy();
         let topology = Topology::new(UniqueVec::new());
-        let committed_block = valid_block.clone().commit(&topology).unwrap();
+        let committed_block = valid_block
+            .clone()
+            .commit(&topology)
+            .unpack(|_| {})
+            .unwrap();

         assert_eq!(
             valid_block.0.hash_of_payload(),
@@ -725,13 +815,26 @@ mod tests {
         let topology = Topology::new(UniqueVec::new());
         let valid_block = BlockBuilder::new(transactions, topology, Vec::new())
             .chain(0, &mut wsv)
-            .sign(&alice_keys);
+            .sign(&alice_keys)
+            .unpack(|_| {});

         // The first transaction should be confirmed
-        assert!(valid_block.0.transactions().next().unwrap().error.is_none());
+        assert!(valid_block
+            .as_ref()
+            .transactions()
+            .next()
+            .unwrap()
+            .error
+            .is_none());

         // The second transaction should be rejected
-        assert!(valid_block.0.transactions().nth(1).unwrap().error.is_some());
+        assert!(valid_block
+            .as_ref()
+            .transactions()
+            .nth(1)
+            .unwrap()
+            .error
+            .is_some());
     }

     #[tokio::test]
@@ -786,13 +889,26 @@ mod tests {
         let topology = Topology::new(UniqueVec::new());
         let valid_block = BlockBuilder::new(transactions, topology, Vec::new())
             .chain(0, &mut wsv)
-            .sign(&alice_keys);
+            .sign(&alice_keys)
+            .unpack(|_| {});

         // The first transaction should fail
-        assert!(valid_block.0.transactions().next().unwrap().error.is_some());
+        assert!(valid_block
+            .as_ref()
+            .transactions()
+            .next()
+            .unwrap()
+            .error
+            .is_some());

         // The third transaction should succeed
-        assert!(valid_block.0.transactions().nth(2).unwrap().error.is_none());
+        assert!(valid_block
+            .as_ref()
+            .transactions()
+            .nth(2)
+            .unwrap()
+            .error
+            .is_none());
     }

     #[tokio::test]
@@ -842,17 +958,30 @@ mod tests {
         let topology = Topology::new(UniqueVec::new());
         let valid_block = BlockBuilder::new(transactions, topology, Vec::new())
             .chain(0, &mut wsv)
-            .sign(&alice_keys);
+            .sign(&alice_keys)
+            .unpack(|_| {});

         // The first transaction should be rejected
         assert!(
-            valid_block.0.transactions().next().unwrap().error.is_some(),
+            valid_block
+                .as_ref()
+                .transactions()
+                .next()
+                .unwrap()
+                .error
+                .is_some(),
             "The first transaction should be rejected, as it contains `Fail`."
         );

         // The second transaction should be accepted
         assert!(
-            valid_block.0.transactions().nth(1).unwrap().error.is_none(),
+            valid_block
+                .as_ref()
+                .transactions()
+                .nth(1)
+                .unwrap()
+                .error
+                .is_none(),
             "The second transaction should be accepted."
         );
     }
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 50b47920da2..1fbbab48c11 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -54,8 +54,8 @@ pub type PermissionTokensMap = IndexMap;
 /// API to work with a collections of [`AccountId`] to [`RoleId`] mappings.
 pub type AccountRolesSet = BTreeSet<RoleIdWithOwner>;

-/// Type of `Sender` which should be used for channels of `Event` messages.
-pub type EventsSender = broadcast::Sender<Event>;
+/// Type of `Sender` which should be used for channels of `Event` messages.
+pub type EventsSender = broadcast::Sender<EventBox>;

 /// The network message
 #[derive(Clone, Debug, Encode, Decode)]
diff --git a/core/src/queue.rs b/core/src/queue.rs
index 195dd81ddcc..19b987b31cc 100644
--- a/core/src/queue.rs
+++ b/core/src/queue.rs
@@ -8,13 +8,17 @@ use eyre::Result;
 use indexmap::IndexSet;
 use iroha_config::parameters::actual::Queue as Config;
 use iroha_crypto::HashOf;
-use iroha_data_model::{account::AccountId, transaction::prelude::*};
+use iroha_data_model::{
+    account::AccountId,
+    events::pipeline::{TransactionEvent, TransactionStatus},
+    transaction::prelude::*,
+};
 use iroha_logger::{trace, warn};
 use iroha_primitives::must_use::MustUse;
 use rand::seq::IteratorRandom;
 use thiserror::Error;

-use crate::prelude::*;
+use crate::{prelude::*, EventsSender};

 impl AcceptedTransaction {
     // TODO: We should have another type of transaction like `CheckedTransaction` in the type system?
@@ -46,6 +50,7 @@ impl AcceptedTransaction {
 /// Multiple producers, single consumer
 #[derive(Debug)]
 pub struct Queue {
+    events_sender: EventsSender,
     /// The queue for transactions
     tx_hashes: ArrayQueue<HashOf<SignedTransaction>>,
     /// [`AcceptedTransaction`]s addressed by `Hash`
@@ -94,8 +99,9 @@ pub struct Failure {

 impl Queue {
     /// Makes queue from configuration
-    pub fn from_config(cfg: Config) -> Self {
+    pub fn from_config(cfg: Config, events_sender: EventsSender) -> Self {
         Self {
+            events_sender,
             tx_hashes: ArrayQueue::new(cfg.capacity.get()),
             accepted_txs: DashMap::new(),
             txs_per_user: DashMap::new(),
@@ -220,6 +226,14 @@ impl Queue {
                 err: Error::Full,
             }
         })?;
+        let _ = self.events_sender.send(
+            TransactionEvent {
+                hash,
+                block_height: None,
+                status: TransactionStatus::Queued,
+            }
+            .into(),
+        );
         trace!("Transaction queue length = {}", self.tx_hashes.len(),);
         Ok(())
     }
@@ -275,7 +289,7 @@ impl Queue {
         max_txs_in_block: usize,
     ) -> Vec<AcceptedTransaction> {
         let mut transactions = Vec::with_capacity(max_txs_in_block);
-        self.get_transactions_for_block(wsv, max_txs_in_block, &mut transactions, &mut Vec::new());
+        self.get_transactions_for_block(wsv, max_txs_in_block, &mut transactions);
         transactions
     }
@@ -287,17 +301,16 @@ impl Queue {
         wsv: &WorldStateView,
         max_txs_in_block: usize,
         transactions: &mut Vec<AcceptedTransaction>,
-        expired_transactions: &mut Vec<AcceptedTransaction>,
     ) {
         if transactions.len() >= max_txs_in_block {
             return;
         }

         let mut seen_queue = Vec::new();
-        let mut expired_transactions_queue = Vec::new();
+        let mut expired_transactions = Vec::new();

         let txs_from_queue = core::iter::from_fn(|| {
-            self.pop_from_queue(&mut seen_queue, wsv, &mut expired_transactions_queue)
+            self.pop_from_queue(&mut seen_queue, wsv, &mut expired_transactions)
         });

         let transactions_hashes: IndexSet<HashOf<SignedTransaction>> =
@@ -311,7 +324,17 @@ impl Queue {
             .into_iter()
             .try_for_each(|hash| self.tx_hashes.push(hash))
             .expect("Exceeded the number of transactions pending");
-        expired_transactions.extend(expired_transactions_queue);
+
+        expired_transactions
+            .into_iter()
+            .map(|tx| TransactionEvent {
+                hash: tx.as_ref().hash(),
+                block_height: None,
+                status: TransactionStatus::Expired,
+            })
+            .for_each(|e| {
+                let _ = self.events_sender.send(e.into());
+            });
     }

     /// Check that the user adhered to the maximum transaction per user limit and increment their transaction count.
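With the queue owning an `EventsSender`, transaction status becomes observable without polling: `push` emits `Queued`, and expiry during block production emits `Expired`. A minimal sketch of the wiring, assuming `config`, `wsv` and an accepted transaction `tx` are already built (channel capacity is illustrative):

```rust
use iroha_data_model::events::pipeline::{TransactionEvent, TransactionStatus};

// The same sender that cli/src/lib.rs now passes into `Queue::from_config`.
let (events_sender, mut events_receiver) = tokio::sync::broadcast::channel(1024);
let queue = Queue::from_config(config, events_sender);

queue.push(tx, &wsv).expect("queue accepts the transaction");

// Right after a successful push the subscriber sees `Queued` with no block yet:
// TransactionEvent { hash, block_height: None, status: TransactionStatus::Queued }
let _event = events_receiver.try_recv().expect("event was broadcast");
```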
@@ -367,6 +390,21 @@ pub mod tests {
         wsv::World, PeersIds,
     };

+    impl Queue {
+        pub fn test(cfg: Config) -> Self {
+            Self {
+                events_sender: tokio::sync::broadcast::Sender::new(1),
+                tx_hashes: ArrayQueue::new(cfg.capacity.get()),
+                accepted_txs: DashMap::new(),
+                txs_per_user: DashMap::new(),
+                capacity: cfg.capacity,
+                capacity_per_user: cfg.capacity_per_user,
+                tx_time_to_live: cfg.transaction_time_to_live,
+                future_threshold: cfg.future_threshold,
+            }
+        }
+    }
+
     fn accepted_tx(account_id: &str, key: &KeyPair) -> AcceptedTransaction {
         let chain_id = ChainId::from("0");

@@ -422,7 +460,7 @@ pub mod tests {
             query_handle,
         ));

-        let queue = Queue::from_config(config_factory());
+        let queue = Queue::test(config_factory());

         queue
             .push(accepted_tx("alice@wonderland", &key_pair), &wsv)
@@ -442,7 +480,7 @@ pub mod tests {
             query_handle,
         ));

-        let queue = Queue::from_config(Config {
+        let queue = Queue::test(Config {
             transaction_time_to_live: Duration::from_secs(100),
             capacity,
             ..Config::default()
@@ -487,7 +525,7 @@ pub mod tests {
             ))
         };

-        let queue = Queue::from_config(config_factory());
+        let queue = Queue::test(config_factory());
         let instructions: [InstructionBox; 0] = [];
         let tx =
             TransactionBuilder::new(chain_id.clone(), "alice@wonderland".parse().expect("Valid"))
@@ -539,7 +577,7 @@ pub mod tests {
             kura,
             query_handle,
         ));
-        let queue = Queue::from_config(Config {
+        let queue = Queue::test(Config {
             transaction_time_to_live: Duration::from_secs(100),
             ..config_factory()
         });
@@ -566,7 +604,7 @@ pub mod tests {
         );
         let tx = accepted_tx("alice@wonderland", &alice_key);
         wsv.transactions.insert(tx.as_ref().hash(), 1);
-        let queue = Queue::from_config(config_factory());
+        let queue = Queue::test(config_factory());
         assert!(matches!(
             queue.push(tx, &wsv),
             Err(Failure {
@@ -589,7 +627,7 @@ pub mod tests {
             query_handle,
         );
         let tx = accepted_tx("alice@wonderland", &alice_key);
-        let queue = Queue::from_config(config_factory());
+        let queue = Queue::test(config_factory());
         queue.push(tx.clone(), &wsv).unwrap();
         wsv.transactions.insert(tx.as_ref().hash(), 1);
         assert_eq!(
@@ -612,7 +650,7 @@ pub mod tests {
             kura,
             query_handle,
         ));
-        let queue = Queue::from_config(Config {
+        let queue = Queue::test(Config {
             transaction_time_to_live: Duration::from_millis(300),
             ..config_factory()
         });
@@ -659,7 +697,7 @@ pub mod tests {
             kura,
             query_handle,
         ));
-        let queue = Queue::from_config(config_factory());
+        let queue = Queue::test(config_factory());
         queue
             .push(accepted_tx("alice@wonderland", &alice_key), &wsv)
             .expect("Failed to push tx into queue");
@@ -693,7 +731,9 @@ pub mod tests {
             kura,
             query_handle,
         ));
-        let queue = Queue::from_config(config_factory());
+        let mut queue = Queue::test(config_factory());
+        let (event_sender, mut event_receiver) = tokio::sync::broadcast::channel(1);
+        queue.events_sender = event_sender;
         let instructions = [Fail {
             message: "expired".to_owned(),
         }];
@@ -708,18 +748,26 @@ pub mod tests {
             max_instruction_number: 4096,
             max_wasm_size_bytes: 0,
         };
+        let tx_hash = tx.hash();
         let tx = AcceptedTransaction::accept(tx, &chain_id, &limits)
             .expect("Failed to accept Transaction.");
         queue
             .push(tx.clone(), &wsv)
             .expect("Failed to push tx into queue");
         let mut txs = Vec::new();
-        let mut expired_txs = Vec::new();
         thread::sleep(Duration::from_millis(TTL_MS));
-        queue.get_transactions_for_block(&wsv, max_txs_in_block, &mut txs, &mut expired_txs);
+        queue.get_transactions_for_block(&wsv, max_txs_in_block, &mut txs);
         assert!(txs.is_empty());
-        assert_eq!(expired_txs.len(), 1);
-        assert_eq!(expired_txs[0], tx);
+
+        assert_eq!(
+            event_receiver.recv().await.unwrap(),
+            TransactionEvent {
+                hash: tx_hash,
+                block_height: None,
+                status: TransactionStatus::Expired,
+            }
+            .into()
+        )
     }

     #[test]
@@ -734,7 +782,7 @@ pub mod tests {
             query_handle,
         );

-        let queue = Arc::new(Queue::from_config(Config {
+        let queue = Arc::new(Queue::test(Config {
             transaction_time_to_live: Duration::from_secs(100),
             capacity: 100_000_000.try_into().unwrap(),
             ..Config::default()
@@ -807,7 +855,7 @@ pub mod tests {
             query_handle,
         ));

-        let queue = Queue::from_config(Config {
+        let queue = Queue::test(Config {
             future_threshold,
             ..Config::default()
         });
@@ -868,7 +916,7 @@ pub mod tests {
         let query_handle = LiveQueryStore::test().start();
         let mut wsv = WorldStateView::new(world, kura, query_handle);

-        let queue = Queue::from_config(Config {
+        let queue = Queue::test(Config {
             transaction_time_to_live: Duration::from_secs(100),
             capacity: 100.try_into().unwrap(),
             capacity_per_user: 1.try_into().unwrap(),
diff --git a/core/src/smartcontracts/isi/query.rs b/core/src/smartcontracts/isi/query.rs
index 88db4d6330b..6389b610eb2 100644
--- a/core/src/smartcontracts/isi/query.rs
+++ b/core/src/smartcontracts/isi/query.rs
@@ -296,7 +296,9 @@ mod tests {
         let first_block = BlockBuilder::new(transactions.clone(), topology.clone(), Vec::new())
             .chain(0, &mut wsv)
             .sign(&ALICE_KEYS)
+            .unpack(|_| {})
             .commit(&topology)
+            .unpack(|_| {})
             .expect("Block is valid");

         wsv.apply(&first_block)?;
@@ -306,7 +308,9 @@ mod tests {
         let block = BlockBuilder::new(transactions.clone(), topology.clone(), Vec::new())
             .chain(0, &mut wsv)
             .sign(&ALICE_KEYS)
+            .unpack(|_| {})
             .commit(&topology)
+            .unpack(|_| {})
             .expect("Block is valid");

         wsv.apply(&block)?;
@@ -437,7 +441,9 @@ mod tests {
         let vcb = BlockBuilder::new(vec![va_tx.clone()], topology.clone(), Vec::new())
             .chain(0, &mut wsv)
             .sign(&ALICE_KEYS)
+            .unpack(|_| {})
             .commit(&topology)
+            .unpack(|_| {})
             .expect("Block is valid");

         wsv.apply(&vcb)?;
diff --git a/core/src/smartcontracts/isi/triggers/set.rs b/core/src/smartcontracts/isi/triggers/set.rs
index dc1587848bb..554bc15b569 100644
--- a/core/src/smartcontracts/isi/triggers/set.rs
+++ b/core/src/smartcontracts/isi/triggers/set.rs
@@ -155,8 +155,8 @@ type WasmSmartContractMap = IndexMap<HashOf<WasmSmartContract>, (WasmSmartContra
 pub struct Set {
     /// Triggers using [`DataEventFilter`]
     data_triggers: IndexMap<TriggerId, LoadedAction<DataEventFilter>>,
-    /// Triggers using [`PipelineEventFilter`]
-    pipeline_triggers: IndexMap<TriggerId, LoadedAction<PipelineEventFilter>>,
+    /// Triggers using [`PipelineEventFilterBox`]
+    pipeline_triggers: IndexMap<TriggerId, LoadedAction<PipelineEventFilterBox>>,
     /// Triggers using [`TimeEventFilter`]
     time_triggers: IndexMap<TriggerId, LoadedAction<TimeEventFilter>>,
     /// Triggers using [`ExecuteTriggerEventFilter`]
@@ -167,7 +167,7 @@ pub struct Set {
     original_contracts: WasmSmartContractMap,
     /// List of actions that should be triggered by events provided by `handle_*` methods.
     /// Vector is used to save the exact triggers order.
-    matched_ids: Vec<(Event, TriggerId)>,
+    matched_ids: Vec<(EventBox, TriggerId)>,
 }

 /// Helper struct for serializing triggers.
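Since `Set` now stores `(EventBox, TriggerId)` pairs, everything downstream matches on the boxed event hierarchy: pipeline events are one variant of `EventBox`, and split further into block and transaction events. A hedged sketch of the two-level dispatch consumers now write (import paths assumed from the diffs above):

```rust
use iroha_data_model::{events::pipeline::PipelineEventBox, prelude::EventBox};

fn inspect(event: EventBox) {
    match event {
        EventBox::Pipeline(PipelineEventBox::Transaction(tx_event)) => {
            // Queued / Approved / Rejected(..) / Expired; `block_height` is
            // `Some(..)` once the transaction has landed in a block.
            println!("tx {:?}", tx_event.status());
        }
        EventBox::Pipeline(PipelineEventBox::Block(block_event)) => {
            // Approved / Committed / Applied, plus the block height.
            println!("block {} {:?}", block_event.height, block_event.status());
        }
        _ => {} // data, time and trigger events are unchanged
    }
}
```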
@@ -269,7 +269,7 @@ impl<'de> DeserializeSeed<'de> for WasmSeed<'_, Set> {
                         }
                     }
                     "pipeline_triggers" => {
-                        let triggers: IndexMap<TriggerId, SpecializedAction<PipelineEventFilter>> =
+                        let triggers: IndexMap<TriggerId, SpecializedAction<PipelineEventFilterBox>> =
                             map.next_value()?;
                         for (id, action) in triggers {
                             set.add_pipeline_trigger(
@@ -346,7 +346,7 @@ impl Set {
         })
     }

-    /// Add trigger with [`PipelineEventFilter`]
+    /// Add trigger with [`PipelineEventFilterBox`]
     ///
     /// Return `false` if a trigger with given id already exists
     ///
@@ -357,7 +357,7 @@ impl Set {
     pub fn add_pipeline_trigger(
         &mut self,
         engine: &wasmtime::Engine,
-        trigger: Trigger<PipelineEventFilter>,
+        trigger: Trigger<PipelineEventFilterBox>,
     ) -> Result<bool> {
         self.add_to(engine, trigger, TriggeringEventType::Pipeline, |me| {
             &mut me.pipeline_triggers
@@ -805,18 +805,6 @@ impl Set {
         };
     }

-    /// Handle [`PipelineEvent`].
-    ///
-    /// Find all actions that are triggered by `event` and store them.
-    /// These actions are inspected in the next [`Set::inspect_matched()`] call.
-    // Passing by value to follow other `handle_` methods interface
-    #[allow(clippy::needless_pass_by_value)]
-    pub fn handle_pipeline_event(&mut self, event: PipelineEvent) {
-        self.pipeline_triggers.iter().for_each(|entry| {
-            Self::match_and_insert_trigger(&mut self.matched_ids, event.clone(), entry)
-        });
-    }
-
     /// Handle [`TimeEvent`].
     ///
     /// Find all actions that are triggered by `event` and store them.
@@ -831,7 +819,7 @@ impl Set {
                 continue;
             }

-            let ids = core::iter::repeat_with(|| (Event::Time(event), id.clone())).take(
+            let ids = core::iter::repeat_with(|| (EventBox::Time(event), id.clone())).take(
                 count
                     .try_into()
                     .expect("`u32` should always fit in `usize`"),
@@ -845,8 +833,8 @@ impl Set {
     /// Skips insertion:
     /// - If the action's filter doesn't match an event
     /// - If the action's repeats count equals to 0
-    fn match_and_insert_trigger<E: Into<Event>, F: EventFilter<Event = E>>(
-        matched_ids: &mut Vec<(Event, TriggerId)>,
+    fn match_and_insert_trigger<E: Into<EventBox>, F: EventFilter<Event = E>>(
+        matched_ids: &mut Vec<(EventBox, TriggerId)>,
         event: E,
         (id, action): (&TriggerId, &LoadedAction<F>),
     ) {
@@ -909,7 +897,7 @@ impl Set {
     }

     /// Extract `matched_id`
-    pub fn extract_matched_ids(&mut self) -> Vec<(Event, TriggerId)> {
+    pub fn extract_matched_ids(&mut self) -> Vec<(EventBox, TriggerId)> {
         core::mem::take(&mut self.matched_ids)
     }
 }
diff --git a/core/src/smartcontracts/wasm.rs b/core/src/smartcontracts/wasm.rs
index 71ed2b8b92a..d27aa2efaa5 100644
--- a/core/src/smartcontracts/wasm.rs
+++ b/core/src/smartcontracts/wasm.rs
@@ -460,7 +460,7 @@ pub mod state {
     #[derive(Constructor)]
     pub struct Trigger {
         /// Event which activated this trigger
-        pub(in super::super) triggering_event: Event,
+        pub(in super::super) triggering_event: EventBox,
     }

     pub mod executor {
@@ -937,7 +937,7 @@ impl<'wrld> Runtime<state::Trigger<'wrld>> {
         id: &TriggerId,
         authority: AccountId,
         module: &wasmtime::Module,
-        event: Event,
+        event: EventBox,
     ) -> Result<()> {
         let span = wasm_log_span!("Trigger execution", %id, %authority);
         let state = state::Trigger::new(
diff --git a/core/src/sumeragi/main_loop.rs b/core/src/sumeragi/main_loop.rs
index 90a4a5bef66..cb9368e089f 100644
--- a/core/src/sumeragi/main_loop.rs
+++ b/core/src/sumeragi/main_loop.rs
@@ -2,10 +2,7 @@
 use std::sync::mpsc;

 use iroha_crypto::HashOf;
-use iroha_data_model::{
-    block::*, events::pipeline::PipelineEvent, peer::PeerId,
-    transaction::error::TransactionRejectionReason,
-};
+use iroha_data_model::{block::*, events::pipeline::PipelineEventBox, peer::PeerId};
 use iroha_p2p::UpdateTopology;
 use tracing::{span, Level};

@@ -98,17 +95,19 @@ impl Sumeragi {
     #[allow(clippy::needless_pass_by_value, single_use_lifetimes)] // TODO: uncomment when anonymous lifetimes are stable
     fn broadcast_packet_to<'peer_id>(
         &self,
-        msg: BlockMessage,
+        msg: impl Into<BlockMessage>,
         ids: impl IntoIterator<Item = &'peer_id PeerId> + Send,
     ) {
+        let msg = msg.into();
+
         for peer_id in ids {
             self.post_packet_to(msg.clone(), peer_id);
         }
     }

-    fn broadcast_packet(&self, msg: BlockMessage) {
+    fn broadcast_packet(&self, msg: impl Into<BlockMessage>) {
         let broadcast = iroha_p2p::Broadcast {
-            data: NetworkMessage::SumeragiBlock(Box::new(msg)),
+            data: NetworkMessage::SumeragiBlock(Box::new(msg.into())),
         };
         self.network.broadcast(broadcast);
     }
@@ -132,17 +131,8 @@ impl Sumeragi {
         self.block_time + self.commit_time
     }

-    fn send_events(&self, events: impl IntoIterator<Item = impl Into<Event>>) {
-        let addr = &self.peer_id.address;
-
-        if self.events_sender.receiver_count() > 0 {
-            for event in events {
-                self.events_sender
-                    .send(event.into())
-                    .map_err(|err| warn!(%addr, ?err, "Event not sent"))
-                    .unwrap_or(0);
-            }
-        }
+    fn send_event(&self, event: impl Into<EventBox>) {
+        let _ = self.events_sender.send(event.into());
     }

     fn receive_network_packet(
@@ -254,13 +244,15 @@ impl Sumeragi {
                     &self.chain_id,
                     &mut new_wsv,
                 )
+                .unpack(|e| self.send_event(e))
                 .and_then(|block| {
                     block
                         .commit(&self.current_topology)
+                        .unpack(|e| self.send_event(e))
                         .map_err(|(block, error)| (block.into(), error))
                 }) {
                     Ok(block) => block,
-                    Err((_, error)) => {
+                    Err(error) => {
                         error!(?error, "Received invalid genesis block");
                         continue;
                     }
@@ -293,12 +285,14 @@ impl Sumeragi {
         let mut new_wsv = self.wsv.clone();
         let genesis = BlockBuilder::new(transactions, self.current_topology.clone(), vec![])
             .chain(0, &mut new_wsv)
-            .sign(&self.key_pair);
+            .sign(&self.key_pair)
+            .unpack(|e| self.send_event(e));

-        let genesis_msg = BlockCreated::from(genesis.clone()).into();
+        let genesis_msg = BlockCreated::from(genesis.clone());

         let genesis = genesis
             .commit(&self.current_topology)
+            .unpack(|e| self.send_event(e))
             .expect("Genesis invalid");

         assert!(
@@ -339,20 +333,14 @@ impl Sumeragi {
         Strategy::before_update_hook(self);

-        new_wsv
-            .apply_without_execution(&block)
-            .expect("Failed to apply block on WSV. Bailing.");
+        let wsv_events = new_wsv.apply_without_execution(&block);
         self.wsv = new_wsv;

-        let wsv_events = core::mem::take(&mut self.wsv.events_buffer);
-        self.send_events(wsv_events);
-
         // Parameters are updated before updating public copy of sumeragi
         self.update_params();

         let new_topology =
             Topology::recreate_topology(block.as_ref(), 0, self.wsv.peers().cloned().collect());
-        let events = block.produce_events();

         // https://github.com/hyperledger/iroha/issues/3396
         // Kura should store the block only upon successful application to the internal WSV to avoid storing a corrupted block.
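The `send_event` idiom above is deliberately fire-and-forget: `broadcast::Sender::send` fails only when no receiver is currently subscribed, and the old per-event `warn!` is dropped along with `send_events`. A condensed sketch of both ends of the channel, assuming the `EventBox` alias from core/src/lib.rs:

```rust
use tokio::sync::broadcast;

fn send_event(events_sender: &broadcast::Sender<EventBox>, event: impl Into<EventBox>) {
    // Ignoring the error is safe: it only means "no subscribers right now".
    let _ = events_sender.send(event.into());
}

async fn watch(mut rx: broadcast::Receiver<EventBox>) {
    // A slow subscriber may observe `RecvError::Lagged` and simply resubscribe,
    // which is another reason the sending side does not need to track errors.
    while let Ok(_event) = rx.recv().await {
        // forward to websocket clients, triggers, telemetry, ...
    }
}
```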
@@ -372,9 +360,8 @@ impl Sumeragi { } }); - // NOTE: This sends "Block committed" event, - // so it should be done AFTER public facing WSV update - self.send_events(events); + wsv_events.into_iter().for_each(|e| self.send_event(e)); + self.current_topology = new_topology; self.connect_peers(&self.current_topology); @@ -412,16 +399,17 @@ impl Sumeragi { trace!(%addr, %role, block_hash=%block_hash, "Block received, voting..."); let mut new_wsv = self.wsv.clone(); - let block = match ValidBlock::validate(block, topology, &self.chain_id, &mut new_wsv) { + let block = match ValidBlock::validate(block, topology, &self.chain_id, &mut new_wsv) + .unpack(|e| self.send_event(e)) + { Ok(block) => block, - Err((_, error)) => { + Err(error) => { warn!(%addr, %role, ?error, "Block validation failed"); return None; } }; let signed_block = block.sign(&self.key_pair); - Some(VotingBlock::new(signed_block, new_wsv)) } @@ -455,7 +443,13 @@ impl Sumeragi { let block_hash = block.hash(); info!(%addr, %role, hash=%block_hash, "Block sync update received"); - match handle_block_sync(&self.chain_id, block, &self.wsv, &self.finalized_wsv) { + match handle_block_sync( + &self.chain_id, + &self.wsv, + &self.finalized_wsv, + block, + &|e| self.send_event(e), + ) { Ok(BlockSyncOk::CommitBlock(block, new_wsv)) => { self.commit_block(block, new_wsv) } @@ -519,14 +513,13 @@ impl Sumeragi { match voted_block .block .commit_with_signatures(current_topology, signatures) + .unpack(|e| self.send_event(e)) { - Ok(committed_block) => { - self.commit_block(committed_block, voted_block.new_wsv) - } - Err((_, error)) => { + Ok(block) => self.commit_block(block, voted_block.new_wsv), + Err(error) => { error!(%addr, %role, %hash, ?error, "Block failed to be committed") } - }; + } } else { error!( %addr, %role, committed_block_hash=%hash, %voting_block_hash, @@ -547,8 +540,7 @@ impl Sumeragi { if let Some(v_block) = self.vote_for_block(¤t_topology, block_created) { let block_hash = v_block.block.as_ref().hash_of_payload(); - let msg = BlockSigned::from(v_block.block.clone()).into(); - + let msg = BlockSigned::from(v_block.block.clone()); self.broadcast_packet_to(msg, [current_topology.proxy_tail()]); info!(%addr, %block_hash, "Block validated, signed and forwarded"); @@ -565,7 +557,7 @@ impl Sumeragi { let block_hash = v_block.block.as_ref().hash_of_payload(); self.broadcast_packet_to( - BlockSigned::from(v_block.block.clone()).into(), + BlockSigned::from(v_block.block.clone()), [current_topology.proxy_tail()], ); info!(%addr, %block_hash, "Block validated, signed and forwarded"); @@ -647,7 +639,8 @@ impl Sumeragi { event_recommendations, ) .chain(current_view_change_index, &mut new_wsv) - .sign(&self.key_pair); + .sign(&self.key_pair) + .unpack(|e| self.send_event(e)); let created_in = create_block_start_time.elapsed(); if let Some(current_topology) = current_topology.is_consensus_required() { @@ -658,22 +651,23 @@ impl Sumeragi { } *voting_block = Some(VotingBlock::new(new_block.clone(), new_wsv)); - let msg = BlockCreated::from(new_block).into(); + let msg = BlockCreated::from(new_block); if current_view_change_index >= 1 { self.broadcast_packet(msg); } else { self.broadcast_packet_to(msg, current_topology.voting_peers()); } } else { - match new_block.commit(current_topology) { - Ok(committed_block) => { - self.broadcast_packet( - BlockCommitted::from(committed_block.clone()).into(), - ); - self.commit_block(committed_block, new_wsv); + match new_block + .commit(current_topology) + .unpack(|e| self.send_event(e)) + { + Ok(block) 
=> { + self.broadcast_packet(BlockCommitted::from(&block)); + self.commit_block(block, new_wsv); } - Err((_, error)) => error!(%addr, role=%Role::Leader, ?error), - } + Err(error) => error!(%addr, role=%Role::Leader, ?error), + }; } } } @@ -683,12 +677,15 @@ impl Sumeragi { let voted_at = voted_block.voted_at; let new_wsv = voted_block.new_wsv; - match voted_block.block.commit(current_topology) { + match voted_block + .block + .commit(current_topology) + .unpack(|e| self.send_event(e)) + { Ok(committed_block) => { info!(voting_block_hash = %committed_block.as_ref().hash(), "Block reached required number of votes"); - let msg = BlockCommitted::from(committed_block.clone()).into(); - + let msg = BlockCommitted::from(&committed_block); let current_topology = current_topology .is_consensus_required() .expect("Peer has `ProxyTail` role, which mean that current topology require consensus"); @@ -865,14 +862,11 @@ pub(crate) fn run( expired }); - let mut expired_transactions = Vec::new(); sumeragi.queue.get_transactions_for_block( &sumeragi.wsv, sumeragi.max_txs_in_block, &mut sumeragi.transaction_cache, - &mut expired_transactions, ); - sumeragi.send_events(expired_transactions.iter().map(expired_event)); let current_view_change_index = sumeragi .prune_view_change_proofs_and_calculate_current_index(&mut view_change_proof_chain); @@ -995,18 +989,6 @@ fn add_signatures( } } -/// Create expired pipeline event for the given transaction. -fn expired_event(txn: &AcceptedTransaction) -> Event { - PipelineEvent { - entity_kind: PipelineEntityKind::Transaction, - status: PipelineStatus::Rejected(PipelineRejectionReason::Transaction( - TransactionRejectionReason::Expired, - )), - hash: txn.as_ref().hash().into(), - } - .into() -} - /// Type enumerating early return types to reduce cyclomatic /// complexity of the main loop items and allow direct short /// circuiting with the `?` operator. 
Candidate for `impl @@ -1102,11 +1084,12 @@ enum BlockSyncError { }, } -fn handle_block_sync( +fn handle_block_sync( chain_id: &ChainId, - block: SignedBlock, wsv: &WorldStateView, finalized_wsv: &WorldStateView, + block: SignedBlock, + handle_events: &F, ) -> Result { let block_height = block.header().height; let wsv_height = wsv.height(); @@ -1122,9 +1105,11 @@ fn handle_block_sync( Topology::recreate_topology(&last_committed_block, view_change_index, new_peers) }; ValidBlock::validate(block, &topology, chain_id, &mut new_wsv) + .unpack(handle_events) .and_then(|block| { block .commit(&topology) + .unpack(handle_events) .map_err(|(block, err)| (block.into(), err)) }) .map(|block| BlockSyncOk::CommitBlock(block, new_wsv)) @@ -1142,9 +1127,11 @@ fn handle_block_sync( Topology::recreate_topology(&last_committed_block, view_change_index, new_peers) }; ValidBlock::validate(block, &topology, chain_id, &mut new_wsv) + .unpack(handle_events) .and_then(|block| { block .commit(&topology) + .unpack(handle_events) .map_err(|(block, err)| (block.into(), err)) }) .map_err(|(block, error)| (block, BlockSyncError::SoftForkBlockNotValid(error))) @@ -1225,9 +1212,13 @@ mod tests { // Creating a block of two identical transactions and validating it let block = BlockBuilder::new(vec![tx.clone(), tx], topology.clone(), Vec::new()) .chain(0, &mut wsv) - .sign(leader_key_pair); + .sign(leader_key_pair) + .unpack(|_| {}); - let genesis = block.commit(topology).expect("Block is valid"); + let genesis = block + .commit(topology) + .unpack(|_| {}) + .expect("Block is valid"); wsv.apply(&genesis).expect("Failed to apply block"); kura.store_block(genesis); @@ -1263,7 +1254,8 @@ mod tests { // Creating a block of two identical transactions and validating it let block = BlockBuilder::new(vec![tx1, tx2], topology.clone(), Vec::new()) .chain(0, &mut wsv.clone()) - .sign(leader_key_pair); + .sign(leader_key_pair) + .unpack(|_| {}); (wsv, kura, block.into()) } @@ -1285,7 +1277,7 @@ mod tests { // Malform block to make it invalid payload_mut(&mut block).commit_topology.clear(); - let result = handle_block_sync(&chain_id, block, &wsv, &finalized_wsv); + let result = handle_block_sync(&chain_id, &wsv, &finalized_wsv, block, &|_| {}); assert!(matches!(result, Err((_, BlockSyncError::BlockNotValid(_))))) } @@ -1302,17 +1294,19 @@ mod tests { create_data_for_test(&chain_id, &topology, &leader_key_pair); let mut wsv = finalized_wsv.clone(); - let validated_block = - ValidBlock::validate(block.clone(), &topology, &chain_id, &mut wsv).unwrap(); - let committed_block = validated_block.commit(&topology).expect("Block is valid"); - wsv.apply_without_execution(&committed_block) - .expect("Failed to apply block"); + let committed_block = ValidBlock::validate(block.clone(), &topology, &chain_id, &mut wsv) + .unpack(|_| {}) + .unwrap() + .commit(&topology) + .unpack(|_| {}) + .expect("Block is valid"); + let _wsv_events = wsv.apply_without_execution(&committed_block); kura.store_block(committed_block); // Malform block to make it invalid payload_mut(&mut block).commit_topology.clear(); - let result = handle_block_sync(&chain_id, block, &wsv, &finalized_wsv); + let result = handle_block_sync(&chain_id, &wsv, &finalized_wsv, block, &|_| {}); assert!(matches!( result, Err((_, BlockSyncError::SoftForkBlockNotValid(_))) @@ -1333,7 +1327,7 @@ mod tests { // Change block height payload_mut(&mut block).header.height = 42; - let result = handle_block_sync(&chain_id, block, &wsv, &finalized_wsv); + let result = handle_block_sync(&chain_id, &wsv, 
&finalized_wsv, block, &|_| {}); assert!(matches!( result, Err(( @@ -1359,7 +1353,7 @@ mod tests { let (finalized_wsv, _, block) = create_data_for_test(&chain_id, &topology, &leader_key_pair); let wsv = finalized_wsv.clone(); - let result = handle_block_sync(&chain_id, block, &wsv, &finalized_wsv); + let result = handle_block_sync(&chain_id, &wsv, &finalized_wsv, block, &|_| {}); assert!(matches!(result, Ok(BlockSyncOk::CommitBlock(_, _)))) } @@ -1376,18 +1370,20 @@ mod tests { let (finalized_wsv, _, block) = create_data_for_test(&chain_id, &topology, &leader_key_pair); let mut wsv = finalized_wsv.clone(); - let validated_block = - ValidBlock::validate(block.clone(), &topology, &chain_id, &mut wsv).unwrap(); - let committed_block = validated_block.commit(&topology).expect("Block is valid"); - wsv.apply_without_execution(&committed_block) - .expect("Failed to apply block"); + let committed_block = ValidBlock::validate(block.clone(), &topology, &chain_id, &mut wsv) + .unpack(|_| {}) + .unwrap() + .commit(&topology) + .unpack(|_| {}) + .expect("Block is valid"); + let _wsv_events = wsv.apply_without_execution(&committed_block); kura.store_block(committed_block); assert_eq!(wsv.latest_block_view_change_index(), 0); // Increase block view change index payload_mut(&mut block).header.view_change_index = 42; - let result = handle_block_sync(&chain_id, block, &wsv, &finalized_wsv); + let result = handle_block_sync(&chain_id, &wsv, &finalized_wsv, block, &|_| {}); assert!(matches!(result, Ok(BlockSyncOk::ReplaceTopBlock(_, _)))) } @@ -1407,18 +1403,20 @@ mod tests { // Increase block view change index payload_mut(&mut block).header.view_change_index = 42; - let validated_block = - ValidBlock::validate(block.clone(), &topology, &chain_id, &mut wsv).unwrap(); - let committed_block = validated_block.commit(&topology).expect("Block is valid"); - wsv.apply_without_execution(&committed_block) - .expect("Failed to apply block"); + let committed_block = ValidBlock::validate(block.clone(), &topology, &chain_id, &mut wsv) + .unpack(|_| {}) + .unwrap() + .commit(&topology) + .unpack(|_| {}) + .expect("Block is valid"); + let _wsv_events = wsv.apply_without_execution(&committed_block); kura.store_block(committed_block); assert_eq!(wsv.latest_block_view_change_index(), 42); // Decrease block view change index back payload_mut(&mut block).header.view_change_index = 0; - let result = handle_block_sync(&chain_id, block, &wsv, &finalized_wsv); + let result = handle_block_sync(&chain_id, &wsv, &finalized_wsv, block, &|_| {}); assert!(matches!( result, Err(( @@ -1447,7 +1445,7 @@ mod tests { payload_mut(&mut block).header.view_change_index = 42; payload_mut(&mut block).header.height = 1; - let result = handle_block_sync(&chain_id, block, &wsv, &finalized_wsv); + let result = handle_block_sync(&chain_id, &wsv, &finalized_wsv, block, &|_| {}); assert!(matches!( result, Err(( diff --git a/core/src/sumeragi/message.rs b/core/src/sumeragi/message.rs index b0a80207072..95caaf5c0f5 100644 --- a/core/src/sumeragi/message.rs +++ b/core/src/sumeragi/message.rs @@ -84,8 +84,8 @@ pub struct BlockCommitted { pub signatures: SignaturesOf, } -impl From<CommittedBlock> for BlockCommitted { - fn from(block: CommittedBlock) -> Self { +impl From<&CommittedBlock> for BlockCommitted { + fn from(block: &CommittedBlock) -> Self { let block_hash = block.as_ref().hash_of_payload(); let block_signatures = block.as_ref().signatures().clone(); diff --git a/core/src/sumeragi/mod.rs b/core/src/sumeragi/mod.rs index b08525a4ea1..8c1dd2909e9 100644 --- a/core/src/sumeragi/mod.rs +++
b/core/src/sumeragi/mod.rs @@ -228,26 +228,34 @@ impl SumeragiHandle { chain_id: &ChainId, block: &SignedBlock, wsv: &mut WorldStateView, + events_sender: &EventsSender, mut current_topology: Topology, ) -> Topology { // NOTE: topology needs to be updated up to block's view_change_index current_topology.rotate_all_n(block.header().view_change_index); - let block = ValidBlock::validate(block.clone(), &current_topology, chain_id, wsv) - .expect("Kura blocks should be valid") + let committed_block = ValidBlock::validate(block.clone(), &current_topology, chain_id, wsv) + .unpack(|e| { + let _ = events_sender.send(e.into()); + }) + .expect("Kura: Invalid block") .commit(&current_topology) - .expect("Kura blocks should be valid"); + .unpack(|e| { + let _ = events_sender.send(e.into()); + }) + .expect("Kura: Invalid block"); - if block.as_ref().header().is_genesis() { - wsv.world_mut().trusted_peers_ids = block.as_ref().commit_topology().clone(); + if committed_block.as_ref().header().is_genesis() { + wsv.world_mut().trusted_peers_ids = committed_block.as_ref().commit_topology().clone(); } - wsv.apply_without_execution(&block).expect( - "Block application in init should not fail. \ - Blocks loaded from kura assumed to be valid", - ); + wsv.apply_without_execution(&committed_block) + .into_iter() + .for_each(|e| { + let _ = events_sender.send(e); + }); - Topology::recreate_topology(block.as_ref(), 0, wsv.peers().cloned().collect()) + Topology::recreate_topology(committed_block.as_ref(), 0, wsv.peers().cloned().collect()) } /// Start [`Sumeragi`] actor and return handle to it. @@ -296,16 +304,26 @@ impl SumeragiHandle { let block_iter_except_last = (&mut blocks_iter).take(block_count.saturating_sub(skip_block_count + 1)); for block in block_iter_except_last { - current_topology = - Self::replay_block(&common_config.chain_id, &block, &mut wsv, current_topology); + current_topology = Self::replay_block( + &common_config.chain_id, + &block, + &mut wsv, + &events_sender, + current_topology, + ); } // finalized_wsv is one block behind let finalized_wsv = wsv.clone(); if let Some(block) = blocks_iter.next() { - current_topology = - Self::replay_block(&common_config.chain_id, &block, &mut wsv, current_topology); + current_topology = Self::replay_block( + &common_config.chain_id, + &block, + &mut wsv, + &events_sender, + current_topology, + ); } info!("Sumeragi has finished loading blocks and setting up the WSV"); @@ -387,16 +405,21 @@ pub const PEERS_CONNECT_INTERVAL: Duration = Duration::from_secs(1); pub const TELEMETRY_INTERVAL: Duration = Duration::from_secs(5); /// Structure represents a block that is currently in discussion. -#[non_exhaustive] pub struct VotingBlock { + /// Valid Block + block: ValidBlock, /// At what time has this peer voted for this block pub voted_at: Instant, - /// Valid Block - pub block: ValidBlock, /// WSV after applying transactions to it pub new_wsv: WorldStateView, } +impl AsRef<ValidBlock> for VotingBlock { + fn as_ref(&self) -> &ValidBlock { + &self.block + } +} + impl VotingBlock { /// Construct new `VotingBlock` with current time.
pub fn new(block: ValidBlock, new_wsv: WorldStateView) -> VotingBlock { @@ -413,8 +436,8 @@ impl VotingBlock { voted_at: Instant, ) -> VotingBlock { VotingBlock { - voted_at, block, + voted_at, new_wsv, } } diff --git a/core/src/wsv.rs b/core/src/wsv.rs index fbe6730f77f..425d6bceabd 100644 --- a/core/src/wsv.rs +++ b/core/src/wsv.rs @@ -12,7 +12,10 @@ use iroha_crypto::HashOf; use iroha_data_model::{ account::AccountId, block::SignedBlock, - events::trigger_completed::{TriggerCompletedEvent, TriggerCompletedOutcome}, + events::{ + pipeline::{BlockEvent, BlockStatus}, + trigger_completed::{TriggerCompletedEvent, TriggerCompletedOutcome}, + }, isi::error::{InstructionExecutionError as Error, MathError}, parameter::{Parameter, ParameterValueBox}, permission::PermissionTokenSchema, @@ -274,7 +277,7 @@ pub struct WorldStateView { pub transactions: IndexMap<HashOf<SignedTransaction>, u64>, /// Buffer containing events generated during `WorldStateView::apply`. Renewed on every block commit. #[serde(skip)] - pub events_buffer: Vec<Event>, + pub events_buffer: Vec<EventBox>, /// Engine for WASM [`Runtime`](wasm::Runtime) to execute triggers. #[serde(skip)] pub engine: wasmtime::Engine, @@ -510,7 +513,7 @@ impl WorldStateView { &mut self, id: &TriggerId, action: &dyn LoadedActionTrait, - event: Event, + event: EventBox, ) -> Result<()> { use triggers::set::LoadedExecutable::*; let authority = action.authority(); @@ -624,33 +627,31 @@ impl WorldStateView { deprecated(note = "This function is to be used in testing only. ") )] #[iroha_logger::log(skip_all, fields(block_height))] - pub fn apply(&mut self, block: &CommittedBlock) -> Result<()> { + pub fn apply(&mut self, block: &CommittedBlock) -> Result<Vec<EventBox>> { self.execute_transactions(block)?; debug!("All block transactions successfully executed"); - - self.apply_without_execution(block)?; - - Ok(()) + Ok(self.apply_without_execution(block)) } /// Apply transactions without actually executing them. /// It's assumed that the block's transactions were already executed (as part of validation, for example). + #[must_use] #[iroha_logger::log(skip_all, fields(block_height = block.as_ref().header().height()))] - pub fn apply_without_execution(&mut self, block: &CommittedBlock) -> Result<()> { + pub fn apply_without_execution(&mut self, block: &CommittedBlock) -> Vec<EventBox> { let block_hash = block.as_ref().hash(); trace!(%block_hash, "Applying block"); let time_event = self.create_time_event(block); - self.events_buffer.push(Event::Time(time_event)); + self.events_buffer.push(time_event.into()); - let block_height = block.as_ref().header().height(); + let block_height = block.as_ref().header().height; block .as_ref() .transactions() .map(|tx| &tx.value) .map(SignedTransaction::hash) .for_each(|tx_hash| { - self.transactions.insert(tx_hash, *block_height); + self.transactions.insert(tx_hash, block_height); }); self.world.triggers.handle_time_event(time_event); @@ -667,8 +668,14 @@ impl WorldStateView { self.block_hashes.push(block_hash); self.apply_parameters(); - - Ok(()) + self.events_buffer.push( + BlockEvent { + height: block_height, + status: BlockStatus::Applied, + } + .into(), + ); + core::mem::take(&mut self.events_buffer) } fn apply_parameters(&mut self) { @@ -1241,7 +1248,7 @@ impl WorldStateView { /// Usable when you can't call [`Self::emit_events()`] due to mutable reference to self.
fn emit_events_impl<I: IntoIterator<Item = T>, T: Into<DataEvent>>( triggers: &mut TriggerSet, - events_buffer: &mut Vec<Event>, + events_buffer: &mut Vec<EventBox>, world_events: I, ) { let data_events: SmallVec<[DataEvent; 3]> = world_events @@ -1352,7 +1359,7 @@ mod tests { /// Used to inject faulty payload for testing fn payload_mut(block: &mut CommittedBlock) -> &mut BlockPayload { - let SignedBlock::V1(signed) = &mut block.0 .0; + let SignedBlock::V1(signed) = block.as_mut(); &mut signed.payload } @@ -1361,7 +1368,10 @@ mod tests { const BLOCK_CNT: usize = 10; let topology = Topology::new(UniqueVec::new()); - let mut block = ValidBlock::new_dummy().commit(&topology).unwrap(); + let mut block = ValidBlock::new_dummy() + .commit(&topology) + .unpack(|_| {}) + .unwrap(); let kura = Kura::blank_kura_for_testing(); let query_handle = LiveQueryStore::test().start(); let mut wsv = WorldStateView::new(World::default(), kura, query_handle); @@ -1386,7 +1396,10 @@ mod tests { const BLOCK_CNT: usize = 10; let topology = Topology::new(UniqueVec::new()); - let block = ValidBlock::new_dummy().commit(&topology).unwrap(); + let block = ValidBlock::new_dummy() + .commit(&topology) + .unpack(|_| {}) + .unwrap(); let kura = Kura::blank_kura_for_testing(); let query_handle = LiveQueryStore::test().start(); let mut wsv = WorldStateView::new(World::default(), kura.clone(), query_handle); diff --git a/core/test_network/src/lib.rs b/core/test_network/src/lib.rs index 0b036eff0fe..a59fda86ff0 100644 --- a/core/test_network/src/lib.rs +++ b/core/test_network/src/lib.rs @@ -672,7 +672,7 @@ pub trait TestClient: Sized { fn test_with_account(api_url: &SocketAddr, keys: KeyPair, account_id: &AccountId) -> Self; /// Loop for events with filter and handler function - fn for_each_event(self, event_filter: impl Into<EventFilterBox>, f: impl Fn(Result<Event>)); + fn for_each_event(self, event_filter: impl Into<EventFilterBox>, f: impl Fn(Result<EventBox>)); /// Submit instruction with polling /// @@ -811,7 +811,7 @@ impl TestClient for Client { Client::new(config) } - fn for_each_event(self, event_filter: impl Into<EventFilterBox>, f: impl Fn(Result<Event>)) { + fn for_each_event(self, event_filter: impl Into<EventFilterBox>, f: impl Fn(Result<EventBox>)) { for event_result in self .listen_for_events(event_filter) .expect("Failed to create event iterator.") diff --git a/data_model/src/account.rs b/data_model/src/account.rs index 2383bdc21ac..ce3f9582770 100644 --- a/data_model/src/account.rs +++ b/data_model/src/account.rs @@ -431,6 +431,8 @@ pub mod prelude { #[cfg(test)] mod tests { + #[cfg(not(feature = "std"))] + use alloc::{vec, vec::Vec}; use core::cmp::Ordering; use iroha_crypto::{KeyPair, PublicKey}; diff --git a/data_model/src/block.rs b/data_model/src/block.rs index 93ce5bec045..69ac0bae298 100--- a/data_model/src/block.rs +++ b/data_model/src/block.rs @@ -98,7 +98,7 @@ pub mod model { pub transactions: Vec, /// Event recommendations.
#[getset(skip)] // NOTE: Unused ATM - pub event_recommendations: Vec, + pub event_recommendations: Vec, } /// Signed block diff --git a/data_model/src/events/data/filters.rs b/data_model/src/events/data/filters.rs index 77aed5d8558..ef81c425fb2 100644 --- a/data_model/src/events/data/filters.rs +++ b/data_model/src/events/data/filters.rs @@ -704,7 +704,6 @@ impl EventFilter for DataEventFilter { (DataEvent::Peer(event), Peer(filter)) => filter.matches(event), (DataEvent::Trigger(event), Trigger(filter)) => filter.matches(event), (DataEvent::Role(event), Role(filter)) => filter.matches(event), - (DataEvent::PermissionToken(_), PermissionTokenSchemaUpdate) => true, (DataEvent::Configuration(event), Configuration(filter)) => filter.matches(event), (DataEvent::Executor(event), Executor(filter)) => filter.matches(event), diff --git a/data_model/src/events/mod.rs b/data_model/src/events/mod.rs index daa9e544e26..50cbb4a8e50 100644 --- a/data_model/src/events/mod.rs +++ b/data_model/src/events/mod.rs @@ -7,9 +7,11 @@ use iroha_data_model_derive::model; use iroha_macro::FromVariant; use iroha_schema::IntoSchema; use parity_scale_codec::{Decode, Encode}; +use pipeline::{BlockEvent, TransactionEvent}; use serde::{Deserialize, Serialize}; pub use self::model::*; +use self::pipeline::{BlockEventFilter, TransactionEventFilter}; pub mod data; pub mod execute_trigger; @@ -37,9 +39,9 @@ pub mod model { IntoSchema, )] #[ffi_type] - pub enum Event { + pub enum EventBox { /// Pipeline event. - Pipeline(pipeline::PipelineEvent), + Pipeline(pipeline::PipelineEventBox), /// Data event. Data(data::DataEvent), /// Time event. @@ -85,7 +87,7 @@ pub mod model { #[ffi_type(opaque)] pub enum EventFilterBox { /// Listen to pipeline events with filter. - Pipeline(pipeline::PipelineEventFilter), + Pipeline(pipeline::PipelineEventFilterBox), /// Listen to data events with filter. Data(data::DataEventFilter), /// Listen to time events with filter. @@ -116,7 +118,7 @@ pub mod model { #[ffi_type(opaque)] pub enum TriggeringEventFilterBox { /// Listen to pipeline events with filter. - Pipeline(pipeline::PipelineEventFilter), + Pipeline(pipeline::PipelineEventFilterBox), /// Listen to data events with filter. Data(data::DataEventFilter), /// Listen to time events with filter. 
@@ -126,6 +128,62 @@ pub mod model { } } +impl From for EventBox { + fn from(source: TransactionEvent) -> Self { + Self::Pipeline(source.into()) + } +} + +impl From for EventBox { + fn from(source: BlockEvent) -> Self { + Self::Pipeline(source.into()) + } +} + +impl From for EventFilterBox { + fn from(source: TransactionEventFilter) -> Self { + Self::Pipeline(source.into()) + } +} + +impl From for EventFilterBox { + fn from(source: BlockEventFilter) -> Self { + Self::Pipeline(source.into()) + } +} + +impl TryFrom for TransactionEvent { + type Error = iroha_macro::error::ErrorTryFromEnum; + + fn try_from(event: EventBox) -> Result { + use iroha_macro::error::ErrorTryFromEnum; + + let EventBox::Pipeline(pipeline_event) = event else { + return Err(ErrorTryFromEnum::default()); + }; + + pipeline_event + .try_into() + .map_err(|_| ErrorTryFromEnum::default()) + } +} + +impl TryFrom for BlockEvent { + type Error = iroha_macro::error::ErrorTryFromEnum; + + fn try_from(event: EventBox) -> Result { + use iroha_macro::error::ErrorTryFromEnum; + + let EventBox::Pipeline(pipeline_event) = event else { + return Err(ErrorTryFromEnum::default()); + }; + + pipeline_event + .try_into() + .map_err(|_| ErrorTryFromEnum::default()) + } +} + /// Trait for filters #[cfg(feature = "transparent_api")] pub trait EventFilter { @@ -156,25 +214,27 @@ pub trait EventFilter { #[cfg(feature = "transparent_api")] impl EventFilter for EventFilterBox { - type Event = Event; + type Event = EventBox; /// Apply filter to event. - fn matches(&self, event: &Event) -> bool { + fn matches(&self, event: &EventBox) -> bool { match (event, self) { - (Event::Pipeline(event), Self::Pipeline(filter)) => filter.matches(event), - (Event::Data(event), Self::Data(filter)) => filter.matches(event), - (Event::Time(event), Self::Time(filter)) => filter.matches(event), - (Event::ExecuteTrigger(event), Self::ExecuteTrigger(filter)) => filter.matches(event), - (Event::TriggerCompleted(event), Self::TriggerCompleted(filter)) => { + (EventBox::Pipeline(event), Self::Pipeline(filter)) => filter.matches(event), + (EventBox::Data(event), Self::Data(filter)) => filter.matches(event), + (EventBox::Time(event), Self::Time(filter)) => filter.matches(event), + (EventBox::ExecuteTrigger(event), Self::ExecuteTrigger(filter)) => { + filter.matches(event) + } + (EventBox::TriggerCompleted(event), Self::TriggerCompleted(filter)) => { filter.matches(event) } // Fail to compile in case when new variant to event or filter is added ( - Event::Pipeline(_) - | Event::Data(_) - | Event::Time(_) - | Event::ExecuteTrigger(_) - | Event::TriggerCompleted(_), + EventBox::Pipeline(_) + | EventBox::Data(_) + | EventBox::Time(_) + | EventBox::ExecuteTrigger(_) + | EventBox::TriggerCompleted(_), Self::Pipeline(_) | Self::Data(_) | Self::Time(_) @@ -187,22 +247,24 @@ impl EventFilter for EventFilterBox { #[cfg(feature = "transparent_api")] impl EventFilter for TriggeringEventFilterBox { - type Event = Event; + type Event = EventBox; /// Apply filter to event. 
- fn matches(&self, event: &Event) -> bool { + fn matches(&self, event: &EventBox) -> bool { match (event, self) { - (Event::Pipeline(event), Self::Pipeline(filter)) => filter.matches(event), - (Event::Data(event), Self::Data(filter)) => filter.matches(event), - (Event::Time(event), Self::Time(filter)) => filter.matches(event), - (Event::ExecuteTrigger(event), Self::ExecuteTrigger(filter)) => filter.matches(event), + (EventBox::Pipeline(event), Self::Pipeline(filter)) => filter.matches(event), + (EventBox::Data(event), Self::Data(filter)) => filter.matches(event), + (EventBox::Time(event), Self::Time(filter)) => filter.matches(event), + (EventBox::ExecuteTrigger(event), Self::ExecuteTrigger(filter)) => { + filter.matches(event) + } // Fail to compile in case when new variant to event or filter is added ( - Event::Pipeline(_) - | Event::Data(_) - | Event::Time(_) - | Event::ExecuteTrigger(_) - | Event::TriggerCompleted(_), + EventBox::Pipeline(_) + | EventBox::Data(_) + | EventBox::Time(_) + | EventBox::ExecuteTrigger(_) + | EventBox::TriggerCompleted(_), Self::Pipeline(_) | Self::Data(_) | Self::Time(_) | Self::ExecuteTrigger(_), ) => false, } @@ -228,7 +290,7 @@ pub mod stream { /// Event sent by the peer. #[derive(Debug, Clone, Decode, Encode, IntoSchema)] #[repr(transparent)] - pub struct EventMessage(pub Event); + pub struct EventMessage(pub EventBox); /// Message sent by the stream consumer. /// Request sent by the client to subscribe to events. @@ -237,7 +299,7 @@ pub mod stream { pub struct EventSubscriptionRequest(pub EventFilterBox); } - impl From for Event { + impl From for EventBox { fn from(source: EventMessage) -> Self { source.0 } @@ -252,7 +314,7 @@ pub mod prelude { pub use super::EventFilter; pub use super::{ data::prelude::*, execute_trigger::prelude::*, pipeline::prelude::*, time::prelude::*, - trigger_completed::prelude::*, Event, EventFilterBox, TriggeringEventFilterBox, + trigger_completed::prelude::*, EventBox, EventFilterBox, TriggeringEventFilterBox, TriggeringEventType, }; } diff --git a/data_model/src/events/pipeline.rs b/data_model/src/events/pipeline.rs index 5d3b962144f..69bd880c00f 100644 --- a/data_model/src/events/pipeline.rs +++ b/data_model/src/events/pipeline.rs @@ -1,59 +1,52 @@ //! Pipeline events. #[cfg(not(feature = "std"))] -use alloc::{format, string::String, vec::Vec}; +use alloc::{boxed::Box, format, string::String, vec::Vec}; -use getset::Getters; -use iroha_crypto::Hash; +use iroha_crypto::HashOf; use iroha_data_model_derive::model; use iroha_macro::FromVariant; use iroha_schema::IntoSchema; use parity_scale_codec::{Decode, Encode}; use serde::{Deserialize, Serialize}; -use strum::EnumDiscriminants; pub use self::model::*; +use crate::transaction::SignedTransaction; #[model] pub mod model { + use getset::Getters; + use super::*; - /// [`Event`] filter. #[derive( Debug, Clone, - Copy, PartialEq, Eq, PartialOrd, Ord, - Default, - Getters, + FromVariant, Decode, Encode, - Serialize, Deserialize, + Serialize, IntoSchema, )] - pub struct PipelineEventFilter { - /// If `Some::`, filter by the [`EntityKind`]. If `None`, accept all the [`EntityKind`]. - pub(super) entity_kind: Option, - /// If `Some::`, filter by the [`StatusKind`]. If `None`, accept all the [`StatusKind`]. - pub(super) status_kind: Option, - /// If `Some::`, filter by the [`struct@Hash`]. If `None`, accept all the [`struct@Hash`]. - // TODO: Can we make hash typed like HashOf? 
- pub(super) hash: Option<Hash>, + #[ffi_type(opaque)] + pub enum PipelineEventBox { + Transaction(TransactionEvent), + Block(BlockEvent), } - /// The kind of the pipeline entity. #[derive( Debug, Clone, - Copy, PartialEq, Eq, PartialOrd, Ord, + Getters, Decode, Encode, Deserialize, @@ -61,15 +54,12 @@ pub mod model { IntoSchema, )] #[ffi_type] - #[repr(u8)] - pub enum PipelineEntityKind { - /// Block - Block, - /// Transaction - Transaction, + #[getset(get = "pub")] + pub struct BlockEvent { + pub height: u64, + pub status: BlockStatus, } - /// Strongly-typed [`Event`] that tells the receiver the kind and the hash of the changed entity as well as its [`Status`]. #[derive( Debug, Clone, @@ -84,18 +74,15 @@ pub mod model { Serialize, IntoSchema, )] - #[getset(get = "pub")] #[ffi_type] - pub struct PipelineEvent { - /// [`EntityKind`] of the entity that caused this [`Event`]. - pub entity_kind: PipelineEntityKind, - /// [`Status`] of the entity that caused this [`Event`]. - pub status: PipelineStatus, - /// [`struct@Hash`] of the entity that caused this [`Event`]. - pub hash: Hash, + #[getset(get = "pub")] + pub struct TransactionEvent { + pub hash: HashOf<SignedTransaction>, + pub block_height: Option<u64>, + pub status: TransactionStatus, } - /// [`Status`] of the entity. + /// Report of block's status in the pipeline #[derive( Debug, Clone, PartialEq, Eq, PartialOrd, Ord, - FromVariant, - EnumDiscriminants, Decode, Encode, + Deserialize, Serialize, + IntoSchema, + )] + #[ffi_type(opaque)] + pub enum BlockStatus { + /// Block was approved to participate in consensus + Approved, + /// Block was rejected by consensus + Rejected(crate::block::error::BlockRejectionReason), + /// Block has passed consensus successfully + Committed, + /// Changes have been reflected in the WSV + Applied, + } + + /// Report of transaction's status in the pipeline + #[derive( + Debug, + Clone, + PartialEq, + Eq, + PartialOrd, + Ord, + Decode, + Encode, Deserialize, + Serialize, IntoSchema, )] - #[strum_discriminants( - name(PipelineStatusKind), - derive(PartialOrd, Ord, Decode, Encode, Deserialize, Serialize, IntoSchema,) + #[ffi_type(opaque)] + pub enum TransactionStatus { + /// Transaction was received and enqueued + Queued, + /// Transaction was dropped (not stored in a block) + Expired, + /// Transaction was stored in the block as valid + Approved, + /// Transaction was stored in the block as invalid + Rejected(Box<crate::transaction::error::TransactionRejectionReason>), + } + + #[derive( + Debug, + Clone, + PartialEq, + Eq, + PartialOrd, + Ord, + FromVariant, + Decode, + Encode, + Deserialize, + Serialize, + IntoSchema, )] #[ffi_type] - pub enum PipelineStatus { - /// Entity has been seen in the blockchain but has not passed validation. - Validating, - /// Entity was rejected during validation. - Rejected(PipelineRejectionReason), - /// Entity has passed validation. - Committed, + pub enum PipelineEventFilterBox { + Transaction(TransactionEventFilter), + Block(BlockEventFilter), } - /// The reason for rejecting pipeline entity such as transaction or block.
#[derive( Debug, - displaydoc::Display, Clone, PartialEq, Eq, PartialOrd, Ord, - FromVariant, + Default, + Getters, + Decode, + Encode, + Deserialize, + Serialize, + IntoSchema, + )] + #[ffi_type] + #[getset(get = "pub")] + pub struct BlockEventFilter { + pub height: Option<u64>, + pub status: Option<BlockStatus>, + } + + #[derive( + Debug, + Clone, + PartialEq, + Eq, + PartialOrd, + Ord, + Default, + Getters, Decode, Encode, Deserialize, Serialize, IntoSchema, )] - #[cfg_attr(feature = "std", derive(thiserror::Error))] #[ffi_type] - pub enum PipelineRejectionReason { - /// Block was rejected - Block(#[cfg_attr(feature = "std", source)] crate::block::error::BlockRejectionReason), - /// Transaction was rejected - Transaction( - #[cfg_attr(feature = "std", source)] - crate::transaction::error::TransactionRejectionReason, - ), + #[getset(get = "pub")] + pub struct TransactionEventFilter { + pub hash: Option<HashOf<SignedTransaction>>, + #[getset(skip)] + pub block_height: Option<Option<u64>>, + pub status: Option<TransactionStatus>, + } } -impl PipelineEventFilter { - /// Creates a new [`PipelineEventFilter`] accepting all [`PipelineEvent`]s +impl BlockEventFilter { + /// Match only blocks with the given height #[must_use] - #[inline] - pub const fn new() -> Self { - Self { - status_kind: None, - entity_kind: None, - hash: None, - } + pub fn for_height(mut self, height: u64) -> Self { + self.height = Some(height); + self } - /// Modifies a [`PipelineEventFilter`] to accept only [`PipelineEvent`]s originating from a specific entity kind (block/transaction). + /// Match only blocks with the given status #[must_use] - #[inline] - pub const fn for_entity(mut self, entity_kind: PipelineEntityKind) -> Self { - self.entity_kind = Some(entity_kind); + pub fn for_status(mut self, status: BlockStatus) -> Self { + self.status = Some(status); self } +} - /// Modifies a [`PipelineEventFilter`] to accept only [`PipelineEvent`]s with a specific status. +impl TransactionEventFilter { + /// Get the height of the block the filter is set to track + pub fn block_height(&self) -> Option<Option<u64>> { + self.block_height + } + + /// Match only transactions with the given block height #[must_use] - #[inline] - pub const fn for_status(mut self, status_kind: PipelineStatusKind) -> Self { - self.status_kind = Some(status_kind); + pub fn for_block_height(mut self, block_height: Option<u64>) -> Self { + self.block_height = Some(block_height); self } - /// Modifies a [`PipelineEventFilter`] to accept only [`PipelineEvent`]s originating from an entity with specified hash. + /// Match only transactions with the given hash #[must_use] - #[inline] - pub const fn for_hash(mut self, hash: Hash) -> Self { + pub fn for_hash(mut self, hash: HashOf<SignedTransaction>) -> Self { self.hash = Some(hash); self } - #[inline] - #[cfg(feature = "transparent_api")] + /// Match only transactions with the given status + #[must_use] + pub fn for_status(mut self, status: TransactionStatus) -> Self { + self.status = Some(status); + self + } +} + +#[cfg(feature = "transparent_api")] +impl TransactionEventFilter { fn field_matches<T: Eq>(filter: Option<&T>, event: &T) -> bool { filter.map_or(true, |field| field == event) } } #[cfg(feature = "transparent_api")] -impl super::EventFilter for PipelineEventFilter { - type Event = PipelineEvent; - - /// Check if `self` accepts the `event`.
- #[inline] - fn matches(&self, event: &PipelineEvent) -> bool { - [ - Self::field_matches(self.entity_kind.as_ref(), &event.entity_kind), - Self::field_matches(self.status_kind.as_ref(), &event.status.kind()), - Self::field_matches(self.hash.as_ref(), &event.hash), - ] - .into_iter() - .all(core::convert::identity) +impl BlockEventFilter { + fn field_matches(filter: Option<&T>, event: &T) -> bool { + filter.map_or(true, |field| field == event) } } #[cfg(feature = "transparent_api")] -impl PipelineStatus { - fn kind(&self) -> PipelineStatusKind { - PipelineStatusKind::from(self) +impl super::EventFilter for PipelineEventFilterBox { + type Event = PipelineEventBox; + + /// Check if `self` accepts the `event`. + #[inline] + fn matches(&self, event: &PipelineEventBox) -> bool { + match (self, event) { + (Self::Block(block_filter), PipelineEventBox::Block(block_event)) => [ + BlockEventFilter::field_matches(block_filter.height.as_ref(), &block_event.height), + BlockEventFilter::field_matches(block_filter.status.as_ref(), &block_event.status), + ] + .into_iter() + .all(core::convert::identity), + ( + Self::Transaction(transaction_filter), + PipelineEventBox::Transaction(transaction_event), + ) => [ + TransactionEventFilter::field_matches( + transaction_filter.hash.as_ref(), + &transaction_event.hash, + ), + TransactionEventFilter::field_matches( + transaction_filter.block_height.as_ref(), + &transaction_event.block_height, + ), + TransactionEventFilter::field_matches( + transaction_filter.status.as_ref(), + &transaction_event.status, + ), + ] + .into_iter() + .all(core::convert::identity), + _ => false, + } } } /// Exports common structs and enums from this module. pub mod prelude { pub use super::{ - PipelineEntityKind, PipelineEvent, PipelineEventFilter, PipelineRejectionReason, - PipelineStatus, PipelineStatusKind, + BlockEvent, BlockStatus, PipelineEventBox, PipelineEventFilterBox, TransactionEvent, + TransactionStatus, }; } @@ -235,94 +311,108 @@ mod tests { #[cfg(not(feature = "std"))] use alloc::{string::ToString as _, vec, vec::Vec}; - use super::{super::EventFilter, PipelineRejectionReason::*, *}; + use iroha_crypto::Hash; + + use super::{super::EventFilter, *}; use crate::{transaction::error::TransactionRejectionReason::*, ValidationFail}; #[test] fn events_are_correctly_filtered() { let events = vec![ - PipelineEvent { - entity_kind: PipelineEntityKind::Transaction, - status: PipelineStatus::Validating, - hash: Hash::prehashed([0_u8; Hash::LENGTH]), - }, - PipelineEvent { - entity_kind: PipelineEntityKind::Transaction, - status: PipelineStatus::Rejected(Transaction(Validation( + TransactionEvent { + hash: HashOf::from_untyped_unchecked(Hash::prehashed([0_u8; Hash::LENGTH])), + block_height: None, + status: TransactionStatus::Queued, + } + .into(), + TransactionEvent { + hash: HashOf::from_untyped_unchecked(Hash::prehashed([0_u8; Hash::LENGTH])), + block_height: Some(3), + status: TransactionStatus::Rejected(Box::new(Validation( ValidationFail::TooComplex, ))), - hash: Hash::prehashed([0_u8; Hash::LENGTH]), - }, - PipelineEvent { - entity_kind: PipelineEntityKind::Transaction, - status: PipelineStatus::Committed, - hash: Hash::prehashed([2_u8; Hash::LENGTH]), - }, - PipelineEvent { - entity_kind: PipelineEntityKind::Block, - status: PipelineStatus::Committed, - hash: Hash::prehashed([2_u8; Hash::LENGTH]), - }, + } + .into(), + TransactionEvent { + hash: HashOf::from_untyped_unchecked(Hash::prehashed([2_u8; Hash::LENGTH])), + block_height: None, + status: TransactionStatus::Approved, + 
} + .into(), + BlockEvent { + height: 7, + status: BlockStatus::Committed, + } + .into(), ]; + assert_eq!( + events + .iter() + .filter(|&event| { + let filter: PipelineEventFilterBox = TransactionEventFilter::default() + .for_hash(HashOf::from_untyped_unchecked(Hash::prehashed( + [0_u8; Hash::LENGTH], + ))) + .into(); + + filter.matches(event) + }) + .cloned() + .collect::>(), vec![ - PipelineEvent { - entity_kind: PipelineEntityKind::Transaction, - status: PipelineStatus::Validating, - hash: Hash::prehashed([0_u8; Hash::LENGTH]), - }, - PipelineEvent { - entity_kind: PipelineEntityKind::Transaction, - status: PipelineStatus::Rejected(Transaction(Validation( + TransactionEvent { + hash: HashOf::from_untyped_unchecked(Hash::prehashed([0_u8; Hash::LENGTH])), + block_height: None, + status: TransactionStatus::Queued, + } + .into(), + TransactionEvent { + hash: HashOf::from_untyped_unchecked(Hash::prehashed([0_u8; Hash::LENGTH])), + block_height: Some(3), + status: TransactionStatus::Rejected(Box::new(Validation( ValidationFail::TooComplex, ))), - hash: Hash::prehashed([0_u8; Hash::LENGTH]), - }, + } + .into(), ], - events - .iter() - .filter(|&event| PipelineEventFilter::new() - .for_hash(Hash::prehashed([0_u8; Hash::LENGTH])) - .matches(event)) - .cloned() - .collect::>() ); + assert_eq!( - vec![PipelineEvent { - entity_kind: PipelineEntityKind::Block, - status: PipelineStatus::Committed, - hash: Hash::prehashed([2_u8; Hash::LENGTH]), - }], events .iter() - .filter(|&event| PipelineEventFilter::new() - .for_entity(PipelineEntityKind::Block) - .matches(event)) + .filter(|&event| { + let filter: PipelineEventFilterBox = BlockEventFilter::default().into(); + filter.matches(event) + }) .cloned() - .collect::>() + .collect::>(), + vec![BlockEvent { + status: BlockStatus::Committed, + height: 7 + } + .into()], ); assert_eq!( - vec![PipelineEvent { - entity_kind: PipelineEntityKind::Transaction, - status: PipelineStatus::Committed, - hash: Hash::prehashed([2_u8; Hash::LENGTH]), - }], events .iter() - .filter(|&event| PipelineEventFilter::new() - .for_entity(PipelineEntityKind::Transaction) - .for_hash(Hash::prehashed([2_u8; Hash::LENGTH])) - .matches(event)) + .filter(|&event| { + let filter: PipelineEventFilterBox = TransactionEventFilter::default() + .for_hash(HashOf::from_untyped_unchecked(Hash::prehashed( + [2_u8; Hash::LENGTH], + ))) + .into(); + + filter.matches(event) + }) .cloned() - .collect::>() + .collect::>(), + vec![TransactionEvent { + hash: HashOf::from_untyped_unchecked(Hash::prehashed([2_u8; Hash::LENGTH])), + block_height: None, + status: TransactionStatus::Approved, + } + .into()], ); - assert_eq!( - events, - events - .iter() - .filter(|&event| PipelineEventFilter::new().matches(event)) - .cloned() - .collect::>() - ) } } diff --git a/data_model/src/smart_contract.rs b/data_model/src/smart_contract.rs index 379da0585d9..5d51c2f89d3 100644 --- a/data_model/src/smart_contract.rs +++ b/data_model/src/smart_contract.rs @@ -20,7 +20,7 @@ pub mod payloads { /// Trigger owner who registered the trigger pub owner: AccountId, /// Event which triggered the execution - pub event: Event, + pub event: EventBox, } /// Payload for migrate entrypoint diff --git a/data_model/src/transaction.rs b/data_model/src/transaction.rs index fbfded831ab..dea247777dd 100644 --- a/data_model/src/transaction.rs +++ b/data_model/src/transaction.rs @@ -565,8 +565,6 @@ pub mod error { InstructionExecution(#[cfg_attr(feature = "std", source)] InstructionExecutionFail), /// Failure in WebAssembly execution
WasmExecution(#[cfg_attr(feature = "std", source)] WasmExecutionFail), - /// Transaction rejected due to being expired - Expired, } } @@ -745,6 +743,9 @@ pub mod prelude { #[cfg(test)] mod tests { + #[cfg(not(feature = "std"))] + use alloc::vec; + use super::*; #[test] diff --git a/data_model/src/trigger.rs b/data_model/src/trigger.rs index df53f68707b..400122744ae 100644 --- a/data_model/src/trigger.rs +++ b/data_model/src/trigger.rs @@ -118,7 +118,7 @@ macro_rules! impl_try_from_box { impl_try_from_box! { Data => DataEventFilter, - Pipeline => PipelineEventFilter, + Pipeline => PipelineEventFilterBox, Time => TimeEventFilter, ExecuteTrigger => ExecuteTriggerEventFilter, } @@ -338,7 +338,7 @@ mod tests { .map(|_| ()) .unwrap(), TriggeringEventFilterBox::Pipeline(_) => { - Trigger::::try_from(boxed) + Trigger::::try_from(boxed) .map(|_| ()) .unwrap() } diff --git a/docs/source/references/schema.json b/docs/source/references/schema.json index 04e5ed6c954..87facb002ed 100644 --- a/docs/source/references/schema.json +++ b/docs/source/references/schema.json @@ -480,6 +480,30 @@ } ] }, + "BlockEvent": { + "Struct": [ + { + "name": "height", + "type": "u64" + }, + { + "name": "status", + "type": "BlockStatus" + } + ] + }, + "BlockEventFilter": { + "Struct": [ + { + "name": "height", + "type": "Option" + }, + { + "name": "status", + "type": "Option" + } + ] + }, "BlockHeader": { "Struct": [ { @@ -525,7 +549,7 @@ }, { "name": "event_recommendations", - "type": "Vec" + "type": "Vec" } ] }, @@ -537,6 +561,27 @@ } ] }, + "BlockStatus": { + "Enum": [ + { + "tag": "Approved", + "discriminant": 0 + }, + { + "tag": "Rejected", + "discriminant": 1, + "type": "BlockRejectionReason" + }, + { + "tag": "Committed", + "discriminant": 2 + }, + { + "tag": "Applied", + "discriminant": 3 + } + ] + }, "BlockSubscriptionRequest": "NonZero", "Burn": { "Struct": [ @@ -857,12 +902,12 @@ "u32" ] }, - "Event": { + "EventBox": { "Enum": [ { "tag": "Pipeline", "discriminant": 0, - "type": "PipelineEvent" + "type": "PipelineEventBox" }, { "tag": "Data", @@ -891,7 +936,7 @@ { "tag": "Pipeline", "discriminant": 0, - "type": "PipelineEventFilter" + "type": "PipelineEventFilterBox" }, { "tag": "Data", @@ -915,7 +960,7 @@ } ] }, - "EventMessage": "Event", + "EventMessage": "EventBox", "EventSubscriptionRequest": "EventFilterBox", "Executable": { "Enum": [ @@ -2243,21 +2288,24 @@ "Option": { "Option": "AssetId" }, + "Option": { + "Option": "BlockStatus" + }, "Option": { "Option": "DomainId" }, "Option": { "Option": "Duration" }, - "Option": { - "Option": "Hash" - }, "Option>>": { "Option": "HashOf>" }, "Option>": { "Option": "HashOf" }, + "Option>": { + "Option": "HashOf" + }, "Option": { "Option": "IpfsPath" }, @@ -2267,18 +2315,15 @@ "Option>": { "Option": "NonZero" }, + "Option>": { + "Option": "Option" + }, "Option": { "Option": "ParameterId" }, "Option": { "Option": "PeerId" }, - "Option": { - "Option": "PipelineEntityKind" - }, - "Option": { - "Option": "PipelineStatusKind" - }, "Option": { "Option": "RoleId" }, @@ -2291,6 +2336,9 @@ "Option": { "Option": "TransactionRejectionReason" }, + "Option": { + "Option": "TransactionStatus" + }, "Option": { "Option": "TriggerCompletedOutcomeType" }, @@ -2300,6 +2348,9 @@ "Option": { "Option": "u32" }, + "Option": { + "Option": "u64" + }, "Parameter": { "Struct": [ { @@ -2427,94 +2478,31 @@ } ] }, - "PipelineEntityKind": { + "PipelineEventBox": { "Enum": [ - { - "tag": "Block", - "discriminant": 0 - }, { "tag": "Transaction", - "discriminant": 1 - } - ] - }, - "PipelineEvent": { - 
"Struct": [ - { - "name": "entity_kind", - "type": "PipelineEntityKind" - }, - { - "name": "status", - "type": "PipelineStatus" - }, - { - "name": "hash", - "type": "Hash" - } - ] - }, - "PipelineEventFilter": { - "Struct": [ - { - "name": "entity_kind", - "type": "Option" - }, - { - "name": "status_kind", - "type": "Option" - }, - { - "name": "hash", - "type": "Option" - } - ] - }, - "PipelineRejectionReason": { - "Enum": [ - { - "tag": "Block", "discriminant": 0, - "type": "BlockRejectionReason" + "type": "TransactionEvent" }, { - "tag": "Transaction", + "tag": "Block", "discriminant": 1, - "type": "TransactionRejectionReason" + "type": "BlockEvent" } ] }, - "PipelineStatus": { + "PipelineEventFilterBox": { "Enum": [ { - "tag": "Validating", - "discriminant": 0 + "tag": "Transaction", + "discriminant": 0, + "type": "TransactionEventFilter" }, { - "tag": "Rejected", + "tag": "Block", "discriminant": 1, - "type": "PipelineRejectionReason" - }, - { - "tag": "Committed", - "discriminant": 2 - } - ] - }, - "PipelineStatusKind": { - "Enum": [ - { - "tag": "Validating", - "discriminant": 0 - }, - { - "tag": "Rejected", - "discriminant": 1 - }, - { - "tag": "Committed", - "discriminant": 2 + "type": "BlockEventFilter" } ] }, @@ -3600,6 +3588,38 @@ } ] }, + "TransactionEvent": { + "Struct": [ + { + "name": "hash", + "type": "HashOf" + }, + { + "name": "block_height", + "type": "Option" + }, + { + "name": "status", + "type": "TransactionStatus" + } + ] + }, + "TransactionEventFilter": { + "Struct": [ + { + "name": "hash", + "type": "Option>" + }, + { + "name": "block_height", + "type": "Option>" + }, + { + "name": "status", + "type": "Option" + } + ] + }, "TransactionLimitError": { "Struct": [ { @@ -3690,10 +3710,27 @@ "tag": "WasmExecution", "discriminant": 4, "type": "WasmExecutionFail" + } + ] + }, + "TransactionStatus": { + "Enum": [ + { + "tag": "Queued", + "discriminant": 0 }, { "tag": "Expired", - "discriminant": 5 + "discriminant": 1 + }, + { + "tag": "Approved", + "discriminant": 2 + }, + { + "tag": "Rejected", + "discriminant": 3, + "type": "TransactionRejectionReason" } ] }, @@ -3919,7 +3956,7 @@ { "tag": "Pipeline", "discriminant": 0, - "type": "PipelineEventFilter" + "type": "PipelineEventFilterBox" }, { "tag": "Data", @@ -4087,8 +4124,8 @@ } ] }, - "Vec": { - "Vec": "Event" + "Vec": { + "Vec": "EventBox" }, "Vec>": { "Vec": "GenericPredicateBox" diff --git a/schema/gen/src/lib.rs b/schema/gen/src/lib.rs index 259120b048d..238d737cde8 100644 --- a/schema/gen/src/lib.rs +++ b/schema/gen/src/lib.rs @@ -100,13 +100,17 @@ types!( BTreeSet>, BatchedResponse, BatchedResponseV1, + BlockEvent, + BlockEventFilter, BlockHeader, BlockMessage, BlockPayload, BlockRejectionReason, + BlockStatus, BlockSubscriptionRequest, Box>, Box, + Box, Burn>, Burn, Burn, @@ -128,7 +132,7 @@ types!( DomainId, DomainOwnerChanged, Duration, - Event, + EventBox, EventMessage, EventSubscriptionRequest, Executable, @@ -236,25 +240,27 @@ types!( Numeric, NumericSpec, Option, + Option, Option, Option, Option, + Option, Option, Option, - Option, Option>>, Option>, + Option>, Option, Option, Option, + Option>, Option, Option, - Option, - Option, Option, Option, Option, Option, + Option, Option, Option, Parameter, @@ -269,12 +275,8 @@ types!( PermissionToken, PermissionTokenSchema, PermissionTokenSchemaUpdateEvent, - PipelineEntityKind, - PipelineEvent, - PipelineEventFilter, - PipelineRejectionReason, - PipelineStatus, - PipelineStatusKind, + PipelineEventBox, + PipelineEventFilterBox, PredicateBox, PublicKey, QueryBox, 
@@ -342,12 +344,15 @@ types!( TimeEventFilter, TimeInterval, TimeSchedule, + TransactionEvent, + TransactionEventFilter, TransactionLimitError, TransactionLimits, TransactionPayload, TransactionQueryOutput, TransactionRejectionReason, TransactionValue, + TransactionStatus, Transfer, Transfer, Transfer, @@ -376,7 +381,7 @@ types!( UnregisterBox, Upgrade, ValidationFail, - Vec, + Vec, Vec, Vec, Vec, @@ -416,6 +421,7 @@ mod tests { BlockHeader, BlockPayload, SignedBlock, SignedBlockV1, }, domain::NewDomain, + events::pipeline::{BlockEventFilter, TransactionEventFilter}, executor::Executor, ipfs::IpfsPath, isi::{ diff --git a/tools/parity_scale_decoder/src/main.rs b/tools/parity_scale_decoder/src/main.rs index 86f3139d391..b958b3e6196 100644 --- a/tools/parity_scale_decoder/src/main.rs +++ b/tools/parity_scale_decoder/src/main.rs @@ -21,6 +21,7 @@ use iroha_data_model::{ BlockHeader, BlockPayload, SignedBlock, SignedBlockV1, }, domain::NewDomain, + events::pipeline::{BlockEventFilter, TransactionEventFilter}, executor::Executor, ipfs::IpfsPath, isi::{ diff --git a/torii/src/event.rs b/torii/src/event.rs index 873f81d91ec..226ca8cd4a1 100644 --- a/torii/src/event.rs +++ b/torii/src/event.rs @@ -63,7 +63,7 @@ impl Consumer { /// # Errors /// Can fail due to timeout or sending event. Also receiving might fail #[iroha_futures::telemetry_future] - pub async fn consume(&mut self, event: Event) -> Result<()> { + pub async fn consume(&mut self, event: EventBox) -> Result<()> { if !self.filter.matches(&event) { return Ok(()); }
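
Note on the new filter API: the dedicated `BlockEventFilter` and `TransactionEventFilter` builders above replace the old entity-kind/status/hash triple of `PipelineEventFilter`. A minimal consumer-side sketch follows; it assumes an already-configured `iroha_client::client::Client` (client setup is outside this diff) and relies on `listen_for_events` accepting anything convertible into `EventFilterBox`, as shown in the client changes:

use iroha_client::client::Client;
use iroha_data_model::events::pipeline::{BlockEventFilter, BlockStatus};

fn wait_for_next_applied_block(client: &Client) -> eyre::Result<()> {
    // Match only blocks that reached the final `Applied` stage,
    // i.e. whose changes are already reflected in the WSV.
    let filter = BlockEventFilter::default().for_status(BlockStatus::Applied);
    for event in client.listen_for_events(filter)? {
        // Each item is a `Result<EventBox>`.
        println!("block applied: {:?}", event?);
        break;
    }
    Ok(())
}

A transaction-side filter composes the same way, e.g. `TransactionEventFilter::default().for_hash(tx_hash).for_status(TransactionStatus::Approved)`, where `tx_hash` is the `HashOf<SignedTransaction>` obtained at submission time.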
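Note on the `.unpack(...)` calls peppered through core: `validate` and `commit` no longer emit pipeline events themselves but return their result bundled with the events produced along the way, and the caller chooses the sink — Sumeragi forwards them into `events_sender`, while tests discard them with `|_| {}`. The wrapper type itself is defined outside the hunks shown here; the following is only an illustrative reduction of the idea, with all names hypothetical except `unpack`:

use iroha_data_model::events::pipeline::PipelineEventBox;

/// Hypothetical stand-in for the value-plus-events bundle
/// returned by `validate`/`commit`.
struct WithEvents<T> {
    value: T,
    events: Vec<PipelineEventBox>,
}

impl<T> WithEvents<T> {
    /// Feed every buffered event to `f`, then release the value.
    /// The value is unreachable without going through `unpack`,
    /// so no call site can silently drop pipeline events.
    fn unpack<F: Fn(PipelineEventBox)>(self, f: F) -> T {
        self.events.into_iter().for_each(f);
        self.value
    }
}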
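Note on consuming `EventBox`: subscribers that care about a single pipeline variant can use the `TryFrom` impls added in data_model/src/events/mod.rs instead of nested matches. A small sketch (the event source is assumed; only the conversion is taken from this diff):

use iroha_data_model::{events::pipeline::TransactionEvent, prelude::EventBox};

/// Keep only transaction pipeline events; data, time, trigger and
/// block events fail the `TryFrom` conversion and are filtered out.
fn transaction_events(stream: impl IntoIterator<Item = EventBox>) -> Vec<TransactionEvent> {
    stream
        .into_iter()
        .filter_map(|event| TransactionEvent::try_from(event).ok())
        .collect()
}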