diff --git a/README.md b/README.md index a14e35b800..9016ace6b1 100644 --- a/README.md +++ b/README.md @@ -49,20 +49,20 @@ If you are using Maven without BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies ```Groovy -implementation platform('com.google.cloud:libraries-bom:24.0.0') +implementation platform('com.google.cloud:libraries-bom:24.1.0') implementation 'com.google.cloud:google-cloud-bigquerystorage' ``` If you are using Gradle without BOM, add this to your dependencies ```Groovy -implementation 'com.google.cloud:google-cloud-bigquerystorage:2.6.3' +implementation 'com.google.cloud:google-cloud-bigquerystorage:2.7.0' ``` If you are using SBT, add this to your dependencies ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.6.3" +libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.7.0" ``` ## Authentication diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 0e0010db2a..3df82733ba 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -20,6 +20,7 @@ import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.cloud.bigquery.storage.util.Errors; import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData; import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback; import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback; @@ -90,6 +91,26 @@ public class StreamWriter implements AutoCloseable { @GuardedBy("lock") private long inflightBytes = 0; + /* + * Tracks how 
often the stream was closed due to a retriable error. Streaming will stop when the + * count hits a threshold. Streaming should only be halted if it isn't possible to establish a + * connection. Keep track of the number of reconnections in succession. This will be reset if + * a row is successfully called back. + */ + @GuardedBy("lock") + private long conectionRetryCountWithoutCallback = 0; + + /* + * If false, streamConnection needs to be reset. + */ + @GuardedBy("lock") + private boolean streamConnectionIsConnected = false; + + /* + * Retry threshold, limits how often the connection is retried before processing halts. + */ + private static final long RETRY_THRESHOLD = 3; + /* * Indicates whether user has called Close() or not. */ @@ -173,6 +194,18 @@ private StreamWriter(Builder builder) throws IOException { this.ownsBigQueryWriteClient = false; } + this.appendThread = + new Thread( + new Runnable() { + @Override + public void run() { + appendLoop(); + } + }); + this.appendThread.start(); + } + + private void resetConnection() { this.streamConnection = new StreamConnection( this.client, @@ -188,15 +221,6 @@ public void run(Throwable finalStatus) { doneCallback(finalStatus); } }); - this.appendThread = - new Thread( - new Runnable() { - @Override - public void run() { - appendLoop(); - } - }); - this.appendThread.start(); } /** @@ -331,12 +355,27 @@ public void close() { * It takes requests from waiting queue and sends them to server. */ private void appendLoop() { - boolean isFirstRequestInConnection = true; Deque localQueue = new LinkedList(); + boolean streamNeedsConnecting = false; + // Set isFirstRequestInConnection to true immediately after connecting the stream; it + // indicates that the next row sent needs the schema and other metadata. 
+ boolean isFirstRequestInConnection = true; while (!waitingQueueDrained()) { this.lock.lock(); try { hasMessageInWaitingQueue.await(100, TimeUnit.MILLISECONDS); + // Copy the streamConnectionIsConnected guarded by lock to a local variable. + // In addition, only reconnect if there is a retriable error. + streamNeedsConnecting = !streamConnectionIsConnected && connectionFinalStatus == null; + if (streamNeedsConnecting) { + // If the stream connection is broken, any requests on inflightRequestQueue will need + // to be resent, as the new connection has no knowledge of the requests. Copy the requests + // from inflightRequestQueue and prepend them onto the waitingRequestQueue. They need to be + // prepended as they need to be sent before new requests. + while (!inflightRequestQueue.isEmpty()) { + waitingRequestQueue.addFirst(inflightRequestQueue.pollLast()); + } + } while (!this.waitingRequestQueue.isEmpty()) { AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst(); this.inflightRequestQueue.addLast(requestWrapper); @@ -355,12 +394,34 @@ private void appendLoop() { if (localQueue.isEmpty()) { continue; } - - // TODO: Add reconnection here. + if (streamNeedsConnecting) { + // Set streamConnectionIsConnected to true, to indicate the stream has been connected. This + // should happen before the call to resetConnection, as it is unknown when the connection + // could be closed and the doneCallback called, thus clearing the flag. + lock.lock(); + try { + this.streamConnectionIsConnected = true; + } finally { + lock.unlock(); + } + resetConnection(); + // Set isFirstRequestInConnection to indicate the next request to be sent should include + // metadata. + isFirstRequestInConnection = true; + } while (!localQueue.isEmpty()) { AppendRowsRequest preparedRequest = prepareRequestBasedOnPosition( localQueue.pollFirst().message, isFirstRequestInConnection); + // Send should only throw an exception if there is a problem with the request. 
The catch + // block will handle this case, and return the exception with the result. + // Otherwise send will return: + // SUCCESS: Message was sent, wait for the callback. + // STREAM_CLOSED: Stream was closed, normally or due to an error. + // NOT_ENOUGH_QUOTA: Message wasn't sent due to not enough quota. + // TODO: Handle NOT_ENOUGH_QUOTA. + // In the close case, the request is in the inflight queue, and will either be returned + // to the user with an error, or will be resent. this.streamConnection.send(preparedRequest); isFirstRequestInConnection = false; } @@ -369,8 +430,10 @@ private void appendLoop() { log.fine("Cleanup starts. Stream: " + streamName); // At this point, the waiting queue is drained, so no more requests. // We can close the stream connection and handle the remaining inflight requests. - this.streamConnection.close(); - waitForDoneCallback(); + if (streamConnection != null) { + this.streamConnection.close(); + waitForDoneCallback(); + } // At this point, there cannot be more callback. It is safe to clean up all inflight requests. log.fine( @@ -455,6 +518,12 @@ private void requestCallback(AppendRowsResponse response) { AppendRequestAndResponse requestWrapper; this.lock.lock(); try { + // Had a successful connection with at least one result, reset retries. + // conectionRetryCountWithoutCallback is reset so that only multiple retries, without + // successful records sent, will cause the stream to fail. 
+ if (conectionRetryCountWithoutCallback != 0) { + conectionRetryCountWithoutCallback = 0; + } requestWrapper = pollInflightRequestQueue(); } finally { this.lock.unlock(); @@ -476,6 +545,14 @@ private void requestCallback(AppendRowsResponse response) { } } + private boolean isRetriableError(Throwable t) { + Status status = Status.fromThrowable(t); + if (Errors.isRetryableInternalStatus(status)) { + return true; + } + return status.getCode() == Status.Code.ABORTED || status.getCode() == Status.Code.UNAVAILABLE; + } + private void doneCallback(Throwable finalStatus) { log.fine( "Received done callback. Stream: " @@ -484,7 +561,26 @@ private void doneCallback(Throwable finalStatus) { + finalStatus.toString()); this.lock.lock(); try { - this.connectionFinalStatus = finalStatus; + this.streamConnectionIsConnected = false; + if (connectionFinalStatus == null) { + // If the error can be retried, don't set it here, let it try to retry later on. + if (isRetriableError(finalStatus) + && conectionRetryCountWithoutCallback < RETRY_THRESHOLD + && !userClosed) { + this.conectionRetryCountWithoutCallback++; + log.fine( + "Retriable error " + + finalStatus.toString() + + " received, retry count " + + conectionRetryCountWithoutCallback + + " for stream " + + streamName); + } else { + this.connectionFinalStatus = finalStatus; + log.info( + "Stream finished with error " + finalStatus.toString() + " for stream " + streamName); + } + } } finally { this.lock.unlock(); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java index 8a6a8d3d98..5ba2f2aa1e 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java @@ -79,14 +79,22 @@ public void reset() { 
serviceImpl.reset(); } - public void setResponseDelay(Duration delay) { - serviceImpl.setResponseDelay(delay); - } - public void setResponseSleep(Duration sleep) { serviceImpl.setResponseSleep(sleep); } + public void setCloseEveryNAppends(long closeAfter) { + serviceImpl.setCloseEveryNAppends(closeAfter); + } + + public void setTimesToClose(long numberTimesToClose) { + serviceImpl.setTimesToClose(numberTimesToClose); + } + + public long getConnectionCount() { + return serviceImpl.getConnectionCount(); + } + public void setExecutor(ScheduledExecutorService executor) { serviceImpl.setExecutor(executor); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java index ce59e02663..5d8f05fff5 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java @@ -17,6 +17,7 @@ import com.google.common.base.Optional; import com.google.common.util.concurrent.Uninterruptibles; +import io.grpc.Status; import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.List; @@ -45,11 +46,16 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase { private final AtomicInteger nextMessageId = new AtomicInteger(1); private boolean autoPublishResponse; private ScheduledExecutorService executor = null; - private Duration responseDelay = Duration.ZERO; private Duration responseSleep = Duration.ZERO; private Semaphore responseSemaphore = new Semaphore(0, true); + private long numberTimesToClose = 0; + private long closeAfter = 0; + private long recordCount = 0; + private long connectionCount = 0; + private boolean firstRecord = false; + /** Class used to save the state of a possible response. 
*/ private static class Response { Optional appendResponse; @@ -120,38 +126,51 @@ public void waitForResponseScheduled() throws InterruptedException { responseSemaphore.acquire(); } + /* Return the number of times the stream was connected. */ + public long getConnectionCount() { + return connectionCount; + } + @Override public StreamObserver appendRows( final StreamObserver responseObserver) { + this.connectionCount++; + this.firstRecord = true; StreamObserver requestObserver = new StreamObserver() { @Override public void onNext(AppendRowsRequest value) { LOG.fine("Get request:" + value.toString()); - final Response response = responses.remove(); requests.add(value); + recordCount++; if (responseSleep.compareTo(Duration.ZERO) > 0) { - LOG.info("Sleeping before response for " + responseSleep.toString()); + LOG.fine("Sleeping before response for " + responseSleep.toString()); Uninterruptibles.sleepUninterruptibly( responseSleep.toMillis(), TimeUnit.MILLISECONDS); } - if (responseDelay == Duration.ZERO) { - sendResponse(response, responseObserver); + if (firstRecord) { + if (!value.getProtoRows().hasWriterSchema() || value.getWriteStream().isEmpty()) { + LOG.info( + String.valueOf( + !value.getProtoRows().hasWriterSchema() + || value.getWriteStream().isEmpty())); + responseObserver.onError( + Status.INVALID_ARGUMENT + .withDescription("Unexpected first request: " + value.toString()) + .asException()); + return; + } + } + firstRecord = false; + if (closeAfter > 0 + && recordCount % closeAfter == 0 + && (numberTimesToClose == 0 || connectionCount <= numberTimesToClose)) { + LOG.info("Shutting down connection from test..."); + responseObserver.onError(Status.ABORTED.asException()); } else { - final Response responseToSend = response; - // TODO(yirutang): This is very wrong because it messes up response/complete ordering. 
- LOG.fine("Schedule a response to be sent at delay"); - executor.schedule( - new Runnable() { - @Override - public void run() { - sendResponse(responseToSend, responseObserver); - } - }, - responseDelay.toMillis(), - TimeUnit.MILLISECONDS); + final Response response = responses.remove(); + sendResponse(response, responseObserver); } - responseSemaphore.release(); } @Override @@ -183,12 +202,6 @@ public FakeBigQueryWriteImpl setExecutor(ScheduledExecutorService executor) { return this; } - /** Set an amount of time by which to delay publish responses. */ - public FakeBigQueryWriteImpl setResponseDelay(Duration responseDelay) { - this.responseDelay = responseDelay; - return this; - } - /** Set an amount of time by which to sleep before publishing responses. */ public FakeBigQueryWriteImpl setResponseSleep(Duration responseSleep) { this.responseSleep = responseSleep; @@ -231,4 +244,29 @@ public void reset() { requests.clear(); responses.clear(); } + + /* Abort the stream after N records. The primary use case is to test the retry logic. After N + * records are sent, the stream will be aborted with Code.ABORTED. This is a retriable error. + * The abort will call the onDone callback immediately, and thus potentially losing some messages + * that have already been sent. If the value of closeAfter is too small, the client might not get + * a chance to process any records before a subsequent abort is sent. Which means multiple retries + * in a row on the client side. After 3 retries in a row the write will fail. + * closeAfter should be large enough to give the client some opportunity to receive some of the + * messages. + **/ + public void setCloseEveryNAppends(long closeAfter) { + this.closeAfter = closeAfter; + } + /* If setCloseEveryNAppends is greater than 0, then the stream will be aborted every N appends. + * setTimesToClose will limit the number of times to do the abort. If it is set to 0, it will + * abort every N appends. 
+ * The primary use case is to send a couple of records, then abort. But if there are only a couple + * of records, it is possible these two records are sent, then the abort happens before those two + * records are processed by the client, requiring them to be sent again, and thus a potential + * infinite loop. Therefore set the times to close to 1. This will send the two records, force + * an abort and retry, and then reprocess the records to completion. + **/ + public void setTimesToClose(long numberTimesToClose) { + this.numberTimesToClose = numberTimesToClose; + } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index b95c981bf3..9310b70b24 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -134,6 +134,11 @@ private ApiFuture sendTestMessage(StreamWriter writer, Strin return writer.append(createProtoRows(messages), -1); } + private ApiFuture sendTestMessage( + StreamWriter writer, String[] messages, long offset) { + return writer.append(createProtoRows(messages), offset); + } + private static T assertFutureException( Class expectedThrowable, final Future future) { return assertThrows( @@ -596,4 +601,41 @@ public void testMessageTooLarge() throws Exception { writer.close(); } + + @Test + public void testAppendWithResetSuccess() throws Exception { + try (StreamWriter writer = getTestStreamWriter()) { + testBigQueryWrite.setCloseEveryNAppends(113); + long appendCount = 10000; + for (long i = 0; i < appendCount; i++) { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + List> futures = new ArrayList<>(); + for (long i = 0; i < appendCount; i++) { + futures.add(sendTestMessage(writer, new String[] 
{String.valueOf(i)}, i)); + } + for (int i = 0; i < appendCount; i++) { + assertEquals(futures.get(i).get().getAppendResult().getOffset().getValue(), (long) i); + } + assertTrue(testBigQueryWrite.getConnectionCount() >= (int) (appendCount / 113.0)); + } + } + + // This test is set up for the server to force a retry after all records are sent. Ensure the + // records are resent, even if no new records are appended. + @Test + public void testRetryAfterAllRecordsInflight() throws Exception { + try (StreamWriter writer = getTestStreamWriter()) { + testBigQueryWrite.setCloseEveryNAppends(2); + testBigQueryWrite.setTimesToClose(1); + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addResponse(createAppendResponse(1)); + + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}, 0); + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}, 1); + TimeUnit.SECONDS.sleep(1); + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + assertEquals(1, appendFuture2.get().getAppendResult().getOffset().getValue()); + } + } }