From 05a7ca1736088da83470c722a87668aa53896890 Mon Sep 17 00:00:00 2001 From: jrhee17 <guins_j@guins.org> Date: Wed, 18 Dec 2024 23:09:07 +0900 Subject: [PATCH] poc --- .../armeria/client/ClientOptions.java | 4 + .../armeria/client/ClientPreprocessor.java | 32 ++++++++ .../armeria/client/ClientPreprocessors.java | 82 +++++++++++++++++++ .../client/DecoratingClientPreprocessor.java | 28 +++++++ .../armeria/client/DefaultWebClient.java | 19 +++-- .../armeria/client/HttpPreprocessor.java | 24 ++++++ .../armeria/client/RedirectingClient.java | 5 +- .../armeria/client/RpcPreprocessor.java | 24 ++++++ .../client/TailClientPreprocessor.java | 70 ++++++++++++++++ .../linecorp/armeria/client/UserClient.java | 41 ++-------- .../armeria/client/retry/RetryingClient.java | 8 +- .../client/retry/RetryingRpcClient.java | 6 +- .../armeria/internal/client/ClientUtil.java | 22 ++--- .../client/DefaultClientRequestContext.java | 42 +++++++++- .../client/PreprocessorAttributeKeys.java | 45 ++++++++++ .../armeria/client/RequestOptionsTest.java | 4 + .../internal/client/grpc/ArmeriaChannel.java | 29 ++++++- .../client/grpc/ArmeriaClientCall.java | 34 ++++---- .../client/auth/oauth2/OAuth2Client.java | 2 +- .../client/thrift/DefaultTHttpClient.java | 26 +++++- 20 files changed, 461 insertions(+), 86 deletions(-) create mode 100644 core/src/main/java/com/linecorp/armeria/client/ClientPreprocessor.java create mode 100644 core/src/main/java/com/linecorp/armeria/client/ClientPreprocessors.java create mode 100644 core/src/main/java/com/linecorp/armeria/client/DecoratingClientPreprocessor.java create mode 100644 core/src/main/java/com/linecorp/armeria/client/HttpPreprocessor.java create mode 100644 core/src/main/java/com/linecorp/armeria/client/RpcPreprocessor.java create mode 100644 core/src/main/java/com/linecorp/armeria/client/TailClientPreprocessor.java create mode 100644 core/src/main/java/com/linecorp/armeria/internal/client/PreprocessorAttributeKeys.java diff --git a/core/src/main/java/com/linecorp/armeria/client/ClientOptions.java b/core/src/main/java/com/linecorp/armeria/client/ClientOptions.java index 2975cff4eb4..3c80911fac6 100644 --- a/core/src/main/java/com/linecorp/armeria/client/ClientOptions.java +++ b/core/src/main/java/com/linecorp/armeria/client/ClientOptions.java @@ -410,6 +410,10 @@ public ResponseTimeoutMode responseTimeoutMode() { return get(RESPONSE_TIMEOUT_MODE); } + public ClientPreprocessors clientPreprocessors() { + return ClientPreprocessors.of(); + } + /** * Returns a new {@link ClientOptionsBuilder} created from this {@link ClientOptions}. */ diff --git a/core/src/main/java/com/linecorp/armeria/client/ClientPreprocessor.java b/core/src/main/java/com/linecorp/armeria/client/ClientPreprocessor.java new file mode 100644 index 00000000000..9894f986702 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/ClientPreprocessor.java @@ -0,0 +1,32 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client; + +import com.linecorp.armeria.common.Request; +import com.linecorp.armeria.common.Response; + +/** + * TBU. + */ +@FunctionalInterface +public interface ClientPreprocessor<I extends Request, O extends Response> { + + /** + * TBU. + */ + O execute(ClientRequestContext ctx, I req, RequestOptions requestOptions); +} diff --git a/core/src/main/java/com/linecorp/armeria/client/ClientPreprocessors.java b/core/src/main/java/com/linecorp/armeria/client/ClientPreprocessors.java new file mode 100644 index 00000000000..501b732cdda --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/ClientPreprocessors.java @@ -0,0 +1,82 @@ +/* + * Copyright 2016 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client; + +import java.util.function.Function; + +import com.google.common.collect.ImmutableList; + +/** + * A set of {@link Function}s that transforms a {@link Client} into another. + */ +public final class ClientPreprocessors { + + private static final ClientPreprocessors NONE = new ClientPreprocessors(Function.identity(), Function.identity()); + + /** + * Returns an empty {@link ClientPreprocessors} which does not decorate a {@link Client}. + */ + public static ClientPreprocessors of() { + return NONE; + } + + /** + * Creates a new instance from a single decorator {@link Function}. + * + * @param decorator the {@link Function} that transforms an {@link HttpClient} to another + */ + public static ClientPreprocessors of( + Function<? super HttpPreprocessor, ? extends HttpPreprocessor> decorator) { + return new ClientPreprocessors(decorator, Function.identity()); + } + + /** + * Creates a new instance from a single decorator {@link Function}. + * + * @param decorator the {@link Function} that transforms an {@link RpcClient} to another + */ + public static ClientPreprocessors ofRpc(Function<? super RpcPreprocessor, ? extends RpcPreprocessor> decorator) { + return new ClientPreprocessors(Function.identity(), decorator); + } + + private final Function<? super HttpPreprocessor, ? extends HttpPreprocessor> preprocessorFn; + private final Function<? super RpcPreprocessor, ? extends RpcPreprocessor> rpcPreprocessorFn; + + ClientPreprocessors(Function<? super HttpPreprocessor, ? extends HttpPreprocessor> preprocessorFn, + Function<? super RpcPreprocessor, ? extends RpcPreprocessor> rpcPreprocessorFn) { + this.preprocessorFn = preprocessorFn; + this.rpcPreprocessorFn = rpcPreprocessorFn; + } + + /** + * Decorates the specified {@link HttpPreprocessor} using the decorator. + * + * @param preprocessor the {@link HttpPreprocessor} being decorated + */ + public HttpPreprocessor decorate(HttpPreprocessor preprocessor) { + return preprocessorFn.apply(preprocessor); + } + + /** + * Decorates the specified {@link RpcPreprocessor} using the decorator. + * + * @param rpcPreprocessor the {@link RpcPreprocessor} being decorated + */ + public RpcPreprocessor rpcDecorate(RpcPreprocessor rpcPreprocessor) { + return rpcPreprocessorFn.apply(rpcPreprocessor); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/client/DecoratingClientPreprocessor.java b/core/src/main/java/com/linecorp/armeria/client/DecoratingClientPreprocessor.java new file mode 100644 index 00000000000..4e89d6dc12b --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/DecoratingClientPreprocessor.java @@ -0,0 +1,28 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client; + +import com.linecorp.armeria.common.Request; +import com.linecorp.armeria.common.Response; +import com.linecorp.armeria.common.annotation.UnstableApi; + +@UnstableApi +@FunctionalInterface +public interface DecoratingClientPreprocessor<I extends Request, O extends Response> { + + O execute(ClientPreprocessor<I, O> delegate, ClientRequestContext ctx, I req, RequestOptions requestOptions); +} diff --git a/core/src/main/java/com/linecorp/armeria/client/DefaultWebClient.java b/core/src/main/java/com/linecorp/armeria/client/DefaultWebClient.java index e812281120e..b221b334558 100644 --- a/core/src/main/java/com/linecorp/armeria/client/DefaultWebClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/DefaultWebClient.java @@ -29,6 +29,8 @@ import com.linecorp.armeria.common.Scheme; import com.linecorp.armeria.common.SessionProtocol; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.internal.client.DefaultClientRequestContext; +import com.linecorp.armeria.internal.client.PreprocessorAttributeKeys; import io.micrometer.core.instrument.MeterRegistry; @@ -113,12 +115,17 @@ public HttpResponse execute(HttpRequest req, RequestOptions requestOptions) { newReq = req.withHeaders(req.headers().toBuilder().path(newPath)); } - return execute(protocol, - endpointGroup, - newReq.method(), - reqTarget, - newReq, - requestOptions); + final RequestOptions reqOptions = requestOptions + .toBuilder() + .attr(PreprocessorAttributeKeys.FUTURE_CONVERTER_KEY, futureConverter()) + .attr(PreprocessorAttributeKeys.ERROR_RESPONSE_FACTORY_KEY, errorResponseFactory()) + .attr(PreprocessorAttributeKeys.ENDPOINT_GROUP_KEY, endpointGroup) + .build(); + final DefaultClientRequestContext ctx = new DefaultClientRequestContext( + protocol, newReq, null, reqTarget, + reqOptions, options()); + return options().clientPreprocessors().decorate(TailClientPreprocessor.of(unwrap())) + .execute(ctx, newReq, reqOptions); } private static HttpResponse abortRequestAndReturnFailureResponse( diff --git a/core/src/main/java/com/linecorp/armeria/client/HttpPreprocessor.java b/core/src/main/java/com/linecorp/armeria/client/HttpPreprocessor.java new file mode 100644 index 00000000000..36115446b86 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/HttpPreprocessor.java @@ -0,0 +1,24 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client; + +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpResponse; + +@FunctionalInterface +public interface HttpPreprocessor extends ClientPreprocessor<HttpRequest, HttpResponse> { +} diff --git a/core/src/main/java/com/linecorp/armeria/client/RedirectingClient.java b/core/src/main/java/com/linecorp/armeria/client/RedirectingClient.java index d2481315513..b360e1fbf7a 100644 --- a/core/src/main/java/com/linecorp/armeria/client/RedirectingClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/RedirectingClient.java @@ -192,8 +192,11 @@ private void execute0(ClientRequestContext ctx, RedirectContext redirectCtx, return; } + final HttpRequest ctxReq = derivedCtx.request(); + assert ctxReq != null; final HttpResponse response = executeWithFallback(unwrap(), derivedCtx, - (context, cause) -> HttpResponse.ofFailure(cause)); + (context, cause) -> HttpResponse.ofFailure(cause), + ctxReq); derivedCtx.log().whenAvailable(RequestLogProperty.RESPONSE_HEADERS).thenAccept(log -> { if (log.isAvailable(RequestLogProperty.RESPONSE_CAUSE)) { final Throwable cause = log.responseCause(); diff --git a/core/src/main/java/com/linecorp/armeria/client/RpcPreprocessor.java b/core/src/main/java/com/linecorp/armeria/client/RpcPreprocessor.java new file mode 100644 index 00000000000..356cea4fd72 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/RpcPreprocessor.java @@ -0,0 +1,24 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client; + +import com.linecorp.armeria.common.RpcRequest; +import com.linecorp.armeria.common.RpcResponse; + +@FunctionalInterface +public interface RpcPreprocessor extends ClientPreprocessor<RpcRequest, RpcResponse> { +} diff --git a/core/src/main/java/com/linecorp/armeria/client/TailClientPreprocessor.java b/core/src/main/java/com/linecorp/armeria/client/TailClientPreprocessor.java new file mode 100644 index 00000000000..51bc259413c --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/TailClientPreprocessor.java @@ -0,0 +1,70 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client; + +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; +import java.util.function.Function; + +import com.linecorp.armeria.client.endpoint.EndpointGroup; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.Request; +import com.linecorp.armeria.common.Response; +import com.linecorp.armeria.common.RpcRequest; +import com.linecorp.armeria.common.RpcResponse; +import com.linecorp.armeria.internal.client.ClientRequestContextExtension; +import com.linecorp.armeria.internal.client.ClientUtil; +import com.linecorp.armeria.internal.client.PreprocessorAttributeKeys; + +public final class TailClientPreprocessor<I extends Request, O extends Response> implements ClientPreprocessor<I, O> { + + private final Client<I, O> delegate; + + private TailClientPreprocessor(Client<I, O> delegate) { + this.delegate = delegate; + } + + public static HttpPreprocessor of(HttpClient httpClient) { + final TailClientPreprocessor<HttpRequest, HttpResponse> tail = new TailClientPreprocessor<>(httpClient); + return tail::execute; + } + + public static RpcPreprocessor ofRpc(RpcClient rpcClient) { + final TailClientPreprocessor<RpcRequest, RpcResponse> tail = new TailClientPreprocessor<>(rpcClient); + return tail::execute; + } + + @Override + public O execute(ClientRequestContext ctx, I req, RequestOptions requestOptions) { + final Function<CompletableFuture<O>, O> futureConverter = + (Function<CompletableFuture<O>, O>) requestOptions.attrs().get( + PreprocessorAttributeKeys.FUTURE_CONVERTER_KEY); + final BiFunction<ClientRequestContext, Throwable, O> errorResponseFactory = + (BiFunction<ClientRequestContext, Throwable, O>) requestOptions.attrs().get( + PreprocessorAttributeKeys.ERROR_RESPONSE_FACTORY_KEY); + final EndpointGroup endpointGroup = (EndpointGroup) requestOptions.attrs().get( + PreprocessorAttributeKeys.ENDPOINT_GROUP_KEY); + assert futureConverter != null; + assert errorResponseFactory != null; + assert endpointGroup != null; + final ClientRequestContextExtension ctxExt = ctx.as(ClientRequestContextExtension.class); + assert ctxExt != null; + return ClientUtil.initContextAndExecuteWithFallback(delegate, ctxExt, endpointGroup, + futureConverter, errorResponseFactory, req); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/client/UserClient.java b/core/src/main/java/com/linecorp/armeria/client/UserClient.java index 28b38fa24e3..c2106673395 100644 --- a/core/src/main/java/com/linecorp/armeria/client/UserClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/UserClient.java @@ -15,8 +15,6 @@ */ package com.linecorp.armeria.client; -import static com.linecorp.armeria.internal.client.ClientUtil.initContextAndExecuteWithFallback; - import java.net.URI; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -28,19 +26,15 @@ import com.linecorp.armeria.client.endpoint.EndpointGroup; import com.linecorp.armeria.common.HttpMethod; -import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.Request; import com.linecorp.armeria.common.RequestId; import com.linecorp.armeria.common.RequestTarget; import com.linecorp.armeria.common.Response; -import com.linecorp.armeria.common.RpcRequest; import com.linecorp.armeria.common.RpcResponse; import com.linecorp.armeria.common.Scheme; import com.linecorp.armeria.common.SessionProtocol; import com.linecorp.armeria.common.util.AbstractUnwrappable; -import com.linecorp.armeria.common.util.SystemInfo; -import com.linecorp.armeria.internal.client.DefaultClientRequestContext; import io.micrometer.core.instrument.MeterRegistry; @@ -170,37 +164,14 @@ protected final O execute(SessionProtocol protocol, EndpointGroup endpointGroup, */ protected final O execute(SessionProtocol protocol, EndpointGroup endpointGroup, HttpMethod method, RequestTarget reqTarget, I req, RequestOptions requestOptions) { + throw new UnsupportedOperationException(); + } - final HttpRequest httpReq; - final RpcRequest rpcReq; - final RequestId id = nextRequestId(); - - if (req instanceof HttpRequest) { - httpReq = (HttpRequest) req; - rpcReq = null; - } else { - httpReq = null; - rpcReq = (RpcRequest) req; - } - - final DefaultClientRequestContext ctx = new DefaultClientRequestContext( - meterRegistry, protocol, id, method, reqTarget, options(), httpReq, rpcReq, - requestOptions, System.nanoTime(), SystemInfo.currentTimeMicros()); - - return initContextAndExecuteWithFallback(unwrap(), ctx, endpointGroup, - futureConverter, errorResponseFactory); + protected Function<CompletableFuture<O>, O> futureConverter() { + return futureConverter; } - private RequestId nextRequestId() { - final RequestId id = options().requestIdGenerator().get(); - if (id == null) { - if (!warnedNullRequestId) { - warnedNullRequestId = true; - logger.warn("requestIdGenerator.get() returned null; using RequestId.random()"); - } - return RequestId.random(); - } else { - return id; - } + protected BiFunction<ClientRequestContext, Throwable, O> errorResponseFactory() { + return errorResponseFactory; } } diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java index ec0454934e0..0e41f1bb843 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java @@ -318,12 +318,16 @@ private void doExecute0(ClientRequestContext ctx, HttpRequestDuplicator rootReqD // clear the pending throwable to retry endpoint selection ClientPendingThrowableUtil.removePendingThrowable(derivedCtx); // if the endpoint hasn't been selected, try to initialize the ctx with a new endpoint/event loop + final HttpRequest ctxReq = ctxExtension.request(); + assert ctxReq != null; response = initContextAndExecuteWithFallback( unwrap(), ctxExtension, endpointGroup, HttpResponse::of, - (context, cause) -> HttpResponse.ofFailure(cause)); + (context, cause) -> HttpResponse.ofFailure(cause), ctxReq); } else { + final HttpRequest ctxReq = derivedCtx.request(); + assert ctxReq != null; response = executeWithFallback(unwrap(), derivedCtx, - (context, cause) -> HttpResponse.ofFailure(cause)); + (context, cause) -> HttpResponse.ofFailure(cause), ctxReq); } final RetryConfig<HttpResponse> config = mappedRetryConfig(ctx); if (!ctx.exchangeType().isResponseStreaming() || config.requiresResponseTrailers()) { diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java index 9968568a458..37489924b90 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java @@ -163,7 +163,7 @@ private void doExecute0(ClientRequestContext ctx, RpcRequest req, return; } - final ClientRequestContext derivedCtx = newDerivedContext(ctx, null, req, initialAttempt); + final ClientRequestContext derivedCtx = newDerivedContext(ctx, ctx.request(), req, initialAttempt); if (!initialAttempt) { derivedCtx.mutateAdditionalRequestHeaders( @@ -180,10 +180,10 @@ private void doExecute0(ClientRequestContext ctx, RpcRequest req, ClientPendingThrowableUtil.removePendingThrowable(derivedCtx); // if the endpoint hasn't been selected, try to initialize the ctx with a new endpoint/event loop res = initContextAndExecuteWithFallback(unwrap(), ctxExtension, endpointGroup, RpcResponse::from, - (context, cause) -> RpcResponse.ofFailure(cause)); + (context, cause) -> RpcResponse.ofFailure(cause), req); } else { res = executeWithFallback(unwrap(), derivedCtx, - (context, cause) -> RpcResponse.ofFailure(cause)); + (context, cause) -> RpcResponse.ofFailure(cause), req); } final RetryConfig<RpcResponse> retryConfig = mappedRetryConfig(ctx); diff --git a/core/src/main/java/com/linecorp/armeria/internal/client/ClientUtil.java b/core/src/main/java/com/linecorp/armeria/internal/client/ClientUtil.java index 1c4ff86b472..0c97105998a 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/client/ClientUtil.java +++ b/core/src/main/java/com/linecorp/armeria/internal/client/ClientUtil.java @@ -56,7 +56,8 @@ O initContextAndExecuteWithFallback( ClientRequestContextExtension ctx, EndpointGroup endpointGroup, Function<CompletableFuture<O>, O> futureConverter, - BiFunction<ClientRequestContext, Throwable, O> errorResponseFactory) { + BiFunction<ClientRequestContext, Throwable, O> errorResponseFactory, + I req) { requireNonNull(delegate, "delegate"); requireNonNull(ctx, "ctx"); @@ -77,7 +78,7 @@ O initContextAndExecuteWithFallback( throw UnprocessedRequestException.of(Exceptions.peel(e)); } - return initContextAndExecuteWithFallback(delegate, ctx, errorResponseFactory, success); + return initContextAndExecuteWithFallback(delegate, ctx, errorResponseFactory, success, req); } else { return futureConverter.apply(initFuture.handle((success0, cause) -> { try { @@ -85,7 +86,8 @@ O initContextAndExecuteWithFallback( throw UnprocessedRequestException.of(Exceptions.peel(cause)); } - return initContextAndExecuteWithFallback(delegate, ctx, errorResponseFactory, success0); + return initContextAndExecuteWithFallback( + delegate, ctx, errorResponseFactory, success0, req); } catch (Throwable t) { fail(ctx, t); return errorResponseFactory.apply(ctx, t); @@ -107,11 +109,11 @@ O initContextAndExecuteWithFallback( private static <I extends Request, O extends Response, U extends Client<I, O>> O initContextAndExecuteWithFallback( U delegate, ClientRequestContextExtension ctx, - BiFunction<ClientRequestContext, Throwable, O> errorResponseFactory, boolean succeeded) + BiFunction<ClientRequestContext, Throwable, O> errorResponseFactory, boolean succeeded, I req) throws Exception { if (succeeded) { - return pushAndExecute(delegate, ctx); + return pushAndExecute(delegate, ctx, req); } else { final Throwable cause = ctx.log().partial().requestCause(); assert cause != null; @@ -123,7 +125,7 @@ O initContextAndExecuteWithFallback( // See `init()` and `failEarly()` in `DefaultClientRequestContext`. // Call the decorator chain anyway so that the request is seen by the decorators. - final O res = pushAndExecute(delegate, ctx); + final O res = pushAndExecute(delegate, ctx, req); // We will use the fallback response which is created from the exception // raised in ctx.init(), so the response returned can be aborted. @@ -138,14 +140,14 @@ O initContextAndExecuteWithFallback( public static <I extends Request, O extends Response, U extends Client<I, O>> O executeWithFallback(U delegate, ClientRequestContext ctx, - BiFunction<ClientRequestContext, Throwable, O> errorResponseFactory) { + BiFunction<ClientRequestContext, Throwable, O> errorResponseFactory, I req) { requireNonNull(delegate, "delegate"); requireNonNull(ctx, "ctx"); requireNonNull(errorResponseFactory, "errorResponseFactory"); try { - return pushAndExecute(delegate, ctx); + return pushAndExecute(delegate, ctx, req); } catch (Throwable cause) { fail(ctx, cause); return errorResponseFactory.apply(ctx, cause); @@ -153,9 +155,7 @@ O executeWithFallback(U delegate, ClientRequestContext ctx, } private static <I extends Request, O extends Response, U extends Client<I, O>> - O pushAndExecute(U delegate, ClientRequestContext ctx) throws Exception { - @SuppressWarnings("unchecked") - final I req = (I) firstNonNull(ctx.request(), ctx.rpcRequest()); + O pushAndExecute(U delegate, ClientRequestContext ctx, I req) throws Exception { try (SafeCloseable ignored = ctx.push()) { return delegate.execute(ctx, req); } diff --git a/core/src/main/java/com/linecorp/armeria/internal/client/DefaultClientRequestContext.java b/core/src/main/java/com/linecorp/armeria/internal/client/DefaultClientRequestContext.java index 4ad2e2bdeee..1d830ab0b9e 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/client/DefaultClientRequestContext.java +++ b/core/src/main/java/com/linecorp/armeria/internal/client/DefaultClientRequestContext.java @@ -36,6 +36,9 @@ import javax.net.ssl.SSLSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.linecorp.armeria.client.ClientOptions; import com.linecorp.armeria.client.ClientRequestContext; import com.linecorp.armeria.client.Endpoint; @@ -68,6 +71,7 @@ import com.linecorp.armeria.common.logging.RequestLogProperty; import com.linecorp.armeria.common.util.ReleasableHolder; import com.linecorp.armeria.common.util.SafeCloseable; +import com.linecorp.armeria.common.util.SystemInfo; import com.linecorp.armeria.common.util.TextFormatter; import com.linecorp.armeria.common.util.TimeoutMode; import com.linecorp.armeria.common.util.UnmodifiableFuture; @@ -99,6 +103,8 @@ public final class DefaultClientRequestContext extends NonWrappingRequestContext implements ClientRequestContextExtension { + private static final Logger logger = LoggerFactory.getLogger(DefaultClientRequestContext.class); + private static final AtomicReferenceFieldUpdater<DefaultClientRequestContext, HttpHeaders> additionalRequestHeadersUpdater = AtomicReferenceFieldUpdater.newUpdater( DefaultClientRequestContext.class, HttpHeaders.class, "additionalRequestHeaders"); @@ -124,6 +130,7 @@ private static SessionProtocol desiredSessionProtocol(SessionProtocol protocol, private static final short STR_CHANNEL_AVAILABILITY = 1; private static final short STR_PARENT_LOG_AVAILABILITY = 1 << 1; + private static boolean warnedNullRequestId; private boolean initialized; @Nullable @@ -170,6 +177,16 @@ private static SessionProtocol desiredSessionProtocol(SessionProtocol protocol, private volatile CompletableFuture<Boolean> whenInitialized; private final ResponseTimeoutMode responseTimeoutMode; + private final RequestOptions requestOptions; + + public DefaultClientRequestContext(SessionProtocol sessionProtocol, HttpRequest httpRequest, + @Nullable RpcRequest rpcRequest, RequestTarget requestTarget, + RequestOptions requestOptions, ClientOptions clientOptions) { + this(null, clientOptions.factory().meterRegistry(), + sessionProtocol, nextRequestId(clientOptions), httpRequest.method(), requestTarget, + clientOptions, httpRequest, rpcRequest, requestOptions, serviceRequestContext(), + null, System.nanoTime(), SystemInfo.currentTimeMicros()); + } /** * Creates a new instance. Note that {@link #init(EndpointGroup)} method must be invoked to finish @@ -191,8 +208,8 @@ public DefaultClientRequestContext( ClientOptions options, @Nullable HttpRequest req, @Nullable RpcRequest rpcReq, RequestOptions requestOptions, CancellationScheduler responseCancellationScheduler, long requestStartTimeNanos, long requestStartTimeMicros) { - this(eventLoop, meterRegistry, sessionProtocol, - id, method, reqTarget, options, req, rpcReq, requestOptions, serviceRequestContext(), + this(eventLoop, meterRegistry, sessionProtocol, id, method, reqTarget, + options, req, rpcReq, requestOptions, serviceRequestContext(), requireNonNull(responseCancellationScheduler, "responseCancellationScheduler"), requestStartTimeNanos, requestStartTimeMicros); } @@ -213,7 +230,7 @@ id, method, reqTarget, options, req, rpcReq, requestOptions, serviceRequestConte public DefaultClientRequestContext( MeterRegistry meterRegistry, SessionProtocol sessionProtocol, RequestId id, HttpMethod method, RequestTarget reqTarget, - ClientOptions options, @Nullable HttpRequest req, @Nullable RpcRequest rpcReq, + ClientOptions options, HttpRequest req, @Nullable RpcRequest rpcReq, RequestOptions requestOptions, long requestStartTimeNanos, long requestStartTimeMicros) { this(null, meterRegistry, sessionProtocol, @@ -237,6 +254,7 @@ private DefaultClientRequestContext( this.eventLoop = eventLoop; this.options = requireNonNull(options, "options"); this.root = root; + this.requestOptions = requestOptions; log = RequestLog.builder(this); log.startRequest(requestStartTimeNanos, requestStartTimeMicros); @@ -533,6 +551,7 @@ private DefaultClientRequestContext(DefaultClientRequestContext ctx, // So we don't check the nullness of rpcRequest unlike request. // See https://github.com/line/armeria/pull/3251 and https://github.com/line/armeria/issues/3248. + requestOptions = ctx.requestOptions(); options = ctx.options(); root = ctx.root(); @@ -1074,6 +1093,10 @@ public ResponseTimeoutMode responseTimeoutMode() { return responseTimeoutMode; } + public RequestOptions requestOptions() { + return requestOptions; + } + private static ResponseTimeoutMode responseTimeoutMode(ClientOptions options, RequestOptions requestOptions) { final ResponseTimeoutMode requestOptionTimeoutMode = requestOptions.responseTimeoutMode(); @@ -1082,4 +1105,17 @@ private static ResponseTimeoutMode responseTimeoutMode(ClientOptions options, } return options.responseTimeoutMode(); } + + private static RequestId nextRequestId(ClientOptions options) { + final RequestId id = options.requestIdGenerator().get(); + if (id == null) { + if (!warnedNullRequestId) { + warnedNullRequestId = true; + logger.warn("requestIdGenerator.get() returned null; using RequestId.random()"); + } + return RequestId.random(); + } else { + return id; + } + } } diff --git a/core/src/main/java/com/linecorp/armeria/internal/client/PreprocessorAttributeKeys.java b/core/src/main/java/com/linecorp/armeria/internal/client/PreprocessorAttributeKeys.java new file mode 100644 index 00000000000..6231e56dbed --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/internal/client/PreprocessorAttributeKeys.java @@ -0,0 +1,45 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.internal.client; + +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; +import java.util.function.Function; + +import com.google.common.collect.ImmutableSet; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.endpoint.EndpointGroup; + +import io.netty.util.AttributeKey; + +public final class PreprocessorAttributeKeys { + + public static final AttributeKey<Object> FUTURE_CONVERTER_KEY = + AttributeKey.valueOf(PreprocessorAttributeKeys.class, "futureConverter"); + public static final AttributeKey<BiFunction<ClientRequestContext, Throwable, ?>> ERROR_RESPONSE_FACTORY_KEY = + AttributeKey.valueOf(PreprocessorAttributeKeys.class, "errorResponseFactory"); + public static final AttributeKey<EndpointGroup> ENDPOINT_GROUP_KEY = + AttributeKey.valueOf(PreprocessorAttributeKeys.class, "endpointGroup"); + + public static Set<AttributeKey<?>> keys() { + return ImmutableSet.of(FUTURE_CONVERTER_KEY, ERROR_RESPONSE_FACTORY_KEY, ENDPOINT_GROUP_KEY); + } + + private PreprocessorAttributeKeys() {} +} diff --git a/core/src/test/java/com/linecorp/armeria/client/RequestOptionsTest.java b/core/src/test/java/com/linecorp/armeria/client/RequestOptionsTest.java index 543c6592df3..d3d586b9858 100644 --- a/core/src/test/java/com/linecorp/armeria/client/RequestOptionsTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/RequestOptionsTest.java @@ -39,6 +39,7 @@ import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.common.stream.StreamMessage; import com.linecorp.armeria.common.stream.StreamWriter; +import com.linecorp.armeria.internal.client.PreprocessorAttributeKeys; import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.testing.junit5.server.ServerExtension; @@ -222,6 +223,9 @@ void overwriteTest() { final Iterator<Entry<AttributeKey<?>, Object>> attrs = ctx.attrs(); while (attrs.hasNext()) { final Entry<AttributeKey<?>, Object> next = attrs.next(); + if (PreprocessorAttributeKeys.keys().contains(next.getKey())) { + continue; + } assertThat(requestOptions.attrs()).contains(next); } assertThat(res.join().contentUtf8()).isEqualTo("pong"); diff --git a/grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/ArmeriaChannel.java b/grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/ArmeriaChannel.java index a75dea16adf..8a2ad4988b2 100644 --- a/grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/ArmeriaChannel.java +++ b/grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/ArmeriaChannel.java @@ -21,12 +21,16 @@ import java.net.URI; import java.util.EnumMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; +import java.util.function.Function; import com.google.common.base.Strings; import com.google.common.collect.Maps; import com.linecorp.armeria.client.ClientBuilderParams; import com.linecorp.armeria.client.ClientOptions; +import com.linecorp.armeria.client.ClientRequestContext; import com.linecorp.armeria.client.HttpClient; import com.linecorp.armeria.client.RequestOptions; import com.linecorp.armeria.client.endpoint.EndpointGroup; @@ -36,6 +40,7 @@ import com.linecorp.armeria.common.HttpMethod; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.HttpRequestWriter; +import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.RequestHeaders; import com.linecorp.armeria.common.RequestHeadersBuilder; import com.linecorp.armeria.common.RequestTarget; @@ -49,8 +54,10 @@ import com.linecorp.armeria.common.util.SystemInfo; import com.linecorp.armeria.common.util.Unwrappable; import com.linecorp.armeria.internal.client.DefaultClientRequestContext; +import com.linecorp.armeria.internal.client.PreprocessorAttributeKeys; import com.linecorp.armeria.internal.common.RequestTargetCache; import com.linecorp.armeria.internal.common.grpc.InternalGrpcExceptionHandler; +import com.linecorp.armeria.internal.common.grpc.StatusAndMetadata; import io.grpc.CallCredentials; import io.grpc.CallOptions; @@ -61,6 +68,7 @@ import io.grpc.DecompressorRegistry; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; +import io.grpc.Status; import io.micrometer.core.instrument.MeterRegistry; import io.netty.handler.codec.http.HttpHeaderValues; @@ -183,7 +191,8 @@ public <I, O> ClientCall<I, O> newCall(MethodDescriptor<I, O> method, CallOption jsonMarshaller, unsafeWrapResponseBuffers, exceptionHandler, - useMethodMarshaller); + useMethodMarshaller, + options().clientPreprocessors()); } @Override @@ -241,6 +250,22 @@ private <I, O> DefaultClientRequestContext newContext(HttpMethod method, HttpReq final RequestOptions requestOptions = REQUEST_OPTIONS_MAP.get(methodDescriptor.getType()); assert requestOptions != null; + final BiFunction<ClientRequestContext, Throwable, HttpResponse> errorResponseFactory = + (ctx, cause) -> { + final StatusAndMetadata statusAndMetadata = exceptionHandler.handle(ctx, cause); + Status status = statusAndMetadata.status(); + if (status.getDescription() == null) { + status = status.withDescription(cause.getMessage()); + } + return HttpResponse.ofFailure(status.asRuntimeException()); + }; + final Function<CompletableFuture<? extends HttpResponse>, HttpResponse> future = HttpResponse::of; + final RequestOptions reqOptions = requestOptions + .toBuilder() + .attr(PreprocessorAttributeKeys.FUTURE_CONVERTER_KEY, future) + .attr(PreprocessorAttributeKeys.ERROR_RESPONSE_FACTORY_KEY, errorResponseFactory) + .attr(PreprocessorAttributeKeys.ENDPOINT_GROUP_KEY, endpointGroup()) + .build(); return new DefaultClientRequestContext( meterRegistry, @@ -251,7 +276,7 @@ private <I, O> DefaultClientRequestContext newContext(HttpMethod method, HttpReq options(), req, null, - requestOptions, + reqOptions, System.nanoTime(), SystemInfo.currentTimeMicros()); } diff --git a/grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/ArmeriaClientCall.java b/grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/ArmeriaClientCall.java index d00c4d572cc..22299800b12 100644 --- a/grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/ArmeriaClientCall.java +++ b/grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/ArmeriaClientCall.java @@ -15,7 +15,6 @@ */ package com.linecorp.armeria.internal.client.grpc; -import static com.linecorp.armeria.internal.client.ClientUtil.initContextAndExecuteWithFallback; import static com.linecorp.armeria.internal.client.grpc.protocol.InternalGrpcWebUtil.messageBuf; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -29,7 +28,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.function.BiFunction; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -38,8 +36,10 @@ import com.google.common.util.concurrent.MoreExecutors; -import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.ClientPreprocessors; import com.linecorp.armeria.client.HttpClient; +import com.linecorp.armeria.client.HttpPreprocessor; +import com.linecorp.armeria.client.TailClientPreprocessor; import com.linecorp.armeria.client.endpoint.EndpointGroup; import com.linecorp.armeria.common.HttpHeaders; import com.linecorp.armeria.common.HttpRequest; @@ -110,7 +110,7 @@ final class ArmeriaClientCall<I, O> extends ClientCall<I, O> private final DefaultClientRequestContext ctx; private final EndpointGroup endpointGroup; - private final HttpClient httpClient; + private final HttpPreprocessor httpPreprocessor; private final HttpRequestWriter req; private final MethodDescriptor<I, O> method; private final Map<MethodDescriptor<?, ?>, String> simpleMethodNames; @@ -126,6 +126,7 @@ final class ArmeriaClientCall<I, O> extends ClientCall<I, O> private final boolean grpcWebText; private final Compressor compressor; private final InternalGrpcExceptionHandler exceptionHandler; + private final ClientPreprocessors clientPreprocessors; private boolean endpointInitialized; @Nullable @@ -161,10 +162,11 @@ final class ArmeriaClientCall<I, O> extends ClientCall<I, O> @Nullable GrpcJsonMarshaller jsonMarshaller, boolean unsafeWrapResponseBuffers, InternalGrpcExceptionHandler exceptionHandler, - boolean useMethodMarshaller) { + boolean useMethodMarshaller, + ClientPreprocessors clientPreprocessors) { this.ctx = ctx; this.endpointGroup = endpointGroup; - this.httpClient = httpClient; + httpPreprocessor = TailClientPreprocessor.of(httpClient); this.req = req; this.method = method; this.simpleMethodNames = simpleMethodNames; @@ -177,6 +179,7 @@ final class ArmeriaClientCall<I, O> extends ClientCall<I, O> grpcWebText = GrpcSerializationFormats.isGrpcWebText(serializationFormat); this.maxInboundMessageSizeBytes = maxInboundMessageSizeBytes; this.exceptionHandler = exceptionHandler; + this.clientPreprocessors = clientPreprocessors; ctx.whenInitialized().handle((unused1, unused2) -> { runPendingTask(); @@ -245,19 +248,9 @@ public void start(Listener<O> responseListener, Metadata metadata) { } // Must come after handling deadline. - prepareHeaders(compressor, metadata, remainingNanos); - - final BiFunction<ClientRequestContext, Throwable, HttpResponse> errorResponseFactory = - (unused, cause) -> { - final StatusAndMetadata statusAndMetadata = exceptionHandler.handle(ctx, cause); - Status status = statusAndMetadata.status(); - if (status.getDescription() == null) { - status = status.withDescription(cause.getMessage()); - } - return HttpResponse.ofFailure(status.asRuntimeException()); - }; - final HttpResponse res = initContextAndExecuteWithFallback( - httpClient, ctx, endpointGroup, HttpResponse::of, errorResponseFactory); + final HttpRequest newReq = prepareHeaders(compressor, metadata, remainingNanos); + final HttpResponse res = clientPreprocessors.decorate(httpPreprocessor) + .execute(ctx, newReq, ctx.requestOptions()); final HttpStreamDeframer deframer = new HttpStreamDeframer( decompressorRegistry, ctx, this, exceptionHandler, @@ -493,7 +486,7 @@ public void transportReportHeaders(Metadata metadata) { }); } - private void prepareHeaders(Compressor compressor, Metadata metadata, long remainingNanos) { + private HttpRequest prepareHeaders(Compressor compressor, Metadata metadata, long remainingNanos) { final RequestHeadersBuilder newHeaders = req.headers().toBuilder(); if (compressor != Identity.NONE) { newHeaders.set(GrpcHeaderNames.GRPC_ENCODING, compressor.getMessageEncoding()); @@ -512,6 +505,7 @@ private void prepareHeaders(Compressor compressor, Metadata metadata, long remai final HttpRequest newReq = req.withHeaders(newHeaders); ctx.updateRequest(newReq); + return newReq; } private void closeWhenListenerThrows(Throwable t) { diff --git a/oauth2/src/main/java/com/linecorp/armeria/client/auth/oauth2/OAuth2Client.java b/oauth2/src/main/java/com/linecorp/armeria/client/auth/oauth2/OAuth2Client.java index 34a521f1793..5d70210d20f 100644 --- a/oauth2/src/main/java/com/linecorp/armeria/client/auth/oauth2/OAuth2Client.java +++ b/oauth2/src/main/java/com/linecorp/armeria/client/auth/oauth2/OAuth2Client.java @@ -72,7 +72,7 @@ public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Ex HttpHeaderNames.AUTHORIZATION, token.authorization()).build()); ctx.updateRequest(newReq); return executeWithFallback(unwrap(), ctx, - (context, cause0) -> HttpResponse.ofFailure(cause0)); + (context, cause0) -> HttpResponse.ofFailure(cause0), newReq); }); return HttpResponse.of(future); } diff --git a/thrift/thrift0.13/src/main/java/com/linecorp/armeria/internal/client/thrift/DefaultTHttpClient.java b/thrift/thrift0.13/src/main/java/com/linecorp/armeria/internal/client/thrift/DefaultTHttpClient.java index 7149ce3c327..42caf28a648 100644 --- a/thrift/thrift0.13/src/main/java/com/linecorp/armeria/internal/client/thrift/DefaultTHttpClient.java +++ b/thrift/thrift0.13/src/main/java/com/linecorp/armeria/internal/client/thrift/DefaultTHttpClient.java @@ -24,14 +24,20 @@ import com.linecorp.armeria.client.ClientBuilderParams; import com.linecorp.armeria.client.RequestOptions; import com.linecorp.armeria.client.RpcClient; +import com.linecorp.armeria.client.RpcPreprocessor; +import com.linecorp.armeria.client.TailClientPreprocessor; import com.linecorp.armeria.client.UserClient; import com.linecorp.armeria.client.thrift.THttpClient; import com.linecorp.armeria.common.ExchangeType; import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.RequestHeaders; import com.linecorp.armeria.common.RequestTarget; import com.linecorp.armeria.common.RpcRequest; import com.linecorp.armeria.common.RpcResponse; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.internal.client.DefaultClientRequestContext; +import com.linecorp.armeria.internal.client.PreprocessorAttributeKeys; import com.linecorp.armeria.internal.common.RequestTargetCache; import io.micrometer.core.instrument.MeterRegistry; @@ -43,9 +49,12 @@ final class DefaultTHttpClient extends UserClient<RpcRequest, RpcResponse> imple .exchangeType(ExchangeType.UNARY) .build(); + private final RpcPreprocessor rpcPreprocessor; + DefaultTHttpClient(ClientBuilderParams params, RpcClient delegate, MeterRegistry meterRegistry) { super(params, delegate, meterRegistry, RpcResponse::from, (ctx, cause) -> RpcResponse.ofFailure(decodeException(cause, null))); + rpcPreprocessor = TailClientPreprocessor.ofRpc(unwrap()); } @Override @@ -77,8 +86,21 @@ private RpcResponse execute0( RequestTargetCache.putForClient(path, reqTarget); final RpcRequest call = RpcRequest.of(serviceType, method, args); - return execute(scheme().sessionProtocol(), HttpMethod.POST, - reqTarget, call, UNARY_REQUEST_OPTIONS); + final HttpRequest httpReq = HttpRequest.of( + RequestHeaders.builder(HttpMethod.POST, reqTarget.path()) + .contentType(scheme().serializationFormat().mediaType()) + .build()); + final RequestOptions reqOptions = UNARY_REQUEST_OPTIONS + .toBuilder() + .attr(PreprocessorAttributeKeys.FUTURE_CONVERTER_KEY, futureConverter()) + .attr(PreprocessorAttributeKeys.ERROR_RESPONSE_FACTORY_KEY, errorResponseFactory()) + .attr(PreprocessorAttributeKeys.ENDPOINT_GROUP_KEY, endpointGroup()) + .build(); + final DefaultClientRequestContext ctx = new DefaultClientRequestContext( + scheme().sessionProtocol(), httpReq, call, reqTarget, + reqOptions, options()); + return options().clientPreprocessors().rpcDecorate(rpcPreprocessor) + .execute(ctx, call, reqOptions); } @Override