From 149913fed948bbe2149b52b9016170bcaef950ab Mon Sep 17 00:00:00 2001 From: Nicholas Parker Date: Mon, 7 Aug 2023 04:09:40 +1200 Subject: [PATCH] Add example (and doc reference) for warp::body::stream() (#1061) * Add example (and doc reference) for warp::body::stream() As of right now it didn't look like there was example usage for this functionality. It took me some trial and error to figure out the typing for the Stream so that I could pass it into a handler function like this. This example hopefully avoids similar headaches for others. * cargo fmt --- examples/stream.rs | 30 ++++++++++++++++++++++++++++++ src/filters/body.rs | 2 ++ 2 files changed, 32 insertions(+) create mode 100644 examples/stream.rs diff --git a/examples/stream.rs b/examples/stream.rs new file mode 100644 index 000000000..d5f578357 --- /dev/null +++ b/examples/stream.rs @@ -0,0 +1,30 @@ +use bytes::Buf; +use futures_util::{Stream, StreamExt}; +use warp::{reply::Response, Filter, Reply}; + +#[tokio::main] +async fn main() { + // Running curl -T /path/to/a/file 'localhost:3030/' should echo back the content of the file, + // or an HTTP 413 error if the configured size limit is exceeded. + let route = warp::body::content_length_limit(65536) + .and(warp::body::stream()) + .then(handler); + warp::serve(route).run(([127, 0, 0, 1], 3030)).await; +} + +async fn handler( + mut body: impl Stream> + Unpin + Send + Sync, +) -> Response { + let mut collected: Vec = vec![]; + while let Some(buf) = body.next().await { + let mut buf = buf.unwrap(); + while buf.remaining() > 0 { + let chunk = buf.chunk(); + let chunk_len = chunk.len(); + collected.extend_from_slice(chunk); + buf.advance(chunk_len); + } + } + println!("Sending {} bytes", collected.len()); + collected.into_response() +} diff --git a/src/filters/body.rs b/src/filters/body.rs index 3bb08d2b4..85dabbfea 100644 --- a/src/filters/body.rs +++ b/src/filters/body.rs @@ -70,6 +70,8 @@ pub fn content_length_limit(limit: u64) -> impl Filter