From 3bd93736e3be0d23559163af1b6201fc066d312c Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Wed, 13 Sep 2023 10:55:35 -0400 Subject: [PATCH] Adds more proactive shutdown of informers Also moving / tweaking the log of client closure Closes #5327 --- CHANGELOG.md | 1 + .../client/okhttp/OkHttpClientImpl.java | 11 ----------- .../client/dsl/internal/BaseOperation.java | 2 ++ .../kubernetes/client/impl/BaseClient.java | 17 +++++++++++++++++ .../mock/DefaultSharedIndexInformerTest.java | 17 +++++++++++++++++ 5 files changed, 37 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index aab6823fd72..cb1762dd85b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ * Fix #5423: OkHttpClientImpl supports setting request method for empty payload requests #### Improvements +* Fix #5327: added proactive shutdown of informers on client close * Fix #5432: [java-generator] Add the possibility to always emit `additionalProperties` on generated POJOs * Fix #5368: added support for additional ListOptions fields * Fix #5377: added a createOr and unlock function to provide a straight-forward replacement for createOrReplace. diff --git a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java index 204b7b56fbd..f2bfd38e2a3 100644 --- a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java +++ b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java @@ -55,9 +55,7 @@ import java.io.BufferedInputStream; import java.io.IOException; import java.io.InterruptedIOException; -import java.io.PrintWriter; import java.io.Reader; -import java.io.StringWriter; import java.lang.reflect.Method; import java.net.MalformedURLException; import java.nio.ByteBuffer; @@ -250,15 +248,6 @@ public OkHttpClientImpl(OkHttpClient client, OkHttpClientBuilderImpl builder) { @Override public void close() { - if (LOG.isDebugEnabled()) { - StringWriter writer = new StringWriter(); - PrintWriter printWriter = new PrintWriter(writer); - new Exception().printStackTrace(printWriter); - printWriter.close(); - String stack = writer.toString(); - stack = stack.substring(stack.indexOf("\n")); - LOG.debug("Shutting down dispatcher {} at the following call stack: {}", this.httpClient.dispatcher(), stack); - } ConnectionPool connectionPool = httpClient.connectionPool(); Dispatcher dispatcher = httpClient.dispatcher(); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperation.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperation.java index b2366550c28..8671305bea3 100755 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperation.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperation.java @@ -51,6 +51,7 @@ import io.fabric8.kubernetes.client.dsl.base.PatchType; import io.fabric8.kubernetes.client.extension.ExtensibleResource; import io.fabric8.kubernetes.client.http.HttpRequest; +import io.fabric8.kubernetes.client.impl.BaseClient; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.impl.DefaultSharedIndexInformer; @@ -1037,6 +1038,7 @@ private DefaultSharedIndexInformer createInformer(long resync, Executor ex if (indexers != null) { informer.addIndexers(indexers); } + this.context.getClient().adapt(BaseClient.class).getClosed().whenComplete((closed, ignored) -> informer.stop()); return informer; } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/impl/BaseClient.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/impl/BaseClient.java index 86ea7160d4e..9c1f4996b45 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/impl/BaseClient.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/impl/BaseClient.java @@ -41,11 +41,14 @@ import io.fabric8.kubernetes.client.utils.ApiVersionUtil; import io.fabric8.kubernetes.client.utils.KubernetesSerialization; import io.fabric8.kubernetes.client.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URL; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -53,6 +56,8 @@ public abstract class BaseClient implements Client { + public static final Logger logger = LoggerFactory.getLogger(BaseClient.class); + /** * An {@link ExecutorSupplier} that provides an unlimited thread pool {@link Executor} per client. */ @@ -84,10 +89,12 @@ public void onClose(Executor executor) { private ExecutorSupplier executorSupplier; private Executor executor; protected KubernetesSerialization kubernetesSerialization; + private CompletableFuture closed; private OperationContext operationContext; BaseClient(BaseClient baseClient) { + this.closed = baseClient.closed; this.config = baseClient.config; this.httpClient = baseClient.httpClient; this.adapters = baseClient.adapters; @@ -104,6 +111,7 @@ public void onClose(Executor executor) { BaseClient(final HttpClient httpClient, Config config, ExecutorSupplier executorSupplier, KubernetesSerialization kubernetesSerialization) { + this.closed = new CompletableFuture<>(); this.config = config; this.httpClient = httpClient; this.handlers = new Handlers(); @@ -136,6 +144,11 @@ protected void setDerivedFields() { @Override public synchronized void close() { + if (closed.complete(null) && logger.isDebugEnabled()) { + logger.debug( + "The client and associated httpclient {} have been closed, the usage of this or any client using the httpclient will not work after this", + httpClient.getClass().getName()); + } httpClient.close(); if (this.executorSupplier != null) { this.executorSupplier.onClose(executor); @@ -143,6 +156,10 @@ public synchronized void close() { } } + public CompletableFuture getClosed() { + return closed; + } + @Override public URL getMasterUrl() { return masterUrl; diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java index 3412b68ed62..4314e18d429 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java @@ -1204,6 +1204,23 @@ void testCustomExceptionHandler() throws InterruptedException { assertEquals(0, foundExistingAnimal.getCount()); } + @Test + void testClientStopClosesInformer() throws InterruptedException { + // Given + setupMockServerExpectations(Animal.class, "ns1", this::getList, + r -> new WatchEvent(getAnimal("red-panda", "Carnivora", r), "ADDED"), null, null); + + // When + SharedIndexInformer animalSharedIndexInformer = client + .genericKubernetesResources(animalContext) + .inNamespace("ns1") + .runnableInformer(60 * WATCH_EVENT_EMIT_TIME); + + client.close(); + + assertTrue(animalSharedIndexInformer.stopped().toCompletableFuture().isDone()); + } + private KubernetesResource getAnimal(String name, String order, String resourceVersion) { AnimalSpec animalSpec = new AnimalSpec(); animalSpec.setOrder(order);