From 007abfa3a498703d20327a0cc95c64f7f718bb08 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Sat, 23 Sep 2023 08:31:50 -0400 Subject: [PATCH] fix: making dsl waits aware of client close Closes #5379 --- CHANGELOG.md | 1 + .../client/dsl/internal/BaseOperation.java | 7 +++ ...hDeleteRecreateWaitApplicableListImpl.java | 47 ++++++++----------- .../kubernetes/client/mock/ResourceTest.java | 38 +++++++++++++++ 4 files changed, 65 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7825642c2be..f96b7e9aef6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ * Fix #5382: [java-generator] Allow to deserialize more valid RFC3339 date-time and make the format customizable * Fix #5380: [java-generator] Avoid to emit Java Keywords as package names * Fix #5457: [java-generator] Correctly handle numeric enums +* Fix #5379: ensuring informOnCondition and waitUntilCondition commple with exception when the client closes * Fix #5463: ensures that onStopLeading is called with releaseOnCancel even when leadership is already lost * Fix #5423: OkHttpClientImpl supports setting request method for empty payload requests diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperation.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperation.java index 9106b39a09b..dd351e94392 100755 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperation.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperation.java @@ -990,6 +990,13 @@ public void onNothing() { future.completeExceptionally(t); } }); + informer.stopped().whenComplete((v, t) -> { + if (t != null) { + future.completeExceptionally(t); + } else { + future.completeExceptionally(new KubernetesClientException("Informer was stopped")); + } + }); return future; } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java index e0df7f8b2cb..31e4a553c8f 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java @@ -33,8 +33,6 @@ import io.fabric8.kubernetes.client.dsl.NamespaceableResource; import io.fabric8.kubernetes.client.dsl.Resource; import io.fabric8.kubernetes.client.dsl.Waitable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; @@ -55,8 +53,6 @@ public class NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImp implements NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, Waitable, HasMetadata> { - private static final Logger LOGGER = LoggerFactory - .getLogger(NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.class); protected static final String EXPRESSION = "expression"; private OperationContext context; @@ -133,24 +129,26 @@ public List waitUntilCondition(Predicate condition, long finish = System.nanoTime() + timeUnit.toNanos(amount); - for (int i = 0; i < items.size(); i++) { - final HasMetadata meta = items.get(i); - CompletableFuture> future = futures.get(i); - try { - results.add(future.thenApply(l -> l.isEmpty() ? null : l.get(0)).get(Math.max(0, finish - System.nanoTime()), - TimeUnit.NANOSECONDS)); - } catch (TimeoutException e) { - itemsWithConditionNotMatched.add(meta); - logAsNotReady(e, meta); - } catch (ExecutionException e) { - itemsWithConditionNotMatched.add(meta); - logAsNotReady(e.getCause(), meta); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw KubernetesClientException.launderThrowable(e); - } finally { - future.cancel(true); + try { + for (int i = 0; i < items.size(); i++) { + final HasMetadata meta = items.get(i); + CompletableFuture> future = futures.get(i); + try { + results.add(future.thenApply(l -> l.isEmpty() ? null : l.get(0)).get(Math.max(0, finish - System.nanoTime()), + TimeUnit.NANOSECONDS)); + } catch (TimeoutException e) { + itemsWithConditionNotMatched.add(meta); + } catch (ExecutionException e) { + throw KubernetesClientException.launderThrowable(e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw KubernetesClientException.launderThrowable(e); + } finally { + future.cancel(true); + } } + } finally { + futures.forEach(f -> f.cancel(true)); } if (!itemsWithConditionNotMatched.isEmpty()) { @@ -160,13 +158,6 @@ public List waitUntilCondition(Predicate condition, return results; } - private static void logAsNotReady(Throwable t, HasMetadata meta) { - LOGGER.warn( - "Error while waiting for: [{}] with name: [{}] in namespace: [{}]: {}. The resource will be considered not ready.", - meta.getKind(), meta.getMetadata().getName(), meta.getMetadata().getNamespace(), t.getMessage()); - LOGGER.debug("The error stack trace:", t); - } - @Override public ListVisitFromServerWritable dryRun(boolean isDryRun) { return newInstance(this.context.withDryRun(isDryRun)); diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java index 90a7f7796b8..9dfb97533cc 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java @@ -39,6 +39,7 @@ import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; import io.fabric8.kubernetes.client.utils.Serialization; +import io.fabric8.kubernetes.client.utils.Utils; import okhttp3.mockwebserver.RecordedRequest; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.DisplayName; @@ -47,6 +48,7 @@ import java.net.HttpURLConnection; import java.util.Objects; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static java.net.HttpURLConnection.HTTP_GONE; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -447,6 +449,42 @@ void testWaitUntilCondition() throws InterruptedException { .containsExactly(tuple("Ready", "True"), tuple("Dummy", "True")); } + @Test + void testWaitUntilConditionClosedClient() throws InterruptedException { + Pod pod1 = new PodBuilder().withNewMetadata() + .withName("pod1") + .withResourceVersion("1") + .withNamespace("test") + .and() + .build(); + + Pod noReady = createReadyFrom(pod1, "False", "1"); + list(noReady); + + server.expect() + .get() + .withPath( + "/api/v1/namespaces/test/pods?allowWatchBookmarks=true&fieldSelector=metadata.name%3Dpod1&resourceVersion=1&timeoutSeconds=600&watch=true") + .andUpgradeToWebSocket() + .open() + .waitFor(4000) + .andEmit(new WatchEvent(noReady, "DELETED")) + .done() + .always(); + + Utils.schedule(Runnable::run, client::close, 2, TimeUnit.SECONDS); + + PodResource podResource = client.pods().withName("pod1"); + KubernetesClientException exception = assertThrows(KubernetesClientException.class, () -> podResource.waitUntilCondition( + r -> r.getStatus() + .getConditions() + .stream() + .anyMatch(c -> "Dummy".equals(c.getType()) && "True".equals(c.getStatus())), + 10, SECONDS)); + assertEquals("Informer was stopped", exception.getMessage()); + + } + @Test void testErrorEventDuringWaitReturnFromAPIIfMatch() throws InterruptedException { Pod pod1 = new PodBuilder().withNewMetadata()