Skip to content

PDP 44: Single Routing Key Transactions

shivesh ranjan edited this page Aug 19, 2020 · 6 revisions

Motivation

Pravega provides transactions for atomic update to a batch of events across routing keys. The transactions are designed such that when users create a new transaction, a new set of “shadow” segments are created for the transaction. These segments are separate from the stream’s active segments set and mimic their key range distribution such that events written into the transaction are routed to corresponding shadow segments as they would have routed if they were directly ingested into the stream. When a commit request is issued on the transaction, pravega controller service coordinates merging of shadow transaction segments into their stream segments.

This transaction abstraction is very powerful as it allows users to get atomic guarantees for events written across multiple routing keys. However, the lifecycle of transactions during both create and commit phases requires distributed coordination across multiple segments which could be spread across multiple segment store instances.

All the distributed coordination performed by controller service makes the transactions costly where the cost is not justified or amortized if there are small number of events in the transaction.

If we could avoid the overheads of centralized coordination then that would make the transactions very fast to work with. However, inherently allowing for multiple routing keys in a transactions means they could be mapped to different segments whereby some form of coordination across multiple segments for atomic guarantee is inevitable.

There is a popular use case where users may want to write several related events for a single routing key atomically. And using the aforesaid transactions for atomic guarantees could prove to be costly, particularly if the number of events being written is also small. We anticipate this to be a common use case and users may desire write performance equivalent to writing non-transactional data into the stream for small single routing key transactions.

By limiting the scope for atomicity to a single routing key, since all data would be written to a single segment, it will require zero coordination while providing atomic update guarantees for a batch of events.

Proposal

We propose to introduce a new concept called a “singleRoutingKey transaction”. This singleRoutingKey transaction abstraction is scoped to a single routing key and promises to provide atomic writes guarantees for batch of events with same routing key. Generally, all events from a routing key are written into the same stream segment. However, in case of scaling, the segment may be sealed and replaced. For sake of simplicity we will first describe how the approach will work for a single segment and then describe how we will provide exactly once write and atomicity guarantees across network glitches and segment sealed events.

The mechanism for batch updates is conceived by leveraging the Append Batch wire protocol that pravega writers use for writing data to pravega. The append batch protocol is initiated when a writer sets up a new append with the segment store and then serialized bytes corresponding to the events are transmitted to the segment store in Append Block packets until an Append Block End packet is sent indicating the end of the batch. The append blocks are buffered on the server until an append block end is send, which is when all the data is written into the segment.

For the new singleRoutingKey transactions proposal we intend to use the append block protocol to transfer the data for a transaction.

Here is the sequence of actions from the user’s perspective:

  1. Users initiate a new singleRoutingKey transaction object by specifying a “routing key”.

This would include talking to controller to get the active stream segments and their location and then selecting the segment which is responsible for the data corresponding to the routing key. A new Segment writer is instantiated for this segment whose range includes the consistent hash for the routing key.

  1. User invokes beginTransaction

This sets up a new append with segment store by invoking a new setup append.
This effectively generates a new writerId against which the writer state is managed in the segment’s attributes on segment store.

  1. User writes multiple events into this transaction.

The data they write is accumulated on the client. The data is buffered on the client in case it needs to retransmit the data (in case the underlying segment is scaled). The limit on the data is set to 16 MB at a maximum to ensure that we do not have large amounts of data to transmit.

  1. User calls commit on the transaction When a user issues a commit, it is sent over to segment store via append block protocol. The segment writer sends append block end which results in the all events being included atomically.

Once a transaction is committed, we could reuse the same transaction object for subsequent transactions. This avoids setting up the state and new attributes on the segment store for a new writer.

Single Routing Key Transaction:

The above approach only requires to be implemented on pravega client by creating new abstraction classes for SingleRoutingKey transaction with transaction segment writer class.

Just like a regular transaction, a SingleRoutingKey transaction writer could be created using the clientFactory. The SingleRoutingKey transaction writer can then be used to create transaction objects which are then used to write data into the transaction. Following are the interfaces for SingleRoutingKeyTranscation -

/**
 * A single routing key transaction writer that is similar to {@link TransactionalEventStreamWriter}, however it imposes constraints
 * on size of payload and routing keys. 
 * This writer uses the {@link SingleRoutingKeyTransaction} to write multiple events with same routing key atomically into a pravega stream. 
 * 
 * Prior to committing a transaction, the events written to it cannot be read or otherwise seen by readers.
 */
public interface SingleRoutingKeyTransactionWriter<Type> extends AutoCloseable  {
    
    /**
     * Start a new transaction on this stream. This allows events written to the transaction be written an committed atomically.
     * Note that transactions can only be open for {@link EventWriterConfig#transactionTimeoutTime}.
     * 
     * @return A transaction through which multiple events can be written atomically.
     */
    SingleRoutingKeyTransaction<Type> beginTxn();
    
    /**
     * Closes the writer. (No further methods may be called)
     */
    @Override
    void close();
}
/**
 * Provides a mechanism for writing multiple events with the same routing key atomically. 
 * Users can create new singleRoutingKey transactions for a routing key. After writing events into the transaction
 * they can call commit on it which will atomically write all events in the stream. 
 * A SingleRoutingKey Transaction is bounded in both size and time. Maximum allowed serialized size for all the events 
 * in the transaction is 16 MB. There is also a notion of time limit on the transaction - if the user does not issue a
 * commit on a transaction for the specified period then it is automatically aborted and all the events are discarded.
 * 
 * All methods on this class may block.
 *
 * @param <Type> The type of events in the associated stream.
 */
public interface SingleRoutingKeyTransaction<Type> {
    /**
     * Sends an event to the stream just like {@link EventStreamWriter#writeEvent} but with the caveat that
     * the message will not be visible to anyone until {@link #commit()} is called.
     *
     * So all events written this way will be fully ordered and contiguous when read.
     *
     * @param event The Event to write. (Null is disallowed)
     * @throws TxnFailedException thrown if the transaction has timed out. 
     */
    void writeEvent(Type event) throws TxnFailedException;
    
    /**
     * Causes all messages previously written to the transaction to go into the stream contiguously.
     * This operation will either fully succeed making all events consumable or fully fail such that none of them are.
     * Once this call returns, the readers are guaranteed to find the events in the transactions to be available for reads. 
     *
     * @throws TxnFailedException thrown if the transaction is timed out. 
     */
    void commit() throws TxnFailedException;
    
    /**
     * Drops the transaction, causing all events written to it to be discarded.
     */
    void abort();
}

The new SingleRoutingKeyTransactionWriter implementation will be responsible for creating new singleRoutingKey transactions on the stream. This will maintain a state which will include the underlying stream segment corresponding to the routing key. When begin transaction is invoked on it, it will result in a new SingleRoutingKeyTransaction object being created. Logically this implementation will be a modified SegmentOutputStream object which will be responsible for performing the Append block wire protocol over a new connection. This new segment output stream writer will be similar to SegmentOutputStream but without automatic batching. This segment writer would maintain state about all the inflight events and the writer id and the event sequence numbers.

This class will also need to tackle failure scenarios like connection glitches or segment sealed events. To handle those cases, the segment writer will buffer all the data in the transaction locally and retransmit the data in case it was interrupted due to one of the handleable failure scenarios.

Sample usage:

        SingleRoutingKeyTransactionWriter<String> txnWriter = clientFactory.singleRoutingKeyTransactionWriter("myRoutingKey", new JavaSerializer<String>()); 

        SingleRoutingKeyTransaction<String> txn = txnWriter.beginTransaction(); 

        txn.writeEvent("first txn 1") 
              .thenCompose(v -> txn.writeEvent("first txn 2")) 
              .thenCompose(v -> txn.writeEvent("first txn 3")) 
              .join(); 

        txn.commit();  

        SingleRoutingKeyTransaction<String> txn2 = txnWriter.beginTransaction(); 

        txn2.writeEvent("second txn 1") 
              .thenCompose(v -> txn.writeEvent("second txn 2")) 
              .join(); 

        txn2.commit();  

Handling failure scenarios:

Intermediate connection failures:

Since the batch is completed only when the writer sends append batch end, any partially created batch is discarded by segment store in case of connection failures and the client will establish a new connection with the new setup append and retransmit all the events.

In case the client had issued an append block end, it would reuse the same writer id while reconnecting to the segment and as part of setup append, it would know the event sequence number till which it had written the data and if that sequence number is ahead of the starting sequence number for the current transaction, then the current transaction is considered committed.

Segment Sealed

If a segment is found to be sealed, the SingleRoutingKeyTranscation object will request LWTWriter for the successor segment and replace the previous LWTSegmentOutputStream with the successor segment writer and initiate a new setup append with the segment store for the new segment and retransmit all the events from the buffer.

If the writer had already issued AppendBlockEnd but there was a connection glitch followed by a segment sealed, the writer will check the state with the sealed segment to determine the last event isequence number and if the transaction is not committed already, then it’s events will be retransmitted to successor segment.

Unhandled failure case:

There is a corner case which also plagues pravega’s exactly once write guarantees – client encounters network glitch after issuing Setup block end and when it reconnects it finds the segment to have been deleted. In such cases there is no way for the client to determine if the previous attempt to write was successful or not. But this is very improbable and can happen only if during the network glitch time the stream is scaled and then truncated which deletes the said segment. The exactly once guarantee for writes is impossible to ascertain in this situation for both regular writers and light weight transactions as they rely on segment attributes of previous segment to determine if their write had succeeded or not.

Automatically Aborting or timing out transactions

Users can either explicitly abort a transaction or it would be timed out if users do not issue a commit on the transaction within a desired time. This will ensure that we release any buffers on client that were created for the ta for this transaction. Since these are singleRoutingKey transactions and there is no centralized coordination, we can keep the default timeouts small. But since this timeout only affects the resources on the client, we can tolerate arbitrarily large timeouts too.

Limitations:

The drawback of singleRoutingKey transaction approach is that since this is a client side only approach, the client is required to maintain all the state of transaction in its memory, which includes buffering all the inflight events until user calls commit. We need to do this primarily to tackle failure scenarios like network glitches or segment sealed events. This means we ought to impose some limits on the amount of data in a single transaction otherwise it could overwhelm the client memory and possibly require wasteful network retransmissions.

There is an equivalent cost on the segment store also because it buffers the append blocks until an append block end is received. Without a limit on the size of an append block, arbitrarily large amounts of data could potentially be required to be buffered on the server, which would be detrimental.

The question is what is a reasonable limit and should it be based on serialized size or number of events? If its serialized blob’s size, users have no real way of controlling the serialized size explicitly most of the times as they rely on third party libraries for serialization. If its number of events, that may not be a good idea either because that does not really constraint the size. We could keep it open but that opens us up for all sorts of misuses.

Currently the writer batching size is bounded by a fixed upper bound of 16 MB. But that size limit doesn’t impact the user’s business logic, instead if only affects when the batch is flushed to segment store. With a light weight transaction we intend to accumulate the batch locally and then perform a single flush, so limiting this to 16 MBs ensures that the append batches are contained within that size.

This limit needs to be communicated to the user and enforced in a meaningful fashion. And if the size of transactional payload exceeds 16mb, we could throw meaningful exception (MaxTransactionSizeExceededException).

Clone this wiki locally