Skip to content

Commit

Permalink
Implemented callstack capture task
Browse files Browse the repository at this point in the history
Currently the task in htt2p.rs only reports that capturing the callstack is not supported on any operating system. But now with the asynchronously executed capture task it will be possible to actually implement the capturing.
  • Loading branch information
Fluxie committed Dec 23, 2023
1 parent 77759c8 commit 9e2de1c
Show file tree
Hide file tree
Showing 11 changed files with 274 additions and 63 deletions.
68 changes: 68 additions & 0 deletions src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use http::{HeaderMap, HeaderValue};
use snafu::{ResultExt, Snafu};
use std::convert::TryFrom;
use std::net::SocketAddr;
use std::sync::mpsc::Sender;
use std::sync::Arc;
Expand Down Expand Up @@ -124,6 +126,58 @@ impl<TClient, TServer> Streams<TClient, TServer>
}
}

/// When available, identifies the thread in the calling or client process.
/// The client should reports its process id with the proxide-client-process-id" header and
/// the thread id with the "proxide-client-thread-id" header.
/// This enables the proxide proxy to capture client's callstack when it is making the call if the proxide
/// and the client are running on the same host.
pub struct ClientThreadId
{
process_id: u32,
thread_id: i64,
}

impl ClientThreadId
{
pub fn process_id(&self) -> u32
{
self.process_id
}

pub fn thread_id(&self) -> i64
{
self.thread_id
}
}

impl TryFrom<&MessageData> for ClientThreadId
{
type Error = ();

fn try_from(value: &MessageData) -> std::result::Result<Self, Self::Error>
{
ClientThreadId::try_from(&value.headers)
}
}

impl TryFrom<&HeaderMap> for ClientThreadId
{
type Error = ();

fn try_from(value: &HeaderMap) -> std::result::Result<Self, Self::Error>
{
let process_id: Option<u32> = number_or_none(&value.get("proxide-client-process-id"));
let thread_id: Option<i64> = number_or_none(&value.get("proxide-client-thread-id"));
match (process_id, thread_id) {
(Some(process_id), Some(thread_id)) => Ok(ClientThreadId {
process_id,
thread_id,
}),
_ => Err(()),
}
}
}

/// Handles a single client connection.
///
/// The connection handling is split into multiple functions, but the functions are chained in a
Expand Down Expand Up @@ -311,3 +365,17 @@ where
log::info!("Exit");
});
}

fn number_or_none<N>(header: &Option<&HeaderValue>) -> Option<N>
where
N: std::str::FromStr,
{
if let Some(value) = header {
value
.to_str()
.map(|s| N::from_str(s).map(|n| Some(n)).unwrap_or(None))
.unwrap_or(None)
} else {
None
}
}
82 changes: 82 additions & 0 deletions src/connection/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ use h2::{
use http::{HeaderMap, Request, Response};
use log::error;
use snafu::ResultExt;
use std::convert::TryFrom;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::mpsc::Sender;
use std::task::{Context, Poll};
use std::time::SystemTime;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::task::{JoinHandle, JoinSet};
use uuid::Uuid;

use super::*;
Expand Down Expand Up @@ -141,6 +145,12 @@ pub struct ProxyRequest
client_response: SendResponse<Bytes>,
server_request: SendStream<Bytes>,
server_response: ResponseFuture,
request_processor: ProcessingFuture,
}

struct ProcessingFuture
{
inner: JoinHandle<()>,
}

impl ProxyRequest
Expand Down Expand Up @@ -191,6 +201,10 @@ impl ProxyRequest
}))
.unwrap();

// Request processor supports asynchronous message processing while the proxide is busy proxying data between
// the client and the server.
let request_processor = ProcessingFuture::spawn(uuid, &client_head, ui);

let server_request = Request::from_parts(client_head, ());

// Set up a server request.
Expand All @@ -208,6 +222,7 @@ impl ProxyRequest
client_response,
server_request,
server_response,
request_processor,
})
}

Expand Down Expand Up @@ -265,6 +280,7 @@ impl ProxyRequest
let mut client_response = self.client_response;
let server_response = self.server_response;
let connection_uuid = self.connection_uuid;
let request_processor = self.request_processor;
let ui_temp = ui.clone();
let response_future = async move {
let ui = ui_temp;
Expand Down Expand Up @@ -293,6 +309,11 @@ impl ProxyRequest
scenario: "sending response",
})?;

// Ensure the request processor has finished before we send the response to the client.
// Callstack capturing process inside the request processor may capture incorrect data if
// the client is given the final answer from the server as it no longer has to wait for the response.
request_processor.await;

// The server might have sent all the details in the headers, at which point there is
// no body present. Check for this scenario here.
if response_body.is_end_stream() {
Expand Down Expand Up @@ -440,3 +461,64 @@ fn is_fatal_error<S>(r: &Result<S, Error>) -> bool
},
}
}

impl ProcessingFuture
{
fn spawn(uuid: Uuid, client_head: &http::request::Parts, ui: &Sender<SessionEvent>) -> Self
{
let mut tasks: JoinSet<std::result::Result<(), Box<dyn std::error::Error + Send + Sync>>> =
JoinSet::new();

// Task which attempts to capture client's callstack.
if let Ok(thread_id) = crate::connection::ClientThreadId::try_from(&client_head.headers) {
let ui_clone = ui.clone();
tasks.spawn(ProcessingFuture::capture_client_callstack(
uuid, thread_id, ui_clone,
));
}

Self {
inner: tokio::spawn(async move {
while let Some(result) = tasks.join_next().await {
match result {
Ok(_) => {}
Err(e) => {
// TODO: Send the error to UI.
eprintln!("{}", e);
error!("{}", e);
}
}
}
}),
}
}

async fn capture_client_callstack(
uuid: Uuid,
_client_thread_id: ClientThreadId,
ui: Sender<SessionEvent>,
) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>>
{
// TODO: Try to capture the callstack
ui.send(SessionEvent::ClientCallstackProcessed(
ClientCallstackProcessedEvent {
uuid,
callstack: ClientCallstack::Unsupported,
},
))?;
Ok(())
}
}

impl Future for ProcessingFuture
{
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>
{
match Pin::new(&mut self.inner).poll(cx) {
Poll::Ready(_) => Poll::Ready(()),
Poll::Pending => Poll::Pending,
}
}
}
59 changes: 59 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,13 +419,15 @@ mod test
use log::SetLoggerError;
use serial_test::serial;
use std::io::{ErrorKind, Write};
use std::ops::Add;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::broadcast::Receiver;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::oneshot;
use tokio::time::Instant;

use crate::session::events::SessionEvent;
use crate::ConnectionOptions;
Expand Down Expand Up @@ -533,6 +535,63 @@ mod test
.expect("Waiting for proxide to stop failed.");
}

#[tokio::test]
#[serial]
async fn proxide_receives_client_callstack_ui_message()
{
// Logging must be enabled to detect errors inside proxide.
// Failure to monitor logs may cause the test to hang as errors that stop processing get silently ignored.
let mut error_monitor = get_error_monitor().expect("Acquiring error monitor failed.");

// Server
let server = GrpcServer::start()
.await
.expect("Starting test server failed.");

// Proxide
let options = get_proxide_options(&server);
let (abort_tx, abort_rx) = tokio::sync::oneshot::channel::<()>();
let (ui_tx, ui_rx_std) = std::sync::mpsc::channel();
let proxide_port = u16::from_str(&options.listen_port.to_string()).unwrap();
let proxide = tokio::spawn(crate::launch_proxide(options, abort_rx, ui_tx));

// Message generator and tester.
let tester = grpc_tester::GrpcTester::with_proxide(
server,
proxide_port,
grpc_tester::Args {
period: std::time::Duration::from_secs(0),
tasks: 1,
},
)
.await
.expect("Starting tester failed.");
let mut message_rx = async_from_sync(ui_rx_std);

// UI channel should be constantly receiving client callstack events.
// The generator includes the process id and the thread id in the messages it sends.
let timeout_at = Instant::now().add(Duration::from_secs(30));
while let Some(message) = tokio::select! {
result = message_rx.recv() => result,
_t = tokio::time::sleep( Duration::from_secs( 30 ) ) => panic!( "Timeout" ),
error = error_monitor.recv() => panic!( "{:?}", error ),
} {
if let SessionEvent::ClientCallstackProcessed(..) = message {
break;
} else if Instant::now() > timeout_at {
panic!("Timeout")
}
}

let mut server = tester.stop_generator().expect("Stopping generator failed.");
abort_tx.send(()).expect("Stopping proxide failed.");
proxide
.await
.expect("Waiting for proxide to stop failed.")
.expect("Waiting for proxide to stop failed.");
server.stop().expect("Stopping server failed");
}

/// Gets options for launching proxide.
fn get_proxide_options(server: &GrpcServer) -> Arc<ConnectionOptions>
{
Expand Down
8 changes: 8 additions & 0 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub struct RequestData

pub start_timestamp: DateTime<Local>,
pub end_timestamp: Option<DateTime<Local>>,
pub client_callstack: Option<ClientCallstack>,
pub status: Status,
}

Expand Down Expand Up @@ -126,6 +127,13 @@ impl MessageData
}
}

#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub enum ClientCallstack
{
/// Proxide does not support callstack capture on the current platform/operating system.
Unsupported,
}

impl<T> IndexedVec<T>
{
pub fn push(&mut self, uuid: Uuid, item: T)
Expand Down
28 changes: 28 additions & 0 deletions src/session/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub enum SessionEvent
MessageDone(MessageDoneEvent),
RequestDone(RequestDoneEvent),
ConnectionDone(ConnectionDoneEvent),
ClientCallstackProcessed(ClientCallstackProcessedEvent),
}

#[derive(Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -84,6 +85,13 @@ pub struct ConnectionDoneEvent
pub timestamp: SystemTime,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct ClientCallstackProcessedEvent
{
pub uuid: Uuid,
pub callstack: ClientCallstack,
}

pub enum SessionChange
{
NewConnection
Expand All @@ -110,6 +118,10 @@ pub enum SessionChange
{
connection: Uuid
},
Callstack
{
request: Uuid
},
}

impl Session
Expand All @@ -124,6 +136,7 @@ impl Session
SessionEvent::MessageDone(e) => self.on_message_done(e),
SessionEvent::RequestDone(e) => self.on_request_done(e),
SessionEvent::ConnectionDone(e) => self.on_connection_done(e),
SessionEvent::ClientCallstackProcessed(e) => self.on_client_callstack_processed(e),
}
}

Expand Down Expand Up @@ -154,6 +167,7 @@ impl Session
status: Status::InProgress,
start_timestamp: e.timestamp.into(),
end_timestamp: None,
client_callstack: None,
},
request_msg: MessageData::new(RequestPart::Request)
.with_headers(e.headers)
Expand Down Expand Up @@ -247,4 +261,18 @@ impl Session
vec![]
}
}

fn on_client_callstack_processed(
&mut self,
e: ClientCallstackProcessedEvent,
) -> Vec<SessionChange>
{
let request = self.requests.get_mut_by_uuid(e.uuid);
if let Some(request) = request {
request.request_data.client_callstack = Some(e.callstack);
vec![SessionChange::Callstack { request: e.uuid }]
} else {
vec![]
}
}
}
2 changes: 1 addition & 1 deletion src/ui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub fn main(
state.draw(&mut terminal).context(IoError {})?;
let mut redraw_pending = false;
loop {
let e = ui_rx.recv().expect( "Receiving UI events failed.");
let e = ui_rx.recv().expect("Receiving UI events failed.");
if let UiEvent::Redraw = e {
redraw_pending = false;
state.draw(&mut terminal).context(IoError {})?;
Expand Down
Loading

0 comments on commit 9e2de1c

Please sign in to comment.