add try_loop_sync_fast.
EthanYuan committed Dec 6, 2023
1 parent 5b97729 commit 6c6a08b
Showing 3 changed files with 72 additions and 9 deletions.
util/indexer-r/src/indexer/insert.rs (2 changes: 1 addition & 1 deletion)
@@ -81,7 +81,7 @@ pub(crate) async fn bulk_insert_uncle_blocks(
     tx: &mut Transaction<'_, Any>,
 ) -> Result<(), Error> {
     let uncle_blocks = block_views
-        .into_iter()
+        .iter()
         .flat_map(|block_view| {
             block_view.uncles().into_iter().map(|uncle| {
                 let uncle_block_header = uncle.header();
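Note on the `.into_iter()` -> `.iter()` change above (repeated in mod.rs below): a minimal sketch, not taken from this repository, showing that for a slice argument `&[T]` both calls yield `&T`, so behavior is unchanged and `.iter()` simply states "iterate by reference" explicitly.

fn sum_by_ref(values: &[u64]) -> u64 {
    // `values.into_iter()` would also compile and yield `&u64`
    // (via `IntoIterator for &[T]`), but the intent is less obvious.
    values.iter().copied().sum()
}

fn main() {
    assert_eq!(sum_by_ref(&[1, 2, 3]), 6);
}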
util/indexer-r/src/indexer/mod.rs (14 changes: 9 additions & 5 deletions)
@@ -81,6 +81,12 @@ impl IndexerSync for IndexerR {
         self.async_runtime.block_on(future)
     }
 
+    /// Appends new blocks to the indexer
+    fn append_bulk(&self, block: &[BlockView]) -> Result<(), Error> {
+        let future = self.async_indexer_r.append_bulk(block);
+        self.async_runtime.block_on(future)
+    }
+
     /// Rollback the indexer to a previous state
     fn rollback(&self) -> Result<(), Error> {
         let future = self.async_indexer_r.rollback();
@@ -137,17 +143,15 @@ impl AsyncIndexerR {
             .await
             .map_err(|err| Error::DB(err.to_string()))?;
         if self.custom_filters.is_block_filter_match(block) {
-            append_blocks(&vec![block.clone()], &mut tx).await?;
-            self.insert_transactions(&vec![block.clone()], &mut tx)
-                .await?;
+            append_blocks(&[block.clone()], &mut tx).await?;
+            self.insert_transactions(&[block.clone()], &mut tx).await?;
         } else {
             let block_headers = vec![(block.hash().raw_data().to_vec(), block.number() as i64)];
             bulk_insert_blocks_simple(&block_headers, &mut tx).await?;
         }
         tx.commit().await.map_err(|err| Error::DB(err.to_string()))
     }
 
-    #[cfg(test)]
     pub(crate) async fn append_bulk(&self, blocks: &[BlockView]) -> Result<(), Error> {
         let mut tx = self
             .store
@@ -216,7 +220,7 @@ impl AsyncIndexerR {
         tx: &mut Transaction<'_, Any>,
     ) -> Result<(), Error> {
         let tx_views = block_views
-            .into_iter()
+            .iter()
             .flat_map(|block_view| {
                 let block_hash = block_view.hash().raw_data().to_vec();
                 block_view
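Note on the `&vec![block.clone()]` -> `&[block.clone()]` change above: a minimal sketch, not taken from this repository, showing that a borrowed array literal coerces to `&[T]` directly, so no temporary Vec is allocated just to borrow it as a slice.

fn count_items(items: &[String]) -> usize {
    items.len()
}

fn main() {
    let block = String::from("block"); // stand-in for a BlockView
    let with_vec = count_items(&vec![block.clone()]); // heap-allocates a temporary Vec
    let with_slice = count_items(&[block.clone()]); // stack array, no Vec needed
    assert_eq!(with_vec, with_slice);
}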
util/indexer-sync/src/lib.rs (65 changes: 62 additions & 3 deletions)
@@ -39,6 +39,13 @@ pub trait IndexerSync {
     fn tip(&self) -> Result<Option<(BlockNumber, Byte32)>, Error>;
     /// Appends a new block to the indexer
     fn append(&self, block: &BlockView) -> Result<(), Error>;
+    /// Appends new blocks to the indexer
+    fn append_bulk(&self, blocks: &[BlockView]) -> Result<(), Error> {
+        for block in blocks {
+            self.append(block)?;
+        }
+        Ok(())
+    }
     /// Rollback the indexer to a previous state
     fn rollback(&self) -> Result<(), Error>;
     /// Get indexer identity
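Note on the default `append_bulk` above: a minimal, self-contained sketch with a hypothetical `Sink` trait (not the real IndexerSync), showing the pattern being introduced: the trait method gets a default body that loops over `append`, and a concrete indexer (as IndexerR does in mod.rs above) may override it with a genuine bulk path.

trait Sink {
    fn append(&mut self, item: u32);
    // Default: bulk append degrades to the one-at-a-time path.
    fn append_bulk(&mut self, items: &[u32]) {
        for item in items {
            self.append(*item);
        }
    }
}

struct VecSink(Vec<u32>);

impl Sink for VecSink {
    fn append(&mut self, item: u32) {
        self.0.push(item);
    }
    // Override: consume the whole batch in one call.
    fn append_bulk(&mut self, items: &[u32]) {
        self.0.extend_from_slice(items);
    }
}

fn main() {
    let mut sink = VecSink(Vec::new());
    sink.append_bulk(&[1, 2, 3]);
    assert_eq!(sink.0, vec![1, 2, 3]);
}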
@@ -132,6 +139,51 @@ impl IndexerSyncService {
         }
     }
 
+    // Bulk insert blocks without verifying the parent of each new block.
+    fn try_loop_sync_fast<I: IndexerSync>(&self, indexer: I)
+    where
+        I: IndexerSync + Clone + Send + 'static,
+    {
+        const BULK_SIZE: u64 = 10;
+        if let Err(e) = self.secondary_db.try_catch_up_with_primary() {
+            error!("secondary_db try_catch_up_with_primary error {}", e);
+        }
+        let chain_tip = self.get_tip().expect("get chain tip should be OK");
+        let indexer_tip = {
+            if let Some((tip_number, _)) = indexer.tip().expect("get tip should be OK") {
+                tip_number
+            } else {
+                let block = self
+                    .get_block_by_number(0)
+                    .expect("get genesis block should be OK");
+                indexer.append(&block).expect("append block should be OK");
+                0
+            }
+        };
+        // Assume that a long fork of >= 100 blocks will not happen.
+        let target: u64 = chain_tip.0.saturating_sub(100);
+        for start in (indexer_tip + 1..=target).step_by(BULK_SIZE as usize) {
+            let end = (start + BULK_SIZE - 1).min(target);
+            let blocks: Vec<BlockView> = (start..=end)
+                .map(|number| {
+                    self.get_block_by_number(number)
+                        .expect("get block should be OK")
+                })
+                .collect();
+            indexer
+                .append_bulk(&blocks)
+                .expect("append blocks should be OK");
+            blocks.iter().for_each(|block| {
+                info!(
+                    "{} append {}, {}",
+                    indexer.get_identity(),
+                    block.number(),
+                    block.hash()
+                );
+            });
+        }
+    }
+
     /// Processes that handle block cell and expect to be spawned to run in tokio runtime
     pub fn spawn_poll<I>(
         &self,
@@ -144,9 +196,10 @@
         // Initial sync
         let initial_service = self.clone();
         let indexer = indexer_service.clone();
-        let initial_syncing = self
-            .async_handle
-            .spawn_blocking(move || initial_service.try_loop_sync(indexer));
+        let initial_syncing = self.async_handle.spawn_blocking(move || {
+            initial_service.try_loop_sync_fast(indexer.clone());
+            initial_service.try_loop_sync(indexer)
+        });
 
         // Follow-up sync
         let stop: CancellationToken = new_tokio_exit_rx();
@@ -195,6 +248,12 @@ impl IndexerSyncService {
         let block_hash = self.secondary_db.get_block_hash(block_number)?;
         self.secondary_db.get_block(&block_hash)
     }
+
+    fn get_tip(&self) -> Option<(BlockNumber, Byte32)> {
+        self.secondary_db
+            .get_tip_header()
+            .map(|h| (h.number(), h.hash()))
+    }
 }
 
 fn indexer_secondary_options(config: &IndexerSyncConfig) -> Options {
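Note on `try_loop_sync_fast` above: a minimal sketch, not taken from this repository, of its range-chunking arithmetic. Blocks from indexer_tip + 1 up to chain_tip - 100 (the assumed fork-safety margin) are fetched in BULK_SIZE chunks, with the last chunk clamped via `min`; spawn_poll runs this fast pass before the regular try_loop_sync.

fn chunk_ranges(indexer_tip: u64, chain_tip: u64) -> Vec<(u64, u64)> {
    const BULK_SIZE: u64 = 10;
    let target = chain_tip.saturating_sub(100);
    (indexer_tip + 1..=target)
        .step_by(BULK_SIZE as usize)
        .map(|start| (start, (start + BULK_SIZE - 1).min(target)))
        .collect()
}

fn main() {
    // indexer_tip = 0 and chain_tip = 125 give target = 25,
    // so the chunks are 1..=10, 11..=20, 21..=25.
    assert_eq!(chunk_ranges(0, 125), vec![(1, 10), (11, 20), (21, 25)]);
}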
