Skip to content

Commit

Permalink
wake
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Yu <[email protected]>
  • Loading branch information
pingyu committed Nov 6, 2023
1 parent aabe8e5 commit 7ba5f55
Showing 1 changed file with 6 additions and 7 deletions.
13 changes: 6 additions & 7 deletions src/pd/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,13 @@ async fn run_tso(
let mut responses = pd_client.tso(request_stream).await?.into_inner();

while let Some(Ok(resp)) = responses.next().await {
let mut pending_requests = pending_requests.lock().await;

// Wake up the sending future blocked by too many pending requests as we are consuming
// some of them here.
if pending_requests.len() == MAX_PENDING_COUNT {
sending_future_waker.wake();
{
let mut pending_requests = pending_requests.lock().await;
allocate_timestamps(&resp, &mut pending_requests)?;
}

allocate_timestamps(&resp, &mut pending_requests)?;
// Wake up the sending future blocked by too many pending requests or locked.
sending_future_waker.wake();
}
// TODO: distinguish between unexpected stream termination and expected end of test
info!("TSO stream terminated");
Expand Down Expand Up @@ -139,6 +137,7 @@ impl Stream for TsoRequestStream {
{
pending_requests
} else {
this.self_waker.register(cx.waker());
return Poll::Pending;
};
let mut requests = Vec::new();
Expand Down

0 comments on commit 7ba5f55

Please sign in to comment.