Skip to content

Commit

Permalink
Merge pull request #5031 from eclipse/jetty-10.0.x-5018-WebSocketClie…
Browse files Browse the repository at this point in the history
…ntRequestTimeout

Issue #5018 - add WebSocketClient UpgradeRequest timeout to jetty 10
  • Loading branch information
lachlan-roberts authored Jul 15, 2020
2 parents f0f153c + 145dcf5 commit fc473f3
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import java.security.Principal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;

import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpScheme;
Expand All @@ -39,19 +39,19 @@
*/
public final class ClientUpgradeRequest implements UpgradeRequest
{
private URI requestURI;
private List<String> subProtocols = new ArrayList<>(1);
private List<ExtensionConfig> extensions = new ArrayList<>(1);
private List<HttpCookie> cookies = new ArrayList<>(1);
private Map<String, List<String>> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
private Map<String, List<String>> parameters = new HashMap<>(1);
private String httpVersion;
private String method;
private String host;
private final List<String> subProtocols = new ArrayList<>(1);
private final List<ExtensionConfig> extensions = new ArrayList<>(1);
private final List<HttpCookie> cookies = new ArrayList<>(1);
private final Map<String, List<String>> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
private final URI requestURI;
private final String host;
private long timeout;

public ClientUpgradeRequest()
{
/* anonymous, no requestURI, upgrade request */
this.requestURI = null;
this.host = null;
}

public ClientUpgradeRequest(URI uri)
Expand Down Expand Up @@ -161,13 +161,13 @@ public String getHost()
@Override
public String getHttpVersion()
{
return httpVersion;
throw new UnsupportedOperationException("HttpVersion not available on ClientUpgradeRequest");
}

@Override
public String getMethod()
{
return method;
throw new UnsupportedOperationException("Method not available on ClientUpgradeRequest");
}

@Override
Expand All @@ -176,15 +176,10 @@ public String getOrigin()
return getHeader(HttpHeader.ORIGIN.name());
}

/**
* Returns a map of the query parameters of the request.
*
* @return a unmodifiable map of query parameters of the request.
*/
@Override
public Map<String, List<String>> getParameterMap()
{
return Collections.unmodifiableMap(parameters);
return Collections.emptyMap();
}

@Override
Expand Down Expand Up @@ -297,6 +292,25 @@ public void setSession(Object session)
throw new UnsupportedOperationException("HttpSession not available on Client request");
}

/**
* @param timeout the total timeout for the request/response conversation of the WebSocket handshake;
* use zero or a negative value to disable the timeout
* @param unit the timeout unit
*/
public void setTimeout(long timeout, TimeUnit unit)
{
this.timeout = unit.toMillis(timeout);
}

/**
* @return the total timeout for this request, in milliseconds;
* zero or negative if the timeout is disabled
*/
public long getTimeout()
{
return timeout;
}

/**
* ABNF from RFC 2616, RFC 822, and RFC 6455 specified characters requiring quoting.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.ShutdownThread;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketContainer;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
Expand Down Expand Up @@ -109,7 +108,7 @@ public CompletableFuture<Session> connect(Object websocket, URI toUri) throws IO
* @return the future for the session, available on success of connect
* @throws IOException if unable to connect
*/
public CompletableFuture<Session> connect(Object websocket, URI toUri, UpgradeRequest request) throws IOException
public CompletableFuture<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request) throws IOException
{
return connect(websocket, toUri, request, null);
}
Expand All @@ -124,14 +123,15 @@ public CompletableFuture<Session> connect(Object websocket, URI toUri, UpgradeRe
* @return the future for the session, available on success of connect
* @throws IOException if unable to connect
*/
public CompletableFuture<Session> connect(Object websocket, URI toUri, UpgradeRequest request, JettyUpgradeListener upgradeListener) throws IOException
public CompletableFuture<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request, JettyUpgradeListener upgradeListener) throws IOException
{
for (Connection.Listener listener : getBeans(Connection.Listener.class))
{
coreClient.addBean(listener);
}

JettyClientUpgradeRequest upgradeRequest = new JettyClientUpgradeRequest(coreClient, request, toUri, frameHandlerFactory, websocket);
upgradeRequest.setConfiguration(configurationCustomizer);
if (upgradeListener != null)
{
upgradeRequest.addListener(new UpgradeListener()
Expand All @@ -149,9 +149,8 @@ public void onHandshakeResponse(HttpRequest request, HttpResponse response)
}
});
}
upgradeRequest.setConfiguration(configurationCustomizer);
CompletableFuture<Session> futureSession = new CompletableFuture<>();

CompletableFuture<Session> futureSession = new CompletableFuture<>();
coreClient.connect(upgradeRequest).whenComplete((coreSession, error) ->
{
if (error != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,26 @@

package org.eclipse.jetty.websocket.client.impl;

import java.net.HttpCookie;
import java.net.URI;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.common.JettyWebSocketFrameHandler;
import org.eclipse.jetty.websocket.common.JettyWebSocketFrameHandlerFactory;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;

public class JettyClientUpgradeRequest extends ClientUpgradeRequest
public class JettyClientUpgradeRequest extends org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest
{
private final DelegatedJettyClientUpgradeRequest handshakeRequest;
private final JettyWebSocketFrameHandler frameHandler;

public JettyClientUpgradeRequest(WebSocketCoreClient coreClient, UpgradeRequest request, URI requestURI, JettyWebSocketFrameHandlerFactory frameHandlerFactory,
public JettyClientUpgradeRequest(WebSocketCoreClient coreClient, ClientUpgradeRequest request, URI requestURI, JettyWebSocketFrameHandlerFactory frameHandlerFactory,
Object websocketPojo)
{
super(coreClient, requestURI);
Expand All @@ -51,28 +48,14 @@ public JettyClientUpgradeRequest(WebSocketCoreClient coreClient, UpgradeRequest
headers(fields -> request.getHeaders().forEach(fields::put));

// Copy manually created Cookies into place
List<HttpCookie> cookies = request.getCookies();
if (cookies != null)
{
// TODO: remove existing Cookie header (if set)?
headers(fields -> cookies.forEach(cookie -> fields.add(HttpHeader.COOKIE, cookie.toString())));
}
headers(fields -> request.getCookies().forEach(cookie -> fields.add(HttpHeader.COOKIE, cookie.toString())));

// Copy sub-protocols
setSubProtocols(request.getSubProtocols());

// Copy extensions
setExtensions(request.getExtensions().stream()
.map(c -> new ExtensionConfig(c.getName(), c.getParameters()))
.collect(Collectors.toList()));

// Copy method from upgradeRequest object
if (request.getMethod() != null)
method(request.getMethod());

// Copy version from upgradeRequest object
if (request.getHttpVersion() != null)
version(HttpVersion.fromString(request.getHttpVersion()));
timeout(request.getTimeout(), TimeUnit.MILLISECONDS);
}

handshakeRequest = new DelegatedJettyClientUpgradeRequest(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.JettyUpgradeListener;
Expand Down Expand Up @@ -104,7 +103,7 @@ public void testJettyExtensionConfig() throws Exception
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/filterPath");
EventSocket socket = new EventSocket();

UpgradeRequest request = new ClientUpgradeRequest();
ClientUpgradeRequest request = new ClientUpgradeRequest();
request.addExtensions(ExtensionConfig.parse("permessage-deflate"));

CountDownLatch correctResponseExtensions = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer;
Expand Down Expand Up @@ -81,7 +80,7 @@ public void testBadRequest() throws Exception
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/filterPath");
EventSocket socket = new EventSocket();

UpgradeRequest upgradeRequest = new ClientUpgradeRequest();
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
upgradeRequest.addExtensions("permessage-deflate;invalidParameter");

CompletableFuture<Session> connect = client.connect(socket, uri, upgradeRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.eclipse.jetty.util.security.Constraint;
import org.eclipse.jetty.util.security.Credential;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer;
Expand Down Expand Up @@ -139,7 +138,7 @@ public void testAdvancedEchoServlet() throws Exception
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/advancedEcho");
EventSocket socket = new EventSocket();

UpgradeRequest upgradeRequest = new ClientUpgradeRequest();
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
upgradeRequest.setSubProtocols("text");
CompletableFuture<Session> connect = client.connect(socket, uri, upgradeRequest);
try (Session session = connect.get(5, TimeUnit.SECONDS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.eclipse.jetty.websocket.tests.EventSocket;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -109,14 +108,13 @@ public void testWebSocketClientTimeout() throws Exception
assertThat(coreUpgradeException.getCause(), instanceOf(TimeoutException.class));
}

@Disabled("need the client timeout to be ported from 9.4 to 10")
@Test
public void testClientUpgradeRequestTimeout() throws Exception
{
EventSocket clientSocket = new EventSocket();
long timeout = 1000;
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
// TODO: upgradeRequest.setTimeout(timeout, TimeUnit.MILLISECONDS);
upgradeRequest.setTimeout(timeout, TimeUnit.MILLISECONDS);
Future<Session> connect = client.connect(clientSocket, WSURI.toWebsocket(server.getURI()), upgradeRequest);

ExecutionException executionException = assertThrows(ExecutionException.class, () -> connect.get(timeout * 2, TimeUnit.MILLISECONDS));
Expand Down

0 comments on commit fc473f3

Please sign in to comment.