diff --git a/Cargo.lock b/Cargo.lock index 6d5cdbd3993..15550489137 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1420,6 +1420,7 @@ dependencies = [ "once_cell", "ringbuffer", "serde", + "serde_with", "slog", "thiserror", "tokio", diff --git a/gateway-sp-comms/Cargo.toml b/gateway-sp-comms/Cargo.toml index 899a31c544b..32f0c6f3733 100644 --- a/gateway-sp-comms/Cargo.toml +++ b/gateway-sp-comms/Cargo.toml @@ -10,6 +10,7 @@ http = "0.2.6" hyper = "0.14.17" ringbuffer = "0.8" serde = { version = "1.0", features = ["derive"] } +serde_with = "1.12.0" thiserror = "1.0.30" tokio-tungstenite = "0.17" tokio-stream = "0.1.8" diff --git a/gateway-sp-comms/src/communicator.rs b/gateway-sp-comms/src/communicator.rs index 892d583bd4d..94145115936 100644 --- a/gateway-sp-comms/src/communicator.rs +++ b/gateway-sp-comms/src/communicator.rs @@ -9,39 +9,28 @@ use crate::error::Error; use crate::error::SpCommunicationError; use crate::error::StartupError; use crate::management_switch::ManagementSwitch; -use crate::management_switch::ManagementSwitchDiscovery; use crate::management_switch::SpSocket; use crate::management_switch::SwitchPort; -use crate::recv_handler::RecvHandler; -use crate::KnownSps; use crate::SpIdentifier; +use crate::SwitchConfig; use futures::stream::FuturesUnordered; use futures::Future; use futures::Stream; -use gateway_messages::version; use gateway_messages::BulkIgnitionState; use gateway_messages::DiscoverResponse; use gateway_messages::IgnitionCommand; use gateway_messages::IgnitionState; -use gateway_messages::Request; use gateway_messages::RequestKind; -use gateway_messages::ResponseError; use gateway_messages::ResponseKind; use gateway_messages::SerialConsole; -use gateway_messages::SerializedSize; use gateway_messages::SpComponent; use gateway_messages::SpState; use hyper::header; use hyper::upgrade; use hyper::Body; -use omicron_common::backoff; -use omicron_common::backoff::Backoff; -use slog::debug; use slog::info; use slog::o; use slog::Logger; -use std::sync::atomic::AtomicU32; -use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; use tokio::time::Instant; @@ -65,28 +54,30 @@ where #[derive(Debug)] pub struct Communicator { - log: Logger, switch: ManagementSwitch, - request_id: AtomicU32, - recv_handler: Arc, } impl Communicator { pub async fn new( - known_sps: KnownSps, + config: SwitchConfig, + discovery_deadline: Instant, log: &Logger, ) -> Result { let log = log.new(o!("component" => "SpCommunicator")); - let discovery = ManagementSwitchDiscovery::placeholder_start( - known_sps, - log.clone(), - ) - .await?; - - let (switch, recv_handler) = RecvHandler::new(discovery, log.clone()); + let switch = + ManagementSwitch::new(config, discovery_deadline, log.clone()) + .await?; info!(&log, "started SP communicator"); - Ok(Self { log, switch, request_id: AtomicU32::new(0), recv_handler }) + Ok(Self { switch }) + } + + /// Get the name of our location. + /// + /// This matches one of the names specified as a possible location in the + /// configuration we were given. + pub fn location_name(&self) -> &str { + &self.switch.location_name() } // convert an identifier to a port number; this is fallible because @@ -102,13 +93,36 @@ impl Communicator { self.switch.switch_port_to_id(port) } + /// Returns true if we've discovered the IP address of our local ignition + /// controller. + /// + /// This method exists to be polled during test setup (to wait for discovery + /// to happen); it should not be called outside tests. + pub fn local_ignition_controller_address_known(&self) -> bool { + self.switch.ignition_controller().is_some() + } + + /// Returns true if we've discovered the IP address of the specified SP. + /// + /// This method exists to be polled during test setup (to wait for discovery + /// to happen); it should not be called outside tests. In particular, it + /// panics instead of running an error if `sp` describes an SP that isn't + /// known to this communicator. + pub fn address_known(&self, sp: SpIdentifier) -> bool { + let port = self.switch.switch_port(sp).unwrap(); + self.switch.sp_socket(port).is_some() + } + /// Ask the local ignition controller for the ignition state of a given SP. pub async fn get_ignition_state( &self, sp: SpIdentifier, timeout: Instant, ) -> Result { - let controller = self.switch.ignition_controller(); + let controller = self + .switch + .ignition_controller() + .ok_or(Error::LocalIgnitionControllerAddressUnknown)?; let port = self.id_to_port(sp)?; let request = RequestKind::IgnitionState { target: port.as_ignition_target() }; @@ -128,7 +142,10 @@ impl Communicator { &self, timeout: Instant, ) -> Result, Error> { - let controller = self.switch.ignition_controller(); + let controller = self + .switch + .ignition_controller() + .ok_or(Error::LocalIgnitionControllerAddressUnknown)?; let request = RequestKind::BulkIgnitionState; let bulk_state = self @@ -169,7 +186,10 @@ impl Communicator { command: IgnitionCommand, timeout: Instant, ) -> Result<(), Error> { - let controller = self.switch.ignition_controller(); + let controller = self + .switch + .ignition_controller() + .ok_or(Error::LocalIgnitionControllerAddressUnknown)?; let target = self.id_to_port(target_sp)?.as_ignition_target(); let request = RequestKind::IgnitionCommand { target, command }; @@ -195,6 +215,11 @@ impl Communicator { // via UDP. SPs will continuously broadcast any serial console data, even if // there is no attached client. Maybe this is fine, since the serial console // shouldn't be noisy without a corresponding client driving it? + // + // TODO Because this method doesn't contact the target SP, it succeeds even + // if we don't know the IP address of that SP (yet, or possibly ever)! The + // connection will start working if we later discover the address, but this + // is probably not the behavior we want. pub async fn serial_console_attach( self: &Arc, request: &mut http::Request, @@ -249,7 +274,7 @@ impl Communicator { .map(|key| handshake::derive_accept_key(key)) .ok_or(Error::BadWebsocketConnection("missing websocket key"))?; - self.recv_handler.serial_console_attach( + self.switch.serial_console_attach( Arc::clone(self), port, component, @@ -280,7 +305,7 @@ impl Communicator { component: &SpComponent, ) -> Result<(), Error> { let port = self.id_to_port(sp)?; - self.recv_handler.serial_console_detach(port, component) + self.switch.serial_console_detach(port, component) } /// Send `packet` to the given SP component's serial console. @@ -400,92 +425,17 @@ impl Communicator { .collect::>() } - pub(crate) async fn request_response( + async fn request_response( &self, sp: &SpSocket<'_>, - mut kind: RequestKind, - mut map_response_kind: F, + kind: RequestKind, + map_response_kind: F, timeout: Option, ) -> Result where F: FnMut(ResponseKind) -> Result, { - // helper to wrap a future in a timeout if we have one - async fn maybe_with_timeout( - timeout: Option, - fut: F, - ) -> Result - where - F: Future, - { - match timeout { - Some(t) => tokio::time::timeout_at(t, fut).await, - None => Ok(fut.await), - } - } - - // We'll use exponential backoff if and only if the SP responds with - // "busy"; any other error will cause the loop below to terminate. - let mut backoff = backoff::internal_service_policy(); - - loop { - // It would be nicer to use `backoff::retry()` instead of manually - // stepping the backoff policy, but the dance we do with `kind` to - // avoid cloning it is hard to map into `retry()` in a way that - // satisfies the borrow checker. ("The dance we do with `kind` to - // avoid cloning it" being that we move it into `request` below, and - // on a busy response from the SP we move it back out into the - // `kind` local var.) - let duration = backoff - .next_backoff() - .expect("internal backoff policy gave up"); - maybe_with_timeout(timeout, tokio::time::sleep(duration)).await?; - - // request IDs will eventually roll over; since we enforce timeouts - // this should be a non-issue in practice. does this need testing? - let request_id = self.request_id.fetch_add(1, Ordering::Relaxed); - - // update our recv_handler to expect a response for this request ID - let response_fut = - self.recv_handler.register_request_id(sp.port(), request_id); - - // Serialize and send our request. We know `buf` is large enough for - // any `Request`, so unwrapping here is fine. - let request = Request { version: version::V1, request_id, kind }; - let mut buf = [0; Request::MAX_SIZE]; - let n = gateway_messages::serialize(&mut buf, &request).unwrap(); - let serialized_request = &buf[..n]; - - // Actual communication, guarded by `timeout` if it's not `None`. - let result = maybe_with_timeout(timeout, async { - debug!(&self.log, "sending {:?} to SP {:?}", request, sp); - sp.send(serialized_request).await.map_err(|err| { - SpCommunicationError::UdpSend { addr: sp.addr(), err } - })?; - - Ok::(response_fut.await?) - }) - .await?; - - match result { - Ok(response_kind) => { - return map_response_kind(response_kind) - .map_err(SpCommunicationError::from) - } - Err(SpCommunicationError::SpError(ResponseError::Busy)) => { - debug!( - &self.log, - "SP busy; sleeping before retrying send"; - "sp" => ?sp, - ); - - // move `kind` back into local var; required to satisfy - // borrow check of this loop - kind = request.kind; - } - Err(err) => return Err(err), - } - } + self.switch.request_response(sp, kind, map_response_kind, timeout).await } } diff --git a/gateway-sp-comms/src/lib.rs b/gateway-sp-comms/src/lib.rs index e1142ff4b6c..ca3000ee9f7 100644 --- a/gateway-sp-comms/src/lib.rs +++ b/gateway-sp-comms/src/lib.rs @@ -23,11 +23,9 @@ pub mod error; pub use communicator::Communicator; pub use communicator::FuturesUnorderedImpl; +pub use management_switch::LocationConfig; +pub use management_switch::LocationDeterminationConfig; pub use management_switch::SpIdentifier; pub use management_switch::SpType; - -// TODO these will remain public for a while, but eventually will be removed -// altogther; currently these provide a way to hard-code the rack topology, -// which is not what we want. -pub use management_switch::KnownSp; -pub use management_switch::KnownSps; +pub use management_switch::SwitchConfig; +pub use management_switch::SwitchPortConfig; diff --git a/gateway-sp-comms/src/management_switch.rs b/gateway-sp-comms/src/management_switch.rs index 474c3358ed2..8a9be4b36de 100644 --- a/gateway-sp-comms/src/management_switch.rs +++ b/gateway-sp-comms/src/management_switch.rs @@ -11,44 +11,55 @@ //! See RFD 250 for details. //! -#[allow(dead_code)] // we don't use this yet, but will shortly mod location_map; +use self::location_map::LocationMap; +pub use self::location_map::LocationConfig; +pub use self::location_map::LocationDeterminationConfig; +pub use self::location_map::SwitchPortConfig; + +use crate::error::BadResponseType; +use crate::error::Error; +use crate::error::SpCommunicationError; use crate::error::StartupError; +use crate::recv_handler::RecvHandler; +use crate::Communicator; use futures::stream::FuturesUnordered; +use futures::Future; use futures::StreamExt; +use gateway_messages::version; +use gateway_messages::Request; +use gateway_messages::RequestKind; +use gateway_messages::ResponseError; +use gateway_messages::ResponseKind; use gateway_messages::SerializedSize; +use gateway_messages::SpComponent; use gateway_messages::SpMessage; +use hyper::upgrade::OnUpgrade; +use omicron_common::backoff; +use omicron_common::backoff::Backoff; use serde::Deserialize; use serde::Serialize; +use serde_with::serde_as; +use serde_with::DisplayFromStr; use slog::debug; -use slog::warn; use slog::Logger; +use std::collections::HashMap; use std::io; use std::net::SocketAddr; use std::sync::Arc; +use std::time::Duration; use tokio::net::UdpSocket; use tokio::task::JoinHandle; +use tokio::time::Instant; -/// TODO For now, we don't do RFD 250-style discovery; instead, we take a -/// hard-coded list of known SPs. Each known SP has two address: the (presumably -/// fake) SP and the "local" address for the corresponding management switch -/// port. -#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] -pub struct KnownSp { - pub sp: SocketAddr, - pub switch_port: SocketAddr, -} - +#[serde_as] #[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] -pub struct KnownSps { - /// Must be nonempty. The first is interpreted as the local ignition - /// controller. - pub switches: Vec, - /// Must be nonempty. TBD which we assume (if any) is our local SP. - pub sleds: Vec, - /// May be empty. - pub power_controllers: Vec, +pub struct SwitchConfig { + pub local_ignition_controller_port: usize, + pub location: LocationConfig, + #[serde_as(as = "HashMap")] + pub port: HashMap, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Deserialize, Serialize)] @@ -77,7 +88,6 @@ pub enum SpType { #[derive( Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, )] -#[serde(transparent)] pub(crate) struct SwitchPort(usize); impl SwitchPort { @@ -94,122 +104,189 @@ impl SwitchPort { } #[derive(Debug)] -pub(crate) struct ManagementSwitchDiscovery { - inner: Arc, +pub(crate) struct ManagementSwitch { + local_ignition_controller_port: SwitchPort, + recv_handler: Arc, + sockets: Arc>, + location_map: LocationMap, + log: Logger, + + // handle to the running task that calls recv on all `switch_ports` sockets; + // we keep this handle only to kill it when we're dropped + recv_task: JoinHandle<()>, +} + +impl Drop for ManagementSwitch { + fn drop(&mut self) { + self.recv_task.abort(); + } } -impl ManagementSwitchDiscovery { - // TODO: Replace this with real RFD 250-style discovery. For now, just take - // a hardcoded list of (simulated) SPs and a list of addresses to bind to. - // For now, we assumes SPs should talk to the bound addresses in the order - // - // ``` - // [switch 0, switch 1, ..., switch n - 1, - // sled 0, sled 1, ..., sled n - 1, - // power_controller 0, ..., power_controller n - 1] - // ``` - // - // and any leftover bind addresses are ignored. - pub(crate) async fn placeholder_start( - known_sps: KnownSps, +impl ManagementSwitch { + pub(crate) async fn new( + config: SwitchConfig, + discovery_deadline: Instant, log: Logger, ) -> Result { - assert!(!known_sps.switches.is_empty(), "at least one switch required"); - assert!(!known_sps.sleds.is_empty(), "at least one sled required"); - - let mut ports = Vec::new(); - for known_sps in [ - &known_sps.switches, - &known_sps.sleds, - &known_sps.power_controllers, - ] { - for bind_addr in known_sps.iter().map(|k| k.switch_port) { - ports.push(UdpSocket::bind(bind_addr).await.map_err( - |err| StartupError::UdpBind { addr: bind_addr, err }, - )?); - } + // begin by binding to all our configured ports; insert them into a map + // keyed by the switch port they're listening on + let mut sockets = HashMap::with_capacity(config.port.len()); + // while we're at it, rekey `config.port` to use `SwitchPort` keys + // instead of `usize`. + let mut ports = HashMap::with_capacity(config.port.len()); + for (port, port_config) in config.port { + let addr = port_config.data_link_addr; + let socket = UdpSocket::bind(addr) + .await + .map_err(|err| StartupError::UdpBind { addr, err })?; + + let port = SwitchPort(port); + sockets.insert(port, socket); + ports.insert(port, port_config); } - let inner = Arc::new(Inner { log, known_sps, ports }); - - Ok(Self { inner }) - } + // sanity check the local ignition controller port is bound + let local_ignition_controller_port = + SwitchPort(config.local_ignition_controller_port); + if !ports.contains_key(&local_ignition_controller_port) { + return Err(StartupError::InvalidConfig { + reason: format!( + "missing local ignition controller port {}", + local_ignition_controller_port.0 + ), + }); + } - /// Get a list of all ports on this switch. - // TODO 1 currently this only returns configured ports based on - // `placeholder_start`; eventually it should be all real management switch - // ports. - // - // TODO 2 should we attach the SP type to each port? For now just return a - // flat list. - pub(crate) fn all_ports( - &self, - ) -> impl ExactSizeIterator + 'static { - (0..self.inner.ports.len()).map(SwitchPort) - } + // set up a handler for incoming packets + let recv_handler = + RecvHandler::new(sockets.keys().copied(), log.clone()); - /// Consume `self` and start a long-running task to receive packets on all - /// ports, calling `recv_callback` for each. - pub(crate) fn start_recv_task(self, recv_callback: F) -> ManagementSwitch - where - F: Fn(SwitchPort, &'_ [u8]) + Send + 'static, - { + // spawn background task that listens for incoming packets on all ports + // and passes them to `recv_handler` + let sockets = Arc::new(sockets); let recv_task = { - let inner = Arc::clone(&self.inner); - tokio::spawn(recv_task(inner, recv_callback)) + let recv_handler = Arc::clone(&recv_handler); + tokio::spawn(recv_task( + Arc::clone(&sockets), + move |port, addr, data| { + recv_handler.handle_incoming_packet(port, addr, data) + }, + log.clone(), + )) }; - ManagementSwitch { inner: self.inner, recv_task } - } -} -#[derive(Debug)] -pub(crate) struct ManagementSwitch { - inner: Arc, + // run discovery to figure out the physical location of ourselves (and + // therefore all SPs we talk to) + let location_map = LocationMap::run_discovery( + config.location, + ports, + Arc::clone(&sockets), + Arc::clone(&recv_handler), + discovery_deadline, + &log, + ) + .await?; + + Ok(Self { + local_ignition_controller_port, + recv_handler, + location_map, + sockets, + log, + recv_task, + }) + } - // handle to the running task that calls recv on all `switch_ports` sockets; - // we keep this handle only to kill it when we're dropped - recv_task: JoinHandle<()>, -} + /// Get the name of our location. + /// + /// This matches one of the names specified as a possible location in the + /// configuration we were given. + pub(super) fn location_name(&self) -> &str { + &self.location_map.location_name() + } -impl Drop for ManagementSwitch { - fn drop(&mut self) { - self.recv_task.abort(); + /// Get the socket to use to communicate with an SP and the socket address + /// of that SP. + pub(crate) fn sp_socket(&self, port: SwitchPort) -> Option> { + self.recv_handler.remote_addr(port).map(|addr| { + let socket = self.sockets.get(&port).unwrap(); + SpSocket { socket, addr, port } + }) } -} -impl ManagementSwitch { /// Get the socket connected to the local ignition controller. - pub(crate) fn ignition_controller(&self) -> SpSocket { - // TODO for now this is guaranteed to exist based on the assertions in - // `placeholder_start`; once that's replaced by a non-placeholder - // implementation, revisit this. - let port = self.inner.switch_port(SpType::Switch, 0).unwrap(); - self.inner.sp_socket(port).unwrap() + pub(crate) fn ignition_controller(&self) -> Option> { + self.sp_socket(self.local_ignition_controller_port) } pub(crate) fn switch_port_from_ignition_target( &self, target: usize, ) -> Option { - // TODO this assumes `self.inner.ports` is ordered the same as ignition - // targets; confirm once we replace `placeholder_start` - if target < self.inner.ports.len() { - Some(SwitchPort(target)) + let port = SwitchPort(target); + if self.sockets.contains_key(&port) { + Some(port) } else { None } } pub(crate) fn switch_port(&self, id: SpIdentifier) -> Option { - self.inner.switch_port(id.typ, id.slot) + self.location_map.id_to_port(id) } pub(crate) fn switch_port_to_id(&self, port: SwitchPort) -> SpIdentifier { - self.inner.port_to_id(port) + self.location_map.port_to_id(port) } - pub(crate) fn sp_socket(&self, port: SwitchPort) -> Option> { - self.inner.sp_socket(port) + /// Spawn a tokio task responsible for forwarding serial console data + /// between the SP component on `port` and the websocket connection provided + /// by `upgrade_fut`. + pub(crate) fn serial_console_attach( + &self, + communicator: Arc, + port: SwitchPort, + component: SpComponent, + sp_ack_timeout: Duration, + upgrade_fut: OnUpgrade, + ) -> Result<(), Error> { + self.recv_handler.serial_console_attach( + communicator, + port, + component, + sp_ack_timeout, + upgrade_fut, + ) + } + + /// Shut down the serial console task associated with the given port and + /// component, if one exists and is attached. + pub(crate) fn serial_console_detach( + &self, + port: SwitchPort, + component: &SpComponent, + ) -> Result<(), Error> { + self.recv_handler.serial_console_detach(port, component) + } + + pub(crate) async fn request_response( + &self, + sp: &SpSocket<'_>, + kind: RequestKind, + map_response_kind: F, + timeout: Option, + ) -> Result + where + F: FnMut(ResponseKind) -> Result, + { + sp.request_response( + &self.recv_handler, + kind, + map_response_kind, + timeout, + &self.log, + ) + .await } } @@ -223,117 +300,114 @@ pub(crate) struct SpSocket<'a> { } impl SpSocket<'_> { - pub(crate) fn addr(&self) -> SocketAddr { - self.addr - } - - pub(crate) fn port(&self) -> SwitchPort { - self.port - } - - /// Wrapper around `send_to` that uses the SP address stored in `self` as - /// the destination address. - pub(crate) async fn send(&self, buf: &[u8]) -> io::Result { - self.socket.send_to(buf, self.addr).await - } -} - -#[derive(Debug)] -struct Inner { - log: Logger, - known_sps: KnownSps, - - // One UDP socket per management switch port. For now, this is guaranteed to - // have a length equal to the sum of the lengths of the fields of - // `known_sps`; eventually, it will be guaranteed to be the length of the - // number of ports on the connected management switch. - ports: Vec, -} - -impl Inner { - /// Convert a logical SP location (e.g., "sled 7") into a port on the - /// management switch. - fn switch_port(&self, sp_type: SpType, slot: usize) -> Option { - // map `sp_type` down to the range of port slots that cover it - let (base_slot, num_slots) = match sp_type { - SpType::Switch => (0, self.known_sps.switches.len()), - SpType::Sled => { - (self.known_sps.switches.len(), self.known_sps.sleds.len()) - } - SpType::Power => ( - self.known_sps.switches.len() + self.known_sps.sleds.len(), - self.known_sps.power_controllers.len(), - ), - }; - - if slot < num_slots { - Some(SwitchPort(base_slot + slot)) - } else { - None - } - } - - /// Convert a [`SwitchPort`] into the logical identifier of the SP connected - /// to that port. - fn port_to_id(&self, port: SwitchPort) -> SpIdentifier { - let mut n = port.0; - - for (typ, num) in [ - (SpType::Switch, self.known_sps.switches.len()), - (SpType::Sled, self.known_sps.sleds.len()), - (SpType::Power, self.known_sps.power_controllers.len()), - ] { - if n < num { - return SpIdentifier { typ, slot: n }; + // TODO The `timeout` we take here is the overall timeout for receiving a + // response. We only resend the request if the SP sends us a "busy" + // response; if the SP doesn't answer at all we never resend the request. + // Should we take a separate timeout for individual sends? E.g., with an + // overall timeout of 5 sec and a per-request timeout of 1 sec, we could + // treat "no response at 1 sec" the same as a "busy" and resend the request. + async fn request_response( + &self, + recv_handler: &RecvHandler, + mut kind: RequestKind, + mut map_response_kind: F, + timeout: Option, + log: &Logger, + ) -> Result + where + F: FnMut(ResponseKind) -> Result, + { + // helper to wrap a future in a timeout if we have one + async fn maybe_with_timeout( + timeout: Option, + fut: F, + ) -> Result + where + F: Future, + { + match timeout { + Some(t) => tokio::time::timeout_at(t, fut).await, + None => Ok(fut.await), } - n -= num; } - unreachable!("invalid port instance {:?}", port); - } - - /// Get the local bound address for the given switch port. - #[cfg(test)] // for now we only use this in unit tests - fn local_addr(&self, port: SwitchPort) -> io::Result { - self.ports[port.0].local_addr() - } + // We'll use exponential backoff if and only if the SP responds with + // "busy"; any other error will cause the loop below to terminate. + let mut backoff = backoff::internal_service_policy(); + + loop { + // It would be nicer to use `backoff::retry()` instead of manually + // stepping the backoff policy, but the dance we do with `kind` to + // avoid cloning it is hard to map into `retry()` in a way that + // satisfies the borrow checker. ("The dance we do with `kind` to + // avoid cloning it" being that we move it into `request` below, and + // on a busy response from the SP we move it back out into the + // `kind` local var.) + let duration = backoff + .next_backoff() + .expect("internal backoff policy gave up"); + maybe_with_timeout(timeout, tokio::time::sleep(duration)).await?; + + // update our recv_handler to expect a response for this request ID + let (request_id, response_fut) = + recv_handler.register_request_id(self.port); + + // Serialize and send our request. We know `buf` is large enough for + // any `Request`, so unwrapping here is fine. + let request = Request { version: version::V1, request_id, kind }; + let mut buf = [0; Request::MAX_SIZE]; + let n = gateway_messages::serialize(&mut buf, &request).unwrap(); + let serialized_request = &buf[..n]; + + // Actual communication, guarded by `timeout` if it's not `None`. + let result = maybe_with_timeout(timeout, async { + debug!( + log, "sending request"; + "request" => ?request, + "dest_addr" => %self.addr, + "port" => ?self.port, + ); + self.socket + .send_to(serialized_request, self.addr) + .await + .map_err(|err| SpCommunicationError::UdpSend { + addr: self.addr, + err, + })?; + + Ok::(response_fut.await?) + }) + .await?; + + match result { + Ok(response_kind) => { + return map_response_kind(response_kind) + .map_err(SpCommunicationError::from) + } + Err(SpCommunicationError::SpError(ResponseError::Busy)) => { + debug!( + log, + "SP busy; sleeping before retrying send"; + "dest_addr" => %self.addr, + "port" => ?self.port, + ); - /// Get the socket to use to communicate with an SP and the socket address - /// of that SP. - fn sp_socket(&self, port: SwitchPort) -> Option> { - // NOTE: For now, it's not possible for this method to return `None`. We - // control construction of `SwitchPort`s, and only hand them out for - // valid "ports" where we know an SP is (at least supposed to) be - // listening. In the future this may return `None` if there is no SP - // connected on this port. - let mut n = port.0; - for known_sps in [ - &self.known_sps.switches, - &self.known_sps.sleds, - &self.known_sps.power_controllers, - ] { - if n < known_sps.len() { - return Some(SpSocket { - socket: &self.ports[port.0], - addr: known_sps[n].sp, - port, - }); + // move `kind` back into local var; required to satisfy + // borrow check of this loop + kind = request.kind; + } + Err(err) => return Err(err), } - n -= known_sps.len(); } - - // We only construct `SwitchPort`s with valid indices, so the only way - // to get here is if someone constructs two `ManagementSwitch` instances - // (with different port cardinalities) and then mixes up `SwitchPort`s - // between them (or other similarly outlandish shenanigans) - fine to - // panic in that case. - unreachable!("invalid port {:?}", port) } } -async fn recv_task(inner: Arc, mut callback: F) -where - F: FnMut(SwitchPort, &'_ [u8]), +async fn recv_task( + ports: Arc>, + mut recv_handler: F, + log: Logger, +) where + F: FnMut(SwitchPort, SocketAddr, &[u8]), { // helper function to tag a socket's `.readable()` future with an index; we // need this to make rustc happy about the types we push into @@ -341,16 +415,16 @@ where async fn readable_with_port( port: SwitchPort, sock: &UdpSocket, - ) -> (SwitchPort, io::Result<()>) { + ) -> (SwitchPort, &UdpSocket, io::Result<()>) { let result = sock.readable().await; - (port, result) + (port, sock, result) } // set up collection of futures tracking readability of all switch port // sockets let mut recv_all_sockets = FuturesUnordered::new(); - for (i, sock) in inner.ports.iter().enumerate() { - recv_all_sockets.push(readable_with_port(SwitchPort(i), sock)); + for (port, sock) in ports.iter() { + recv_all_sockets.push(readable_with_port(*port, &sock)); } let mut buf = [0; SpMessage::MAX_SIZE]; @@ -359,7 +433,7 @@ where // `recv_all_sockets.next()` will never return `None` because we // immediately push a new future into it every time we pull one out // (to reregister readable interest in the corresponding socket) - let (port, result) = recv_all_sockets.next().await.unwrap(); + let (port, sock, result) = recv_all_sockets.next().await.unwrap(); // checking readability of the socket can't fail without violating some // internal state in tokio in a presumably-strage way; at that point we @@ -368,11 +442,6 @@ where panic!("error in socket readability: {} (port={:?})", err, port); } - // immediately push a new future requesting readability interest, as - // noted above - let sock = &inner.ports[port.0]; - recv_all_sockets.push(readable_with_port(port, sock)); - match sock.try_recv_from(&mut buf) { Ok((n, addr)) => { let buf = &buf[..n]; @@ -382,20 +451,11 @@ where buf.as_ptr() as usize as u64, buf.len() as u64 )); - if Some(addr) != inner.sp_socket(port).map(|s| s.addr) { - // TODO-security: we received a packet from an address that - // doesn't match what we believe is the SP's address. for - // now, log and discard; what should we really do? - warn!( - inner.log, - "discarding packet from unknown source"; - "port" => ?port, - "src_addr" => addr, - ); - } else { - debug!(inner.log, "received {} bytes", n; "port" => ?port); - callback(port, buf); - } + debug!( + log, "received {} bytes", n; + "port" => ?port, "addr" => %addr, + ); + recv_handler(port, addr, &buf[..n]); } // spurious wakeup; no need to log, just continue Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => (), @@ -406,6 +466,9 @@ where panic!("error in recv_from: {} (port={:?})", err, port); } } + + // push a new future requesting readability interest, as noted above + recv_all_sockets.push(readable_with_port(port, sock)); } } @@ -457,32 +520,71 @@ mod tests { .chain(self.pscs.iter()) } - fn as_known_sps(&self) -> KnownSps { - KnownSps { - switches: self - .switches - .iter() - .map(|sock| KnownSp { - sp: sock.local_addr().unwrap(), - switch_port: "127.0.0.1:0".parse().unwrap(), - }) - .collect(), - sleds: self - .sleds - .iter() - .map(|sock| KnownSp { - sp: sock.local_addr().unwrap(), - switch_port: "127.0.0.1:0".parse().unwrap(), - }) - .collect(), - power_controllers: self - .pscs - .iter() - .map(|sock| KnownSp { - sp: sock.local_addr().unwrap(), - switch_port: "127.0.0.1:0".parse().unwrap(), - }) - .collect(), + async fn make_management_switch( + &self, + mut recv_callback: F, + ) -> ManagementSwitch + where + F: FnMut(SwitchPort, &[u8]) + Send + 'static, + { + let log = Logger::root(slog::Discard, slog::o!()); + + // Skip the discovery process by constructing a `ManagementSwitch` + // by hand + let mut sockets = HashMap::new(); + let mut port_to_id = HashMap::new(); + let mut sp_addrs = HashMap::new(); + for (typ, sp_sockets) in [ + (SpType::Switch, &self.switches), + (SpType::Sled, &self.sleds), + (SpType::Power, &self.pscs), + ] { + for (slot, sp_sock) in sp_sockets.iter().enumerate() { + let port = SwitchPort(sockets.len()); + let id = SpIdentifier { typ, slot }; + let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + + sp_addrs.insert(port, sp_sock.local_addr().unwrap()); + port_to_id.insert(port, id); + sockets.insert(port, socket); + } + } + let sockets = Arc::new(sockets); + + let recv_handler = + RecvHandler::new(sockets.keys().copied(), log.clone()); + + // Since we skipped the discovery process, we have to tell + // `recv_handler` what all the fake SP addresses are. + for (port, addr) in sp_addrs { + recv_handler.set_remote_addr(port, addr); + } + + let recv_task = { + let recv_handler = Arc::clone(&recv_handler); + tokio::spawn(recv_task( + Arc::clone(&sockets), + move |port, addr, data| { + recv_handler.handle_incoming_packet(port, addr, data); + recv_callback(port, data); + }, + log.clone(), + )) + }; + + let location_map = + LocationMap::new_raw(String::from("test"), port_to_id); + let local_ignition_controller_port = location_map + .id_to_port(SpIdentifier { typ: SpType::Switch, slot: 0 }) + .unwrap(); + + ManagementSwitch { + local_ignition_controller_port, + recv_handler, + location_map, + sockets, + log, + recv_task, } } } @@ -495,19 +597,16 @@ mod tests { // callback that accumlates received packets into a hashmap let received: Arc>>>> = Arc::default(); - let discovery = ManagementSwitchDiscovery::placeholder_start( - harness.as_known_sps(), - Logger::root(slog::Discard, slog::o!()), - ) - .await - .unwrap(); - let switch = discovery.start_recv_task({ - let received = Arc::clone(&received); - move |port, data| { - let mut received = received.lock().unwrap(); - received.entry(port).or_default().push(data.to_vec()); - } - }); + + let switch = harness + .make_management_switch({ + let received = Arc::clone(&received); + move |port, data: &[u8]| { + let mut received = received.lock().unwrap(); + received.entry(port).or_default().push(data.to_vec()); + } + }) + .await; // Actual test - send a bunch of data to each of the ports... let mut expected: HashMap>> = HashMap::new(); @@ -516,7 +615,8 @@ mod tests { let port = SwitchPort(port_num); let data = format!("message {} to {:?}", i, port).into_bytes(); - let addr = switch.inner.local_addr(port).unwrap(); + let addr = + switch.sockets.get(&port).unwrap().local_addr().unwrap(); sock.send_to(&data, addr).await.unwrap(); expected.entry(port).or_default().push(data); } @@ -572,13 +672,7 @@ mod tests { async fn test_sp_socket() { let harness = Harness::new().await; - let discovery = ManagementSwitchDiscovery::placeholder_start( - harness.as_known_sps(), - Logger::root(slog::Discard, slog::o!()), - ) - .await - .unwrap(); - let switch = discovery.start_recv_task(|_port, _data| { /* ignore */ }); + let switch = harness.make_management_switch(|_, _| {}).await; // confirm messages sent to the switch's sp sockets show up on our // harness sockets @@ -589,18 +683,22 @@ mod tests { (SpType::Power, &harness.pscs), ] { for (slot, sp_sock) in sp_sockets.iter().enumerate() { - let port = switch.inner.switch_port(typ, slot).unwrap(); - let sock = switch.inner.sp_socket(port).unwrap(); + let port = switch + .location_map + .id_to_port(SpIdentifier { typ, slot }) + .unwrap(); + let sock = switch.sp_socket(port).unwrap(); + let local_addr = sock.socket.local_addr().unwrap(); let message = format!("{:?} {}", typ, slot).into_bytes(); - sock.send(&message).await.unwrap(); + sock.socket.send_to(&message, sock.addr).await.unwrap(); let (n, addr) = sp_sock.recv_from(&mut buf).await.unwrap(); // confirm we received the expected message from the // corresponding switch port assert_eq!(&buf[..n], message); - assert_eq!(addr, switch.inner.local_addr(port).unwrap()); + assert_eq!(addr, local_addr); } } } diff --git a/gateway-sp-comms/src/management_switch/location_map.rs b/gateway-sp-comms/src/management_switch/location_map.rs index ca1afbf8715..54a6c944b2f 100644 --- a/gateway-sp-comms/src/management_switch/location_map.rs +++ b/gateway-sp-comms/src/management_switch/location_map.rs @@ -107,7 +107,6 @@ impl LocationMap { } pub(super) async fn run_discovery( - communicator: Arc, config: LocationConfig, ports: HashMap, sockets: Arc>, @@ -127,7 +126,6 @@ impl LocationMap { let ports = ports.clone(); tokio::spawn(async move { discover_sps( - &communicator, &sockets, ports, &recv_handler, @@ -210,10 +208,9 @@ impl LocationMap { /// and the list of locations we could be in based on the SP's response on that /// port. Our spawner is responsible for collecting/using those messages. async fn discover_sps( - communicator: &crate::Communicator, sockets: &HashMap, port_config: HashMap, - _recv_handler: &RecvHandler, + recv_handler: &RecvHandler, mut location_determination: Vec, refined_locations: mpsc::Sender<(SwitchPort, Vec)>, log: &Logger, @@ -241,9 +238,9 @@ async fn discover_sps( .expect("internal backoff policy gave up"); tokio::time::sleep(duration).await; - let result = communicator + let result = socket .request_response( - &socket, + &recv_handler, RequestKind::Discover, ResponseKindExt::try_into_discover, // TODO should this timeout be configurable or itself @@ -255,6 +252,7 @@ async fn discover_sps( // reasonably large number; this may solve itself when // we move to some kind of authenticated comms channel. Some(Instant::now() + Duration::from_secs(5)), + &log, ) .await; diff --git a/gateway-sp-comms/src/recv_handler/mod.rs b/gateway-sp-comms/src/recv_handler/mod.rs index bfd006c8480..0db2b575b8b 100644 --- a/gateway-sp-comms/src/recv_handler/mod.rs +++ b/gateway-sp-comms/src/recv_handler/mod.rs @@ -6,11 +6,10 @@ use crate::error::Error; use crate::error::SpCommunicationError; -use crate::management_switch::ManagementSwitch; -use crate::management_switch::ManagementSwitchDiscovery; use crate::management_switch::SwitchPort; use crate::Communicator; use futures::future::Fuse; +use futures::Future; use futures::FutureExt; use futures::SinkExt; use futures::StreamExt; @@ -29,12 +28,16 @@ use slog::debug; use slog::error; use slog::info; use slog::trace; +use slog::warn; use slog::Logger; use std::borrow::Cow; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::collections::VecDeque; use std::convert::TryInto; +use std::net::SocketAddr; +use std::sync::atomic::AtomicU32; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::Mutex; use std::time::Duration; @@ -75,6 +78,7 @@ use self::request_response_map::ResponseIngestResult; /// corresponding to the future returned in step 1, fulfilling it. #[derive(Debug)] pub(crate) struct RecvHandler { + request_id: AtomicU32, sp_state: HashMap, log: Logger, } @@ -83,34 +87,25 @@ impl RecvHandler { /// Create a new `RecvHandler` that is aware of all ports described by /// `switch`. pub(crate) fn new( - switch_discovery: ManagementSwitchDiscovery, + ports: impl ExactSizeIterator, log: Logger, - ) -> (ManagementSwitch, Arc) { + ) -> Arc { // prime `sp_state` with all known ports of the switch - let all_ports = switch_discovery.all_ports(); - let mut sp_state = HashMap::with_capacity(all_ports.len()); - for port in all_ports { + let mut sp_state = HashMap::with_capacity(ports.len()); + for port in ports { sp_state.insert(port, SingleSpState::default()); } - // configure a `ManagementSwitch` that notifies us of every incoming - // packet - let handler = Arc::new(Self { sp_state, log }); - let switch = { - let handler = Arc::clone(&handler); - switch_discovery.start_recv_task(move |port, buf| { - handler.handle_incoming_packet(port, buf) - }) - }; - - (switch, handler) + // TODO: Should we init our request_id randomly instead of always + // starting at 0? + Arc::new(Self { request_id: AtomicU32::new(0), sp_state, log }) } - // SwitchPort instances can only be created by `ManagementSwitch`, so we - // should never be able to instantiate a port that we don't have in - // `self.sp_state` (which we initialize with all ports declared by the - // switch we were given). fn sp_state(&self, port: SwitchPort) -> &SingleSpState { + // SwitchPort instances can only be created by `ManagementSwitch`, so we + // should never be able to instantiate a port that we don't have in + // `self.sp_state` (which we initialize with all ports declared by the + // switch we were given). self.sp_state.get(&port).expect("invalid switch port") } @@ -210,20 +205,37 @@ impl RecvHandler { Ok(()) } - /// Returns a future that will complete when we receive a response on the - /// given `port` with the corresponding `request_id`. + /// Returns a new request ID and a future that will complete when we receive + /// a response on the given `port` with that request ID. /// /// Panics if `port` is not one of the ports defined by the `switch` given /// to this `RecvHandler` when it was constructed. - pub(crate) async fn register_request_id( + pub(crate) fn register_request_id( &self, port: SwitchPort, - request_id: u32, - ) -> Result { - self.sp_state(port).requests.wait_for_response(request_id).await + ) -> (u32, impl Future> + '_) + { + let request_id = self.request_id.fetch_add(1, Ordering::Relaxed); + (request_id, self.sp_state(port).requests.wait_for_response(request_id)) + } + + /// Returns the address of the SP connected to `port`, if we know it. + pub(crate) fn remote_addr(&self, port: SwitchPort) -> Option { + *self.sp_state(port).addr.lock().unwrap() } - fn handle_incoming_packet(&self, port: SwitchPort, buf: &[u8]) { + // Only available for tests: set the remote address of a given port. + #[cfg(test)] + pub(crate) fn set_remote_addr(&self, port: SwitchPort, addr: SocketAddr) { + *self.sp_state(port).addr.lock().unwrap() = Some(addr); + } + + pub(crate) fn handle_incoming_packet( + &self, + port: SwitchPort, + addr: SocketAddr, + buf: &[u8], + ) { trace!(&self.log, "received {} bytes from {:?}", buf.len(), port); // the first four bytes of packets we expect is always a version number; @@ -269,6 +281,35 @@ impl RecvHandler { }; debug!(&self.log, "received {:?} from {:?}", sp_msg, port); + // update our knowledge of the sender's address + let state = self.sp_state(port); + match state.addr.lock().unwrap().replace(addr) { + None => { + // expected but rare: our first packet on this port + debug!( + &self.log, "discovered remote address for port"; + "port" => ?port, + "addr" => %addr, + ); + } + Some(old) if old == addr => { + // expected; we got another packet from the expected address + } + Some(old) => { + // unexpected; the remote address changed + // TODO-security - What should we do here? Could the sled have + // been physically replaced and we're now hearing from a new SP? + // This question/TODO may go away on its own if we add an + // authenticated channel? + warn!( + &self.log, "updated remote address for port"; + "port" => ?port, + "old_addr" => %old, + "new_addr" => %addr, + ); + } + } + // decide whether this is a response to an outstanding request or an // unprompted message match sp_msg.kind { @@ -341,6 +382,7 @@ impl RecvHandler { #[derive(Debug, Default)] struct SingleSpState { + addr: Mutex>, requests: RequestResponseMap>, serial_console_tasks: Mutex>, } diff --git a/gateway/examples/config.toml b/gateway/examples/config.toml index 8922a21fa33..f3113abf585 100644 --- a/gateway/examples/config.toml +++ b/gateway/examples/config.toml @@ -5,19 +5,69 @@ # Identifier for this instance of MGS id = "8afcb12d-f625-4df9-bdf2-f495c3bbd323" -[known_sps] -switches = [ - # first switch is assumed to be the local ignition controller - { sp = "[::1]:33300", switch_port = "[::1]:33200" }, -] -sleds = [ - { sp = "[::1]:33310", switch_port = "[::1]:33201" }, - { sp = "[::1]:33320", switch_port = "[::1]:33202" }, -] -power_controllers = [ -] +[switch] +# which vsc port is connected to our local sidecar SP (i.e., the SP that acts as +# our contact to the ignition controller) +local_ignition_controller_port = 0 + +[switch.location] +# possible locations where MGS could be running; these names appear in logs and +# are used in the remainder of the `[switch.*]` configuration to define port +# mappings +names = ["sidecar-a", "sidecar-b"] + +# `[[switch.location.determination]]` is a list of switch ports we should +# contact in order to determine our location; each port defines a subset of +# `[switch.location.names]` which are the possible location(s) of this MGS +# instance if the message was received on the given SP port. When MGS starts, it +# will send a discovery message on each port listed in this section, collect the +# responses, and determine its location via the intersection of the names listed +# below (for all ports which returned a successful response). This process can +# fail if too few SPs respond (leaving us with 2 or more possible locations) or +# if there is a miscabling that results in an unsolvable system (e.g., +# determination 0 reports "sidecar-a" and determination 1 reports "sidecar-b"). +[[switch.location.determination]] +switch_port = 1 +sp_port_0 = ["sidecar-a"] +sp_port_1 = ["sidecar-b"] + +[[switch.location.determination]] +switch_port = 2 +sp_port_0 = ["sidecar-a"] +sp_port_1 = ["sidecar-b"] + +# `[[switch.port.*]]` defines the local data link address (in RFD 250 terms, the +# interface configured to use VLAN tag assigned to the given port) and the +# logical ID of the remote SP ("sled 7", "switch 1", etc.), which must have an +# entry for each member of `[switch.location]` above. +# +# TODO This section has some concessions to local testing: ultimately we will +# use a single multicast address, target port, and source port, but for now all +# three are configured on a per-port basis, which allows a single system to +# simulate a full set of ports and SPs. +[switch.port.0] +data_link_addr = "[::]:33200" +multicast_addr = "[ff15:0:1de::0]:33300" +[switch.port.0.location] +sidecar-a = ["switch", 0] +sidecar-b = ["switch", 1] + +[switch.port.1] +data_link_addr = "[::]:33201" +multicast_addr = "[ff15:0:1de::1]:33310" +[switch.port.1.location] +sidecar-a = ["sled", 0] +sidecar-b = ["sled", 0] + +[switch.port.2] +data_link_addr = "[::]:33202" +multicast_addr = "[ff15:0:1de::2]:33320" +[switch.port.2.location] +sidecar-a = ["sled", 1] +sidecar-b = ["sled", 1] [timeouts] +discovery_millis = 1_000 ignition_controller_millis = 1_000 sp_request_millis = 1_000 bulk_request_default_millis = 5_000 diff --git a/gateway/src/bin/gateway.rs b/gateway/src/bin/gateway.rs index cfecceea8c0..a9c30d378bb 100644 --- a/gateway/src/bin/gateway.rs +++ b/gateway/src/bin/gateway.rs @@ -41,6 +41,6 @@ async fn do_run() -> Result<(), CmdError> { if args.openapi { run_openapi().map_err(CmdError::Failure) } else { - run_server(&config).await.map_err(CmdError::Failure) + run_server(config).await.map_err(CmdError::Failure) } } diff --git a/gateway/src/config.rs b/gateway/src/config.rs index 30a51e895ba..34e225028e9 100644 --- a/gateway/src/config.rs +++ b/gateway/src/config.rs @@ -6,7 +6,7 @@ //! configuration use dropshot::{ConfigDropshot, ConfigLogging}; -use gateway_sp_comms::KnownSps; +use gateway_sp_comms::SwitchConfig; use serde::{Deserialize, Serialize}; use std::path::Path; use std::path::PathBuf; @@ -14,6 +14,9 @@ use thiserror::Error; #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct Timeouts { + /// Timeout for running the discovery process to determine logical mappings + /// of switches/sleds. + pub discovery_millis: u64, /// Timeout for messages to our local ignition controller SP. pub ignition_controller_millis: u64, /// Timeout for requests sent to arbitrary SPs. @@ -46,8 +49,8 @@ pub struct Config { pub timeouts: Timeouts, /// Dropshot configuration for API server pub dropshot: ConfigDropshot, - /// Placeholder description of all known SPs in the system. - pub known_sps: KnownSps, + /// Configuration of the management switch. + pub switch: SwitchConfig, /// Server-wide logging configuration. pub log: ConfigLogging, } diff --git a/gateway/src/context.rs b/gateway/src/context.rs index 75830e4f04f..66a31dda45a 100644 --- a/gateway/src/context.rs +++ b/gateway/src/context.rs @@ -2,11 +2,12 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -use crate::{bulk_state_get::BulkSpStateRequests, Config}; -use gateway_sp_comms::error::StartupError; +use crate::bulk_state_get::BulkSpStateRequests; use gateway_sp_comms::Communicator; +use gateway_sp_comms::{error::StartupError, SwitchConfig}; use slog::Logger; use std::{sync::Arc, time::Duration}; +use tokio::time::Instant; /// Shared state used by API request handlers pub struct ServerContext { @@ -49,15 +50,19 @@ impl From<&'_ crate::config::Timeouts> for Timeouts { impl ServerContext { pub async fn new( - config: &Config, + switch_config: SwitchConfig, + timeouts: crate::config::Timeouts, log: &Logger, ) -> Result, StartupError> { - let comms = - Arc::new(Communicator::new(config.known_sps.clone(), log).await?); + let discovery_deadline = + Instant::now() + Duration::from_millis(timeouts.discovery_millis); + let comms = Arc::new( + Communicator::new(switch_config, discovery_deadline, log).await?, + ); Ok(Arc::new(ServerContext { sp_comms: Arc::clone(&comms), bulk_sp_state_requests: BulkSpStateRequests::new(comms, log), - timeouts: Timeouts::from(&config.timeouts), + timeouts: Timeouts::from(&timeouts), })) } } diff --git a/gateway/src/lib.rs b/gateway/src/lib.rs index c548f2d35b7..008088a7c6c 100644 --- a/gateway/src/lib.rs +++ b/gateway/src/lib.rs @@ -38,7 +38,7 @@ pub struct Server { impl Server { /// Start a gateway server. pub async fn start( - config: &Config, + config: Config, _rack_id: Uuid, log: &Logger, ) -> Result { @@ -52,8 +52,9 @@ impl Server { } } - let apictx = - ServerContext::new(config, &log).await.map_err(|error| { + let apictx = ServerContext::new(config.switch, config.timeouts, &log) + .await + .map_err(|error| { format!("initializing server context: {}", error) })?; @@ -90,7 +91,7 @@ impl Server { } /// Run an instance of the [Server]. -pub async fn run_server(config: &Config) -> Result<(), String> { +pub async fn run_server(config: Config) -> Result<(), String> { use slog::Drain; let (drain, registration) = slog_dtrace::with_drain( config diff --git a/gateway/tests/config.test.toml b/gateway/tests/config.test.toml index cb7fb21c86d..ebe5edadc2d 100644 --- a/gateway/tests/config.test.toml +++ b/gateway/tests/config.test.toml @@ -6,15 +6,78 @@ # NOTE: The test suite always overrides this. id = "8afcb12d-f625-4df9-bdf2-f495c3bbd323" -[known_sps] -# NOTE: The test suite overrides this section based on the configured simulator. -switches = [] -sleds = [] -power_controllers = [] +[switch] +# which vsc port is connected to our local sidecar SP (i.e., the SP that acts as +# our contact to the ignition controller) +local_ignition_controller_port = 0 + +[switch.location] +# possible locations where MGS could be running; these names appear in logs and +# are used in the remainder of the `[switch.*]` configuration to define port +# mappings +names = ["sidecar-a", "sidecar-b"] + +# `[[switch.location.determination]]` is a list of switch ports we should +# contact in order to determine our location; each port defines a subset of +# `[switch.location.names]` which are the possible location(s) of this MGS +# instance if the message was received on the given SP port. When MGS starts, it +# will send a discovery message on each port listed in this section, collect the +# responses, and determine its location via the intersection of the names listed +# below (for all ports which returned a successful response). This process can +# fail if too few SPs respond (leaving us with 2 or more possible locations) or +# if there is a miscabling that results in an unsolvable system (e.g., +# determination 0 reports "sidecar-a" and determination 1 reports "sidecar-b"). +[[switch.location.determination]] +switch_port = 1 +sp_port_0 = ["sidecar-a"] +sp_port_1 = ["sidecar-b"] + +[[switch.location.determination]] +switch_port = 2 +sp_port_0 = ["sidecar-a"] +sp_port_1 = ["sidecar-b"] + +# `[[switch.port.*]]` defines the local data link address (in RFD 250 terms, the +# interface configured to use VLAN tag assigned to the given port) and the +# logical ID of the remote SP ("sled 7", "switch 1", etc.), which must have an +# entry for each member of `[switch.location]` above. +# +# TODO This section has some concessions to local testing: ultimately we will +# use a single multicast address, target port, and source port, but for now all +# three are configured on a per-port basis, which allows a single system to +# simulate a full set of ports and SPs. +[switch.port.0] +data_link_addr = "[::1]:0" +multicast_addr = "[::1]:0" +[switch.port.0.location] +sidecar-a = ["switch", 0] +sidecar-b = ["switch", 1] + +[switch.port.1] +data_link_addr = "[::1]:0" +multicast_addr = "[::1]:0" +[switch.port.1.location] +sidecar-a = ["switch", 1] +sidecar-b = ["switch", 0] + +[switch.port.2] +data_link_addr = "[::1]:0" +multicast_addr = "[::1]:0" +[switch.port.2.location] +sidecar-a = ["sled", 0] +sidecar-b = ["sled", 0] + +[switch.port.3] +data_link_addr = "[::1]:0" +multicast_addr = "[::1]:0" +[switch.port.3.location] +sidecar-a = ["sled", 1] +sidecar-b = ["sled", 1] [timeouts] +discovery_millis = 1_000 ignition_controller_millis = 500 -sp_request_millis = 500 +sp_request_millis = 1_000 bulk_request_default_millis = 1_000 bulk_request_max_millis = 2_000 bulk_request_page_millis = 500 diff --git a/gateway/tests/integration_tests/bulk_state_get.rs b/gateway/tests/integration_tests/bulk_state_get.rs index aca1ba8e8f0..8f8a1ab9e92 100644 --- a/gateway/tests/integration_tests/bulk_state_get.rs +++ b/gateway/tests/integration_tests/bulk_state_get.rs @@ -34,7 +34,7 @@ macro_rules! assert_eq_unordered { #[tokio::test] async fn bulk_sp_get_all_online() { - let testctx = setup::test_setup("bulk_sp_get_all_online").await; + let testctx = setup::test_setup("bulk_sp_get_all_online", 0).await; let client = &testctx.client; // simulator just started; all SPs are online @@ -58,7 +58,7 @@ async fn bulk_sp_get_all_online() { #[tokio::test] async fn bulk_sp_get_one_sp_powered_off() { - let testctx = setup::test_setup("bulk_sp_get_all_online").await; + let testctx = setup::test_setup("bulk_sp_get_all_online", 0).await; let client = &testctx.client; // simulator just started; all SPs are online @@ -119,7 +119,7 @@ async fn bulk_sp_get_one_sp_powered_off() { #[tokio::test] async fn bulk_sp_get_one_sp_unresponsive() { - let testctx = setup::test_setup("bulk_sp_get_all_online").await; + let testctx = setup::test_setup("bulk_sp_get_all_online", 0).await; let client = &testctx.client; // simulator just started; all SPs are online diff --git a/gateway/tests/integration_tests/location_discovery.rs b/gateway/tests/integration_tests/location_discovery.rs new file mode 100644 index 00000000000..ad9c65012ac --- /dev/null +++ b/gateway/tests/integration_tests/location_discovery.rs @@ -0,0 +1,47 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +// Copyright 2022 Oxide Computer Company + +use super::setup; +use dropshot::test_util; +use omicron_gateway::http_entrypoints::SpInfo; +use omicron_gateway::http_entrypoints::SpState; + +#[tokio::test] +async fn discovery_both_locations() { + let testctx0 = setup::test_setup("discovery_both_locations_0", 0).await; + let testctx1 = setup::test_setup("discovery_both_locations_1", 1).await; + + let client0 = &testctx0.client; + let client1 = &testctx1.client; + + // the two instances should've discovered that they were sidecar-a and + // sidecar-b, respectively + assert_eq!(testctx0.server.apictx.sp_comms.location_name(), "sidecar-a"); + assert_eq!(testctx1.server.apictx.sp_comms.location_name(), "sidecar-b"); + + // both instances should report the same serial number for switch 0 and + // switch 1, and it should match the expected values from the config + for (switch, expected_serial) in [ + (0, "00010001000100010001000100010001"), + (1, "01000100010001000100010001000100"), + ] { + for client in [client0, client1] { + let url = + format!("{}", client0.url(&format!("/sp/switch/{}", switch))); + + let resp: SpInfo = test_util::object_get(client, &url).await; + match resp.details { + SpState::Enabled { serial_number } => { + assert_eq!(serial_number, expected_serial) + } + other => panic!("unexpected state {:?}", other), + } + } + } + + testctx0.teardown().await; + testctx1.teardown().await; +} diff --git a/gateway/tests/integration_tests/mod.rs b/gateway/tests/integration_tests/mod.rs index a9b38d58bba..d52f37f0fee 100644 --- a/gateway/tests/integration_tests/mod.rs +++ b/gateway/tests/integration_tests/mod.rs @@ -16,6 +16,7 @@ use sp_sim::SimulatedSp; mod bulk_state_get; mod commands; +mod location_discovery; mod serial_console; mod setup; diff --git a/gateway/tests/integration_tests/serial_console.rs b/gateway/tests/integration_tests/serial_console.rs index 8a5dbddf01e..82fa1d4fa59 100644 --- a/gateway/tests/integration_tests/serial_console.rs +++ b/gateway/tests/integration_tests/serial_console.rs @@ -59,7 +59,7 @@ async fn sim_sp_serial_console( #[tokio::test] async fn serial_console_communication() { - let testctx = setup::test_setup("serial_console_communication").await; + let testctx = setup::test_setup("serial_console_communication", 0).await; let client = &testctx.client; let simrack = &testctx.simrack; @@ -103,7 +103,7 @@ async fn serial_console_communication() { #[tokio::test] async fn serial_console_detach() { - let testctx = setup::test_setup("serial_console_communication").await; + let testctx = setup::test_setup("serial_console_communication", 0).await; let client = &testctx.client; let simrack = &testctx.simrack; diff --git a/gateway/tests/integration_tests/setup.rs b/gateway/tests/integration_tests/setup.rs index 2486ffd1b28..1f5d4b02a2f 100644 --- a/gateway/tests/integration_tests/setup.rs +++ b/gateway/tests/integration_tests/setup.rs @@ -6,12 +6,17 @@ use dropshot::test_util::ClientTestContext; use dropshot::test_util::LogContext; -use gateway_sp_comms::KnownSp; -use gateway_sp_comms::KnownSps; +use gateway_sp_comms::SpType; +use omicron_test_utils::dev::poll; +use omicron_test_utils::dev::poll::CondCheckError; use slog::o; use sp_sim::SimRack; use sp_sim::SimulatedSp; +use std::collections::HashSet; +use std::convert::Infallible; +use std::future; use std::path::Path; +use std::time::Duration; use uuid::Uuid; // TODO this exact value is copy/pasted from `nexus/test-utils` - should we @@ -45,15 +50,42 @@ pub fn load_test_config() -> (omicron_gateway::Config, sp_sim::Config) { (server_config, sp_sim_config) } -pub async fn test_setup(test_name: &str) -> GatewayTestContext { - let (mut server_config, mut sp_sim_config) = load_test_config(); - test_setup_with_config(test_name, &mut server_config, &mut sp_sim_config) - .await +pub async fn test_setup(test_name: &str, sp_port: usize) -> GatewayTestContext { + let (server_config, mut sp_sim_config) = load_test_config(); + test_setup_with_config( + test_name, + sp_port, + server_config, + &mut sp_sim_config, + ) + .await +} + +fn expected_location( + config: &omicron_gateway::Config, + sp_port: usize, +) -> String { + let config = &config.switch.location; + let mut locations = config.names.iter().cloned().collect::>(); + + for determination in &config.determination { + let refined = match sp_port { + 0 => &determination.sp_port_0, + 1 => &determination.sp_port_1, + n => panic!("invalid sp port {}", n), + }; + + locations.retain(|name| refined.contains(name)); + } + + assert_eq!(locations.len(), 1); + locations.into_iter().next().unwrap() } pub async fn test_setup_with_config( test_name: &str, - server_config: &mut omicron_gateway::Config, + sp_port: usize, + mut server_config: omicron_gateway::Config, sp_sim_config: &mut sp_sim::Config, ) -> GatewayTestContext { // Use log settings from the server config and ignore log settings in @@ -64,35 +96,66 @@ pub async fn test_setup_with_config( // Start fake rack of simulated SPs let simrack = SimRack::start(sp_sim_config, log).await.unwrap(); - // Update gateway config to match the simulated rack. - let sidecars = simrack - .sidecars - .iter() - .map(|simsp| KnownSp { - sp: simsp.local_addr(0), - switch_port: "[::1]:0".parse().unwrap(), - }) - .collect::>(); - let gimlets = simrack - .gimlets - .iter() - .map(|simsp| KnownSp { - sp: simsp.local_addr(0), - switch_port: "[::1]:0".parse().unwrap(), - }) - .collect::>(); - server_config.known_sps = KnownSps { - switches: sidecars, - sleds: gimlets, - power_controllers: vec![], // TODO - }; + // TODO parameter for SP port + let expected_location = expected_location(&server_config, sp_port); + + // Update multicast addrs of `server_config` to point to the SP ports that + // will identify us as the expected location + for port_config in server_config.switch.port.values_mut() { + // we need to know whether this port points to a switch or sled; for now + // assume that matches whether we end up as `sidecar-a` or `sidecar-b` + let target_sp = port_config.location.get(&expected_location).unwrap(); + let sp_addr = match target_sp.typ { + SpType::Switch => { + simrack.sidecars[target_sp.slot].local_addr(sp_port) + } + SpType::Sled => simrack.gimlets[target_sp.slot].local_addr(sp_port), + SpType::Power => todo!(), + }; + port_config.multicast_addr.set_port(sp_addr.port()); + } // Start gateway server let rack_id = Uuid::parse_str(RACK_UUID).unwrap(); - let server = omicron_gateway::Server::start(server_config, rack_id, log) - .await - .unwrap(); + let server = + omicron_gateway::Server::start(server_config.clone(), rack_id, log) + .await + .unwrap(); + + // Make sure it discovered the location we expect + assert_eq!(server.apictx.sp_comms.location_name(), expected_location); + + // Build a list of all SPs defined in our config + let mut all_sp_ids = Vec::new(); + for port_config in server_config.switch.port.values() { + all_sp_ids.push( + port_config + .location + .get(server.apictx.sp_comms.location_name()) + .copied() + .unwrap(), + ); + } + + // Wait until the server has figured out the socket address of all those SPs + poll::wait_for_condition::<(), Infallible, _, _>( + || { + let comms = &server.apictx.sp_comms; + let result = if comms.local_ignition_controller_address_known() + && all_sp_ids.iter().all(|&id| comms.address_known(id)) + { + Ok(()) + } else { + Err(CondCheckError::NotYet) + }; + future::ready(result) + }, + &Duration::from_millis(100), + &Duration::from_secs(1), + ) + .await + .unwrap(); let client = ClientTestContext::new( server.http_server.local_addr(), diff --git a/gateway/tests/sp_sim_config.test.toml b/gateway/tests/sp_sim_config.test.toml index 42a089d7d52..12504ac99e2 100644 --- a/gateway/tests/sp_sim_config.test.toml +++ b/gateway/tests/sp_sim_config.test.toml @@ -12,6 +12,11 @@ multicast_addr = "::1" bind_addrs = ["[::1]:0", "[::1]:0"] serial_number = [0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1] +[[simulated_sps.sidecar]] +multicast_addr = "::1" +bind_addrs = ["[::1]:0", "[::1]:0"] +serial_number = [1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0] + [[simulated_sps.gimlet]] multicast_addr = "::1" bind_addrs = ["[::1]:0", "[::1]:0"] diff --git a/sp-sim/examples/config.toml b/sp-sim/examples/config.toml index 43cec485e8f..fe178bc549f 100644 --- a/sp-sim/examples/config.toml +++ b/sp-sim/examples/config.toml @@ -4,12 +4,12 @@ [[simulated_sps.sidecar]] multicast_addr = "ff15:0:1de::0" -bind_addrs = ["[::1]:33300", "[::1]:33301"] +bind_addrs = ["[::]:33300", "[::]:33301"] serial_number = [0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1] [[simulated_sps.gimlet]] multicast_addr = "ff15:0:1de::1" -bind_addrs = ["[::1]:33310", "[::1]:33311"] +bind_addrs = ["[::]:33310", "[::]:33311"] serial_number = [0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3] [[simulated_sps.gimlet.components]] name = "sp3" @@ -17,7 +17,7 @@ serial_console = "[::1]:33312" [[simulated_sps.gimlet]] multicast_addr = "ff15:0:1de::2" -bind_addrs = ["[::1]:33320", "[::1]:33321"] +bind_addrs = ["[::]:33320", "[::]:33321"] serial_number = [4, 4, 4, 4, 5, 5, 5, 5, 6, 6, 6, 6, 7, 7, 7, 7] [[simulated_sps.gimlet.components]] name = "sp3"