-
Notifications
You must be signed in to change notification settings - Fork 151
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
bug: fix occasional ordering issue in result in unsafeOptionalDataVia #1611
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't unsafeOptionalDataVia
keep the order?
I thought the 'unsafe' aspect was the Flow
parameter must be a flow that produces 1 output per input without reordering (and we can't 'safely' express that in the typesystem), but as long as that's the case (as it is here) it should keep order.
Shouldn't we change the implementation rather than the test?
@raboof That's what's I'm wondering, because the current design ,it can not keep order, not sure if that's how @mdedetrich expected it works. |
Let me retract my approval, I didn't pay attention to the change. Indeed the ordering of the elements should not change, @raboof is correct where the unsafe part pertains to the fact that you could provide a Flow that reorders/drops elements and there isn't any check for this but assuming you provide a flow that only transforms elements than ordering should be retained. Funnily enough I haven't experienced this in production, ill have a look at it. |
@mdedetrich Can you give some more context of the current design? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Blocking for now as the ordering is meant to be determinstic, prob the implementation needs to be fixed.
So the context is primarily when using kafka cursors with FlowWithContext/SourceWithContext. This feature lets you provide a flow that only transforms the data portion of the stream while leaving the context part untouched (which is critical when dealing with Kafka and committing cursors). |
Let me update it, I can workout a solution. |
@raboof @mdedetrich @pjfanning I think it should be OK now, it now track the origin order with an index and then merge back with the same index. |
broadcast.out(1) ~> filterUnavailable ~> merge.in(1) | ||
//format: off | ||
s ~> sequence ~> partition.in | ||
partition.out(0).asInstanceOf[Outlet[(Option[FOut], IndexedCtx)]] ~> mergeSequence.in(0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cast here to avoid allocation, it will always be None
because of partition
Do we know if the test runs in PR CI builds? I did a quick check of the test logs and didn't find a match. |
I found |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
|
||
val filterUnavailable = Flow[(Option[SOut], Ctx)].collect { | ||
case (None, ctx) => (Option.empty[FOut], ctx) | ||
case class IndexedCtx(idx: Long, ctx: Ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extra allocation here
Thanks for the review |
refs: #1610
Motivation:
Fix flaky test in unsafeOptionalDataVia
Modification:
mergeSequence