-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Issue 340: Fix event writer implementation and add flush #341
Conversation
8dd54dc
to
1456b1e
Compare
Codecov Report
@@ Coverage Diff @@
## master #341 +/- ##
==========================================
+ Coverage 67.43% 74.86% +7.42%
==========================================
Files 26 46 +20
Lines 6243 11671 +5428
==========================================
+ Hits 4210 8737 +4527
- Misses 2033 2934 +901
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The changes are looking good. I have a couple of comments/questions.
src/event/writer.rs
Outdated
if let Some(pending_event) = PendingEvent::with_header(routing_info, event, None, tx) { | ||
let append_event = Incoming::AppendEvent(pending_event); | ||
self.writer_event_internal(append_event, size, rx).await | ||
let write_event = self.writer_event_internal(append_event, size, rx).await; | ||
if let Some(pending_event_flush) = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we handle the else part here?
similar comment for the method above...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a panic statement
src/event/writer.rs
Outdated
let flush_rec = self.writer_event_internal(append_event_flush, 0, rx_flush).await; | ||
self.event_handles.push_back(flush_rec); | ||
} else { | ||
panic!("Failed to track event in flush."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tkaitchuck is there a better way to solve this ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.expect()
src/event/writer.rs
Outdated
if let Some(pending_event) = PendingEvent::with_header(routing_info, event, None, tx) { | ||
let append_event = Incoming::AppendEvent(pending_event); | ||
self.writer_event_internal(append_event, size, rx).await | ||
let write_event = self.writer_event_internal(append_event, size, rx).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of calling writer_event_internal here, it might be better to modify it to have a single call which takes two rx
es. (IE embedding both tx
es into the same PendingEvent.)
This would avoid a bug which is currently in this code which is that it is passing None
as the routing key for both the rx_flush
and the rx
which implies that it is legal for these to be sent to different segments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added an additional optional Sender within PendingEvent, and write_event_internal now is only called once to track flush activity as well as write. This should eliminate our concerns dealing with different routing keys, and different segments.
src/event/writer.rs
Outdated
let flush_rec = self.writer_event_internal(append_event_flush, 0, rx_flush).await; | ||
self.event_handles.push_back(flush_rec); | ||
} else { | ||
panic!("Failed to track event in flush."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.expect()
src/event/writer.rs
Outdated
let routing_info = RoutingInfo::RoutingKey(Some(routing_key)); | ||
let routing_info_flush = RoutingInfo::RoutingKey(None); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will route the flush event differently from the event it is supposed to be following. This could result in flush returning even when the data has not yet been written. Please add a test that covers this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once we embedd the tx_flush within PendingEvent, we wont need to have a separate Routing Key.
Signed-off-by: dellThejas <[email protected]>
Signed-off-by: dellThejas <[email protected]>
Signed-off-by: dellThejas <[email protected]>
Signed-off-by: dellThejas <[email protected]>
Signed-off-by: dellThejas <[email protected]>
Signed-off-by: dellThejas <[email protected]>
Signed-off-by: dellThejas <[email protected]>
Signed-off-by: dellThejas <[email protected]>
Signed-off-by: dellThejas <[email protected]>
4ca2e97
to
61c7df3
Compare
src/event/writer.rs
Outdated
/// ``` | ||
pub async fn flush(&mut self) -> Result<(), Error> { | ||
while self.event_handles.front().is_some() { | ||
let mut receiver = self.event_handles.swap_remove_front(0).expect("get first handle"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think pop_front()
would be preferable to swap_remove_front(0)
here.
This could also be combined with the conditional as in:
while let Some(receiver) = self.event_handles.front()
To be more efficient, clear, and concise.
src/event/writer.rs
Outdated
/// It will wait until all pending appends have acknowledgment. | ||
/// | ||
/// # Examples | ||
/// ```ignore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove the ignore. This example is broken.
src/event/writer.rs
Outdated
/// ```ignore | ||
/// let mut byte_writer = client_factory.create_byte_writer(segment).await; | ||
/// let payload = vec![0; 8]; | ||
/// let size = event_writer.write_event(&payload).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A size is not returned here.
src/event/writer.rs
Outdated
_ => { | ||
try_recv.map_err(|e| Error::InternalFailure { | ||
msg: format!("oneshot error {:?}", e), | ||
})??; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there an await missing here?
It looks like this method will just return without actually waiting on anything.
Also instead of _
it's probably worth explicitly having an Ok
case as this branch is currently handling both all unlisted errors and the success case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Issue here was that I wasnt sending anything to tx_flush sender. Now, I am so we wont have this issue.
@@ -161,9 +172,39 @@ impl EventWriter { | |||
.expect("send error"); | |||
rx_error | |||
} else { | |||
self.event_handles.push_back(rx_flush); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to support the case where the caller never calls flush without leaking memory. Currently the flush call removes items from event_handles
but if it is not called this code will OOM.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to periodically call flush to clear this? Or upon every write, call flush on all previous writes? Or clean up event_handles while doing every write? (It would need to go through every write call to check if its processed or not)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. On a write call, clear out any completed ones stopping at the first uncompleted one.
@@ -50,6 +50,7 @@ pub(crate) struct PendingEvent { | |||
pub(crate) data: Vec<u8>, | |||
pub(crate) conditional_offset: Option<i64>, | |||
pub(crate) oneshot_sender: oneshot::Sender<Result<(), Error>>, | |||
pub(crate) flush_oneshot_sender: Option<oneshot::Sender<Result<(), Error>>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flush_oneshot_sender is never used anywhere; we would need to invoke a send() with the write status on it.
Signed-off-by: dellThejas <[email protected]>
Signed-off-by: dellThejas <[email protected]>
src/event/writer.rs
Outdated
/// It will wait until all pending appends have acknowledgment. | ||
/// ``` | ||
pub async fn flush(&mut self) -> Result<(), Error> { | ||
self.clear_initial_complete_events(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this required here ?
src/event/writer.rs
Outdated
self.event_handles.push_front(receiver); | ||
break; | ||
} | ||
Err(TryRecvError::Closed) => {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should return an error here and fail fast.
src/event/writer.rs
Outdated
break; | ||
} | ||
Err(TryRecvError::Closed) => {} | ||
_ => {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please make this:
Ok(_) => {}
That way if a new error value is ever added to try_receive
we'll get a compile error rather than being unaware of it and simply skipping over the error at runtime.
src/segment/writer.rs
Outdated
@@ -377,6 +377,14 @@ impl SegmentWriter { | |||
acked.event_id | |||
); | |||
} | |||
if let Some(flush_sender) = acked.event.flush_oneshot_sender { | |||
if flush_sender.send(Result::Ok(())).is_err() { | |||
trace!( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably be info.
Signed-off-by: dellThejas <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Signed-off-by: dellThejas [email protected]
Change log description
Added flush for event writer
Updated implementation of Event Writer
Purpose of the change
#340
What the code does
Flush implementation added to event writer similar to Byte writer
Also, event writer will not be return the receiver anymore. This implementation helps setup the flush, and makes the write operation easier to implement for users.
How to verify it
ran existing tests