Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #8007 - Support Loom. #8035

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public Connection newConnection(Connector connector, EndPoint endPoint)
RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, connector.getByteBufferPool());

HTTP2Connection connection = new HTTP2ServerConnection(retainableByteBufferPool, connector.getExecutor(),
endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener);
endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener, isUseVirtualThreadToInvokeRootHandler());
connection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
connection.setUseOutputDirectByteBuffers(isUseOutputDirectByteBuffers());
connection.addEventListener(sessionContainer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.thread.AutoLock;

public class HTTP2ServerConnection extends HTTP2Connection
Expand Down Expand Up @@ -84,13 +85,15 @@ public static boolean isSupportedProtocol(String protocol)
private final List<Frame> upgradeFrames = new ArrayList<>();
private final ServerSessionListener listener;
private final HttpConfiguration httpConfig;
private final boolean useVirtualThreadToInvokeRootHandler;
private boolean recycleHttpChannels = true;

public HTTP2ServerConnection(RetainableByteBufferPool retainableByteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
public HTTP2ServerConnection(RetainableByteBufferPool retainableByteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener, boolean useVirtualThreadToInvokeRootHandler)
{
super(retainableByteBufferPool, executor, endPoint, parser, session, inputBufferSize);
this.listener = listener;
this.httpConfig = httpConfig;
this.useVirtualThreadToInvokeRootHandler = useVirtualThreadToInvokeRootHandler;
}

@Override
Expand All @@ -99,6 +102,11 @@ protected ServerParser getParser()
return (ServerParser)super.getParser();
}

public boolean isUseVirtualThreadToInvokeRootHandler()
{
return useVirtualThreadToInvokeRootHandler;
}

public boolean isRecycleHttpChannels()
{
return recycleHttpChannels;
Expand Down Expand Up @@ -141,7 +149,14 @@ public void onNewStream(Connector connector, IStream stream, HeadersFrame frame)
HttpChannelOverHTTP2 channel = provideHttpChannel(connector, stream);
Runnable task = channel.onRequest(frame);
if (task != null)
{
if (isUseVirtualThreadToInvokeRootHandler())
{
if (VirtualThreads.startVirtualThread(task))
return;
}
offerTask(task, false);
}
}

public void onData(IStream stream, DataFrame frame, Callback callback)
Expand All @@ -153,7 +168,14 @@ public void onData(IStream stream, DataFrame frame, Callback callback)
{
Runnable task = channel.onData(frame, callback);
if (task != null)
{
if (isUseVirtualThreadToInvokeRootHandler())
{
if (VirtualThreads.startVirtualThread(task))
return;
}
offerTask(task, false);
}
}
else
{
Expand All @@ -170,7 +192,14 @@ public void onTrailers(IStream stream, HeadersFrame frame)
{
Runnable task = channel.onTrailer(frame);
if (task != null)
{
if (isUseVirtualThreadToInvokeRootHandler())
{
if (VirtualThreads.startVirtualThread(task))
return;
}
offerTask(task, false);
}
}
}

Expand All @@ -193,6 +222,11 @@ public void onStreamFailure(IStream stream, Throwable failure, Callback callback
Runnable task = channel.onFailure(failure, callback);
if (task != null)
{
if (isUseVirtualThreadToInvokeRootHandler())
{
if (VirtualThreads.startVirtualThread(task))
return;
}
// We must dispatch to another thread because the task
// may call application code that performs blocking I/O.
offerTask(task, true);
Expand Down Expand Up @@ -234,7 +268,14 @@ public void push(Connector connector, IStream stream, MetaData.Request request)
HttpChannelOverHTTP2 channel = provideHttpChannel(connector, stream);
Runnable task = channel.onPushRequest(request);
if (task != null)
{
if (isUseVirtualThreadToInvokeRootHandler())
{
if (VirtualThreads.startVirtualThread(task))
return;
}
offerTask(task, true);
}
}

private HttpChannelOverHTTP2 provideHttpChannel(Connector connector, IStream stream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public Connection newConnection(Connector connector, EndPoint endPoint)
long streamId = streamEndPoint.getStreamId();
ServerHTTP3Session http3Session = (ServerHTTP3Session)streamEndPoint.getQuicSession().getProtocolSession();
MessageParser parser = new MessageParser(http3Session.getSessionServer(), http3Session.getQpackDecoder(), streamId, streamEndPoint::isStreamFinished);
ServerHTTP3StreamConnection connection = new ServerHTTP3StreamConnection(connector, getHttpConfiguration(), streamEndPoint, http3Session, parser);
ServerHTTP3StreamConnection connection = new ServerHTTP3StreamConnection(connector, getHttpConfiguration(), streamEndPoint, http3Session, parser, isUseVirtualThreadToInvokeRootHandler());
return configure(connection, connector, endPoint);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.eclipse.jetty.http3.server.internal.ServerHTTP3StreamConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.VirtualThreads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -105,9 +106,15 @@ private ServerHTTP3StreamConnection getConnection()
public void onRequest(Stream stream, HeadersFrame frame)
{
HTTP3StreamServer http3Stream = (HTTP3StreamServer)stream;
Runnable task = getConnection().onRequest(http3Stream, frame);
ServerHTTP3StreamConnection connection = getConnection();
Runnable task = connection.onRequest(http3Stream, frame);
if (task != null)
{
if (connection.isUseVirtualThreadToInvokeRootHandler())
{
if (VirtualThreads.startVirtualThread(task))
return;
}
ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession();
protocolSession.offer(task, false);
}
Expand All @@ -117,9 +124,15 @@ public void onRequest(Stream stream, HeadersFrame frame)
public void onDataAvailable(Stream.Server stream)
{
HTTP3Stream http3Stream = (HTTP3Stream)stream;
Runnable task = getConnection().onDataAvailable(http3Stream);
ServerHTTP3StreamConnection connection = getConnection();
Runnable task = connection.onDataAvailable(http3Stream);
if (task != null)
{
if (connection.isUseVirtualThreadToInvokeRootHandler())
{
if (VirtualThreads.startVirtualThread(task))
return;
}
ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession();
protocolSession.offer(task, false);
}
Expand All @@ -129,9 +142,15 @@ public void onDataAvailable(Stream.Server stream)
public void onTrailer(Stream.Server stream, HeadersFrame frame)
{
HTTP3Stream http3Stream = (HTTP3Stream)stream;
Runnable task = getConnection().onTrailer(http3Stream, frame);
ServerHTTP3StreamConnection connection = getConnection();
Runnable task = connection.onTrailer(http3Stream, frame);
if (task != null)
{
if (connection.isUseVirtualThreadToInvokeRootHandler())
{
if (VirtualThreads.startVirtualThread(task))
return;
}
ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession();
protocolSession.offer(task, false);
}
Expand All @@ -155,9 +174,15 @@ public boolean onIdleTimeout(Stream.Server stream, Throwable failure)
public void onFailure(Stream.Server stream, long error, Throwable failure)
{
HTTP3Stream http3Stream = (HTTP3Stream)stream;
Runnable task = getConnection().onFailure((HTTP3Stream)stream, failure);
ServerHTTP3StreamConnection connection = getConnection();
Runnable task = connection.onFailure((HTTP3Stream)stream, failure);
if (task != null)
{
if (connection.isUseVirtualThreadToInvokeRootHandler())
{
if (VirtualThreads.startVirtualThread(task))
return;
}
ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession();
protocolSession.offer(task, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,20 @@ public class ServerHTTP3StreamConnection extends HTTP3StreamConnection
private final Connector connector;
private final HttpConfiguration httpConfiguration;
private final ServerHTTP3Session session;
private final boolean useVirtualThreadToInvokeRootHandler;

public ServerHTTP3StreamConnection(Connector connector, HttpConfiguration httpConfiguration, QuicStreamEndPoint endPoint, ServerHTTP3Session session, MessageParser parser)
public ServerHTTP3StreamConnection(Connector connector, HttpConfiguration httpConfiguration, QuicStreamEndPoint endPoint, ServerHTTP3Session session, MessageParser parser, boolean useVirtualThreadToInvokeRootHandler)
{
super(endPoint, connector.getExecutor(), connector.getByteBufferPool(), parser);
this.connector = connector;
this.httpConfiguration = httpConfiguration;
this.session = session;
this.useVirtualThreadToInvokeRootHandler = useVirtualThreadToInvokeRootHandler;
}

public boolean isUseVirtualThreadToInvokeRootHandler()
{
return useVirtualThreadToInvokeRootHandler;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ public abstract class AbstractConnection implements Connection
private final Callback _readCallback;
private int _inputBufferSize = 2048;

protected AbstractConnection(EndPoint endp, Executor executor)
protected AbstractConnection(EndPoint endPoint, Executor executor)
{
if (executor == null)
throw new IllegalArgumentException("Executor must not be null!");
_endPoint = endp;
_endPoint = endPoint;
_executor = executor;
_readCallback = new ReadCallback();
}
Expand Down Expand Up @@ -135,11 +135,6 @@ public void fillInterested()
getEndPoint().fillInterested(_readCallback);
}

public void tryFillInterested()
{
tryFillInterested(_readCallback);
}

public void tryFillInterested(Callback callback)
{
getEndPoint().tryFillInterested(callback);
Expand Down Expand Up @@ -320,15 +315,15 @@ public void succeeded()
}

@Override
public void failed(final Throwable x)
public void failed(Throwable x)
{
onFillInterestedFailed(x);
}

@Override
public String toString()
{
return String.format("AC.ReadCB@%h{%s}", AbstractConnection.this, AbstractConnection.this);
return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), AbstractConnection.this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@

package org.eclipse.jetty.server;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.ArrayUtil;
import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
Expand All @@ -34,18 +33,19 @@ public abstract class AbstractConnectionFactory extends ContainerLifeCycle imple
{
private final String _protocol;
private final List<String> _protocols;
private int _inputbufferSize = 8192;
private int _inputBufferSize = 8192;
private boolean _useVirtualThreadToInvokeRootHandler;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think _useVirtualThreads is sufficient for the name. Make it clear in the javadoc that we use virtual threads where appropriate to do so, specifically invoking root handler.


protected AbstractConnectionFactory(String protocol)
{
_protocol = protocol;
_protocols = Collections.unmodifiableList(Arrays.asList(new String[]{protocol}));
_protocols = List.of(protocol);
}

protected AbstractConnectionFactory(String... protocols)
{
_protocol = protocols[0];
_protocols = Collections.unmodifiableList(Arrays.asList(protocols));
_protocols = List.of(protocols);
}

@Override
Expand All @@ -64,12 +64,25 @@ public List<String> getProtocols()
@ManagedAttribute("The buffer size used to read from the network")
public int getInputBufferSize()
{
return _inputbufferSize;
return _inputBufferSize;
}

public void setInputBufferSize(int size)
{
_inputbufferSize = size;
_inputBufferSize = size;
}

public boolean isUseVirtualThreadToInvokeRootHandler()
{
return _useVirtualThreadToInvokeRootHandler;
}

public void setUseVirtualThreadToInvokeRootHandler(boolean useVirtualThreadToInvokeRootHandler)
gregw marked this conversation as resolved.
Show resolved Hide resolved
{
if (useVirtualThreadToInvokeRootHandler && !VirtualThreads.areSupported())
VirtualThreads.warn();
else
_useVirtualThreadToInvokeRootHandler = useVirtualThreadToInvokeRootHandler;
}

protected String findNextProtocol(Connector connector)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.VirtualThreads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -74,6 +75,17 @@ public HttpChannelOverHttp(HttpConnection httpConnection, Connector connector, H
_httpConnection = httpConnection;
}

@Override
protected void execute(Runnable task)
{
if (_httpConnection.isUseVirtualThreadToInvokeRootHandler())
{
if (VirtualThreads.startVirtualThread(task))
return;
}
super.execute(task);
}
Comment on lines +79 to +87
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this using virtual threads too much? It is certainly using them more than the claim in the current name of "InvokeRootHandler".


@Override
public void abort(Throwable failure)
{
Expand Down
Loading