diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java
index 3120e651feb2..969af0275fcf 100644
--- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java
+++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java
@@ -135,7 +135,7 @@ public interface DatabaseClient {
ReadOnlyTransaction singleUseReadOnlyTransaction();
/**
- * Returns a read-only transaction context in which a single read or query can be performed at the
+ * Returns a read-only transaction context in which a single read or query can be performed at
* given timestamp bound. This method differs from {@link #singleUse(TimestampBound)} in that the
* read timestamp used may be inspected after the read has returned data or finished successfully.
*
@@ -269,4 +269,51 @@ public interface DatabaseClient {
*
*/
TransactionManager transactionManager();
+
+ /**
+ * Returns the lower bound of rows modified by this DML statement.
+ *
+ *
The method will block until the update is complete. Running a DML statement with this method
+ * does not offer exactly once semantics, and therfore the DML statement should be idempotent. The
+ * DML statement must be fully-partitionable. Specifically, the statement must be expressible as
+ * the union of many statements which each access only a single row of the table. This is a
+ * Partitioned DML transaction in which a single Partitioned DML statement is executed.
+ * Partitioned DML partitions the key space and runs the DML statement over each partition in
+ * parallel using separate, internal transactions that commit independently. Partitioned DML
+ * transactions do not need to be committed.
+ *
+ *
Partitioned DML updates are used to execute a single DML statement with a different
+ * execution strategy that provides different, and often better, scalability properties for large,
+ * table-wide operations than DML in a {@link #readWriteTransaction()} transaction. Smaller scoped
+ * statements, such as an OLTP workload, should prefer using {@link
+ * TransactionContext#executeUpdate(Statement)} with {@link #readWriteTransaction()}.
+ *
+ *
+ * That said, Partitioned DML is not a drop-in replacement for standard DML used in {@link
+ * #readWriteTransaction()}.
+ *
The DML statement must be fully-partitionable. Specifically, the statement must be
+ * expressible as the union of many statements which each access only a single row of the
+ * table.
+ *
The statement is not applied atomically to all rows of the table. Rather, the statement
+ * is applied atomically to partitions of the table, in independent internal transactions.
+ * Secondary index rows are updated atomically with the base table rows.
+ *
Partitioned DML does not guarantee exactly-once execution semantics against a partition.
+ * The statement will be applied at least once to each partition. It is strongly recommended
+ * that the DML statement should be idempotent to avoid unexpected results. For instance, it
+ * is potentially dangerous to run a statement such as `UPDATE table SET column = column +
+ * 1` as it could be run multiple times against some rows.
+ *
The partitions are committed automatically - there is no support for Commit or Rollback.
+ * If the call returns an error, or if the client issuing the DML statement dies, it is
+ * possible that some rows had the statement executed on them successfully. It is also
+ * possible that statement was never executed against other rows.
+ *
If any error is encountered during the execution of the partitioned DML operation (for
+ * instance, a UNIQUE INDEX violation, division by zero, or a value that cannot be stored
+ * due to schema constraints), then the operation is stopped at that point and an error is
+ * returned. It is possible that at this point, some partitions have been committed (or even
+ * committed multiple times), and other partitions have not been run at all.
+ *
+ *
Given the above, Partitioned DML is good fit for large, database-wide, operations that are
+ * idempotent, such as deleting old rows from a very large table.
+ */
+ long executePartitionedUpdate(Statement stmt);
}
diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java
index b8c51849d5a6..46a90afac120 100644
--- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java
+++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java
@@ -28,10 +28,11 @@
class DatabaseClientImpl implements DatabaseClient {
private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction";
private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction";
+ private static final String PARTITION_DML_TRANSACTION = "CloudSpanner.PartitionDMLTransaction";
private static final Tracer tracer = Tracing.getTracer();
static {
- TraceUtil.exportSpans(READ_WRITE_TRANSACTION, READ_ONLY_TRANSACTION);
+ TraceUtil.exportSpans(READ_WRITE_TRANSACTION, READ_ONLY_TRANSACTION, PARTITION_DML_TRANSACTION);
}
private final SessionPool pool;
@@ -155,6 +156,17 @@ public TransactionManager transactionManager() {
}
}
+ @Override
+ public long executePartitionedUpdate(Statement stmt) {
+ Span span = tracer.spanBuilder(PARTITION_DML_TRANSACTION).startSpan();
+ try (Scope s = tracer.withSpan(span)) {
+ return pool.getReadWriteSession().executePartitionedUpdate(stmt);
+ } catch (RuntimeException e) {
+ TraceUtil.endSpanWithFailure(span, e);
+ throw e;
+ }
+ }
+
ListenableFuture closeAsync() {
return pool.closeAsync();
}
diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSet.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSet.java
index d3359df6d24b..23ac2e50d267 100644
--- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSet.java
+++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSet.java
@@ -17,6 +17,7 @@
package com.google.cloud.spanner;
import com.google.spanner.v1.ResultSetStats;
+import javax.annotation.Nullable;
/**
* Provides access to the data returned by a Cloud Spanner read or query. {@code ResultSet} allows a
@@ -59,13 +60,17 @@ public interface ResultSet extends AutoCloseable, StructReader {
@Override
void close();
+
/**
* Returns the {@link ResultSetStats} for the query only if the query was executed in either the
* {@code PLAN} or the {@code PROFILE} mode via the {@link ReadContext#analyzeQuery(Statement,
- * com.google.cloud.spanner.ReadContext.QueryAnalyzeMode)} method. Attempts to call this method on
- * a {@code ResultSet} not obtained from {@code analyzeQuery} result in an {@code
- * UnsupportedOperationException}. This method must be called after {@link #next()} has
- * returned @{code false}. Calling it before that will result in an {@code IllegalStateException}.
+ * com.google.cloud.spanner.ReadContext.QueryAnalyzeMode)} method or for DML statements in
+ * {@link ReadContext#executeQuery(Statement, QueryOption)}. Attempts to call this method on
+ * a {@code ResultSet} not obtained from {@code analyzeQuery} or {@code executeQuery} will return
+ * a {@code null} {@code ResultSetStats}. This method must be called after {@link #next()} has
+ * returned @{code false}. Calling it before that will result in {@code null}
+ * {@code ResultSetStats} too.
*/
+ @Nullable
ResultSetStats getStats();
}
diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java
index ced479151253..3aec9e8ce47b 100644
--- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java
+++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java
@@ -312,6 +312,18 @@ public Timestamp write(Iterable mutations) throws SpannerException {
}
}
+ @Override
+ public long executePartitionedUpdate(Statement stmt) throws SpannerException {
+ try {
+ markUsed();
+ return delegate.executePartitionedUpdate(stmt);
+ } catch (SpannerException e) {
+ throw lastException = e;
+ } finally {
+ close();
+ }
+ }
+
@Override
public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException {
try {
diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java
index b997103dbaf2..9023ddc01b6a 100644
--- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java
+++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java
@@ -102,6 +102,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -768,6 +769,13 @@ public String getName() {
return options;
}
+ @Override
+ public long executePartitionedUpdate(Statement stmt) {
+ setActive(null);
+ PartitionedDMLTransaction txn = new PartitionedDMLTransaction(this, rpc);
+ return txn.executePartitionedUpdate(stmt);
+ }
+
@Override
public Timestamp write(Iterable mutations) throws SpannerException {
TransactionRunner runner = readWriteTransaction();
@@ -957,6 +965,7 @@ abstract static class AbstractReadContext
@GuardedBy("lock")
private boolean isClosed = false;
+
// Allow up to 512MB to be buffered (assuming 1MB chunks). In practice, restart tokens are sent
// much more frequently.
private static final int MAX_BUFFERED_CHUNKS = 512;
@@ -1033,17 +1042,14 @@ private ResultSet executeQueryInternal(
statement, queryMode, readOptions, null /*partitionToken*/);
}
- ResultSet executeQueryInternalWithOptions(
+ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
Statement statement,
- com.google.spanner.v1.ExecuteSqlRequest.QueryMode queryMode,
- Options readOptions,
- ByteString partitionToken) {
- beforeReadOrQuery();
+ QueryMode queryMode) {
ExecuteSqlRequest.Builder builder =
ExecuteSqlRequest.newBuilder()
.setSql(statement.getSql())
.setQueryMode(queryMode)
- .setSession(session.getName());
+ .setSession(session.name);
Map stmtParameters = statement.getParameters();
if (!stmtParameters.isEmpty()) {
com.google.protobuf.Struct.Builder paramsBuilder = builder.getParamsBuilder();
@@ -1056,10 +1062,20 @@ ResultSet executeQueryInternalWithOptions(
if (selector != null) {
builder.setTransaction(selector);
}
+ return builder;
+ }
+
+ ResultSet executeQueryInternalWithOptions(
+ Statement statement,
+ com.google.spanner.v1.ExecuteSqlRequest.QueryMode queryMode,
+ Options readOptions,
+ ByteString partitionToken) {
+ beforeReadOrQuery();
+ final ExecuteSqlRequest.Builder request =
+ getExecuteSqlRequestBuilder(statement, queryMode);
if (partitionToken != null) {
- builder.setPartitionToken(partitionToken);
+ request.setPartitionToken(partitionToken);
}
- final ExecuteSqlRequest request = builder.build();
final int prefetchChunks =
readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks;
ResumableStreamIterator stream =
@@ -1067,13 +1083,11 @@ ResultSet executeQueryInternalWithOptions(
@Override
CloseableIterator startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
+ if (resumeToken != null) {
+ request.setResumeToken(resumeToken);
+ }
SpannerRpc.StreamingCall call =
- rpc.executeQuery(
- resumeToken == null
- ? request
- : request.toBuilder().setResumeToken(resumeToken).build(),
- stream.consumer(),
- session.options);
+ rpc.executeQuery(request.build(), stream.consumer(), session.options);
// We get one message for free.
if (prefetchChunks > 1) {
call.request(prefetchChunks - 1);
@@ -1083,7 +1097,7 @@ CloseableIterator startStream(@Nullable ByteString resumeToken
}
};
return new GrpcResultSet(stream, this, queryMode);
- }
+ }
/**
* Called before any read or query is started to perform state checks and initializations.
@@ -1152,7 +1166,7 @@ ResultSet readInternalWithOptions(
Options readOptions,
ByteString partitionToken) {
beforeReadOrQuery();
- ReadRequest.Builder builder =
+ final ReadRequest.Builder builder =
ReadRequest.newBuilder()
.setSession(session.name)
.setTable(checkNotNull(table))
@@ -1172,7 +1186,6 @@ ResultSet readInternalWithOptions(
if (partitionToken != null) {
builder.setPartitionToken(partitionToken);
}
- final ReadRequest request = builder.build();
final int prefetchChunks =
readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks;
ResumableStreamIterator stream =
@@ -1180,13 +1193,11 @@ ResultSet readInternalWithOptions(
@Override
CloseableIterator startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
+ if (resumeToken != null) {
+ builder.setResumeToken(resumeToken);
+ }
SpannerRpc.StreamingCall call =
- rpc.read(
- resumeToken == null
- ? request
- : request.toBuilder().setResumeToken(resumeToken).build(),
- stream.consumer(),
- session.options);
+ rpc.read(builder.build(), stream.consumer(), session.options);
// We get one message for free.
if (prefetchChunks > 1) {
call.request(prefetchChunks - 1);
@@ -1362,6 +1373,83 @@ private void backoff(Context context, BackOff backoff) {
}
}
+ static class PartitionedDMLTransaction implements SessionTransaction {
+ private final ByteString transactionId;
+ private final SessionImpl session;
+ private final SpannerRpc rpc;
+ private volatile boolean isValid = true;
+
+ PartitionedDMLTransaction(
+ SessionImpl session,
+ SpannerRpc rpc) {
+ this.session = session;
+ this.rpc = rpc;
+ this.transactionId = initTransaction();
+ }
+
+ ByteString initTransaction() {
+ final BeginTransactionRequest request =
+ BeginTransactionRequest.newBuilder()
+ .setSession(session.getName())
+ .setOptions(
+ TransactionOptions.newBuilder()
+ .setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance()))
+ .build();
+ Transaction txn =
+ runWithRetries(
+ new Callable() {
+ @Override
+ public Transaction call() throws Exception {
+ return rpc.beginTransaction(request, session.options);
+ }
+ });
+ if (txn.getId().isEmpty()) {
+ throw SpannerExceptionFactory.newSpannerException(
+ ErrorCode.INTERNAL,
+ "Failed to init transaction, missing transaction id\n" + session.getName());
+ }
+ return txn.getId();
+ }
+
+ public long executePartitionedUpdate(Statement statement) {
+ checkState(
+ isValid, "Partitioned DML has been invalidated by a new operation on the session");
+ final ExecuteSqlRequest.Builder builder =
+ ExecuteSqlRequest.newBuilder()
+ .setSql(statement.getSql())
+ .setQueryMode(QueryMode.NORMAL)
+ .setSession(session.name)
+ .setTransaction(TransactionSelector.newBuilder().setId(transactionId).build());
+ Map stmtParameters = statement.getParameters();
+ if (!stmtParameters.isEmpty()) {
+ com.google.protobuf.Struct.Builder paramsBuilder = builder.getParamsBuilder();
+ for (Map.Entry param : stmtParameters.entrySet()) {
+ paramsBuilder.putFields(param.getKey(), param.getValue().toProto());
+ builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
+ }
+ }
+ com.google.spanner.v1.ResultSet resultSet =
+ runWithRetries(
+ new Callable() {
+ @Override
+ public com.google.spanner.v1.ResultSet call() throws Exception {
+ return rpc.executeQuery(builder.build(), session.options);
+ }
+ });
+ if (!resultSet.hasStats()) {
+ throw new IllegalArgumentException(
+ "Partitioned DML response missing stats possibly due to non-DML statement as input");
+ }
+ // For partitioned DML, using the row count lower bound.
+ return resultSet.getStats().getRowCountLowerBound();
+ }
+
+ @Override
+ public void invalidate() {
+ isValid = false;
+ }
+ }
+
@VisibleForTesting
static class TransactionContextImpl extends AbstractReadContext implements TransactionContext {
@GuardedBy("lock")
@@ -1374,6 +1462,10 @@ static class TransactionContextImpl extends AbstractReadContext implements Trans
@GuardedBy("lock")
private long retryDelayInMillis = -1L;
+ // A per-transaction sequence number used to identify this ExecuteSqlRequests. Required for DML,
+ // ignored for query by the server.
+ private AtomicLong seqNo = new AtomicLong();
+
private ByteString transactionId;
private Timestamp commitTimestamp;
@@ -1495,6 +1587,10 @@ void rollback() {
}
}
+ long getSeqNo() {
+ return seqNo.incrementAndGet();
+ }
+
@Nullable
@Override
TransactionSelector getTransactionSelector() {
@@ -1536,6 +1632,30 @@ public void buffer(Iterable mutations) {
}
}
}
+
+ @Override
+ public long executeUpdate(Statement statement) {
+ beforeReadOrQuery();
+ final ExecuteSqlRequest.Builder builder =
+ getExecuteSqlRequestBuilder(
+ statement,
+ QueryMode.NORMAL);
+ builder.setSeqno(getSeqNo());
+ com.google.spanner.v1.ResultSet resultSet =
+ runWithRetries(
+ new Callable() {
+ @Override
+ public com.google.spanner.v1.ResultSet call() throws Exception {
+ return rpc.executeQuery(builder.build(), session.options);
+ }
+ });
+ if (!resultSet.hasStats()) {
+ throw new IllegalArgumentException(
+ "DML response missing stats possibly due to non-DML statement as input");
+ }
+ // For standard DML, using the exact row count.
+ return resultSet.getStats().getRowCountExact();
+ }
}
/**
@@ -1901,7 +2021,7 @@ public boolean next() throws SpannerException {
currRow = new GrpcStruct(iterator.type(), new ArrayList<>());
}
boolean hasNext = currRow.consumeRow(iterator);
- if (queryMode != QueryMode.NORMAL && !hasNext) {
+ if (!hasNext) {
statistics = iterator.getStats();
}
return hasNext;
@@ -1911,13 +2031,8 @@ public boolean next() throws SpannerException {
}
@Override
+ @Nullable
public ResultSetStats getStats() {
- if (queryMode == QueryMode.NORMAL) {
- throw new UnsupportedOperationException(
- "ResultSetStats are available only in PLAN and PROFILE execution modes");
- }
- checkState(
- statistics != null, "ResultSetStats requested before consuming the entire ResultSet");
return statistics;
}
@@ -2637,13 +2752,10 @@ ResultSetMetadata getMetadata() throws SpannerException {
/*
* Get the query statistics. Query statistics are delivered with the last PartialResultSet
* in the stream. Any attempt to call this method before the caller has finished consuming the
- * results will throw an exception.
+ * results will return null.
*/
- ResultSetStats getStats() {
- if (statistics == null) {
- throw newSpannerException(
- ErrorCode.INTERNAL, "Stream closed without sending query statistics");
- }
+ @Nullable
+ ResultSetStats getStats() {
return statistics;
}
diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContext.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContext.java
index 540d9a276772..5ddefcf73728 100644
--- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContext.java
+++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContext.java
@@ -94,4 +94,11 @@ public interface TransactionContext extends ReadContext {
* mutations will be applied atomically.
*/
void buffer(Iterable mutations);
+
+ /**
+ * Executes the DML statement(s) and returns the number of rows modified. For non-DML statements,
+ * it will result in an {@code IllegalArgumentException}. The effects of the DML statement
+ * will be visible to subsequent operations in the transaction.
+ */
+ long executeUpdate(Statement statement);
}
diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java
index e983ef6303f9..0cbdf614d45e 100644
--- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java
+++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java
@@ -66,6 +66,7 @@
import com.google.spanner.v1.PartitionReadRequest;
import com.google.spanner.v1.PartitionResponse;
import com.google.spanner.v1.ReadRequest;
+import com.google.spanner.v1.ResultSet;
import com.google.spanner.v1.RollbackRequest;
import com.google.spanner.v1.Session;
import com.google.spanner.v1.SpannerGrpc;
@@ -377,6 +378,17 @@ public StreamingCall read(
Option.CHANNEL_HINT.getLong(options));
}
+ @Override
+ public ResultSet executeQuery(
+ ExecuteSqlRequest request, @Nullable Map