Skip to content

Commit

Permalink
📝 use BytesMut for frame parse
Browse files Browse the repository at this point in the history
  • Loading branch information
Xudong-Huang committed Mar 1, 2024
1 parent 4cbaf7e commit 857fe74
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 21 deletions.
4 changes: 1 addition & 3 deletions examples/throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,12 @@ fn main() {
Ok(n) => assert_eq!(n, j + 1),
}
}
// println!("thread done, id={_i}");
});
vec.push(h);
}

for (_i, h) in vec.into_iter().enumerate() {
for h in vec {
h.join().unwrap();
// println!("thread {_i} joined");
}

let dur = now.elapsed();
Expand Down
28 changes: 18 additions & 10 deletions src/conetty/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::io::{self, Cursor, ErrorKind, Read, Write};

use crate::{Error, WireError};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use bytes::{BufMut, Bytes, BytesMut};

// Frame layout
// id(u64) + len(u64) + payload([u8; len])
Expand All @@ -22,12 +23,12 @@ pub struct Frame {
/// frame id, req and rsp has the same id
pub id: u64,
/// payload data
data: Vec<u8>,
data: Bytes,
}

impl Frame {
/// decode a frame from the reader
pub fn decode_from<R: Read>(r: &mut R) -> io::Result<Self> {
pub fn decode_from<R: Read>(r: &mut R, buf: &mut BytesMut) -> io::Result<Self> {
let id = r.read_u64::<BigEndian>()?;
info!("decode id = {:?}", id);

Expand All @@ -40,14 +41,21 @@ impl Frame {
return Err(io::Error::new(ErrorKind::InvalidInput, s));
}

let mut data = vec![0; len as usize];
r.read_exact(&mut data[16..])?;
let buf_len = len as usize;

// blow can be skipped, we don't need them in the buffer
let mut cursor = Cursor::new(data);
cursor.write_u64::<BigEndian>(id).unwrap();
cursor.write_u64::<BigEndian>(len - 16).unwrap();
let data = cursor.into_inner();
buf.reserve(len as usize);
let data: &mut [u8] = unsafe { std::mem::transmute(buf.chunk_mut()) };

r.read_exact(&mut data[16..buf_len])?;
unsafe { buf.advance_mut(buf_len) };
let mut data = buf.split_to(buf_len);

unsafe { data.set_len(0) };
data.put_u64(id);
data.put_u64(len - 16);
unsafe { data.set_len(buf_len) };

let data = data.freeze();

Ok(Frame { id, data })
}
Expand Down Expand Up @@ -75,7 +83,7 @@ impl Frame {
pub fn decode_rsp(&self) -> Result<&[u8], Error> {
use Error::*;

let mut r = Cursor::new(&self.data);
let mut r = Cursor::new(&self.data[..]);
// skip the frame head
r.set_position(16);

Expand Down
4 changes: 3 additions & 1 deletion src/conetty/multiplex_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use super::queued_writer::QueuedWriter;
use super::stream_ext::StreamExt;
use super::Client;

use bytes::BytesMut;
use may::io::SplitWriter;
use may::{coroutine, go};
use may_waiter::TokenWaiter;
Expand Down Expand Up @@ -50,8 +51,9 @@ impl<S: StreamExt> MultiplexClient<S> {
let listener = go!(
coroutine::Builder::new().name("MultiPlexClientListener".to_owned()),
move || {
let mut buf = BytesMut::with_capacity(1024 * 32);
loop {
let rsp_frame = match Frame::decode_from(&mut r_stream) {
let rsp_frame = match Frame::decode_from(&mut r_stream, &mut buf) {
Ok(r) => r,
Err(ref e) => {
if e.kind() == io::ErrorKind::UnexpectedEof {
Expand Down
12 changes: 7 additions & 5 deletions src/conetty/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use super::frame::{Frame, RspBuf};
use super::queued_writer::QueuedWriter;
use crate::Server;

use bytes::BytesMut;
use co_managed::Manager;
use may::net::{TcpListener, UdpSocket};
#[cfg(unix)]
Expand Down Expand Up @@ -65,13 +66,14 @@ pub trait UdpServer: Server {
// the write half need to be protected by mutex
// for that coroutine io obj can't shared safely
let sock = Arc::new(Mutex::new(sock));
let mut body_buf = BytesMut::with_capacity(1024 * 32);
loop {
// each udp packet should be less than 1024 bytes
let (len, addr) = t!(sock1.recv_from(&mut buf));
info!("recv_from: len={:?} addr={:?}", len, addr);

// if we failed to deserialize the request frame, just continue
let req = t!(Frame::decode_from(&mut Cursor::new(&buf)));
let req = t!(Frame::decode_from(&mut Cursor::new(&buf), &mut body_buf));
let sock = sock.clone();
let server = server.clone();
// let mutex = mutex.clone();
Expand Down Expand Up @@ -117,9 +119,9 @@ pub trait TcpServer: Server {
let mut rs = BufReader::new(rs);
// the write half of the stream
let ws = Arc::new(QueuedWriter::new(stream));

let mut buf = BytesMut::with_capacity(1024 * 32);
loop {
let req = match Frame::decode_from(&mut rs) {
let req = match Frame::decode_from(&mut rs, &mut buf) {
Ok(r) => r,
Err(ref e) => {
if e.kind() == io::ErrorKind::UnexpectedEof {
Expand Down Expand Up @@ -182,9 +184,9 @@ pub trait UdsServer: Server {
// the write half need to be protected by mutex
// for that coroutine io obj can't shared safely
let ws = Arc::new(QueuedWriter::new(stream));

let mut buf = BytesMut::with_capacity(1024 * 32);
loop {
let req = match Frame::decode_from(&mut rs) {
let req = match Frame::decode_from(&mut rs, &mut buf) {
Ok(r) => r,
Err(ref e) => {
if e.kind() == io::ErrorKind::UnexpectedEof {
Expand Down
6 changes: 5 additions & 1 deletion src/conetty/stream_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::io::{self, BufReader};
use std::time::Duration;

use bytes::BytesMut;

use super::errors::Error;
use super::frame::{Frame, ReqBuf};
use super::stream_ext::StreamExt;
Expand Down Expand Up @@ -42,10 +44,12 @@ impl<S: StreamExt> StreamClient<S> {
// encode the request
self.stream.get_mut().write_all(&(req.finish(id)))?;

let mut buf = BytesMut::with_capacity(1024 * 32);

// read the response
loop {
// deserialize the rsp
let rsp_frame = Frame::decode_from(&mut self.stream)
let rsp_frame = Frame::decode_from(&mut self.stream, &mut buf)
.map_err(|e| Error::ClientDeserialize(e.to_string()))?;

// discard the rsp that is is not belong to us
Expand Down
5 changes: 4 additions & 1 deletion src/conetty/udp_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::time::Duration;
use super::errors::Error;
use super::frame::{Frame, ReqBuf};

use bytes::BytesMut;
use may::net::UdpSocket;

/// Udp Client
Expand Down Expand Up @@ -52,12 +53,14 @@ impl UdpClient {
// send the data to server
self.sock.send(&(req.finish(id))).map_err(Error::from)?;

let mut buf = BytesMut::with_capacity(1024 * 32);

// read the response
loop {
self.sock.recv(&mut self.buf).map_err(Error::from)?;

// deserialize the rsp
let rsp_frame = Frame::decode_from(&mut Cursor::new(&self.buf))
let rsp_frame = Frame::decode_from(&mut Cursor::new(&self.buf), &mut buf)
.map_err(|e| Error::ClientDeserialize(e.to_string()))?;

// discard the rsp that is is not belong to us
Expand Down

0 comments on commit 857fe74

Please sign in to comment.