diff --git a/integration_test/src/event_writer_tests.rs b/integration_test/src/event_writer_tests.rs index 3fdf4e098..2902264b0 100644 --- a/integration_test/src/event_writer_tests.rs +++ b/integration_test/src/event_writer_tests.rs @@ -271,10 +271,7 @@ async fn test_write_correctness_while_scaling(writer: &mut EventWriter, factory: i += 1; } // the data should write successfully. - for rx in receivers { - let reply: Result<(), Error> = rx.await.expect("wait for result from oneshot"); - assert!(reply.is_ok()); - } + assert!(writer.flush().await.is_ok()); let segment_name = ScopedSegment { scope: scope_name.clone(), @@ -364,6 +361,9 @@ async fn test_write_correctness_with_routing_key(writer: &mut EventWriter, facto } i += 1; } + + assert!(writer.flush().await.is_ok()); + let first_segment = ScopedSegment { scope: scope_name.clone(), stream: stream_name.clone(), diff --git a/integration_test/src/reader_group_tests.rs b/integration_test/src/reader_group_tests.rs index cd13fd9a1..2c380d612 100644 --- a/integration_test/src/reader_group_tests.rs +++ b/integration_test/src/reader_group_tests.rs @@ -165,10 +165,12 @@ fn test_read_offline_with_offset(client_factory: &ClientFactoryAsync) { event.value.as_slice(), "Corrupted event read" ); - let offset_map = HashMap::from([( + let mut offset_map = HashMap::new(); + + offset_map.insert( slice.meta.scoped_segment.clone(), event.offset_in_segment + EVENT_SIZE as i64 + 8, - )]); + ); // Segment slice is dropped here and it will update the RG state with the offsets. // Now mark the reader offline diff --git a/src/event/writer.rs b/src/event/writer.rs index f51f3171d..0ccdf56bd 100644 --- a/src/event/writer.rs +++ b/src/event/writer.rs @@ -17,7 +17,9 @@ use crate::util::get_random_u128; use pravega_client_channel::{create_channel, ChannelSender}; use pravega_client_shared::{ScopedStream, WriterId}; +use std::collections::VecDeque; use tokio::sync::oneshot; +use tokio::sync::oneshot::error::TryRecvError; use tracing::info_span; use tracing_futures::Instrument; @@ -75,6 +77,7 @@ use tracing_futures::Instrument; pub struct EventWriter { writer_id: WriterId, sender: ChannelSender, + event_handles: VecDeque>>, } impl EventWriter { @@ -93,6 +96,7 @@ impl EventWriter { EventWriter { writer_id, sender: tx, + event_handles: VecDeque::new(), } } @@ -118,10 +122,13 @@ impl EventWriter { pub async fn write_event(&mut self, event: Vec) -> oneshot::Receiver> { let size = event.len(); let (tx, rx) = oneshot::channel(); + let (tx_flush, rx_flush) = oneshot::channel(); let routing_info = RoutingInfo::RoutingKey(None); - if let Some(pending_event) = PendingEvent::with_header(routing_info, event, None, tx) { + if let Some(pending_event) = + PendingEvent::with_header_flush(routing_info, event, None, tx, Some(tx_flush)) + { let append_event = Incoming::AppendEvent(pending_event); - self.writer_event_internal(append_event, size, rx).await + self.writer_event_internal(append_event, size, rx, rx_flush).await } else { rx } @@ -137,10 +144,13 @@ impl EventWriter { ) -> oneshot::Receiver> { let size = event.len(); let (tx, rx) = oneshot::channel(); + let (tx_flush, rx_flush) = oneshot::channel(); let routing_info = RoutingInfo::RoutingKey(Some(routing_key)); - if let Some(pending_event) = PendingEvent::with_header(routing_info, event, None, tx) { + if let Some(pending_event) = + PendingEvent::with_header_flush(routing_info, event, None, tx, Some(tx_flush)) + { let append_event = Incoming::AppendEvent(pending_event); - self.writer_event_internal(append_event, size, rx).await + self.writer_event_internal(append_event, size, rx, rx_flush).await } else { rx } @@ -151,8 +161,14 @@ impl EventWriter { append_event: Incoming, size: usize, rx: oneshot::Receiver>, + rx_flush: oneshot::Receiver>, ) -> oneshot::Receiver> { - if let Err(_e) = self.sender.send((append_event, size)).await { + if let Err(err) = self.clear_initial_complete_events() { + // fail fast upon checking previous write events + let (tx_error, rx_error) = oneshot::channel(); + tx_error.send(Err(err)).expect("send error"); + rx_error + } else if let Err(_e) = self.sender.send((append_event, size)).await { let (tx_error, rx_error) = oneshot::channel(); tx_error .send(Err(Error::InternalFailure { @@ -161,9 +177,48 @@ impl EventWriter { .expect("send error"); rx_error } else { + self.event_handles.push_back(rx_flush); rx } } + + /// Flush data. + /// + /// It will wait until all pending appends have acknowledgment. + pub async fn flush(&mut self) -> Result<(), Error> { + while let Some(receiver) = self.event_handles.pop_front() { + let recv = receiver.await.map_err(|e| Error::InternalFailure { + msg: format!("oneshot error {:?}", e), + })?; + + recv?; + } + Ok(()) + } + + /// Clear initial completed events from flush queue. + fn clear_initial_complete_events(&mut self) -> Result<(), Error> { + while let Some(mut receiver) = self.event_handles.pop_front() { + let try_recv = receiver.try_recv(); + + match try_recv { + Err(TryRecvError::Empty) => { + self.event_handles.push_front(receiver); + break; + } + Err(TryRecvError::Closed) => { + let res = try_recv.map_err(|e| Error::InternalFailure { + msg: format!("Trying to flush a closed channel {:?}", e), + })?; + + return res; + } + Ok(_) => {} + } + } + + Ok(()) + } } impl Drop for EventWriter { diff --git a/src/segment/event.rs b/src/segment/event.rs index 1adbecb06..6576dbc4b 100644 --- a/src/segment/event.rs +++ b/src/segment/event.rs @@ -50,6 +50,7 @@ pub(crate) struct PendingEvent { pub(crate) data: Vec, pub(crate) conditional_offset: Option, pub(crate) oneshot_sender: oneshot::Sender>, + pub(crate) flush_oneshot_sender: Option>>, } impl PendingEvent { @@ -59,6 +60,7 @@ impl PendingEvent { data: Vec, conditional_offset: Option, oneshot_sender: oneshot::Sender>, + flush_oneshot_sender: Option>>, ) -> Option { if data.len() > PendingEvent::MAX_WRITE_SIZE { warn!( @@ -82,19 +84,27 @@ impl PendingEvent { data, conditional_offset, oneshot_sender, + flush_oneshot_sender, }) } } - pub(crate) fn with_header( + pub(crate) fn with_header_flush( routing_info: RoutingInfo, data: Vec, conditional_offset: Option, oneshot_sender: oneshot::Sender>, + flush_oneshot_sender: Option>>, ) -> Option { let cmd = EventCommand { data }; match cmd.write_fields() { - Ok(data) => PendingEvent::new(routing_info, data, conditional_offset, oneshot_sender), + Ok(data) => PendingEvent::new( + routing_info, + data, + conditional_offset, + oneshot_sender, + flush_oneshot_sender, + ), Err(e) => { warn!("failed to serialize event to event command, sending this error back to caller"); oneshot_sender @@ -107,13 +117,38 @@ impl PendingEvent { } } + pub(crate) fn with_header( + routing_info: RoutingInfo, + data: Vec, + conditional_offset: Option, + oneshot_sender: oneshot::Sender>, + ) -> Option { + PendingEvent::with_header_flush(routing_info, data, conditional_offset, oneshot_sender, None) + } + + pub(crate) fn without_header_flush( + routing_info: RoutingInfo, + data: Vec, + conditional_offset: Option, + oneshot_sender: oneshot::Sender>, + flush_oneshot_sender: Option>>, + ) -> Option { + PendingEvent::new( + routing_info, + data, + conditional_offset, + oneshot_sender, + flush_oneshot_sender, + ) + } + pub(crate) fn without_header( routing_info: RoutingInfo, data: Vec, conditional_offset: Option, oneshot_sender: oneshot::Sender>, ) -> Option { - PendingEvent::new(routing_info, data, conditional_offset, oneshot_sender) + PendingEvent::without_header_flush(routing_info, data, conditional_offset, oneshot_sender, None) } pub(crate) fn is_empty(&self) -> bool { diff --git a/src/segment/reactor.rs b/src/segment/reactor.rs index b58d4b5e2..ca34746d4 100644 --- a/src/segment/reactor.rs +++ b/src/segment/reactor.rs @@ -298,7 +298,7 @@ pub(crate) mod test { ) -> oneshot::Receiver> { let (oneshot_sender, oneshot_receiver) = tokio::sync::oneshot::channel(); let routing_info = RoutingInfo::RoutingKey(Some("routing_key".to_string())); - let event = PendingEvent::new(routing_info, vec![1; size], None, oneshot_sender) + let event = PendingEvent::new(routing_info, vec![1; size], None, oneshot_sender, None) .expect("create pending event"); sender.send((Incoming::AppendEvent(event), size)).await.unwrap(); oneshot_receiver diff --git a/src/segment/writer.rs b/src/segment/writer.rs index cb597cdfe..27bc107d8 100644 --- a/src/segment/writer.rs +++ b/src/segment/writer.rs @@ -377,6 +377,14 @@ impl SegmentWriter { acked.event_id ); } + if let Some(flush_sender) = acked.event.flush_oneshot_sender { + if flush_sender.send(Result::Ok(())).is_err() { + info!( + "failed to send ack back to caller using oneshot due to Receiver dropped: event id {:?}", + acked.event_id + ); + } + } // ack up to event id if acked.event_id == event_id { @@ -784,6 +792,7 @@ pub(crate) mod test { vec![1; size], offset, oneshot_sender, + None, ) .expect("create pending event"); sender.send((Incoming::AppendEvent(event), size)).await.unwrap();