Skip to content

Commit

Permalink
fix: prevents memory accumulation from informers
Browse files Browse the repository at this point in the history
closes: #5636

Signed-off-by: Steve Hawkins <[email protected]>
  • Loading branch information
shawkins authored and manusa committed Dec 4, 2023
1 parent e3ac5f8 commit 7fe6d52
Show file tree
Hide file tree
Showing 19 changed files with 138 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

* Fix #5580: [java-generator] Correctly handle defaults for IntOrString types
* Fix #5584: Fix CRD generation when EnumMap is used
* Fix #5626: Prevent memory accumulation from informer usage

#### Improvements
* Fix #5429: moved crd generator annotations to generator-annotations instead of crd-generator-api. Using generator-annotations introduces no transitive dependencies.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.net.http.HttpClient.Redirect;
import java.net.http.HttpClient.Version;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.net.ssl.SSLParameters;

Expand All @@ -48,7 +49,7 @@ public JdkHttpClientBuilderImpl(JdkHttpClientFactory factory) {
@Override
public HttpClient build() {
if (client != null) {
return new JdkHttpClientImpl(this, client.getHttpClient());
return new JdkHttpClientImpl(this, client.getHttpClient(), client.getClosed());
}
java.net.http.HttpClient.Builder builder = clientFactory.createNewHttpClientBuilder();
if (connectTimeout != null && !java.time.Duration.ZERO.equals(connectTimeout)) {
Expand Down Expand Up @@ -78,7 +79,7 @@ public HttpClient build() {
Arrays.asList(tlsVersions).stream().map(TlsVersion::javaName).toArray(String[]::new)));
}
clientFactory.additionalConfig(builder);
return new JdkHttpClientImpl(this, builder.build());
return new JdkHttpClientImpl(this, builder.build(), new AtomicBoolean());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static io.fabric8.kubernetes.client.http.StandardHttpHeaders.CONTENT_TYPE;
Expand Down Expand Up @@ -223,13 +224,13 @@ public Optional<HttpResponse<?>> previousResponse() {

private java.net.http.HttpClient httpClient;

public JdkHttpClientImpl(JdkHttpClientBuilderImpl builder, HttpClient httpClient) {
super(builder);
public JdkHttpClientImpl(JdkHttpClientBuilderImpl builder, HttpClient httpClient, AtomicBoolean closed) {
super(builder, closed);
this.httpClient = httpClient;
}

@Override
public void close() {
public void doClose() {
if (this.httpClient == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static io.fabric8.kubernetes.client.http.BufferUtil.copy;
import static io.fabric8.kubernetes.client.http.StandardMediaTypes.APPLICATION_OCTET_STREAM;
Expand All @@ -61,13 +62,18 @@ public class JettyHttpClient extends StandardHttpClient<JettyHttpClient, JettyHt

public JettyHttpClient(StandardHttpClientBuilder<JettyHttpClient, JettyHttpClientFactory, JettyHttpClientBuilder> builder,
HttpClient jetty, WebSocketClient jettyWs) {
super(builder);
this(builder, jetty, jettyWs, new AtomicBoolean());
}

public JettyHttpClient(StandardHttpClientBuilder<JettyHttpClient, JettyHttpClientFactory, JettyHttpClientBuilder> builder,
HttpClient jetty, WebSocketClient jettyWs, AtomicBoolean closed) {
super(builder, closed);
this.jetty = jetty;
this.jettyWs = jettyWs;
}

@Override
public void close() {
public void doClose() {
try {
jetty.stop();
jettyWs.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public JettyHttpClientBuilder(JettyHttpClientFactory clientFactory) {
@Override
public JettyHttpClient build() {
if (client != null) {
return new JettyHttpClient(this, client.getJetty(), client.getJettyWs());
return new JettyHttpClient(this, client.getJetty(), client.getJettyWs(), client.getClosed());
}
final var sslContextFactory = new SslContextFactory.Client();
if (sslContext != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.net.Proxy;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.net.ssl.X509TrustManager;

Expand Down Expand Up @@ -119,7 +120,7 @@ private OkHttpClientImpl completeBuild(okhttp3.OkHttpClient.Builder builder, boo

OkHttpClient client = builder.build();

return new OkHttpClientImpl(client, this);
return new OkHttpClientImpl(client, this, new AtomicBoolean());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

public class OkHttpClientImpl extends StandardHttpClient<OkHttpClientImpl, OkHttpClientFactory, OkHttpClientBuilderImpl> {
Expand Down Expand Up @@ -241,13 +242,13 @@ public Map<String, List<String>> headers() {

private final okhttp3.OkHttpClient httpClient;

public OkHttpClientImpl(OkHttpClient client, OkHttpClientBuilderImpl builder) {
super(builder);
public OkHttpClientImpl(OkHttpClient client, OkHttpClientBuilderImpl builder, AtomicBoolean closed) {
super(builder, closed);
this.httpClient = client;
}

@Override
public void close() {
public void doClose() {
ConnectionPool connectionPool = httpClient.connectionPool();

Dispatcher dispatcher = httpClient.dispatcher();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

import static io.fabric8.kubernetes.client.vertx.VertxHttpRequest.toHeadersMap;

Expand All @@ -43,8 +44,8 @@ public class VertxHttpClient<F extends io.fabric8.kubernetes.client.http.HttpCli
private final Vertx vertx;
private final HttpClient client;

VertxHttpClient(VertxHttpClientBuilder<F> vertxHttpClientBuilder, HttpClient client) {
super(vertxHttpClientBuilder);
VertxHttpClient(VertxHttpClientBuilder<F> vertxHttpClientBuilder, HttpClient client, AtomicBoolean closed) {
super(vertxHttpClientBuilder, closed);
this.vertx = vertxHttpClientBuilder.vertx;
this.client = client;
}
Expand Down Expand Up @@ -119,7 +120,7 @@ public CompletableFuture<HttpResponse<AsyncBody>> consumeBytesDirect(StandardHtt
}

@Override
public void close() {
public void doClose() {
client.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

public class VertxHttpClientBuilder<F extends HttpClient.Factory>
Expand All @@ -52,7 +53,7 @@ public VertxHttpClientBuilder(F clientFactory, Vertx vertx) {
@Override
public VertxHttpClient<F> build() {
if (this.client != null) {
return new VertxHttpClient<>(this, this.client.getClient());
return new VertxHttpClient<>(this, this.client.getClient(), this.client.getClosed());
}

WebClientOptions options = new WebClientOptions();
Expand Down Expand Up @@ -115,7 +116,7 @@ public SslContextFactory sslContextFactory() {
}
});
}
return new VertxHttpClient<>(this, vertx.createHttpClient(options));
return new VertxHttpClient<>(this, vertx.createHttpClient(options), new AtomicBoolean());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,6 @@ interface Builder extends DerivedClientBuilder {

HttpRequest.Builder newHttpRequestBuilder();

boolean isClosed();

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand All @@ -52,9 +53,11 @@ public abstract class StandardHttpClient<C extends HttpClient, F extends HttpCli
private static final Logger LOG = LoggerFactory.getLogger(StandardHttpClient.class);

protected StandardHttpClientBuilder<C, F, T> builder;
protected AtomicBoolean closed;

protected StandardHttpClient(StandardHttpClientBuilder<C, F, T> builder) {
protected StandardHttpClient(StandardHttpClientBuilder<C, F, T> builder, AtomicBoolean closed) {
this.builder = builder;
this.closed = closed;
}

public abstract CompletableFuture<WebSocketResponse> buildWebSocketDirect(
Expand Down Expand Up @@ -280,4 +283,22 @@ public <V> V getTag(Class<V> type) {
return type.cast(builder.tags.get(type));
}

@Override
final public void close() {
if (closed.compareAndSet(false, true)) {
doClose();
}
}

protected abstract void doClose();

@Override
public boolean isClosed() {
return closed.get();
}

public AtomicBoolean getClosed() {
return closed;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -265,4 +265,18 @@ public static Stream<Arguments> testRetryAfterParsingData() {
10000));
}

@Test
void testIsClosed() {
client.close();
assertTrue(client.isClosed());
}

@Test
void testDerivedIsClosed() {
TestStandardHttpClient childClient = client.newBuilder().connectTimeout(0, TimeUnit.SECONDS).build();
childClient.close();
assertTrue(childClient.isClosed());
assertTrue(client.isClosed());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class TestStandardHttpClient
extends StandardHttpClient<TestStandardHttpClient, TestStandardHttpClientFactory, TestStandardHttpClientBuilder> {
Expand All @@ -39,15 +40,15 @@ public class TestStandardHttpClient
@Getter
private final List<RecordedConsumeBytesDirect> recordedConsumeBytesDirects;

protected TestStandardHttpClient(TestStandardHttpClientBuilder builder) {
super(builder);
protected TestStandardHttpClient(TestStandardHttpClientBuilder builder, AtomicBoolean closed) {
super(builder, closed);
expectations = new HashMap<>();
recordedBuildWebSocketDirects = new ArrayList<>();
recordedConsumeBytesDirects = new ArrayList<>();
}

@Override
public void close() {
public void doClose() {
recordedConsumeBytesDirects.clear();
recordedBuildWebSocketDirects.clear();
expectations.values().forEach(e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/
package io.fabric8.kubernetes.client.http;

import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class TestStandardHttpClientBuilder
extends StandardHttpClientBuilder<TestStandardHttpClient, TestStandardHttpClientFactory, TestStandardHttpClientBuilder> {
Expand All @@ -30,7 +32,8 @@ protected TestStandardHttpClientBuilder(TestStandardHttpClientFactory clientFact

@Override
public TestStandardHttpClient build() {
final TestStandardHttpClient instance = new TestStandardHttpClient(this);
final TestStandardHttpClient instance = new TestStandardHttpClient(this,
Optional.ofNullable(instances.peek()).map(TestStandardHttpClient::getClosed).orElse(new AtomicBoolean()));
instances.add(instance);
return instance;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,11 @@ void scheduleReconnect(WatchRequestState state) {

synchronized void reconnect() {
try {
if (client.isClosed()) {
logger.debug("The client has closed, closing the watch");
this.close();
return;
}
startWatch();
if (isForceClosed()) {
closeRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1069,7 +1069,13 @@ private DefaultSharedIndexInformer<T, L> createInformer(long resync, Executor ex
if (indexers != null) {
informer.addIndexers(indexers);
}
this.context.getClient().adapt(BaseClient.class).getClosed().whenComplete((closed, ignored) -> informer.stop());
informer.started().whenComplete((ignored, throwable) -> {
if (throwable == null) {
BaseClient baseClient = this.context.getClient().adapt(BaseClient.class);
baseClient.addToCloseable(informer);
informer.stopped().whenComplete((x, y) -> baseClient.removeFromCloseable(informer));
}
});
return informer;
}

Expand Down
Loading

0 comments on commit 7fe6d52

Please sign in to comment.