Skip to content

PDP 24: Rolling Transaction

shivesh ranjan edited this page Jun 7, 2018 · 24 revisions

Proposal for rolling transaction in Pravega

Status: Under discussion

Related issues: Issue 179, Issue 1344. Discuss on: Issue 2045

Currently transactions and scale operation have implicit relationship because transactions are tied to parent segments against which they are created. This means we cannot proceed with scale and seal old segments until all transactions against those segments have completed. Since we do not want to stall scale for too long on account of transactions, we have introduced plethora of timeouts in our scheme which not only make our createTxn apis ugly, they also require clients to keep performing periodic pings lest their transactions timeout.

Another problem is that even though writers are continuously writing data into a transaction, they still require "pinging" the transaction by calling a controller api. This is counter intuitive for application developer. It also opens up possibilities of all sorts of races where if a controller crashes before recording a ping, by the time client recognizes and connects to a different controller the transaction may have timed out. Such situations are clearly undesirable.

Objective: Allow transactions to have large timeouts and ability to exist across multiple scale epochs.

We should be able to commit a transaction even after its parent segments have been sealed. If we are able to achieve this then transactions become decoupled from scale and can be committed at any point in time. This allows transactions to have large timeouts whereby removing the need for regular pings for these transactions.

Proposal: Currently when we commit a transaction, we merge txn segments into the parent segments. The proposal is to generalize the notion of transactions on segment store such that transactional segments become no different from regular segments. Then on controller, by performing metadata manipulation, we can elevate transaction segments to first class segments within a stream by decoupling their relationship with their parent epoch.

Proposal details:

We can generalize the concept of a transaction such that segment store need to no longer expose transaction as first class entities. We can build a notion of a transaction externally if Segment store were to expose two primitives – 1) ability to create segments with a new scheme; and 2) ability to merge segments satisfying some invariants. To achieve this, we can have the naming of a segment follow a scheme such that it is composed of two values -

  1. primary id
  2. secondary id The primary Id is part of a segment-id and is used to identify the segment container ownership. However, a composition of primaryId and secondaryId uniquely identifies a segment. So primary id can be seen as equivalent of existing segment ids. And secondary ids are equivalent of transaction ids. But they are more generic in their behavior.
    So translating this into existing scheme, if secondary id is null, the resultant id is same as existing segment ids. If secondary id is supplied, primary.#secondaryId is equivalent to parent.#txnId. But with the generalization the segment store does not need to have the notion of a parent and child relationship anymore. Segments primary, primary.#secondaryId1, primary.#secondaryId2 … are all unique and independent segments.
  • primary part of the name ensures that they all reside on the same segment container.
  • Segments with such naming can be created independent of presence and state of primary segment (Note: Currently transactions can only be created against parent segments if they exist and are not sealed. But we can do away with such requirements). So segment store can modify its createSegment primitive to include two parameters instead of one. A logical definition of segment id would be
class SegmentId {
   String scope;
   String stream; 
   Object primaryId;
   Object secondaryId;
}
// if only primary is supplied in segemntId then we create segment with name scope/stream/primary
// if both primary and secondaryId are supplied in segemntId then we create segment with name scope/stream/primary.#secondaryId

def createSegment(SegmentId segmentId)

The other important aspect of a transaction segment is the ability to merge it into parent segment. We should generalize mergeTransaction into mergeSegments primitive. mergeSegments will allow merging of any two segments if they have the same primary. Other constraints of a transaction merge and merge segment are equivalent : The segment-to-be-merged has to be sealed (and not truncated) and segment-to-merge-into should be open.

/*
* If into and toBeMerged share same primary and toBeMerged is sealed and into is open, then toBeMerged can be appended into. 
*/

def mergeSegments(Segment into, Segment toBeMerged)

With above primitives we can perform a scheme on controller such that controller composes a notion of transactions using these segments and merges them as and when necessary. Controller will continue to create segments with only primary and no secondary ids for regular segments. And for transactions it will create segments with an primary and transaction id as secondary id. So now we can have two modes of transaction commits:

  1. If transaction epoch is same as current epoch, controller will perform regular transaction segment merge into parent segment. merge(parent, parent.#txnId)
  2. If transaction epoch is behind current epoch, controller will make transaction segment a first class segment in the stream and potentially merge multiple transaction segments together.

Controller changes:

  • Linearize scale and commit transactions on a single processing channel (Stream Event Processor + Request Handler). This will ensure that while committing a transaction, no scale is being performed concurrently.
  • When commit for a txn is processed, one of the two things can happen:
  1. If txn was created on the currently active epoch, then merge this txn with its parent(/primary) segments (same as current flow)
  2. If txn creation epoch is behind current active epoch, seal active segments, elevate txn segments as first class segments and successors to active segments, recreate identical active segments

Transaction commits across epochs follow a workflow similar to scale - create new duplicate segments, add txn segments and partial record in history table, then seal existing active segments.

So as an example: Lets say we have two transactions created against epoch 1

  1. epoch 1:s11, s12, s13 ... s1n
  2. create txn t1, t2 (against epoch 1) t1 segments created = s11.#t1, s12.#t1, s13.#t1 ... s1n.#t1 t2 segments created = s11.#t2, s12.#t2, s13.#t2 ... s1n.#t2
  3. t1 commit comes. active epoch = 1. t1 is merged into s11, s12, s13 .. s1n t2 is still open
  4. scale happens --> new segments after scale s21, s22, s23 ... s2m epoch 2: s21, s22, s23 ... s2m
  5. t2 commit comes --> (txn epoch 1 < active epoch 2)
  • 4.a create duplicate active segments in segment store s21.#3, s22.#3, s23.#3 ... s2m.#3
  • 4.b create duplicate old epoch segments s11.#3, s12.#3, s13.#3 ... s1n.#3
  • 4.c merge s1i.#t2 into s1i.#3
  • 4.d add s11.#t2, s12.#t2, s13.#t2 ... s1n.#t2 as next set of segments in metadata + duplicate active segments atomically. epoch 3: s11.#3, s12.#3, s13.#3 ... s1n.#3 epoch 4: s21.#4, s22.#4, s23.#4 ... s2m.#4 (partial) [Note: s2i.#4 is a duplicate of s2i so any transaction on epoch 2 can be merged into epoch 4 as they share the same primaryId]
  • 4.e Seal active segments. complete partial record seal s21, s22, s23 ... s2m -> (complete epoch 4 history record)

A new create txn request, T3 can be received either: case 1: before 4.b : t3 epoch: 2.. t3 segments => s21.#t3 s22.#t3 s23.#t3 ... s2m.#t3 case 2: after 4.b : t3 epoch: 4.. t3 segments => s21.#t3, s22.#t3, s23.#t3 ... s2m.#t3

As an optimization, we can collect and merge multiple transactions from old epoch which have been marked for commit by merging them together into a single new epoch (loop over steps 4.c with multiple transactions). We can do this because we first mark a transaction for commit and then execute the commit workflow. This allows us to collate few transactions that may belong to same epoch and have been committed.

  1. Linearize commit transaction workflow on Event Processor.
  2. Introduce two modes of commit workflow for transaction.
  3. Update scale workflow to not wait on transaction completion.
  • Change Scale workflow to complete even if there are transactions against old epoch.
  1. Change transaction id scheme from random GUID to function(epoch, unique id). For example TXID = new GUID (MSB = epoch. LSB = counter).

Details of controller changes:

  1. Create Transaction: Create transaction can continue to work as it does in the current scheme with the difference of segment id being created following the aforesaid scheme as opposed to generated randomly. Controller will no longer call createTransaction primitive. Rather it will create multiple segments by createSegments primitive for each active segment in the epoch as primary and txnId as secondary id.
1. generate txnId = new UUID(epoch-number, distributedCounter.increment) 
2. for each segment in epoch.activeSegments
3. createSegment(segment.primary.#txnId)
  1. Commit Transaction: As described earlier, there are two ways commit could be proceeded with: commit within same epoch, and commit in a future epoch. In former case, we merge txn segment into parent segment.
1.	for each segment in epoch.activeSegments
2.	sealSegment(segment.primary.#txnId)
3.	mergeSegment(segment.primary, segment.primary.#txnId)

In latter case, we need to commit transactions into a new logical segment that is placed on same segment container as the parent segment and elevate it to being a first class segment.

1.	newId = generateUniqueNewId(currentEpoch) 
// loop over multiple transactions from the same epoch that are marked for commit. 
2.	For each txn in List-of-txn-to-commit-from-same-epoch
3.	createSegment(segment.primary, segment.primary.#newId)
4.	for each segment in txn.Epoch.activeSegments
5.	sealSegment(segment.primary.#TxnId)
6.	mergeSegment(segment.primary.#uniqueCounter, segment.primary.#txnId)
7.	loop end
8.	loop end
9.	sealSegment(segment.primary.#newId)
Now we can include this new segment “segment.primary.#newId” as a first class segment in the stream.

If you notice, we need ability to create duplicate segments for original segments to encode. We need an ability to distinguish them from their original, but at the same time, have the ability to identify the common root. To this effect, we will enhance the "integer" based segment number to become a "long" based segment id. This will comprise of two parts - the int segment number and int epoch. The epoch in which segment is created will effectively be prepended to its number. So new definition of SegmentId will look like:

String scope
String stream
int segmentNumber // primary
int epoch // secondary
Optional UUID // secondary
}

Notice that primary will be used for mapping. And secondary will be used for identifying the segment. Benefits of such an approach:

  1. we can easily encode duplicate segments using this scheme.
  2. we can easily differentiate between regular segments and transaction segments at controller and client (they continue to work exactly as they do now).
  3. we have the ability to create duplicates any time we need to and merging transaction segments into duplicates effectively allows us the ability to elevate transcation segments to first class segments which follow the consisted id scheme.

On controller, presently we post specific commit requests for transactions. However, we can change that to now post epoch instead of txn-id for commits. That way we can collect multiple transactions into a single request by taking all pending transactions marked for commit and loop over them and perform a single commit. To ensure ordering guarantees across retries and failovers, the event processing will create a temporary node which will capture all the transactions that will be committed as part of processing of this event. This ensures that if such a node exits, during failover, we complete the outstanding work without changing the ordering guarantees of transactions.

  1. Abort Transaction: Abort transaction works almost as it does presently, except it will no longer attempt to signal any incomplete scale operation. Also primitives it calls on segment store changes. It will call seal and delete segment for txn segments as opposed to abort-transaction.

Segment Store changes:

Replace transaction concept with a more generalized concept of segments with new segment id that has primarys and secondary ids.

  • SegmentId = function(primary, secondaryId)
class SegmentId {
   String scope;
   String stream; 
   long primary;
   UUID secondaryId;
}
  • Remove create-txn, commit-txn and abort-txn primitives.
  • Add following new primitives:
def createSegment(SegmentId segmentId)  // no changes needed here
def mergeSegments(Segment into, Segment toBeMerged)

Update mapping function to rely only on primary id (it already does that).

Client changes:

  1. Client uses Segment object which has segment represented as String scope, String stream, Integer segmentNumber. This should change to String scope, String stream, long primary, uuid secondaryId. This means all calls into segment store uses new definition of segment id. But everything else remains same.

Potentially inefficient scenarios and how we deal with them

We still want to allow transactions to have arbitrarily large lease time with no restriction on lease renewal. We do this by removing their interdependence on scale by getting rid of scaleGracePeriod completely. This means if such transactions exist and are committed at arbitrary times, we can have lot of epoch transitions with active segments being sealed and recreated everytime such transactions are committed. The proposal works well if transactions are typically short lived (contained within an epoch). And if few transactions from an epoch spill over a scale epoch boundary, their commits will be requested within small time of each other whereby opening possibility of collating multiple transactions. If the applications behavior does not conform to the above behavior, and has large number of transaction commits happen at arbitrary times, each commit will result in an effective scale like behavior. This will be inefficient for applications as they will frequently encounter segment seals. But for more expected behaviour of short lived transactions with commit requests on same epoch coming within short span of each other opens a possibility of clubbing them together to reduce the churn.

Implementation Details

SegmentId

long segmentId = Function(segmentNumber, epoch) = 32bit<epoch>32bit<segmentNumber> Here the segmentNumber is primary and is used to determine segment container mapping. Epoch is secondary and uniquely identifies a segment and distinguishes between duplicate segments.
Fully qualified segment name = /scope/stream/segmentNumber/#epoch.epoch

TransactionId

UUID txn id = function(epoch, uniqueNumber) = 16 bit epoch + 96 bit counter. Fully qualified txn segment name = Fully qualified segment name = /scope/stream/segmentNumber/#epoch.epoch/#transaction.txnId So a transaction can be merged into any segment that shares the primary part (=/scope/stream/segmentNumber) with it and anytime we create duplicate segments they look like : /scope/stream/segmentNumber/#epoch.newepoch now controller stores history table where each epoch is a list of "long" segmentIds.. each epoch is either a new epoch (created from a new scale) or a duplicate epoch (created during rolling Txn)

A transaction continues to be differentiated from regular segments.

Controller Metadata

Epochs:

Each epoch also contains a reference epoch. This refers to the root epoch whose duplicate this epoch is. For fresh epochs, created after a new scale operation, they will have a loopback reference to themselves. For duplicate epochs, they will refer to the original epoch whose duplicate they are. So for example, if epoch 2 was created as a duplicate of epoch 0 then 2 will have reference epoch set to 0. Now say a new epoch 4 needs to be created which is a duplicate of 2. It will refer to the reference epoch in 2 which is 0.

So the logic is:

  1. scale: newEpochRecord.reference = newEpoch
  2. rollingtxn: duplicateEpochRecord.reference = originalEpochRecord.reference
Transactions:

All transactions will always be created against the reference epoch. This way all transactions sharing the same root epoch can be collected together and we can optimize and merge them together if needed.

CreateTransaction:

  1. txnId = generateTransactionId (activeEpoch.getReference, counter)
  2. createSegments(segmentId = activeEpoch.getReference.getSegments, txnId)

CommitTransaction:

  1. if activeEpoch.getReferenceEpoch == transaction.getEpoch merge segments into active epoch
  2. else rolling transaction:
    • create duplicate epoch of transaction.getEpoch
    • merge txn segments (also collect all its sibling transactions that are set to committing) into epoch created in 2.1
    • create duplicate of active epoch
    • add new epoch records for both duplicate epochs
    • seal current active epoch

Scale: It no longer waits on transactions. All transaction commits and scale operations are linearized and only one of them can be processed at any time. So during txn commit no scale can happen and vice versa.

Discarded alternatives

  1. Always make transaction segments top level segments upon commit
  2. Rename transaction segments in segment store to elevate them to top level segments.
Clone this wiki locally