Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cli-flag to keep db after changing assignments #280

Merged
merged 18 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ pub struct ContainerChainRunCmd {
/// Optional container chain para id that should be used to build chain spec.
#[arg(long)]
pub para_id: Option<u32>,

/// Keep container-chain db after changing collator assignments
#[arg(long)]
pub keep_db: bool,
}

#[derive(Debug)]
Expand Down
150 changes: 122 additions & 28 deletions node/src/container_chain_spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,23 @@ use {
container_chain_monitor::{SpawnedContainer, SpawnedContainersMonitor},
service::{start_node_impl_container, ParachainClient},
},
cumulus_client_cli::generate_genesis_block,
cumulus_primitives_core::ParaId,
cumulus_relay_chain_interface::RelayChainInterface,
dancebox_runtime::{AccountId, Block, BlockNumber},
dancebox_runtime::Block,
futures::FutureExt,
pallet_author_noting_runtime_api::AuthorNotingApi,
pallet_registrar_runtime_api::RegistrarApi,
polkadot_primitives::CollatorPair,
sc_cli::SyncMode,
sc_service::SpawnTaskHandle,
sp_api::{ApiExt, ProvideRuntimeApi},
sp_api::ProvideRuntimeApi,
sp_keystore::KeystorePtr,
sp_runtime::traits::Block as BlockT,
std::{
collections::{HashMap, HashSet},
future::Future,
path::Path,
path::{Path, PathBuf},
pin::Pin,
sync::{Arc, Mutex},
time::Instant,
Expand Down Expand Up @@ -103,13 +105,39 @@ pub enum CcSpawnMsg {
},
}

/// Error thrown when a container chain needs to restart and remove the database.
struct NeedsRestartAndDbRemoval {
db_path: PathBuf,
validator: bool,
keep_db: bool,
Agusrodri marked this conversation as resolved.
Show resolved Hide resolved
self2: ContainerChainSpawner,
warp_sync: bool,
}
impl std::fmt::Debug for NeedsRestartAndDbRemoval {
    /// Manual `Debug` impl: `ContainerChainSpawner` does not derive `Debug`, so the
    /// `self2` field is rendered as a placeholder string instead of its contents.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NeedsRestartAndDbRemoval")
            .field("db_path", &self.db_path)
            .field("validator", &self.validator)
            .field("keep_db", &self.keep_db)
            .field("self2", &"<ContainerChainSpawner>")
            // Bug fix: `warp_sync` was missing from the debug output even though it is
            // a field of the struct and drives the restart's sync mode.
            .field("warp_sync", &self.warp_sync)
            .finish()
    }
}
impl std::fmt::Display for NeedsRestartAndDbRemoval {
    /// `Display` simply reuses the `Debug` representation; this type is an internal
    /// control-flow error, so a human-tailored message is unnecessary.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Debug::fmt(self, f)
    }
}
// Marker impl: `std::error::Error` is required so this type can be boxed into
// `sc_service::error::Error::Application` and later recovered via `downcast`
// in the restart-handling branch of `spawn`.
impl std::error::Error for NeedsRestartAndDbRemoval {}

impl ContainerChainSpawner {
/// Try to start a new container chain. In case of error, this panics and stops the node.
fn spawn(
&self,
container_chain_para_id: ParaId,
start_collation: bool,
) -> impl Future<Output = ()> {
warp_sync: bool,
) -> Pin<Box<dyn Future<Output = ()> + Send>> {
let ContainerChainSpawner {
orchestrator_chain_interface,
orchestrator_client,
Expand All @@ -126,6 +154,8 @@ impl ContainerChainSpawner {
state,
collate_on_tanssi: _,
} = self.clone();
// Additional copy only needed in case of restart
let self2 = self.clone();

// This closure is used to emulate a try block, it enables using the `?` operator inside
let try_closure = move || async move {
Expand Down Expand Up @@ -184,27 +214,10 @@ impl ContainerChainSpawner {
// Update CLI params
container_chain_cli.base.para_id = Some(container_chain_para_id.into());

// Force container chains to use warp sync, unless full sync is needed for some reason
let full_sync_needed = if !orchestrator_runtime_api
.has_api::<dyn AuthorNotingApi<Block, AccountId, BlockNumber, ParaId>>(
orchestrator_chain_info.best_hash,
)
.map_err(|e| format!("Failed to check if runtime has AuthorNotingApi: {}", e))?
{
// Before runtime API was implemented we don't know if the container chain has any blocks,
// so use full sync because that always works
true
} else {
// If the container chain is still at genesis block, use full sync because warp sync is broken
orchestrator_runtime_api
.latest_author(orchestrator_chain_info.best_hash, container_chain_para_id)
.map_err(|e| format!("Failed to read latest author: {}", e))?
.is_none()
};
// Use full sync by default
container_chain_cli.base.base.network_params.sync = SyncMode::Full;

if full_sync_needed {
container_chain_cli.base.base.network_params.sync = SyncMode::Full;
} else {
if warp_sync {
container_chain_cli.base.base.network_params.sync = SyncMode::Warp;
}

Expand All @@ -226,7 +239,7 @@ impl ContainerChainSpawner {
container_chain_cli_config.database.set_path(&db_path);

// Delete existing database if running as collator
if validator {
if validator && !container_chain_cli.base.keep_db {
delete_container_chain_db(&db_path);
}

Expand All @@ -249,6 +262,66 @@ impl ContainerChainSpawner {
)
.await?;

// Get latest block number from the container chain client
let last_container_block = container_chain_client.chain_info().best_number;

// Get the container chain's latest block from orchestrator chain and compare with client's one
let last_container_block_from_orchestrator = orchestrator_runtime_api
.latest_block_number(orchestrator_chain_info.best_hash, container_chain_para_id)
.unwrap_or_default();

let max_block_diff_allowed = 10u32;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let max_block_diff_allowed = 10u32;
let max_block_diff_allowed = 100u32;


if last_container_block_from_orchestrator
.unwrap_or(0u32)
.abs_diff(last_container_block)
> max_block_diff_allowed
{
// if the diff is big, delete db and restart using warp sync
return Err(sc_service::error::Error::Application(Box::new(
NeedsRestartAndDbRemoval {
db_path,
validator,
keep_db: container_chain_cli.base.keep_db,
self2,
warp_sync: true,
},
)));
}

// Generate genesis hash to compare against container client's genesis hash
let container_preloaded_genesis = container_chain_cli.preloaded_chain_spec.unwrap();

// Check with both state versions
let block_v0: Block =
generate_genesis_block(&*container_preloaded_genesis, sp_runtime::StateVersion::V0)
.map_err(|e| format!("{:?}", e))?;
let chain_spec_genesis_hash_v0 = block_v0.header().hash();

let block_v1: Block =
generate_genesis_block(&*container_preloaded_genesis, sp_runtime::StateVersion::V1)
.map_err(|e| format!("{:?}", e))?;
let chain_spec_genesis_hash_v1 = block_v1.header().hash();

let container_client_genesis_hash = container_chain_client.chain_info().genesis_hash;

if container_client_genesis_hash != chain_spec_genesis_hash_v0
&& container_client_genesis_hash != chain_spec_genesis_hash_v1
{
log::info!("Container genesis V0: {:?}", chain_spec_genesis_hash_v0);
log::info!("Container genesis V1: {:?}", chain_spec_genesis_hash_v1);
log::info!("Chain spec genesis {:?} did not match with any container genesis - Restarting...", container_client_genesis_hash);
return Err(sc_service::error::Error::Application(Box::new(
NeedsRestartAndDbRemoval {
db_path,
validator,
keep_db: container_chain_cli.base.keep_db,
self2,
warp_sync: true,
},
)));
}

// Signal that allows to gracefully stop a container chain
let (signal, on_exit) = exit_future::signal();
let collate_on = collate_on.unwrap_or_else(|| {
Expand Down Expand Up @@ -315,7 +388,7 @@ impl ContainerChainSpawner {
_ = on_exit_future => {
// Graceful shutdown
// Delete existing database if running as collator
if validator {
if validator && !container_chain_cli.base.keep_db {
delete_container_chain_db(&db_path);
}
}
Expand All @@ -330,14 +403,31 @@ impl ContainerChainSpawner {
sc_service::error::Result::Ok(())
};

async {
async move {
match try_closure().await {
Ok(()) => {}
Err(sc_service::error::Error::Application(e))
if e.is::<NeedsRestartAndDbRemoval>() =>
{
let e = e.downcast::<NeedsRestartAndDbRemoval>().unwrap();
// Delete container chain
// TODO: shouldn't need to check this condition here because this error is only returned if this condition is true
if e.validator && !e.keep_db {
delete_container_chain_db(&e.db_path);
}

log::info!("Restarting container chain {}", container_chain_para_id);
// self.spawn must return a boxed future because of the recursion here
e.self2
.spawn(container_chain_para_id, start_collation, e.warp_sync)
.await;
tmpolaczyk marked this conversation as resolved.
Show resolved Hide resolved
}
Err(e) => {
panic!("Failed to start container chain node: {}", e);
}
}
}
.boxed()
}

/// Stop a container chain. Prints a warning if the container chain was not running.
Expand All @@ -363,6 +453,10 @@ impl ContainerChainSpawner {
);
}
}

if state.assigned_para_id.unwrap() != container_chain_para_id && state.next_assigned_para_id.unwrap() != container_chain_para_id{
delete_container_chain_db(&self.container_chain_cli.base_path);
Agusrodri marked this conversation as resolved.
Show resolved Hide resolved
}
}

/// Receive and process `CcSpawnMsg`s indefinitely
Expand Down Expand Up @@ -410,7 +504,7 @@ impl ContainerChainSpawner {
// Edge case: when starting the node it may be assigned to a container chain, so we need to
// start a container chain already collating.
let start_collation = Some(para_id) == current;
self.spawn(para_id, start_collation).await;
self.spawn(para_id, start_collation, false).await;
Agusrodri marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down
Loading