Skip to content

Commit

Permalink
Properly report Vertx worker pool size
Browse files Browse the repository at this point in the history
This is done by setting the relevant property
in VertxOptions to the size of the Quarkus
ExecutorService that is actually by Vertx
(in prod mode).
The reason we update VertOptions is that
PoolMetrics uses it to calculate the blocking
pool size.

Closes: quarkusio#34998
  • Loading branch information
geoand committed Aug 24, 2023
1 parent ca83189 commit 8b06077
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

import org.jboss.logging.Logger;
import org.jboss.threads.ContextHandler;
Expand Down Expand Up @@ -151,10 +152,9 @@ private static EnhancedQueueExecutor createExecutor(ThreadPoolConfig threadPoolC
.setRegisterMBean(false)
.setHandoffExecutor(JBossExecutors.rejectingExecutor())
.setThreadFactory(JBossExecutors.resettingThreadFactory(threadFactory));
final int cpus = ProcessorInfo.availableProcessors();
// run time config variables
builder.setCorePoolSize(threadPoolConfig.coreThreads);
builder.setMaximumPoolSize(threadPoolConfig.maxThreads.orElse(Math.max(8 * cpus, 200)));
builder.setMaximumPoolSize(getMaxSize(threadPoolConfig));
if (threadPoolConfig.queueSize.isPresent()) {
if (threadPoolConfig.queueSize.getAsInt() < 0) {
builder.setMaximumQueueSize(Integer.MAX_VALUE);
Expand All @@ -172,6 +172,35 @@ private static EnhancedQueueExecutor createExecutor(ThreadPoolConfig threadPoolC
return builder.build();
}

public static int getMaxSize(ThreadPoolConfig threadPoolConfig) {
return threadPoolConfig.maxThreads.orElseGet(MaxThreadsCalculator.INSTANCE);
}

public static int calculateMaxThreads() {
return MaxThreadsCalculator.INSTANCE.getAsInt();
}

/**
* NOTE: This is not folded at native image build time, so it works as expected
*/
private static final class MaxThreadsCalculator implements IntSupplier {

private static final MaxThreadsCalculator INSTANCE = new MaxThreadsCalculator();

private MaxThreadsCalculator() {
}

@Override
public int getAsInt() {
return Holder.CALCULATION;
}

private static class Holder {
private static final int DEFAULT_MAX_THREADS = 200;
private static final int CALCULATION = Math.max(8 * ProcessorInfo.availableProcessors(), DEFAULT_MAX_THREADS);
}
}

public static Executor getCurrent() {
return current;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,12 @@ public class ThreadPoolConfig {
@ConfigItem(defaultValue = "30")
public Duration keepAliveTime;

public static ThreadPoolConfig empty() {
var config = new ThreadPoolConfig();
config.maxThreads = OptionalInt.empty();
config.queueSize = OptionalInt.empty();
config.shutdownCheckInterval = Optional.empty();
return config;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.quarkus.runtime.QuarkusBindException;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.ThreadPoolConfig;
import io.quarkus.runtime.annotations.Recorder;
import io.quarkus.runtime.configuration.ConfigInstantiator;
import io.quarkus.runtime.configuration.ConfigUtils;
Expand Down Expand Up @@ -250,7 +251,11 @@ public static void startServerAfterFailedStart() {
.addDiscoveredSources()
.withMapping(VertxConfiguration.class)
.build().getConfigMapping(VertxConfiguration.class);
vertx = VertxCoreRecorder.recoverFailedStart(vertxConfiguration).get();

ThreadPoolConfig threadPoolConfig = new ThreadPoolConfig();
ConfigInstantiator.handleObject(threadPoolConfig);

vertx = VertxCoreRecorder.recoverFailedStart(vertxConfiguration, threadPoolConfig).get();
} else {
vertx = supplier.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import io.quarkus.deployment.logging.LogCleanupFilterBuildItem;
import io.quarkus.gizmo.Gizmo;
import io.quarkus.netty.deployment.EventLoopSupplierBuildItem;
import io.quarkus.runtime.ThreadPoolConfig;
import io.quarkus.vertx.VertxOptionsCustomizer;
import io.quarkus.vertx.core.runtime.VertxCoreRecorder;
import io.quarkus.vertx.core.runtime.VertxLocalsHelper;
Expand Down Expand Up @@ -215,6 +216,7 @@ IOThreadDetectorBuildItem ioThreadDetector(VertxCoreRecorder recorder) {
CoreVertxBuildItem build(VertxCoreRecorder recorder,
LaunchModeBuildItem launchMode, ShutdownContextBuildItem shutdown, VertxConfiguration config,
List<VertxOptionsConsumerBuildItem> vertxOptionsConsumers,
ThreadPoolConfig threadPoolConfig,
BuildProducer<SyntheticBeanBuildItem> syntheticBeans,
BuildProducer<EventLoopSupplierBuildItem> eventLoops,
ExecutorBuildItem executorBuildItem) {
Expand All @@ -225,7 +227,7 @@ CoreVertxBuildItem build(VertxCoreRecorder recorder,
consumers.add(x.getConsumer());
}

Supplier<Vertx> vertx = recorder.configureVertx(config,
Supplier<Vertx> vertx = recorder.configureVertx(config, threadPoolConfig,
launchMode.getLaunchMode(), shutdown, consumers, executorBuildItem.getExecutorProxy());
syntheticBeans.produce(SyntheticBeanBuildItem.configure(Vertx.class)
.types(Vertx.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import org.jboss.logging.Logger;
import org.jboss.threads.EnhancedQueueExecutor;
import org.jboss.threads.JBossExecutors;
import org.wildfly.common.cpu.ProcessorInfo;

import io.quarkus.runtime.ExecutorRecorder;
import io.quarkus.runtime.LaunchMode;
import io.quarkus.vertx.core.runtime.config.VertxConfiguration;
import io.vertx.core.spi.ExecutorServiceFactory;
Expand Down Expand Up @@ -47,10 +47,9 @@ private ExecutorService internalCreateExecutor(ThreadFactory threadFactory, Inte
.setRegisterMBean(false)
.setHandoffExecutor(JBossExecutors.rejectingExecutor())
.setThreadFactory(JBossExecutors.resettingThreadFactory(threadFactory));
final int cpus = ProcessorInfo.availableProcessors();
// run time config variables
builder.setCorePoolSize(concurrency);
builder.setMaximumPoolSize(maxConcurrency != null ? maxConcurrency : Math.max(8 * cpus, 200));
builder.setMaximumPoolSize(maxConcurrency != null ? maxConcurrency : ExecutorRecorder.calculateMaxThreads());

if (conf != null) {
if (conf.queueSize().isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@
import io.netty.util.concurrent.FastThreadLocal;
import io.quarkus.arc.Arc;
import io.quarkus.arc.InstanceHandle;
import io.quarkus.runtime.ExecutorRecorder;
import io.quarkus.runtime.IOThreadDetector;
import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.ThreadPoolConfig;
import io.quarkus.runtime.annotations.Recorder;
import io.quarkus.vertx.core.runtime.config.AddressResolverConfiguration;
import io.quarkus.vertx.core.runtime.config.ClusterConfiguration;
Expand Down Expand Up @@ -95,12 +97,12 @@ public class VertxCoreRecorder {
*/
private static volatile ClassLoader currentDevModeNewThreadCreationClassLoader;

public Supplier<Vertx> configureVertx(VertxConfiguration config,
public Supplier<Vertx> configureVertx(VertxConfiguration config, ThreadPoolConfig threadPoolConfig,
LaunchMode launchMode, ShutdownContext shutdown, List<Consumer<VertxOptions>> customizers,
ExecutorService executorProxy) {
QuarkusExecutorFactory.sharedExecutor = executorProxy;
if (launchMode != LaunchMode.DEVELOPMENT) {
vertx = new VertxSupplier(launchMode, config, customizers, shutdown);
vertx = new VertxSupplier(launchMode, config, customizers, threadPoolConfig, shutdown);
// we need this to be part of the last shutdown tasks because closing it early (basically before Arc)
// could cause problem to beans that rely on Vert.x and contain shutdown tasks
shutdown.addLastShutdownTask(new Runnable() {
Expand All @@ -113,7 +115,7 @@ public void run() {
});
} else {
if (vertx == null) {
vertx = new VertxSupplier(launchMode, config, customizers, shutdown);
vertx = new VertxSupplier(launchMode, config, customizers, threadPoolConfig, shutdown);
} else if (vertx.v != null) {
tryCleanTccl();
}
Expand Down Expand Up @@ -195,13 +197,14 @@ public static Supplier<Vertx> getVertx() {
return vertx;
}

public static Vertx initialize(VertxConfiguration conf, VertxOptionsCustomizer customizer, ShutdownContext shutdown,
public static Vertx initialize(VertxConfiguration conf, VertxOptionsCustomizer customizer,
ThreadPoolConfig threadPoolConfig, ShutdownContext shutdown,
LaunchMode launchMode) {

VertxOptions options = new VertxOptions();

if (conf != null) {
convertToVertxOptions(conf, options, true, shutdown);
convertToVertxOptions(conf, options, threadPoolConfig, true, shutdown);
}

// Allow extension customizers to do their thing
Expand Down Expand Up @@ -293,7 +296,8 @@ private static Vertx logVertxInitialization(Vertx vertx) {
return vertx;
}

private static VertxOptions convertToVertxOptions(VertxConfiguration conf, VertxOptions options, boolean allowClustering,
private static VertxOptions convertToVertxOptions(VertxConfiguration conf, VertxOptions options,
ThreadPoolConfig threadPoolConfig, boolean allowClustering,
ShutdownContext shutdown) {

if (!conf.useAsyncDNS()) {
Expand Down Expand Up @@ -349,7 +353,7 @@ public void run() {
}

options.setFileSystemOptions(fileSystemOptions);
options.setWorkerPoolSize(conf.workerPoolSize());
options.setWorkerPoolSize(ExecutorRecorder.getMaxSize(threadPoolConfig));
options.setInternalBlockingPoolSize(conf.internalBlockingPoolSize());
blockingThreadPoolSize = conf.internalBlockingPoolSize();

Expand Down Expand Up @@ -586,30 +590,33 @@ public void runWith(Runnable task, Object context) {
};
}

public static Supplier<Vertx> recoverFailedStart(VertxConfiguration config) {
return vertx = new VertxSupplier(LaunchMode.DEVELOPMENT, config, Collections.emptyList(), null);
public static Supplier<Vertx> recoverFailedStart(VertxConfiguration config, ThreadPoolConfig threadPoolConfig) {
return vertx = new VertxSupplier(LaunchMode.DEVELOPMENT, config, Collections.emptyList(), threadPoolConfig, null);

}

static class VertxSupplier implements Supplier<Vertx> {
final LaunchMode launchMode;
final VertxConfiguration config;
final VertxOptionsCustomizer customizer;
final ThreadPoolConfig threadPoolConfig;
final ShutdownContext shutdown;
Vertx v;

VertxSupplier(LaunchMode launchMode, VertxConfiguration config, List<Consumer<VertxOptions>> customizers,
ThreadPoolConfig threadPoolConfig,
ShutdownContext shutdown) {
this.launchMode = launchMode;
this.config = config;
this.customizer = new VertxOptionsCustomizer(customizers);
this.threadPoolConfig = threadPoolConfig;
this.shutdown = shutdown;
}

@Override
public synchronized Vertx get() {
if (v == null) {
v = initialize(config, customizer, shutdown, launchMode);
v = initialize(config, customizer, threadPoolConfig, shutdown, launchMode);
}
return v;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ public interface VertxConfiguration {
Duration warningExceptionTime();

/**
* The size of the worker thread pool.
* @deprecated use {@code quarkus.thread-pool.max-threads} instead
*/
@WithDefault("20")
@WithDefault("${quarkus.thread-pool.max-threads:20}")
@Deprecated
int workerPoolSize();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.junit.jupiter.api.Test;

import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.ThreadPoolConfig;
import io.quarkus.vertx.core.runtime.VertxCoreRecorder.VertxOptionsCustomizer;
import io.quarkus.vertx.core.runtime.config.AddressResolverConfiguration;
import io.quarkus.vertx.core.runtime.config.ClusterConfiguration;
Expand Down Expand Up @@ -98,7 +99,8 @@ public Duration warningExceptionTime() {
};

try {
VertxCoreRecorder.initialize(configuration, null, null, LaunchMode.TEST);

VertxCoreRecorder.initialize(configuration, null, ThreadPoolConfig.empty(), null, LaunchMode.TEST);
Assertions.fail("It should not have a cluster manager on the classpath, and so fail the creation");
} catch (IllegalStateException e) {
Assertions.assertTrue(e.getMessage().contains("No ClusterManagerFactory"),
Expand Down Expand Up @@ -155,7 +157,7 @@ public void accept(VertxOptions vertxOptions) {
}
}));

VertxCoreRecorder.initialize(configuration, customizers, null, LaunchMode.TEST);
VertxCoreRecorder.initialize(configuration, customizers, ThreadPoolConfig.empty(), null, LaunchMode.TEST);
}

@Test
Expand All @@ -168,7 +170,9 @@ public void accept(VertxOptions vertxOptions) {
called.set(true);
}
}));
Vertx v = VertxCoreRecorder.initialize(new DefaultVertxConfiguration(), customizers, null, LaunchMode.TEST);
Vertx v = VertxCoreRecorder.initialize(new DefaultVertxConfiguration(), customizers, ThreadPoolConfig.empty(),
null,
LaunchMode.TEST);
Assertions.assertTrue(called.get(), "Customizer should get called during initialization");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void tearDown() {

@Test
public void shouldNotFailWithoutConfig() {
verifyProducer(VertxCoreRecorder.initialize(null, null, null, LaunchMode.TEST));
verifyProducer(VertxCoreRecorder.initialize(null, null, null, null, LaunchMode.TEST));
}

private void verifyProducer(Vertx v) {
Expand Down

0 comments on commit 8b06077

Please sign in to comment.