Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Remove sleep in import blocks (#6124)
Browse files Browse the repository at this point in the history
* Add Delay and info logging

* Switch from Duration to Delay in enum declaration

* Remove sleep from import_blocks fn

* Set back constans and remove unnecessary code

* Fix hot loop

* Reset timer once poll is ready, not when it's pending
  • Loading branch information
pscott authored May 25, 2020
1 parent df30fd4 commit e6d5d49
Showing 1 changed file with 37 additions and 17 deletions.
54 changes: 37 additions & 17 deletions client/service/src/chain_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ use sp_core::storage::{StorageKey, well_known_keys, ChildInfo, Storage, StorageC
use sc_client_api::{StorageProvider, BlockBackend, UsageProvider};

use std::{io::{Read, Write, Seek}, pin::Pin, collections::HashMap};
use std::{thread, time::{Duration, Instant}};
use std::time::{Duration, Instant};
use futures_timer::Delay;
use std::task::Poll;
use serde_json::{de::IoRead as JsonIoRead, Deserializer, StreamDeserializer};
use std::convert::{TryFrom, TryInto};
use sp_runtime::traits::{CheckedDiv, Saturating};
Expand Down Expand Up @@ -272,14 +274,14 @@ enum ImportState<R, B> where
/// The queue is full (contains at least MAX_PENDING_BLOCKS blocks) and we are waiting for it to catch up.
WaitingForImportQueueToCatchUp{
block_iter: BlockIter<R, B>,
delay: Duration,
delay: Delay,
block: SignedBlock<B>
},
// We have added all the blocks to the queue but they are still being processed.
WaitingForImportQueueToFinish{
num_expected_blocks: Option<u64>,
read_block_count: u64,
delay: Duration,
delay: Delay,
},
}

Expand Down Expand Up @@ -373,7 +375,7 @@ impl<
// The iterator is over: we now need to wait for the import queue to finish.
let num_expected_blocks = block_iter.num_expected_blocks();
let read_block_count = block_iter.read_block_count();
let delay = Duration::from_millis(DELAY_TIME);
let delay = Delay::new(Duration::from_millis(DELAY_TIME));
state = Some(ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay});
},
Some(block_result) => {
Expand All @@ -383,7 +385,7 @@ impl<
if read_block_count - link.imported_blocks >= MAX_PENDING_BLOCKS {
// The queue is full, so do not add this block and simply wait until
// the queue has made some progress.
let delay = Duration::from_millis(DELAY_TIME);
let delay = Delay::new(Duration::from_millis(DELAY_TIME));
state = Some(ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block});
} else {
// Queue is not full, we can keep on adding blocks to the queue.
Expand All @@ -392,18 +394,26 @@ impl<
}
}
Err(e) => {
return std::task::Poll::Ready(
return Poll::Ready(
Err(Error::Other(format!("Error reading block #{}: {}", read_block_count, e))))
}
}
}
}
},
ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block} => {
ImportState::WaitingForImportQueueToCatchUp{block_iter, mut delay, block} => {
let read_block_count = block_iter.read_block_count();
if read_block_count - link.imported_blocks >= MAX_PENDING_BLOCKS {
thread::sleep(delay);
// Queue is still full, so wait until there is room to insert our block.
match Pin::new(&mut delay).poll(cx) {
Poll::Pending => {
state = Some(ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block});
return Poll::Pending
},
Poll::Ready(_) => {
delay.reset(Duration::from_millis(DELAY_TIME));
},
}
state = Some(ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block});
} else {
// Queue is no longer full, so we can add our block to the queue.
Expand All @@ -412,7 +422,7 @@ impl<
state = Some(ImportState::Reading{block_iter});
}
},
ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay} => {
ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, mut delay} => {
// All the blocks have been added to the queue, which doesn't mean they
// have all been properly imported.
if importing_is_done(num_expected_blocks, read_block_count, link.imported_blocks) {
Expand All @@ -421,10 +431,20 @@ impl<
"🎉 Imported {} blocks. Best: #{}",
read_block_count, client.chain_info().best_number
);
return std::task::Poll::Ready(Ok(()))
return Poll::Ready(Ok(()))
} else {
thread::sleep(delay);
// Importing is not done, we still have to wait for the queue to finish.
// Wait for the delay, because we know the queue is lagging behind.
match Pin::new(&mut delay).poll(cx) {
Poll::Pending => {
state = Some(ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay});
return Poll::Pending
},
Poll::Ready(_) => {
delay.reset(Duration::from_millis(DELAY_TIME));
},
}

state = Some(ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay});
}
}
Expand All @@ -436,15 +456,15 @@ impl<
speedometer.notify_user(best_number);

if link.has_error {
return std::task::Poll::Ready(Err(
return Poll::Ready(Err(
Error::Other(
format!("Stopping after #{} blocks because of an error", link.imported_blocks)
)
))
}

cx.waker().wake_by_ref();
std::task::Poll::Pending
Poll::Pending
});
Box::pin(import)
}
Expand Down Expand Up @@ -477,7 +497,7 @@ impl<
let client = &self.client;

if last < block {
return std::task::Poll::Ready(Err("Invalid block range specified".into()));
return Poll::Ready(Err("Invalid block range specified".into()));
}

if !wrote_header {
Expand All @@ -501,19 +521,19 @@ impl<
}
},
// Reached end of the chain.
None => return std::task::Poll::Ready(Ok(())),
None => return Poll::Ready(Ok(())),
}
if (block % 10000.into()).is_zero() {
info!("#{}", block);
}
if block == last {
return std::task::Poll::Ready(Ok(()));
return Poll::Ready(Ok(()));
}
block += One::one();

// Re-schedule the task in order to continue the operation.
cx.waker().wake_by_ref();
std::task::Poll::Pending
Poll::Pending
});

Box::pin(export)
Expand Down

0 comments on commit e6d5d49

Please sign in to comment.