Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Remove sleep in import blocks #6124

Merged
merged 6 commits into from
May 25, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 37 additions & 17 deletions client/service/src/chain_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ use sp_core::storage::{StorageKey, well_known_keys, ChildInfo, Storage, StorageC
use sc_client_api::{StorageProvider, BlockBackend, UsageProvider};

use std::{io::{Read, Write, Seek}, pin::Pin, collections::HashMap};
use std::{thread, time::{Duration, Instant}};
use std::time::{Duration, Instant};
use futures_timer::Delay;
use std::task::Poll;
use serde_json::{de::IoRead as JsonIoRead, Deserializer, StreamDeserializer};
use std::convert::{TryFrom, TryInto};
use sp_runtime::traits::{CheckedDiv, Saturating};
Expand Down Expand Up @@ -272,14 +274,14 @@ enum ImportState<R, B> where
/// The queue is full (contains at least MAX_PENDING_BLOCKS blocks) and we are waiting for it to catch up.
WaitingForImportQueueToCatchUp{
block_iter: BlockIter<R, B>,
delay: Duration,
delay: Delay,
block: SignedBlock<B>
},
// We have added all the blocks to the queue but they are still being processed.
WaitingForImportQueueToFinish{
num_expected_blocks: Option<u64>,
read_block_count: u64,
delay: Duration,
delay: Delay,
},
}

Expand Down Expand Up @@ -373,7 +375,7 @@ impl<
// The iterator is over: we now need to wait for the import queue to finish.
let num_expected_blocks = block_iter.num_expected_blocks();
let read_block_count = block_iter.read_block_count();
let delay = Duration::from_millis(DELAY_TIME);
let delay = Delay::new(Duration::from_millis(DELAY_TIME));
state = Some(ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay});
},
Some(block_result) => {
Expand All @@ -383,7 +385,7 @@ impl<
if read_block_count - link.imported_blocks >= MAX_PENDING_BLOCKS {
// The queue is full, so do not add this block and simply wait until
// the queue has made some progress.
let delay = Duration::from_millis(DELAY_TIME);
let delay = Delay::new(Duration::from_millis(DELAY_TIME));
state = Some(ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block});
} else {
// Queue is not full, we can keep on adding blocks to the queue.
Expand All @@ -392,18 +394,26 @@ impl<
}
}
Err(e) => {
return std::task::Poll::Ready(
return Poll::Ready(
Err(Error::Other(format!("Error reading block #{}: {}", read_block_count, e))))
}
}
}
}
},
ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block} => {
ImportState::WaitingForImportQueueToCatchUp{block_iter, mut delay, block} => {
let read_block_count = block_iter.read_block_count();
if read_block_count - link.imported_blocks >= MAX_PENDING_BLOCKS {
thread::sleep(delay);
// Queue is still full, so wait until there is room to insert our block.
match Pin::new(&mut delay).poll(cx) {
Poll::Pending => {
state = Some(ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block});
return Poll::Pending
},
Poll::Ready(_) => {
delay.reset(Duration::from_millis(DELAY_TIME));
},
}
state = Some(ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block});
} else {
// Queue is no longer full, so we can add our block to the queue.
Expand All @@ -412,7 +422,7 @@ impl<
state = Some(ImportState::Reading{block_iter});
}
},
ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay} => {
ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, mut delay} => {
// All the blocks have been added to the queue, which doesn't mean they
// have all been properly imported.
if importing_is_done(num_expected_blocks, read_block_count, link.imported_blocks) {
Expand All @@ -421,10 +431,20 @@ impl<
"🎉 Imported {} blocks. Best: #{}",
read_block_count, client.chain_info().best_number
);
return std::task::Poll::Ready(Ok(()))
return Poll::Ready(Ok(()))
} else {
thread::sleep(delay);
// Importing is not done, we still have to wait for the queue to finish.
// Wait for the delay, because we know the queue is lagging behind.
match Pin::new(&mut delay).poll(cx) {
Poll::Pending => {
state = Some(ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay});
return Poll::Pending
},
Poll::Ready(_) => {
delay.reset(Duration::from_millis(DELAY_TIME));
},
}

state = Some(ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay});
}
}
Expand All @@ -436,15 +456,15 @@ impl<
speedometer.notify_user(best_number);

if link.has_error {
return std::task::Poll::Ready(Err(
return Poll::Ready(Err(
Error::Other(
format!("Stopping after #{} blocks because of an error", link.imported_blocks)
)
))
}

cx.waker().wake_by_ref();
std::task::Poll::Pending
Poll::Pending
});
Box::pin(import)
}
Expand Down Expand Up @@ -477,7 +497,7 @@ impl<
let client = &self.client;

if last < block {
return std::task::Poll::Ready(Err("Invalid block range specified".into()));
return Poll::Ready(Err("Invalid block range specified".into()));
}

if !wrote_header {
Expand All @@ -501,19 +521,19 @@ impl<
}
},
// Reached end of the chain.
None => return std::task::Poll::Ready(Ok(())),
None => return Poll::Ready(Ok(())),
}
if (block % 10000.into()).is_zero() {
info!("#{}", block);
}
if block == last {
return std::task::Poll::Ready(Ok(()));
return Poll::Ready(Ok(()));
}
block += One::one();

// Re-schedule the task in order to continue the operation.
cx.waker().wake_by_ref();
std::task::Poll::Pending
Poll::Pending
});

Box::pin(export)
Expand Down