From 7ae89fdc8c621b4a0d7cde926c1eec80cbe96189 Mon Sep 17 00:00:00 2001 From: Luqman Aden Date: Thu, 19 Oct 2023 00:41:00 +0000 Subject: [PATCH] bootstore: don't use hardcoded ports in tests --- bootstore/src/schemes/v0/peer.rs | 607 +++++++++++++++++++------------ 1 file changed, 383 insertions(+), 224 deletions(-) diff --git a/bootstore/src/schemes/v0/peer.rs b/bootstore/src/schemes/v0/peer.rs index 7d29e2397a..abe7bd5f66 100644 --- a/bootstore/src/schemes/v0/peer.rs +++ b/bootstore/src/schemes/v0/peer.rs @@ -91,6 +91,9 @@ pub enum NodeApiRequest { /// These are generated from DDM prefixes learned by the bootstrap agent. PeerAddresses(BTreeSet), + /// Get the local [`SocketAddrV6`] the node is listening on. + GetAddress { responder: oneshot::Sender }, + /// Get the status of this node GetStatus { responder: oneshot::Sender }, @@ -175,6 +178,17 @@ impl NodeHandle { Ok(()) } + /// Get the address of this node + pub async fn get_address(&self) -> Result { + let (tx, rx) = oneshot::channel(); + self.tx + .send(NodeApiRequest::GetAddress { responder: tx }) + .await + .map_err(|_| NodeRequestError::Send)?; + let res = rx.await?; + Ok(res) + } + /// Get the status of this node pub async fn get_status(&self) -> Result { let (tx, rx) = oneshot::channel(); @@ -361,6 +375,11 @@ impl Node { let mut interval = interval(self.config.time_per_tick); interval.set_missed_tick_behavior(MissedTickBehavior::Delay); let listener = TcpListener::bind(&self.config.addr).await.unwrap(); + // If the config didn't specify a port, let's update it + // with the actual port we binded to on our listener. + if self.config.addr.port() == 0 { + self.config.addr.set_port(listener.local_addr().unwrap().port()); + } while !self.shutdown { tokio::select! { res = listener.accept() => self.on_accept(res).await, @@ -487,6 +506,9 @@ impl Node { info!(self.log, "Updated Peer Addresses: {peers:?}"); self.manage_connections(peers).await; } + NodeApiRequest::GetAddress { responder } => { + let _ = responder.send(self.config.addr); + } NodeApiRequest::GetStatus { responder } => { let status = Status { fsm_ledger_generation: self.fsm_ledger_generation, @@ -1025,11 +1047,11 @@ mod tests { use super::*; use camino_tempfile::Utf8TempDir; use slog::Drain; - use tokio::time::sleep; + use tokio::{task::JoinHandle, time::sleep}; use uuid::Uuid; fn initial_members() -> BTreeSet { - [("a", "1"), ("b", "1"), ("c", "1")] + [("a", "0"), ("b", "1"), ("c", "2")] .iter() .map(|(id, model)| { Baseboard::new_pc(id.to_string(), model.to_string()) @@ -1037,56 +1059,10 @@ mod tests { .collect() } - fn initial_config(tempdir: &Utf8TempDir, port_start: u16) -> Vec { - initial_members() - .into_iter() - .enumerate() - .map(|(i, id)| { - let fsm_file = format!("test-{i}-fsm-state-ledger"); - let network_file = format!("test-{i}-network-config-ledger"); - Config { - id, - addr: format!("[::1]:{}{}", port_start, i).parse().unwrap(), - time_per_tick: Duration::from_millis(20), - learn_timeout: Duration::from_secs(5), - rack_init_timeout: Duration::from_secs(10), - rack_secret_request_timeout: Duration::from_secs(1), - fsm_state_ledger_paths: vec![tempdir - .path() - .join(&fsm_file)], - network_config_ledger_paths: vec![tempdir - .path() - .join(&network_file)], - } - }) - .collect() - } - fn learner_id(n: usize) -> Baseboard { Baseboard::new_pc("learner".to_string(), n.to_string()) } - fn learner_config( - tempdir: &Utf8TempDir, - n: usize, - port_start: u16, - ) -> Config { - let fsm_file = format!("test-learner-{n}-fsm-state-ledger"); - let network_file = format!("test-{n}-network-config-ledger"); - Config { - id: learner_id(n), - addr: format!("[::1]:{}{}", port_start, 3).parse().unwrap(), - time_per_tick: Duration::from_millis(20), - learn_timeout: Duration::from_secs(5), - rack_init_timeout: Duration::from_secs(10), - rack_secret_request_timeout: Duration::from_secs(1), - fsm_state_ledger_paths: vec![tempdir.path().join(&fsm_file)], - network_config_ledger_paths: vec![tempdir - .path() - .join(&network_file)], - } - } - fn log() -> slog::Logger { let decorator = slog_term::PlainDecorator::new(slog_term::TestStdoutWriter); @@ -1095,118 +1071,344 @@ mod tests { slog::Logger::root(drain, o!()) } - #[tokio::test] - async fn basic_3_nodes() { - let port_start = 3333; - let tempdir = Utf8TempDir::new().unwrap(); - let log = log(); - let config = initial_config(&tempdir, port_start); - let (mut node0, handle0) = Node::new(config[0].clone(), &log).await; - let (mut node1, handle1) = Node::new(config[1].clone(), &log).await; - let (mut node2, handle2) = Node::new(config[2].clone(), &log).await; - - let jh0 = tokio::spawn(async move { - node0.run().await; - }); - let jh1 = tokio::spawn(async move { - node1.run().await; - }); - let jh2 = tokio::spawn(async move { - node2.run().await; - }); + struct TestNode { + log: Logger, + config: Config, + node_handles: Option<(NodeHandle, JoinHandle<()>)>, + } - // Inform each node about the known addresses - let mut addrs: BTreeSet<_> = config.iter().map(|c| c.addr).collect(); - for handle in [&handle0, &handle1, &handle2] { - let _ = handle.load_peer_addresses(addrs.clone()).await; + impl TestNode { + fn new(config: Config, log: Logger) -> TestNode { + TestNode { config, log, node_handles: None } } + async fn start_node(&mut self) { + // Node must have previously been shutdown (or never started) + assert!( + self.node_handles.is_none(), + "node ({}) already running", + self.config.id + ); + + // Reset port to pick any available + self.config.addr.set_port(0); + + // (Re-)create node with existing config and its persistent state (if any) + let (mut node, handle) = + Node::new(self.config.clone(), &self.log).await; + let jh = tokio::spawn(async move { + node.run().await; + }); + + // Grab assigned port + let port = handle + .get_address() + .await + .unwrap_or_else(|err| { + panic!( + "failed to get local address of node ({}): {err}", + self.config.id + ) + }) + .port(); + self.config.addr.set_port(port); + + self.node_handles = Some((handle, jh)); + } + + async fn shutdown_node(&mut self) { + let (handle, jh) = self.node_handles.take().unwrap_or_else(|| { + panic!("node ({}) not active", self.config.id) + }); + // Signal to the node it should shutdown + handle.shutdown().await.unwrap_or_else(|err| { + panic!("node ({}) failed to shutdown: {err}", self.config.id) + }); + // and wait for its task to spin down. + jh.await.unwrap_or_else(|err| { + panic!("node ({}) task failed: {err}", self.config.id) + }); + } + } + + struct TestNodes { + tempdir: Utf8TempDir, + log: Logger, + nodes: Vec, + learner: Option, + addrs: BTreeSet, + } + + impl TestNodes { + /// Create test nodes for the given set of members. + fn setup(initial_members: BTreeSet) -> TestNodes { + let tempdir = Utf8TempDir::new().unwrap(); + let log = log(); + let nodes = initial_members + .into_iter() + .enumerate() + .map(|(i, id)| { + let fsm_file = format!("test-{i}-fsm-state-ledger"); + let network_file = + format!("test-{i}-network-config-ledger"); + let config = Config { + id, + addr: SocketAddrV6::new( + std::net::Ipv6Addr::LOCALHOST, + 0, + 0, + 0, + ), + time_per_tick: Duration::from_millis(20), + learn_timeout: Duration::from_secs(5), + rack_init_timeout: Duration::from_secs(10), + rack_secret_request_timeout: Duration::from_secs(1), + fsm_state_ledger_paths: vec![tempdir + .path() + .join(&fsm_file)], + network_config_ledger_paths: vec![tempdir + .path() + .join(&network_file)], + }; + + TestNode::new(config, log.clone()) + }) + .collect(); + TestNodes { + tempdir, + log, + nodes, + learner: None, // No initial learner node + addrs: BTreeSet::new(), + } + } + + /// (Re-)start the given node and update peer addresses for everyone + async fn start_node(&mut self, i: usize) { + let node = &mut self.nodes[i]; + node.start_node().await; + self.addrs.insert(node.config.addr); + self.load_all_peer_addresses().await; + } + + // Stop the given node and update peer addresses for everyone + async fn shutdown_node(&mut self, i: usize) { + let node = &mut self.nodes[i]; + let addr = node.config.addr; + node.shutdown_node().await; + self.addrs.remove(&addr); + self.load_all_peer_addresses().await; + } + + /// Stop all active nodes (including the learner, if present). + async fn shutdown_all(&mut self) { + let nodes = self + .nodes + .iter_mut() + .chain(&mut self.learner) + .filter(|node| node.node_handles.is_some()); + for node in nodes { + node.shutdown_node().await; + } + self.addrs.clear(); + self.learner = None; + } + + /// Configure new learner node + async fn add_learner(&mut self, n: usize) { + assert!( + self.learner.is_none(), + "learner node already configured ({})", + self.learner.as_ref().unwrap().config.id + ); + + let fsm_file = format!("test-learner-{n}-fsm-state-ledger"); + let network_file = format!("test-{n}-network-config-ledger"); + let config = Config { + id: learner_id(n), + addr: SocketAddrV6::new(std::net::Ipv6Addr::LOCALHOST, 0, 0, 0), + time_per_tick: Duration::from_millis(20), + learn_timeout: Duration::from_secs(5), + rack_init_timeout: Duration::from_secs(10), + rack_secret_request_timeout: Duration::from_secs(1), + fsm_state_ledger_paths: vec![self + .tempdir + .path() + .join(&fsm_file)], + network_config_ledger_paths: vec![self + .tempdir + .path() + .join(&network_file)], + }; + + self.learner = Some(TestNode::new(config, self.log.clone())); + } + + /// Start a configured learner node and update peer addresses for everyone + async fn start_learner(&mut self) { + let learner = + self.learner.as_mut().expect("no learner node configured"); + learner.start_node().await; + let learner_addr = learner.config.addr; + + // Inform the learner and other nodes about all addresses including + // the learner. This simulates DDM discovery. + self.addrs.insert(learner_addr); + self.load_all_peer_addresses().await; + } + + /// Stop the learner node (but leave it configured) and update peer addresses for everyone + /// Can also optionally wipe the ledger persisted on disk. + async fn shutdown_learner(&mut self, wipe_ledger: bool) { + let learner = + self.learner.as_mut().expect("no learner node configured"); + let addr = learner.config.addr; + learner.shutdown_node().await; + + if wipe_ledger { + std::fs::remove_file(&learner.config.fsm_state_ledger_paths[0]) + .expect("failed to remove ledger"); + } + + // Update peer addresses + self.addrs.remove(&addr); + self.load_all_peer_addresses().await; + } + + /// Remove a configured learner node + async fn remove_learner(&mut self) { + // Shutdown the node if it's running + if matches!( + self.learner, + Some(TestNode { node_handles: Some(_), .. }) + ) { + self.shutdown_learner(false).await; + } + let _ = self.learner.take().expect("no learner node configured"); + } + + /// Inform each active node about its peers + async fn load_all_peer_addresses(&self) { + let nodes = + self.nodes.iter().chain(&self.learner).filter_map(|node| { + node.node_handles + .as_ref() + .map(|(h, _)| (&node.config.id, h)) + }); + for (id, node) in nodes { + node.load_peer_addresses(self.addrs.clone()).await.unwrap_or_else(|err| { + panic!("failed to update peer addresses for node ({id}): {err}") + }); + } + } + + /// Returns an iterator that yields the [`NodeHandle`]'s for all active + /// nodes (including the learner node, if present). + fn iter(&self) -> impl Iterator { + self.nodes + .iter() + .chain(&self.learner) + .filter_map(|node| node.node_handles.as_ref().map(|(h, _)| h)) + } + } + + impl std::ops::Index for TestNodes { + type Output = NodeHandle; + + fn index(&self, index: usize) -> &Self::Output { + self.nodes[index] + .node_handles + .as_ref() + .map(|(handle, _)| handle) + .unwrap_or_else(|| panic!("node{index} not running")) + } + } + + // A little convenience to access the learner node in a similar + // manner as other nodes (indexing) but with a non-usize index. + const LEARNER: () = (); + impl std::ops::Index<()> for TestNodes { + type Output = NodeHandle; + + fn index(&self, _: ()) -> &Self::Output { + self.learner + .as_ref() + .expect("no learner node") + .node_handles + .as_ref() + .map(|(handle, _)| handle) + .expect("learner node not running") + } + } + + #[tokio::test] + async fn basic_3_nodes() { + // Create and start test nodes + let mut nodes = TestNodes::setup(initial_members()); + nodes.start_node(0).await; + nodes.start_node(1).await; + nodes.start_node(2).await; + let rack_uuid = RackUuid(Uuid::new_v4()); - handle0.init_rack(rack_uuid, initial_members()).await.unwrap(); + nodes[0].init_rack(rack_uuid, initial_members()).await.unwrap(); - let status = handle0.get_status().await; + let status = nodes[0].get_status().await; println!("Status = {status:?}"); // Ensure we can load the rack secret at all nodes - handle0.load_rack_secret().await.unwrap(); - handle1.load_rack_secret().await.unwrap(); - handle2.load_rack_secret().await.unwrap(); + for node in nodes.iter() { + node.load_rack_secret().await.unwrap(); + } // load the rack secret a second time on node0 - handle0.load_rack_secret().await.unwrap(); + nodes[0].load_rack_secret().await.unwrap(); // Shutdown the node2 and make sure we can still load the rack // secret (threshold=2) at node0 and node1 - handle2.shutdown().await.unwrap(); - jh2.await.unwrap(); - handle0.load_rack_secret().await.unwrap(); - handle1.load_rack_secret().await.unwrap(); - - // Add a learner node - let learner_conf = learner_config(&tempdir, 1, port_start); - let (mut learner, learner_handle) = - Node::new(learner_conf.clone(), &log).await; - let learner_jh = tokio::spawn(async move { - learner.run().await; - }); - // Inform the learner and node0 and node1 about all addresses including - // the learner. This simulates DDM discovery - addrs.insert(learner_conf.addr); - let _ = learner_handle.load_peer_addresses(addrs.clone()).await; - let _ = handle0.load_peer_addresses(addrs.clone()).await; - let _ = handle1.load_peer_addresses(addrs.clone()).await; + nodes.shutdown_node(2).await; + nodes[0].load_rack_secret().await.unwrap(); + nodes[1].load_rack_secret().await.unwrap(); + + // Add and start a learner node + nodes.add_learner(1).await; + nodes.start_learner().await; // Tell the learner to go ahead and learn its share. - learner_handle.init_learner().await.unwrap(); + nodes[LEARNER].init_learner().await.unwrap(); // Shutdown node1 and show that we can still load the rack secret at // node0 and the learner, because threshold=2 and it never changes. - handle1.shutdown().await.unwrap(); - jh1.await.unwrap(); - handle0.load_rack_secret().await.unwrap(); - learner_handle.load_rack_secret().await.unwrap(); + nodes.shutdown_node(1).await; + nodes[0].load_rack_secret().await.unwrap(); + nodes[LEARNER].load_rack_secret().await.unwrap(); - // Now shutdown the learner and show that node0 cannot load the rack secret - learner_handle.shutdown().await.unwrap(); - learner_jh.await.unwrap(); - handle0.load_rack_secret().await.unwrap_err(); + // Now shutdown and remove the learner and show that node0 cannot load the rack secret + nodes.remove_learner().await; + nodes[0].load_rack_secret().await.unwrap_err(); - // Reload an node from persistent state and successfully reload the + // Reload a node from persistent state and successfully reload the // rack secret. - let (mut node1, handle1) = Node::new(config[1].clone(), &log).await; - let jh1 = tokio::spawn(async move { - node1.run().await; - }); - let _ = handle1.load_peer_addresses(addrs.clone()).await; - handle0.load_rack_secret().await.unwrap(); + nodes.start_node(1).await; + nodes[0].load_rack_secret().await.unwrap(); - // Add a second learner + // Grab the current generation numbers let peer0_gen = - handle0.get_status().await.unwrap().fsm_ledger_generation; + nodes[0].get_status().await.unwrap().fsm_ledger_generation; let peer1_gen = - handle1.get_status().await.unwrap().fsm_ledger_generation; - let learner_config = learner_config(&tempdir, 2, port_start); - let (mut learner, learner_handle) = - Node::new(learner_config.clone(), &log).await; - let learner_jh = tokio::spawn(async move { - learner.run().await; - }); + nodes[1].get_status().await.unwrap().fsm_ledger_generation; - // Inform the learner, node0, and node1 about all addresses including - // the learner. This simulates DDM discovery - addrs.insert(learner_config.addr); - let _ = learner_handle.load_peer_addresses(addrs.clone()).await; - let _ = handle0.load_peer_addresses(addrs.clone()).await; - let _ = handle1.load_peer_addresses(addrs.clone()).await; + // Add and start a second learner + nodes.add_learner(2).await; + nodes.start_learner().await; // Tell the learner to go ahead and learn its share. - learner_handle.init_learner().await.unwrap(); + nodes[LEARNER].init_learner().await.unwrap(); // Get the new generation numbers let peer0_gen_new = - handle0.get_status().await.unwrap().fsm_ledger_generation; + nodes[0].get_status().await.unwrap().fsm_ledger_generation; let peer1_gen_new = - handle1.get_status().await.unwrap().fsm_ledger_generation; + nodes[1].get_status().await.unwrap().fsm_ledger_generation; // Ensure only one of the peers generation numbers gets bumped assert!( @@ -1217,69 +1419,49 @@ mod tests { // Wipe the learner ledger, restart the learner and instruct it to // relearn its share, and ensure that the neither generation number gets - // bumped because persistence doesn't occur. - learner_handle.shutdown().await.unwrap(); - learner_jh.await.unwrap(); - std::fs::remove_file(&learner_config.fsm_state_ledger_paths[0]) - .unwrap(); - let (mut learner, learner_handle) = - Node::new(learner_config.clone(), &log).await; - let learner_jh = tokio::spawn(async move { - learner.run().await; - }); - let _ = learner_handle.load_peer_addresses(addrs.clone()).await; - learner_handle.init_learner().await.unwrap(); + // bumped because persistence doesn't occur. But for that to happen + // we need to make sure the learner asks the same peer. The choice of + // which peer is somewhat implementation dependent, so to get around + // that we stop the other peer until after restarting the learner. + + // Infer the correct peer based on which generation number didn't get + // bumped and stop it. + let non_learner_peer = if peer0_gen_new == peer0_gen { 0 } else { 1 }; + nodes.shutdown_node(non_learner_peer).await; + + // Now we can stop the learner, wipe its ledger, and restart it. + nodes.shutdown_learner(true).await; + nodes.start_learner().await; + + // Now restart the other peer and tell the learner to relearn its share + nodes.start_node(non_learner_peer).await; + nodes[LEARNER].init_learner().await.unwrap(); + + // Ensure the peers' generation numbers didn't get bumped. The learner + // should've asked the same sled for a share first, which it already + // handed out. let peer0_gen_new_2 = - handle0.get_status().await.unwrap().fsm_ledger_generation; + nodes[0].get_status().await.unwrap().fsm_ledger_generation; let peer1_gen_new_2 = - handle1.get_status().await.unwrap().fsm_ledger_generation; - - // Ensure the peer's generation numbers don't get bumped. The learner - // will ask the same sled for a share first, which it already handed - // out. - assert!( - peer0_gen_new == peer0_gen_new_2 - && peer1_gen_new == peer1_gen_new_2 - ); + nodes[1].get_status().await.unwrap().fsm_ledger_generation; + assert_eq!(peer0_gen_new, peer0_gen_new_2); + assert_eq!(peer1_gen_new, peer1_gen_new_2); - // Shutdown the new learner, node0, and node1 - learner_handle.shutdown().await.unwrap(); - learner_jh.await.unwrap(); - handle0.shutdown().await.unwrap(); - jh0.await.unwrap(); - handle1.shutdown().await.unwrap(); - jh1.await.unwrap(); + // Shut it all down + nodes.shutdown_all().await; } #[tokio::test] async fn network_config() { - let port_start = 4444; - let tempdir = Utf8TempDir::new().unwrap(); - let log = log(); - let config = initial_config(&tempdir, port_start); - let (mut node0, handle0) = Node::new(config[0].clone(), &log).await; - let (mut node1, handle1) = Node::new(config[1].clone(), &log).await; - let (mut node2, handle2) = Node::new(config[2].clone(), &log).await; - - let jh0 = tokio::spawn(async move { - node0.run().await; - }); - let jh1 = tokio::spawn(async move { - node1.run().await; - }); - let jh2 = tokio::spawn(async move { - node2.run().await; - }); - - // Inform each node about the known addresses - let mut addrs: BTreeSet<_> = config.iter().map(|c| c.addr).collect(); - for handle in [&handle0, &handle1, &handle2] { - let _ = handle.load_peer_addresses(addrs.clone()).await; - } + // Create and start test nodes + let mut nodes = TestNodes::setup(initial_members()); + nodes.start_node(0).await; + nodes.start_node(1).await; + nodes.start_node(2).await; // Ensure there is no network config at any of the nodes - for handle in [&handle0, &handle1, &handle2] { - assert_eq!(None, handle.get_network_config().await.unwrap()); + for node in nodes.iter() { + assert_eq!(None, node.get_network_config().await.unwrap()); } // Update the network config at node0 and ensure it has taken effect @@ -1287,10 +1469,10 @@ mod tests { generation: 1, blob: b"Some network data".to_vec(), }; - handle0.update_network_config(network_config.clone()).await.unwrap(); + nodes[0].update_network_config(network_config.clone()).await.unwrap(); assert_eq!( Some(&network_config), - handle0.get_network_config().await.unwrap().as_ref() + nodes[0].get_network_config().await.unwrap().as_ref() ); // Poll node1 and node2 until the network config update shows up @@ -1305,13 +1487,13 @@ mod tests { _ = sleep(timeout) => { panic!("Network config not replicated"); } - res = handle1.get_network_config(), if !node1_done => { + res = nodes[1].get_network_config(), if !node1_done => { if res.unwrap().as_ref() == Some(&network_config) { node1_done = true; continue; } } - res = handle2.get_network_config(), if !node2_done => { + res = nodes[2].get_network_config(), if !node2_done => { if res.unwrap().as_ref() == Some(&network_config) { node2_done = true; continue; @@ -1321,18 +1503,8 @@ mod tests { } // Bring a learner online - let learner_conf = learner_config(&tempdir, 1, port_start); - let (mut learner, learner_handle) = - Node::new(learner_conf.clone(), &log).await; - let learner_jh = tokio::spawn(async move { - learner.run().await; - }); - // Inform the learner and other nodes about all addresses including - // the learner. This simulates DDM discovery. - addrs.insert(learner_conf.addr); - for handle in [&learner_handle, &handle0, &handle1, &handle2] { - let _ = handle.load_peer_addresses(addrs.clone()).await; - } + nodes.add_learner(1).await; + nodes.start_learner().await; // Poll the learner to ensure it gets the network config // Note that the learner doesn't even need to learn its share @@ -1345,7 +1517,7 @@ mod tests { _ = sleep(timeout) => { panic!("Network config not replicated"); } - res = learner_handle.get_network_config() => { + res = nodes[LEARNER].get_network_config() => { if res.unwrap().as_ref() == Some(&network_config) { done = true; } @@ -1355,34 +1527,26 @@ mod tests { // Stop node0, bring it back online and ensure it still sees the config // at generation 1 - handle0.shutdown().await.unwrap(); - jh0.await.unwrap(); - let (mut node0, handle0) = Node::new(config[0].clone(), &log).await; - let jh0 = tokio::spawn(async move { - node0.run().await; - }); + nodes.shutdown_node(0).await; + nodes.start_node(0).await; assert_eq!( Some(&network_config), - handle0.get_network_config().await.unwrap().as_ref() + nodes[0].get_network_config().await.unwrap().as_ref() ); // Stop node0 again, update network config via node1, bring node0 back online, // and ensure all nodes see the latest configuration. + nodes.shutdown_node(0).await; let new_config = NetworkConfig { generation: 2, blob: b"Some more network data".to_vec(), }; - handle0.shutdown().await.unwrap(); - jh0.await.unwrap(); - handle1.update_network_config(new_config.clone()).await.unwrap(); + nodes[1].update_network_config(new_config.clone()).await.unwrap(); assert_eq!( Some(&new_config), - handle1.get_network_config().await.unwrap().as_ref() + nodes[1].get_network_config().await.unwrap().as_ref() ); - let (mut node0, handle0) = Node::new(config[0].clone(), &log).await; - let jh0 = tokio::spawn(async move { - node0.run().await; - }); + nodes.start_node(0).await; let start = Instant::now(); // These should all resolve instantly, so no real need for a select, // which is getting tedious. @@ -1392,8 +1556,8 @@ mod tests { if Instant::now() - start > POLL_TIMEOUT { panic!("network config not replicated"); } - for h in [&handle0, &handle1, &handle2, &learner_handle] { - if h.get_network_config().await.unwrap().as_ref() + for node in nodes.iter() { + if node.get_network_config().await.unwrap().as_ref() != Some(&new_config) { // We need to try again @@ -1410,16 +1574,11 @@ mod tests { current_generation: 2, }); assert_eq!( - handle0.update_network_config(network_config).await, + nodes[0].update_network_config(network_config).await, expected ); // Shut it all down - for h in [handle0, handle1, handle2, learner_handle] { - let _ = h.shutdown().await; - } - for jh in [jh0, jh1, jh2, learner_jh] { - jh.await.unwrap(); - } + nodes.shutdown_all().await; } }