Always restart container chain before starting collation (#307)
* Increment ports of "odd" parachains to avoid port conflicts

The user can specify the p2p, RPC, and Prometheus ports using the CLI,
and since two container chains can be running at the same time,
the port actually used may be the one specified by the user, or that value plus 1
(see the sketch after this list)

* Always restart container chain before starting collation

* Fix wrong condition and add keep_db to stop signal

* Fix unit tests

* Fix edge case where we need to restart the next chain
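
As a rough illustration of the port handling described in the first bullet, here is a minimal sketch. The `effective_port` helper and its parameters are hypothetical, not the node's actual CLI code, which wires the ports through the Substrate CLI params.

```rust
// Hypothetical sketch of the "+1" port rule from the commit message: when a
// second container chain runs alongside the first, every user-supplied port
// (p2p, rpc, prometheus) is offset by one to avoid a bind conflict.
fn effective_port(cli_port: Option<u16>, default: u16, is_second_chain: bool) -> u16 {
    let base = cli_port.unwrap_or(default);
    if is_second_chain { base + 1 } else { base }
}

fn main() {
    // User passed --port 30333: the first chain binds 30333, the second 30334.
    assert_eq!(effective_port(Some(30333), 30333, false), 30333);
    assert_eq!(effective_port(Some(30333), 30333, true), 30334);
}
```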
tmpolaczyk authored Nov 7, 2023
1 parent e59b74a commit e1a9ead
Showing 1 changed file with 88 additions and 24 deletions.
112 changes: 88 additions & 24 deletions node/src/container_chain_spawner.rs
@@ -89,14 +89,16 @@ pub struct ContainerChainSpawnerState {

pub struct ContainerChainState {
/// Async callback that enables collation on this container chain
// We don't use it since we are always restarting container chains
#[allow(unused)]
collate_on: Arc<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>,
/// Handle that stops the container chain when dropped
stop_handle: StopContainerChain,
}

/// Stops a container chain when dropped
pub struct StopContainerChain {
signal: oneshot::Sender<()>,
signal: oneshot::Sender<bool>,
id: usize,
}

@@ -189,6 +191,19 @@ impl ContainerChainSpawner {
container_chain_para_id
);

if !start_collation {
log::info!("This is a syncing container chain, using random ports");
// Use random ports to avoid conflicts with the other running container chain
let random_ports = [23456, 23457, 23458];
container_chain_cli
.base
.base
.prometheus_params
.prometheus_port = Some(random_ports[0]);
container_chain_cli.base.base.network_params.port = Some(random_ports[1]);
container_chain_cli.base.base.rpc_port = Some(random_ports[2]);
}

// Update CLI params
container_chain_cli.base.para_id = Some(container_chain_para_id.into());

@@ -264,7 +279,7 @@ impl ContainerChainSpawner {
.await?;

// Signal that allows to gracefully stop a container chain
let (signal, on_exit) = oneshot::channel::<()>();
let (signal, on_exit) = oneshot::channel::<bool>();
let collate_on = collate_on.unwrap_or_else(|| {
assert!(
!validator,
@@ -328,11 +343,11 @@ impl ContainerChainSpawner {
}
stop_unassigned = on_exit_future => {
// Graceful shutdown.
// `stop_unassigned` will be `Ok` if `.stop()` has been called, which means that the
// `stop_unassigned` will be `Ok(keep_db)` if `.stop()` has been called, which means that the
// container chain has been unassigned, and will be `Err` if the handle has been dropped,
// which means that the node is stopping.
// Delete existing database if running as collator
if validator && stop_unassigned.is_ok() && !container_chain_cli.base.keep_db {
if validator && stop_unassigned == Ok(false) && !container_chain_cli.base.keep_db {
delete_container_chain_db(&db_path);
}
}
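
A minimal sketch of the stop signalling shown in this hunk, assuming tokio's oneshot channel (the node may use a different oneshot implementation); `run_until_stopped`, `stop`, and the direct `remove_dir_all` call are illustrative stand-ins, not the actual node functions.

```rust
use std::path::PathBuf;
use tokio::sync::oneshot;

// The stop handle now carries a bool: `keep_db` travels with the stop signal.
struct StopContainerChain {
    signal: oneshot::Sender<bool>,
}

// Illustrative stand-in for the spawned container chain task.
async fn run_until_stopped(on_exit: oneshot::Receiver<bool>, validator: bool, db_path: PathBuf) {
    match on_exit.await {
        // `.stop()` was called: the chain was unassigned and `keep_db` decides cleanup.
        Ok(keep_db) => {
            if validator && !keep_db {
                // Stand-in for delete_container_chain_db.
                let _ = std::fs::remove_dir_all(&db_path);
            }
        }
        // The sender was dropped: the whole node is shutting down, so keep the db.
        Err(_) => {}
    }
}

fn stop(handle: StopContainerChain, keep_db: bool) {
    // Ignore the error: the task may already have exited.
    let _ = handle.signal.send(keep_db);
}
```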
@@ -359,7 +374,7 @@ impl ContainerChainSpawner {
}

/// Stop a container chain. Prints a warning if the container chain was not running.
fn stop(&self, container_chain_para_id: ParaId) {
fn stop(&self, container_chain_para_id: ParaId, keep_db: bool) {
let mut state = self.state.lock().expect("poison error");
let stop_handle = state
.spawned_container_chains
@@ -375,7 +390,7 @@ impl ContainerChainSpawner {
.set_stop_signal_time(id, Instant::now());

// Send signal to perform graceful shutdown, which will delete the db if needed
let _ = stop_handle.stop_handle.signal.send(());
let _ = stop_handle.stop_handle.signal.send(keep_db);
}
None => {
log::warn!(
@@ -408,6 +423,7 @@ impl ContainerChainSpawner {
call_collate_on,
chains_to_stop,
chains_to_start,
need_to_restart,
} = handle_update_assignment_state_change(
&mut self.state.lock().expect("poison error"),
self.orchestrator_para_id,
@@ -423,7 +439,14 @@ impl ContainerChainSpawner {

// Stop all container chains that are no longer needed
for para_id in chains_to_stop {
self.stop(para_id);
// Keep db if we are currently assigned to this chain
let keep_db = Some(para_id) == current;
self.stop(para_id, keep_db);
}

if need_to_restart {
// Give it some time to stop properly
sleep(Duration::from_secs(10)).await;
}

// Start all new container chains (usually 1)
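
For orientation, a condensed sketch of the stop / wait / restart ordering this hunk introduces; `Spawner` and its method signatures are illustrative stand-ins, not the real ContainerChainSpawner API.

```rust
use std::time::Duration;

// Illustrative stand-ins for the spawner API; not the real signatures.
struct Spawner;
impl Spawner {
    fn stop(&self, _para_id: u32, _keep_db: bool) {}
    async fn spawn(&self, _para_id: u32, _start_collation: bool) {}
}

async fn update_assignment(
    spawner: &Spawner,
    current: Option<u32>,
    chains_to_stop: Vec<u32>,
    chains_to_start: Vec<u32>,
    need_to_restart: bool,
) {
    for para_id in chains_to_stop {
        // Keep the database only for the chain we are still assigned to.
        spawner.stop(para_id, Some(para_id) == current);
    }
    if need_to_restart {
        // "Give it some time to stop properly" before the same chain comes back up.
        tokio::time::sleep(Duration::from_secs(10)).await;
    }
    for para_id in chains_to_start {
        // The newly assigned chain is started in collation mode, others only sync.
        spawner.spawn(para_id, Some(para_id) == current).await;
    }
}
```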
@@ -441,6 +464,7 @@ struct HandleUpdateAssignmentResult {
Option<Arc<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>>,
chains_to_stop: Vec<ParaId>,
chains_to_start: Vec<ParaId>,
need_to_restart: bool,
}

// This is a separate function to allow testing
@@ -457,6 +481,7 @@ fn handle_update_assignment_state_change(
call_collate_on: None,
chains_to_stop: Default::default(),
chains_to_start: Default::default(),
need_to_restart: false,
};
}

@@ -475,6 +500,8 @@ fn handle_update_assignment_state_change(
running_chains_after.extend(current);
running_chains_after.extend(next);
running_chains_after.remove(&orchestrator_para_id);
let mut need_to_restart_current = false;
let mut need_to_restart_next = false;

if state.assigned_para_id != current {
// If the assigned container chain was already running but not collating, we need to call collate_on
@@ -483,32 +510,60 @@ fn handle_update_assignment_state_change(
if para_id == orchestrator_para_id {
call_collate_on = Some(collate_on_tanssi);
} else {
// When we get assigned to a different container chain, only need to call collate_on if it was already
// running before
if running_chains_before.contains(&para_id) {
let c = state.spawned_container_chains.get(&para_id).expect("container chain was running before so it should exist in spawned_container_chains");
call_collate_on = Some(c.collate_on.clone());
}
// When we get assigned to a different container chain, we don't need to call collate_on because
// we will restart that container chain in collation mode.
need_to_restart_current = true;
}
}

if let Some(para_id) = state.assigned_para_id {
if para_id != orchestrator_para_id && Some(para_id) == next {
need_to_restart_next = true;
}
}
}

state.assigned_para_id = current;
state.next_assigned_para_id = next;

let chains_to_stop = running_chains_before
let mut chains_to_stop: Vec<_> = running_chains_before
.difference(&running_chains_after)
.copied()
.collect();
let chains_to_start = running_chains_after
let mut chains_to_start: Vec<_> = running_chains_after
.difference(&running_chains_before)
.copied()
.collect();

if need_to_restart_current {
// Force restart of the newly assigned container chain: if it was running before, it was in "syncing mode",
// which doesn't use the correct ports, so start it in "collation mode".
let id = current.unwrap();
if running_chains_before.contains(&id) && !chains_to_stop.contains(&id) {
chains_to_stop.push(id);
}
if !chains_to_start.contains(&id) {
chains_to_start.push(id);
}
}

if need_to_restart_next {
// Handle edge case of going from (2000, 2001) to (2001, 2000). In that case we must restart both chains,
// because previously 2000 was collating and now 2000 will only be syncing.
let id = next.unwrap();
if running_chains_before.contains(&id) && !chains_to_stop.contains(&id) {
chains_to_stop.push(id);
}
if !chains_to_start.contains(&id) {
chains_to_start.push(id);
}
}

HandleUpdateAssignmentResult {
call_collate_on,
chains_to_stop,
chains_to_start,
need_to_restart: need_to_restart_current || need_to_restart_next,
}
}
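
To make the edge case above concrete, here is a hedged trace of what handle_update_assignment_state_change computes for the (2000, 2001) to (2001, 2000) swap; this is an illustrative walk-through, not a test taken from the repository.

```rust
// Hypothetical trace of handle_update_assignment_state_change for the swap
// described above; values follow the logic in this diff.
//
// Previous state: assigned = Some(2000), next_assigned = Some(2001)
//   -> 2000 was running in collation mode, 2001 was running in syncing mode.
// New assignment: current = Some(2001), next = Some(2000)
//   -> need_to_restart_current = true   // 2001 was only syncing, must now collate
//   -> need_to_restart_next    = true   // 2000 was collating, must now only sync
//   -> chains_to_stop  = [2001, 2000]   // both pushed by the restart blocks
//   -> chains_to_start = [2001, 2000]
//   -> need_to_restart = true, so the spawner waits before restarting them
// When stopping, keep_db is true only for 2001, the newly assigned chain.
```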

@@ -764,6 +819,7 @@ mod tests {
call_collate_on,
chains_to_stop,
chains_to_start,
need_to_restart,
} = handle_update_assignment_state_change(
&mut *self.state.lock().unwrap(),
self.orchestrator_para_id,
@@ -774,11 +830,22 @@ mod tests {

// Assert we never start and stop the same container chain
for para_id in &chains_to_start {
assert!(
!chains_to_stop.contains(para_id),
"Tried to start and stop same container chain: {}",
para_id
);
if !need_to_restart {
assert!(
!chains_to_stop.contains(para_id),
"Tried to start and stop same container chain: {}",
para_id
);
} else {
// Will try to start and stop container chain with id "current" or "next", so ignore that
if Some(*para_id) != current && Some(*para_id) != next {
assert!(
!chains_to_stop.contains(para_id),
"Tried to start and stop same container chain: {}",
para_id
);
}
}
}
// Assert we never start or stop the orchestrator chain
assert!(!chains_to_start.contains(&self.orchestrator_para_id));
@@ -1031,12 +1098,9 @@ mod tests {
m.assert_running_chains(&[2000.into()]);

m.handle_update_assignment(None, Some(2000.into()));
m.assert_collating_on(Some(2000.into()));
m.assert_collating_on(None);
m.assert_running_chains(&[2000.into()]);

// TODO: this will send an unneeded CollateOn message, because the ContainerChainSpawner
// doesn't remember that the last message has been sent to this chain,
// which is still running, so it is still collating.
m.handle_update_assignment(Some(2000.into()), Some(2000.into()));
m.assert_collating_on(Some(2000.into()));
m.assert_running_chains(&[2000.into()]);
