Skip to content

Commit

Permalink
fix #5152: ensuring channel closed is not an error after sendClose
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins authored and manusa committed May 31, 2023
1 parent f6ea58c commit c89a127
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
Expand All @@ -50,6 +51,7 @@ public class JettyWebSocket implements WebSocket, WebSocketListener {
private final Lock lock;
private final Condition backPressure;
private final CompletableFuture<Void> terminated = new CompletableFuture<>();
private final AtomicBoolean outputClosed = new AtomicBoolean();
private boolean moreMessages;
private volatile Session webSocketSession;

Expand All @@ -63,7 +65,7 @@ public JettyWebSocket(WebSocket.Listener listener) {

@Override
public boolean send(ByteBuffer buffer) {
if (terminated.isDone() || !webSocketSession.isOpen()) {
if (outputClosed.get() || terminated.isDone() || !webSocketSession.isOpen()) {
return false;
}
buffer = BufferUtil.copy(buffer);
Expand All @@ -88,8 +90,8 @@ public void writeSuccess() {
}

@Override
public synchronized boolean sendClose(int code, String reason) {
if (!webSocketSession.isOpen()) {
public boolean sendClose(int code, String reason) {
if (!outputClosed.compareAndSet(false, true) || !webSocketSession.isOpen()) {
return false;
}
webSocketSession.close(code, reason, new WriteCallback() {
Expand Down Expand Up @@ -140,8 +142,9 @@ public void onWebSocketText(String message) {

@Override
public void onWebSocketClose(int statusCode, String reason) {
terminated.complete(null);
listener.onClose(this, statusCode, reason);
if (terminated.complete(null)) {
listener.onClose(this, statusCode, reason);
}
}

@Override
Expand All @@ -150,10 +153,15 @@ public void onWebSocketConnect(Session session) {
listener.onOpen(this);
}

/**
* The semantics here are different than jdk/okhttp - onClose will be
* invoked after this, if it has not already been called. So we need to skip
* erroneously notifying
*/
@Override
public void onWebSocketError(Throwable cause) {
boolean completed = terminated.complete(null);
if (cause instanceof ClosedChannelException && !completed) {
if (cause instanceof ClosedChannelException && (!completed || outputClosed.get())) {
// TODO: Check better
// It appears to be a race condition in Jetty:
// - The server sends a close frame (but we haven't received it)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,21 @@ void webSocketErrorNotifiesOnErrorWhenClosedAndNotClosedChannelException() {
.hasMessage("NOT ClosedChannelException");
}

@Test
@DisplayName("Remote WebSocket error, notifies onClose if connection is already closed and is NOT ClosedChannelException")
void webSocketErrorIgnoredWhenOutputClosed() {
// Given
final var listener = new Listener();
final var jws = new JettyWebSocket(listener);
jws.onWebSocketConnect(Mockito.mock(Session.class));
listener.events.clear();
jws.sendClose(1000, "Closing");
// When
jws.onWebSocketError(new ClosedChannelException());
// Then
assertThat(listener.events).isEmpty(); // onClose would normally be called later by jetty
}

@Test
@DisplayName("backPressure, onWebSocketText processes first frame and waits for request() call")
void backPressure() throws Exception {
Expand Down

0 comments on commit c89a127

Please sign in to comment.