Skip to content

Commit

Permalink
Move inflight quota wait to separate method.
Browse files Browse the repository at this point in the history
  • Loading branch information
yayi-google committed Feb 23, 2021
1 parent 318c302 commit 74bb949
Showing 1 changed file with 21 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,30 +215,33 @@ public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
this.inflightBytes += requestWrapper.messageSize;
waitingRequestQueue.addLast(requestWrapper);
hasMessageInWaitingQueue.signal();

// Maybe block until we are below inflight limit.
while (this.inflightRequests >= this.maxInflightRequests
|| this.inflightBytes >= this.maxInflightBytes) {
try {
inflightReduced.await(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.warning(
"Interrupted while waiting for inflight quota. Stream: "
+ streamName
+ " Error: "
+ e.toString());
throw new StatusRuntimeException(
Status.fromCode(Code.CANCELLED)
.withCause(e)
.withDescription("Interrupted while waiting for quota."));
}
}
maybeWaitForInflightQuota();
return requestWrapper.appendResult;
} finally {
this.lock.unlock();
}
}

@GuardedBy("lock")
private void maybeWaitForInflightQuota() {
while (this.inflightRequests >= this.maxInflightRequests
|| this.inflightBytes >= this.maxInflightBytes) {
try {
inflightReduced.await(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.warning(
"Interrupted while waiting for inflight quota. Stream: "
+ streamName
+ " Error: "
+ e.toString());
throw new StatusRuntimeException(
Status.fromCode(Code.CANCELLED)
.withCause(e)
.withDescription("Interrupted while waiting for quota."));
}
}
}

/** Close the stream writer. Shut down all resources. */
@Override
public void close() {
Expand Down

0 comments on commit 74bb949

Please sign in to comment.