From 54874be945c2b88be6be03ae654277445c17741d Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Mon, 7 Feb 2022 08:16:55 -0800 Subject: [PATCH] feat: add a indicator of how much time a request is waiting for inflight limit (#1514) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/java-bigquerystorage/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes # ☕️ --- .../bigquery/storage/v1/JsonStreamWriter.java | 11 ++++++++++ .../bigquery/storage/v1/StreamWriter.java | 20 +++++++++++++++++++ .../bigquery/storage/v1/StreamWriterTest.java | 4 +--- 3 files changed, 32 insertions(+), 3 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index b43676ed18..31d1d2493f 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -167,6 +167,17 @@ public Descriptor getDescriptor() { return this.descriptor; } + /** + * Returns the wait of a request in Client side before sending to the Server. Request could wait + * in Client because it reached the client side inflight request limit (adjustable when + * constructing the Writer). The value is the wait time for the last sent request. A constant high + * wait value indicates a need for more throughput, you can create a new Stream for to increase + * the throughput in exclusive stream case, or create a new Writer in the default stream case. + */ + public long getInflightWaitSeconds() { + return streamWriter.getInflightWaitSeconds(); + } + /** Sets all StreamWriter settings. */ private void setStreamWriterSettings( @Nullable TransportChannelProvider channelProvider, diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 5540046c26..03b1c69573 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -34,6 +34,7 @@ import java.util.Deque; import java.util.LinkedList; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -159,6 +160,11 @@ public class StreamWriter implements AutoCloseable { */ private Thread appendThread; + /* + * The inflight wait time for the previous sent request. + */ + private final AtomicLong inflightWaitSec = new AtomicLong(0); + /** The maximum size of one request. Defined by the API. */ public static long getApiMaxRequestBytes() { return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) @@ -316,6 +322,7 @@ private ApiFuture appendInternal(AppendRowsRequest message) @GuardedBy("lock") private void maybeWaitForInflightQuota() { + long start_time = System.currentTimeMillis(); while (this.inflightRequests >= this.maxInflightRequests || this.inflightBytes >= this.maxInflightBytes) { try { @@ -332,6 +339,19 @@ private void maybeWaitForInflightQuota() { .withDescription("Interrupted while waiting for quota.")); } } + inflightWaitSec.set((System.currentTimeMillis() - start_time) / 1000); + } + + /** + * Returns the wait of a request in Client side before sending to the Server. Request could wait + * in Client because it reached the client side inflight request limit (adjustable when + * constructing the StreamWriter). The value is the wait time for the last sent request. A + * constant high wait value indicates a need for more throughput, you can create a new Stream for + * to increase the throughput in exclusive stream case, or create a new Writer in the default + * stream case. + */ + public long getInflightWaitSeconds() { + return inflightWaitSec.longValue(); } /** Close the stream writer. Shut down all resources. */ diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index bb77b8702a..9658f18744 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -489,10 +489,8 @@ public void testOneMaxInflightRequests() throws Exception { testBigQueryWrite.setResponseSleep(Duration.ofSeconds(1)); testBigQueryWrite.addResponse(createAppendResponse(0)); - long appendStartTimeMs = System.currentTimeMillis(); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); - long appendElapsedMs = System.currentTimeMillis() - appendStartTimeMs; - assertTrue(appendElapsedMs >= 1000); + assertTrue(writer.getInflightWaitSeconds() >= 1); assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); writer.close(); }