Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always restart container chain before starting collation #307

Merged
merged 5 commits into from
Nov 7, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 88 additions & 24 deletions node/src/container_chain_spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,16 @@ pub struct ContainerChainSpawnerState {

pub struct ContainerChainState {
/// Async callback that enables collation on this container chain
// We don't use it since we are always restarting container chains
#[allow(unused)]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. Should we remove it already, then?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could, but it will make it a bit harder to revert this change in the future

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no worries then

collate_on: Arc<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>,
/// Handle that stops the container chain when dropped
stop_handle: StopContainerChain,
}

/// Stops a container chain when dropped
pub struct StopContainerChain {
signal: oneshot::Sender<()>,
signal: oneshot::Sender<bool>,
id: usize,
}

Expand Down Expand Up @@ -189,6 +191,19 @@ impl ContainerChainSpawner {
container_chain_para_id
);

if !start_collation {
log::info!("This is a syncing container chain, using random ports");
// Use random ports to avoid conflicts with the other running container chain
let random_ports = [23456, 23457, 23458];
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't it automatically use random ports even if we don't assign any? In other words, can we use None here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could maybe use [0, 0, 0], but the reason I selected the random ports myself is that if we actually use random ports, there is a small probability of occupying a port from the collating container. The way it is implemented now, that will only happen if this fixed random port is already taken, which can happen in zombienet because all of the collators try to use the same ports. But in production, where every collator runs in isolation, I think it is better to use these fixed random ports instead of actually random ones.

container_chain_cli
.base
.base
.prometheus_params
.prometheus_port = Some(random_ports[0]);
container_chain_cli.base.base.network_params.port = Some(random_ports[1]);
container_chain_cli.base.base.rpc_port = Some(random_ports[2]);
}

// Update CLI params
container_chain_cli.base.para_id = Some(container_chain_para_id.into());

Expand Down Expand Up @@ -264,7 +279,7 @@ impl ContainerChainSpawner {
.await?;

// Signal that allows to gracefully stop a container chain
let (signal, on_exit) = oneshot::channel::<()>();
let (signal, on_exit) = oneshot::channel::<bool>();
let collate_on = collate_on.unwrap_or_else(|| {
assert!(
!validator,
Expand Down Expand Up @@ -328,11 +343,11 @@ impl ContainerChainSpawner {
}
stop_unassigned = on_exit_future => {
// Graceful shutdown.
// `stop_unassigned` will be `Ok` if `.stop()` has been called, which means that the
// `stop_unassigned` will be `Ok(keep_db)` if `.stop()` has been called, which means that the
// container chain has been unassigned, and will be `Err` if the handle has been dropped,
// which means that the node is stopping.
// Delete existing database if running as collator
if validator && stop_unassigned.is_ok() && !container_chain_cli.base.keep_db {
if validator && stop_unassigned == Ok(false) && !container_chain_cli.base.keep_db {
delete_container_chain_db(&db_path);
}
}
Expand All @@ -359,7 +374,7 @@ impl ContainerChainSpawner {
}

/// Stop a container chain. Prints a warning if the container chain was not running.
fn stop(&self, container_chain_para_id: ParaId) {
fn stop(&self, container_chain_para_id: ParaId, keep_db: bool) {
let mut state = self.state.lock().expect("poison error");
let stop_handle = state
.spawned_container_chains
Expand All @@ -375,7 +390,7 @@ impl ContainerChainSpawner {
.set_stop_signal_time(id, Instant::now());

// Send signal to perform graceful shutdown, which will delete the db if needed
let _ = stop_handle.stop_handle.signal.send(());
let _ = stop_handle.stop_handle.signal.send(keep_db);
}
None => {
log::warn!(
Expand Down Expand Up @@ -408,6 +423,7 @@ impl ContainerChainSpawner {
call_collate_on,
chains_to_stop,
chains_to_start,
need_to_restart,
} = handle_update_assignment_state_change(
&mut self.state.lock().expect("poison error"),
self.orchestrator_para_id,
Expand All @@ -423,7 +439,14 @@ impl ContainerChainSpawner {

// Stop all container chains that are no longer needed
for para_id in chains_to_stop {
self.stop(para_id);
// Keep db if we are currently assigned to this chain
let keep_db = Some(para_id) == current;
self.stop(para_id, keep_db);
}

if need_to_restart {
// Give it some time to stop properly
sleep(Duration::from_secs(10)).await;
}

// Start all new container chains (usually 1)
Expand All @@ -441,6 +464,7 @@ struct HandleUpdateAssignmentResult {
Option<Arc<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>>,
chains_to_stop: Vec<ParaId>,
chains_to_start: Vec<ParaId>,
need_to_restart: bool,
}

// This is a separate function to allow testing
Expand All @@ -457,6 +481,7 @@ fn handle_update_assignment_state_change(
call_collate_on: None,
chains_to_stop: Default::default(),
chains_to_start: Default::default(),
need_to_restart: false,
};
}

Expand All @@ -475,6 +500,8 @@ fn handle_update_assignment_state_change(
running_chains_after.extend(current);
running_chains_after.extend(next);
running_chains_after.remove(&orchestrator_para_id);
let mut need_to_restart_current = false;
let mut need_to_restart_next = false;

if state.assigned_para_id != current {
// If the assigned container chain was already running but not collating, we need to call collate_on
Expand All @@ -483,32 +510,60 @@ fn handle_update_assignment_state_change(
if para_id == orchestrator_para_id {
call_collate_on = Some(collate_on_tanssi);
} else {
// When we get assigned to a different container chain, only need to call collate_on if it was already
// running before
if running_chains_before.contains(&para_id) {
let c = state.spawned_container_chains.get(&para_id).expect("container chain was running before so it should exist in spawned_container_chains");
call_collate_on = Some(c.collate_on.clone());
}
// When we get assigned to a different container chain, we don't need to call collate_on because
// we will restart that container chain in collation mode.
need_to_restart_current = true;
}
}

if let Some(para_id) = state.assigned_para_id {
if para_id != orchestrator_para_id && Some(para_id) == next {
need_to_restart_next = true;
}
}
}

state.assigned_para_id = current;
state.next_assigned_para_id = next;

let chains_to_stop = running_chains_before
let mut chains_to_stop: Vec<_> = running_chains_before
.difference(&running_chains_after)
.copied()
.collect();
let chains_to_start = running_chains_after
let mut chains_to_start: Vec<_> = running_chains_after
.difference(&running_chains_before)
.copied()
.collect();

if need_to_restart_current {
// Force restart of new assigned container chain: if it was running before it was in "syncing mode",
// which doesn't use the correct ports, so start it in "collation mode".
let id = current.unwrap();
if running_chains_before.contains(&id) && !chains_to_stop.contains(&id) {
chains_to_stop.push(id);
}
if !chains_to_start.contains(&id) {
chains_to_start.push(id);
}
}

if need_to_restart_next {
// Handle edge case of going from (2000, 2001) to (2001, 2000). In that case we must restart both chains,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the second case, does it restart with random ports?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we stop both of them, wait 10 seconds, and then start the new "syncing" with random ports, and the new "collating" with the good ports (in either order).

// because previously 2000 was collating and now 2000 will only be syncing.
let id = next.unwrap();
if running_chains_before.contains(&id) && !chains_to_stop.contains(&id) {
chains_to_stop.push(id);
}
if !chains_to_start.contains(&id) {
chains_to_start.push(id);
}
}

HandleUpdateAssignmentResult {
call_collate_on,
chains_to_stop,
chains_to_start,
need_to_restart: need_to_restart_current || need_to_restart_next,
}
}

Expand Down Expand Up @@ -764,6 +819,7 @@ mod tests {
call_collate_on,
chains_to_stop,
chains_to_start,
need_to_restart,
} = handle_update_assignment_state_change(
&mut *self.state.lock().unwrap(),
self.orchestrator_para_id,
Expand All @@ -774,11 +830,22 @@ mod tests {

// Assert we never start and stop the same container chain
for para_id in &chains_to_start {
assert!(
!chains_to_stop.contains(para_id),
"Tried to start and stop same container chain: {}",
para_id
);
if !need_to_restart {
assert!(
!chains_to_stop.contains(para_id),
"Tried to start and stop same container chain: {}",
para_id
);
} else {
// Will try to start and stop container chain with id "current" or "next", so ignore that
if Some(*para_id) != current && Some(*para_id) != next {
assert!(
!chains_to_stop.contains(para_id),
"Tried to start and stop same container chain: {}",
para_id
);
}
}
}
// Assert we never start or stop the orchestrator chain
assert!(!chains_to_start.contains(&self.orchestrator_para_id));
Expand Down Expand Up @@ -1031,12 +1098,9 @@ mod tests {
m.assert_running_chains(&[2000.into()]);

m.handle_update_assignment(None, Some(2000.into()));
m.assert_collating_on(Some(2000.into()));
m.assert_collating_on(None);
m.assert_running_chains(&[2000.into()]);

// TODO: this will send an unneeded CollateOn message, because the ContainerChainSpawner
// doesn't remember that the last message has been sent to this chain,
// which is still running, so it is still collating.
m.handle_update_assignment(Some(2000.into()), Some(2000.into()));
m.assert_collating_on(Some(2000.into()));
m.assert_running_chains(&[2000.into()]);
Expand Down
Loading