From 604740cc479c458d89a8021773ba994f7fbb6b25 Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Tue, 8 Oct 2024 22:31:56 +0200 Subject: [PATCH 1/2] run rpc futures in a tokio thread till completion when ran directly inside hyper's service function, it might get aborted mid-way when a client disconnects, leaving the future in complete. this is an issue as we might have some code that need to be executed atomically inside any part of the code base that could be triggered by an RPC request. if the client disconnects, such a function will abort on the next await call, leaving us with non-atomic state (e.g. partial update for a map and its inverse). --- mm2src/mm2_main/src/rpc.rs | 48 ++++++++++++++++++++++--------- mm2src/mm2_net/src/sse_handler.rs | 13 ++++----- 2 files changed, 39 insertions(+), 22 deletions(-) diff --git a/mm2src/mm2_main/src/rpc.rs b/mm2src/mm2_main/src/rpc.rs index 1bef856e15..85b61db612 100644 --- a/mm2src/mm2_main/src/rpc.rs +++ b/mm2src/mm2_main/src/rpc.rs @@ -37,6 +37,7 @@ use std::net::SocketAddr; cfg_native! { use hyper::{self, Body, Server}; + use futures::channel::oneshot; use mm2_net::sse_handler::{handle_sse, SSE_ENDPOINT}; } @@ -333,6 +334,34 @@ pub extern "C" fn spawn_rpc(ctx_h: u32) { Ok((cert_chain, privkey)) } + // Handles incoming HTTP requests. + async fn handle_request( + req: Request, + remote_addr: SocketAddr, + ctx_h: u32, + is_event_stream_enabled: bool, + ) -> Result, Infallible> { + let (tx, rx) = oneshot::channel(); + // We execute the request in a separate task to avoid it being left uncompleted if the client disconnects. + // So what's inside the spawn here will complete till completion (or panic). + common::executor::spawn(async move { + if is_event_stream_enabled && req.uri().path() == SSE_ENDPOINT { + tx.send(handle_sse(req, ctx_h).await).ok(); + } else { + tx.send(rpc_service(req, ctx_h, remote_addr).await).ok(); + } + }); + // On the other hand, this `.await` might be aborted if the client disconnects. + match rx.await { + Ok(res) => Ok(res), + Err(_) => { + let err = "The RPC service aborted without responding."; + error!("{}", err); + Ok(Response::builder().status(500).body(Body::from(err)).unwrap()) + }, + } + } + // NB: We need to manually handle the incoming connections in order to get the remote IP address, // cf. https://github.com/hyperium/hyper/issues/1410#issuecomment-419510220. // Although if the ability to access the remote IP address is solved by the Hyper in the future @@ -340,28 +369,19 @@ pub extern "C" fn spawn_rpc(ctx_h: u32) { // cf. https://github.com/hyperium/hyper/pull/1640. let ctx = MmArc::from_ffi_handle(ctx_h).expect("No context"); - let is_event_stream_enabled = ctx.event_stream_configuration.is_some(); - let make_svc_fut = move |remote_addr: SocketAddr| async move { - Ok::<_, Infallible>(service_fn(move |req: Request| async move { - if is_event_stream_enabled && req.uri().path() == SSE_ENDPOINT { - let res = handle_sse(req, ctx_h).await?; - return Ok::<_, Infallible>(res); - } - - let res = rpc_service(req, ctx_h, remote_addr).await; - Ok::<_, Infallible>(res) - })) - }; - //The `make_svc` macro creates a `make_service_fn` for a specified socket type. // `$socket_type`: The socket type with a `remote_addr` method that returns a `SocketAddr`. macro_rules! make_svc { ($socket_type:ty) => { make_service_fn(move |socket: &$socket_type| { let remote_addr = socket.remote_addr(); - make_svc_fut(remote_addr) + async move { + Ok::<_, Infallible>(service_fn(move |req: Request| { + handle_request(req, remote_addr, ctx_h, is_event_stream_enabled) + })) + } }) }; } diff --git a/mm2src/mm2_net/src/sse_handler.rs b/mm2src/mm2_net/src/sse_handler.rs index 3b3afeee58..568bfc98c0 100644 --- a/mm2src/mm2_net/src/sse_handler.rs +++ b/mm2src/mm2_net/src/sse_handler.rs @@ -1,12 +1,11 @@ use hyper::{body::Bytes, Body, Request, Response}; use mm2_core::mm_ctx::MmArc; use serde_json::json; -use std::convert::Infallible; pub const SSE_ENDPOINT: &str = "/event-stream"; /// Handles broadcasted messages from `mm2_event_stream` continuously. -pub async fn handle_sse(request: Request, ctx_h: u32) -> Result, Infallible> { +pub async fn handle_sse(request: Request, ctx_h: u32) -> Response { // This is only called once for per client on the initialization, // meaning this is not a resource intensive computation. let ctx = match MmArc::from_ffi_handle(ctx_h) { @@ -62,17 +61,15 @@ pub async fn handle_sse(request: Request, ctx_h: u32) -> Result Ok(res), + Ok(res) => res, Err(err) => handle_internal_error(err.to_string()).await, } } /// Fallback function for handling errors in SSE connections -async fn handle_internal_error(message: String) -> Result, Infallible> { - let response = Response::builder() +async fn handle_internal_error(message: String) -> Response { + Response::builder() .status(500) .body(Body::from(message)) - .expect("Returning 500 should never fail."); - - Ok(response) + .expect("Returning 500 should never fail.") } From b95110e7f7359aceacaf3182a823cd24da43258f Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Tue, 8 Oct 2024 17:55:19 +0200 Subject: [PATCH 2/2] remove sim_panic rpc looks like this was only here to simulate a panic from the rpc end. why you ask? i guess we will never know! --- .../src/rpc/dispatcher/dispatcher_legacy.rs | 1 - .../src/rpc/lp_commands/lp_commands_legacy.rs | 31 ------------------- 2 files changed, 32 deletions(-) diff --git a/mm2src/mm2_main/src/rpc/dispatcher/dispatcher_legacy.rs b/mm2src/mm2_main/src/rpc/dispatcher/dispatcher_legacy.rs index 2415bc31ef..03a1ee1a00 100644 --- a/mm2src/mm2_main/src/rpc/dispatcher/dispatcher_legacy.rs +++ b/mm2src/mm2_main/src/rpc/dispatcher/dispatcher_legacy.rs @@ -98,7 +98,6 @@ pub fn dispatcher(req: Json, ctx: MmArc) -> DispatcherRes { "order_status" => hyres(order_status(ctx, req)), "orderbook" => hyres(orderbook_rpc(ctx, req)), "orderbook_depth" => hyres(orderbook_depth_rpc(ctx, req)), - "sim_panic" => hyres(sim_panic(req)), "recover_funds_of_swap" => hyres(recover_funds_of_swap(ctx, req)), "sell" => hyres(sell(ctx, req)), "show_priv_key" => hyres(show_priv_key(ctx, req)), diff --git a/mm2src/mm2_main/src/rpc/lp_commands/lp_commands_legacy.rs b/mm2src/mm2_main/src/rpc/lp_commands/lp_commands_legacy.rs index 5ef386942c..ea5bab4fbf 100644 --- a/mm2src/mm2_main/src/rpc/lp_commands/lp_commands_legacy.rs +++ b/mm2src/mm2_main/src/rpc/lp_commands/lp_commands_legacy.rs @@ -30,7 +30,6 @@ use mm2_net::p2p::P2PContext; use mm2_number::construct_detailed; use mm2_rpc::data::legacy::{BalanceResponse, CoinInitResponse, Mm2RpcResult, MmVersionResponse, Status}; use serde_json::{self as json, Value as Json}; -use std::borrow::Cow; use std::collections::HashSet; use uuid::Uuid; @@ -259,36 +258,6 @@ pub async fn stop(ctx: MmArc) -> Result>, String> { Ok(try_s!(Response::builder().body(res))) } -pub async fn sim_panic(req: Json) -> Result>, String> { - #[derive(Deserialize)] - struct Req { - #[serde(default)] - mode: String, - } - let req: Req = try_s!(json::from_value(req)); - - #[derive(Serialize)] - struct Ret<'a> { - /// Supported panic modes. - #[serde(skip_serializing_if = "Vec::is_empty")] - modes: Vec>, - } - let ret: Ret; - - if req.mode.is_empty() { - ret = Ret { - modes: vec!["simple".into()], - } - } else if req.mode == "simple" { - panic!("sim_panic: simple") - } else { - return ERR!("No such mode: {}", req.mode); - } - - let js = try_s!(json::to_vec(&ret)); - Ok(try_s!(Response::builder().body(js))) -} - pub fn version(ctx: MmArc) -> HyRes { match json::to_vec(&MmVersionResponse { result: ctx.mm_version.clone(),