Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch dml mainline #4653

Merged
merged 6 commits into from
Mar 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

public class SpannerBatchUpdateException extends SpannerException {
private long[] updateCounts;
/** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */
SpannerBatchUpdateException(
DoNotConstructDirectly token, ErrorCode code, String message, long[] counts) {
super(token, code, false, message, null);
updateCounts = counts;
}

/** Returns the number of rows affected by each statement that is successfully run. */
public long[] getUpdateCounts() {
return updateCounts;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ public static SpannerException newSpannerException(Throwable cause) {
return newSpannerException(null, cause);
}

public static SpannerBatchUpdateException newSpannerBatchUpdateException(
ErrorCode code, String message, long[] updateCounts) {
DoNotConstructDirectly token = DoNotConstructDirectly.ALLOWED;
return new SpannerBatchUpdateException(token, code, message, updateCounts);
}

/**
* Creates a new exception based on {@code cause}. If {@code cause} indicates cancellation, {@code
* context} will be inspected to establish the type of cancellation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.spanner;

import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerBatchUpdateException;
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerExceptionForCancellation;
import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -69,6 +70,7 @@
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.PartialResultSet;
Expand Down Expand Up @@ -1080,6 +1082,36 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
return builder;
}

ExecuteBatchDmlRequest.Builder getExecuteBatchDmlRequestBuilder(
Iterable<Statement> statements) {
ExecuteBatchDmlRequest.Builder builder =
ExecuteBatchDmlRequest.newBuilder().setSession(session.name);
int idx = 0;
for (Statement stmt : statements) {
builder.addStatementsBuilder();
builder.getStatementsBuilder(idx).setSql(stmt.getSql());
Map<String, Value> stmtParameters = stmt.getParameters();
if (!stmtParameters.isEmpty()) {
com.google.protobuf.Struct.Builder paramsBuilder =
builder.getStatementsBuilder(idx).getParamsBuilder();
for (Map.Entry<String, Value> param : stmtParameters.entrySet()) {
paramsBuilder.putFields(param.getKey(), param.getValue().toProto());
builder
.getStatementsBuilder(idx)
.putParamTypes(param.getKey(), param.getValue().getType().toProto());
}
}
idx++;
}

TransactionSelector selector = getTransactionSelector();
if (selector != null) {
builder.setTransaction(selector);
}
builder.setSeqno(getSeqNo());
return builder;
}

ResultSet executeQueryInternalWithOptions(
Statement statement,
com.google.spanner.v1.ExecuteSqlRequest.QueryMode queryMode,
Expand Down Expand Up @@ -1660,6 +1692,32 @@ public com.google.spanner.v1.ResultSet call() throws Exception {
// For standard DML, using the exact row count.
return resultSet.getStats().getRowCountExact();
}

@Override
public long[] batchUpdate(Iterable<Statement> statements) {
beforeReadOrQuery();
final ExecuteBatchDmlRequest.Builder builder = getExecuteBatchDmlRequestBuilder(statements);
com.google.spanner.v1.ExecuteBatchDmlResponse response =
runWithRetries(
new Callable<com.google.spanner.v1.ExecuteBatchDmlResponse>() {
@Override
public com.google.spanner.v1.ExecuteBatchDmlResponse call() throws Exception {
return rpc.executeBatchDml(builder.build(), session.options);
}
});
long[] results = new long[response.getResultSetsCount()];
for (int i = 0; i < response.getResultSetsCount(); ++i) {
results[i] = response.getResultSets(i).getStats().getRowCountExact();
}

if (response.getStatus().getCode() != 0) {
throw newSpannerBatchUpdateException(
ErrorCode.fromRpcStatus(response.getStatus()),
response.getStatus().getMessage(),
results);
}
return results;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,21 @@ public interface TransactionContext extends ReadContext {
* visible to subsequent operations in the transaction.
*/
long executeUpdate(Statement statement);

/**
* Executes a list of DML statements in a single request. The statements will be executed in order
* and the semantics is the same as if each statement is executed by {@code executeUpdate} in a
* loop. This method returns an array of long integers, each representing the number of rows
* modified by each statement.
*
* <p>If an individual statement fails, execution stops and a {@code SpannerBatchUpdateException}
* is returned, which includes the error and the number of rows affected by the statements that
* are run prior to the error.
*
* <p>For example, if statements contains 3 statements, and the 2nd one is not a valid DML. This
* method throws a {@code SpannerBatchUpdateException} that contains the error message from the
* 2nd statement, and an array of length 1 that contains the number of rows modified by the 1st
* statement. The 3rd statement will not run.
*/
long[] batchUpdate(Iterable<Statement> statements);
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@
import com.google.spanner.v1.CommitResponse;
import com.google.spanner.v1.CreateSessionRequest;
import com.google.spanner.v1.DeleteSessionRequest;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteBatchDmlResponse;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.PartitionQueryRequest;
Expand Down Expand Up @@ -514,6 +516,14 @@ public void cancel(String message) {
};
}

@Override
public ExecuteBatchDmlResponse executeBatchDml(
ExecuteBatchDmlRequest request, @Nullable Map<Option, ?> options) {

GrpcCallContext context = newCallContext(options, request.getSession());
return get(spannerStub.executeBatchDmlCallable().futureCall(request, context));
}

@Override
public Transaction beginTransaction(
BeginTransactionRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteBatchDmlResponse;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.PartitionQueryRequest;
Expand Down Expand Up @@ -214,6 +216,8 @@ StreamingCall read(
StreamingCall executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);

ExecuteBatchDmlResponse executeBatchDml(ExecuteBatchDmlRequest build, Map<Option, ?> options);

Transaction beginTransaction(BeginTransactionRequest request, @Nullable Map<Option, ?> options)
throws SpannerException;

Expand Down
Loading