Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Virtual Threads in Executor Services #5648

Merged
merged 3 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -88,6 +87,7 @@
import org.glassfish.jersey.client.innate.http.SSLParamConfigurator;
import org.glassfish.jersey.client.spi.AsyncConnectorCallback;
import org.glassfish.jersey.client.spi.Connector;
import org.glassfish.jersey.innate.VirtualThreadUtil;
import org.glassfish.jersey.message.internal.OutboundMessageContext;
import org.glassfish.jersey.netty.connector.internal.NettyEntityWriter;

Expand Down Expand Up @@ -129,14 +129,15 @@ class NettyConnector implements Connector {

NettyConnector(Client client) {

final Map<String, Object> properties = client.getConfiguration().getProperties();
final Configuration configuration = client.getConfiguration();
final Map<String, Object> properties = configuration.getProperties();
final Object threadPoolSize = properties.get(ClientProperties.ASYNC_THREADPOOL_SIZE);

if (threadPoolSize != null && threadPoolSize instanceof Integer && (Integer) threadPoolSize > 0) {
executorService = Executors.newFixedThreadPool((Integer) threadPoolSize);
executorService = VirtualThreadUtil.withConfig(configuration).newFixedThreadPool((Integer) threadPoolSize);
this.group = new NioEventLoopGroup((Integer) threadPoolSize);
} else {
executorService = Executors.newCachedThreadPool();
executorService = VirtualThreadUtil.withConfig(configuration).newCachedThreadPool();
this.group = new NioEventLoopGroup();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010, 2022 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2010, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -20,8 +20,10 @@
import java.net.URI;

import javax.ws.rs.ProcessingException;
import javax.ws.rs.core.Configuration;

import org.glassfish.jersey.grizzly2.httpserver.internal.LocalizationMessages;
import org.glassfish.jersey.innate.VirtualThreadUtil;
import org.glassfish.jersey.internal.guava.ThreadFactoryBuilder;
import org.glassfish.jersey.process.JerseyProcessingUncaughtExceptionHandler;
import org.glassfish.jersey.server.ApplicationHandler;
Expand Down Expand Up @@ -281,10 +283,12 @@ public static HttpServer createHttpServer(final URI uri,
: uri.getPort();

final NetworkListener listener = new NetworkListener("grizzly", host, port);
final Configuration configuration = handler != null ? handler.getConfiguration().getConfiguration() : null;

listener.getTransport().getWorkerThreadPoolConfig().setThreadFactory(new ThreadFactoryBuilder()
.setNameFormat("grizzly-http-server-%d")
.setUncaughtExceptionHandler(new JerseyProcessingUncaughtExceptionHandler())
.setThreadFactory(VirtualThreadUtil.withConfig(configuration).getThreadFactory())
.build());

listener.setSecure(secure);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -23,6 +23,7 @@
import javax.servlet.Servlet;

import org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpServerFactory;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;
import org.glassfish.jersey.uri.UriComponent;

Expand Down Expand Up @@ -251,11 +252,13 @@ private static HttpServer create(URI u, Class<? extends Servlet> c, Servlet serv
}
}

ResourceConfig configuration = new ResourceConfig();
if (initParams != null) {
registration.setInitParameters(initParams);
configuration.addProperties((Map) initParams);
}

HttpServer server = GrizzlyHttpServerFactory.createHttpServer(u);
HttpServer server = GrizzlyHttpServerFactory.createHttpServer(u, configuration);
context.deploy(server);
return server;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2013, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -20,7 +20,9 @@
import java.util.concurrent.ThreadFactory;

import javax.ws.rs.ProcessingException;
import javax.ws.rs.core.Configuration;

import org.glassfish.jersey.innate.VirtualThreadUtil;
import org.glassfish.jersey.internal.guava.ThreadFactoryBuilder;
import org.glassfish.jersey.jetty.internal.LocalizationMessages;
import org.glassfish.jersey.process.JerseyProcessingUncaughtExceptionHandler;
Expand Down Expand Up @@ -253,7 +255,8 @@ public static Server createServer(final URI uri,
}
final int port = (uri.getPort() == -1) ? defaultPort : uri.getPort();

final Server server = new Server(new JettyConnectorThreadPool());
final Configuration configuration = handler != null ? handler.getConfiguration() : null;
final Server server = new Server(new JettyConnectorThreadPool(configuration));
final HttpConfiguration config = new HttpConfiguration();
if (sslContextFactory != null) {
config.setSecureScheme("https");
Expand Down Expand Up @@ -291,10 +294,15 @@ public static Server createServer(final URI uri,
//
// Keeping this for backwards compatibility for the time being
private static final class JettyConnectorThreadPool extends QueuedThreadPool {
private final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("jetty-http-server-%d")
.setUncaughtExceptionHandler(new JerseyProcessingUncaughtExceptionHandler())
.build();
private final ThreadFactory threadFactory;

private JettyConnectorThreadPool(Configuration configuration) {
this.threadFactory = new ThreadFactoryBuilder()
.setNameFormat("jetty-http-server-%d")
.setUncaughtExceptionHandler(new JerseyProcessingUncaughtExceptionHandler())
.setThreadFactory(VirtualThreadUtil.withConfig(configuration).getThreadFactory())
.build();
}

@Override
public Thread newThread(Runnable runnable) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2021 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand Down Expand Up @@ -32,11 +32,12 @@
import org.glassfish.jersey.internal.util.collection.Value;
import org.glassfish.jersey.internal.util.collection.Values;
import org.glassfish.jersey.model.internal.ComponentBag;
import org.glassfish.jersey.model.internal.ManagedObjectsFinalizer;
import org.glassfish.jersey.process.internal.AbstractExecutorProvidersConfigurator;
import org.glassfish.jersey.spi.ExecutorServiceProvider;
import org.glassfish.jersey.spi.ScheduledExecutorServiceProvider;

import javax.ws.rs.core.Configuration;

/**
* Configurator which initializes and register {@link ExecutorServiceProvider} and
* {@link ScheduledExecutorServiceProvider}.
Expand Down Expand Up @@ -64,7 +65,8 @@ class ClientExecutorProvidersConfigurator extends AbstractExecutorProvidersConfi

@Override
public void init(InjectionManager injectionManager, BootstrapBag bootstrapBag) {
Map<String, Object> runtimeProperties = bootstrapBag.getConfiguration().getProperties();
final Configuration configuration = bootstrapBag.getConfiguration();
Map<String, Object> runtimeProperties = configuration.getProperties();

ExecutorServiceProvider defaultAsyncExecutorProvider;
ScheduledExecutorServiceProvider defaultScheduledExecutorProvider;
Expand Down Expand Up @@ -94,12 +96,12 @@ public void init(InjectionManager injectionManager, BootstrapBag bootstrapBag) {
.named("ClientAsyncThreadPoolSize");
injectionManager.register(asyncThreadPoolSizeBinding);

defaultAsyncExecutorProvider = new DefaultClientAsyncExecutorProvider(asyncThreadPoolSize);
defaultAsyncExecutorProvider = new DefaultClientAsyncExecutorProvider(asyncThreadPoolSize, configuration);
} else {
if (MANAGED_EXECUTOR_SERVICE != null) {
defaultAsyncExecutorProvider = new ClientExecutorServiceProvider(MANAGED_EXECUTOR_SERVICE);
} else {
defaultAsyncExecutorProvider = new DefaultClientAsyncExecutorProvider(0);
defaultAsyncExecutorProvider = new DefaultClientAsyncExecutorProvider(0, configuration);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -20,6 +20,8 @@

import javax.inject.Inject;
import javax.inject.Named;
import javax.ws.rs.core.Configuration;
import javax.ws.rs.core.Context;

import org.glassfish.jersey.client.internal.LocalizationMessages;
import org.glassfish.jersey.internal.util.collection.LazyValue;
Expand All @@ -46,8 +48,9 @@ class DefaultClientAsyncExecutorProvider extends ThreadPoolExecutorProvider {
* See also {@link org.glassfish.jersey.client.ClientProperties#ASYNC_THREADPOOL_SIZE}.
*/
@Inject
public DefaultClientAsyncExecutorProvider(@Named("ClientAsyncThreadPoolSize") final int poolSize) {
super("jersey-client-async-executor");
public DefaultClientAsyncExecutorProvider(@Named("ClientAsyncThreadPoolSize") final int poolSize,
@Context Configuration configuration) {
super("jersey-client-async-executor", configuration);

this.asyncThreadPoolSize = Values.lazy(new Value<Integer>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013, 2023 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2013, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand Down Expand Up @@ -320,6 +320,30 @@ public final class CommonProperties {
*/
public static final String PARAM_CONVERTERS_THROW_IAE = "jersey.config.paramconverters.throw.iae";

/**
* <p>
* Defines the {@link java.util.concurrent.ThreadFactory} to be used by internal default Executor Services.
* </p>
* <p>
* The default is {@link java.util.concurrent.Executors#defaultThreadFactory()} on platform threads and
* {@code Thread.ofVirtual().factory()} on virtual threads.
* </p>
* @since 2.44
*/
public static String THREAD_FACTORY = "jersey.config.threads.factory";

/**
* <p>
* Defines whether the virtual threads should be used by Jersey on JDK 21+ when not using an exact number
* of threads by {@code FixedThreadPool}.
* </p>
* <p>
* The default is {@code false} for this version of Jersey, and {@code true} for Jersey 3.1+.
* </p>
* @since 2.44
*/
public static String USE_VIRTUAL_THREADS = "jersey.config.threads.use.virtual";

/**
* Prevent instantiation.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/

package org.glassfish.jersey.innate;

import org.glassfish.jersey.CommonProperties;
import org.glassfish.jersey.innate.virtual.LoomishExecutors;

import javax.ws.rs.core.Configuration;
import java.util.concurrent.ThreadFactory;

/**
* Factory class to provide JDK specific implementation of bits related to the virtual thread support.
*/
public final class VirtualThreadUtil {
/**
* Do not instantiate.
*/
private VirtualThreadUtil() {
throw new IllegalStateException();
}

/**
* Return an instance of {@link LoomishExecutors} based on a configuration property.
* @param config the {@link Configuration}
* @return the {@link LoomishExecutors} instance.
*/
public static LoomishExecutors withConfig(Configuration config) {
boolean bUseVirtualThreads = false;
ThreadFactory tfThreadFactory = null;

if (config != null) {
Object useVirtualThread = config.getProperty(CommonProperties.USE_VIRTUAL_THREADS);
if (useVirtualThread != null && Boolean.class.isInstance(useVirtualThread)) {
bUseVirtualThreads = (boolean) useVirtualThread;
}
if (useVirtualThread != null && String.class.isInstance(useVirtualThread)) {
bUseVirtualThreads = Boolean.parseBoolean(useVirtualThread.toString());
}

Object threadFactory = config.getProperty(CommonProperties.THREAD_FACTORY);
if (threadFactory != null && ThreadFactory.class.isInstance(threadFactory)) {
tfThreadFactory = (ThreadFactory) threadFactory;
}

}
return tfThreadFactory == null
? VirtualThreadSupport.allowVirtual(bUseVirtualThreads)
: VirtualThreadSupport.allowVirtual(bUseVirtualThreads, tfThreadFactory);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/

package org.glassfish.jersey.innate.virtual;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
* {@link Executors} facade to support virtual threads.
*/
public interface LoomishExecutors {
/**
* Creates a thread pool that creates new threads as needed and uses virtual threads if available.
* @return the newly created thread pool
*/
ExecutorService newCachedThreadPool();

/**
* Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue
* and uses virtual threads if available
* @param nThreads – the number of threads in the pool
* @return the newly created thread pool
*/
ExecutorService newFixedThreadPool(int nThreads);

/**
* Returns thread factory used to create new threads
* @return thread factory used to create new threads
* @see Executors#defaultThreadFactory()
*/
ThreadFactory getThreadFactory();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/

/**
* Jersey innate packages. The innate packages will not be opened by JPMS outside of Jersey.
* Not for public use.
* This virtual package should contain only classes that do not have dependencies on Jersey, or the REST API to be buildable with
* ant for multi-release.
*/
package org.glassfish.jersey.innate.virtual;
Loading