diff --git a/README.md b/README.md
index 402a638de0..b208a29cdd 100644
--- a/README.md
+++ b/README.md
@@ -56,13 +56,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage'
 If you are using Gradle without BOM, add this to your dependencies:
 
 ```Groovy
-implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.2'
+implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.3'
 ```
 
 If you are using SBT, add this to your dependencies:
 
 ```Scala
-libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.2"
+libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.3"
 ```
 
 ## Authentication
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
index 4e17850511..69aef0527c 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
@@ -40,6 +40,8 @@
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
@@ -63,6 +65,7 @@ public class ConnectionWorker implements AutoCloseable {
   private Condition hasMessageInWaitingQueue;
   private Condition inflightReduced;
   private static Duration maxRetryDuration = Duration.ofMinutes(5);
+  private ExecutorService threadPool = Executors.newFixedThreadPool(1);
 
   /*
    * The identifier of the current stream to write to. This stream name can change during
@@ -288,7 +291,7 @@ private ApiFuture<AppendRowsResponse> appendInternal(AppendRowsRequest message)
       requestWrapper.appendResult.setException(
           new Exceptions.StreamWriterClosedException(
               Status.fromCode(Status.Code.FAILED_PRECONDITION)
-                  .withDescription("Connection is already closed"),
+                  .withDescription("Connection is already closed during append"),
               streamName,
               writerId));
       return requestWrapper.appendResult;
@@ -382,6 +385,18 @@ public void close() {
       this.client.awaitTermination(150, TimeUnit.SECONDS);
     } catch (InterruptedException ignored) {
     }
+
+    try {
+      threadPool.shutdown();
+      threadPool.awaitTermination(3, TimeUnit.MINUTES);
+    } catch (InterruptedException e) {
+      // Unexpected. Just swallow the exception with logging.
+      log.warning(
+          "Close on thread pool for "
+              + streamName
+              + " is interrupted with exception: "
+              + e.toString());
+    }
   }
 
   /*
@@ -639,35 +654,44 @@ private void requestCallback(AppendRowsResponse response) {
     } finally {
       this.lock.unlock();
     }
-    if (response.hasError()) {
-      Exceptions.StorageException storageException =
-          Exceptions.toStorageException(response.getError(), null);
-      log.fine(String.format("Got error message: %s", response.toString()));
-      if (storageException != null) {
-        requestWrapper.appendResult.setException(storageException);
-      } else if (response.getRowErrorsCount() > 0) {
-        Map<Integer, String> rowIndexToErrorMessage = new HashMap<>();
-        for (int i = 0; i < response.getRowErrorsCount(); i++) {
-          RowError rowError = response.getRowErrors(i);
-          rowIndexToErrorMessage.put(Math.toIntExact(rowError.getIndex()), rowError.getMessage());
-        }
-        AppendSerializtionError exception =
-            new AppendSerializtionError(
-                response.getError().getCode(),
-                response.getError().getMessage(),
-                streamName,
-                rowIndexToErrorMessage);
-        requestWrapper.appendResult.setException(exception);
-      } else {
-        StatusRuntimeException exception =
-            new StatusRuntimeException(
-                Status.fromCodeValue(response.getError().getCode())
-                    .withDescription(response.getError().getMessage()));
-        requestWrapper.appendResult.setException(exception);
-      }
-    } else {
-      requestWrapper.appendResult.set(response);
-    }
+
+    // We need a separate thread pool to unblock the next request callback.
+    // Otherwise the user may call append inside the request callback, and that append may block
+    // waiting on in-flight quota, causing a deadlock: requests can't be popped out of the queue
+    // until the current request callback finishes.
+    threadPool.submit(
+        () -> {
+          if (response.hasError()) {
+            Exceptions.StorageException storageException =
+                Exceptions.toStorageException(response.getError(), null);
+            log.fine(String.format("Got error message: %s", response.toString()));
+            if (storageException != null) {
+              requestWrapper.appendResult.setException(storageException);
+            } else if (response.getRowErrorsCount() > 0) {
+              Map<Integer, String> rowIndexToErrorMessage = new HashMap<>();
+              for (int i = 0; i < response.getRowErrorsCount(); i++) {
+                RowError rowError = response.getRowErrors(i);
+                rowIndexToErrorMessage.put(
+                    Math.toIntExact(rowError.getIndex()), rowError.getMessage());
+              }
+              AppendSerializtionError exception =
+                  new AppendSerializtionError(
+                      response.getError().getCode(),
+                      response.getError().getMessage(),
+                      streamName,
+                      rowIndexToErrorMessage);
+              requestWrapper.appendResult.setException(exception);
+            } else {
+              StatusRuntimeException exception =
+                  new StatusRuntimeException(
+                      Status.fromCodeValue(response.getError().getCode())
+                          .withDescription(response.getError().getMessage()));
+              requestWrapper.appendResult.setException(exception);
+            }
+          } else {
+            requestWrapper.appendResult.set(response);
+          }
+        });
   }
 
   private boolean isRetriableError(Throwable t) {
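For readers skimming the hunk above, the deadlock being fixed is a general hand-off problem rather than anything gRPC-specific: if the response-reading thread completes user futures inline, a user callback that re-enters `append()` can block on the same in-flight quota that only that thread replenishes. Below is a minimal, self-contained sketch of the decoupling under toy names (`CallbackHandoffSketch`, `onResponse`, a `Semaphore` standing in for the inflight bookkeeping); it is an illustration, not the library's internals:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/** Toy model of the hand-off; names and types are illustrative only. */
public class CallbackHandoffSketch {
  // Stands in for the in-flight quota, like setMaxInflightRequests(5).
  private final Semaphore inflight = new Semaphore(5);
  // Single worker thread, mirroring Executors.newFixedThreadPool(1) above.
  private final ExecutorService callbackPool = Executors.newFixedThreadPool(1);

  public CompletableFuture<String> append(String row) {
    inflight.acquireUninterruptibly(); // blocks while the quota is exhausted
    CompletableFuture<String> result = new CompletableFuture<>();
    // ... hand (row, result) to the connection for sending here ...
    return result;
  }

  // Runs on the response-reading thread for each server response.
  void onResponse(CompletableFuture<String> pending, String response) {
    inflight.release(); // replenish quota on the response thread first
    // Complete the future on the dedicated pool: a user callback that
    // re-enters append() and blocks on the semaphore now stalls the pool
    // thread, while the response thread stays free to keep releasing quota.
    callbackPool.submit(() -> pending.complete(response));
  }
}
```

A pool of size one also keeps completions in arrival order, which a larger pool would not guarantee.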
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
index f8822e231f..d271fd99d5 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
@@ -20,7 +20,10 @@
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
+import com.google.api.client.util.Sleeper;
 import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
 import com.google.api.gax.batching.FlowController;
 import com.google.api.gax.core.NoCredentialsProvider;
 import com.google.api.gax.grpc.testing.MockGrpcService;
@@ -34,6 +37,7 @@
 import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode;
 import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
 import com.google.common.base.Strings;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.protobuf.Any;
 import com.google.protobuf.DescriptorProtos;
 import com.google.protobuf.Descriptors;
@@ -282,6 +286,64 @@ public void testAppendSuccess() throws Exception {
     writer.close();
   }
 
+
+  @Test
+  public void testAppendSuccess_RetryDirectlyInCallback() throws Exception {
+    // Set a relatively small in-flight request count.
+    StreamWriter writer =
+        StreamWriter.newBuilder(TEST_STREAM_1, client)
+            .setWriterSchema(createProtoSchema())
+            .setTraceId(TEST_TRACE_ID)
+            .setMaxRetryDuration(java.time.Duration.ofSeconds(5))
+            .setMaxInflightRequests(5)
+            .build();
+
+    // Fail the first request; in its request callback we will insert another 10 requests.
+    // Those requests can't be processed until the previous request callback
+    // has finished.
+    long appendCount = 20;
+    for (int i = 0; i < appendCount; i++) {
+      if (i == 0) {
+        testBigQueryWrite.addResponse(
+            createAppendResponseWithError(Status.INVALID_ARGUMENT.getCode(), "test message"));
+      }
+      testBigQueryWrite.addResponse(createAppendResponse(i));
+    }
+
+    // We will trigger 10 more requests in the request callback of the following request.
+    ProtoRows protoRows = createProtoRows(new String[] {String.valueOf(-1)});
+    ApiFuture<AppendRowsResponse> future = writer.append(protoRows, -1);
+    ApiFutures.addCallback(
+        future, new AppendCompleteCallback(writer, protoRows), MoreExecutors.directExecutor());
+
+    StatusRuntimeException actualError =
+        assertFutureException(StatusRuntimeException.class, future);
+
+    Sleeper.DEFAULT.sleep(1000);
+    writer.close();
+  }
+
+  static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {
+
+    private final StreamWriter mainStreamWriter;
+    private final ProtoRows protoRows;
+    private int retryCount = 0;
+
+    public AppendCompleteCallback(StreamWriter mainStreamWriter, ProtoRows protoRows) {
+      this.mainStreamWriter = mainStreamWriter;
+      this.protoRows = protoRows;
+    }
+
+    public void onSuccess(AppendRowsResponse response) {
+      // Do nothing.
+    }
+
+    public void onFailure(Throwable throwable) {
+      for (int i = 0; i < 10; i++) {
+        this.mainStreamWriter.append(protoRows);
+      }
+    }
+  }
+
   @Test
   public void testUpdatedSchemaFetch_multiplexing() throws Exception {
     testUpdatedSchemaFetch(/*enableMultiplexing=*/ true);
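The test above retries straight from the completion callback, which the ConnectionWorker change makes safe. Application code could follow the same pattern along the lines of the sketch below; `RetryOnFailure` and `appendWithRetry` are hypothetical names, and a production retry should also inspect the error and back off instead of resubmitting unconditionally:

```java
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.common.util.concurrent.MoreExecutors;

/** Hypothetical helper showing the retry-in-callback pattern the test exercises. */
final class RetryOnFailure {
  static void appendWithRetry(StreamWriter writer, ProtoRows rows, int attemptsLeft) {
    ApiFuture<AppendRowsResponse> future = writer.append(rows);
    ApiFutures.addCallback(
        future,
        new ApiFutureCallback<AppendRowsResponse>() {
          @Override
          public void onSuccess(AppendRowsResponse response) {
            // Batch committed; nothing further to do.
          }

          @Override
          public void onFailure(Throwable t) {
            // Re-entering append() here no longer risks deadlock, because the
            // worker now completes futures on its own thread pool.
            if (attemptsLeft > 0) {
              appendWithRetry(writer, rows, attemptsLeft - 1);
            }
          }
        },
        MoreExecutors.directExecutor());
  }
}
```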
diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java
index d8f0cc38b5..f5f357238a 100644
--- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java
+++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java
@@ -39,8 +39,6 @@
 import io.grpc.Status.Code;
 import java.io.IOException;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Phaser;
 import javax.annotation.concurrent.GuardedBy;
 import org.json.JSONArray;
@@ -188,8 +186,6 @@ static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {
-        pool.submit(
-            () -> {
-              try {
-                // Since default stream appends are not ordered, we can simply retry the
-                // appends.
-                // Retrying with exclusive streams requires more careful consideration.
-                this.parent.append(appendContext);
-              } catch (Exception e) {
-                // Fall through to return error.
-                System.out.format("Failed to retry append: %s%n", e);
-              }
-            });
-        // Mark the existing attempt as done since it's being retried.
-        done();
-        return;
+        try {
+          // Since default stream appends are not ordered, we can simply retry the appends.
+          // Retrying with exclusive streams requires more careful consideration.
+          this.parent.append(appendContext);
+          // Mark the existing attempt as done since it's being retried.
+          done();
+          return;
+        } catch (Exception e) {
+          // Fall through to return error.
+          System.out.format("Failed to retry append: %s\n", e);
+        }
       }
 
       if (throwable instanceof AppendSerializtionError) {
@@ -241,21 +232,19 @@ public void onFailure(Throwable throwable) {
           }
         }
 
-        // Mark the existing attempt as done since we got a response for it
-        done();
-
         // Retry the remaining valid rows, but using a separate thread to
         // avoid potentially blocking while we are in a callback.
         if (dataNew.length() > 0) {
-          pool.submit(
-              () -> {
-                try {
-                  this.parent.append(new AppendContext(dataNew, 0));
-                } catch (Exception e2) {
-                  System.out.format("Failed to retry append with filtered rows: %s%n", e2);
-                }
-              });
+          try {
+            this.parent.append(new AppendContext(dataNew, 0));
+          } catch (DescriptorValidationException e) {
+            throw new RuntimeException(e);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
         }
+        // Mark the existing attempt as done since we got a response for it
+        done();
         return;
       }
     }
@@ -267,7 +256,6 @@ public void onFailure(Throwable throwable) {
               (storageException != null) ? storageException : new RuntimeException(throwable);
         }
       }
-      System.out.format("Error that arrived: %s%n", throwable);
       done();
     }
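Two review-style notes on this last sample change. First, `done()` now runs only after the retry append has actually been issued, so a failed retry falls through to the error path instead of marking the attempt complete prematurely. Second, the two new catch blocks behave identically to a single multi-catch; a drop-in sketch of the equivalent, assuming the sample's surrounding `parent`, `dataNew`, and `AppendContext`:

```java
// Equivalent to the two catch blocks added above, collapsed via multi-catch.
// parent, dataNew, and AppendContext come from the surrounding sample class.
try {
  this.parent.append(new AppendContext(dataNew, 0));
} catch (DescriptorValidationException | IOException e) {
  throw new RuntimeException(e);
}
```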