Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor] #4315: split pipeline events #4366

Merged
merged 1 commit into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ tokio-console http://127.0.0.1:5555

To optimize performance it's useful to profile iroha.

To do that you should compile iroha with `profiling` profile and with `profiling` feature:
To do that you should compile iroha with `profiling` profile and with `profiling` feature:

```bash
RUSTFLAGS="-C force-frame-pointers=on" cargo +nightly -Z build-std build --target your-desired-target --profile profiling --features profiling
Expand Down
2 changes: 1 addition & 1 deletion cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ The results of the compilation can be found in `<IROHA REPO ROOT>/target/release

### Add features

To add optional features, use ``--features``. For example, to add the support for _dev_telemetry_, run:
To add optional features, use ``--features``. For example, to add the support for _dev telemetry_, run:

```bash
cargo build --release --features dev-telemetry
Expand Down
2 changes: 1 addition & 1 deletion cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ impl Iroha {
});
let state = Arc::new(state);

let queue = Arc::new(Queue::from_config(config.queue));
let queue = Arc::new(Queue::from_config(config.queue, events_sender.clone()));

#[cfg(feature = "telemetry")]
Self::start_telemetry(&logger, &config).await?;
Expand Down
7 changes: 3 additions & 4 deletions client/benches/tps/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use iroha_client::{
prelude::*,
},
};
use iroha_data_model::events::pipeline::{BlockEventFilter, BlockStatus};
use serde::Deserialize;
use test_network::*;

Expand Down Expand Up @@ -172,13 +173,11 @@ impl MeasurerUnit {
fn spawn_event_counter(&self) -> thread::JoinHandle<Result<()>> {
let listener = self.client.clone();
let (init_sender, init_receiver) = mpsc::channel();
let event_filter = PipelineEventFilter::new()
.for_entity(PipelineEntityKind::Block)
.for_status(PipelineStatusKind::Committed);
let event_filter = BlockEventFilter::default().for_status(BlockStatus::Applied);
mversic marked this conversation as resolved.
Show resolved Hide resolved
let blocks_expected = self.config.blocks as usize;
let name = self.name;
let handle = thread::spawn(move || -> Result<()> {
let mut event_iterator = listener.listen_for_events(event_filter)?;
let mut event_iterator = listener.listen_for_events([event_filter])?;
init_sender.send(())?;
for i in 1..=blocks_expected {
let _event = event_iterator.next().expect("Event stream closed")?;
Expand Down
98 changes: 61 additions & 37 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ use eyre::{eyre, Result, WrapErr};
use futures_util::StreamExt;
use http_default::{AsyncWebSocketStream, WebSocketStream};
pub use iroha_config::client_api::ConfigDTO;
use iroha_data_model::query::QueryOutputBox;
use iroha_data_model::{
events::pipeline::{
BlockEventFilter, BlockStatus, PipelineEventBox, PipelineEventFilterBox,
TransactionEventFilter, TransactionStatus,
},
query::QueryOutputBox,
};
use iroha_logger::prelude::*;
use iroha_telemetry::metrics::Status;
use iroha_torii_const::uri as torii_uri;
Expand Down Expand Up @@ -603,14 +609,19 @@ impl Client {

rt.block_on(async {
let mut event_iterator = {
let event_iterator_result = tokio::time::timeout_at(
deadline,
self.listen_for_events_async(PipelineEventFilter::new().for_hash(hash.into())),
)
.await
.map_err(Into::into)
.and_then(std::convert::identity)
.wrap_err("Failed to establish event listener connection");
let filters = vec![
TransactionEventFilter::default().for_hash(hash).into(),
PipelineEventFilterBox::from(
BlockEventFilter::default().for_status(BlockStatus::Applied),
),
];

let event_iterator_result =
tokio::time::timeout_at(deadline, self.listen_for_events_async(filters))
.await
.map_err(Into::into)
.and_then(std::convert::identity)
.wrap_err("Failed to establish event listener connection");
let _send_result = init_sender.send(event_iterator_result.is_ok());
event_iterator_result?
};
Expand All @@ -631,17 +642,34 @@ impl Client {
event_iterator: &mut AsyncEventStream,
hash: HashOf<SignedTransaction>,
) -> Result<HashOf<SignedTransaction>> {
let mut block_height = None;

while let Some(event) = event_iterator.next().await {
if let Event::Pipeline(this_event) = event? {
match this_event.status() {
PipelineStatus::Validating => {}
PipelineStatus::Rejected(ref reason) => {
return Err(reason.clone().into());
if let EventBox::Pipeline(this_event) = event? {
match this_event {
PipelineEventBox::Transaction(transaction_event) => {
match transaction_event.status() {
TransactionStatus::Queued => {}
TransactionStatus::Approved => {
block_height = transaction_event.block_height;
}
TransactionStatus::Rejected(reason) => {
return Err((Clone::clone(&**reason)).into());
}
TransactionStatus::Expired => return Err(eyre!("Transaction expired")),
}
}
PipelineEventBox::Block(block_event) => {
if Some(block_event.header().height()) == block_height {
if let BlockStatus::Applied = block_event.status() {
return Ok(hash);
}
}
}
PipelineStatus::Committed => return Ok(hash),
}
}
}

Err(eyre!(
"Connection dropped without `Committed` or `Rejected` event"
))
Expand Down Expand Up @@ -903,11 +931,9 @@ impl Client {
/// - Forwards from [`events_api::EventIterator::new`]
pub fn listen_for_events(
&self,
event_filter: impl Into<EventFilterBox>,
) -> Result<impl Iterator<Item = Result<Event>>> {
let event_filter = event_filter.into();
iroha_logger::trace!(?event_filter);
events_api::EventIterator::new(self.events_handler(event_filter)?)
event_filters: impl IntoIterator<Item = impl Into<EventFilterBox>>,
) -> Result<impl Iterator<Item = Result<EventBox>>> {
events_api::EventIterator::new(self.events_handler(event_filters)?)
}

/// Connect asynchronously (through `WebSocket`) to listen for `Iroha` `pipeline` and `data` events.
Expand All @@ -917,11 +943,9 @@ impl Client {
/// - Forwards from [`events_api::AsyncEventStream::new`]
pub async fn listen_for_events_async(
&self,
event_filter: impl Into<EventFilterBox> + Send,
event_filters: impl IntoIterator<Item = impl Into<EventFilterBox>> + Send,
) -> Result<AsyncEventStream> {
let event_filter = event_filter.into();
iroha_logger::trace!(?event_filter, "Async listening with");
events_api::AsyncEventStream::new(self.events_handler(event_filter)?).await
events_api::AsyncEventStream::new(self.events_handler(event_filters)?).await
}

/// Constructs an Events API handler. With it, you can use any WS client you want.
Expand All @@ -931,10 +955,10 @@ impl Client {
#[inline]
pub fn events_handler(
&self,
event_filter: impl Into<EventFilterBox>,
event_filters: impl IntoIterator<Item = impl Into<EventFilterBox>>,
) -> Result<events_api::flow::Init> {
events_api::flow::Init::new(
event_filter.into(),
event_filters.into_iter().map(Into::into).collect(),
self.headers.clone(),
self.torii_url
.join(torii_uri::SUBSCRIPTION)
Expand Down Expand Up @@ -1237,12 +1261,12 @@ pub mod events_api {

/// Initialization struct for Events API flow.
pub struct Init {
/// Event filter
filter: EventFilterBox,
/// HTTP request headers
headers: HashMap<String, String>,
/// TORII URL
url: Url,
/// HTTP request headers
headers: HashMap<String, String>,
/// Event filter
filters: Vec<EventFilterBox>,
}

impl Init {
Expand All @@ -1252,14 +1276,14 @@ pub mod events_api {
/// Fails if [`transform_ws_url`] fails.
#[inline]
pub(in super::super) fn new(
filter: EventFilterBox,
filters: Vec<EventFilterBox>,
headers: HashMap<String, String>,
url: Url,
) -> Result<Self> {
Ok(Self {
filter,
headers,
url: transform_ws_url(url)?,
headers,
filters,
})
}
}
Expand All @@ -1269,12 +1293,12 @@ pub mod events_api {

fn init(self) -> InitData<R, Self::Next> {
let Self {
filter,
headers,
url,
headers,
filters,
} = self;

let msg = EventSubscriptionRequest::new(filter).encode();
let msg = EventSubscriptionRequest::new(filters).encode();
InitData::new(R::new(HttpMethod::GET, url).headers(headers), msg, Events)
}
}
Expand All @@ -1284,7 +1308,7 @@ pub mod events_api {
pub struct Events;

impl FlowEvents for Events {
type Event = crate::data_model::prelude::Event;
type Event = crate::data_model::prelude::EventBox;

fn message(&self, message: Vec<u8>) -> Result<Self::Event> {
let event_socket_message = EventMessage::decode_all(&mut message.as_slice())?;
Expand Down
2 changes: 1 addition & 1 deletion client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use iroha_config::{
base,
base::{FromEnv, StdEnv, UnwrapPartial},
};
use iroha_crypto::prelude::*;
use iroha_crypto::KeyPair;
use iroha_data_model::{prelude::*, ChainId};
use iroha_primitives::small::SmallStr;
use serde::{Deserialize, Serialize};
Expand Down
6 changes: 3 additions & 3 deletions client/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ pub mod ws {
/// use eyre::Result;
/// use url::Url;
/// use iroha_client::{
/// data_model::prelude::Event,
/// data_model::prelude::EventBox,
/// client::events_api::flow as events_api_flow,
/// http::{
/// ws::conn_flow::{Events, Init, InitData},
Expand Down Expand Up @@ -203,7 +203,7 @@ pub mod ws {
/// }
/// }
///
/// fn collect_5_events(flow: events_api_flow::Init) -> Result<Vec<Event>> {
/// fn collect_5_events(flow: events_api_flow::Init) -> Result<Vec<EventBox>> {
/// // Constructing initial flow data
/// let InitData {
/// next: flow,
Expand All @@ -216,7 +216,7 @@ pub mod ws {
/// stream.send(first_message);
///
/// // And now we are able to collect events
/// let mut events: Vec<Event> = Vec::with_capacity(5);
/// let mut events: Vec<EventBox> = Vec::with_capacity(5);
/// while events.len() < 5 {
/// let msg = stream.get_next();
/// let event = flow.message(msg)?;
Expand Down
13 changes: 7 additions & 6 deletions client/tests/integration/asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use iroha_config::parameters::actual::Root as Config;
use iroha_data_model::{
asset::{AssetId, AssetValue, AssetValueType},
isi::error::{InstructionEvaluationError, InstructionExecutionError, Mismatch, TypeError},
transaction::error::TransactionRejectionReason,
};
use serde_json::json;
use test_network::*;
Expand Down Expand Up @@ -463,17 +464,17 @@ fn fail_if_dont_satisfy_spec() {
.expect_err("Should be rejected due to non integer value");

let rejection_reason = err
.downcast_ref::<PipelineRejectionReason>()
.unwrap_or_else(|| panic!("Error {err} is not PipelineRejectionReason"));
.downcast_ref::<TransactionRejectionReason>()
.unwrap_or_else(|| panic!("Error {err} is not TransactionRejectionReason"));

assert_eq!(
rejection_reason,
&PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation(
ValidationFail::InstructionFailed(InstructionExecutionError::Evaluate(
InstructionEvaluationError::Type(TypeError::from(Mismatch {
&TransactionRejectionReason::Validation(ValidationFail::InstructionFailed(
InstructionExecutionError::Evaluate(InstructionEvaluationError::Type(
TypeError::from(Mismatch {
expected: AssetValueType::Numeric(NumericSpec::integer()),
actual: AssetValueType::Numeric(NumericSpec::fractional(2))
}))
})
))
))
);
Expand Down
9 changes: 4 additions & 5 deletions client/tests/integration/domain_owner_permissions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use iroha_client::{
crypto::KeyPair,
data_model::{account::SignatureCheckCondition, prelude::*},
};
use iroha_data_model::transaction::error::TransactionRejectionReason;
use serde_json::json;
use test_network::*;

Expand Down Expand Up @@ -37,14 +38,12 @@ fn domain_owner_domain_permissions() -> Result<()> {
.expect_err("Tx should fail due to permissions");

let rejection_reason = err
.downcast_ref::<PipelineRejectionReason>()
.unwrap_or_else(|| panic!("Error {err} is not PipelineRejectionReason"));
.downcast_ref::<TransactionRejectionReason>()
.unwrap_or_else(|| panic!("Error {err} is not TransactionRejectionReason"));

assert!(matches!(
rejection_reason,
&PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation(
ValidationFail::NotPermitted(_)
))
&TransactionRejectionReason::Validation(ValidationFail::NotPermitted(_))
));

// "alice@wonderland" owns the domain and can register AssetDefinitions by default as domain owner
Expand Down
4 changes: 2 additions & 2 deletions client/tests/integration/events/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ fn transaction_execution_should_produce_events(
let (event_sender, event_receiver) = mpsc::channel();
let event_filter = DataEventFilter::Any;
thread::spawn(move || -> Result<()> {
let event_iterator = listener.listen_for_events(event_filter)?;
let event_iterator = listener.listen_for_events([event_filter])?;
init_sender.send(())?;
for event in event_iterator {
event_sender.send(event)?
Expand Down Expand Up @@ -184,7 +184,7 @@ fn produce_multiple_events() -> Result<()> {
let (event_sender, event_receiver) = mpsc::channel();
let event_filter = DataEventFilter::Any;
thread::spawn(move || -> Result<()> {
let event_iterator = listener.listen_for_events(event_filter)?;
let event_iterator = listener.listen_for_events([event_filter])?;
init_sender.send(())?;
for event in event_iterator {
event_sender.send(event)?
Expand Down
16 changes: 6 additions & 10 deletions client/tests/integration/events/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,9 @@ fn trigger_completion_success_should_produce_event() -> Result<()> {
let thread_client = test_client.clone();
let (sender, receiver) = mpsc::channel();
let _handle = thread::spawn(move || -> Result<()> {
let mut event_it = thread_client.listen_for_events(
TriggerCompletedEventFilter::new()
.for_trigger(trigger_id)
.for_outcome(TriggerCompletedOutcomeType::Success),
)?;
let mut event_it = thread_client.listen_for_events([TriggerCompletedEventFilter::new()
.for_trigger(trigger_id)
.for_outcome(TriggerCompletedOutcomeType::Success)])?;
if event_it.next().is_some() {
sender.send(())?;
return Ok(());
Expand Down Expand Up @@ -79,11 +77,9 @@ fn trigger_completion_failure_should_produce_event() -> Result<()> {
let thread_client = test_client.clone();
let (sender, receiver) = mpsc::channel();
let _handle = thread::spawn(move || -> Result<()> {
let mut event_it = thread_client.listen_for_events(
TriggerCompletedEventFilter::new()
.for_trigger(trigger_id)
.for_outcome(TriggerCompletedOutcomeType::Failure),
)?;
let mut event_it = thread_client.listen_for_events([TriggerCompletedEventFilter::new()
.for_trigger(trigger_id)
.for_outcome(TriggerCompletedOutcomeType::Failure)])?;
if event_it.next().is_some() {
sender.send(())?;
return Ok(());
Expand Down
Loading
Loading