From 4214f8992a19b93d228921103b02999200f396b7 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Wed, 12 Jul 2023 16:12:23 -0400 Subject: [PATCH] ensuring that the reconnect task terminates Partially addresses: #5327 --- CHANGELOG.md | 1 + .../dsl/internal/AbstractWatchManager.java | 3 +- .../impl/DefaultSharedIndexInformer.java | 2 +- .../informers/impl/cache/Reflector.java | 13 +++++--- .../informers/impl/cache/ReflectorTest.java | 30 ++++++++++++++++++- 5 files changed, 42 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 28f607dcc68..7c1796fdd17 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * Fix #5221: Empty kube config file causes NPE * Fix #5281: Ensure the KubernetesCrudDispatcher's backing map is accessed w/lock * Fix #5293: Ensured the mock server uses only generic or JsonNode parsing +* Fix #5327: Ensured that the informer reconnect task terminates after client close #### Improvements * Fix #5166: Remove opinionated messages from Config's `errorMessages` and deprecate it diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index 02be8501703..5422cf6d03c 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -159,7 +159,8 @@ public synchronized void closeRequest() { if (state != null && state.closed.compareAndSet(false, true)) { logger.debug("Closing the current watch"); closeCurrentRequest(); - CompletableFuture future = Utils.schedule(Runnable::run, () -> failSafeReconnect(state), watchEndCheckMs, + CompletableFuture future = Utils.schedule(baseOperation.getOperationContext().getExecutor(), + () -> failSafeReconnect(state), watchEndCheckMs, TimeUnit.MILLISECONDS); state.ended.whenComplete((v, t) -> future.cancel(true)); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index 4e01004232d..ab5fc877c41 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -88,7 +88,7 @@ public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher lis this.processor = new SharedProcessor<>(informerExecutor, description); processorStore = new ProcessorStore<>(this.indexer, this.processor); - this.reflector = new Reflector<>(listerWatcher, processorStore); + this.reflector = new Reflector<>(listerWatcher, processorStore, informerExecutor); } /** diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java index 50a71b6dd5b..df92400c93f 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java @@ -34,6 +34,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.LongSupplier; @@ -54,6 +55,7 @@ public class Reflector startFuture = new CompletableFuture<>(); private final CompletableFuture stopFuture = new CompletableFuture<>(); private final ExponentialBackoffIntervalCalculator retryIntervalCalculator; + private final Executor executor; //default behavior - retry if started and it's not a watcherexception private volatile ExceptionHandler handler = (b, t) -> b && !(t instanceof WatcherException); private long minTimeout = MIN_TIMEOUT; @@ -63,11 +65,16 @@ public class Reflector listerWatcher, SyncableStore store) { + this(listerWatcher, store, Runnable::run); + } + + public Reflector(ListerWatcher listerWatcher, SyncableStore store, Executor executor) { this.listerWatcher = listerWatcher; this.store = store; this.watcher = new ReflectorWatcher(); this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(listerWatcher.getWatchReconnectInterval(), ExponentialBackoffIntervalCalculator.UNLIMITED_RETRIES); + this.executor = executor; } public CompletableFuture start() { @@ -160,9 +167,7 @@ protected void reconnect() { if (isStopped()) { return; } - // this can be run in the scheduler thread because - // any further operations will happen on the io thread - reconnectFuture = Utils.schedule(Runnable::run, this::listSyncAndWatch, + reconnectFuture = Utils.schedule(executor, this::listSyncAndWatch, retryIntervalCalculator.nextReconnectInterval(), TimeUnit.MILLISECONDS); } @@ -222,7 +227,7 @@ private synchronized CompletableFuture startWatcher(final Strin timeoutFuture.cancel(true); } timeoutFuture = new CompletableFuture<>(); - Utils.scheduleWithVariableRate(timeoutFuture, Runnable::run, + Utils.scheduleWithVariableRate(timeoutFuture, executor, () -> future.thenAccept(AbstractWatchManager::closeRequest), timeout.getAsLong(), timeout, TimeUnit.SECONDS); watchFuture = future; return watchFuture; diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java index 7f6cc860385..6abb1cbb616 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java @@ -30,8 +30,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -134,7 +138,13 @@ void testTimeout() { PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build(); Mockito.when(mock.submitList(Mockito.any())).thenReturn(CompletableFuture.completedFuture(list)); - Reflector reflector = new Reflector<>(mock, Mockito.mock(SyncableStore.class)); + Executor ex = Mockito.mock(Executor.class); + Mockito.doAnswer(invocation -> { + ((Runnable) invocation.getArgument(0)).run(); + return null; + }).when(ex).execute(Mockito.any(Runnable.class)); + + Reflector reflector = new Reflector<>(mock, Mockito.mock(SyncableStore.class), ex); reflector.setMinTimeout(1); AbstractWatchManager manager = Mockito.mock(AbstractWatchManager.class); @@ -154,6 +164,24 @@ void testTimeout() { } return true; }); + + // simulate an abrupt client close + AtomicInteger rejected = new AtomicInteger(); + Mockito.doAnswer(invocation -> { + rejected.incrementAndGet(); + throw new RejectedExecutionException(); + }).when(ex).execute(Mockito.any(Runnable.class)); + + // make sure the reconnect is rejected + Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> { + return rejected.get() > 0; + }); + + long start = System.currentTimeMillis(); + Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> { + assertEquals(1, rejected.get()); + return System.currentTimeMillis() - start > 5000; + }); } }