Skip to content

Commit

Permalink
Spanner: Block nested transactions (googleapis#3628)
Browse files Browse the repository at this point in the history
Cloud spanner does not support nested transactions. Use a thread-local
flag to check and throw exception.

Signed-off-by: Nithin Nayak Sujir <[email protected]>
  • Loading branch information
nithinsujir authored and pongad committed Sep 19, 2018
1 parent 7869364 commit e8cff3f
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 2 deletions.
2 changes: 2 additions & 0 deletions google-cloud-clients/google-cloud-spanner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
</parent>
<properties>
<site.installationModule>google-cloud-spanner</site.installationModule>
<skipUnitTests>${skipTests}</skipUnitTests>
<jacoco.skip>true</jacoco.skip>
</properties>
<build>
Expand Down Expand Up @@ -47,6 +48,7 @@
<version>2.12.4</version>
<configuration>
<excludedGroups>com.google.cloud.spanner.IntegrationTest</excludedGroups>
<skipTests>${skipUnitTests}</skipTests>
<reportNameSuffix>sponge_log</reportNameSuffix>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,12 @@ public <T> T run(TransactionCallable<T> callable) {
public Timestamp getCommitTimestamp() {
return runner.getCommitTimestamp();
}

@Override
public TransactionRunner allowNestedTransaction() {
runner.allowNestedTransaction();
return runner;
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;

import java.io.IOException;
import java.io.Serializable;
import java.util.AbstractList;
Expand Down Expand Up @@ -129,6 +128,19 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
private static final String QUERY = "CloudSpannerOperation.ExecuteStreamingQuery";
private static final String READ = "CloudSpannerOperation.ExecuteStreamingRead";

private static final ThreadLocal<Boolean> hasPendingTransaction = new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return false;
}
};

private static void throwIfTransactionsPending() {
if (hasPendingTransaction.get() == Boolean.TRUE) {
throw newSpannerException(ErrorCode.INTERNAL, "Nested transactions are not supported");
}
}

static {
TraceUtil.exportSpans(CREATE_SESSION, DELETE_SESSION, BEGIN_TRANSACTION, COMMIT, QUERY, READ);
}
Expand Down Expand Up @@ -905,6 +917,8 @@ TransactionContextImpl newTransaction() {
}

<T extends SessionTransaction> T setActive(@Nullable T ctx) {
throwIfTransactionsPending();

if (activeTransaction != null) {
activeTransaction.invalidate();
}
Expand Down Expand Up @@ -1209,6 +1223,7 @@ public void execute(Runnable command) {

@VisibleForTesting
static class TransactionRunnerImpl implements SessionTransaction, TransactionRunner {
private boolean blockNestedTxn = true;

/** Allow for testing of backoff logic */
static class Sleeper {
Expand All @@ -1223,6 +1238,11 @@ void backoffSleep(Context context, long backoffMillis) {
private TransactionContextImpl txn;
private volatile boolean isValid = true;

public TransactionRunner allowNestedTransaction() {
blockNestedTxn = false;
return this;
}

TransactionRunnerImpl(
SessionImpl session, SpannerRpc rpc, Sleeper sleeper, int defaultPrefetchChunks) {
this.session = session;
Expand All @@ -1239,11 +1259,19 @@ void backoffSleep(Context context, long backoffMillis) {
@Override
public <T> T run(TransactionCallable<T> callable) {
try (Scope s = tracer.withSpan(span)) {
if (blockNestedTxn) {
hasPendingTransaction.set(Boolean.TRUE);
}

return runInternal(callable);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
} finally {
// Remove threadLocal rather than set to FALSE to avoid a possible memory leak.
// We also do this unconditionally in case a user has modified the flag when the transaction
// was running.
hasPendingTransaction.remove();
span.end();
}
}
Expand Down Expand Up @@ -1660,6 +1688,8 @@ ByteString getTransactionId() {
}

void initTransaction() {
throwIfTransactionsPending();

// Since we only support synchronous calls, just block on "txnLock" while the RPC is in
// flight. Note that we use the strategy of sending an explicit BeginTransaction() RPC,
// rather than using the first read in the transaction to begin it implicitly. The chosen
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,23 @@ interface TransactionCallable<T> {
* {@link #run(TransactionCallable)} has returned normally.
*/
Timestamp getCommitTimestamp();

/**
* Allows overriding the default behaviour of blocking nested transactions.
*
* Note that the client library does not maintain any information regarding the nesting structure.
* If an outer transaction fails and an inner transaction succeeds, upon retry of the outer
* transaction, the inner transaction will be re-executed.
*
* Use with care when certain that the inner transaction is idempotent. Avoid doing this when
* accessing the same db. There might be legitimate uses where access need to be made across DBs
* for instance.
*
* E.g. of nesting that is discouraged, see {@code nestedReadWriteTxnThrows}
* {@code nestedReadOnlyTxnThrows}, {@code nestedBatchTxnThrows},
* {@code nestedSingleUseReadTxnThrows}
*
* @return this object
*/
TransactionRunner allowNestedTransaction();
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AbortedException;
import com.google.cloud.spanner.BatchClient;
import com.google.cloud.spanner.BatchReadOnlyTransaction;
import com.google.cloud.spanner.Database;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.IntegrationTest;
import com.google.cloud.spanner.IntegrationTestEnv;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.PartitionOptions;
import com.google.cloud.spanner.ReadContext;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
Expand Down Expand Up @@ -349,4 +353,126 @@ public Void run(TransactionContext transaction) throws SpannerException {
.getLong(0))
.isEqualTo(2);
}

private void doNestedRwTransaction() {
client
.readWriteTransaction()
.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws SpannerException {
client
.readWriteTransaction()
.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws Exception {
return null;
}
});

return null;
}
});
}

@Test
public void nestedReadWriteTxnThrows() {
try {
doNestedRwTransaction();
fail("Expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INTERNAL);
assertThat(e.getMessage()).contains("not supported");
}
}

@Test
public void nestedReadOnlyTxnThrows() {
try {
client
.readWriteTransaction()
.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws SpannerException {
client
.readOnlyTransaction()
.getReadTimestamp();

return null;
}
});
fail("Expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INTERNAL);
assertThat(e.getMessage()).contains("not supported");
}
}

@Test
public void nestedBatchTxnThrows() {
try {
client
.readWriteTransaction()
.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws SpannerException {
BatchClient batchClient = env.getTestHelper().getBatchClient(db);
BatchReadOnlyTransaction batchTxn = batchClient
.batchReadOnlyTransaction(TimestampBound.strong());
batchTxn.partitionReadUsingIndex(
PartitionOptions.getDefaultInstance(),
"Test",
"Index",
KeySet.all(),
Arrays.asList("Fingerprint"));

return null;
}
});
fail("Expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INTERNAL);
assertThat(e.getMessage()).contains("not supported");
}
}

@Test
public void nestedSingleUseReadTxnThrows() {
try {
client
.readWriteTransaction()
.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws SpannerException {
client.singleUseReadOnlyTransaction();

return null;
}
});
fail("Expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INTERNAL);
assertThat(e.getMessage()).contains("not supported");
}
}

@Test
public void nestedTxnSucceedsWhenAllowed() {
client
.readWriteTransaction()
.allowNestedTransaction()
.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws SpannerException {
client.singleUseReadOnlyTransaction();

return null;
}
});
}
}

0 comments on commit e8cff3f

Please sign in to comment.