From 13297d5bd7f8a0d5a9df0584ac5619f845c33e29 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Wed, 8 Jul 2020 16:20:27 +1000 Subject: [PATCH 1/6] Issue #5018 - cancellation of WebSocketClient.connect Future should fail upgrade Signed-off-by: Lachlan Roberts --- .../jetty/websocket/client/WebSocketClient.java | 2 +- .../websocket/client/WebSocketUpgradeRequest.java | 12 +++++++++--- .../jetty/websocket/common/WebSocketSession.java | 9 +++++++-- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java index 514e51a8b595..67058f7cfdb3 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java @@ -342,7 +342,7 @@ public Future connect(Object websocket, URI toUri, ClientUpgradeRequest } String scheme = toUri.getScheme().toLowerCase(Locale.ENGLISH); - if (("ws".equals(scheme) == false) && ("wss".equals(scheme) == false)) + if ((!"ws".equals(scheme)) && (!"wss".equals(scheme))) { throw new IllegalArgumentException("WebSocket URI scheme only supports [ws] and [wss], not [" + scheme + "]"); } diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketUpgradeRequest.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketUpgradeRequest.java index f415e86d2af3..4154c5833352 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketUpgradeRequest.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketUpgradeRequest.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; @@ -421,12 +422,17 @@ public WebSocketUpgradeRequest(WebSocketClient wsClient, HttpClient httpClient, } this.localEndpoint = this.wsClient.getEventDriverFactory().wrap(localEndpoint); - this.fut = new CompletableFuture(); + this.fut = new CompletableFuture<>(); + this.fut.whenComplete((session, throwable) -> + { + if (throwable instanceof CancellationException) + abort(throwable); + }); getConversation().setAttribute(HttpConnectionUpgrader.class.getName(), this); } - private final String genRandomKey() + private String genRandomKey() { byte[] bytes = new byte[16]; ThreadLocalRandom.current().nextBytes(bytes); @@ -580,7 +586,7 @@ public void upgrade(HttpResponse response, HttpConnectionOverHTTP oldConn) String expectedHash = AcceptHash.hashKey(reqKey); String respHash = response.getHeaders().get(HttpHeader.SEC_WEBSOCKET_ACCEPT); - if (expectedHash.equalsIgnoreCase(respHash) == false) + if (!expectedHash.equalsIgnoreCase(respHash)) { throw new HttpResponseException("Invalid Sec-WebSocket-Accept hash", response); } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java index 6d923db9224b..d0d364608114 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java @@ -404,7 +404,7 @@ public void callApplicationOnError(Throwable cause) if (LOG.isDebugEnabled()) LOG.debug("callApplicationOnError()", cause); - if (openFuture != null && !openFuture.isDone()) + if (openFuture != null) openFuture.completeExceptionally(cause); // Only notify onError if onClose has not been called. @@ -478,7 +478,7 @@ public void open() return; } - try (ThreadClassLoaderScope scope = new ThreadClassLoaderScope(classLoader)) + try (ThreadClassLoaderScope ignored = new ThreadClassLoaderScope(classLoader)) { // Upgrade success if (connection.opening()) @@ -535,6 +535,11 @@ public void setExtensionFactory(ExtensionFactory extensionFactory) public void setFuture(CompletableFuture fut) { this.openFuture = fut; + fut.whenComplete((s, t) -> + { + if (t != null) + close(t); + }); } /** From d7c47f42d6e69c997d5e2178eb8169d456c1c3fa Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Wed, 8 Jul 2020 16:48:43 +1000 Subject: [PATCH 2/6] Issue #5018 - add testing for WebSocketClient Future cancellations Signed-off-by: Lachlan Roberts --- .../tests/client/ConnectFutureAbortTest.java | 284 ++++++++++++++++++ 1 file changed, 284 insertions(+) create mode 100644 jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ConnectFutureAbortTest.java diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ConnectFutureAbortTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ConnectFutureAbortTest.java new file mode 100644 index 000000000000..0d1812b6b9d9 --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ConnectFutureAbortTest.java @@ -0,0 +1,284 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests.client; + +import java.util.EnumSet; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import javax.servlet.DispatcherType; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.api.UpgradeException; +import org.eclipse.jetty.websocket.api.UpgradeRequest; +import org.eclipse.jetty.websocket.api.UpgradeResponse; +import org.eclipse.jetty.websocket.api.util.WSURI; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.client.io.UpgradeListener; +import org.eclipse.jetty.websocket.common.WebSocketSession; +import org.eclipse.jetty.websocket.common.WebSocketSessionListener; +import org.eclipse.jetty.websocket.server.NativeWebSocketConfiguration; +import org.eclipse.jetty.websocket.server.NativeWebSocketServletContainerInitializer; +import org.eclipse.jetty.websocket.server.WebSocketUpgradeFilter; +import org.eclipse.jetty.websocket.tests.CloseTrackingEndpoint; +import org.eclipse.jetty.websocket.tests.EventSocket; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ConnectFutureAbortTest +{ + private Server server; + private WebSocketClient client; + + @FunctionalInterface + public interface Configuration + { + void configure(NativeWebSocketConfiguration configuration); + } + + public void start(Configuration configuration) throws Exception + { + server = new Server(); + ServerConnector connector = new ServerConnector(server); + server.addConnector(connector); + + ServletContextHandler contextHandler = new ServletContextHandler(); + contextHandler.setContextPath("/"); + server.setHandler(contextHandler); + + NativeWebSocketServletContainerInitializer.configure(contextHandler, (context, container) -> + configuration.configure(container)); + contextHandler.addFilter(WebSocketUpgradeFilter.class, "/", EnumSet.of(DispatcherType.REQUEST)); + server.start(); + + client = new WebSocketClient(); + client.start(); + } + + @AfterEach + public void stop() throws Exception + { + if (client != null) + client.stop(); + if (server != null) + server.stop(); + } + + @Test + public void testAbortDuringCreator() throws Exception + { + final CountDownLatch enteredCreator = new CountDownLatch(1); + final CountDownLatch exitCreator = new CountDownLatch(1); + start(c -> + { + c.addMapping("/", (req, res) -> + { + try + { + enteredCreator.countDown(); + exitCreator.await(); + return new EventSocket.EchoSocket(); + } + catch (InterruptedException e) + { + throw new IllegalStateException(e); + } + }); + }); + + CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint(); + Future connect = client.connect(clientSocket, WSURI.toWebsocket(server.getURI())); + + // Cancel the future once we have entered the servers WebSocketCreator (after upgrade request is sent). + assertTrue(enteredCreator.await(5, TimeUnit.SECONDS)); + assertTrue(connect.cancel(true)); + assertThrows(CancellationException.class, () -> connect.get(5, TimeUnit.SECONDS)); + exitCreator.countDown(); + assertFalse(clientSocket.openLatch.await(2, TimeUnit.SECONDS)); + + // Strange that onError() is called but onOpen() is never called. (This would not be the case in Jetty-10). + assertThat(clientSocket.error.get(), instanceOf(UpgradeException.class)); + } + + @Test + public void testAbortSessionOnCreated() throws Exception + { + start(c -> c.addMapping("/", EventSocket.EchoSocket.class)); + + final CountDownLatch enteredListener = new CountDownLatch(1); + final CountDownLatch exitListener = new CountDownLatch(1); + client.addSessionListener(new WebSocketSessionListener() + { + @Override + public void onSessionCreated(WebSocketSession session) + { + try + { + enteredListener.countDown(); + exitListener.await(); + } + catch (InterruptedException e) + { + throw new IllegalStateException(e); + } + } + }); + + CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint(); + Future connect = client.connect(clientSocket, WSURI.toWebsocket(server.getURI())); + + // Abort when session is created, this is before future has been added to session and before the connection upgrade. + assertTrue(enteredListener.await(5, TimeUnit.SECONDS)); + assertTrue(connect.cancel(true)); + assertThrows(CancellationException.class, () -> connect.get(5, TimeUnit.SECONDS)); + exitListener.countDown(); + assertFalse(clientSocket.openLatch.await(2, TimeUnit.SECONDS)); + + // Strange that onError() is called but onOpen() is never called. (This would not be the case in Jetty-10). + assertThat(clientSocket.error.get(), instanceOf(CancellationException.class)); + } + + @Test + public void testAbortInHandshakeResponse() throws Exception + { + start(c -> c.addMapping("/", EventSocket.EchoSocket.class)); + + final CountDownLatch enteredListener = new CountDownLatch(1); + final CountDownLatch exitListener = new CountDownLatch(1); + UpgradeListener upgradeListener = new AbstractUpgradeListener() + { + @Override + public void onHandshakeResponse(UpgradeResponse response) + { + try + { + enteredListener.countDown(); + exitListener.await(); + } + catch (InterruptedException e) + { + throw new IllegalStateException(e); + } + } + }; + + CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint(); + ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(); + Future connect = client.connect(clientSocket, WSURI.toWebsocket(server.getURI()), upgradeRequest, upgradeListener); + + // Abort after after handshake response, which is before connection upgrade, but after future has been set on session. + assertTrue(enteredListener.await(5, TimeUnit.SECONDS)); + assertTrue(connect.cancel(true)); + assertThrows(CancellationException.class, () -> connect.get(5, TimeUnit.SECONDS)); + exitListener.countDown(); + assertFalse(clientSocket.openLatch.await(2, TimeUnit.SECONDS)); + + // Strange that onError() is called but onOpen() is never called. (This would not be the case in Jetty-10). + assertThat(clientSocket.error.get(), instanceOf(CancellationException.class)); + } + + @Test + public void testAbortOnOpened() throws Exception + { + start(c -> c.addMapping("/", EventSocket.EchoSocket.class)); + + final CountDownLatch exitOnOpen = new CountDownLatch(1); + CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint() + { + @Override + public void onWebSocketConnect(Session session) + { + try + { + super.onWebSocketConnect(session); + exitOnOpen.await(); + } + catch (InterruptedException e) + { + throw new IllegalStateException(e); + } + } + }; + + // Abort during the call to onOpened. This is after future has been added to session, + // and after connection has been upgraded, but before future completion. + Future connect = client.connect(clientSocket, WSURI.toWebsocket(server.getURI())); + assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS)); + assertTrue(connect.cancel(true)); + exitOnOpen.countDown(); + + // We got an error on the WebSocket endpoint and an error from the future. + assertTrue(clientSocket.errorLatch.await(5, TimeUnit.SECONDS)); + assertThrows(CancellationException.class, () -> connect.get(5, TimeUnit.SECONDS)); + } + + @Test + public void testAbortAfterCompletion() throws Exception + { + start(c -> c.addMapping("/", EventSocket.EchoSocket.class)); + + CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint(); + Future connect = client.connect(clientSocket, WSURI.toWebsocket(server.getURI())); + Session session = connect.get(5, TimeUnit.SECONDS); + + // If we can send and receive messages the future has been completed. + assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS)); + clientSocket.getSession().getRemote().sendString("hello"); + assertThat(clientSocket.messageQueue.poll(5, TimeUnit.SECONDS), Matchers.is("hello")); + + // After it has been completed we should not get any errors from cancelling it. + assertFalse(connect.cancel(true)); + assertThat(connect.get(5, TimeUnit.SECONDS), instanceOf(Session.class)); + assertFalse(clientSocket.closeLatch.await(2, TimeUnit.SECONDS)); + assertNull(clientSocket.error.get()); + + // Close the session properly. + session.close(); + assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientSocket.closeCode, is(StatusCode.NORMAL)); + } + + public abstract static class AbstractUpgradeListener implements UpgradeListener + { + @Override + public void onHandshakeRequest(UpgradeRequest request) + { + } + + @Override + public void onHandshakeResponse(UpgradeResponse response) + { + } + } +} \ No newline at end of file From 9c910e941361ddfe33ea16531230451f9c00f39d Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Tue, 14 Jul 2020 18:09:40 +1000 Subject: [PATCH 3/6] changes from review Signed-off-by: Lachlan Roberts --- ...eAbortTest.java => ConnectFutureTest.java} | 66 ++++++++++++++----- 1 file changed, 48 insertions(+), 18 deletions(-) rename jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/{ConnectFutureAbortTest.java => ConnectFutureTest.java} (83%) diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ConnectFutureAbortTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ConnectFutureTest.java similarity index 83% rename from jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ConnectFutureAbortTest.java rename to jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ConnectFutureTest.java index 0d1812b6b9d9..610278f63935 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ConnectFutureAbortTest.java +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ConnectFutureTest.java @@ -23,6 +23,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.servlet.DispatcherType; import org.eclipse.jetty.server.Server; @@ -56,7 +57,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class ConnectFutureAbortTest +public class ConnectFutureTest { private Server server; private WebSocketClient client; @@ -98,8 +99,8 @@ public void stop() throws Exception @Test public void testAbortDuringCreator() throws Exception { - final CountDownLatch enteredCreator = new CountDownLatch(1); - final CountDownLatch exitCreator = new CountDownLatch(1); + CountDownLatch enteredCreator = new CountDownLatch(1); + CountDownLatch exitCreator = new CountDownLatch(1); start(c -> { c.addMapping("/", (req, res) -> @@ -125,10 +126,11 @@ public void testAbortDuringCreator() throws Exception assertTrue(connect.cancel(true)); assertThrows(CancellationException.class, () -> connect.get(5, TimeUnit.SECONDS)); exitCreator.countDown(); - assertFalse(clientSocket.openLatch.await(2, TimeUnit.SECONDS)); + assertFalse(clientSocket.openLatch.await(1, TimeUnit.SECONDS)); - // Strange that onError() is called but onOpen() is never called. (This would not be the case in Jetty-10). - assertThat(clientSocket.error.get(), instanceOf(UpgradeException.class)); + Throwable error = clientSocket.error.get(); + assertThat(error, instanceOf(UpgradeException.class)); + assertThat(error.getCause(), instanceOf(CancellationException.class)); } @Test @@ -136,8 +138,8 @@ public void testAbortSessionOnCreated() throws Exception { start(c -> c.addMapping("/", EventSocket.EchoSocket.class)); - final CountDownLatch enteredListener = new CountDownLatch(1); - final CountDownLatch exitListener = new CountDownLatch(1); + CountDownLatch enteredListener = new CountDownLatch(1); + CountDownLatch exitListener = new CountDownLatch(1); client.addSessionListener(new WebSocketSessionListener() { @Override @@ -163,9 +165,7 @@ public void onSessionCreated(WebSocketSession session) assertTrue(connect.cancel(true)); assertThrows(CancellationException.class, () -> connect.get(5, TimeUnit.SECONDS)); exitListener.countDown(); - assertFalse(clientSocket.openLatch.await(2, TimeUnit.SECONDS)); - - // Strange that onError() is called but onOpen() is never called. (This would not be the case in Jetty-10). + assertFalse(clientSocket.openLatch.await(1, TimeUnit.SECONDS)); assertThat(clientSocket.error.get(), instanceOf(CancellationException.class)); } @@ -174,8 +174,8 @@ public void testAbortInHandshakeResponse() throws Exception { start(c -> c.addMapping("/", EventSocket.EchoSocket.class)); - final CountDownLatch enteredListener = new CountDownLatch(1); - final CountDownLatch exitListener = new CountDownLatch(1); + CountDownLatch enteredListener = new CountDownLatch(1); + CountDownLatch exitListener = new CountDownLatch(1); UpgradeListener upgradeListener = new AbstractUpgradeListener() { @Override @@ -202,9 +202,7 @@ public void onHandshakeResponse(UpgradeResponse response) assertTrue(connect.cancel(true)); assertThrows(CancellationException.class, () -> connect.get(5, TimeUnit.SECONDS)); exitListener.countDown(); - assertFalse(clientSocket.openLatch.await(2, TimeUnit.SECONDS)); - - // Strange that onError() is called but onOpen() is never called. (This would not be the case in Jetty-10). + assertFalse(clientSocket.openLatch.await(1, TimeUnit.SECONDS)); assertThat(clientSocket.error.get(), instanceOf(CancellationException.class)); } @@ -213,7 +211,7 @@ public void testAbortOnOpened() throws Exception { start(c -> c.addMapping("/", EventSocket.EchoSocket.class)); - final CountDownLatch exitOnOpen = new CountDownLatch(1); + CountDownLatch exitOnOpen = new CountDownLatch(1); CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint() { @Override @@ -260,7 +258,7 @@ public void testAbortAfterCompletion() throws Exception // After it has been completed we should not get any errors from cancelling it. assertFalse(connect.cancel(true)); assertThat(connect.get(5, TimeUnit.SECONDS), instanceOf(Session.class)); - assertFalse(clientSocket.closeLatch.await(2, TimeUnit.SECONDS)); + assertFalse(clientSocket.closeLatch.await(1, TimeUnit.SECONDS)); assertNull(clientSocket.error.get()); // Close the session properly. @@ -269,6 +267,38 @@ public void testAbortAfterCompletion() throws Exception assertThat(clientSocket.closeCode, is(StatusCode.NORMAL)); } + @Test + public void testFutureTimeout() throws Exception + { + CountDownLatch exitCreator = new CountDownLatch(1); + start(c -> + { + c.addMapping("/", (req, res) -> + { + try + { + exitCreator.await(); + return new EventSocket.EchoSocket(); + } + catch (InterruptedException e) + { + throw new IllegalStateException(e); + } + }); + }); + + CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint(); + Future connect = client.connect(clientSocket, WSURI.toWebsocket(server.getURI())); + assertThrows(TimeoutException.class, () -> connect.get(1, TimeUnit.SECONDS)); + exitCreator.countDown(); + Session session = connect.get(5, TimeUnit.SECONDS); + + // Close the session properly. + session.close(); + assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientSocket.closeCode, is(StatusCode.NORMAL)); + } + public abstract static class AbstractUpgradeListener implements UpgradeListener { @Override From 25a7da263547ab5d7231d94b57b5800318b05d62 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Tue, 14 Jul 2020 21:47:31 +1000 Subject: [PATCH 4/6] add test for abort with CompletableFuture.completeExceptionally() Signed-off-by: Lachlan Roberts --- .../tests/client/ConnectFutureTest.java | 59 ++++++++++++++++--- 1 file changed, 52 insertions(+), 7 deletions(-) diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ConnectFutureTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ConnectFutureTest.java index 610278f63935..12d75c29b14b 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ConnectFutureTest.java +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ConnectFutureTest.java @@ -20,7 +20,9 @@ import java.util.EnumSet; import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -34,6 +36,7 @@ import org.eclipse.jetty.websocket.api.UpgradeException; import org.eclipse.jetty.websocket.api.UpgradeRequest; import org.eclipse.jetty.websocket.api.UpgradeResponse; +import org.eclipse.jetty.websocket.api.WebSocketException; import org.eclipse.jetty.websocket.api.util.WSURI; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.WebSocketClient; @@ -44,7 +47,7 @@ import org.eclipse.jetty.websocket.server.NativeWebSocketServletContainerInitializer; import org.eclipse.jetty.websocket.server.WebSocketUpgradeFilter; import org.eclipse.jetty.websocket.tests.CloseTrackingEndpoint; -import org.eclipse.jetty.websocket.tests.EventSocket; +import org.eclipse.jetty.websocket.tests.EchoSocket; import org.hamcrest.Matchers; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; @@ -109,7 +112,7 @@ public void testAbortDuringCreator() throws Exception { enteredCreator.countDown(); exitCreator.await(); - return new EventSocket.EchoSocket(); + return new EchoSocket(); } catch (InterruptedException e) { @@ -136,7 +139,7 @@ public void testAbortDuringCreator() throws Exception @Test public void testAbortSessionOnCreated() throws Exception { - start(c -> c.addMapping("/", EventSocket.EchoSocket.class)); + start(c -> c.addMapping("/", EchoSocket.class)); CountDownLatch enteredListener = new CountDownLatch(1); CountDownLatch exitListener = new CountDownLatch(1); @@ -172,7 +175,7 @@ public void onSessionCreated(WebSocketSession session) @Test public void testAbortInHandshakeResponse() throws Exception { - start(c -> c.addMapping("/", EventSocket.EchoSocket.class)); + start(c -> c.addMapping("/", EchoSocket.class)); CountDownLatch enteredListener = new CountDownLatch(1); CountDownLatch exitListener = new CountDownLatch(1); @@ -209,7 +212,7 @@ public void onHandshakeResponse(UpgradeResponse response) @Test public void testAbortOnOpened() throws Exception { - start(c -> c.addMapping("/", EventSocket.EchoSocket.class)); + start(c -> c.addMapping("/", EchoSocket.class)); CountDownLatch exitOnOpen = new CountDownLatch(1); CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint() @@ -244,7 +247,7 @@ public void onWebSocketConnect(Session session) @Test public void testAbortAfterCompletion() throws Exception { - start(c -> c.addMapping("/", EventSocket.EchoSocket.class)); + start(c -> c.addMapping("/", EchoSocket.class)); CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint(); Future connect = client.connect(clientSocket, WSURI.toWebsocket(server.getURI())); @@ -278,7 +281,7 @@ public void testFutureTimeout() throws Exception try { exitCreator.await(); - return new EventSocket.EchoSocket(); + return new EchoSocket(); } catch (InterruptedException e) { @@ -299,6 +302,48 @@ public void testFutureTimeout() throws Exception assertThat(clientSocket.closeCode, is(StatusCode.NORMAL)); } + @Test + public void testAbortWithException() throws Exception + { + start(c -> c.addMapping("/", EchoSocket.class)); + CountDownLatch exitOnOpen = new CountDownLatch(1); + CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint() + { + @Override + public void onWebSocketConnect(Session session) + { + try + { + super.onWebSocketConnect(session); + exitOnOpen.await(); + } + catch (InterruptedException e) + { + throw new IllegalStateException(e); + } + } + }; + + // Complete the CompletableFuture with an exception the during the call to onOpened. + Future connect = client.connect(clientSocket, WSURI.toWebsocket(server.getURI())); + assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS)); + CompletableFuture completableFuture = (CompletableFuture)connect; + assertTrue(completableFuture.completeExceptionally(new WebSocketException("custom exception"))); + exitOnOpen.countDown(); + + // Exception from the future is correct. + ExecutionException futureError = assertThrows(ExecutionException.class, () -> connect.get(5, TimeUnit.SECONDS)); + Throwable cause = futureError.getCause(); + assertThat(cause, instanceOf(WebSocketException.class)); + assertThat(cause.getMessage(), is("custom exception")); + + // Exception from the endpoint is correct. + assertTrue(clientSocket.errorLatch.await(5, TimeUnit.SECONDS)); + Throwable endpointError = clientSocket.error.get(); + assertThat(endpointError, instanceOf(WebSocketException.class)); + assertThat(endpointError.getMessage(), is("custom exception")); + } + public abstract static class AbstractUpgradeListener implements UpgradeListener { @Override From f9750c9632553f056563dbd386bd0d2a938ab145 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Wed, 15 Jul 2020 19:21:01 +1000 Subject: [PATCH 5/6] Test aborting with exception before WS upgrade completes. Signed-off-by: Lachlan Roberts --- .../tests/client/ConnectFutureTest.java | 44 ++++++++++++++++++- .../client/WebSocketUpgradeRequest.java | 3 +- 2 files changed, 44 insertions(+), 3 deletions(-) diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ConnectFutureTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ConnectFutureTest.java index 12d75c29b14b..f403619d2371 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ConnectFutureTest.java +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ConnectFutureTest.java @@ -303,7 +303,49 @@ public void testFutureTimeout() throws Exception } @Test - public void testAbortWithException() throws Exception + public void testAbortWithExceptionBeforeUpgrade() throws Exception + { + CountDownLatch exitCreator = new CountDownLatch(1); + start(c -> + { + c.addMapping("/", (req, res) -> + { + try + { + exitCreator.await(); + return new EchoSocket(); + } + catch (InterruptedException e) + { + throw new IllegalStateException(e); + } + }); + }); + + // Complete the CompletableFuture with an exception the during the call to onOpened. + CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint(); + Future connect = client.connect(clientSocket, WSURI.toWebsocket(server.getURI())); + CompletableFuture completableFuture = (CompletableFuture)connect; + assertTrue(completableFuture.completeExceptionally(new WebSocketException("custom exception"))); + exitCreator.countDown(); + + // Exception from the future is correct. + ExecutionException futureError = assertThrows(ExecutionException.class, () -> connect.get(5, TimeUnit.SECONDS)); + Throwable cause = futureError.getCause(); + assertThat(cause, instanceOf(WebSocketException.class)); + assertThat(cause.getMessage(), is("custom exception")); + + // Exception from the endpoint is correct. + assertTrue(clientSocket.errorLatch.await(5, TimeUnit.SECONDS)); + Throwable endpointError = clientSocket.error.get(); + assertThat(endpointError, instanceOf(UpgradeException.class)); + Throwable endpointErrorCause = endpointError.getCause(); + assertThat(endpointError, instanceOf(WebSocketException.class)); + assertThat(endpointErrorCause.getMessage(), is("custom exception")); + } + + @Test + public void testAbortWithExceptionAfterUpgrade() throws Exception { start(c -> c.addMapping("/", EchoSocket.class)); CountDownLatch exitOnOpen = new CountDownLatch(1); diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketUpgradeRequest.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketUpgradeRequest.java index 4154c5833352..5c4963fe2331 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketUpgradeRequest.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketUpgradeRequest.java @@ -29,7 +29,6 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; @@ -425,7 +424,7 @@ public WebSocketUpgradeRequest(WebSocketClient wsClient, HttpClient httpClient, this.fut = new CompletableFuture<>(); this.fut.whenComplete((session, throwable) -> { - if (throwable instanceof CancellationException) + if (throwable != null) abort(throwable); }); From 860cba78775c99f98d1ef5aae77ad75a0452d343 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 16 Jul 2020 07:45:16 +1000 Subject: [PATCH 6/6] Use Consumer in ConnectFutureTest Signed-off-by: Lachlan Roberts --- .../websocket/tests/client/ConnectFutureTest.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ConnectFutureTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ConnectFutureTest.java index f403619d2371..d9bdbeff6f79 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ConnectFutureTest.java +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ConnectFutureTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import javax.servlet.DispatcherType; import org.eclipse.jetty.server.Server; @@ -65,13 +66,7 @@ public class ConnectFutureTest private Server server; private WebSocketClient client; - @FunctionalInterface - public interface Configuration - { - void configure(NativeWebSocketConfiguration configuration); - } - - public void start(Configuration configuration) throws Exception + public void start(Consumer configuration) throws Exception { server = new Server(); ServerConnector connector = new ServerConnector(server); @@ -82,7 +77,7 @@ public void start(Configuration configuration) throws Exception server.setHandler(contextHandler); NativeWebSocketServletContainerInitializer.configure(contextHandler, (context, container) -> - configuration.configure(container)); + configuration.accept(container)); contextHandler.addFilter(WebSocketUpgradeFilter.class, "/", EnumSet.of(DispatcherType.REQUEST)); server.start();