Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(volo-thrift): use afit and rpitit to optimize codec #230

Merged
merged 1 commit into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion volo-thrift/src/codec/default/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ impl<D: ZeroCopyDecoder> FramedDecoder<D> {
/// https://github.com/apache/thrift/blob/master/doc/specs/thrift-rpc.md#compatibility
pub const HEADER_DETECT_LENGTH: usize = 6;

#[async_trait::async_trait]
impl<D> ZeroCopyDecoder for FramedDecoder<D>
where
D: ZeroCopyDecoder,
Expand Down
9 changes: 4 additions & 5 deletions volo-thrift/src/codec/default/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
//! [Kitex]: https://github.com/cloudwego/kitex
//! [TTHeader]: https://www.cloudwego.io/docs/kitex/reference/transport_protocol_ttheader/
//! [Framed]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-rpc.md#framed-vs-unframed-transport
use std::future::Future;

use bytes::Bytes;
use linkedbytes::LinkedBytes;
use pilota::thrift::{DecodeError, EncodeError, TransportError};
Expand Down Expand Up @@ -73,7 +75,6 @@ pub trait ZeroCopyEncoder: Send + Sync + 'static {
/// [`ZeroCopyDecoder`] tries to decode a message without copying large data, so the [`Bytes`] in
/// the [`decode`] method is not designed to be reused, and the implementation can use
/// `Bytes::split_to` to get a [`Bytes`] and hand it to the user directly.
#[async_trait::async_trait]
pub trait ZeroCopyDecoder: Send + Sync + 'static {
/// If the outer decoder is framed, it can reads all the payload into a [`Bytes`] and
/// call this function for better performance.
Expand All @@ -85,15 +86,15 @@ pub trait ZeroCopyDecoder: Send + Sync + 'static {

/// The [`DefaultDecoder`] will always call `decode_async`, so the most outer decoder
/// must implement this function.
async fn decode_async<
fn decode_async<
Msg: Send + EntryMessage,
Cx: ThriftContext,
R: AsyncRead + Unpin + Send + Sync,
>(
&mut self,
cx: &mut Cx,
reader: &mut BufReader<R>,
) -> Result<Option<ThriftMessage<Msg>>, DecodeError>;
) -> impl Future<Output = Result<Option<ThriftMessage<Msg>>, DecodeError>> + Send;
}

/// [`MakeZeroCopyCodec`] is used to create a [`ZeroCopyEncoder`] and a [`ZeroCopyDecoder`].
Expand All @@ -112,7 +113,6 @@ pub struct DefaultEncoder<E, W> {
linked_bytes: LinkedBytes,
}

#[async_trait::async_trait]
impl<E: ZeroCopyEncoder, W: AsyncWrite + Unpin + Send + Sync + 'static> Encoder
for DefaultEncoder<E, W>
{
Expand Down Expand Up @@ -184,7 +184,6 @@ pub struct DefaultDecoder<D, R> {
reader: BufReader<R>,
}

#[async_trait::async_trait]
impl<D: ZeroCopyDecoder, R: AsyncRead + Unpin + Send + Sync + 'static> Decoder
for DefaultDecoder<D, R>
{
Expand Down
1 change: 0 additions & 1 deletion volo-thrift/src/codec/default/thrift.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ impl Default for ThriftCodec {
}
}

#[async_trait::async_trait]
impl ZeroCopyDecoder for ThriftCodec {
#[inline]
fn decode<Msg: Send + EntryMessage, Cx: ThriftContext>(
Expand Down
1 change: 0 additions & 1 deletion volo-thrift/src/codec/default/ttheader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ impl<D: ZeroCopyDecoder> TTHeaderDecoder<D> {
/// https://www.cloudwego.io/docs/kitex/reference/transport_protocol_ttheader/
pub const HEADER_DETECT_LENGTH: usize = 6;

#[async_trait::async_trait]
impl<D> ZeroCopyDecoder for TTHeaderDecoder<D>
where
D: ZeroCopyDecoder,
Expand Down
12 changes: 6 additions & 6 deletions volo-thrift/src/codec/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::future::Future;

use tokio::io::{AsyncRead, AsyncWrite};

use crate::{context::ThriftContext, EntryMessage, ThriftMessage};
Expand All @@ -12,24 +14,22 @@ pub use default::DefaultMakeCodec;
/// Returning an Ok(None) indicates the EOF has been reached.
///
/// Note: [`Decoder`] should be designed to be ready for reuse.
#[async_trait::async_trait]
pub trait Decoder: Send + 'static {
async fn decode<Msg: Send + EntryMessage, Cx: ThriftContext>(
fn decode<Msg: Send + EntryMessage, Cx: ThriftContext>(
&mut self,
cx: &mut Cx,
) -> Result<Option<ThriftMessage<Msg>>, crate::Error>;
) -> impl Future<Output = Result<Option<ThriftMessage<Msg>>, crate::Error>> + Send;
}

/// [`Encoder`] writes a [`ThriftMessage`] to an [`AsyncWrite`] and flushes the data.
///
/// Note: [`Encoder`] should be designed to be ready for reuse.
#[async_trait::async_trait]
pub trait Encoder: Send + 'static {
async fn encode<Req: Send + EntryMessage, Cx: ThriftContext>(
fn encode<Req: Send + EntryMessage, Cx: ThriftContext>(
&mut self,
cx: &mut Cx,
msg: ThriftMessage<Req>,
) -> Result<(), crate::Error>;
) -> impl Future<Output = Result<(), crate::Error>> + Send;
}

/// [`MakeCodec`] receives an [`AsyncRead`] and an [`AsyncWrite`] and returns a
Expand Down