Skip to content

Commit

Permalink
Hold body pointer in transaction log entry
Browse files Browse the repository at this point in the history
  • Loading branch information
patchwork01 committed Jan 9, 2025
1 parent ebd6fe1 commit c7eef11
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package sleeper.core.statestore.transactionlog;

import sleeper.core.statestore.transactionlog.transactions.TransactionType;

import java.util.Optional;

/**
Expand Down Expand Up @@ -63,6 +65,10 @@ public <T extends StateStoreTransaction<?>> T getTransaction() {
return (T) transaction;
}

public TransactionType getTransactionType() {
return TransactionType.getType(transaction);
}

public Optional<TransactionBodyPointer> getBodyPointer() {
return Optional.ofNullable(bodyPointer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
*/
package sleeper.core.statestore.transactionlog;

import sleeper.core.statestore.transactionlog.transactions.TransactionType;

import java.time.Instant;
import java.util.Objects;
import java.util.Optional;

/**
* An entry in a state store transaction log.
Expand All @@ -25,12 +28,26 @@ public class TransactionLogEntry {

private final long transactionNumber;
private final Instant updateTime;
private final TransactionType transactionType;
private final TransactionBodyPointer bodyPointer;
private final StateStoreTransaction<?> transaction;

public TransactionLogEntry(long transactionNumber, Instant updateTime, StateStoreTransaction<?> transaction) {
this(transactionNumber, updateTime, AddTransactionRequest.transaction(transaction));
}

public TransactionLogEntry(long transactionNumber, Instant updateTime, AddTransactionRequest request) {
this.transactionNumber = transactionNumber;
this.updateTime = updateTime;
this.transaction = transaction;
this.transactionType = TransactionType.getType(request.getTransaction());
Optional<TransactionBodyPointer> bodyPointer = request.getBodyPointer();
if (bodyPointer.isPresent()) {
this.bodyPointer = bodyPointer.get();
this.transaction = null;
} else {
this.bodyPointer = null;
this.transaction = request.getTransaction();
}
}

public long getTransactionNumber() {
Expand All @@ -41,8 +58,16 @@ public Instant getUpdateTime() {
return updateTime;
}

public StateStoreTransaction<?> getTransaction() {
return transaction;
public TransactionType getTransactionType() {
return transactionType;
}

public Optional<TransactionBodyPointer> getBodyPointer() {
return Optional.ofNullable(bodyPointer);
}

public Optional<StateStoreTransaction<?>> getTransaction() {
return Optional.ofNullable(transaction);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class TransactionLogHead<T> {

private final TableStatus sleeperTable;
private final TransactionLogStore logStore;
private final TransactionBodyStore bodyStore;
private final boolean updateLogBeforeAddTransaction;
private final int maxAddTransactionAttempts;
private final ExponentialBackoffWithJitter retryBackoff;
Expand All @@ -59,6 +60,7 @@ class TransactionLogHead<T> {
private TransactionLogHead(Builder<T> builder) {
sleeperTable = builder.sleeperTable;
logStore = builder.logStore;
bodyStore = builder.bodyStore;
updateLogBeforeAddTransaction = builder.updateLogBeforeAddTransaction;
maxAddTransactionAttempts = builder.maxAddTransactionAttempts;
retryBackoff = builder.retryBackoff;
Expand Down Expand Up @@ -88,16 +90,15 @@ void addTransaction(Instant updateTime, StateStoreTransaction<T> transaction) th
*/
void addTransaction(Instant updateTime, AddTransactionRequest request) throws StateStoreException {
Instant startTime = Instant.now();
StateStoreTransaction<T> transaction = request.getTransaction();
LOGGER.debug("Adding transaction of type {} to table {}",
transaction.getClass().getSimpleName(), sleeperTable);
request.getTransactionType(), sleeperTable);
Exception failure = new IllegalArgumentException("No attempts made");
for (int attempt = 1; attempt <= maxAddTransactionAttempts; attempt++) {
prepareAddTransactionAttempt(attempt, transaction);
prepareAddTransactionAttempt(attempt, request.getTransaction());
try {
attemptAddTransaction(updateTime, transaction);
attemptAddTransaction(updateTime, request);
LOGGER.info("Added transaction of type {} to table {} with {} attempts, took {}",
transaction.getClass().getSimpleName(), sleeperTable, attempt,
request.getTransactionType(), sleeperTable, attempt,
LoggedDuration.withShortOutput(startTime, Instant.now()));
return;
} catch (DuplicateTransactionNumberException e) {
Expand All @@ -113,13 +114,13 @@ void addTransaction(Instant updateTime, AddTransactionRequest request) throws St

private void prepareAddTransactionAttempt(int attempt, StateStoreTransaction<T> transaction) throws StateStoreException {
if (updateLogBeforeAddTransaction) {
waitBeforeAttempt(attempt);
forceUpdate();
validate(transaction);
waitBeforeAttempt(attempt);
} else if (attempt > 1) {
waitBeforeAttempt(attempt - 1);
forceUpdate();
validate(transaction);
waitBeforeAttempt(attempt - 1);
} else {
try {
validate(transaction);
Expand All @@ -146,14 +147,15 @@ private void waitBeforeAttempt(int attempt) throws StateStoreException {
}
}

private void attemptAddTransaction(Instant updateTime, StateStoreTransaction<T> transaction) throws StateStoreException, DuplicateTransactionNumberException {
private void attemptAddTransaction(Instant updateTime, AddTransactionRequest request) throws StateStoreException, DuplicateTransactionNumberException {
long transactionNumber = lastTransactionNumber + 1;
try {
logStore.addTransaction(new TransactionLogEntry(transactionNumber, updateTime, transaction));
logStore.addTransaction(new TransactionLogEntry(transactionNumber, updateTime, request));
} catch (RuntimeException e) {
throw new StateStoreException("Failed adding transaction", e);
}
Instant startApplyTime = Instant.now();
StateStoreTransaction<T> transaction = request.getTransaction();
transaction.apply(state, updateTime);
lastTransactionNumber = transactionNumber;
LOGGER.debug("Applied transaction {} in {}",
Expand Down Expand Up @@ -248,14 +250,14 @@ private void updateFromLog(Instant startTime) {
}

private void applyTransaction(TransactionLogEntry entry) {
if (!transactionType.isInstance(entry.getTransaction())) {
if (!transactionType.isAssignableFrom(entry.getTransactionType().getType())) {
LOGGER.warn("Found unexpected transaction type for table {} with number {}. Expected {}, found {}",
sleeperTable, entry.getTransactionNumber(),
transactionType.getClass().getName(),
entry.getTransaction().getClass().getName());
transactionType.getName(),
entry.getTransactionType());
return;
}
transactionType.cast(entry.getTransaction())
transactionType.cast(entry.getTransaction().orElseThrow())
.apply(state, entry.getUpdateTime());
lastTransactionNumber = entry.getTransactionNumber();
}
Expand All @@ -276,6 +278,7 @@ long lastTransactionNumber() {
static class Builder<T> {
private TableStatus sleeperTable;
private TransactionLogStore logStore;
private TransactionBodyStore bodyStore;
private boolean updateLogBeforeAddTransaction = true;
private int maxAddTransactionAttempts;
private ExponentialBackoffWithJitter retryBackoff;
Expand All @@ -301,6 +304,11 @@ public Builder<T> logStore(TransactionLogStore logStore) {
return this;
}

public Builder<T> bodyStore(TransactionBodyStore bodyStore) {
this.bodyStore = bodyStore;
return this;
}

public Builder<T> updateLogBeforeAddTransaction(boolean updateLogBeforeAddTransaction) {
this.updateLogBeforeAddTransaction = updateLogBeforeAddTransaction;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ class StoreTransactionBodySeparately {
private TransactionLogStateStore store = (TransactionLogStateStore) TransactionLogStateStoreLogSpecificTest.this.store;

@Test
@Disabled("TODO")
void shouldAddFileTransactionWhoseBodyIsHeldInS3() {
// Given
FileReference file = fileFactory().rootFile("file.parquet", 100);
Expand All @@ -448,10 +449,11 @@ void shouldAddFileTransactionWhoseBodyIsHeldInS3() {
store.addTransaction(AddTransactionRequest.transactionInBucket(bucket, key, transaction));

// Then
assertThat(store.getFileReferences()).containsExactly(file);
assertThat(otherProcess().getFileReferences()).containsExactly(file);
}

@Test
@Disabled("TODO")
void shouldAddPartitionTransactionWhoseBodyIsHeldInS3() {
// Given
PartitionTree tree = partitions.splitToNewChildren("root", "L", "R", "m").buildTree();
Expand All @@ -464,7 +466,7 @@ void shouldAddPartitionTransactionWhoseBodyIsHeldInS3() {
store.addTransaction(AddTransactionRequest.transactionInBucket(bucket, key, transaction));

// Then
assertThat(store.getAllPartitions()).isEqualTo(tree.getAllPartitions());
assertThat(otherProcess().getAllPartitions()).isEqualTo(tree.getAllPartitions());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ private DynamoDBTransactionLogStore(
@Override
public void addTransaction(TransactionLogEntry entry) throws DuplicateTransactionNumberException {
long transactionNumber = entry.getTransactionNumber();
StateStoreTransaction<?> transaction = entry.getTransaction();
StateStoreTransaction<?> transaction = entry.getTransaction().orElseThrow();
try {
dynamo.putItem(new PutItemRequest()
.withTableName(logTableName)
Expand Down

0 comments on commit c7eef11

Please sign in to comment.