diff --git a/google-cloud-clients/google-cloud-spanner/pom.xml b/google-cloud-clients/google-cloud-spanner/pom.xml index d6f7e78b68d4..60c795bdedab 100644 --- a/google-cloud-clients/google-cloud-spanner/pom.xml +++ b/google-cloud-clients/google-cloud-spanner/pom.xml @@ -18,6 +18,7 @@ google-cloud-spanner + ${skipTests} true @@ -47,6 +48,7 @@ 2.12.4 com.google.cloud.spanner.IntegrationTest + ${skipUnitTests} sponge_log diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 80e4d67eb664..ced479151253 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -408,6 +408,12 @@ public T run(TransactionCallable callable) { public Timestamp getCommitTimestamp() { return runner.getCommitTimestamp(); } + + @Override + public TransactionRunner allowNestedTransaction() { + runner.allowNestedTransaction(); + return runner; + } }; } diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index 850bf2898f57..579da6797ced 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -81,7 +81,6 @@ import io.opencensus.trace.Span; import io.opencensus.trace.Tracer; import io.opencensus.trace.Tracing; - import java.io.IOException; import java.io.Serializable; import java.util.AbstractList; @@ -129,6 +128,19 @@ class SpannerImpl extends BaseService implements Spanner { private static final String QUERY = "CloudSpannerOperation.ExecuteStreamingQuery"; private static final String READ = "CloudSpannerOperation.ExecuteStreamingRead"; + private static final ThreadLocal hasPendingTransaction = new ThreadLocal() { + @Override + protected Boolean initialValue() { + return false; + } + }; + + private static void throwIfTransactionsPending() { + if (hasPendingTransaction.get() == Boolean.TRUE) { + throw newSpannerException(ErrorCode.INTERNAL, "Nested transactions are not supported"); + } + } + static { TraceUtil.exportSpans(CREATE_SESSION, DELETE_SESSION, BEGIN_TRANSACTION, COMMIT, QUERY, READ); } @@ -905,6 +917,8 @@ TransactionContextImpl newTransaction() { } T setActive(@Nullable T ctx) { + throwIfTransactionsPending(); + if (activeTransaction != null) { activeTransaction.invalidate(); } @@ -1209,6 +1223,7 @@ public void execute(Runnable command) { @VisibleForTesting static class TransactionRunnerImpl implements SessionTransaction, TransactionRunner { + private boolean blockNestedTxn = true; /** Allow for testing of backoff logic */ static class Sleeper { @@ -1223,6 +1238,11 @@ void backoffSleep(Context context, long backoffMillis) { private TransactionContextImpl txn; private volatile boolean isValid = true; + public TransactionRunner allowNestedTransaction() { + blockNestedTxn = false; + return this; + } + TransactionRunnerImpl( SessionImpl session, SpannerRpc rpc, Sleeper sleeper, int defaultPrefetchChunks) { this.session = session; @@ -1239,11 +1259,19 @@ void backoffSleep(Context context, long backoffMillis) { @Override public T run(TransactionCallable callable) { try (Scope s = tracer.withSpan(span)) { + if (blockNestedTxn) { + hasPendingTransaction.set(Boolean.TRUE); + } + return runInternal(callable); } catch (RuntimeException e) { TraceUtil.endSpanWithFailure(span, e); throw e; } finally { + // Remove threadLocal rather than set to FALSE to avoid a possible memory leak. + // We also do this unconditionally in case a user has modified the flag when the transaction + // was running. + hasPendingTransaction.remove(); span.end(); } } @@ -1660,6 +1688,8 @@ ByteString getTransactionId() { } void initTransaction() { + throwIfTransactionsPending(); + // Since we only support synchronous calls, just block on "txnLock" while the RPC is in // flight. Note that we use the strategy of sending an explicit BeginTransaction() RPC, // rather than using the first read in the transaction to begin it implicitly. The chosen diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunner.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunner.java index 65659ec98b7e..02d5947109d7 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunner.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunner.java @@ -73,4 +73,23 @@ interface TransactionCallable { * {@link #run(TransactionCallable)} has returned normally. */ Timestamp getCommitTimestamp(); + + /** + * Allows overriding the default behaviour of blocking nested transactions. + * + * Note that the client library does not maintain any information regarding the nesting structure. + * If an outer transaction fails and an inner transaction succeeds, upon retry of the outer + * transaction, the inner transaction will be re-executed. + * + * Use with care when certain that the inner transaction is idempotent. Avoid doing this when + * accessing the same db. There might be legitimate uses where access need to be made across DBs + * for instance. + * + * E.g. of nesting that is discouraged, see {@code nestedReadWriteTxnThrows} + * {@code nestedReadOnlyTxnThrows}, {@code nestedBatchTxnThrows}, + * {@code nestedSingleUseReadTxnThrows} + * + * @return this object + */ + TransactionRunner allowNestedTransaction(); } diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java index 9362814146ab..0cd804c6a89e 100644 --- a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java +++ b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java @@ -32,7 +32,6 @@ import io.grpc.Status; import io.grpc.StatusRuntimeException; import java.util.concurrent.atomic.AtomicInteger; - import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java index 1bacea50d627..c9fb40416fef 100644 --- a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java +++ b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java @@ -23,13 +23,17 @@ import com.google.cloud.Timestamp; import com.google.cloud.spanner.AbortedException; +import com.google.cloud.spanner.BatchClient; +import com.google.cloud.spanner.BatchReadOnlyTransaction; import com.google.cloud.spanner.Database; import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.IntegrationTest; import com.google.cloud.spanner.IntegrationTestEnv; import com.google.cloud.spanner.Key; +import com.google.cloud.spanner.KeySet; import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.PartitionOptions; import com.google.cloud.spanner.ReadContext; import com.google.cloud.spanner.ResultSet; import com.google.cloud.spanner.SpannerException; @@ -349,4 +353,126 @@ public Void run(TransactionContext transaction) throws SpannerException { .getLong(0)) .isEqualTo(2); } + + private void doNestedRwTransaction() { + client + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws SpannerException { + client + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + return null; + } + }); + + return null; + } + }); + } + + @Test + public void nestedReadWriteTxnThrows() { + try { + doNestedRwTransaction(); + fail("Expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INTERNAL); + assertThat(e.getMessage()).contains("not supported"); + } + } + + @Test + public void nestedReadOnlyTxnThrows() { + try { + client + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws SpannerException { + client + .readOnlyTransaction() + .getReadTimestamp(); + + return null; + } + }); + fail("Expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INTERNAL); + assertThat(e.getMessage()).contains("not supported"); + } + } + + @Test + public void nestedBatchTxnThrows() { + try { + client + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws SpannerException { + BatchClient batchClient = env.getTestHelper().getBatchClient(db); + BatchReadOnlyTransaction batchTxn = batchClient + .batchReadOnlyTransaction(TimestampBound.strong()); + batchTxn.partitionReadUsingIndex( + PartitionOptions.getDefaultInstance(), + "Test", + "Index", + KeySet.all(), + Arrays.asList("Fingerprint")); + + return null; + } + }); + fail("Expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INTERNAL); + assertThat(e.getMessage()).contains("not supported"); + } + } + + @Test + public void nestedSingleUseReadTxnThrows() { + try { + client + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws SpannerException { + client.singleUseReadOnlyTransaction(); + + return null; + } + }); + fail("Expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INTERNAL); + assertThat(e.getMessage()).contains("not supported"); + } + } + + @Test + public void nestedTxnSucceedsWhenAllowed() { + client + .readWriteTransaction() + .allowNestedTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws SpannerException { + client.singleUseReadOnlyTransaction(); + + return null; + } + }); + } }