Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ensuring that the reconnect task terminates #5331

Merged
merged 1 commit into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* Fix #5281: Ensure the KubernetesCrudDispatcher's backing map is accessed w/lock
* Fix #5293: Ensured the mock server uses only generic or JsonNode parsing
* Fix #4225: [crd-generator] Principled generation of enum values instead of considering more properties
* Fix #5327: Ensured that the informer reconnect task terminates after client close

#### Improvements
* Fix #5166: Remove opinionated messages from Config's `errorMessages` and deprecate it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ public synchronized void closeRequest() {
if (state != null && state.closed.compareAndSet(false, true)) {
logger.debug("Closing the current watch");
closeCurrentRequest();
CompletableFuture<Void> future = Utils.schedule(Runnable::run, () -> failSafeReconnect(state), watchEndCheckMs,
CompletableFuture<Void> future = Utils.schedule(baseOperation.getOperationContext().getExecutor(),
() -> failSafeReconnect(state), watchEndCheckMs,
TimeUnit.MILLISECONDS);
state.ended.whenComplete((v, t) -> future.cancel(true));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> lis
this.processor = new SharedProcessor<>(informerExecutor, description);

processorStore = new ProcessorStore<>(this.indexer, this.processor);
this.reflector = new Reflector<>(listerWatcher, processorStore);
this.reflector = new Reflector<>(listerWatcher, processorStore, informerExecutor);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
Expand All @@ -54,6 +55,7 @@ public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T
private final CompletableFuture<Void> startFuture = new CompletableFuture<>();
private final CompletableFuture<Void> stopFuture = new CompletableFuture<>();
private final ExponentialBackoffIntervalCalculator retryIntervalCalculator;
private final Executor executor;
//default behavior - retry if started and it's not a watcherexception
private volatile ExceptionHandler handler = (b, t) -> b && !(t instanceof WatcherException);
private long minTimeout = MIN_TIMEOUT;
Expand All @@ -63,11 +65,16 @@ public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T
private boolean cachedListing = true;

public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store) {
this(listerWatcher, store, Runnable::run);
}
Comment on lines 67 to +69
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to completely remove this constructor or at least make it package-private.


public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store, Executor executor) {
this.listerWatcher = listerWatcher;
this.store = store;
this.watcher = new ReflectorWatcher();
this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(listerWatcher.getWatchReconnectInterval(),
ExponentialBackoffIntervalCalculator.UNLIMITED_RETRIES);
this.executor = executor;
}

public CompletableFuture<Void> start() {
Expand Down Expand Up @@ -160,9 +167,7 @@ protected void reconnect() {
if (isStopped()) {
return;
}
// this can be run in the scheduler thread because
// any further operations will happen on the io thread
reconnectFuture = Utils.schedule(Runnable::run, this::listSyncAndWatch,
reconnectFuture = Utils.schedule(executor, this::listSyncAndWatch,
retryIntervalCalculator.nextReconnectInterval(), TimeUnit.MILLISECONDS);
}

Expand Down Expand Up @@ -222,7 +227,7 @@ private synchronized CompletableFuture<? extends Watch> startWatcher(final Strin
timeoutFuture.cancel(true);
}
timeoutFuture = new CompletableFuture<>();
Utils.scheduleWithVariableRate(timeoutFuture, Runnable::run,
Utils.scheduleWithVariableRate(timeoutFuture, executor,
() -> future.thenAccept(AbstractWatchManager::closeRequest), timeout.getAsLong(), timeout, TimeUnit.SECONDS);
watchFuture = future;
return watchFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -134,7 +138,13 @@ void testTimeout() {
PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build();
Mockito.when(mock.submitList(Mockito.any())).thenReturn(CompletableFuture.completedFuture(list));

Reflector<Pod, PodList> reflector = new Reflector<>(mock, Mockito.mock(SyncableStore.class));
Executor ex = Mockito.mock(Executor.class);
Mockito.doAnswer(invocation -> {
((Runnable) invocation.getArgument(0)).run();
return null;
}).when(ex).execute(Mockito.any(Runnable.class));

Reflector<Pod, PodList> reflector = new Reflector<>(mock, Mockito.mock(SyncableStore.class), ex);
reflector.setMinTimeout(1);

AbstractWatchManager manager = Mockito.mock(AbstractWatchManager.class);
Expand All @@ -154,6 +164,24 @@ void testTimeout() {
}
return true;
});

// simulate an abrupt client close
AtomicInteger rejected = new AtomicInteger();
Mockito.doAnswer(invocation -> {
rejected.incrementAndGet();
throw new RejectedExecutionException();
}).when(ex).execute(Mockito.any(Runnable.class));

// make sure the reconnect is rejected
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
return rejected.get() > 0;
});

long start = System.currentTimeMillis();
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
assertEquals(1, rejected.get());
return System.currentTimeMillis() - start > 5000;
});
}

}