Skip to content

Commit

Permalink
[refactor] hyperledger-iroha#4315: split pipeline events
Browse files Browse the repository at this point in the history
Signed-off-by: Marin Veršić <[email protected]>
  • Loading branch information
mversic committed Mar 21, 2024
1 parent 47f278d commit 56b455a
Show file tree
Hide file tree
Showing 71 changed files with 1,368 additions and 981 deletions.
10 changes: 5 additions & 5 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,12 @@ Set the `LOG_FILE_PATH` environment variable to an appropriate location to store

<details> <summary> Expand to learn how to compile iroha with tokio console support.</summary>

Sometimes it might be helpful for debugging to analyze tokio tasks using [tokio-console](https://github.com/tokio-rs/console).
Sometimes it might be helpful for debugging to analyze tokio tasks using [tokio_console](https://github.com/tokio-rs/console).

In this case you should compile iroha with support of tokio console like that:

```bash
RUSTFLAGS="--cfg tokio_unstable" cargo build --features tokio-console
RUSTFLAGS="--cfg tokio_unstable" cargo build --features tokio_console
```

Port for tokio console can by configured through `LOG_TOKIO_CONSOLE_ADDR` configuration parameter (or environment variable).
Expand All @@ -257,11 +257,11 @@ Example of running iroha with tokio console support using `scripts/test_env.sh`:

```bash
# 1. Compile iroha
RUSTFLAGS="--cfg tokio_unstable" cargo build --features tokio-console
RUSTFLAGS="--cfg tokio_unstable" cargo build --features tokio_console
# 2. Run iroha with TRACE log level
LOG_LEVEL=TRACE ./scripts/test_env.sh setup
# 3. Access iroha. Peers will be available on ports 5555, 5556, ...
tokio-console http://127.0.0.1:5555
tokio_console http://127.0.0.1:5555
```

</details>
Expand All @@ -272,7 +272,7 @@ tokio-console http://127.0.0.1:5555

To optimize performance it's useful to profile iroha.

To do that you should compile iroha with `profiling` profile and with `profiling` feature:
To do that you should compile iroha with `profiling` profile and with `profiling` feature:

```bash
RUSTFLAGS="-C force-frame-pointers=on" cargo +nightly -Z build-std build --target your-desired-target --profile profiling --features profiling
Expand Down
12 changes: 6 additions & 6 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@ categories.workspace = true
workspace = true

[features]
default = ["telemetry", "schema-endpoint"]
default = ["telemetry", "schema_endpoint"]

# Support lightweight telemetry, including diagnostics
telemetry = ["iroha_telemetry", "iroha_core/telemetry", "iroha_torii/telemetry"]
# Support developer-specific telemetry.
# Should not be enabled on production builds.
dev-telemetry = ["iroha_core/dev-telemetry", "iroha_telemetry"]
dev_telemetry = ["iroha_core/dev_telemetry", "iroha_telemetry"]
# Support schema generation from the `schema` endpoint in the local binary.
# Useful for debugging issues with decoding in SDKs.
schema-endpoint = ["iroha_torii/schema"]
schema_endpoint = ["iroha_torii/schema"]
# Support internal testing infrastructure for integration tests.
# Disable in production.
test-network = ["thread-local-panic-hook"]
test_network = ["thread-local-panic-hook"]

[badges]
is-it-maintained-issue-resolution = { repository = "https://github.com/hyperledger/iroha" }
Expand Down Expand Up @@ -79,8 +79,8 @@ vergen = { workspace = true, features = ["cargo"] }

[package.metadata.cargo-all-features]
denylist = [
"schema-endpoint",
"schema_endpoint",
"telemetry",
"test-network",
"test_network",
]
skip_optional_dependencies = true
4 changes: 2 additions & 2 deletions cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ The results of the compilation can be found in `<IROHA REPO ROOT>/target/release
To add optional features, use ``--features``. For example, to add the support for _dev_telemetry_, run:

```bash
cargo build --release --features dev-telemetry
cargo build --release --features dev_telemetry
```

A full list of features can be found in the [cargo manifest file](Cargo.toml) for this crate.

### Disable default features

By default, the Iroha binary is compiled with the `telemetry`, and `schema-endpoint` features. If you wish to remove those features, add `--no-default-features` to the command.
By default, the Iroha binary is compiled with the `telemetry`, and `schema_endpoint` features. If you wish to remove those features, add `--no-default-features` to the command.

```bash
cargo build --release --no-default-features
Expand Down
16 changes: 7 additions & 9 deletions cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl NetworkRelay {

impl Iroha {
fn prepare_panic_hook(notify_shutdown: Arc<Notify>) {
#[cfg(not(feature = "test-network"))]
#[cfg(not(feature = "test_network"))]
use std::panic::set_hook;

// This is a hot-fix for tests
Expand All @@ -160,10 +160,8 @@ impl Iroha {
//
// Remove this when all Rust integrations tests will be converted to a
// separate Python tests.
#[cfg(feature = "test-network")]
use thread_local_panic_hook::set_hook;

set_hook(Box::new(move |info| {
#[cfg(feature = "test_network")]
thread_local_panic_hook::set_hook(Box::new(move |info| {
// What clippy suggests is much less readable in this case
#[allow(clippy::option_if_let_else)]
let panic_message = if let Some(message) = info.payload().downcast_ref::<&str>() {
Expand Down Expand Up @@ -251,7 +249,7 @@ impl Iroha {
});
let state = Arc::new(state);

let queue = Arc::new(Queue::from_config(config.queue));
let queue = Arc::new(Queue::from_config(config.queue, events_sender.clone()));
match Self::start_telemetry(&logger, &config).await? {
TelemetryStartStatus::Started => iroha_logger::info!("Telemetry started"),
TelemetryStartStatus::NotStarted => iroha_logger::warn!("Telemetry not started"),
Expand Down Expand Up @@ -369,7 +367,7 @@ impl Iroha {
///
/// # Errors
/// - Forwards initialisation error.
#[cfg(feature = "test-network")]
#[cfg(feature = "test_network")]
pub fn start_as_task(&mut self) -> Result<tokio::task::JoinHandle<eyre::Result<()>>> {
iroha_logger::info!("Starting Iroha as task");
let torii = self
Expand All @@ -386,7 +384,7 @@ impl Iroha {
logger: &LoggerHandle,
config: &Config,
) -> Result<TelemetryStartStatus> {
#[cfg(feature = "dev-telemetry")]
#[cfg(feature = "dev_telemetry")]
{
if let Some(config) = &config.dev_telemetry {
let receiver = logger
Expand Down Expand Up @@ -539,7 +537,7 @@ mod tests {

use super::*;

#[cfg(not(feature = "test-network"))]
#[cfg(not(feature = "test_network"))]
mod no_test_network {
use std::{iter::repeat, panic, thread};

Expand Down
12 changes: 6 additions & 6 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,24 @@ maintenance = { status = "actively-developed" }

[features]
# Use rustls by default to avoid OpenSSL dependency, simplifying compilation with musl
default = ["tls-rustls-native-roots"]
default = ["tls_rustls_native_roots"]

tls-native = [
tls_native = [
"attohttpc/tls-native",
"tokio-tungstenite/native-tls",
"tungstenite/native-tls",
]
tls-native-vendored = [
tls_native_vendored = [
"attohttpc/tls-native-vendored",
"tokio-tungstenite/native-tls-vendored",
"tungstenite/native-tls-vendored",
]
tls-rustls-native-roots = [
tls_rustls_native_roots = [
"attohttpc/tls-rustls-native-roots",
"tokio-tungstenite/rustls-tls-native-roots",
"tungstenite/rustls-tls-native-roots",
]
tls-rustls-webpki-roots = [
tls_rustls_webpki_roots = [
"attohttpc/tls-rustls-webpki-roots",
"tokio-tungstenite/rustls-tls-webpki-roots",
"tungstenite/rustls-tls-webpki-roots",
Expand Down Expand Up @@ -83,7 +83,7 @@ iroha_wasm_builder = { workspace = true }
# TODO: These three activate `transparent_api` but client should never activate this feature.
# Additionally there is a dependency on iroha_core in dev-dependencies in telemetry/derive
# Hopefully, once the integration tests migration is finished these can be removed
iroha = { workspace = true, features = ["dev-telemetry", "telemetry"] }
iroha = { workspace = true, features = ["dev_telemetry", "telemetry"] }
iroha_genesis = { workspace = true }
test_network = { workspace = true }

Expand Down
5 changes: 2 additions & 3 deletions client/benches/tps/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use iroha_client::{
prelude::*,
},
};
use iroha_data_model::events::pipeline::{BlockEventFilter, BlockStatus};
use serde::Deserialize;
use test_network::*;

Expand Down Expand Up @@ -172,9 +173,7 @@ impl MeasurerUnit {
fn spawn_event_counter(&self) -> thread::JoinHandle<Result<()>> {
let listener = self.client.clone();
let (init_sender, init_receiver) = mpsc::channel();
let event_filter = PipelineEventFilter::new()
.for_entity(PipelineEntityKind::Block)
.for_status(PipelineStatusKind::Committed);
let event_filter = BlockEventFilter::default().for_status(BlockStatus::Applied);
let blocks_expected = self.config.blocks as usize;
let name = self.name;
let handle = thread::spawn(move || -> Result<()> {
Expand Down
49 changes: 34 additions & 15 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use eyre::{eyre, Result, WrapErr};
use futures_util::StreamExt;
use http_default::{AsyncWebSocketStream, WebSocketStream};
pub use iroha_config::client_api::ConfigDTO;
use iroha_data_model::query::QueryOutputBox;
use iroha_data_model::{
events::pipeline::{BlockStatus, PipelineEventBox, TransactionEventFilter, TransactionStatus},
query::QueryOutputBox,
};
use iroha_logger::prelude::*;
use iroha_telemetry::metrics::Status;
use iroha_torii_const::uri as torii_uri;
Expand Down Expand Up @@ -605,7 +608,7 @@ impl Client {
let mut event_iterator = {
let event_iterator_result = tokio::time::timeout_at(
deadline,
self.listen_for_events_async(PipelineEventFilter::new().for_hash(hash.into())),
self.listen_for_events_async(TransactionEventFilter::default().for_hash(hash)),
)
.await
.map_err(Into::into)
Expand All @@ -631,17 +634,34 @@ impl Client {
event_iterator: &mut AsyncEventStream,
hash: HashOf<SignedTransaction>,
) -> Result<HashOf<SignedTransaction>> {
let mut block_height = None;

while let Some(event) = event_iterator.next().await {
if let Event::Pipeline(this_event) = event? {
match this_event.status() {
PipelineStatus::Validating => {}
PipelineStatus::Rejected(ref reason) => {
return Err(reason.clone().into());
if let EventBox::Pipeline(this_event) = event? {
match this_event {
PipelineEventBox::Transaction(transaction_event) => {
match transaction_event.status() {
TransactionStatus::Queued => {}
TransactionStatus::Approved => {
block_height = transaction_event.block_height;
}
TransactionStatus::Rejected(reason) => {
return Err(reason.clone().into());
}
TransactionStatus::Expired => return Err(eyre!("Transaction expired")),
}
}
PipelineEventBox::Block(block_event) => {
if Some(block_event.header().height()) == block_height {
if let BlockStatus::Applied = block_event.status() {
return Ok(hash);
}
}
}
PipelineStatus::Committed => return Ok(hash),
}
}
}

Err(eyre!(
"Connection dropped without `Committed` or `Rejected` event"
))
Expand Down Expand Up @@ -904,9 +924,7 @@ impl Client {
pub fn listen_for_events(
&self,
event_filter: impl Into<EventFilterBox>,
) -> Result<impl Iterator<Item = Result<Event>>> {
let event_filter = event_filter.into();
iroha_logger::trace!(?event_filter);
) -> Result<impl Iterator<Item = Result<EventBox>>> {
events_api::EventIterator::new(self.events_handler(event_filter)?)
}

Expand All @@ -919,8 +937,6 @@ impl Client {
&self,
event_filter: impl Into<EventFilterBox> + Send,
) -> Result<AsyncEventStream> {
let event_filter = event_filter.into();
iroha_logger::trace!(?event_filter, "Async listening with");
events_api::AsyncEventStream::new(self.events_handler(event_filter)?).await
}

Expand All @@ -933,8 +949,11 @@ impl Client {
&self,
event_filter: impl Into<EventFilterBox>,
) -> Result<events_api::flow::Init> {
let event_filter = event_filter.into();
iroha_logger::trace!(?event_filter);

events_api::flow::Init::new(
event_filter.into(),
event_filter,
self.headers.clone(),
self.torii_url
.join(torii_uri::SUBSCRIPTION)
Expand Down Expand Up @@ -1284,7 +1303,7 @@ pub mod events_api {
pub struct Events;

impl FlowEvents for Events {
type Event = crate::data_model::prelude::Event;
type Event = crate::data_model::prelude::EventBox;

fn message(&self, message: Vec<u8>) -> Result<Self::Event> {
let event_socket_message = EventMessage::decode_all(&mut message.as_slice())?;
Expand Down
2 changes: 1 addition & 1 deletion client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use iroha_config::{
base,
base::{FromEnv, StdEnv, UnwrapPartial},
};
use iroha_crypto::prelude::*;
use iroha_crypto::KeyPair;
use iroha_data_model::{prelude::*, ChainId};
use iroha_primitives::small::SmallStr;
use serde::{Deserialize, Serialize};
Expand Down
13 changes: 7 additions & 6 deletions client/tests/integration/asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use iroha_config::parameters::actual::Root as Config;
use iroha_data_model::{
asset::{AssetId, AssetValue, AssetValueType},
isi::error::{InstructionEvaluationError, InstructionExecutionError, Mismatch, TypeError},
transaction::error::TransactionRejectionReason,
};
use serde_json::json;
use test_network::*;
Expand Down Expand Up @@ -463,17 +464,17 @@ fn fail_if_dont_satisfy_spec() {
.expect_err("Should be rejected due to non integer value");

let rejection_reason = err
.downcast_ref::<PipelineRejectionReason>()
.unwrap_or_else(|| panic!("Error {err} is not PipelineRejectionReason"));
.downcast_ref::<TransactionRejectionReason>()
.unwrap_or_else(|| panic!("Error {err} is not TransactionRejectionReason"));

assert_eq!(
rejection_reason,
&PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation(
ValidationFail::InstructionFailed(InstructionExecutionError::Evaluate(
InstructionEvaluationError::Type(TypeError::from(Mismatch {
&TransactionRejectionReason::Validation(ValidationFail::InstructionFailed(
InstructionExecutionError::Evaluate(InstructionEvaluationError::Type(
TypeError::from(Mismatch {
expected: AssetValueType::Numeric(NumericSpec::integer()),
actual: AssetValueType::Numeric(NumericSpec::fractional(2))
}))
})
))
))
);
Expand Down
9 changes: 4 additions & 5 deletions client/tests/integration/domain_owner_permissions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use iroha_client::{
crypto::KeyPair,
data_model::{account::SignatureCheckCondition, prelude::*},
};
use iroha_data_model::transaction::error::TransactionRejectionReason;
use serde_json::json;
use test_network::*;

Expand Down Expand Up @@ -37,14 +38,12 @@ fn domain_owner_domain_permissions() -> Result<()> {
.expect_err("Tx should fail due to permissions");

let rejection_reason = err
.downcast_ref::<PipelineRejectionReason>()
.unwrap_or_else(|| panic!("Error {err} is not PipelineRejectionReason"));
.downcast_ref::<TransactionRejectionReason>()
.unwrap_or_else(|| panic!("Error {err} is not TransactionRejectionReason"));

assert!(matches!(
rejection_reason,
&PipelineRejectionReason::Transaction(TransactionRejectionReason::Validation(
ValidationFail::NotPermitted(_)
))
&TransactionRejectionReason::Validation(ValidationFail::NotPermitted(_))
));

// "alice@wonderland" owns the domain and can register AssetDefinitions by default as domain owner
Expand Down
Loading

0 comments on commit 56b455a

Please sign in to comment.