Skip to content

Commit

Permalink
refactor: introduce PartitionedUpdateOption
Browse files Browse the repository at this point in the history
  • Loading branch information
dengwe1 committed Mar 21, 2024
1 parent aa7bc0c commit 96b508b
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 24 deletions.
7 changes: 7 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -631,4 +631,11 @@
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setDirectedRead(com.google.spanner.v1.DirectedReadOptions)</method>
</difference>

<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>long executePartitionedUpdate(com.google.cloud.spanner.Statement, com.google.cloud.spanner.Options$UpdateOption[])</method>
<to>long executePartitionedUpdate(com.google.cloud.spanner.Statement, com.google.cloud.spanner.Options$PartitionedUpdateOption[])</to>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.PartitionedUpdateOption;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
Expand Down Expand Up @@ -600,5 +601,5 @@ ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
* <p>Given the above, Partitioned DML is good fit for large, database-wide, operations that are
* idempotent, such as deleting old rows from a very large table.
*/
long executePartitionedUpdate(Statement stmt, UpdateOption... options);
long executePartitionedUpdate(Statement stmt, PartitionedUpdateOption... options);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.PartitionedUpdateOption;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -240,7 +240,8 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti
}

@Override
public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) {
public long executePartitionedUpdate(
final Statement stmt, final PartitionedUpdateOption... options) {
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION);
try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ public interface ReadOption {}
public interface ReadQueryUpdateTransactionOption
extends ReadOption, QueryOption, UpdateOption, TransactionOption {}

/** Marker interface to mark options applicable to Update and Write operations */
public interface UpdateTransactionOption extends UpdateOption, TransactionOption {}

/**
* Marker interface to mark options applicable to Create, Update and Delete operations in admin
* API.
Expand All @@ -86,8 +83,15 @@ public interface QueryOption {}
/** Marker interface to mark options applicable to write operations */
public interface TransactionOption {}

/** Marker interface to mark options applicable to partitioned update */
public interface PartitionedUpdateOption {}

/** Marker interface to mark options applicable to update operation. */
public interface UpdateOption {}
public interface UpdateOption extends PartitionedUpdateOption {}

/** Marker interface to mark options applicable to partitioned update and write operations */
public interface PartitionedUpdateTransactionOption
extends PartitionedUpdateOption, TransactionOption {}

/** Marker interface to mark options applicable to list operations in admin API. */
public interface ListOption {}
Expand Down Expand Up @@ -118,7 +122,7 @@ public static TransactionOption optimisticLock() {
* being recorded in the change streams with the DDL option `allow_txn_exclusion` being false or
* unset.
*/
public static UpdateTransactionOption excludeTxnFromChangeStreams() {
public static PartitionedUpdateTransactionOption excludeTxnFromChangeStreams() {
return EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION;
}

Expand Down Expand Up @@ -297,7 +301,7 @@ void appendToOptions(Options options) {

/** Option to request the transaction to be excluded from change streams. */
static final class ExcludeTxnFromChangeStreamsOption extends InternalOption
implements UpdateTransactionOption {
implements PartitionedUpdateTransactionOption {
@Override
void appendToOptions(Options options) {
options.withExcludeTxnFromChangeStreams = true;
Expand Down Expand Up @@ -744,6 +748,16 @@ static Options fromUpdateOptions(UpdateOption... options) {
return updateOptions;
}

static Options fromPartitinoedUpdateOptions(PartitionedUpdateOption... options) {
Options partitionedUpdateOptions = new Options();
for (PartitionedUpdateOption option : options) {
if (option instanceof InternalOption) {
((InternalOption) option).appendToOptions(partitionedUpdateOptions);
}
}
return partitionedUpdateOptions;
}

static Options fromTransactionOptions(TransactionOption... options) {
Options transactionOptions = new Options();
for (TransactionOption option : options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import com.google.api.gax.rpc.InternalException;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.UnavailableException;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.Options.PartitionedUpdateOption;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
Expand Down Expand Up @@ -71,15 +71,17 @@ public class PartitionedDmlTransaction implements SessionImpl.SessionTransaction
* last seen resume token if the server returns any.
*/
long executeStreamingPartitionedUpdate(
final Statement statement, final Duration timeout, final UpdateOption... updateOptions) {
final Statement statement,
final Duration timeout,
final PartitionedUpdateOption... partitionedUpdateOptions) {
checkState(isValid, "Partitioned DML has been invalidated by a new operation on the session");
LOGGER.log(Level.FINER, "Starting PartitionedUpdate statement");

ByteString resumeToken = ByteString.EMPTY;
boolean foundStats = false;
long updateCount = 0L;
Stopwatch stopwatch = Stopwatch.createStarted(ticker);
Options options = Options.fromUpdateOptions(updateOptions);
Options options = Options.fromPartitinoedUpdateOptions(partitionedUpdateOptions);

try {
ExecuteSqlRequest request = newTransactionRequestFrom(statement, options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction;
import com.google.cloud.spanner.AbstractReadContext.SingleReadContext;
import com.google.cloud.spanner.AbstractReadContext.SingleUseReadOnlyTransaction;
import com.google.cloud.spanner.Options.PartitionedUpdateOption;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.SessionClient.SessionId;
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
Expand Down Expand Up @@ -140,7 +140,7 @@ void markUsed(Instant instant) {
}

@Override
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
public long executePartitionedUpdate(Statement stmt, PartitionedUpdateOption... options) {
setActive(null);
PartitionedDmlTransaction txn =
new PartitionedDmlTransaction(this, spanner.getRpc(), Ticker.systemTicker());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import com.google.cloud.Tuple;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.Options.PartitionedUpdateOption;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.ReadOption;
import com.google.cloud.spanner.Options.TransactionOption;
Expand Down Expand Up @@ -1270,7 +1271,7 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti
}

@Override
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
public long executePartitionedUpdate(Statement stmt, PartitionedUpdateOption... options) {
try {
return get(true).executePartitionedUpdate(stmt, options);
} finally {
Expand Down Expand Up @@ -1470,7 +1471,7 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
}

@Override
public long executePartitionedUpdate(Statement stmt, UpdateOption... options)
public long executePartitionedUpdate(Statement stmt, PartitionedUpdateOption... options)
throws SpannerException {
try {
markUsed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,37 @@ public void testUpdateOptionsWithPriorityHashCode() {
assertNotEquals(optionsWithHighPriority1.hashCode(), optionsWithMediumPriority.hashCode());
}

@Test
public void testPartitionedUpdateOptions() {
Options option1 = Options.fromPartitinoedUpdateOptions();
Options option2 = Options.fromPartitinoedUpdateOptions();
assertEquals(option1, option2);
assertEquals(option1.hashCode(), option2.hashCode());
assertEquals(option1.toString(), option2.toString());
assertEquals("", option1.toString());
}

@Test
public void testPartitionedUpdateOptionsWithPriority() {
Options optionsWithHighPriority1 =
Options.fromPartitinoedUpdateOptions(Options.priority(RpcPriority.HIGH));
assertEquals(Priority.PRIORITY_HIGH, optionsWithHighPriority1.priority());
assertEquals("priority: HIGH ", optionsWithHighPriority1.toString());

Options optionsWithHighPriority2 =
Options.fromPartitinoedUpdateOptions(Options.priority(RpcPriority.HIGH));
assertEquals(optionsWithHighPriority1, optionsWithHighPriority2);
assertEquals(optionsWithHighPriority1.hashCode(), optionsWithHighPriority2.hashCode());
assertEquals(optionsWithHighPriority1.toString(), optionsWithHighPriority2.toString());

Options optionsWithMediumPriority =
Options.fromPartitinoedUpdateOptions(Options.priority(RpcPriority.MEDIUM));
assertEquals(Priority.PRIORITY_MEDIUM, optionsWithMediumPriority.priority());
assertEquals("priority: MEDIUM ", optionsWithMediumPriority.toString());
assertNotEquals(optionsWithHighPriority1, optionsWithMediumPriority);
assertNotEquals(optionsWithHighPriority1.hashCode(), optionsWithMediumPriority.hashCode());
}

@Test
public void testQueryOptionsEquality() {
Options option1 = Options.fromQueryOptions();
Expand Down Expand Up @@ -617,6 +648,26 @@ public void updateEquality() {
assertThat(o2.equals(o3)).isFalse();
}

@Test
public void partitionedUpdateWithTag() {
String tag1 = "app=spanner,env=test";
Options o1 = Options.fromPartitinoedUpdateOptions(Options.tag(tag1));
assertEquals(tag1, o1.tag());
assertEquals("tag: " + tag1 + " ", o1.toString());

Options o2 = Options.fromPartitinoedUpdateOptions(Options.tag(tag1));
assertEquals(o1, o2);
assertEquals(o1.hashCode(), o2.hashCode());
assertEquals(o1.toString(), o2.toString());

String tag2 = "app=spanner,env=stage";
Options o3 = Options.fromPartitinoedUpdateOptions(Options.tag(tag2));
assertEquals("tag: " + tag2 + " ", o3.toString());
assertNotEquals(o2, o3);
assertNotEquals(o2.hashCode(), o3.hashCode());
assertNotEquals(o2.toString(), o3.toString());
}

@Test
public void transactionOptionsTest() {
String tag = "app=spanner,env=test";
Expand Down Expand Up @@ -706,27 +757,27 @@ public void transactionOptionsExcludeTxnFromChangeStreams() {
assertNotEquals(option1.hashCode(), option3.hashCode());

assertTrue(option1.withExcludeTxnFromChangeStreams());
assertThat(option1.toString()).contains("withExcludeTxnFromChangeStreams: true");
assertEquals("withExcludeTxnFromChangeStreams: true ", option1.toString());

assertNull(option3.withExcludeTxnFromChangeStreams());
assertThat(option3.toString()).doesNotContain("withExcludeTxnFromChangeStreams: true");
assertEquals("", option3.toString());
}

@Test
public void updateOptionsExcludeTxnFromChangeStreams() {
Options option1 = Options.fromUpdateOptions(Options.excludeTxnFromChangeStreams());
Options option2 = Options.fromUpdateOptions(Options.excludeTxnFromChangeStreams());
Options option3 = Options.fromUpdateOptions();
public void partitionedUpdateOptionsExcludeTxnFromChangeStreams() {
Options option1 = Options.fromPartitinoedUpdateOptions(Options.excludeTxnFromChangeStreams());
Options option2 = Options.fromPartitinoedUpdateOptions(Options.excludeTxnFromChangeStreams());
Options option3 = Options.fromPartitinoedUpdateOptions();

assertEquals(option1, option2);
assertEquals(option1.hashCode(), option2.hashCode());
assertNotEquals(option1, option3);
assertNotEquals(option1.hashCode(), option3.hashCode());

assertTrue(option1.withExcludeTxnFromChangeStreams());
assertThat(option1.toString()).contains("withExcludeTxnFromChangeStreams: true");
assertEquals("withExcludeTxnFromChangeStreams: true ", option1.toString());

assertNull(option3.withExcludeTxnFromChangeStreams());
assertThat(option3.toString()).doesNotContain("withExcludeTxnFromChangeStreams: true");
assertEquals("", option3.toString());
}
}

0 comments on commit 96b508b

Please sign in to comment.