-
Notifications
You must be signed in to change notification settings - Fork 53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Unary Callables Deadline values respect the TotalTimeout in RetrySettings #1603
Changes from 137 commits
dca83d7
45c0765
7862762
7f5622f
94b5017
608d44b
0c1782f
f75f67e
7c4c841
3668093
d440b8e
bbf7f4f
bee8c0e
73882a7
b0d83b9
fc5cf13
2f1bc8a
ead0eb9
ca0770b
21ed291
bb39513
fab86be
95ef1a1
25190ec
f59de69
133af94
ff5108f
e800373
d757c05
f139797
e1d12c5
8ee7513
bed960e
5c5eaec
ab93117
f3d19b6
1e38e5f
5a10f4c
9b877ac
eff3513
1d1dffa
887f3bb
102ab99
3f4b9a5
dffe194
47c8da6
e5ccddd
fdf6c63
64b2c77
c5c2f54
d0893ea
1281ab5
1ae5d3d
36e3788
452fc97
0ad9442
0ad7a4d
a769ee1
65e9e67
0b926d5
6b70a50
9af022a
66c757c
fc59f98
b773f89
b571b6e
f97e7c0
1aec63c
de5927d
6a337c5
8be44c8
42dd7ed
1de756c
8756e27
51df7ea
a9ff512
8f1627e
becbc25
c1f609f
be1f9fb
490ccc1
3a9bdfd
87a41dc
da26a56
0978549
93c8fdc
2a9d2df
a517d95
0f8bbc8
c1f914f
6bb2d04
3da7937
d28b87f
7c316a1
ae3c2ec
3fb78f7
8cd3e3b
b8704ff
eb6635a
0b76faf
acc3ee1
8b6445e
d458167
09e7ff2
c20ee95
6552dbe
0454e02
a1dcfdd
f9afed4
1d5c00a
a7983d3
15448d2
924f7a0
f4ba8af
a305123
6e35172
d4bc792
ccdf1e2
561ffa3
3607a30
cb34160
cf5ba07
07ecd7a
b4812e3
ed6b53c
2095745
a8e3153
760f5fe
d890dee
3086821
084b592
145d084
e9c1645
600a2dc
5d81e85
c429d96
5527700
13881bb
c052a87
069acc6
6cc15fb
e099ac1
0a3cadf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,15 +33,19 @@ | |
import com.google.api.gax.httpjson.ApiMethodDescriptor.MethodType; | ||
import com.google.api.gax.httpjson.HttpRequestRunnable.ResultListener; | ||
import com.google.api.gax.httpjson.HttpRequestRunnable.RunnableResult; | ||
import com.google.api.gax.rpc.StatusCode; | ||
import com.google.common.base.Preconditions; | ||
import java.io.IOException; | ||
import java.io.InputStreamReader; | ||
import java.io.Reader; | ||
import java.nio.charset.StandardCharsets; | ||
import java.time.Duration; | ||
import java.util.ArrayDeque; | ||
import java.util.Queue; | ||
import java.util.concurrent.CancellationException; | ||
import java.util.concurrent.Executor; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.TimeUnit; | ||
import javax.annotation.Nullable; | ||
import javax.annotation.concurrent.GuardedBy; | ||
|
||
|
@@ -88,6 +92,7 @@ final class HttpJsonClientCallImpl<RequestT, ResponseT> | |
private final ApiMethodDescriptor<RequestT, ResponseT> methodDescriptor; | ||
private final HttpTransport httpTransport; | ||
private final Executor executor; | ||
private final ScheduledExecutorService deadlineCancellationExecutor; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we try to reuse the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Gotcha, makes sense. I'll take another look into how grpc handles the multiple executors. I'd imagine they would also possibly run into this issue. |
||
|
||
// | ||
// Request-specific data (provided by client code) before we get a response. | ||
|
@@ -114,19 +119,21 @@ final class HttpJsonClientCallImpl<RequestT, ResponseT> | |
private ProtoMessageJsonStreamIterator responseStreamIterator; | ||
|
||
@GuardedBy("lock") | ||
private boolean closed; | ||
private volatile boolean closed; | ||
lqiu96 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
HttpJsonClientCallImpl( | ||
ApiMethodDescriptor<RequestT, ResponseT> methodDescriptor, | ||
String endpoint, | ||
HttpJsonCallOptions callOptions, | ||
HttpTransport httpTransport, | ||
Executor executor) { | ||
Executor executor, | ||
ScheduledExecutorService deadlineCancellationExecutor) { | ||
this.methodDescriptor = methodDescriptor; | ||
this.endpoint = endpoint; | ||
this.callOptions = callOptions; | ||
this.httpTransport = httpTransport; | ||
this.executor = executor; | ||
this.deadlineCancellationExecutor = deadlineCancellationExecutor; | ||
this.closed = false; | ||
} | ||
|
||
|
@@ -161,6 +168,38 @@ public void start(Listener<ResponseT> responseListener, HttpJsonMetadata request | |
this.listener = responseListener; | ||
this.requestHeaders = requestHeaders; | ||
} | ||
|
||
// Use the timeout duration value instead of calculating the future Instant | ||
// Only schedule the deadline if the RPC timeout has been set in the RetrySettings | ||
Duration timeout = callOptions.getTimeout(); | ||
if (timeout != null) { | ||
// The future timeout value is guaranteed to not be a negative value as the | ||
// RetryAlgorithm will not retry | ||
long timeoutMs = timeout.toMillis(); | ||
this.deadlineCancellationExecutor.schedule(this::timeout, timeoutMs, TimeUnit.MILLISECONDS); | ||
} | ||
} | ||
|
||
// Notify the FutureListener that the there is a timeout exception from this RPC | ||
// call (DEADLINE_EXCEEDED). For retrying RPCs, this code is returned for every attempt | ||
// that exceeds the timeout. The RetryAlgorithm will check both the timing and code to | ||
// ensure another attempt is made. | ||
private void timeout() { | ||
// There is a race between the deadline scheduler and response being returned from | ||
// the server. The deadline scheduler has priority as it will clear out the pending | ||
// notifications queue and add the DEADLINE_EXCEEDED event once it is able to obtain | ||
// the lock. | ||
synchronized (lock) { | ||
close( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah the cancellation in the runnable doesn't seem like it's actually cancel's the runnable. I'll look into the possibility of being able to actually disconnect the connection instead of waiting for the socket timeout. |
||
StatusCode.Code.DEADLINE_EXCEEDED.getHttpStatusCode(), | ||
"Deadline exceeded", | ||
new HttpJsonStatusRuntimeException( | ||
StatusCode.Code.DEADLINE_EXCEEDED.getHttpStatusCode(), "Deadline exceeded", null), | ||
true); | ||
} | ||
|
||
// trigger delivery loop if not already running | ||
deliver(); | ||
} | ||
|
||
@Override | ||
|
@@ -260,9 +299,10 @@ private void deliver() { | |
throw new InterruptedException("Message delivery has been interrupted"); | ||
} | ||
|
||
// All listeners must be called under delivery loop (but outside the lock) to ensure that no | ||
// two notifications come simultaneously from two different threads and that we do not go | ||
// indefinitely deep in the stack if delivery logic is called recursively via listeners. | ||
// All listeners must be called under delivery loop (but outside the lock) to ensure that | ||
// no two notifications come simultaneously from two different threads and that we do not | ||
// go indefinitely deep in the stack if delivery logic is called recursively via | ||
// listeners. | ||
notifyListeners(); | ||
|
||
// The synchronized block around message reading and cancellation notification processing | ||
|
@@ -302,7 +342,7 @@ private void deliver() { | |
inDelivery = false; | ||
break; | ||
} else { | ||
// We still have some stuff in notiticationTasksQueue so continue the loop, most | ||
// We still have some stuff in notificationTasksQueue so continue the loop, most | ||
// likely we will finally terminate on the next cycle. | ||
continue; | ||
} | ||
|
@@ -319,8 +359,8 @@ private void deliver() { | |
// can do in such an unlikely situation (otherwise we would stay forever in the delivery | ||
// loop). | ||
synchronized (lock) { | ||
// Close the call immediately marking it cancelled. If already closed close() will have no | ||
// effect. | ||
// Close the call immediately marking it cancelled. If already closed, close() will have | ||
// no effect. | ||
close(ex.getStatusCode(), ex.getMessage(), ex, true); | ||
} | ||
} | ||
|
@@ -352,7 +392,7 @@ private boolean consumeMessageFromStream() throws IOException { | |
boolean allMessagesConsumed; | ||
Reader responseReader; | ||
if (methodDescriptor.getType() == MethodType.SERVER_STREAMING) { | ||
// Lazily initialize responseStreamIterator in case if it is a server steraming response | ||
// Lazily initialize responseStreamIterator in case if it is a server streaming response | ||
if (responseStreamIterator == null) { | ||
responseStreamIterator = | ||
new ProtoMessageJsonStreamIterator( | ||
|
@@ -384,7 +424,7 @@ private boolean consumeMessageFromStream() throws IOException { | |
|
||
@GuardedBy("lock") | ||
private void close( | ||
int statusCode, String message, Throwable cause, boolean terminateImmediatelly) { | ||
int statusCode, String message, Throwable cause, boolean terminateImmediately) { | ||
try { | ||
if (closed) { | ||
return; | ||
|
@@ -399,12 +439,12 @@ private void close( | |
requestRunnable = null; | ||
} | ||
|
||
HttpJsonMetadata.Builder meatadaBuilder = HttpJsonMetadata.newBuilder(); | ||
HttpJsonMetadata.Builder metadataBuilder = HttpJsonMetadata.newBuilder(); | ||
if (runnableResult != null && runnableResult.getTrailers() != null) { | ||
meatadaBuilder = runnableResult.getTrailers().toBuilder(); | ||
metadataBuilder = runnableResult.getTrailers().toBuilder(); | ||
} | ||
meatadaBuilder.setException(cause); | ||
meatadaBuilder.setStatusMessage(message); | ||
metadataBuilder.setException(cause); | ||
metadataBuilder.setStatusMessage(message); | ||
if (responseStreamIterator != null) { | ||
responseStreamIterator.close(); | ||
} | ||
|
@@ -415,19 +455,19 @@ private void close( | |
// onClose() suppresses all other pending notifications. | ||
// there should be no place in the code which inserts something in this queue before checking | ||
// the `closed` flag under the lock and refusing to insert anything if `closed == true`. | ||
if (terminateImmediatelly) { | ||
if (terminateImmediately) { | ||
// This usually means we are cancelling the call before processing the response in full. | ||
// It may happen if a user explicitly cancels the call or in response to an unexpected | ||
// exception either from server or a call listener execution. | ||
pendingNotifications.clear(); | ||
} | ||
|
||
pendingNotifications.offer( | ||
new OnCloseNotificationTask<>(listener, statusCode, meatadaBuilder.build())); | ||
new OnCloseNotificationTask<>(listener, statusCode, metadataBuilder.build())); | ||
|
||
} catch (Throwable e) { | ||
// suppress stream closing exceptions in favor of the actual call closing cause. This method | ||
// should not throw, otherwise we may stuck in an infinite loop of exception processing. | ||
// should not throw, otherwise we may be stuck in an infinite loop of exception processing. | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use ms instead of nanos now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done