diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 198088e1e4..1d001d5bbb 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -215,30 +215,33 @@ public ApiFuture append(AppendRowsRequest message) { this.inflightBytes += requestWrapper.messageSize; waitingRequestQueue.addLast(requestWrapper); hasMessageInWaitingQueue.signal(); - - // Maybe block until we are below inflight limit. - while (this.inflightRequests >= this.maxInflightRequests - || this.inflightBytes >= this.maxInflightBytes) { - try { - inflightReduced.await(100, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - log.warning( - "Interrupted while waiting for inflight quota. Stream: " - + streamName - + " Error: " - + e.toString()); - throw new StatusRuntimeException( - Status.fromCode(Code.CANCELLED) - .withCause(e) - .withDescription("Interrupted while waiting for quota.")); - } - } + maybeWaitForInflightQuota(); return requestWrapper.appendResult; } finally { this.lock.unlock(); } } + @GuardedBy("lock") + private void maybeWaitForInflightQuota() { + while (this.inflightRequests >= this.maxInflightRequests + || this.inflightBytes >= this.maxInflightBytes) { + try { + inflightReduced.await(100, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + log.warning( + "Interrupted while waiting for inflight quota. Stream: " + + streamName + + " Error: " + + e.toString()); + throw new StatusRuntimeException( + Status.fromCode(Code.CANCELLED) + .withCause(e) + .withDescription("Interrupted while waiting for quota.")); + } + } + } + /** Close the stream writer. Shut down all resources. */ @Override public void close() {