From 857fe7408abc8f121ba47958e61f51448fef4185 Mon Sep 17 00:00:00 2001 From: Xudong Huang Date: Fri, 1 Mar 2024 22:31:36 +0800 Subject: [PATCH] :pencil: use BytesMut for frame parse --- examples/throughput.rs | 4 +--- src/conetty/frame.rs | 28 ++++++++++++++++++---------- src/conetty/multiplex_client.rs | 4 +++- src/conetty/server.rs | 12 +++++++----- src/conetty/stream_client.rs | 6 +++++- src/conetty/udp_client.rs | 5 ++++- 6 files changed, 38 insertions(+), 21 deletions(-) diff --git a/examples/throughput.rs b/examples/throughput.rs index 12bf61b..e79b278 100644 --- a/examples/throughput.rs +++ b/examples/throughput.rs @@ -45,14 +45,12 @@ fn main() { Ok(n) => assert_eq!(n, j + 1), } } - // println!("thread done, id={_i}"); }); vec.push(h); } - for (_i, h) in vec.into_iter().enumerate() { + for h in vec { h.join().unwrap(); - // println!("thread {_i} joined"); } let dur = now.elapsed(); diff --git a/src/conetty/frame.rs b/src/conetty/frame.rs index e8531f7..322560b 100644 --- a/src/conetty/frame.rs +++ b/src/conetty/frame.rs @@ -2,6 +2,7 @@ use std::io::{self, Cursor, ErrorKind, Read, Write}; use crate::{Error, WireError}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; +use bytes::{BufMut, Bytes, BytesMut}; // Frame layout // id(u64) + len(u64) + payload([u8; len]) @@ -22,12 +23,12 @@ pub struct Frame { /// frame id, req and rsp has the same id pub id: u64, /// payload data - data: Vec, + data: Bytes, } impl Frame { /// decode a frame from the reader - pub fn decode_from(r: &mut R) -> io::Result { + pub fn decode_from(r: &mut R, buf: &mut BytesMut) -> io::Result { let id = r.read_u64::()?; info!("decode id = {:?}", id); @@ -40,14 +41,21 @@ impl Frame { return Err(io::Error::new(ErrorKind::InvalidInput, s)); } - let mut data = vec![0; len as usize]; - r.read_exact(&mut data[16..])?; + let buf_len = len as usize; - // blow can be skipped, we don't need them in the buffer - let mut cursor = Cursor::new(data); - cursor.write_u64::(id).unwrap(); - cursor.write_u64::(len - 16).unwrap(); - let data = cursor.into_inner(); + buf.reserve(len as usize); + let data: &mut [u8] = unsafe { std::mem::transmute(buf.chunk_mut()) }; + + r.read_exact(&mut data[16..buf_len])?; + unsafe { buf.advance_mut(buf_len) }; + let mut data = buf.split_to(buf_len); + + unsafe { data.set_len(0) }; + data.put_u64(id); + data.put_u64(len - 16); + unsafe { data.set_len(buf_len) }; + + let data = data.freeze(); Ok(Frame { id, data }) } @@ -75,7 +83,7 @@ impl Frame { pub fn decode_rsp(&self) -> Result<&[u8], Error> { use Error::*; - let mut r = Cursor::new(&self.data); + let mut r = Cursor::new(&self.data[..]); // skip the frame head r.set_position(16); diff --git a/src/conetty/multiplex_client.rs b/src/conetty/multiplex_client.rs index 0b5886f..a1b6d87 100644 --- a/src/conetty/multiplex_client.rs +++ b/src/conetty/multiplex_client.rs @@ -8,6 +8,7 @@ use super::queued_writer::QueuedWriter; use super::stream_ext::StreamExt; use super::Client; +use bytes::BytesMut; use may::io::SplitWriter; use may::{coroutine, go}; use may_waiter::TokenWaiter; @@ -50,8 +51,9 @@ impl MultiplexClient { let listener = go!( coroutine::Builder::new().name("MultiPlexClientListener".to_owned()), move || { + let mut buf = BytesMut::with_capacity(1024 * 32); loop { - let rsp_frame = match Frame::decode_from(&mut r_stream) { + let rsp_frame = match Frame::decode_from(&mut r_stream, &mut buf) { Ok(r) => r, Err(ref e) => { if e.kind() == io::ErrorKind::UnexpectedEof { diff --git a/src/conetty/server.rs b/src/conetty/server.rs index e0a4cb1..091da54 100644 --- a/src/conetty/server.rs +++ b/src/conetty/server.rs @@ -8,6 +8,7 @@ use super::frame::{Frame, RspBuf}; use super::queued_writer::QueuedWriter; use crate::Server; +use bytes::BytesMut; use co_managed::Manager; use may::net::{TcpListener, UdpSocket}; #[cfg(unix)] @@ -65,13 +66,14 @@ pub trait UdpServer: Server { // the write half need to be protected by mutex // for that coroutine io obj can't shared safely let sock = Arc::new(Mutex::new(sock)); + let mut body_buf = BytesMut::with_capacity(1024 * 32); loop { // each udp packet should be less than 1024 bytes let (len, addr) = t!(sock1.recv_from(&mut buf)); info!("recv_from: len={:?} addr={:?}", len, addr); // if we failed to deserialize the request frame, just continue - let req = t!(Frame::decode_from(&mut Cursor::new(&buf))); + let req = t!(Frame::decode_from(&mut Cursor::new(&buf), &mut body_buf)); let sock = sock.clone(); let server = server.clone(); // let mutex = mutex.clone(); @@ -117,9 +119,9 @@ pub trait TcpServer: Server { let mut rs = BufReader::new(rs); // the write half of the stream let ws = Arc::new(QueuedWriter::new(stream)); - + let mut buf = BytesMut::with_capacity(1024 * 32); loop { - let req = match Frame::decode_from(&mut rs) { + let req = match Frame::decode_from(&mut rs, &mut buf) { Ok(r) => r, Err(ref e) => { if e.kind() == io::ErrorKind::UnexpectedEof { @@ -182,9 +184,9 @@ pub trait UdsServer: Server { // the write half need to be protected by mutex // for that coroutine io obj can't shared safely let ws = Arc::new(QueuedWriter::new(stream)); - + let mut buf = BytesMut::with_capacity(1024 * 32); loop { - let req = match Frame::decode_from(&mut rs) { + let req = match Frame::decode_from(&mut rs, &mut buf) { Ok(r) => r, Err(ref e) => { if e.kind() == io::ErrorKind::UnexpectedEof { diff --git a/src/conetty/stream_client.rs b/src/conetty/stream_client.rs index 508e530..fd787d2 100644 --- a/src/conetty/stream_client.rs +++ b/src/conetty/stream_client.rs @@ -1,6 +1,8 @@ use std::io::{self, BufReader}; use std::time::Duration; +use bytes::BytesMut; + use super::errors::Error; use super::frame::{Frame, ReqBuf}; use super::stream_ext::StreamExt; @@ -42,10 +44,12 @@ impl StreamClient { // encode the request self.stream.get_mut().write_all(&(req.finish(id)))?; + let mut buf = BytesMut::with_capacity(1024 * 32); + // read the response loop { // deserialize the rsp - let rsp_frame = Frame::decode_from(&mut self.stream) + let rsp_frame = Frame::decode_from(&mut self.stream, &mut buf) .map_err(|e| Error::ClientDeserialize(e.to_string()))?; // discard the rsp that is is not belong to us diff --git a/src/conetty/udp_client.rs b/src/conetty/udp_client.rs index 1117404..c07ba34 100644 --- a/src/conetty/udp_client.rs +++ b/src/conetty/udp_client.rs @@ -5,6 +5,7 @@ use std::time::Duration; use super::errors::Error; use super::frame::{Frame, ReqBuf}; +use bytes::BytesMut; use may::net::UdpSocket; /// Udp Client @@ -52,12 +53,14 @@ impl UdpClient { // send the data to server self.sock.send(&(req.finish(id))).map_err(Error::from)?; + let mut buf = BytesMut::with_capacity(1024 * 32); + // read the response loop { self.sock.recv(&mut self.buf).map_err(Error::from)?; // deserialize the rsp - let rsp_frame = Frame::decode_from(&mut Cursor::new(&self.buf)) + let rsp_frame = Frame::decode_from(&mut Cursor::new(&self.buf), &mut buf) .map_err(|e| Error::ClientDeserialize(e.to_string()))?; // discard the rsp that is is not belong to us