From 3da6837eac3c4120b99c32be1b57e51248514b0c Mon Sep 17 00:00:00 2001 From: rahul yadav Date: Thu, 14 Nov 2024 12:41:31 +0530 Subject: [PATCH] revert changes and add transactionTag option --- .../clirr-ignored-differences.xml | 6 +-- ...tractMultiplexedSessionDatabaseClient.java | 7 ---- .../google/cloud/spanner/DatabaseClient.java | 18 --------- .../cloud/spanner/DatabaseClientImpl.java | 14 +------ .../com/google/cloud/spanner/Options.java | 38 +++++++++++++++++++ .../spanner/PartitionedDmlTransaction.java | 14 +++---- .../com/google/cloud/spanner/SessionImpl.java | 13 +------ .../com/google/cloud/spanner/SessionPool.java | 27 ------------- .../cloud/spanner/DatabaseClientImplTest.java | 4 +- .../PartitionedDmlTransactionTest.java | 8 ++-- .../google/cloud/spanner/SessionPoolTest.java | 7 +--- 11 files changed, 55 insertions(+), 101 deletions(-) diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index 7f194d55dae..b2bcddee010 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -790,9 +790,5 @@ com/google/cloud/spanner/connection/Connection boolean isAutoBatchDmlUpdateCountVerification() - - 7012 - com/google/cloud/spanner/DatabaseClient - long executePartitionedUpdate(com.google.cloud.spanner.Statement, java.lang.String, com.google.cloud.spanner.Options$UpdateOption[]) - + diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java index 74fca99f9a3..ebfb0e0a774 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java @@ -21,7 +21,6 @@ import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.spanner.v1.BatchWriteResponse; -import javax.annotation.Nullable; /** * Base class for the Multiplexed Session {@link DatabaseClient} implementation. Throws {@link @@ -57,10 +56,4 @@ public ServerStream batchWriteAtLeastOnce( public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { throw new UnsupportedOperationException(); } - - @Override - public long executePartitionedUpdate( - Statement stmt, @Nullable String transactionTag, UpdateOption... options) { - throw new UnsupportedOperationException(); - } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java index 755b9b1ef5d..06237131458 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java @@ -22,7 +22,6 @@ import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.spanner.v1.BatchWriteResponse; -import javax.annotation.Nullable; /** * Interface for all the APIs that are used to read/write data into a Cloud Spanner database. An @@ -602,21 +601,4 @@ ServerStream batchWriteAtLeastOnce( * idempotent, such as deleting old rows from a very large table. */ long executePartitionedUpdate(Statement stmt, UpdateOption... options); - - /** - * Executes a Partitioned DML statement with the specified transaction tag. - * - *

This method has the same behavior as {@link #executePartitionedUpdate(Statement, - * UpdateOption...)} but allows specifying a transaction tag that will be applied to all - * partitioned operations. - * - * @param stmt The Partitioned DML statement to execute - * @param transactionTag The transaction tag to apply to all partitioned operations. The tag must - * be a printable string (ASCII 32-126) with maximum length of 50 characters. - * @param options The options to use for the update operation - * @return The total number of rows modified by the statement - * @throws SpannerException if the operation failed - */ - long executePartitionedUpdate( - Statement stmt, @Nullable String transactionTag, UpdateOption... options); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index 9200aa23a2b..d7f16f89524 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -309,21 +309,9 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti @Override public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) { - return executePartitionedUpdateWithOptions(stmt, null, options); - } - - @Override - public long executePartitionedUpdate( - final Statement stmt, @Nullable String transactionTag, final UpdateOption... options) { - return executePartitionedUpdateWithOptions(stmt, transactionTag, options); - } - - private long executePartitionedUpdateWithOptions( - final Statement stmt, @Nullable String transactionTag, final UpdateOption... options) { ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION); try (IScope s = tracer.withSpan(span)) { - return runWithSessionRetry( - session -> session.executePartitionedUpdate(stmt, transactionTag, options)); + return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options)); } catch (RuntimeException e) { span.setStatus(e); span.end(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java index 9c3257586fb..73c47a32ac3 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java @@ -197,6 +197,10 @@ public static ReadQueryUpdateTransactionOption tag(String name) { return new TagOption(name); } + public static ReadQueryUpdateTransactionOption transactionTag(String name) { + return new TransactionTagOption(name); + } + /** * Specifying this will cause the list operations to fetch at most this many records in a page. */ @@ -394,6 +398,24 @@ void appendToOptions(Options options) { } } + static final class TransactionTagOption extends InternalOption + implements ReadQueryUpdateTransactionOption { + private final String transactionTag; + + TransactionTagOption(String transactionTag) { + this.transactionTag = transactionTag; + } + + String getTransactionTag() { + return transactionTag; + } + + @Override + void appendToOptions(Options options) { + options.transactionTag = transactionTag; + } + } + static final class EtagOption extends InternalOption implements DeleteAdminApiOption { private final String etag; @@ -462,6 +484,7 @@ void appendToOptions(Options options) { private RpcPriority priority; private String tag; private String etag; + private String transactionTag; private Boolean validateOnly; private Boolean withOptimisticLock; private Boolean withExcludeTxnFromChangeStreams; @@ -545,6 +568,14 @@ boolean hasTag() { return tag != null; } + boolean hasTransactionTag() { + return transactionTag != null; + } + + String transactionTag() { + return transactionTag; + } + String tag() { return tag; } @@ -661,6 +692,9 @@ public String toString() { if (orderBy != null) { b.append("orderBy: ").append(orderBy).append(' '); } + if (transactionTag != null) { + b.append("transactionTag: ").append(transactionTag).append(' '); + } return b.toString(); } @@ -694,6 +728,7 @@ public boolean equals(Object o) { && Objects.equals(filter(), that.filter()) && Objects.equals(priority(), that.priority()) && Objects.equals(tag(), that.tag()) + && Objects.equals(transactionTag, that.transactionTag) && Objects.equals(etag(), that.etag()) && Objects.equals(validateOnly(), that.validateOnly()) && Objects.equals(withOptimisticLock(), that.withOptimisticLock()) @@ -760,6 +795,9 @@ public int hashCode() { if (orderBy != null) { result = 31 * result + orderBy.hashCode(); } + if (transactionTag != null) { + result = 31 * result + transactionTag.hashCode(); + } return result; } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java index dc4d1f54f3c..cabfd2c476e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java @@ -43,7 +43,6 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; -import javax.annotation.Nullable; import org.threeten.bp.Duration; import org.threeten.bp.temporal.ChronoUnit; @@ -55,16 +54,13 @@ public class PartitionedDmlTransaction implements SessionImpl.SessionTransaction private final SessionImpl session; private final SpannerRpc rpc; private final Ticker ticker; - private final @Nullable String transactionTag; private final IsRetryableInternalError isRetryableInternalErrorPredicate; private volatile boolean isValid = true; - PartitionedDmlTransaction( - SessionImpl session, SpannerRpc rpc, Ticker ticker, @Nullable String transactionTag) { + PartitionedDmlTransaction(SessionImpl session, SpannerRpc rpc, Ticker ticker) { this.session = session; this.rpc = rpc; this.ticker = ticker; - this.transactionTag = transactionTag; this.isRetryableInternalErrorPredicate = new IsRetryableInternalError(); } @@ -198,8 +194,8 @@ ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Opt if (options.hasTag()) { requestOptionsBuilder.setRequestTag(options.tag()); } - if (transactionTag != null) { - requestOptionsBuilder.setTransactionTag(transactionTag); + if (options.hasTransactionTag()) { + requestOptionsBuilder.setTransactionTag(options.transactionTag()); } builder.setRequestOptions(requestOptionsBuilder.build()); } @@ -216,9 +212,9 @@ private ByteString initTransaction(final Options options) { .setExcludeTxnFromChangeStreams( options.withExcludeTxnFromChangeStreams() == Boolean.TRUE)); - if (transactionTag != null) { + if (options.hasTransactionTag()) { builder.setRequestOptions( - RequestOptions.newBuilder().setTransactionTag(transactionTag).build()); + RequestOptions.newBuilder().setTransactionTag(options.transactionTag()).build()); } Transaction tx = rpc.beginTransaction(builder.build(), session.getOptions(), true); if (tx.getId().isEmpty()) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index f0eca14feed..5bd31603685 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -201,18 +201,7 @@ public DatabaseId getDatabaseId() { public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { setActive(null); PartitionedDmlTransaction txn = - new PartitionedDmlTransaction(this, spanner.getRpc(), Ticker.systemTicker(), null); - return txn.executeStreamingPartitionedUpdate( - stmt, spanner.getOptions().getPartitionedDmlTimeout(), options); - } - - @Override - public long executePartitionedUpdate( - Statement stmt, @Nullable String transactionTag, UpdateOption... options) { - setActive(null); - PartitionedDmlTransaction txn = - new PartitionedDmlTransaction( - this, spanner.getRpc(), Ticker.systemTicker(), transactionTag); + new PartitionedDmlTransaction(this, spanner.getRpc(), Ticker.systemTicker()); return txn.executeStreamingPartitionedUpdate( stmt, spanner.getOptions().getPartitionedDmlTimeout(), options); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index d621f91d630..cf50fa44c77 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -1273,11 +1273,6 @@ default AsyncTransactionManager transactionManagerAsync(TransactionOption... opt default long executePartitionedUpdate(Statement stmt, UpdateOption... options) { return get().executePartitionedUpdate(stmt, options); } - - default long executePartitionedUpdate( - Statement stmt, @Nullable String transactionTag, UpdateOption... options) { - return get().executePartitionedUpdate(stmt, transactionTag, options); - } } class PooledSessionFutureWrapper implements SessionFutureWrapper { @@ -1499,16 +1494,6 @@ public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { } } - @Override - public long executePartitionedUpdate( - Statement stmt, @Nullable String transactionTag, UpdateOption... options) { - try { - return get(true).executePartitionedUpdate(stmt, transactionTag, options); - } finally { - close(); - } - } - @Override public String getName() { return get().getName(); @@ -1724,18 +1709,6 @@ public long executePartitionedUpdate(Statement stmt, UpdateOption... options) } } - @Override - public long executePartitionedUpdate( - Statement stmt, @Nullable String transactionTag, UpdateOption... options) - throws SpannerException { - try { - markUsed(); - return delegate.executePartitionedUpdate(stmt, transactionTag, options); - } catch (SpannerException e) { - throw lastException = e; - } - } - @Override public ReadContext singleUse() { return delegate.singleUse(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 09812f1d598..f01f9342331 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -1949,7 +1949,9 @@ public void testPartitionedDMLWithTransactionTag() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); client.executePartitionedUpdate( - UPDATE_STATEMENT, "testTransactionTag", Options.tag("app=spanner,env=test,action=dml")); + UPDATE_STATEMENT, + Options.transactionTag("testTransactionTag"), + Options.tag("app=spanner,env=test,action=dml")); List beginTransactions = mockSpanner.getRequestsOfType(BeginTransactionRequest.class); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java index d2b102e3d37..93e0e3eb3d0 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java @@ -101,7 +101,7 @@ public void setup() { when(rpc.beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true))) .thenReturn(Transaction.newBuilder().setId(txId).build()); - tx = new PartitionedDmlTransaction(session, rpc, ticker, null); + tx = new PartitionedDmlTransaction(session, rpc, ticker); } @Test @@ -332,7 +332,7 @@ public void testExecuteStreamingPartitionedUpdateUnexpectedEOS() { Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class))) .thenReturn(stream2); - PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker, null); + PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker); long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); assertThat(count).isEqualTo(1000L); @@ -371,7 +371,7 @@ public void testExecuteStreamingPartitionedUpdateRSTstream() { Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class))) .thenReturn(stream2); - PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker, null); + PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker); long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); assertThat(count).isEqualTo(1000L); @@ -400,7 +400,7 @@ public void testExecuteStreamingPartitionedUpdateGenericInternalException() { Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class))) .thenReturn(stream1); - PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker, null); + PartitionedDmlTransaction tx = new PartitionedDmlTransaction(session, rpc, ticker); SpannerException e = assertThrows( SpannerException.class, diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index cada6007d13..00339fd2946 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -1729,13 +1729,10 @@ public void testSessionNotFoundPartitionedUpdate() { SpannerExceptionFactoryTest.newSessionNotFoundException(sessionName); Statement statement = Statement.of("UPDATE FOO SET BAR=1 WHERE 1=1"); final SessionImpl closedSession = mockSession(); - when(closedSession.executePartitionedUpdate(any(Statement.class))).thenThrow(sessionNotFound); - when(closedSession.executePartitionedUpdate(any(Statement.class), any(), any())) - .thenThrow(sessionNotFound); + when(closedSession.executePartitionedUpdate(statement)).thenThrow(sessionNotFound); final SessionImpl openSession = mockSession(); - when(openSession.executePartitionedUpdate(any(Statement.class))).thenReturn(1L); - when(openSession.executePartitionedUpdate(any(Statement.class), any(), any())).thenReturn(1L); + when(openSession.executePartitionedUpdate(statement)).thenReturn(1L); doAnswer( invocation -> { executor.submit(