diff --git a/src/connection.rs b/src/connection.rs index 443a052..3003e62 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,4 +1,6 @@ +use http::{HeaderMap, HeaderValue}; use snafu::{ResultExt, Snafu}; +use std::convert::TryFrom; use std::net::SocketAddr; use std::sync::mpsc::Sender; use std::sync::Arc; @@ -124,6 +126,58 @@ impl Streams } } +/// When available, identifies the thread in the calling or client process. +/// The client should reports its process id with the proxide-client-process-id" header and +/// the thread id with the "proxide-client-thread-id" header. +/// This enables the proxide proxy to capture client's callstack when it is making the call if the proxide +/// and the client are running on the same host. +pub struct ClientThreadId +{ + process_id: u32, + thread_id: i64, +} + +impl ClientThreadId +{ + pub fn process_id(&self) -> u32 + { + self.process_id + } + + pub fn thread_id(&self) -> i64 + { + self.thread_id + } +} + +impl TryFrom<&MessageData> for ClientThreadId +{ + type Error = (); + + fn try_from(value: &MessageData) -> std::result::Result + { + ClientThreadId::try_from(&value.headers) + } +} + +impl TryFrom<&HeaderMap> for ClientThreadId +{ + type Error = (); + + fn try_from(value: &HeaderMap) -> std::result::Result + { + let process_id: Option = number_or_none(&value.get("proxide-client-process-id")); + let thread_id: Option = number_or_none(&value.get("proxide-client-thread-id")); + match (process_id, thread_id) { + (Some(process_id), Some(thread_id)) => Ok(ClientThreadId { + process_id, + thread_id, + }), + _ => Err(()), + } + } +} + /// Handles a single client connection. /// /// The connection handling is split into multiple functions, but the functions are chained in a @@ -311,3 +365,17 @@ where log::info!("Exit"); }); } + +fn number_or_none(header: &Option<&HeaderValue>) -> Option +where + N: std::str::FromStr, +{ + if let Some(value) = header { + value + .to_str() + .map(|s| N::from_str(s).map(|n| Some(n)).unwrap_or(None)) + .unwrap_or(None) + } else { + None + } +} diff --git a/src/connection/http2.rs b/src/connection/http2.rs index 512b85b..c5a1c5f 100644 --- a/src/connection/http2.rs +++ b/src/connection/http2.rs @@ -8,10 +8,14 @@ use h2::{ use http::{HeaderMap, Request, Response}; use log::error; use snafu::ResultExt; +use std::convert::TryFrom; use std::net::SocketAddr; +use std::pin::Pin; use std::sync::mpsc::Sender; +use std::task::{Context, Poll}; use std::time::SystemTime; use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::task::{JoinHandle, JoinSet}; use uuid::Uuid; use super::*; @@ -141,6 +145,12 @@ pub struct ProxyRequest client_response: SendResponse, server_request: SendStream, server_response: ResponseFuture, + request_processor: ProcessingFuture, +} + +struct ProcessingFuture +{ + inner: JoinHandle<()>, } impl ProxyRequest @@ -191,6 +201,10 @@ impl ProxyRequest })) .unwrap(); + // Request processor supports asynchronous message processing while the proxide is busy proxying data between + // the client and the server. + let request_processor = ProcessingFuture::spawn(uuid, &client_head, ui); + let server_request = Request::from_parts(client_head, ()); // Set up a server request. @@ -208,6 +222,7 @@ impl ProxyRequest client_response, server_request, server_response, + request_processor, }) } @@ -265,6 +280,7 @@ impl ProxyRequest let mut client_response = self.client_response; let server_response = self.server_response; let connection_uuid = self.connection_uuid; + let request_processor = self.request_processor; let ui_temp = ui.clone(); let response_future = async move { let ui = ui_temp; @@ -293,6 +309,11 @@ impl ProxyRequest scenario: "sending response", })?; + // Ensure the request processor has finished before we send the response to the client. + // Callstack capturing process inside the request processor may capture incorrect data if + // the client is given the final answer from the server as it no longer has to wait for the response. + request_processor.await; + // The server might have sent all the details in the headers, at which point there is // no body present. Check for this scenario here. if response_body.is_end_stream() { @@ -440,3 +461,64 @@ fn is_fatal_error(r: &Result) -> bool }, } } + +impl ProcessingFuture +{ + fn spawn(uuid: Uuid, client_head: &http::request::Parts, ui: &Sender) -> Self + { + let mut tasks: JoinSet>> = + JoinSet::new(); + + // Task which attempts to capture client's callstack. + if let Ok(thread_id) = crate::connection::ClientThreadId::try_from(&client_head.headers) { + let ui_clone = ui.clone(); + tasks.spawn(ProcessingFuture::capture_client_callstack( + uuid, thread_id, ui_clone, + )); + } + + Self { + inner: tokio::spawn(async move { + while let Some(result) = tasks.join_next().await { + match result { + Ok(_) => {} + Err(e) => { + // TODO: Send the error to UI. + eprintln!("{}", e); + error!("{}", e); + } + } + } + }), + } + } + + async fn capture_client_callstack( + uuid: Uuid, + _client_thread_id: ClientThreadId, + ui: Sender, + ) -> std::result::Result<(), Box> + { + // TODO: Try to capture the callstack + ui.send(SessionEvent::ClientCallstackProcessed( + ClientCallstackProcessedEvent { + uuid, + callstack: ClientCallstack::Unsupported, + }, + ))?; + Ok(()) + } +} + +impl Future for ProcessingFuture +{ + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll + { + match Pin::new(&mut self.inner).poll(cx) { + Poll::Ready(_) => Poll::Ready(()), + Poll::Pending => Poll::Pending, + } + } +} diff --git a/src/main.rs b/src/main.rs index 5f37324..c373346 100644 --- a/src/main.rs +++ b/src/main.rs @@ -419,6 +419,7 @@ mod test use log::SetLoggerError; use serial_test::serial; use std::io::{ErrorKind, Write}; + use std::ops::Add; use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -426,6 +427,7 @@ mod test use tokio::sync::broadcast::Receiver; use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::oneshot; + use tokio::time::Instant; use crate::session::events::SessionEvent; use crate::ConnectionOptions; @@ -533,6 +535,68 @@ mod test .expect("Waiting for proxide to stop failed."); } + #[tokio::test] + #[serial] + async fn proxide_receives_client_callstack_ui_message() + { + // Logging must be enabled to detect errors inside proxide. + // Failure to monitor logs may cause the test to hang as errors that stop processing get silently ignored. + let mut error_monitor = get_error_monitor().expect("Acquiring error monitor failed."); + + // Server + let server = GrpcServer::start() + .await + .expect("Starting test server failed."); + + // Proxide + let options = get_proxide_options(&server); + let (abort_tx, abort_rx) = tokio::sync::oneshot::channel::<()>(); + let (ui_tx, ui_rx_std) = std::sync::mpsc::channel(); + let proxide_port = u16::from_str(&options.listen_port.to_string()).unwrap(); + let proxide = tokio::spawn(crate::launch_proxide(options, abort_rx, ui_tx)); + + // Message generator and tester. + let tester = grpc_tester::GrpcTester::with_proxide( + server, + proxide_port, + grpc_tester::Args { + period: std::time::Duration::from_secs(0), + tasks: 1, + }, + ) + .await + .expect("Starting tester failed."); + let mut message_rx = async_from_sync(ui_rx_std); + + // UI channel should be constantly receiving client callstack events. + // The generator includes the process id and the thread id in the messages it sends. + let mut client_callstack_received = false; + let timeout_at = Instant::now().add(Duration::from_secs(30)); + while let Some(message) = tokio::select! { + result = message_rx.recv() => result, + _t = tokio::time::sleep( Duration::from_secs( 30 ) ) => panic!( "Timeout" ), + error = error_monitor.recv() => panic!( "{:?}", error ), + } { + if let SessionEvent::ClientCallstackProcessed(..) = message { + client_callstack_received = true; + break; + } else if Instant::now() > timeout_at { + panic!("Timeout") + } + } + + // Ensure the ui channel was not closed prematurely. + assert!(client_callstack_received); + + let mut server = tester.stop_generator().expect("Stopping generator failed."); + abort_tx.send(()).expect("Stopping proxide failed."); + proxide + .await + .expect("Waiting for proxide to stop failed.") + .expect("Waiting for proxide to stop failed."); + server.stop().expect("Stopping server failed"); + } + /// Gets options for launching proxide. fn get_proxide_options(server: &GrpcServer) -> Arc { diff --git a/src/session.rs b/src/session.rs index 83f3ce1..971a2ed 100644 --- a/src/session.rs +++ b/src/session.rs @@ -56,6 +56,7 @@ pub struct RequestData pub start_timestamp: DateTime, pub end_timestamp: Option>, + pub client_callstack: Option, pub status: Status, } @@ -126,6 +127,13 @@ impl MessageData } } +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +pub enum ClientCallstack +{ + /// Proxide does not support callstack capture on the current platform/operating system. + Unsupported, +} + impl IndexedVec { pub fn push(&mut self, uuid: Uuid, item: T) diff --git a/src/session/events.rs b/src/session/events.rs index 2334d04..7a77242 100644 --- a/src/session/events.rs +++ b/src/session/events.rs @@ -14,6 +14,7 @@ pub enum SessionEvent MessageDone(MessageDoneEvent), RequestDone(RequestDoneEvent), ConnectionDone(ConnectionDoneEvent), + ClientCallstackProcessed(ClientCallstackProcessedEvent), } #[derive(Serialize, Deserialize, Debug)] @@ -84,6 +85,13 @@ pub struct ConnectionDoneEvent pub timestamp: SystemTime, } +#[derive(Serialize, Deserialize, Debug)] +pub struct ClientCallstackProcessedEvent +{ + pub uuid: Uuid, + pub callstack: ClientCallstack, +} + pub enum SessionChange { NewConnection @@ -110,6 +118,10 @@ pub enum SessionChange { connection: Uuid }, + Callstack + { + request: Uuid + }, } impl Session @@ -124,6 +136,7 @@ impl Session SessionEvent::MessageDone(e) => self.on_message_done(e), SessionEvent::RequestDone(e) => self.on_request_done(e), SessionEvent::ConnectionDone(e) => self.on_connection_done(e), + SessionEvent::ClientCallstackProcessed(e) => self.on_client_callstack_processed(e), } } @@ -154,6 +167,7 @@ impl Session status: Status::InProgress, start_timestamp: e.timestamp.into(), end_timestamp: None, + client_callstack: None, }, request_msg: MessageData::new(RequestPart::Request) .with_headers(e.headers) @@ -247,4 +261,18 @@ impl Session vec![] } } + + fn on_client_callstack_processed( + &mut self, + e: ClientCallstackProcessedEvent, + ) -> Vec + { + let request = self.requests.get_mut_by_uuid(e.uuid); + if let Some(request) = request { + request.request_data.client_callstack = Some(e.callstack); + vec![SessionChange::Callstack { request: e.uuid }] + } else { + vec![] + } + } } diff --git a/src/ui.rs b/src/ui.rs index 9f85a50..24ddd55 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -92,7 +92,7 @@ pub fn main( state.draw(&mut terminal).context(IoError {})?; let mut redraw_pending = false; loop { - let e = ui_rx.recv().expect( "Receiving UI events failed."); + let e = ui_rx.recv().expect("Receiving UI events failed."); if let UiEvent::Redraw = e { redraw_pending = false; state.draw(&mut terminal).context(IoError {})?; diff --git a/src/ui/sub_views/details_pane.rs b/src/ui/sub_views/details_pane.rs index 45bc0d7..d0c0a75 100644 --- a/src/ui/sub_views/details_pane.rs +++ b/src/ui/sub_views/details_pane.rs @@ -7,7 +7,7 @@ use uuid::Uuid; use crate::ui::prelude::*; use crate::session::{EncodedRequest, RequestPart}; -use crate::ui::views::{CallstackView, ClientThreadId, MessageView}; +use crate::ui::views::{CallstackView, MessageView}; #[derive(Clone, Default)] pub struct DetailsPane; @@ -63,15 +63,16 @@ impl DetailsPane c.x -= 1; c.width += 2; c.height += 1; - let vertical_chunks: Vec = if ClientThreadId::try_from(&request.request_msg).is_ok() { - Layout::default() - .direction(Direction::Vertical) - .margin(0) - .constraints([Constraint::Percentage(80), Constraint::Percentage(20)].as_ref()) - .split(block.inner(c)) - } else { - Vec::from([block.inner(c)]) - }; + let vertical_chunks: Vec = + if crate::connection::ClientThreadId::try_from(&request.request_msg).is_ok() { + Layout::default() + .direction(Direction::Vertical) + .margin(0) + .constraints([Constraint::Percentage(80), Constraint::Percentage(20)].as_ref()) + .split(block.inner(c)) + } else { + Vec::from([block.inner(c)]) + }; let req_resp_chunks = Layout::default() .direction(Direction::Horizontal) .margin(0) @@ -153,7 +154,7 @@ impl DetailsPane fn create_callstack_view(&mut self, req: &EncodedRequest) -> Option> { - if ClientThreadId::try_from(&req.request_msg).is_ok() { + if crate::connection::ClientThreadId::try_from(&req.request_msg).is_ok() { Some(HandleResult::PushView(Box::new(CallstackView { request: req.request_data.uuid, offset: 0, diff --git a/src/ui/views.rs b/src/ui/views.rs index f2c76c2..5f88b49 100644 --- a/src/ui/views.rs +++ b/src/ui/views.rs @@ -7,7 +7,7 @@ mod message_view; pub use message_view::MessageView; mod callstack_view; -pub use callstack_view::{CallstackView, ClientThreadId}; +pub use callstack_view::CallstackView; pub trait View { diff --git a/src/ui/views/callstack_view.rs b/src/ui/views/callstack_view.rs index 056c03d..ed843a9 100644 --- a/src/ui/views/callstack_view.rs +++ b/src/ui/views/callstack_view.rs @@ -1,23 +1,10 @@ use super::prelude::*; +use crate::session::ClientCallstack; use crossterm::event::KeyCode; -use http::HeaderValue; use std::convert::TryFrom; use tui::widgets::{Paragraph, Wrap}; use uuid::Uuid; -use crate::session::MessageData; - -/// When available, identifies the thread in the calling or client process. -/// The client should reports its process id with the proxide-client-process-id" header and -/// the thread id with the "proxide-client-thread-id" header. -/// This enables the proxide proxy to capture client's callstack when it is making the call if the proxide -/// and the client are running on the same host. -pub struct ClientThreadId -{ - process_id: u32, - thread_id: i64, -} - pub struct CallstackView { pub request: Uuid, @@ -35,17 +22,25 @@ impl View for CallstackView None => return, }; - let client_thread = match ClientThreadId::try_from(&request.request_msg) { + let client_thread = match crate::connection::ClientThreadId::try_from(&request.request_msg) + { Ok(thread_id) => thread_id, Err(_) => return, }; let title = format!( "Client call[s]tack, Process: {}, Thread: {}", - client_thread.process_id, client_thread.thread_id + client_thread.process_id(), + client_thread.thread_id() ); + let message = match request.request_data.client_callstack { + Some(ClientCallstack::Unsupported) => { + "Callstack unavailable:\n* Unsupported operating system." + } + None => ".. (Pending)", + }; let block = create_block(&title); - let request_data = Paragraph::new("Unimplemented.") + let request_data = Paragraph::new(message) .block(block) .wrap(Wrap { trim: false }) .scroll((self.offset, 0)); @@ -79,6 +74,7 @@ impl View for CallstackView SessionChange::Request { .. } => false, SessionChange::NewMessage { .. } => false, SessionChange::Message { .. } => false, + SessionChange::Callstack { request } => *request == self.request, } } @@ -90,36 +86,3 @@ impl View for CallstackView ) } } - -impl TryFrom<&MessageData> for ClientThreadId -{ - type Error = (); - - fn try_from(value: &MessageData) -> Result - { - let process_id: Option = - number_or_none(&value.headers.get("proxide-client-process-id")); - let thread_id: Option = number_or_none(&value.headers.get("proxide-client-thread-id")); - match (process_id, thread_id) { - (Some(process_id), Some(thread_id)) => Ok(ClientThreadId { - process_id, - thread_id, - }), - _ => Err(()), - } - } -} - -fn number_or_none(header: &Option<&HeaderValue>) -> Option -where - N: std::str::FromStr, -{ - if let Some(value) = header { - value - .to_str() - .map(|s| N::from_str(s).map(|n| Some(n)).unwrap_or(None)) - .unwrap_or(None) - } else { - None - } -} diff --git a/src/ui/views/main_view.rs b/src/ui/views/main_view.rs index 7539f89..870952b 100644 --- a/src/ui/views/main_view.rs +++ b/src/ui/views/main_view.rs @@ -132,6 +132,7 @@ impl View for MainView .selected(&ctx.data.requests) .map(|r| r.request_data.uuid == *req) .unwrap_or(false), + SessionChange::Callstack { .. } => false, } } diff --git a/src/ui/views/message_view.rs b/src/ui/views/message_view.rs index 62070f0..e528147 100644 --- a/src/ui/views/message_view.rs +++ b/src/ui/views/message_view.rs @@ -155,6 +155,7 @@ impl View for MessageView | SessionChange::Message { request, part } => { *part == self.part && *request == self.request } + SessionChange::Callstack { .. } => false, } }