From 2735c96a3fe0736a3a180f8e6394edeb40d7e1ef Mon Sep 17 00:00:00 2001 From: Ikhun Um Date: Fri, 18 Jun 2021 14:41:54 +0900 Subject: [PATCH] Complete `DefaultClientRequestContext.whenIntialized()` after fully initializing a context (#3636) Motivation: `LazyDynamicEndpointGroupTest.emptyEndpoint()` sometimes failed in CI builds. #3381 It expects to fail the test with `EmptyEndpointException`, however, it fails with `ClosedStreamException`. After digging the issue, I find out that there is a race in `whenInitialized`. If the `acquiredEventLoop.execute()` is executed immediately and completed earlier than returning the method, https://github.com/line/armeria/blob/d4880fe12690d2dafd2c5e7fa9f24c3b24837a00/core/src/main/java/com/linecorp/armeria/client/DefaultClientRequestContext.java#L307-L309 the callbacks of `ctx.whenInitialized()` will be invoked before a `RequestLog` is completed. https://github.com/line/armeria/blob/207c5e038f59802dca769936a50e219a5fe308ea/grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/ArmeriaClientCall.java#L337-L348 As the `req` is closed already, the `req.write()` would be failed with `ClosedStreamException`. Modifications: - Complete `DefaultClientRequestContext.whenIntialized()` after `initContextAndExecuteWithFallback` of `ClientUtil` Result: - You no longer see `ClosedStreamException` when an `EndpointGroup` is empty. - Fixes #3381 --- .../client/DefaultClientRequestContext.java | 42 +++++++++---------- .../armeria/internal/client/ClientUtil.java | 16 +++++-- 2 files changed, 31 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/client/DefaultClientRequestContext.java b/core/src/main/java/com/linecorp/armeria/client/DefaultClientRequestContext.java index 134b562a9b5c..e42a8d6a0ab8 100644 --- a/core/src/main/java/com/linecorp/armeria/client/DefaultClientRequestContext.java +++ b/core/src/main/java/com/linecorp/armeria/client/DefaultClientRequestContext.java @@ -281,32 +281,13 @@ private CompletableFuture initEndpointGroup(EndpointGroup endpointGroup }).thenCompose(Function.identity()); } - private CompletableFuture initFuture(boolean success, @Nullable EventLoop acquiredEventLoop) { - CompletableFuture whenInitialized = this.whenInitialized; - if (whenInitialized == null) { - final CompletableFuture future; - if (acquiredEventLoop == null) { - future = UnmodifiableFuture.completedFuture(success); - } else { - future = CompletableFuture.supplyAsync(() -> success, acquiredEventLoop); - } - if (whenInitializedUpdater.compareAndSet(this, null, future)) { - return future; - } - whenInitialized = this.whenInitialized; - } - - final CompletableFuture finalWhenInitialized = whenInitialized; - if (finalWhenInitialized.isDone()) { - return finalWhenInitialized; - } - + private static CompletableFuture initFuture(boolean success, + @Nullable EventLoop acquiredEventLoop) { if (acquiredEventLoop == null) { - finalWhenInitialized.complete(success); + return UnmodifiableFuture.completedFuture(success); } else { - acquiredEventLoop.execute(() -> finalWhenInitialized.complete(success)); + return CompletableFuture.supplyAsync(() -> success, acquiredEventLoop); } - return finalWhenInitialized; } /** @@ -329,6 +310,21 @@ public CompletableFuture whenInitialized() { } } + /** + * Completes the {@link #whenInitialized()} with the specified value. + */ + public void finishInitialization(boolean success) { + final CompletableFuture whenInitialized = this.whenInitialized; + if (whenInitialized != null) { + whenInitialized.complete(success); + } else { + if (!whenInitializedUpdater.compareAndSet(this, null, + UnmodifiableFuture.completedFuture(success))) { + this.whenInitialized.complete(success); + } + } + } + private void updateEndpoint(@Nullable Endpoint endpoint) { this.endpoint = endpoint; autoFillSchemeAndAuthority(); diff --git a/core/src/main/java/com/linecorp/armeria/internal/client/ClientUtil.java b/core/src/main/java/com/linecorp/armeria/internal/client/ClientUtil.java index c855fbf5395f..f957c396ea0d 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/client/ClientUtil.java +++ b/core/src/main/java/com/linecorp/armeria/internal/client/ClientUtil.java @@ -52,12 +52,14 @@ O initContextAndExecuteWithFallback( requireNonNull(futureConverter, "futureConverter"); requireNonNull(errorResponseFactory, "errorResponseFactory"); + boolean initialized = false; + boolean success = false; try { endpointGroup = mapEndpoint(ctx, endpointGroup); final CompletableFuture initFuture = ctx.init(endpointGroup); - if (initFuture.isDone()) { + initialized = initFuture.isDone(); + if (initialized) { // Initialization has been done immediately. - final boolean success; try { success = initFuture.get(); } catch (Exception e) { @@ -66,22 +68,28 @@ O initContextAndExecuteWithFallback( return initContextAndExecuteWithFallback(delegate, ctx, errorResponseFactory, success); } else { - return futureConverter.apply(initFuture.handle((success, cause) -> { + return futureConverter.apply(initFuture.handle((success0, cause) -> { try { if (cause != null) { throw UnprocessedRequestException.of(Exceptions.peel(cause)); } - return initContextAndExecuteWithFallback(delegate, ctx, errorResponseFactory, success); + return initContextAndExecuteWithFallback(delegate, ctx, errorResponseFactory, success0); } catch (Throwable t) { fail(ctx, t); return errorResponseFactory.apply(ctx, t); + } finally { + ctx.finishInitialization(success0); } })); } } catch (Throwable cause) { fail(ctx, cause); return errorResponseFactory.apply(ctx, cause); + } finally { + if (initialized) { + ctx.finishInitialization(success); + } } }