diff --git a/src/pd/timestamp.rs b/src/pd/timestamp.rs index 5fe16a12..20243990 100644 --- a/src/pd/timestamp.rs +++ b/src/pd/timestamp.rs @@ -106,15 +106,13 @@ async fn run_tso( let _enter = span.enter(); debug!("got response: {:?}", resp); - let mut pending_requests = pending_requests.lock().await; - - // Wake up the sending future blocked by too many pending requests as we are consuming - // some of them here. - if pending_requests.len() == MAX_PENDING_COUNT { - sending_future_waker.wake(); + { + let mut pending_requests = pending_requests.lock().await; + allocate_timestamps(&resp, &mut pending_requests)?; } - allocate_timestamps(&resp, &mut pending_requests)?; + // Wake up the sending future blocked by too many pending requests or locked. + sending_future_waker.wake(); } // TODO: distinguish between unexpected stream termination and expected end of test info!("TSO stream terminated"); @@ -148,6 +146,7 @@ impl Stream for TsoRequestStream { { pending_requests } else { + this.self_waker.register(cx.waker()); return Poll::Pending; }; let mut requests = Vec::new();