Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add option for retrying DML as PDML #3480

Merged
merged 3 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -791,4 +791,22 @@
<method>boolean isAutoBatchDmlUpdateCountVerification()</method>
</difference>

<!-- Retry DML as Partitioned DML -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/TransactionRetryListener</className>
<method>void retryDmlAsPartitionedDmlStarting(java.util.UUID, com.google.cloud.spanner.Statement, com.google.cloud.spanner.TransactionMutationLimitExceededException)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/TransactionRetryListener</className>
<method>void retryDmlAsPartitionedDmlFinished(java.util.UUID, com.google.cloud.spanner.Statement, long)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/TransactionRetryListener</className>
<method>void retryDmlAsPartitionedDmlFailed(java.util.UUID, com.google.cloud.spanner.Statement, java.lang.Throwable)</method>
</difference>


</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@

package com.google.cloud.spanner;

import static com.google.cloud.spanner.TransactionMutationLimitExceededException.isTransactionMutationLimitException;

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ErrorDetails;
import com.google.api.gax.rpc.WatchdogTimeoutException;
import com.google.cloud.spanner.SpannerException.DoNotConstructDirectly;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -256,6 +259,18 @@ private static ErrorInfo extractErrorInfo(Throwable cause) {
return null;
}

static ErrorDetails extractErrorDetails(Throwable cause) {
Throwable prevCause = null;
while (cause != null && cause != prevCause) {
if (cause instanceof ApiException) {
return ((ApiException) cause).getErrorDetails();
}
prevCause = cause;
cause = cause.getCause();
}
return null;
}

/**
* Creates a {@link StatusRuntimeException} that contains a {@link RetryInfo} with the specified
* retry delay.
Expand Down Expand Up @@ -313,6 +328,11 @@ static SpannerException newSpannerExceptionPreformatted(
token, message, resourceInfo, cause, apiException);
}
}
case INVALID_ARGUMENT:
if (isTransactionMutationLimitException(cause)) {
return new TransactionMutationLimitExceededException(
token, code, message, cause, apiException);
}
// Fall through to the default.
default:
return new SpannerException(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import static com.google.cloud.spanner.SpannerExceptionFactory.extractErrorDetails;

import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ErrorDetails;
import javax.annotation.Nullable;

/** Exception thrown by Spanner when the transaction mutation limit has been exceeded. */
public class TransactionMutationLimitExceededException extends SpannerException {
private static final long serialVersionUID = 1L;

/** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */
TransactionMutationLimitExceededException(
DoNotConstructDirectly token,
ErrorCode errorCode,
String message,
Throwable cause,
@Nullable ApiException apiException) {
super(token, errorCode, /*retryable = */ false, message, cause, apiException);
}

static boolean isTransactionMutationLimitException(Throwable cause) {
if (cause == null
|| cause.getMessage() == null
|| !cause.getMessage().contains("The transaction contains too many mutations.")) {
return false;
}
// Spanner includes a hint that points to the Spanner limits documentation page when the error
// was that the transaction mutation limit was exceeded. We use that here to identify the error,
// as there is no other specific metadata in the error that identifies it (other than the error
// message).
ErrorDetails errorDetails = extractErrorDetails(cause);
if (errorDetails != null && errorDetails.getHelp() != null) {
return errorDetails.getHelp().getLinksCount() == 1
&& errorDetails
.getHelp()
.getLinks(0)
.getDescription()
.equals("Cloud Spanner limits documentation.")
&& errorDetails
.getHelp()
.getLinks(0)
.getUrl()
.equals("https://cloud.google.com/spanner/docs/limits");
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.Type.StructField;
import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement;
import com.google.cloud.spanner.connection.ReadWriteTransaction.Builder;
import com.google.cloud.spanner.connection.StatementExecutor.StatementTimeout;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -75,6 +76,7 @@ abstract class AbstractBaseUnitOfWork implements UnitOfWork {
private final StatementExecutor statementExecutor;
private final StatementTimeout statementTimeout;
protected final String transactionTag;
protected final List<TransactionRetryListener> transactionRetryListeners;
protected final boolean excludeTxnFromChangeStreams;
protected final RpcPriority rpcPriority;
protected final Span span;
Expand Down Expand Up @@ -110,6 +112,7 @@ abstract static class Builder<B extends Builder<?, T>, T extends AbstractBaseUni
private StatementExecutor statementExecutor;
private StatementTimeout statementTimeout = new StatementTimeout();
private String transactionTag;
private List<TransactionRetryListener> transactionRetryListeners;

private boolean excludeTxnFromChangeStreams;
private RpcPriority rpcPriority;
Expand All @@ -134,6 +137,16 @@ B setStatementTimeout(StatementTimeout timeout) {
return self();
}

B setTransactionRetryListeners(List<TransactionRetryListener> listeners) {
Preconditions.checkNotNull(listeners);
this.transactionRetryListeners = listeners;
return self();
}

boolean hasTransactionRetryListeners() {
return this.transactionRetryListeners != null;
}

B setTransactionTag(@Nullable String tag) {
this.transactionTag = tag;
return self();
Expand Down Expand Up @@ -162,6 +175,7 @@ B setSpan(@Nullable Span span) {
this.statementExecutor = builder.statementExecutor;
this.statementTimeout = builder.statementTimeout;
this.transactionTag = builder.transactionTag;
this.transactionRetryListeners = builder.transactionRetryListeners;
this.excludeTxnFromChangeStreams = builder.excludeTxnFromChangeStreams;
this.rpcPriority = builder.rpcPriority;
this.span = Preconditions.checkNotNull(builder.span);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,18 @@

/** Enum used to define the behavior of DML statements in autocommit mode */
public enum AutocommitDmlMode {
/** TRANSACTIONAL: DML statements use a standard atomic transaction. */
TRANSACTIONAL,
PARTITIONED_NON_ATOMIC;
/** PARTITIONED_NON_ATOMIC: DML statements use a Partitioned DML transaction. */
PARTITIONED_NON_ATOMIC,
/**
* TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC: DML statements are first executed with a
* standard atomic transaction. If that fails due to the mutation limit being exceeded, the
* statement will automatically be retried using a Partitioned DML transaction. These statements
* are not guaranteed to be atomic. The corresponding {@link TransactionRetryListener} methods
* will be invoked when a DML statement falls back to Partitioned DML.
*/
TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC;

private final String statementString;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2125,6 +2125,7 @@ UnitOfWork createNewUnitOfWork(
.setReadOnly(getConnectionPropertyValue(READONLY))
.setReadOnlyStaleness(getConnectionPropertyValue(READ_ONLY_STALENESS))
.setAutocommitDmlMode(getConnectionPropertyValue(AUTOCOMMIT_DML_MODE))
.setTransactionRetryListeners(transactionRetryListeners)
.setReturnCommitStats(getConnectionPropertyValue(RETURN_COMMIT_STATS))
.setExcludeTxnFromChangeStreams(excludeTxnFromChangeStreams)
.setMaxCommitDelay(getConnectionPropertyValue(MAX_COMMIT_DELAY))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ class ReadWriteTransaction extends AbstractMultiUseTransaction {
private final SavepointSupport savepointSupport;
private int transactionRetryAttempts;
private int successfulRetries;
private final List<TransactionRetryListener> transactionRetryListeners;
private volatile ApiFuture<TransactionContext> txContextFuture;
private boolean canUseSingleUseRead;
private volatile SettableApiFuture<CommitResponse> commitResponseFuture;
Expand Down Expand Up @@ -203,7 +202,6 @@ static class Builder extends AbstractMultiUseTransaction.Builder<Builder, ReadWr
private boolean returnCommitStats;
private Duration maxCommitDelay;
private SavepointSupport savepointSupport;
private List<TransactionRetryListener> transactionRetryListeners;

private Builder() {}

Expand Down Expand Up @@ -253,19 +251,13 @@ Builder setSavepointSupport(SavepointSupport savepointSupport) {
return this;
}

Builder setTransactionRetryListeners(List<TransactionRetryListener> listeners) {
Preconditions.checkNotNull(listeners);
this.transactionRetryListeners = listeners;
return this;
}

@Override
ReadWriteTransaction build() {
Preconditions.checkState(dbClient != null, "No DatabaseClient client specified");
Preconditions.checkState(
retryAbortsInternally != null, "RetryAbortsInternally is not specified");
Preconditions.checkState(
transactionRetryListeners != null, "TransactionRetryListeners are not specified");
hasTransactionRetryListeners(), "TransactionRetryListeners are not specified");
Preconditions.checkState(savepointSupport != null, "SavepointSupport is not specified");
return new ReadWriteTransaction(this);
}
Expand Down Expand Up @@ -301,7 +293,6 @@ private ReadWriteTransaction(Builder builder) {
this.keepAliveLock = this.keepTransactionAlive ? new ReentrantLock() : null;
this.retryAbortsInternally = builder.retryAbortsInternally;
this.savepointSupport = builder.savepointSupport;
this.transactionRetryListeners = builder.transactionRetryListeners;
this.transactionOptions = extractOptions(builder);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.cloud.spanner.connection.AbstractStatementParser.RUN_BATCH_STATEMENT;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.longrunning.OperationFuture;
Expand All @@ -42,11 +43,10 @@
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.TransactionMutationLimitExceededException;
import com.google.cloud.spanner.TransactionRunner;
import com.google.cloud.spanner.Type;
import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement;
import com.google.cloud.spanner.connection.AbstractStatementParser.StatementType;
import com.google.cloud.spanner.connection.ReadWriteTransaction.Builder;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
Expand All @@ -56,6 +56,7 @@
import io.opentelemetry.context.Scope;
import java.time.Duration;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.Callable;
import javax.annotation.Nonnull;

Expand Down Expand Up @@ -219,6 +220,11 @@ public boolean supportsDirectedReads(ParsedStatement parsedStatement) {
return parsedStatement.isQuery();
}

private boolean isRetryDmlAsPartitionedDml() {
return this.autocommitDmlMode
== AutocommitDmlMode.TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC;
}

private void checkAndMarkUsed() {
Preconditions.checkState(!used, "This single-use transaction has already been used");
used = true;
Expand Down Expand Up @@ -434,6 +440,7 @@ public ApiFuture<Long> executeUpdateAsync(
ApiFuture<Long> res;
switch (autocommitDmlMode) {
case TRANSACTIONAL:
case TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC:
res =
ApiFutures.transform(
executeTransactionalUpdateAsync(callType, update, AnalyzeMode.NONE, options),
Expand Down Expand Up @@ -561,11 +568,89 @@ private ApiFuture<Tuple<Long, ResultSet>> executeTransactionalUpdateAsync(
throw t;
}
};
return executeStatementAsync(
callType,
update,
callable,
ImmutableList.of(SpannerGrpc.getExecuteSqlMethod(), SpannerGrpc.getCommitMethod()));
ApiFuture<Tuple<Long, ResultSet>> transactionalResult =
executeStatementAsync(
callType,
update,
callable,
ImmutableList.of(SpannerGrpc.getExecuteSqlMethod(), SpannerGrpc.getCommitMethod()));
// Retry as Partitioned DML if the statement fails due to exceeding the mutation limit if that
// option has been enabled.
if (isRetryDmlAsPartitionedDml()) {
return addRetryUpdateAsPartitionedDmlCallback(transactionalResult, callType, update, options);
}
return transactionalResult;
}

/**
* Adds a callback to the given future that retries the update statement using Partitioned DML if
* the original statement fails with a {@link TransactionMutationLimitExceededException}.
*/
private ApiFuture<Tuple<Long, ResultSet>> addRetryUpdateAsPartitionedDmlCallback(
ApiFuture<Tuple<Long, ResultSet>> transactionalResult,
CallType callType,
final ParsedStatement update,
final UpdateOption... options) {
// Catch TransactionMutationLimitExceededException and retry as Partitioned DML. All other
// exceptions are just propagated.
return ApiFutures.catchingAsync(
transactionalResult,
TransactionMutationLimitExceededException.class,
mutationLimitExceededException -> {
UUID executionId = UUID.randomUUID();
// Invoke the retryDmlAsPartitionedDmlStarting method for the TransactionRetryListeners
// that have been registered for the connection.
for (TransactionRetryListener listener : this.transactionRetryListeners) {
listener.retryDmlAsPartitionedDmlStarting(
executionId, update.getStatement(), mutationLimitExceededException);
}
// Try to execute the DML statement as Partitioned DML.
ApiFuture<Tuple<Long, ResultSet>> partitionedResult =
ApiFutures.transform(
executePartitionedUpdateAsync(callType, update, options),
lowerBoundUpdateCount -> Tuple.of(lowerBoundUpdateCount, null),
MoreExecutors.directExecutor());

// Add a callback to the future that invokes the TransactionRetryListeners after the
// Partitioned DML statement finished. This will invoke either the Finished or Failed
// method on the listeners.
ApiFutures.addCallback(
partitionedResult,
new ApiFutureCallback<Tuple<Long, ResultSet>>() {
@Override
public void onFailure(Throwable throwable) {
for (TransactionRetryListener listener :
SingleUseTransaction.this.transactionRetryListeners) {
listener.retryDmlAsPartitionedDmlFailed(
executionId, update.getStatement(), throwable);
}
}

@Override
public void onSuccess(Tuple<Long, ResultSet> result) {
for (TransactionRetryListener listener :
SingleUseTransaction.this.transactionRetryListeners) {
listener.retryDmlAsPartitionedDmlFinished(
executionId, update.getStatement(), result.x());
}
}
},
MoreExecutors.directExecutor());

// Catch any exception from the Partitioned DML execution and throw the original
// TransactionMutationLimitExceededException instead.
// The exception that is returned for the Partitioned DML statement is added to the
// exception as a suppressed exception.
return ApiFutures.catching(
partitionedResult,
Throwable.class,
input -> {
mutationLimitExceededException.addSuppressed(input);
throw mutationLimitExceededException;
},
MoreExecutors.directExecutor());
},
MoreExecutors.directExecutor());
}

private ApiFuture<ResultSet> analyzeTransactionalUpdateAsync(
Expand Down
Loading
Loading