Skip to content

Commit

Permalink
ensuring that the reconnect task terminates
Browse files Browse the repository at this point in the history
Partially addresses: #5327
  • Loading branch information
shawkins authored and manusa committed Jul 18, 2023
1 parent 7c652b0 commit 20be3e6
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* Fix #5281: Ensure the KubernetesCrudDispatcher's backing map is accessed w/lock
* Fix #5293: Ensured the mock server uses only generic or JsonNode parsing
* Fix #4225: [crd-generator] Principled generation of enum values instead of considering more properties
* Fix #5327: Ensured that the informer reconnect task terminates after client close

#### Improvements
* Fix #5166: Remove opinionated messages from Config's `errorMessages` and deprecate it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ public synchronized void closeRequest() {
if (state != null && state.closed.compareAndSet(false, true)) {
logger.debug("Closing the current watch");
closeCurrentRequest();
CompletableFuture<Void> future = Utils.schedule(Runnable::run, () -> failSafeReconnect(state), watchEndCheckMs,
CompletableFuture<Void> future = Utils.schedule(baseOperation.getOperationContext().getExecutor(),
() -> failSafeReconnect(state), watchEndCheckMs,
TimeUnit.MILLISECONDS);
state.ended.whenComplete((v, t) -> future.cancel(true));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> lis
this.processor = new SharedProcessor<>(informerExecutor, description);

processorStore = new ProcessorStore<>(this.indexer, this.processor);
this.reflector = new Reflector<>(listerWatcher, processorStore);
this.reflector = new Reflector<>(listerWatcher, processorStore, informerExecutor);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
Expand All @@ -54,6 +55,7 @@ public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T
private final CompletableFuture<Void> startFuture = new CompletableFuture<>();
private final CompletableFuture<Void> stopFuture = new CompletableFuture<>();
private final ExponentialBackoffIntervalCalculator retryIntervalCalculator;
private final Executor executor;
//default behavior - retry if started and it's not a watcherexception
private volatile ExceptionHandler handler = (b, t) -> b && !(t instanceof WatcherException);
private long minTimeout = MIN_TIMEOUT;
Expand All @@ -63,11 +65,16 @@ public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T
private boolean cachedListing = true;

public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store) {
this(listerWatcher, store, Runnable::run);
}

public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store, Executor executor) {
this.listerWatcher = listerWatcher;
this.store = store;
this.watcher = new ReflectorWatcher();
this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(listerWatcher.getWatchReconnectInterval(),
ExponentialBackoffIntervalCalculator.UNLIMITED_RETRIES);
this.executor = executor;
}

public CompletableFuture<Void> start() {
Expand Down Expand Up @@ -160,9 +167,7 @@ protected void reconnect() {
if (isStopped()) {
return;
}
// this can be run in the scheduler thread because
// any further operations will happen on the io thread
reconnectFuture = Utils.schedule(Runnable::run, this::listSyncAndWatch,
reconnectFuture = Utils.schedule(executor, this::listSyncAndWatch,
retryIntervalCalculator.nextReconnectInterval(), TimeUnit.MILLISECONDS);
}

Expand Down Expand Up @@ -222,7 +227,7 @@ private synchronized CompletableFuture<? extends Watch> startWatcher(final Strin
timeoutFuture.cancel(true);
}
timeoutFuture = new CompletableFuture<>();
Utils.scheduleWithVariableRate(timeoutFuture, Runnable::run,
Utils.scheduleWithVariableRate(timeoutFuture, executor,
() -> future.thenAccept(AbstractWatchManager::closeRequest), timeout.getAsLong(), timeout, TimeUnit.SECONDS);
watchFuture = future;
return watchFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -134,7 +138,13 @@ void testTimeout() {
PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build();
Mockito.when(mock.submitList(Mockito.any())).thenReturn(CompletableFuture.completedFuture(list));

Reflector<Pod, PodList> reflector = new Reflector<>(mock, Mockito.mock(SyncableStore.class));
Executor ex = Mockito.mock(Executor.class);
Mockito.doAnswer(invocation -> {
((Runnable) invocation.getArgument(0)).run();
return null;
}).when(ex).execute(Mockito.any(Runnable.class));

Reflector<Pod, PodList> reflector = new Reflector<>(mock, Mockito.mock(SyncableStore.class), ex);
reflector.setMinTimeout(1);

AbstractWatchManager manager = Mockito.mock(AbstractWatchManager.class);
Expand All @@ -154,6 +164,24 @@ void testTimeout() {
}
return true;
});

// simulate an abrupt client close
AtomicInteger rejected = new AtomicInteger();
Mockito.doAnswer(invocation -> {
rejected.incrementAndGet();
throw new RejectedExecutionException();
}).when(ex).execute(Mockito.any(Runnable.class));

// make sure the reconnect is rejected
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
return rejected.get() > 0;
});

long start = System.currentTimeMillis();
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
assertEquals(1, rejected.get());
return System.currentTimeMillis() - start > 5000;
});
}

}

0 comments on commit 20be3e6

Please sign in to comment.