diff --git a/base_layer/core/src/base_node/chain_metadata_service/service.rs b/base_layer/core/src/base_node/chain_metadata_service/service.rs index c38cc7f18b..a4a19c8db2 100644 --- a/base_layer/core/src/base_node/chain_metadata_service/service.rs +++ b/base_layer/core/src/base_node/chain_metadata_service/service.rs @@ -136,7 +136,7 @@ impl ChainMetadataService { match event { BlockEvent::ValidBlockAdded(_, BlockAddResult::Ok(_)) | BlockEvent::ValidBlockAdded(_, BlockAddResult::ChainReorg { .. }) | - BlockEvent::BlockSyncComplete() => { + BlockEvent::BlockSyncComplete(_, _) => { self.update_liveness_chain_metadata().await?; }, _ => {}, diff --git a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs index 6354c2a38e..96d6262f69 100644 --- a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs +++ b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs @@ -70,7 +70,7 @@ pub enum BlockEvent { AddBlockErrored { block: Arc, }, - BlockSyncComplete(), + BlockSyncComplete(Arc, u64), BlockSyncRewind(Vec>), } diff --git a/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs b/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs index 498977717d..0e3dd9fc09 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs @@ -93,8 +93,8 @@ impl BlockSync { }); let local_nci = shared.local_node_interface.clone(); - synchronizer.on_complete(move |_| { - local_nci.publish_block_event(BlockEvent::BlockSyncComplete()); + synchronizer.on_complete(move |block, starting_height| { + local_nci.publish_block_event(BlockEvent::BlockSyncComplete(block, starting_height)); }); let timer = Instant::now(); diff --git a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs index 319fd172b7..917f4cac59 100644 --- a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs @@ -92,7 +92,7 @@ impl BlockSynchronizer { } pub fn on_complete(&mut self, hook: H) - where H: Fn(Arc) + Send + Sync + 'static { + where H: Fn(Arc, u64) + Send + Sync + 'static { self.hooks.add_on_complete_hook(hook); } @@ -377,7 +377,7 @@ impl BlockSynchronizer { } if let Some(block) = current_block { - self.hooks.call_on_complete_hooks(block); + self.hooks.call_on_complete_hooks(block, best_height); } debug!(target: LOG_TARGET, "Completed block sync with peer `{}`", sync_peer); diff --git a/base_layer/core/src/base_node/sync/hooks.rs b/base_layer/core/src/base_node/sync/hooks.rs index 9ddd2cd7ed..6e7f00a1c7 100644 --- a/base_layer/core/src/base_node/sync/hooks.rs +++ b/base_layer/core/src/base_node/sync/hooks.rs @@ -35,7 +35,7 @@ pub(super) struct Hooks { on_progress_header: Vec>, on_progress_block: Vec, u64, &SyncPeer) + Send + Sync>>, on_progress_horizon_sync: Vec>, - on_complete: Vec) + Send + Sync>>, + on_complete: Vec, u64) + Send + Sync>>, on_rewind: Vec>) + Send + Sync>>, } @@ -81,12 +81,14 @@ impl Hooks { } pub fn add_on_complete_hook(&mut self, hook: H) - where H: Fn(Arc) + Send + Sync + 'static { + where H: Fn(Arc, u64) + Send + Sync + 'static { self.on_complete.push(Box::new(hook)); } - pub fn call_on_complete_hooks(&self, final_block: Arc) { - self.on_complete.iter().for_each(|f| (*f)(final_block.clone())); + pub fn call_on_complete_hooks(&self, final_block: Arc, starting_height: u64) { + self.on_complete + .iter() + .for_each(|f| (*f)(final_block.clone(), starting_height)); } pub fn add_on_rewind_hook(&mut self, hook: H) diff --git a/base_layer/core/src/mempool/mempool.rs b/base_layer/core/src/mempool/mempool.rs index b79a806d49..49a2951544 100644 --- a/base_layer/core/src/mempool/mempool.rs +++ b/base_layer/core/src/mempool/mempool.rs @@ -101,15 +101,10 @@ impl Mempool { .await } - /// In the event of a Rewind for a block sync, all transactions to the orphan pool - pub async fn process_rewind(&self, removed_blocks: Vec>) -> Result<(), MempoolError> { - self.with_write_access(move |storage| storage.process_rewind(&removed_blocks)) - .await - } - /// After a sync event, we can move all orphan transactions to the unconfirmed pool after validation - pub async fn process_sync(&self) -> Result<(), MempoolError> { - self.with_write_access(move |storage| storage.process_sync()).await + pub async fn process_sync(&self, blocks_added: u64) -> Result<(), MempoolError> { + self.with_write_access(move |storage| storage.process_sync(blocks_added)) + .await } /// Returns all unconfirmed transaction stored in the Mempool, except the transactions stored in the ReOrgPool. diff --git a/base_layer/core/src/mempool/mempool_storage.rs b/base_layer/core/src/mempool/mempool_storage.rs index 7d242b6891..a5bb0f6b76 100644 --- a/base_layer/core/src/mempool/mempool_storage.rs +++ b/base_layer/core/src/mempool/mempool_storage.rs @@ -202,32 +202,17 @@ impl MempoolStorage { Ok(()) } - /// In the event of a Rewind for a block sync. Move all transactions to the orphan pool - pub fn process_rewind(&mut self, removed_blocks: &[Arc]) -> Result<(), MempoolError> { - debug!(target: LOG_TARGET, "Mempool processing rewind"); - let current_tip = removed_blocks - .first() - .map(|block| block.header.height) - .unwrap_or_default() - .checked_sub(1) - .unwrap_or_default(); - - // Clear out all transactions from the unconfirmed pool and save them in the reorg pool. We dont reinsert valid - // transactions here as we will need to revalidate them again after the sync was done and the mempool and - // blockchain does not yet know how the blocks will look. - let removed_txs = self.unconfirmed_pool.drain_all_mempool_transactions(); - // lets save to the reorg pool - self.reorg_pool.insert_all(current_tip, removed_txs); - Ok(()) - } - /// After a sync event, we need to try to add in all the transaction form the reorg pool. - pub fn process_sync(&mut self) -> Result<(), MempoolError> { + pub fn process_sync(&mut self, blocks_added: u64) -> Result<(), MempoolError> { debug!(target: LOG_TARGET, "Mempool processing sync finished"); - // lets retrieve all the transactions from the reorg pool and try to reinsert them. - let txs = self.reorg_pool.clear_and_retrieve_all(); + // lets remove and revalidate all transactions from the mempool. All we know is that the state has changed, but + // we dont have the data to know what. + let txs = self.unconfirmed_pool.drain_all_mempool_transactions(); // lets add them all back into the mempool self.insert_txs(txs); + //let retrieve all re-org pool transactions as well as make sure they are mined as well + let txs = self.reorg_pool.clear_and_retrieve_all(); + self.insert_txs(txs); Ok(()) } diff --git a/base_layer/core/src/mempool/service/inbound_handlers.rs b/base_layer/core/src/mempool/service/inbound_handlers.rs index 1248776f15..29a1ced323 100644 --- a/base_layer/core/src/mempool/service/inbound_handlers.rs +++ b/base_layer/core/src/mempool/service/inbound_handlers.rs @@ -173,13 +173,10 @@ impl MempoolInboundHandlers { .await?; }, ValidBlockAdded(_, _) => {}, - BlockSyncRewind(removed_blocks) => { - self.mempool - .process_rewind(removed_blocks.iter().map(|b| b.to_arc_block()).collect()) - .await?; - }, - BlockSyncComplete() => { - self.mempool.process_sync().await?; + BlockSyncRewind(_) => {}, + BlockSyncComplete(tip_block, starting_sync_height) => { + let height_diff = tip_block.height() - starting_sync_height; + self.mempool.process_sync(height_diff).await?; }, AddBlockValidationFailed { block: failed_block,