Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: generic HttpLoggingInterceptor #5037

Merged
merged 9 commits into from
Apr 27, 2023
4 changes: 4 additions & 0 deletions httpclient-jdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@
<artifactId>mockwebserver</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.fabric8.kubernetes.client.http.WebSocket;
import io.fabric8.kubernetes.client.http.WebSocket.Listener;
import io.fabric8.kubernetes.client.http.WebSocketResponse;
import io.fabric8.kubernetes.client.http.WebSocketUpgradeResponse;

import java.net.URI;
import java.net.http.HttpClient;
Expand Down Expand Up @@ -235,7 +236,7 @@ public void close() {
return;
}
builder.getClientFactory().closeHttpClient(this);
// help with default cleanup, which is based upon garbarge collection
// help with default cleanup, which is based upon garbage collection
this.httpClient = null;
}

Expand All @@ -252,7 +253,7 @@ public CompletableFuture<HttpResponse<AsyncBody>> consumeBytesDirect(StandardHtt
BodyHandler<AsyncBody> handlerAdapter = new BodyHandlerAdapter(subscriber, handler);

return this.getHttpClient().sendAsync(requestBuilder(request).build(), handlerAdapter)
.thenApply(r -> new JdkHttpResponseImpl<AsyncBody>(r, r.body()));
.thenApply(r -> new JdkHttpResponseImpl<>(r, r.body()));
}

java.net.http.HttpRequest.Builder requestBuilder(StandardHttpRequest request) {
Expand Down Expand Up @@ -305,7 +306,7 @@ public long contentLength() {
@Override
public CompletableFuture<WebSocketResponse> buildWebSocketDirect(
StandardWebSocketBuilder standardWebSocketBuilder, Listener listener) {
StandardHttpRequest request = standardWebSocketBuilder.asHttpRequest();
final StandardHttpRequest request = standardWebSocketBuilder.asHttpRequest();
java.net.http.WebSocket.Builder newBuilder = this.getHttpClient().newWebSocketBuilder();
request.headers().forEach((k, v) -> v.forEach(s -> newBuilder.header(k, s)));
if (standardWebSocketBuilder.getSubprotocol() != null) {
Expand All @@ -324,21 +325,21 @@ public CompletableFuture<WebSocketResponse> buildWebSocketDirect(
CompletableFuture<WebSocketResponse> response = new CompletableFuture<>();

URI uri = WebSocket.toWebSocketUri(request.uri());
newBuilder.buildAsync(uri, new JdkWebSocketImpl.ListenerAdapter(listener, queueSize)).whenComplete((w, t) -> {
newBuilder.buildAsync(uri, new JdkWebSocketImpl.ListenerAdapter(listener, queueSize)).whenComplete((jdkWebSocket, t) -> {
if (t instanceof CompletionException && t.getCause() != null) {
t = t.getCause();
}
final JdkWebSocketImpl fabric8WebSocket = new JdkWebSocketImpl(queueSize, jdkWebSocket);
if (t instanceof java.net.http.WebSocketHandshakeException) {
response
.complete(
new WebSocketResponse(new JdkWebSocketImpl(queueSize, w),
new io.fabric8.kubernetes.client.http.WebSocketHandshakeException(
new JdkHttpResponseImpl<>(((java.net.http.WebSocketHandshakeException) t).getResponse()))
.initCause(t)));
final java.net.http.HttpResponse<?> jdkResponse = ((java.net.http.WebSocketHandshakeException) t).getResponse();
final WebSocketUpgradeResponse upgradeResponse = new WebSocketUpgradeResponse(
request, jdkResponse.statusCode(), jdkResponse.headers().map(), fabric8WebSocket);
response.complete(new WebSocketResponse(upgradeResponse,
new io.fabric8.kubernetes.client.http.WebSocketHandshakeException(upgradeResponse).initCause(t)));
} else if (t != null) {
response.completeExceptionally(t);
} else {
response.complete(new WebSocketResponse(new JdkWebSocketImpl(queueSize, w), null));
response.complete(new WebSocketResponse(new WebSocketUpgradeResponse(request, fabric8WebSocket), null));
}
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.jdkhttp;

import io.fabric8.kubernetes.client.http.AbstractHttpLoggingInterceptorTest;
import io.fabric8.kubernetes.client.http.HttpClient;

@SuppressWarnings("java:S2187")
public class JdkHttpLoggingInterceptorTest extends AbstractHttpLoggingInterceptorTest {
@Override
protected HttpClient.Factory getHttpClientFactory() {
return new JdkHttpClientFactory();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public CompletableFuture<WebSocketResponse> buildWebSocketDirect(StandardWebSock
Listener listener) {
try {
jettyWs.start();
HttpRequest request = standardWebSocketBuilder.asHttpRequest();
final HttpRequest request = standardWebSocketBuilder.asHttpRequest();
final ClientUpgradeRequest cur = new ClientUpgradeRequest();
if (Utils.isNotNullOrEmpty(standardWebSocketBuilder.getSubprotocol())) {
cur.setSubProtocols(standardWebSocketBuilder.getSubprotocol());
Expand All @@ -152,15 +152,14 @@ public CompletableFuture<WebSocketResponse> buildWebSocketDirect(StandardWebSock
.whenComplete((s, ex) -> {
if (ex != null) {
if (ex instanceof CompletionException && ex.getCause() instanceof UpgradeException) {
future.complete(
new WebSocketResponse(webSocket, JettyWebSocket.toHandshakeException((UpgradeException) ex.getCause())));
future.complete(JettyWebSocket.toWebSocketResponse(request, webSocket, (UpgradeException) ex.getCause()));
} else if (ex instanceof UpgradeException) {
future.complete(new WebSocketResponse(webSocket, JettyWebSocket.toHandshakeException((UpgradeException) ex)));
future.complete(JettyWebSocket.toWebSocketResponse(request, webSocket, (UpgradeException) ex));
} else {
future.completeExceptionally(ex);
}
} else {
future.complete(new WebSocketResponse(webSocket, null));
future.complete(JettyWebSocket.toWebSocketResponse(request, webSocket, s));
}
});
return future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@

import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.http.BufferUtil;
import io.fabric8.kubernetes.client.http.StandardHttpRequest;
import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.WebSocket;
import io.fabric8.kubernetes.client.http.WebSocketHandshakeException;
import org.eclipse.jetty.client.HttpResponse;
import io.fabric8.kubernetes.client.http.WebSocketResponse;
import io.fabric8.kubernetes.client.http.WebSocketUpgradeResponse;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.exceptions.UpgradeException;

import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -54,14 +55,6 @@ public JettyWebSocket(WebSocket.Listener listener) {
moreMessages = true;
}

static WebSocketHandshakeException toHandshakeException(UpgradeException ex) {
return new WebSocketHandshakeException(new JettyHttpResponse<>(
new StandardHttpRequest.Builder().uri(ex.getRequestURI()).build(),
new HttpResponse(null, Collections.emptyList()).status(ex.getResponseStatusCode()),
null))
.initCause(ex);
}

@Override
public boolean send(ByteBuffer buffer) {
if (closed.get() || !webSocketSession.isOpen()) {
Expand Down Expand Up @@ -171,4 +164,19 @@ private void backPressure() {
lock.unlock();
}
}

static WebSocketResponse toWebSocketResponse(HttpRequest httpRequest, WebSocket ws, UpgradeException ex) {
final WebSocketUpgradeResponse webSocketUpgradeResponse = new WebSocketUpgradeResponse(httpRequest,
ex.getResponseStatusCode(), ws);
final WebSocketHandshakeException handshakeException = new WebSocketHandshakeException(webSocketUpgradeResponse)
.initCause(ex);
return new WebSocketResponse(webSocketUpgradeResponse, handshakeException);
}

static WebSocketResponse toWebSocketResponse(HttpRequest httpRequest, WebSocket ws, Session session) {
final UpgradeResponse jettyUpgradeResponse = session.getUpgradeResponse();
final WebSocketUpgradeResponse fabric8UpgradeResponse = new WebSocketUpgradeResponse(
httpRequest, jettyUpgradeResponse.getStatusCode(), jettyUpgradeResponse.getHeaders(), ws);
return new WebSocketResponse(fabric8UpgradeResponse, null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.jetty;

import io.fabric8.kubernetes.client.http.AbstractHttpLoggingInterceptorTest;
import io.fabric8.kubernetes.client.http.HttpClient;

@SuppressWarnings("java:S2187")
public class JettyHttpLoggingInterceptorTest extends AbstractHttpLoggingInterceptorTest {

@Override
protected HttpClient.Factory getHttpClientFactory() {
return new JettyHttpClientFactory();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
import io.fabric8.kubernetes.client.http.StandardHttpClientBuilder;
import okhttp3.Authenticator;
import okhttp3.ConnectionSpec;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
import okhttp3.Protocol;
import okhttp3.logging.HttpLoggingInterceptor;

import java.net.Proxy;
import java.util.Arrays;
Expand Down Expand Up @@ -106,16 +104,6 @@ private OkHttpClientImpl derivedBuild(okhttp3.OkHttpClient.Builder builder) {
builder.cache(null);
}
OkHttpClient client = builder.build();
if (this.forStreaming) {
// If we set the HttpLoggingInterceptor's logging level to Body (as it is by default), it does
// not let us stream responses from the server.
for (Interceptor i : client.networkInterceptors()) {
if (i instanceof HttpLoggingInterceptor) {
HttpLoggingInterceptor interceptor = (HttpLoggingInterceptor) i;
interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);
}
}
}

return new OkHttpClientImpl(client, this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
import io.fabric8.kubernetes.client.utils.HttpClientUtils;
import okhttp3.Dispatcher;
import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -73,13 +70,6 @@ public OkHttpClientBuilderImpl newBuilder(Config config) {
httpClientBuilder.hostnameVerifier((s, sslSession) -> true);
}

Logger reqLogger = LoggerFactory.getLogger(HttpLoggingInterceptor.class);
if (reqLogger.isTraceEnabled()) {
HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
httpClientBuilder.addNetworkInterceptor(loggingInterceptor);
}

if (config.getWebsocketPingInterval() > 0) {
httpClientBuilder.pingInterval(config.getWebsocketPingInterval(), TimeUnit.MILLISECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,11 +411,12 @@ public long contentLength() throws IOException {
@Override
public CompletableFuture<WebSocketResponse> buildWebSocketDirect(StandardWebSocketBuilder standardWebSocketBuilder,
Listener listener) {
Request.Builder requestBuilder = requestBuilder(standardWebSocketBuilder.asHttpRequest());
final StandardHttpRequest fabric8Request = standardWebSocketBuilder.asHttpRequest();
Request.Builder requestBuilder = requestBuilder(fabric8Request);
if (standardWebSocketBuilder.getSubprotocol() != null) {
requestBuilder.header("Sec-WebSocket-Protocol", standardWebSocketBuilder.getSubprotocol());
}
return OkHttpWebSocketImpl.buildAsync(httpClient, requestBuilder.build(), listener);
return OkHttpWebSocketImpl.buildAsync(httpClient, fabric8Request, requestBuilder.build(), listener);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,29 @@
package io.fabric8.kubernetes.client.okhttp;

import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.http.StandardHttpRequest;
import io.fabric8.kubernetes.client.http.WebSocket;
import io.fabric8.kubernetes.client.http.WebSocketHandshakeException;
import io.fabric8.kubernetes.client.http.WebSocketResponse;
import io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl.OkHttpResponseImpl;
import io.fabric8.kubernetes.client.http.WebSocketUpgradeResponse;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okhttp3.WebSocketListener;
import okio.ByteString;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class OkHttpWebSocketImpl implements WebSocket {

private okhttp3.WebSocket webSocket;
private Runnable requestMethod;
private final okhttp3.WebSocket webSocket;
private final Runnable requestMethod;

public OkHttpWebSocketImpl(okhttp3.WebSocket webSocket, Runnable requestMethod) {
this.webSocket = webSocket;
Expand All @@ -64,7 +66,8 @@ public void request() {
requestMethod.run();
}

public static CompletableFuture<WebSocketResponse> buildAsync(OkHttpClient httpClient, Request request, Listener listener) {
public static CompletableFuture<WebSocketResponse> buildAsync(OkHttpClient httpClient, StandardHttpRequest fabric8Request,
Request request, Listener listener) {
CompletableFuture<WebSocketResponse> future = new CompletableFuture<>();
httpClient.newWebSocket(request, new WebSocketListener() {
private volatile boolean opened;
Expand All @@ -79,13 +82,12 @@ public void onFailure(okhttp3.WebSocket webSocket, Throwable t, Response respons
}
if (!opened) {
if (response != null) {
try {
future.complete(new WebSocketResponse(null,
// passing null as the type ensures that the response body is closed
new WebSocketHandshakeException(new OkHttpResponseImpl<>(response, null)).initCause(t)));
} catch (IOException e) {
// can't happen
}
// Ensure response body is always closed (leak)
Optional.ofNullable(response.body()).ifPresent(ResponseBody::close);
final WebSocketUpgradeResponse upgradeResponse = new WebSocketUpgradeResponse(
fabric8Request, response.code(), response.headers().toMultimap(), null);
future.complete(new WebSocketResponse(upgradeResponse,
new WebSocketHandshakeException(upgradeResponse).initCause(t)));
} else {
future.completeExceptionally(t);
}
Expand All @@ -100,9 +102,11 @@ public void onOpen(okhttp3.WebSocket webSocket, Response response) {
if (response != null) {
response.close();
}
OkHttpWebSocketImpl value = new OkHttpWebSocketImpl(webSocket, this::request);
listener.onOpen(value);
future.complete(new WebSocketResponse(value, null));
OkHttpWebSocketImpl fabric8WebSocket = new OkHttpWebSocketImpl(webSocket, this::request);
listener.onOpen(fabric8WebSocket);
future.complete(new WebSocketResponse(
new WebSocketUpgradeResponse(fabric8Request, response.code(), response.headers().toMultimap(), fabric8WebSocket),
null));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.okhttp;

import io.fabric8.kubernetes.client.http.AbstractHttpLoggingInterceptorTest;
import io.fabric8.kubernetes.client.http.HttpClient;

@SuppressWarnings("java:S2187")
public class OkHttpHttpLoggingInterceptorTest extends AbstractHttpLoggingInterceptorTest {
@Override
protected HttpClient.Factory getHttpClientFactory() {
return new OkHttpClientFactory();
}
}
Loading