Skip to content

Commit

Permalink
[3.2][Kernel][Writes] Support idempotent writes (#3051)
Browse files Browse the repository at this point in the history
(Split from #2944)

Adds an API on `TransactionBuilder` to take the transaction identifier
for idempotent writes
```
    /*
     * Set the transaction identifier for idempotent writes. Incremental processing systems (e.g.,
     * streaming systems) that track progress using their own application-specific versions need to
     * record what progress has been made, in order to avoid duplicating data in the face of
     * failures and retries during writes. By setting the transaction identifier, the Delta table
     * can ensure that the data with same identifier is not written multiple times. For more
     * information refer to the Delta protocol section <a
     * href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#transaction-identifiers">
     * Transaction Identifiers</a>.
     *
     * @param engine             {@link Engine} instance to use.
     * @param applicationId      The application ID that is writing to the table.
     * @param transactionVersion The version of the transaction. This should be monotonically
     *                           increasing with each write for the same application ID.
     * @return updated {@link TransactionBuilder} instance.
     */
    TransactionBuilder withTransactionId(
            Engine engine,
            String applicationId,
            long transactionVersion);
```

During the transaction build, check the latest txn version of the given
AppId. If it is not monotonically increasing throw
`ConcurrentTransactionException`.

Added to `DeltaTableWriteSuite.scala`
  • Loading branch information
vkorukanti committed May 6, 2024
1 parent f365eb0 commit 4ae6df6
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.ConcurrentTransactionException;
import io.delta.kernel.types.StructType;

/**
Expand Down Expand Up @@ -48,10 +49,33 @@ public interface TransactionBuilder {
TransactionBuilder withPartitionColumns(Engine engine, List<String> partitionColumns);

/**
* Build the transaction. Also validates the given info to ensure that a valida transaction
* can be created.
* Set the transaction identifier for idempotent writes. Incremental processing systems (e.g.,
* streaming systems) that track progress using their own application-specific versions need to
* record what progress has been made, in order to avoid duplicating data in the face of
* failures and retries during writes. By setting the transaction identifier, the Delta table
* can ensure that the data with same identifier is not written multiple times. For more
* information refer to the Delta protocol section <a
* href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#transaction-identifiers">
* Transaction Identifiers</a>.
*
* @param engine {@link Engine} instance to use.
* @param applicationId The application ID that is writing to the table.
* @param transactionVersion The version of the transaction. This should be monotonically
* increasing with each write for the same application ID.
* @return updated {@link TransactionBuilder} instance.
*/
TransactionBuilder withTransactionId(
Engine engine,
String applicationId,
long transactionVersion);

/**
* Build the transaction. Also validates the given info to ensure that a valid transaction can
* be created.
*
* @param engine {@link Engine} instance to use.
* @throws ConcurrentTransactionException if the table already has a committed transaction with
* the same given transaction identifier.
*/
Transaction build(Engine engine);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.exceptions;

import io.delta.kernel.TransactionBuilder;
import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.engine.Engine;

/**
* Thrown when concurrent transaction both attempt to update the table with same transaction
* identifier set through {@link TransactionBuilder#withTransactionId(Engine, String, long)}
* (String)}.
* <p>
* Incremental processing systems (e.g., streaming systems) that track progress using their own
* application-specific versions need to record what progress has been made, in order to avoid
* duplicating data in the face of failures and retries during writes. For more information refer to
* the Delta protocol section <a
* href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#transaction-identifiers">
* Transaction Identifiers</a>
*
* @since 3.2.0
*/
@Evolving
public class ConcurrentTransactionException extends ConcurrentWriteException {
private static final String message = "This error occurs when multiple updates are " +
"using the same transaction identifier to write into this table.\n" +
"Application ID: %s, Attempted version: %s, Latest version in table: %s";

public ConcurrentTransactionException(String appId, long txnVersion, long lastUpdated) {
super(String.format(message, appId, txnVersion, lastUpdated));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@ public ConcurrentWriteException() {
super("Transaction has encountered a conflict and can not be committed. " +
"Query needs to be re-executed using the latest version of the table.");
}

public ConcurrentWriteException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,13 @@ public static KernelException partitionColumnMissingInData(
return new KernelException(format(msgT, partitionColumn, tablePath));
}

public static KernelException concurrentTransaction(
String appId,
long txnVersion,
long lastUpdated) {
return new ConcurrentTransactionException(appId, txnVersion, lastUpdated);
}

/* ------------------------ HELPER METHODS ----------------------------- */
private static String formatTimestamp(long millisSinceEpochUTC) {
return new Timestamp(millisSinceEpochUTC).toInstant().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.delta.kernel.internal;

import java.util.*;
import static java.util.Objects.requireNonNull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -49,6 +50,7 @@ public class TransactionBuilderImpl implements TransactionBuilder {
private final Operation operation;
private Optional<StructType> schema = Optional.empty();
private Optional<List<String>> partitionColumns = Optional.empty();
private Optional<SetTransaction> setTxnOpt = Optional.empty();

public TransactionBuilderImpl(TableImpl table, String engineInfo, Operation operation) {
this.table = table;
Expand All @@ -70,6 +72,19 @@ public TransactionBuilder withPartitionColumns(Engine engine, List<String> parti
return this;
}

@Override
public TransactionBuilder withTransactionId(
Engine engine,
String applicationId,
long transactionVersion) {
SetTransaction txnId = new SetTransaction(
requireNonNull(applicationId, "applicationId is null"),
transactionVersion,
Optional.of(currentTimeMillis));
this.setTxnOpt = Optional.of(txnId);
return this;
}

@Override
public Transaction build(Engine engine) {
SnapshotImpl snapshot;
Expand Down Expand Up @@ -97,7 +112,8 @@ public Transaction build(Engine engine) {
engineInfo,
operation,
snapshot.getProtocol(),
snapshot.getMetadata());
snapshot.getMetadata(),
setTxnOpt);
}

/**
Expand Down Expand Up @@ -131,6 +147,16 @@ private void validate(Engine engine, SnapshotImpl snapshot, boolean isNewTable)
SchemaUtils.validatePartitionColumns(
schema.get(), partitionColumns.orElse(Collections.emptyList()));
}

setTxnOpt.ifPresent(txnId -> {
Optional<Long> lastTxnVersion = snapshot.getLatestTransactionVersion(txnId.getAppId());
if (lastTxnVersion.isPresent() && lastTxnVersion.get() >= txnId.getVersion()) {
throw DeltaErrors.concurrentTransaction(
txnId.getAppId(),
txnId.getVersion(),
lastTxnVersion.get());
}
});
}

private class InitialSnapshot extends SnapshotImpl {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class TransactionImpl
private final Protocol protocol;
private final Metadata metadata;
private final SnapshotImpl readSnapshot;
private final Optional<SetTransaction> setTxnOpt;

private boolean closed; // To avoid trying to commit the same transaction again.

Expand All @@ -65,7 +66,8 @@ public TransactionImpl(
String engineInfo,
Operation operation,
Protocol protocol,
Metadata metadata) {
Metadata metadata,
Optional<SetTransaction> setTxnOpt) {
this.isNewTable = isNewTable;
this.dataPath = dataPath;
this.logPath = logPath;
Expand All @@ -74,6 +76,7 @@ public TransactionImpl(
this.operation = operation;
this.protocol = protocol;
this.metadata = metadata;
this.setTxnOpt = setTxnOpt;
}

@Override
Expand Down Expand Up @@ -106,6 +109,7 @@ public TransactionCommitResult commit(
metadataActions.add(createMetadataSingleAction(metadata.toRow()));
metadataActions.add(createProtocolSingleAction(protocol.toRow()));
}
setTxnOpt.ifPresent(setTxn -> metadataActions.add(createTxnSingleAction(setTxn.toRow())));

try (CloseableIterator<Row> stageDataIter = dataActions.iterator()) {
// Create a new CloseableIterator that will return the metadata actions followed by the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import io.delta.kernel.defaults.internal.data.DefaultColumnarBatch
import io.delta.kernel.defaults.internal.parquet.ParquetSuiteBase
import io.delta.kernel.defaults.utils.TestRow
import io.delta.kernel.engine.Engine
import io.delta.kernel.exceptions.{KernelException, TableAlreadyExistsException, TableNotFoundException}
import io.delta.kernel.exceptions._
import io.delta.kernel.expressions.Literal
import io.delta.kernel.expressions.Literal._
import io.delta.kernel.internal.checkpoints.CheckpointerSuite.selectSingleElement
Expand Down Expand Up @@ -430,7 +430,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
val expV0Data = v0Part0Data.flatMap(_.toTestRows) ++ v0Part1Data.flatMap(_.toTestRows)
val expV1Data = v1Part0Data.flatMap(_.toTestRows) ++ v1Part1Data.flatMap(_.toTestRows)

for(i <- 0 until 2) {
for (i <- 0 until 2) {
val commitResult = appendData(
engine,
tblPath,
Expand Down Expand Up @@ -673,6 +673,76 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

test("insert into table - idempotent writes") {
withTempDirAndEngine { (tblPath, engine) =>
val data = Seq(Map("part1" -> ofInt(1), "part2" -> ofInt(2)) -> dataPartitionBatches1)
var expData = Seq.empty[TestRow] // as the data in inserted, update this.

def addDataWithTxnId(newTbl: Boolean, appId: String, txnVer: Long, expTblVer: Long): Unit = {
var txnBuilder = createWriteTxnBuilder(Table.forPath(engine, tblPath))

if (appId != null) txnBuilder = txnBuilder.withTransactionId(engine, appId, txnVer)

if (newTbl) {
txnBuilder = txnBuilder.withSchema(engine, testPartitionSchema)
.withPartitionColumns(engine, testPartitionColumns.asJava)
}
val txn = txnBuilder.build(engine)

val combinedActions = inMemoryIterable(
data.map { case (partValues, partData) =>
stageData(txn.getTransactionState(engine), partValues, partData)
}.reduceLeft(_ combine _))

val commitResult = txn.commit(engine, combinedActions)

expData = expData ++ data.flatMap(_._2).flatMap(_.toTestRows)

verifyCommitResult(commitResult, expVersion = expTblVer, expIsReadyForCheckpoint = false)
val expPartCols = if (newTbl) testPartitionColumns else null
verifyCommitInfo(tblPath, version = expTblVer, expPartCols, operation = WRITE)
verifyWrittenContent(tblPath, testPartitionSchema, expData)
}

def expFailure(appId: String, txnVer: Long, latestTxnVer: Long)(fn: => Any): Unit = {
val ex = intercept[ConcurrentTransactionException] {
fn
}
assert(ex.getMessage.contains(s"This error occurs when multiple updates are using the " +
s"same transaction identifier to write into this table.\nApplication ID: $appId, " +
s"Attempted version: $txnVer, Latest version in table: $latestTxnVer"))
}

// Create a transaction with id (txnAppId1, 0) and commit it
addDataWithTxnId(newTbl = true, appId = "txnAppId1", txnVer = 0, expTblVer = 0)

// Try to create a transaction with id (txnAppId1, 0) and commit it - should be valid
addDataWithTxnId(newTbl = false, appId = "txnAppId1", txnVer = 1, expTblVer = 1)

// Try to create a transaction with id (txnAppId1, 1) and try to commit it
// Should fail the it is already committed above.
expFailure("txnAppId1", txnVer = 1, latestTxnVer = 1) {
addDataWithTxnId(newTbl = false, "txnAppId1", txnVer = 1, expTblVer = 2)
}

// append with no txn id
addDataWithTxnId(newTbl = false, appId = null, txnVer = 0, expTblVer = 2)

// Try to create a transaction with id (txnAppId2, 1) and commit it
// Should be successful as the transaction app id is different
addDataWithTxnId(newTbl = false, "txnAppId2", txnVer = 1, expTblVer = 3)

// Try to create a transaction with id (txnAppId2, 0) and commit it
// Should fail as the transaction app id is same but the version is less than the committed
expFailure("txnAppId2", txnVer = 0, latestTxnVer = 1) {
addDataWithTxnId(newTbl = false, "txnAppId2", txnVer = 0, expTblVer = 4)
}

// TODO: Add a test case where there are concurrent transactions with same app id
// and only one of them succeeds. Will be added once conflict resolution is handled
}
}

def verifyWrittenContent(path: String, expSchema: StructType, expData: Seq[TestRow]): Unit = {
val actSchema = tableSchema(path)
assert(actSchema === expSchema)
Expand Down

0 comments on commit 4ae6df6

Please sign in to comment.