Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for watch regression #4643

Merged
merged 1 commit into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* Fix #4534: Java Generator CLI default handling of skipGeneratedAnnotations
* Fix #4535: The shell command string will now have single quotes sanitized
* Fix #4543: (Java Generator) additionalProperties JsonAny setter method generated as setAdditionalProperty
* Fix #4641: fixed regression with missing initial watch event
* Fix #4547: preventing timing issues with leader election cancel
* Fix #4540: treating GenericKubernetesResource and RawExtension as buildable
* Fix #4569: fixing jdk httpclient regression with 0 timeouts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@

public class KubernetesNamespacedTestExtension implements BeforeAllCallback, BeforeEachCallback, AfterAllCallback {

private static final ExtensionContext.Namespace EXT_NAMESPACE = ExtensionContext.Namespace
.create(KubernetesNamespacedTestExtension.class);

@Override
public void beforeAll(ExtensionContext context) throws Exception {
final KubernetesClient client = new KubernetesClientBuilder().build();
Expand Down Expand Up @@ -82,7 +79,9 @@ static KubernetesClient getClient(ExtensionContext context) {
}

private static ExtensionContext.Store getStore(ExtensionContext context) {
return context.getRoot().getStore(EXT_NAMESPACE);
ExtensionContext.Namespace namespace = ExtensionContext.Namespace.create(KubernetesNamespacedTestExtension.class,
context.getRequiredTestClass());
return context.getRoot().getStore(namespace);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public class WatchConnectionManager<T extends HasMetadata, L extends KubernetesR

protected WatcherWebSocketListener<T> listener;
private volatile CompletableFuture<WebSocket> websocketFuture;
private WebSocket websocket;

private volatile boolean ready;

Expand Down Expand Up @@ -86,8 +85,10 @@ public WatchConnectionManager(final HttpClient client, final BaseOperation<T, L,
}

@Override
protected synchronized void closeRequest() {
closeWebSocket(websocket);
protected void closeRequest() {
if (this.listener != null) {
this.listener.close();
}
Optional.ofNullable(this.websocketFuture).ifPresent(theFuture -> {
this.websocketFuture = null;
theFuture.whenComplete((w, t) -> {
Expand All @@ -98,22 +99,10 @@ protected synchronized void closeRequest() {
});
}

synchronized WatcherWebSocketListener<T> getListener() {
return listener;
}

public CompletableFuture<WebSocket> getWebsocketFuture() {
return websocketFuture;
}

@Override
protected void onMessage(String message) {
// for consistency we only want to process the message when we're open
if (this.websocketFuture != null) {
super.onMessage(message);
}
}

@Override
protected void start(URL url, Map<String, String> headers) {
this.listener = new WatcherWebSocketListener<>(this);
Expand Down Expand Up @@ -147,7 +136,6 @@ protected void start(URL url, Map<String, String> headers) {
}
if (w != null) {
this.ready = true;
this.websocket = w;
}
return w;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;

class WatcherWebSocketListener<T extends HasMetadata> implements WebSocket.Listener {
protected static final Logger logger = LoggerFactory.getLogger(WatcherWebSocketListener.class);

protected final AbstractWatchManager<T> manager;

private boolean reconnected = false;
private AtomicBoolean reconnected = new AtomicBoolean();
private AtomicBoolean closed = new AtomicBoolean();

protected WatcherWebSocketListener(AbstractWatchManager<T> manager) {
this.manager = manager;
Expand All @@ -48,10 +50,8 @@ public void onError(WebSocket webSocket, Throwable t) {

@Override
public void onMessage(WebSocket webSocket, String text) {
// onMesssage and onClose are serialized, but it's not specified if onError
// may occur simultaneous with onMessage. So we prevent concurrent processing
try {
synchronized (this) {
if (!closed.get()) {
manager.onMessage(text);
}
} finally {
Expand All @@ -71,11 +71,14 @@ public void onClose(WebSocket webSocket, int code, String reason) {
scheduleReconnect();
}

private synchronized void scheduleReconnect() {
if (!reconnected) {
private void scheduleReconnect() {
if (reconnected.compareAndSet(false, true)) {
manager.scheduleReconnect();
reconnected = true;
}
}

public void close() {
closed.set(true);
}

}