Implement watermark idleness #17
Comments
Yes, that would definitely be good. What we need for that is callbacks when readers go to zero subscribed segments and when they go again to at least one subscribed segment. The notification (zero subscribed segments) should also come when the reader starts with zero segments. |
I believe this applies only once our sources start emitting events with timestamps (using SourceContext.collectWithTimestamp), which they currently do not; that in turn depends on pravega/pravega#191. |
It is also relevant if users explicitly extract timestamps and data-driven watermarks after the source, like this:
DataStream<Event> stream = env.addSource(new FlinkPravegaReader<>(...));
stream
    .assignTimestampsAndWatermarks(new MyExtractor())
    .keyBy( ... )
    .window(...)
    .apply(...); |
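MyExtractor is not shown in the snippet above. As a rough illustration of what a data-driven extractor with bounded out-of-orderness typically computes, here is a plain-Java sketch with no Flink dependency. The class and method names are hypothetical and are not part of the Flink or Pravega APIs.

```java
// Hypothetical plain-Java stand-in for a watermark assigner; not the Flink API.
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records the event timestamp extracted from an incoming element. */
    void onEvent(long eventTimestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
    }

    /** Returns the current watermark, or Long.MIN_VALUE before the first element. */
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }
}
```

Note that the watermark only moves when onEvent is called. A source that produces no elements therefore holds the watermark in place, which is why idleness matters downstream.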
Opened pravega/pravega#1285 to track the enhancement needed to the reader. |
I'm not sure I follow. What's the difference between a source that has a segment assigned and no data for an unbounded amount of time versus a source that has no segment assigned? Currently, the Pravega code makes no distinction between the two cases. |
@fpj @tkaitchuck it is a nuanced distinction to be sure, but I think there's a good rationale. In the following, I will use the terms idle and active to distinguish between a source that has no segments assigned and one that is actively waiting (maybe indefinitely) for new elements.
The issue boils down to how watermarks are generated and propagated. Streams have an associated watermark assigner, and that assigner has considerable freedom as to when to advance the event-time clock (by emitting a watermark). Typically the assigner uses incoming elements to produce a watermark, perhaps with a bounded out-of-orderness. As far as I know, the system does not automatically advance the event-time clock due to inactivity; unless you use a specialized assigner, the event-time clock will remain at a fixed point in time until a new element arrives.
Watermarks propagate through the distributed system as follows: a given operator instance takes the minimum watermark across all active source instances that it is wired to. If an active source instance never emits a watermark, time will simply not advance downstream of that source. That is by design (however, see FLINK-5018, which intends to add an idle timeout). When a source instance marks itself as idle, it tells downstream operator instances not to consider that source in the min-watermark calculation, thus allowing the clock to advance. In other words, an idle source releases its hold on the event-time clock.
One more thing: if an idle instance were to later become active (presumably by being assigned a segment), any record whose timestamp is older than the current watermark is considered late. Late records are dropped by default (though they may be handled by advanced programs).
Some classes I found useful to survey in preparing this:
|
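The min-watermark rule described in the comment above can be sketched in plain Java. This is a simplified model for discussion, not Flink's actual implementation; the class name and its methods are hypothetical.

```java
import java.util.Arrays;

// Simplified model of how a downstream operator instance might combine
// watermarks from several upstream source instances. Hypothetical names.
final class MinWatermarkSketch {
    private final long[] watermarks;
    private final boolean[] idle;
    private long lastEmitted = Long.MIN_VALUE;

    MinWatermarkSketch(int numInputs) {
        watermarks = new long[numInputs];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        idle = new boolean[numInputs];
    }

    /** Records a watermark received from one input; per-input watermarks never decrease. */
    void onWatermark(int input, long watermark) {
        watermarks[input] = Math.max(watermarks[input], watermark);
    }

    void markIdle(int input) { idle[input] = true; }

    void markActive(int input) { idle[input] = false; }

    /** Minimum over active inputs only; the combined watermark never moves backward. */
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (anyActive && min > lastEmitted) {
            lastEmitted = min;
        }
        return lastEmitted;
    }
}
```

With two inputs, if input 0 reports watermark 100 and input 1 reports nothing, the combined watermark stays at Long.MIN_VALUE. Once input 1 is marked idle, it advances to 100. If input 1 later becomes active and reports 50, the combined watermark remains 100, so in this model records from input 1 with timestamps below 100 would be behind the watermark, which is the lateness effect the comment describes.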
We should only emit an idleness marker if both the reader has no segments AND there are no unassigned segments, because if there are unassigned segments we don't want those excluded from the min-time calculation. But this actually highlights that the problem is more complicated. Even if there are assigned segments, the mere fact that some are unassigned could lead the calculation to falsely conclude that all of the data is accounted for when it is not. So instead we would want to add some other sort of marker that means the reverse of idle when there are unassigned segments. That way, if there are, say, 10 readers and each has one segment, the presence of an 11th segment that nobody is reading from would be taken into account.
This is of course further complicated by the fact that not all of the readers will be emitting the same status concurrently. For example, if one segment were passed from one reader to another, becoming briefly unassigned, this interval might be short enough that it would not be perceptible in the minimum calculation. But if it were not, the time could suddenly jump forward and then back. Alternatively, if we have the readers flagging this property, they might disagree as to when the segment was actually unavailable, resulting in a much larger window during which the minimum could not be safely calculated.
One way to get a consistent snapshot of where all the readers are would be to use the existing ReaderGroup checkpoint mechanism. If every, say, 5 seconds some process invoked another checkpoint with an ID that was a monotonic sequence, each of the readers would eventually emit that CPID; this is guaranteed to represent a coherent snapshot of the stream. This would be great for tracking time if all the segments were assigned. But as they might not be, we don't have a way to know the timestamp for those that are not. If we did have a notion of time, even an application-supplied one, then we could provide this.
We could obviously stick timestamps into the events in our system, but that would only give us write time and not event time. So we could instead support a mechanism where upon CP (or perhaps randomly) the application could supply a timestamp. That could then be stored and used as a point of reference for the unassigned segments. Then we could simply have a method on the reader group to extract the minimum time of the group. The problem I see with this is that it puts a bunch more logic into the Reader itself. Is there a simpler solution to this? Certainly we have an API on reader group to inquire if there are any unassigned segments, or to generate a StreamCut from something, or to inject a marker. But I'm not really sure what a clean API would be. |
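To make the two ideas in the comment above concrete, here is a minimal plain-Java sketch of (a) the stricter idleness rule and (b) a group-wide minimum time that also covers unassigned segments. Everything here is hypothetical: the class, the method names, and the assumption that an application-supplied timestamp has been stored for each unassigned segment. None of it is an existing Pravega API.

```java
import java.util.OptionalLong;
import java.util.stream.LongStream;

// Hypothetical helper; not part of the Pravega ReaderGroup API.
final class ReaderGroupTimeSketch {
    private ReaderGroupTimeSketch() {}

    /** Idle only if this reader holds no segments AND no segment in the group is unassigned. */
    static boolean shouldMarkIdle(int assignedToThisReader, int unassignedInGroup) {
        return assignedToThisReader == 0 && unassignedInGroup == 0;
    }

    /**
     * Minimum over the times reported by readers and the last stored times of
     * unassigned segments (assumed to be application-supplied, e.g. at checkpoint).
     * Empty when there is nothing to take a minimum over.
     */
    static OptionalLong groupMinTime(long[] readerTimes, long[] unassignedSegmentTimes) {
        return LongStream.concat(
                LongStream.of(readerTimes),
                LongStream.of(unassignedSegmentTimes)).min();
    }
}
```

In this sketch an unassigned segment with stored time 80 pulls the group minimum down to 80 even when every reader reports 100 or later, which is the behaviour the comment asks for.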
Tom's comments seem relevant to #41 too. Basically the dynamic assignment of segments to readers complicates watermark handling. I agree that event time cannot advance if there are unassigned segments, i.e. some part of the keyspace is not being consumed by someone. Future segments are a different matter, right? I'm drawn to per-segment watermarks, provided by app code ( |
It feels like there are two different, but related, issues being discussed: The distinction is important because the consequences of the dynamic assignment of segments are present even in the absence of idle sources. As I understand it, emitting an idle marker says that the source has no segment assigned, independent of the state of the rest of the group. There might be unassigned segments, which the idle source might pick up at a later time, but at the moment it has none and it doesn't want the advancing of the clock to be blocked on it unnecessarily. The second point refers to the correctness of the event-time clock in the presence of dynamic assignment of segments. It seems that the main concern is that an unassigned segment might be unaccounted for in the computation that advances the logical clock. This computation depends on each source computing the minimum locally and sending it downstream. I have a couple of questions about this point: Doing per-segment watermarks and using the reader group state to coordinate sounds correct, but it seems heavyweight and it loses the benefit of simply computing a local minimum (operator instance takes the minimum watermark across all active source instances). If we continue computing a local minimum, then it might be fine from a consistency perspective, but possibly at the cost of declaring some events late unnecessarily. |
Thanks for all the good input and thoughts here! If I understood this correctly, to determine if a source is idle, we'd need a consistent point. Can we just say that a FlinkPravegaReader instance checks at checkpoint whether it has any segment (pending)? Checkpoints should come frequently enough to be suitable points for that. Avoiding the problem that Flavio illustrated seems trickier. We would need to exploit the partial order between records or segments to fix that. That partial order depends on who created the records or segments, and when. Actually, hah, here is another cool problem: how do we guarantee per-key order through the entire streaming topology? For some users that is pretty crucial. When a segment moves from one reader to another, its events can overtake events emitted by the reader that previously had the segment. |
Stephan, your last point relates to the lack of total ordering in the topology. My understanding is that event time and the progression of watermarks is the basis for correct, ordered processing; one cannot depend on the physical ordering. I believe this is why certain operators (e.g. CEP) use buffers to reorder elements into event time order. Your point is still valid and points to a real problem, at least in other time domains. |
I think we have at least a partial solution there. Within a reader group, segments that are 'available' can get picked up by any reader at any time. But the only ways for segments to become available are:
It is fairly reasonable to assume that the application "flushed" all the data it is going to emit before shutting down or being declared dead. If the same was true of resuming reading after encountering a checkpoint, then aside from when the stream is scaling, all of the data should be "flushed" from the first reader before any is seen by the second. Perhaps we could inject an automatic checkpoint to handle the split case. Or we could give checkpoints an 'ack' to delay the pickup of the segment. We have a few options. Using checkpoints as a time marker is not a bad idea, but obviously there would still need to be some logic to extract and track the timestamps from events themselves. This probably makes the most sense, but a crazier alternative would be to inject checkpoint markers into the stream at Production time. That would obviously involve creating some new weird API, but it would allow for checkpoints to come directly from the data stream, rather than relying on a master process, and they could have timestamps embedded into them. |
Problem description
Subtasks that are without assigned segments/shards/partitions for an indefinite period should enter an idle state. Otherwise downstream operators may stall waiting for watermark progression. See FLINK-5017.
From SourceFunction.SourceContext:
Problem location
FlinkPravegaReader
Suggestions for an improvement
Call markAsTemporarilyIdle whenever the Pravega reader has no segments assigned.
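A minimal plain-Java sketch of the transition logic this suggestion implies, including the case raised in the comments where a reader starts with zero segments. The tracker class and its callbacks are hypothetical. In a real FlinkPravegaReader the idle callback would presumably call SourceContext.markAsTemporarilyIdle(), and how the reader learns its assigned segment count is an assumption here (pravega/pravega#1285 tracks the reader enhancement).

```java
// Hypothetical helper that turns segment-count reports into idle/active callbacks.
final class SegmentIdlenessTracker {
    private final Runnable onIdle;
    private final Runnable onActive;
    // Start as active so that a first report of zero segments fires onIdle.
    private boolean idle = false;

    SegmentIdlenessTracker(Runnable onIdle, Runnable onActive) {
        this.onIdle = onIdle;
        this.onActive = onActive;
    }

    /** Reports the current number of assigned segments; fires a callback only on transitions. */
    void onAssignedSegments(int count) {
        boolean nowIdle = (count == 0);
        if (nowIdle == idle) {
            return; // no state change, nothing to signal
        }
        idle = nowIdle;
        if (nowIdle) {
            onIdle.run();
        } else {
            onActive.run();
        }
    }

    boolean isIdle() {
        return idle;
    }
}
```

Firing only on transitions avoids repeated idle signals while the reader stays at zero segments. Whether an explicit active callback is needed is a design choice; it is included here only to mirror the two callbacks requested in the comments.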