Skip to content

Commit

Permalink
feat: Introduce time port in PoA service
Browse files Browse the repository at this point in the history
  • Loading branch information
netrome committed Aug 30, 2024
1 parent 9f96573 commit f6a08b4
Show file tree
Hide file tree
Showing 7 changed files with 326 additions and 26 deletions.
9 changes: 9 additions & 0 deletions crates/fuel-core/src/service/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use fuel_core_types::{
},
fuel_types::BlockHeight,
services::block_importer::SharedImportResult,
tai64::Tai64,
};
use fuel_core_upgradable_executor::executor::Executor;
use std::sync::Arc;
Expand Down Expand Up @@ -260,3 +261,11 @@ impl SharedMemoryPool {
}
}
}

pub struct SystemTime;

impl fuel_core_poa::ports::GetTime for SystemTime {
fn now(&self) -> Tai64 {
Tai64::now()
}
}
3 changes: 3 additions & 0 deletions crates/fuel-core/src/service/sub_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::{
MaybeRelayerAdapter,
PoAAdapter,
SharedMemoryPool,
SystemTime,
TxPoolAdapter,
VerifierAdapter,
},
Expand Down Expand Up @@ -71,6 +72,7 @@ pub type PoAService = fuel_core_poa::Service<
BlockImporterAdapter,
SignMode,
InDirectoryPredefinedBlocks,
SystemTime,
>;
#[cfg(feature = "p2p")]
pub type P2PService = fuel_core_p2p::service::Service<Database>;
Expand Down Expand Up @@ -259,6 +261,7 @@ pub fn init_sub_services(
p2p_adapter.clone(),
FuelBlockSigner::new(config.consensus_signer.clone()),
predefined_blocks,
SystemTime,
)
});
let poa_adapter = PoAAdapter::new(poa.as_ref().map(|service| service.shared.clone()));
Expand Down
4 changes: 4 additions & 0 deletions crates/services/consensus_module/poa/src/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,7 @@ impl PredefinedBlocks for InMemoryPredefinedBlocks {
Ok(self.blocks.get(height).cloned())
}
}

pub trait GetTime: Send + Sync {
fn now(&self) -> Tai64;
}
44 changes: 30 additions & 14 deletions crates/services/consensus_module/poa/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::{
BlockImporter,
BlockProducer,
BlockSigner,
GetTime,
P2pPort,
PredefinedBlocks,
TransactionPool,
Expand Down Expand Up @@ -71,7 +72,7 @@ use fuel_core_types::{
};
use serde::Serialize;

pub type Service<T, B, I, S, PB> = ServiceRunner<MainTask<T, B, I, S, PB>>;
pub type Service<T, B, I, S, PB, C> = ServiceRunner<MainTask<T, B, I, S, PB, C>>;

#[derive(Clone)]
pub struct SharedState {
Expand Down Expand Up @@ -126,7 +127,7 @@ pub(crate) enum RequestType {
Trigger,
}

pub struct MainTask<T, B, I, S, PB> {
pub struct MainTask<T, B, I, S, PB, C> {
signer: S,
block_producer: B,
block_importer: I,
Expand All @@ -139,15 +140,17 @@ pub struct MainTask<T, B, I, S, PB> {
last_block_created: Instant,
predefined_blocks: PB,
trigger: Trigger,
clock: C,
/// Deadline clock, used by the triggers
sync_task_handle: ServiceRunner<SyncTask>,
}

impl<T, B, I, S, PB> MainTask<T, B, I, S, PB>
impl<T, B, I, S, PB, C> MainTask<T, B, I, S, PB, C>
where
T: TransactionPool,
I: BlockImporter,
PB: PredefinedBlocks,
C: GetTime,
{
#[allow(clippy::too_many_arguments)]
pub fn new<P: P2pPort>(
Expand All @@ -159,11 +162,12 @@ where
p2p_port: P,
signer: S,
predefined_blocks: PB,
clock: C,
) -> Self {
let tx_status_update_stream = txpool.transaction_status_events();
let (request_sender, request_receiver) = mpsc::channel(1024);
let (last_height, last_timestamp, last_block_created) =
Self::extract_block_info(last_block);
Self::extract_block_info(&clock, last_block);

let block_stream = block_importer.block_stream();
let peer_connections_stream = p2p_port.reserved_peers_count();
Expand Down Expand Up @@ -199,13 +203,17 @@ where
predefined_blocks,
trigger,
sync_task_handle,
clock,
}
}

fn extract_block_info(last_block: &BlockHeader) -> (BlockHeight, Tai64, Instant) {
fn extract_block_info(
clock: &C,
last_block: &BlockHeader,
) -> (BlockHeight, Tai64, Instant) {
let last_timestamp = last_block.time();
let duration_since_last_block =
Duration::from_secs(Tai64::now().0.saturating_sub(last_timestamp.0));
Duration::from_secs(clock.now().0.saturating_sub(last_timestamp.0));
let last_block_created = Instant::now()
.checked_sub(duration_since_last_block)
.unwrap_or(Instant::now());
Expand All @@ -231,7 +239,7 @@ where
}
},
RequestType::Trigger => {
let now = Tai64::now();
let now = self.clock.now();
if now > self.last_timestamp {
Ok(now)
} else {
Expand All @@ -242,13 +250,14 @@ where
}
}

impl<T, B, I, S, PB> MainTask<T, B, I, S, PB>
impl<T, B, I, S, PB, C> MainTask<T, B, I, S, PB, C>
where
T: TransactionPool,
B: BlockProducer,
I: BlockImporter,
S: BlockSigner,
PB: PredefinedBlocks,
C: GetTime,
{
// Request the block producer to make a new block, and return it when ready
async fn signal_produce_block(
Expand Down Expand Up @@ -351,6 +360,9 @@ where
entity: block,
consensus: seal,
};

block.entity.header().time();

// Import the sealed block
self.block_importer
.commit_result(Uncommitted::new(
Expand Down Expand Up @@ -454,7 +466,7 @@ where
}
fn update_last_block_values(&mut self, block_header: &Arc<BlockHeader>) {
let (last_height, last_timestamp, last_block_created) =
Self::extract_block_info(block_header);
Self::extract_block_info(&self.clock, block_header);
if last_height > self.last_height {
self.last_height = last_height;
self.last_timestamp = last_timestamp;
Expand All @@ -470,14 +482,14 @@ struct PredefinedBlockWithSkippedTransactions {
}

#[async_trait::async_trait]
impl<T, B, I, S, PB> RunnableService for MainTask<T, B, I, S, PB>
impl<T, B, I, S, PB, C> RunnableService for MainTask<T, B, I, S, PB, C>
where
Self: RunnableTask,
{
const NAME: &'static str = "PoA";

type SharedData = SharedState;
type Task = MainTask<T, B, I, S, PB>;
type Task = MainTask<T, B, I, S, PB, C>;
type TaskParams = ();

fn shared_data(&self) -> Self::SharedData {
Expand Down Expand Up @@ -506,13 +518,14 @@ where
}

#[async_trait::async_trait]
impl<T, B, I, S, PB> RunnableTask for MainTask<T, B, I, S, PB>
impl<T, B, I, S, PB, C> RunnableTask for MainTask<T, B, I, S, PB, C>
where
T: TransactionPool,
B: BlockProducer,
I: BlockImporter,
S: BlockSigner,
PB: PredefinedBlocks,
C: GetTime,
{
async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
let should_continue;
Expand Down Expand Up @@ -611,7 +624,7 @@ where
}

#[allow(clippy::too_many_arguments)]
pub fn new_service<T, B, I, P, S, PB>(
pub fn new_service<T, B, I, P, S, PB, C>(
last_block: &BlockHeader,
config: Config,
txpool: T,
Expand All @@ -620,14 +633,16 @@ pub fn new_service<T, B, I, P, S, PB>(
p2p_port: P,
block_signer: S,
predefined_blocks: PB,
) -> Service<T, B, I, S, PB>
clock: C,
) -> Service<T, B, I, S, PB, C>
where
T: TransactionPool + 'static,
B: BlockProducer + 'static,
I: BlockImporter + 'static,
S: BlockSigner + 'static,
PB: PredefinedBlocks + 'static,
P: P2pPort,
C: GetTime,
{
Service::new(MainTask::new(
last_block,
Expand All @@ -638,6 +653,7 @@ where
p2p_port,
block_signer,
predefined_blocks,
clock,
))
}

Expand Down
35 changes: 32 additions & 3 deletions crates/services/consensus_module/poa/src/service_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
ports::{
BlockProducer,
BlockSigner,
GetTime,
InMemoryPredefinedBlocks,
MockBlockImporter,
MockBlockProducer,
Expand Down Expand Up @@ -54,7 +55,10 @@ use fuel_core_types::{
ExecutionResult,
UncommittedResult,
},
tai64::Tai64,
tai64::{
Tai64,
Tai64N,
},
};
use rand::{
prelude::StdRng,
Expand Down Expand Up @@ -82,13 +86,17 @@ use tokio::{
};

mod manually_produce_tests;
mod test_time;
mod trigger_tests;

use test_time::TestTime;

struct TestContextBuilder {
config: Option<Config>,
txpool: Option<MockTransactionPool>,
importer: Option<MockBlockImporter>,
producer: Option<MockBlockProducer>,
start_time: Option<Tai64N>,
}

fn generate_p2p_port() -> MockP2pPort {
Expand All @@ -108,6 +116,7 @@ impl TestContextBuilder {
txpool: None,
importer: None,
producer: None,
start_time: None,
}
}

Expand Down Expand Up @@ -168,18 +177,26 @@ impl TestContextBuilder {

let predefined_blocks = HashMap::new().into();

let time = match self.start_time {
Some(start_time) => TestTime::new(start_time),
None => TestTime::at_unix_epoch(),
};

let watch = time.watch();

let service = new_service(
&BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()),
&BlockHeader::new_block(BlockHeight::from(1u32), watch.now()),
config.clone(),
txpool,
producer,
importer,
p2p_port,
FakeBlockSigner { succeeds: true },
predefined_blocks,
watch,
);
service.start().unwrap();
TestContext { service }
TestContext { service, time }
}
}

Expand Down Expand Up @@ -211,7 +228,9 @@ struct TestContext {
MockBlockImporter,
FakeBlockSigner,
InMemoryPredefinedBlocks,
test_time::Watch,
>,
time: TestTime,
}

impl TestContext {
Expand Down Expand Up @@ -385,6 +404,8 @@ async fn remove_skipped_transactions() {

let predefined_blocks: InMemoryPredefinedBlocks = HashMap::new().into();

let time = TestTime::at_unix_epoch();

let mut task = MainTask::new(
&BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()),
config,
Expand All @@ -394,6 +415,7 @@ async fn remove_skipped_transactions() {
p2p_port,
FakeBlockSigner { succeeds: true },
predefined_blocks,
time.watch(),
);

assert!(task.produce_next_block().await.is_ok());
Expand Down Expand Up @@ -438,6 +460,8 @@ async fn does_not_produce_when_txpool_empty_in_instant_mode() {

let predefined_blocks: InMemoryPredefinedBlocks = HashMap::new().into();

let time = TestTime::at_unix_epoch();

let mut task = MainTask::new(
&BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()),
config,
Expand All @@ -447,6 +471,7 @@ async fn does_not_produce_when_txpool_empty_in_instant_mode() {
p2p_port,
FakeBlockSigner { succeeds: true },
predefined_blocks,
time.watch(),
);

// simulate some txpool event to see if any block production is erroneously triggered
Expand Down Expand Up @@ -555,6 +580,7 @@ async fn consensus_service__run__will_include_sequential_predefined_blocks_befor
let mut rng = StdRng::seed_from_u64(0);
let tx = make_tx(&mut rng);
let TxPoolContext { txpool, .. } = MockTransactionPool::new_with_txs(vec![tx]);
let time = TestTime::at_unix_epoch();
let task = MainTask::new(
&last_block,
config,
Expand All @@ -564,6 +590,7 @@ async fn consensus_service__run__will_include_sequential_predefined_blocks_befor
generate_p2p_port(),
FakeBlockSigner { succeeds: true },
InMemoryPredefinedBlocks::new(blocks_map),
time.watch(),
);

// when
Expand Down Expand Up @@ -617,6 +644,7 @@ async fn consensus_service__run__will_insert_predefined_blocks_in_correct_order(
let mut rng = StdRng::seed_from_u64(0);
let tx = make_tx(&mut rng);
let TxPoolContext { txpool, .. } = MockTransactionPool::new_with_txs(vec![tx]);
let time = TestTime::at_unix_epoch();
let task = MainTask::new(
&last_block,
config,
Expand All @@ -626,6 +654,7 @@ async fn consensus_service__run__will_insert_predefined_blocks_in_correct_order(
generate_p2p_port(),
FakeBlockSigner { succeeds: true },
InMemoryPredefinedBlocks::new(predefined_blocks_map),
time.watch(),
);

// when
Expand Down
Loading

0 comments on commit f6a08b4

Please sign in to comment.