From cd0e7e27728c7947254d2c2c421d1b1d4fe73997 Mon Sep 17 00:00:00 2001 From: Siddharth Agrawal Date: Fri, 8 Mar 2024 19:18:12 -0800 Subject: [PATCH] fix: also shutdown the stream connection in case the timeout exception is triggered. --- .../bigquery/storage/v1/ConnectionWorker.java | 10 ++++++++-- .../storage/v1/ConnectionWorkerTest.java | 19 ++++++++++++++++--- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 4608dc942a..67b4bc7dc5 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -353,7 +353,8 @@ public void run() { } finally { lock.unlock(); } - cleanupInflightRequests(); + cleanupConnectionAndRequests( + /* avoidBlocking= */ true); // don't perform blocking operations while on user thread }); this.appendThread.start(); } @@ -812,7 +813,10 @@ private void appendLoop() { this.streamConnection.send(originalRequestBuilder.build()); } } + cleanupConnectionAndRequests(/* avoidBlocking= */ false); + } + private void cleanupConnectionAndRequests(boolean avoidBlocking) { log.info( "Cleanup starts. Stream: " + streamName @@ -828,7 +832,9 @@ private void appendLoop() { // We can close the stream connection and handle the remaining inflight requests. if (streamConnection != null) { this.streamConnection.close(); - waitForDoneCallback(3, TimeUnit.MINUTES); + if (!avoidBlocking) { + waitForDoneCallback(3, TimeUnit.MINUTES); + } } // At this point, there cannot be more callback. It is safe to clean up all inflight requests. diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 71e4d47673..3dab071d0d 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -650,14 +650,15 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E null, client.getSettings(), retrySettings); - testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(3)); + org.threeten.bp.Duration durationSleep = org.threeten.bp.Duration.ofSeconds(2); + testBigQueryWrite.setResponseSleep(durationSleep); - long appendCount = 10; + long appendCount = 2; for (int i = 0; i < appendCount; i++) { testBigQueryWrite.addResponse(createAppendResponse(i)); } - // In total insert 5 requests, + // In total insert 'appendCount' requests, List> futures = new ArrayList<>(); for (int i = 0; i < appendCount; i++) { futures.add( @@ -691,6 +692,18 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E 100) .get()); assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue"); + + // Verify we can shutdown normally within the expected time. + long startCloseTime = System.currentTimeMillis(); + connectionWorker.close(); + long timeDiff = System.currentTimeMillis() - startCloseTime; + assertTrue( + "timeDiff: " + + timeDiff + + " is more than total durationSleep: " + + (appendCount * durationSleep.toMillis()), + timeDiff <= (appendCount * durationSleep.toMillis())); + assertTrue(connectionWorker.isUserClosed()); } @Test