Skip to content

Commit

Permalink
Adds more proactive shutdown of informers
Browse files Browse the repository at this point in the history
Also moving / tweaking the log of client closure

Closes #5327
  • Loading branch information
shawkins committed Sep 13, 2023
1 parent 864dd19 commit da863d0
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* Fix #5423: OkHttpClientImpl supports setting request method for empty payload requests

#### Improvements
* Fix #5327: added proactive shutdown of informers on client close
* Fix #5368: added support for additional ListOptions fields
* Fix #5377: added a createOr and unlock function to provide a straight-forward replacement for createOrReplace.
* Fix #5388: [crd-generator] Generate deterministic CRDs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.PrintWriter;
import java.io.Reader;
import java.io.StringWriter;
import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -250,15 +248,6 @@ public OkHttpClientImpl(OkHttpClient client, OkHttpClientBuilderImpl builder) {

@Override
public void close() {
if (LOG.isDebugEnabled()) {
StringWriter writer = new StringWriter();
PrintWriter printWriter = new PrintWriter(writer);
new Exception().printStackTrace(printWriter);
printWriter.close();
String stack = writer.toString();
stack = stack.substring(stack.indexOf("\n"));
LOG.debug("Shutting down dispatcher {} at the following call stack: {}", this.httpClient.dispatcher(), stack);
}
ConnectionPool connectionPool = httpClient.connectionPool();

Dispatcher dispatcher = httpClient.dispatcher();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.fabric8.kubernetes.client.dsl.base.PatchType;
import io.fabric8.kubernetes.client.extension.ExtensibleResource;
import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.impl.BaseClient;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.impl.DefaultSharedIndexInformer;
Expand Down Expand Up @@ -1037,6 +1038,7 @@ private DefaultSharedIndexInformer<T, L> createInformer(long resync, Executor ex
if (indexers != null) {
informer.addIndexers(indexers);
}
this.context.getClient().adapt(BaseClient.class).getClosed().whenComplete((closed, ignored) -> informer.stop());
return informer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,23 @@
import io.fabric8.kubernetes.client.utils.ApiVersionUtil;
import io.fabric8.kubernetes.client.utils.KubernetesSerialization;
import io.fabric8.kubernetes.client.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Predicate;

public abstract class BaseClient implements Client {

public static final Logger logger = LoggerFactory.getLogger(BaseClient.class);

/**
* An {@link ExecutorSupplier} that provides an unlimited thread pool {@link Executor} per client.
*/
Expand Down Expand Up @@ -84,10 +89,12 @@ public void onClose(Executor executor) {
private ExecutorSupplier executorSupplier;
private Executor executor;
protected KubernetesSerialization kubernetesSerialization;
private CompletableFuture<Void> closed;

private OperationContext operationContext;

BaseClient(BaseClient baseClient) {
this.closed = baseClient.closed;
this.config = baseClient.config;
this.httpClient = baseClient.httpClient;
this.adapters = baseClient.adapters;
Expand All @@ -104,6 +111,7 @@ public void onClose(Executor executor) {

BaseClient(final HttpClient httpClient, Config config, ExecutorSupplier executorSupplier,
KubernetesSerialization kubernetesSerialization) {
this.closed = new CompletableFuture<>();
this.config = config;
this.httpClient = httpClient;
this.handlers = new Handlers();
Expand Down Expand Up @@ -136,13 +144,22 @@ protected void setDerivedFields() {

@Override
public synchronized void close() {
if (closed.complete(null) && logger.isDebugEnabled()) {
logger.debug(
"The client and associated httpclient {} have been closed, the usage of this or any client using the httpclient will not work after this",
httpClient.getClass().getName());
}
httpClient.close();
if (this.executorSupplier != null) {
this.executorSupplier.onClose(executor);
this.executorSupplier = null;
}
}

public CompletableFuture<Void> getClosed() {
return closed;
}

@Override
public URL getMasterUrl() {
return masterUrl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1204,6 +1204,23 @@ void testCustomExceptionHandler() throws InterruptedException {
assertEquals(0, foundExistingAnimal.getCount());
}

@Test
void testClientStopClosesInformer() throws InterruptedException {
// Given
setupMockServerExpectations(Animal.class, "ns1", this::getList,
r -> new WatchEvent(getAnimal("red-panda", "Carnivora", r), "ADDED"), null, null);

// When
SharedIndexInformer<GenericKubernetesResource> animalSharedIndexInformer = client
.genericKubernetesResources(animalContext)
.inNamespace("ns1")
.runnableInformer(60 * WATCH_EVENT_EMIT_TIME);

client.close();

assertTrue(animalSharedIndexInformer.stopped().toCompletableFuture().isDone());
}

private KubernetesResource getAnimal(String name, String order, String resourceVersion) {
AnimalSpec animalSpec = new AnimalSpec();
animalSpec.setOrder(order);
Expand Down

0 comments on commit da863d0

Please sign in to comment.