Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
SWvheerden committed Sep 22, 2022
1 parent 0ed0a60 commit b9f1f5b
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl ChainMetadataService {
match event {
BlockEvent::ValidBlockAdded(_, BlockAddResult::Ok(_)) |
BlockEvent::ValidBlockAdded(_, BlockAddResult::ChainReorg { .. }) |
BlockEvent::BlockSyncComplete() => {
BlockEvent::BlockSyncComplete(_, _) => {
self.update_liveness_chain_metadata().await?;
},
_ => {},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub enum BlockEvent {
AddBlockErrored {
block: Arc<Block>,
},
BlockSyncComplete(),
BlockSyncComplete(Arc<ChainBlock>, u64),
BlockSyncRewind(Vec<Arc<ChainBlock>>),
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ impl BlockSync {
});

let local_nci = shared.local_node_interface.clone();
synchronizer.on_complete(move |_| {
local_nci.publish_block_event(BlockEvent::BlockSyncComplete());
synchronizer.on_complete(move |block, starting_height| {
local_nci.publish_block_event(BlockEvent::BlockSyncComplete(block, starting_height));
});

let timer = Instant::now();
Expand Down
4 changes: 2 additions & 2 deletions base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
}

pub fn on_complete<H>(&mut self, hook: H)
where H: Fn(Arc<ChainBlock>) + Send + Sync + 'static {
where H: Fn(Arc<ChainBlock>, u64) + Send + Sync + 'static {
self.hooks.add_on_complete_hook(hook);
}

Expand Down Expand Up @@ -377,7 +377,7 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
}

if let Some(block) = current_block {
self.hooks.call_on_complete_hooks(block);
self.hooks.call_on_complete_hooks(block, best_height);
}

debug!(target: LOG_TARGET, "Completed block sync with peer `{}`", sync_peer);
Expand Down
10 changes: 6 additions & 4 deletions base_layer/core/src/base_node/sync/hooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub(super) struct Hooks {
on_progress_header: Vec<Box<dyn Fn(u64, u64, &SyncPeer) + Send + Sync>>,
on_progress_block: Vec<Box<dyn Fn(Arc<ChainBlock>, u64, &SyncPeer) + Send + Sync>>,
on_progress_horizon_sync: Vec<Box<dyn Fn(HorizonSyncInfo) + Send + Sync>>,
on_complete: Vec<Box<dyn Fn(Arc<ChainBlock>) + Send + Sync>>,
on_complete: Vec<Box<dyn Fn(Arc<ChainBlock>, u64) + Send + Sync>>,
on_rewind: Vec<Box<dyn Fn(Vec<Arc<ChainBlock>>) + Send + Sync>>,
}

Expand Down Expand Up @@ -81,12 +81,14 @@ impl Hooks {
}

pub fn add_on_complete_hook<H>(&mut self, hook: H)
where H: Fn(Arc<ChainBlock>) + Send + Sync + 'static {
where H: Fn(Arc<ChainBlock>, u64) + Send + Sync + 'static {
self.on_complete.push(Box::new(hook));
}

pub fn call_on_complete_hooks(&self, final_block: Arc<ChainBlock>) {
self.on_complete.iter().for_each(|f| (*f)(final_block.clone()));
pub fn call_on_complete_hooks(&self, final_block: Arc<ChainBlock>, starting_height: u64) {
self.on_complete
.iter()
.for_each(|f| (*f)(final_block.clone(), starting_height));
}

pub fn add_on_rewind_hook<H>(&mut self, hook: H)
Expand Down
11 changes: 3 additions & 8 deletions base_layer/core/src/mempool/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,10 @@ impl Mempool {
.await
}

/// In the event of a Rewind for a block sync, all transactions to the orphan pool
pub async fn process_rewind(&self, removed_blocks: Vec<Arc<Block>>) -> Result<(), MempoolError> {
self.with_write_access(move |storage| storage.process_rewind(&removed_blocks))
.await
}

/// After a sync event, we can move all orphan transactions to the unconfirmed pool after validation
pub async fn process_sync(&self) -> Result<(), MempoolError> {
self.with_write_access(move |storage| storage.process_sync()).await
pub async fn process_sync(&self, blocks_added: u64) -> Result<(), MempoolError> {
self.with_write_access(move |storage| storage.process_sync(blocks_added))
.await
}

/// Returns all unconfirmed transaction stored in the Mempool, except the transactions stored in the ReOrgPool.
Expand Down
29 changes: 7 additions & 22 deletions base_layer/core/src/mempool/mempool_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,32 +202,17 @@ impl MempoolStorage {
Ok(())
}

/// In the event of a Rewind for a block sync. Move all transactions to the orphan pool
pub fn process_rewind(&mut self, removed_blocks: &[Arc<Block>]) -> Result<(), MempoolError> {
debug!(target: LOG_TARGET, "Mempool processing rewind");
let current_tip = removed_blocks
.first()
.map(|block| block.header.height)
.unwrap_or_default()
.checked_sub(1)
.unwrap_or_default();

// Clear out all transactions from the unconfirmed pool and save them in the reorg pool. We dont reinsert valid
// transactions here as we will need to revalidate them again after the sync was done and the mempool and
// blockchain does not yet know how the blocks will look.
let removed_txs = self.unconfirmed_pool.drain_all_mempool_transactions();
// lets save to the reorg pool
self.reorg_pool.insert_all(current_tip, removed_txs);
Ok(())
}

/// After a sync event, we need to try to add in all the transaction form the reorg pool.
pub fn process_sync(&mut self) -> Result<(), MempoolError> {
pub fn process_sync(&mut self, blocks_added: u64) -> Result<(), MempoolError> {
debug!(target: LOG_TARGET, "Mempool processing sync finished");
// lets retrieve all the transactions from the reorg pool and try to reinsert them.
let txs = self.reorg_pool.clear_and_retrieve_all();
// lets remove and revalidate all transactions from the mempool. All we know is that the state has changed, but
// we dont have the data to know what.
let txs = self.unconfirmed_pool.drain_all_mempool_transactions();
// lets add them all back into the mempool
self.insert_txs(txs);
//let retrieve all re-org pool transactions as well as make sure they are mined as well
let txs = self.reorg_pool.clear_and_retrieve_all();
self.insert_txs(txs);
Ok(())
}

Expand Down
11 changes: 4 additions & 7 deletions base_layer/core/src/mempool/service/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,10 @@ impl MempoolInboundHandlers {
.await?;
},
ValidBlockAdded(_, _) => {},
BlockSyncRewind(removed_blocks) => {
self.mempool
.process_rewind(removed_blocks.iter().map(|b| b.to_arc_block()).collect())
.await?;
},
BlockSyncComplete() => {
self.mempool.process_sync().await?;
BlockSyncRewind(_) => {},
BlockSyncComplete(tip_block, starting_sync_height) => {
let height_diff = tip_block.height() - starting_sync_height;
self.mempool.process_sync(height_diff).await?;
},
AddBlockValidationFailed {
block: failed_block,
Expand Down

0 comments on commit b9f1f5b

Please sign in to comment.