Skip to content

Commit

Permalink
Optimise driver dispatch loop (#728)
Browse files Browse the repository at this point in the history
## Usage and product changes

We optimise the driver dispatch loop: previously, we could dispatch at
most 1000 serial messages per second, though in practice tokio would
sometimes sleep longer than expected and the query and transaction
latencies increased. This stems from Tokio's approximately millisecond
resolution. By moving the collect-dispatch loop to a dedicated thread,
we can now dispatch messages after microseconds of batching, improving
overall driver performance significantly.

## Implementation

We move the dispatch loop to a dedicated thread, which can sleep for
sub-millisecond resolution. We also make the background runtime a
multi-threaded runtime for scalability.
  • Loading branch information
flyingsilverfin authored Dec 19, 2024
1 parent de3d0ff commit eac383e
Showing 1 changed file with 30 additions and 40 deletions.
70 changes: 30 additions & 40 deletions rust/src/connection/network/transmitter/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::{
future::Future,
pin::Pin,
sync::{Arc, RwLock},
thread::sleep,
time::Duration,
};

Expand Down Expand Up @@ -241,65 +242,54 @@ impl TransactionTransmitter {
on_close: Default::default(),
callback_handler_sink,
};
tokio::spawn(Self::dispatch_loop(
queue_source,
request_sink,
collector.clone(),
on_close_callback_source,
shutdown_signal,
));
tokio::task::spawn_blocking({
let collector = collector.clone();
move || {
Self::dispatch_loop(queue_source, request_sink, collector, on_close_callback_source, shutdown_signal)
}
});
tokio::spawn(Self::listen_loop(response_source, collector, shutdown_sink));
}

async fn dispatch_loop(
fn dispatch_loop(
mut request_source: UnboundedReceiver<(TransactionRequest, Option<ResponseSink<TransactionResponse>>)>,
request_sink: UnboundedSender<transaction::Client>,
mut collector: ResponseCollector,
mut on_close_callback_source: UnboundedReceiver<Box<dyn FnOnce(Option<Error>) + Send + Sync>>,
mut shutdown_signal: UnboundedReceiver<()>,
) {
const MAX_GRPC_MESSAGE_LEN: usize = 1_000_000;
const DISPATCH_INTERVAL: Duration = Duration::from_millis(3);
const DISPATCH_INTERVAL: Duration = Duration::from_micros(50);

let mut request_buffer = TransactionRequestBuffer::default();
let mut next_dispatch = Instant::now() + DISPATCH_INTERVAL;
loop {
select! { biased;
_ = shutdown_signal.recv() => {
if !request_buffer.is_empty() {
request_sink.send(request_buffer.take()).unwrap();
}
break;
if let Ok(_) = shutdown_signal.try_recv() {
if !request_buffer.is_empty() {
request_sink.send(request_buffer.take()).unwrap();
}
_ = sleep_until(next_dispatch) => {
if !request_buffer.is_empty() {
request_sink.send(request_buffer.take()).unwrap();
}
next_dispatch = Instant::now() + DISPATCH_INTERVAL;
}
callback = on_close_callback_source.recv() => {
if let Some(callback) = callback {
collector.on_close.write().unwrap().push(callback)
}
break;
}
if let Ok(callback) = on_close_callback_source.try_recv() {
collector.on_close.write().unwrap().push(callback)
}
// sleep, then take all messages off the request queue and dispatch them
sleep(DISPATCH_INTERVAL);
while let Ok(recv) = request_source.try_recv() {
let (request, callback) = recv;
let request = request.into_proto();
if let Some(callback) = callback {
collector.register(request.req_id.clone().into(), callback);
}
recv = request_source.recv() => {
if let Some((request, callback)) = recv {
let request = request.into_proto();
if let Some(callback) = callback {
collector.register(request.req_id.clone().into(), callback);
}
if request_buffer.len() + request.encoded_len() > MAX_GRPC_MESSAGE_LEN {
request_sink.send(request_buffer.take()).unwrap();
}
request_buffer.push(request);
} else {
break;
}
if request_buffer.len() + request.encoded_len() > MAX_GRPC_MESSAGE_LEN {
request_sink.send(request_buffer.take()).unwrap();
}
request_buffer.push(request);
}
if !request_buffer.is_empty() {
request_sink.send(request_buffer.take()).unwrap();
}
}
}

async fn listen_loop(
mut grpc_source: Streaming<transaction::Server>,
collector: ResponseCollector,
Expand Down

0 comments on commit eac383e

Please sign in to comment.