From 7ba5f5502d723c614c7f50e5b8f32bb7a51ae3eb Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Mon, 6 Nov 2023 19:06:51 +0800 Subject: [PATCH] wake Signed-off-by: Ping Yu --- src/pd/timestamp.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/pd/timestamp.rs b/src/pd/timestamp.rs index ac99e052..a1cc7fbd 100644 --- a/src/pd/timestamp.rs +++ b/src/pd/timestamp.rs @@ -98,15 +98,13 @@ async fn run_tso( let mut responses = pd_client.tso(request_stream).await?.into_inner(); while let Some(Ok(resp)) = responses.next().await { - let mut pending_requests = pending_requests.lock().await; - - // Wake up the sending future blocked by too many pending requests as we are consuming - // some of them here. - if pending_requests.len() == MAX_PENDING_COUNT { - sending_future_waker.wake(); + { + let mut pending_requests = pending_requests.lock().await; + allocate_timestamps(&resp, &mut pending_requests)?; } - allocate_timestamps(&resp, &mut pending_requests)?; + // Wake up the sending future blocked by too many pending requests or locked. + sending_future_waker.wake(); } // TODO: distinguish between unexpected stream termination and expected end of test info!("TSO stream terminated"); @@ -139,6 +137,7 @@ impl Stream for TsoRequestStream { { pending_requests } else { + this.self_waker.register(cx.waker()); return Poll::Pending; }; let mut requests = Vec::new();