diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/AddTransactionRequest.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/AddTransactionRequest.java index 4b6eaf085e..58a1b154a3 100644 --- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/AddTransactionRequest.java +++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/AddTransactionRequest.java @@ -15,6 +15,8 @@ */ package sleeper.core.statestore.transactionlog; +import sleeper.core.statestore.transactionlog.transactions.TransactionType; + import java.util.Optional; /** @@ -63,6 +65,10 @@ public > T getTransaction() { return (T) transaction; } + public TransactionType getTransactionType() { + return TransactionType.getType(transaction); + } + public Optional getBodyPointer() { return Optional.ofNullable(bodyPointer); } diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogEntry.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogEntry.java index 7d08aa29d5..ea55acc455 100644 --- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogEntry.java +++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogEntry.java @@ -15,8 +15,11 @@ */ package sleeper.core.statestore.transactionlog; +import sleeper.core.statestore.transactionlog.transactions.TransactionType; + import java.time.Instant; import java.util.Objects; +import java.util.Optional; /** * An entry in a state store transaction log. @@ -25,12 +28,26 @@ public class TransactionLogEntry { private final long transactionNumber; private final Instant updateTime; + private final TransactionType transactionType; + private final TransactionBodyPointer bodyPointer; private final StateStoreTransaction transaction; public TransactionLogEntry(long transactionNumber, Instant updateTime, StateStoreTransaction transaction) { + this(transactionNumber, updateTime, AddTransactionRequest.transaction(transaction)); + } + + public TransactionLogEntry(long transactionNumber, Instant updateTime, AddTransactionRequest request) { this.transactionNumber = transactionNumber; this.updateTime = updateTime; - this.transaction = transaction; + this.transactionType = TransactionType.getType(request.getTransaction()); + Optional bodyPointer = request.getBodyPointer(); + if (bodyPointer.isPresent()) { + this.bodyPointer = bodyPointer.get(); + this.transaction = null; + } else { + this.bodyPointer = null; + this.transaction = request.getTransaction(); + } } public long getTransactionNumber() { @@ -41,8 +58,16 @@ public Instant getUpdateTime() { return updateTime; } - public StateStoreTransaction getTransaction() { - return transaction; + public TransactionType getTransactionType() { + return transactionType; + } + + public Optional getBodyPointer() { + return Optional.ofNullable(bodyPointer); + } + + public Optional> getTransaction() { + return Optional.ofNullable(transaction); } @Override diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogHead.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogHead.java index c04a621dad..23793c74e5 100644 --- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogHead.java +++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogHead.java @@ -42,6 +42,7 @@ class TransactionLogHead { private final TableStatus sleeperTable; private final TransactionLogStore logStore; + private final TransactionBodyStore bodyStore; private final boolean updateLogBeforeAddTransaction; private final int maxAddTransactionAttempts; private final ExponentialBackoffWithJitter retryBackoff; @@ -59,6 +60,7 @@ class TransactionLogHead { private TransactionLogHead(Builder builder) { sleeperTable = builder.sleeperTable; logStore = builder.logStore; + bodyStore = builder.bodyStore; updateLogBeforeAddTransaction = builder.updateLogBeforeAddTransaction; maxAddTransactionAttempts = builder.maxAddTransactionAttempts; retryBackoff = builder.retryBackoff; @@ -88,16 +90,15 @@ void addTransaction(Instant updateTime, StateStoreTransaction transaction) th */ void addTransaction(Instant updateTime, AddTransactionRequest request) throws StateStoreException { Instant startTime = Instant.now(); - StateStoreTransaction transaction = request.getTransaction(); LOGGER.debug("Adding transaction of type {} to table {}", - transaction.getClass().getSimpleName(), sleeperTable); + request.getTransactionType(), sleeperTable); Exception failure = new IllegalArgumentException("No attempts made"); for (int attempt = 1; attempt <= maxAddTransactionAttempts; attempt++) { - prepareAddTransactionAttempt(attempt, transaction); + prepareAddTransactionAttempt(attempt, request.getTransaction()); try { - attemptAddTransaction(updateTime, transaction); + attemptAddTransaction(updateTime, request); LOGGER.info("Added transaction of type {} to table {} with {} attempts, took {}", - transaction.getClass().getSimpleName(), sleeperTable, attempt, + request.getTransactionType(), sleeperTable, attempt, LoggedDuration.withShortOutput(startTime, Instant.now())); return; } catch (DuplicateTransactionNumberException e) { @@ -113,13 +114,13 @@ void addTransaction(Instant updateTime, AddTransactionRequest request) throws St private void prepareAddTransactionAttempt(int attempt, StateStoreTransaction transaction) throws StateStoreException { if (updateLogBeforeAddTransaction) { + waitBeforeAttempt(attempt); forceUpdate(); validate(transaction); - waitBeforeAttempt(attempt); } else if (attempt > 1) { + waitBeforeAttempt(attempt - 1); forceUpdate(); validate(transaction); - waitBeforeAttempt(attempt - 1); } else { try { validate(transaction); @@ -146,14 +147,15 @@ private void waitBeforeAttempt(int attempt) throws StateStoreException { } } - private void attemptAddTransaction(Instant updateTime, StateStoreTransaction transaction) throws StateStoreException, DuplicateTransactionNumberException { + private void attemptAddTransaction(Instant updateTime, AddTransactionRequest request) throws StateStoreException, DuplicateTransactionNumberException { long transactionNumber = lastTransactionNumber + 1; try { - logStore.addTransaction(new TransactionLogEntry(transactionNumber, updateTime, transaction)); + logStore.addTransaction(new TransactionLogEntry(transactionNumber, updateTime, request)); } catch (RuntimeException e) { throw new StateStoreException("Failed adding transaction", e); } Instant startApplyTime = Instant.now(); + StateStoreTransaction transaction = request.getTransaction(); transaction.apply(state, updateTime); lastTransactionNumber = transactionNumber; LOGGER.debug("Applied transaction {} in {}", @@ -248,14 +250,14 @@ private void updateFromLog(Instant startTime) { } private void applyTransaction(TransactionLogEntry entry) { - if (!transactionType.isInstance(entry.getTransaction())) { + if (!transactionType.isAssignableFrom(entry.getTransactionType().getType())) { LOGGER.warn("Found unexpected transaction type for table {} with number {}. Expected {}, found {}", sleeperTable, entry.getTransactionNumber(), - transactionType.getClass().getName(), - entry.getTransaction().getClass().getName()); + transactionType.getName(), + entry.getTransactionType()); return; } - transactionType.cast(entry.getTransaction()) + transactionType.cast(entry.getTransaction().orElseThrow()) .apply(state, entry.getUpdateTime()); lastTransactionNumber = entry.getTransactionNumber(); } @@ -276,6 +278,7 @@ long lastTransactionNumber() { static class Builder { private TableStatus sleeperTable; private TransactionLogStore logStore; + private TransactionBodyStore bodyStore; private boolean updateLogBeforeAddTransaction = true; private int maxAddTransactionAttempts; private ExponentialBackoffWithJitter retryBackoff; @@ -301,6 +304,11 @@ public Builder logStore(TransactionLogStore logStore) { return this; } + public Builder bodyStore(TransactionBodyStore bodyStore) { + this.bodyStore = bodyStore; + return this; + } + public Builder updateLogBeforeAddTransaction(boolean updateLogBeforeAddTransaction) { this.updateLogBeforeAddTransaction = updateLogBeforeAddTransaction; return this; diff --git a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogStateStoreLogSpecificTest.java b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogStateStoreLogSpecificTest.java index c278dad874..1d10cbc07c 100644 --- a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogStateStoreLogSpecificTest.java +++ b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogStateStoreLogSpecificTest.java @@ -436,6 +436,7 @@ class StoreTransactionBodySeparately { private TransactionLogStateStore store = (TransactionLogStateStore) TransactionLogStateStoreLogSpecificTest.this.store; @Test + @Disabled("TODO") void shouldAddFileTransactionWhoseBodyIsHeldInS3() { // Given FileReference file = fileFactory().rootFile("file.parquet", 100); @@ -448,10 +449,11 @@ void shouldAddFileTransactionWhoseBodyIsHeldInS3() { store.addTransaction(AddTransactionRequest.transactionInBucket(bucket, key, transaction)); // Then - assertThat(store.getFileReferences()).containsExactly(file); + assertThat(otherProcess().getFileReferences()).containsExactly(file); } @Test + @Disabled("TODO") void shouldAddPartitionTransactionWhoseBodyIsHeldInS3() { // Given PartitionTree tree = partitions.splitToNewChildren("root", "L", "R", "m").buildTree(); @@ -464,7 +466,7 @@ void shouldAddPartitionTransactionWhoseBodyIsHeldInS3() { store.addTransaction(AddTransactionRequest.transactionInBucket(bucket, key, transaction)); // Then - assertThat(store.getAllPartitions()).isEqualTo(tree.getAllPartitions()); + assertThat(otherProcess().getAllPartitions()).isEqualTo(tree.getAllPartitions()); } @Test diff --git a/java/statestore/src/main/java/sleeper/statestore/transactionlog/DynamoDBTransactionLogStore.java b/java/statestore/src/main/java/sleeper/statestore/transactionlog/DynamoDBTransactionLogStore.java index ec6d3e376d..d3ad61f59f 100644 --- a/java/statestore/src/main/java/sleeper/statestore/transactionlog/DynamoDBTransactionLogStore.java +++ b/java/statestore/src/main/java/sleeper/statestore/transactionlog/DynamoDBTransactionLogStore.java @@ -128,7 +128,7 @@ private DynamoDBTransactionLogStore( @Override public void addTransaction(TransactionLogEntry entry) throws DuplicateTransactionNumberException { long transactionNumber = entry.getTransactionNumber(); - StateStoreTransaction transaction = entry.getTransaction(); + StateStoreTransaction transaction = entry.getTransaction().orElseThrow(); try { dynamo.putItem(new PutItemRequest() .withTableName(logTableName)