diff --git a/client/tests/integration/events/pipeline.rs b/client/tests/integration/events/pipeline.rs index 93aaf544220..4d24ec3efca 100644 --- a/client/tests/integration/events/pipeline.rs +++ b/client/tests/integration/events/pipeline.rs @@ -1,12 +1,8 @@ #![allow(clippy::restriction)] -use std::{ - sync::{Arc, RwLock}, - thread, -}; +use std::thread::{self, JoinHandle}; use eyre::Result; -use iroha_client::client::Client; use iroha_data_model::prelude::*; use test_network::*; @@ -15,98 +11,68 @@ use super::Configuration; const PEER_COUNT: usize = 7; #[test] -fn transaction_event_should_be_sent_to_all_peers_from_all_peers() -> Result<()> { - test_with_instruction_and_status(None, true)?; - let fail = FailBox::new("Failing transaction to test Rejected event."); - test_with_instruction_and_status(Some(fail.into()), false) +fn transaction_with_no_instructions_should_be_committed() -> Result<()> { + test_with_instruction_and_status(None, PipelineStatusKind::Committed) +} + +#[test] +fn transaction_with_fail_instruction_should_be_rejected() -> Result<()> { + let fail = FailBox::new("Should be rejected"); + test_with_instruction_and_status(Some(fail.into()), PipelineStatusKind::Rejected) } #[allow(clippy::needless_range_loop, clippy::needless_pass_by_value)] fn test_with_instruction_and_status( instruction: Option, - should_be_committed: bool, + should_be: PipelineStatusKind, ) -> Result<()> { - let (_rt, network, _) = ::start_test_with_runtime(PEER_COUNT.try_into().unwrap(), 1); - wait_for_genesis_committed(&network.clients(), 0); + let (_rt, network, genesis_client) = + ::start_test_with_runtime(PEER_COUNT.try_into().unwrap(), 1); + let clients = network.clients(); + wait_for_genesis_committed(&clients, 0); + let pipeline_time = Configuration::pipeline_time(); + // Given + let submitter = genesis_client; + let transaction = submitter.build_transaction(instruction.into(), UnlimitedMetadata::new())?; + let hash = transaction.hash(); + let mut handles = Vec::new(); + for listener in clients { + let checker = Checker { listener, hash }; + let handle_validating = checker.clone().spawn(PipelineStatusKind::Validating); + handles.push(handle_validating); + let handle_validated = checker.spawn(should_be); + handles.push(handle_validated); + } + // When + submitter.submit_transaction(transaction)?; + thread::sleep(pipeline_time * 2); + // Then + for handle in handles { + handle.join().expect("Thread panicked") + } + Ok(()) +} - for submitting_peer in 0..PEER_COUNT { - let pipeline_time = Configuration::pipeline_time(); +#[derive(Clone)] +struct Checker { + listener: iroha_client::client::Client, + hash: iroha_crypto::HashOf, +} - // Given - let committed_event_received = Arc::new(RwLock::new([false; PEER_COUNT])); - let validating_event_received = Arc::new(RwLock::new([false; PEER_COUNT])); - let rejected_event_received = Arc::new(RwLock::new([false; PEER_COUNT])); - let peers: Vec<_> = network.peers().collect(); - let submitter_client = Client::test( - &peers[submitting_peer].api_address, - &peers[submitting_peer].telemetry_address, - ); - let instructions: Vec = instruction.clone().into_iter().collect(); - let transaction = - submitter_client.build_transaction(instructions.into(), UnlimitedMetadata::new())?; - for receiving_peer in 0..PEER_COUNT { - let committed_event_received_clone = committed_event_received.clone(); - let validating_event_received_clone = validating_event_received.clone(); - let rejected_event_received_clone = rejected_event_received.clone(); - let listener_client = Client::test( - &peers[receiving_peer].api_address, - &peers[receiving_peer].telemetry_address, - ); - let hash = transaction.hash(); - let _handle = thread::spawn(move || { - listener_client.for_each_event( - EventFilter::Pipeline( - PipelineEventFilter::new().entity_kind(PipelineEntityKind::Transaction), - ), - |event| { - if let Ok(Event::Pipeline(event)) = event { - if event.entity_kind == PipelineEntityKind::Transaction - && event.hash == *hash - { - match event.status { - PipelineStatus::Committed => { - committed_event_received_clone - .write() - .expect("Failed to acquire lock.")[receiving_peer] = - true; - } - PipelineStatus::Validating => { - validating_event_received_clone - .write() - .expect("Failed to acquire lock.")[receiving_peer] = - true; - } - PipelineStatus::Rejected(_) => { - rejected_event_received_clone - .write() - .expect("Failed to acquire lock.")[receiving_peer] = - true; - } - } - } - } - }, - ); - }); - } - thread::sleep(pipeline_time * 2); - //When - submitter_client.submit_transaction(transaction)?; - thread::sleep(pipeline_time * 2); - //Then - let committed = committed_event_received.read().unwrap(); - let validating = validating_event_received.read().unwrap(); - let rejected = rejected_event_received.read().unwrap(); - for receiving_peer in 0..PEER_COUNT { - assert!(validating[receiving_peer]); - if should_be_committed { - assert!(committed[receiving_peer]); - assert!(!rejected[receiving_peer]); - } else { - assert!(!committed[receiving_peer]); - assert!(rejected[receiving_peer]); - } - } +impl Checker { + fn spawn(mut self, status_kind: PipelineStatusKind) -> JoinHandle<()> { + thread::spawn(move || { + let mut event_iterator = self + .listener + .listen_for_events(EventFilter::Pipeline( + PipelineEventFilter::new() + .entity_kind(PipelineEntityKind::Transaction) + .status_kind(status_kind) + .hash(*self.hash), + )) + .expect("Failed to create event iterator."); + let event_result = event_iterator.next().expect("Stream closed"); + let _event = event_result.expect("Must be valid"); + }) } - Ok(()) } diff --git a/core/test_network/src/lib.rs b/core/test_network/src/lib.rs index 4af7404ad2c..2a8738b8327 100644 --- a/core/test_network/src/lib.rs +++ b/core/test_network/src/lib.rs @@ -833,11 +833,11 @@ impl TestClient for Client { } fn for_each_event(mut self, event_filter: EventFilter, f: impl Fn(Result)) { - for event in self + for event_result in self .listen_for_events(event_filter) .expect("Failed to create event iterator.") { - f(event) + f(event_result) } }