Commit

Add server config option to spawn handlers detached from client disconnect behavior (#701)


---------

Co-authored-by: David Pacheco <[email protected]>
jgallagher and davepacheco authored Jun 15, 2023
1 parent a519993 commit 3a42491
Showing 12 changed files with 420 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.adoc
@@ -19,6 +19,7 @@ https://github.com/oxidecomputer/dropshot/compare/v0.9.0\...HEAD[Full list of co

* https://github.com/oxidecomputer/dropshot/pull/676[#676] changed how TLS configuration is provided to Dropshot. **`ConfigDropshotTls` is now no longer part of `ConfigDropshot`.** If you're using TLS, you need to provide this as a separate argument to `HttpServerStarter::new_tls()`. See #676 for details.
* https://github.com/oxidecomputer/dropshot/pull/651[#651] The address of the remote peer is now available to request handlers via the `RequestInfo` struct. With this change we've removed the related `From<hyper::Request<B>>` implementation; instead use `RequestInfo::new<B>(&hyper::Request<B>, std::net::SocketAddr)`.
* https://github.com/oxidecomputer/dropshot/pull/701[#701] changes how Dropshot manages the tasks that are used to handle requests. There are two modes, now configurable server-wide using `HandlerTaskMode`. Prior to this change, the behavior matched what's now called `HandlerTaskMode::CancelOnDisconnect`: the Future associated with a request handler could be cancelled if, for example, the client disconnected early. After this change, the default behavior is what's now called `HandlerTaskMode::Detached`, which causes Dropshot to use `tokio::spawn` to run the request handler. That task will never be cancelled. This is useful for consumers whose request handlers may not be cancellation-safe.
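
To make the cancellation concern in the #701 entry concrete, here is a minimal std-only sketch (not Dropshot code). It assumes a hypothetical handler, `two_step_handler`, that performs two side effects separated by an await point; `YieldOnce`, `NoopWake`, and `steps_after_polls` are likewise illustrative names. Polling the future once and then dropping it models what can happen under `HandlerTaskMode::CancelOnDisconnect` when a client goes away early.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; sufficient for polling a future by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Future that is pending on its first poll and ready on its second.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            Poll::Pending
        }
    }
}

/// Hypothetical handler: two side effects separated by an await point.
async fn two_step_handler(steps_done: Arc<AtomicUsize>) {
    steps_done.fetch_add(1, Ordering::SeqCst); // step 1
    YieldOnce { yielded: false }.await;
    steps_done.fetch_add(1, Ordering::SeqCst); // step 2
}

/// Polls the handler at most `max_polls` times, then drops it.
/// Returns how many of the handler's steps actually ran.
fn steps_after_polls(max_polls: usize) -> usize {
    let steps_done = Arc::new(AtomicUsize::new(0));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(two_step_handler(Arc::clone(&steps_done)));

    for _ in 0..max_polls {
        if fut.as_mut().poll(&mut cx).is_ready() {
            break;
        }
    }
    // Dropping a future that is still pending is what "cancelled" means.
    drop(fut);
    steps_done.load(Ordering::SeqCst)
}
```

Calling `steps_after_polls(1)` leaves the handler half done (only step 1 ran), while `steps_after_polls(2)` lets it finish. A handler that is not cancellation-safe is one where stopping after step 1 leaves state inconsistent.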

=== Other notable Changes

32 changes: 29 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions dropshot/Cargo.toml
@@ -77,6 +77,7 @@ version = "0.8.12"
features = [ "uuid1" ]

[dev-dependencies]
async-channel = "1.8.0"
buf-list = "1.0.3"
expectorate = "1.0.7"
hyper-rustls = "0.24.0"
3 changes: 2 additions & 1 deletion dropshot/src/api_description.rs
@@ -25,6 +25,7 @@ use serde::Serialize;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

/// ApiEndpoint represents a single API endpoint associated with an
/// ApiDescription. It has a handler, HTTP method (e.g. GET, POST), and a path--
@@ -33,7 +34,7 @@ use std::collections::HashSet;
#[derive(Debug)]
pub struct ApiEndpoint<Context: ServerContext> {
pub operation_id: String,
pub handler: Box<dyn RouteHandler<Context>>,
pub handler: Arc<dyn RouteHandler<Context>>,
pub method: Method,
pub path: String,
pub parameters: Vec<ApiEndpointParameter>,
29 changes: 29 additions & 0 deletions dropshot/src/config.rs
@@ -49,6 +49,34 @@ pub struct ConfigDropshot {
pub bind_address: SocketAddr,
/// maximum allowed size of a request body, defaults to 1024
pub request_body_max_bytes: usize,
/// Default behavior for HTTP handler functions with respect to clients
/// disconnecting early.
pub default_handler_task_mode: HandlerTaskMode,
}

/// Enum specifying options for how a Dropshot server should run its handler
/// futures.
///
/// The variants are phrased in terms of how the handler interacts with client
/// disconnection, but they control how the future is run: for
/// `CancelOnDisconnect`, the future is run directly, and it will be dropped
/// (and thus cancelled) if the client disconnects; for `Detached`, handler
/// futures will be `tokio::spawn()`'d, detaching their completion from the
/// behavior of the client.
///
/// If using `CancelOnDisconnect`, one must be careful that all handlers are
/// cancel-safe. If you're unsure, we recommend `Detached`.
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum HandlerTaskMode {
/// If a client disconnects while the handler is still running, cancel the
/// future.
CancelOnDisconnect,

/// If a client disconnects while the handler is still running, continue
/// running the handler future to completion (i.e., the handler future is
/// detached from the client connection).
Detached,
}

#[derive(Clone, Debug)]
@@ -78,6 +106,7 @@ impl Default for ConfigDropshot {
ConfigDropshot {
bind_address: "127.0.0.1:0".parse().unwrap(),
request_body_max_bytes: 1024,
default_handler_task_mode: HandlerTaskMode::Detached,
}
}
}
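
The `#[serde(rename_all = "kebab-case")]` attribute on `HandlerTaskMode` means the variants are spelled `cancel-on-disconnect` and `detached` in serialized config. The sketch below mirrors that mapping by hand with a stand-in enum and `FromStr`, so it runs without serde; it is an illustration of the naming, not the deserialization path Dropshot actually uses, and the error message is invented.

```rust
use std::str::FromStr;

/// Stand-in for the `HandlerTaskMode` enum in the diff (no serde here).
#[derive(Clone, Copy, Debug, PartialEq)]
enum HandlerTaskMode {
    CancelOnDisconnect,
    Detached,
}

impl FromStr for HandlerTaskMode {
    type Err = String;

    /// Accepts the kebab-case spellings that serde's `rename_all`
    /// would produce for the two variant names.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "cancel-on-disconnect" => Ok(HandlerTaskMode::CancelOnDisconnect),
            "detached" => Ok(HandlerTaskMode::Detached),
            other => Err(format!("unknown handler task mode: {other}")),
        }
    }
}
```

With this stand-in, `"detached".parse::<HandlerTaskMode>()` yields `Ok(HandlerTaskMode::Detached)`, and a Rust-style spelling such as `"Detached"` is rejected.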
6 changes: 3 additions & 3 deletions dropshot/src/handler.rs
@@ -475,7 +475,7 @@ where
/// Given a function matching one of the supported API handler function
/// signatures, return a RouteHandler that can be used to respond to HTTP
/// requests using this function.
pub fn new(handler: HandlerType) -> Box<dyn RouteHandler<Context>> {
pub fn new(handler: HandlerType) -> Arc<dyn RouteHandler<Context>> {
HttpRouteHandler::new_with_name(handler, "<unlabeled handler>")
}

@@ -485,8 +485,8 @@ where
pub fn new_with_name(
handler: HandlerType,
label: &str,
) -> Box<dyn RouteHandler<Context>> {
Box::new(HttpRouteHandler {
) -> Arc<dyn RouteHandler<Context>> {
Arc::new(HttpRouteHandler {
label: label.to_string(),
handler,
phantom: PhantomData,
3 changes: 3 additions & 0 deletions dropshot/src/lib.rs
@@ -46,6 +46,7 @@
//! use dropshot::ConfigDropshot;
//! use dropshot::ConfigLogging;
//! use dropshot::ConfigLoggingLevel;
//! use dropshot::HandlerTaskMode;
//! use dropshot::HttpServerStarter;
//! use std::sync::Arc;
//!
@@ -69,6 +70,7 @@
//! &ConfigDropshot {
//! bind_address: "127.0.0.1:0".parse().unwrap(),
//! request_body_max_bytes: 1024,
//! default_handler_task_mode: HandlerTaskMode::Detached,
//! },
//! api,
//! Arc::new(()),
@@ -617,6 +619,7 @@ pub use api_description::TagDetails;
pub use api_description::TagExternalDocs;
pub use config::ConfigDropshot;
pub use config::ConfigTls;
pub use config::HandlerTaskMode;
pub use config::RawTlsConfig;
pub use dtrace::ProbeRegistration;
pub use error::HttpError;
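
Because `ConfigDropshot` gains a public field, any caller that writes the struct literal out in full (as the doc example and the websocket test in this commit do) has to name the new field. Since the diff shows a `Default` impl, callers could presumably use struct update syntax to override only what they care about. A minimal sketch with stand-in types (`Config` and `config_with_cancellation` are hypothetical; `bind_address` is a `String` here only to keep the example std-only):

```rust
/// Stand-in for the enum added in this commit.
#[derive(Clone, Copy, Debug, PartialEq)]
enum HandlerTaskMode {
    CancelOnDisconnect,
    Detached,
}

/// Stand-in for `ConfigDropshot`, with the same three fields.
#[derive(Clone, Debug, PartialEq)]
struct Config {
    bind_address: String,
    request_body_max_bytes: usize,
    default_handler_task_mode: HandlerTaskMode,
}

impl Default for Config {
    // Same default values as the `Default` impl shown in the diff.
    fn default() -> Self {
        Config {
            bind_address: "127.0.0.1:0".to_string(),
            request_body_max_bytes: 1024,
            default_handler_task_mode: HandlerTaskMode::Detached,
        }
    }
}

/// Overrides one field and takes every other field from the defaults.
fn config_with_cancellation() -> Config {
    Config {
        default_handler_task_mode: HandlerTaskMode::CancelOnDisconnect,
        ..Config::default()
    }
}
```

Written this way, a literal keeps compiling when further fields are added later, at the cost of silently picking up whatever default the new field has.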
24 changes: 13 additions & 11 deletions dropshot/src/router.rs
@@ -14,6 +14,7 @@ use http::StatusCode;
use percent_encoding::percent_decode_str;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::sync::Arc;

/// `HttpRouter` is a simple data structure for routing incoming HTTP requests to
/// specific handler functions based on the request method and URI path. For
@@ -207,8 +208,8 @@ impl MapValue for VariableValue {
/// corresponding values in the actual path, and the expected body
/// content type.
#[derive(Debug)]
pub struct RouterLookupResult<'a, Context: ServerContext> {
pub handler: &'a dyn RouteHandler<Context>,
pub struct RouterLookupResult<Context: ServerContext> {
pub handler: Arc<dyn RouteHandler<Context>>,
pub variables: VariableSet,
pub body_content_type: ApiEndpointBodyContentType,
}
@@ -401,11 +402,11 @@ impl<Context: ServerContext> HttpRouter<Context> {
/// of variables assigned based on the request path as part of the lookup.
/// On failure, this returns an `HttpError` appropriate for the failure
/// mode.
pub fn lookup_route<'a, 'b>(
&'a self,
method: &'b Method,
path: InputPath<'b>,
) -> Result<RouterLookupResult<'a, Context>, HttpError> {
pub fn lookup_route(
&self,
method: &Method,
path: InputPath<'_>,
) -> Result<RouterLookupResult<Context>, HttpError> {
let all_segments = input_path_to_segments(&path).map_err(|_| {
HttpError::for_bad_request(
None,
@@ -480,7 +481,7 @@ impl<Context: ServerContext> HttpRouter<Context> {
node.methods
.get(&methodname)
.map(|handler| RouterLookupResult {
handler: &*handler.handler,
handler: Arc::clone(&handler.handler),
variables,
body_content_type: handler.body_content_type.clone(),
})
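
The switch from `&'a dyn RouteHandler<Context>` to `Arc<dyn RouteHandler<Context>>` in `RouterLookupResult` is what lets the lookup result outlive the borrow of the router, which a spawned task needs. A minimal sketch of the idea, using `std::thread::spawn` as a stand-in for `tokio::spawn` (both require the spawned work to be `'static`); the trait, `Echo`, `Router`, and `handle_on_other_thread` are simplified, hypothetical versions and not Dropshot's types:

```rust
use std::sync::Arc;
use std::thread;

/// Simplified stand-in for the `RouteHandler` trait.
trait RouteHandler: Send + Sync {
    fn handle(&self, path: &str) -> String;
}

struct Echo {
    label: String,
}

impl RouteHandler for Echo {
    fn handle(&self, path: &str) -> String {
        format!("{}:{}", self.label, path)
    }
}

/// Simplified stand-in for a router that owns a single handler.
struct Router {
    handler: Arc<dyn RouteHandler>,
}

impl Router {
    /// Returns an owned handle instead of a reference tied to `&self`.
    fn lookup(&self) -> Arc<dyn RouteHandler> {
        Arc::clone(&self.handler)
    }
}

/// Runs the handler on another thread. A `&dyn RouteHandler` borrowed from
/// `router` could not be moved into the closure, because the closure must
/// be `'static`; the cloned `Arc` can.
fn handle_on_other_thread(router: &Router, path: &str) -> String {
    let handler = router.lookup();
    let path = path.to_string();
    thread::spawn(move || handler.handle(&path))
        .join()
        .expect("handler thread panicked")
}
```

Cloning the `Arc` only bumps a reference count, so the per-request cost of returning an owned handle is small compared with handling the request itself.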
@@ -738,23 +739,24 @@ mod test {
use hyper::Response;
use serde::Deserialize;
use std::collections::BTreeMap;
use std::sync::Arc;

async fn test_handler(
_: RequestContext<()>,
) -> Result<Response<Body>, HttpError> {
panic!("test handler is not supposed to run");
}

fn new_handler() -> Box<dyn RouteHandler<()>> {
fn new_handler() -> Arc<dyn RouteHandler<()>> {
HttpRouteHandler::new(test_handler)
}

fn new_handler_named(name: &str) -> Box<dyn RouteHandler<()>> {
fn new_handler_named(name: &str) -> Arc<dyn RouteHandler<()>> {
HttpRouteHandler::new_with_name(test_handler, name)
}

fn new_endpoint(
handler: Box<dyn RouteHandler<()>>,
handler: Arc<dyn RouteHandler<()>>,
method: Method,
path: &str,
) -> ApiEndpoint<()> {
63 changes: 61 additions & 2 deletions dropshot/src/server.rs
@@ -30,14 +30,17 @@ use std::convert::TryFrom;
use std::future::Future;
use std::net::SocketAddr;
use std::num::NonZeroU32;
use std::panic;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::ReadBuf;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::oneshot;
use tokio_rustls::{server::TlsStream, TlsAcceptor};
use uuid::Uuid;

use crate::config::HandlerTaskMode;
use crate::RequestInfo;
use slog::Logger;

@@ -84,6 +87,9 @@ pub struct ServerConfig {
pub page_max_nitems: NonZeroU32,
/// default size for a page of results
pub page_default_nitems: NonZeroU32,
/// Default behavior for HTTP handler functions with respect to clients
/// disconnecting early.
pub default_handler_task_mode: HandlerTaskMode,
}

pub struct HttpServerStarter<C: ServerContext> {
@@ -114,6 +120,7 @@ impl<C: ServerContext> HttpServerStarter<C> {
request_body_max_bytes: config.request_body_max_bytes,
page_max_nitems: NonZeroU32::new(10000).unwrap(),
page_default_nitems: NonZeroU32::new(100).unwrap(),
default_handler_task_mode: config.default_handler_task_mode,
};

let starter = match &tls {
@@ -850,8 +857,60 @@ async fn http_request_handle<C: ServerContext>(
request_id: request_id.to_string(),
log: request_log,
};
let mut response =
lookup_result.handler.handle_request(rqctx, request).await?;
let handler = lookup_result.handler;

let mut response = match server.config.default_handler_task_mode {
HandlerTaskMode::CancelOnDisconnect => {
// For CancelOnDisconnect, we run the request handler directly: if
// the client disconnects, we will be cancelled, and therefore this
// future will too.
//
// TODO-robustness: We should log a warning if we are dropped before
// this handler completes; see
// https://github.com/oxidecomputer/dropshot/pull/701#pullrequestreview-1480426914.
handler.handle_request(rqctx, request).await?
}
HandlerTaskMode::Detached => {
// Spawn the handler so if we're cancelled, the handler still runs
// to completion.
let (tx, rx) = oneshot::channel();
let request_log = rqctx.log.clone();
let handler_task = tokio::spawn(async move {
let request_log = rqctx.log.clone();
let result = handler.handle_request(rqctx, request).await;

// If this send fails, our spawning task has been cancelled in
// the `rx.await` below; log such a result.
if tx.send(result).is_err() {
warn!(
request_log,
"client disconnected before response returned"
);
}
});

// The only way we can fail to receive on `rx` is if `tx` is
// dropped before a result is sent, which can only happen if
// `handle_request` panics. We will propagate such a panic here,
// just as we would have in `CancelOnDisconnect` mode above (where
// we call the handler directly).
match rx.await {
Ok(result) => result?,
Err(_) => {
error!(request_log, "handler panicked; propagating panic");

// To get the panic, we now need to await `handler_task`; we
// know it is complete _and_ it failed, because it has
// dropped `tx` without sending us a result, which is only
// possible if it panicked.
let task_err = handler_task.await.expect_err(
"task failed to send result but didn't panic",
);
panic::resume_unwind(task_err.into_panic());
}
}
}
};
response.headers_mut().insert(
HEADER_REQUEST_ID,
http::header::HeaderValue::from_str(&request_id).unwrap(),
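
The `Detached` branch above follows a spawn, send-result, re-raise-panic pattern. The sketch below reproduces that control flow with std threads and `std::sync::mpsc` in place of `tokio::spawn` and `tokio::sync::oneshot`, so it runs without an async runtime. It is an analogy rather than the Dropshot implementation: `run_detached` is a hypothetical helper, and because this version is synchronous the caller cannot be cancelled while waiting, so the failed-send branch is shown only for symmetry with the diff.

```rust
use std::panic;
use std::sync::mpsc;
use std::thread;

/// Runs `handler` on its own thread, waits for its result, and re-raises
/// the handler's panic in the caller if it panicked.
fn run_detached<T, F>(handler: F) -> T
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    let worker = thread::spawn(move || {
        let result = handler();
        // `send` fails only if the receiver has been dropped, i.e. nobody
        // is waiting for the result any more.
        if tx.send(result).is_err() {
            eprintln!("caller went away before the result was delivered");
        }
    });

    match rx.recv() {
        Ok(result) => result,
        Err(_) => {
            // The sender was dropped without sending, which here can only
            // mean the handler panicked. Joining yields the panic payload.
            let payload = worker
                .join()
                .expect_err("worker dropped sender but did not panic");
            panic::resume_unwind(payload)
        }
    }
}
```

For example, `run_detached(|| 2 + 3)` returns `5`, while a closure that panics causes the same panic to resurface in the caller, which matches the behavior the comments in the diff describe for `rx.await` followed by `handler_task.await`.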
3 changes: 3 additions & 0 deletions dropshot/src/websocket.rs
@@ -294,6 +294,7 @@ impl JsonSchema for WebsocketUpgrade {

#[cfg(test)]
mod tests {
use crate::config::HandlerTaskMode;
use crate::router::HttpRouter;
use crate::server::{DropshotState, ServerConfig};
use crate::{
@@ -325,6 +326,8 @@ mod tests {
request_body_max_bytes: 0,
page_max_nitems: NonZeroU32::new(1).unwrap(),
page_default_nitems: NonZeroU32::new(1).unwrap(),
default_handler_task_mode:
HandlerTaskMode::CancelOnDisconnect,
},
router: HttpRouter::new(),
log: log.clone(),