Skip to content

Commit

Permalink
MGS: Flesh out RFD250-style SP/location discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
jgallagher committed Apr 21, 2022
1 parent 674ffc2 commit 2fbb360
Show file tree
Hide file tree
Showing 24 changed files with 878 additions and 517 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions gateway-sp-comms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ http = "0.2.6"
hyper = "0.14.17"
ringbuffer = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_with = "1.12.0"
thiserror = "1.0.30"
tokio-tungstenite = "0.17"
tokio-stream = "0.1.8"
Expand Down
177 changes: 59 additions & 118 deletions gateway-sp-comms/src/communicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,41 +9,31 @@ use crate::error::Error;
use crate::error::SpCommunicationError;
use crate::error::StartupError;
use crate::management_switch::ManagementSwitch;
use crate::management_switch::ManagementSwitchDiscovery;
use crate::management_switch::SpSocket;
use crate::management_switch::SwitchPort;
use crate::recv_handler::RecvHandler;
use crate::Elapsed;
use crate::KnownSps;
use crate::SpIdentifier;
use crate::SwitchConfig;
use crate::Timeout;
use futures::stream::FuturesUnordered;
use futures::Future;
use futures::Stream;
use gateway_messages::version;
use gateway_messages::BulkIgnitionState;
use gateway_messages::DiscoverResponse;
use gateway_messages::IgnitionCommand;
use gateway_messages::IgnitionState;
use gateway_messages::Request;
use gateway_messages::RequestKind;
use gateway_messages::ResponseError;
use gateway_messages::ResponseKind;
use gateway_messages::SerialConsole;
use gateway_messages::SerializedSize;
use gateway_messages::SpComponent;
use gateway_messages::SpState;
use hyper::header;
use hyper::upgrade;
use hyper::Body;
use omicron_common::backoff;
use omicron_common::backoff::Backoff;
use slog::debug;
use slog::info;
use slog::o;
use slog::Logger;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use tokio::time::Instant;
use std::sync::Arc;
use std::time::Duration;
use tokio_tungstenite::tungstenite::handshake;
Expand All @@ -66,28 +56,30 @@ where

#[derive(Debug)]
pub struct Communicator {
log: Logger,
switch: ManagementSwitch,
request_id: AtomicU32,
recv_handler: Arc<RecvHandler>,
}

impl Communicator {
pub async fn new(
known_sps: KnownSps,
config: SwitchConfig,
discovery_deadline: Instant,
log: &Logger,
) -> Result<Self, StartupError> {
let log = log.new(o!("component" => "SpCommunicator"));
let discovery = ManagementSwitchDiscovery::placeholder_start(
known_sps,
log.clone(),
)
.await?;

let (switch, recv_handler) = RecvHandler::new(discovery, log.clone());
let switch =
ManagementSwitch::new(config, discovery_deadline, log.clone())
.await?;

info!(&log, "started SP communicator");
Ok(Self { log, switch, request_id: AtomicU32::new(0), recv_handler })
Ok(Self { switch })
}

/// Get the name of our location.
///
/// This matches one of the names specified as a possible location in the
/// configuration we were given.
pub fn location_name(&self) -> &str {
    // Forward the switch's answer as-is; wrapping the call in another `&`
    // is a needless borrow that only compiles thanks to auto-deref.
    self.switch.location_name()
}

// convert an identifier to a port number; this is fallible because
Expand All @@ -103,13 +95,36 @@ impl Communicator {
self.switch.switch_port_to_id(port)
}

/// Reports whether the IP address of our local ignition controller has been
/// discovered.
///
/// This is a polling hook for test setup (to wait until discovery has
/// happened); it should not be called outside tests.
pub fn local_ignition_controller_address_known(&self) -> bool {
    let controller = self.switch.ignition_controller();
    controller.is_some()
}

/// Returns true if we've discovered the IP address of the specified SP.
///
/// This method exists to be polled during test setup (to wait for discovery
/// to happen); it should not be called outside tests.
///
/// # Panics
///
/// Panics instead of returning an error if `sp` describes an SP that isn't
/// known to this communicator.
pub fn address_known(&self, sp: SpIdentifier) -> bool {
    // Deliberate panic (test-only helper): an unknown `sp` is a bug in the
    // test setup, so state the violated invariant in the panic message.
    let port = self
        .switch
        .switch_port(sp)
        .expect("address_known() called with an SP unknown to this communicator");
    self.switch.sp_socket(port).is_some()
}

/// Ask the local ignition controller for the ignition state of a given SP.
pub async fn get_ignition_state(
&self,
sp: SpIdentifier,
timeout: Timeout,
) -> Result<IgnitionState, Error> {
let controller = self.switch.ignition_controller();
let controller = self
.switch
.ignition_controller()
.ok_or(Error::LocalIgnitionControllerAddressUnknown)?;
let port = self.id_to_port(sp)?;
let request =
RequestKind::IgnitionState { target: port.as_ignition_target() };
Expand All @@ -129,7 +144,10 @@ impl Communicator {
&self,
timeout: Timeout,
) -> Result<Vec<(SpIdentifier, IgnitionState)>, Error> {
let controller = self.switch.ignition_controller();
let controller = self
.switch
.ignition_controller()
.ok_or(Error::LocalIgnitionControllerAddressUnknown)?;
let request = RequestKind::BulkIgnitionState;

let bulk_state = self
Expand Down Expand Up @@ -170,7 +188,10 @@ impl Communicator {
command: IgnitionCommand,
timeout: Timeout,
) -> Result<(), Error> {
let controller = self.switch.ignition_controller();
let controller = self
.switch
.ignition_controller()
.ok_or(Error::LocalIgnitionControllerAddressUnknown)?;
let target = self.id_to_port(target_sp)?.as_ignition_target();
let request = RequestKind::IgnitionCommand { target, command };

Expand All @@ -196,6 +217,11 @@ impl Communicator {
// via UDP. SPs will continuously broadcast any serial console data, even if
// there is no attached client. Maybe this is fine, since the serial console
// shouldn't be noisy without a corresponding client driving it?
//
// TODO Because this method doesn't contact the target SP, it succeeds even
// if we don't know the IP address of that SP (yet, or possibly ever)! The
// connection will start working if we later discover the address, but this
// is probably not the behavior we want.
pub async fn serial_console_attach(
self: &Arc<Self>,
request: &mut http::Request<Body>,
Expand Down Expand Up @@ -250,7 +276,7 @@ impl Communicator {
.map(|key| handshake::derive_accept_key(key))
.ok_or(Error::BadWebsocketConnection("missing websocket key"))?;

self.recv_handler.serial_console_attach(
self.switch.serial_console_attach(
Arc::clone(self),
port,
component,
Expand Down Expand Up @@ -281,7 +307,7 @@ impl Communicator {
component: &SpComponent,
) -> Result<(), Error> {
let port = self.id_to_port(sp)?;
self.recv_handler.serial_console_detach(port, component)
self.switch.serial_console_detach(port, component)
}

/// Send `packet` to the given SP component's serial console.
Expand Down Expand Up @@ -398,102 +424,17 @@ impl Communicator {
.collect::<FuturesUnordered<_>>()
}

pub(crate) async fn request_response<F, T>(
async fn request_response<F, T>(
&self,
sp: &SpSocket<'_>,
mut kind: RequestKind,
mut map_response_kind: F,
kind: RequestKind,
map_response_kind: F,
timeout: Option<Timeout>,
) -> Result<T, Error>
where
F: FnMut(ResponseKind) -> Result<T, BadResponseType>,
{
// helper to wrap a future in a timeout if we have one
async fn maybe_with_timeout<F, U>(
timeout: Option<Timeout>,
fut: F,
) -> Result<U, Elapsed>
where
F: Future<Output = U>,
{
match timeout {
Some(t) => t.timeout_at(fut).await,
None => Ok(fut.await),
}
}

// We'll use exponential backoff if and only if the SP responds with
// "busy"; any other error will cause the loop below to terminate.
let mut backoff = backoff::internal_service_policy();

loop {
// It would be nicer to use `backoff::retry()` instead of manually
// stepping the backoff policy, but the dance we do with `kind` to
// avoid cloning it is hard to map into `retry()` in a way that
// satisfies the borrow checker. ("The dance we do with `kind` to
// avoid cloning it" being that we move it into `request` below, and
// on a busy response from the SP we move it back out into the
// `kind` local var.)
let duration = backoff
.next_backoff()
.expect("internal backoff policy gave up");
maybe_with_timeout(timeout, tokio::time::sleep(duration))
.await
.map_err(|err| Error::Timeout {
timeout: err.duration(),
sp: self.port_to_id(sp.port()),
})?;

// request IDs will eventually roll over; since we enforce timeouts
// this should be a non-issue in practice. does this need testing?
let request_id = self.request_id.fetch_add(1, Ordering::Relaxed);

// update our recv_handler to expect a response for this request ID
let response_fut =
self.recv_handler.register_request_id(sp.port(), request_id);

// Serialize and send our request. We know `buf` is large enough for
// any `Request`, so unwrapping here is fine.
let request = Request { version: version::V1, request_id, kind };
let mut buf = [0; Request::MAX_SIZE];
let n = gateway_messages::serialize(&mut buf, &request).unwrap();
let serialized_request = &buf[..n];

// Actual communication, guarded by `timeout` if it's not `None`.
let result = maybe_with_timeout(timeout, async {
debug!(&self.log, "sending {:?} to SP {:?}", request, sp);
sp.send(serialized_request).await.map_err(|err| {
SpCommunicationError::UdpSend { addr: sp.addr(), err }
})?;

Ok::<ResponseKind, SpCommunicationError>(response_fut.await?)
})
.await
.map_err(|err| Error::Timeout {
timeout: err.duration(),
sp: self.port_to_id(sp.port()),
})?;

match result {
Ok(response_kind) => {
return map_response_kind(response_kind)
.map_err(SpCommunicationError::from)
.map_err(Error::from)
}
Err(SpCommunicationError::SpError(ResponseError::Busy)) => {
debug!(
&self.log,
"SP busy; sleeping before retrying send";
"sp" => ?sp,
);

// move `kind` back into local var; required to satisfy
// borrow check of this loop
kind = request.kind;
}
Err(err) => return Err(err.into()),
}
}
self.switch.request_response(sp, kind, map_response_kind, timeout).await
}
}

Expand Down
6 changes: 4 additions & 2 deletions gateway-sp-comms/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ pub enum Error {
.0.slot,
)]
SpAddressUnknown(SpIdentifier),
#[error("timeout ({timeout:?}) elapsed communicating with {sp:?}")]
Timeout { timeout: Duration, sp: SpIdentifier },
#[error(
"timeout ({timeout:?}) elapsed communicating with {sp:?} on port {port}"
)]
Timeout { timeout: Duration, port: usize, sp: Option<SpIdentifier> },
#[error("error communicating with SP: {0}")]
SpCommunicationFailed(#[from] SpCommunicationError),
#[error("serial console is already attached")]
Expand Down
10 changes: 4 additions & 6 deletions gateway-sp-comms/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@ pub mod error;

pub use communicator::Communicator;
pub use communicator::FuturesUnorderedImpl;
pub use management_switch::LocationConfig;
pub use management_switch::LocationDeterminationConfig;
pub use management_switch::SpIdentifier;
pub use management_switch::SpType;
pub use management_switch::SwitchConfig;
pub use management_switch::SwitchPortConfig;
pub use timeout::Elapsed;
pub use timeout::Timeout;

// TODO these will remain public for a while, but eventually will be removed
// altogether; currently these provide a way to hard-code the rack topology,
// which is not what we want.
pub use management_switch::KnownSp;
pub use management_switch::KnownSps;
Loading

0 comments on commit 2fbb360

Please sign in to comment.