From f87b97bfba56e88d61c1cfd8650a3eabdf790c23 Mon Sep 17 00:00:00 2001 From: Marc Nuri Date: Tue, 11 Apr 2023 16:50:40 +0200 Subject: [PATCH 1/9] fix: 101 status code is plural Signed-off-by: Marc Nuri --- .../io/fabric8/kubernetes/client/http/HttpStatusMessage.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpStatusMessage.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpStatusMessage.java index 3c956ef4261..b2f69a7f4e1 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpStatusMessage.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpStatusMessage.java @@ -31,7 +31,7 @@ public class HttpStatusMessage { static { // informational entry(100, "Continue"); - entry(101, "Switching Protocol"); + entry(101, "Switching Protocols"); entry(102, "Processing"); entry(103, "Early Hints"); From 0f3e4f1fbec7115c16d2cbe8db1c61d30af9c0b4 Mon Sep 17 00:00:00 2001 From: Marc Nuri Date: Thu, 27 Apr 2023 14:28:42 +0200 Subject: [PATCH 2/9] fix: typos in JDK HttpClient Signed-off-by: Marc Nuri --- .../fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java index 01e9b1c609d..a4881fba783 100644 --- a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java +++ b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java @@ -235,7 +235,7 @@ public void close() { return; } builder.getClientFactory().closeHttpClient(this); - // help with default cleanup, which is based upon garbarge collection + // help with default cleanup, which is based upon garbage collection this.httpClient = null; } @@ -252,7 +252,7 @@ public CompletableFuture> consumeBytesDirect(StandardHtt BodyHandler handlerAdapter = new BodyHandlerAdapter(subscriber, handler); return this.getHttpClient().sendAsync(requestBuilder(request).build(), handlerAdapter) - .thenApply(r -> new JdkHttpResponseImpl(r, r.body())); + .thenApply(r -> new JdkHttpResponseImpl<>(r, r.body())); } java.net.http.HttpRequest.Builder requestBuilder(StandardHttpRequest request) { From afa1b79087529b21e2b14aa93e8ad0de005c9022 Mon Sep 17 00:00:00 2001 From: Marc Nuri Date: Thu, 6 Apr 2023 08:13:49 +0200 Subject: [PATCH 3/9] refactor: remove OkHttp-specific HttpLoggingInterceptor Signed-off-by: Marc Nuri --- .../client/okhttp/OkHttpClientBuilderImpl.java | 12 ------------ .../client/okhttp/OkHttpClientFactory.java | 10 ---------- 2 files changed, 22 deletions(-) diff --git a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientBuilderImpl.java b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientBuilderImpl.java index 0186c1f752d..cbf9d705314 100644 --- a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientBuilderImpl.java +++ b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientBuilderImpl.java @@ -19,10 +19,8 @@ import io.fabric8.kubernetes.client.http.StandardHttpClientBuilder; import okhttp3.Authenticator; import okhttp3.ConnectionSpec; -import okhttp3.Interceptor; import okhttp3.OkHttpClient; import okhttp3.Protocol; -import okhttp3.logging.HttpLoggingInterceptor; import java.net.Proxy; import java.util.Arrays; @@ -106,16 +104,6 @@ private OkHttpClientImpl derivedBuild(okhttp3.OkHttpClient.Builder builder) { builder.cache(null); } OkHttpClient client = builder.build(); - if (this.forStreaming) { - // If we set the HttpLoggingInterceptor's logging level to Body (as it is by default), it does - // not let us stream responses from the server. - for (Interceptor i : client.networkInterceptors()) { - if (i instanceof HttpLoggingInterceptor) { - HttpLoggingInterceptor interceptor = (HttpLoggingInterceptor) i; - interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC); - } - } - } return new OkHttpClientImpl(client, this); } diff --git a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientFactory.java b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientFactory.java index f2b3c1d8a59..78d1dfea020 100644 --- a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientFactory.java +++ b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientFactory.java @@ -22,9 +22,6 @@ import io.fabric8.kubernetes.client.utils.HttpClientUtils; import okhttp3.Dispatcher; import okhttp3.OkHttpClient; -import okhttp3.logging.HttpLoggingInterceptor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; @@ -73,13 +70,6 @@ public OkHttpClientBuilderImpl newBuilder(Config config) { httpClientBuilder.hostnameVerifier((s, sslSession) -> true); } - Logger reqLogger = LoggerFactory.getLogger(HttpLoggingInterceptor.class); - if (reqLogger.isTraceEnabled()) { - HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor(); - loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY); - httpClientBuilder.addNetworkInterceptor(loggingInterceptor); - } - if (config.getWebsocketPingInterval() > 0) { httpClientBuilder.pingInterval(config.getWebsocketPingInterval(), TimeUnit.MILLISECONDS); } From 4f1b57ff21b68ad0e5db78522ff68af934feaf49 Mon Sep 17 00:00:00 2001 From: Marc Nuri Date: Thu, 6 Apr 2023 12:12:09 +0200 Subject: [PATCH 4/9] feat: generic HttpLoggingInterceptor Signed-off-by: Marc Nuri --- .../client/jdkhttp/JdkHttpClientImpl.java | 19 +- .../client/jetty/JettyHttpClient.java | 9 +- .../client/jetty/JettyWebSocket.java | 30 ++- .../client/okhttp/OkHttpClientImpl.java | 5 +- .../client/okhttp/OkHttpWebSocketImpl.java | 34 ++-- .../client/vertx/VertxHttpClient.java | 52 +---- .../client/vertx/VertxHttpRequest.java | 2 +- .../client/http/HttpLoggingInterceptor.java | 188 ++++++++++++++++++ .../kubernetes/client/http/HttpRequest.java | 8 + .../kubernetes/client/http/Interceptor.java | 3 +- .../client/http/StandardHttpClient.java | 18 +- .../http/StandardHttpClientBuilder.java | 5 + .../client/http/StandardHttpRequest.java | 16 +- .../http/WebSocketHandshakeException.java | 8 +- .../client/http/WebSocketResponse.java | 12 +- .../client/http/WebSocketUpgradeResponse.java | 67 +++++++ .../client/http/AbstractInterceptorTest.java | 4 +- .../client/http/StandardHttpClientTest.java | 7 +- 18 files changed, 375 insertions(+), 112 deletions(-) create mode 100644 kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpLoggingInterceptor.java create mode 100644 kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocketUpgradeResponse.java diff --git a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java index a4881fba783..1b9a3a70d83 100644 --- a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java +++ b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java @@ -31,6 +31,7 @@ import io.fabric8.kubernetes.client.http.WebSocket; import io.fabric8.kubernetes.client.http.WebSocket.Listener; import io.fabric8.kubernetes.client.http.WebSocketResponse; +import io.fabric8.kubernetes.client.http.WebSocketUpgradeResponse; import java.net.URI; import java.net.http.HttpClient; @@ -305,7 +306,7 @@ public long contentLength() { @Override public CompletableFuture buildWebSocketDirect( StandardWebSocketBuilder standardWebSocketBuilder, Listener listener) { - StandardHttpRequest request = standardWebSocketBuilder.asHttpRequest(); + final StandardHttpRequest request = standardWebSocketBuilder.asHttpRequest(); java.net.http.WebSocket.Builder newBuilder = this.getHttpClient().newWebSocketBuilder(); request.headers().forEach((k, v) -> v.forEach(s -> newBuilder.header(k, s))); if (standardWebSocketBuilder.getSubprotocol() != null) { @@ -324,21 +325,21 @@ public CompletableFuture buildWebSocketDirect( CompletableFuture response = new CompletableFuture<>(); URI uri = WebSocket.toWebSocketUri(request.uri()); - newBuilder.buildAsync(uri, new JdkWebSocketImpl.ListenerAdapter(listener, queueSize)).whenComplete((w, t) -> { + newBuilder.buildAsync(uri, new JdkWebSocketImpl.ListenerAdapter(listener, queueSize)).whenComplete((jdkWebSocket, t) -> { if (t instanceof CompletionException && t.getCause() != null) { t = t.getCause(); } + final JdkWebSocketImpl fabric8WebSocket = new JdkWebSocketImpl(queueSize, jdkWebSocket); if (t instanceof java.net.http.WebSocketHandshakeException) { - response - .complete( - new WebSocketResponse(new JdkWebSocketImpl(queueSize, w), - new io.fabric8.kubernetes.client.http.WebSocketHandshakeException( - new JdkHttpResponseImpl<>(((java.net.http.WebSocketHandshakeException) t).getResponse())) - .initCause(t))); + final java.net.http.HttpResponse jdkResponse = ((java.net.http.WebSocketHandshakeException) t).getResponse(); + final WebSocketUpgradeResponse upgradeResponse = new WebSocketUpgradeResponse( + request, jdkResponse.statusCode(), jdkResponse.headers().map(), fabric8WebSocket); + response.complete(new WebSocketResponse(upgradeResponse, + new io.fabric8.kubernetes.client.http.WebSocketHandshakeException(upgradeResponse).initCause(t))); } else if (t != null) { response.completeExceptionally(t); } else { - response.complete(new WebSocketResponse(new JdkWebSocketImpl(queueSize, w), null)); + response.complete(new WebSocketResponse(new WebSocketUpgradeResponse(request, fabric8WebSocket), null)); } }); diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java index 09385856043..f5b38ade851 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java @@ -136,7 +136,7 @@ public CompletableFuture buildWebSocketDirect(StandardWebSock Listener listener) { try { jettyWs.start(); - HttpRequest request = standardWebSocketBuilder.asHttpRequest(); + final HttpRequest request = standardWebSocketBuilder.asHttpRequest(); final ClientUpgradeRequest cur = new ClientUpgradeRequest(); if (Utils.isNotNullOrEmpty(standardWebSocketBuilder.getSubprotocol())) { cur.setSubProtocols(standardWebSocketBuilder.getSubprotocol()); @@ -152,15 +152,14 @@ public CompletableFuture buildWebSocketDirect(StandardWebSock .whenComplete((s, ex) -> { if (ex != null) { if (ex instanceof CompletionException && ex.getCause() instanceof UpgradeException) { - future.complete( - new WebSocketResponse(webSocket, JettyWebSocket.toHandshakeException((UpgradeException) ex.getCause()))); + future.complete(JettyWebSocket.toWebSocketResponse(request, webSocket, (UpgradeException) ex.getCause())); } else if (ex instanceof UpgradeException) { - future.complete(new WebSocketResponse(webSocket, JettyWebSocket.toHandshakeException((UpgradeException) ex))); + future.complete(JettyWebSocket.toWebSocketResponse(request, webSocket, (UpgradeException) ex)); } else { future.completeExceptionally(ex); } } else { - future.complete(new WebSocketResponse(webSocket, null)); + future.complete(JettyWebSocket.toWebSocketResponse(request, webSocket, s)); } }); return future; diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java index 03f885cce6a..dafd7d7de02 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java @@ -17,18 +17,19 @@ import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.http.BufferUtil; -import io.fabric8.kubernetes.client.http.StandardHttpRequest; +import io.fabric8.kubernetes.client.http.HttpRequest; import io.fabric8.kubernetes.client.http.WebSocket; import io.fabric8.kubernetes.client.http.WebSocketHandshakeException; -import org.eclipse.jetty.client.HttpResponse; +import io.fabric8.kubernetes.client.http.WebSocketResponse; +import io.fabric8.kubernetes.client.http.WebSocketUpgradeResponse; import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.UpgradeResponse; import org.eclipse.jetty.websocket.api.WebSocketListener; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.exceptions.UpgradeException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; -import java.util.Collections; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -54,14 +55,6 @@ public JettyWebSocket(WebSocket.Listener listener) { moreMessages = true; } - static WebSocketHandshakeException toHandshakeException(UpgradeException ex) { - return new WebSocketHandshakeException(new JettyHttpResponse<>( - new StandardHttpRequest.Builder().uri(ex.getRequestURI()).build(), - new HttpResponse(null, Collections.emptyList()).status(ex.getResponseStatusCode()), - null)) - .initCause(ex); - } - @Override public boolean send(ByteBuffer buffer) { if (closed.get() || !webSocketSession.isOpen()) { @@ -171,4 +164,19 @@ private void backPressure() { lock.unlock(); } } + + static WebSocketResponse toWebSocketResponse(HttpRequest httpRequest, WebSocket ws, UpgradeException ex) { + final WebSocketUpgradeResponse webSocketUpgradeResponse = new WebSocketUpgradeResponse(httpRequest, + ex.getResponseStatusCode(), ws); + final WebSocketHandshakeException handshakeException = new WebSocketHandshakeException(webSocketUpgradeResponse) + .initCause(ex); + return new WebSocketResponse(webSocketUpgradeResponse, handshakeException); + } + + static WebSocketResponse toWebSocketResponse(HttpRequest httpRequest, WebSocket ws, Session session) { + final UpgradeResponse jettyUpgradeResponse = session.getUpgradeResponse(); + final WebSocketUpgradeResponse fabric8UpgradeResponse = new WebSocketUpgradeResponse( + httpRequest, jettyUpgradeResponse.getStatusCode(), jettyUpgradeResponse.getHeaders(), ws); + return new WebSocketResponse(fabric8UpgradeResponse, null); + } } diff --git a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java index 12b9da04bd8..c4e4d9eec6f 100644 --- a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java +++ b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java @@ -411,11 +411,12 @@ public long contentLength() throws IOException { @Override public CompletableFuture buildWebSocketDirect(StandardWebSocketBuilder standardWebSocketBuilder, Listener listener) { - Request.Builder requestBuilder = requestBuilder(standardWebSocketBuilder.asHttpRequest()); + final StandardHttpRequest fabric8Request = standardWebSocketBuilder.asHttpRequest(); + Request.Builder requestBuilder = requestBuilder(fabric8Request); if (standardWebSocketBuilder.getSubprotocol() != null) { requestBuilder.header("Sec-WebSocket-Protocol", standardWebSocketBuilder.getSubprotocol()); } - return OkHttpWebSocketImpl.buildAsync(httpClient, requestBuilder.build(), listener); + return OkHttpWebSocketImpl.buildAsync(httpClient, fabric8Request, requestBuilder.build(), listener); } } diff --git a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpWebSocketImpl.java b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpWebSocketImpl.java index 627323d0cda..0bfab821aa1 100644 --- a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpWebSocketImpl.java +++ b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpWebSocketImpl.java @@ -17,18 +17,20 @@ package io.fabric8.kubernetes.client.okhttp; import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.http.StandardHttpRequest; import io.fabric8.kubernetes.client.http.WebSocket; import io.fabric8.kubernetes.client.http.WebSocketHandshakeException; import io.fabric8.kubernetes.client.http.WebSocketResponse; -import io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl.OkHttpResponseImpl; +import io.fabric8.kubernetes.client.http.WebSocketUpgradeResponse; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; +import okhttp3.ResponseBody; import okhttp3.WebSocketListener; import okio.ByteString; -import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; @@ -36,8 +38,8 @@ class OkHttpWebSocketImpl implements WebSocket { - private okhttp3.WebSocket webSocket; - private Runnable requestMethod; + private final okhttp3.WebSocket webSocket; + private final Runnable requestMethod; public OkHttpWebSocketImpl(okhttp3.WebSocket webSocket, Runnable requestMethod) { this.webSocket = webSocket; @@ -64,7 +66,8 @@ public void request() { requestMethod.run(); } - public static CompletableFuture buildAsync(OkHttpClient httpClient, Request request, Listener listener) { + public static CompletableFuture buildAsync(OkHttpClient httpClient, StandardHttpRequest fabric8Request, + Request request, Listener listener) { CompletableFuture future = new CompletableFuture<>(); httpClient.newWebSocket(request, new WebSocketListener() { private volatile boolean opened; @@ -79,13 +82,12 @@ public void onFailure(okhttp3.WebSocket webSocket, Throwable t, Response respons } if (!opened) { if (response != null) { - try { - future.complete(new WebSocketResponse(null, - // passing null as the type ensures that the response body is closed - new WebSocketHandshakeException(new OkHttpResponseImpl<>(response, null)).initCause(t))); - } catch (IOException e) { - // can't happen - } + // Ensure response body is always closed (leak) + Optional.ofNullable(response.body()).ifPresent(ResponseBody::close); + final WebSocketUpgradeResponse upgradeResponse = new WebSocketUpgradeResponse( + fabric8Request, response.code(), response.headers().toMultimap(), null); + future.complete(new WebSocketResponse(upgradeResponse, + new WebSocketHandshakeException(upgradeResponse).initCause(t))); } else { future.completeExceptionally(t); } @@ -100,9 +102,11 @@ public void onOpen(okhttp3.WebSocket webSocket, Response response) { if (response != null) { response.close(); } - OkHttpWebSocketImpl value = new OkHttpWebSocketImpl(webSocket, this::request); - listener.onOpen(value); - future.complete(new WebSocketResponse(value, null)); + OkHttpWebSocketImpl fabric8WebSocket = new OkHttpWebSocketImpl(webSocket, this::request); + listener.onOpen(fabric8WebSocket); + future.complete(new WebSocketResponse( + new WebSocketUpgradeResponse(fabric8Request, response.code(), response.headers().toMultimap(), fabric8WebSocket), + null)); } @Override diff --git a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClient.java b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClient.java index f5f1f03927a..cd351db1223 100644 --- a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClient.java +++ b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClient.java @@ -16,13 +16,13 @@ package io.fabric8.kubernetes.client.vertx; import io.fabric8.kubernetes.client.http.AsyncBody; -import io.fabric8.kubernetes.client.http.HttpRequest; import io.fabric8.kubernetes.client.http.HttpResponse; import io.fabric8.kubernetes.client.http.StandardHttpClient; import io.fabric8.kubernetes.client.http.StandardHttpRequest; import io.fabric8.kubernetes.client.http.StandardWebSocketBuilder; import io.fabric8.kubernetes.client.http.WebSocket; import io.fabric8.kubernetes.client.http.WebSocketResponse; +import io.fabric8.kubernetes.client.http.WebSocketUpgradeResponse; import io.vertx.core.Vertx; import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpHeaders; @@ -35,12 +35,12 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import static io.fabric8.kubernetes.client.vertx.VertxHttpRequest.toHeadersMap; + public class VertxHttpClient extends StandardHttpClient, F, VertxHttpClientBuilder> { @@ -71,59 +71,27 @@ public CompletableFuture buildWebSocketDirect(StandardWebSock options.setSubProtocols(Collections.singletonList(standardWebSocketBuilder.getSubprotocol())); } - StandardHttpRequest request = standardWebSocketBuilder.asHttpRequest(); + final StandardHttpRequest request = standardWebSocketBuilder.asHttpRequest(); request.headers().entrySet().stream() .forEach(e -> e.getValue().stream().forEach(v -> options.addHeader(e.getKey(), v))); options.setAbsoluteURI(request.uri().toString()); - CompletableFuture response = new CompletableFuture(); + CompletableFuture response = new CompletableFuture<>(); client .webSocket(options) .onSuccess(ws -> { VertxWebSocket ret = new VertxWebSocket(ws, listener); ret.init(); - response.complete(new WebSocketResponse(ret, null)); + response.complete(new WebSocketResponse(new WebSocketUpgradeResponse(request, ret), null)); }).onFailure(t -> { if (t instanceof UpgradeRejectedException) { UpgradeRejectedException handshake = (UpgradeRejectedException) t; - response.complete(new WebSocketResponse(null, - new io.fabric8.kubernetes.client.http.WebSocketHandshakeException(new HttpResponse() { - @Override - public int code() { - return handshake.getStatus(); - } - - @Override - public String body() { - return handshake.getBody().toString(); - } - - @Override - public HttpRequest request() { - throw new UnsupportedOperationException(); - } - - @Override - public Optional> previousResponse() { - return Optional.empty(); - } - - @Override - public List headers(String s) { - return handshake.getHeaders().getAll(s); - } - - @Override - public Map> headers() { - Map> headers = new LinkedHashMap<>(); - handshake.getHeaders().names().forEach(name -> { - headers.put(name, handshake.getHeaders().getAll(name)); - }); - return headers; - } - }))); + final WebSocketUpgradeResponse upgradeResponse = new WebSocketUpgradeResponse( + request, handshake.getStatus(), toHeadersMap(handshake.getHeaders()), null); + response.complete(new WebSocketResponse(upgradeResponse, + new io.fabric8.kubernetes.client.http.WebSocketHandshakeException(upgradeResponse))); } response.completeExceptionally(t); }); diff --git a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpRequest.java b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpRequest.java index e40443d7173..a155fb51f5a 100644 --- a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpRequest.java +++ b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpRequest.java @@ -150,7 +150,7 @@ public void cancel() { static Map> toHeadersMap(MultiMap multiMap) { Map> headers = new LinkedHashMap<>(); - multiMap.names().stream().forEach(k -> headers.put(k, multiMap.getAll(k))); + multiMap.names().forEach(k -> headers.put(k, multiMap.getAll(k))); return headers; } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpLoggingInterceptor.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpLoggingInterceptor.java new file mode 100644 index 00000000000..2658aaeeacd --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpLoggingInterceptor.java @@ -0,0 +1,188 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.http; + +import io.fabric8.kubernetes.client.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; + +public class HttpLoggingInterceptor implements Interceptor { + + private final HttpLogger httpLogger; + private final Map activeConsumers; + + public HttpLoggingInterceptor() { + this(LoggerFactory.getLogger(HttpLoggingInterceptor.class)); + } + + public HttpLoggingInterceptor(Logger logger) { + this.httpLogger = new HttpLogger(logger); + activeConsumers = new ConcurrentHashMap<>(); + } + + @Override + public AsyncBody.Consumer> consumer(AsyncBody.Consumer> consumer, HttpRequest request) { + final DeferredLoggingConsumer interceptedConsumer = new DeferredLoggingConsumer(httpLogger, request, consumer, + () -> activeConsumers.remove(request.id())); + activeConsumers.put(request.id(), interceptedConsumer); + return interceptedConsumer; + } + + @Override + public void after(HttpRequest request, HttpResponse response) { + if (response instanceof WebSocketUpgradeResponse) { + httpLogger.logWsStart(); + httpLogger.logRequest(request); + httpLogger.logResponse(response); + httpLogger.logWsEnd(); + } else { + activeConsumers.computeIfPresent(request.id(), (id, consumer) -> { + consumer.originalResponse.set(response); + if (response.body() instanceof AsyncBody) { + consumer.processAsyncBody((AsyncBody) response.body()); + } + return consumer; + }); + } + } + + private static final class DeferredLoggingConsumer + implements AsyncBody.Consumer>, BiConsumer { + + private final HttpLogger httpLogger; + private final HttpRequest originalRequest; + private final AsyncBody.Consumer> originalConsumer; + private final Runnable cleanUp; + private final AtomicReference> originalResponse; + private final AtomicBoolean asyncBodyDoneProcessed; + private final Queue responseBody; + + public DeferredLoggingConsumer(HttpLogger httpLogger, HttpRequest originalRequest, + AsyncBody.Consumer> originalConsumer, Runnable cleanUp) { + this.httpLogger = httpLogger; + this.originalRequest = originalRequest; + this.originalConsumer = originalConsumer; + this.cleanUp = cleanUp; + originalResponse = new AtomicReference<>(); + asyncBodyDoneProcessed = new AtomicBoolean(false); + responseBody = new ConcurrentLinkedQueue<>(); + } + + @Override + public void consume(List value, AsyncBody asyncBody) throws Exception { + // TODO: Should try to detect if the body is text or not? (HttpLoggingInterceptor.isPlainText) + value.stream().map(BufferUtil::copy).forEach(responseBody::add); // Potential leak + originalConsumer.consume(value, asyncBody); + } + + @Override + public void accept(Void v, Throwable throwable) { + httpLogger.logStart(); + httpLogger.logRequest(originalRequest); + if (originalResponse.get() != null) { + httpLogger.logResponse(originalResponse.get()); + httpLogger.logResponseBody(responseBody); + } + httpLogger.logEnd(); + responseBody.clear(); + cleanUp.run(); + } + + /** + * Registers the asyncBody.done() callback. + * + * @param asyncBody the AsyncBody instance to register the callback on the done() future. + */ + private void processAsyncBody(AsyncBody asyncBody) { + if (asyncBodyDoneProcessed.compareAndSet(false, true)) { + asyncBody.done().whenComplete(this); + } + } + } + + private static final class HttpLogger { + private final Logger logger; + + private HttpLogger(Logger logger) { + this.logger = logger; + } + + void logRequest(HttpRequest request) { + if (logger.isTraceEnabled() && request != null) { + logger.trace("> {} {}", request.method(), request.uri()); + request.headers().forEach((h, vv) -> vv.forEach(v -> logger.trace("> {}: {}", h, v))); + if (!Utils.isNullOrEmpty(request.bodyString())) { + logger.trace(request.bodyString()); + } + } + } + + void logResponse(HttpResponse response) { + if (logger.isTraceEnabled() && response != null) { + logger.trace("< {} {}", response.code(), response.message()); + response.headers().forEach((h, vv) -> vv.forEach(v -> logger.trace("< {}: {}", h, v))); + } + } + + void logResponseBody(Queue responseBody) { + if (logger.isTraceEnabled() && responseBody != null) { + final StringBuilder bodyString = new StringBuilder(); + while (!responseBody.isEmpty()) { + bodyString.append(StandardCharsets.UTF_8.decode(responseBody.poll())); + } + if (bodyString.length() > 0) { + logger.trace(bodyString.toString()); + } + } + } + + void logStart() { + if (logger.isTraceEnabled()) { + logger.trace("-HTTP START-"); + } + } + + void logEnd() { + if (logger.isTraceEnabled()) { + logger.trace("-HTTP END-"); + } + } + + void logWsStart() { + if (logger.isTraceEnabled()) { + logger.trace("-WS START-"); + } + } + + void logWsEnd() { + if (logger.isTraceEnabled()) { + logger.trace("-WS END-"); + } + } + } +} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpRequest.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpRequest.java index d5bb95d0647..f868cd07cff 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpRequest.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpRequest.java @@ -23,6 +23,7 @@ import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.util.Map; +import java.util.UUID; import java.util.stream.Collectors; public interface HttpRequest extends HttpHeaders { @@ -98,6 +99,13 @@ static String formURLEncode(String value) { } } + /** + * The unique id for this HTTP request, used for logging and debugging + * + * @return a UUID. + */ + UUID id(); + URI uri(); String method(); diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/Interceptor.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/Interceptor.java index 3cc5ee6fdca..76629297181 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/Interceptor.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/Interceptor.java @@ -42,9 +42,10 @@ default void before(BasicBuilder builder, HttpRequest request, RequestTags tags) *

* Should be used to analyze response codes and headers, original response shouldn't be altered. * + * @param request the original request sent to the server. * @param response the response received from the server. */ - default void after(HttpResponse response) { + default void after(HttpRequest request, HttpResponse response) { } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpClient.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpClient.java index da24bcb55e5..3b349cb7999 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpClient.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpClient.java @@ -88,15 +88,19 @@ private CompletableFuture> consumeBytesOnce(HttpRequest for (Interceptor interceptor : builder.getInterceptors().values()) { interceptor.before(copy, standardHttpRequest, this); standardHttpRequest = copy.build(); - consumer = interceptor.consumer(consumer, standardHttpRequest); + } + final StandardHttpRequest effectiveRequest = standardHttpRequest; + + for (Interceptor interceptor : builder.getInterceptors().values()) { + consumer = interceptor.consumer(consumer, effectiveRequest); } final Consumer> effectiveConsumer = consumer; - CompletableFuture> cf = consumeBytesDirect(standardHttpRequest, effectiveConsumer); + CompletableFuture> cf = consumeBytesDirect(effectiveRequest, effectiveConsumer); for (Interceptor interceptor : builder.getInterceptors().values()) { cf = cf.thenCompose(response -> { - interceptor.after(response); + interceptor.after(effectiveRequest, response); if (!HttpResponse.isSuccessful(response.code())) { return interceptor.afterFailure(copy, response, this) .thenCompose(b -> { @@ -190,7 +194,7 @@ final CompletableFuture buildWebSocket(StandardWebSocketBuilder stand retryWithExponentialBackoff(intermediate, () -> buildWebSocketOnce(standardWebSocketBuilder, listener), standardWebSocketBuilder.asHttpRequest().uri(), r -> Optional.ofNullable(r.wshse).map(WebSocketHandshakeException::getResponse).map(HttpResponse::code).orElse(null), - r -> Optional.ofNullable(r.webSocket).ifPresent(w -> w.sendClose(1000, null))); + r -> Optional.ofNullable(r.webSocketUpgradeResponse.getWebSocket()).ifPresent(w -> w.sendClose(1000, null))); CompletableFuture result = new CompletableFuture<>(); @@ -199,7 +203,8 @@ final CompletableFuture buildWebSocket(StandardWebSocketBuilder stand if (t != null) { result.completeExceptionally(t); } else { - completeOrCancel(w -> w.sendClose(1000, null), result).accept(r.webSocket, r.wshse); + completeOrCancel(w -> w.sendClose(1000, null), result) + .accept(r.webSocketUpgradeResponse.getWebSocket(), r.wshse); } }); return result; @@ -208,11 +213,12 @@ final CompletableFuture buildWebSocket(StandardWebSocketBuilder stand private CompletableFuture buildWebSocketOnce(StandardWebSocketBuilder standardWebSocketBuilder, Listener listener) { final StandardWebSocketBuilder copy = standardWebSocketBuilder.newBuilder(); - builder.getInterceptors().values().stream().forEach(i -> i.before(copy, copy.asHttpRequest(), this)); + builder.getInterceptors().values().forEach(i -> i.before(copy, copy.asHttpRequest(), this)); CompletableFuture cf = buildWebSocketDirect(copy, listener); for (Interceptor interceptor : builder.getInterceptors().values()) { cf = cf.thenCompose(response -> { + interceptor.after(response.webSocketUpgradeResponse.request(), response.webSocketUpgradeResponse); if (response.wshse != null && response.wshse.getResponse() != null) { return interceptor.afterFailure(copy, response.wshse.getResponse(), this).thenCompose(b -> { if (Boolean.TRUE.equals(b)) { diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpClientBuilder.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpClientBuilder.java index 22fefd8427b..f136238f151 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpClientBuilder.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpClientBuilder.java @@ -19,6 +19,7 @@ import io.fabric8.kubernetes.client.http.HttpClient.DerivedClientBuilder; import io.fabric8.kubernetes.client.internal.SSLUtils; import lombok.Getter; +import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.time.Duration; @@ -54,6 +55,10 @@ public abstract class StandardHttpClientBuilder> headers, URI uri, String method, String bodyString) { - super(headers); - this.uri = uri; - this.method = method; - this.bodyString = bodyString; - expectContinue = false; - this.body = null; - this.contentType = null; + this(headers, uri, method, bodyString, null, false, null); } StandardHttpRequest(Map> headers, URI uri, String method, String bodyString, BodyContent body, boolean expectContinue, String contentType) { super(headers); + this.id = UUID.randomUUID(); this.uri = uri; this.method = method; this.bodyString = bodyString; @@ -120,6 +117,11 @@ public StandardHttpRequest(Map> headers, URI uri, String me this.contentType = contentType; } + @Override + public UUID id() { + return id; + } + @Override public URI uri() { return uri; diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocketHandshakeException.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocketHandshakeException.java index f9f31728889..3e2a9ac589e 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocketHandshakeException.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocketHandshakeException.java @@ -20,18 +20,18 @@ public final class WebSocketHandshakeException extends IOException { - private final transient HttpResponse response; + private final transient WebSocketUpgradeResponse response; - public WebSocketHandshakeException(HttpResponse response) { + public WebSocketHandshakeException(WebSocketUpgradeResponse response) { this.response = response; } /** * Get the response, which includes the code of failure. - * + * * @return the response, never null */ - public HttpResponse getResponse() { + public WebSocketUpgradeResponse getResponse() { return response; } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocketResponse.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocketResponse.java index 46ee20d98a2..967751e2916 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocketResponse.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocketResponse.java @@ -16,16 +16,20 @@ package io.fabric8.kubernetes.client.http; +import java.util.Objects; + /* * TODO: this may not be the best way to do this - in general * instead we create a response to hold them both + * manusa: We can remove this class and replace with separate WebSocketUpgradeResponse|WebSocketHandshakeException handled by the future */ public class WebSocketResponse { - public WebSocketResponse(WebSocket w, WebSocketHandshakeException wshse) { - this.webSocket = w; + final WebSocketUpgradeResponse webSocketUpgradeResponse; + final WebSocketHandshakeException wshse; + + public WebSocketResponse(WebSocketUpgradeResponse webSocketUpgradeResponse, WebSocketHandshakeException wshse) { + this.webSocketUpgradeResponse = Objects.requireNonNull(webSocketUpgradeResponse); this.wshse = wshse; } - WebSocket webSocket; - WebSocketHandshakeException wshse; } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocketUpgradeResponse.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocketUpgradeResponse.java new file mode 100644 index 00000000000..fab2febdbd2 --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocketUpgradeResponse.java @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.http; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class WebSocketUpgradeResponse extends StandardHttpHeaders implements HttpResponse { + + private final HttpRequest httpRequest; + private final int code; + private final WebSocket webSocket; + + public WebSocketUpgradeResponse(HttpRequest httpRequest, WebSocket webSocket) { + this(httpRequest, 101, new LinkedHashMap<>(), webSocket); + } + + public WebSocketUpgradeResponse(HttpRequest httpRequest, int code, WebSocket webSocket) { + this(httpRequest, code, new LinkedHashMap<>(), webSocket); + } + + public WebSocketUpgradeResponse(HttpRequest httpRequest, int code, Map> headers, WebSocket webSocket) { + super(headers); + this.httpRequest = httpRequest; + this.code = code; + this.webSocket = webSocket; + } + + @Override + public int code() { + return code; + } + + @Override + public Void body() { + return null; + } + + @Override + public HttpRequest request() { + return httpRequest; + } + + @Override + public Optional> previousResponse() { + return Optional.empty(); + } + + public WebSocket getWebSocket() { + return webSocket; + } +} diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java index d1697910700..2412ff3388c 100644 --- a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java @@ -272,7 +272,7 @@ public void afterHttpSuccess() throws Exception { final HttpClient.Builder builder = getHttpClientFactory().newBuilder() .addOrReplaceInterceptor("after", new Interceptor() { @Override - public void after(HttpResponse response) { + public void after(HttpRequest request, HttpResponse response) { responseFuture.complete(response); } }); @@ -299,7 +299,7 @@ public void afterHttpError() throws Exception { final HttpClient.Builder builder = getHttpClientFactory().newBuilder() .addOrReplaceInterceptor("after", new Interceptor() { @Override - public void after(HttpResponse response) { + public void after(HttpRequest request, HttpResponse response) { responseFuture.complete(response); } }); diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/StandardHttpClientTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/StandardHttpClientTest.java index 81ddbaccc5f..5417548b90b 100644 --- a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/StandardHttpClientTest.java +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/StandardHttpClientTest.java @@ -54,7 +54,7 @@ void webSocketFutureCancel() { // cancel the future before the websocket response future.cancel(true); - client.getWsFutures().get(0).complete(new WebSocketResponse(ws, null)); + client.getWsFutures().get(0).complete(new WebSocketResponse(new WebSocketUpgradeResponse(null, 101, ws), null)); // ensure that the ws has been closed Mockito.verify(ws).sendClose(1000, null); @@ -176,9 +176,10 @@ void testWebSocketWithLessFailuresThanRetries() throws Exception { }); client.getWsFutures().get(0) - .completeExceptionally(new WebSocketHandshakeException(new TestHttpResponse().withCode(500))); + .completeExceptionally(new WebSocketHandshakeException(new WebSocketUpgradeResponse(null, 500, null))); client.getWsFutures().add(client.getWsFutures().get(0)); - client.getWsFutures().add(CompletableFuture.completedFuture((new WebSocketResponse(ws, null)))); + client.getWsFutures() + .add(CompletableFuture.completedFuture((new WebSocketResponse(new WebSocketUpgradeResponse(null, ws), null)))); future.get(2, TimeUnit.MINUTES); From 761c6e10bc9be1662352f9a6f3e5d68e383e88b3 Mon Sep 17 00:00:00 2001 From: Marc Nuri Date: Tue, 11 Apr 2023 17:07:36 +0200 Subject: [PATCH 5/9] test: HttpLoggingInterceptor tests Signed-off-by: Marc Nuri --- httpclient-jdk/pom.xml | 4 + .../JdkHttpLoggingInterceptorTest.java | 27 +++ .../JettyHttpLoggingInterceptorTest.java | 28 +++ .../OkHttpHttpLoggingInterceptorTest.java | 27 +++ httpclient-vertx/pom.xml | 9 + .../VertxHttpLoggingInterceptorTest.java | 28 +++ .../AbstractHttpLoggingInterceptorTest.java | 210 ++++++++++++++++++ 7 files changed, 333 insertions(+) create mode 100644 httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpLoggingInterceptorTest.java create mode 100644 httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpLoggingInterceptorTest.java create mode 100644 httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpHttpLoggingInterceptorTest.java create mode 100644 httpclient-vertx/src/test/java/io/fabric8/kubernetes/client/vertx/VertxHttpLoggingInterceptorTest.java create mode 100644 kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractHttpLoggingInterceptorTest.java diff --git a/httpclient-jdk/pom.xml b/httpclient-jdk/pom.xml index b17475458ab..9256b145c99 100644 --- a/httpclient-jdk/pom.xml +++ b/httpclient-jdk/pom.xml @@ -71,6 +71,10 @@ mockwebserver test + + org.mockito + mockito-inline + org.assertj assertj-core diff --git a/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpLoggingInterceptorTest.java b/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpLoggingInterceptorTest.java new file mode 100644 index 00000000000..a939ea7f10c --- /dev/null +++ b/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpLoggingInterceptorTest.java @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.jdkhttp; + +import io.fabric8.kubernetes.client.http.AbstractHttpLoggingInterceptorTest; +import io.fabric8.kubernetes.client.http.HttpClient; + +@SuppressWarnings("java:S2187") +public class JdkHttpLoggingInterceptorTest extends AbstractHttpLoggingInterceptorTest { + @Override + protected HttpClient.Factory getHttpClientFactory() { + return new JdkHttpClientFactory(); + } +} diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpLoggingInterceptorTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpLoggingInterceptorTest.java new file mode 100644 index 00000000000..8a3974bc8cd --- /dev/null +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpLoggingInterceptorTest.java @@ -0,0 +1,28 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.jetty; + +import io.fabric8.kubernetes.client.http.AbstractHttpLoggingInterceptorTest; +import io.fabric8.kubernetes.client.http.HttpClient; + +@SuppressWarnings("java:S2187") +public class JettyHttpLoggingInterceptorTest extends AbstractHttpLoggingInterceptorTest { + + @Override + protected HttpClient.Factory getHttpClientFactory() { + return new JettyHttpClientFactory(); + } +} diff --git a/httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpHttpLoggingInterceptorTest.java b/httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpHttpLoggingInterceptorTest.java new file mode 100644 index 00000000000..7076f34e3a4 --- /dev/null +++ b/httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpHttpLoggingInterceptorTest.java @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.okhttp; + +import io.fabric8.kubernetes.client.http.AbstractHttpLoggingInterceptorTest; +import io.fabric8.kubernetes.client.http.HttpClient; + +@SuppressWarnings("java:S2187") +public class OkHttpHttpLoggingInterceptorTest extends AbstractHttpLoggingInterceptorTest { + @Override + protected HttpClient.Factory getHttpClientFactory() { + return new OkHttpClientFactory(); + } +} diff --git a/httpclient-vertx/pom.xml b/httpclient-vertx/pom.xml index f5c2ef131f6..859d7d31deb 100644 --- a/httpclient-vertx/pom.xml +++ b/httpclient-vertx/pom.xml @@ -72,6 +72,15 @@ junit-jupiter-engine test + + org.junit.jupiter + junit-jupiter-params + test + + + org.mockito + mockito-inline + io.fabric8 mockwebserver diff --git a/httpclient-vertx/src/test/java/io/fabric8/kubernetes/client/vertx/VertxHttpLoggingInterceptorTest.java b/httpclient-vertx/src/test/java/io/fabric8/kubernetes/client/vertx/VertxHttpLoggingInterceptorTest.java new file mode 100644 index 00000000000..87c552078cc --- /dev/null +++ b/httpclient-vertx/src/test/java/io/fabric8/kubernetes/client/vertx/VertxHttpLoggingInterceptorTest.java @@ -0,0 +1,28 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.vertx; + +import io.fabric8.kubernetes.client.http.AbstractHttpLoggingInterceptorTest; +import io.fabric8.kubernetes.client.http.HttpClient; + +@SuppressWarnings("java:S2187") +public class VertxHttpLoggingInterceptorTest extends AbstractHttpLoggingInterceptorTest { + + @Override + protected HttpClient.Factory getHttpClientFactory() { + return new VertxHttpClientFactory(); + } +} diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractHttpLoggingInterceptorTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractHttpLoggingInterceptorTest.java new file mode 100644 index 00000000000..0e9b679b746 --- /dev/null +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractHttpLoggingInterceptorTest.java @@ -0,0 +1,210 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.http; + +import io.fabric8.mockwebserver.DefaultMockServer; +import io.fabric8.mockwebserver.utils.ResponseProviders; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.mockito.InOrder; +import org.mockito.Mockito; +import org.slf4j.Logger; + +import java.net.URI; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.when; + +public abstract class AbstractHttpLoggingInterceptorTest { + + private static DefaultMockServer server; + private Logger logger; + private InOrder inOrder; + private HttpClient httpClient; + + @BeforeAll + static void beforeAll() { + server = new DefaultMockServer(false); + server.start(); + } + + @AfterAll + static void afterAll() { + server.shutdown(); + } + + @BeforeEach + void setUp() { + logger = mock(Logger.class); + inOrder = Mockito.inOrder(logger); + when(logger.isTraceEnabled()).thenReturn(true); + httpClient = getHttpClientFactory().newBuilder() + .addOrReplaceInterceptor("loggingInterceptor", new HttpLoggingInterceptor(logger)) + .build(); + } + + protected abstract HttpClient.Factory getHttpClientFactory(); + + @Test + @DisplayName("HTTP request URI is logged") + public void httpRequestUriLogged() throws Exception { + server.expect().withPath("/request-uri") + .andReturn(200, "This is the response body") + .always(); + httpClient.sendAsync(httpClient.newHttpRequestBuilder() + .uri(server.url("/request-uri")) + .build(), String.class).get(10, TimeUnit.SECONDS); + inOrder.verify(logger, timeout(1000L)).trace("-HTTP START-"); + inOrder.verify(logger) + .trace(eq("> {} {}"), eq("GET"), argThat((URI uri) -> uri.toString().endsWith("/request-uri"))); + inOrder.verify(logger).trace("-HTTP END-"); + } + + @Test + @DisplayName("HTTP request headers are logged") + public void httpRequestHeadersLogged() throws Exception { + server.expect().withPath("/request-headers") + .andReturn(200, "This is the response body") + .always(); + httpClient.sendAsync(httpClient.newHttpRequestBuilder() + .header("test-type", "header-test") + .uri(server.url("/request-headers")) + .build(), String.class).get(10, TimeUnit.SECONDS); + inOrder.verify(logger, timeout(1000L)).trace("-HTTP START-"); + inOrder.verify(logger).trace("> {}: {}", "test-type", "header-test"); + inOrder.verify(logger).trace("-HTTP END-"); + } + + @Test + @DisplayName("HTTP request body is logged") + public void httpRequestBodyLogged() throws Exception { + server.expect().withPath("/request-body") + .andReturn(200, "This is the response body") + .always(); + httpClient.sendAsync(httpClient.newHttpRequestBuilder() + .uri(server.url("/request-body")) + .method("POST", "test/plain", "This is the request body") + .build(), String.class).get(10, TimeUnit.SECONDS); + inOrder.verify(logger, timeout(1000L)).trace("-HTTP START-"); + inOrder.verify(logger).trace("This is the request body"); + inOrder.verify(logger).trace("-HTTP END-"); + } + + @Test + @DisplayName("HTTP response status is logged") + public void httpResponseStatusLogged() throws Exception { + server.expect().withPath("/response-status") + .andReturn(200, "This is the response body") + .always(); + httpClient.sendAsync(httpClient.newHttpRequestBuilder() + .uri(server.url("/response-status")) + .build(), String.class).get(10, TimeUnit.SECONDS); + inOrder.verify(logger, timeout(1000L)).trace("-HTTP START-"); + inOrder.verify(logger).trace("< {} {}", 200, "OK"); + inOrder.verify(logger).trace("-HTTP END-"); + } + + @Test + @DisplayName("HTTP response headers are logged") + public void httpResponseHeadersLogged() throws Exception { + server.expect().withPath("/response-headers") + .andReply(ResponseProviders.of(204, "", Collections.singletonMap("test-type", "response-header-test"))) + .always(); + httpClient.sendAsync(httpClient.newHttpRequestBuilder() + .uri(server.url("/response-headers")) + .build(), String.class).get(10, TimeUnit.SECONDS); + inOrder.verify(logger, timeout(1000L)).trace("-HTTP START-"); + inOrder.verify(logger).trace(eq("< {} {}"), anyInt(), anyString()); + inOrder.verify(logger).trace("< {}: {}", "test-type", "response-header-test"); + inOrder.verify(logger).trace("-HTTP END-"); + } + + @Test + @DisplayName("HTTP response body is logged") + public void httpResponseBodyLogged() throws Exception { + server.expect().withPath("/response-body") + .andReturn(200, "This is the response body") + .always(); + httpClient.sendAsync(httpClient.newHttpRequestBuilder() + .uri(server.url("/response-body")) + .build(), String.class).get(10, TimeUnit.SECONDS); + inOrder.verify(logger, timeout(1000L)).trace("-HTTP START-"); + inOrder.verify(logger).trace(eq("< {} {}"), anyInt(), anyString()); + inOrder.verify(logger).trace("This is the response body"); + inOrder.verify(logger).trace("-HTTP END-"); + } + + @Test + @DisplayName("WS request URI is logged") + public void wsRequestUriLogged() throws Exception { + server.expect().withPath("/ws-request-uri") + .andUpgradeToWebSocket() + .open().done().always(); + httpClient.newWebSocketBuilder() + .uri(URI.create(server.url("/ws-request-uri"))) + .buildAsync(new WebSocket.Listener() { + }) + .get(10, TimeUnit.SECONDS); + inOrder.verify(logger, timeout(1000L)).trace("-WS START-"); + inOrder.verify(logger) + .trace(eq("> {} {}"), eq("GET"), argThat((URI uri) -> uri.toString().endsWith("/ws-request-uri"))); + inOrder.verify(logger).trace("-WS END-"); + } + + @Test + @DisplayName("WS request headers are logged") + public void wsRequestHeadersLogged() throws Exception { + server.expect().withPath("/ws-request-headers") + .andUpgradeToWebSocket() + .open().done().always(); + httpClient.newWebSocketBuilder() + .header("test-type", "ws-header-test") + .uri(URI.create(server.url("/ws-request-headers"))) + .buildAsync(new WebSocket.Listener() { + }) + .get(10, TimeUnit.SECONDS); + verify(logger, timeout(1000L)).trace("-WS END-"); + inOrder.verify(logger).trace("-WS START-"); + inOrder.verify(logger).trace("> {}: {}", "test-type", "ws-header-test"); + inOrder.verify(logger).trace("-WS END-"); + } + + @Test + @DisplayName("WS response status is logged") + public void wsResponseStatusLogged() throws Exception { + server.expect().withPath("/ws-response-status") + .andUpgradeToWebSocket() + .open().done().always(); + httpClient.newWebSocketBuilder() + .uri(URI.create(server.url("/ws-response-status"))) + .buildAsync(new WebSocket.Listener() { + }) + .get(10, TimeUnit.SECONDS); + inOrder.verify(logger, timeout(1000L)).trace("-WS START-"); + inOrder.verify(logger).trace("< {} {}", 101, "Switching Protocols"); + inOrder.verify(logger).trace("-WS END-"); + } +} From c81d48804413604b73f9d5967c1aed7ae8f4603b Mon Sep 17 00:00:00 2001 From: Marc Nuri Date: Thu, 27 Apr 2023 13:58:16 +0200 Subject: [PATCH 6/9] feat: HttpLoggingInterceptor logs only plain text response bodies Signed-off-by: Marc Nuri --- .../kubernetes/client/http/BufferUtil.java | 23 ++++++++++ .../client/http/HttpLoggingInterceptor.java | 9 ++-- .../AbstractHttpLoggingInterceptorTest.java | 44 ++++++++++++++++++- 3 files changed, 71 insertions(+), 5 deletions(-) diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/BufferUtil.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/BufferUtil.java index 924672c9f91..ca68d180c9e 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/BufferUtil.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/BufferUtil.java @@ -16,6 +16,9 @@ package io.fabric8.kubernetes.client.http; import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collection; @@ -70,4 +73,24 @@ public static ByteBuffer copy(ByteBuffer buffer) { buffer.position(position); return clone; } + + /** + * Very rudimentary method to check if the provided ByteBuffer contains text. + * + * @return true if the buffer contains text, false otherwise. + */ + public static boolean isPlainText(ByteBuffer originalBuffer) { + if (originalBuffer == null) { + return false; + } + final ByteBuffer buffer = copy(originalBuffer); + final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder(); + try { + decoder.decode(buffer); + return true; + } catch (CharacterCodingException ex) { + return false; + } + } + } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpLoggingInterceptor.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpLoggingInterceptor.java index 2658aaeeacd..2a0bd5d50d6 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpLoggingInterceptor.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpLoggingInterceptor.java @@ -95,8 +95,9 @@ public DeferredLoggingConsumer(HttpLogger httpLogger, HttpRequest originalReques @Override public void consume(List value, AsyncBody asyncBody) throws Exception { - // TODO: Should try to detect if the body is text or not? (HttpLoggingInterceptor.isPlainText) - value.stream().map(BufferUtil::copy).forEach(responseBody::add); // Potential leak + if (!value.isEmpty() && BufferUtil.isPlainText(value.iterator().next())) { + value.stream().map(BufferUtil::copy).forEach(responseBody::add); // Potential leak + } originalConsumer.consume(value, asyncBody); } @@ -115,7 +116,7 @@ public void accept(Void v, Throwable throwable) { /** * Registers the asyncBody.done() callback. - * + * * @param asyncBody the AsyncBody instance to register the callback on the done() future. */ private void processAsyncBody(AsyncBody asyncBody) { @@ -150,7 +151,7 @@ void logResponse(HttpResponse response) { } void logResponseBody(Queue responseBody) { - if (logger.isTraceEnabled() && responseBody != null) { + if (logger.isTraceEnabled() && responseBody != null && !responseBody.isEmpty()) { final StringBuilder bodyString = new StringBuilder(); while (!responseBody.isEmpty()) { bodyString.append(StandardCharsets.UTF_8.decode(responseBody.poll())); diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractHttpLoggingInterceptorTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractHttpLoggingInterceptorTest.java index 0e9b679b746..0c8cd7dbdb1 100644 --- a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractHttpLoggingInterceptorTest.java +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractHttpLoggingInterceptorTest.java @@ -15,8 +15,16 @@ */ package io.fabric8.kubernetes.client.http; +import io.fabric8.mockwebserver.Context; import io.fabric8.mockwebserver.DefaultMockServer; +import io.fabric8.mockwebserver.ServerRequest; +import io.fabric8.mockwebserver.ServerResponse; +import io.fabric8.mockwebserver.internal.SimpleRequest; import io.fabric8.mockwebserver.utils.ResponseProviders; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import okio.Buffer; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -27,7 +35,11 @@ import org.slf4j.Logger; import java.net.URI; +import java.util.ArrayDeque; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Queue; import java.util.concurrent.TimeUnit; import static org.mockito.ArgumentMatchers.anyInt; @@ -36,18 +48,21 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; public abstract class AbstractHttpLoggingInterceptorTest { private static DefaultMockServer server; + private static Map> responses; private Logger logger; private InOrder inOrder; private HttpClient httpClient; @BeforeAll static void beforeAll() { - server = new DefaultMockServer(false); + responses = new HashMap<>(); + server = new DefaultMockServer(new Context(), new MockWebServer(), responses, false); server.start(); } @@ -157,6 +172,33 @@ public void httpResponseBodyLogged() throws Exception { inOrder.verify(logger).trace("-HTTP END-"); } + @Test + @DisplayName("HTTP binary response body is skipped") + public void httpResponseBodySkipped() throws Exception { + final MockResponse binaryResponse = new MockResponse() + .setResponseCode(200) + .setBody(new Buffer().write(new byte[] { (byte) 0xFF, (byte) 0xD8, (byte) 0x00, (byte) 0x12, (byte) 0x34 })); + responses.computeIfAbsent(new SimpleRequest("/binary-response-body"), k -> new ArrayDeque<>()).add( + new ServerResponse() { + @Override + public boolean isRepeatable() { + return true; + } + + @Override + public MockResponse toMockResponse(RecordedRequest recordedRequest) { + return binaryResponse; + } + }); + httpClient.sendAsync(httpClient.newHttpRequestBuilder() + .uri(server.url("/binary-response-body")) + .build(), String.class).get(10, TimeUnit.SECONDS); + inOrder.verify(logger, timeout(1000L)).trace("-HTTP START-"); + inOrder.verify(logger).trace(eq("< {} {}"), anyInt(), anyString()); + inOrder.verify(logger, times(1)).trace(anyString()); // only -HTTP END- was logged + inOrder.verifyNoMoreInteractions(); + } + @Test @DisplayName("WS request URI is logged") public void wsRequestUriLogged() throws Exception { From 235cb7b0e2728a6d3ccb848db6789eec8f413fc0 Mon Sep 17 00:00:00 2001 From: Marc Nuri Date: Thu, 27 Apr 2023 16:03:01 +0200 Subject: [PATCH 7/9] test: HttpLoggingInterceptor test ensure logging is complete Signed-off-by: Marc Nuri --- .../AbstractHttpLoggingInterceptorTest.java | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractHttpLoggingInterceptorTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractHttpLoggingInterceptorTest.java index 0c8cd7dbdb1..8597b6831b1 100644 --- a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractHttpLoggingInterceptorTest.java +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractHttpLoggingInterceptorTest.java @@ -49,6 +49,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public abstract class AbstractHttpLoggingInterceptorTest { @@ -92,7 +93,8 @@ public void httpRequestUriLogged() throws Exception { httpClient.sendAsync(httpClient.newHttpRequestBuilder() .uri(server.url("/request-uri")) .build(), String.class).get(10, TimeUnit.SECONDS); - inOrder.verify(logger, timeout(1000L)).trace("-HTTP START-"); + verify(logger, timeout(1000L)).trace("-HTTP END-"); + inOrder.verify(logger).trace("-HTTP START-"); inOrder.verify(logger) .trace(eq("> {} {}"), eq("GET"), argThat((URI uri) -> uri.toString().endsWith("/request-uri"))); inOrder.verify(logger).trace("-HTTP END-"); @@ -108,7 +110,8 @@ public void httpRequestHeadersLogged() throws Exception { .header("test-type", "header-test") .uri(server.url("/request-headers")) .build(), String.class).get(10, TimeUnit.SECONDS); - inOrder.verify(logger, timeout(1000L)).trace("-HTTP START-"); + verify(logger, timeout(1000L)).trace("-HTTP END-"); + inOrder.verify(logger).trace("-HTTP START-"); inOrder.verify(logger).trace("> {}: {}", "test-type", "header-test"); inOrder.verify(logger).trace("-HTTP END-"); } @@ -123,7 +126,8 @@ public void httpRequestBodyLogged() throws Exception { .uri(server.url("/request-body")) .method("POST", "test/plain", "This is the request body") .build(), String.class).get(10, TimeUnit.SECONDS); - inOrder.verify(logger, timeout(1000L)).trace("-HTTP START-"); + verify(logger, timeout(1000L)).trace("-HTTP END-"); + inOrder.verify(logger).trace("-HTTP START-"); inOrder.verify(logger).trace("This is the request body"); inOrder.verify(logger).trace("-HTTP END-"); } @@ -137,7 +141,8 @@ public void httpResponseStatusLogged() throws Exception { httpClient.sendAsync(httpClient.newHttpRequestBuilder() .uri(server.url("/response-status")) .build(), String.class).get(10, TimeUnit.SECONDS); - inOrder.verify(logger, timeout(1000L)).trace("-HTTP START-"); + verify(logger, timeout(1000L)).trace("-HTTP END-"); + inOrder.verify(logger).trace("-HTTP START-"); inOrder.verify(logger).trace("< {} {}", 200, "OK"); inOrder.verify(logger).trace("-HTTP END-"); } @@ -151,7 +156,8 @@ public void httpResponseHeadersLogged() throws Exception { httpClient.sendAsync(httpClient.newHttpRequestBuilder() .uri(server.url("/response-headers")) .build(), String.class).get(10, TimeUnit.SECONDS); - inOrder.verify(logger, timeout(1000L)).trace("-HTTP START-"); + verify(logger, timeout(1000L)).trace("-HTTP END-"); + inOrder.verify(logger).trace("-HTTP START-"); inOrder.verify(logger).trace(eq("< {} {}"), anyInt(), anyString()); inOrder.verify(logger).trace("< {}: {}", "test-type", "response-header-test"); inOrder.verify(logger).trace("-HTTP END-"); @@ -166,7 +172,8 @@ public void httpResponseBodyLogged() throws Exception { httpClient.sendAsync(httpClient.newHttpRequestBuilder() .uri(server.url("/response-body")) .build(), String.class).get(10, TimeUnit.SECONDS); - inOrder.verify(logger, timeout(1000L)).trace("-HTTP START-"); + verify(logger, timeout(1000L)).trace("-HTTP END-"); + inOrder.verify(logger).trace("-HTTP START-"); inOrder.verify(logger).trace(eq("< {} {}"), anyInt(), anyString()); inOrder.verify(logger).trace("This is the response body"); inOrder.verify(logger).trace("-HTTP END-"); @@ -193,7 +200,8 @@ public MockResponse toMockResponse(RecordedRequest recordedRequest) { httpClient.sendAsync(httpClient.newHttpRequestBuilder() .uri(server.url("/binary-response-body")) .build(), String.class).get(10, TimeUnit.SECONDS); - inOrder.verify(logger, timeout(1000L)).trace("-HTTP START-"); + verify(logger, timeout(1000L)).trace("-HTTP END-"); + inOrder.verify(logger).trace("-HTTP START-"); inOrder.verify(logger).trace(eq("< {} {}"), anyInt(), anyString()); inOrder.verify(logger, times(1)).trace(anyString()); // only -HTTP END- was logged inOrder.verifyNoMoreInteractions(); From b0c0b8c0d733b0d9287ccfa547ec4ea66e23fe13 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Thu, 27 Apr 2023 09:50:27 -0400 Subject: [PATCH 8/9] possible refinement to logging logic --- .../kubernetes/client/http/AsyncBody.java | 7 ++ .../client/http/HttpLoggingInterceptor.java | 70 ++++++++----------- .../kubernetes/client/http/Interceptor.java | 2 +- .../client/http/StandardHttpClient.java | 17 +++-- .../client/http/AbstractInterceptorTest.java | 5 +- 5 files changed, 52 insertions(+), 49 deletions(-) diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/AsyncBody.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/AsyncBody.java index 3b6c1f3123d..dd917b6dbef 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/AsyncBody.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/AsyncBody.java @@ -45,5 +45,12 @@ public interface AsyncBody { interface Consumer { void consume(T value, AsyncBody asyncBody) throws Exception; + default U unwrap(Class target) { + if (this.getClass().equals(target)) { + return (U) this; + } + return null; + } + } } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpLoggingInterceptor.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpLoggingInterceptor.java index 2a0bd5d50d6..0439a80102a 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpLoggingInterceptor.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpLoggingInterceptor.java @@ -22,19 +22,13 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.List; -import java.util.Map; +import java.util.Optional; import java.util.Queue; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; public class HttpLoggingInterceptor implements Interceptor { private final HttpLogger httpLogger; - private final Map activeConsumers; public HttpLoggingInterceptor() { this(LoggerFactory.getLogger(HttpLoggingInterceptor.class)); @@ -42,54 +36,47 @@ public HttpLoggingInterceptor() { public HttpLoggingInterceptor(Logger logger) { this.httpLogger = new HttpLogger(logger); - activeConsumers = new ConcurrentHashMap<>(); } @Override public AsyncBody.Consumer> consumer(AsyncBody.Consumer> consumer, HttpRequest request) { - final DeferredLoggingConsumer interceptedConsumer = new DeferredLoggingConsumer(httpLogger, request, consumer, - () -> activeConsumers.remove(request.id())); - activeConsumers.put(request.id(), interceptedConsumer); - return interceptedConsumer; + return new DeferredLoggingConsumer(httpLogger, request, consumer); } @Override - public void after(HttpRequest request, HttpResponse response) { + public void after(HttpRequest request, HttpResponse response, AsyncBody.Consumer> consumer) { if (response instanceof WebSocketUpgradeResponse) { httpLogger.logWsStart(); httpLogger.logRequest(request); httpLogger.logResponse(response); httpLogger.logWsEnd(); } else { - activeConsumers.computeIfPresent(request.id(), (id, consumer) -> { - consumer.originalResponse.set(response); - if (response.body() instanceof AsyncBody) { - consumer.processAsyncBody((AsyncBody) response.body()); - } - return consumer; - }); + DeferredLoggingConsumer deferredLoggingConsumer = consumer.unwrap(DeferredLoggingConsumer.class); + if (response.body() instanceof AsyncBody && deferredLoggingConsumer != null) { + deferredLoggingConsumer.processAsyncBody((AsyncBody) response.body(), response); + } else { + // currently not possible + httpLogger.logStart(); + httpLogger.logRequest(request); + httpLogger.logResponse(response); + httpLogger.logEnd(); + } } } private static final class DeferredLoggingConsumer - implements AsyncBody.Consumer>, BiConsumer { + implements AsyncBody.Consumer> { private final HttpLogger httpLogger; private final HttpRequest originalRequest; private final AsyncBody.Consumer> originalConsumer; - private final Runnable cleanUp; - private final AtomicReference> originalResponse; - private final AtomicBoolean asyncBodyDoneProcessed; private final Queue responseBody; public DeferredLoggingConsumer(HttpLogger httpLogger, HttpRequest originalRequest, - AsyncBody.Consumer> originalConsumer, Runnable cleanUp) { + AsyncBody.Consumer> originalConsumer) { this.httpLogger = httpLogger; this.originalRequest = originalRequest; this.originalConsumer = originalConsumer; - this.cleanUp = cleanUp; - originalResponse = new AtomicReference<>(); - asyncBodyDoneProcessed = new AtomicBoolean(false); responseBody = new ConcurrentLinkedQueue<>(); } @@ -102,27 +89,26 @@ public void consume(List value, AsyncBody asyncBody) throws Exceptio } @Override - public void accept(Void v, Throwable throwable) { - httpLogger.logStart(); - httpLogger.logRequest(originalRequest); - if (originalResponse.get() != null) { - httpLogger.logResponse(originalResponse.get()); - httpLogger.logResponseBody(responseBody); - } - httpLogger.logEnd(); - responseBody.clear(); - cleanUp.run(); + public U unwrap(Class target) { + return Optional.ofNullable(AsyncBody.Consumer.super.unwrap(target)).orElse(originalConsumer.unwrap(target)); } /** * Registers the asyncBody.done() callback. * * @param asyncBody the AsyncBody instance to register the callback on the done() future. + * @param response */ - private void processAsyncBody(AsyncBody asyncBody) { - if (asyncBodyDoneProcessed.compareAndSet(false, true)) { - asyncBody.done().whenComplete(this); - } + private void processAsyncBody(AsyncBody asyncBody, HttpResponse response) { + asyncBody.done().whenComplete((Void v, Throwable throwable) -> { + httpLogger.logStart(); + // TODO: we also have access to the response.request, which may be different than originalRequest + httpLogger.logRequest(originalRequest); + httpLogger.logResponse(response); + httpLogger.logResponseBody(responseBody); + httpLogger.logEnd(); + responseBody.clear(); + }); } } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/Interceptor.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/Interceptor.java index 76629297181..9c21e6efe93 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/Interceptor.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/Interceptor.java @@ -45,7 +45,7 @@ default void before(BasicBuilder builder, HttpRequest request, RequestTags tags) * @param request the original request sent to the server. * @param response the response received from the server. */ - default void after(HttpRequest request, HttpResponse response) { + default void after(HttpRequest request, HttpResponse response, AsyncBody.Consumer> consumer) { } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpClient.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpClient.java index 3b349cb7999..61708d38080 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpClient.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpClient.java @@ -97,17 +97,21 @@ private CompletableFuture> consumeBytesOnce(HttpRequest final Consumer> effectiveConsumer = consumer; CompletableFuture> cf = consumeBytesDirect(effectiveRequest, effectiveConsumer); + cf.thenAccept( + response -> builder.getInterceptors().values().forEach(i -> i.after(effectiveRequest, response, effectiveConsumer))); for (Interceptor interceptor : builder.getInterceptors().values()) { cf = cf.thenCompose(response -> { - interceptor.after(effectiveRequest, response); if (!HttpResponse.isSuccessful(response.code())) { return interceptor.afterFailure(copy, response, this) .thenCompose(b -> { if (Boolean.TRUE.equals(b)) { // before starting another request, make sure the old one is cancelled / closed response.body().cancel(); - return consumeBytesDirect(copy.build(), effectiveConsumer); + CompletableFuture> result = consumeBytesDirect(copy.build(), effectiveConsumer); + result.thenAccept( + r -> builder.getInterceptors().values().forEach(i -> i.after(effectiveRequest, r, effectiveConsumer))); + return result; } return CompletableFuture.completedFuture(response); }); @@ -216,15 +220,20 @@ private CompletableFuture buildWebSocketOnce(StandardWebSocke builder.getInterceptors().values().forEach(i -> i.before(copy, copy.asHttpRequest(), this)); CompletableFuture cf = buildWebSocketDirect(copy, listener); + cf.thenAccept(response -> builder.getInterceptors().values() + .forEach(i -> i.after(response.webSocketUpgradeResponse.request(), response.webSocketUpgradeResponse, null))); + for (Interceptor interceptor : builder.getInterceptors().values()) { cf = cf.thenCompose(response -> { - interceptor.after(response.webSocketUpgradeResponse.request(), response.webSocketUpgradeResponse); if (response.wshse != null && response.wshse.getResponse() != null) { return interceptor.afterFailure(copy, response.wshse.getResponse(), this).thenCompose(b -> { if (Boolean.TRUE.equals(b)) { return this.buildWebSocketDirect(copy, listener); } - return CompletableFuture.completedFuture(response); + CompletableFuture result = CompletableFuture.completedFuture(response); + result.thenAccept(r -> builder.getInterceptors().values() + .forEach(i -> i.after(r.webSocketUpgradeResponse.request(), r.webSocketUpgradeResponse, null))); + return result; }); } return CompletableFuture.completedFuture(response); diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java index 2412ff3388c..9bd7ff31bdc 100644 --- a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java @@ -15,6 +15,7 @@ */ package io.fabric8.kubernetes.client.http; +import io.fabric8.kubernetes.client.http.AsyncBody.Consumer; import io.fabric8.mockwebserver.DefaultMockServer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -272,7 +273,7 @@ public void afterHttpSuccess() throws Exception { final HttpClient.Builder builder = getHttpClientFactory().newBuilder() .addOrReplaceInterceptor("after", new Interceptor() { @Override - public void after(HttpRequest request, HttpResponse response) { + public void after(HttpRequest request, HttpResponse response, Consumer> consumer) { responseFuture.complete(response); } }); @@ -299,7 +300,7 @@ public void afterHttpError() throws Exception { final HttpClient.Builder builder = getHttpClientFactory().newBuilder() .addOrReplaceInterceptor("after", new Interceptor() { @Override - public void after(HttpRequest request, HttpResponse response) { + public void after(HttpRequest request, HttpResponse response, Consumer> consumer) { responseFuture.complete(response); } }); From 40a9164a63f8598653dba783aa48b58c4ab0958b Mon Sep 17 00:00:00 2001 From: Marc Nuri Date: Thu, 27 Apr 2023 17:37:26 +0200 Subject: [PATCH 9/9] feat: limit and add bounds HttpLoggingInterceptor response body Signed-off-by: Marc Nuri --- .../client/http/HttpLoggingInterceptor.java | 24 +++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpLoggingInterceptor.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpLoggingInterceptor.java index 0439a80102a..185ee87ab79 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpLoggingInterceptor.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpLoggingInterceptor.java @@ -25,6 +25,8 @@ import java.util.Optional; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; public class HttpLoggingInterceptor implements Interceptor { @@ -67,9 +69,13 @@ public void after(HttpRequest request, HttpResponse response, AsyncBody.Consu private static final class DeferredLoggingConsumer implements AsyncBody.Consumer> { + private static final long MAX_BODY_SIZE = 2097152L; // 2MiB + private final HttpLogger httpLogger; private final HttpRequest originalRequest; private final AsyncBody.Consumer> originalConsumer; + private final AtomicBoolean logResponseBody; + private final AtomicLong responseBodySize; private final Queue responseBody; public DeferredLoggingConsumer(HttpLogger httpLogger, HttpRequest originalRequest, @@ -77,13 +83,27 @@ public DeferredLoggingConsumer(HttpLogger httpLogger, HttpRequest originalReques this.httpLogger = httpLogger; this.originalRequest = originalRequest; this.originalConsumer = originalConsumer; + logResponseBody = new AtomicBoolean(true); + responseBodySize = new AtomicLong(0); responseBody = new ConcurrentLinkedQueue<>(); } @Override public void consume(List value, AsyncBody asyncBody) throws Exception { - if (!value.isEmpty() && BufferUtil.isPlainText(value.iterator().next())) { - value.stream().map(BufferUtil::copy).forEach(responseBody::add); // Potential leak + if (!logResponseBody.get() || responseBodySize.get() > MAX_BODY_SIZE) { + return; + } + if (!value.isEmpty()) { + if (BufferUtil.isPlainText(value.iterator().next())) { + value.stream().map(BufferUtil::copy).forEach(bb -> { + if (responseBodySize.addAndGet(bb.remaining()) < MAX_BODY_SIZE) { + responseBody.add(bb); + } + + }); + } else { + logResponseBody.set(false); + } } originalConsumer.consume(value, asyncBody); }