
Ideal design of FLIP 27 integration

Brian Zhou edited this page Oct 10, 2020 · 6 revisions

Concept mapping

Source

The Source is a factory class that creates instances of the concepts below.

public class FlinkPravegaSource<T> implements Source<T, PravegaSplit, List<PravegaSplit>>, ResultTypeQueryable<T> {}

Split

A Split represents a Pravega segment together with its begin, end, and current read offsets.

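public class PravegaSplit implements SourceSplit, Serializable {
    private final long segmentId;
    private final long beginOffset;
    private final long endOffset;    // upper bound of the read; a maximum value means unbounded
    private long currentOffset;      // not final: advanced as events are emitted
    public PravegaSplit(long segmentId, long beginOffset, long endOffset, long currentOffset) {
        this.segmentId = segmentId; this.beginOffset = beginOffset;
        this.endOffset = endOffset; this.currentOffset = currentOffset;
    }
    @Override public String splitId() { return String.valueOf(segmentId); } // required by SourceSplit
}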
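Flink's Source interface also has to supply a split serializer (getSplitSerializer(), returning a SimpleVersionedSerializer) so that splits can be checkpointed and shipped from the enumerator to the readers. A minimal sketch of one possible byte layout for the four PravegaSplit fields follows, in plain Java without the Flink dependency. SplitData and SplitCodec are hypothetical names, and the fixed 32-byte layout is an assumption, not a decided format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for PravegaSplit with the same four fields.
final class SplitData {
    final long segmentId;
    final long beginOffset;
    final long endOffset;
    final long currentOffset;

    SplitData(long segmentId, long beginOffset, long endOffset, long currentOffset) {
        this.segmentId = segmentId;
        this.beginOffset = beginOffset;
        this.endOffset = endOffset;
        this.currentOffset = currentOffset;
    }
}

// Hypothetical codec: four big-endian longs, 32 bytes per split. Like Flink's
// SimpleVersionedSerializer, the version is not part of the payload and is
// handed back to deserialize() separately.
final class SplitCodec {
    static final int VERSION = 1;

    static byte[] serialize(SplitData split) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(split.segmentId);
            out.writeLong(split.beginOffset);
            out.writeLong(split.endOffset);
            out.writeLong(split.currentOffset);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory stream
        }
        return bytes.toByteArray();
    }

    static SplitData deserialize(int version, byte[] serialized) {
        if (version != VERSION) {
            throw new IllegalArgumentException("Unknown split serializer version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            // Java evaluates constructor arguments left to right, matching the write order.
            return new SplitData(in.readLong(), in.readLong(), in.readLong(), in.readLong());
        } catch (IOException e) {
            throw new UncheckedIOException(e); // e.g. truncated input
        }
    }
}
```

Keeping the version outside the payload lets a later layout (for example one that also records the stream name) coexist with checkpoints written with version 1.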

SplitEnumerator

The SplitEnumerator runs as a single instance on the Flink JobManager. It is the "brain" of the source: it discovers splits and assigns them to readers.

public class PravegaSplitEnumerator implements SplitEnumerator<PravegaSplit, List<PravegaSplit>> {}

The enumerator checkpoint stores all pending (unassigned or not yet checkpointed) splits.

The handleSourceEvent(int subtaskId, SourceEvent sourceEvent) method receives a RequestSplitEvent from a source reader and checks the current split pool for any split available for reading. If none is available, it queries the Pravega controller for successor segments; if any exist, it creates new Splits and assigns them to the requesting subtask.[1]

The addSplitsBack(List<PravegaSplit> splits, int subtaskId) method is called during regional failover recovery to hand back the splits that were assigned to the failed reader since the last successful checkpoint. We will reassign them to a reader task.

The addReader(int subtaskId) method does nothing for now. In the future, to support rescaling, it may react to parallelism updates and create or remove readers accordingly.

The snapshotState() method records the enumerator state, i.e., all unassigned Splits.
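The enumerator behaviour described above can be sketched as a small plain-Java model. EnumeratorModel and SuccessorLookup are hypothetical: splits are reduced to segment ids, the lookup stands in for the Pravega controller query and is assumed to return each successor together with its predecessors, and the assignment is returned to the caller instead of being passed to Flink's SplitEnumeratorContext.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the controller query: successor segment id -> its predecessor ids.
interface SuccessorLookup {
    Map<Long, List<Long>> successorsOf(long sealedSegmentId);
}

// Plain-Java model of the enumerator; a split is reduced to its segment id.
final class EnumeratorModel {
    private final Deque<Long> unassigned = new ArrayDeque<>(); // the split pool
    private final Set<Long> finished = new HashSet<>();        // segments reported as sealed
    private final Set<Long> created = new HashSet<>();         // segments that already have a split
    private final SuccessorLookup controller;

    EnumeratorModel(List<Long> initialSegments, SuccessorLookup controller) {
        this.unassigned.addAll(initialSegments);
        this.created.addAll(initialSegments);
        this.controller = controller;
    }

    // Models handleSourceEvent(subtaskId, RequestSplitEvent). sealedSegmentId is null
    // on a reader's first request. Returns the splits to assign to subtaskId.
    List<Long> onRequestSplit(int subtaskId, Long sealedSegmentId) {
        if (sealedSegmentId != null) {
            finished.add(sealedSegmentId);
            // Create splits only for successors whose predecessors are all finished,
            // and only once, even if several predecessors report the same successor.
            controller.successorsOf(sealedSegmentId).forEach((successor, predecessors) -> {
                if (finished.containsAll(predecessors) && created.add(successor)) {
                    unassigned.add(successor);
                }
            });
        }
        List<Long> assignment = new ArrayList<>();
        Long next = unassigned.poll();
        if (next != null) {
            assignment.add(next);
        }
        return assignment;
    }

    // Models addSplitsBack(splits, subtaskId): splits of a failed reader return to the pool.
    void addSplitsBack(List<Long> splits, int subtaskId) {
        unassigned.addAll(splits);
    }

    // Models snapshotState(): the checkpoint is the list of unassigned splits.
    List<Long> snapshotState() {
        return new ArrayList<>(unassigned);
    }
}
```

Two choices here are assumptions beyond the description: a successor only becomes a split once all of its predecessors have been reported as sealed, so a segment produced by a merge is not read before both inputs are finished; and successors are looked up on every report rather than only when the pool is empty, so they are not missed while the pool still holds other splits.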

SourceReader

For multiplexed readers, Flink provides a recommended SourceReader base implementation, SingleThreadMultiplexSourceReaderBase, which reads all assigned splits in one thread using one SplitReader. With this base class, we only need to provide a SplitReader and a RecordEmitter; we do not have to handle complex synchronization with Flink's internal threads ourselves.

public class PravegaSourceReader<T>
        extends SingleThreadMultiplexSourceReaderBase<EventRead<T>, T, PravegaSplit, PravegaSplit> {}

SplitReader

The SplitReader essentially wraps a ManualEventStreamReader. Its fetch() method reads one or more EventRead<T> from the ManualEventStreamReader.

public class PravegaSplitReader<T> implements SplitReader<EventRead<T>, PravegaSplit> {}
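As a rough illustration of fetch(), the sketch below drains up to a fixed number of events from a stand-in for the manual reader and groups them by split, in the spirit of Flink's RecordsWithSplitIds. ReadResult, FetchBatch and SplitReaderModel are hypothetical; the split id is assumed to be the segment id, and a null event is assumed to mean the segment has finished.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical result of one read on the manual reader: an event from a segment,
// or (event == null) a notice that the segment has no more data to read.
final class ReadResult {
    final long segmentId;
    final String event;

    ReadResult(long segmentId, String event) {
        this.segmentId = segmentId;
        this.event = event;
    }
}

// One fetch() result: events grouped by split id plus the splits that finished.
final class FetchBatch {
    final Map<String, List<String>> eventsBySplit = new LinkedHashMap<>();
    final Set<String> finishedSplits = new LinkedHashSet<>();
}

final class SplitReaderModel {
    private final Supplier<ReadResult> reader; // stands in for readNextEvent(); null means "nothing now"
    private final int maxBatchSize;

    SplitReaderModel(Supplier<ReadResult> reader, int maxBatchSize) {
        this.reader = reader;
        this.maxBatchSize = maxBatchSize;
    }

    // Collects up to maxBatchSize events and stops early once the reader has nothing to return.
    FetchBatch fetch() {
        FetchBatch batch = new FetchBatch();
        int events = 0;
        while (events < maxBatchSize) {
            ReadResult r = reader.get();
            if (r == null) {
                break;
            }
            String splitId = Long.toString(r.segmentId); // assumption: split id = segment id
            if (r.event == null) {
                batch.finishedSplits.add(splitId);
            } else {
                batch.eventsBySplit.computeIfAbsent(splitId, k -> new ArrayList<>()).add(r.event);
                events++;
            }
        }
        return batch;
    }
}
```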

The RecordEmitter is essentially a function that turns an EventRead<T> into a T. We should offer a default implementation that simply extracts the event and updates the current offset in the PravegaSplit.[2]
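A minimal sketch of the default RecordEmitter, assuming each EventRead carries the offset at which the next read from its segment would start. EventWithOffset, SplitState and DefaultEmitter are hypothetical stand-ins, and a java.util.function.Consumer takes the place of Flink's SourceOutput.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for EventRead<T>: the event plus the segment offset
// from which the next read would start.
final class EventWithOffset<T> {
    final T event;
    final long nextOffset;

    EventWithOffset(T event, long nextOffset) {
        this.event = event;
        this.nextOffset = nextOffset;
    }
}

// Hypothetical mutable per-split state (the role PravegaSplit plays as the reader's split state).
final class SplitState {
    final long segmentId;
    long currentOffset;

    SplitState(long segmentId, long currentOffset) {
        this.segmentId = segmentId;
        this.currentOffset = currentOffset;
    }
}

// Default emitter sketch: unwrap the event, pass it downstream, then advance
// the split offset so that a restore resumes after the emitted event.
final class DefaultEmitter<T> {
    void emitRecord(EventWithOffset<T> element, Consumer<T> output, SplitState splitState) {
        output.accept(element.event);
        splitState.currentOffset = element.nextOffset;
    }
}
```

Updating the offset only after the event has been handed downstream means a checkpoint taken between two records should resume right after the last emitted event.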

[1] A split reader may receive multiple Splits. During segment splits and merges, the split readers will progress at different rates. How should idle segments and out-of-balance segments be handled?

[2] One option is to also let users provide their own implementation, especially when they want to index events with Pravega metadata. We have a similar implementation in https://github.com/pravega/flink-connectors/issues/180

Proposed Pravega API changes

ManualEventStreamReader

This new API should be added separately so that it does not affect the current EventStreamReader API. The manual reader does not belong to a reader group: it is independent and does not require a state synchronizer to store the reader group state and keep it in sync.

// Add a segment to the reader
// endOffset can be Long.MAX_VALUE to indicate an unbounded read

void addSegment(SegmentID, beginOffset, endOffset)


// List all segments assigned to the reader (possibly not needed)

List<(SegmentID, beginOffset, endOffset)> getSegments()


// Read the next event from any of the added segments; returns an EventRead object, just like the normal reader
// EventRead.Position will contain the last read offset of each segment
// EventRead.SegmentID needs to be added to identify which segment the event comes from (this information is currently in EventPointer, but private)
// End of a bounded segment
//      1. readNextEvent should remove the segment from the list of segments to read.
// Handling segment sealing
//      1. When a segment is sealed, readNextEvent should return the segment ID and remove the segment from the list of segments to read.
//      2. It should NOT contact the controller to get successors (only the Enumerator does that).
//      3. The sealed segment ID will be sent to the Enumerator in a `RequestSplitEvent`.

EventRead readNextEvent()
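To make the proposed contract concrete, here is a toy in-memory model of ManualEventStreamReader. Everything in it is hypothetical: segment contents are held in a map, offsets are event indices rather than byte offsets, Long.MAX_VALUE marks an unbounded endOffset, and a returned event of null stands for the "segment sealed" result. It follows the rules listed above: a bounded segment is dropped once its range is read, a sealed segment is reported once and dropped, and the reader never looks up successors.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy in-memory model of the proposed ManualEventStreamReader (all names hypothetical).
final class ManualReaderModel {
    // Result of readNextEvent(): an event, or (event == null) a "segment sealed" notice.
    static final class Read {
        final long segmentId;
        final String event;
        final Map<Long, Long> position; // last read offset per segment

        Read(long segmentId, String event, Map<Long, Long> position) {
            this.segmentId = segmentId;
            this.event = event;
            this.position = position;
        }
    }

    private final Map<Long, List<String>> storage; // stands in for the segment store
    private final Set<Long> sealed;                 // segments the store reports as sealed
    private final Deque<long[]> active = new ArrayDeque<>(); // {segmentId, nextOffset, endOffset}
    private final Map<Long, Long> lastRead = new HashMap<>();

    ManualReaderModel(Map<Long, List<String>> storage, Set<Long> sealed) {
        this.storage = storage;
        this.sealed = sealed;
    }

    void addSegment(long segmentId, long beginOffset, long endOffset) {
        active.addLast(new long[] {segmentId, beginOffset, endOffset});
    }

    List<Long> getSegments() {
        List<Long> ids = new ArrayList<>();
        for (long[] s : active) {
            ids.add(s[0]);
        }
        return ids;
    }

    // Visits each added segment at most once, round-robin. Returns null when no segment
    // has data right now. Never asks a controller for successors.
    Read readNextEvent() {
        for (int remaining = active.size(); remaining > 0; remaining--) {
            long[] s = active.pollFirst();
            long id = s[0];
            if (s[1] >= s[2]) {
                continue; // bounded range exhausted: drop the segment
            }
            List<String> data = storage.getOrDefault(id, new ArrayList<>());
            if (s[1] < data.size()) {
                String event = data.get((int) s[1]);
                lastRead.put(id, s[1]);
                s[1]++;
                if (s[1] < s[2]) {
                    active.addLast(s); // more to read; otherwise the bounded segment is done
                }
                return new Read(id, event, new HashMap<>(lastRead));
            }
            if (sealed.contains(id)) {
                return new Read(id, null, new HashMap<>(lastRead)); // report and drop sealed segment
            }
            active.addLast(s); // open segment with no new data yet
        }
        return null;
    }
}
```

Round-robin over segments is only one possible policy; the proposal does not fix how the reader chooses among its segments.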

Watermark / timestamps

Option 1: Pravega does not provide watermark information to Flink. Users determine and handle watermarks with the Flink API.

Option 2: Use Pravega watermarks. Pravega watermarking currently relies on Reader Groups and Stream Cuts, so it may not fit well with FLIP-27 and might need refactoring.
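Under Option 1, a reader that multiplexes several segments would likely need per-segment watermarks, similar to the per-split watermarking FLIP-27 sources can use. The toy model below shows the idea in plain Java: each segment gets a bounded-out-of-orderness watermark (maxTimestamp - delay - 1, the same arithmetic as Flink's WatermarkStrategy.forBoundedOutOfOrderness), the combined watermark is the minimum over non-idle segments, and idle segments are skipped, which is where the idle-segment question in footnote [1] shows up. SegmentWatermarks is a hypothetical name, not a Flink class.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

// Toy model of Option 1: bounded-out-of-orderness watermarks per segment, combined
// by taking the minimum over non-idle segments (names are hypothetical).
final class SegmentWatermarks {
    private final long maxOutOfOrdernessMillis;
    private final Map<Long, Long> maxTimestamp = new HashMap<>();
    private final Set<Long> idle = new HashSet<>();

    SegmentWatermarks(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    // A new segment starts with watermark Long.MIN_VALUE, holding the others back until it sees data.
    void addSegment(long segmentId) {
        maxTimestamp.put(segmentId, Long.MIN_VALUE + maxOutOfOrdernessMillis + 1);
    }

    void removeSegment(long segmentId) {
        maxTimestamp.remove(segmentId);
        idle.remove(segmentId);
    }

    void onEvent(long segmentId, long timestampMillis) {
        maxTimestamp.merge(segmentId, timestampMillis, Long::max);
        idle.remove(segmentId); // an event makes the segment active again
    }

    void markIdle(long segmentId) {
        idle.add(segmentId);
    }

    // Empty when every tracked segment is idle (or none is tracked).
    OptionalLong currentWatermark() {
        OptionalLong result = OptionalLong.empty();
        for (Map.Entry<Long, Long> entry : maxTimestamp.entrySet()) {
            if (idle.contains(entry.getKey())) {
                continue;
            }
            long wm = entry.getValue() - maxOutOfOrdernessMillis - 1;
            if (!result.isPresent() || wm < result.getAsLong()) {
                result = OptionalLong.of(wm);
            }
        }
        return result;
    }
}
```

Without the idle marking, a segment that stops receiving events after a scale-down would hold the combined watermark back indefinitely.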