Skip to content

Commit

Permalink
feat: Introduce time port in PoA service (#2145)
Browse files Browse the repository at this point in the history
## Linked Issues/PRs
Closes #2091 

## Description
This PR introduces a `GetTime` port in the PoA service. The port is
implemented in a `SystemTime` adapter matching the current behavior of
the PoA service when running live, and in a `TestTime` implementation
allowing fine-grained control of how time elapses in tests.

Three additional tests have been added for previously untestable (but
correct) behavior:
-
`service_test::trigger_tests::interval_trigger_produces_blocks_in_the_future_when_time_is_lagging`
-
`service_test::trigger_tests::interval_trigger_produces_blocks_with_current_time_when_block_production_is_lagging`
-
`service_test::trigger_tests::interval_trigger_produces_blocks_in_the_future_when_time_rewinds`


## Checklist
- [x] Old behavior is reflected in tests

### Before requesting review
- [x] I have reviewed the code myself
  • Loading branch information
netrome authored Sep 6, 2024
1 parent 611bf7c commit d5ba082
Show file tree
Hide file tree
Showing 9 changed files with 352 additions and 26 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Added
- [2135](https://github.com/FuelLabs/fuel-core/pull/2135): Added metrics logging for number of blocks served over the p2p req/res protocol.

### Changed

#### Breaking
- [2145](https://github.com/FuelLabs/fuel-core/pull/2145): feat: Introduce time port in PoA service.
- [2155](https://github.com/FuelLabs/fuel-core/pull/2155): Added trait declaration for block committer data
- [2142](https://github.com/FuelLabs/fuel-core/pull/2142): Added benchmarks for varied forms of db lookups to assist in optimizations.
- [2158](https://github.com/FuelLabs/fuel-core/pull/2158): Log the public address of the signing key, if it is specified
Expand Down
9 changes: 9 additions & 0 deletions crates/fuel-core/src/service/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use fuel_core_types::{
},
fuel_types::BlockHeight,
services::block_importer::SharedImportResult,
tai64::Tai64,
};
use fuel_core_upgradable_executor::executor::Executor;
use std::sync::Arc;
Expand Down Expand Up @@ -260,3 +261,11 @@ impl SharedMemoryPool {
}
}
}

pub struct SystemTime;

impl fuel_core_poa::ports::GetTime for SystemTime {
fn now(&self) -> Tai64 {
Tai64::now()
}
}
3 changes: 3 additions & 0 deletions crates/fuel-core/src/service/sub_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::{
MaybeRelayerAdapter,
PoAAdapter,
SharedMemoryPool,
SystemTime,
TxPoolAdapter,
VerifierAdapter,
},
Expand Down Expand Up @@ -71,6 +72,7 @@ pub type PoAService = fuel_core_poa::Service<
BlockImporterAdapter,
SignMode,
InDirectoryPredefinedBlocks,
SystemTime,
>;
#[cfg(feature = "p2p")]
pub type P2PService = fuel_core_p2p::service::Service<Database>;
Expand Down Expand Up @@ -259,6 +261,7 @@ pub fn init_sub_services(
p2p_adapter.clone(),
FuelBlockSigner::new(config.consensus_signer.clone()),
predefined_blocks,
SystemTime,
)
});
let poa_adapter = PoAAdapter::new(poa.as_ref().map(|service| service.shared.clone()));
Expand Down
4 changes: 4 additions & 0 deletions crates/services/consensus_module/poa/src/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,7 @@ impl PredefinedBlocks for InMemoryPredefinedBlocks {
Ok(self.blocks.get(height).cloned())
}
}

pub trait GetTime: Send + Sync {
fn now(&self) -> Tai64;
}
44 changes: 30 additions & 14 deletions crates/services/consensus_module/poa/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::{
BlockImporter,
BlockProducer,
BlockSigner,
GetTime,
P2pPort,
PredefinedBlocks,
TransactionPool,
Expand Down Expand Up @@ -71,7 +72,7 @@ use fuel_core_types::{
};
use serde::Serialize;

pub type Service<T, B, I, S, PB> = ServiceRunner<MainTask<T, B, I, S, PB>>;
pub type Service<T, B, I, S, PB, C> = ServiceRunner<MainTask<T, B, I, S, PB, C>>;

#[derive(Clone)]
pub struct SharedState {
Expand Down Expand Up @@ -126,7 +127,7 @@ pub(crate) enum RequestType {
Trigger,
}

pub struct MainTask<T, B, I, S, PB> {
pub struct MainTask<T, B, I, S, PB, C> {
signer: S,
block_producer: B,
block_importer: I,
Expand All @@ -139,15 +140,17 @@ pub struct MainTask<T, B, I, S, PB> {
last_block_created: Instant,
predefined_blocks: PB,
trigger: Trigger,
clock: C,
/// Deadline clock, used by the triggers
sync_task_handle: ServiceRunner<SyncTask>,
}

impl<T, B, I, S, PB> MainTask<T, B, I, S, PB>
impl<T, B, I, S, PB, C> MainTask<T, B, I, S, PB, C>
where
T: TransactionPool,
I: BlockImporter,
PB: PredefinedBlocks,
C: GetTime,
{
#[allow(clippy::too_many_arguments)]
pub fn new<P: P2pPort>(
Expand All @@ -159,11 +162,12 @@ where
p2p_port: P,
signer: S,
predefined_blocks: PB,
clock: C,
) -> Self {
let tx_status_update_stream = txpool.transaction_status_events();
let (request_sender, request_receiver) = mpsc::channel(1024);
let (last_height, last_timestamp, last_block_created) =
Self::extract_block_info(last_block);
Self::extract_block_info(clock.now(), last_block);

let block_stream = block_importer.block_stream();
let peer_connections_stream = p2p_port.reserved_peers_count();
Expand Down Expand Up @@ -199,13 +203,17 @@ where
predefined_blocks,
trigger,
sync_task_handle,
clock,
}
}

fn extract_block_info(last_block: &BlockHeader) -> (BlockHeight, Tai64, Instant) {
fn extract_block_info(
now: Tai64,
last_block: &BlockHeader,
) -> (BlockHeight, Tai64, Instant) {
let last_timestamp = last_block.time();
let duration_since_last_block =
Duration::from_secs(Tai64::now().0.saturating_sub(last_timestamp.0));
Duration::from_secs(now.0.saturating_sub(last_timestamp.0));
let last_block_created = Instant::now()
.checked_sub(duration_since_last_block)
.unwrap_or(Instant::now());
Expand All @@ -231,7 +239,7 @@ where
}
},
RequestType::Trigger => {
let now = Tai64::now();
let now = self.clock.now();
if now > self.last_timestamp {
Ok(now)
} else {
Expand All @@ -242,13 +250,14 @@ where
}
}

impl<T, B, I, S, PB> MainTask<T, B, I, S, PB>
impl<T, B, I, S, PB, C> MainTask<T, B, I, S, PB, C>
where
T: TransactionPool,
B: BlockProducer,
I: BlockImporter,
S: BlockSigner,
PB: PredefinedBlocks,
C: GetTime,
{
// Request the block producer to make a new block, and return it when ready
async fn signal_produce_block(
Expand Down Expand Up @@ -351,6 +360,9 @@ where
entity: block,
consensus: seal,
};

block.entity.header().time();

// Import the sealed block
self.block_importer
.commit_result(Uncommitted::new(
Expand Down Expand Up @@ -454,7 +466,7 @@ where
}
fn update_last_block_values(&mut self, block_header: &Arc<BlockHeader>) {
let (last_height, last_timestamp, last_block_created) =
Self::extract_block_info(block_header);
Self::extract_block_info(self.clock.now(), block_header);
if last_height > self.last_height {
self.last_height = last_height;
self.last_timestamp = last_timestamp;
Expand All @@ -470,14 +482,14 @@ struct PredefinedBlockWithSkippedTransactions {
}

#[async_trait::async_trait]
impl<T, B, I, S, PB> RunnableService for MainTask<T, B, I, S, PB>
impl<T, B, I, S, PB, C> RunnableService for MainTask<T, B, I, S, PB, C>
where
Self: RunnableTask,
{
const NAME: &'static str = "PoA";

type SharedData = SharedState;
type Task = MainTask<T, B, I, S, PB>;
type Task = MainTask<T, B, I, S, PB, C>;
type TaskParams = ();

fn shared_data(&self) -> Self::SharedData {
Expand Down Expand Up @@ -506,13 +518,14 @@ where
}

#[async_trait::async_trait]
impl<T, B, I, S, PB> RunnableTask for MainTask<T, B, I, S, PB>
impl<T, B, I, S, PB, C> RunnableTask for MainTask<T, B, I, S, PB, C>
where
T: TransactionPool,
B: BlockProducer,
I: BlockImporter,
S: BlockSigner,
PB: PredefinedBlocks,
C: GetTime,
{
async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
let should_continue;
Expand Down Expand Up @@ -611,7 +624,7 @@ where
}

#[allow(clippy::too_many_arguments)]
pub fn new_service<T, B, I, P, S, PB>(
pub fn new_service<T, B, I, P, S, PB, C>(
last_block: &BlockHeader,
config: Config,
txpool: T,
Expand All @@ -620,14 +633,16 @@ pub fn new_service<T, B, I, P, S, PB>(
p2p_port: P,
block_signer: S,
predefined_blocks: PB,
) -> Service<T, B, I, S, PB>
clock: C,
) -> Service<T, B, I, S, PB, C>
where
T: TransactionPool + 'static,
B: BlockProducer + 'static,
I: BlockImporter + 'static,
S: BlockSigner + 'static,
PB: PredefinedBlocks + 'static,
P: P2pPort,
C: GetTime,
{
Service::new(MainTask::new(
last_block,
Expand All @@ -638,6 +653,7 @@ where
p2p_port,
block_signer,
predefined_blocks,
clock,
))
}

Expand Down
Loading

0 comments on commit d5ba082

Please sign in to comment.