From 9bd080d5d5a1ffa82d939525505633da1924386a Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 20 Feb 2024 16:41:14 +0100 Subject: [PATCH 1/3] Configure worker threads relative to core count Worker threads defaults to 2*#cores. This commit allows configuration to be #cores based too. Extracted from https://github.com/trinodb/trino/pull/16303 Co-authored-by: Mateusz "Serafin" Gajewski --- .../io/trino/execution/TaskManagerConfig.java | 4 +- .../io/trino/execution/ThreadCountParser.java | 61 +++++++++++++++++++ .../execution/TestTaskManagerConfig.java | 4 +- 3 files changed, 65 insertions(+), 4 deletions(-) create mode 100644 core/trino-main/src/main/java/io/trino/execution/ThreadCountParser.java diff --git a/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java b/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java index 7ed8fe3f6437..afe81df5ea99 100644 --- a/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java +++ b/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java @@ -286,9 +286,9 @@ public int getMaxWorkerThreads() @LegacyConfig("task.shard.max-threads") @Config("task.max-worker-threads") - public TaskManagerConfig setMaxWorkerThreads(int maxWorkerThreads) + public TaskManagerConfig setMaxWorkerThreads(String maxWorkerThreads) { - this.maxWorkerThreads = maxWorkerThreads; + this.maxWorkerThreads = ThreadCountParser.DEFAULT.parse(maxWorkerThreads); return this; } diff --git a/core/trino-main/src/main/java/io/trino/execution/ThreadCountParser.java b/core/trino-main/src/main/java/io/trino/execution/ThreadCountParser.java new file mode 100644 index 000000000000..ff114d59e724 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/execution/ThreadCountParser.java @@ -0,0 +1,61 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.execution; + +import com.google.common.annotations.VisibleForTesting; + +import java.util.function.Supplier; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static java.lang.Long.parseLong; +import static java.lang.Math.multiplyExact; +import static java.lang.Math.toIntExact; +import static java.util.Objects.requireNonNull; + +// Based on https://github.com/airlift/units/pull/31, but adapted not to be a value class per https://github.com/trinodb/trino/pull/16303#issuecomment-1730146433 +public class ThreadCountParser +{ + private static final String PER_CORE_SUFFIX = "C"; + private static final Supplier AVAILABLE_PROCESSORS = Runtime.getRuntime()::availableProcessors; + public static final ThreadCountParser DEFAULT = new ThreadCountParser(AVAILABLE_PROCESSORS); + + private final Supplier coreCount; + + @VisibleForTesting + ThreadCountParser(Supplier coreCount) + { + this.coreCount = requireNonNull(coreCount, "coreCount is null"); + } + + public int parse(String value) + { + int coreCount = this.coreCount.get(); + checkState(coreCount > 0, "coreCount must be positive"); + + long threads; + if (value.endsWith(PER_CORE_SUFFIX)) { + long multiplier = parseLong(value.substring(0, value.length() - PER_CORE_SUFFIX.length()).trim()); + checkArgument(multiplier > 0, "Thread multiplier cannot be negative"); + threads = multiplyExact(multiplier, coreCount); + } + else { + threads = parseLong(value); + } + + checkArgument(threads <= Integer.MAX_VALUE, "Thread count is greater than 2^32 - 1"); + checkArgument(0 <= threads, "Thread count cannot be negative"); + return toIntExact(threads); + } +} diff --git a/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java b/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java index 9364d8aab68c..e0e8de75e53a 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java @@ -47,7 +47,7 @@ public void testDefaults() .setTaskTerminationTimeout(new Duration(1, TimeUnit.MINUTES)) .setPerOperatorCpuTimerEnabled(true) .setTaskCpuTimerEnabled(true) - .setMaxWorkerThreads(Runtime.getRuntime().availableProcessors() * 2) + .setMaxWorkerThreads("2C") .setMinDrivers(Runtime.getRuntime().availableProcessors() * 2 * 2) .setMinDriversPerTask(3) .setMaxDriversPerTask(Integer.MAX_VALUE) @@ -139,7 +139,7 @@ public void testExplicitPropertyMappings() .setMaxPartialAggregationMemoryUsage(DataSize.of(32, Unit.MEGABYTE)) .setMaxPartialTopNMemory(DataSize.of(32, Unit.MEGABYTE)) .setMaxLocalExchangeBufferSize(DataSize.of(33, Unit.MEGABYTE)) - .setMaxWorkerThreads(3) + .setMaxWorkerThreads("3") .setMinDrivers(2) .setMinDriversPerTask(5) .setMaxDriversPerTask(13) From 853bfaf5897bcbf5c6cdddfb0180763a628dcdcc Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 20 Feb 2024 16:54:05 +0100 Subject: [PATCH 2/3] Remove redundant whitespace --- .../src/main/java/io/trino/operator/index/IndexLoader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/trino-main/src/main/java/io/trino/operator/index/IndexLoader.java b/core/trino-main/src/main/java/io/trino/operator/index/IndexLoader.java index c23a357edda4..e4d17a7e302e 100644 --- a/core/trino-main/src/main/java/io/trino/operator/index/IndexLoader.java +++ b/core/trino-main/src/main/java/io/trino/operator/index/IndexLoader.java @@ -1,4 +1,4 @@ - /* +/* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at From 175f921c5a98216f319ceb15b1fa98dc32edd1b4 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 20 Feb 2024 17:09:09 +0100 Subject: [PATCH 3/3] Migrate from javax @Generated annotation --- .../src/main/java/io/trino/server/security/jwt/JwkService.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/server/security/jwt/JwkService.java b/core/trino-main/src/main/java/io/trino/server/security/jwt/JwkService.java index dbc05bb3cdb1..a12bc9f8b360 100644 --- a/core/trino-main/src/main/java/io/trino/server/security/jwt/JwkService.java +++ b/core/trino-main/src/main/java/io/trino/server/security/jwt/JwkService.java @@ -21,11 +21,10 @@ import io.airlift.http.client.StringResponseHandler.StringResponse; import io.airlift.log.Logger; import io.airlift.units.Duration; +import jakarta.annotation.Generated; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; -import javax.annotation.processing.Generated; - import java.io.IOException; import java.net.URI; import java.security.PublicKey;