Skip to content

Commit

Permalink
fix #4641: addressing a regression that misses initial watch event
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins authored and manusa committed Dec 7, 2022
1 parent 9b248dd commit 0427a76
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* Fix #4534: Java Generator CLI default handling of skipGeneratedAnnotations
* Fix #4535: The shell command string will now have single quotes sanitized
* Fix #4543: (Java Generator) additionalProperties JsonAny setter method generated as setAdditionalProperty
* Fix #4641: fixed regression with missing initial watch event
* Fix #4547: preventing timing issues with leader election cancel
* Fix #4540: treating GenericKubernetesResource and RawExtension as buildable
* Fix #4569: fixing jdk httpclient regression with 0 timeouts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@

public class KubernetesNamespacedTestExtension implements BeforeAllCallback, BeforeEachCallback, AfterAllCallback {

private static final ExtensionContext.Namespace EXT_NAMESPACE = ExtensionContext.Namespace
.create(KubernetesNamespacedTestExtension.class);

@Override
public void beforeAll(ExtensionContext context) throws Exception {
final KubernetesClient client = new KubernetesClientBuilder().build();
Expand Down Expand Up @@ -82,7 +79,9 @@ static KubernetesClient getClient(ExtensionContext context) {
}

private static ExtensionContext.Store getStore(ExtensionContext context) {
return context.getRoot().getStore(EXT_NAMESPACE);
ExtensionContext.Namespace namespace = ExtensionContext.Namespace.create(KubernetesNamespacedTestExtension.class,
context.getRequiredTestClass());
return context.getRoot().getStore(namespace);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public class WatchConnectionManager<T extends HasMetadata, L extends KubernetesR

protected WatcherWebSocketListener<T> listener;
private volatile CompletableFuture<WebSocket> websocketFuture;
private WebSocket websocket;

private volatile boolean ready;

Expand Down Expand Up @@ -86,8 +85,10 @@ public WatchConnectionManager(final HttpClient client, final BaseOperation<T, L,
}

@Override
protected synchronized void closeRequest() {
closeWebSocket(websocket);
protected void closeRequest() {
if (this.listener != null) {
this.listener.close();
}
Optional.ofNullable(this.websocketFuture).ifPresent(theFuture -> {
this.websocketFuture = null;
theFuture.whenComplete((w, t) -> {
Expand All @@ -98,22 +99,10 @@ protected synchronized void closeRequest() {
});
}

synchronized WatcherWebSocketListener<T> getListener() {
return listener;
}

public CompletableFuture<WebSocket> getWebsocketFuture() {
return websocketFuture;
}

@Override
protected void onMessage(String message) {
// for consistency we only want to process the message when we're open
if (this.websocketFuture != null) {
super.onMessage(message);
}
}

@Override
protected void start(URL url, Map<String, String> headers) {
this.listener = new WatcherWebSocketListener<>(this);
Expand Down Expand Up @@ -147,7 +136,6 @@ protected void start(URL url, Map<String, String> headers) {
}
if (w != null) {
this.ready = true;
this.websocket = w;
}
return w;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;

class WatcherWebSocketListener<T extends HasMetadata> implements WebSocket.Listener {
protected static final Logger logger = LoggerFactory.getLogger(WatcherWebSocketListener.class);

protected final AbstractWatchManager<T> manager;

private boolean reconnected = false;
private AtomicBoolean reconnected = new AtomicBoolean();
private AtomicBoolean closed = new AtomicBoolean();

protected WatcherWebSocketListener(AbstractWatchManager<T> manager) {
this.manager = manager;
Expand All @@ -48,10 +50,8 @@ public void onError(WebSocket webSocket, Throwable t) {

@Override
public void onMessage(WebSocket webSocket, String text) {
// onMesssage and onClose are serialized, but it's not specified if onError
// may occur simultaneous with onMessage. So we prevent concurrent processing
try {
synchronized (this) {
if (!closed.get()) {
manager.onMessage(text);
}
} finally {
Expand All @@ -71,11 +71,14 @@ public void onClose(WebSocket webSocket, int code, String reason) {
scheduleReconnect();
}

private synchronized void scheduleReconnect() {
if (!reconnected) {
private void scheduleReconnect() {
if (reconnected.compareAndSet(false, true)) {
manager.scheduleReconnect();
reconnected = true;
}
}

public void close() {
closed.set(true);
}

}

0 comments on commit 0427a76

Please sign in to comment.