Skip to content

Commit

Permalink
Issue #4919 - make the SessionTracker the one to implement Graceful s…
Browse files Browse the repository at this point in the history
…hutdown

Signed-off-by: Lachlan Roberts <[email protected]>
  • Loading branch information
lachlan-roberts committed Jul 28, 2020
1 parent 9e383f0 commit 1be0220
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,54 @@ static CompletableFuture<Void> shutdown(Container component)

return CompletableFuture.allOf(gracefuls.stream().map(Graceful::shutdown).toArray(CompletableFuture[]::new));
}

/**
* Utility method to execute a {@link ThrowingRunnable} in a new daemon thread and
* be notified of the result in a {@link CompletableFuture}.
* @param runnable the ThrowingRunnable to run.
* @return the CompletableFuture to be notified when the runnable either completes or fails.
*/
static CompletableFuture<Void> shutdown(ThrowingRunnable runnable)
{
AtomicReference<Thread> stopThreadReference = new AtomicReference<>();
CompletableFuture<Void> shutdown = new CompletableFuture<>()
{
@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
boolean canceled = super.cancel(mayInterruptIfRunning);
if (canceled && mayInterruptIfRunning)
{
Thread thread = stopThreadReference.get();
if (thread != null)
thread.interrupt();
}

return canceled;
}
};

Thread stopThread = new Thread(() ->
{
try
{
runnable.run();
shutdown.complete(null);
}
catch (Throwable t)
{
shutdown.completeExceptionally(t);
}
});
stopThread.setDaemon(true);
stopThreadReference.set(stopThread);
stopThread.start();
return shutdown;
}

@FunctionalInterface
interface ThrowingRunnable
{
void run() throws Exception;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import javax.websocket.Extension;
Expand All @@ -32,15 +31,13 @@
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Graceful;
import org.eclipse.jetty.websocket.core.Configuration;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.util.ShutdownUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class JavaxWebSocketContainer extends ContainerLifeCycle implements javax.websocket.WebSocketContainer, Graceful
public abstract class JavaxWebSocketContainer extends ContainerLifeCycle implements javax.websocket.WebSocketContainer
{
private static final Logger LOG = LoggerFactory.getLogger(JavaxWebSocketContainer.class);
private final List<JavaxWebSocketSessionListener> sessionListeners = new ArrayList<>();
Expand All @@ -52,6 +49,7 @@ public JavaxWebSocketContainer(WebSocketComponents components)
{
this.components = components;
addSessionListener(sessionTracker);
addBean(sessionTracker);
}

public abstract Executor getExecutor();
Expand Down Expand Up @@ -200,23 +198,4 @@ public void notifySessionListeners(Consumer<JavaxWebSocketSessionListener> consu
}
}
}

@Override
protected void doStart() throws Exception
{
sessionTracker.start();
super.doStart();
}

@Override
public CompletableFuture<Void> shutdown()
{
return ShutdownUtil.stop(sessionTracker);
}

@Override
public boolean isShutdown()
{
return sessionTracker.isStopped();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,20 @@

package org.eclipse.jetty.websocket.javax.common;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.websocket.CloseReason;
import javax.websocket.Session;

import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.eclipse.jetty.util.component.Graceful;

public class SessionTracker extends AbstractLifeCycle implements JavaxWebSocketSessionListener
public class SessionTracker extends AbstractLifeCycle implements JavaxWebSocketSessionListener, Graceful
{
private static final Logger LOG = LoggerFactory.getLogger(SessionTracker.class);

private final CopyOnWriteArraySet<JavaxWebSocketSession> sessions = new CopyOnWriteArraySet<>();
private boolean isShutdown = false;

public Set<Session> getSessions()
{
Expand All @@ -52,25 +50,40 @@ public void onJavaxWebSocketSessionClosed(JavaxWebSocketSession session)
sessions.remove(session);
}

@Override
protected void doStart() throws Exception
{
isShutdown = false;
super.doStart();
}

@Override
protected void doStop() throws Exception
{
for (Session session : sessions)
{
if (Thread.interrupted())
break;
sessions.clear();
super.doStop();
}

try
@Override
public CompletableFuture<Void> shutdown()
{
isShutdown = true;
return Graceful.shutdown(() ->
{
for (Session session : sessions)
{
if (Thread.interrupted())
break;

// GOING_AWAY is abnormal close status so it will hard close connection after sent.
session.close(new CloseReason(CloseReason.CloseCodes.GOING_AWAY, "Container being shut down"));
}
catch (IOException e)
{
LOG.trace("IGNORED", e);
}
}
});
}

super.doStop();
@Override
public boolean isShutdown()
{
return isShutdown;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,10 @@
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.client.UpgradeListener;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.eclipse.jetty.websocket.util.ShutdownUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WebSocketClient extends ContainerLifeCycle implements WebSocketPolicy, WebSocketContainer, Graceful
public class WebSocketClient extends ContainerLifeCycle implements WebSocketPolicy, WebSocketContainer
{
private static final Logger LOG = LoggerFactory.getLogger(WebSocketClient.class);
private final WebSocketCoreClient coreClient;
Expand Down Expand Up @@ -96,6 +95,7 @@ public WebSocketClient(HttpClient httpClient)

frameHandlerFactory = new JettyWebSocketFrameHandlerFactory(this);
sessionListeners.add(sessionTracker);
addBean(sessionTracker);
}

public CompletableFuture<Session> connect(Object websocket, URI toUri) throws IOException
Expand Down Expand Up @@ -410,13 +410,6 @@ public boolean isStopAtShutdown()
return stopAtShutdown;
}

@Override
protected void doStart() throws Exception
{
sessionTracker.start();
super.doStart();
}

@Override
protected void doStop() throws Exception
{
Expand All @@ -425,18 +418,6 @@ protected void doStop() throws Exception
super.doStop();
}

@Override
public CompletableFuture<Void> shutdown()
{
return ShutdownUtil.stop(sessionTracker);
}

@Override
public boolean isShutdown()
{
return sessionTracker.isStopped();
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.Graceful;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketSessionListener;

public class SessionTracker extends AbstractLifeCycle implements WebSocketSessionListener
public class SessionTracker extends AbstractLifeCycle implements WebSocketSessionListener, Graceful
{
private final List<Session> sessions = new CopyOnWriteArrayList<>();
private boolean isShutdown = false;

public Collection<Session> getSessions()
{
Expand All @@ -48,18 +51,40 @@ public void onWebSocketSessionClosed(Session session)
sessions.remove(session);
}

@Override
protected void doStart() throws Exception
{
isShutdown = false;
super.doStart();
}

@Override
protected void doStop() throws Exception
{
for (Session session : sessions)
sessions.clear();
super.doStop();
}

@Override
public CompletableFuture<Void> shutdown()
{
isShutdown = true;
return Graceful.shutdown(() ->
{
if (Thread.interrupted())
break;
for (Session session : sessions)
{
if (Thread.interrupted())
break;

// SHUTDOWN is abnormal close status so it will hard close connection after sent.
session.close(StatusCode.SHUTDOWN, "Container being shut down");
}
// SHUTDOWN is abnormal close status so it will hard close connection after sent.
session.close(StatusCode.SHUTDOWN, "Container being shut down");
}
});
}

super.doStop();
@Override
public boolean isShutdown()
{
return isShutdown;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import javax.servlet.ServletContext;

import org.eclipse.jetty.http.pathmap.PathSpec;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Graceful;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
Expand All @@ -45,13 +43,12 @@
import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer;
import org.eclipse.jetty.websocket.server.internal.JettyServerFrameHandlerFactory;
import org.eclipse.jetty.websocket.util.ReflectUtils;
import org.eclipse.jetty.websocket.util.ShutdownUtil;
import org.eclipse.jetty.websocket.util.server.internal.FrameHandlerFactory;
import org.eclipse.jetty.websocket.util.server.internal.WebSocketMapping;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JettyWebSocketServerContainer extends ContainerLifeCycle implements WebSocketContainer, WebSocketPolicy, LifeCycle.Listener, Graceful
public class JettyWebSocketServerContainer extends ContainerLifeCycle implements WebSocketContainer, WebSocketPolicy, LifeCycle.Listener
{
public static final String JETTY_WEBSOCKET_CONTAINER_ATTRIBUTE = WebSocketContainer.class.getName();

Expand Down Expand Up @@ -122,6 +119,7 @@ public static JettyWebSocketServerContainer ensureContainer(ServletContext servl
frameHandlerFactory = factory;

addSessionListener(sessionTracker);
addBean(sessionTracker);
}

public void addMapping(String pathSpec, JettyWebSocketCreator creator)
Expand Down Expand Up @@ -282,23 +280,4 @@ public void setAutoFragment(boolean autoFragment)
{
customizer.setAutoFragment(autoFragment);
}

@Override
protected void doStart() throws Exception
{
sessionTracker.start();
super.doStart();
}

@Override
public CompletableFuture<Void> shutdown()
{
return ShutdownUtil.stop(sessionTracker);
}

@Override
public boolean isShutdown()
{
return sessionTracker.isStopped();
}
}
Loading

0 comments on commit 1be0220

Please sign in to comment.