Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Handle KAE (or other exceptions of similar type) in the future when w…
Browse files Browse the repository at this point in the history
…itnessing transactions (#6533)

Handles transactions that may or may have not been witnessed due to errors (such as KeyAlreadyExistsException) in the workload server.
  • Loading branch information
Sam-Kramer authored May 3, 2023
1 parent f2f0426 commit 2857ac2
Show file tree
Hide file tree
Showing 21 changed files with 464 additions and 69 deletions.
1 change: 1 addition & 0 deletions atlasdb-workload-server-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ dependencies {
annotationProcessor 'org.immutables:value'

testImplementation 'com.palantir.safe-logging:preconditions-assertj'
testImplementation 'org.mockito:mockito-core'
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,9 @@ public interface ReadableTransactionStore {
* @return The value of the cell for a given table.
*/
Optional<Integer> get(String table, WorkloadCell cell);

/**
* Checks whether the transaction with the provided startTimestamp has committed.
*/
boolean isCommitted(long startTimestamp);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* (c) Copyright 2023 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.workload.transaction.witnessed;

import org.immutables.value.Value;

/**
* For transactions that we know for certain we've witnessed.
*/
@Value.Immutable
public interface FullyWitnessedTransaction extends WitnessedTransaction {
@Override
default <T> T accept(WitnessedTransactionVisitor<T> visitor) {
return visitor.visit(this);
}

static ImmutableFullyWitnessedTransaction.Builder builder() {
return ImmutableFullyWitnessedTransaction.builder();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* (c) Copyright 2023 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.workload.transaction.witnessed;

import com.palantir.logsafe.Preconditions;
import org.immutables.value.Value;

/**
* For transactions that we witnessed, but are in a state which we are
* unsure if they have committed or not due to failures.
*/
@Value.Immutable
public interface MaybeWitnessedTransaction extends WitnessedTransaction {
@Value.Check
default void check() {
Preconditions.checkArgument(
commitTimestamp().isPresent(),
"Given how the transaction protocol works, a transaction for which we haven't retrieved the commit "
+ "timestamp has not written anything of significance to the database");
}

@Override
default <T> T accept(WitnessedTransactionVisitor<T> visitor) {
return visitor.visit(this);
}

static ImmutableMaybeWitnessedTransaction.Builder builder() {
return ImmutableMaybeWitnessedTransaction.builder();
}

default FullyWitnessedTransaction toFullyWitnessed() {
return FullyWitnessedTransaction.builder()
.startTimestamp(startTimestamp())
.commitTimestamp(commitTimestamp())
.actions(actions())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@

import java.util.List;
import java.util.Optional;
import org.immutables.value.Value;

@Value.Immutable
public interface WitnessedTransaction {
/** Start timestamp of the transaction. */
long startTimestamp();
Expand All @@ -30,4 +28,6 @@ public interface WitnessedTransaction {

/** Provides an in-order list of actions that were performed during the transaction's execution. */
List<WitnessedTransactionAction> actions();

<T> T accept(WitnessedTransactionVisitor<T> visitor);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* (c) Copyright 2023 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.workload.transaction.witnessed;

public interface WitnessedTransactionVisitor<T> {
T visit(FullyWitnessedTransaction witnessedTransaction);

T visit(MaybeWitnessedTransaction maybeWitnessedTransaction);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,10 @@ public interface WorkflowHistory {
// We may want to query the KVS during verification.
ReadableTransactionStore transactionStore();

/**
* TODO: This should only be a list of
* {@link com.palantir.atlasdb.workload.transaction.witnessed.FullyWitnessedTransaction} rather than the general
* purpose type.
*/
List<WitnessedTransaction> history();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* (c) Copyright 2023 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.workload.transaction.witnessed;

import static com.palantir.logsafe.testing.Assertions.assertThatLoggableExceptionThrownBy;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.mockito.Mockito.mock;

import com.palantir.logsafe.exceptions.SafeIllegalArgumentException;
import java.util.List;
import org.junit.Test;

public class MaybeWitnessedTransactionTest {

private static final Long START_TIMESTAMP = 100L;
private static final Long COMMIT_TIMESTAMP = 200L;

@Test
public void throwsWhenCommitTimestampIsNotPresentOnCreation() {
assertThatLoggableExceptionThrownBy(() -> MaybeWitnessedTransaction.builder()
.startTimestamp(START_TIMESTAMP)
.build())
.isInstanceOf(SafeIllegalArgumentException.class)
.hasMessageContaining(
"Given how the transaction protocol works, a transaction for which we haven't retrieved the"
+ " commit timestamp");
}

@Test
public void doesNotThrowWhenCommitTimestampPresentOnCreation() {
assertThatCode(() -> MaybeWitnessedTransaction.builder()
.startTimestamp(START_TIMESTAMP)
.commitTimestamp(COMMIT_TIMESTAMP)
.build())
.doesNotThrowAnyException();
}

@Test
public void toFullyWitnessedCopiesArgumentsCorrectly() {
WitnessedWriteTransactionAction writeTransactionAction = mock(WitnessedWriteTransactionAction.class);
WitnessedReadTransactionAction readTransactionAction = mock(WitnessedReadTransactionAction.class);
List<WitnessedTransactionAction> actions = List.of(readTransactionAction, writeTransactionAction);
FullyWitnessedTransaction fullyWitnessedTransaction = MaybeWitnessedTransaction.builder()
.startTimestamp(START_TIMESTAMP)
.commitTimestamp(COMMIT_TIMESTAMP)
.actions(actions)
.build()
.toFullyWitnessed();
assertThat(fullyWitnessedTransaction.startTimestamp()).isEqualTo(START_TIMESTAMP);
assertThat(fullyWitnessedTransaction.commitTimestamp()).contains(COMMIT_TIMESTAMP);
assertThat(fullyWitnessedTransaction.actions()).containsExactlyElementsOf(actions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,20 @@
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.transaction.api.PreCommitCondition;
import com.palantir.atlasdb.transaction.api.Transaction;
import com.palantir.atlasdb.transaction.api.TransactionCommitFailedException;
import com.palantir.atlasdb.transaction.api.TransactionManager;
import com.palantir.atlasdb.transaction.service.TransactionStatus;
import com.palantir.atlasdb.workload.transaction.DeleteTransactionAction;
import com.palantir.atlasdb.workload.transaction.InteractiveTransaction;
import com.palantir.atlasdb.workload.transaction.ReadTransactionAction;
import com.palantir.atlasdb.workload.transaction.TransactionAction;
import com.palantir.atlasdb.workload.transaction.TransactionActionVisitor;
import com.palantir.atlasdb.workload.transaction.WriteTransactionAction;
import com.palantir.atlasdb.workload.transaction.witnessed.ImmutableWitnessedTransaction;
import com.palantir.atlasdb.workload.transaction.witnessed.FullyWitnessedTransaction;
import com.palantir.atlasdb.workload.transaction.witnessed.MaybeWitnessedTransaction;
import com.palantir.atlasdb.workload.transaction.witnessed.WitnessedTransaction;
import com.palantir.atlasdb.workload.transaction.witnessed.WitnessedTransactionAction;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalArgumentException;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.logsafe.logger.SafeLogger;
Expand Down Expand Up @@ -63,6 +67,13 @@ public Optional<Integer> get(String table, WorkloadCell cell) {
transaction -> new AtlasDbInteractiveTransaction(transaction, tables).read(table, cell));
}

@Override
public boolean isCommitted(long startTimestamp) {
return TransactionStatus.getCommitTimestamp(
transactionManager.getTransactionService().getV2(startTimestamp))
.isPresent();
}

@Override
public Optional<WitnessedTransaction> readWrite(List<TransactionAction> actions) {
return readWrite(txn -> {
Expand All @@ -74,23 +85,26 @@ public Optional<WitnessedTransaction> readWrite(List<TransactionAction> actions)
@Override
public Optional<WitnessedTransaction> readWrite(Consumer<InteractiveTransaction> interactiveTransactionConsumer) {
AtomicReference<List<WitnessedTransactionAction>> witnessedActionsReference = new AtomicReference<>();
AtomicReference<Transaction> transactionReference = new AtomicReference<>();
Supplier<CommitTimestampProvider> commitTimestampProvider =
Suppliers.memoize(() -> new CommitTimestampProvider());
try {
Transaction transaction =
transactionManager.runTaskWithConditionWithRetry(commitTimestampProvider, (txn, _condition) -> {
AtlasDbInteractiveTransaction atlasDbInteractiveTransaction =
new AtlasDbInteractiveTransaction(txn, tables);
interactiveTransactionConsumer.accept(atlasDbInteractiveTransaction);
witnessedActionsReference.set(atlasDbInteractiveTransaction.witness());
return txn;
});
transactionManager.runTaskWithConditionWithRetry(commitTimestampProvider, (txn, _condition) -> {
transactionReference.set(txn);
AtlasDbInteractiveTransaction atlasDbInteractiveTransaction =
new AtlasDbInteractiveTransaction(txn, tables);
interactiveTransactionConsumer.accept(atlasDbInteractiveTransaction);
witnessedActionsReference.set(atlasDbInteractiveTransaction.witness());
return null;
});

Transaction transaction = transactionReference.get();

if (transaction.isAborted()) {
return Optional.empty();
}

return Optional.of(ImmutableWitnessedTransaction.builder()
return Optional.of(FullyWitnessedTransaction.builder()
.startTimestamp(transaction.getTimestamp())
.commitTimestamp(commitTimestampProvider
.get()
Expand All @@ -99,9 +113,22 @@ public Optional<WitnessedTransaction> readWrite(Consumer<InteractiveTransaction>
.build());
} catch (SafeIllegalArgumentException e) {
throw e;
} catch (TransactionCommitFailedException e) {
Transaction transaction = transactionReference.get();
return Optional.of(MaybeWitnessedTransaction.builder()
.startTimestamp(transaction.getTimestamp())
.commitTimestamp(commitTimestampProvider
.get()
.getCommitTimestampOrThrowIfMaybeNotCommitted(transaction.getTimestamp()))
.actions(witnessedActionsReference.get())
.build());
} catch (Exception e) {
// TODO: Need to eventually handle PuE exceptions, as they could've succeeded in committing.
log.info("Failed to record transaction due to an exception", e);
Optional<Long> startTimestamp =
Optional.ofNullable(transactionReference.get()).map(Transaction::getTimestamp);
log.info(
"Failed to record transaction due to an exception for startTimestamp {}",
SafeArg.of("startTimestamp", startTimestamp),
e);
return Optional.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,9 @@ public ReadOnlyTransactionStore(ReadableTransactionStore delegate) {
public Optional<Integer> get(String table, WorkloadCell cell) {
return delegate.get(table, cell);
}

@Override
public boolean isCommitted(long startTimestamp) {
return delegate.isCommitted(startTimestamp);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* (c) Copyright 2023 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.workload.transaction.witnessed;

import com.palantir.atlasdb.workload.store.ReadOnlyTransactionStore;
import java.util.Optional;

public final class OnlyCommittedWitnessedTransactionVisitor
implements WitnessedTransactionVisitor<Optional<FullyWitnessedTransaction>> {

private final ReadOnlyTransactionStore readOnlyTransactionStore;

public OnlyCommittedWitnessedTransactionVisitor(ReadOnlyTransactionStore readOnlyTransactionStore) {
this.readOnlyTransactionStore = readOnlyTransactionStore;
}

@Override
public Optional<FullyWitnessedTransaction> visit(FullyWitnessedTransaction witnessedTransaction) {
return Optional.of(witnessedTransaction);
}

@Override
public Optional<FullyWitnessedTransaction> visit(MaybeWitnessedTransaction maybeWitnessedTransaction) {
return readOnlyTransactionStore.isCommitted(maybeWitnessedTransaction.startTimestamp())
? Optional.of(maybeWitnessedTransaction.toFullyWitnessed())
: Optional.empty();
}
}
Loading

0 comments on commit 2857ac2

Please sign in to comment.