From 6c6a08bffdc2f10091c5c7ea854a44a60753a740 Mon Sep 17 00:00:00 2001 From: EthanYuan Date: Wed, 6 Dec 2023 19:57:47 +0800 Subject: [PATCH] add try_loop_sync_fast. --- util/indexer-r/src/indexer/insert.rs | 2 +- util/indexer-r/src/indexer/mod.rs | 14 +++--- util/indexer-sync/src/lib.rs | 65 ++++++++++++++++++++++++++-- 3 files changed, 72 insertions(+), 9 deletions(-) diff --git a/util/indexer-r/src/indexer/insert.rs b/util/indexer-r/src/indexer/insert.rs index 6ebef8bae5b..fc3b48c6622 100644 --- a/util/indexer-r/src/indexer/insert.rs +++ b/util/indexer-r/src/indexer/insert.rs @@ -81,7 +81,7 @@ pub(crate) async fn bulk_insert_uncle_blocks( tx: &mut Transaction<'_, Any>, ) -> Result<(), Error> { let uncle_blocks = block_views - .into_iter() + .iter() .flat_map(|block_view| { block_view.uncles().into_iter().map(|uncle| { let uncle_block_header = uncle.header(); diff --git a/util/indexer-r/src/indexer/mod.rs b/util/indexer-r/src/indexer/mod.rs index 408f8e7bebd..baa1b83ce0a 100644 --- a/util/indexer-r/src/indexer/mod.rs +++ b/util/indexer-r/src/indexer/mod.rs @@ -81,6 +81,12 @@ impl IndexerSync for IndexerR { self.async_runtime.block_on(future) } + /// Appends new blocks to the indexer + fn append_bulk(&self, block: &[BlockView]) -> Result<(), Error> { + let future = self.async_indexer_r.append_bulk(block); + self.async_runtime.block_on(future) + } + /// Rollback the indexer to a previous state fn rollback(&self) -> Result<(), Error> { let future = self.async_indexer_r.rollback(); @@ -137,9 +143,8 @@ impl AsyncIndexerR { .await .map_err(|err| Error::DB(err.to_string()))?; if self.custom_filters.is_block_filter_match(block) { - append_blocks(&vec![block.clone()], &mut tx).await?; - self.insert_transactions(&vec![block.clone()], &mut tx) - .await?; + append_blocks(&[block.clone()], &mut tx).await?; + self.insert_transactions(&[block.clone()], &mut tx).await?; } else { let block_headers = vec![(block.hash().raw_data().to_vec(), block.number() as i64)]; bulk_insert_blocks_simple(&block_headers, &mut tx).await?; @@ -147,7 +152,6 @@ impl AsyncIndexerR { tx.commit().await.map_err(|err| Error::DB(err.to_string())) } - #[cfg(test)] pub(crate) async fn append_bulk(&self, blocks: &[BlockView]) -> Result<(), Error> { let mut tx = self .store @@ -216,7 +220,7 @@ impl AsyncIndexerR { tx: &mut Transaction<'_, Any>, ) -> Result<(), Error> { let tx_views = block_views - .into_iter() + .iter() .flat_map(|block_view| { let block_hash = block_view.hash().raw_data().to_vec(); block_view diff --git a/util/indexer-sync/src/lib.rs b/util/indexer-sync/src/lib.rs index 16b6fed5705..09952651476 100644 --- a/util/indexer-sync/src/lib.rs +++ b/util/indexer-sync/src/lib.rs @@ -39,6 +39,13 @@ pub trait IndexerSync { fn tip(&self) -> Result, Error>; /// Appends a new block to the indexer fn append(&self, block: &BlockView) -> Result<(), Error>; + /// Appends new blocks to the indexer + fn append_bulk(&self, blocks: &[BlockView]) -> Result<(), Error> { + for block in blocks { + self.append(block)?; + } + Ok(()) + } /// Rollback the indexer to a previous state fn rollback(&self) -> Result<(), Error>; /// Get indexer identity @@ -132,6 +139,51 @@ impl IndexerSyncService { } } + // Bulk insert blocks without the need to verify the parent of new block. + fn try_loop_sync_fast(&self, indexer: I) + where + I: IndexerSync + Clone + Send + 'static, + { + const BULK_SIZE: u64 = 10; + if let Err(e) = self.secondary_db.try_catch_up_with_primary() { + error!("secondary_db try_catch_up_with_primary error {}", e); + } + let chain_tip = self.get_tip().expect("get chain tip should be OK"); + let indexer_tip = { + if let Some((tip_number, _)) = indexer.tip().expect("get tip should be OK") { + tip_number + } else { + let block = self + .get_block_by_number(0) + .expect("get genesis block should be OK"); + indexer.append(&block).expect("append block should be OK"); + 0 + } + }; + // assume that long fork will not happen >= 100 blocks. + let target: u64 = chain_tip.0.saturating_sub(100); + for start in (indexer_tip + 1..=target).step_by(BULK_SIZE as usize) { + let end = (start + BULK_SIZE - 1).min(target); + let blocks: Vec = (start..=end) + .map(|number| { + self.get_block_by_number(number) + .expect("get block should be OK") + }) + .collect(); + indexer + .append_bulk(&blocks) + .expect("append blocks should be OK"); + blocks.iter().for_each(|block| { + info!( + "{} append {}, {}", + indexer.get_identity(), + block.number(), + block.hash() + ); + }); + } + } + /// Processes that handle block cell and expect to be spawned to run in tokio runtime pub fn spawn_poll( &self, @@ -144,9 +196,10 @@ impl IndexerSyncService { // Initial sync let initial_service = self.clone(); let indexer = indexer_service.clone(); - let initial_syncing = self - .async_handle - .spawn_blocking(move || initial_service.try_loop_sync(indexer)); + let initial_syncing = self.async_handle.spawn_blocking(move || { + initial_service.try_loop_sync_fast(indexer.clone()); + initial_service.try_loop_sync(indexer) + }); // Follow-up sync let stop: CancellationToken = new_tokio_exit_rx(); @@ -195,6 +248,12 @@ impl IndexerSyncService { let block_hash = self.secondary_db.get_block_hash(block_number)?; self.secondary_db.get_block(&block_hash) } + + fn get_tip(&self) -> Option<(BlockNumber, Byte32)> { + self.secondary_db + .get_tip_header() + .map(|h| (h.number(), h.hash())) + } } fn indexer_secondary_options(config: &IndexerSyncConfig) -> Options {