Skip to content

Commit

Permalink
clear completed events in beginning of queue
Browse files Browse the repository at this point in the history
Signed-off-by: dellThejas <[email protected]>
  • Loading branch information
dellThejas committed Jan 6, 2022
1 parent 92a712a commit d64601f
Showing 1 changed file with 19 additions and 0 deletions.
19 changes: 19 additions & 0 deletions src/event/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use pravega_client_shared::{ScopedStream, WriterId};

use std::collections::VecDeque;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tracing::info_span;
use tracing_futures::Instrument;

Expand Down Expand Up @@ -171,6 +172,7 @@ impl EventWriter {
.expect("send error");
rx_error
} else {
self.clear_initial_complete_events();
self.event_handles.push_back(rx_flush);
rx
}
Expand All @@ -181,6 +183,7 @@ impl EventWriter {
/// It will wait until all pending appends have acknowledgment.
/// ```
pub async fn flush(&mut self) -> Result<(), Error> {
self.clear_initial_complete_events();
while let Some(receiver) = self.event_handles.pop_front() {
let recv = receiver.await.map_err(|e| Error::InternalFailure {
msg: format!("oneshot error {:?}", e),
Expand All @@ -190,6 +193,22 @@ impl EventWriter {
}
Ok(())
}

/// Clear initial completed events from flush queue.
fn clear_initial_complete_events(&mut self) {
while let Some(mut receiver) = self.event_handles.pop_front() {
let try_recv = receiver.try_recv();

match try_recv {
Err(TryRecvError::Empty) => {
self.event_handles.push_front(receiver);
break;
}
Err(TryRecvError::Closed) => {}
_ => {}
}
}
}
}

impl Drop for EventWriter {
Expand Down

0 comments on commit d64601f

Please sign in to comment.