diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java
index 969af0275fcf..3120e651feb2 100644
--- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java
+++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java
@@ -135,7 +135,7 @@ public interface DatabaseClient {
ReadOnlyTransaction singleUseReadOnlyTransaction();
/**
- * Returns a read-only transaction context in which a single read or query can be performed at
+ * Returns a read-only transaction context in which a single read or query can be performed at the
* given timestamp bound. This method differs from {@link #singleUse(TimestampBound)} in that the
* read timestamp used may be inspected after the read has returned data or finished successfully.
*
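
For reference, a minimal usage sketch of the API described above; the client, table, and staleness value are illustrative assumptions, not part of this change, and imports are omitted:

```java
// Hypothetical sketch: run a single query at a timestamp bound, then inspect the
// timestamp that was actually used. dbClient and the Albums table are assumed names.
try (ReadOnlyTransaction txn =
    dbClient.singleUseReadOnlyTransaction(TimestampBound.ofExactStaleness(15, TimeUnit.SECONDS))) {
  try (ResultSet rs = txn.executeQuery(Statement.of("SELECT AlbumId, AlbumTitle FROM Albums"))) {
    while (rs.next()) {
      // consume the rows
    }
  }
  // Unlike singleUse(TimestampBound), the timestamp used for the read is observable afterwards.
  Timestamp readTimestamp = txn.getReadTimestamp();
}
```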
@@ -269,51 +269,4 @@ public interface DatabaseClient {
*
*/
TransactionManager transactionManager();
-
- /**
- * Returns the lower bound of rows modified by this DML statement.
- *
- * <p>The method will block until the update is complete. Running a DML statement with this method
- * does not offer exactly once semantics, and therfore the DML statement should be idempotent. The
- * DML statement must be fully-partitionable. Specifically, the statement must be expressible as
- * the union of many statements which each access only a single row of the table. This is a
- * Partitioned DML transaction in which a single Partitioned DML statement is executed.
- * Partitioned DML partitions the key space and runs the DML statement over each partition in
- * parallel using separate, internal transactions that commit independently. Partitioned DML
- * transactions do not need to be committed.
- *
- * <p>Partitioned DML updates are used to execute a single DML statement with a different
- * execution strategy that provides different, and often better, scalability properties for large,
- * table-wide operations than DML in a {@link #readWriteTransaction()} transaction. Smaller scoped
- * statements, such as an OLTP workload, should prefer using {@link
- * TransactionContext#executeUpdate(Statement)} with {@link #readWriteTransaction()}.
- *
- * <p>That said, Partitioned DML is not a drop-in replacement for standard DML used in {@link
- * #readWriteTransaction()}.
- *
- * <ul>
- * <li>The DML statement must be fully-partitionable. Specifically, the statement must be
- * expressible as the union of many statements which each access only a single row of the
- * table.
- * <li>The statement is not applied atomically to all rows of the table. Rather, the statement
- * is applied atomically to partitions of the table, in independent internal transactions.
- * Secondary index rows are updated atomically with the base table rows.
- * <li>Partitioned DML does not guarantee exactly-once execution semantics against a partition.
- * The statement will be applied at least once to each partition. It is strongly recommended
- * that the DML statement should be idempotent to avoid unexpected results. For instance, it
- * is potentially dangerous to run a statement such as `UPDATE table SET column = column +
- * 1` as it could be run multiple times against some rows.
- * <li>The partitions are committed automatically - there is no support for Commit or Rollback.
- * If the call returns an error, or if the client issuing the DML statement dies, it is
- * possible that some rows had the statement executed on them successfully. It is also
- * possible that statement was never executed against other rows.
- * <li>If any error is encountered during the execution of the partitioned DML operation (for
- * instance, a UNIQUE INDEX violation, division by zero, or a value that cannot be stored
- * due to schema constraints), then the operation is stopped at that point and an error is
- * returned. It is possible that at this point, some partitions have been committed (or even
- * committed multiple times), and other partitions have not been run at all.
- * </ul>
- *
- * <p>Given the above, Partitioned DML is good fit for large, database-wide, operations that are
- * idempotent, such as deleting old rows from a very large table.
- */
- long executePartitionedUpdate(Statement stmt);
}
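
For readers tracking the removal above, this is roughly how the deleted executePartitionedUpdate entry point was meant to be invoked; dbClient, the table, and the SQL text are illustrative assumptions, and the method is gone once this diff is applied:

```java
// Sketch of the removed API (does not compile against this revision). The statement is
// idempotent and table-wide, as the deleted Javadoc recommends.
long modifiedAtLeast =
    dbClient.executePartitionedUpdate(
        Statement.of("DELETE FROM Singers WHERE Active = false"));
// The return value was only a lower bound on the number of rows modified.
```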
diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java
index 46a90afac120..b8c51849d5a6 100644
--- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java
+++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java
@@ -28,11 +28,10 @@
class DatabaseClientImpl implements DatabaseClient {
private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction";
private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction";
- private static final String PARTITION_DML_TRANSACTION = "CloudSpanner.PartitionDMLTransaction";
private static final Tracer tracer = Tracing.getTracer();
static {
- TraceUtil.exportSpans(READ_WRITE_TRANSACTION, READ_ONLY_TRANSACTION, PARTITION_DML_TRANSACTION);
+ TraceUtil.exportSpans(READ_WRITE_TRANSACTION, READ_ONLY_TRANSACTION);
}
private final SessionPool pool;
@@ -156,17 +155,6 @@ public TransactionManager transactionManager() {
}
}
- @Override
- public long executePartitionedUpdate(Statement stmt) {
- Span span = tracer.spanBuilder(PARTITION_DML_TRANSACTION).startSpan();
- try (Scope s = tracer.withSpan(span)) {
- return pool.getReadWriteSession().executePartitionedUpdate(stmt);
- } catch (RuntimeException e) {
- TraceUtil.endSpanWithFailure(span, e);
- throw e;
- }
- }
-
ListenableFuture<Void> closeAsync() {
return pool.closeAsync();
}
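
The override deleted above was the only user of the PARTITION_DML_TRANSACTION label; the entry points that remain keep the same span-wrapping shape, sketched here with the retained READ_WRITE_TRANSACTION label (the delegated call is illustrative, not the exact retained implementation):

```java
// General shape of the tracing wrapper used by the surviving DatabaseClientImpl methods.
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
  // Delegate to a pooled session; the concrete call differs per entry point.
  return pool.getReadWriteSession().readWriteTransaction();
} catch (RuntimeException e) {
  TraceUtil.endSpanWithFailure(span, e); // record the failure on the span before rethrowing
  throw e;
}
```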
diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSet.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSet.java
index 23ac2e50d267..d3359df6d24b 100644
--- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSet.java
+++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSet.java
@@ -17,7 +17,6 @@
package com.google.cloud.spanner;
import com.google.spanner.v1.ResultSetStats;
-import javax.annotation.Nullable;
/**
* Provides access to the data returned by a Cloud Spanner read or query. {@code ResultSet} allows a
@@ -60,17 +59,13 @@ public interface ResultSet extends AutoCloseable, StructReader {
@Override
void close();
-
/**
* Returns the {@link ResultSetStats} for the query only if the query was executed in either the
* {@code PLAN} or the {@code PROFILE} mode via the {@link ReadContext#analyzeQuery(Statement,
- * com.google.cloud.spanner.ReadContext.QueryAnalyzeMode)} method or for DML statements in
- * {@link ReadContext#executeQuery(Statement, QueryOption)}. Attempts to call this method on
- * a {@code ResultSet} not obtained from {@code analyzeQuery} or {@code executeQuery} will return
- * a {@code null} {@code ResultSetStats}. This method must be called after {@link #next()} has
- * returned @{code false}. Calling it before that will result in {@code null}
- * {@code ResultSetStats} too.
+ * com.google.cloud.spanner.ReadContext.QueryAnalyzeMode)} method. Attempts to call this method on
+ * a {@code ResultSet} not obtained from {@code analyzeQuery} result in an {@code
+ * UnsupportedOperationException}. This method must be called after {@link #next()} has
+ * returned {@code false}. Calling it before that will result in an {@code IllegalStateException}.
*/
- @Nullable
ResultSetStats getStats();
}
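
Under the restored contract above, statistics are only reachable through analyzeQuery; a minimal sketch, assuming a ReadContext named readContext and an illustrative query:

```java
// Stats exist only for PLAN/PROFILE executions, and only once next() has returned false.
try (ResultSet rs =
    readContext.analyzeQuery(
        Statement.of("SELECT SingerId FROM Singers"), ReadContext.QueryAnalyzeMode.PROFILE)) {
  while (rs.next()) {
    // Consume every row first; calling getStats() earlier throws IllegalStateException.
  }
  // On a ResultSet from a plain executeQuery(), this would throw UnsupportedOperationException.
  ResultSetStats stats = rs.getStats();
}
```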
diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java
index 3aec9e8ce47b..ced479151253 100644
--- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java
+++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java
@@ -312,18 +312,6 @@ public Timestamp write(Iterable mutations) throws SpannerException {
}
}
- @Override
- public long executePartitionedUpdate(Statement stmt) throws SpannerException {
- try {
- markUsed();
- return delegate.executePartitionedUpdate(stmt);
- } catch (SpannerException e) {
- throw lastException = e;
- } finally {
- close();
- }
- }
-
@Override
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
try {
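
The deleted override followed the pool's usual delegation shape, which the surviving write methods keep; a rough sketch of that shape (abridged, not a verbatim copy of the remaining code):

```java
// Typical pooled-session override: refresh the session's last-use time, delegate,
// and remember the failure for the pool's bookkeeping.
@Override
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
  try {
    markUsed();
    return delegate.writeAtLeastOnce(mutations);
  } catch (SpannerException e) {
    throw lastException = e;
  }
}
```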
diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java
index 9023ddc01b6a..b997103dbaf2 100644
--- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java
+++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java
@@ -102,7 +102,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -769,13 +768,6 @@ public String getName() {
return options;
}
- @Override
- public long executePartitionedUpdate(Statement stmt) {
- setActive(null);
- PartitionedDMLTransaction txn = new PartitionedDMLTransaction(this, rpc);
- return txn.executePartitionedUpdate(stmt);
- }
-
@Override
public Timestamp write(Iterable<Mutation> mutations) throws SpannerException {
TransactionRunner runner = readWriteTransaction();
@@ -965,7 +957,6 @@ abstract static class AbstractReadContext
@GuardedBy("lock")
private boolean isClosed = false;
-
// Allow up to 512MB to be buffered (assuming 1MB chunks). In practice, restart tokens are sent
// much more frequently.
private static final int MAX_BUFFERED_CHUNKS = 512;
@@ -1042,14 +1033,17 @@ private ResultSet executeQueryInternal(
statement, queryMode, readOptions, null /*partitionToken*/);
}
- ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
+ ResultSet executeQueryInternalWithOptions(
Statement statement,
- QueryMode queryMode) {
+ com.google.spanner.v1.ExecuteSqlRequest.QueryMode queryMode,
+ Options readOptions,
+ ByteString partitionToken) {
+ beforeReadOrQuery();
ExecuteSqlRequest.Builder builder =
ExecuteSqlRequest.newBuilder()
.setSql(statement.getSql())
.setQueryMode(queryMode)
- .setSession(session.name);
+ .setSession(session.getName());
Map<String, Value> stmtParameters = statement.getParameters();
if (!stmtParameters.isEmpty()) {
com.google.protobuf.Struct.Builder paramsBuilder = builder.getParamsBuilder();
@@ -1062,20 +1056,10 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
if (selector != null) {
builder.setTransaction(selector);
}
- return builder;
- }
-
- ResultSet executeQueryInternalWithOptions(
- Statement statement,
- com.google.spanner.v1.ExecuteSqlRequest.QueryMode queryMode,
- Options readOptions,
- ByteString partitionToken) {
- beforeReadOrQuery();
- final ExecuteSqlRequest.Builder request =
- getExecuteSqlRequestBuilder(statement, queryMode);
if (partitionToken != null) {
- request.setPartitionToken(partitionToken);
+ builder.setPartitionToken(partitionToken);
}
+ final ExecuteSqlRequest request = builder.build();
final int prefetchChunks =
readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks;
ResumableStreamIterator stream =
@@ -1083,11 +1067,13 @@ ResultSet executeQueryInternalWithOptions(
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
- if (resumeToken != null) {
- request.setResumeToken(resumeToken);
- }
SpannerRpc.StreamingCall call =
- rpc.executeQuery(request.build(), stream.consumer(), session.options);
+ rpc.executeQuery(
+ resumeToken == null
+ ? request
+ : request.toBuilder().setResumeToken(resumeToken).build(),
+ stream.consumer(),
+ session.options);
// We get one message for free.
if (prefetchChunks > 1) {
call.request(prefetchChunks - 1);
@@ -1097,7 +1083,7 @@ CloseableIterator startStream(@Nullable ByteString resumeToken
}
};
return new GrpcResultSet(stream, this, queryMode);
- }
+ }
/**
* Called before any read or query is started to perform state checks and initializations.
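
The hunk above goes back to building the ExecuteSqlRequest once and deriving a fresh copy for every stream (re)start; condensed, the resume pattern is the following (an illustrative excerpt using the names from the hunk, not a standalone method):

```java
// Build the request a single time, outside the restartable stream.
final ExecuteSqlRequest request = builder.build();
// On each (re)start, derive a per-attempt copy instead of mutating shared builder state,
// so every retry carries exactly its own resume token.
ExecuteSqlRequest toSend =
    resumeToken == null ? request : request.toBuilder().setResumeToken(resumeToken).build();
SpannerRpc.StreamingCall call = rpc.executeQuery(toSend, stream.consumer(), session.options);
```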
@@ -1166,7 +1152,7 @@ ResultSet readInternalWithOptions(
Options readOptions,
ByteString partitionToken) {
beforeReadOrQuery();
- final ReadRequest.Builder builder =
+ ReadRequest.Builder builder =
ReadRequest.newBuilder()
.setSession(session.name)
.setTable(checkNotNull(table))
@@ -1186,6 +1172,7 @@ ResultSet readInternalWithOptions(
if (partitionToken != null) {
builder.setPartitionToken(partitionToken);
}
+ final ReadRequest request = builder.build();
final int prefetchChunks =
readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks;
ResumableStreamIterator stream =
@@ -1193,11 +1180,13 @@ ResultSet readInternalWithOptions(
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
- if (resumeToken != null) {
- builder.setResumeToken(resumeToken);
- }
SpannerRpc.StreamingCall call =
- rpc.read(builder.build(), stream.consumer(), session.options);
+ rpc.read(
+ resumeToken == null
+ ? request
+ : request.toBuilder().setResumeToken(resumeToken).build(),
+ stream.consumer(),
+ session.options);
// We get one message for free.
if (prefetchChunks > 1) {
call.request(prefetchChunks - 1);
@@ -1373,83 +1362,6 @@ private void backoff(Context context, BackOff backoff) {
}
}
- static class PartitionedDMLTransaction implements SessionTransaction {
- private final ByteString transactionId;
- private final SessionImpl session;
- private final SpannerRpc rpc;
- private volatile boolean isValid = true;
-
- PartitionedDMLTransaction(
- SessionImpl session,
- SpannerRpc rpc) {
- this.session = session;
- this.rpc = rpc;
- this.transactionId = initTransaction();
- }
-
- ByteString initTransaction() {
- final BeginTransactionRequest request =
- BeginTransactionRequest.newBuilder()
- .setSession(session.getName())
- .setOptions(
- TransactionOptions.newBuilder()
- .setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance()))
- .build();
- Transaction txn =
- runWithRetries(
- new Callable<Transaction>() {
- @Override
- public Transaction call() throws Exception {
- return rpc.beginTransaction(request, session.options);
- }
- });
- if (txn.getId().isEmpty()) {
- throw SpannerExceptionFactory.newSpannerException(
- ErrorCode.INTERNAL,
- "Failed to init transaction, missing transaction id\n" + session.getName());
- }
- return txn.getId();
- }
-
- public long executePartitionedUpdate(Statement statement) {
- checkState(
- isValid, "Partitioned DML has been invalidated by a new operation on the session");
- final ExecuteSqlRequest.Builder builder =
- ExecuteSqlRequest.newBuilder()
- .setSql(statement.getSql())
- .setQueryMode(QueryMode.NORMAL)
- .setSession(session.name)
- .setTransaction(TransactionSelector.newBuilder().setId(transactionId).build());
- Map<String, Value> stmtParameters = statement.getParameters();
- if (!stmtParameters.isEmpty()) {
- com.google.protobuf.Struct.Builder paramsBuilder = builder.getParamsBuilder();
- for (Map.Entry<String, Value> param : stmtParameters.entrySet()) {
- paramsBuilder.putFields(param.getKey(), param.getValue().toProto());
- builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
- }
- }
- com.google.spanner.v1.ResultSet resultSet =
- runWithRetries(
- new Callable<com.google.spanner.v1.ResultSet>() {
- @Override
- public com.google.spanner.v1.ResultSet call() throws Exception {
- return rpc.executeQuery(builder.build(), session.options);
- }
- });
- if (!resultSet.hasStats()) {
- throw new IllegalArgumentException(
- "Partitioned DML response missing stats possibly due to non-DML statement as input");
- }
- // For partitioned DML, using the row count lower bound.
- return resultSet.getStats().getRowCountLowerBound();
- }
-
- @Override
- public void invalidate() {
- isValid = false;
- }
- }
-
@VisibleForTesting
static class TransactionContextImpl extends AbstractReadContext implements TransactionContext {
@GuardedBy("lock")
@@ -1462,10 +1374,6 @@ static class TransactionContextImpl extends AbstractReadContext implements Trans
@GuardedBy("lock")
private long retryDelayInMillis = -1L;
- // A per-transaction sequence number used to identify this ExecuteSqlRequests. Required for DML,
- // ignored for query by the server.
- private AtomicLong seqNo = new AtomicLong();
-
private ByteString transactionId;
private Timestamp commitTimestamp;
@@ -1587,10 +1495,6 @@ void rollback() {
}
}
- long getSeqNo() {
- return seqNo.incrementAndGet();
- }
-
@Nullable
@Override
TransactionSelector getTransactionSelector() {
@@ -1632,30 +1536,6 @@ public void buffer(Iterable mutations) {
}
}
}
-
- @Override
- public long executeUpdate(Statement statement) {
- beforeReadOrQuery();
- final ExecuteSqlRequest.Builder builder =
- getExecuteSqlRequestBuilder(
- statement,
- QueryMode.NORMAL);
- builder.setSeqno(getSeqNo());
- com.google.spanner.v1.ResultSet resultSet =
- runWithRetries(
- new Callable<com.google.spanner.v1.ResultSet>() {
- @Override
- public com.google.spanner.v1.ResultSet call() throws Exception {
- return rpc.executeQuery(builder.build(), session.options);
- }
- });
- if (!resultSet.hasStats()) {
- throw new IllegalArgumentException(
- "DML response missing stats possibly due to non-DML statement as input");
- }
- // For standard DML, using the exact row count.
- return resultSet.getStats().getRowCountExact();
- }
}
/**
@@ -2021,7 +1901,7 @@ public boolean next() throws SpannerException {
currRow = new GrpcStruct(iterator.type(), new ArrayList<>());
}
boolean hasNext = currRow.consumeRow(iterator);
- if (!hasNext) {
+ if (queryMode != QueryMode.NORMAL && !hasNext) {
statistics = iterator.getStats();
}
return hasNext;
@@ -2031,8 +1911,13 @@ public boolean next() throws SpannerException {
}
@Override
- @Nullable
public ResultSetStats getStats() {
+ if (queryMode == QueryMode.NORMAL) {
+ throw new UnsupportedOperationException(
+ "ResultSetStats are available only in PLAN and PROFILE execution modes");
+ }
+ checkState(
+ statistics != null, "ResultSetStats requested before consuming the entire ResultSet");
return statistics;
}
@@ -2752,10 +2637,13 @@ ResultSetMetadata getMetadata() throws SpannerException {
/*
* Get the query statistics. Query statistics are delivered with the last PartialResultSet
* in the stream. Any attempt to call this method before the caller has finished consuming the
- * results will return null.
+ * results will throw an exception.
*/
- @Nullable
- ResultSetStats getStats() {
+ ResultSetStats getStats() {
+ if (statistics == null) {
+ throw newSpannerException(
+ ErrorCode.INTERNAL, "Stream closed without sending query statistics");
+ }
return statistics;
}
diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContext.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContext.java
index 5ddefcf73728..540d9a276772 100644
--- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContext.java
+++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContext.java
@@ -94,11 +94,4 @@ public interface TransactionContext extends ReadContext {
* mutations will be applied atomically.
*/
void buffer(Iterable<Mutation> mutations);
-
- /**
- * Executes the DML statement(s) and returns the number of rows modified. For non-DML statements,
- * it will result in an {@code IllegalArgumentException}. The effects of the DML statement
- * will be visible to subsequent operations in the transaction.
- */
- long executeUpdate(Statement statement);
}
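
For context on the removal above, this is roughly how the deleted executeUpdate was intended to be used inside a read-write transaction; dbClient and the SQL text are illustrative assumptions, and the call no longer compiles after this change:

```java
// Sketch of the removed TransactionContext.executeUpdate usage.
dbClient
    .readWriteTransaction()
    .run(
        new TransactionRunner.TransactionCallable<Long>() {
          @Override
          public Long run(TransactionContext transaction) {
            long rowCount =
                transaction.executeUpdate(
                    Statement.of(
                        "UPDATE Albums SET MarketingBudget = 0 WHERE MarketingBudget IS NULL"));
            // rowCount was the exact number of rows modified; the change was visible to
            // later operations in the same transaction.
            return rowCount;
          }
        });
```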
diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java
index 0cbdf614d45e..e983ef6303f9 100644
--- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java
+++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java
@@ -66,7 +66,6 @@
import com.google.spanner.v1.PartitionReadRequest;
import com.google.spanner.v1.PartitionResponse;
import com.google.spanner.v1.ReadRequest;
-import com.google.spanner.v1.ResultSet;
import com.google.spanner.v1.RollbackRequest;
import com.google.spanner.v1.Session;
import com.google.spanner.v1.SpannerGrpc;
@@ -378,17 +377,6 @@ public StreamingCall read(
Option.CHANNEL_HINT.getLong(options));
}
- @Override
- public ResultSet executeQuery(
- ExecuteSqlRequest request, @Nullable Map