Skip to content

Commit

Permalink
[refactor] hyperledger-iroha#4315: split pipeline events
Browse files Browse the repository at this point in the history
Signed-off-by: Marin Veršić <[email protected]>
  • Loading branch information
mversic committed Mar 21, 2024
1 parent 47f278d commit 8fe6655
Show file tree
Hide file tree
Showing 44 changed files with 1,070 additions and 662 deletions.
2 changes: 1 addition & 1 deletion cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ impl Iroha {
});
let state = Arc::new(state);

let queue = Arc::new(Queue::from_config(config.queue));
let queue = Arc::new(Queue::from_config(config.queue, events_sender.clone()));
match Self::start_telemetry(&logger, &config).await? {
TelemetryStartStatus::Started => iroha_logger::info!("Telemetry started"),
TelemetryStartStatus::NotStarted => iroha_logger::warn!("Telemetry not started"),
Expand Down
5 changes: 2 additions & 3 deletions client/benches/tps/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use iroha_client::{
prelude::*,
},
};
use iroha_data_model::events::pipeline::{BlockEventFilter, BlockStatus};
use serde::Deserialize;
use test_network::*;

Expand Down Expand Up @@ -172,9 +173,7 @@ impl MeasurerUnit {
fn spawn_event_counter(&self) -> thread::JoinHandle<Result<()>> {
let listener = self.client.clone();
let (init_sender, init_receiver) = mpsc::channel();
let event_filter = PipelineEventFilter::new()
.for_entity(PipelineEntityKind::Block)
.for_status(PipelineStatusKind::Committed);
let event_filter = BlockEventFilter::default().for_status(BlockStatus::Applied);
let blocks_expected = self.config.blocks as usize;
let name = self.name;
let handle = thread::spawn(move || -> Result<()> {
Expand Down
49 changes: 34 additions & 15 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use eyre::{eyre, Result, WrapErr};
use futures_util::StreamExt;
use http_default::{AsyncWebSocketStream, WebSocketStream};
pub use iroha_config::client_api::ConfigDTO;
use iroha_data_model::query::QueryOutputBox;
use iroha_data_model::{
events::pipeline::{BlockStatus, PipelineEventBox, TransactionEventFilter, TransactionStatus},
query::QueryOutputBox,
};
use iroha_logger::prelude::*;
use iroha_telemetry::metrics::Status;
use iroha_torii_const::uri as torii_uri;
Expand Down Expand Up @@ -605,7 +608,7 @@ impl Client {
let mut event_iterator = {
let event_iterator_result = tokio::time::timeout_at(
deadline,
self.listen_for_events_async(PipelineEventFilter::new().for_hash(hash.into())),
self.listen_for_events_async(TransactionEventFilter::default().for_hash(hash)),
)
.await
.map_err(Into::into)
Expand All @@ -631,17 +634,34 @@ impl Client {
event_iterator: &mut AsyncEventStream,
hash: HashOf<SignedTransaction>,
) -> Result<HashOf<SignedTransaction>> {
let mut block_height = None;

while let Some(event) = event_iterator.next().await {
if let Event::Pipeline(this_event) = event? {
match this_event.status() {
PipelineStatus::Validating => {}
PipelineStatus::Rejected(ref reason) => {
return Err(reason.clone().into());
if let EventBox::Pipeline(this_event) = event? {
match this_event {
PipelineEventBox::Transaction(transaction_event) => {
match transaction_event.status() {
TransactionStatus::Queued => {}
TransactionStatus::Approved => {
block_height = transaction_event.block_height;
}
TransactionStatus::Rejected(reason) => {
return Err(reason.clone().into());
}
TransactionStatus::Expired => return Err(eyre!("Transaction expired")),
}
}
PipelineEventBox::Block(block_event) => {
if Some(block_event.height) == block_height {
if let BlockStatus::Applied = block_event.status() {
return Ok(hash);
}
}
}
PipelineStatus::Committed => return Ok(hash),
}
}
}

Err(eyre!(
"Connection dropped without `Committed` or `Rejected` event"
))
Expand Down Expand Up @@ -904,9 +924,7 @@ impl Client {
pub fn listen_for_events(
&self,
event_filter: impl Into<EventFilterBox>,
) -> Result<impl Iterator<Item = Result<Event>>> {
let event_filter = event_filter.into();
iroha_logger::trace!(?event_filter);
) -> Result<impl Iterator<Item = Result<EventBox>>> {
events_api::EventIterator::new(self.events_handler(event_filter)?)
}

Expand All @@ -919,8 +937,6 @@ impl Client {
&self,
event_filter: impl Into<EventFilterBox> + Send,
) -> Result<AsyncEventStream> {
let event_filter = event_filter.into();
iroha_logger::trace!(?event_filter, "Async listening with");
events_api::AsyncEventStream::new(self.events_handler(event_filter)?).await
}

Expand All @@ -933,8 +949,11 @@ impl Client {
&self,
event_filter: impl Into<EventFilterBox>,
) -> Result<events_api::flow::Init> {
let event_filter = event_filter.into();
iroha_logger::trace!(?event_filter);

events_api::flow::Init::new(
event_filter.into(),
event_filter,
self.headers.clone(),
self.torii_url
.join(torii_uri::SUBSCRIPTION)
Expand Down Expand Up @@ -1284,7 +1303,7 @@ pub mod events_api {
pub struct Events;

impl FlowEvents for Events {
type Event = crate::data_model::prelude::Event;
type Event = crate::data_model::prelude::EventBox;

fn message(&self, message: Vec<u8>) -> Result<Self::Event> {
let event_socket_message = EventMessage::decode_all(&mut message.as_slice())?;
Expand Down
2 changes: 1 addition & 1 deletion client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use iroha_config::{
base,
base::{FromEnv, StdEnv, UnwrapPartial},
};
use iroha_crypto::prelude::*;
use iroha_crypto::KeyPair;
use iroha_data_model::{prelude::*, ChainId};
use iroha_primitives::small::SmallStr;
use serde::{Deserialize, Serialize};
Expand Down
13 changes: 7 additions & 6 deletions client/tests/integration/asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use iroha_config::parameters::actual::Root as Config;
use iroha_data_model::{
asset::{AssetId, AssetValue, AssetValueType},
isi::error::{InstructionEvaluationError, InstructionExecutionError, Mismatch, TypeError},
transaction::error::TransactionRejectionReason,
};
use serde_json::json;
use test_network::*;
Expand Down Expand Up @@ -463,17 +464,17 @@ fn fail_if_dont_satisfy_spec() {
.expect_err("Should be rejected due to non integer value");

let rejection_reason = err
.downcast_ref::<PipelineRejectionReason>()
.unwrap_or_else(|| panic!("Error {err} is not PipelineRejectionReason"));
.downcast_ref::<TransactionRejectionReason>()
.unwrap_or_else(|| panic!("Error {err} is not TransactionRejectionReason"));

assert_eq!(
rejection_reason,
&PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation(
ValidationFail::InstructionFailed(InstructionExecutionError::Evaluate(
InstructionEvaluationError::Type(TypeError::from(Mismatch {
&TransactionRejectionReason::Validation(ValidationFail::InstructionFailed(
InstructionExecutionError::Evaluate(InstructionEvaluationError::Type(
TypeError::from(Mismatch {
expected: AssetValueType::Numeric(NumericSpec::integer()),
actual: AssetValueType::Numeric(NumericSpec::fractional(2))
}))
})
))
))
);
Expand Down
9 changes: 4 additions & 5 deletions client/tests/integration/domain_owner_permissions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use iroha_client::{
crypto::KeyPair,
data_model::{account::SignatureCheckCondition, prelude::*},
};
use iroha_data_model::transaction::error::TransactionRejectionReason;
use serde_json::json;
use test_network::*;

Expand Down Expand Up @@ -37,14 +38,12 @@ fn domain_owner_domain_permissions() -> Result<()> {
.expect_err("Tx should fail due to permissions");

let rejection_reason = err
.downcast_ref::<PipelineRejectionReason>()
.unwrap_or_else(|| panic!("Error {err} is not PipelineRejectionReason"));
.downcast_ref::<TransactionRejectionReason>()
.unwrap_or_else(|| panic!("Error {err} is not TransactionRejectionReason"));

assert!(matches!(
rejection_reason,
&PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation(
ValidationFail::NotPermitted(_)
))
&TransactionRejectionReason::Validation(ValidationFail::NotPermitted(_))
));

// "alice@wonderland" owns the domain and can register AssetDefinitions by default as domain owner
Expand Down
55 changes: 30 additions & 25 deletions client/tests/integration/events/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ use iroha_client::{
},
};
use iroha_config::parameters::actual::Root as Config;
use iroha_data_model::{
events::pipeline::{
BlockEvent, BlockEventFilter, BlockStatus, TransactionEventFilter, TransactionStatus,
},
isi::error::InstructionExecutionError,
transaction::error::TransactionRejectionReason,
ValidationFail,
};
use test_network::*;

// Needed to re-enable ignored tests.
Expand All @@ -17,24 +25,28 @@ const PEER_COUNT: usize = 7;
#[ignore = "ignore, more in #2851"]
#[test]
fn transaction_with_no_instructions_should_be_committed() -> Result<()> {
test_with_instruction_and_status_and_port(None, PipelineStatusKind::Committed, 10_250)
test_with_instruction_and_status_and_port(None, &TransactionStatus::Approved, 10_250)
}

#[ignore = "ignore, more in #2851"]
// #[ignore = "Experiment"]
#[test]
fn transaction_with_fail_instruction_should_be_rejected() -> Result<()> {
let fail = Fail::new("Should be rejected".to_owned());
let msg = "Should be rejected".to_owned();

let fail = Fail::new(msg.clone());
test_with_instruction_and_status_and_port(
Some(fail.into()),
PipelineStatusKind::Rejected,
&TransactionStatus::Rejected(Box::new(TransactionRejectionReason::Validation(
ValidationFail::InstructionFailed(InstructionExecutionError::Fail(msg)),
))),
10_350,
)
}

fn test_with_instruction_and_status_and_port(
instruction: Option<InstructionBox>,
should_be: PipelineStatusKind,
should_be: &TransactionStatus,
port: u16,
) -> Result<()> {
let (_rt, network, client) =
Expand All @@ -56,9 +68,9 @@ fn test_with_instruction_and_status_and_port(
let mut handles = Vec::new();
for listener in clients {
let checker = Checker { listener, hash };
let handle_validating = checker.clone().spawn(PipelineStatusKind::Validating);
let handle_validating = checker.clone().spawn(TransactionStatus::Queued);
handles.push(handle_validating);
let handle_validated = checker.spawn(should_be);
let handle_validated = checker.spawn(should_be.clone());
handles.push(handle_validated);
}
// When
Expand All @@ -78,15 +90,14 @@ struct Checker {
}

impl Checker {
fn spawn(self, status_kind: PipelineStatusKind) -> JoinHandle<()> {
fn spawn(self, status_kind: TransactionStatus) -> JoinHandle<()> {
thread::spawn(move || {
let mut event_iterator = self
.listener
.listen_for_events(
PipelineEventFilter::new()
.for_entity(PipelineEntityKind::Transaction)
TransactionEventFilter::default()
.for_status(status_kind)
.for_hash(*self.hash),
.for_hash(self.hash),
)
.expect("Failed to create event iterator.");
let event_result = event_iterator.next().expect("Stream closed");
Expand All @@ -96,13 +107,11 @@ impl Checker {
}

#[test]
fn committed_block_must_be_available_in_kura() {
fn applied_block_must_be_available_in_kura() {
let (_rt, peer, client) = <PeerBuilder>::new().with_port(11_040).start_with_runtime();
wait_for_genesis_committed(&[client.clone()], 0);

let event_filter = PipelineEventFilter::new()
.for_entity(PipelineEntityKind::Block)
.for_status(PipelineStatusKind::Committed);
let event_filter = BlockEventFilter::default().for_status(BlockStatus::Committed);
let mut event_iter = client
.listen_for_events(event_filter)
.expect("Failed to subscribe for events");
Expand All @@ -111,21 +120,17 @@ fn committed_block_must_be_available_in_kura() {
.submit(Fail::new("Dummy instruction".to_owned()))
.expect("Failed to submit transaction");

let event = event_iter.next().expect("Block must be committed");
let Ok(Event::Pipeline(PipelineEvent {
entity_kind: PipelineEntityKind::Block,
status: PipelineStatus::Committed,
hash,
})) = event
else {
panic!("Received unexpected event")
};
let hash = HashOf::from_untyped_unchecked(hash);
let event: BlockEvent = event_iter
.next()
.expect("Block must be committed")
.expect("Block must be committed")
.try_into()
.expect("Received unexpected event");

peer.iroha
.as_ref()
.expect("Must be some")
.kura
.get_block_height_by_hash(&hash)
.get_block_by_height(event.height)
.expect("Block committed event was received earlier");
}
20 changes: 9 additions & 11 deletions client/tests/integration/permissions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use iroha_client::{
crypto::KeyPair,
data_model::prelude::*,
};
use iroha_data_model::permission::PermissionToken;
use iroha_data_model::{
permission::PermissionToken, transaction::error::TransactionRejectionReason,
};
use iroha_genesis::GenesisNetwork;
use serde_json::json;
use test_network::{PeerBuilder, *};
Expand Down Expand Up @@ -104,14 +106,12 @@ fn permissions_disallow_asset_transfer() {
.submit_transaction_blocking(&transfer_tx)
.expect_err("Transaction was not rejected.");
let rejection_reason = err
.downcast_ref::<PipelineRejectionReason>()
.expect("Error {err} is not PipelineRejectionReason");
.downcast_ref::<TransactionRejectionReason>()
.expect("Error {err} is not TransactionRejectionReason");
//Then
assert!(matches!(
rejection_reason,
&PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation(
ValidationFail::NotPermitted(_)
))
&TransactionRejectionReason::Validation(ValidationFail::NotPermitted(_))
));
let alice_assets = get_assets(&iroha_client, &alice_id);
assert_eq!(alice_assets, alice_start_assets);
Expand Down Expand Up @@ -156,14 +156,12 @@ fn permissions_disallow_asset_burn() {
.submit_transaction_blocking(&burn_tx)
.expect_err("Transaction was not rejected.");
let rejection_reason = err
.downcast_ref::<PipelineRejectionReason>()
.expect("Error {err} is not PipelineRejectionReason");
.downcast_ref::<TransactionRejectionReason>()
.expect("Error {err} is not TransactionRejectionReason");

assert!(matches!(
rejection_reason,
&PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation(
ValidationFail::NotPermitted(_)
))
&TransactionRejectionReason::Validation(ValidationFail::NotPermitted(_))
));

let alice_assets = get_assets(&iroha_client, &alice_id);
Expand Down
9 changes: 4 additions & 5 deletions client/tests/integration/roles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use iroha_client::{
crypto::KeyPair,
data_model::prelude::*,
};
use iroha_data_model::transaction::error::TransactionRejectionReason;
use serde_json::json;
use test_network::*;

Expand Down Expand Up @@ -164,14 +165,12 @@ fn role_with_invalid_permissions_is_not_accepted() -> Result<()> {
.expect_err("Submitting role with invalid permission token should fail");

let rejection_reason = err
.downcast_ref::<PipelineRejectionReason>()
.unwrap_or_else(|| panic!("Error {err} is not PipelineRejectionReason"));
.downcast_ref::<TransactionRejectionReason>()
.unwrap_or_else(|| panic!("Error {err} is not TransactionRejectionReason"));

assert!(matches!(
rejection_reason,
&PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation(
ValidationFail::NotPermitted(_)
))
&TransactionRejectionReason::Validation(ValidationFail::NotPermitted(_))
));

Ok(())
Expand Down
Loading

0 comments on commit 8fe6655

Please sign in to comment.