Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cli-flag to keep db after changing assignments #280

Merged
merged 18 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ pub struct ContainerChainRunCmd {
/// Optional container chain para id that should be used to build chain spec.
#[arg(long)]
pub para_id: Option<u32>,

/// Keep container-chain db after changing collator assignments
#[arg(long)]
pub keep_db: bool,
}

#[derive(Debug)]
Expand Down
129 changes: 114 additions & 15 deletions node/src/container_chain_spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use {
container_chain_monitor::{SpawnedContainer, SpawnedContainersMonitor},
service::{start_node_impl_container, ParachainClient},
},
cumulus_client_cli::generate_genesis_block,
cumulus_primitives_core::ParaId,
cumulus_relay_chain_interface::RelayChainInterface,
dancebox_runtime::{AccountId, Block, BlockNumber},
Expand All @@ -31,6 +32,7 @@ use {
sc_service::SpawnTaskHandle,
sp_api::{ApiExt, ProvideRuntimeApi},
sp_keystore::KeystorePtr,
sp_runtime::traits::Block as BlockT,
std::{
collections::{HashMap, HashSet},
future::Future,
Expand All @@ -40,7 +42,7 @@ use {
time::Instant,
},
tc_orchestrator_chain_interface::OrchestratorChainInterface,
tokio::sync::mpsc::UnboundedReceiver,
tokio::sync::{mpsc, oneshot},
};

/// Struct with all the params needed to start a container chain node given the CLI arguments,
Expand Down Expand Up @@ -86,8 +88,7 @@ pub struct ContainerChainState {

/// Stops a container chain when dropped
pub struct StopContainerChain {
#[allow(dead_code)]
signal: exit_future::Signal,
signal: oneshot::Sender<()>,
id: usize,
}

Expand All @@ -103,13 +104,34 @@ pub enum CcSpawnMsg {
},
}

/// Error thrown when a container chain needs to restart and remove the database.
struct NeedsRestart {
self2: ContainerChainSpawner,
warp_sync: bool,
}
impl std::fmt::Debug for NeedsRestart {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NeedsRestartAndDbRemoval")
.field("self2", &"<ContainerChainSpawner>")
.field("warp_sync", &self.warp_sync)
.finish()
}
}
impl std::fmt::Display for NeedsRestart {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
impl std::error::Error for NeedsRestart {}

impl ContainerChainSpawner {
/// Try to start a new container chain. In case of error, this panics and stops the node.
fn spawn(
&self,
container_chain_para_id: ParaId,
start_collation: bool,
) -> impl Future<Output = ()> {
warp_sync: bool,
) -> Pin<Box<dyn Future<Output = ()> + Send>> {
let ContainerChainSpawner {
orchestrator_chain_interface,
orchestrator_client,
Expand All @@ -126,6 +148,8 @@ impl ContainerChainSpawner {
state,
collate_on_tanssi: _,
} = self.clone();
// Additional copy only needed in case of restart
let self2 = self.clone();

// This closure is used to emulate a try block, it enables using the `?` operator inside
let try_closure = move || async move {
Expand Down Expand Up @@ -202,10 +226,14 @@ impl ContainerChainSpawner {
.is_none()
};

if warp_sync {
container_chain_cli.base.base.network_params.sync = SyncMode::Warp;
} else {
container_chain_cli.base.base.network_params.sync = SyncMode::Full;
}

if full_sync_needed {
container_chain_cli.base.base.network_params.sync = SyncMode::Full;
} else {
container_chain_cli.base.base.network_params.sync = SyncMode::Warp;
}

let mut container_chain_cli_config = sc_cli::SubstrateCli::create_configuration(
Expand All @@ -226,7 +254,7 @@ impl ContainerChainSpawner {
container_chain_cli_config.database.set_path(&db_path);

// Delete existing database if running as collator
if validator {
if validator && !container_chain_cli.base.keep_db && warp_sync {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am hesitating about this. What happens if:

  • I have the keep_db flag to true
  • but I detect that the genesis-hash of the chain-spec I am running is different (genesis-hash different)
  • in that case I would be stuck (the node tells me to reboot and that I need to use warp sync, all the time, but I need all these 3 flags to be true to delete the db and hence be able to do warp sync)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should immediatly delete the container-chain-db if the spec is differnt I believe

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no reason to maintain a DB that does not match with the specs right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right

delete_container_chain_db(&db_path);
}

Expand All @@ -249,8 +277,63 @@ impl ContainerChainSpawner {
)
.await?;

// Get latest block number from the container chain client
let last_container_block = container_chain_client.chain_info().best_number;

// Get the container chain's latest block from orchestrator chain and compare with client's one
let last_container_block_from_orchestrator = orchestrator_runtime_api
.latest_block_number(orchestrator_chain_info.best_hash, container_chain_para_id)
.unwrap_or_default();

let max_block_diff_allowed = 100u32;

if last_container_block_from_orchestrator
.unwrap_or(0u32)
.abs_diff(last_container_block)
> max_block_diff_allowed
{
// if the diff is big, delete db and restart using warp sync
return Err(sc_service::error::Error::Application(Box::new(
NeedsRestart {
self2,
warp_sync: true,
},
)));
}

// Generate genesis hash to compare against container client's genesis hash
let container_preloaded_genesis = container_chain_cli.preloaded_chain_spec.unwrap();

// Check with both state versions
let block_v0: Block =
generate_genesis_block(&*container_preloaded_genesis, sp_runtime::StateVersion::V0)
.map_err(|e| format!("{:?}", e))?;
let chain_spec_genesis_hash_v0 = block_v0.header().hash();

let block_v1: Block =
generate_genesis_block(&*container_preloaded_genesis, sp_runtime::StateVersion::V1)
.map_err(|e| format!("{:?}", e))?;
let chain_spec_genesis_hash_v1 = block_v1.header().hash();

let container_client_genesis_hash = container_chain_client.chain_info().genesis_hash;

if container_client_genesis_hash != chain_spec_genesis_hash_v0
&& container_client_genesis_hash != chain_spec_genesis_hash_v1
{
log::info!("Container genesis V0: {:?}", chain_spec_genesis_hash_v0);
log::info!("Container genesis V1: {:?}", chain_spec_genesis_hash_v1);
log::info!("Chain spec genesis {:?} did not match with any container genesis - Restarting...", container_client_genesis_hash);
delete_container_chain_db(&db_path);
tmpolaczyk marked this conversation as resolved.
Show resolved Hide resolved
return Err(sc_service::error::Error::Application(Box::new(
NeedsRestart {
self2,
warp_sync: true,
},
)));
}

// Signal that allows to gracefully stop a container chain
let (signal, on_exit) = exit_future::signal();
let (signal, on_exit) = oneshot::channel::<()>();
let collate_on = collate_on.unwrap_or_else(|| {
assert!(
!validator,
Expand Down Expand Up @@ -312,10 +395,13 @@ impl ContainerChainSpawner {
Err(e) => panic!("{} failed: {}", name, e),
}
}
_ = on_exit_future => {
// Graceful shutdown
stop_unassigned = on_exit_future => {
// Graceful shutdown.
// `stop_unassigned` will be `Ok` if `.stop()` has been called, which means that the
// container chain has been unassigned, and will be `Err` if the handle has been dropped,
// which means that the node is stopping.
// Delete existing database if running as collator
if validator {
if validator && stop_unassigned.is_ok() && !container_chain_cli.base.keep_db {
delete_container_chain_db(&db_path);
}
}
Expand All @@ -330,14 +416,24 @@ impl ContainerChainSpawner {
sc_service::error::Result::Ok(())
};

async {
async move {
match try_closure().await {
Ok(()) => {}
Err(sc_service::error::Error::Application(e)) if e.is::<NeedsRestart>() => {
let e = e.downcast::<NeedsRestart>().unwrap();

log::info!("Restarting container chain {}", container_chain_para_id);
// self.spawn must return a boxed future because of the recursion here
e.self2
.spawn(container_chain_para_id, start_collation, e.warp_sync)
.await;
tmpolaczyk marked this conversation as resolved.
Show resolved Hide resolved
}
Err(e) => {
panic!("Failed to start container chain node: {}", e);
}
}
}
.boxed()
}

/// Stop a container chain. Prints a warning if the container chain was not running.
Expand All @@ -355,6 +451,9 @@ impl ContainerChainSpawner {
state
.spawned_containers_monitor
.set_stop_signal_time(id, Instant::now());

// Send signal to perform graceful shutdown, which will delete the db if needed
let _ = stop_handle.stop_handle.signal.send(());
}
None => {
log::warn!(
Expand All @@ -366,7 +465,7 @@ impl ContainerChainSpawner {
}

/// Receive and process `CcSpawnMsg`s indefinitely
pub async fn rx_loop(self, mut rx: UnboundedReceiver<CcSpawnMsg>) {
pub async fn rx_loop(self, mut rx: mpsc::UnboundedReceiver<CcSpawnMsg>) {
while let Some(msg) = rx.recv().await {
match msg {
CcSpawnMsg::UpdateAssignment { current, next } => {
Expand Down Expand Up @@ -410,7 +509,7 @@ impl ContainerChainSpawner {
// Edge case: when starting the node it may be assigned to a container chain, so we need to
// start a container chain already collating.
let start_collation = Some(para_id) == current;
self.spawn(para_id, start_collation).await;
self.spawn(para_id, start_collation, false).await;
Agusrodri marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down Expand Up @@ -553,7 +652,7 @@ mod tests {
}

async fn spawn(&self, container_chain_para_id: ParaId, start_collation: bool) {
let (signal, _on_exit) = exit_future::signal();
let (signal, _on_exit) = oneshot::channel();
let currently_collating_on2 = self.currently_collating_on.clone();
let collate_closure = move || async move {
let mut cco = currently_collating_on2.lock().unwrap();
Expand Down
Loading