diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index 1a119b68415..d3d6ea90639 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -38,7 +38,10 @@ use codec::{Decode, DecodeAll, Encode}; use futures::{channel::oneshot, future::FutureExt, Future, StreamExt}; use cumulus_client_consensus_common::parachain_consensus::RelaychainClient; -use std::{convert::TryFrom, fmt, marker::PhantomData, pin::Pin, sync::Arc}; +use std::{ + convert::TryFrom, fmt, marker::PhantomData, pin::Pin, sync::Arc, thread::sleep, time::Duration, +}; + #[cfg(test)] mod tests; @@ -465,12 +468,27 @@ impl WaitForParachainTargetBlock { pub async fn warp_sync_get( para_id: ParaId, relay_chain_interface: Arc, + spawner: Arc, ) -> Result, BoxedError> where Block: BlockT + 'static, { let (sender, receiver) = oneshot::channel::(); - Self::wait_for_target_block(sender, para_id, relay_chain_interface).await; + + spawner.spawn( + "cumulus-parachain-wait-for-target-block", + None, + async move { + tracing::debug!( + target: LOG_TARGET, + "waiting for target block in a background task...", + ); + Self::wait_for_target_block(sender, para_id, relay_chain_interface).await; + tracing::debug!(target: LOG_TARGET, "target block reached",); + } + .boxed(), + ); + return Ok(receiver) } @@ -479,47 +497,66 @@ impl WaitForParachainTargetBlock { para_id: ParaId, relay_chain_interface: Arc, ) { - let is_syncing = relay_chain_interface - .is_major_syncing() - .await - .map_err(|e| { - tracing::error!(target: LOG_TARGET, "Unable to determine sync status. {}", e) - }) - .unwrap_or(false); - - loop { - if !is_syncing { - let mut finalized_heads = match relay_chain_interface.finalized_heads(para_id).await - { - Ok(finalized_heads_stream) => finalized_heads_stream, - Err(err) => { - tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream."); - return - }, - }; - - let finalized_head = if let Some(h) = finalized_heads.next().await { - h - } else { - tracing::debug!(target: "cumulus-network", "Stopping following finalized head."); - return - }; - - let target_header = match Block::Header::decode(&mut &finalized_head[..]) { - Ok(header) => header, - Err(err) => { - tracing::debug!( - target: "cumulus-network", - error = ?err, - "Could not decode parachain header while following finalized heads.", - ); - continue - }, - }; - - let _ = sender.send(target_header); - break - } + let import_stream = relay_chain_interface.import_notification_stream(); + match import_stream.await { + Ok(mut import_stream_notification) => match import_stream_notification.next().await { + Some(_header) => loop { + let is_syncing = relay_chain_interface + .is_major_syncing() + .await + .map_err(|e| { + tracing::error!( + target: LOG_TARGET, + "Unable to determine sync status. {}", + e + ) + }) + .unwrap_or(false); + + if !is_syncing { + let mut finalized_heads = + match relay_chain_interface.finalized_heads(para_id).await { + Ok(finalized_heads_stream) => finalized_heads_stream, + Err(err) => { + tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream."); + return + }, + }; + + let finalized_head = if let Some(h) = finalized_heads.next().await { + h + } else { + tracing::debug!( + target: LOG_TARGET, + "Stopping following finalized head." + ); + return + }; + + let target_header = match Block::Header::decode(&mut &finalized_head[..]) { + Ok(header) => header, + Err(err) => { + tracing::debug!( + target: LOG_TARGET, + error = ?err, + "Could not decode parachain header while following finalized heads.", + ); + continue + }, + }; + + let _ = sender.send(target_header); + break + } + tracing::debug!( + target: LOG_TARGET, + "waiting for relay chain sync to complete......", + ); + sleep(Duration::from_secs(120)); + }, + None => (), + }, + _ => (), } } } diff --git a/parachain-template/node/src/service.rs b/parachain-template/node/src/service.rs index 3dabd1a5e6a..fdfe05be450 100644 --- a/parachain-template/node/src/service.rs +++ b/parachain-template/node/src/service.rs @@ -203,6 +203,7 @@ async fn start_node_impl( match cumulus_client_network::WaitForParachainTargetBlock::::warp_sync_get( id, relay_chain_interface.clone(), + Arc::new(task_manager.spawn_handle()), ) .await { diff --git a/polkadot-parachain/src/service.rs b/polkadot-parachain/src/service.rs index 1559aecc507..e028aa235e7 100644 --- a/polkadot-parachain/src/service.rs +++ b/polkadot-parachain/src/service.rs @@ -409,6 +409,7 @@ where match cumulus_client_network::WaitForParachainTargetBlock::::warp_sync_get( para_id, relay_chain_interface.clone(), + Arc::new(task_manager.spawn_handle()), ) .await { @@ -602,6 +603,7 @@ where match cumulus_client_network::WaitForParachainTargetBlock::::warp_sync_get( para_id, relay_chain_interface.clone(), + Arc::new(task_manager.spawn_handle()), ) .await { @@ -1382,6 +1384,7 @@ where match cumulus_client_network::WaitForParachainTargetBlock::::warp_sync_get( para_id, relay_chain_interface.clone(), + Arc::new(task_manager.spawn_handle()), ) .await { diff --git a/test/service/src/lib.rs b/test/service/src/lib.rs index b3b4cbeecb9..b0bc5603888 100644 --- a/test/service/src/lib.rs +++ b/test/service/src/lib.rs @@ -275,6 +275,7 @@ where match cumulus_client_network::WaitForParachainTargetBlock::::warp_sync_get( para_id, relay_chain_interface.clone(), + Arc::new(task_manager.spawn_handle()), ) .await {