Skip to content

Commit

Permalink
add flush for event writer
Browse files Browse the repository at this point in the history
Signed-off-by: dellThejas <[email protected]>
  • Loading branch information
dellThejas committed Nov 22, 2021
1 parent 792b81d commit 1456b1e
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 4 deletions.
10 changes: 6 additions & 4 deletions integration_test/src/event_writer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,8 @@ async fn test_write_correctness_while_scaling(writer: &mut EventWriter, factory:
i += 1;
}
// the data should write successfully.
for rx in receivers {
let reply: Result<(), Error> = rx.await.expect("wait for result from oneshot");
assert!(reply.is_ok());
}
writer.flush().await.expect("wait for writer events to flush");
assert!(writer.flush_cleared());

let segment_name = ScopedSegment {
scope: scope_name.clone(),
Expand Down Expand Up @@ -364,6 +362,10 @@ async fn test_write_correctness_with_routing_key(writer: &mut EventWriter, facto
}
i += 1;
}

writer.flush().await.expect("flush events");
assert!(writer.flush_cleared());

let first_segment = ScopedSegment {
scope: scope_name.clone(),
stream: stream_name.clone(),
Expand Down
49 changes: 49 additions & 0 deletions src/event/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ use crate::util::get_random_u128;
use pravega_client_channel::{create_channel, ChannelSender};
use pravega_client_shared::{ScopedStream, WriterId};

use std::collections::VecDeque;
use tokio::sync::oneshot;
use tracing::info_span;
use tracing_futures::Instrument;

// type EventHandle = oneshot::Receiver<Result<(), Error>>;

/// Write events exactly once to a given stream.
///
/// EventWriter spawns a `Reactor` that runs in the background for processing incoming events.
Expand Down Expand Up @@ -75,6 +78,7 @@ use tracing_futures::Instrument;
pub struct EventWriter {
writer_id: WriterId,
sender: ChannelSender<Incoming>,
event_handles: VecDeque<oneshot::Receiver<Result<(), Error>>>,
}

impl EventWriter {
Expand All @@ -93,6 +97,7 @@ impl EventWriter {
EventWriter {
writer_id,
sender: tx,
event_handles: VecDeque::new(),
}
}

Expand All @@ -118,9 +123,18 @@ impl EventWriter {
pub async fn write_event(&mut self, event: Vec<u8>) -> oneshot::Receiver<Result<(), Error>> {
let size = event.len();
let (tx, rx) = oneshot::channel();
let (tx_flush, rx_flush) = oneshot::channel();
let routing_info = RoutingInfo::RoutingKey(None);
let routing_info_flush = RoutingInfo::RoutingKey(None);
if let Some(pending_event) = PendingEvent::with_header(routing_info, event, None, tx) {
let append_event = Incoming::AppendEvent(pending_event);
if let Some(pending_event_flush) =
PendingEvent::with_header(routing_info_flush, Vec::new(), None, tx_flush)
{
let append_event_flush = Incoming::AppendEvent(pending_event_flush);
let flush_rec = self.writer_event_internal(append_event_flush, 0, rx_flush).await;
self.event_handles.push_back(flush_rec);
}
self.writer_event_internal(append_event, size, rx).await
} else {
rx
Expand All @@ -137,9 +151,18 @@ impl EventWriter {
) -> oneshot::Receiver<Result<(), Error>> {
let size = event.len();
let (tx, rx) = oneshot::channel();
let (tx_flush, rx_flush) = oneshot::channel();
let routing_info = RoutingInfo::RoutingKey(Some(routing_key));
let routing_info_flush = RoutingInfo::RoutingKey(None);
if let Some(pending_event) = PendingEvent::with_header(routing_info, event, None, tx) {
let append_event = Incoming::AppendEvent(pending_event);
if let Some(pending_event_flush) =
PendingEvent::with_header(routing_info_flush, Vec::new(), None, tx_flush)
{
let append_event_flush = Incoming::AppendEvent(pending_event_flush);
let flush_rec = self.writer_event_internal(append_event_flush, 0, rx_flush).await;
self.event_handles.push_back(flush_rec);
}
self.writer_event_internal(append_event, size, rx).await
} else {
rx
Expand All @@ -164,6 +187,32 @@ impl EventWriter {
rx
}
}

/// Flush data.
///
/// It will wait until all pending appends have acknowledgment.
///
/// # Examples
/// ```ignore
/// let mut byte_writer = client_factory.create_byte_writer(segment).await;
/// let payload = vec![0; 8];
/// let size = event_writer.write_event(&payload).await;
/// event_writer.flush().await;
/// ```
pub async fn flush(&mut self) -> Result<(), Error> {
while self.event_handles.front().is_some() {
let handle = self.event_handles.pop_front().expect("get first handle");
let event_result = handle.await.map_err(|e| Error::InternalFailure {
msg: format!("oneshot error {:?}", e),
})?;
event_result?;
}
Ok(())
}

pub fn flush_cleared(&self) -> bool {
self.event_handles.is_empty()
}
}

impl Drop for EventWriter {
Expand Down

0 comments on commit 1456b1e

Please sign in to comment.