Skip to content

Commit

Permalink
fix: making dsl waits aware of client close
Browse files Browse the repository at this point in the history
Closes #5379
  • Loading branch information
shawkins authored and manusa committed Sep 25, 2023
1 parent 351bd6c commit 007abfa
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Fix #5382: [java-generator] Allow to deserialize more valid RFC3339 date-time and make the format customizable
* Fix #5380: [java-generator] Avoid to emit Java Keywords as package names
* Fix #5457: [java-generator] Correctly handle numeric enums
* Fix #5379: ensuring informOnCondition and waitUntilCondition commple with exception when the client closes
* Fix #5463: ensures that onStopLeading is called with releaseOnCancel even when leadership is already lost
* Fix #5423: OkHttpClientImpl supports setting request method for empty payload requests

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,13 @@ public void onNothing() {
future.completeExceptionally(t);
}
});
informer.stopped().whenComplete((v, t) -> {
if (t != null) {
future.completeExceptionally(t);
} else {
future.completeExceptionally(new KubernetesClientException("Informer was stopped"));
}
});
return future;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import io.fabric8.kubernetes.client.dsl.NamespaceableResource;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.dsl.Waitable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -55,8 +53,6 @@ public class NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImp
implements NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable<HasMetadata>,
Waitable<List<HasMetadata>, HasMetadata> {

private static final Logger LOGGER = LoggerFactory
.getLogger(NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.class);
protected static final String EXPRESSION = "expression";

private OperationContext context;
Expand Down Expand Up @@ -133,24 +129,26 @@ public List<HasMetadata> waitUntilCondition(Predicate<HasMetadata> condition,

long finish = System.nanoTime() + timeUnit.toNanos(amount);

for (int i = 0; i < items.size(); i++) {
final HasMetadata meta = items.get(i);
CompletableFuture<List<HasMetadata>> future = futures.get(i);
try {
results.add(future.thenApply(l -> l.isEmpty() ? null : l.get(0)).get(Math.max(0, finish - System.nanoTime()),
TimeUnit.NANOSECONDS));
} catch (TimeoutException e) {
itemsWithConditionNotMatched.add(meta);
logAsNotReady(e, meta);
} catch (ExecutionException e) {
itemsWithConditionNotMatched.add(meta);
logAsNotReady(e.getCause(), meta);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw KubernetesClientException.launderThrowable(e);
} finally {
future.cancel(true);
try {
for (int i = 0; i < items.size(); i++) {
final HasMetadata meta = items.get(i);
CompletableFuture<List<HasMetadata>> future = futures.get(i);
try {
results.add(future.thenApply(l -> l.isEmpty() ? null : l.get(0)).get(Math.max(0, finish - System.nanoTime()),
TimeUnit.NANOSECONDS));
} catch (TimeoutException e) {
itemsWithConditionNotMatched.add(meta);
} catch (ExecutionException e) {
throw KubernetesClientException.launderThrowable(e.getCause());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw KubernetesClientException.launderThrowable(e);
} finally {
future.cancel(true);
}
}
} finally {
futures.forEach(f -> f.cancel(true));
}

if (!itemsWithConditionNotMatched.isEmpty()) {
Expand All @@ -160,13 +158,6 @@ public List<HasMetadata> waitUntilCondition(Predicate<HasMetadata> condition,
return results;
}

private static void logAsNotReady(Throwable t, HasMetadata meta) {
LOGGER.warn(
"Error while waiting for: [{}] with name: [{}] in namespace: [{}]: {}. The resource will be considered not ready.",
meta.getKind(), meta.getMetadata().getName(), meta.getMetadata().getNamespace(), t.getMessage());
LOGGER.debug("The error stack trace:", t);
}

@Override
public ListVisitFromServerWritable<HasMetadata> dryRun(boolean isDryRun) {
return newInstance(this.context.withDryRun(isDryRun));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import io.fabric8.kubernetes.client.utils.Serialization;
import io.fabric8.kubernetes.client.utils.Utils;
import okhttp3.mockwebserver.RecordedRequest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
Expand All @@ -47,6 +48,7 @@
import java.net.HttpURLConnection;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.net.HttpURLConnection.HTTP_GONE;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -447,6 +449,42 @@ void testWaitUntilCondition() throws InterruptedException {
.containsExactly(tuple("Ready", "True"), tuple("Dummy", "True"));
}

@Test
void testWaitUntilConditionClosedClient() throws InterruptedException {
Pod pod1 = new PodBuilder().withNewMetadata()
.withName("pod1")
.withResourceVersion("1")
.withNamespace("test")
.and()
.build();

Pod noReady = createReadyFrom(pod1, "False", "1");
list(noReady);

server.expect()
.get()
.withPath(
"/api/v1/namespaces/test/pods?allowWatchBookmarks=true&fieldSelector=metadata.name%3Dpod1&resourceVersion=1&timeoutSeconds=600&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(4000)
.andEmit(new WatchEvent(noReady, "DELETED"))
.done()
.always();

Utils.schedule(Runnable::run, client::close, 2, TimeUnit.SECONDS);

PodResource podResource = client.pods().withName("pod1");
KubernetesClientException exception = assertThrows(KubernetesClientException.class, () -> podResource.waitUntilCondition(
r -> r.getStatus()
.getConditions()
.stream()
.anyMatch(c -> "Dummy".equals(c.getType()) && "True".equals(c.getStatus())),
10, SECONDS));
assertEquals("Informer was stopped", exception.getMessage());

}

@Test
void testErrorEventDuringWaitReturnFromAPIIfMatch() throws InterruptedException {
Pod pod1 = new PodBuilder().withNewMetadata()
Expand Down

0 comments on commit 007abfa

Please sign in to comment.