Skip to content

Commit

Permalink
Batch dml mainline (#4653)
Browse files Browse the repository at this point in the history
* Cloud Spanner Batch DML implementation and integration tests. (#45)

* Fix the file header of the newly added classes. (#46)

* Fix RPC interface mismatch after GAPIC migration.

* Address review comment.

* Fix code format with mvn com.coveo:fmt-maven-plugin:format.

* Update year in file headers.
  • Loading branch information
guangyus authored and kolea2 committed Mar 7, 2019
1 parent 5b4e36c commit cc04990
Show file tree
Hide file tree
Showing 7 changed files with 335 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

public class SpannerBatchUpdateException extends SpannerException {
private long[] updateCounts;
/** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */
SpannerBatchUpdateException(
DoNotConstructDirectly token, ErrorCode code, String message, long[] counts) {
super(token, code, false, message, null);
updateCounts = counts;
}

/** Returns the number of rows affected by each statement that is successfully run. */
public long[] getUpdateCounts() {
return updateCounts;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ public static SpannerException newSpannerException(Throwable cause) {
return newSpannerException(null, cause);
}

public static SpannerBatchUpdateException newSpannerBatchUpdateException(
ErrorCode code, String message, long[] updateCounts) {
DoNotConstructDirectly token = DoNotConstructDirectly.ALLOWED;
return new SpannerBatchUpdateException(token, code, message, updateCounts);
}

/**
* Creates a new exception based on {@code cause}. If {@code cause} indicates cancellation, {@code
* context} will be inspected to establish the type of cancellation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.spanner;

import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerBatchUpdateException;
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerExceptionForCancellation;
import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -69,6 +70,7 @@
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.PartialResultSet;
Expand Down Expand Up @@ -1080,6 +1082,36 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
return builder;
}

ExecuteBatchDmlRequest.Builder getExecuteBatchDmlRequestBuilder(
Iterable<Statement> statements) {
ExecuteBatchDmlRequest.Builder builder =
ExecuteBatchDmlRequest.newBuilder().setSession(session.name);
int idx = 0;
for (Statement stmt : statements) {
builder.addStatementsBuilder();
builder.getStatementsBuilder(idx).setSql(stmt.getSql());
Map<String, Value> stmtParameters = stmt.getParameters();
if (!stmtParameters.isEmpty()) {
com.google.protobuf.Struct.Builder paramsBuilder =
builder.getStatementsBuilder(idx).getParamsBuilder();
for (Map.Entry<String, Value> param : stmtParameters.entrySet()) {
paramsBuilder.putFields(param.getKey(), param.getValue().toProto());
builder
.getStatementsBuilder(idx)
.putParamTypes(param.getKey(), param.getValue().getType().toProto());
}
}
idx++;
}

TransactionSelector selector = getTransactionSelector();
if (selector != null) {
builder.setTransaction(selector);
}
builder.setSeqno(getSeqNo());
return builder;
}

ResultSet executeQueryInternalWithOptions(
Statement statement,
com.google.spanner.v1.ExecuteSqlRequest.QueryMode queryMode,
Expand Down Expand Up @@ -1660,6 +1692,32 @@ public com.google.spanner.v1.ResultSet call() throws Exception {
// For standard DML, using the exact row count.
return resultSet.getStats().getRowCountExact();
}

@Override
public long[] batchUpdate(Iterable<Statement> statements) {
beforeReadOrQuery();
final ExecuteBatchDmlRequest.Builder builder = getExecuteBatchDmlRequestBuilder(statements);
com.google.spanner.v1.ExecuteBatchDmlResponse response =
runWithRetries(
new Callable<com.google.spanner.v1.ExecuteBatchDmlResponse>() {
@Override
public com.google.spanner.v1.ExecuteBatchDmlResponse call() throws Exception {
return rpc.executeBatchDml(builder.build(), session.options);
}
});
long[] results = new long[response.getResultSetsCount()];
for (int i = 0; i < response.getResultSetsCount(); ++i) {
results[i] = response.getResultSets(i).getStats().getRowCountExact();
}

if (response.getStatus().getCode() != 0) {
throw newSpannerBatchUpdateException(
ErrorCode.fromRpcStatus(response.getStatus()),
response.getStatus().getMessage(),
results);
}
return results;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,21 @@ public interface TransactionContext extends ReadContext {
* visible to subsequent operations in the transaction.
*/
long executeUpdate(Statement statement);

/**
* Executes a list of DML statements in a single request. The statements will be executed in order
* and the semantics is the same as if each statement is executed by {@code executeUpdate} in a
* loop. This method returns an array of long integers, each representing the number of rows
* modified by each statement.
*
* <p>If an individual statement fails, execution stops and a {@code SpannerBatchUpdateException}
* is returned, which includes the error and the number of rows affected by the statements that
* are run prior to the error.
*
* <p>For example, if statements contains 3 statements, and the 2nd one is not a valid DML. This
* method throws a {@code SpannerBatchUpdateException} that contains the error message from the
* 2nd statement, and an array of length 1 that contains the number of rows modified by the 1st
* statement. The 3rd statement will not run.
*/
long[] batchUpdate(Iterable<Statement> statements);
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@
import com.google.spanner.v1.CommitResponse;
import com.google.spanner.v1.CreateSessionRequest;
import com.google.spanner.v1.DeleteSessionRequest;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteBatchDmlResponse;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.PartitionQueryRequest;
Expand Down Expand Up @@ -514,6 +516,14 @@ public void cancel(String message) {
};
}

@Override
public ExecuteBatchDmlResponse executeBatchDml(
ExecuteBatchDmlRequest request, @Nullable Map<Option, ?> options) {

GrpcCallContext context = newCallContext(options, request.getSession());
return get(spannerStub.executeBatchDmlCallable().futureCall(request, context));
}

@Override
public Transaction beginTransaction(
BeginTransactionRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteBatchDmlResponse;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.PartitionQueryRequest;
Expand Down Expand Up @@ -214,6 +216,8 @@ StreamingCall read(
StreamingCall executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);

ExecuteBatchDmlResponse executeBatchDml(ExecuteBatchDmlRequest build, Map<Option, ?> options);

Transaction beginTransaction(BeginTransactionRequest request, @Nullable Map<Option, ?> options)
throws SpannerException;

Expand Down
Loading

0 comments on commit cc04990

Please sign in to comment.