PDP 47: Pravega Streams: Consumption Based Retention

Prajakta Belgundi edited this page Oct 7, 2020 · 20 revisions

Motivation

Message Deletion in Pravega

  • No event/message level deletes.
  • Stream truncation happens at periodic intervals along Stream-Cuts.
  • Stream truncation based on space/time limits for the Stream.
  • Stream truncation is agnostic of message consumption by Reader Groups.

Pravega Streams provide support for only time and space-based truncation. A Consumption based retention mechanism for a Pravega Stream would ensure:

  1. The stream carries only unconsumed data and is space efficient.

  2. Messages are not deleted before they are consumed by all consumers (subscribers).

This is particularly useful when using a Pravega Stream as a message queue, and in use-cases where it may not be necessary to keep data in the Stream after it has been consumed.

Terminology

Reader Group (RG) - A Pravega Reader Group that reads from one or more Pravega Streams. A Reader Group can have multiple Readers each reading from 1 or more Segments in the Stream.

Retention Policy (RP) - A Stream Configuration that specifies the basis on which the Stream is truncated. Currently supported policies are Time and Space based policies. These truncate data in a Stream if it is older than a configured time interval or if the Stream size exceeds a configured size limit.

Consumption Based Retention (CBR) - A Retention Policy on a Pravega Stream that requires data to be truncated from the Stream only after all Subscribers have read it. A Stream can have both "durable" and "non-durable" Subscribers and read positions of only "durable" subscribers will be tracked by CBR policy for truncation.

Position - A 'Position' object representing the position of a Reader in a Pravega Stream. It is a map of segments owned by the Reader and their read offsets. A Stream-Cut can be generated from a set of "Position" objects for all Readers in the Reader Group.

Stream-Cut - A set of segment/offset pairs for a single Stream that represent a consistent position in the Stream. A Stream-Cut covers the complete key-space at a given point in time. Each offset always points to an event boundary, so no offset points into an incomplete event.

Checkpoint - A system event that causes all Readers in a Reader Group to persist their current read positions to State Synchronizer. A Checkpoint is necessary for a Reader Group to be able to resume reading from the Stream after it goes down. With ESR, Check-pointing is also a prerequisite for assignment of unassigned segments in the Stream. For a Reader Group, checkpoints can be generated automatically or explicitly on user request. Automated check-pointing is enabled by default. The lastCheckpoint is stored in ReaderGroupState.

Automated Checkpoint - An automated checkpoint is a checkpoint generated by the Pravega Client at periodic intervals to ensure that Readers persist their read positions. This process is done without any user interaction. Auto-generated checkpoints are not returned to users.

Manual Checkpoint - A manual checkpoint is generated on user request and it is returned to the User when it completes.

Subscriber Reader Group - A "Subscriber RG" is a Reader Group whose read positions are considered for computing the truncation point for a Stream with Consumption Based Retention. When such a Subscribing Reader Group goes down, the system will truncate the Stream based on the last published position of this Reader Group and will not truncate any data not read by this Reader Group.

Non-Subscriber Reader Group - A Non-Subscriber Reader Group (default) is the one that wants to read from a Stream, but does not want the consumption to impact Stream truncation. When this Reader Group goes down and comes back up, it may have lost some messages in the queue because those were deleted while it was away.

Design

New Concepts

A Stream can be read by one or more Reader Groups and they can start/stop reading from a Stream at any point during its lifetime. The Reader Groups' state, including read positions, is maintained on the Client using the State Synchronizer. Stream truncation, on the other hand, is handled by the Controller but it is not aware of Reader Groups or their read positions.

Stream Subscription

Currently, Pravega Streams do not have a concept of "Subscription". A Reader Group or set of Reader Groups may be the primary consumers of a Stream, such that once data is read by them, it can be discarded from the Stream. With Consumption based retention, the read/processed positions of such a Reader Group should determine where the Stream is truncated. This intent is expressed with "Subscription".

A Stream can be read by both "Subscriber" and "Non-Subscriber" reader groups, but the reads of only Subscriber Reader Groups would impact Streams' truncation.

There may be transient Reader Groups that read a Stream temporarily (between Stream Cuts) but we would not want these reads to impact the Stream's truncation policy and hence these should be configured as "Non-Subscriber" Reader Groups.

User Acknowledgement

Users acknowledge the position in the Stream up to which they have "processed" data, so that data before it can be deleted. With EventStreamReader, since everything up to the last checkpoint is considered "processed", the last checkpoint can be treated as the processed boundary on the Stream once all Readers have read past it.

Reference: http://pravega.io/docs/latest/javadoc/clients/io/pravega/client/stream/EventRead.html

How CBR would work

  • Stream Retention Policy = "CONSUMPTION"
  • On Client, at least one RG reading from the Stream must be a “Subscriber”.
  • On Client, each Subscriber RG periodically publishes a truncation Stream-Cut to Controller. This could be the Stream-Cut of the last-Checkpoint or a custom Stream-Cut provided by user.
  • On Controller, a "Truncation Stream-Cut" would be stored for each Subscriber. When the truncation cycle runs, a common truncation Stream-Cut would be calculated based on truncation Stream-Cuts of all Subscribers.
  • On Controller, the stream would be truncated at this common truncation Stream-Cut.
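
The calculation of the common truncation Stream-Cut in the last two steps could look roughly like the sketch below. It is only an illustration under simplifying assumptions: a Stream-Cut is modelled as a map from segment number to offset, all Subscriber Stream-Cuts are assumed to cover the same set of segments (no scaling between them), and the class and method names (CommonTruncationCut, lowerBound) are hypothetical, not Pravega APIs. Taking the minimum offset per segment means no data unread by any Subscriber is truncated.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Pravega code: a Stream-Cut is modelled as
// segment number -> offset, and all cuts are assumed to cover the same segments.
final class CommonTruncationCut {

    // Returns the per-segment minimum offset across all Subscriber Stream-Cuts,
    // i.e. the furthest point that every Subscriber has already acknowledged.
    static Map<Long, Long> lowerBound(Collection<Map<Long, Long>> subscriberCuts) {
        if (subscriberCuts.isEmpty()) {
            throw new IllegalArgumentException("at least one Subscriber Stream-Cut is required");
        }
        Map<Long, Long> common = new HashMap<>();
        for (Map<Long, Long> cut : subscriberCuts) {
            for (Map.Entry<Long, Long> e : cut.entrySet()) {
                // Keep the smaller offset seen so far for this segment.
                common.merge(e.getKey(), e.getValue(), Long::min);
            }
        }
        return common;
    }

    public static void main(String[] args) {
        Map<Long, Long> rg1 = Map.of(0L, 100L, 1L, 250L);
        Map<Long, Long> rg2 = Map.of(0L, 180L, 1L, 90L);
        // Segment 0 is bounded by rg1 (offset 100), segment 1 by rg2 (offset 90).
        System.out.println(lowerBound(List.of(rg1, rg2)));
    }
}
```

A real implementation would also have to handle Stream-Cuts that span different segments after scaling events, which this sketch deliberately ignores.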

Client Changes

A Stream with "Consumption" based retention policy can be read by Subscriber as well as non-Subscriber Reader Groups.

Subscriber Reader Groups

A Subscriber Reader Group would periodically publish truncation Stream-Cuts to Controller. Non-Subscriber would not publish these, but would just read from the Stream. An existing Reader Group would be a non-subscriber RG by default.

The following 2 new boolean fields should be added to ReaderGroupConfiguration:

  1. isSubscriber: A Reader Group can be made a "Subscriber" by setting this flag to true. The default would be false. Setting this flag to true should invoke the Controller "addSubscriber" API to add this ReaderGroup name as a "subscriber" for each Stream in the RGConfiguration. Similarly, when a ReaderGroup is deleted, or the isSubscriber flag is flipped to "false", a "removeSubscriber" API call should be made to remove the ReaderGroup from the list of Stream Subscribers.

Note: Reader Group Subscription is a "none" vs "all" concept. A Subscriber RG needs to publish truncation StreamCuts for every Stream in its Configuration, regardless of the Stream's truncation policy on Controller.

  2. autoTruncateAtLastCheckpoint: When this flag is set, the StreamCut corresponding to the lastCheckpoint is published from Client to Controller as the "truncation StreamCut" for the Reader Group.

Updating Reader Group Configuration

ReaderGroupConfiguration can be updated in the following ways:

  1. The set of Streams to read from can be updated in RGConfiguration:
  • If a Stream is removed from RG Configuration, the RG should correspondingly be removed from its list of Subscribers by invoking the removeSubscriber API on Controller.
  • If a Stream is added to RG Configuration, the RG should be added to the Subscriber list for the Stream by invoking the addSubscriber API on Controller for the new Stream.
  2. If a "Subscriber" RG becomes "Non-Subscriber", the Client should invoke the removeSubscriber API on all Streams in the configuration for this RG.

  3. If a "Non-Subscriber" RG becomes "Subscriber", the Client should invoke the addSubscriber API on all Streams in the RG configuration.
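
The update rules above can be summarised as a diff between the old and new configuration. The following is a minimal sketch under assumptions, not the Client implementation: the class SubscriptionDiff, the method controllerCalls, and the representation of Controller calls as strings are all hypothetical, and Streams are identified by a scoped name such as "scope/s1".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch, not Pravega code: derives the Controller calls implied
// by a change to a Reader Group's configuration.
final class SubscriptionDiff {

    static List<String> controllerCalls(String rgName,
                                        boolean wasSubscriber, Set<String> oldStreams,
                                        boolean isSubscriber, Set<String> newStreams) {
        // Streams the RG was subscribed to before, and should be subscribed to after.
        Set<String> before = wasSubscriber ? oldStreams : Set.of();
        Set<String> after = isSubscriber ? newStreams : Set.of();

        List<String> calls = new ArrayList<>();
        // TreeSet gives a deterministic (sorted) order of calls.
        for (String stream : new TreeSet<>(before)) {
            if (!after.contains(stream)) {
                calls.add("removeSubscriber(" + rgName + ", " + stream + ")");
            }
        }
        for (String stream : new TreeSet<>(after)) {
            if (!before.contains(stream)) {
                calls.add("addSubscriber(" + rgName + ", " + stream + ")");
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        // A Subscriber RG drops scope/s1 and picks up scope/s3.
        System.out.println(controllerCalls("rg",
                true, Set.of("scope/s1", "scope/s2"),
                true, Set.of("scope/s2", "scope/s3")));
    }
}
```

Treating a Non-Subscriber RG as subscribed to an empty set of Streams lets the same diff cover all three cases: a changed Stream set, Subscriber to Non-Subscriber, and Non-Subscriber to Subscriber.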

Updating Truncation Stream-Cut to Controller

The Controller needs to know the truncation Stream-Cut for each "Subscriber" on the Stream, to be able to arrive at a common truncation Stream-Cut. This truncation StreamCut needs to be published periodically by each Subscriber ReaderGroup on the Client.

If auto-checkpointing is enabled or manual checkpointing is happening, and the flag autoTruncateAtLastCheckpoint = true, then the Subscriber RG should publish the StreamCut of the last generated checkpoint as the truncationStreamCut to Controller using the Controller API updateTruncationStreamCut.

If checkpointing is not enabled and there is no manual checkpointing either, users need to identify the StreamCuts up to which they have completed processing in the Stream and publish them through the Client API updateTruncationStreamCut, which would internally invoke the Controller API updateTruncationStreamCut.

Controller Changes

Controller should support these 3 new APIs:

  • Status addSubscriber (String RGName, String scopeName, String streamName)
  • Status removeSubscriber (String RGName, String scopeName, String streamName)
  • Status updateTruncationCheckpoint(String RGName, String checkpointName)

Consumption Based Retention

Controller should support processing Consumption Based Retention Policy on Streams. For this, it would need to maintain data about Subscribers and their last published truncationStreamCut. When the Truncation Cycle runs, if a Stream has Consumption based retention configured, Controller should pull up StreamCuts corresponding to all Subscribers and arrive at a common truncation Stream-Cut and then truncate the Stream at this point.
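
To illustrate the data the Controller would maintain about Subscribers, here is a rough in-memory sketch. Everything in it is an assumption made for illustration: the class SubscriberStore, the Status values, the "scope/stream" key, and in particular the signature of updateTruncationStreamCut, which here takes scope, stream and a Stream-Cut modelled as a segment-to-offset map. A real Controller would persist this in its metadata store rather than in a HashMap.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory sketch, not Pravega code.
final class SubscriberStore {
    enum Status { SUCCESS, SUBSCRIBER_NOT_FOUND }

    // "scope/stream" -> (Reader Group name -> last published truncation Stream-Cut).
    // An empty Stream-Cut map means the Subscriber has not published anything yet.
    private final Map<String, Map<String, Map<Long, Long>>> byStream = new HashMap<>();

    synchronized Status addSubscriber(String rgName, String scope, String stream) {
        byStream.computeIfAbsent(scope + "/" + stream, k -> new HashMap<>())
                .putIfAbsent(rgName, Map.of());
        return Status.SUCCESS;
    }

    synchronized Status removeSubscriber(String rgName, String scope, String stream) {
        Map<String, Map<Long, Long>> subs = byStream.get(scope + "/" + stream);
        if (subs == null || subs.remove(rgName) == null) {
            return Status.SUBSCRIBER_NOT_FOUND;
        }
        return Status.SUCCESS;
    }

    synchronized Status updateTruncationStreamCut(String rgName, String scope, String stream,
                                                  Map<Long, Long> streamCut) {
        Map<String, Map<Long, Long>> subs = byStream.get(scope + "/" + stream);
        if (subs == null || !subs.containsKey(rgName)) {
            // Only registered Subscribers may influence truncation.
            return Status.SUBSCRIBER_NOT_FOUND;
        }
        subs.put(rgName, Map.copyOf(streamCut));
        return Status.SUCCESS;
    }

    // Snapshot read by the truncation cycle to compute the common Stream-Cut.
    synchronized Map<String, Map<Long, Long>> subscriberCuts(String scope, String stream) {
        return Map.copyOf(byStream.getOrDefault(scope + "/" + stream, Map.of()));
    }
}
```

How the truncation cycle should treat a Subscriber that has registered but not yet published a Stream-Cut is a policy choice this sketch leaves open.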

CBR Fallback Policy

Handling Subscriber Failures

  • If no Subscribers publish truncation Stream-Cuts for a very long time, the Stream size can grow unbounded.
  • A space or time based maximum limit can be configured in the CBR policy.
  • For example, if the space limit is 50 GB and the Stream size still exceeds 50 GB after consumption based truncation, the Stream will be truncated to retain no more than 50 GB.
  • Note: This can cause unconsumed messages to be deleted, and impact delivery guarantees, so specifying this limit is optional.

Handling late Subscribers

  • If a Stream has multiple subscribing RGs, and some subscribers join late, the early subscribers may truncate the Stream before the other Subscribers have had a chance to consume.
  • A space or time based minimum limit can be configured in the CBR policy.
  • For example, if the time limit is 30 minutes and the head of the Stream is less than 30 minutes old when the truncation cycle runs, consumption based truncation would not be attempted.
  • Note: While this can cause more data to be retained than necessary, specifying this limit is optional.
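
The interaction of the two optional limits with consumption based truncation can be sketched as a simple clamp. This is an illustration under assumptions, not the proposed implementation: it considers only space based limits expressed in bytes, treats the Stream as a single byte range, and uses hypothetical names (CbrLimits, bytesToTruncate). A minimum of 0 and a maximum of Long.MAX_VALUE stand for "no limit configured".

```java
// Hypothetical sketch, not Pravega code: combines consumption based truncation
// with optional space based minimum and maximum retention limits.
final class CbrLimits {

    // Returns how many bytes to truncate from the head of the Stream.
    //   streamSize    - current size of the Stream in bytes
    //   consumedBytes - bytes before the common truncation Stream-Cut
    //   minRetain     - retain at least this much data (0 = no minimum)
    //   maxRetain     - retain at most this much data (Long.MAX_VALUE = no maximum)
    static long bytesToTruncate(long streamSize, long consumedBytes, long minRetain, long maxRetain) {
        if (consumedBytes < 0 || consumedBytes > streamSize || minRetain < 0 || minRetain > maxRetain) {
            throw new IllegalArgumentException("inconsistent sizes or limits");
        }
        // What pure consumption based retention would leave in the Stream.
        long retained = streamSize - consumedBytes;
        // Late Subscribers: keep at least minRetain (or everything, if the Stream is smaller).
        retained = Math.max(retained, Math.min(minRetain, streamSize));
        // Failed or slow Subscribers: never keep more than maxRetain, even if unconsumed.
        retained = Math.min(retained, maxRetain);
        return streamSize - retained;
    }

    public static void main(String[] args) {
        long gb = 1L << 30;
        // 80 GB Stream, Subscribers consumed 10 GB, maximum limit 50 GB.
        System.out.println(bytesToTruncate(80 * gb, 10 * gb, 0, 50 * gb) / gb + " GB truncated");
    }
}
```

In the usage example, consumption alone would leave 70 GB, so the maximum limit forces 30 GB to be truncated, 20 GB of which is unconsumed data. This is the delivery-guarantee trade-off noted above.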

Changing Retention policy for a Stream

Moving from Time/Space based Retention to CBR

If the Retention Policy for a Stream is changed from Time/Space based to Consumption based, but the RG Configuration is not changed to make it a "Subscriber", the Stream would not get truncated, since the Stream configuration on Controller would not have any Subscribers with their StreamCuts.

Here it is necessary to update both the RetentionPolicy on the Stream (Controller) as well as the RG configuration on Client for CBR to work.

Moving from CBR to Time/Space based Retention

When updating RetentionPolicy from CBR to Time/Space based Retention, it may be good to update the RG configuration and switch off the "Subscriber" flag.

This can help reduce unnecessary API calls from Client to Controller for updating the truncation Stream Cut.

Leaving the flag on would not, however, affect the working of the Time/Space based retention policy.

Manual Stream Truncation

As in case of Time/Space based Retention, a user can manually truncate a Stream with Consumption based policy.

The truncation would work, but policy related guarantees would not hold in this case.

Users are expected to not arbitrarily truncate a Stream manually, if it is being consumed by multiple Reader Groups and has a Retention Policy set.

To enforce this requirement we could have a security policy that allows only "admin" users to have rights to truncate a Stream manually.

FAQ

Can aggressive check-pointing overwhelm the system?

Aggressive checkpointing does not help truncate faster, as Controller truncates only when the truncation cycle runs (every truncation interval). Automatic checkpointing runs every 30 seconds by default for each RG. The checkpointing interval is configurable and can be lower bounded to a certain value to prevent overly aggressive checkpointing.

Do we support acknowledgements and deletes at a per-event level?

No, we only support cumulative acknowledgements.

Can a new Subscriber read messages published prior to its joining?

Yes, a new Subscriber can read from anywhere in the Stream. This could be changed in the future if needed, to limit it to reading only messages published after the Subscriber has joined.

Can a slow subscriber slow down Stream Truncation?

Yes, this can happen. A space/time based upper limit would ensure the Stream does not grow unbounded. If it is important to never lose messages, users can skip specifying the upper bound.

Do we also have a TTL per message with Consumption based retention policy?

The time based upper bound works as a TTL.

Github Issue

https://github.com/pravega/pravega/issues/5108
