
PDP 47: Pravega Streams: Consumption Based Retention

Prajakta Belgundi edited this page Oct 7, 2020 · 20 revisions

Motivation

Message Deletion in Pravega

  • No message level deletes.
  • Stream truncation happens at periodic intervals along Stream-Cuts.
  • Stream truncation is based on space/time limits for the Stream.
  • Stream truncation is agnostic of message consumption by Reader Groups.

Pravega Streams support only time-based and space-based truncation. We do not track whether the messages being truncated have been consumed by Readers.

A Consumption based retention mechanism for a Pravega Stream would ensure:

  1. The stream carries only unconsumed data and is space efficient.

  2. Messages are not deleted before they are consumed by all consumers (Subscribers).

This helps particularly when using a Pravega Stream as a message queue and for edge use-cases where we may not need to keep data in the Stream after it has been pushed to the Core.

Terminology

Reader Group (RG) - A Pravega Reader Group that reads from one or more Pravega Streams. A Reader Group can have multiple Readers each reading from 1 or more Segments in the Stream.

Retention Policy (RP) - A Stream Configuration that specifies the basis on which the Stream is truncated. The currently supported policies are time-based and space-based: they truncate data in a Stream if it is older than a configured time interval or if the Stream size exceeds a configured limit.

Consumption Based Retention (CBR) - A Retention Policy on a Pravega Stream that requires data to be truncated from the Stream only after all Subscribers have read it. A Stream can have both "durable" and "non-durable" Subscribers and read positions of only "durable" subscribers will be tracked by CBR policy for truncation.

Position - A "Position" object representing the position of a Reader in a Pravega Stream. It is a map of segments owned by the Reader and their read offsets. A Stream-Cut can be generated from a set of "Position" objects for all Readers in the Reader Group.

Stream-Cut - A set of segment/offset pairs for a single Stream that represents a consistent position in the Stream. A Stream-Cut covers the complete key-space at a given point in time. Each offset always points to an event boundary, so no offset points to an incomplete event.

Checkpoint - A system event that causes all Readers in a Reader Group to persist their current read positions to the State Synchronizer. A Checkpoint is necessary for a Reader Group to be able to resume reading from the Stream after it goes down. Checkpointing is a prerequisite for assignment of unassigned segments in the Stream. For a Reader Group, checkpoints can be generated automatically (periodically) or explicitly on user request. Automated checkpointing is enabled by default. Every time a new checkpoint is generated, its value is persisted in the latestCheckpoint field in RGState in the State Synchronizer.

Automated Checkpoint - An automated checkpoint is a checkpoint generated by Pravega Client at periodic intervals to ensure that Readers persist their read positions. This process is done without any user interaction. The name of auto-generated checkpoints is not exposed to users.

Manual Checkpoint - A manual checkpoint is generated on user request, and its name is returned to the user once the checkpoint completes.

Subscriber Reader Group - A "Subscriber RG" is a Reader Group whose read positions are considered for computing the truncation point for a Stream with Consumption Based Retention. When such a Subscriber Reader Group goes down, the system will truncate the Stream based on the last published position of this Reader Group and will not truncate any data not read by this Reader Group.

Non-Subscriber Reader Group - A Non-Subscriber Reader Group (default) is the one that wants to read from a Stream, but does not want the consumption to impact Stream truncation. When this Reader Group goes down and comes back up, it may have lost some messages in the queue because those were deleted while it was away.

Publisher - A Pravega writer that writes messages to a Pravega Stream.
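To make the relationship between Position and Stream-Cut defined above more concrete, here is a minimal sketch of merging the Positions of all Readers into one Stream-Cut. It does not use the Pravega client API: the class StreamCutSketch, the method fromPositions, and the representation of segments as long ids mapped to offsets are assumptions made for illustration only.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not the Pravega client API: a Position or a Stream-Cut
// is represented as a map from segment id to read offset.
final class StreamCutSketch {

    // Merges the Positions of all Readers in a Reader Group into one Stream-Cut.
    // Assumption: each segment is owned by exactly one Reader at a time.
    static Map<Long, Long> fromPositions(List<Map<Long, Long>> readerPositions) {
        Map<Long, Long> cut = new HashMap<>();
        for (Map<Long, Long> position : readerPositions) {
            for (Map.Entry<Long, Long> entry : position.entrySet()) {
                Long previous = cut.put(entry.getKey(), entry.getValue());
                if (previous != null) {
                    throw new IllegalArgumentException(
                            "Segment " + entry.getKey() + " is owned by more than one Reader");
                }
            }
        }
        return cut;
    }
}
```

For example, a Reader at offsets {0: 100, 1: 40} and a second Reader at {2: 75} together yield the Stream-Cut {0: 100, 1: 40, 2: 75}.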

Design

New Concepts

A Stream can be read by one or more Reader Groups and they can start/stop reading from a Stream at any point during its lifetime. The Reader Groups' state, including read positions, is maintained on the Client using the State Synchronizer. Stream truncation, on the other hand, is handled by the Controller but it is not aware of Reader Groups or their read positions.

Stream Subscription

Currently, Pravega Streams do not have a concept of "Subscription". A Reader Group or set of Reader Groups may be the primary consumers of a Stream, such that once data is read by them, it can be discarded from the Stream. With Consumption Based Retention, the read/processed positions of such a Reader Group must impact the Stream's truncation. This intent is expressed with "Subscription". A Stream can be read by both "Subscriber" and "Non-Subscriber" Reader Groups, but only the reads of Subscriber Reader Groups impact the Stream's truncation. There may be transient Reader Groups that read a Stream temporarily (between Stream-Cuts). Their reads should not impact the Stream's truncation, so they should be configured as "Non-Subscriber" Reader Groups.

User Acknowledgement

Users acknowledge up to what position in the Stream they have "processed" data, so that data before it can be deleted. With EventStreamReader, everything up to the last checkpoint is considered "processed", so the last checkpoint can be treated as the processed boundary on the Stream once all Readers have read past it. These acks are cumulative: they indicate that all events preceding that position in the Stream are "processed" and can be safely removed. Reference: http://pravega.io/docs/latest/javadoc/clients/io/pravega/client/stream/EventRead.html

How CBR would work

  • Stream Retention Policy = "CONSUMPTION".
  • On the Client, at least one RG reading from the Stream must be a "Subscriber".
  • On the Client, each Subscriber RG periodically publishes a truncation Stream-Cut to the Controller. This could be the Stream-Cut of the last Checkpoint or a custom Stream-Cut provided by the user.
  • On the Controller, a "Truncation Stream-Cut" would be stored against each Subscriber RG. When the truncation cycle runs, a common truncation Stream-Cut would be calculated from the truncation Stream-Cuts of all Subscriber RGs.
  • On the Controller, the Stream would be truncated at this common truncation Stream-Cut.
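The common truncation Stream-Cut in the steps above can be sketched as a per-segment minimum over the Stream-Cuts published by the Subscribers, so that no data unread by any Subscriber is removed. This is a simplified illustration, not the Controller implementation: the class CommonTruncationSketch and the method commonCut are hypothetical, Stream-Cuts are modeled as maps from segment id to offset, and the sketch assumes every Subscriber's Stream-Cut covers the same set of segments (that is, no scaling has happened between them).

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model, not the Pravega Controller: a Stream-Cut is a map from
// segment id to offset.
final class CommonTruncationSketch {

    // Returns the lowest offset per segment across all Subscriber Stream-Cuts.
    // Assumption: all Stream-Cuts cover exactly the same segments.
    static Map<Long, Long> commonCut(Collection<Map<Long, Long>> subscriberCuts) {
        if (subscriberCuts.isEmpty()) {
            throw new IllegalArgumentException("At least one Subscriber Stream-Cut is required");
        }
        Map<Long, Long> common = null;
        for (Map<Long, Long> cut : subscriberCuts) {
            if (common == null) {
                common = new HashMap<>(cut);
                continue;
            }
            if (!common.keySet().equals(cut.keySet())) {
                throw new IllegalArgumentException("Stream-Cuts cover different segments");
            }
            for (Map.Entry<Long, Long> entry : cut.entrySet()) {
                // Keep the smaller offset so the slowest Subscriber bounds truncation.
                common.merge(entry.getKey(), entry.getValue(), Long::min);
            }
        }
        return common;
    }
}
```

With two Subscribers at {0: 100, 1: 40} and {0: 80, 1: 60}, the common truncation Stream-Cut under these assumptions is {0: 80, 1: 40}. Handling Stream-Cuts that span scaling events needs segment predecessor/successor information, which this sketch leaves out.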

Client Changes

A Stream with "Consumption" based retention policy can be read by Subscriber as well as non-Subscriber Reader Groups.

Subscriber Reader Groups

A Subscriber Reader Group would periodically publish truncation Stream-Cuts to Controller. Non-Subscriber would not publish these, but would just read from the Stream. An existing Reader Group would be a non-subscriber RG by default.

The following 2 new boolean fields should be added to ReaderGroupConfiguration:

  1. isSubscriber: A Reader Group can be made a "Subscriber" by setting this flag to true. The default would be false. Setting this flag to true should invoke the Controller "addSubscriber" API to add this Reader Group's name as a "subscriber" for each Stream in the RG Configuration. Similarly, when a Reader Group is deleted, or the isSubscriber flag is flipped to "false", a "removeSubscriber" API call should be made to remove the Reader Group from the list of Stream Subscribers.

  2. autoTruncateAtLastCheckpoint: When this flag is set, we publish the StreamCut corresponding to the lastCheckpoint from Client to Controller as "truncation StreamCut" for the Reader Group.

Updating Reader Group Configuration

ReaderGroupConfiguration can be updated in the following ways:

  1. The set of Streams to read from can be updated. If a Stream is removed from the RG Configuration, the RG should correspondingly be removed from its list of Subscribers by invoking the removeSubscriber API on the Controller. If a Stream is added to the RG Configuration, the RG should be added to the Subscriber list for the Stream by invoking the addSubscriber API on the Controller for the new Stream.

  2. If a "Subscriber" RG becomes "Non-Subscriber", the Client should invoke the removeSubscriber API on all Streams in the configuration for this RG.

  3. If a "Non-Subscriber" RG becomes "Subscriber", the Client should invoke the addSubscriber API on all Streams in the RG Configuration.
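The update rules above amount to a diff between the Streams the RG was subscribed to before the update and the Streams it should be subscribed to afterwards. The following is a minimal sketch of that diff. The addSubscriber and removeSubscriber names come from this proposal, but their signatures are not specified here, so the calls are represented as plain strings. The class SubscriberUpdateSketch and the method controllerCalls are hypothetical, and the sketch assumes Stream additions and removals trigger calls only while the RG is a Subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of the Client-side logic; Controller calls are rendered
// as strings because the real API signatures are not defined in this sketch.
final class SubscriberUpdateSketch {

    static List<String> controllerCalls(Set<String> oldStreams, boolean wasSubscriber,
                                        Set<String> newStreams, boolean isSubscriber) {
        // Streams the RG was subscribed to before, and should be subscribed to after.
        // Assumption: a Non-Subscriber RG is subscribed to no Stream.
        Set<String> before = wasSubscriber ? new TreeSet<>(oldStreams) : new TreeSet<>();
        Set<String> after = isSubscriber ? new TreeSet<>(newStreams) : new TreeSet<>();

        List<String> calls = new ArrayList<>();
        for (String stream : before) {
            if (!after.contains(stream)) {
                calls.add("removeSubscriber(" + stream + ")");
            }
        }
        for (String stream : after) {
            if (!before.contains(stream)) {
                calls.add("addSubscriber(" + stream + ")");
            }
        }
        return calls;
    }
}
```

For instance, a Subscriber RG whose Stream set changes from {a, b} to {b, c} produces removeSubscriber(a) followed by addSubscriber(c), while flipping a Subscriber RG on {a, b} to Non-Subscriber produces a removeSubscriber call for each of a and b.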

Updating Truncation Stream-Cut to Controller

The Controller must know the truncation Stream-Cut for each "Subscriber" on the Stream in order to arrive at a common truncation Stream-Cut. This truncation Stream-Cut needs to be published periodically by each Subscriber Reader Group on the Client.

If auto-checkpointing is enabled or manual checkpointing is happening, and the flag autoTruncateAtLastCheckpoint = true, then the Subscriber RG should publish the Stream-Cut of the last generated checkpoint as its truncation Stream-Cut to the Controller using the Controller API updateTruncationStreamCut.
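The publishing condition above can be sketched as a small decision function. This is an illustration under stated assumptions rather than Client code: the class TruncationPublishSketch and the method cutToPublish are hypothetical, Stream-Cuts are modeled as maps from segment id to offset, and a null argument stands for "no checkpoint has completed yet". The actual updateTruncationStreamCut call is not modeled, since its signature is not given here.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical model of the Client-side check; isSubscriber and
// autoTruncateAtLastCheckpoint mirror the two proposed configuration flags.
final class TruncationPublishSketch {

    // Returns the Stream-Cut to publish to the Controller, or empty if the
    // Reader Group should not publish one.
    static Optional<Map<Long, Long>> cutToPublish(boolean isSubscriber,
                                                  boolean autoTruncateAtLastCheckpoint,
                                                  Map<Long, Long> lastCheckpointCut) {
        if (!isSubscriber || !autoTruncateAtLastCheckpoint || lastCheckpointCut == null) {
            return Optional.empty();
        }
        // Defensive copy so later checkpoint updates do not alter the published value.
        return Optional.of(Map.copyOf(lastCheckpointCut));
    }
}
```

A Non-Subscriber RG, or a Subscriber RG with the flag unset, publishes nothing in this sketch. The latter case would correspond to the user supplying a custom Stream-Cut instead.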
