Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 340: Fix event writer implementation and add flush #341

Merged
merged 12 commits into from
Jan 7, 2022
8 changes: 4 additions & 4 deletions integration_test/src/event_writer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,7 @@ async fn test_write_correctness_while_scaling(writer: &mut EventWriter, factory:
i += 1;
}
// the data should write successfully.
for rx in receivers {
let reply: Result<(), Error> = rx.await.expect("wait for result from oneshot");
assert!(reply.is_ok());
}
assert!(writer.flush().await.is_ok());

let segment_name = ScopedSegment {
scope: scope_name.clone(),
Expand Down Expand Up @@ -364,6 +361,9 @@ async fn test_write_correctness_with_routing_key(writer: &mut EventWriter, facto
}
i += 1;
}

assert!(writer.flush().await.is_ok());

let first_segment = ScopedSegment {
scope: scope_name.clone(),
stream: stream_name.clone(),
Expand Down
6 changes: 4 additions & 2 deletions integration_test/src/reader_group_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,12 @@ fn test_read_offline_with_offset(client_factory: &ClientFactoryAsync) {
event.value.as_slice(),
"Corrupted event read"
);
let offset_map = HashMap::from([(
let mut offset_map = HashMap::new();

offset_map.insert(
slice.meta.scoped_segment.clone(),
event.offset_in_segment + EVENT_SIZE as i64 + 8,
)]);
);

// Segment slice is dropped here and it will update the RG state with the offsets.
// Now mark the reader offline
Expand Down
65 changes: 60 additions & 5 deletions src/event/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use crate::util::get_random_u128;
use pravega_client_channel::{create_channel, ChannelSender};
use pravega_client_shared::{ScopedStream, WriterId};

use std::collections::VecDeque;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tracing::info_span;
use tracing_futures::Instrument;

Expand Down Expand Up @@ -75,6 +77,7 @@ use tracing_futures::Instrument;
pub struct EventWriter {
/// Identifier given to this writer when it is constructed.
writer_id: WriterId,
/// Channel on which each `(Incoming, size)` message is sent for processing.
sender: ChannelSender<Incoming>,
/// Flush handles for writes that were handed to `sender` successfully, in
/// submission order. `flush` awaits and removes them; every write first
/// prunes the already-completed ones from the front so the queue does not
/// grow without bound when `flush` is never called.
event_handles: VecDeque<oneshot::Receiver<Result<(), Error>>>,
}

impl EventWriter {
Expand All @@ -93,6 +96,7 @@ impl EventWriter {
EventWriter {
writer_id,
sender: tx,
event_handles: VecDeque::new(),
tkaitchuck marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -118,10 +122,13 @@ impl EventWriter {
pub async fn write_event(&mut self, event: Vec<u8>) -> oneshot::Receiver<Result<(), Error>> {
let size = event.len();
let (tx, rx) = oneshot::channel();
let (tx_flush, rx_flush) = oneshot::channel();
let routing_info = RoutingInfo::RoutingKey(None);
if let Some(pending_event) = PendingEvent::with_header(routing_info, event, None, tx) {
if let Some(pending_event) =
PendingEvent::with_header_flush(routing_info, event, None, tx, Some(tx_flush))
{
let append_event = Incoming::AppendEvent(pending_event);
self.writer_event_internal(append_event, size, rx).await
self.writer_event_internal(append_event, size, rx, rx_flush).await
} else {
rx
}
Expand All @@ -137,10 +144,13 @@ impl EventWriter {
) -> oneshot::Receiver<Result<(), Error>> {
let size = event.len();
let (tx, rx) = oneshot::channel();
let (tx_flush, rx_flush) = oneshot::channel();
let routing_info = RoutingInfo::RoutingKey(Some(routing_key));
if let Some(pending_event) = PendingEvent::with_header(routing_info, event, None, tx) {
if let Some(pending_event) =
PendingEvent::with_header_flush(routing_info, event, None, tx, Some(tx_flush))
{
let append_event = Incoming::AppendEvent(pending_event);
self.writer_event_internal(append_event, size, rx).await
self.writer_event_internal(append_event, size, rx, rx_flush).await
} else {
rx
}
Expand All @@ -151,8 +161,14 @@ impl EventWriter {
append_event: Incoming,
size: usize,
rx: oneshot::Receiver<Result<(), Error>>,
rx_flush: oneshot::Receiver<Result<(), Error>>,
) -> oneshot::Receiver<Result<(), Error>> {
if let Err(_e) = self.sender.send((append_event, size)).await {
if let Err(err) = self.clear_initial_complete_events() {
// fail fast upon checking previous write events
let (tx_error, rx_error) = oneshot::channel();
tx_error.send(Err(err)).expect("send error");
rx_error
} else if let Err(_e) = self.sender.send((append_event, size)).await {
let (tx_error, rx_error) = oneshot::channel();
tx_error
.send(Err(Error::InternalFailure {
Expand All @@ -161,9 +177,48 @@ impl EventWriter {
.expect("send error");
rx_error
} else {
self.event_handles.push_back(rx_flush);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to support the case where the caller never calls flush, without leaking memory. Currently the flush call removes items from event_handles, but if it is never called this code will eventually OOM.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to periodically call flush to clear this? Or, upon every write, call flush on all previous writes? Or clean up event_handles on every write? (It would need to check, on every write call, whether each one has been processed or not.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. On a write call, clear out any completed ones stopping at the first uncompleted one.

rx
}
}

/// Flush data.
///
/// It will wait until all pending appends have acknowledgment.
pub async fn flush(&mut self) -> Result<(), Error> {
while let Some(receiver) = self.event_handles.pop_front() {
let recv = receiver.await.map_err(|e| Error::InternalFailure {
msg: format!("oneshot error {:?}", e),
})?;

recv?;
}
Ok(())
}

/// Clear initial completed events from flush queue.
fn clear_initial_complete_events(&mut self) -> Result<(), Error> {
while let Some(mut receiver) = self.event_handles.pop_front() {
let try_recv = receiver.try_recv();

match try_recv {
Err(TryRecvError::Empty) => {
self.event_handles.push_front(receiver);
break;
}
Err(TryRecvError::Closed) => {
let res = try_recv.map_err(|e| Error::InternalFailure {
msg: format!("Trying to flush a closed channel {:?}", e),
})?;

return res;
}
Ok(_) => {}
}
}

Ok(())
}
}

impl Drop for EventWriter {
Expand Down
41 changes: 38 additions & 3 deletions src/segment/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub(crate) struct PendingEvent {
pub(crate) data: Vec<u8>,
pub(crate) conditional_offset: Option<i64>,
pub(crate) oneshot_sender: oneshot::Sender<Result<(), Error>>,
pub(crate) flush_oneshot_sender: Option<oneshot::Sender<Result<(), Error>>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flush_oneshot_sender is never used anywhere; we would need to invoke a send() with the write status on it.

}

impl PendingEvent {
Expand All @@ -59,6 +60,7 @@ impl PendingEvent {
data: Vec<u8>,
conditional_offset: Option<i64>,
oneshot_sender: oneshot::Sender<Result<(), Error>>,
flush_oneshot_sender: Option<oneshot::Sender<Result<(), Error>>>,
) -> Option<Self> {
if data.len() > PendingEvent::MAX_WRITE_SIZE {
warn!(
Expand All @@ -82,19 +84,27 @@ impl PendingEvent {
data,
conditional_offset,
oneshot_sender,
flush_oneshot_sender,
})
}
}

pub(crate) fn with_header(
pub(crate) fn with_header_flush(
routing_info: RoutingInfo,
data: Vec<u8>,
conditional_offset: Option<i64>,
oneshot_sender: oneshot::Sender<Result<(), Error>>,
flush_oneshot_sender: Option<oneshot::Sender<Result<(), Error>>>,
) -> Option<PendingEvent> {
let cmd = EventCommand { data };
match cmd.write_fields() {
Ok(data) => PendingEvent::new(routing_info, data, conditional_offset, oneshot_sender),
Ok(data) => PendingEvent::new(
routing_info,
data,
conditional_offset,
oneshot_sender,
flush_oneshot_sender,
),
Err(e) => {
warn!("failed to serialize event to event command, sending this error back to caller");
oneshot_sender
Expand All @@ -107,13 +117,38 @@ impl PendingEvent {
}
}

/// Build a header-wrapped pending event that has no flush notification.
///
/// Delegates to `with_header_flush`, passing `None` as the flush sender.
pub(crate) fn with_header(
    routing_info: RoutingInfo,
    data: Vec<u8>,
    conditional_offset: Option<i64>,
    oneshot_sender: oneshot::Sender<Result<(), Error>>,
) -> Option<PendingEvent> {
    let no_flush_sender = None;
    PendingEvent::with_header_flush(
        routing_info,
        data,
        conditional_offset,
        oneshot_sender,
        no_flush_sender,
    )
}

/// Build a pending event from `data` as given (no event header is added here),
/// optionally carrying a flush sender.
///
/// All arguments are forwarded unchanged to `PendingEvent::new`, whose
/// `Option` result is returned as is.
pub(crate) fn without_header_flush(
    routing_info: RoutingInfo,
    data: Vec<u8>,
    conditional_offset: Option<i64>,
    oneshot_sender: oneshot::Sender<Result<(), Error>>,
    flush_oneshot_sender: Option<oneshot::Sender<Result<(), Error>>>,
) -> Option<PendingEvent> {
    let pending = PendingEvent::new(
        routing_info,
        data,
        conditional_offset,
        oneshot_sender,
        flush_oneshot_sender,
    );
    pending
}

pub(crate) fn without_header(
routing_info: RoutingInfo,
data: Vec<u8>,
conditional_offset: Option<i64>,
oneshot_sender: oneshot::Sender<Result<(), Error>>,
) -> Option<PendingEvent> {
PendingEvent::new(routing_info, data, conditional_offset, oneshot_sender)
PendingEvent::without_header_flush(routing_info, data, conditional_offset, oneshot_sender, None)
}

pub(crate) fn is_empty(&self) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion src/segment/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ pub(crate) mod test {
) -> oneshot::Receiver<Result<(), Error>> {
let (oneshot_sender, oneshot_receiver) = tokio::sync::oneshot::channel();
let routing_info = RoutingInfo::RoutingKey(Some("routing_key".to_string()));
let event = PendingEvent::new(routing_info, vec![1; size], None, oneshot_sender)
let event = PendingEvent::new(routing_info, vec![1; size], None, oneshot_sender, None)
.expect("create pending event");
sender.send((Incoming::AppendEvent(event), size)).await.unwrap();
oneshot_receiver
Expand Down
9 changes: 9 additions & 0 deletions src/segment/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,14 @@ impl SegmentWriter {
acked.event_id
);
}
if let Some(flush_sender) = acked.event.flush_oneshot_sender {
if flush_sender.send(Result::Ok(())).is_err() {
info!(
"failed to send ack back to caller using oneshot due to Receiver dropped: event id {:?}",
acked.event_id
);
}
}

// ack up to event id
if acked.event_id == event_id {
Expand Down Expand Up @@ -784,6 +792,7 @@ pub(crate) mod test {
vec![1; size],
offset,
oneshot_sender,
None,
)
.expect("create pending event");
sender.send((Incoming::AppendEvent(event), size)).await.unwrap();
Expand Down