Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for StorageError #1391

Merged
merged 19 commits into from
Nov 15, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1;

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import javax.annotation.Nullable;

/** Exceptions for Storage Client Libraries. */
public final class Exceptions {
/** Main Storage Exception. Might contain map of streams to errors for that stream. */
public static class StorageException extends RuntimeException {
stephaniewang526 marked this conversation as resolved.
Show resolved Hide resolved

private final ImmutableMap<String, GrpcStatusCode> errors;
private final String streamName;

private StorageException() {
this(null, null, null, ImmutableMap.of());
}

private StorageException(
@Nullable String message,
@Nullable Throwable cause,
@Nullable String streamName,
ImmutableMap<String, GrpcStatusCode> errors) {
super(message, cause);
this.streamName = streamName;
this.errors = errors;
}

public ImmutableMap<String, GrpcStatusCode> getErrors() {
return errors;
}

public String getStreamName() {
return streamName;
}
}

/** Stream has already been finalized. */
public static final class StreamFinalizedException extends StorageException {
protected StreamFinalizedException(String name, String message, Throwable cause) {
super(message, cause, name, ImmutableMap.of());
}
}

/**
* There was a schema mismatch due to bigquery table with fewer fields than the input message.
* This can be resolved by updating the table's schema with the message schema.
*/
public static final class SchemaMismatchedException extends StorageException {
protected SchemaMismatchedException(String name, String message, Throwable cause) {
super(message, cause, name, ImmutableMap.of());
}
}

private static StorageError toStorageError(com.google.rpc.Status rpcStatus) {
for (Any detail : rpcStatus.getDetailsList()) {
if (detail.is(StorageError.class)) {
try {
return detail.unpack(StorageError.class);
} catch (InvalidProtocolBufferException protoException) {
throw new IllegalStateException(protoException);
}
}
}
return null;
}

/**
* Converts a c.g.rpc.Status into a StorageException, if possible. Examines the embedded
* StorageError, and potentially returns a {@link StreamFinalizedException} or {@link
* SchemaMismatchedException} (both derive from StorageException). If there is no StorageError, or
* the StorageError is a different error it will return NULL.
*/
@Nullable
public static StorageException toStorageException(
com.google.rpc.Status rpcStatus, Throwable exception) {
StorageError error = toStorageError(rpcStatus);
if (error == null) {
return null;
}
switch (error.getCode()) {
case STREAM_FINALIZED:
return new StreamFinalizedException(error.getEntity(), error.getErrorMessage(), exception);

case SCHEMA_MISMATCH_EXTRA_FIELDS:
return new SchemaMismatchedException(error.getEntity(), error.getErrorMessage(), exception);

default:
return null;
}
}

/**
* Converts a Throwable into a StorageException, if possible. Examines the embedded error message,
* and potentially returns a {@link StreamFinalizedException} or {@link SchemaMismatchedException}
* (both derive from StorageException). If there is no StorageError, or the StorageError is a
* different error it will return NULL.
*/
@Nullable
public static StorageException toStorageException(Throwable exception) {
stephaniewang526 marked this conversation as resolved.
Show resolved Hide resolved
String errorMsg = exception.getMessage();
if (errorMsg == null) {
return null;
}
if (errorMsg.toLowerCase().contains("finazlied")) {
return new StreamFinalizedException(null, errorMsg, exception);
}
if (errorMsg.toLowerCase().contains("mismatched")) {
return new SchemaMismatchedException(null, errorMsg, exception);
}
return null;
}

private Exceptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -460,11 +460,17 @@ private void requestCallback(AppendRowsResponse response) {
this.lock.unlock();
}
if (response.hasError()) {
StatusRuntimeException exception =
new StatusRuntimeException(
Status.fromCodeValue(response.getError().getCode())
.withDescription(response.getError().getMessage()));
requestWrapper.appendResult.setException(exception);
Exceptions.StorageException storageException =
Exceptions.toStorageException(response.getError(), null);
if (storageException != null) {
requestWrapper.appendResult.setException(storageException);
} else {
StatusRuntimeException exception =
new StatusRuntimeException(
Status.fromCodeValue(response.getError().getCode())
.withDescription(response.getError().getMessage()));
requestWrapper.appendResult.setException(exception);
}
} else {
requestWrapper.appendResult.set(response);
}
Expand All @@ -482,6 +488,10 @@ private void doneCallback(Throwable finalStatus) {
} finally {
this.lock.unlock();
}
Exceptions.StorageException storageException = Exceptions.toStorageException(finalStatus);
if (storageException != null) {
this.connectionFinalStatus = storageException;
}
}

@GuardedBy("lock")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode;
import com.google.common.base.Strings;
import com.google.protobuf.Any;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
Expand Down Expand Up @@ -304,6 +306,60 @@ public void testAppendSuccessAndInStreamError() throws Exception {
writer.close();
}

@Test
stephaniewang526 marked this conversation as resolved.
Show resolved Hide resolved
public void testAppendFailedSchemaError() throws Exception {
StreamWriter writer = getTestStreamWriter();

StorageError storageError =
StorageError.newBuilder()
.setCode(StorageErrorCode.SCHEMA_MISMATCH_EXTRA_FIELDS)
.setEntity("foobar")
.build();
com.google.rpc.Status statusProto =
com.google.rpc.Status.newBuilder()
.setCode(Code.INVALID_ARGUMENT.getHttpStatusCode())
.addDetails(Any.pack(storageError))
.build();

testBigQueryWrite.addResponse(createAppendResponse(0));
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setError(statusProto).build());
testBigQueryWrite.addResponse(createAppendResponse(1));

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});

assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
Exceptions.SchemaMismatchedException actualError =
assertFutureException(Exceptions.SchemaMismatchedException.class, appendFuture2);
assertEquals("foobar", actualError.getStreamName());
assertEquals(1, appendFuture3.get().getAppendResult().getOffset().getValue());

writer.close();
}

@Test
public void testAppendFailedOnDone() throws Exception {
StreamWriter writer = getTestStreamWriter();

StatusRuntimeException exception =
new StatusRuntimeException(
io.grpc.Status.INVALID_ARGUMENT.withDescription("schema mismatched"));

testBigQueryWrite.addResponse(createAppendResponse(0));
testBigQueryWrite.addException(exception);

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});

assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
Exceptions.SchemaMismatchedException actualError =
assertFutureException(Exceptions.SchemaMismatchedException.class, appendFuture2);
assertTrue(actualError.getMessage().contains("schema mismatched"));

writer.close();
}

@Test
public void longIdleBetweenAppends() throws Exception {
StreamWriter writer = getTestStreamWriter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,16 @@ public class ITBigQueryWriteManualClientTest {
private static final String DATASET = RemoteBigQueryHelper.generateDatasetName();
private static final String DATASET_EU = RemoteBigQueryHelper.generateDatasetName();
private static final String TABLE = "testtable";
private static final String TABLEINT = "testtableintcol";
private static final String TABLE2 = "complicatedtable";
private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset";

private static BigQueryWriteClient client;
private static TableInfo intTableInfo;
private static TableInfo tableInfo;
private static TableInfo tableInfo2;
private static TableInfo tableInfoEU;
private static String intTableId;
private static String tableId;
private static String tableId2;
private static String tableIdEU;
Expand All @@ -71,6 +74,15 @@ public static void beforeClass() throws IOException {
DatasetInfo.newBuilder(/* datasetId = */ DATASET).setDescription(DESCRIPTION).build();
bigquery.create(datasetInfo);
LOG.info("Created test dataset: " + DATASET);
intTableInfo =
TableInfo.newBuilder(
TableId.of(DATASET, TABLEINT),
StandardTableDefinition.of(
Schema.of(
com.google.cloud.bigquery.Field.newBuilder("foo", LegacySQLTypeName.INTEGER)
.setMode(Field.Mode.NULLABLE)
.build())))
.build();
tableInfo =
TableInfo.newBuilder(
TableId.of(DATASET, TABLE),
Expand Down Expand Up @@ -101,8 +113,13 @@ public static void beforeClass() throws IOException {
.build(),
innerTypeFieldBuilder.setMode(Field.Mode.NULLABLE).build())))
.build();
bigquery.create(intTableInfo);
bigquery.create(tableInfo);
bigquery.create(tableInfo2);
intTableId =
String.format(
"projects/%s/datasets/%s/tables/%s",
ServiceOptions.getDefaultProjectId(), DATASET, TABLEINT);
tableId =
String.format(
"projects/%s/datasets/%s/tables/%s",
Expand Down Expand Up @@ -499,6 +516,38 @@ public void testStreamError() throws IOException, InterruptedException, Executio
}
}

@Test
public void testStreamSchemaMisMatchError() throws IOException, InterruptedException {
WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(intTableId)
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());

try (StreamWriter streamWriter =
StreamWriter.newBuilder(writeStream.getName())
.setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()))
.build()) {
// Create a proto row that is not compatible with the table schema.
ApiFuture<AppendRowsResponse> response =
streamWriter.append(CreateProtoRows(new String[] {"a"}), /*offset=*/ -1L);
try {
response.get();
Assert.fail("Should fail");
} catch (ExecutionException e) {
assertThat(e.getCause().getMessage())
.contains(
"io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The proto field mismatched with BigQuery field at com_google_cloud_bigquery_storage_test_FooType.foo, the proto field type string, BigQuery field type INTEGER Entity");
assertThat(
e.getMessage()
.contains(
"io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The proto field mismatched with BigQuery field at com_google_cloud_bigquery_storage_test_FooType.foo, the proto field type string, BigQuery field type INTEGER Entity"));
}
}
}

@Test
public void testStreamReconnect() throws IOException, InterruptedException, ExecutionException {
WriteStream writeStream =
Expand Down