From 1592583e7cb27b999e87b63c6e95dfc7b407d23b Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Thu, 28 Jul 2022 11:44:24 -0400 Subject: [PATCH 1/3] Initial faux-mgs implementation (just discovery/sp state) --- Cargo.lock | 12 +++ Cargo.toml | 2 + gateway/faux-mgs/Cargo.toml | 14 +++ gateway/faux-mgs/src/main.rs | 178 +++++++++++++++++++++++++++++++++++ 4 files changed, 206 insertions(+) create mode 100644 gateway/faux-mgs/Cargo.toml create mode 100644 gateway/faux-mgs/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 0f34f3c1c2..335fc95d76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1539,6 +1539,18 @@ dependencies = [ "log", ] +[[package]] +name = "faux-mgs" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap 3.2.15", + "gateway-messages", + "slog", + "slog-async", + "slog-term", +] + [[package]] name = "ff" version = "0.10.1" diff --git a/Cargo.toml b/Cargo.toml index 6385ce18dd..a538b6192d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "ddm-admin-client", "deploy", "gateway", + "gateway/faux-mgs", "gateway-cli", "gateway-client", "gateway-messages", @@ -40,6 +41,7 @@ default-members = [ "ddm-admin-client", "deploy", "gateway", + "gateway/faux-mgs", "gateway-cli", "gateway-client", "gateway-messages", diff --git a/gateway/faux-mgs/Cargo.toml b/gateway/faux-mgs/Cargo.toml new file mode 100644 index 0000000000..9471bfbe49 --- /dev/null +++ b/gateway/faux-mgs/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "faux-mgs" +version = "0.1.0" +edition = "2021" +license = "MPL-2.0" + +[dependencies] +anyhow = "1.0" +clap = { version = "3.2", features = ["derive"] } +slog = { version = "2.7", features = ["max_level_trace", "release_max_level_debug"] } +slog-async = "2.6" +slog-term = "2.9" + +gateway-messages = { path = "../../gateway-messages", features = ["std"] } diff --git a/gateway/faux-mgs/src/main.rs b/gateway/faux-mgs/src/main.rs new file mode 100644 index 0000000000..0b7ade591b --- /dev/null +++ b/gateway/faux-mgs/src/main.rs @@ -0,0 +1,178 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +// Copyright 2022 Oxide Computer Company + +use anyhow::bail; +use anyhow::Context; +use anyhow::Result; +use clap::Parser; +use clap::Subcommand; +use gateway_messages::version; +use gateway_messages::Request; +use gateway_messages::RequestKind; +use gateway_messages::ResponseError; +use gateway_messages::ResponseKind; +use gateway_messages::SerializedSize; +use gateway_messages::SpMessage; +use gateway_messages::SpMessageKind; +use slog::debug; +use slog::o; +use slog::trace; +use slog::warn; +use slog::Drain; +use slog::Level; +use slog::Logger; +use std::net::SocketAddrV6; +use std::net::UdpSocket; +use std::sync::atomic::AtomicU32; +use std::sync::atomic::Ordering; +use std::time::Duration; + +/// Command line program that can send MGS messages to a single SP. +#[derive(Parser, Debug)] +struct Args { + #[clap( + short, + long, + default_value = "info", + value_parser = level_from_str, + help = "Log level for MGS client", + )] + log_level: Level, + + #[clap(long)] + sp: SocketAddrV6, + + #[clap(long, short, default_value = "3000")] + timeout_millis: u64, + + #[clap(subcommand)] + command: Commands, +} + +fn level_from_str(s: &str) -> Result { + if let Ok(level) = s.parse() { + Ok(level) + } else { + bail!(format!("Invalid log level: {}", s)) + } +} + +#[derive(Subcommand, Debug)] +enum Commands { + /// Ask SP on which port it receives messages from us. + Discover, + + /// Ask SP for its current state. + State, +} + +fn main() -> Result<()> { + let args = Args::parse(); + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::FullFormat::new(decorator) + .build() + .filter_level(args.log_level) + .fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + let log = Logger::root(drain, o!("component" => "faux-mgs")); + + let socket = UdpSocket::bind("[::]:0") + .with_context(|| "failed to bind UDP socket")?; + socket + .set_read_timeout(Some(Duration::from_millis(args.timeout_millis))) + .with_context(|| "failed to set read timeout on UDP socket")?; + + let request_kind = match args.command { + Commands::Discover => RequestKind::Discover, + Commands::State => RequestKind::SpState, + }; + + let response = request_response(&log, &socket, args.sp, request_kind)?; + println!("{response:?}"); + + Ok(()) +} + +fn request_response( + log: &Logger, + socket: &UdpSocket, + addr: SocketAddrV6, + kind: RequestKind, +) -> Result> { + let request_id = send_request(log, socket, addr, kind)?; + loop { + let message = recv_sp_message(log, socket)?; + match message.kind { + SpMessageKind::Response { request_id: response_id, result } => { + if response_id != request_id { + warn!( + log, "ignoring unexpected response id"; + "response_id" => response_id, + ); + continue; + } + return Ok(result); + } + SpMessageKind::SerialConsole(_) => { + debug!(log, "ignoring serial console packet from SP"); + continue; + } + } + } +} + +// On success, returns the request ID we sent. +fn send_request( + log: &Logger, + socket: &UdpSocket, + addr: SocketAddrV6, + kind: RequestKind, +) -> Result { + static REQUEST_ID: AtomicU32 = AtomicU32::new(1); + + let version = version::V1; + let request_id = REQUEST_ID.fetch_add(1, Ordering::Relaxed); + let request = Request { version, request_id, kind }; + + let mut buf = [0; Request::MAX_SIZE]; + debug!( + log, "sending request to SP"; + "request" => ?request, + "sp" => %addr, + ); + let n = gateway_messages::serialize(&mut buf[..], &request).unwrap(); + socket + .send_to(&buf[..n], addr) + .with_context(|| format!("failed to send to {addr}"))?; + + Ok(request_id) +} + +fn recv_sp_message(log: &Logger, socket: &UdpSocket) -> Result { + let mut resp = [0; SpMessage::MAX_SIZE]; + + let (n, peer) = socket + .recv_from(&mut resp[..]) + .with_context(|| format!("failed to receive response"))?; + let resp = &resp[..n]; + trace!(log, "received packet"; "data" => ?resp); + + let (message, _leftover) = gateway_messages::deserialize::(resp) + .with_context(|| { + format!("failed to deserialize response from {peer}") + })?; + debug!(log, "received response"; "response" => ?message); + + if message.version != version::V1 { + bail!( + "incorrect message versiom from {peer}: {} (expected {})", + message.version, + version::V1 + ); + } + + Ok(message) +} From 43d2a061ce41b478468f9f4c55ef448875856040 Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Mon, 1 Aug 2022 10:27:56 -0400 Subject: [PATCH 2/3] faux-mgs: Add usart-attach subcommand --- Cargo.lock | 13 ++ gateway/examples/config.toml | 2 +- gateway/faux-mgs/Cargo.toml | 4 + gateway/faux-mgs/src/main.rs | 52 ++++-- gateway/faux-mgs/src/usart.rs | 317 ++++++++++++++++++++++++++++++++++ 5 files changed, 371 insertions(+), 17 deletions(-) create mode 100644 gateway/faux-mgs/src/usart.rs diff --git a/Cargo.lock b/Cargo.lock index 335fc95d76..d4c6b1a821 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1545,10 +1545,14 @@ version = "0.1.0" dependencies = [ "anyhow", "clap 3.2.15", + "crossbeam-channel", "gateway-messages", + "mio", "slog", "slog-async", "slog-term", + "termios", + "thiserror", ] [[package]] @@ -5766,6 +5770,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "termios" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "411c5bf740737c7918b8b1fe232dca4dc9f8e754b8ad5e20966814001ed0ac6b" +dependencies = [ + "libc", +] + [[package]] name = "termtree" version = "0.2.4" diff --git a/gateway/examples/config.toml b/gateway/examples/config.toml index 768c85adb1..321455c321 100644 --- a/gateway/examples/config.toml +++ b/gateway/examples/config.toml @@ -54,7 +54,7 @@ switch1 = ["switch", 1] [switch.port.1] data_link_addr = "[::]:33201" -multicast_addr = "[ff15:0:1de::1]:33310" +multicast_addr = "[ff15:0:1de::1%2]:11111" [switch.port.1.location] switch0 = ["sled", 0] switch1 = ["sled", 0] diff --git a/gateway/faux-mgs/Cargo.toml b/gateway/faux-mgs/Cargo.toml index 9471bfbe49..846de82778 100644 --- a/gateway/faux-mgs/Cargo.toml +++ b/gateway/faux-mgs/Cargo.toml @@ -7,8 +7,12 @@ license = "MPL-2.0" [dependencies] anyhow = "1.0" clap = { version = "3.2", features = ["derive"] } +crossbeam-channel = "0.5" +mio = "0.8" slog = { version = "2.7", features = ["max_level_trace", "release_max_level_debug"] } slog-async = "2.6" slog-term = "2.9" +termios = "0.3" +thiserror = "1.0" gateway-messages = { path = "../../gateway-messages", features = ["std"] } diff --git a/gateway/faux-mgs/src/main.rs b/gateway/faux-mgs/src/main.rs index 0b7ade591b..ac89d36b43 100644 --- a/gateway/faux-mgs/src/main.rs +++ b/gateway/faux-mgs/src/main.rs @@ -24,12 +24,15 @@ use slog::warn; use slog::Drain; use slog::Level; use slog::Logger; +use std::io; use std::net::SocketAddrV6; use std::net::UdpSocket; use std::sync::atomic::AtomicU32; use std::sync::atomic::Ordering; use std::time::Duration; +mod usart; + /// Command line program that can send MGS messages to a single SP. #[derive(Parser, Debug)] struct Args { @@ -67,6 +70,13 @@ enum Commands { /// Ask SP for its current state. State, + + /// Attach to the SP's USART. + UsartAttach { + /// Put the local terminal in raw mode. + #[clap(long)] + raw: bool, + }, } fn main() -> Result<()> { @@ -88,6 +98,9 @@ fn main() -> Result<()> { let request_kind = match args.command { Commands::Discover => RequestKind::Discover, Commands::State => RequestKind::SpState, + Commands::UsartAttach { raw } => { + return usart::run(log, socket, args.sp, raw); + } }; let response = request_response(&log, &socket, args.sp, request_kind)?; @@ -151,28 +164,35 @@ fn send_request( Ok(request_id) } -fn recv_sp_message(log: &Logger, socket: &UdpSocket) -> Result { +#[derive(Debug, thiserror::Error)] +enum RecvError { + #[error(transparent)] + Io(#[from] io::Error), + #[error("failed to deserialize response: {0}")] + Deserialize(#[from] gateway_messages::HubpackError), + #[error("incorrect message version (expected {expected}, got {got})")] + IncorrectVersion { expected: u32, got: u32 }, +} + +fn recv_sp_message( + log: &Logger, + socket: &UdpSocket, +) -> Result { let mut resp = [0; SpMessage::MAX_SIZE]; - let (n, peer) = socket - .recv_from(&mut resp[..]) - .with_context(|| format!("failed to receive response"))?; + let (n, _peer) = socket.recv_from(&mut resp[..])?; let resp = &resp[..n]; trace!(log, "received packet"; "data" => ?resp); - let (message, _leftover) = gateway_messages::deserialize::(resp) - .with_context(|| { - format!("failed to deserialize response from {peer}") - })?; + let (message, _) = gateway_messages::deserialize::(resp)?; debug!(log, "received response"; "response" => ?message); - if message.version != version::V1 { - bail!( - "incorrect message versiom from {peer}: {} (expected {})", - message.version, - version::V1 - ); + if message.version == version::V1 { + Ok(message) + } else { + Err(RecvError::IncorrectVersion { + expected: version::V1, + got: message.version, + }) } - - Ok(message) } diff --git a/gateway/faux-mgs/src/usart.rs b/gateway/faux-mgs/src/usart.rs new file mode 100644 index 0000000000..93e70a961a --- /dev/null +++ b/gateway/faux-mgs/src/usart.rs @@ -0,0 +1,317 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +// Copyright 2022 Oxide Computer Company + +use anyhow::Context; +use anyhow::Result; +use crossbeam_channel::select; +use crossbeam_channel::Sender; +use gateway_messages::sp_impl::SerialConsolePacketizer; +use gateway_messages::RequestKind; +use gateway_messages::ResponseKind; +use gateway_messages::SpComponent; +use gateway_messages::SpMessage; +use gateway_messages::SpMessageKind; +use mio::unix::SourceFd; +use mio::Events; +use mio::Interest; +use mio::Poll; +use mio::Token; +use slog::debug; +use slog::error; +use slog::trace; +use slog::warn; +use slog::Logger; +use std::io; +use std::io::Write; +use std::net::SocketAddrV6; +use std::net::UdpSocket; +use std::os::unix::prelude::AsRawFd; +use std::sync::Arc; +use std::thread; +use std::time::Duration; +use std::time::Instant; +use termios::Termios; + +use crate::RecvError; + +const CTRL_A: u8 = b'\x01'; +const CTRL_C: u8 = b'\x03'; + +pub(crate) fn run( + log: Logger, + socket: UdpSocket, + sp: SocketAddrV6, + raw: bool, +) -> Result<()> { + // Put terminal in raw mode, if requested, with a guard to restore it. + let _guard = if raw { + let stdout = io::stdout().as_raw_fd(); + let mut ios = Termios::from_fd(stdout) + .with_context(|| "could not get termios for stdout")?; + let orig_ios = ios; + termios::cfmakeraw(&mut ios); + termios::tcsetattr(stdout, termios::TCSAFLUSH, &ios) + .with_context(|| "failed to set termios on stdout")?; + Some(UnrawTermiosGuard { stdout, ios: orig_ios }) + } else { + None + }; + + let socket = Arc::new(socket); + let rx_sp_msg = { + let (tx, rx) = crossbeam_channel::bounded(16); + let log = log.clone(); + let socket = Arc::clone(&socket); + thread::spawn(move || recv_task(log, socket, tx)); + rx + }; + let rx_stdin = { + let (tx, rx) = crossbeam_channel::bounded(16); + let log = log.clone(); + thread::spawn(move || stdin_task(log, raw, tx)); + rx + }; + + let mut packetizer = + SerialConsolePacketizer::new(SpComponent::try_from("sp3").unwrap()); + loop { + select! { + recv(rx_sp_msg) -> msg => { + // recv_task() runs as long as we're holding `rx_sp_msg`, so we + // can unwrap here. + handle_sp_message(&log, msg.unwrap(), None); + } + + recv(rx_stdin) -> result => { + let data = match result { + Ok(line) => line, + Err(_) => break, + }; + + for chunk in packetizer.packetize(&data) { + let request_id = crate::send_request( + &log, + &socket, + sp, + RequestKind::SerialConsoleWrite(chunk), + )?; + + // Don't read any more from our stdin until we've gotten + // acks that this line as been received. + for msg in rx_sp_msg.iter() { + if handle_sp_message(&log, msg, Some(request_id)) { + break; + } + } + } + } + } + } + + Ok(()) +} + +struct UnrawTermiosGuard { + stdout: i32, + ios: Termios, +} + +impl Drop for UnrawTermiosGuard { + fn drop(&mut self) { + termios::tcsetattr(self.stdout, termios::TCSAFLUSH, &self.ios).unwrap(); + } +} + +/// If `request_id` is `Some(n)`, returns `true` if `msg` contains a serial +/// console ack of that request id. Returns `false` in all other cases. +fn handle_sp_message( + log: &Logger, + msg: SpMessage, + request_id: Option, +) -> bool { + match msg.kind { + SpMessageKind::Response { request_id: response_id, result } => { + match result { + Ok(ResponseKind::SerialConsoleWriteAck) => { + if Some(response_id) == request_id { + debug!(log, "received expected ack"); + return true; + } else { + debug!(log, "received unexpected ack"); + } + } + Ok(other) => { + debug!( + log, "ignoring unexpected message"; + "message" => ?other, + ); + } + Err(err) => { + warn!(log, "received error response"; "err" => %err); + } + } + } + SpMessageKind::SerialConsole(chunk) => { + trace!(log, "writing {chunk:?} data to stdout"); + let mut stdout = io::stdout().lock(); + stdout.write_all(&chunk.data[..usize::from(chunk.len)]).unwrap(); + stdout.flush().unwrap(); + } + } + + false +} + +fn stdin_task(log: Logger, raw: bool, tx: Sender>) { + const BUFFER_DELAY: Duration = Duration::from_millis(1000); + + let mut stdin = io::stdin().lock(); + let stdin_fd = stdin.as_raw_fd(); + + let mut poll = Poll::new().expect("could not create mio Poll"); + let mut events = Events::with_capacity(16); + let mut source = SourceFd(&stdin_fd); + + const STDIN: Token = Token(0); + poll.registry() + .register(&mut source, STDIN, Interest::READABLE) + .expect("could not register readable interest for stdin"); + + let mut flush_deadline: Option = None; + let mut buf = InOutBuf::new(raw); + loop { + let mut timeout = + flush_deadline.map(|dl| dl.duration_since(Instant::now())); + if timeout == Some(Duration::ZERO) { + if tx.send(buf.outbuf.clone()).is_err() { + return; + } + timeout = None; + flush_deadline = None; + buf.outbuf.clear(); + } + poll.poll(&mut events, timeout).expect("poll failed"); + + for event in events.iter() { + match event.token() { + STDIN => match buf.read(&log, &mut stdin) { + Ok(false) => return, + Ok(true) => { + if flush_deadline.is_none() { + flush_deadline = + Some(Instant::now() + BUFFER_DELAY); + } + } + Err(err) if err.kind() == io::ErrorKind::WouldBlock => (), + Err(err) => { + error!(log, "error reading from stdin"; "err" => %err); + return; + } + }, + _ => unreachable!(), + } + } + } +} + +struct InOutBuf { + inbuf: Vec, + outbuf: Vec, + term_raw: bool, + next_raw: bool, +} + +impl InOutBuf { + fn new(term_raw: bool) -> Self { + Self { + inbuf: vec![0; 4096], + outbuf: Vec::with_capacity(4096), + term_raw, + next_raw: false, + } + } + + // On success, returns `Ok(true)` if at least one byte was read, or + // `Ok(false)` if `reader` returned `Ok(0)`. Read bytes will be stored into + // `outbuf`, possibly modified if we're in raw mode (allowing for Ctrl-A to + // be used as a prefix). + fn read( + &mut self, + log: &Logger, + reader: &mut R, + ) -> io::Result { + let n = reader.read(self.inbuf.as_mut_slice())?; + trace!(log, "read {n} bytes from stdin"); + if n == 0 { + return Ok(false); + } + + if !self.term_raw { + self.outbuf.extend_from_slice(&self.inbuf[..n]); + return Ok(true); + } + + // Put bytes from inbuf to outbuf, but don't send Ctrl-A unless + // next_raw is true. + for &c in &self.inbuf[..n] { + match c { + // Ctrl-A means send next one raw + CTRL_A => { + if self.next_raw { + // Ctrl-A Ctrl-A should be sent as Ctrl-A + self.outbuf.push(c); + self.next_raw = false; + } else { + self.next_raw = true; + } + } + CTRL_C => { + if !self.next_raw { + // Exit on non-raw Ctrl-C + return Ok(false); + } else { + // Otherwise send Ctrl-C + self.outbuf.push(c); + self.next_raw = false; + } + } + _ => { + self.outbuf.push(c); + self.next_raw = false; + } + } + } + + Ok(true) + } +} + +fn recv_task(log: Logger, socket: Arc, tx: Sender) { + loop { + match crate::recv_sp_message(&log, &socket) { + Ok(message) => { + if tx.send(message).is_err() { + break; + } + } + Err(err) => { + match err { + // ignore "would block" errors, which are what we see when + // the recv times out. we're not necessarily expecting a + // timely response here, so that's not worth logging. + RecvError::Io(err) + if err.kind() == io::ErrorKind::WouldBlock => + { + () + } + other => { + error!(log, "recv error"; "err" => %other); + } + } + } + } + } +} From 5a75fa6e80e49270d75449f50e35242215fdc29f Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Mon, 1 Aug 2022 10:37:43 -0400 Subject: [PATCH 3/3] faux-mgs: Add basic README --- Cargo.lock | 2 +- gateway/faux-mgs/README.adoc | 28 ++++++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) create mode 100644 gateway/faux-mgs/README.adoc diff --git a/Cargo.lock b/Cargo.lock index d4c6b1a821..7bcc5d5254 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1544,7 +1544,7 @@ name = "faux-mgs" version = "0.1.0" dependencies = [ "anyhow", - "clap 3.2.15", + "clap 3.2.16", "crossbeam-channel", "gateway-messages", "mio", diff --git a/gateway/faux-mgs/README.adoc b/gateway/faux-mgs/README.adoc new file mode 100644 index 0000000000..2c2f75f4ab --- /dev/null +++ b/gateway/faux-mgs/README.adoc @@ -0,0 +1,28 @@ += Faux MGS + +This directory contains a command-line application that acts like MGS. It is +intended to be a development tool for working with the SP. + +== Examples + +Sending the MGS discovery packet: + +``` +% cargo run --bin faux-mgs -- --sp '[fe80::c1d:7dff:feef:9f1d%2]:11111' discover +Ok(Discover(DiscoverResponse { sp_port: One })) +``` + +Asking an SP for its state (as of this writing, the state only contains the SP's +serial number): + +``` +% cargo run --bin faux-mgs -- --sp '[fe80::c1d:7dff:feef:9f1d%2]:11111' state +Ok(SpState(SpState { serial_number: [0, 68, 0, 26, 51, 48, 81, 17, 48, 51, 56, 55, 0, 0, 0, 0] })) +``` + +Attaching to the SP's usart (this forwards stdin to the SP, and prints anything +the SP sends to stdout): + +``` +% cargo run --bin faux-mgs -- --sp '[fe80::c1d:7dff:feef:9f1d%2]:11111' usart-attach +```