From d62a1d3cd28e853037b3b2f6299b7f4e3b8d4a84 Mon Sep 17 00:00:00 2001 From: Siddharth Agrawal Date: Fri, 8 Mar 2024 19:18:12 -0800 Subject: [PATCH] fix: also shutdown the stream connection in case the timeout exception is triggered. --- .../cloud/bigquery/storage/v1/ConnectionWorker.java | 9 +++++++-- .../cloud/bigquery/storage/v1/ConnectionWorkerTest.java | 8 ++++++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 4608dc942a..cbfe175193 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -353,7 +353,7 @@ public void run() { } finally { lock.unlock(); } - cleanupInflightRequests(); + cleanup(/* waitForDone= */ false); }); this.appendThread.start(); } @@ -812,7 +812,10 @@ private void appendLoop() { this.streamConnection.send(originalRequestBuilder.build()); } } + cleanup(/* waitForDone= */ true); + } + private void cleanup(boolean waitForDone) { log.info( "Cleanup starts. Stream: " + streamName @@ -828,7 +831,9 @@ private void appendLoop() { // We can close the stream connection and handle the remaining inflight requests. if (streamConnection != null) { this.streamConnection.close(); - waitForDoneCallback(3, TimeUnit.MINUTES); + if (waitForDone) { + waitForDoneCallback(3, TimeUnit.MINUTES); + } } // At this point, there cannot be more callback. It is safe to clean up all inflight requests. diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 71e4d47673..26214b5cd1 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -650,9 +650,9 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E null, client.getSettings(), retrySettings); - testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(3)); + testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(2)); - long appendCount = 10; + long appendCount = 2; for (int i = 0; i < appendCount; i++) { testBigQueryWrite.addResponse(createAppendResponse(i)); } @@ -691,6 +691,10 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E 100) .get()); assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue"); + + // Verify we can shutdown normally. + connectionWorker.close(); + assertTrue(connectionWorker.isUserClosed()); } @Test