Skip to content

Commit

Permalink
added a period notify functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
undercover-cactus committed Nov 21, 2024
1 parent 77e28d2 commit 120b859
Show file tree
Hide file tree
Showing 9 changed files with 220 additions and 5 deletions.
16 changes: 16 additions & 0 deletions client/blockchain-service/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,14 @@ pub struct SpStopStoringInsolventUser {
}
impl EventBusMessage for SpStopStoringInsolventUser {}

/// Notify period event.
///
/// This event is emitted when a X amount of block has passed. It is configured at the start of the service.
#[derive(Debug, Clone)]
pub struct NotifyPeriod {}

impl EventBusMessage for NotifyPeriod {}

/// The event bus provider for the BlockchainService actor.
///
/// It holds the event buses for the different events that the BlockchainService actor
Expand All @@ -268,6 +276,7 @@ pub struct BlockchainServiceEventBusProvider {
user_without_funds_event_bus: EventBus<UserWithoutFunds>,
sp_stop_storing_insolvent_user_event_bus: EventBus<SpStopStoringInsolventUser>,
finalised_msp_stopped_storing_bucket_event_bus: EventBus<FinalisedMspStoppedStoringBucket>,
notify_period_event_bus: EventBus<NotifyPeriod>,
}

impl BlockchainServiceEventBusProvider {
Expand All @@ -288,6 +297,7 @@ impl BlockchainServiceEventBusProvider {
user_without_funds_event_bus: EventBus::new(),
sp_stop_storing_insolvent_user_event_bus: EventBus::new(),
finalised_msp_stopped_storing_bucket_event_bus: EventBus::new(),
notify_period_event_bus: EventBus::new(),
}
}
}
Expand Down Expand Up @@ -383,3 +393,9 @@ impl ProvidesEventBus<FinalisedMspStoppedStoringBucket> for BlockchainServiceEve
&self.finalised_msp_stopped_storing_bucket_event_bus
}
}

impl ProvidesEventBus<NotifyPeriod> for BlockchainServiceEventBusProvider {
fn event_bus(&self) -> &EventBus<NotifyPeriod> {
&self.notify_period_event_bus
}
}
17 changes: 16 additions & 1 deletion client/blockchain-service/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use crate::{
events::{
AcceptedBspVolunteer, BlockchainServiceEventBusProvider,
FinalisedTrieRemoveMutationsApplied, LastChargeableInfoUpdated, NewStorageRequest,
SlashableProvider, SpStopStoringInsolventUser, UserWithoutFunds,
NotifyPeriod, SlashableProvider, SpStopStoringInsolventUser, UserWithoutFunds,
},
state::{
BlockchainServiceStateStore, LastProcessedBlockNumberCf,
Expand Down Expand Up @@ -109,6 +109,8 @@ pub struct BlockchainService {
/// various edge cases when restarting the node, all originating from the "dynamic" way of
/// computing the next challenges tick. This case is handled separately.
pub(crate) pending_submit_proof_requests: BTreeSet<SubmitProofRequest>,
/// Notify period value to know when to trigger the NotifyPeriod event.
notify_period: Option<u32>,
}

/// Event loop for the BlockchainService actor.
Expand Down Expand Up @@ -931,6 +933,7 @@ impl BlockchainService {
rpc_handlers: Arc<RpcHandlers>,
keystore: KeystorePtr,
rocksdb_root_path: impl Into<PathBuf>,
notify_period: Option<u32>,
) -> Self {
Self {
client,
Expand All @@ -945,6 +948,7 @@ impl BlockchainService {
last_block_processed: Zero::zero(),
persistent_state: BlockchainServiceStateStore::new(rocksdb_root_path.into()),
pending_submit_proof_requests: BTreeSet::new(),
notify_period,
}
}

Expand Down Expand Up @@ -1064,6 +1068,9 @@ impl BlockchainService {
// Process pending requests that update the forest root.
self.check_pending_forest_root_writes();

// Check that trigger an event every X amount of blocks (specified in config).
self.check_for_notify(&block_number);

let state_store_context = self.persistent_state.open_rw_context_with_overlay();
// Get events from storage.
match get_events_at_block(&self.client, block_hash) {
Expand Down Expand Up @@ -1298,4 +1305,12 @@ impl BlockchainService {
}
}
}

fn check_for_notify(&self, block_number: &BlockNumber) {
if let Some(np) = self.notify_period {
if block_number % np == 0 {
self.emit(NotifyPeriod {});
}
}
}
}
10 changes: 8 additions & 2 deletions client/blockchain-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,19 @@ pub async fn spawn_blockchain_service(
rpc_handlers: Arc<RpcHandlers>,
keystore: KeystorePtr,
rocksdb_root_path: impl Into<PathBuf>,
notify_period: Option<u32>,
) -> ActorHandle<BlockchainService> {
let task_spawner = task_spawner
.with_name("blockchain-service")
.with_group("network");

let blockchain_service =
BlockchainService::new(client, rpc_handlers, keystore, rocksdb_root_path);
let blockchain_service = BlockchainService::new(
client,
rpc_handlers,
keystore,
rocksdb_root_path,
notify_period,
);

task_spawner.spawn_actor(blockchain_service)
}
7 changes: 7 additions & 0 deletions node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ pub struct ProviderConfigurations {
/// Extrinsic retry timeout in seconds.
#[clap(long, default_value = "60")]
pub extrinsic_retry_timeout: u64,

/// MSP charging fees frequency.
#[clap(long, required_if_eq_any([
("provider_type", "msp"),
]))]
pub msp_charging_freq: Option<u32>,
}

impl ProviderConfigurations {
Expand All @@ -128,6 +134,7 @@ impl ProviderConfigurations {
max_storage_capacity: self.max_storage_capacity,
jump_capacity: self.jump_capacity,
extrinsic_retry_timeout: self.extrinsic_retry_timeout,
msp_charging_freq: self.msp_charging_freq,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions node/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub struct ProviderOptions {
pub jump_capacity: Option<StorageDataUnit>,
/// Extrinsic retry timeout in seconds.
pub extrinsic_retry_timeout: u64,
/// MSP charging fees frequency.
pub msp_charging_freq: Option<u32>,
}

fn load_spec(id: &str) -> std::result::Result<Box<dyn ChainSpec>, String> {
Expand Down
6 changes: 4 additions & 2 deletions node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,12 @@ where
max_storage_capacity,
jump_capacity,
extrinsic_retry_timeout,
msp_charging_freq,
..
}) => {
info!(
"Starting as a Storage Provider. Storage path: {:?}, Max storage capacity: {:?}, Jump capacity: {:?}",
storage_path, max_storage_capacity, jump_capacity
"Starting as a Storage Provider. Storage path: {:?}, Max storage capacity: {:?}, Jump capacity: {:?}, MSP charging frequency: {:?}",
storage_path, max_storage_capacity, jump_capacity, msp_charging_freq,
);

// Start building the StorageHubHandler, if running as a provider.
Expand All @@ -244,6 +245,7 @@ where
*max_storage_capacity,
*jump_capacity,
*extrinsic_retry_timeout,
*msp_charging_freq,
);

let rpc_config = storage_hub_builder.create_rpc_config(keystore);
Expand Down
37 changes: 37 additions & 0 deletions node/src/services/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ where
max_storage_capacity: Option<StorageDataUnit>,
jump_capacity: Option<StorageDataUnit>,
extrinsic_retry_timeout: u64,
notify_period: Option<u32>,
}

/// Common components to build for any given configuration of [`RoleSupport`] and [`StorageLayerSupport`].
Expand All @@ -125,6 +126,7 @@ where
max_storage_capacity: None,
jump_capacity: None,
extrinsic_retry_timeout: DEFAULT_EXTRINSIC_RETRY_TIMEOUT_SECONDS,
notify_period: None,
}
}

Expand Down Expand Up @@ -166,6 +168,18 @@ where
self
}

// TODO: add a function ´with_notify_period´ to be called in the service.
// but check that ´blockchain´ is None (so we don't have called with_blockchain before)
pub fn with_notify_period(&mut self, notify_period: u32) -> &mut Self {
if self.blockchain.is_some() {
panic!(
"`with_notify_period`should never be called after starting the blockchain service."
);
}
self.notify_period = Some(notify_period);
self
}

pub async fn with_blockchain(
&mut self,
client: Arc<ParachainClient>,
Expand All @@ -181,6 +195,7 @@ where
rpc_handlers.clone(),
keystore.clone(),
rocksdb_root_path,
self.notify_period,
)
.await;

Expand Down Expand Up @@ -410,6 +425,7 @@ pub trait RequiredStorageProviderSetup {
max_storage_capacity: Option<StorageDataUnit>,
jump_capacity: Option<StorageDataUnit>,
extrinsic_retry_timeout: u64,
msp_charging_freq: Option<u32>,
);
}

Expand All @@ -424,11 +440,16 @@ where
max_storage_capacity: Option<StorageDataUnit>,
jump_capacity: Option<StorageDataUnit>,
extrinsic_retry_timeout: u64,
msp_charging_freq: Option<u32>,
) {
self.setup_storage_layer(storage_path);
if max_storage_capacity.is_none() {
panic!("Max storage capacity not set");
}
if let Some(notify_period) = msp_charging_freq {
self.with_notify_period(notify_period.clone());

Check warning on line 450 in node/src/services/builder.rs

View workflow job for this annotation

GitHub Actions / Check lint with clippy

using `clone` on type `u32` which implements the `Copy` trait
}

self.with_max_storage_capacity(max_storage_capacity);
self.with_jump_capacity(jump_capacity);
self.with_retry_timeout(extrinsic_retry_timeout);
Expand All @@ -446,6 +467,7 @@ where
max_storage_capacity: Option<StorageDataUnit>,
jump_capacity: Option<StorageDataUnit>,
extrinsic_retry_timeout: u64,
msp_charging_freq: Option<u32>,
) {
if storage_path.is_none() {
panic!("Storage path not set");
Expand All @@ -454,6 +476,9 @@ where
if max_storage_capacity.is_none() {
panic!("Max storage capacity not set");
}
if let Some(notify_period) = msp_charging_freq {
self.with_notify_period(notify_period.clone());

Check warning on line 480 in node/src/services/builder.rs

View workflow job for this annotation

GitHub Actions / Check lint with clippy

using `clone` on type `u32` which implements the `Copy` trait
}
self.with_max_storage_capacity(max_storage_capacity);
self.with_jump_capacity(jump_capacity);
self.with_retry_timeout(extrinsic_retry_timeout);
Expand All @@ -471,11 +496,15 @@ where
max_storage_capacity: Option<StorageDataUnit>,
jump_capacity: Option<StorageDataUnit>,
extrinsic_retry_timeout: u64,
msp_charging_freq: Option<u32>,
) {
self.setup_storage_layer(storage_path);
if max_storage_capacity.is_none() {
panic!("Max storage capacity not set");
}
if let Some(notify_period) = msp_charging_freq {
self.with_notify_period(notify_period.clone());

Check warning on line 506 in node/src/services/builder.rs

View workflow job for this annotation

GitHub Actions / Check lint with clippy

using `clone` on type `u32` which implements the `Copy` trait
}
self.with_max_storage_capacity(max_storage_capacity);
self.with_jump_capacity(jump_capacity);
self.with_retry_timeout(extrinsic_retry_timeout);
Expand All @@ -493,6 +522,7 @@ where
max_storage_capacity: Option<StorageDataUnit>,
jump_capacity: Option<StorageDataUnit>,
extrinsic_retry_timeout: u64,
msp_charging_freq: Option<u32>,
) {
if storage_path.is_none() {
panic!("Storage path not set");
Expand All @@ -501,6 +531,9 @@ where
if max_storage_capacity.is_none() {
panic!("Max storage capacity not set");
}
if let Some(notify_period) = msp_charging_freq {
self.with_notify_period(notify_period.clone());

Check warning on line 535 in node/src/services/builder.rs

View workflow job for this annotation

GitHub Actions / Check lint with clippy

using `clone` on type `u32` which implements the `Copy` trait
}
self.with_max_storage_capacity(max_storage_capacity);
self.with_jump_capacity(jump_capacity);
self.with_retry_timeout(extrinsic_retry_timeout);
Expand All @@ -518,7 +551,11 @@ where
_max_storage_capacity: Option<StorageDataUnit>,
_jump_capacity: Option<StorageDataUnit>,
extrinsic_retry_timeout: u64,
msp_charging_freq: Option<u32>,
) {
if let Some(notify_period) = msp_charging_freq {
self.with_notify_period(notify_period.clone());

Check warning on line 557 in node/src/services/builder.rs

View workflow job for this annotation

GitHub Actions / Check lint with clippy

using `clone` on type `u32` which implements the `Copy` trait
}
self.setup_storage_layer(None);
self.with_retry_timeout(extrinsic_retry_timeout);
}
Expand Down
1 change: 1 addition & 0 deletions node/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod bsp_submit_proof;
pub mod bsp_upload_file;
pub mod mock_bsp_volunteer;
pub mod mock_sp_react_to_event;
pub mod msp_charge_fees;
pub mod msp_delete_bucket;
pub mod msp_upload_file;
pub mod sp_slash_provider;
Expand Down
Loading

0 comments on commit 120b859

Please sign in to comment.