From af1507226694c9267e8566ba57823496aec60612 Mon Sep 17 00:00:00 2001 From: Abdulla Abdurakhmanov Date: Sun, 14 Aug 2022 16:20:58 +0200 Subject: [PATCH] Generic streams support instead of boxed stream --- Cargo.toml | 2 +- README.md | 6 +++--- examples/csv-example.rs | 19 ++++++++----------- examples/json-example.rs | 19 ++++++++----------- examples/protobuf-example.rs | 19 ++++++++----------- src/csv_format.rs | 4 +++- src/json_formats.rs | 7 +++++-- src/lib.rs | 15 ++++++--------- src/protobuf_format.rs | 4 +++- ...{stream_body_with.rs => stream_body_as.rs} | 7 ++++--- 10 files changed, 49 insertions(+), 53 deletions(-) rename src/{stream_body_with.rs => stream_body_as.rs} (88%) diff --git a/Cargo.toml b/Cargo.toml index b6dc45d..23b4547 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ license = "Apache-2.0" name = "axum-streams" readme = "README.md" include = ["Cargo.toml", "src/**/*.rs", "README.md", "LICENSE"] -version = "0.6.0" +version = "0.7.0" [badges] maintenance = { status = "actively-developed" } diff --git a/README.md b/README.md index 1c6054d..2b1a0c0 100644 --- a/README.md +++ b/README.md @@ -29,13 +29,13 @@ struct MyTestStructure { some_test_field: String } -fn source_test_stream() -> BoxStream<'static, MyTestStructure> { +fn my_source_stream() -> impl Stream { // Simulating a stream with a plain vector and throttling to show how it works - Box::pin(stream::iter(vec![ + stream::iter(vec![ MyTestStructure { some_test_field: "test1".to_string() }; 1000 - ]).throttle(std::time::Duration::from_millis(50))) + ]).throttle(std::time::Duration::from_millis(50)) } async fn test_json_array_stream() -> impl IntoResponse { diff --git a/examples/csv-example.rs b/examples/csv-example.rs index e9e658a..72a309c 100644 --- a/examples/csv-example.rs +++ b/examples/csv-example.rs @@ -4,7 +4,6 @@ use axum::Router; use std::net::SocketAddr; use futures::prelude::*; -use futures_util::stream::BoxStream; use serde::{Deserialize, Serialize}; use tokio_stream::StreamExt; @@ -15,17 +14,15 @@ struct MyTestStructure { some_test_field: String, } -fn source_test_stream() -> BoxStream<'static, MyTestStructure> { +fn source_test_stream() -> impl Stream { // Simulating a stream with a plain vector and throttling to show how it works - Box::pin( - stream::iter(vec![ - MyTestStructure { - some_test_field: "test1".to_string() - }; - 1000 - ]) - .throttle(std::time::Duration::from_millis(50)), - ) + stream::iter(vec![ + MyTestStructure { + some_test_field: "test1".to_string() + }; + 1000 + ]) + .throttle(std::time::Duration::from_millis(50)) } async fn test_csv_stream() -> impl IntoResponse { diff --git a/examples/json-example.rs b/examples/json-example.rs index 132051d..ae3de78 100644 --- a/examples/json-example.rs +++ b/examples/json-example.rs @@ -4,7 +4,6 @@ use axum::Router; use std::net::SocketAddr; use futures::prelude::*; -use futures_util::stream::BoxStream; use serde::{Deserialize, Serialize}; use tokio_stream::StreamExt; @@ -15,17 +14,15 @@ struct MyTestStructure { some_test_field: String, } -fn source_test_stream() -> BoxStream<'static, MyTestStructure> { +fn source_test_stream() -> impl Stream { // Simulating a stream with a plain vector and throttling to show how it works - Box::pin( - stream::iter(vec![ - MyTestStructure { - some_test_field: "test1".to_string() - }; - 1000 - ]) - .throttle(std::time::Duration::from_millis(50)), - ) + stream::iter(vec![ + MyTestStructure { + some_test_field: "test1".to_string() + }; + 1000 + ]) + .throttle(std::time::Duration::from_millis(50)) } async fn test_json_array_stream() -> impl IntoResponse { diff --git a/examples/protobuf-example.rs b/examples/protobuf-example.rs index 84f286f..0a00cb7 100644 --- a/examples/protobuf-example.rs +++ b/examples/protobuf-example.rs @@ -4,7 +4,6 @@ use axum::Router; use std::net::SocketAddr; use futures::prelude::*; -use futures_util::stream::BoxStream; use tokio_stream::StreamExt; use axum_streams::*; @@ -15,17 +14,15 @@ struct MyTestStructure { some_test_field: String, } -fn source_test_stream() -> BoxStream<'static, MyTestStructure> { +fn source_test_stream() -> impl Stream { // Simulating a stream with a plain vector and throttling to show how it works - Box::pin( - stream::iter(vec![ - MyTestStructure { - some_test_field: "test1".to_string() - }; - 1000 - ]) - .throttle(std::time::Duration::from_millis(50)), - ) + stream::iter(vec![ + MyTestStructure { + some_test_field: "test1".to_string() + }; + 1000 + ]) + .throttle(std::time::Duration::from_millis(50)) } async fn test_protobuf_stream() -> impl IntoResponse { diff --git a/src/csv_format.rs b/src/csv_format.rs index 3e38a78..16d01c4 100644 --- a/src/csv_format.rs +++ b/src/csv_format.rs @@ -1,4 +1,5 @@ use crate::stream_format::StreamingFormat; +use futures::Stream; use futures_util::stream::BoxStream; use futures_util::StreamExt; use http::HeaderMap; @@ -64,9 +65,10 @@ where } impl<'a> crate::StreamBodyAs<'a> { - pub fn csv(stream: BoxStream<'a, T>) -> Self + pub fn csv(stream: S) -> Self where T: Serialize + Send + Sync + 'static, + S: Stream + 'a + Send, { Self::new(CsvStreamFormat::new(false, b','), stream) } diff --git a/src/json_formats.rs b/src/json_formats.rs index c5d0124..548e1ff 100644 --- a/src/json_formats.rs +++ b/src/json_formats.rs @@ -1,5 +1,6 @@ use crate::stream_format::StreamingFormat; use bytes::{BufMut, BytesMut}; +use futures::Stream; use futures_util::stream::BoxStream; use futures_util::StreamExt; use http::HeaderMap; @@ -107,16 +108,18 @@ const JSON_ARRAY_SEP_BYTES: &[u8] = ",".as_bytes(); const JSON_NL_SEP_BYTES: &[u8] = "\n".as_bytes(); impl<'a> crate::StreamBodyAs<'a> { - pub fn json_array(stream: BoxStream<'a, T>) -> Self + pub fn json_array(stream: S) -> Self where T: Serialize + Send + Sync + 'static, + S: Stream + 'a + Send, { Self::new(JsonArrayStreamFormat::new(), stream) } - pub fn json_nl(stream: BoxStream<'a, T>) -> Self + pub fn json_nl(stream: S) -> Self where T: Serialize + Send + Sync + 'static, + S: Stream + 'a + Send, { Self::new(JsonNewLineStreamFormat::new(), stream) } diff --git a/src/lib.rs b/src/lib.rs index b1317b7..65340ec 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,13 +16,13 @@ //! # Example //! //! ```rust -//! use futures_util::stream::BoxStream; //! use axum::{ //! Router, //! routing::get, //! http::{StatusCode, header::CONTENT_TYPE}, //! response::{Response, IntoResponse}, //! }; +//! use futures::Stream; //! use axum_streams::*; //! use serde::Serialize; //! @@ -32,14 +32,14 @@ //! } //! //! // Your possibly stream of objects -//! fn my_source_stream() -> BoxStream<'static, MyTestStructure> { +//! fn my_source_stream() -> impl Stream { //! // Simulating a stream with a plain vector and throttling to show how it works //! use tokio_stream::StreamExt; -//! Box::pin(futures::stream::iter(vec![ +//! futures::stream::iter(vec![ //! MyTestStructure { //! some_test_field: "test1".to_string() //! }; 1000 -//! ]).throttle(std::time::Duration::from_millis(50))) +//! ]).throttle(std::time::Duration::from_millis(50)) //! } //! //! // Route implementation: @@ -67,11 +67,8 @@ mod stream_format; -mod stream_body_with; -pub use self::stream_body_with::StreamBodyAs; - -// For compatibility reasons -pub type StreamBodyWith<'a> = StreamBodyAs<'a>; +mod stream_body_as; +pub use self::stream_body_as::StreamBodyAs; #[cfg(feature = "json")] mod json_formats; diff --git a/src/protobuf_format.rs b/src/protobuf_format.rs index 17c2254..c135335 100644 --- a/src/protobuf_format.rs +++ b/src/protobuf_format.rs @@ -1,4 +1,5 @@ use crate::stream_format::StreamingFormat; +use futures::Stream; use futures_util::stream::BoxStream; use futures_util::StreamExt; use http::HeaderMap; @@ -53,9 +54,10 @@ where } impl<'a> crate::StreamBodyAs<'a> { - pub fn protobuf(stream: BoxStream<'a, T>) -> Self + pub fn protobuf(stream: S) -> Self where T: prost::Message + Send + Sync + 'static, + S: Stream + 'a + Send, { Self::new(ProtobufStreamFormat::new(), stream) } diff --git a/src/stream_body_with.rs b/src/stream_body_as.rs similarity index 88% rename from src/stream_body_with.rs rename to src/stream_body_as.rs index d3793c6..aacd3df 100644 --- a/src/stream_body_with.rs +++ b/src/stream_body_as.rs @@ -1,6 +1,7 @@ use crate::stream_format::StreamingFormat; use axum::body::HttpBody; use axum::response::{IntoResponse, Response}; +use futures::Stream; use futures_util::stream::BoxStream; use http::HeaderMap; use std::fmt::Formatter; @@ -20,12 +21,13 @@ impl<'a> std::fmt::Debug for StreamBodyAs<'a> { impl<'a> StreamBodyAs<'a> { /// Create a new `StreamBodyWith` providing a stream of your objects in the specified format. - pub fn new(stream_format: FMT, stream: BoxStream<'a, T>) -> Self + pub fn new(stream_format: FMT, stream: S) -> Self where FMT: StreamingFormat, + S: Stream + 'a + Send, { Self { - stream: stream_format.to_bytes_stream(stream), + stream: stream_format.to_bytes_stream(Box::pin(stream)), trailers: stream_format.http_response_trailers(), } } @@ -45,7 +47,6 @@ impl<'a> HttpBody for StreamBodyAs<'a> { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { - use futures_util::Stream; Pin::new(&mut self.stream).poll_next(cx) }