diff --git a/crates/fuel-core/src/service/adapters.rs b/crates/fuel-core/src/service/adapters.rs
index 2caa2edfb68..e56320e75c5 100644
--- a/crates/fuel-core/src/service/adapters.rs
+++ b/crates/fuel-core/src/service/adapters.rs
@@ -17,6 +17,7 @@ use fuel_core_types::{
     },
     fuel_types::BlockHeight,
     services::block_importer::SharedImportResult,
+    tai64::Tai64,
 };
 use fuel_core_upgradable_executor::executor::Executor;
 use std::sync::Arc;
@@ -260,3 +261,11 @@ impl SharedMemoryPool {
         }
     }
 }
+
+pub struct SystemTime;
+
+impl fuel_core_poa::ports::GetTime for SystemTime {
+    fn now(&self) -> Tai64 {
+        Tai64::now()
+    }
+}
diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs
index de3d6f5ee89..5cc519a79cc 100644
--- a/crates/fuel-core/src/service/sub_services.rs
+++ b/crates/fuel-core/src/service/sub_services.rs
@@ -29,6 +29,7 @@ use crate::{
         MaybeRelayerAdapter,
         PoAAdapter,
         SharedMemoryPool,
+        SystemTime,
         TxPoolAdapter,
         VerifierAdapter,
     },
@@ -71,6 +72,7 @@ pub type PoAService = fuel_core_poa::Service<
     BlockImporterAdapter,
     SignMode,
     InDirectoryPredefinedBlocks,
+    SystemTime,
 >;
 #[cfg(feature = "p2p")]
 pub type P2PService = fuel_core_p2p::service::Service;
@@ -259,6 +261,7 @@ pub fn init_sub_services(
             p2p_adapter.clone(),
             FuelBlockSigner::new(config.consensus_signer.clone()),
             predefined_blocks,
+            SystemTime,
         )
     });
     let poa_adapter = PoAAdapter::new(poa.as_ref().map(|service| service.shared.clone()));
diff --git a/crates/services/consensus_module/poa/src/ports.rs b/crates/services/consensus_module/poa/src/ports.rs
index 0bc76d5947f..2148bcfb44b 100644
--- a/crates/services/consensus_module/poa/src/ports.rs
+++ b/crates/services/consensus_module/poa/src/ports.rs
@@ -148,3 +148,7 @@ impl PredefinedBlocks for InMemoryPredefinedBlocks {
         Ok(self.blocks.get(height).cloned())
     }
 }
+
+pub trait GetTime: Send + Sync {
+    fn now(&self) -> Tai64;
+}
diff --git a/crates/services/consensus_module/poa/src/service.rs b/crates/services/consensus_module/poa/src/service.rs
index 968359c25f7..827be8f9998 100644
--- a/crates/services/consensus_module/poa/src/service.rs
+++ b/crates/services/consensus_module/poa/src/service.rs
@@ -23,6 +23,7 @@ use crate::{
         BlockImporter,
         BlockProducer,
         BlockSigner,
+        GetTime,
         P2pPort,
         PredefinedBlocks,
         TransactionPool,
@@ -71,7 +72,7 @@ use fuel_core_types::{
 };
 use serde::Serialize;
 
-pub type Service<T, B, I, S, PB> = ServiceRunner<MainTask<T, B, I, S, PB>>;
+pub type Service<T, B, I, S, PB, C> = ServiceRunner<MainTask<T, B, I, S, PB, C>>;
 
 #[derive(Clone)]
 pub struct SharedState {
@@ -126,7 +127,7 @@ pub(crate) enum RequestType {
     Trigger,
 }
 
-pub struct MainTask<T, B, I, S, PB> {
+pub struct MainTask<T, B, I, S, PB, C> {
     signer: S,
     block_producer: B,
     block_importer: I,
@@ -139,15 +140,17 @@ pub struct MainTask {
     last_block_created: Instant,
     predefined_blocks: PB,
     trigger: Trigger,
+    clock: C,
     /// Deadline clock, used by the triggers
     sync_task_handle: ServiceRunner<SyncTask>,
 }
 
-impl<T, B, I, S, PB> MainTask<T, B, I, S, PB>
+impl<T, B, I, S, PB, C> MainTask<T, B, I, S, PB, C>
 where
     T: TransactionPool,
     I: BlockImporter,
     PB: PredefinedBlocks,
+    C: GetTime,
 {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
@@ -159,11 +162,12 @@ where
         p2p_port: P,
         signer: S,
         predefined_blocks: PB,
+        clock: C,
     ) -> Self {
         let tx_status_update_stream = txpool.transaction_status_events();
         let (request_sender, request_receiver) = mpsc::channel(1024);
         let (last_height, last_timestamp, last_block_created) =
-            Self::extract_block_info(last_block);
+            Self::extract_block_info(&clock, last_block);
         let block_stream = block_importer.block_stream();
         let peer_connections_stream = p2p_port.reserved_peers_count();
@@ -199,13 +203,17 @@ where
             predefined_blocks,
             trigger,
             sync_task_handle,
+            clock,
         }
     }
 
-    fn extract_block_info(last_block: &BlockHeader) -> (BlockHeight, Tai64, Instant) {
+    fn extract_block_info(
+        clock: &C,
+        last_block: &BlockHeader,
+    ) -> (BlockHeight, Tai64, Instant) {
         let last_timestamp = last_block.time();
         let duration_since_last_block =
-            Duration::from_secs(Tai64::now().0.saturating_sub(last_timestamp.0));
+            Duration::from_secs(clock.now().0.saturating_sub(last_timestamp.0));
         let last_block_created = Instant::now()
             .checked_sub(duration_since_last_block)
             .unwrap_or(Instant::now());
@@ -231,7 +239,7 @@ where
                 }
             },
             RequestType::Trigger => {
-                let now = Tai64::now();
+                let now = self.clock.now();
                 if now > self.last_timestamp {
                     Ok(now)
                 } else {
@@ -242,13 +250,14 @@ where
     }
 }
 
-impl<T, B, I, S, PB> MainTask<T, B, I, S, PB>
+impl<T, B, I, S, PB, C> MainTask<T, B, I, S, PB, C>
 where
     T: TransactionPool,
     B: BlockProducer,
     I: BlockImporter,
     S: BlockSigner,
     PB: PredefinedBlocks,
+    C: GetTime,
 {
     // Request the block producer to make a new block, and return it when ready
     async fn signal_produce_block(
@@ -351,6 +360,9 @@ where
             entity: block,
             consensus: seal,
         };
+
+        block.entity.header().time();
+
         // Import the sealed block
         self.block_importer
             .commit_result(Uncommitted::new(
@@ -454,7 +466,7 @@ where
     }
     fn update_last_block_values(&mut self, block_header: &Arc<BlockHeader>) {
         let (last_height, last_timestamp, last_block_created) =
-            Self::extract_block_info(block_header);
+            Self::extract_block_info(&self.clock, block_header);
         if last_height > self.last_height {
             self.last_height = last_height;
             self.last_timestamp = last_timestamp;
@@ -470,14 +482,14 @@ struct PredefinedBlockWithSkippedTransactions {
 }
 
 #[async_trait::async_trait]
-impl<T, B, I, S, PB> RunnableService for MainTask<T, B, I, S, PB>
+impl<T, B, I, S, PB, C> RunnableService for MainTask<T, B, I, S, PB, C>
 where
     Self: RunnableTask,
 {
     const NAME: &'static str = "PoA";
 
     type SharedData = SharedState;
-    type Task = MainTask<T, B, I, S, PB>;
+    type Task = MainTask<T, B, I, S, PB, C>;
     type TaskParams = ();
 
     fn shared_data(&self) -> Self::SharedData {
@@ -506,13 +518,14 @@ where
 }
 
 #[async_trait::async_trait]
-impl<T, B, I, S, PB> RunnableTask for MainTask<T, B, I, S, PB>
+impl<T, B, I, S, PB, C> RunnableTask for MainTask<T, B, I, S, PB, C>
 where
     T: TransactionPool,
     B: BlockProducer,
     I: BlockImporter,
     S: BlockSigner,
     PB: PredefinedBlocks,
+    C: GetTime,
 {
     async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
         let should_continue;
@@ -611,7 +624,7 @@ where
 }
 
 #[allow(clippy::too_many_arguments)]
-pub fn new_service<T, B, I, P, S, PB>(
+pub fn new_service<T, B, I, P, S, PB, C>(
     last_block: &BlockHeader,
     config: Config,
     txpool: T,
@@ -620,7 +633,8 @@ pub fn new_service(
     p2p_port: P,
     block_signer: S,
     predefined_blocks: PB,
-) -> Service<T, B, I, S, PB>
+    clock: C,
+) -> Service<T, B, I, S, PB, C>
 where
     T: TransactionPool + 'static,
     B: BlockProducer + 'static,
@@ -628,6 +642,7 @@ where
     S: BlockSigner + 'static,
     PB: PredefinedBlocks + 'static,
     P: P2pPort,
+    C: GetTime,
 {
     Service::new(MainTask::new(
         last_block,
@@ -638,6 +653,7 @@ where
         p2p_port,
         block_signer,
         predefined_blocks,
+        clock,
     ))
 }
diff --git a/crates/services/consensus_module/poa/src/service_test.rs b/crates/services/consensus_module/poa/src/service_test.rs
index 9d1b87c008e..7c0567a3de3 100644
--- a/crates/services/consensus_module/poa/src/service_test.rs
+++ b/crates/services/consensus_module/poa/src/service_test.rs
@@ -6,6 +6,7 @@ use crate::{
     ports::{
         BlockProducer,
         BlockSigner,
+        GetTime,
         InMemoryPredefinedBlocks,
         MockBlockImporter,
         MockBlockProducer,
@@ -54,7 +55,10 @@ use fuel_core_types::{
         ExecutionResult,
         UncommittedResult,
     },
-    tai64::Tai64,
+    tai64::{
+        Tai64,
+        Tai64N,
+    },
 };
 use rand::{
     prelude::StdRng,
@@ -82,13 +86,17 @@ use tokio::{
 };
 
 mod manually_produce_tests;
+mod test_time;
 mod trigger_tests;
 
+use test_time::TestTime;
+
 struct TestContextBuilder {
     config: Option<Config>,
     txpool: Option<MockTransactionPool>,
     importer: Option<MockBlockImporter>,
     producer: Option<MockBlockProducer>,
+    start_time: Option<Tai64N>,
 }
 
 fn generate_p2p_port() -> MockP2pPort {
@@ -108,6 +116,7 @@ impl TestContextBuilder {
             txpool: None,
             importer: None,
             producer: None,
+            start_time: None,
         }
     }
 
@@ -168,8 +177,15 @@ impl TestContextBuilder {
 
         let predefined_blocks = HashMap::new().into();
 
+        let time = match self.start_time {
+            Some(start_time) => TestTime::new(start_time),
+            None => TestTime::at_unix_epoch(),
+        };
+
+        let watch = time.watch();
+
         let service = new_service(
-            &BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()),
+            &BlockHeader::new_block(BlockHeight::from(1u32), watch.now()),
             config.clone(),
             txpool,
             producer,
@@ -177,9 +193,10 @@ impl TestContextBuilder {
             p2p_port,
             FakeBlockSigner { succeeds: true },
             predefined_blocks,
+            watch,
         );
         service.start().unwrap();
-        TestContext { service }
+        TestContext { service, time }
     }
 }
 
@@ -211,7 +228,9 @@ struct TestContext {
         MockBlockImporter,
         FakeBlockSigner,
         InMemoryPredefinedBlocks,
+        test_time::Watch,
     >,
+    time: TestTime,
 }
 
 impl TestContext {
@@ -385,6 +404,8 @@ async fn remove_skipped_transactions() {
 
     let predefined_blocks: InMemoryPredefinedBlocks = HashMap::new().into();
 
+    let time = TestTime::at_unix_epoch();
+
     let mut task = MainTask::new(
         &BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()),
         config,
@@ -394,6 +415,7 @@ async fn remove_skipped_transactions() {
         p2p_port,
         FakeBlockSigner { succeeds: true },
         predefined_blocks,
+        time.watch(),
     );
 
     assert!(task.produce_next_block().await.is_ok());
@@ -438,6 +460,8 @@ async fn does_not_produce_when_txpool_empty_in_instant_mode() {
 
     let predefined_blocks: InMemoryPredefinedBlocks = HashMap::new().into();
 
+    let time = TestTime::at_unix_epoch();
+
     let mut task = MainTask::new(
         &BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()),
         config,
@@ -447,6 +471,7 @@ async fn does_not_produce_when_txpool_empty_in_instant_mode() {
         p2p_port,
         FakeBlockSigner { succeeds: true },
         predefined_blocks,
+        time.watch(),
     );
 
     // simulate some txpool event to see if any block production is erroneously triggered
@@ -555,6 +580,7 @@ async fn consensus_service__run__will_include_sequential_predefined_blocks_befor
     let mut rng = StdRng::seed_from_u64(0);
     let tx = make_tx(&mut rng);
     let TxPoolContext { txpool, .. } = MockTransactionPool::new_with_txs(vec![tx]);
+    let time = TestTime::at_unix_epoch();
     let task = MainTask::new(
         &last_block,
         config,
@@ -564,6 +590,7 @@ async fn consensus_service__run__will_include_sequential_predefined_blocks_befor
         generate_p2p_port(),
         FakeBlockSigner { succeeds: true },
         InMemoryPredefinedBlocks::new(blocks_map),
+        time.watch(),
     );
 
     // when
@@ -617,6 +644,7 @@ async fn consensus_service__run__will_insert_predefined_blocks_in_correct_order(
     let mut rng = StdRng::seed_from_u64(0);
     let tx = make_tx(&mut rng);
     let TxPoolContext { txpool, .. } = MockTransactionPool::new_with_txs(vec![tx]);
+    let time = TestTime::at_unix_epoch();
     let task = MainTask::new(
         &last_block,
         config,
@@ -626,6 +654,7 @@ async fn consensus_service__run__will_insert_predefined_blocks_in_correct_order(
         generate_p2p_port(),
         FakeBlockSigner { succeeds: true },
         InMemoryPredefinedBlocks::new(predefined_blocks_map),
+        time.watch(),
     );
 
     // when
diff --git a/crates/services/consensus_module/poa/src/service_test/test_time.rs b/crates/services/consensus_module/poa/src/service_test/test_time.rs
new file mode 100644
index 00000000000..173c5934615
--- /dev/null
+++ b/crates/services/consensus_module/poa/src/service_test/test_time.rs
@@ -0,0 +1,81 @@
+use std::time::Duration;
+
+use fuel_core_types::tai64::{
+    Tai64,
+    Tai64N,
+};
+use tokio::{
+    sync::watch,
+    time::Instant,
+};
+
+use crate::ports::GetTime;
+
+pub struct TestTime {
+    time: watch::Sender<Tai64N>,
+    last_observed_tokio_time: Instant,
+}
+
+impl TestTime {
+    pub fn at_unix_epoch() -> Self {
+        Self::new(Tai64N::UNIX_EPOCH)
+    }
+
+    pub fn new(start_time: Tai64N) -> Self {
+        tokio::time::pause();
+        let time = watch::Sender::new(start_time);
+        let last_observed_tokio_time = Instant::now();
+
+        Self {
+            time,
+            last_observed_tokio_time,
+        }
+    }
+
+    /// Advances time by the same duration that Tokio's time has been advanced.
+    ///
+    /// This function is particularly useful in scenarios where `tokio::time::advance` has been
+    /// manually invoked, or when Tokio has automatically advanced the time.
+    /// For more information on auto-advancement, see the
+    /// [Tokio documentation](https://docs.rs/tokio/1.39.3/tokio/time/fn.advance.html#auto-advance).
+    pub fn advance_with_tokio(&mut self) {
+        let current_tokio_time = Instant::now();
+
+        let tokio_advance =
+            current_tokio_time.duration_since(self.last_observed_tokio_time);
+        self.last_observed_tokio_time = current_tokio_time;
+
+        self.advance(tokio_advance);
+    }
+
+    pub fn advance(&mut self, duration: Duration) {
+        self.time
+            .send_modify(|timestamp| *timestamp = *timestamp + duration);
+    }
+
+    pub fn rewind(&mut self, duration: Duration) {
+        self.time
+            .send_modify(|timestamp| *timestamp = *timestamp - duration);
+    }
+
+    pub fn watch(&self) -> Watch {
+        let time = self.time.subscribe();
+        Watch { time }
+    }
+}
+
+impl Drop for TestTime {
+    fn drop(&mut self) {
+        tokio::time::resume();
+    }
+}
+
+pub struct Watch {
+    time: watch::Receiver<Tai64N>,
+}
+
+impl GetTime for Watch {
+    fn now(&self) -> Tai64 {
+        self.time.borrow().0
+    }
+}
diff --git a/crates/services/consensus_module/poa/src/service_test/trigger_tests.rs b/crates/services/consensus_module/poa/src/service_test/trigger_tests.rs
index 54c92fd184c..4825624da63 100644
--- a/crates/services/consensus_module/poa/src/service_test/trigger_tests.rs
+++ b/crates/services/consensus_module/poa/src/service_test/trigger_tests.rs
@@ -6,7 +6,7 @@ use tokio::{
 
 use super::*;
 
-#[tokio::test(start_paused = true)] // Run with time paused, start/stop must still work
+#[tokio::test]
 async fn clean_startup_shutdown_each_trigger() -> anyhow::Result<()> {
     for trigger in [
         Trigger::Never,
@@ -30,7 +30,7 @@ async fn clean_startup_shutdown_each_trigger() -> anyhow::Result<()> {
     Ok(())
 }
 
-#[tokio::test(start_paused = true)]
+#[tokio::test]
 async fn never_trigger_never_produces_blocks() {
     const TX_COUNT: usize = 10;
     let mut rng = StdRng::seed_from_u64(1234u64);
@@ -104,7 +104,27 @@ impl DefaultContext {
         importer
             .expect_block_stream()
             .returning(|| Box::pin(tokio_stream::pending()));
+
+        let mut block_producer = MockBlockProducer::default();
+        block_producer
+            .expect_produce_and_execute_block()
+            .returning(|_, time, _| {
+                let mut block = Block::default();
+                block.header_mut().set_time(time);
+                block.header_mut().recalculate_metadata();
+                Ok(UncommittedResult::new(
+                    ExecutionResult {
+                        block,
+                        skipped_transactions: Default::default(),
+                        tx_status: Default::default(),
+                        events: Default::default(),
+                    },
+                    Default::default(),
+                ))
+            });
+
         ctx_builder.with_importer(importer);
+        ctx_builder.with_producer(block_producer);
         let test_ctx = ctx_builder.build();
@@ -118,7 +138,7 @@ impl DefaultContext {
     }
 }
 
-#[tokio::test(start_paused = true)]
+#[tokio::test]
 async fn instant_trigger_produces_block_instantly() {
     let mut ctx = DefaultContext::new(Config {
         trigger: Trigger::Instant,
@@ -135,7 +155,7 @@ async fn instant_trigger_produces_block_instantly() {
     assert_eq!(ctx.test_ctx.stop().await, State::Stopped);
 }
 
-#[tokio::test(start_paused = true)]
+#[tokio::test]
 async fn interval_trigger_produces_blocks_periodically() -> anyhow::Result<()> {
     let mut ctx = DefaultContext::new(Config {
         trigger: Trigger::Interval {
@@ -153,7 +173,7 @@ async fn interval_trigger_produces_blocks_periodically() -> anyhow::Result<()> {
         Err(broadcast::error::TryRecvError::Empty)
     ));
 
-    // Pass time until a single block is produced, and a bit more
+    // Pause time until a single block is produced, and a bit more
     time::sleep(Duration::new(3, 0)).await;
 
     // Make sure the empty block is actually produced
@@ -167,7 +187,7 @@ async fn interval_trigger_produces_blocks_periodically() -> anyhow::Result<()> {
         Err(broadcast::error::TryRecvError::Empty)
     ));
 
-    // Pass time until a the next block is produced
+    // Pause time until the next block is produced
     time::sleep(Duration::new(2, 0)).await;
 
     // Make sure it's produced
@@ -184,7 +204,7 @@ async fn interval_trigger_produces_blocks_periodically() -> anyhow::Result<()> {
         Err(broadcast::error::TryRecvError::Empty)
     ));
 
-    // Pass time until a the next block is produced
+    // Pause time until the next block is produced
     time::sleep(Duration::new(2, 0)).await;
 
     // Make sure only one block is produced
@@ -200,7 +220,7 @@ async fn interval_trigger_produces_blocks_periodically() -> anyhow::Result<()> {
     Ok(())
 }
 
-#[tokio::test(start_paused = true)]
+#[tokio::test]
 async fn service__if_commit_result_fails_then_retry_commit_result_after_one_second(
 ) -> anyhow::Result<()> {
     // given
@@ -256,7 +276,7 @@ async fn service__if_commit_result_fails_then_retry_commit_result_after_one_seco
     Ok(())
 }
 
-#[tokio::test(start_paused = true)]
+#[tokio::test]
 async fn interval_trigger_doesnt_react_to_full_txpool() -> anyhow::Result<()> {
     let mut ctx = DefaultContext::new(Config {
         trigger: Trigger::Interval {
@@ -299,3 +319,141 @@ async fn interval_trigger_doesnt_react_to_full_txpool() -> anyhow::Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn interval_trigger_produces_blocks_in_the_future_when_time_is_lagging() {
+    // Given
+
+    let block_time = Duration::from_secs(10);
+    let offset = Duration::from_secs(1);
+    let mut ctx = DefaultContext::new(Config {
+        trigger: Trigger::Interval { block_time },
+        signer: SignMode::Key(test_signing_key()),
+        metrics: false,
+        ..Default::default()
+    });
+    ctx.status_sender.send_replace(Some(TxId::zeroed()));
+    let start_time = ctx.test_ctx.time.watch().now();
+
+    // When
+
+    // We produce three blocks without advancing real time.
+    time::sleep(block_time * 3 + offset).await;
+    let first_block_time = ctx.block_import.try_recv().unwrap().entity.header().time();
+    let second_block_time = ctx.block_import.try_recv().unwrap().entity.header().time();
+    let third_block_time = ctx.block_import.try_recv().unwrap().entity.header().time();
+
+    // Then
+
+    // We should only have produced the three blocks.
+    assert!(matches!(
+        ctx.block_import.try_recv(),
+        Err(broadcast::error::TryRecvError::Empty)
+    ));
+
+    // Even though real time is frozen, the blocks should advance into the future.
+    assert_eq!(first_block_time, start_time + block_time.as_secs() * 1);
+    assert_eq!(second_block_time, start_time + block_time.as_secs() * 2);
+    assert_eq!(third_block_time, start_time + block_time.as_secs() * 3);
+
+    ctx.test_ctx.service.stop_and_await().await.unwrap();
+}
+
+#[tokio::test]
+async fn interval_trigger_produces_blocks_with_current_time_when_block_production_is_lagging(
+) {
+    // Given
+
+    let block_time = Duration::from_secs(10);
+    let second_block_delay = Duration::from_secs(5);
+    let offset = Duration::from_secs(1);
+    let mut ctx = DefaultContext::new(Config {
+        trigger: Trigger::Interval { block_time },
+        signer: SignMode::Key(test_signing_key()),
+        metrics: false,
+        ..Default::default()
+    });
+    ctx.status_sender.send_replace(Some(TxId::zeroed()));
+    let start_time = ctx.test_ctx.time.watch().now();
+
+    // When
+
+    // We produce the first block in real time.
+    time::sleep(block_time + offset).await;
+    ctx.test_ctx.time.advance_with_tokio();
+    let first_block_time = ctx.block_import.try_recv().unwrap().entity.header().time();
+
+    // But we produce the second block with a delay relative to real time.
+    ctx.test_ctx.time.advance(block_time + second_block_delay);
+    time::sleep(block_time).await;
+    let second_block_time = ctx.block_import.try_recv().unwrap().entity.header().time();
+
+    // And the third block is produced without advancing real time.
+    time::sleep(block_time).await;
+    let third_block_time = ctx.block_import.try_recv().unwrap().entity.header().time();
+
+    // Then
+
+    // We should only have produced the three blocks.
+    assert!(matches!(
+        ctx.block_import.try_recv(),
+        Err(broadcast::error::TryRecvError::Empty)
+    ));
+
+    // The first block should be produced after the given block time.
+    assert_eq!(first_block_time, start_time + block_time.as_secs());
+
+    // The second block should have a delay in its timestamp.
+    assert_eq!(
+        second_block_time,
+        first_block_time + block_time.as_secs() + second_block_delay.as_secs()
+    );
+
+    // The third block should be produced `block_time` in the future relative to the second block time.
+    assert_eq!(third_block_time, second_block_time + block_time.as_secs());
+
+    ctx.test_ctx.service.stop_and_await().await.unwrap();
+}
+
+#[tokio::test]
+async fn interval_trigger_produces_blocks_in_the_future_when_time_rewinds() {
+    // Given
+
+    let block_time = Duration::from_secs(10);
+    let offset = Duration::from_secs(1);
+    let mut ctx = DefaultContext::new(Config {
+        trigger: Trigger::Interval { block_time },
+        signer: SignMode::Key(test_signing_key()),
+        metrics: false,
+        ..Default::default()
+    });
+    ctx.status_sender.send_replace(Some(TxId::zeroed()));
+    let start_time = ctx.test_ctx.time.watch().now();
+
+    // When
+
+    // We produce the first block in real time.
+    time::sleep(block_time + offset).await;
+    ctx.test_ctx.time.advance_with_tokio();
+    let first_block_time = ctx.block_import.try_recv().unwrap().entity.header().time();
+
+    // And we rewind time before attempting to produce the next block.
+    ctx.test_ctx.time.rewind(block_time);
+    time::sleep(block_time).await;
+    let second_block_time = ctx.block_import.try_recv().unwrap().entity.header().time();
+
+    // Then
+
+    // We should only have produced two blocks.
+    assert!(matches!(
+        ctx.block_import.try_recv(),
+        Err(broadcast::error::TryRecvError::Empty)
+    ));
+
+    // The first block should be produced after the given block time.
+    assert_eq!(first_block_time, start_time + block_time.as_secs());
+
+    // Even though the real-time clock was rewound, the second block is produced with a future timestamp,
+    // similar to how it works when time is lagging.
+    assert_eq!(second_block_time, start_time + block_time.as_secs() * 2);
+}
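Note for reviewers (not part of the diff): the new `GetTime` port only asks for `fn now(&self) -> Tai64` on a `Send + Sync` type, so consumers other than the node's `SystemTime` adapter or the test-only `test_time::Watch` can inject their own clock into `new_service`. A minimal sketch under that assumption — the `FixedClock` name and its usage are illustrative, not something this change adds:

```rust
use fuel_core_poa::ports::GetTime;
use fuel_core_types::tai64::Tai64;

/// Hypothetical clock that always reports the same TAI64 timestamp.
/// Handy for deterministic setups that do not need `TestTime`'s
/// paused-Tokio behaviour.
pub struct FixedClock(pub Tai64);

impl GetTime for FixedClock {
    fn now(&self) -> Tai64 {
        // `Tai64` is `Copy`, so returning the stored value is enough.
        self.0
    }
}
```

Such a clock would be handed to `fuel_core_poa::new_service` as the trailing `clock` argument, in the same position where `init_sub_services` now passes `SystemTime` and the trigger tests pass `time.watch()`.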