Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #4919 - WebSocket container graceful stop #4931

Merged
merged 11 commits into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,54 @@ static CompletableFuture<Void> shutdown(Container component)

return CompletableFuture.allOf(gracefuls.stream().map(Graceful::shutdown).toArray(CompletableFuture[]::new));
}

/**
* Utility method to execute a {@link ThrowingRunnable} in a new daemon thread and
* be notified of the result in a {@link CompletableFuture}.
* @param runnable the ThrowingRunnable to run.
* @return the CompletableFuture to be notified when the runnable either completes or fails.
*/
static CompletableFuture<Void> shutdown(ThrowingRunnable runnable)
{
AtomicReference<Thread> stopThreadReference = new AtomicReference<>();
CompletableFuture<Void> shutdown = new CompletableFuture<>()
{
@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
boolean canceled = super.cancel(mayInterruptIfRunning);
if (canceled && mayInterruptIfRunning)
{
Thread thread = stopThreadReference.get();
if (thread != null)
thread.interrupt();
}

return canceled;
}
};

Thread stopThread = new Thread(() ->
{
try
{
runnable.run();
shutdown.complete(null);
}
catch (Throwable t)
{
shutdown.completeExceptionally(t);
}
});
stopThread.setDaemon(true);
stopThreadReference.set(stopThread);
stopThread.start();
return shutdown;
}

@FunctionalInterface
interface ThrowingRunnable
{
void run() throws Exception;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
public abstract class JavaxWebSocketContainer extends ContainerLifeCycle implements javax.websocket.WebSocketContainer
{
private static final Logger LOG = LoggerFactory.getLogger(JavaxWebSocketContainer.class);
private final SessionTracker sessionTracker = new SessionTracker();
private final List<JavaxWebSocketSessionListener> sessionListeners = new ArrayList<>();
protected final SessionTracker sessionTracker = new SessionTracker();
protected final Configuration.ConfigurationCustomizer defaultCustomizer = new Configuration.ConfigurationCustomizer();
protected final WebSocketComponents components;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,20 @@

package org.eclipse.jetty.websocket.javax.common;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.websocket.CloseReason;
import javax.websocket.Session;

import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.eclipse.jetty.util.component.Graceful;

public class SessionTracker extends AbstractLifeCycle implements JavaxWebSocketSessionListener
public class SessionTracker extends AbstractLifeCycle implements JavaxWebSocketSessionListener, Graceful
{
private static final Logger LOG = LoggerFactory.getLogger(SessionTracker.class);

private final CopyOnWriteArraySet<JavaxWebSocketSession> sessions = new CopyOnWriteArraySet<>();
private boolean isShutdown = false;

public Set<Session> getSessions()
{
Expand All @@ -52,22 +50,40 @@ public void onJavaxWebSocketSessionClosed(JavaxWebSocketSession session)
sessions.remove(session);
}

@Override
protected void doStart() throws Exception
{
isShutdown = false;
super.doStart();
}

@Override
protected void doStop() throws Exception
{
for (Session session : sessions)
sessions.clear();
super.doStop();
}

@Override
public CompletableFuture<Void> shutdown()
{
isShutdown = true;
return Graceful.shutdown(() ->
{
try
for (Session session : sessions)
{
if (Thread.interrupted())
break;

// GOING_AWAY is abnormal close status so it will hard close connection after sent.
session.close(new CloseReason(CloseReason.CloseCodes.GOING_AWAY, "Container being shut down"));
}
catch (IOException e)
{
LOG.trace("IGNORED", e);
}
}
});
}

super.doStop();
@Override
public boolean isShutdown()
{
return isShutdown;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//

package org.eclipse.jetty.websocket.javax.tests;

import java.io.IOException;
import java.nio.ByteBuffer;
import javax.websocket.ClientEndpoint;
import javax.websocket.server.ServerEndpoint;

@ServerEndpoint("/")
@ClientEndpoint
public class EchoSocket extends EventSocket
{
@Override
public void onMessage(String message) throws IOException
{
super.onMessage(message);
session.getBasicRemote().sendText(message);
}

@Override
public void onMessage(ByteBuffer message) throws IOException
{
super.onMessage(message);
session.getBasicRemote().sendBinary(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,23 +98,4 @@ public void onError(Throwable cause)
error = cause;
errorLatch.countDown();
}

@ServerEndpoint("/")
@ClientEndpoint
public static class EchoSocket extends EventSocket
{
@Override
public void onMessage(String message) throws IOException
{
super.onMessage(message);
session.getBasicRemote().sendText(message);
}

@Override
public void onMessage(ByteBuffer message) throws IOException
{
super.onMessage(message);
session.getBasicRemote().sendBinary(message);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//

package org.eclipse.jetty.websocket.javax.tests;

import java.net.URI;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import javax.websocket.CloseReason;
import javax.websocket.EndpointConfig;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.component.Graceful;
import org.eclipse.jetty.websocket.javax.client.internal.JavaxWebSocketClientContainer;
import org.eclipse.jetty.websocket.javax.server.config.JavaxWebSocketServletContainerInitializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class GracefulCloseTest
{
private static final BlockingArrayQueue<EventSocket> serverEndpoints = new BlockingArrayQueue<>();
private Server server;
private URI serverUri;
private JavaxWebSocketClientContainer client;

@BeforeEach
public void before() throws Exception
{
server = new Server();
ServerConnector connector = new ServerConnector(server);
server.addConnector(connector);

ServletContextHandler contextHandler = new ServletContextHandler();
contextHandler.setContextPath("/");
server.setHandler(contextHandler);
JavaxWebSocketServletContainerInitializer.configure(contextHandler, (context, container) ->
container.addEndpoint(ServerSocket.class));
server.start();
serverUri = WSURI.toWebsocket(server.getURI());

// StopTimeout is necessary for the websocket server sessions to gracefully close.
server.setStopTimeout(1000);

client = new JavaxWebSocketClientContainer();
client.start();
}

@AfterEach
public void after() throws Exception
{
client.stop();
server.stop();
}

@ServerEndpoint("/")
public static class ServerSocket extends EchoSocket
{
@Override
public void onOpen(Session session, EndpointConfig endpointConfig)
{
serverEndpoints.add(this);
super.onOpen(session, endpointConfig);
}
}

@Test
public void testClientStop() throws Exception
{
EventSocket clientEndpoint = new EventSocket();
client.connectToServer(clientEndpoint, serverUri);
EventSocket serverEndpoint = Objects.requireNonNull(serverEndpoints.poll(5, TimeUnit.SECONDS));

// There is no API for a Javax WebSocketContainer stop timeout.
Graceful.shutdown(client).get(5, TimeUnit.SECONDS);
client.stop();

// Check that the client endpoint was closed with the correct status code and no error.
assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientEndpoint.closeReason.getCloseCode(), is(CloseReason.CloseCodes.GOING_AWAY));
assertNull(clientEndpoint.error);

// Check that the server endpoint was closed with the correct status code and no error.
assertTrue(serverEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(serverEndpoint.closeReason.getCloseCode(), is(CloseReason.CloseCodes.GOING_AWAY));
assertNull(serverEndpoint.error);
}

@Test
public void testServerStop() throws Exception
{
EventSocket clientEndpoint = new EventSocket();
client.connectToServer(clientEndpoint, serverUri);
EventSocket serverEndpoint = Objects.requireNonNull(serverEndpoints.poll(5, TimeUnit.SECONDS));

server.stop();

// Check that the client endpoint was closed with the correct status code and no error.
assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientEndpoint.closeReason.getCloseCode(), is(CloseReason.CloseCodes.GOING_AWAY));
assertNull(clientEndpoint.error);

// Check that the server endpoint was closed with the correct status code and no error.
assertTrue(serverEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(serverEndpoint.closeReason.getCloseCode(), is(CloseReason.CloseCodes.GOING_AWAY));
assertNull(serverEndpoint.error);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketSession;
import org.eclipse.jetty.websocket.javax.common.encoders.AvailableEncoders;
import org.eclipse.jetty.websocket.javax.server.config.JavaxWebSocketServletContainerInitializer;
import org.eclipse.jetty.websocket.javax.tests.EventSocket.EchoSocket;
import org.eclipse.jetty.websocket.javax.tests.EchoSocket;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down
Loading