From 5f0cdee256c9e92a3b814002a56f5665fc2c4019 Mon Sep 17 00:00:00 2001 From: Tomasz Polaczyk Date: Tue, 31 Oct 2023 14:51:15 +0100 Subject: [PATCH 1/5] Increment ports of "odd" parachains to avoid port conflicts The user can specify p2p, rpc, and prometheus ports using the CLI, and since there can be 2 container chains running at the same time, the port used may be the one specified by the user, or that +1 --- node/src/container_chain_monitor.rs | 2 +- node/src/container_chain_spawner.rs | 42 +++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/node/src/container_chain_monitor.rs b/node/src/container_chain_monitor.rs index e66f8b99a..d2c0bd492 100644 --- a/node/src/container_chain_monitor.rs +++ b/node/src/container_chain_monitor.rs @@ -38,7 +38,7 @@ pub struct SpawnedContainersMonitor { /// stopping time and reference count. list: VecDeque, /// Count the number of times a container chain has been started - count: usize, + pub count: usize, } pub struct SpawnedContainer { diff --git a/node/src/container_chain_spawner.rs b/node/src/container_chain_spawner.rs index c535133ad..3af230e1e 100644 --- a/node/src/container_chain_spawner.rs +++ b/node/src/container_chain_spawner.rs @@ -189,6 +189,48 @@ impl ContainerChainSpawner { container_chain_para_id ); + let odd_parachain = { + let state = state.lock().expect("poison error"); + let monitor_id = state.spawned_containers_monitor.count; + + monitor_id % 2 == 1 + }; + + if odd_parachain { + log::info!("This is an odd parachain, incrementing all the ports by 1"); + // Increment all the ports by 1 to avoid conflicts with the other running container chain + container_chain_cli + .base + .base + .prometheus_params + .prometheus_port = Some( + container_chain_cli + .base + .base + .prometheus_params + .prometheus_port + .unwrap_or(9617) + .saturating_add(1), + ); + container_chain_cli.base.base.network_params.port = Some( + container_chain_cli + .base + .base + .network_params + .port + .unwrap_or(30335) + .saturating_add(1), + ); + container_chain_cli.base.base.rpc_port = Some( + container_chain_cli + .base + .base + .rpc_port + .unwrap_or(9946) + .saturating_add(1), + ); + } + // Update CLI params container_chain_cli.base.para_id = Some(container_chain_para_id.into()); From d15de55bdefb2dfb3b6c1842f121d32b91229051 Mon Sep 17 00:00:00 2001 From: Tomasz Polaczyk Date: Tue, 31 Oct 2023 15:05:57 +0100 Subject: [PATCH 2/5] Always restart container chain before starting collation --- node/src/container_chain_monitor.rs | 2 +- node/src/container_chain_spawner.rs | 71 +++++++++++------------------ 2 files changed, 28 insertions(+), 45 deletions(-) diff --git a/node/src/container_chain_monitor.rs b/node/src/container_chain_monitor.rs index d2c0bd492..e66f8b99a 100644 --- a/node/src/container_chain_monitor.rs +++ b/node/src/container_chain_monitor.rs @@ -38,7 +38,7 @@ pub struct SpawnedContainersMonitor { /// stopping time and reference count. list: VecDeque, /// Count the number of times a container chain has been started - pub count: usize, + count: usize, } pub struct SpawnedContainer { diff --git a/node/src/container_chain_spawner.rs b/node/src/container_chain_spawner.rs index 3af230e1e..330496c28 100644 --- a/node/src/container_chain_spawner.rs +++ b/node/src/container_chain_spawner.rs @@ -89,6 +89,8 @@ pub struct ContainerChainSpawnerState { pub struct ContainerChainState { /// Async callback that enables collation on this container chain + // We don't use it since we are always restarting container chains + #[allow(unused)] collate_on: Arc Pin + Send>> + Send + Sync>, /// Handle that stops the container chain when dropped stop_handle: StopContainerChain, @@ -189,46 +191,17 @@ impl ContainerChainSpawner { container_chain_para_id ); - let odd_parachain = { - let state = state.lock().expect("poison error"); - let monitor_id = state.spawned_containers_monitor.count; - - monitor_id % 2 == 1 - }; - - if odd_parachain { - log::info!("This is an odd parachain, incrementing all the ports by 1"); - // Increment all the ports by 1 to avoid conflicts with the other running container chain + if !start_collation { + log::info!("This is a syncing container chain, using random ports"); + // Use random ports to avoid conflicts with the other running container chain + let random_ports = [23456, 23457, 23458]; container_chain_cli .base .base .prometheus_params - .prometheus_port = Some( - container_chain_cli - .base - .base - .prometheus_params - .prometheus_port - .unwrap_or(9617) - .saturating_add(1), - ); - container_chain_cli.base.base.network_params.port = Some( - container_chain_cli - .base - .base - .network_params - .port - .unwrap_or(30335) - .saturating_add(1), - ); - container_chain_cli.base.base.rpc_port = Some( - container_chain_cli - .base - .base - .rpc_port - .unwrap_or(9946) - .saturating_add(1), - ); + .prometheus_port = Some(random_ports[0]); + container_chain_cli.base.base.network_params.port = Some(random_ports[1]); + container_chain_cli.base.base.rpc_port = Some(random_ports[2]); } // Update CLI params @@ -517,6 +490,7 @@ fn handle_update_assignment_state_change( running_chains_after.extend(current); running_chains_after.extend(next); running_chains_after.remove(&orchestrator_para_id); + let mut need_to_restart_current = false; if state.assigned_para_id != current { // If the assigned container chain was already running but not collating, we need to call collate_on @@ -525,12 +499,9 @@ fn handle_update_assignment_state_change( if para_id == orchestrator_para_id { call_collate_on = Some(collate_on_tanssi); } else { - // When we get assigned to a different container chain, only need to call collate_on if it was already - // running before - if running_chains_before.contains(¶_id) { - let c = state.spawned_container_chains.get(¶_id).expect("container chain was running before so it should exist in spawned_container_chains"); - call_collate_on = Some(c.collate_on.clone()); - } + // When we get assigned to a different container chain, we don't need to call collate_on because + // we will restart that container chain in collation mode. + need_to_restart_current = true; } } } @@ -538,15 +509,27 @@ fn handle_update_assignment_state_change( state.assigned_para_id = current; state.next_assigned_para_id = next; - let chains_to_stop = running_chains_before + let mut chains_to_stop: Vec<_> = running_chains_before .difference(&running_chains_after) .copied() .collect(); - let chains_to_start = running_chains_after + let mut chains_to_start: Vec<_> = running_chains_after .difference(&running_chains_before) .copied() .collect(); + if need_to_restart_current { + // Force restart of new assigned container chain: if it was running before it was in "syncing mode", + // which doesn't use the correct ports, so start it in "collation mode". + let id = current.unwrap(); + if !chains_to_start.contains(&id) { + chains_to_start.push(id); + } + if !chains_to_stop.contains(&id) { + chains_to_stop.push(id); + } + } + HandleUpdateAssignmentResult { call_collate_on, chains_to_stop, From 8630ab94fe6cade853c5470400820f9e81621035 Mon Sep 17 00:00:00 2001 From: Tomasz Polaczyk Date: Tue, 31 Oct 2023 15:39:14 +0100 Subject: [PATCH 3/5] Fix wrong condition and add keep_db to stop signal --- node/src/container_chain_spawner.rs | 31 +++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/node/src/container_chain_spawner.rs b/node/src/container_chain_spawner.rs index 330496c28..0bcfa4c74 100644 --- a/node/src/container_chain_spawner.rs +++ b/node/src/container_chain_spawner.rs @@ -98,7 +98,7 @@ pub struct ContainerChainState { /// Stops a container chain when dropped pub struct StopContainerChain { - signal: oneshot::Sender<()>, + signal: oneshot::Sender, id: usize, } @@ -279,7 +279,7 @@ impl ContainerChainSpawner { .await?; // Signal that allows to gracefully stop a container chain - let (signal, on_exit) = oneshot::channel::<()>(); + let (signal, on_exit) = oneshot::channel::(); let collate_on = collate_on.unwrap_or_else(|| { assert!( !validator, @@ -343,11 +343,11 @@ impl ContainerChainSpawner { } stop_unassigned = on_exit_future => { // Graceful shutdown. - // `stop_unassigned` will be `Ok` if `.stop()` has been called, which means that the + // `stop_unassigned` will be `Ok(keep_db)` if `.stop()` has been called, which means that the // container chain has been unassigned, and will be `Err` if the handle has been dropped, // which means that the node is stopping. // Delete existing database if running as collator - if validator && stop_unassigned.is_ok() && !container_chain_cli.base.keep_db { + if validator && stop_unassigned == Ok(false) && !container_chain_cli.base.keep_db { delete_container_chain_db(&db_path); } } @@ -374,7 +374,7 @@ impl ContainerChainSpawner { } /// Stop a container chain. Prints a warning if the container chain was not running. - fn stop(&self, container_chain_para_id: ParaId) { + fn stop(&self, container_chain_para_id: ParaId, keep_db: bool) { let mut state = self.state.lock().expect("poison error"); let stop_handle = state .spawned_container_chains @@ -390,7 +390,7 @@ impl ContainerChainSpawner { .set_stop_signal_time(id, Instant::now()); // Send signal to perform graceful shutdown, which will delete the db if needed - let _ = stop_handle.stop_handle.signal.send(()); + let _ = stop_handle.stop_handle.signal.send(keep_db); } None => { log::warn!( @@ -423,6 +423,7 @@ impl ContainerChainSpawner { call_collate_on, chains_to_stop, chains_to_start, + need_to_restart_current, } = handle_update_assignment_state_change( &mut self.state.lock().expect("poison error"), self.orchestrator_para_id, @@ -438,7 +439,14 @@ impl ContainerChainSpawner { // Stop all container chains that are no longer needed for para_id in chains_to_stop { - self.stop(para_id); + // Keep db if we are currently assigned to this chain + let keep_db = Some(para_id) == current; + self.stop(para_id, keep_db); + } + + if need_to_restart_current { + // Give it some time to stop properly + sleep(Duration::from_secs(10)).await; } // Start all new container chains (usually 1) @@ -456,6 +464,7 @@ struct HandleUpdateAssignmentResult { Option Pin + Send>> + Send + Sync>>, chains_to_stop: Vec, chains_to_start: Vec, + need_to_restart_current: bool, } // This is a separate function to allow testing @@ -472,6 +481,7 @@ fn handle_update_assignment_state_change( call_collate_on: None, chains_to_stop: Default::default(), chains_to_start: Default::default(), + need_to_restart_current: false, }; } @@ -522,18 +532,19 @@ fn handle_update_assignment_state_change( // Force restart of new assigned container chain: if it was running before it was in "syncing mode", // which doesn't use the correct ports, so start it in "collation mode". let id = current.unwrap(); + if running_chains_before.contains(&id) && !chains_to_stop.contains(&id) { + chains_to_stop.push(id); + } if !chains_to_start.contains(&id) { chains_to_start.push(id); } - if !chains_to_stop.contains(&id) { - chains_to_stop.push(id); - } } HandleUpdateAssignmentResult { call_collate_on, chains_to_stop, chains_to_start, + need_to_restart_current, } } From 09fc0df283ce5838006cd81f8ee8fd0d9cc5ed31 Mon Sep 17 00:00:00 2001 From: Tomasz Polaczyk Date: Tue, 31 Oct 2023 16:04:31 +0100 Subject: [PATCH 4/5] Fix unit tests --- node/src/container_chain_spawner.rs | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/node/src/container_chain_spawner.rs b/node/src/container_chain_spawner.rs index 0bcfa4c74..149804489 100644 --- a/node/src/container_chain_spawner.rs +++ b/node/src/container_chain_spawner.rs @@ -800,6 +800,7 @@ mod tests { call_collate_on, chains_to_stop, chains_to_start, + need_to_restart_current, } = handle_update_assignment_state_change( &mut *self.state.lock().unwrap(), self.orchestrator_para_id, @@ -810,11 +811,22 @@ mod tests { // Assert we never start and stop the same container chain for para_id in &chains_to_start { - assert!( - !chains_to_stop.contains(para_id), - "Tried to start and stop same container chain: {}", - para_id - ); + if !need_to_restart_current { + assert!( + !chains_to_stop.contains(para_id), + "Tried to start and stop same container chain: {}", + para_id + ); + } else { + // Will try to start and stop container chain with id "current", so ignore that + if Some(*para_id) != current { + assert!( + !chains_to_stop.contains(para_id), + "Tried to start and stop same container chain: {}", + para_id + ); + } + } } // Assert we never start or stop the orchestrator chain assert!(!chains_to_start.contains(&self.orchestrator_para_id)); From cf0ab36b6fa0d8680b35b4ee730a87f348370a07 Mon Sep 17 00:00:00 2001 From: Tomasz Polaczyk Date: Thu, 2 Nov 2023 10:45:00 +0100 Subject: [PATCH 5/5] Fix edge case where we need to restart the next chain --- node/src/container_chain_spawner.rs | 42 ++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/node/src/container_chain_spawner.rs b/node/src/container_chain_spawner.rs index 149804489..4421bec79 100644 --- a/node/src/container_chain_spawner.rs +++ b/node/src/container_chain_spawner.rs @@ -423,7 +423,7 @@ impl ContainerChainSpawner { call_collate_on, chains_to_stop, chains_to_start, - need_to_restart_current, + need_to_restart, } = handle_update_assignment_state_change( &mut self.state.lock().expect("poison error"), self.orchestrator_para_id, @@ -444,7 +444,7 @@ impl ContainerChainSpawner { self.stop(para_id, keep_db); } - if need_to_restart_current { + if need_to_restart { // Give it some time to stop properly sleep(Duration::from_secs(10)).await; } @@ -464,7 +464,7 @@ struct HandleUpdateAssignmentResult { Option Pin + Send>> + Send + Sync>>, chains_to_stop: Vec, chains_to_start: Vec, - need_to_restart_current: bool, + need_to_restart: bool, } // This is a separate function to allow testing @@ -481,7 +481,7 @@ fn handle_update_assignment_state_change( call_collate_on: None, chains_to_stop: Default::default(), chains_to_start: Default::default(), - need_to_restart_current: false, + need_to_restart: false, }; } @@ -501,6 +501,7 @@ fn handle_update_assignment_state_change( running_chains_after.extend(next); running_chains_after.remove(&orchestrator_para_id); let mut need_to_restart_current = false; + let mut need_to_restart_next = false; if state.assigned_para_id != current { // If the assigned container chain was already running but not collating, we need to call collate_on @@ -514,6 +515,12 @@ fn handle_update_assignment_state_change( need_to_restart_current = true; } } + + if let Some(para_id) = state.assigned_para_id { + if para_id != orchestrator_para_id && Some(para_id) == next { + need_to_restart_next = true; + } + } } state.assigned_para_id = current; @@ -540,11 +547,23 @@ fn handle_update_assignment_state_change( } } + if need_to_restart_next { + // Handle edge case of going from (2000, 2001) to (2001, 2000). In that case we must restart both chains, + // because previously 2000 was collating and now 2000 will only be syncing. + let id = next.unwrap(); + if running_chains_before.contains(&id) && !chains_to_stop.contains(&id) { + chains_to_stop.push(id); + } + if !chains_to_start.contains(&id) { + chains_to_start.push(id); + } + } + HandleUpdateAssignmentResult { call_collate_on, chains_to_stop, chains_to_start, - need_to_restart_current, + need_to_restart: need_to_restart_current || need_to_restart_next, } } @@ -800,7 +819,7 @@ mod tests { call_collate_on, chains_to_stop, chains_to_start, - need_to_restart_current, + need_to_restart, } = handle_update_assignment_state_change( &mut *self.state.lock().unwrap(), self.orchestrator_para_id, @@ -811,15 +830,15 @@ mod tests { // Assert we never start and stop the same container chain for para_id in &chains_to_start { - if !need_to_restart_current { + if !need_to_restart { assert!( !chains_to_stop.contains(para_id), "Tried to start and stop same container chain: {}", para_id ); } else { - // Will try to start and stop container chain with id "current", so ignore that - if Some(*para_id) != current { + // Will try to start and stop container chain with id "current" or "next", so ignore that + if Some(*para_id) != current && Some(*para_id) != next { assert!( !chains_to_stop.contains(para_id), "Tried to start and stop same container chain: {}", @@ -1079,12 +1098,9 @@ mod tests { m.assert_running_chains(&[2000.into()]); m.handle_update_assignment(None, Some(2000.into())); - m.assert_collating_on(Some(2000.into())); + m.assert_collating_on(None); m.assert_running_chains(&[2000.into()]); - // TODO: this will send an unneeded CollateOn message, because the ContainerChainSpawner - // doesn't remember that the last message has been sent to this chain, - // which is still running, so it is still collating. m.handle_update_assignment(Some(2000.into()), Some(2000.into())); m.assert_collating_on(Some(2000.into())); m.assert_running_chains(&[2000.into()]);