-
Notifications
You must be signed in to change notification settings - Fork 15
Handle KAE (or other exceptions of similar type) in the future when witnessing transactions #6533
Changes from 6 commits
7113edf
81c62a6
ca39726
32dac63
374053b
23de549
37a96c8
f8bf341
bc61f18
fca9cff
3737bcc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
/* | ||
* (c) Copyright 2023 Palantir Technologies Inc. All rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.palantir.atlasdb.workload.transaction.witnessed; | ||
|
||
import org.immutables.value.Value; | ||
|
||
/** | ||
* For transactions that we know for certain we've witnessed. | ||
*/ | ||
@Value.Immutable | ||
public interface FullyWitnessedTransaction extends WitnessedTransaction { | ||
@Override | ||
default <T> T accept(WitnessedTransactionVisitor<T> visitor) { | ||
return visitor.visit(this); | ||
} | ||
|
||
static ImmutableFullyWitnessedTransaction.Builder builder() { | ||
return ImmutableFullyWitnessedTransaction.builder(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
/* | ||
* (c) Copyright 2023 Palantir Technologies Inc. All rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.palantir.atlasdb.workload.transaction.witnessed; | ||
|
||
import com.palantir.logsafe.Preconditions; | ||
import org.immutables.value.Value; | ||
|
||
/** | ||
* For transactions that we witnessed, but are in a state which we are | ||
* unsure if they have committed or not due to failures. | ||
*/ | ||
@Value.Immutable | ||
public interface MaybeWitnessedTransaction extends WitnessedTransaction { | ||
@Value.Check | ||
default void check() { | ||
Preconditions.checkArgument( | ||
commitTimestamp().isPresent(), | ||
"Commit timestamp must be present in a potentially witnessed transaction, as otherwise it is not" | ||
+ " possible to validate whether or not a transaction has been committed."); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this can be stronger: something like |
||
} | ||
|
||
@Override | ||
default <T> T accept(WitnessedTransactionVisitor<T> visitor) { | ||
return visitor.visit(this); | ||
} | ||
|
||
static ImmutableMaybeWitnessedTransaction.Builder builder() { | ||
return ImmutableMaybeWitnessedTransaction.builder(); | ||
} | ||
|
||
default FullyWitnessedTransaction toFullyWitnessed() { | ||
return FullyWitnessedTransaction.builder() | ||
.startTimestamp(startTimestamp()) | ||
.commitTimestamp(commitTimestamp()) | ||
.actions(actions()) | ||
.build(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. non actionable: If we add an optional field to |
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
/* | ||
* (c) Copyright 2023 Palantir Technologies Inc. All rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.palantir.atlasdb.workload.transaction.witnessed; | ||
|
||
public interface WitnessedTransactionVisitor<T> { | ||
T visit(FullyWitnessedTransaction witnessedTransaction); | ||
|
||
T visit(MaybeWitnessedTransaction maybeWitnessedTransaction); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
/* | ||
* (c) Copyright 2023 Palantir Technologies Inc. All rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.palantir.atlasdb.workload.transaction.witnessed; | ||
|
||
import static com.palantir.logsafe.testing.Assertions.assertThatLoggableExceptionThrownBy; | ||
import static org.assertj.core.api.Assertions.assertThat; | ||
import static org.assertj.core.api.Assertions.assertThatCode; | ||
import static org.mockito.Mockito.mock; | ||
|
||
import com.palantir.logsafe.exceptions.SafeIllegalArgumentException; | ||
import java.util.List; | ||
import org.junit.Test; | ||
|
||
public class MaybeWitnessedTransactionTest { | ||
|
||
private static final Long START_TIMESTAMP = 100L; | ||
private static final Long COMMIT_TIMESTAMP = 200L; | ||
|
||
@Test | ||
public void throwsWhenCommitTimestampIsNotPresent() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: these two should mention |
||
assertThatLoggableExceptionThrownBy(() -> | ||
MaybeWitnessedTransaction.builder().startTimestamp(100L).build()) | ||
.isInstanceOf(SafeIllegalArgumentException.class) | ||
.hasMessageContaining("Commit timestamp must be present in a potentially witnessed transaction"); | ||
} | ||
|
||
@Test | ||
public void doesNotThrowWhenCommitTimestampPresent() { | ||
assertThatCode(() -> MaybeWitnessedTransaction.builder() | ||
.startTimestamp(START_TIMESTAMP) | ||
.commitTimestamp(COMMIT_TIMESTAMP) | ||
.build()) | ||
.doesNotThrowAnyException(); | ||
} | ||
|
||
@Test | ||
public void toFullyWitnessedCopiesArgumentsCorrectly() { | ||
WitnessedWriteTransactionAction writeTransactionAction = mock(WitnessedWriteTransactionAction.class); | ||
WitnessedReadTransactionAction readTransactionAction = mock(WitnessedReadTransactionAction.class); | ||
List<WitnessedTransactionAction> actions = List.of(readTransactionAction, writeTransactionAction); | ||
FullyWitnessedTransaction fullyWitnessedTransaction = MaybeWitnessedTransaction.builder() | ||
.startTimestamp(START_TIMESTAMP) | ||
.commitTimestamp(COMMIT_TIMESTAMP) | ||
.actions(actions) | ||
.build() | ||
.toFullyWitnessed(); | ||
assertThat(fullyWitnessedTransaction.startTimestamp()).isEqualTo(START_TIMESTAMP); | ||
assertThat(fullyWitnessedTransaction.commitTimestamp()).contains(COMMIT_TIMESTAMP); | ||
assertThat(fullyWitnessedTransaction.actions()).containsExactlyElementsOf(actions); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,19 +18,23 @@ | |
|
||
import com.google.common.annotations.VisibleForTesting; | ||
import com.google.common.base.Suppliers; | ||
import com.palantir.atlasdb.keyvalue.api.KeyAlreadyExistsException; | ||
import com.palantir.atlasdb.keyvalue.api.TableReference; | ||
import com.palantir.atlasdb.transaction.api.PreCommitCondition; | ||
import com.palantir.atlasdb.transaction.api.Transaction; | ||
import com.palantir.atlasdb.transaction.api.TransactionManager; | ||
import com.palantir.atlasdb.transaction.service.TransactionStatus; | ||
import com.palantir.atlasdb.workload.transaction.DeleteTransactionAction; | ||
import com.palantir.atlasdb.workload.transaction.InteractiveTransaction; | ||
import com.palantir.atlasdb.workload.transaction.ReadTransactionAction; | ||
import com.palantir.atlasdb.workload.transaction.TransactionAction; | ||
import com.palantir.atlasdb.workload.transaction.TransactionActionVisitor; | ||
import com.palantir.atlasdb.workload.transaction.WriteTransactionAction; | ||
import com.palantir.atlasdb.workload.transaction.witnessed.ImmutableWitnessedTransaction; | ||
import com.palantir.atlasdb.workload.transaction.witnessed.FullyWitnessedTransaction; | ||
import com.palantir.atlasdb.workload.transaction.witnessed.MaybeWitnessedTransaction; | ||
import com.palantir.atlasdb.workload.transaction.witnessed.WitnessedTransaction; | ||
import com.palantir.atlasdb.workload.transaction.witnessed.WitnessedTransactionAction; | ||
import com.palantir.logsafe.SafeArg; | ||
import com.palantir.logsafe.exceptions.SafeIllegalArgumentException; | ||
import com.palantir.logsafe.exceptions.SafeIllegalStateException; | ||
import com.palantir.logsafe.logger.SafeLogger; | ||
|
@@ -63,6 +67,13 @@ public Optional<Integer> get(String table, WorkloadCell cell) { | |
transaction -> new AtlasDbInteractiveTransaction(transaction, tables).read(table, cell)); | ||
} | ||
|
||
@Override | ||
public boolean isCommitted(long startTimestamp) { | ||
return TransactionStatus.getCommitTimestamp( | ||
transactionManager.getTransactionService().getV2(startTimestamp)) | ||
.isPresent(); | ||
} | ||
|
||
@Override | ||
public Optional<WitnessedTransaction> readWrite(List<TransactionAction> actions) { | ||
return readWrite(txn -> { | ||
|
@@ -74,23 +85,26 @@ public Optional<WitnessedTransaction> readWrite(List<TransactionAction> actions) | |
@Override | ||
public Optional<WitnessedTransaction> readWrite(Consumer<InteractiveTransaction> interactiveTransactionConsumer) { | ||
AtomicReference<List<WitnessedTransactionAction>> witnessedActionsReference = new AtomicReference<>(); | ||
AtomicReference<Transaction> transactionReference = new AtomicReference<>(); | ||
Supplier<CommitTimestampProvider> commitTimestampProvider = | ||
Suppliers.memoize(() -> new CommitTimestampProvider()); | ||
try { | ||
Transaction transaction = | ||
transactionManager.runTaskWithConditionWithRetry(commitTimestampProvider, (txn, _condition) -> { | ||
AtlasDbInteractiveTransaction atlasDbInteractiveTransaction = | ||
new AtlasDbInteractiveTransaction(txn, tables); | ||
interactiveTransactionConsumer.accept(atlasDbInteractiveTransaction); | ||
witnessedActionsReference.set(atlasDbInteractiveTransaction.witness()); | ||
return txn; | ||
}); | ||
transactionManager.runTaskWithConditionWithRetry(commitTimestampProvider, (txn, _condition) -> { | ||
AtlasDbInteractiveTransaction atlasDbInteractiveTransaction = | ||
new AtlasDbInteractiveTransaction(txn, tables); | ||
interactiveTransactionConsumer.accept(atlasDbInteractiveTransaction); | ||
witnessedActionsReference.set(atlasDbInteractiveTransaction.witness()); | ||
transactionReference.set(txn); | ||
return null; | ||
}); | ||
|
||
Transaction transaction = transactionReference.get(); | ||
|
||
if (transaction.isAborted()) { | ||
return Optional.empty(); | ||
} | ||
|
||
return Optional.of(ImmutableWitnessedTransaction.builder() | ||
return Optional.of(FullyWitnessedTransaction.builder() | ||
.startTimestamp(transaction.getTimestamp()) | ||
.commitTimestamp(commitTimestampProvider | ||
.get() | ||
|
@@ -99,9 +113,22 @@ public Optional<WitnessedTransaction> readWrite(Consumer<InteractiveTransaction> | |
.build()); | ||
} catch (SafeIllegalArgumentException e) { | ||
throw e; | ||
} catch (KeyAlreadyExistsException e) { | ||
Transaction transaction = transactionReference.get(); | ||
return Optional.of(MaybeWitnessedTransaction.builder() | ||
.startTimestamp(transaction.getTimestamp()) | ||
.commitTimestamp(commitTimestampProvider | ||
.get() | ||
.getCommitTimestampOrThrowIfMaybeNotCommitted(transaction.getTimestamp())) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not actionable: This might work strangely if the transaction task deliberately throws a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Later on, I made it the more general case of |
||
.actions(witnessedActionsReference.get()) | ||
.build()); | ||
} catch (Exception e) { | ||
// TODO: Need to eventually handle PuE exceptions, as they could've succeeded in committing. | ||
log.info("Failed to record transaction due to an exception", e); | ||
Optional<Long> startTimestamp = | ||
Optional.ofNullable(transactionReference.get()).map(Transaction::getTimestamp); | ||
log.info( | ||
"Failed to record transaction due to an exception for startTimestamp {}", | ||
SafeArg.of("startTimestamp", startTimestamp), | ||
e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We might want to set the transaction reference at the beginning of the task, if we're concerned about something going wrong before we enter the commit-writes phase. |
||
return Optional.empty(); | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
/* | ||
* (c) Copyright 2023 Palantir Technologies Inc. All rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.palantir.atlasdb.workload.transaction.witnessed; | ||
|
||
import com.palantir.atlasdb.workload.store.ReadOnlyTransactionStore; | ||
import java.util.Optional; | ||
|
||
public final class OnlyCommittedWitnessedTransactionVisitor | ||
implements WitnessedTransactionVisitor<Optional<FullyWitnessedTransaction>> { | ||
|
||
private final ReadOnlyTransactionStore readOnlyTransactionStore; | ||
|
||
public OnlyCommittedWitnessedTransactionVisitor(ReadOnlyTransactionStore readOnlyTransactionStore) { | ||
this.readOnlyTransactionStore = readOnlyTransactionStore; | ||
} | ||
|
||
@Override | ||
public Optional<FullyWitnessedTransaction> visit(FullyWitnessedTransaction witnessedTransaction) { | ||
return Optional.of(witnessedTransaction); | ||
} | ||
|
||
@Override | ||
public Optional<FullyWitnessedTransaction> visit(MaybeWitnessedTransaction maybeWitnessedTransaction) { | ||
return readOnlyTransactionStore.isCommitted(maybeWitnessedTransaction.startTimestamp()) | ||
? Optional.of(maybeWitnessedTransaction.toFullyWitnessed()) | ||
: Optional.empty(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: checks whether the transaction with the provided startTimestamp has committed?