Skip to content

Commit

Permalink
refactor: include the option in autocommit_dml_mode
Browse files Browse the repository at this point in the history
  • Loading branch information
olavloite committed Nov 25, 2024
1 parent ff8e0df commit 644cbdb
Show file tree
Hide file tree
Showing 15 changed files with 1,003 additions and 2,445 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,18 @@

/** Enum used to define the behavior of DML statements in autocommit mode */
public enum AutocommitDmlMode {
/** TRANSACTIONAL: DML statements use a standard atomic transaction. */
TRANSACTIONAL,
PARTITIONED_NON_ATOMIC;
/** PARTITIONED_NON_ATOMIC: DML statements use a Partitioned DML transaction. */
PARTITIONED_NON_ATOMIC,
/**
* TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC: DML statements are first executed with a
* standard atomic transaction. If that fails due to the mutation limit being exceeded, the
* statement will automatically be retried using a Partitioned DML transaction. These statements
* are not guaranteed to be atomic. The corresponding {@link TransactionRetryListener} methods
* will be invoked when a DML statement falls back to Partitioned DML.
*/
TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC;

private final String statementString;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,14 +524,6 @@ default byte[] getProtoDescriptors() {
*/
AutocommitDmlMode getAutocommitDmlMode();

default void setFallbackToPartitionedDml(boolean fallbackToPartitionedDml) {
throw new UnsupportedOperationException();
}

default boolean isFallbackToPartitionedDml() {
throw new UnsupportedOperationException();
}

/**
* Sets the staleness to use for the current read-only transaction. This method may only be called
* when the transaction mode of the current transaction is {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import static com.google.cloud.spanner.connection.ConnectionProperties.DDL_IN_TRANSACTION_MODE;
import static com.google.cloud.spanner.connection.ConnectionProperties.DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE;
import static com.google.cloud.spanner.connection.ConnectionProperties.DIRECTED_READ;
import static com.google.cloud.spanner.connection.ConnectionProperties.FALLBACK_TO_PARTITIONED_DML;
import static com.google.cloud.spanner.connection.ConnectionProperties.KEEP_TRANSACTION_ALIVE;
import static com.google.cloud.spanner.connection.ConnectionProperties.MAX_COMMIT_DELAY;
import static com.google.cloud.spanner.connection.ConnectionProperties.MAX_PARTITIONED_PARALLELISM;
Expand Down Expand Up @@ -664,18 +663,6 @@ public AutocommitDmlMode getAutocommitDmlMode() {
return getConnectionPropertyValue(AUTOCOMMIT_DML_MODE);
}

@Override
public void setFallbackToPartitionedDml(boolean fallbackToPartitionedDml) {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
setConnectionPropertyValue(FALLBACK_TO_PARTITIONED_DML, fallbackToPartitionedDml);
}

@Override
public boolean isFallbackToPartitionedDml() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
return getConnectionPropertyValue(FALLBACK_TO_PARTITIONED_DML);
}

@Override
public void setReadOnlyStaleness(TimestampBound staleness) {
Preconditions.checkNotNull(staleness);
Expand Down Expand Up @@ -2138,7 +2125,6 @@ UnitOfWork createNewUnitOfWork(
.setReadOnly(getConnectionPropertyValue(READONLY))
.setReadOnlyStaleness(getConnectionPropertyValue(READ_ONLY_STALENESS))
.setAutocommitDmlMode(getConnectionPropertyValue(AUTOCOMMIT_DML_MODE))
.setRetryDmlAsPartitionedDml(getConnectionPropertyValue(FALLBACK_TO_PARTITIONED_DML))
.setTransactionRetryListeners(transactionRetryListeners)
.setReturnCommitStats(getConnectionPropertyValue(RETURN_COMMIT_STATS))
.setExcludeTxnFromChangeStreams(excludeTxnFromChangeStreams)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,15 +361,6 @@ class ConnectionProperties {
AutocommitDmlMode.TRANSACTIONAL,
AutocommitDmlModeConverter.INSTANCE,
Context.USER);
static final ConnectionProperty<Boolean> FALLBACK_TO_PARTITIONED_DML =
create(
"fallback_to_partitioned_dml",
"Automatically retry DML statements as Partitioned DML if the atomic DML "
+ "statement failed due to exceeding the Spanner transaction mutation limit. "
+ "This option only affects DML statements that are executed in auto-commit mode.",
false,
BooleanConverter.INSTANCE,
Context.USER);
static final ConnectionProperty<Boolean> RETRY_ABORTS_INTERNALLY =
create(
// TODO: Add support for synonyms for connection properties.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,4 @@ StatementResult statementSetPgSessionCharacteristicsTransactionMode(
StatementResult statementSetAutoBatchDmlUpdateCountVerification(Boolean verification);

StatementResult statementShowAutoBatchDmlUpdateCountVerification();

StatementResult statementSetFallbackToPartitionedDml(Boolean fallbackToPartitionedDml);

StatementResult statementShowFallbackToPartitionedDml();
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_DIRECTED_READ;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_EXCLUDE_TXN_FROM_CHANGE_STREAMS;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_FALLBACK_TO_PARTITIONED_DML;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_KEEP_TRANSACTION_ALIVE;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_MAX_COMMIT_DELAY;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_MAX_PARTITIONED_PARALLELISM;
Expand Down Expand Up @@ -65,7 +64,6 @@
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_DIRECTED_READ;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_EXCLUDE_TXN_FROM_CHANGE_STREAMS;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_FALLBACK_TO_PARTITIONED_DML;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_KEEP_TRANSACTION_ALIVE;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_MAX_COMMIT_DELAY;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_MAX_PARTITIONED_PARALLELISM;
Expand Down Expand Up @@ -732,20 +730,6 @@ public StatementResult statementShowAutoBatchDmlUpdateCountVerification() {
SHOW_AUTO_BATCH_DML_UPDATE_COUNT_VERIFICATION);
}

@Override
public StatementResult statementSetFallbackToPartitionedDml(Boolean fallbackToPartitionedDml) {
getConnection().setFallbackToPartitionedDml(fallbackToPartitionedDml);
return noResult(SET_FALLBACK_TO_PARTITIONED_DML);
}

@Override
public StatementResult statementShowFallbackToPartitionedDml() {
return resultSet(
String.format("%sFALLBACK_TO_PARTITIONED_DML", getNamespace(connection.getDialect())),
getConnection().isFallbackToPartitionedDml(),
SHOW_FALLBACK_TO_PARTITIONED_DML);
}

private String processQueryPlan(PlanNode planNode) {
StringBuilder planNodeDescription = new StringBuilder(" : { ");
com.google.protobuf.Struct metadata = planNode.getMetadata();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ class SingleUseTransaction extends AbstractBaseUnitOfWork {
private final BatchClient batchClient;
private final TimestampBound readOnlyStaleness;
private final AutocommitDmlMode autocommitDmlMode;
private final boolean retryDmlAsPartitionedDml;
private final boolean returnCommitStats;
private final Duration maxCommitDelay;
private final boolean internalMetdataQuery;
Expand All @@ -100,7 +99,6 @@ static class Builder extends AbstractBaseUnitOfWork.Builder<Builder, SingleUseTr
private boolean readOnly;
private TimestampBound readOnlyStaleness;
private AutocommitDmlMode autocommitDmlMode;
private boolean retryDmlAsPartitionedDml;
private boolean returnCommitStats;
private Duration maxCommitDelay;
private boolean internalMetadataQuery;
Expand Down Expand Up @@ -142,11 +140,6 @@ Builder setAutocommitDmlMode(AutocommitDmlMode dmlMode) {
return this;
}

Builder setRetryDmlAsPartitionedDml(boolean retryDmlAsPartitionedDml) {
this.retryDmlAsPartitionedDml = retryDmlAsPartitionedDml;
return this;
}

Builder setReturnCommitStats(boolean returnCommitStats) {
this.returnCommitStats = returnCommitStats;
return this;
Expand Down Expand Up @@ -190,7 +183,6 @@ private SingleUseTransaction(Builder builder) {
this.readOnly = builder.readOnly;
this.readOnlyStaleness = builder.readOnlyStaleness;
this.autocommitDmlMode = builder.autocommitDmlMode;
this.retryDmlAsPartitionedDml = builder.retryDmlAsPartitionedDml;
this.returnCommitStats = builder.returnCommitStats;
this.maxCommitDelay = builder.maxCommitDelay;
this.internalMetdataQuery = builder.internalMetadataQuery;
Expand Down Expand Up @@ -228,6 +220,11 @@ public boolean supportsDirectedReads(ParsedStatement parsedStatement) {
return parsedStatement.isQuery();
}

private boolean isRetryDmlAsPartitionedDml() {
return this.autocommitDmlMode
== AutocommitDmlMode.TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC;
}

private void checkAndMarkUsed() {
Preconditions.checkState(!used, "This single-use transaction has already been used");
used = true;
Expand Down Expand Up @@ -443,6 +440,7 @@ public ApiFuture<Long> executeUpdateAsync(
ApiFuture<Long> res;
switch (autocommitDmlMode) {
case TRANSACTIONAL:
case TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC:
res =
ApiFutures.transform(
executeTransactionalUpdateAsync(callType, update, AnalyzeMode.NONE, options),
Expand Down Expand Up @@ -578,7 +576,7 @@ private ApiFuture<Tuple<Long, ResultSet>> executeTransactionalUpdateAsync(
ImmutableList.of(SpannerGrpc.getExecuteSqlMethod(), SpannerGrpc.getCommitMethod()));
// Retry as Partitioned DML if the statement fails due to exceeding the mutation limit if that
// option has been enabled.
if (this.retryDmlAsPartitionedDml) {
if (isRetryDmlAsPartitionedDml()) {
return addRetryUpdateAsPartitionedDmlCallback(transactionalResult, callType, update, options);
}
return transactionalResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,6 @@
"method": "statementShowAutoBatchDmlUpdateCountVerification",
"exampleStatements": ["show variable auto_batch_dml_update_count_verification"]
},
{
"name": "SHOW VARIABLE FALLBACK_TO_PARTITIONED_DML",
"executorName": "ClientSideStatementNoParamExecutor",
"resultType": "RESULT_SET",
"statementType": "SHOW_FALLBACK_TO_PARTITIONED_DML",
"regex": "(?is)\\A\\s*show\\s+variable\\s+fallback_to_partitioned_dml\\s*\\z",
"method": "statementShowFallbackToPartitionedDml",
"exampleStatements": ["show variable fallback_to_partitioned_dml"]
},
{
"name": "PARTITION <sql>",
"executorName": "ClientSideStatementPartitionExecutor",
Expand Down Expand Up @@ -380,17 +371,17 @@
}
},
{
"name": "SET AUTOCOMMIT_DML_MODE = 'PARTITIONED_NON_ATOMIC'|'TRANSACTIONAL'",
"name": "SET AUTOCOMMIT_DML_MODE = 'PARTITIONED_NON_ATOMIC'|'TRANSACTIONAL'|'TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC'",
"executorName": "ClientSideStatementSetExecutor",
"resultType": "NO_RESULT",
"statementType": "SET_AUTOCOMMIT_DML_MODE",
"regex": "(?is)\\A\\s*set\\s+autocommit_dml_mode\\s*(?:=)\\s*(.*)\\z",
"method": "statementSetAutocommitDmlMode",
"exampleStatements": ["set autocommit_dml_mode='PARTITIONED_NON_ATOMIC'", "set autocommit_dml_mode='TRANSACTIONAL'"],
"exampleStatements": ["set autocommit_dml_mode='PARTITIONED_NON_ATOMIC'", "set autocommit_dml_mode='TRANSACTIONAL'", "set autocommit_dml_mode='TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC'"],
"setStatement": {
"propertyName": "AUTOCOMMIT_DML_MODE",
"separator": "=",
"allowedValues": "'(PARTITIONED_NON_ATOMIC|TRANSACTIONAL)'",
"allowedValues": "'(PARTITIONED_NON_ATOMIC|TRANSACTIONAL|TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC)'",
"converterName": "ClientSideStatementValueConverters$AutocommitDmlModeConverter"
}
},
Expand Down Expand Up @@ -710,21 +701,6 @@
"converterName": "ClientSideStatementValueConverters$BooleanConverter"
}
},
{
"name": "SET FALLBACK_TO_PARTITIONED_DML = TRUE|FALSE",
"executorName": "ClientSideStatementSetExecutor",
"resultType": "NO_RESULT",
"statementType": "SET_FALLBACK_TO_PARTITIONED_DML",
"regex": "(?is)\\A\\s*set\\s+fallback_to_partitioned_dml\\s*(?:=)\\s*(.*)\\z",
"method": "statementSetFallbackToPartitionedDml",
"exampleStatements": ["set fallback_to_partitioned_dml = true", "set fallback_to_partitioned_dml = false"],
"setStatement": {
"propertyName": "FALLBACK_TO_PARTITIONED_DML",
"separator": "=",
"allowedValues": "(TRUE|FALSE)",
"converterName": "ClientSideStatementValueConverters$BooleanConverter"
}
},
{
"name": "SHOW VARIABLE DATA_BOOST_ENABLED",
"executorName": "ClientSideStatementNoParamExecutor",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,6 @@
"method": "statementShowAutoBatchDmlUpdateCountVerification",
"exampleStatements": ["show spanner.auto_batch_dml_update_count_verification","show variable spanner.auto_batch_dml_update_count_verification"]
},
{
"name": "SHOW [VARIABLE] SPANNER.FALLBACK_TO_PARTITIONED_DML",
"executorName": "ClientSideStatementNoParamExecutor",
"resultType": "RESULT_SET",
"statementType": "SHOW_FALLBACK_TO_PARTITIONED_DML",
"regex": "(?is)\\A\\s*show\\s+(?:variable\\s+)?spanner\\.fallback_to_partitioned_dml\\s*\\z",
"method": "statementShowFallbackToPartitionedDml",
"exampleStatements": ["show spanner.fallback_to_partitioned_dml", "show variable spanner.fallback_to_partitioned_dml"]
},
{
"name": "SHOW [VARIABLE] TRANSACTION ISOLATION LEVEL",
"executorName": "ClientSideStatementNoParamExecutor",
Expand Down Expand Up @@ -429,7 +420,7 @@
}
},
{
"name": "SET SPANNER.AUTOCOMMIT_DML_MODE =|TO 'PARTITIONED_NON_ATOMIC'|'TRANSACTIONAL'",
"name": "SET SPANNER.AUTOCOMMIT_DML_MODE =|TO 'PARTITIONED_NON_ATOMIC'|'TRANSACTIONAL'|'TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC'",
"executorName": "ClientSideStatementSetExecutor",
"resultType": "NO_RESULT",
"statementType": "SET_AUTOCOMMIT_DML_MODE",
Expand All @@ -438,13 +429,15 @@
"exampleStatements": [
"set spanner.autocommit_dml_mode='PARTITIONED_NON_ATOMIC'",
"set spanner.autocommit_dml_mode='TRANSACTIONAL'",
"set spanner.autocommit_dml_mode='TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC'",
"set spanner.autocommit_dml_mode to 'PARTITIONED_NON_ATOMIC'",
"set spanner.autocommit_dml_mode to 'TRANSACTIONAL'"
"set spanner.autocommit_dml_mode to 'TRANSACTIONAL'",
"set spanner.autocommit_dml_mode to 'TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC'"
],
"setStatement": {
"propertyName": "SPANNER.AUTOCOMMIT_DML_MODE",
"separator": "(?:=|\\s+TO\\s+)",
"allowedValues": "'(PARTITIONED_NON_ATOMIC|TRANSACTIONAL)'",
"allowedValues": "'(PARTITIONED_NON_ATOMIC|TRANSACTIONAL|TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC)'",
"converterName": "ClientSideStatementValueConverters$AutocommitDmlModeConverter"
}
},
Expand Down Expand Up @@ -864,21 +857,6 @@
"converterName": "ClientSideStatementValueConverters$PgBooleanConverter"
}
},
{
"name": "SET SPANNER.FALLBACK_TO_PARTITIONED_DML = TRUE|FALSE",
"executorName": "ClientSideStatementSetExecutor",
"resultType": "NO_RESULT",
"statementType": "SET_FALLBACK_TO_PARTITIONED_DML",
"regex": "(?is)\\A\\s*set\\s+spanner\\.fallback_to_partitioned_dml(?:\\s*=\\s*|\\s+to\\s+)(.*)\\z",
"method": "statementSetFallbackToPartitionedDml",
"exampleStatements": ["set spanner.fallback_to_partitioned_dml = true", "set spanner.fallback_to_partitioned_dml = false", "set spanner.fallback_to_partitioned_dml to true", "set spanner.fallback_to_partitioned_dml to false", "set spanner.fallback_to_partitioned_dml to off"],
"setStatement": {
"propertyName": "SPANNER.FALLBACK_TO_PARTITIONED_DML",
"separator": "(?:=|\\s+TO\\s+)",
"allowedValues": "(.+)",
"converterName": "ClientSideStatementValueConverters$PgBooleanConverter"
}
},
{
"name": "SET SPANNER.AUTO_BATCH_DML_UPDATE_COUNT = <bigint>",
"executorName": "ClientSideStatementSetExecutor",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void testTransactionMutationLimitExceeded_isNotRetriedByDefault() {

try (Connection connection = createConnection()) {
connection.setAutocommit(true);
assertFalse(connection.isFallbackToPartitionedDml());
assertEquals(AutocommitDmlMode.TRANSACTIONAL, connection.getAutocommitDmlMode());

TransactionMutationLimitExceededException exception =
assertThrows(
Expand All @@ -96,7 +96,8 @@ public void testTransactionMutationLimitExceeded_canBeRetriedAsPDML() {

try (Connection connection = createConnection()) {
connection.setAutocommit(true);
connection.setFallbackToPartitionedDml(true);
connection.setAutocommitDmlMode(
AutocommitDmlMode.TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC);

long updateCount = connection.executeUpdate(statement);
assertEquals(100000L, updateCount);
Expand Down Expand Up @@ -136,7 +137,8 @@ public void testTransactionMutationLimitExceeded_retryAsPDMLFails() {

try (Connection connection = createConnection()) {
connection.setAutocommit(true);
connection.setFallbackToPartitionedDml(true);
connection.setAutocommitDmlMode(
AutocommitDmlMode.TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC);

// The connection throws TransactionMutationLimitExceededException if the retry using
// partitioned DML fails. The exception from the failed retry is returned as a suppressed
Expand Down Expand Up @@ -184,22 +186,28 @@ public void testSqlStatements() {
String prefix = dialect == Dialect.POSTGRESQL ? "SPANNER." : "";

try (Connection connection = createConnection()) {
connection.setAutocommit(true);
try (ResultSet resultSet =
connection.executeQuery(
Statement.of(
String.format("show variable %sfallback_to_partitioned_dml", prefix)))) {
Statement.of(String.format("show variable %sautocommit_dml_mode", prefix)))) {
assertTrue(resultSet.next());
assertFalse(resultSet.getBoolean(String.format("%sFALLBACK_TO_PARTITIONED_DML", prefix)));
assertEquals(
AutocommitDmlMode.TRANSACTIONAL.name(),
resultSet.getString(String.format("%sAUTOCOMMIT_DML_MODE", prefix)));
assertFalse(resultSet.next());
}
connection.execute(
Statement.of(String.format("set %sfallback_to_partitioned_dml = true", prefix)));
Statement.of(
String.format(
"set %sautocommit_dml_mode = 'transactional_with_fallback_to_partitioned_non_atomic'",
prefix)));
try (ResultSet resultSet =
connection.executeQuery(
Statement.of(
String.format("show variable %sfallback_to_partitioned_dml", prefix)))) {
Statement.of(String.format("show variable %sautocommit_dml_mode", prefix)))) {
assertTrue(resultSet.next());
assertTrue(resultSet.getBoolean(String.format("%sFALLBACK_TO_PARTITIONED_DML", prefix)));
assertEquals(
AutocommitDmlMode.TRANSACTIONAL_WITH_FALLBACK_TO_PARTITIONED_NON_ATOMIC.name(),
resultSet.getString(String.format("%sAUTOCOMMIT_DML_MODE", prefix)));
assertFalse(resultSet.next());
}
}
Expand Down
Loading

0 comments on commit 644cbdb

Please sign in to comment.