diff --git a/cli/src/lib.rs b/cli/src/lib.rs index aa0c04f3a64..027a9189aef 100644 --- a/cli/src/lib.rs +++ b/cli/src/lib.rs @@ -55,18 +55,7 @@ pub mod samples; /// [Orchestrator](https://en.wikipedia.org/wiki/Orchestration_%28computing%29) /// of the system. It configures, coordinates and manages transactions /// and queries processing, work of consensus and storage. -/// -/// # Usage -/// Construct and then use [`Iroha::start_torii`] or [`Iroha::start_torii_as_task`]. If you experience -/// an immediate shutdown after constructing Iroha, then you probably forgot this step. -#[must_use = "run `.start_torii().await?` to not immediately stop Iroha"] -pub struct Iroha { - main_state: IrohaMainState, - /// Torii web server - torii: ToriiState, -} - -struct IrohaMainState { +pub struct Iroha { /// Actor responsible for the configuration _kiso: KisoHandle, /// Queue of transactions @@ -79,6 +68,8 @@ struct IrohaMainState { _snapshot_maker: Option, /// State of blockchain state: Arc, + /// Shutdown signal + notify_shutdown: Arc, /// Thread handlers thread_handlers: Vec, /// A boolean value indicating whether or not the peers will receive data from the network. @@ -87,16 +78,10 @@ struct IrohaMainState { pub freeze_status: Arc, } -/// A state of [`Iroha`] for when the network is started, but [`Torii`] not yet. -pub struct ToriiNotStarted(Torii); - -/// A state of [`Iroha`] for when the network & [`Torii`] are started. -#[allow(missing_copy_implementations)] -pub struct ToriiStarted; - -impl Drop for IrohaMainState { +impl Drop for Iroha { fn drop(&mut self) { iroha_logger::trace!("Iroha instance dropped"); + self.notify_shutdown.notify_waiters(); let _thread_handles = core::mem::take(&mut self.thread_handlers); iroha_logger::debug!( "Thread handles dropped. Dependent processes going for a graceful shutdown" @@ -178,7 +163,7 @@ impl NetworkRelay { } } -impl Iroha { +impl Iroha { fn prepare_panic_hook(notify_shutdown: Arc) { #[cfg(not(feature = "test-network"))] use std::panic::set_hook; @@ -226,9 +211,9 @@ impl Iroha { })); } - /// Creates new Iroha instance and starts all internal services, except [`Torii`]. + /// Creates new Iroha instance and starts all internal services. /// - /// Torii is started separately with [`Self::start_torii`] or [`Self::start_torii_as_task`] + /// Returns iroha itself and future to await for iroha completion. /// /// # Errors /// - Reading telemetry configs @@ -243,7 +228,7 @@ impl Iroha { config: Config, genesis: Option, logger: LoggerHandle, - ) -> Result { + ) -> Result<(impl core::future::Future, Self), StartError> { let network = IrohaNetwork::start(config.common.key_pair.clone(), config.network.clone()) .await .change_context(StartError::StartP2p)?; @@ -384,81 +369,40 @@ impl Iroha { metrics_reporter, ); + tokio::spawn(async move { + torii + .start() + .await + .into_report() + .map_err(|report| report.change_context(StartError::StartTorii)) + }); + Self::spawn_config_updates_broadcasting(kiso.clone(), logger.clone()); Self::start_listening_signal(Arc::clone(¬ify_shutdown))?; - Self::prepare_panic_hook(notify_shutdown); - - Ok(Self { - main_state: IrohaMainState { - _kiso: kiso, - _queue: queue, - _sumeragi: sumeragi, - kura, - _snapshot_maker: snapshot_maker, - state, - thread_handlers: vec![kura_thread_handler], - #[cfg(debug_assertions)] - freeze_status, - }, - torii: ToriiNotStarted(torii), - }) - } + Self::prepare_panic_hook(Arc::clone(¬ify_shutdown)); - fn take_torii(self) -> (Torii, Iroha) { - let Self { - main_state, - torii: ToriiNotStarted(torii), - } = self; - ( - torii, - Iroha { - main_state, - torii: ToriiStarted, - }, - ) - } + // Future to wait for iroha completion + let wait = { + let notify_shutdown = Arc::clone(¬ify_shutdown); + async move { notify_shutdown.notified().await } + }; - /// To make `Iroha` peer work it should be started first. After - /// that moment it will listen for incoming requests and messages. - /// - /// # Errors - /// - Forwards initialisation error. - #[iroha_futures::telemetry_future] - pub async fn start_torii(self) -> Result, StartError> { - let (torii, new_self) = self.take_torii(); - iroha_logger::info!("Starting Iroha"); - torii - .start() - .await - .into_report() - // https://github.com/hashintel/hash/issues/4295 - .map_err(|report| report.change_context(StartError::StartTorii))?; - Ok(new_self) - } + let irohad = Self { + _kiso: kiso, + _queue: queue, + _sumeragi: sumeragi, + kura, + _snapshot_maker: snapshot_maker, + state, + notify_shutdown, + thread_handlers: vec![kura_thread_handler], + #[cfg(debug_assertions)] + freeze_status, + }; - /// Starts Iroha in separate tokio task. - /// - /// # Errors - /// - Forwards initialisation error. - #[cfg(feature = "test-network")] - pub fn start_torii_as_task( - self, - ) -> ( - task::JoinHandle>, - Iroha, - ) { - let (torii, new_self) = self.take_torii(); - iroha_logger::info!("Starting Iroha as task"); - let handle = tokio::spawn(async move { - torii - .start() - .await - .into_report() - .map_err(|report| report.change_context(StartError::StartTorii)) - }); - (handle, new_self) + Ok((wait, irohad)) } #[cfg(feature = "telemetry")] @@ -563,23 +507,21 @@ impl Iroha { } }) } -} -impl Iroha { #[allow(missing_docs)] #[cfg(debug_assertions)] pub fn freeze_status(&self) -> &Arc { - &self.main_state.freeze_status + &self.freeze_status } #[allow(missing_docs)] pub fn state(&self) -> &Arc { - &self.main_state.state + &self.state } #[allow(missing_docs)] pub fn kura(&self) -> &Arc { - &self.main_state.kura + &self.kura } } diff --git a/cli/src/main.rs b/cli/src/main.rs index 410ddd903f6..f8033d61680 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -51,12 +51,11 @@ async fn main() -> error_stack::Result<(), MainError> { iroha_logger::debug!("Submitting genesis."); } - let _iroha = Iroha::start_network(config, genesis, logger) + Iroha::start_network(config, genesis, logger) .await .change_context(MainError::IrohaStart)? - .start_torii() - .await - .change_context(MainError::IrohaStart)?; + .0 + .await; Ok(()) } diff --git a/client/tests/integration/triggers/time_trigger.rs b/client/tests/integration/triggers/time_trigger.rs index f56901cf273..24945786a7f 100644 --- a/client/tests/integration/triggers/time_trigger.rs +++ b/client/tests/integration/triggers/time_trigger.rs @@ -199,7 +199,7 @@ fn pre_commit_trigger_should_be_executed() -> Result<()> { #[test] fn mint_nft_for_every_user_every_1_sec() -> Result<()> { - const TRIGGER_PERIOD_MS: u64 = 1000; + const TRIGGER_PERIOD: Duration = Duration::from_millis(1000); const EXPECTED_COUNT: u64 = 4; let (_rt, _peer, mut test_client) = ::new().with_port(10_780).start_with_runtime(); @@ -241,9 +241,10 @@ fn mint_nft_for_every_user_every_1_sec() -> Result<()> { let event_listener = get_block_committed_event_listener(&test_client)?; // Registering trigger - let start_time = curr_time(); - let schedule = TimeSchedule::starting_at(start_time + Duration::from_secs(5)) - .with_period(Duration::from_millis(TRIGGER_PERIOD_MS)); + // Offset into the future to be able to register trigger + let offset = Duration::from_secs(10); + let start_time = curr_time() + offset; + let schedule = TimeSchedule::starting_at(start_time).with_period(TRIGGER_PERIOD); let register_trigger = Register::trigger(Trigger::new( "mint_nft_for_all".parse()?, Action::new( @@ -253,14 +254,15 @@ fn mint_nft_for_every_user_every_1_sec() -> Result<()> { TimeEventFilter::new(ExecutionTime::Schedule(schedule)), ), )); - test_client.submit(register_trigger)?; + test_client.submit_blocking(register_trigger)?; + std::thread::sleep(offset); // Time trigger will be executed on block commits, so we have to produce some transactions submit_sample_isi_on_every_block_commit( event_listener, &mut test_client, &alice_id, - Duration::from_millis(TRIGGER_PERIOD_MS), + TRIGGER_PERIOD, usize::try_from(EXPECTED_COUNT)?, )?; @@ -294,7 +296,7 @@ fn mint_nft_for_every_user_every_1_sec() -> Result<()> { fn get_block_committed_event_listener( client: &Client, ) -> Result>> { - let block_filter = BlockEventFilter::default().for_status(BlockStatus::Committed); + let block_filter = BlockEventFilter::default().for_status(BlockStatus::Applied); client.listen_for_events([block_filter]) } diff --git a/core/test_network/src/lib.rs b/core/test_network/src/lib.rs index 0c7c21a8449..911d10ee45e 100644 --- a/core/test_network/src/lib.rs +++ b/core/test_network/src/lib.rs @@ -21,14 +21,13 @@ use iroha_primitives::{ addr::{socket_addr, SocketAddr}, unique_vec::UniqueVec, }; -use irohad::{Iroha, ToriiStarted}; +use irohad::Iroha; use rand::{seq::IteratorRandom, thread_rng}; use serde_json::json; use tempfile::TempDir; use test_samples::{ALICE_ID, ALICE_KEYPAIR, PEER_KEYPAIR, SAMPLE_GENESIS_ACCOUNT_KEYPAIR}; use tokio::{ runtime::{self, Runtime}, - task::{self, JoinHandle}, time, }; pub use unique_port; @@ -375,10 +374,8 @@ pub struct Peer { pub p2p_address: SocketAddr, /// The key-pair for the peer pub key_pair: KeyPair, - /// Shutdown handle - shutdown: Option>, /// Iroha server - pub irohad: Option>, + pub irohad: Option, /// Temporary directory // Note: last field to be dropped after Iroha (struct fields drops in FIFO RFC 1857) pub temp_dir: Option>, @@ -453,43 +450,28 @@ impl Peer { api_addr = %self.api_address, ); let logger = iroha_logger::test_logger(); - let (sender, receiver) = std::sync::mpsc::sync_channel(1); - - let handle = task::spawn( - async move { - let irohad = Iroha::start_network(config, genesis, logger) - .await - .expect("Failed to start Iroha"); - let (job_handle, irohad) = irohad.start_torii_as_task(); - sender.send(irohad).unwrap(); - job_handle.await.unwrap().unwrap(); - } - .instrument(info_span), - ); - self.irohad = Some(receiver.recv().unwrap()); + let (_, irohad) = Iroha::start_network(config, genesis, logger) + .instrument(info_span) + .await + .expect("Failed to start Iroha"); + + self.irohad = Some(irohad); time::sleep(Duration::from_millis(300)).await; - self.shutdown = Some(handle); // Prevent temporary directory deleting self.temp_dir = Some(temp_dir); } /// Stop the peer if it's running - pub fn stop(&mut self) -> Option<()> { + pub fn stop(&mut self) { iroha_logger::info!( p2p_addr = %self.p2p_address, api_addr = %self.api_address, "Stopping peer", ); - if let Some(shutdown) = self.shutdown.take() { - shutdown.abort(); - iroha_logger::info!("Shutting down peer..."); - self.irohad.take(); - Some(()) - } else { - None - } + iroha_logger::info!("Shutting down peer..."); + self.irohad.take(); } /// Creates peer @@ -504,13 +486,11 @@ impl Peer { let p2p_address = local_unique_port()?; let api_address = local_unique_port()?; let id = PeerId::new(p2p_address.clone(), key_pair.public_key().clone()); - let shutdown = None; Ok(Self { id, key_pair, p2p_address, api_address, - shutdown, irohad: None, temp_dir: None, })