Skip to content

Commit

Permalink
Generic streams support instead of boxed stream
Browse files Browse the repository at this point in the history
  • Loading branch information
abdolence committed Aug 14, 2022
1 parent 09ead0d commit af15072
Show file tree
Hide file tree
Showing 10 changed files with 49 additions and 53 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ license = "Apache-2.0"
name = "axum-streams"
readme = "README.md"
include = ["Cargo.toml", "src/**/*.rs", "README.md", "LICENSE"]
version = "0.6.0"
version = "0.7.0"

[badges]
maintenance = { status = "actively-developed" }
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ struct MyTestStructure {
some_test_field: String
}

fn source_test_stream() -> BoxStream<'static, MyTestStructure> {
fn my_source_stream() -> impl Stream<Item=MyTestStructure> {
// Simulating a stream with a plain vector and throttling to show how it works
Box::pin(stream::iter(vec![
stream::iter(vec![
MyTestStructure {
some_test_field: "test1".to_string()
}; 1000
]).throttle(std::time::Duration::from_millis(50)))
]).throttle(std::time::Duration::from_millis(50))
}

async fn test_json_array_stream() -> impl IntoResponse {
Expand Down
19 changes: 8 additions & 11 deletions examples/csv-example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use axum::Router;
use std::net::SocketAddr;

use futures::prelude::*;
use futures_util::stream::BoxStream;
use serde::{Deserialize, Serialize};
use tokio_stream::StreamExt;

Expand All @@ -15,17 +14,15 @@ struct MyTestStructure {
some_test_field: String,
}

fn source_test_stream() -> BoxStream<'static, MyTestStructure> {
fn source_test_stream() -> impl Stream<Item = MyTestStructure> {
// Simulating a stream with a plain vector and throttling to show how it works
Box::pin(
stream::iter(vec![
MyTestStructure {
some_test_field: "test1".to_string()
};
1000
])
.throttle(std::time::Duration::from_millis(50)),
)
stream::iter(vec![
MyTestStructure {
some_test_field: "test1".to_string()
};
1000
])
.throttle(std::time::Duration::from_millis(50))
}

async fn test_csv_stream() -> impl IntoResponse {
Expand Down
19 changes: 8 additions & 11 deletions examples/json-example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use axum::Router;
use std::net::SocketAddr;

use futures::prelude::*;
use futures_util::stream::BoxStream;
use serde::{Deserialize, Serialize};
use tokio_stream::StreamExt;

Expand All @@ -15,17 +14,15 @@ struct MyTestStructure {
some_test_field: String,
}

fn source_test_stream() -> BoxStream<'static, MyTestStructure> {
fn source_test_stream() -> impl Stream<Item = MyTestStructure> {
// Simulating a stream with a plain vector and throttling to show how it works
Box::pin(
stream::iter(vec![
MyTestStructure {
some_test_field: "test1".to_string()
};
1000
])
.throttle(std::time::Duration::from_millis(50)),
)
stream::iter(vec![
MyTestStructure {
some_test_field: "test1".to_string()
};
1000
])
.throttle(std::time::Duration::from_millis(50))
}

async fn test_json_array_stream() -> impl IntoResponse {
Expand Down
19 changes: 8 additions & 11 deletions examples/protobuf-example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use axum::Router;
use std::net::SocketAddr;

use futures::prelude::*;
use futures_util::stream::BoxStream;
use tokio_stream::StreamExt;

use axum_streams::*;
Expand All @@ -15,17 +14,15 @@ struct MyTestStructure {
some_test_field: String,
}

fn source_test_stream() -> BoxStream<'static, MyTestStructure> {
fn source_test_stream() -> impl Stream<Item = MyTestStructure> {
// Simulating a stream with a plain vector and throttling to show how it works
Box::pin(
stream::iter(vec![
MyTestStructure {
some_test_field: "test1".to_string()
};
1000
])
.throttle(std::time::Duration::from_millis(50)),
)
stream::iter(vec![
MyTestStructure {
some_test_field: "test1".to_string()
};
1000
])
.throttle(std::time::Duration::from_millis(50))
}

async fn test_protobuf_stream() -> impl IntoResponse {
Expand Down
4 changes: 3 additions & 1 deletion src/csv_format.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::stream_format::StreamingFormat;
use futures::Stream;
use futures_util::stream::BoxStream;
use futures_util::StreamExt;
use http::HeaderMap;
Expand Down Expand Up @@ -64,9 +65,10 @@ where
}

impl<'a> crate::StreamBodyAs<'a> {
pub fn csv<T>(stream: BoxStream<'a, T>) -> Self
pub fn csv<S, T>(stream: S) -> Self
where
T: Serialize + Send + Sync + 'static,
S: Stream<Item = T> + 'a + Send,
{
Self::new(CsvStreamFormat::new(false, b','), stream)
}
Expand Down
7 changes: 5 additions & 2 deletions src/json_formats.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::stream_format::StreamingFormat;
use bytes::{BufMut, BytesMut};
use futures::Stream;
use futures_util::stream::BoxStream;
use futures_util::StreamExt;
use http::HeaderMap;
Expand Down Expand Up @@ -107,16 +108,18 @@ const JSON_ARRAY_SEP_BYTES: &[u8] = ",".as_bytes();
const JSON_NL_SEP_BYTES: &[u8] = "\n".as_bytes();

impl<'a> crate::StreamBodyAs<'a> {
pub fn json_array<T>(stream: BoxStream<'a, T>) -> Self
pub fn json_array<S, T>(stream: S) -> Self
where
T: Serialize + Send + Sync + 'static,
S: Stream<Item = T> + 'a + Send,
{
Self::new(JsonArrayStreamFormat::new(), stream)
}

pub fn json_nl<T>(stream: BoxStream<'a, T>) -> Self
pub fn json_nl<S, T>(stream: S) -> Self
where
T: Serialize + Send + Sync + 'static,
S: Stream<Item = T> + 'a + Send,
{
Self::new(JsonNewLineStreamFormat::new(), stream)
}
Expand Down
15 changes: 6 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
//! # Example
//!
//! ```rust
//! use futures_util::stream::BoxStream;
//! use axum::{
//! Router,
//! routing::get,
//! http::{StatusCode, header::CONTENT_TYPE},
//! response::{Response, IntoResponse},
//! };
//! use futures::Stream;
//! use axum_streams::*;
//! use serde::Serialize;
//!
Expand All @@ -32,14 +32,14 @@
//! }
//!
//! // Your possibly stream of objects
//! fn my_source_stream() -> BoxStream<'static, MyTestStructure> {
//! fn my_source_stream() -> impl Stream<Item=MyTestStructure> {
//! // Simulating a stream with a plain vector and throttling to show how it works
//! use tokio_stream::StreamExt;
//! Box::pin(futures::stream::iter(vec![
//! futures::stream::iter(vec![
//! MyTestStructure {
//! some_test_field: "test1".to_string()
//! }; 1000
//! ]).throttle(std::time::Duration::from_millis(50)))
//! ]).throttle(std::time::Duration::from_millis(50))
//! }
//!
//! // Route implementation:
Expand Down Expand Up @@ -67,11 +67,8 @@

mod stream_format;

mod stream_body_with;
pub use self::stream_body_with::StreamBodyAs;

// For compatibility reasons
pub type StreamBodyWith<'a> = StreamBodyAs<'a>;
mod stream_body_as;
pub use self::stream_body_as::StreamBodyAs;

#[cfg(feature = "json")]
mod json_formats;
Expand Down
4 changes: 3 additions & 1 deletion src/protobuf_format.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::stream_format::StreamingFormat;
use futures::Stream;
use futures_util::stream::BoxStream;
use futures_util::StreamExt;
use http::HeaderMap;
Expand Down Expand Up @@ -53,9 +54,10 @@ where
}

impl<'a> crate::StreamBodyAs<'a> {
pub fn protobuf<T>(stream: BoxStream<'a, T>) -> Self
pub fn protobuf<S, T>(stream: S) -> Self
where
T: prost::Message + Send + Sync + 'static,
S: Stream<Item = T> + 'a + Send,
{
Self::new(ProtobufStreamFormat::new(), stream)
}
Expand Down
7 changes: 4 additions & 3 deletions src/stream_body_with.rs → src/stream_body_as.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::stream_format::StreamingFormat;
use axum::body::HttpBody;
use axum::response::{IntoResponse, Response};
use futures::Stream;
use futures_util::stream::BoxStream;
use http::HeaderMap;
use std::fmt::Formatter;
Expand All @@ -20,12 +21,13 @@ impl<'a> std::fmt::Debug for StreamBodyAs<'a> {

impl<'a> StreamBodyAs<'a> {
/// Create a new `StreamBodyWith` providing a stream of your objects in the specified format.
pub fn new<T, FMT>(stream_format: FMT, stream: BoxStream<'a, T>) -> Self
pub fn new<S, T, FMT>(stream_format: FMT, stream: S) -> Self
where
FMT: StreamingFormat<T>,
S: Stream<Item = T> + 'a + Send,
{
Self {
stream: stream_format.to_bytes_stream(stream),
stream: stream_format.to_bytes_stream(Box::pin(stream)),
trailers: stream_format.http_response_trailers(),
}
}
Expand All @@ -45,7 +47,6 @@ impl<'a> HttpBody for StreamBodyAs<'a> {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
use futures_util::Stream;
Pin::new(&mut self.stream).poll_next(cx)
}

Expand Down

0 comments on commit af15072

Please sign in to comment.