Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update storageError support due to server side enhancement #1456

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Status;
import io.grpc.Status.Code;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import io.grpc.protobuf.StatusProto;
import javax.annotation.Nullable;

/** Exceptions for Storage Client Libraries. */
Expand Down Expand Up @@ -124,30 +121,8 @@ public static StorageException toStorageException(
*/
@Nullable
public static StorageException toStorageException(Throwable exception) {
// TODO: switch to using rpcStatus when cl/408735437 is rolled out
// com.google.rpc.Status rpcStatus = StatusProto.fromThrowable(exception);
Status grpcStatus = Status.fromThrowable(exception);
String message = exception.getMessage();
String streamPatternString = "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+";
Pattern streamPattern = Pattern.compile(streamPatternString);
if (message == null) {
return null;
}
// TODO: SWTICH TO CHECK SCHEMA_MISMATCH_EXTRA_FIELDS IN THE ERROR CODE
if (grpcStatus.getCode().equals(Code.INVALID_ARGUMENT)
&& message.toLowerCase().contains("input schema has more fields than bigquery schema")) {
Matcher streamMatcher = streamPattern.matcher(message);
String entity = streamMatcher.find() ? streamMatcher.group() : "streamName unkown";
return new SchemaMismatchedException(entity, message, exception);
}
// TODO: SWTICH TO CHECK STREAM_FINALIZED IN THE ERROR CODE
if (grpcStatus.getCode().equals(Code.INVALID_ARGUMENT)
&& message.toLowerCase().contains("stream has been finalized and cannot be appended")) {
Matcher streamMatcher = streamPattern.matcher(message);
String entity = streamMatcher.find() ? streamMatcher.group() : "streamName unkown";
return new StreamFinalizedException(entity, message, exception);
}
return null;
com.google.rpc.Status rpcStatus = StatusProto.fromThrowable(exception);
return toStorageException(rpcStatus, exception);
}

private Exceptions() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -594,18 +594,15 @@ private void doneCallback(Throwable finalStatus) {
+ " for stream "
+ streamName);
} else {
this.connectionFinalStatus = finalStatus;
Exceptions.StorageException storageException = Exceptions.toStorageException(finalStatus);
this.connectionFinalStatus = storageException != null ? storageException : finalStatus;
log.info(
"Stream finished with error " + finalStatus.toString() + " for stream " + streamName);
}
}
} finally {
this.lock.unlock();
}
Exceptions.StorageException storageException = Exceptions.toStorageException(finalStatus);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why move this inside the lock?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, we should move it even up to l597, otherwise, there will be a problem.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline, the Exception conversion needs to be protected by the lock so that it is not thrown before getting converted.

if (storageException != null) {
this.connectionFinalStatus = storageException;
}
}

@GuardedBy("lock")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,64 +343,6 @@ public void testAppendFailedSchemaError() throws Exception {
writer.close();
}

@Test
public void testAppendFailedOnDone() throws Exception {
StreamWriter writer = getTestStreamWriter();

StatusRuntimeException exception =
new StatusRuntimeException(
io.grpc.Status.INVALID_ARGUMENT.withDescription(
"io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Input schema has more fields than BigQuery schema"));

testBigQueryWrite.addResponse(createAppendResponse(0));
testBigQueryWrite.addException(exception);

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});

assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
Exceptions.SchemaMismatchedException actualError =
assertFutureException(Exceptions.SchemaMismatchedException.class, appendFuture2);
assertTrue(
actualError
.getMessage()
.contains(
"io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Input schema has more fields than BigQuery schema"));

writer.close();
}

// TODO(stephwang): update test case to below when toStorageException is updated
// @Test
// public void testAppendFailedOnDone2() throws Exception {
// StreamWriter writer = getTestStreamWriter();
//
// StorageError storageError =
// StorageError.newBuilder()
// .setCode(StorageErrorCode.SCHEMA_MISMATCH_EXTRA_FIELDS)
// .setEntity("foobar")
// .build();
// com.google.rpc.Status statusProto =
// com.google.rpc.Status.newBuilder()
// .addDetails(Any.pack(storageError))
// .build();
//
// StatusRuntimeException exception = StatusProto.toStatusRuntimeException(statusProto);
//
// testBigQueryWrite.addResponse(createAppendResponse(0));
// testBigQueryWrite.addException(exception);
//
// ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
// ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
//
// assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
// Exceptions.SchemaMismatchedException actualError =
// assertFutureException(Exceptions.SchemaMismatchedException.class, appendFuture2);
// assertEquals("foobar", actualError.getStreamName());
//
// writer.close();
// }

@Test
public void longIdleBetweenAppends() throws Exception {
StreamWriter writer = getTestStreamWriter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -742,10 +742,9 @@ public void testStreamSchemaMisMatchError() throws IOException, InterruptedExcep
response.get();
Assert.fail("Should fail");
} catch (ExecutionException e) {
// TODO(stephwang): update test case when toStroageException is updated
assertEquals(Exceptions.SchemaMismatchedException.class, e.getCause().getClass());
assertThat(e.getCause().getMessage())
.contains(
"io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Input schema has more fields than BigQuery schema");
.contains("Schema mismatch due to extra fields in user schema");
}
}
}
Expand Down Expand Up @@ -774,10 +773,8 @@ public void testStreamFinalizedError()
response.get();
Assert.fail("Should fail");
} catch (ExecutionException e) {
// //TODO(stephwang): update test case when toStroageException is updated
assertThat(e.getCause().getMessage())
.contains(
"io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Stream has been finalized and cannot be appended");
assertEquals(Exceptions.StreamFinalizedException.class, e.getCause().getClass());
assertThat(e.getCause().getMessage()).contains("Stream is finalized");
}
}
}
Expand Down