From 3abab95e757f0473605a7fd32d57412be73d2be1 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Thu, 21 Mar 2024 17:02:06 -0700 Subject: [PATCH] core: Provide DEADLINE_EXCEEDED insights for context deadline We provided extra details when the RPC is killed by CallOptions' Deadline, but didn't do the same for Context. To avoid duplicating code, things were restructured, including the threading. There are more code flows now, but I think the multi-threading came out more obvious and less error-prone. I didn't change the status when the deadline is already expired, because the text is shared with DelayedClientCall and AbstractInteropTest doesn't distinguish between the two cases. This is a roll-forward that avoids a NPE when cancel() is called without an earlier call to start(). As seen at b/300991330 --- .../java/io/grpc/internal/ClientCallImpl.java | 170 ++++++++---------- .../io/grpc/internal/ClientCallImplTest.java | 21 ++- .../integration/AbstractInteropTest.java | 2 +- 3 files changed, 98 insertions(+), 95 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index e2176668b73..07f2701d1c1 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -28,7 +28,6 @@ import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY; import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY; import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY; -import static java.lang.Math.max; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; @@ -62,6 +61,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -82,16 +82,13 @@ final class ClientCallImpl extends ClientCall { private final boolean callExecutorIsDirect; private final CallTracer channelCallsTracer; private final Context context; - private volatile ScheduledFuture deadlineCancellationFuture; + private CancellationHandler cancellationHandler; private final boolean unaryRequest; private CallOptions callOptions; private ClientStream stream; - private volatile boolean cancelListenersShouldBeRemoved; private boolean cancelCalled; private boolean halfCloseCalled; private final ClientStreamProvider clientStreamProvider; - private final ContextCancellationListener cancellationListener = - new ContextCancellationListener(); private final ScheduledExecutorService deadlineCancellationExecutor; private boolean fullStreamDecompression; private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance(); @@ -128,13 +125,6 @@ final class ClientCallImpl extends ClientCall { PerfMark.event("ClientCall.", tag); } - private final class ContextCancellationListener implements CancellationListener { - @Override - public void cancelled(Context context) { - stream.cancel(statusFromCancelled(context)); - } - } - /** * Provider of {@link ClientStream}s. */ @@ -252,21 +242,21 @@ public void runInContext() { prepareHeaders(headers, decompressorRegistry, compressor, fullStreamDecompression); Deadline effectiveDeadline = effectiveDeadline(); - boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired(); + boolean contextIsDeadlineSource = effectiveDeadline != null + && effectiveDeadline.equals(context.getDeadline()); + cancellationHandler = new CancellationHandler(effectiveDeadline, contextIsDeadlineSource); + boolean deadlineExceeded = effectiveDeadline != null && cancellationHandler.remainingNanos <= 0; if (!deadlineExceeded) { - logIfContextNarrowedTimeout( - effectiveDeadline, context.getDeadline(), callOptions.getDeadline()); stream = clientStreamProvider.newStream(method, callOptions, headers, context); } else { ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false); - String deadlineName = - isFirstMin(callOptions.getDeadline(), context.getDeadline()) ? "CallOptions" : "Context"; + String deadlineName = contextIsDeadlineSource ? "Context" : "CallOptions"; Long nameResolutionDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED); String description = String.format( "ClientCall started after %s deadline was exceeded %.9f seconds ago. " + "Name resolution delay %.9f seconds.", deadlineName, - effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS) / NANO_TO_SECS, + cancellationHandler.remainingNanos / NANO_TO_SECS, nameResolutionDelay == null ? 0 : nameResolutionDelay / NANO_TO_SECS); stream = new FailingClientStream(DEADLINE_EXCEEDED.withDescription(description), tracers); } @@ -298,21 +288,7 @@ public void runInContext() { // they receive cancel before start. Issue #1343 has more details // Propagate later Context cancellation to the remote side. - context.addListener(cancellationListener, directExecutor()); - if (effectiveDeadline != null - // If the context has the effective deadline, we don't need to schedule an extra task. - && !effectiveDeadline.equals(context.getDeadline()) - // If the channel has been terminated, we don't need to schedule an extra task. - && deadlineCancellationExecutor != null) { - deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline); - } - if (cancelListenersShouldBeRemoved) { - // Race detected! ClientStreamListener.closed may have been called before - // deadlineCancellationFuture was set / context listener added, thereby preventing the future - // and listener from being cancelled. Go ahead and cancel again, just to be sure it - // was cancelled. - removeContextListenerAndCancelDeadlineFuture(); - } + cancellationHandler.setUp(); } private void applyMethodConfig() { @@ -354,54 +330,77 @@ private void applyMethodConfig() { } } - private static void logIfContextNarrowedTimeout( - Deadline effectiveDeadline, @Nullable Deadline outerCallDeadline, - @Nullable Deadline callDeadline) { - if (!log.isLoggable(Level.FINE) || effectiveDeadline == null - || !effectiveDeadline.equals(outerCallDeadline)) { - return; + private final class CancellationHandler implements Runnable, CancellationListener { + private final boolean contextIsDeadlineSource; + private final boolean hasDeadline; + private final long remainingNanos; + private volatile ScheduledFuture deadlineCancellationFuture; + private volatile boolean tearDownCalled; + + CancellationHandler(Deadline deadline, boolean contextIsDeadlineSource) { + this.contextIsDeadlineSource = contextIsDeadlineSource; + if (deadline == null) { + hasDeadline = false; + remainingNanos = 0; + } else { + hasDeadline = true; + remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS); + } } - long effectiveTimeout = max(0, effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS)); - StringBuilder builder = new StringBuilder(String.format( - Locale.US, - "Call timeout set to '%d' ns, due to context deadline.", effectiveTimeout)); - if (callDeadline == null) { - builder.append(" Explicit call timeout was not set."); - } else { - long callTimeout = callDeadline.timeRemaining(TimeUnit.NANOSECONDS); - builder.append(String.format(Locale.US, " Explicit call timeout was '%d' ns.", callTimeout)); + void setUp() { + if (tearDownCalled) { + return; + } + if (hasDeadline + // If the context has the effective deadline, we don't need to schedule an extra task. + && !contextIsDeadlineSource + // If the channel has been terminated, we don't need to schedule an extra task. + && deadlineCancellationExecutor != null) { + deadlineCancellationFuture = deadlineCancellationExecutor.schedule( + new LogExceptionRunnable(this), remainingNanos, TimeUnit.NANOSECONDS); + } + context.addListener(this, directExecutor()); + if (tearDownCalled) { + // Race detected! Re-run to make sure the future is cancelled and context listener removed + tearDown(); + } } - log.fine(builder.toString()); - } - - private void removeContextListenerAndCancelDeadlineFuture() { - context.removeListener(cancellationListener); - ScheduledFuture f = deadlineCancellationFuture; - if (f != null) { - f.cancel(false); + // May be called multiple times, and race with setUp() + void tearDown() { + tearDownCalled = true; + ScheduledFuture deadlineCancellationFuture = this.deadlineCancellationFuture; + if (deadlineCancellationFuture != null) { + deadlineCancellationFuture.cancel(false); + } + context.removeListener(this); } - } - private class DeadlineTimer implements Runnable { - private final long remainingNanos; - - DeadlineTimer(long remainingNanos) { - this.remainingNanos = remainingNanos; + @Override + public void cancelled(Context context) { + if (hasDeadline && contextIsDeadlineSource + && context.cancellationCause() instanceof TimeoutException) { + stream.cancel(formatDeadlineExceededStatus()); + return; + } + stream.cancel(statusFromCancelled(context)); } @Override public void run() { - InsightBuilder insight = new InsightBuilder(); - stream.appendTimeoutInsight(insight); + stream.cancel(formatDeadlineExceededStatus()); + } + + Status formatDeadlineExceededStatus() { // DelayedStream.cancel() is safe to call from a thread that is different from where the // stream is created. long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1); long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1); StringBuilder buf = new StringBuilder(); - buf.append("deadline exceeded after "); + buf.append(contextIsDeadlineSource ? "Context" : "CallOptions"); + buf.append(" deadline exceeded after "); if (remainingNanos < 0) { buf.append('-'); } @@ -409,20 +408,18 @@ public void run() { buf.append(String.format(Locale.US, ".%09d", nanos)); buf.append("s. "); Long nsDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED); - buf.append(String.format(Locale.US, "Name resolution delay %.9f seconds. ", + buf.append(String.format(Locale.US, "Name resolution delay %.9f seconds.", nsDelay == null ? 0 : nsDelay / NANO_TO_SECS)); - buf.append(insight); - stream.cancel(DEADLINE_EXCEEDED.augmentDescription(buf.toString())); + if (stream != null) { + InsightBuilder insight = new InsightBuilder(); + stream.appendTimeoutInsight(insight); + buf.append(" "); + buf.append(insight); + } + return DEADLINE_EXCEEDED.withDescription(buf.toString()); } } - private ScheduledFuture startDeadlineTimer(Deadline deadline) { - long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS); - return deadlineCancellationExecutor.schedule( - new LogExceptionRunnable( - new DeadlineTimer(remainingNanos)), remainingNanos, TimeUnit.NANOSECONDS); - } - @Nullable private Deadline effectiveDeadline() { // Call options and context are immutable, so we don't need to cache the deadline. @@ -440,16 +437,6 @@ private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline dea return deadline0.minimum(deadline1); } - private static boolean isFirstMin(@Nullable Deadline deadline0, @Nullable Deadline deadline1) { - if (deadline0 == null) { - return false; - } - if (deadline1 == null) { - return true; - } - return deadline0.isBefore(deadline1); - } - @Override public void request(int numMessages) { try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.request")) { @@ -493,7 +480,10 @@ private void cancelInternal(@Nullable String message, @Nullable Throwable cause) stream.cancel(status); } } finally { - removeContextListenerAndCancelDeadlineFuture(); + // start() might not have been called + if (cancellationHandler != null) { + cancellationHandler.tearDown(); + } } } @@ -699,10 +689,7 @@ private void closedInternal( // description. Since our timer may be delayed in firing, we double-check the deadline and // turn the failure into the likely more helpful DEADLINE_EXCEEDED status. if (deadline.isExpired()) { - InsightBuilder insight = new InsightBuilder(); - stream.appendTimeoutInsight(insight); - status = DEADLINE_EXCEEDED.augmentDescription( - "ClientCall was cancelled at or after deadline. " + insight); + status = cancellationHandler.formatDeadlineExceededStatus(); // Replace trailers to prevent mixing sources of status and trailers. trailers = new Metadata(); } @@ -725,6 +712,7 @@ public void runInContext() { } private void runInternal() { + cancellationHandler.tearDown(); Status status = savedStatus; Metadata trailers = savedTrailers; if (exceptionStatus != null) { @@ -737,11 +725,9 @@ private void runInternal() { // Replace trailers to prevent mixing sources of status and trailers. trailers = new Metadata(); } - cancelListenersShouldBeRemoved = true; try { closeObserver(observer, status, trailers); } finally { - removeContextListenerAndCancelDeadlineFuture(); channelCallsTracer.reportCallEnded(status.isOk()); } } diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java index 34011cd844d..66d626ec2b6 100644 --- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java @@ -926,7 +926,7 @@ public void expiredDeadlineCancelsStream_CallOptions() { verify(stream, times(1)).cancel(statusCaptor.capture()); assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode()); assertThat(statusCaptor.getValue().getDescription()) - .matches("deadline exceeded after [0-9]+\\.[0-9]+s. " + .matches("CallOptions deadline exceeded after [0-9]+\\.[0-9]+s. " + "Name resolution delay 0.000000000 seconds. \\[remote_addr=127\\.0\\.0\\.1:443\\]"); } @@ -954,7 +954,24 @@ public void expiredDeadlineCancelsStream_Context() { verify(stream, times(1)).cancel(statusCaptor.capture()); assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode()); - assertThat(statusCaptor.getValue().getDescription()).isEqualTo("context timed out"); + assertThat(statusCaptor.getValue().getDescription()) + .matches("Context deadline exceeded after [0-9]+\\.[0-9]+s. " + + "Name resolution delay 0.000000000 seconds. \\[remote_addr=127\\.0\\.0\\.1:443\\]"); + } + + @Test + public void cancelWithoutStart() { + fakeClock.forwardTime(System.nanoTime(), TimeUnit.NANOSECONDS); + + ClientCallImpl call = new ClientCallImpl<>( + method, + MoreExecutors.directExecutor(), + baseCallOptions.withDeadline(Deadline.after(1, TimeUnit.SECONDS)), + clientStreamProvider, + deadlineCancellationExecutor, + channelCallTracer, configSelector); + // Nothing happens as a result, but it shouldn't throw + call.cancel("canceled", null); } @Test diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index d450ece7bcf..3efd576abe6 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -1190,7 +1190,7 @@ public void deadlineExceeded() throws Exception { assertTrue(desc, // There is a race between client and server-side deadline expiration. // If client expires first, it'd generate this message - Pattern.matches("deadline exceeded after .*s. \\[.*\\]", desc) + Pattern.matches("CallOptions deadline exceeded after .*s. \\[.*\\]", desc) // If server expires first, it'd reset the stream and client would generate a different // message || desc.startsWith("ClientCall was cancelled at or after deadline."));