Skip to content

PDP 35: Move controller metadata to KVS

shivesh ranjan edited this page Aug 1, 2019 · 1 revision

Motivation

We currently use zookeeper as the durable store for metadata stored by controller. This limits our scalability story of the controller because a) zookeeper is not efficient for very large volumes of writes b) we are limited by storage capacity of provisioned zookeeper cluster.

Since we are building a KV store using segments, we intend to move most of controller metadata to KVS.

Goal

  • Decouple cluster mangement and metadata store in the store abstractions to facilitate movement of metadata to KVS without relying on zookeeper's capabilities or a shared session.
  • Implement all metadata stores in controller with KVS as the durable store.

Non goal

Move cluster management out of zookeeper.

Proposal

In this document will describe changes to controller's metadata store abstraction and processing changes to work with any underlying store that provides following capabilities:

Prerequisites:

  1. A way to group related keys cheaply: Either via a directory structure for storing keys and value a la zookeeper. Or creation of cheap tables so that all related keys can be placed in specific tables.
  2. Efficient CRUD apis on key-value entries: We should be able to create key value entries efficiently. We should be able to update existing keys and have ability to remove them.
  3. Table level get-all-keys.

Controller instances are stateless and need to store all of the state in some shared durable store. Each controller instance independently performs processing of background workflows against incoming requests and mutate the persisted state. Each different processing is typically performed exclusively on one of the controller instances. Different background worker frameworks use different concurrency mechanisms but currently they are all predicated on using same client for cluster membership and store. This means losing membership also renders controller instance unable to update the state. We have updated controller's processor's exclusive processing guarantees to rely on underlying store's optimistic concurrency support.

Metadata

There are 4 top level metadata stores used in controller: HostStore, TaskStore, CheckpointStore, StreamMetadataStore. TaskStore and StreamMetadataStore also use indexes that are stored in zookeeper.

  1. Host Store: This is used by cluster management. The information stored using the hostStore in zookeeper is directly watched by all segment store instances for segment container allocation. So it is non goal to move cluster management of segment store out of zookeeper.

  2. Task Store Task store interface is used to store a durable task "lock" Task Framework: TaskFramework is build to use a Task store to store metadata related to ongoing tasks. A task represents a background workflow that is performed mutually exclusively against a given resource by one of the controller instances. Task Store stores the information about all ongoing background tasks along with information about current owner of the task.
    We have only one type of task on task framework, for "stream creation".

Proposal (Optional): Remove task framework. It is an overkill to be only used for stream creation. CreateStream to be called as a method on each controller instance. If controller instance crashes while creating a stream, the client will anyway have to reattempt creation of stream as it would not have received positive response from controller. To handle concurrency during stream creation, we will employ the same concurrency model that we will use for all stream metadata updates (described later). At a high level, if two controller instances attempt to create the same stream concurrently, one of them will start stream metadata creation but the second one will then fence it out and complete the creation.

  1. Event processor checkpoint store: Event processors store their reader states in checkpoint store. This checkpoint store will continue to be in zookeeper as checkpoint update should be performed by controller with active zk session.

  2. Stream metadata store: Metadata Store manages multiple types of metadata for the stream. We will have the metadata broken up into multiple tables:

Table naming convention: _system/_tables/<table-name>

Tables with keys and values:

1. System/scopes table
key: scope-name, value: unique scope id

2. scope/streams-in-scope table
key: stream name, value: unique stream id

The unique scope and stream ids are used to create new metadata records post recreation of stream. This is done because when older metdata tables are removed,segmentstore deletes them lazily hence we may not be able to create the tables immediately. 

3. stream metadata tables:
    3.1 `<scoped-stream-name>_streamMetadata`
     1. Key: creation time  // never deleted
     2. Key: configuration record // never deleted
     3. key: truncation record // never deleted
     4. key: state record // never deleted
     5. key: epoch transition record // never deleted
     6. key: committing-transaction-record // never deleted
     7. key: current epoch record // never deleted
     8. key: retention set record // never deleted
     9. key: epoch record <epoch number>  // never deleted
     10. key: history time series <chunk> // never deleted O(number of epochs / 1000)
     11. key: segment sealed size map <shard-number> // never deleted O(number of epochs / 1000)
     12. key: <segment number> // never deleted O(number of segments)
     13. key: markers_<segment-number> // deletable entries
     14. key: stream-cut-<reference> // deletable entries
     15. key: waitingRequestProcessor // deletable entry

     3.2 <scoped-stream-name>_epochsWithTransactions
     1. key: epoch value: empty
     3.3 <scoped-stream-name>_transactionsInEpoch
     1. key: txnId value: ActiveTxnRecord
     Whenever new epoch gets created we create a new transactions in epoch table. And we opportunistically attempt to delete empty older transactionsInEpochs tables after each commit and abort completion. 
    
     3.4 _system/completed_transaction batches
     key: batch, value: empty

     3.5 completed_transactions_<batch>
     key: <scoped-stream-name-txnId>, value: completed txn record 

Processing:

Processing against stream metadata (all stream workflows, including create stream): We should expose version on stream metadata store interface so that processors work with the store entities by relying on optimistic concurrency and idempotent updates to metadata. This will allow us to be resilient to multiple processors attempting to update the same metadata record concurrently. So if we run into a situation where two controller instances are processing same event concurrently, one of them will succeed in its attempt to update the metadata while the other will fail with a retryable WriteConflict exception.

Clone this wiki locally