-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bugfix RestartFlow issue; #5165 #5181
Conversation
Helps me keep overview on what im working on.
I looked at why they added a delay in cancellation. It seems there is a missing feature which up to this day they havent realised yet, namely the propagation of a cancellation cause or something. Which would enable the RestartFlow to prevent it from conditionally propagating the cancellation or something. So i went with the same Delay stage that they up to this day still have implemented. Please check my implementation, my scala-fu could be better, so i might have missed something subtle that im not aware of. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments - need to make sure you do API approval though: https://getakka.net/community/public-api-changes.html
Looks like the unit tests need to be updated too - lots of NREs |
Thx for the review. Answered some of your remarks. Yup the Units Tests needs work, ignored those for now. My next step is first going through the Source and Sink codebase to port their latest changes. And then im going to look at the UnitTests. |
Ok, im looking at this bit of code in the Tests. While im sure its very handy to verify that a flow wrapped by the RestartFlow is actually restarted. I think it does nothing to verify that a message supplied to the graph of which the RestartFlow stage is a part, is actually Retried without having the entire graph being completed/failed. I could not find the original unit tests in the scala repo btw, i wonder how they test this. |
I guess the easiest way to do it would be to pass a numbered sequence 1-10, set number 5 to cause a crash inside the original flow, and see if [1,4] [6,10] still arrive before the stream completes. |
@Danthar looks like the only specs that are failing right now are the API Approval specs https://getakka.net/community/public-api-changes.html |
Not quite. Since it should keep retrying the original message. So the error on number 5 should be transient, as in, only occur a few times. And then the flow should keep retrying untill it succeeds and end up emitting the entire sequence at the end. I still want to add a test which verifies this. But that will be next week probably before i can get back to this. |
Ah my bad - I thought the failed elements got skipped over eventually. |
Bad news. Playing around with a unit test shows it does not work. Now somehow it seems like the stage is not restarting. I now added a spec which verifies the behavior. There is already something in there which kinda does the same, but its not very readable for the uninitiated. The new spec is alot more readable in terms of whats happening. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@@ -1010,8 +1075,8 @@ public void A_restart_with_backoff_flow_should_restart_on_failure_when_using_onl | |||
flowInProbe.RequestNext("c"); | |||
flowOutProbe.SendNext("d"); | |||
sink.RequestNext("d"); | |||
|
|||
sink.Request(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch
For this PR i looked at what they did in the original bugfix on the JVM repo:
https://github.com/akka/akka/pull/24795/files#diff-ef6a29e6541e6af3e4452f69c790020e85116bad800388ce464dea7383eec9c0
And in whats currently in Master:
https://github.com/akka/akka/blob/master/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala
Im not completely done yet. Still need to go through the Source and Sink implementations. And i want to see if i can add a UnitTest which verifies the behavior we want to fix.
But i wanted to get early feedback, if any.
I did ran into some things. It seems our Akka.Streams port could use some love. Lots of little changes that haven't been ported over the last year.
For example see https://github.com/akka/akka/blob/master/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala#L425
The onDownstreamFinish has gotten a new Exception cause parameter. Which is absent in our codebase.