From 46ffedabf0a4eebc17ebc1e2ce489b19f758b366 Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Thu, 4 Jan 2024 09:21:09 +0100 Subject: [PATCH] Use poll_recv_many for receiving messages to publish to the connection --- Cargo.toml | 2 ++ async-nats/src/lib.rs | 24 ++++++++++++++++++++---- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b8e7b289a..f2e06ae54 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,3 +8,5 @@ members = [ "nats-server" ] +[patch.crates-io] +tokio = { git = "https://github.com/tokio-rs/tokio.git" } diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index d1ce061c1..85e9cc015 100644 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -443,6 +443,7 @@ impl ConnectionHandler { struct ProcessFut<'a> { handler: &'a mut ConnectionHandler, receiver: &'a mut mpsc::Receiver, + recv_buf: &'a mut Vec, } enum ExitReason { @@ -451,6 +452,8 @@ impl ConnectionHandler { } impl<'a> ProcessFut<'a> { + const RECV_CHUNK_SIZE: usize = 16; + #[cold] fn ping(&mut self) -> Poll { self.handler.pending_pings += 1; @@ -516,13 +519,24 @@ impl ConnectionHandler { let mut made_progress = true; loop { while !self.handler.connection.is_write_buf_full() { - match self.receiver.poll_recv(cx) { + debug_assert!(self.recv_buf.is_empty()); + + let Self { + recv_buf, + handler, + receiver, + } = &mut *self; + match receiver.poll_recv_many(cx, recv_buf, Self::RECV_CHUNK_SIZE) { Poll::Pending => break, - Poll::Ready(Some(cmd)) => { + Poll::Ready(1..) => { made_progress = true; - self.handler.handle_command(cmd); + + for cmd in recv_buf.drain(..) { + handler.handle_command(cmd); + } } - Poll::Ready(None) => return Poll::Ready(ExitReason::Closed), + // TODO: replace `_` with `0` after bumping MSRV to 1.75 + Poll::Ready(_) => return Poll::Ready(ExitReason::Closed), } } @@ -575,10 +589,12 @@ impl ConnectionHandler { } } + let mut recv_buf = Vec::with_capacity(ProcessFut::RECV_CHUNK_SIZE); loop { let process = ProcessFut { handler: self, receiver, + recv_buf: &mut recv_buf, }; match process.await { ExitReason::Disconnected(err) => {