Ideal design of FLIP 27 integration
The Source is a factory class that creates instances of the concepts below.
public class FlinkPravegaSource<T> implements Source<T, PravegaSplit, Checkpoint>, ResultTypeQueryable<T> {}
A Split represents a Pravega segment with begin and end read offsets.
public class PravegaSplit implements SourceSplit, Serializable {
private final long segmentId;
private final long beginOffset;
private final long endOffset;
}
The SplitEnumerator is a single instance on the Flink JobManager. It is the "brain" of the source: it discovers splits and assigns them to readers.
public class PravegaSplitEnumerator implements SplitEnumerator<PravegaSplit, List<PravegaSplit>> {}
The enumerator checkpoint stores all pending (unassigned or not yet checkpointed) splits.
The handleSourceEvent(int subtaskId, SourceEvent sourceEvent) method receives a RequestSplitEvent from a source reader. It first checks the current split pool for an available split. If none is available, it queries the Pravega controller for successor segments; if any exist, it creates new splits for them and assigns them to the requesting subtask.[1]
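To make the request path concrete, here is a minimal plain-Java sketch of the logic just described. It deliberately avoids Flink and Pravega dependencies: Split stands in for PravegaSplit, the successorLookup function is a hypothetical stand-in for the controller query, and starting successor splits at offset 0 with an unbounded end offset is an assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.function.LongFunction;

// Plain-Java sketch of the enumerator's request path (no Flink or Pravega dependencies).
public class EnumeratorRequestSketch {
    // Hypothetical stand-in for PravegaSplit.
    record Split(long segmentId, long beginOffset, long endOffset) {}

    // Splits waiting to be assigned.
    private final Deque<Split> pool = new ArrayDeque<>();
    // Hypothetical stand-in for a controller query: sealed segment ID -> successor segment IDs.
    private final LongFunction<List<Long>> successorLookup;
    // Which splits each subtask has been given so far.
    private final Map<Integer, List<Split>> assignments = new HashMap<>();

    EnumeratorRequestSketch(List<Split> initialSplits, LongFunction<List<Long>> successorLookup) {
        pool.addAll(initialSplits);
        this.successorLookup = successorLookup;
    }

    // Models handleSourceEvent for a RequestSplitEvent that may carry a sealed segment ID.
    List<Split> handleRequest(int subtaskId, OptionalLong sealedSegmentId) {
        List<Split> granted = new ArrayList<>();
        if (!pool.isEmpty()) {
            // 1. Serve from the existing pool first.
            granted.add(pool.poll());
        } else if (sealedSegmentId.isPresent()) {
            // 2. Pool is empty: ask the "controller" for successors and create unbounded splits.
            for (long successor : successorLookup.apply(sealedSegmentId.getAsLong())) {
                granted.add(new Split(successor, 0L, Long.MAX_VALUE));
            }
        }
        assignments.computeIfAbsent(subtaskId, id -> new ArrayList<>()).addAll(granted);
        return granted;
    }

    List<Split> assignedTo(int subtaskId) {
        return assignments.getOrDefault(subtaskId, List.of());
    }
}
```

The sketch ignores merges: a successor segment with several predecessors should probably be handed out only once, after all of its predecessors are sealed and fully read, to preserve Pravega's ordering guarantees. As written, each sealed predecessor would create the successor split again.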
The addSplitsBack(List<PravegaSplit> splits, int subtaskId) method is called during regional failover: splits that were assigned to a failed reader after the last successful checkpoint are handed back to the enumerator. We will reassign them to the reader task.
The addReader(int subtaskId) method will do nothing for now. In the future, to support scaling, it may check for parallelism updates and create or delete readers accordingly.
The snapshotState() method records the enumerator state, which consists of all unassigned splits.
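The failover and checkpoint methods can be sketched the same way. This is a hypothetical plain-Java model, not Flink's SplitEnumerator: it assumes returned splits are kept for the subtask that owned them and handed back on that reader's next request (since addReader does nothing), and that the checkpoint is a copy of every split not currently owned by a reader.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the enumerator state around failover and checkpoints.
public class EnumeratorStateSketch {
    // Hypothetical stand-in for PravegaSplit.
    record Split(long segmentId, long beginOffset, long endOffset) {}

    // Splits not yet assigned to any reader.
    private final List<Split> pool = new ArrayList<>();
    // Splits handed back by a failed reader, held until that subtask asks again.
    private final Map<Integer, List<Split>> pendingBySubtask = new HashMap<>();

    // Restoring from a checkpoint puts every saved split back into the pool.
    EnumeratorStateSketch(List<Split> restoredState) {
        pool.addAll(restoredState);
    }

    // Models addSplitsBack: remember the splits for the subtask that owned them.
    void addSplitsBack(List<Split> splits, int subtaskId) {
        pendingBySubtask.computeIfAbsent(subtaskId, id -> new ArrayList<>()).addAll(splits);
    }

    // E.g. on the restarted reader's next RequestSplitEvent: hand back its returned splits.
    List<Split> takePending(int subtaskId) {
        List<Split> splits = pendingBySubtask.remove(subtaskId);
        return splits == null ? List.of() : splits;
    }

    // Models snapshotState: an immutable copy of every split not currently owned by a reader.
    List<Split> snapshotState() {
        List<Split> state = new ArrayList<>(pool);
        pendingBySubtask.values().forEach(state::addAll);
        return List.copyOf(state);
    }
}
```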
The SourceReader has a default implementation that Flink recommends for multiplexed readers, SingleThreadMultiplexSourceReaderBase, which reads splits with one thread using one SplitReader. With this API, we only need to provide a SplitReader and a RecordEmitter to implement the whole reader, without dealing with complex synchronization with Flink's internal threads.
public class PravegaSourceReader<T>
extends SingleThreadMultiplexSourceReaderBase<EventRead<T>, T, PravegaSplit, PravegaSplitState> {}
PravegaSplitState is a mutable version of PravegaSplit that is updated with the latest read offset after each event. It adds a currentOffset field to the fields of PravegaSplit.
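A minimal sketch of the split/state pair, assuming one split per segment so that the segment ID can double as the split ID (that ID scheme is an assumption). Converting the state back with toSplit() is roughly what Flink's SourceReaderBase needs at checkpoint time through its toSplitType hook; the class names here are stand-ins.

```java
// Plain-Java sketch of PravegaSplit / PravegaSplitState; names mirror the design, types are stand-ins.
public class SplitStateSketch {
    // Immutable split: one segment with a begin and end read offset.
    record Split(long segmentId, long beginOffset, long endOffset) {
        // Assumption: one split per segment, so the segment ID serves as the split ID.
        String splitId() {
            return Long.toString(segmentId);
        }
    }

    // Mutable counterpart that also tracks currentOffset.
    static final class SplitState {
        private final Split split;
        private long currentOffset;

        SplitState(Split split) {
            this.split = split;
            this.currentOffset = split.beginOffset();
        }

        // Called after each emitted event with the offset to resume from.
        void setCurrentOffset(long offset) {
            this.currentOffset = offset;
        }

        long currentOffset() {
            return currentOffset;
        }

        boolean isFinished() {
            return currentOffset >= split.endOffset();
        }

        // At checkpoint time: an immutable split that resumes where reading stopped.
        Split toSplit() {
            return new Split(split.segmentId(), currentOffset, split.endOffset());
        }
    }
}
```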
The SplitReader is backed by a ManualEventStreamReader. Its fetch() method reads one or more EventRead<T> records from the ManualEventStreamReader.
public class PravegaSplitReader<T> implements SplitReader<EventRead<T>, PravegaSplit> {}
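The fetch() behavior can be sketched as a bounded loop over the reader. ManualReader and Event are hypothetical stand-ins for ManualEventStreamReader and EventRead<T>, and returning null for "no event available right now" is an assumption. Flink's real SplitReader.fetch() returns a RecordsWithSplitIds batch rather than a plain list.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of PravegaSplitReader.fetch(); all types here are stand-ins.
public class SplitReaderFetchSketch {
    // Stand-in for EventRead<T>: the event plus the segment and offset it came from.
    record Event<T>(T value, long segmentId, long offset) {}

    // Hypothetical minimal view of ManualEventStreamReader; null means no event right now.
    interface ManualReader<T> {
        Event<T> readNextEvent();
    }

    // Reads up to maxBatch events per call, so the fetcher thread hands over batches.
    static <T> List<Event<T>> fetch(ManualReader<T> reader, int maxBatch) {
        List<Event<T>> batch = new ArrayList<>();
        while (batch.size() < maxBatch) {
            Event<T> event = reader.readNextEvent();
            if (event == null) {
                break; // nothing available: return what we have instead of blocking
            }
            batch.add(event);
        }
        return batch;
    }
}
```

Stopping early when no event is available keeps the single fetcher thread from blocking, so it can pick up split changes or shut down between calls.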
The RecordEmitter is essentially a function that turns an EventRead<T> into a T. We should offer a default implementation that simply extracts the event and updates the offset in PravegaSplitState.[2]
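The default emitter could look roughly like this. The shape follows Flink's RecordEmitter.emitRecord(element, output, splitState), but Consumer<T> stands in for SourceOutput<T> and the Event and SplitState types are simplified stand-ins, so treat it as a sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch of the default Pravega RecordEmitter; types are stand-ins.
public class RecordEmitterSketch {
    // Stand-in for EventRead<T>: payload plus the offset to resume from after it.
    record Event<T>(T value, long segmentId, long offset) {}

    // Stand-in for PravegaSplitState.
    static final class SplitState {
        long currentOffset;

        SplitState(long beginOffset) {
            this.currentOffset = beginOffset;
        }
    }

    // Default behavior: forward the payload downstream, then advance the split's offset.
    static <T> void emitRecord(Event<T> event, Consumer<T> output, SplitState state) {
        output.accept(event.value());
        state.currentOffset = event.offset();
    }
}
```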
[1] A split reader may receive multiple splits. During segment splits and merges, the progress of the split readers will diverge. How should idle segments and out-of-balance segments be handled?
[2] One option is to let users implement their own emitter, especially when they want to index records with Pravega metadata. We have a similar implementation in https://github.com/pravega/flink-connectors/issues/180
The ManualEventStreamReader is a new Pravega API that should be added separately; it will not affect the current EventStreamReader API. This manual reader does not belong to a reader group: it is independent and does not need a state synchronizer to store the reader group state and keep it in sync.
import io.pravega.client.stream.EventRead;
import java.util.List;

// Proposed API; SegmentRange is an illustrative name for the (segmentId, beginOffset, endOffset) triple.
public interface ManualEventStreamReader<T> {
    record SegmentRange(long segmentId, long beginOffset, long endOffset) {}

    // Add a segment to the reader.
    // endOffset can be Long.MAX_VALUE to indicate an unbounded read.
    void addSegment(long segmentId, long beginOffset, long endOffset);

    // List all segments assigned to the reader (possibly not needed).
    List<SegmentRange> getSegments();

    // Read the next event from any of the added segments and return an EventRead, just as the normal reader does.
    // EventRead.Position will contain the last read offset of each segment.
    // A segment ID needs to be added to EventRead to identify which segment the event came from
    // (this info is currently in EventPointer, but private).
    // End of a bounded segment:
    // 1. readNextEvent should remove the segment from the list of segments to read.
    // Handling segment sealing:
    // 1. When a segment is sealed, readNextEvent should return the segment ID and remove the segment from the list of segments to read.
    // 2. It should NOT contact the controller to get successors (only the Enumerator does that).
    // 3. The sealed segment ID will be sent to the Enumerator as a RequestSplitEvent.
    EventRead<T> readNextEvent();
}
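To pin down the intended reader semantics (round-robin reads, dropping bounded segments at their end offset, reporting sealed segments instead of fetching successors), here is a hypothetical in-memory model. It is not a Pravega API: segment contents come from a map, and offsets are event indexes rather than Pravega byte offsets.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Set;

// In-memory model of the proposed ManualEventStreamReader semantics (hypothetical, not a Pravega API).
// Assumption: offsets are event indexes, whereas real Pravega offsets are byte positions.
public class ManualReaderSketch {
    // One read result: an event, or (sealed == true, event == null) a sealed-segment notice.
    record Read(String event, long segmentId, long nextOffset, boolean sealed) {}

    record SegmentRange(long segmentId, long beginOffset, long endOffset) {}

    // Read cursor for one assigned segment.
    private static final class Cursor {
        final SegmentRange range;
        long offset;

        Cursor(SegmentRange range) {
            this.range = range;
            this.offset = range.beginOffset();
        }
    }

    private final Map<Long, List<String>> storage; // segment ID -> events written so far
    private final Set<Long> sealedSegments;         // segments that will receive no more events
    private final Deque<Cursor> active = new ArrayDeque<>();

    ManualReaderSketch(Map<Long, List<String>> storage, Set<Long> sealedSegments) {
        this.storage = storage;
        this.sealedSegments = sealedSegments;
    }

    // endOffset = Long.MAX_VALUE means an unbounded read.
    void addSegment(long segmentId, long beginOffset, long endOffset) {
        active.addLast(new Cursor(new SegmentRange(segmentId, beginOffset, endOffset)));
    }

    List<SegmentRange> getSegments() {
        List<SegmentRange> ranges = new ArrayList<>();
        for (Cursor c : active) {
            ranges.add(c.range);
        }
        return ranges;
    }

    // Round-robin over assigned segments; returns null if no segment has data right now.
    Read readNextEvent() {
        int attempts = active.size();
        for (int i = 0; i < attempts; i++) {
            Cursor c = active.pollFirst();
            long id = c.range.segmentId();
            List<String> events = storage.getOrDefault(id, List.of());
            if (c.offset >= c.range.endOffset()) {
                continue; // end of a bounded segment: drop it
            }
            if (c.offset >= events.size()) {
                if (sealedSegments.contains(id)) {
                    // Sealed: drop it and report the ID; successors are the enumerator's job.
                    return new Read(null, id, c.offset, true);
                }
                active.addLast(c); // no data yet; try the other segments
                continue;
            }
            String event = events.get((int) c.offset);
            c.offset++;
            active.addLast(c);
            return new Read(event, id, c.offset, false);
        }
        return null;
    }
}
```

When readNextEvent() returns a sealed notice, the Flink side would forward that segment ID to the enumerator as a RequestSplitEvent.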
Watermark handling has two options:
Option 1: Pravega does not provide watermark information to Flink. Users determine and handle watermarks with the Flink API.
Option 2: Use Pravega watermarks. These currently rely on reader groups and stream cuts, so they may not fit well with FLIP-27 and might require refactoring.
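For Option 1, and for the idle-segment question in [1], a plain-Java sketch of per-segment watermarks may help. It assumes one bounded-out-of-orderness generator per segment, using the same formula as Flink's bounded-out-of-orderness watermarks (highest timestamp minus the bound minus 1), and combines them by taking the minimum; the class itself is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-segment watermark tracker; not Flink's actual classes.
public class WatermarkSketch {
    private final long maxOutOfOrdernessMillis;
    // Highest event timestamp seen so far in each segment (Long.MIN_VALUE = no events yet).
    private final Map<Long, Long> maxTimestampBySegment = new HashMap<>();

    WatermarkSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    void addSegment(long segmentId) {
        maxTimestampBySegment.putIfAbsent(segmentId, Long.MIN_VALUE);
    }

    void removeSegment(long segmentId) {
        maxTimestampBySegment.remove(segmentId);
    }

    void onEvent(long segmentId, long eventTimestamp) {
        maxTimestampBySegment.merge(segmentId, eventTimestamp, Long::max);
    }

    // Combined watermark: the minimum per-segment watermark, so the slowest segment holds everyone back.
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long maxTs : maxTimestampBySegment.values()) {
            long wm = maxTs == Long.MIN_VALUE ? Long.MIN_VALUE : maxTs - maxOutOfOrdernessMillis - 1;
            min = Math.min(min, wm);
        }
        return min == Long.MAX_VALUE ? Long.MIN_VALUE : min;
    }
}
```

An idle segment pins the combined watermark until it produces an event or is removed, which is the idle-segment problem raised in [1]; Flink's WatermarkStrategy.withIdleness exists to mark such inputs as idle.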