fix: add client shutdown if request waiting in request queue for too long. #2017
Conversation
prerequisite for multiplexing client
new stream name as a switch of destination
also fixed a tiny bug inside the fake bigquery write impl for getting the response from offset
it is possible that the proto schema does not contain this field
Duration milliSinceLastCallback =
    Duration.between(lastRequestCallbackTriggerTime, Instant.now());
if (milliSinceLastCallback.compareTo(MAXIMUM_REQUEST_CALLBACK_WAIT_TIME) > 0) {
  throw new IllegalStateException(
I don't think this is really an IllegalStateException
Let's use a generic RuntimeException; the goal is only to trigger the uncaught exception handler.
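For context on why a plain RuntimeException is sufficient: below is a minimal, self-contained sketch (class and variable names are hypothetical, not taken from this PR) showing that any exception escaping the append thread reaches that thread's uncaught exception handler, which is where connection shutdown and queue cleanup can be hooked in.

```java
import java.time.Duration;

public class AppendLoopShutdownSketch {
  // Hypothetical timeout; the discussion below settles on 20 minutes.
  private static final Duration MAX_CALLBACK_WAIT = Duration.ofMinutes(20);

  public static void main(String[] args) throws InterruptedException {
    Thread appendThread =
        new Thread(
            () -> {
              // Any RuntimeException is enough to reach the handler below;
              // no special exception type is needed.
              throw new RuntimeException(
                  "Request has waited for a callback longer than " + MAX_CALLBACK_WAIT);
            });
    // The uncaught exception handler is where shutdown/cleanup would be
    // triggered in the real writer.
    appendThread.setUncaughtExceptionHandler(
        (thread, throwable) ->
            System.err.println("Cleaning up after: " + throwable.getMessage()));
    appendThread.start();
    appendThread.join();
  }
}
```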
@@ -659,6 +678,18 @@ private void appendLoop() {
    log.info("Append thread is done. Stream: " + streamName + " id: " + writerId);
  }

  private void throwIfWaitCallbackTooLong() {
    Duration milliSinceLastCallback =
        Duration.between(lastRequestCallbackTriggerTime, Instant.now());
If a StreamWriter is created and then there is a pause before using it, won't this trigger? (odd scenario, I know). Actually, if a user ever pauses for 10 minutes, won't this cause the exception to be thrown the next time they use the StreamWriter?
The caller of throwIfWaitCallbackTooLong checks the inflight queue size before triggering this check.
Alternatively, would it make sense to store a timestamp in the RequestWrapper itself when it is added to the inflightRequestQueue? Since the queue is in order, you can then just check the timestamp at the head of the queue.
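To illustrate the suggestion, here is a hedged sketch of a head-of-queue check, assuming each wrapper carries its own timestamp; inflightRequestQueue is the name used in this discussion, while RequestWrapper, enqueueTime, and MAX_CALLBACK_WAIT are hypothetical stand-ins, not the PR's actual identifiers.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;

class HeadOfQueueCheckSketch {
  // Hypothetical stand-in for the real request wrapper, carrying its own timestamp.
  static final class RequestWrapper {
    final Instant enqueueTime = Instant.now();
  }

  private static final Duration MAX_CALLBACK_WAIT = Duration.ofMinutes(20);
  private final Deque<RequestWrapper> inflightRequestQueue = new ArrayDeque<>();

  void throwIfHeadWaitedTooLong() {
    RequestWrapper head = inflightRequestQueue.peekFirst();
    // The queue is ordered, so the oldest request is always at the head;
    // checking only that entry bounds the wait for everything behind it.
    if (head != null
        && Duration.between(head.enqueueTime, Instant.now()).compareTo(MAX_CALLBACK_WAIT) > 0) {
      throw new RuntimeException(
          "Oldest in-flight request has waited longer than " + MAX_CALLBACK_WAIT);
    }
  }
}
```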
right, but append will add to this queue, right? So if append is called after a long delay, the appendLoop might hit this condition (if the appendLoop runs before the callback comes back).
Changed to storing a timestamp directly during creation of the request wrapper.
private void throwIfWaitCallbackTooLong(Instant timeToCheck) {
  Duration milliSinceLastCallback = Duration.between(timeToCheck, Instant.now());
  if (milliSinceLastCallback.compareTo(MAXIMUM_REQUEST_CALLBACK_WAIT_TIME) > 0) {
    throw new RuntimeException(
Are you sure that a RuntimeException will be caught and will cancel things?
AppendRequestAndResponse(AppendRowsRequest message, StreamWriter streamWriter) {
  this.appendResult = SettableApiFuture.create();
  this.message = message;
  this.messageSize = message.getProtoRows().getSerializedSize();
  this.streamWriter = streamWriter;
  this.requestCreationTimeStamp = Instant.now();
Not sure if this is 100% right, since this gets called when the request is added to waitingRequestQueue. I think we want a timestamp that is set close to when the actual RPC goes out.
I was trying to start counting time when the user calls append. But let's change it to set the timestamp when the request is added to the inflight queue.
The idea is to detect stuck RPCs. If things are stuck in the waitingRequestQueue (and nothing is stuck in the inflight queue), then presumably something else is very wrong.
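A hedged sketch of the agreed resolution: stamp the wrapper only when it moves from the waiting queue to the inflight queue, i.e. close to when the RPC actually goes out. waitingRequestQueue and inflightRequestQueue are names from this discussion; RequestWrapper, requestSendTime, and moveToInflight are hypothetical.

```java
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;

class InflightStampSketch {
  // Hypothetical wrapper; in the real code this is AppendRequestAndResponse.
  static final class RequestWrapper {
    Instant requestSendTime; // set only when the request actually goes out
  }

  private final Deque<RequestWrapper> waitingRequestQueue = new ArrayDeque<>();
  private final Deque<RequestWrapper> inflightRequestQueue = new ArrayDeque<>();

  void moveToInflight() {
    RequestWrapper wrapper = waitingRequestQueue.pollFirst();
    if (wrapper == null) {
      return;
    }
    // Stamp here, not in the constructor, so time spent merely waiting to be
    // sent (e.g. a user pause before calling append again) does not count
    // against the RPC callback timeout.
    wrapper.requestSendTime = Instant.now();
    inflightRequestQueue.addLast(wrapper);
  }
}
```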
 * We will constantly check how much time we have been waiting for the next request callback;
 * if we wait too long, we will start shutting down the connections and cleaning up the queues.
 */
private static Duration MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = Duration.ofMinutes(10);
To keep it safe, let's use a longer time, say 30 minutes.
Let's use 20 minutes; 30 minutes sounds crazy long for a pipeline to stop.
Right now the user waits forever on the queue; I am just afraid some user will do crazy things. I am fine with 20 min.
🤖 I have created a release *beep* *boop*

---

## [2.33.0](https://togithub.com/googleapis/java-bigquerystorage/compare/v2.32.1...v2.33.0) (2023-03-01)

### Features

* Add header back to the client ([#2016](https://togithub.com/googleapis/java-bigquerystorage/issues/2016)) ([de00447](https://togithub.com/googleapis/java-bigquerystorage/commit/de00447958e5939d7be9d0f7da02323aabbfed8c))

### Bug Fixes

* Add client shutdown if request waiting in request queue for too long. ([#2017](https://togithub.com/googleapis/java-bigquerystorage/issues/2017)) ([91da88b](https://togithub.com/googleapis/java-bigquerystorage/commit/91da88b0ed914bf55111dd9cef2a3fc4b27c3443))
* Allow StreamWriter settings to override passed in BQ client setting ([#2001](https://togithub.com/googleapis/java-bigquerystorage/issues/2001)) ([66db8fe](https://togithub.com/googleapis/java-bigquerystorage/commit/66db8fed26474076fb5aaca5044d39e11f6ef28d))
* Catch uncaught exception from append loop and add expoential retry to reconnection ([#2015](https://togithub.com/googleapis/java-bigquerystorage/issues/2015)) ([35db0fb](https://togithub.com/googleapis/java-bigquerystorage/commit/35db0fb38a929a8f3e4db30ee173ce5a4af43d64))
* Remove write_location header pending discussion ([#2021](https://togithub.com/googleapis/java-bigquerystorage/issues/2021)) ([0941d43](https://togithub.com/googleapis/java-bigquerystorage/commit/0941d4363daf782e0be81c11fdf6a2fe0ff4d7ac))

---

This PR was generated with [Release Please](https://togithub.com/googleapis/release-please). See [documentation](https://togithub.com/googleapis/release-please#release-please).
This is needed because, per offline investigation, it's possible for the gRPC streaming lib to never trigger doneCallback / requestCallback under some scenarios, e.g. when there are too many GAX threads. We should add this timeout to help the connection spit out the requests on dead connections.
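To tie the pieces together, here is a hedged, heavily simplified sketch of the overall mechanism. MAXIMUM_REQUEST_CALLBACK_WAIT_TIME, appendLoop, and inflightRequestQueue are names from this PR; the rest (RequestWrapper, requestSendTime, userClosed, the loop body) are hypothetical placeholders, not the actual implementation.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;

class DeadConnectionTimeoutSketch {
  static final class RequestWrapper {
    final Instant requestSendTime = Instant.now();
  }

  // The discussion above settles on 20 minutes.
  private static final Duration MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = Duration.ofMinutes(20);
  private final Deque<RequestWrapper> inflightRequestQueue = new ArrayDeque<>();
  private volatile boolean userClosed = false;

  // Simplified stand-in for the real appendLoop: the timeout check runs only
  // when something is actually in flight, so an idle writer never trips it.
  void appendLoop() throws InterruptedException {
    while (!userClosed) {
      RequestWrapper oldest = inflightRequestQueue.peekFirst();
      if (oldest != null
          && Duration.between(oldest.requestSendTime, Instant.now())
                  .compareTo(MAXIMUM_REQUEST_CALLBACK_WAIT_TIME)
              > 0) {
        // The callback never arrived: give up on this connection so queued
        // requests fail fast instead of hanging forever.
        throw new RuntimeException(
            "No request callback for "
                + MAXIMUM_REQUEST_CALLBACK_WAIT_TIME
                + "; shutting down connection");
      }
      // ... send queued requests and wait for callbacks here ...
      Thread.sleep(100);
    }
  }
}
```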