
PDP 33: Watermarking


Summary

The goal of watermarking is to provide a time bound from the writers to the readers so that readers can identify where they are in the stream. The bound should always be conservative: if the writers' time is well behaved, the mark should indicate to the reader that it has received everything before a timestamp (it may well have received more than this).

This immediately raises the issue of how time is defined and what properties it has. This proposal avoids answering that question by leaving it up to the application to define its notion of elapsed time. This decision was made for two reasons:

  1. Different applications may have different requirements for time.
  2. In many applications, meaningful time is necessarily associated with the data itself.

Overview

The general approach is to add new API calls to enable the writer to provide time information. The controller aggregates this time information and stores it in a special segment associated with the stream. The information stored is ⟨timestamp, streamCut⟩ pairs.

The timestamp in each pair is obtained by taking the minimum of the most recently reported timestamps across all writers. The streamCut is a location in the stream at an event boundary that is at least as far into the stream as the location of the writers when the timestamps were provided.

The readers can read these streamCuts and timestamps from the special segment, and use the ReaderGroupState to coordinate their relative position.

API Changes

Client Factory changes

When an application creates a new writer, it must specify an identifier that enables the controller to track the progress of timestamps for the writer. The writer id is a new parameter, writerId, added to writer creation:

<T> EventStreamWriter<T> createEventWriter(String writerId, String streamName, Serializer<T> s, EventWriterConfig config);

The same holds for transactional writers:

<T> TransactionalEventStreamWriter<T> createTransactionalEventWriter(String writerId, String streamName, Serializer<T> s, EventWriterConfig config);

This is a breaking API change. Alternatively, we could add writerId as an optional parameter and, if it is not supplied, have noteTime (below) throw when invoked.

Event writer API changes

In our public interface we add a new API on EventStreamWriter.

    /**
     * Notes a time that can be seen by readers which read from this stream by
     * {@link EventStreamReader#getCurrentTimeWindow()}. The semantics or meaning of the timestamp
     * is left to the application. Readers may expect timestamps to be monotonic, so this is
     * recommended but not enforced.
     * 
     * There is no requirement to call this method. Never doing so will result in readers invoking
     * {@link EventStreamReader#getCurrentTimeWindow()} receiving a null for both upper and lower times.
     * 
     * Calling this method can be automated by setting
     * {@link EventWriterConfigBuilder#automaticallyNoteTime(boolean)} to true when creating a
     * writer.
     * 
     * @param timestamp a timestamp that represents the current location in the stream.
     */
    void noteTime(long timestamp);

Calling this API is optional. An alternative is that the application configures the writer to invoke the method automatically, passing wall-clock time.
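
As a usage sketch (assuming a ClientFactory instance named clientFactory, a JavaSerializer for the payload, and illustrative stream and writer names), a writer created with an id could note wall-clock time after flushing a batch of events:

    EventStreamWriter<String> writer = clientFactory.createEventWriter(
            "sensor-writer-1", "sensorStream", new JavaSerializer<>(), EventWriterConfig.builder().build());

    writer.writeEvent("routingKey", "reading-42");
    writer.flush();
    // The meaning of the timestamp is application defined; here wall-clock time is used.
    writer.noteTime(System.currentTimeMillis());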

The transactional writer also needs additional support. Unlike the regular writer, it does not continuously emit data; new data is only added at commit time. It therefore makes sense to overload commit() by adding an optional timestamp parameter:

    /**
     * Commits the transaction similar to {@link #commit()}, but also notes an associated timestamp.
     * Similar to {@link EventStreamWriter#noteTime(long)}.
     * 
     * @param timestamp A timestamp associated with this transaction.
     * @throws TxnFailedException The Transaction is no longer in state {@link Status#OPEN}
     */
    void commit(long timestamp) throws TxnFailedException;

The timestamp is passed to the controller as part of the commit. This way a commit simultaneously commits the data and supplies the corresponding timestamp.
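
A minimal sketch of the transactional flow (assuming the factory method above and illustrative names); committing with a timestamp commits the data and supplies the mark in one call:

    TransactionalEventStreamWriter<String> txnWriter = clientFactory.createTransactionalEventWriter(
            "txn-writer-1", "sensorStream", new JavaSerializer<>(), EventWriterConfig.builder().build());

    Transaction<String> txn = txnWriter.beginTxn();
    txn.writeEvent("routingKey", "reading-43");
    // The timestamp is forwarded to the controller as part of the commit.
    txn.commit(System.currentTimeMillis());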

Config changes

The StreamConfiguration contains two new parameters:

  1. A timeout parameter that determines how long the controller needs to wait for a timestamp update from a given writer ID. This timeout value determines how long the controller takes to react to ungraceful halts of a writer (e.g., crashes).
  2. A switch to turn automatic noting on and off. This option is available for applications that have no need to pass their own timestamps.

The new configuration fields are as follows:

    /**
     * The duration for which a timestamp provided via {@link EventStreamWriter#noteTime(long)}
     * remains valid. If a writer does not call {@link EventStreamWriter#noteTime(long)} again
     * within this period, the writer is forgotten.
     * If there are no known writers, readers that call {@link EventStreamReader#getCurrentTimeWindow()}
     * will receive a `null` when they are at the corresponding position in the stream.
     */
    private final long timestampAggregationTimeout;

    /**
     * Automatically invoke {@link EventStreamWriter#noteTime(long)} passing
     * {@link System#currentTimeMillis()} on a regular interval.
     */
    private final boolean automaticallyNoteTime;

Because it may take some time for all the writers to come online, for a new stream we will add an intentional delay before writing the first mark to the marks segment. In the future we may offer an option to configure this time on a per-stream basis.
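
As a hypothetical configuration sketch (assuming both new options are exposed on the StreamConfiguration builder with method names mirroring the fields above):

    StreamConfiguration config = StreamConfiguration.builder()
            .scalingPolicy(ScalingPolicy.fixed(3))
            // Forget a writer that has not noted a time in the last two minutes (assumed builder name).
            .timestampAggregationTimeout(2 * 60 * 1000L)
            // Automatically note System.currentTimeMillis() on a regular interval (assumed builder name).
            .automaticallyNoteTime(true)
            .build();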

Event reader API changes

On the reader side, we would add a new method on the EventStreamReader to obtain the location of the reader in terms of time:

    /**
     * Returns a window which represents the range of time that this reader is currently reading as
     * provided by writers via the {@link EventStreamWriter#noteTime(long)} API.
     * 
     * If no writers were providing timestamps at the current position in the stream `null` will be returned.
     *  
     * @return A TimeWindow which bounds the current location in the stream, or null if one cannot be established.
     */
    TimeWindow getCurrentTimeWindow();

Where TimeWindow is defined as:

/**
 * Represents a time window for the events which are currently being read by a reader.
 * 
 * The upper time bound is a timestamp which is greater than or equal to any that were provided by
 * any writer via the {@link EventStreamWriter#noteTime(long)} API prior to the current location in
 * the stream.
 * 
 * Similarly the lower time bound is a timestamp which is less than or equal to the most recent
 * value provided via the {@link EventStreamWriter#noteTime(long)} API by any writer using that
 * API at the current location in the stream.
 *
 * upperTimeBound will always be greater than or equal to lowerTimeBound.
 */
@Data
public class TimeWindow {
    private final long lowerTimeBound;
    private final long upperTimeBound;
}

Note that the TimeWindow the readers observe can be affected by timestampAggregationTimeout in the config above: the gap between the lower and upper bounds can be wider with a larger timeout. Different applications may choose to be more or less conservative as to when writers are 'alive' and need to be considered.
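
A reader-side usage sketch (reader creation omitted; the null check covers the case where no writers were providing timestamps at the current position, and process() is a hypothetical application callback):

    EventRead<String> event = reader.readNextEvent(1000);
    TimeWindow window = reader.getCurrentTimeWindow();
    if (window != null) {
        // Every writer had noted a time <= upperTimeBound before the data at this position was written.
        process(event.getEvent(), window.getLowerTimeBound(), window.getUpperTimeBound());
    }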

Internal Changes

To support this new API some additional information is needed.

For each event stream we add a metadata segment that contains timestamps (referred to below as the 'marks segment'). It contains records consisting of:

  • A timestamp
  • A set of writer ids
  • A streamCut

To write this data, the controller needs more information from the client and the service. On the Controller interface in the client, we need to add the writerId and the timestamp to a transaction commit call:

     /**
      * Commits a transaction, atomically committing all events to the stream, subject to the
      * ordering guarantees specified in {@link EventStreamWriter}. Will fail with
      * {@link TxnFailedException} if the transaction has already been committed or aborted.
      * 
      * @param stream Stream name
      * @param writerId The writer that is committing the transaction.
      * @param timestamp The timestamp the writer provided for the commit (or null if they did not specify one).
      * @param txId Transaction id
      * @return Void or TxnFailedException
      */
     CompletableFuture<Void> commitTransaction(final Stream stream, final String writerId, final Long timestamp, final UUID txId);

and we need to add a brand new call to propagate the information obtained from a call to noteTime() on EventStreamWriter. That would look like:

    /**
     * Notifies that the specified writer has noted the provided timestamp when it was at
     * lastWrittenPosition.
     * 
     * This is called by writers via {@link EventStreamWriter#noteTime(long)} or
     * {@link Transaction#commit(long)}. The controller should aggregate this information and write
     * it to the stream's marks segment so that it can be read by readers, who will in turn
     * surface this information through the {@link EventStreamReader#getCurrentTimeWindow()} API.
     * 
     * @param writer The name of the writer. (User defined)
     * @param stream The stream the timestamp is associated with.
     * @param timestamp The new timestamp for the writer on the stream.
     * @param lastWrittenPosition The position the writer was at when it noted the time.
     */
    void noteTimestampFromWriter(String writer, Stream stream, long timestamp, Position lastWrittenPosition);

    /**
     * Notifies the controller that the specified writer is shutting down gracefully and no longer
     * needs to be considered when calculating entries for the marks segment. This may not be called
     * in the event that the writer crashes.
     * 
     * @param writerId The name of the writer. (User defined)
     * @param stream The stream the writer was on.
     */
    void writerShutdown(String writerId, Stream stream);

To supply this Position, the EventStreamWriterImpl needs to internally track the offsets for the segments it is writing to. To do this, we piggyback this information onto the acks it is already receiving from the server. This additional information involves a backwards compatible change to the wire protocol to add a currentSegmentWriteOffset to DataAppended:

     @Data
     public static final class DataAppended implements Reply, WireCommand {
        final WireCommandType type = WireCommandType.DATA_APPENDED;
        final UUID writerId;
        final long eventNumber;
        final long previousEventNumber;
        final long currentSegmentWriteOffset;
//...

A similar field would be required for SegmentsMerged:

     @Data
     public static final class SegmentsMerged implements Reply, WireCommand {
        final WireCommandType type = WireCommandType.SEGMENTS_MERGED;
        final long requestId;
        final String target;
        final String source;
        final long newTargetWriteOffset;
//...

This increases the size of each of these messages by 8 bytes. It is a compatible change, however, because the absence of the field can be detected.

Server changes

Besides returning the current offsets on merge and ack, there are no changes required. In the event that a SegmentsMerged reply is lost and the client reconnects to find that the segment is already merged, the client can synthesize a valid, conservative offset by calling getSegmentLength().

Client changes

Internally the writer needs to collect the offsets returned in the acks it is receiving from the server. When noteTime is invoked, the timestamp provided, along with the latest offsets for the segments it is writing to, is sent to the controller's noteTimestampFromWriter method.
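
A rough sketch of this bookkeeping (names are illustrative, not the actual EventStreamWriterImpl internals; positionFromOffsets is a hypothetical helper):

    // Latest acked write offset per segment, updated from the DataAppended replies.
    private final Map<Segment, Long> latestAckedOffsets = new ConcurrentHashMap<>();

    void onDataAppended(Segment segment, long currentSegmentWriteOffset) {
        latestAckedOffsets.put(segment, currentSegmentWriteOffset);
    }

    @Override
    public void noteTime(long timestamp) {
        // Forward the timestamp together with a snapshot of the latest offsets to the controller.
        Position position = positionFromOffsets(new HashMap<>(latestAckedOffsets));
        controller.noteTimestampFromWriter(writerId, stream, timestamp, position);
    }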

For a transactional writer, it just passes the timestamp specified on commit to the controller.

Each reader needs to read from the mark segment and compare its position to the streamcuts. In the reader group state, a new map needs to be added from reader to timestamp. Every time a reader advances beyond a streamCut (counting as its own any unassigned segments) it updates its entry in the reader group state providing the timestamp it just passed. If a reader loses its last segment it removes itself from the map.

When the getCurrentTimeWindow method is called on the reader or the reader group state, the minimum time in the reader group state's map for each stream is returned, i.e., the result is a TimeWindow whose value is the greatest lower bound of the writer times.
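
A rough sketch of the reader-group-state bookkeeping described above (names are illustrative, not the actual ReaderGroupState API):

    // Per-stream map from readerId to the timestamp of the last mark that reader has passed.
    Map<String, Long> timePassedByReader = new HashMap<>();

    void onReaderPassedMark(String readerId, long markTimestamp) {
        timePassedByReader.put(readerId, markTimestamp);
    }

    void onReaderLostLastSegment(String readerId) {
        timePassedByReader.remove(readerId);
    }

    // The lower bound reported to getCurrentTimeWindow is the minimum over all readers in the group.
    Long currentLowerBound() {
        return timePassedByReader.values().stream().min(Long::compare).orElse(null);
    }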

When getCheckpoint or getStreamCut is called on the reader group, it will have to include not just the offsets in the normal segments, but also the minimum offset where the readers are in the special segment.

Controller changes

The role of the controller is to take the timestamps from all of the writers on the stream and write out a stream cut that is greater than or equal to the maximum of all writers' positions, together with a timestamp that is the minimum of all of the writers' timestamps.

A call to noteTimestampFromWriter or commitTransaction updates a structure with the position of the writer (or where the transaction committed) and the timestamp provided. In the case of commit, the controller will need to obtain the position information itself, whereas in the case of noteTimestampFromWriter it is explicitly provided by the client.

To do this there needs to be a single controller instance responsible for aggregating this data for any given stream. It may be useful for this to be the same instance that is managing transactions for that stream.

In the event that a controller instance crashes, the in memory structure will be temporarily lost. It can be recovered by reading the last value written. During recovery there will be a gap where time does not advance.

There are two main objectives in designing this scheme: 1) writers should have no affinity requirements when interacting with controller instances, and 2) the workload of computing watermarks for different streams should be fairly distributed across the available controller instances.

To achieve these goals, we break the work into two categories: 1) recording writer marks and 2) generating consolidated watermarks. Writers may contact any arbitrary controller instance to report their timestamps and positions; that controller conditionally persists the reported value in the shared metadata store. For generating watermarks, all available controller instances divide ownership of streams amongst themselves such that each controller instance is exclusively responsible for computing watermarks for only a subset of streams. So for any stream there is exactly one controller instance that periodically generates watermarks by consolidating the individual writer marks, even though those marks may have been recorded by any arbitrary controller instance.

Distributing Streams across Controller instances:

As a stream is created, a reference to it is added to one of a fixed number of "buckets" by computing a hash of its fully qualified name. Each controller instance contests with the others to acquire ownership of each bucket individually, so every controller instance acquires ownership of a subset of buckets, thereby becoming the leader process for all streams that fall into those buckets. For each bucket a controller owns, it schedules a periodic background watermarking workflow for each stream within the bucket. It also registers for bucket change notifications and adds (or cancels) this scheduled work as streams are added (or removed). It is important to note that exactly one controller instance is responsible for running the background workflow for any given stream. We refer to this controller instance as the "leader" in the context of watermark processing for that stream.
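
A sketch of the bucket assignment described above (the bucket count and hash choice are illustrative):

    // Illustrative: map a stream's fully qualified name to one of a fixed number of buckets.
    int bucketFor(String scopedStreamName, int numberOfBuckets) {
        // Mask the sign bit so hashCode() cannot produce a negative bucket index.
        return (scopedStreamName.hashCode() & Integer.MAX_VALUE) % numberOfBuckets;
    }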

The watermarking workflow for a stream is invoked periodically on the leader controller instance. In each cycle it collects the marks reported by writers from the shared metadata store. The marks across all available writers are consolidated into an in-memory data structure, and once marks from all known writers have been received, a new watermark can be emitted for the stream.

Metadata:

The information stored in the metadata store is a map of writer id to {time, position} tuple. This record can be updated by any controller instance; updates are conditional and protected by optimistic concurrency. The record is updated only if the new update strictly advances time, with a sanity check that the position is also greater than or equal to the previously recorded position for the writer. The second condition is important if multiple clients have the same writer id and are reporting different marks; this could happen in the unlikely case that a writer got partitioned out and the application's cluster manager started a replacement writer with the same id.
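
A sketch of this conditional update (the store API, VersionedRecord, WriterMark, and isAtLeast are illustrative, not an existing interface):

    // Persist a writer's mark only if it strictly advances time and the position has not moved
    // backwards. The write is version-conditional, giving optimistic concurrency protection.
    boolean tryRecordMark(String writerId, long time, Map<Segment, Long> position) {
        VersionedRecord<WriterMark> existing = store.getWriterMark(writerId);
        WriterMark current = existing.getValue();
        if (time > current.getTimestamp() && isAtLeast(position, current.getPosition())) {
            return store.compareAndSetWriterMark(writerId,
                    new WriterMark(time, position), existing.getVersion());
        }
        return false;
    }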

In-memory state:

The leader controller maintains a collection of "known" writers that ought to be included in the computation of the next watermark. This collection is a map from writerId to the writer's most recent mark ({time, position} tuple). A subset of these known writers is then taken into what is called a "generation": the set of writers and their marks selected for the next watermark computation. Since our objective is to keep time progressing in a monotonically increasing fashion with every new watermark, the leader keeps track of the last watermark's time and only includes a writer in the current "generation" if its latest mark is greater than that time.
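
As a sketch, selecting the writers for the next generation amounts to filtering the known-writer map by the previous watermark's time (Mark is the record sketched in the pseudocode later in this document; other names are illustrative):

    // Only writers whose latest mark is newer than the previous watermark's time participate
    // in the next generation.
    Map<String, Mark> selectGeneration(Map<String, Mark> knownWriters, long previousWatermarkTime) {
        return knownWriters.entrySet().stream()
                .filter(e -> e.getValue().timestamp > previousWatermarkTime)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }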

Writer Marks

There are two types of writers (event writers and transaction writers), each with its own characteristics with respect to time-to-position tracking, so event writer and transaction writer marks are generated differently. However, the marks are reported to the same place in the shared metadata store described above.

  • Event Writer Marks: Event writers are expected to supply their positions and corresponding times explicitly to any arbitrary controller instance. This controller instance will durably record the received mark in the store after checking the mark for sanity.

  • Transaction Writer Marks: Transaction writers, on the other hand, are only aware of transaction ids; transaction commits happen asynchronously and are orchestrated by the controller. Since the controller knows when a transaction is committed, it is best suited to compute the "position" where it was committed.

As transaction writers request the controller to commit a transaction, the protocol between controller and client for commitTransaction needs to include the additional "writerId" and "time" information. Using this time and the computed position, the controller can generate a mark on behalf of the writer as the transaction is committed. However, since transaction commits happen asynchronously and a writer could have issued multiple outstanding commit requests, it becomes imperative to provide some form of ordering guarantee on the controller. The controller will provide strict ordering guarantees for non-overlapping transactions from the same writer: a pair of transactions is non-overlapping if the second transaction is created only after the commit request for the first has been submitted. With this guarantee, a transaction writer's "mark" can be tracked on the controller and each subsequent mark will have increasing time and position (as long as writers provide increasing times).

Details of how the controller provides strong ordering guarantees for non-overlapping transactions are out of scope for this PDP.

The controller computes the "position" as part of the commit workflow for the transaction. The controller knows which segments the transaction's segments are merged into, and as part of the merge request it can get the merged offset from the segment store. Since the segment store does not persist merged offsets perpetually, any failure to retrieve this value from the first commit request makes it impossible for the controller to get the "exact" offset at which the transaction was committed. However, since we only need a "position" that is an upper bound on the merged transaction's position, the controller can use the tail offset of the parent segment if it fails to get the exact offset.

Periodic Watermark workflow

The leader controller maintains in-memory watermarking state for each stream and schedules an unconditional loop that executes the watermarking workflow. At the end of each execution cycle, the next iteration is scheduled after a fixed delay. During bootstrap, we could choose to delay the start of the workflow to give time for all writers to start up. At the start of each cycle, the controller fetches marks from the metadata store for all writers; this forms the "known set" of writers for the cycle. It then starts a new generation with a time barrier equal to the timestamp of the previous watermark (defaulting to 0 during bootstrap). Only writers from the known set whose marks have times greater than the generation's barrier are included in the generation. The excluded writers are those whose most recent marks are older than the previously emitted watermark's time. The presence of such writers could mean one of two things: a) these writers have not reported any marks in at least the last cycle; or b) these writers were not present in the previous generation, have recently been added, and are reporting older timestamps. Case a may be caused by intermittent failure of a writer and warrants waiting until these writers either recover or are timed out and removed from the set. Case b is out of our control, and including such writers would break the watermark's time order, so they are ignored until they catch up with the remainder of the writers that participated in emission of previous watermarks. Either way, the controller cannot emit a watermark until it has either received marks from all such writers or declared them "dead". This relies on a timeout mechanism: if, after a new generation starts, a writer has not been included in it for at least the timeout period, it is removed from the known writer set. This algorithm is therefore bounded to produce a watermark and progress the generation within one timeout period.
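
A condensed sketch of one workflow cycle under the rules above (timeout handling simplified; store, isTimedOut, computeWatermark, and marksSegment are illustrative, and selectGeneration is the filter sketched earlier):

    // One cycle of the periodic watermark workflow for a single stream, run on the leader.
    void runCycle(String stream) {
        Map<String, Mark> knownWriters = store.getAllWriterMarks(stream);
        long barrier = previousWatermarkTime; // 0 on bootstrap
        Map<String, Mark> generation = selectGeneration(knownWriters, barrier);

        // Writers that have not produced a mark newer than the barrier within the timeout are dropped.
        knownWriters.keySet().removeIf(writerId ->
                !generation.containsKey(writerId) && isTimedOut(writerId));

        // Emit only once every remaining known writer is represented in the generation.
        if (generation.keySet().containsAll(knownWriters.keySet())) {
            Watermark watermark = computeWatermark(generation); // min time, upper-bound stream cut
            marksSegment.append(watermark);
            previousWatermarkTime = watermark.getTimestamp();
        }
    }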

Watermark computation

A watermark is composed of a time T and a stream cut S. A watermark effectively indicates that "up to stream cut S, all data corresponding to time T has been written across all writers". To compose such a watermark from the writers' positions, we take the lowest time across the writers' marks and compute a greatest upper bound on all positions across the writers' marks. To compute the greatest upper bound, we loop over each writer's mark and over each segment in the writer's position, including the segment in the set conditionally: a segment is included in the upper bound only if none of its successors already exists in the upper bound. An included segment replaces all of its predecessor segments in the result set. A segment included in the upper bound takes the highest offset across all writer marks that contain that segment.

It is important to understand that a writer's position is not the same as a stream cut, because a writer could be writing concurrently to a predecessor and a successor segment, but for different routing keys.
This means a composition of writer positions may not yield a well-formed, complete stream cut, so the greatest upper bound computed above may have missing key ranges and we need to augment the algorithm to compute a complete stream cut.

To illustrate with an example: say we have segments 0 and 1 with even range splits [0.0, 0.5), [0.5, 1.0).

Now let's say we start a scale to segments 2 [0.0, 0.6) and 3 [0.6, 1.0). We could have a writer whose position comprises segments 1 and 2 -> (1/off1, 2/off2). This is possible in either of the following cases:

  1. While this scale is ongoing, we can have a situation where segments 2 and 3 have been created and 0 has been sealed, but 1 may still be open.
  2. Alternatively, the writer may not have attempted to write any data in the range covered by 1, so 1 is never replaced, while 0 is replaced by 2.

Computing the greatest upper bound on such a position yields (2/off2). This is clearly not a well-formed stream cut as it has missing ranges, so we need to augment the algorithm to fill the missing ranges with segments such that the entire key range is covered without breaking the greatest-upper-bound criteria. We can use the epoch of the highest segment in the upper bound and include sibling segments from this epoch such that they cover the missing range. All such included segments can be assigned offset 0 without breaking any guarantees. Inclusion of such a sibling segment may also result in exclusion of predecessor segments from the upper bound, and we should run this loop until there are no missing ranges. In the worst case, we end up with an upper bound comprising all segments from this epoch; however, this is still an upper bound on all positions of all writers of the stream. We can have a segment x, which replaced a segment y, that only covers part of y's range. That means that in x's epoch there was some set of segments z1, z2, ..., zk that covered the remainder of the range covered by y. It is possible that the union of z1, z2, ..., zk covers more than the range covered by y and ends up replacing other segments from the upper bound set. We can include all of z1, z2, ..., zk without breaking the upper-bound promise, and then repeat this until all missing ranges are accounted for.

class Mark {
    long timestamp;
    Map<Segment, Long> position; // segment to offset map
}

class Generations {
    long previousWatermarkTime;
    Map<String, Mark> writerMarks; // writerId to most recent mark

    Pair<StreamCut, Long> generateWatermark() {
        Map<Segment, Long> upperBound = computeGreatestUpperBound();
        StreamCut streamCut = computeStreamCut(upperBound);
        long lowerBoundOnTime = writerMarks.values().stream()
                .mapToLong(mark -> mark.timestamp).min().getAsLong();
        return new Pair<>(streamCut, lowerBoundOnTime);
    }

    Map<Segment, Long> computeGreatestUpperBound() {
        Map<Segment, Long> upperBound = new HashMap<>();
        for (Mark mark : writerMarks.values()) {
            addToUpperBound(mark.position, upperBound);
        }
        return upperBound;
    }

    void addToUpperBound(Map<Segment, Long> segmentOffsetMap, Map<Segment, Long> upperBound) {
        for (Entry<Segment, Long> entry : segmentOffsetMap.entrySet()) {
            Segment s = entry.getKey();
            long offset = entry.getValue();
            if (upperBound.containsKey(s)) {
                // Update the offset if the segment is already present.
                upperBound.put(s, Math.max(offset, upperBound.get(s)));
            } else if (!hasSuccessors(s, upperBound.keySet())) {
                // Only include the segment if none of its successors is already in the set,
                // and replace any of its predecessor segments in the result set.
                List<Segment> toReplace = findPredecessors(s, upperBound.keySet());
                upperBound.keySet().removeAll(toReplace);
                upperBound.put(s, offset);
            }
        }
    }

    StreamCut computeStreamCut(Map<Segment, Long> upperBound) {
        Pair<Double, Double> missingRange = findMissingRange(upperBound);
        if (missingRange != null) {
            int highestEpoch = upperBound.keySet().stream()
                    .mapToInt(segment -> segment.epoch).max().getAsInt();
            EpochRecord highestEpochRecord = metadataStore.getEpoch(highestEpoch);
            do {
                // Fill the missing range with segments from the highest epoch, assigned offset 0.
                List<Segment> replacement = findSegmentsForMissingRange(highestEpochRecord, missingRange);
                Map<Segment, Long> replacementSegmentOffsetMap = replacement.stream()
                        .collect(Collectors.toMap(x -> x, x -> 0L));
                addToUpperBound(replacementSegmentOffsetMap, upperBound);
                missingRange = findMissingRange(upperBound);
            } while (missingRange != null);
        }
        return new StreamCut(upperBound);
    }
}

Compatibility and Migration

Use of the API for marking is optional. In the event that the writer does not provide us with any information, nothing will be written to the mark segment and the reader will not be able to receive timestamps.

The only backward incompatible change is the addition of the writerId to the call to create a new writer.

Future possibilities

One natural extension to this idea is to allow the user to request a streamCut for a given time. We could implement this if we had an index of the marks segment. This could be done by maintaining an 'index segment' which contains records that consist of:

  • An offset (a pointer into the marks segment)
  • A timestamp

Together these make each record a fixed 16 bytes.

Because the index segment has fixed size records we could easily perform a binary search on it to obtain the closest streamcut in the marks segment to any given timestamp.
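
A sketch of the lookup this would enable, assuming 16-byte records of (offset, timestamp) as described (IndexRecord and readIndexRecord are illustrative):

    // Binary search over the fixed-size (offset, timestamp) records of the index segment to find
    // the offset in the marks segment of the last mark at or before the requested time.
    long findMarkOffsetFor(long targetTime, long recordCount) {
        long low = 0, high = recordCount - 1, result = -1;
        while (low <= high) {
            long mid = (low + high) >>> 1;
            IndexRecord record = readIndexRecord(mid * 16); // 8-byte offset + 8-byte timestamp
            if (record.getTimestamp() <= targetTime) {
                result = record.getOffset();
                low = mid + 1;
            } else {
                high = mid - 1;
            }
        }
        return result; // -1 if no mark precedes targetTime
    }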

Another possible improvement is to use the time information to help with reader rebalancing. If a stream is marked, readers rebalancing segments can, instead of summing the number of bytes remaining, use their time location in the stream to compare who is ahead or behind.

Disadvantages

Ties time/progress to a long

Because it becomes part of our API, time is necessarily defined in terms of a long. For some applications this may not be desirable as it does not represent the application's notion of progress.

Applications reading from multiple streams

This proposal focuses on a single stream, but there are applications that process multiple streams. Enrichment joins are a good example of a case in which time is aligned across the multiple streams.

For the case in which time is not aligned, it does not make sense to talk about watermarking. The contract of watermarks is that once the operator receives a watermark for time t, all events with timestamp t or earlier have been received. With streams with data corresponding to time epochs that might not even overlap, such a contract is meaningless.

Time window bounds may not be tight

The upper bound provided for time may be quite far behind if one writer had not noted the time for a while when the data was being written. Similarly if a connection is dropped after a segment is merged the offsets for that segment may be behind.

The lower bound is also slack in the event that clocks are not well synchronized. Similarly if committing transactions has a very high latency, it will add to the uncertainty in the lower bound.

Discarded Approaches

Keeping a time index on a per segment basis

This could provide a very similar API to the one proposed, and eliminate much of the work done on the controller. However it requires significant changes on the server side and does not work with transactional writers.

Only supporting ingestion time

Ingestion time has a lot of nice properties; for example, you don't actually need to know or hear from all the writers to advance time. However, it just doesn't work for all use cases. Historical data in particular could be a problem.

Using extended attributes instead of an index segment

If the SegmentStore could provide an API like findNextAttributeFollowing(Key) and could scale to very large numbers of attributes, this could be an alternative to the index segment. This requires a lot of work server side and would be less compact in terms of storage. The advantage it would have is that creating a new reader group based on a timestamp would be faster. However, this is a rarely performed operation that is not performance critical.

