diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f08ef19521..e1126428e79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * Fix #5580: [java-generator] Correctly handle defaults for IntOrString types * Fix #5584: Fix CRD generation when EnumMap is used +* Fix #5626: Prevent memory accumulation from informer usage #### Improvements * Fix #5429: moved crd generator annotations to generator-annotations instead of crd-generator-api. Using generator-annotations introduces no transitive dependencies. diff --git a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientBuilderImpl.java b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientBuilderImpl.java index 8e4f2bb9928..6309b2f827e 100644 --- a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientBuilderImpl.java +++ b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientBuilderImpl.java @@ -25,6 +25,7 @@ import java.net.http.HttpClient.Redirect; import java.net.http.HttpClient.Version; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLParameters; @@ -48,7 +49,7 @@ public JdkHttpClientBuilderImpl(JdkHttpClientFactory factory) { @Override public HttpClient build() { if (client != null) { - return new JdkHttpClientImpl(this, client.getHttpClient()); + return new JdkHttpClientImpl(this, client.getHttpClient(), client.getClosed()); } java.net.http.HttpClient.Builder builder = clientFactory.createNewHttpClientBuilder(); if (connectTimeout != null && !java.time.Duration.ZERO.equals(connectTimeout)) { @@ -78,7 +79,7 @@ public HttpClient build() { Arrays.asList(tlsVersions).stream().map(TlsVersion::javaName).toArray(String[]::new))); } clientFactory.additionalConfig(builder); - return new JdkHttpClientImpl(this, builder.build()); + return new JdkHttpClientImpl(this, builder.build(), new AtomicBoolean()); } @Override diff --git a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java index 27fc3721ace..7fbe4f2724b 100644 --- a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java +++ b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java @@ -52,6 +52,7 @@ import java.util.concurrent.Flow; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static io.fabric8.kubernetes.client.http.StandardHttpHeaders.CONTENT_TYPE; @@ -223,13 +224,13 @@ public Optional> previousResponse() { private java.net.http.HttpClient httpClient; - public JdkHttpClientImpl(JdkHttpClientBuilderImpl builder, HttpClient httpClient) { - super(builder); + public JdkHttpClientImpl(JdkHttpClientBuilderImpl builder, HttpClient httpClient, AtomicBoolean closed) { + super(builder, closed); this.httpClient = httpClient; } @Override - public void close() { + public void doClose() { if (this.httpClient == null) { return; } diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java index ff34b006e3b..81102911bb1 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java @@ -49,6 +49,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static io.fabric8.kubernetes.client.http.BufferUtil.copy; import static io.fabric8.kubernetes.client.http.StandardMediaTypes.APPLICATION_OCTET_STREAM; @@ -61,13 +62,18 @@ public class JettyHttpClient extends StandardHttpClient builder, HttpClient jetty, WebSocketClient jettyWs) { - super(builder); + this(builder, jetty, jettyWs, new AtomicBoolean()); + } + + public JettyHttpClient(StandardHttpClientBuilder builder, + HttpClient jetty, WebSocketClient jettyWs, AtomicBoolean closed) { + super(builder, closed); this.jetty = jetty; this.jettyWs = jettyWs; } @Override - public void close() { + public void doClose() { try { jetty.stop(); jettyWs.stop(); diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilder.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilder.java index 219981848c4..8dcee987918 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilder.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilder.java @@ -55,7 +55,7 @@ public JettyHttpClientBuilder(JettyHttpClientFactory clientFactory) { @Override public JettyHttpClient build() { if (client != null) { - return new JettyHttpClient(this, client.getJetty(), client.getJettyWs()); + return new JettyHttpClient(this, client.getJetty(), client.getJettyWs(), client.getClosed()); } final var sslContextFactory = new SslContextFactory.Client(); if (sslContext != null) { diff --git a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientBuilderImpl.java b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientBuilderImpl.java index ae89df21e30..f05d63fb3ca 100644 --- a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientBuilderImpl.java +++ b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientBuilderImpl.java @@ -28,6 +28,7 @@ import java.net.Proxy; import java.util.Arrays; import java.util.Collections; +import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.X509TrustManager; @@ -119,7 +120,7 @@ private OkHttpClientImpl completeBuild(okhttp3.OkHttpClient.Builder builder, boo OkHttpClient client = builder.build(); - return new OkHttpClientImpl(client, this); + return new OkHttpClientImpl(client, this, new AtomicBoolean()); } @Override diff --git a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java index f2bfd38e2a3..bd276bb0efe 100644 --- a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java +++ b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java @@ -68,6 +68,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; public class OkHttpClientImpl extends StandardHttpClient { @@ -241,13 +242,13 @@ public Map> headers() { private final okhttp3.OkHttpClient httpClient; - public OkHttpClientImpl(OkHttpClient client, OkHttpClientBuilderImpl builder) { - super(builder); + public OkHttpClientImpl(OkHttpClient client, OkHttpClientBuilderImpl builder, AtomicBoolean closed) { + super(builder, closed); this.httpClient = client; } @Override - public void close() { + public void doClose() { ConnectionPool connectionPool = httpClient.connectionPool(); Dispatcher dispatcher = httpClient.dispatcher(); diff --git a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClient.java b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClient.java index 55e9d69e190..ddbec8b2c8b 100644 --- a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClient.java +++ b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClient.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import static io.fabric8.kubernetes.client.vertx.VertxHttpRequest.toHeadersMap; @@ -43,8 +44,8 @@ public class VertxHttpClient vertxHttpClientBuilder, HttpClient client) { - super(vertxHttpClientBuilder); + VertxHttpClient(VertxHttpClientBuilder vertxHttpClientBuilder, HttpClient client, AtomicBoolean closed) { + super(vertxHttpClientBuilder, closed); this.vertx = vertxHttpClientBuilder.vertx; this.client = client; } @@ -119,7 +120,7 @@ public CompletableFuture> consumeBytesDirect(StandardHtt } @Override - public void close() { + public void doClose() { client.close(); } diff --git a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClientBuilder.java b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClientBuilder.java index c6fb8114ccf..1010f8c4a31 100644 --- a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClientBuilder.java +++ b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClientBuilder.java @@ -33,6 +33,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; public class VertxHttpClientBuilder @@ -52,7 +53,7 @@ public VertxHttpClientBuilder(F clientFactory, Vertx vertx) { @Override public VertxHttpClient build() { if (this.client != null) { - return new VertxHttpClient<>(this, this.client.getClient()); + return new VertxHttpClient<>(this, this.client.getClient(), this.client.getClosed()); } WebClientOptions options = new WebClientOptions(); @@ -115,7 +116,7 @@ public SslContextFactory sslContextFactory() { } }); } - return new VertxHttpClient<>(this, vertx.createHttpClient(options)); + return new VertxHttpClient<>(this, vertx.createHttpClient(options), new AtomicBoolean()); } @Override diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java index d8f71d7b168..78270e17e41 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java @@ -167,4 +167,6 @@ interface Builder extends DerivedClientBuilder { HttpRequest.Builder newHttpRequestBuilder(); + boolean isClosed(); + } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpClient.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpClient.java index afe7b6490ce..5b0b078bac2 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpClient.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpClient.java @@ -39,6 +39,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Supplier; @@ -52,9 +53,11 @@ public abstract class StandardHttpClient builder; + protected AtomicBoolean closed; - protected StandardHttpClient(StandardHttpClientBuilder builder) { + protected StandardHttpClient(StandardHttpClientBuilder builder, AtomicBoolean closed) { this.builder = builder; + this.closed = closed; } public abstract CompletableFuture buildWebSocketDirect( @@ -280,4 +283,22 @@ public V getTag(Class type) { return type.cast(builder.tags.get(type)); } + @Override + final public void close() { + if (closed.compareAndSet(false, true)) { + doClose(); + } + } + + protected abstract void doClose(); + + @Override + public boolean isClosed() { + return closed.get(); + } + + public AtomicBoolean getClosed() { + return closed; + } + } diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/StandardHttpClientTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/StandardHttpClientTest.java index 73f4e6fa1bf..f3664e9e9d4 100644 --- a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/StandardHttpClientTest.java +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/StandardHttpClientTest.java @@ -265,4 +265,18 @@ public static Stream testRetryAfterParsingData() { 10000)); } + @Test + void testIsClosed() { + client.close(); + assertTrue(client.isClosed()); + } + + @Test + void testDerivedIsClosed() { + TestStandardHttpClient childClient = client.newBuilder().connectTimeout(0, TimeUnit.SECONDS).build(); + childClient.close(); + assertTrue(childClient.isClosed()); + assertTrue(client.isClosed()); + } + } diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/TestStandardHttpClient.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/TestStandardHttpClient.java index bbf69ff1d0b..4e75f151e64 100644 --- a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/TestStandardHttpClient.java +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/TestStandardHttpClient.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; public class TestStandardHttpClient extends StandardHttpClient { @@ -39,15 +40,15 @@ public class TestStandardHttpClient @Getter private final List recordedConsumeBytesDirects; - protected TestStandardHttpClient(TestStandardHttpClientBuilder builder) { - super(builder); + protected TestStandardHttpClient(TestStandardHttpClientBuilder builder, AtomicBoolean closed) { + super(builder, closed); expectations = new HashMap<>(); recordedBuildWebSocketDirects = new ArrayList<>(); recordedConsumeBytesDirects = new ArrayList<>(); } @Override - public void close() { + public void doClose() { recordedConsumeBytesDirects.clear(); recordedBuildWebSocketDirects.clear(); expectations.values().forEach(e -> { diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/TestStandardHttpClientBuilder.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/TestStandardHttpClientBuilder.java index 2c661e84a15..71bad6c6800 100644 --- a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/TestStandardHttpClientBuilder.java +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/TestStandardHttpClientBuilder.java @@ -15,7 +15,9 @@ */ package io.fabric8.kubernetes.client.http; +import java.util.Optional; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; public class TestStandardHttpClientBuilder extends StandardHttpClientBuilder { @@ -30,7 +32,8 @@ protected TestStandardHttpClientBuilder(TestStandardHttpClientFactory clientFact @Override public TestStandardHttpClient build() { - final TestStandardHttpClient instance = new TestStandardHttpClient(this); + final TestStandardHttpClient instance = new TestStandardHttpClient(this, + Optional.ofNullable(instances.peek()).map(TestStandardHttpClient::getClosed).orElse(new AtomicBoolean())); instances.add(instance); return instance; } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index b6e10849b2d..c11b6df435b 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -240,6 +240,11 @@ void scheduleReconnect(WatchRequestState state) { synchronized void reconnect() { try { + if (client.isClosed()) { + logger.debug("The client has closed, closing the watch"); + this.close(); + return; + } startWatch(); if (isForceClosed()) { closeRequest(); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperation.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperation.java index dd351e94392..47d94ade223 100755 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperation.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperation.java @@ -1069,7 +1069,13 @@ private DefaultSharedIndexInformer createInformer(long resync, Executor ex if (indexers != null) { informer.addIndexers(indexers); } - this.context.getClient().adapt(BaseClient.class).getClosed().whenComplete((closed, ignored) -> informer.stop()); + informer.started().whenComplete((ignored, throwable) -> { + if (throwable == null) { + BaseClient baseClient = this.context.getClient().adapt(BaseClient.class); + baseClient.addToCloseable(informer); + informer.stopped().whenComplete((x, y) -> baseClient.removeFromCloseable(informer)); + } + }); return informer; } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/impl/BaseClient.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/impl/BaseClient.java index 9c1f4996b45..cec202280a2 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/impl/BaseClient.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/impl/BaseClient.java @@ -47,7 +47,10 @@ import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URL; +import java.util.Collections; import java.util.List; +import java.util.Set; +import java.util.WeakHashMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -90,6 +93,7 @@ public void onClose(Executor executor) { private Executor executor; protected KubernetesSerialization kubernetesSerialization; private CompletableFuture closed; + private Set closable; private OperationContext operationContext; @@ -103,6 +107,7 @@ public void onClose(Executor executor) { this.executorSupplier = baseClient.executorSupplier; this.executor = baseClient.executor; this.kubernetesSerialization = baseClient.kubernetesSerialization; + this.closable = baseClient.closable; setDerivedFields(); if (baseClient.operationContext != null) { operationContext(baseClient.operationContext); @@ -111,6 +116,7 @@ public void onClose(Executor executor) { BaseClient(final HttpClient httpClient, Config config, ExecutorSupplier executorSupplier, KubernetesSerialization kubernetesSerialization) { + this.closable = Collections.newSetFromMap(new WeakHashMap<>()); this.closed = new CompletableFuture<>(); this.config = config; this.httpClient = httpClient; @@ -150,6 +156,14 @@ public synchronized void close() { httpClient.getClass().getName()); } httpClient.close(); + closable.forEach(c -> { + try { + c.close(); + } catch (Exception e) { + logger.warn("Error closing resource", e); + } + }); + closable.clear(); if (this.executorSupplier != null) { this.executorSupplier.onClose(executor); this.executorSupplier = null; @@ -398,4 +412,15 @@ public KubernetesSerialization getKubernetesSerialization() { return kubernetesSerialization; } + public synchronized void addToCloseable(AutoCloseable closeable) { + if (this.closed.isDone()) { + throw new KubernetesClientException("Client is already closed"); + } + this.closable.add(closeable); + } + + public synchronized void removeFromCloseable(AutoCloseable closeable) { + this.closable.remove(closeable); + } + } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index ab5fc877c41..8ebf3ed8e2a 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -166,6 +166,10 @@ public CompletableFuture start() { return reflector.start(); } + public CompletableFuture started() { + return reflector.getStartFuture(); + } + @Override public SharedIndexInformer run() { Utils.waitUntilReadyOrFail(start(), -1, TimeUnit.MILLISECONDS); diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java index 4314e18d429..d49d949d7d6 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java @@ -1216,8 +1216,29 @@ void testClientStopClosesInformer() throws InterruptedException { .inNamespace("ns1") .runnableInformer(60 * WATCH_EVENT_EMIT_TIME); + animalSharedIndexInformer.start(); + client.close(); + await().atMost(60, TimeUnit.SECONDS).until(() -> animalSharedIndexInformer.stopped().toCompletableFuture().isDone()); + } + + @Test + void testClientStopClosesInformerBeforeStarting() throws InterruptedException { + // Given + setupMockServerExpectations(Animal.class, "ns1", this::getList, + r -> new WatchEvent(getAnimal("red-panda", "Carnivora", r), "ADDED"), null, null); + + // When + SharedIndexInformer animalSharedIndexInformer = client + .genericKubernetesResources(animalContext) + .inNamespace("ns1") + .runnableInformer(60 * WATCH_EVENT_EMIT_TIME); + + client.close(); + + animalSharedIndexInformer.start(); + assertTrue(animalSharedIndexInformer.stopped().toCompletableFuture().isDone()); }