Skip to content

Commit

Permalink
[refactor] hyperledger-iroha#2109: Make `integration::events::pipelin…
Browse files Browse the repository at this point in the history
…e` test stable (hyperledger-iroha#2110)
  • Loading branch information
s8sato authored and mversic committed May 2, 2022
1 parent b5a552a commit f806d51
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 92 deletions.
146 changes: 56 additions & 90 deletions client/tests/integration/events/pipeline.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
#![allow(clippy::restriction)]

use std::{
sync::{Arc, RwLock},
thread,
};
use std::thread::{self, JoinHandle};

use eyre::Result;
use iroha_client::client::Client;
use iroha_data_model::prelude::*;
use test_network::*;

Expand All @@ -15,98 +11,68 @@ use super::Configuration;
const PEER_COUNT: usize = 7;

#[test]
fn transaction_event_should_be_sent_to_all_peers_from_all_peers() -> Result<()> {
test_with_instruction_and_status(None, true)?;
let fail = FailBox::new("Failing transaction to test Rejected event.");
test_with_instruction_and_status(Some(fail.into()), false)
fn transaction_with_no_instructions_should_be_committed() -> Result<()> {
test_with_instruction_and_status(None, PipelineStatusKind::Committed)
}

#[test]
fn transaction_with_fail_instruction_should_be_rejected() -> Result<()> {
let fail = FailBox::new("Should be rejected");
test_with_instruction_and_status(Some(fail.into()), PipelineStatusKind::Rejected)
}

#[allow(clippy::needless_range_loop, clippy::needless_pass_by_value)]
fn test_with_instruction_and_status(
instruction: Option<Instruction>,
should_be_committed: bool,
should_be: PipelineStatusKind,
) -> Result<()> {
let (_rt, network, _) = <Network>::start_test_with_runtime(PEER_COUNT.try_into().unwrap(), 1);
wait_for_genesis_committed(&network.clients(), 0);
let (_rt, network, genesis_client) =
<Network>::start_test_with_runtime(PEER_COUNT.try_into().unwrap(), 1);
let clients = network.clients();
wait_for_genesis_committed(&clients, 0);
let pipeline_time = Configuration::pipeline_time();
// Given
let submitter = genesis_client;
let transaction = submitter.build_transaction(instruction.into(), UnlimitedMetadata::new())?;
let hash = transaction.hash();
let mut handles = Vec::new();
for listener in clients {
let checker = Checker { listener, hash };
let handle_validating = checker.clone().spawn(PipelineStatusKind::Validating);
handles.push(handle_validating);
let handle_validated = checker.spawn(should_be);
handles.push(handle_validated);
}
// When
submitter.submit_transaction(transaction)?;
thread::sleep(pipeline_time * 2);
// Then
for handle in handles {
handle.join().expect("Thread panicked")
}
Ok(())
}

for submitting_peer in 0..PEER_COUNT {
let pipeline_time = Configuration::pipeline_time();
#[derive(Clone)]
struct Checker {
listener: iroha_client::client::Client,
hash: iroha_crypto::HashOf<Transaction>,
}

// Given
let committed_event_received = Arc::new(RwLock::new([false; PEER_COUNT]));
let validating_event_received = Arc::new(RwLock::new([false; PEER_COUNT]));
let rejected_event_received = Arc::new(RwLock::new([false; PEER_COUNT]));
let peers: Vec<_> = network.peers().collect();
let submitter_client = Client::test(
&peers[submitting_peer].api_address,
&peers[submitting_peer].telemetry_address,
);
let instructions: Vec<Instruction> = instruction.clone().into_iter().collect();
let transaction =
submitter_client.build_transaction(instructions.into(), UnlimitedMetadata::new())?;
for receiving_peer in 0..PEER_COUNT {
let committed_event_received_clone = committed_event_received.clone();
let validating_event_received_clone = validating_event_received.clone();
let rejected_event_received_clone = rejected_event_received.clone();
let listener_client = Client::test(
&peers[receiving_peer].api_address,
&peers[receiving_peer].telemetry_address,
);
let hash = transaction.hash();
let _handle = thread::spawn(move || {
listener_client.for_each_event(
EventFilter::Pipeline(
PipelineEventFilter::new().entity_kind(PipelineEntityKind::Transaction),
),
|event| {
if let Ok(Event::Pipeline(event)) = event {
if event.entity_kind == PipelineEntityKind::Transaction
&& event.hash == *hash
{
match event.status {
PipelineStatus::Committed => {
committed_event_received_clone
.write()
.expect("Failed to acquire lock.")[receiving_peer] =
true;
}
PipelineStatus::Validating => {
validating_event_received_clone
.write()
.expect("Failed to acquire lock.")[receiving_peer] =
true;
}
PipelineStatus::Rejected(_) => {
rejected_event_received_clone
.write()
.expect("Failed to acquire lock.")[receiving_peer] =
true;
}
}
}
}
},
);
});
}
thread::sleep(pipeline_time * 2);
//When
submitter_client.submit_transaction(transaction)?;
thread::sleep(pipeline_time * 2);
//Then
let committed = committed_event_received.read().unwrap();
let validating = validating_event_received.read().unwrap();
let rejected = rejected_event_received.read().unwrap();
for receiving_peer in 0..PEER_COUNT {
assert!(validating[receiving_peer]);
if should_be_committed {
assert!(committed[receiving_peer]);
assert!(!rejected[receiving_peer]);
} else {
assert!(!committed[receiving_peer]);
assert!(rejected[receiving_peer]);
}
}
impl Checker {
fn spawn(mut self, status_kind: PipelineStatusKind) -> JoinHandle<()> {
thread::spawn(move || {
let mut event_iterator = self
.listener
.listen_for_events(EventFilter::Pipeline(
PipelineEventFilter::new()
.entity_kind(PipelineEntityKind::Transaction)
.status_kind(status_kind)
.hash(*self.hash),
))
.expect("Failed to create event iterator.");
let event_result = event_iterator.next().expect("Stream closed");
let _event = event_result.expect("Must be valid");
})
}
Ok(())
}
4 changes: 2 additions & 2 deletions core/test_network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,11 +833,11 @@ impl TestClient for Client {
}

fn for_each_event(mut self, event_filter: EventFilter, f: impl Fn(Result<Event>)) {
for event in self
for event_result in self
.listen_for_events(event_filter)
.expect("Failed to create event iterator.")
{
f(event)
f(event_result)
}
}

Expand Down

0 comments on commit f806d51

Please sign in to comment.