Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improving how httpclients can be configured #4490

Merged
merged 4 commits into from
Oct 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* Fix #4426: [java-generator] Encode an `AnyType` instead of an Object if `x-kubernetes-preserve-unknown-fields` is present and the type is null.

#### Improvements
* Fix #4471: Adding KubernetesClientBuilder.withHttpClientBuilderConsumer to further customize the HttpClient for any implementation.
* Fix #4348: Introduce specific annotations for the generators
* Refactor #4441: refactoring `TokenRefreshInterceptor`
* Fix #4365: The Watch retry logic will handle more cases, as well as perform an exceptional close for events that are not properly handled. Informers can directly provide those exceptional outcomes via the SharedIndexInformer.stopped CompletableFuture.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,17 @@

import io.fabric8.kubernetes.client.http.BasicBuilder;
import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.http.HttpClient.Builder;
import io.fabric8.kubernetes.client.http.HttpHeaders;
import io.fabric8.kubernetes.client.http.Interceptor;
import io.fabric8.kubernetes.client.http.StandardHttpClientBuilder;
import io.fabric8.kubernetes.client.http.TlsVersion;
import io.fabric8.kubernetes.client.internal.SSLUtils;

import java.net.InetSocketAddress;
import java.net.ProxySelector;
import java.net.http.HttpClient.Redirect;
import java.net.http.HttpClient.Version;
import java.time.Duration;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.TrustManager;

/**
* TODO: if there is another implementation that does not support client builder copying, then this needs to be abstracted -
Expand All @@ -48,28 +40,17 @@
*
*/

class JdkHttpClientBuilderImpl implements Builder {
class JdkHttpClientBuilderImpl
extends StandardHttpClientBuilder<JdkHttpClientImpl, JdkHttpClientFactory, JdkHttpClientBuilderImpl> {

LinkedHashMap<String, Interceptor> interceptors = new LinkedHashMap<>();
Duration connectTimeout;
Duration readTimeout;
private SSLContext sslContext;
JdkHttpClientFactory clientFactory;
private String proxyAuthorization;
private InetSocketAddress proxyAddress;
private boolean followRedirects;
private boolean preferHttp11;
private TlsVersion[] tlsVersions;
private java.net.http.HttpClient httpClient;

JdkHttpClientBuilderImpl(JdkHttpClientFactory factory) {
this.clientFactory = factory;
public JdkHttpClientBuilderImpl(JdkHttpClientFactory factory) {
super(factory);
}

@Override
public HttpClient build() {
if (httpClient != null) {
return new JdkHttpClientImpl(this, httpClient);
if (client != null) {
return new JdkHttpClientImpl(this, client.getHttpClient(), this.requestConfig);
}
java.net.http.HttpClient.Builder builder = clientFactory.createNewHttpClientBuilder();
if (connectTimeout != null) {
Expand Down Expand Up @@ -104,101 +85,12 @@ public void before(BasicBuilder builder, HttpHeaders headers) {
Arrays.asList(tlsVersions).stream().map(TlsVersion::javaName).toArray(String[]::new)));
}
clientFactory.additionalConfig(builder);
return new JdkHttpClientImpl(this, builder.build());
}

@Override
public JdkHttpClientBuilderImpl readTimeout(long readTimeout, TimeUnit unit) {
if (readTimeout == 0) {
this.readTimeout = null;
} else {
this.readTimeout = Duration.ofNanos(unit.toNanos(readTimeout));
}
return this;
}

@Override
public Builder connectTimeout(long connectTimeout, TimeUnit unit) {
this.connectTimeout = Duration.ofNanos(unit.toNanos(connectTimeout));
return this;
}

@Override
public Builder forStreaming() {
// nothing to do
return this;
}

@Override
public Builder writeTimeout(long timeout, TimeUnit timeoutUnit) {
// nothing to do
return this;
}

@Override
public Builder addOrReplaceInterceptor(String name, Interceptor interceptor) {
if (interceptor == null) {
interceptors.remove(name);
} else {
interceptors.put(name, interceptor);
}
return this;
return new JdkHttpClientImpl(this, builder.build(), null);
}

@Override
public Builder authenticatorNone() {
return this;
}

@Override
public Builder sslContext(KeyManager[] keyManagers, TrustManager[] trustManagers) {
this.sslContext = SSLUtils.sslContext(keyManagers, trustManagers);
return this;
}

@Override
public Builder followAllRedirects() {
this.followRedirects = true;
return this;
}

@Override
public Builder proxyAddress(InetSocketAddress proxyAddress) {
this.proxyAddress = proxyAddress;
return this;
}

@Override
public Builder proxyAuthorization(String credentials) {
this.proxyAuthorization = credentials;
return this;
}

@Override
public Builder preferHttp11() {
this.preferHttp11 = true;
return this;
}

@Override
public Builder tlsVersions(TlsVersion... tlsVersions) {
this.tlsVersions = tlsVersions;
return this;
}

public JdkHttpClientBuilderImpl copy(java.net.http.HttpClient httpClient) {
JdkHttpClientBuilderImpl copy = new JdkHttpClientBuilderImpl(this.clientFactory);
copy.connectTimeout = this.connectTimeout;
copy.readTimeout = this.readTimeout;
copy.sslContext = this.sslContext;
copy.interceptors = new LinkedHashMap<>(this.interceptors);
copy.proxyAddress = this.proxyAddress;
copy.proxyAuthorization = this.proxyAuthorization;
copy.tlsVersions = this.tlsVersions;
copy.preferHttp11 = this.preferHttp11;
copy.followRedirects = this.followRedirects;
copy.httpClient = httpClient;
return copy;
protected JdkHttpClientBuilderImpl newInstance(JdkHttpClientFactory clientFactory) {
return new JdkHttpClientBuilderImpl(clientFactory);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.utils.HttpClientUtils;
import io.fabric8.kubernetes.client.utils.Utils;

import java.util.concurrent.Executor;
Expand Down Expand Up @@ -46,15 +45,6 @@ public void shutdownNow() {

}

@Override
public HttpClient createHttpClient(Config config) {
JdkHttpClientBuilderImpl builderWrapper = newBuilder();

HttpClientUtils.applyCommonConfiguration(config, builderWrapper, this);

return builderWrapper.build();
}

@Override
public JdkHttpClientBuilderImpl newBuilder() {
return new JdkHttpClientBuilderImpl(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.fabric8.kubernetes.client.jdkhttp;

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.HttpResponse;
Expand Down Expand Up @@ -214,25 +215,27 @@ public HandlerAndAsyncBody(BodyHandler<T> handler, AsyncBody asyncBody) {

private JdkHttpClientBuilderImpl builder;
private java.net.http.HttpClient httpClient;
private Config config;

public JdkHttpClientImpl(JdkHttpClientBuilderImpl builderImpl, java.net.http.HttpClient httpClient) {
public JdkHttpClientImpl(JdkHttpClientBuilderImpl builderImpl, java.net.http.HttpClient httpClient, Config config) {
this.builder = builderImpl;
this.httpClient = httpClient;
this.config = config;
}

@Override
public void close() {
if (this.httpClient == null) {
return;
}
builder.clientFactory.closeHttpClient(this);
builder.getClientFactory().closeHttpClient(this);
// help with default cleanup, which is based upon garbarge collection
this.httpClient = null;
}

@Override
public DerivedClientBuilder newBuilder() {
return this.builder.copy(getHttpClient());
return this.builder.copy(this);
}

@Override
Expand Down Expand Up @@ -286,8 +289,8 @@ public <T> CompletableFuture<AsyncResponse<T>> sendAsync(HttpRequest request,
Supplier<HandlerAndAsyncBody<T>> handlerAndAsyncBodySupplier) {
JdkHttpRequestImpl jdkRequest = (JdkHttpRequestImpl) request;
JdkHttpRequestImpl.BuilderImpl builderImpl = jdkRequest.newBuilder();
for (Interceptor interceptor : builder.interceptors.values()) {
interceptor.before(builderImpl, jdkRequest);
for (Interceptor interceptor : builder.getInterceptors().values()) {
Interceptor.useConfig(interceptor, config).before(builderImpl, jdkRequest);
jdkRequest = builderImpl.build();
}

Expand All @@ -296,19 +299,20 @@ public <T> CompletableFuture<AsyncResponse<T>> sendAsync(HttpRequest request,
CompletableFuture<AsyncResponse<T>> cf = this.getHttpClient().sendAsync(builderImpl.build().request,
handlerAndAsyncBody.handler).thenApply(r -> new AsyncResponse<>(r, handlerAndAsyncBody.asyncBody));

for (Interceptor interceptor : builder.interceptors.values()) {
for (Interceptor interceptor : builder.getInterceptors().values()) {
cf = cf.thenCompose(ar -> {
java.net.http.HttpResponse<T> response = ar.response;
if (response != null && !HttpResponse.isSuccessful(response.statusCode())) {
return interceptor.afterFailure(builderImpl, new JdkHttpResponseImpl<>(response)).thenCompose(b -> {
if (b) {
HandlerAndAsyncBody<T> interceptedHandlerAndAsyncBody = handlerAndAsyncBodySupplier.get();

return this.getHttpClient().sendAsync(builderImpl.build().request, interceptedHandlerAndAsyncBody.handler)
.thenApply(r -> new AsyncResponse<>(r, interceptedHandlerAndAsyncBody.asyncBody));
}
return CompletableFuture.completedFuture(ar);
});
return Interceptor.useConfig(interceptor, config).afterFailure(builderImpl, new JdkHttpResponseImpl<>(response))
.thenCompose(b -> {
if (b) {
HandlerAndAsyncBody<T> interceptedHandlerAndAsyncBody = handlerAndAsyncBodySupplier.get();

return this.getHttpClient().sendAsync(builderImpl.build().request, interceptedHandlerAndAsyncBody.handler)
.thenApply(r -> new AsyncResponse<>(r, interceptedHandlerAndAsyncBody.asyncBody));
}
return CompletableFuture.completedFuture(ar);
});
}
return CompletableFuture.completedFuture(ar);
});
Expand All @@ -324,7 +328,7 @@ public io.fabric8.kubernetes.client.http.WebSocket.Builder newWebSocketBuilder()

@Override
public io.fabric8.kubernetes.client.http.HttpRequest.Builder newHttpRequestBuilder() {
return new JdkHttpRequestImpl.BuilderImpl().timeout(this.builder.readTimeout);
return new JdkHttpRequestImpl.BuilderImpl().timeout(this.builder.getReadTimeout());
}

/*
Expand All @@ -344,23 +348,24 @@ public WebSocketResponse(WebSocket w, java.net.http.WebSocketHandshakeException
public CompletableFuture<WebSocket> buildAsync(JdkWebSocketImpl.BuilderImpl webSocketBuilder, Listener listener) {
JdkWebSocketImpl.BuilderImpl copy = webSocketBuilder.copy();

for (Interceptor interceptor : builder.interceptors.values()) {
interceptor.before(copy, new JdkHttpRequestImpl(null, copy.asRequest()));
for (Interceptor interceptor : builder.getInterceptors().values()) {
Interceptor.useConfig(interceptor, config).before(copy, new JdkHttpRequestImpl(null, copy.asRequest()));
}

CompletableFuture<WebSocket> result = new CompletableFuture<>();

CompletableFuture<WebSocketResponse> cf = internalBuildAsync(copy, listener);

for (Interceptor interceptor : builder.interceptors.values()) {
for (Interceptor interceptor : builder.getInterceptors().values()) {
cf = cf.thenCompose(response -> {
if (response.wshse != null && response.wshse.getResponse() != null) {
return interceptor.afterFailure(copy, new JdkHttpResponseImpl<>(response.wshse.getResponse())).thenCompose(b -> {
if (b) {
return this.internalBuildAsync(copy, listener);
}
return CompletableFuture.completedFuture(response);
});
return Interceptor.useConfig(interceptor, config)
.afterFailure(copy, new JdkHttpResponseImpl<>(response.wshse.getResponse())).thenCompose(b -> {
if (b) {
return this.internalBuildAsync(copy, listener);
}
return CompletableFuture.completedFuture(response);
});
}
return CompletableFuture.completedFuture(response);
});
Expand Down Expand Up @@ -399,8 +404,8 @@ public CompletableFuture<WebSocketResponse> internalBuildAsync(JdkWebSocketImpl.
}
// the Watch logic sets a websocketTimeout as the readTimeout
// TODO: this should probably be made clearer in the docs
if (this.builder.readTimeout != null) {
newBuilder.connectTimeout(this.builder.readTimeout);
if (this.builder.getReadTimeout() != null) {
newBuilder.connectTimeout(this.builder.getReadTimeout());
}

AtomicLong queueSize = new AtomicLong();
Expand Down Expand Up @@ -436,9 +441,4 @@ java.net.http.HttpClient getHttpClient() {
return httpClient;
}

@Override
public Factory getFactory() {
return builder.clientFactory;
}

}
Loading