Skip to content

Commit

Permalink
add warp to target block for parachains (paritytech#12761)
Browse files Browse the repository at this point in the history
* add warp to target block for parachains

* fix for failing tests

* format using  `Cargo +nightly fmt`

* Remove blocking based on PR comments and create new `WarpSync` on poll

* remove method from trait

* add tests for wait for target

* Update client/network/common/src/sync/warp.rs

Co-authored-by: Bastian Köcher <[email protected]>

* Update client/network/common/src/sync/warp.rs

Co-authored-by: Bastian Köcher <[email protected]>

* Update client/network/test/src/sync.rs

Co-authored-by: Bastian Köcher <[email protected]>

* Update client/network/test/src/sync.rs

Co-authored-by: Bastian Köcher <[email protected]>

* Update client/network/test/src/lib.rs

Co-authored-by: Bastian Köcher <[email protected]>

* Update client/network/test/src/sync.rs

Co-authored-by: Bastian Köcher <[email protected]>

* Update client/network/test/src/sync.rs

Co-authored-by: Bastian Köcher <[email protected]>

* code refactor based on pr comments

* Second round of PR comments

* Third round of pr comments

* add comments to explain logic

* Update client/network/sync/src/lib.rs

Co-authored-by: Bastian Köcher <[email protected]>

* Update client/network/sync/src/lib.rs

Co-authored-by: Bastian Köcher <[email protected]>

* Update client/network/sync/src/warp.rs

Co-authored-by: Bastian Köcher <[email protected]>

* Update client/network/sync/src/warp.rs

Co-authored-by: Bastian Köcher <[email protected]>

* Update client/network/sync/src/warp.rs

Co-authored-by: Bastian Köcher <[email protected]>

* Update client/network/sync/src/lib.rs

Co-authored-by: Bastian Köcher <[email protected]>

* code refactor based on last PR comments

* move warp sync polling before `process_outbound_requests`

Add error message if target block fails to be retreived

* Update client/network/sync/src/warp.rs

Co-authored-by: Arkadiy Paronyan <[email protected]>

* Update client/network/sync/src/lib.rs

Co-authored-by: Bastian Köcher <[email protected]>

* Update client/network/sync/src/warp.rs

Co-authored-by: Bastian Köcher <[email protected]>

* fmt after code suggestions

* rebase changes

* Bring down the node if the target block fails to return

* Revert "Bring down the node if the target block fails to return"

This reverts commit c0ecb22.

* Update client/network/common/src/sync/warp.rs

Co-authored-by: Michal Kucharczyk <[email protected]>

* Update client/network/common/src/sync/warp.rs

Co-authored-by: Michal Kucharczyk <[email protected]>

* use matching on polling to avoid calling poll more than once

* Update client/network/sync/src/warp.rs

Co-authored-by: Bastian Köcher <[email protected]>

* Update client/network/sync/src/warp.rs

Co-authored-by: Bastian Köcher <[email protected]>

* Update client/network/sync/src/warp.rs

Co-authored-by: Bastian Köcher <[email protected]>

* fix typo on comment

* update snapshot with new folder structure

* Upload snapshot

* Bump zombienet

* bump zombienet again

* Improve test

* Update client/network/test/src/sync.rs

Co-authored-by: Bastian Köcher <[email protected]>

* Update client/network/test/src/sync.rs

Co-authored-by: Bastian Köcher <[email protected]>

* fix tests

* dummy commit to restart builds

* Converted the target block to an optional value that is set to `None` when an error occurs

* dummy commit to restart builds

---------

Co-authored-by: Bastian Köcher <[email protected]>
Co-authored-by: Arkadiy Paronyan <[email protected]>
Co-authored-by: Michal Kucharczyk <[email protected]>
Co-authored-by: Sebastian Kunert <[email protected]>
  • Loading branch information
5 people authored and ukint-vs committed Apr 10, 2023
1 parent 78fba34 commit 85b49d7
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 60 deletions.
4 changes: 2 additions & 2 deletions bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use sc_consensus_aura::{ImportQueueParams, SlotProportion, StartAuraParams};
pub use sc_executor::NativeElseWasmExecutor;
use sc_finality_grandpa::SharedVoterState;
use sc_keystore::LocalKeystore;
use sc_service::{error::Error as ServiceError, Configuration, TaskManager};
use sc_service::{error::Error as ServiceError, Configuration, TaskManager, WarpSyncParams};
use sc_telemetry::{Telemetry, TelemetryWorker};
use sp_consensus_aura::sr25519::AuthorityPair as AuraPair;
use std::{sync::Arc, time::Duration};
Expand Down Expand Up @@ -200,7 +200,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
spawn_handle: task_manager.spawn_handle(),
import_queue,
block_announce_validator_builder: None,
warp_sync: Some(warp_sync),
warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)),
})?;

if config.offchain_worker.enabled {
Expand Down
6 changes: 4 additions & 2 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ use sc_client_api::BlockBackend;
use sc_consensus_babe::{self, SlotProportion};
use sc_executor::NativeElseWasmExecutor;
use sc_network::NetworkService;
use sc_network_common::{protocol::event::Event, service::NetworkEventStream};
use sc_network_common::{
protocol::event::Event, service::NetworkEventStream, sync::warp::WarpSyncParams,
};
use sc_service::{config::Configuration, error::Error as ServiceError, RpcHandlers, TaskManager};
use sc_telemetry::{Telemetry, TelemetryWorker};
use sp_api::ProvideRuntimeApi;
Expand Down Expand Up @@ -359,7 +361,7 @@ pub fn new_full_base(
spawn_handle: task_manager.spawn_handle(),
import_queue,
block_announce_validator_builder: None,
warp_sync: Some(warp_sync),
warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)),
})?;

if config.offchain_worker.enabled {
Expand Down
5 changes: 5 additions & 0 deletions client/informant/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ impl<B: BlockT> InformantDisplay<B> {
_,
Some(WarpSyncProgress { phase: WarpSyncPhase::DownloadingBlocks(n), .. }),
) => ("⏩", "Block history".into(), format!(", #{}", n)),
(
_,
_,
Some(WarpSyncProgress { phase: WarpSyncPhase::AwaitingTargetBlock, .. }),
) => ("⏩", "Waiting for pending target block".into(), "".into()),
(_, _, Some(warp)) => (
"⏩",
"Warping".into(),
Expand Down
16 changes: 15 additions & 1 deletion client/network/common/src/sync/warp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use codec::{Decode, Encode};
use futures::channel::oneshot;
pub use sp_finality_grandpa::{AuthorityList, SetId};
use sp_runtime::traits::{Block as BlockT, NumberFor};
use std::fmt;
use std::{fmt, sync::Arc};

/// Scale-encoded warp sync proof response.
pub struct EncodedProof(pub Vec<u8>);
Expand All @@ -29,6 +30,16 @@ pub struct WarpProofRequest<B: BlockT> {
pub begin: B::Hash,
}

/// The different types of warp syncing.
pub enum WarpSyncParams<Block: BlockT> {
/// Standard warp sync for the relay chain
WithProvider(Arc<dyn WarpSyncProvider<Block>>),
/// Skip downloading proofs and wait for a header of the state that should be downloaded.
///
/// It is expected that the header provider ensures that the header is trusted.
WaitForTarget(oneshot::Receiver<<Block as BlockT>::Header>),
}

/// Proof verification result.
pub enum VerificationResult<Block: BlockT> {
/// Proof is valid, but the target was not reached.
Expand Down Expand Up @@ -62,6 +73,8 @@ pub trait WarpSyncProvider<Block: BlockT>: Send + Sync {
pub enum WarpSyncPhase<Block: BlockT> {
/// Waiting for peers to connect.
AwaitingPeers,
/// Waiting for target block to be received.
AwaitingTargetBlock,
/// Downloading and verifying grandpa warp proofs.
DownloadingWarpProofs,
/// Downloading target block.
Expand All @@ -78,6 +91,7 @@ impl<Block: BlockT> fmt::Display for WarpSyncPhase<Block> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::AwaitingPeers => write!(f, "Waiting for peers"),
Self::AwaitingTargetBlock => write!(f, "Waiting for target block to be received"),
Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"),
Self::DownloadingTargetBlock => write!(f, "Downloading target block"),
Self::DownloadingState => write!(f, "Downloading state"),
Expand Down
28 changes: 18 additions & 10 deletions client/network/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ use sc_network_common::{
BlockAnnounce, BlockAnnouncesHandshake, BlockAttributes, BlockData, BlockRequest,
BlockResponse, Direction, FromBlock,
},
warp::{EncodedProof, WarpProofRequest, WarpSyncPhase, WarpSyncProgress, WarpSyncProvider},
warp::{EncodedProof, WarpProofRequest, WarpSyncParams, WarpSyncPhase, WarpSyncProgress},
BadPeer, ChainSync as ChainSyncT, ImportResult, Metrics, OnBlockData, OnBlockJustification,
OnStateData, OpaqueBlockRequest, OpaqueBlockResponse, OpaqueStateRequest,
OpaqueStateResponse, PeerInfo, PeerRequest, PollBlockAnnounceValidation, SyncMode,
Expand Down Expand Up @@ -318,8 +318,10 @@ pub struct ChainSync<B: BlockT, Client> {
state_sync: Option<StateSync<B, Client>>,
/// Warp sync in progress, if any.
warp_sync: Option<WarpSync<B, Client>>,
/// Warp sync provider.
warp_sync_provider: Option<Arc<dyn WarpSyncProvider<B>>>,
/// Warp sync params.
///
/// Will be `None` after `self.warp_sync` is `Some(_)`.
warp_sync_params: Option<WarpSyncParams<B>>,
/// Enable importing existing blocks. This is used used after the state download to
/// catch up to the latest state while re-importing blocks.
import_existing: bool,
Expand Down Expand Up @@ -565,6 +567,7 @@ where
info!("💔 New peer with unknown genesis hash {} ({}).", best_hash, best_number);
return Err(BadPeer(who, rep::GENESIS_MISMATCH))
}

// If there are more than `MAJOR_SYNC_BLOCKS` in the import queue then we have
// enough to do in the import queue that it's not worth kicking off
// an ancestor search, which is what we do in the next match case below.
Expand Down Expand Up @@ -630,17 +633,15 @@ where
},
);

if let SyncMode::Warp = &self.mode {
if let SyncMode::Warp = self.mode {
if self.peers.len() >= MIN_PEERS_TO_START_WARP_SYNC && self.warp_sync.is_none()
{
log::debug!(target: "sync", "Starting warp state sync.");
if let Some(provider) = &self.warp_sync_provider {
self.warp_sync =
Some(WarpSync::new(self.client.clone(), provider.clone()));
if let Some(params) = self.warp_sync_params.take() {
self.warp_sync = Some(WarpSync::new(self.client.clone(), params));
}
}
}

Ok(req)
},
Ok(BlockStatus::Queued) |
Expand Down Expand Up @@ -1359,6 +1360,13 @@ where
},
}
}

// Should be called before `process_outbound_requests` to ensure
// that a potential target block is directly leading to requests.
if let Some(warp_sync) = &mut self.warp_sync {
let _ = warp_sync.poll(cx);
}

self.process_outbound_requests();

while let Poll::Ready(result) = self.poll_pending_responses(cx) {
Expand Down Expand Up @@ -1427,7 +1435,7 @@ where
roles: Roles,
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
max_parallel_downloads: u32,
warp_sync_provider: Option<Arc<dyn WarpSyncProvider<B>>>,
warp_sync_params: Option<WarpSyncParams<B>>,
metrics_registry: Option<&Registry>,
network_service: service::network::NetworkServiceHandle,
import_queue: Box<dyn ImportQueueService<B>>,
Expand Down Expand Up @@ -1467,13 +1475,13 @@ where
block_announce_validation_per_peer_stats: Default::default(),
state_sync: None,
warp_sync: None,
warp_sync_provider,
import_existing: false,
gap_sync: None,
service_rx,
network_service,
block_request_protocol_name,
state_request_protocol_name,
warp_sync_params,
warp_sync_protocol_name,
block_announce_protocol_name: block_announce_config
.notifications_protocol
Expand Down
Loading

0 comments on commit 85b49d7

Please sign in to comment.