From 1726c85ee7d3b1be58e6616de199d6cc2c01959f Mon Sep 17 00:00:00 2001 From: Gaole Meng Date: Wed, 29 Mar 2023 18:54:08 -0700 Subject: [PATCH] feat: add public api to stream writer to set the maximum wait time --- .../bigquery/storage/v1/ConnectionWorker.java | 2 +- .../bigquery/storage/v1/StreamWriter.java | 10 ++++++ .../bigquery/storage/v1/StreamWriterTest.java | 31 +++++++++++++++++++ 3 files changed, 42 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 1aeb911943..12afbf13e0 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -75,7 +75,7 @@ class ConnectionWorker implements AutoCloseable { * We will constantly checking how much time we have been waiting for the next request callback * if we wait too much time we will start shutting down the connections and clean up the queues. */ - private static Duration MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = Duration.ofMinutes(15); + static Duration MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = Duration.ofMinutes(15); private Lock lock; private Condition hasMessageInWaitingQueue; diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index b21a52a63d..bfa30c6141 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -518,6 +518,16 @@ public synchronized TableSchema getUpdatedSchema() { : null; } + /** + * Sets the maximum time a request is allowed to be waiting in request waiting queue. Under very + * low chance, it's possible for append request to be waiting indefintely for request callback + * when Google networking SDK does not detect the networking breakage. The default timeout is 15 + * minutes. We are investigating the root cause for callback not triggered by networking SDK. + */ + public static void setMaxRequestCallbackWaitTime(Duration waitTime) { + ConnectionWorker.MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = waitTime; + } + long getCreationTimestamp() { return creationTimestamp; } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index af36273102..bc6dd71690 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -15,6 +15,7 @@ */ package com.google.cloud.bigquery.storage.v1; +import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; @@ -113,6 +114,7 @@ public StreamWriterTest() throws DescriptorValidationException {} @Before public void setUp() throws Exception { testBigQueryWrite = new FakeBigQueryWrite(); + StreamWriter.setMaxRequestCallbackWaitTime(java.time.Duration.ofSeconds(10000)); ConnectionWorker.setMaxInflightQueueWaitTime(300000); serviceHelper = new MockServiceHelper( @@ -947,6 +949,35 @@ public void testMessageTooLarge() throws Exception { writer.close(); } + @Test + public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws Exception { + ProtoSchema schema1 = createProtoSchema("foo"); + StreamWriter.setMaxRequestCallbackWaitTime(java.time.Duration.ofSeconds(1)); + StreamWriter writer = + StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build(); + testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(3)); + + long appendCount = 10; + for (int i = 0; i < appendCount; i++) { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + + // In total insert 5 requests, + List> futures = new ArrayList<>(); + for (int i = 0; i < appendCount; i++) { + futures.add(writer.append(createProtoRows(new String[] {String.valueOf(i)}), i)); + } + + for (int i = 0; i < appendCount; i++) { + int finalI = i; + ExecutionException ex = + assertThrows( + ExecutionException.class, + () -> futures.get(finalI).get().getAppendResult().getOffset().getValue()); + assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue"); + } + } + @Test public void testAppendWithResetSuccess() throws Exception { try (StreamWriter writer = getTestStreamWriter()) {