Skip to content

Commit

Permalink
feat: add a indicator of how much time a request is waiting for infli…
Browse files Browse the repository at this point in the history
…ght limit (#1514)


Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/java-bigquerystorage/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> ☕️
  • Loading branch information
yirutang authored Feb 7, 2022
1 parent 5eaf38e commit 54874be
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,17 @@ public Descriptor getDescriptor() {
return this.descriptor;
}

/**
* Returns the wait of a request in Client side before sending to the Server. Request could wait
* in Client because it reached the client side inflight request limit (adjustable when
* constructing the Writer). The value is the wait time for the last sent request. A constant high
* wait value indicates a need for more throughput, you can create a new Stream for to increase
* the throughput in exclusive stream case, or create a new Writer in the default stream case.
*/
public long getInflightWaitSeconds() {
return streamWriter.getInflightWaitSeconds();
}

/** Sets all StreamWriter settings. */
private void setStreamWriterSettings(
@Nullable TransportChannelProvider channelProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -159,6 +160,11 @@ public class StreamWriter implements AutoCloseable {
*/
private Thread appendThread;

/*
* The inflight wait time for the previous sent request.
*/
private final AtomicLong inflightWaitSec = new AtomicLong(0);

/** The maximum size of one request. Defined by the API. */
public static long getApiMaxRequestBytes() {
return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
Expand Down Expand Up @@ -316,6 +322,7 @@ private ApiFuture<AppendRowsResponse> appendInternal(AppendRowsRequest message)

@GuardedBy("lock")
private void maybeWaitForInflightQuota() {
long start_time = System.currentTimeMillis();
while (this.inflightRequests >= this.maxInflightRequests
|| this.inflightBytes >= this.maxInflightBytes) {
try {
Expand All @@ -332,6 +339,19 @@ private void maybeWaitForInflightQuota() {
.withDescription("Interrupted while waiting for quota."));
}
}
inflightWaitSec.set((System.currentTimeMillis() - start_time) / 1000);
}

/**
* Returns the wait of a request in Client side before sending to the Server. Request could wait
* in Client because it reached the client side inflight request limit (adjustable when
* constructing the StreamWriter). The value is the wait time for the last sent request. A
* constant high wait value indicates a need for more throughput, you can create a new Stream for
* to increase the throughput in exclusive stream case, or create a new Writer in the default
* stream case.
*/
public long getInflightWaitSeconds() {
return inflightWaitSec.longValue();
}

/** Close the stream writer. Shut down all resources. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,10 +489,8 @@ public void testOneMaxInflightRequests() throws Exception {
testBigQueryWrite.setResponseSleep(Duration.ofSeconds(1));
testBigQueryWrite.addResponse(createAppendResponse(0));

long appendStartTimeMs = System.currentTimeMillis();
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
long appendElapsedMs = System.currentTimeMillis() - appendStartTimeMs;
assertTrue(appendElapsedMs >= 1000);
assertTrue(writer.getInflightWaitSeconds() >= 1);
assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
writer.close();
}
Expand Down

0 comments on commit 54874be

Please sign in to comment.