From 0427a76999149a745d21782fda04a66b8a467fc6 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Mon, 5 Dec 2022 18:28:31 -0500 Subject: [PATCH] fix #4641: addressing a regression that misses initial watch event --- CHANGELOG.md | 1 + .../KubernetesNamespacedTestExtension.java | 7 +++---- .../dsl/internal/WatchConnectionManager.java | 20 ++++--------------- .../internal/WatcherWebSocketListener.java | 17 +++++++++------- 4 files changed, 18 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 548df52f29d..f831bd10f43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ * Fix #4534: Java Generator CLI default handling of skipGeneratedAnnotations * Fix #4535: The shell command string will now have single quotes sanitized * Fix #4543: (Java Generator) additionalProperties JsonAny setter method generated as setAdditionalProperty +* Fix #4641: fixed regression with missing initial watch event * Fix #4547: preventing timing issues with leader election cancel * Fix #4540: treating GenericKubernetesResource and RawExtension as buildable * Fix #4569: fixing jdk httpclient regression with 0 timeouts diff --git a/junit/kubernetes-junit-jupiter/src/main/java/io/fabric8/junit/jupiter/KubernetesNamespacedTestExtension.java b/junit/kubernetes-junit-jupiter/src/main/java/io/fabric8/junit/jupiter/KubernetesNamespacedTestExtension.java index ddf5da43c80..07b06d6efe8 100644 --- a/junit/kubernetes-junit-jupiter/src/main/java/io/fabric8/junit/jupiter/KubernetesNamespacedTestExtension.java +++ b/junit/kubernetes-junit-jupiter/src/main/java/io/fabric8/junit/jupiter/KubernetesNamespacedTestExtension.java @@ -39,9 +39,6 @@ public class KubernetesNamespacedTestExtension implements BeforeAllCallback, BeforeEachCallback, AfterAllCallback { - private static final ExtensionContext.Namespace EXT_NAMESPACE = ExtensionContext.Namespace - .create(KubernetesNamespacedTestExtension.class); - @Override public void beforeAll(ExtensionContext context) throws Exception { final KubernetesClient client = new KubernetesClientBuilder().build(); @@ -82,7 +79,9 @@ static KubernetesClient getClient(ExtensionContext context) { } private static ExtensionContext.Store getStore(ExtensionContext context) { - return context.getRoot().getStore(EXT_NAMESPACE); + ExtensionContext.Namespace namespace = ExtensionContext.Namespace.create(KubernetesNamespacedTestExtension.class, + context.getRequiredTestClass()); + return context.getRoot().getStore(namespace); } /** diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java index 3404f802be3..5f8416b12db 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java @@ -52,7 +52,6 @@ public class WatchConnectionManager listener; private volatile CompletableFuture websocketFuture; - private WebSocket websocket; private volatile boolean ready; @@ -86,8 +85,10 @@ public WatchConnectionManager(final HttpClient client, final BaseOperation { this.websocketFuture = null; theFuture.whenComplete((w, t) -> { @@ -98,22 +99,10 @@ protected synchronized void closeRequest() { }); } - synchronized WatcherWebSocketListener getListener() { - return listener; - } - public CompletableFuture getWebsocketFuture() { return websocketFuture; } - @Override - protected void onMessage(String message) { - // for consistency we only want to process the message when we're open - if (this.websocketFuture != null) { - super.onMessage(message); - } - } - @Override protected void start(URL url, Map headers) { this.listener = new WatcherWebSocketListener<>(this); @@ -147,7 +136,6 @@ protected void start(URL url, Map headers) { } if (w != null) { this.ready = true; - this.websocket = w; } return w; }); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java index 29ebee897b4..26fc9acb875 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java @@ -22,13 +22,15 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicBoolean; class WatcherWebSocketListener implements WebSocket.Listener { protected static final Logger logger = LoggerFactory.getLogger(WatcherWebSocketListener.class); protected final AbstractWatchManager manager; - private boolean reconnected = false; + private AtomicBoolean reconnected = new AtomicBoolean(); + private AtomicBoolean closed = new AtomicBoolean(); protected WatcherWebSocketListener(AbstractWatchManager manager) { this.manager = manager; @@ -48,10 +50,8 @@ public void onError(WebSocket webSocket, Throwable t) { @Override public void onMessage(WebSocket webSocket, String text) { - // onMesssage and onClose are serialized, but it's not specified if onError - // may occur simultaneous with onMessage. So we prevent concurrent processing try { - synchronized (this) { + if (!closed.get()) { manager.onMessage(text); } } finally { @@ -71,11 +71,14 @@ public void onClose(WebSocket webSocket, int code, String reason) { scheduleReconnect(); } - private synchronized void scheduleReconnect() { - if (!reconnected) { + private void scheduleReconnect() { + if (reconnected.compareAndSet(false, true)) { manager.scheduleReconnect(); - reconnected = true; } } + public void close() { + closed.set(true); + } + }