From 9e2de1c99b0dc2160a1070d743bcc19e3b07fdaf Mon Sep 17 00:00:00 2001 From: fluxie Date: Sat, 23 Dec 2023 17:00:11 +0200 Subject: [PATCH] Implemented callstack capture task Currently the task in htt2p.rs only reports that capturing the callstack is not supported on any operating system. But now with the asynchronously executed capture task it will be possible to actually implement the capturing. --- src/connection.rs | 68 ++++++++++++++++++++++++++ src/connection/http2.rs | 82 ++++++++++++++++++++++++++++++++ src/main.rs | 59 +++++++++++++++++++++++ src/session.rs | 8 ++++ src/session/events.rs | 28 +++++++++++ src/ui.rs | 2 +- src/ui/sub_views/details_pane.rs | 23 ++++----- src/ui/views.rs | 2 +- src/ui/views/callstack_view.rs | 63 +++++------------------- src/ui/views/main_view.rs | 1 + src/ui/views/message_view.rs | 1 + 11 files changed, 274 insertions(+), 63 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 443a052..3003e62 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,4 +1,6 @@ +use http::{HeaderMap, HeaderValue}; use snafu::{ResultExt, Snafu}; +use std::convert::TryFrom; use std::net::SocketAddr; use std::sync::mpsc::Sender; use std::sync::Arc; @@ -124,6 +126,58 @@ impl Streams } } +/// When available, identifies the thread in the calling or client process. +/// The client should reports its process id with the proxide-client-process-id" header and +/// the thread id with the "proxide-client-thread-id" header. +/// This enables the proxide proxy to capture client's callstack when it is making the call if the proxide +/// and the client are running on the same host. +pub struct ClientThreadId +{ + process_id: u32, + thread_id: i64, +} + +impl ClientThreadId +{ + pub fn process_id(&self) -> u32 + { + self.process_id + } + + pub fn thread_id(&self) -> i64 + { + self.thread_id + } +} + +impl TryFrom<&MessageData> for ClientThreadId +{ + type Error = (); + + fn try_from(value: &MessageData) -> std::result::Result + { + ClientThreadId::try_from(&value.headers) + } +} + +impl TryFrom<&HeaderMap> for ClientThreadId +{ + type Error = (); + + fn try_from(value: &HeaderMap) -> std::result::Result + { + let process_id: Option = number_or_none(&value.get("proxide-client-process-id")); + let thread_id: Option = number_or_none(&value.get("proxide-client-thread-id")); + match (process_id, thread_id) { + (Some(process_id), Some(thread_id)) => Ok(ClientThreadId { + process_id, + thread_id, + }), + _ => Err(()), + } + } +} + /// Handles a single client connection. /// /// The connection handling is split into multiple functions, but the functions are chained in a @@ -311,3 +365,17 @@ where log::info!("Exit"); }); } + +fn number_or_none(header: &Option<&HeaderValue>) -> Option +where + N: std::str::FromStr, +{ + if let Some(value) = header { + value + .to_str() + .map(|s| N::from_str(s).map(|n| Some(n)).unwrap_or(None)) + .unwrap_or(None) + } else { + None + } +} diff --git a/src/connection/http2.rs b/src/connection/http2.rs index 512b85b..c5a1c5f 100644 --- a/src/connection/http2.rs +++ b/src/connection/http2.rs @@ -8,10 +8,14 @@ use h2::{ use http::{HeaderMap, Request, Response}; use log::error; use snafu::ResultExt; +use std::convert::TryFrom; use std::net::SocketAddr; +use std::pin::Pin; use std::sync::mpsc::Sender; +use std::task::{Context, Poll}; use std::time::SystemTime; use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::task::{JoinHandle, JoinSet}; use uuid::Uuid; use super::*; @@ -141,6 +145,12 @@ pub struct ProxyRequest client_response: SendResponse, server_request: SendStream, server_response: ResponseFuture, + request_processor: ProcessingFuture, +} + +struct ProcessingFuture +{ + inner: JoinHandle<()>, } impl ProxyRequest @@ -191,6 +201,10 @@ impl ProxyRequest })) .unwrap(); + // Request processor supports asynchronous message processing while the proxide is busy proxying data between + // the client and the server. + let request_processor = ProcessingFuture::spawn(uuid, &client_head, ui); + let server_request = Request::from_parts(client_head, ()); // Set up a server request. @@ -208,6 +222,7 @@ impl ProxyRequest client_response, server_request, server_response, + request_processor, }) } @@ -265,6 +280,7 @@ impl ProxyRequest let mut client_response = self.client_response; let server_response = self.server_response; let connection_uuid = self.connection_uuid; + let request_processor = self.request_processor; let ui_temp = ui.clone(); let response_future = async move { let ui = ui_temp; @@ -293,6 +309,11 @@ impl ProxyRequest scenario: "sending response", })?; + // Ensure the request processor has finished before we send the response to the client. + // Callstack capturing process inside the request processor may capture incorrect data if + // the client is given the final answer from the server as it no longer has to wait for the response. + request_processor.await; + // The server might have sent all the details in the headers, at which point there is // no body present. Check for this scenario here. if response_body.is_end_stream() { @@ -440,3 +461,64 @@ fn is_fatal_error(r: &Result) -> bool }, } } + +impl ProcessingFuture +{ + fn spawn(uuid: Uuid, client_head: &http::request::Parts, ui: &Sender) -> Self + { + let mut tasks: JoinSet>> = + JoinSet::new(); + + // Task which attempts to capture client's callstack. + if let Ok(thread_id) = crate::connection::ClientThreadId::try_from(&client_head.headers) { + let ui_clone = ui.clone(); + tasks.spawn(ProcessingFuture::capture_client_callstack( + uuid, thread_id, ui_clone, + )); + } + + Self { + inner: tokio::spawn(async move { + while let Some(result) = tasks.join_next().await { + match result { + Ok(_) => {} + Err(e) => { + // TODO: Send the error to UI. + eprintln!("{}", e); + error!("{}", e); + } + } + } + }), + } + } + + async fn capture_client_callstack( + uuid: Uuid, + _client_thread_id: ClientThreadId, + ui: Sender, + ) -> std::result::Result<(), Box> + { + // TODO: Try to capture the callstack + ui.send(SessionEvent::ClientCallstackProcessed( + ClientCallstackProcessedEvent { + uuid, + callstack: ClientCallstack::Unsupported, + }, + ))?; + Ok(()) + } +} + +impl Future for ProcessingFuture +{ + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll + { + match Pin::new(&mut self.inner).poll(cx) { + Poll::Ready(_) => Poll::Ready(()), + Poll::Pending => Poll::Pending, + } + } +} diff --git a/src/main.rs b/src/main.rs index 5f37324..239e806 100644 --- a/src/main.rs +++ b/src/main.rs @@ -419,6 +419,7 @@ mod test use log::SetLoggerError; use serial_test::serial; use std::io::{ErrorKind, Write}; + use std::ops::Add; use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -426,6 +427,7 @@ mod test use tokio::sync::broadcast::Receiver; use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::oneshot; + use tokio::time::Instant; use crate::session::events::SessionEvent; use crate::ConnectionOptions; @@ -533,6 +535,63 @@ mod test .expect("Waiting for proxide to stop failed."); } + #[tokio::test] + #[serial] + async fn proxide_receives_client_callstack_ui_message() + { + // Logging must be enabled to detect errors inside proxide. + // Failure to monitor logs may cause the test to hang as errors that stop processing get silently ignored. + let mut error_monitor = get_error_monitor().expect("Acquiring error monitor failed."); + + // Server + let server = GrpcServer::start() + .await + .expect("Starting test server failed."); + + // Proxide + let options = get_proxide_options(&server); + let (abort_tx, abort_rx) = tokio::sync::oneshot::channel::<()>(); + let (ui_tx, ui_rx_std) = std::sync::mpsc::channel(); + let proxide_port = u16::from_str(&options.listen_port.to_string()).unwrap(); + let proxide = tokio::spawn(crate::launch_proxide(options, abort_rx, ui_tx)); + + // Message generator and tester. + let tester = grpc_tester::GrpcTester::with_proxide( + server, + proxide_port, + grpc_tester::Args { + period: std::time::Duration::from_secs(0), + tasks: 1, + }, + ) + .await + .expect("Starting tester failed."); + let mut message_rx = async_from_sync(ui_rx_std); + + // UI channel should be constantly receiving client callstack events. + // The generator includes the process id and the thread id in the messages it sends. + let timeout_at = Instant::now().add(Duration::from_secs(30)); + while let Some(message) = tokio::select! { + result = message_rx.recv() => result, + _t = tokio::time::sleep( Duration::from_secs( 30 ) ) => panic!( "Timeout" ), + error = error_monitor.recv() => panic!( "{:?}", error ), + } { + if let SessionEvent::ClientCallstackProcessed(..) = message { + break; + } else if Instant::now() > timeout_at { + panic!("Timeout") + } + } + + let mut server = tester.stop_generator().expect("Stopping generator failed."); + abort_tx.send(()).expect("Stopping proxide failed."); + proxide + .await + .expect("Waiting for proxide to stop failed.") + .expect("Waiting for proxide to stop failed."); + server.stop().expect("Stopping server failed"); + } + /// Gets options for launching proxide. fn get_proxide_options(server: &GrpcServer) -> Arc { diff --git a/src/session.rs b/src/session.rs index 83f3ce1..971a2ed 100644 --- a/src/session.rs +++ b/src/session.rs @@ -56,6 +56,7 @@ pub struct RequestData pub start_timestamp: DateTime, pub end_timestamp: Option>, + pub client_callstack: Option, pub status: Status, } @@ -126,6 +127,13 @@ impl MessageData } } +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +pub enum ClientCallstack +{ + /// Proxide does not support callstack capture on the current platform/operating system. + Unsupported, +} + impl IndexedVec { pub fn push(&mut self, uuid: Uuid, item: T) diff --git a/src/session/events.rs b/src/session/events.rs index 2334d04..7a77242 100644 --- a/src/session/events.rs +++ b/src/session/events.rs @@ -14,6 +14,7 @@ pub enum SessionEvent MessageDone(MessageDoneEvent), RequestDone(RequestDoneEvent), ConnectionDone(ConnectionDoneEvent), + ClientCallstackProcessed(ClientCallstackProcessedEvent), } #[derive(Serialize, Deserialize, Debug)] @@ -84,6 +85,13 @@ pub struct ConnectionDoneEvent pub timestamp: SystemTime, } +#[derive(Serialize, Deserialize, Debug)] +pub struct ClientCallstackProcessedEvent +{ + pub uuid: Uuid, + pub callstack: ClientCallstack, +} + pub enum SessionChange { NewConnection @@ -110,6 +118,10 @@ pub enum SessionChange { connection: Uuid }, + Callstack + { + request: Uuid + }, } impl Session @@ -124,6 +136,7 @@ impl Session SessionEvent::MessageDone(e) => self.on_message_done(e), SessionEvent::RequestDone(e) => self.on_request_done(e), SessionEvent::ConnectionDone(e) => self.on_connection_done(e), + SessionEvent::ClientCallstackProcessed(e) => self.on_client_callstack_processed(e), } } @@ -154,6 +167,7 @@ impl Session status: Status::InProgress, start_timestamp: e.timestamp.into(), end_timestamp: None, + client_callstack: None, }, request_msg: MessageData::new(RequestPart::Request) .with_headers(e.headers) @@ -247,4 +261,18 @@ impl Session vec![] } } + + fn on_client_callstack_processed( + &mut self, + e: ClientCallstackProcessedEvent, + ) -> Vec + { + let request = self.requests.get_mut_by_uuid(e.uuid); + if let Some(request) = request { + request.request_data.client_callstack = Some(e.callstack); + vec![SessionChange::Callstack { request: e.uuid }] + } else { + vec![] + } + } } diff --git a/src/ui.rs b/src/ui.rs index 9f85a50..24ddd55 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -92,7 +92,7 @@ pub fn main( state.draw(&mut terminal).context(IoError {})?; let mut redraw_pending = false; loop { - let e = ui_rx.recv().expect( "Receiving UI events failed."); + let e = ui_rx.recv().expect("Receiving UI events failed."); if let UiEvent::Redraw = e { redraw_pending = false; state.draw(&mut terminal).context(IoError {})?; diff --git a/src/ui/sub_views/details_pane.rs b/src/ui/sub_views/details_pane.rs index 45bc0d7..d0c0a75 100644 --- a/src/ui/sub_views/details_pane.rs +++ b/src/ui/sub_views/details_pane.rs @@ -7,7 +7,7 @@ use uuid::Uuid; use crate::ui::prelude::*; use crate::session::{EncodedRequest, RequestPart}; -use crate::ui::views::{CallstackView, ClientThreadId, MessageView}; +use crate::ui::views::{CallstackView, MessageView}; #[derive(Clone, Default)] pub struct DetailsPane; @@ -63,15 +63,16 @@ impl DetailsPane c.x -= 1; c.width += 2; c.height += 1; - let vertical_chunks: Vec = if ClientThreadId::try_from(&request.request_msg).is_ok() { - Layout::default() - .direction(Direction::Vertical) - .margin(0) - .constraints([Constraint::Percentage(80), Constraint::Percentage(20)].as_ref()) - .split(block.inner(c)) - } else { - Vec::from([block.inner(c)]) - }; + let vertical_chunks: Vec = + if crate::connection::ClientThreadId::try_from(&request.request_msg).is_ok() { + Layout::default() + .direction(Direction::Vertical) + .margin(0) + .constraints([Constraint::Percentage(80), Constraint::Percentage(20)].as_ref()) + .split(block.inner(c)) + } else { + Vec::from([block.inner(c)]) + }; let req_resp_chunks = Layout::default() .direction(Direction::Horizontal) .margin(0) @@ -153,7 +154,7 @@ impl DetailsPane fn create_callstack_view(&mut self, req: &EncodedRequest) -> Option> { - if ClientThreadId::try_from(&req.request_msg).is_ok() { + if crate::connection::ClientThreadId::try_from(&req.request_msg).is_ok() { Some(HandleResult::PushView(Box::new(CallstackView { request: req.request_data.uuid, offset: 0, diff --git a/src/ui/views.rs b/src/ui/views.rs index f2c76c2..5f88b49 100644 --- a/src/ui/views.rs +++ b/src/ui/views.rs @@ -7,7 +7,7 @@ mod message_view; pub use message_view::MessageView; mod callstack_view; -pub use callstack_view::{CallstackView, ClientThreadId}; +pub use callstack_view::CallstackView; pub trait View { diff --git a/src/ui/views/callstack_view.rs b/src/ui/views/callstack_view.rs index 056c03d..ed843a9 100644 --- a/src/ui/views/callstack_view.rs +++ b/src/ui/views/callstack_view.rs @@ -1,23 +1,10 @@ use super::prelude::*; +use crate::session::ClientCallstack; use crossterm::event::KeyCode; -use http::HeaderValue; use std::convert::TryFrom; use tui::widgets::{Paragraph, Wrap}; use uuid::Uuid; -use crate::session::MessageData; - -/// When available, identifies the thread in the calling or client process. -/// The client should reports its process id with the proxide-client-process-id" header and -/// the thread id with the "proxide-client-thread-id" header. -/// This enables the proxide proxy to capture client's callstack when it is making the call if the proxide -/// and the client are running on the same host. -pub struct ClientThreadId -{ - process_id: u32, - thread_id: i64, -} - pub struct CallstackView { pub request: Uuid, @@ -35,17 +22,25 @@ impl View for CallstackView None => return, }; - let client_thread = match ClientThreadId::try_from(&request.request_msg) { + let client_thread = match crate::connection::ClientThreadId::try_from(&request.request_msg) + { Ok(thread_id) => thread_id, Err(_) => return, }; let title = format!( "Client call[s]tack, Process: {}, Thread: {}", - client_thread.process_id, client_thread.thread_id + client_thread.process_id(), + client_thread.thread_id() ); + let message = match request.request_data.client_callstack { + Some(ClientCallstack::Unsupported) => { + "Callstack unavailable:\n* Unsupported operating system." + } + None => ".. (Pending)", + }; let block = create_block(&title); - let request_data = Paragraph::new("Unimplemented.") + let request_data = Paragraph::new(message) .block(block) .wrap(Wrap { trim: false }) .scroll((self.offset, 0)); @@ -79,6 +74,7 @@ impl View for CallstackView SessionChange::Request { .. } => false, SessionChange::NewMessage { .. } => false, SessionChange::Message { .. } => false, + SessionChange::Callstack { request } => *request == self.request, } } @@ -90,36 +86,3 @@ impl View for CallstackView ) } } - -impl TryFrom<&MessageData> for ClientThreadId -{ - type Error = (); - - fn try_from(value: &MessageData) -> Result - { - let process_id: Option = - number_or_none(&value.headers.get("proxide-client-process-id")); - let thread_id: Option = number_or_none(&value.headers.get("proxide-client-thread-id")); - match (process_id, thread_id) { - (Some(process_id), Some(thread_id)) => Ok(ClientThreadId { - process_id, - thread_id, - }), - _ => Err(()), - } - } -} - -fn number_or_none(header: &Option<&HeaderValue>) -> Option -where - N: std::str::FromStr, -{ - if let Some(value) = header { - value - .to_str() - .map(|s| N::from_str(s).map(|n| Some(n)).unwrap_or(None)) - .unwrap_or(None) - } else { - None - } -} diff --git a/src/ui/views/main_view.rs b/src/ui/views/main_view.rs index 7539f89..870952b 100644 --- a/src/ui/views/main_view.rs +++ b/src/ui/views/main_view.rs @@ -132,6 +132,7 @@ impl View for MainView .selected(&ctx.data.requests) .map(|r| r.request_data.uuid == *req) .unwrap_or(false), + SessionChange::Callstack { .. } => false, } } diff --git a/src/ui/views/message_view.rs b/src/ui/views/message_view.rs index 62070f0..e528147 100644 --- a/src/ui/views/message_view.rs +++ b/src/ui/views/message_view.rs @@ -155,6 +155,7 @@ impl View for MessageView | SessionChange::Message { request, part } => { *part == self.part && *request == self.request } + SessionChange::Callstack { .. } => false, } }