From 2c1c4fc595492d60a109a723a965e52fea919ddf Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 28 Jul 2023 22:10:20 -0700 Subject: [PATCH] json config based processing and broker merge configs to deprecate config-magic --- .../CachingClusteredClientBenchmark.java | 15 +- .../query/SqlExpressionBenchmark.java | 7 - .../query/SqlNestedDataBenchmark.java | 8 +- .../movingaverage/MovingAverageQueryTest.java | 11 +- .../kafka/supervisor/KafkaSupervisor.java | 2 +- .../kafka/supervisor/KafkaSupervisorSpec.java | 2 +- .../supervisor/KafkaSupervisorSpecTest.java | 2 +- .../kafka/supervisor/KafkaSupervisorTest.java | 2 +- .../supervisor/KinesisSupervisorSpec.java | 2 +- .../supervisor/KinesisSupervisorTest.java | 2 +- .../SeekableStreamSupervisorSpec.java | 2 +- .../SeekableStreamSupervisorSpecTest.java | 2 +- .../SeekableStreamSupervisorStateTest.java | 2 +- .../druid/testsEx/config/Initializer.java | 4 +- .../concurrent/ExecutorServiceConfig.java | 57 ----- .../metrics/AllocationMetricCollector.java | 2 +- .../util/metrics/BasicMonitorScheduler.java | 2 +- .../ClockDriftSafeMonitorScheduler.java | 2 +- .../metrics/DruidMonitorSchedulerConfig.java | 8 +- .../java/util/metrics/MonitorScheduler.java | 6 +- .../util/metrics/MonitorSchedulerConfig.java | 33 --- .../query/DruidProcessingBufferConfig.java | 77 ++++++ .../druid/query/DruidProcessingConfig.java | 227 +++++++----------- .../query/DruidProcessingIndexesConfig.java | 64 +++++ .../druid/segment/column/ColumnConfig.java | 20 +- .../metrics/BasicMonitorSchedulerTest.java | 2 +- .../ClockDriftSafeMonitorSchedulerTest.java | 10 +- .../util/metrics/MonitorSchedulerTest.java | 2 +- .../query/DruidProcessingConfigTest.java | 98 ++++---- .../druid/client/CachingClusteredClient.java | 18 +- .../druid/guice/BrokerProcessingModule.java | 13 +- .../druid/guice/DruidProcessingModule.java | 26 +- ...egacyBrokerParallelMergeConfigModule.java} | 8 +- .../druid/guice/RouterProcessingModule.java | 13 +- .../apache/druid/guice/StorageNodeModule.java | 4 - .../initialization/CoreInjectorBuilder.java | 2 - .../query/BrokerParallelMergeConfig.java | 215 +++++++++++++++++ .../LegacyBrokerParallelMergeConfig.java | 70 ++++++ .../realtime/DbSegmentPublisherConfig.java | 28 --- .../druid/server/metrics/MetricsModule.java | 1 + ...chingClusteredClientFunctionalityTest.java | 12 +- .../CachingClusteredClientPerfTest.java | 4 +- .../client/CachingClusteredClientTest.java | 12 +- .../guice/BrokerProcessingModuleTest.java | 95 ++++---- .../druid/guice/QueryableModuleTest.java | 2 +- ...yRunnerBasedOnClusteredClientTestBase.java | 5 +- .../apache/druid/server/QueryStackTests.java | 24 +- .../java/org/apache/druid/cli/CliBroker.java | 2 + .../org/apache/druid/cli/DumpSegment.java | 40 +-- .../apache/druid/cli/ValidateSegments.java | 40 +-- 50 files changed, 732 insertions(+), 575 deletions(-) delete mode 100644 processing/src/main/java/org/apache/druid/java/util/common/concurrent/ExecutorServiceConfig.java rename {server/src/main/java/org/apache/druid/server => processing/src/main/java/org/apache/druid/java/util}/metrics/DruidMonitorSchedulerConfig.java (84%) delete mode 100644 processing/src/main/java/org/apache/druid/java/util/metrics/MonitorSchedulerConfig.java create mode 100644 processing/src/main/java/org/apache/druid/query/DruidProcessingBufferConfig.java create mode 100644 processing/src/main/java/org/apache/druid/query/DruidProcessingIndexesConfig.java rename server/src/main/java/org/apache/druid/guice/{DruidProcessingConfigModule.java => LegacyBrokerParallelMergeConfigModule.java} (78%) create mode 100644 server/src/main/java/org/apache/druid/query/BrokerParallelMergeConfig.java create mode 100644 server/src/main/java/org/apache/druid/query/LegacyBrokerParallelMergeConfig.java delete mode 100644 server/src/main/java/org/apache/druid/segment/realtime/DbSegmentPublisherConfig.java diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java index 4b0d55c2c62a..064b7ceea0ed 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java @@ -54,6 +54,7 @@ import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.BrokerParallelMergeConfig; import org.apache.druid.query.BySegmentQueryRunner; import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; import org.apache.druid.query.DruidProcessingConfig; @@ -258,12 +259,6 @@ public int getNumThreads() { return numProcessingThreads; } - - @Override - public boolean useParallelMergePool() - { - return true; - } }; conglomerate = new DefaultQueryRunnerFactoryConglomerate( @@ -339,7 +334,13 @@ public > QueryToolChest getToolChest new ForegroundCachePopulator(JSON_MAPPER, new CachePopulatorStats(), 0), new CacheConfig(), new DruidHttpClientConfig(), - processingConfig, + new BrokerParallelMergeConfig() { + @Override + public boolean useParallelMergePool() + { + return true; + } + }, forkJoinPool, QueryStackTests.DEFAULT_NOOP_SCHEDULER, JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER, diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java index 7733281908f0..80d208c5a433 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java @@ -106,13 +106,6 @@ public int getNumThreads() { return 1; } - - @Override - public boolean useParallelMergePoolConfigured() - { - return true; - } - @Override public String getFormatString() { diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java index 98514512e9ab..da42aaeefcdd 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java @@ -112,13 +112,7 @@ public int getNumThreads() { return 1; } - - @Override - public boolean useParallelMergePoolConfigured() - { - return true; - } - + @Override public String getFormatString() { diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java index 2945bdb1e569..f8de4263072b 100644 --- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java +++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java @@ -54,7 +54,7 @@ import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.query.DruidProcessingConfig; +import org.apache.druid.query.BrokerParallelMergeConfig; import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; @@ -363,14 +363,7 @@ public long getMaxQueuedBytes() return 0L; } }, - new DruidProcessingConfig() - { - @Override - public String getFormatString() - { - return null; - } - }, + new BrokerParallelMergeConfig(), ForkJoinPool.commonPool(), QueryStackTests.DEFAULT_NOOP_SCHEDULER, new JoinableFactoryWrapper(new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())), diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index e875183bbbb7..eedae3782ca6 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -55,7 +55,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.segment.incremental.RowIngestionMetersFactory; -import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; +import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.joda.time.DateTime; import javax.annotation.Nullable; diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index af6f69ab2ec2..4f0f5e36c0fe 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -35,7 +35,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.indexing.DataSchema; -import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; +import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java index 7874a11c588d..6419d80074c8 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java @@ -33,7 +33,7 @@ import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable; import org.apache.druid.segment.incremental.RowIngestionMetersFactory; -import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; +import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.junit.Assert; import org.junit.Test; diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 47875e106f35..93289536dbdf 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -88,7 +88,7 @@ import org.apache.druid.segment.indexing.RealtimeIOConfig; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.realtime.FireDepartment; -import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; +import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.security.Action; diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java index eb9715f9c9e4..39afe811024e 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java @@ -38,7 +38,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.indexing.DataSchema; -import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; +import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 37691b1a7e53..6342e34a471c 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -85,7 +85,7 @@ import org.apache.druid.segment.indexing.RealtimeIOConfig; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.realtime.FireDepartment; -import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; +import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.security.Action; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java index 90b0b705637c..18ee094289a2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java @@ -37,7 +37,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.indexing.DataSchema; -import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; +import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import javax.annotation.Nullable; import java.util.List; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index 52e160058588..49f2be282c9c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -61,7 +61,7 @@ import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; -import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; +import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; import org.joda.time.DateTime; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 24a2ed60b35c..7b6d706c7a2a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -77,7 +77,7 @@ import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; -import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; +import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java index 7a2eae93e16a..35d492b4d402 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java @@ -35,7 +35,7 @@ import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.NodeRole; import org.apache.druid.guice.AnnouncerModule; -import org.apache.druid.guice.DruidProcessingConfigModule; +import org.apache.druid.guice.LegacyBrokerParallelMergeConfigModule; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.ManageLifecycle; @@ -496,7 +496,7 @@ private static Injector makeInjector( new AnnouncerModule(), new DiscoveryModule(), // Dependencies from other modules - new DruidProcessingConfigModule(), + new LegacyBrokerParallelMergeConfigModule(), // Dependencies from other modules new StorageNodeModule(), diff --git a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ExecutorServiceConfig.java b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ExecutorServiceConfig.java deleted file mode 100644 index 6d6327fbdd28..000000000000 --- a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ExecutorServiceConfig.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.java.util.common.concurrent; - -import org.apache.druid.utils.JvmUtils; -import org.skife.config.Config; -import org.skife.config.Default; - -/** - */ -public abstract class ExecutorServiceConfig -{ - public static final int DEFAULT_NUM_THREADS = -1; - - @Config(value = "${base_path}.formatString") - @Default("processing-%s") - public abstract String getFormatString(); - - public int getNumThreads() - { - int numThreadsConfigured = getNumThreadsConfigured(); - if (numThreadsConfigured != DEFAULT_NUM_THREADS) { - return numThreadsConfigured; - } else { - return Math.max(JvmUtils.getRuntimeInfo().getAvailableProcessors() - 1, 1); - } - } - - /** - * Returns the number of threads _explicitly_ configured, or -1 if it is not explicitly configured, that is not - * a valid number of threads. To get the configured value or the default (valid) number, use {@link #getNumThreads()}. - * This method exists for ability to distinguish between the default value set when there is no explicit config, and - * an explicitly configured value. - */ - @Config(value = "${base_path}.numThreads") - public int getNumThreadsConfigured() - { - return DEFAULT_NUM_THREADS; - } -} diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/AllocationMetricCollector.java b/processing/src/main/java/org/apache/druid/java/util/metrics/AllocationMetricCollector.java index a0a9d7f645fd..dbf7b245393e 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/AllocationMetricCollector.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/AllocationMetricCollector.java @@ -52,7 +52,7 @@ class AllocationMetricCollector * Tests show the call to getThreadAllocatedBytes for a single thread ID out of 500 threads running takes around * 9000 ns (in the worst case), which for 500 IDs should take 500*9000/1000/1000 = 4.5 ms to the max. * AllocationMetricCollector takes linear time to calculate delta, for 500 threads it's negligible. - * See the default emitting period {@link MonitorSchedulerConfig#getEmitterPeriod}. + * See the default emitting period {@link DruidMonitorSchedulerConfig#getEmitterPeriod}. * * @return all threads summed allocated bytes delta */ diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java b/processing/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java index 0d5c07d49b4b..cbbd05213c18 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java @@ -34,7 +34,7 @@ public class BasicMonitorScheduler extends MonitorScheduler private final ScheduledExecutorService exec; public BasicMonitorScheduler( - MonitorSchedulerConfig config, + DruidMonitorSchedulerConfig config, ServiceEmitter emitter, List monitors, ScheduledExecutorService exec diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java b/processing/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java index 6fcb24406971..731f3ba108a0 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java @@ -41,7 +41,7 @@ public class ClockDriftSafeMonitorScheduler extends MonitorScheduler private final ExecutorService monitorRunner; public ClockDriftSafeMonitorScheduler( - MonitorSchedulerConfig config, + DruidMonitorSchedulerConfig config, ServiceEmitter emitter, List monitors, CronScheduler monitorScheduler, diff --git a/server/src/main/java/org/apache/druid/server/metrics/DruidMonitorSchedulerConfig.java b/processing/src/main/java/org/apache/druid/java/util/metrics/DruidMonitorSchedulerConfig.java similarity index 84% rename from server/src/main/java/org/apache/druid/server/metrics/DruidMonitorSchedulerConfig.java rename to processing/src/main/java/org/apache/druid/java/util/metrics/DruidMonitorSchedulerConfig.java index 0e242b1244c1..4989bdf29eba 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/DruidMonitorSchedulerConfig.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/DruidMonitorSchedulerConfig.java @@ -17,17 +17,15 @@ * under the License. */ -package org.apache.druid.server.metrics; +package org.apache.druid.java.util.metrics; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.druid.java.util.metrics.BasicMonitorScheduler; -import org.apache.druid.java.util.metrics.MonitorSchedulerConfig; import org.joda.time.Duration; import org.joda.time.Period; /** */ -public class DruidMonitorSchedulerConfig extends MonitorSchedulerConfig +public class DruidMonitorSchedulerConfig { @JsonProperty private String schedulerClassName = BasicMonitorScheduler.class.getName(); @@ -35,6 +33,7 @@ public class DruidMonitorSchedulerConfig extends MonitorSchedulerConfig @JsonProperty private Period emissionPeriod = new Period("PT1M"); + @JsonProperty public String getSchedulerClassName() { return schedulerClassName; @@ -46,7 +45,6 @@ public Period getEmissionPeriod() return emissionPeriod; } - @Override public Duration getEmitterPeriod() { return emissionPeriod.toStandardDuration(); diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java b/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java index 9e159182a1d2..47bb37d16110 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java @@ -39,7 +39,7 @@ public abstract class MonitorScheduler { private static final Logger log = new Logger(MonitorScheduler.class); - private final MonitorSchedulerConfig config; + private final DruidMonitorSchedulerConfig config; private final ServiceEmitter emitter; private final Set monitors; private final Object lock = new Object(); @@ -47,7 +47,7 @@ public abstract class MonitorScheduler private volatile boolean started = false; MonitorScheduler( - MonitorSchedulerConfig config, + DruidMonitorSchedulerConfig config, ServiceEmitter emitter, List monitors ) @@ -135,7 +135,7 @@ boolean hasMonitor(final Monitor monitor) } } - MonitorSchedulerConfig getConfig() + DruidMonitorSchedulerConfig getConfig() { return config; } diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorSchedulerConfig.java b/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorSchedulerConfig.java deleted file mode 100644 index 8d0e165e2add..000000000000 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorSchedulerConfig.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.java.util.metrics; - -import org.joda.time.Duration; -import org.skife.config.Config; -import org.skife.config.Default; - -/** - */ -public abstract class MonitorSchedulerConfig -{ - @Config({"org.apache.druid.java.util.metrics.emitter.period", "com.metamx.druid.emitter.period"}) - @Default("PT60s") - public abstract Duration getEmitterPeriod(); -} diff --git a/processing/src/main/java/org/apache/druid/query/DruidProcessingBufferConfig.java b/processing/src/main/java/org/apache/druid/query/DruidProcessingBufferConfig.java new file mode 100644 index 000000000000..582d13c14b24 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/DruidProcessingBufferConfig.java @@ -0,0 +1,77 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.java.util.common.HumanReadableBytes; + +import javax.annotation.Nullable; + +public class DruidProcessingBufferConfig +{ + public static final HumanReadableBytes DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = HumanReadableBytes.valueOf(-1); + public static final int MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = 1024 * 1024 * 1024; + public static final int DEFAULT_INITIAL_BUFFERS_FOR_INTERMEDIATE_POOL = 0; + + @JsonProperty + private final HumanReadableBytes sizeBytes; + + @JsonProperty + private final int poolCacheMaxCount; + + @JsonProperty + private final int poolCacheInitialCount; + + @JsonCreator + public DruidProcessingBufferConfig( + @JsonProperty("sizeBytes") @Nullable HumanReadableBytes sizeBytes, + @JsonProperty("poolCacheMaxCount") @Nullable Integer poolCacheMaxCount, + @JsonProperty("poolCacheInitialCount") @Nullable Integer poolCacheInitialCount + ) + { + this.sizeBytes = sizeBytes == null ? DEFAULT_PROCESSING_BUFFER_SIZE_BYTES : sizeBytes; + this.poolCacheInitialCount = poolCacheInitialCount == null + ? DEFAULT_INITIAL_BUFFERS_FOR_INTERMEDIATE_POOL + : poolCacheInitialCount; + this.poolCacheMaxCount = poolCacheMaxCount == null ? Integer.MAX_VALUE : poolCacheMaxCount; + } + + public DruidProcessingBufferConfig() + { + this(null, null, null); + } + + public HumanReadableBytes getBufferSize() + { + return sizeBytes; + } + + public int getPoolCacheMaxCount() + { + return poolCacheMaxCount; + } + + public int getPoolCacheInitialCount() + { + return poolCacheInitialCount; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java b/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java index 823c9e71efbc..0807ee24be0a 100644 --- a/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java +++ b/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java @@ -19,45 +19,85 @@ package org.apache.druid.query; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig; -import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.utils.JvmUtils; -import org.skife.config.Config; +import javax.annotation.Nullable; import java.util.concurrent.atomic.AtomicReference; -public abstract class DruidProcessingConfig extends ExecutorServiceConfig implements ColumnConfig +public class DruidProcessingConfig implements ColumnConfig { private static final Logger log = new Logger(DruidProcessingConfig.class); - public static final int DEFAULT_NUM_MERGE_BUFFERS = -1; - public static final HumanReadableBytes DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = HumanReadableBytes.valueOf(-1); - public static final int MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = 1024 * 1024 * 1024; - public static final int DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS = 60_000; - public static final int DEFAULT_INITIAL_BUFFERS_FOR_INTERMEDIATE_POOL = 0; - - private AtomicReference computedBufferSizeBytes = new AtomicReference<>(); - - @Config({"druid.computation.buffer.size", "${base_path}.buffer.sizeBytes"}) - public HumanReadableBytes intermediateComputeSizeBytesConfigured() + public static long computeMaxMemoryFromMaxHeapSize() { - return DEFAULT_PROCESSING_BUFFER_SIZE_BYTES; + return Runtime.getRuntime().maxMemory() / 4; } - public int intermediateComputeSizeBytes() - { - HumanReadableBytes sizeBytesConfigured = intermediateComputeSizeBytesConfigured(); - if (!DEFAULT_PROCESSING_BUFFER_SIZE_BYTES.equals(sizeBytesConfigured)) { + @JsonProperty + private final String formatString; + @JsonProperty + private final int numThreads; + @JsonProperty + private final int numMergeBuffers; + @JsonProperty + private final boolean fifo; + @JsonProperty + private final String tmpDir; + @JsonProperty + private final DruidProcessingBufferConfig buffer; + @JsonProperty + private final DruidProcessingIndexesConfig indexes; + private final AtomicReference computedBufferSizeBytes = new AtomicReference<>(); + private final boolean numThreadsConfigured; + private final boolean numMergeBuffersConfigured; + + @JsonCreator + public DruidProcessingConfig( + @JsonProperty("formatString") @Nullable String formatString, + @JsonProperty("numThreads") @Nullable Integer numThreads, + @JsonProperty("numMergeBuffers") @Nullable Integer numMergeBuffers, + @JsonProperty("fifo") @Nullable Boolean fifo, + @JsonProperty("tmpDir") @Nullable String tmpDir, + @JsonProperty("buffer") DruidProcessingBufferConfig buffer, + @JsonProperty("indexes") DruidProcessingIndexesConfig indexes + ) + { + this.formatString = formatString == null ? "processing-%s" : formatString; + this.numThreads = numThreads == null + ? Math.max(JvmUtils.getRuntimeInfo().getAvailableProcessors() - 1, 1) + : numThreads; + this.numMergeBuffers = numMergeBuffers == null ? Math.max(2, this.numThreads / 4) : numMergeBuffers; + this.fifo = fifo == null || fifo; + this.tmpDir = tmpDir == null ? System.getProperty("java.io.tmpdir") : tmpDir; + this.buffer = buffer == null ? new DruidProcessingBufferConfig() : buffer; + this.indexes = indexes == null ? new DruidProcessingIndexesConfig() : indexes; + + this.numThreadsConfigured = numThreads != null; + this.numMergeBuffersConfigured = numMergeBuffers != null; + initializeBufferSize(); + } + + @VisibleForTesting + public DruidProcessingConfig() + { + this(null, null, null, null, null, null, null); + } + + private void initializeBufferSize() + { + HumanReadableBytes sizeBytesConfigured = this.buffer.getBufferSize(); + if (!DruidProcessingBufferConfig.DEFAULT_PROCESSING_BUFFER_SIZE_BYTES.equals(sizeBytesConfigured)) { if (sizeBytesConfigured.getBytes() > Integer.MAX_VALUE) { throw new IAE("druid.processing.buffer.sizeBytes must be less than 2GiB"); } - return sizeBytesConfigured.getBytesInInt(); - } else if (computedBufferSizeBytes.get() != null) { - return computedBufferSizeBytes.get(); + computedBufferSizeBytes.set(sizeBytesConfigured.getBytesInInt()); } long directSizeBytes; @@ -74,169 +114,86 @@ public int intermediateComputeSizeBytes() log.info("Using up to [%,d] bytes of direct memory for computation buffers.", directSizeBytes); } - int numProcessingThreads = getNumThreads(); - int numMergeBuffers = getNumMergeBuffers(); - int totalNumBuffers = numMergeBuffers + numProcessingThreads; + int totalNumBuffers = this.numMergeBuffers + this.numThreads; int sizePerBuffer = (int) ((double) directSizeBytes / (double) (totalNumBuffers + 1)); - final int computedSizePerBuffer = Math.min(sizePerBuffer, MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES); + final int computedSizePerBuffer = Math.min( + sizePerBuffer, + DruidProcessingBufferConfig.MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES + ); if (computedBufferSizeBytes.compareAndSet(null, computedSizePerBuffer)) { log.info( "Auto sizing buffers to [%,d] bytes each for [%,d] processing and [%,d] merge buffers. " + "If you run out of direct memory, you may need to set these parameters explicitly using the guidelines at " + "https://druid.apache.org/docs/latest/operations/basic-cluster-tuning.html#processing-threads-buffers.", computedSizePerBuffer, - numProcessingThreads, - numMergeBuffers + this.numThreads, + this.numMergeBuffers ); } - return computedSizePerBuffer; - } - - public static long computeMaxMemoryFromMaxHeapSize() - { - return Runtime.getRuntime().maxMemory() / 4; - } - - @Config({"druid.computation.buffer.poolCacheMaxCount", "${base_path}.buffer.poolCacheMaxCount"}) - public int poolCacheMaxCount() - { - return Integer.MAX_VALUE; } - @Config({ - "druid.computation.buffer.poolCacheInitialCount", - "${base_path}.buffer.poolCacheInitialCount" - }) - public int getNumInitalBuffersForIntermediatePool() + public String getFormatString() { - return DEFAULT_INITIAL_BUFFERS_FOR_INTERMEDIATE_POOL; + return formatString; } - @Override - @Config(value = "${base_path}.numThreads") - public int getNumThreadsConfigured() + public int getNumThreads() { - return DEFAULT_NUM_THREADS; + return numThreads; } public int getNumMergeBuffers() { - int numMergeBuffersConfigured = getNumMergeBuffersConfigured(); - if (numMergeBuffersConfigured != DEFAULT_NUM_MERGE_BUFFERS) { - return numMergeBuffersConfigured; - } else { - return Math.max(2, getNumThreads() / 4); - } - } - - /** - * Returns the number of merge buffers _explicitly_ configured, or -1 if it is not explicitly configured, that is not - * a valid number of buffers. To get the configured value or the default (valid) number, use {@link - * #getNumMergeBuffers()}. This method exists for ability to distinguish between the default value set when there is - * no explicit config, and an explicitly configured value. - */ - @Config("${base_path}.numMergeBuffers") - public int getNumMergeBuffersConfigured() - { - return DEFAULT_NUM_MERGE_BUFFERS; + return numMergeBuffers; } - @Override - @Config(value = "${base_path}.indexes.skipValueRangeIndexScale") - public double skipValueRangeIndexScale() - { - return ColumnConfig.super.skipValueRangeIndexScale(); - } - - @Override - @Config(value = "${base_path}.indexes.skipValuePredicateIndexScale") - public double skipValuePredicateIndexScale() - { - return ColumnConfig.super.skipValuePredicateIndexScale(); - } - - @Config(value = "${base_path}.fifo") public boolean isFifo() { - return true; + return fifo; } - @Config(value = "${base_path}.tmpDir") public String getTmpDir() { - return System.getProperty("java.io.tmpdir"); + return tmpDir; } - @Config(value = "${base_path}.merge.useParallelMergePool") - public boolean useParallelMergePoolConfigured() + public int intermediateComputeSizeBytes() { - return true; - } - public boolean useParallelMergePool() - { - final boolean useParallelMergePoolConfigured = useParallelMergePoolConfigured(); - final int parallelism = getMergePoolParallelism(); - // need at least 3 to do 2 layer merge - if (parallelism > 2) { - return useParallelMergePoolConfigured; - } - if (useParallelMergePoolConfigured) { - log.debug( - "Parallel merge pool is enabled, but there are not enough cores to enable parallel merges: %s", - parallelism - ); - } - return false; - } - - @Config(value = "${base_path}.merge.pool.parallelism") - public int getMergePoolParallelismConfigured() - { - return DEFAULT_NUM_THREADS; + return computedBufferSizeBytes.get(); } - public int getMergePoolParallelism() + public int poolCacheMaxCount() { - int poolParallelismConfigured = getMergePoolParallelismConfigured(); - if (poolParallelismConfigured != DEFAULT_NUM_THREADS) { - return poolParallelismConfigured; - } else { - // assume 2 hyper-threads per core, so that this value is probably by default the number of physical cores * 1.5 - return (int) Math.ceil(JvmUtils.getRuntimeInfo().getAvailableProcessors() * 0.75); - } + return buffer.getPoolCacheMaxCount(); } - @Config(value = "${base_path}.merge.pool.awaitShutdownMillis") - public long getMergePoolAwaitShutdownMillis() + public int getNumInitalBuffersForIntermediatePool() { - return DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS; + return buffer.getPoolCacheInitialCount(); } - @Config(value = "${base_path}.merge.pool.defaultMaxQueryParallelism") - public int getMergePoolDefaultMaxQueryParallelism() + @Override + public double skipValueRangeIndexScale() { - // assume 2 hyper-threads per core, so that this value is probably by default the number of physical cores - return (int) Math.max(JvmUtils.getRuntimeInfo().getAvailableProcessors() * 0.5, 1); + return indexes.getSkipValueRangeIndexScale(); } - @Config(value = "${base_path}.merge.task.targetRunTimeMillis") - public int getMergePoolTargetTaskRunTimeMillis() + @Override + public double skipValuePredicateIndexScale() { - return ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS; + return indexes.getSkipValuePredicateIndexScale(); } - @Config(value = "${base_path}.merge.task.initialYieldNumRows") - public int getMergePoolTaskInitialYieldRows() + public boolean isNumThreadsConfigured() { - return ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS; + return numThreadsConfigured; } - @Config(value = "${base_path}.merge.task.smallBatchNumRows") - public int getMergePoolSmallBatchRows() + public boolean isNumMergeBuffersConfigured() { - return ParallelMergeCombiningSequence.DEFAULT_TASK_SMALL_BATCH_NUM_ROWS; + return numMergeBuffersConfigured; } } diff --git a/processing/src/main/java/org/apache/druid/query/DruidProcessingIndexesConfig.java b/processing/src/main/java/org/apache/druid/query/DruidProcessingIndexesConfig.java new file mode 100644 index 000000000000..b5fdcd93fb8c --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/DruidProcessingIndexesConfig.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.segment.column.ColumnConfig; + +import javax.annotation.Nullable; + +public class DruidProcessingIndexesConfig +{ + @JsonProperty + private final double skipValueRangeIndexScale; + + @JsonProperty + private final double skipValuePredicateIndexScale; + + @JsonCreator + public DruidProcessingIndexesConfig( + @JsonProperty("skipValueRangeIndexScale") @Nullable Double skipValueRangeIndexScale, + @JsonProperty("skipValuePredicateIndexScale") @Nullable Double skipValuePredicateIndexScale + ) + { + this.skipValueRangeIndexScale = skipValueRangeIndexScale == null + ? ColumnConfig.DEFAULT_SKIP_VALUE_RANGE_INDEX_SCALE + : skipValueRangeIndexScale; + this.skipValuePredicateIndexScale = skipValuePredicateIndexScale == null + ? ColumnConfig.DEFAULT_SKIP_VALUE_PREDICATE_INDEX_SCALE + : skipValuePredicateIndexScale; + } + + public DruidProcessingIndexesConfig() + { + this(null, null); + } + + public double getSkipValueRangeIndexScale() + { + return skipValueRangeIndexScale; + } + + public double getSkipValuePredicateIndexScale() + { + return skipValuePredicateIndexScale; + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/column/ColumnConfig.java b/processing/src/main/java/org/apache/druid/segment/column/ColumnConfig.java index ae5ca7ff8a30..62934d3d342f 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/ColumnConfig.java +++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnConfig.java @@ -26,9 +26,16 @@ public interface ColumnConfig { - ColumnConfig DEFAULT = new ColumnConfig() - { - }; + /** + * this value was chosen testing bound filters on double columns with a variety of ranges at which this ratio + * of number of bitmaps compared to total number of rows appeared to be around the threshold where indexes stopped + * performing consistently faster than a full scan + value matcher + */ + double DEFAULT_SKIP_VALUE_RANGE_INDEX_SCALE = 0.08; + + double DEFAULT_SKIP_VALUE_PREDICATE_INDEX_SCALE = 0.08; + + ColumnConfig DEFAULT = new ColumnConfig() {}; ColumnConfig ALWAYS_USE_INDEXES = new ColumnConfig() { @@ -73,10 +80,7 @@ public double skipValuePredicateIndexScale() */ default double skipValueRangeIndexScale() { - // this value was chosen testing bound filters on double columns with a variety of ranges at which this ratio - // of number of bitmaps compared to total number of rows appeared to be around the threshold where indexes stopped - // performing consistently faster than a full scan + value matcher - return 0.08; + return DEFAULT_SKIP_VALUE_RANGE_INDEX_SCALE; } /** @@ -109,6 +113,6 @@ default double skipValueRangeIndexScale() */ default double skipValuePredicateIndexScale() { - return 0.08; + return DEFAULT_SKIP_VALUE_PREDICATE_INDEX_SCALE; } } diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java index 9b87e1cd7b04..3f50a977f2e6 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java @@ -32,7 +32,7 @@ public class BasicMonitorSchedulerTest { - private final MonitorSchedulerConfig config = new MonitorSchedulerConfig() + private final DruidMonitorSchedulerConfig config = new DruidMonitorSchedulerConfig() { @Override public Duration getEmitterPeriod() diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java index d7ec354ff198..3a5cd531fe3b 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java @@ -93,7 +93,7 @@ class Monitor3 extends NoopMonitor ExecutorService executor = Mockito.mock(ExecutorService.class); final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler( - Mockito.mock(MonitorSchedulerConfig.class), + Mockito.mock(DruidMonitorSchedulerConfig.class), Mockito.mock(ServiceEmitter.class), ImmutableList.of(monitor1, monitor2), CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("monitor-scheduler-test").build(), @@ -153,7 +153,7 @@ public Future answer(InvocationOnMock invocation) throws Exception Monitor monitor = Mockito.mock(Monitor.class); - MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class); + DruidMonitorSchedulerConfig config = Mockito.mock(DruidMonitorSchedulerConfig.class); Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L)); final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler( @@ -217,7 +217,7 @@ public Future answer(InvocationOnMock invocation) throws Exception Monitor monitor = Mockito.mock(Monitor.class); - MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class); + DruidMonitorSchedulerConfig config = Mockito.mock(DruidMonitorSchedulerConfig.class); Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L)); final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler( @@ -248,7 +248,7 @@ public void testStart_UnexpectedExceptionWhileMonitoring() throws InterruptedExc Mockito.when(monitor.monitor(ArgumentMatchers.any(ServiceEmitter.class))) .thenThrow(new RuntimeException("Test throwing exception while monitoring")); - MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class); + DruidMonitorSchedulerConfig config = Mockito.mock(DruidMonitorSchedulerConfig.class); Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L)); CountDownLatch latch = new CountDownLatch(1); @@ -310,7 +310,7 @@ public void testStart_UnexpectedExceptionWhileScheduling() throws InterruptedExc { ExecutorService executor = Mockito.mock(ExecutorService.class); Monitor monitor = Mockito.mock(Monitor.class); - MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class); + DruidMonitorSchedulerConfig config = Mockito.mock(DruidMonitorSchedulerConfig.class); Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L)); CountDownLatch latch = new CountDownLatch(1); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java index 40afa8349b35..4b90e81a5f69 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java @@ -34,7 +34,7 @@ public class MonitorSchedulerTest @Test public void testMonitorAndStopOnRemove() throws IOException { - MonitorSchedulerConfig infiniteFlushDelayConfig = new MonitorSchedulerConfig() + DruidMonitorSchedulerConfig infiniteFlushDelayConfig = new DruidMonitorSchedulerConfig() { @Override public Duration getEmitterPeriod() diff --git a/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java b/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java index 8d9f71372801..e19fbf755402 100644 --- a/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java +++ b/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java @@ -19,22 +19,18 @@ package org.apache.druid.query; -import com.google.common.collect.ImmutableMap; -import com.google.inject.Guice; import com.google.inject.Injector; -import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.config.Config; +import com.google.inject.ProvisionException; +import org.apache.druid.guice.JsonConfigProvider; +import org.apache.druid.guice.StartupInjectorBuilder; import org.apache.druid.utils.JvmUtils; import org.apache.druid.utils.RuntimeInfo; -import org.hamcrest.CoreMatchers; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import org.skife.config.ConfigurationObjectFactory; -import java.util.Map; import java.util.Properties; /** @@ -46,47 +42,15 @@ public class DruidProcessingConfigTest private static final long DIRECT_SIZE = BUFFER_SIZE * (3L + 2L + 1L); private static final long HEAP_SIZE = BUFFER_SIZE * 2L; - @Rule - public ExpectedException expectedException = ExpectedException.none(); - - private static Injector makeInjector(int numProcessors, long directMemorySize, long heapSize) - { - return makeInjector(numProcessors, directMemorySize, heapSize, new Properties(), null); - } - @AfterClass public static void teardown() { JvmUtils.resetTestsToDefaultRuntimeInfo(); } - private static Injector makeInjector( - int numProcessors, - long directMemorySize, - long heapSize, - Properties props, - Map replacements - ) - { - return Guice.createInjector( - binder -> { - binder.bind(RuntimeInfo.class).toInstance(new MockRuntimeInfo(numProcessors, directMemorySize, heapSize)); - binder.requestStaticInjection(JvmUtils.class); - ConfigurationObjectFactory factory = Config.createFactory(props); - DruidProcessingConfig config; - if (replacements != null) { - config = factory.buildWithReplacements( - DruidProcessingConfig.class, - replacements - ); - } else { - config = factory.build(DruidProcessingConfig.class); - } - binder.bind(ConfigurationObjectFactory.class).toInstance(factory); - binder.bind(DruidProcessingConfig.class).toInstance(config); - } - ); - } + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Test public void testDefaultsMultiProcessor() @@ -124,7 +88,7 @@ public void testDefaultsLargeDirect() DruidProcessingConfig config = injector.getInstance(DruidProcessingConfig.class); Assert.assertEquals( - DruidProcessingConfig.MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES, + DruidProcessingBufferConfig.MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES, config.intermediateComputeSizeBytes() ); } @@ -144,8 +108,7 @@ public void testReplacements() NUM_PROCESSORS, DIRECT_SIZE, HEAP_SIZE, - props, - ImmutableMap.of("base_path", "druid.processing") + props ); DruidProcessingConfig config = injector.getInstance(DruidProcessingConfig.class); @@ -164,14 +127,19 @@ public void testInvalidSizeBytes() Properties props = new Properties(); props.setProperty("druid.processing.buffer.sizeBytes", "-1"); - expectedException.expectCause(CoreMatchers.isA(IAE.class)); - Injector injector = makeInjector( NUM_PROCESSORS, DIRECT_SIZE, HEAP_SIZE, - props, - ImmutableMap.of("base_path", "druid.processing") + props + ); + Throwable t = Assert.assertThrows( + ProvisionException.class, + () -> injector.getInstance(DruidProcessingConfig.class) + ); + Assert.assertTrue( + t.getMessage() + .contains("Cannot construct instance of `org.apache.druid.java.util.common.HumanReadableBytes`, problem: Invalid format of number: -1. Negative value is not allowed.") ); } @@ -184,13 +152,35 @@ public void testSizeBytesUpperLimit() NUM_PROCESSORS, DIRECT_SIZE, HEAP_SIZE, - props, - ImmutableMap.of("base_path", "druid.processing") + props + ); + Throwable t = Assert.assertThrows( + ProvisionException.class, + () -> injector.getInstance(DruidProcessingConfig.class) ); - DruidProcessingConfig config = injector.getInstance(DruidProcessingConfig.class); - expectedException.expectMessage("druid.processing.buffer.sizeBytes must be less than 2GiB"); - config.intermediateComputeSizeBytes(); + Assert.assertTrue(t.getMessage().contains("druid.processing.buffer.sizeBytes must be less than 2GiB")); + } + + private static Injector makeInjector(int numProcessors, long directMemorySize, long heapSize) + { + return makeInjector(numProcessors, directMemorySize, heapSize, new Properties()); + } + private static Injector makeInjector( + int numProcessors, + long directMemorySize, + long heapSize, + Properties props + ) + { + Injector injector = new StartupInjectorBuilder().withProperties(props).add( + binder -> { + binder.bind(RuntimeInfo.class).toInstance(new MockRuntimeInfo(numProcessors, directMemorySize, heapSize)); + binder.requestStaticInjection(JvmUtils.class); + JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class); + } + ).build(); + return injector; } public static class MockRuntimeInfo extends RuntimeInfo diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index 5c0e5efb7ea3..16ab97843881 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -55,9 +55,9 @@ import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.BrokerParallelMergeConfig; import org.apache.druid.query.BySegmentResultValueClass; import org.apache.druid.query.CacheStrategy; -import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Queries; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContext; @@ -124,7 +124,7 @@ public class CachingClusteredClient implements QuerySegmentWalker private final CachePopulator cachePopulator; private final CacheConfig cacheConfig; private final DruidHttpClientConfig httpClientConfig; - private final DruidProcessingConfig processingConfig; + private final BrokerParallelMergeConfig parallelMergeConfig; private final ForkJoinPool pool; private final QueryScheduler scheduler; private final JoinableFactoryWrapper joinableFactoryWrapper; @@ -139,7 +139,7 @@ public CachingClusteredClient( CachePopulator cachePopulator, CacheConfig cacheConfig, @Client DruidHttpClientConfig httpClientConfig, - DruidProcessingConfig processingConfig, + BrokerParallelMergeConfig parallelMergeConfig, @Merging ForkJoinPool pool, QueryScheduler scheduler, JoinableFactoryWrapper joinableFactoryWrapper, @@ -153,7 +153,7 @@ public CachingClusteredClient( this.cachePopulator = cachePopulator; this.cacheConfig = cacheConfig; this.httpClientConfig = httpClientConfig; - this.processingConfig = processingConfig; + this.parallelMergeConfig = parallelMergeConfig; this.pool = pool; this.scheduler = scheduler; this.joinableFactoryWrapper = joinableFactoryWrapper; @@ -386,7 +386,7 @@ private Sequence merge(List> sequencesByInterval) { BinaryOperator mergeFn = toolChest.createMergeFn(query); final QueryContext queryContext = query.context(); - if (processingConfig.useParallelMergePool() && queryContext.getEnableParallelMerges() && mergeFn != null) { + if (parallelMergeConfig.useParallelMergePool() && queryContext.getEnableParallelMerges() && mergeFn != null) { return new ParallelMergeCombiningSequence<>( pool, sequencesByInterval, @@ -395,10 +395,10 @@ private Sequence merge(List> sequencesByInterval) queryContext.hasTimeout(), queryContext.getTimeout(), queryContext.getPriority(), - queryContext.getParallelMergeParallelism(processingConfig.getMergePoolDefaultMaxQueryParallelism()), - queryContext.getParallelMergeInitialYieldRows(processingConfig.getMergePoolTaskInitialYieldRows()), - queryContext.getParallelMergeSmallBatchRows(processingConfig.getMergePoolSmallBatchRows()), - processingConfig.getMergePoolTargetTaskRunTimeMillis(), + queryContext.getParallelMergeParallelism(parallelMergeConfig.getDefaultMaxQueryParallelism()), + queryContext.getParallelMergeInitialYieldRows(parallelMergeConfig.getInitialYieldNumRows()), + queryContext.getParallelMergeSmallBatchRows(parallelMergeConfig.getSmallBatchNumRows()), + parallelMergeConfig.getTargetRunTimeMillis(), reportMetrics -> { QueryMetrics queryMetrics = queryPlus.getQueryMetrics(); if (queryMetrics != null) { diff --git a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java index d5f18dd1787c..77572bae47a5 100644 --- a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java +++ b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java @@ -39,13 +39,14 @@ import org.apache.druid.guice.annotations.Smile; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.offheap.OffheapBufferGenerator; +import org.apache.druid.query.BrokerParallelMergeConfig; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.ExecutorServiceMonitor; import org.apache.druid.query.ForwardingQueryProcessingPool; import org.apache.druid.query.QueryProcessingPool; +import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.server.metrics.MetricsModule; import org.apache.druid.utils.JvmUtils; @@ -67,7 +68,9 @@ public class BrokerProcessingModule implements Module @Override public void configure(Binder binder) { - binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class); + JsonConfigProvider.bind(binder, "druid.processing.merge", BrokerParallelMergeConfig.class); + JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class); + binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class); MetricsModule.register(binder, ExecutorServiceMonitor.class); } @@ -133,14 +136,14 @@ public BlockingPool getMergeBufferPool(DruidProcessingConfig config) @Provides @ManageLifecycle - public LifecycleForkJoinPoolProvider getMergeProcessingPoolProvider(DruidProcessingConfig config) + public LifecycleForkJoinPoolProvider getMergeProcessingPoolProvider(BrokerParallelMergeConfig config) { return new LifecycleForkJoinPoolProvider( - config.getMergePoolParallelism(), + config.getParallelism(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, (t, e) -> log.error(e, "Unhandled exception in thread [%s]", t), true, - config.getMergePoolAwaitShutdownMillis() + config.getAwaitShutdownMillis() ); } diff --git a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java index b02ae366f580..56a0fd0ede6f 100644 --- a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java +++ b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java @@ -38,7 +38,6 @@ import org.apache.druid.guice.annotations.Merging; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.offheap.OffheapBufferGenerator; @@ -47,13 +46,13 @@ import org.apache.druid.query.MetricsEmittingQueryProcessingPool; import org.apache.druid.query.PrioritizedExecutorService; import org.apache.druid.query.QueryProcessingPool; +import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.server.metrics.MetricsModule; import org.apache.druid.utils.JvmUtils; import java.nio.ByteBuffer; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ForkJoinPool; /** */ @@ -64,7 +63,8 @@ public class DruidProcessingModule implements Module @Override public void configure(Binder binder) { - binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class); + JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class); + binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class); MetricsModule.register(binder, ExecutorServiceMonitor.class); } @@ -135,26 +135,6 @@ public BlockingPool getMergeBufferPool(DruidProcessingConfig config) ); } - @Provides - @ManageLifecycle - public LifecycleForkJoinPoolProvider getMergeProcessingPoolProvider(DruidProcessingConfig config) - { - return new LifecycleForkJoinPoolProvider( - config.getMergePoolParallelism(), - ForkJoinPool.defaultForkJoinWorkerThreadFactory, - (t, e) -> log.error(e, "Unhandled exception in thread [%s]", t), - true, - config.getMergePoolAwaitShutdownMillis() - ); - } - - @Provides - @Merging - public ForkJoinPool getMergeProcessingPool(LifecycleForkJoinPoolProvider poolProvider) - { - return poolProvider.getPool(); - } - private void verifyDirectMemory(DruidProcessingConfig config) { try { diff --git a/server/src/main/java/org/apache/druid/guice/DruidProcessingConfigModule.java b/server/src/main/java/org/apache/druid/guice/LegacyBrokerParallelMergeConfigModule.java similarity index 78% rename from server/src/main/java/org/apache/druid/guice/DruidProcessingConfigModule.java rename to server/src/main/java/org/apache/druid/guice/LegacyBrokerParallelMergeConfigModule.java index 7e83817de26a..14dbb4977315 100644 --- a/server/src/main/java/org/apache/druid/guice/DruidProcessingConfigModule.java +++ b/server/src/main/java/org/apache/druid/guice/LegacyBrokerParallelMergeConfigModule.java @@ -19,17 +19,17 @@ package org.apache.druid.guice; -import com.google.common.collect.ImmutableMap; import com.google.inject.Binder; import com.google.inject.Module; -import org.apache.druid.query.DruidProcessingConfig; +import org.apache.druid.query.LegacyBrokerParallelMergeConfig; -public class DruidProcessingConfigModule implements Module +@Deprecated +public class LegacyBrokerParallelMergeConfigModule implements Module { @Override public void configure(Binder binder) { - ConfigProvider.bind(binder, DruidProcessingConfig.class, ImmutableMap.of("base_path", "druid.processing")); + ConfigProvider.bind(binder, LegacyBrokerParallelMergeConfig.class); } } diff --git a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java index 59af28a4f541..ea0bc8df0ff6 100644 --- a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java +++ b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java @@ -29,12 +29,12 @@ import org.apache.druid.guice.annotations.Global; import org.apache.druid.guice.annotations.Merging; import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.ExecutorServiceMonitor; import org.apache.druid.query.ForwardingQueryProcessingPool; import org.apache.druid.query.QueryProcessingPool; +import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.server.metrics.MetricsModule; import java.nio.ByteBuffer; @@ -53,7 +53,8 @@ public class RouterProcessingModule implements Module @Override public void configure(Binder binder) { - binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class); + JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class); + binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class); MetricsModule.register(binder, ExecutorServiceMonitor.class); } @@ -61,8 +62,8 @@ public void configure(Binder binder) @ManageLifecycle public QueryProcessingPool getProcessingExecutorPool(DruidProcessingConfig config) { - if (config.getNumThreadsConfigured() != ExecutorServiceConfig.DEFAULT_NUM_THREADS) { - log.error("numThreads[%d] configured, that is ignored on Router", config.getNumThreadsConfigured()); + if (config.isNumThreadsConfigured()) { + log.error("numThreads[%d] configured, that is ignored on Router", config.getNumThreads()); } return new ForwardingQueryProcessingPool(Execs.dummy()); } @@ -80,10 +81,10 @@ public NonBlockingPool getIntermediateResultsPool() @Merging public BlockingPool getMergeBufferPool(DruidProcessingConfig config) { - if (config.getNumMergeBuffersConfigured() != DruidProcessingConfig.DEFAULT_NUM_MERGE_BUFFERS) { + if (config.isNumMergeBuffersConfigured()) { log.error( "numMergeBuffers[%d] configured, that is ignored on Router", - config.getNumMergeBuffersConfigured() + config.getNumMergeBuffers() ); } return DummyBlockingPool.instance(); diff --git a/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java b/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java index 842abd0e5347..987b78a9faff 100644 --- a/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java +++ b/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java @@ -30,8 +30,6 @@ import org.apache.druid.discovery.DataNodeService; import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.query.DruidProcessingConfig; -import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.StorageLocation; import org.apache.druid.segment.loading.StorageLocationSelectorStrategy; @@ -56,9 +54,7 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.server", DruidServerConfig.class); JsonConfigProvider.bind(binder, "druid.segmentCache", SegmentLoaderConfig.class); bindLocationSelectorStrategy(binder); - binder.bind(ServerTypeConfig.class).toProvider(Providers.of(null)); - binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class); } @Provides diff --git a/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java b/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java index de4f3abbc8d2..d27fdf75b1e6 100644 --- a/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java +++ b/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java @@ -26,7 +26,6 @@ import org.apache.druid.guice.AnnouncerModule; import org.apache.druid.guice.CoordinatorDiscoveryModule; import org.apache.druid.guice.DruidInjectorBuilder; -import org.apache.druid.guice.DruidProcessingConfigModule; import org.apache.druid.guice.DruidSecondaryModule; import org.apache.druid.guice.ExpressionModule; import org.apache.druid.guice.ExtensionsModule; @@ -112,7 +111,6 @@ public CoreInjectorBuilder forServer() new MetricsModule(), new SegmentWriteOutMediumModule(), new ServerModule(), - new DruidProcessingConfigModule(), new StorageNodeModule(), new JettyServerModule(), new ExpressionModule(), diff --git a/server/src/main/java/org/apache/druid/query/BrokerParallelMergeConfig.java b/server/src/main/java/org/apache/druid/query/BrokerParallelMergeConfig.java new file mode 100644 index 000000000000..1f8d25a10c2d --- /dev/null +++ b/server/src/main/java/org/apache/druid/query/BrokerParallelMergeConfig.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; +import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.utils.JvmUtils; + +import javax.annotation.Nullable; + +public class BrokerParallelMergeConfig +{ + private static final Logger LOG = new Logger(BrokerParallelMergeConfig.class); + public static final int DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS = 60_000; + + @JsonProperty + private final boolean useParallelMergePool; + @JsonProperty + private final int parallelism; + @JsonProperty + private final long awaitShutdownMillis; + @JsonProperty + private final int defaultMaxQueryParallelism; + @JsonProperty + private final int targetRunTimeMillis; + @JsonProperty + private final int initialYieldNumRows; + @JsonProperty + private final int smallBatchNumRows; + + @JsonCreator + public BrokerParallelMergeConfig( + @JsonProperty("useParallelMergePool") @Nullable Boolean useParallelMergePool, + @JsonProperty("parallelism") @Nullable Integer parallelism, + @JsonProperty("awaitShutdownMillis") @Nullable Long awaitShutdownMillis, + @JsonProperty("defaultMaxQueryParallelism") @Nullable Integer defaultMaxQueryParallelism, + @JsonProperty("targetRunTimeMillis") @Nullable Integer targetRunTimeMillis, + @JsonProperty("initialYieldNumRows") @Nullable Integer initialYieldNumRows, + @JsonProperty("smallBatchNumRows") @Nullable Integer smallBatchNumRows, + @JacksonInject LegacyBrokerParallelMergeConfig oldConfig + ) + { + if (parallelism == null) { + if (oldConfig == null || oldConfig.getMergePoolParallelism() == null) { + // assume 2 hyper-threads per core, so that this value is probably by default the number + // of physical cores * 1.5 + this.parallelism = (int) Math.ceil(JvmUtils.getRuntimeInfo().getAvailableProcessors() * 0.75); + } else { + warnDeprecated( + "druid.processing.merge.pool.parallelism", + "druid.processing.merge.parallelism" + ); + this.parallelism = oldConfig.getMergePoolParallelism(); + } + } else { + this.parallelism = parallelism; + } + + // need at least 3 to do 2 layer merge + if (this.parallelism > 2) { + this.useParallelMergePool = useParallelMergePool == null || useParallelMergePool; + } else { + if (useParallelMergePool == null || useParallelMergePool) { + LOG.debug( + "Parallel merge pool is enabled, but there are not enough cores to enable parallel merges: %s", + parallelism + ); + } + this.useParallelMergePool = false; + } + + if (awaitShutdownMillis == null) { + if (oldConfig == null || oldConfig.getMergePoolAwaitShutdownMillis() == null) { + this.awaitShutdownMillis = DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS; + } else { + warnDeprecated( + "druid.processing.merge.pool.awaitShutdownMillis", + "druid.processing.merge.awaitShutdownMillis" + ); + this.awaitShutdownMillis = oldConfig.getMergePoolAwaitShutdownMillis(); + } + } else { + this.awaitShutdownMillis = awaitShutdownMillis; + } + + if (defaultMaxQueryParallelism == null) { + if (oldConfig == null || oldConfig.getMergePoolDefaultMaxQueryParallelism() == null) { + this.defaultMaxQueryParallelism = (int) Math.max(JvmUtils.getRuntimeInfo().getAvailableProcessors() * 0.5, 1); + } else { + warnDeprecated( + "druid.processing.merge.pool.defaultMaxQueryParallelism", + "druid.processing.merge.defaultMaxQueryParallelism" + ); + this.defaultMaxQueryParallelism = oldConfig.getMergePoolDefaultMaxQueryParallelism(); + } + } else { + this.defaultMaxQueryParallelism = defaultMaxQueryParallelism; + } + + if (targetRunTimeMillis == null) { + if (oldConfig == null || oldConfig.getMergePoolTargetTaskRunTimeMillis() == null) { + this.targetRunTimeMillis = ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS; + } else { + warnDeprecated( + "druid.processing.merge.task.targetRunTimeMillis", + "druid.processing.merge.targetRunTimeMillis" + ); + this.targetRunTimeMillis = oldConfig.getMergePoolTargetTaskRunTimeMillis(); + } + } else { + this.targetRunTimeMillis = targetRunTimeMillis; + } + + if (initialYieldNumRows == null) { + if (oldConfig == null || oldConfig.getMergePoolTaskInitialYieldRows() == null) { + this.initialYieldNumRows = ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS; + } else { + warnDeprecated( + "druid.processing.merge.task.initialYieldNumRows", + "druid.processing.merge.initialYieldNumRows" + ); + this.initialYieldNumRows = oldConfig.getMergePoolTaskInitialYieldRows(); + } + } else { + this.initialYieldNumRows = initialYieldNumRows; + } + + if (smallBatchNumRows == null) { + if (oldConfig == null || oldConfig.getMergePoolSmallBatchRows() == null) { + this.smallBatchNumRows = ParallelMergeCombiningSequence.DEFAULT_TASK_SMALL_BATCH_NUM_ROWS; + } else { + warnDeprecated( + "druid.processing.merge.task.smallBatchNumRows", + "druid.processing.merge.smallBatchNumRows" + ); + this.smallBatchNumRows = oldConfig.getMergePoolSmallBatchRows(); + } + } else { + this.smallBatchNumRows = smallBatchNumRows; + } + } + + @VisibleForTesting + public BrokerParallelMergeConfig() + { + this(null, null, null, null, null, null, null, null); + } + + public boolean useParallelMergePool() + { + return useParallelMergePool; + } + + public int getParallelism() + { + return parallelism; + } + + public long getAwaitShutdownMillis() + { + return awaitShutdownMillis; + } + + public int getDefaultMaxQueryParallelism() + { + return defaultMaxQueryParallelism; + } + + public int getTargetRunTimeMillis() + { + return targetRunTimeMillis; + } + + public int getInitialYieldNumRows() + { + return initialYieldNumRows; + } + + public int getSmallBatchNumRows() + { + return smallBatchNumRows; + } + + private static void warnDeprecated(String oldPath, String newPath) + { + LOG.warn( + "Using deprecated config [%s] which has been replace by [%s]. This path is deprecated and will be " + + "removed in a future release, please transition to using [%s]", + oldPath, + newPath, + newPath + ); + } +} diff --git a/server/src/main/java/org/apache/druid/query/LegacyBrokerParallelMergeConfig.java b/server/src/main/java/org/apache/druid/query/LegacyBrokerParallelMergeConfig.java new file mode 100644 index 000000000000..532fb5f5a920 --- /dev/null +++ b/server/src/main/java/org/apache/druid/query/LegacyBrokerParallelMergeConfig.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import org.skife.config.Config; + +import javax.annotation.Nullable; + +@Deprecated +public abstract class LegacyBrokerParallelMergeConfig +{ + @Nullable + @Config(value = "druid.processing.merge.pool.parallelism") + public Integer getMergePoolParallelism() + { + return null; + } + + @Nullable + @Config(value = "druid.processing.merge.pool.awaitShutdownMillis") + public Long getMergePoolAwaitShutdownMillis() + { + return null; + } + + @Nullable + @Config(value = "druid.processing.merge.pool.defaultMaxQueryParallelism") + public Integer getMergePoolDefaultMaxQueryParallelism() + { + return null; + } + + @Nullable + @Config(value = "druid.processing.merge.task.targetRunTimeMillis") + public Integer getMergePoolTargetTaskRunTimeMillis() + { + return null; + } + + @Nullable + @Config(value = "druid.processing.merge.task.initialYieldNumRows") + public Integer getMergePoolTaskInitialYieldRows() + { + return null; + } + + @Nullable + @Config(value = "druid.processing.merge.task.smallBatchNumRows") + public Integer getMergePoolSmallBatchRows() + { + return null; + } +} diff --git a/server/src/main/java/org/apache/druid/segment/realtime/DbSegmentPublisherConfig.java b/server/src/main/java/org/apache/druid/segment/realtime/DbSegmentPublisherConfig.java deleted file mode 100644 index 1f6bc47dda42..000000000000 --- a/server/src/main/java/org/apache/druid/segment/realtime/DbSegmentPublisherConfig.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.segment.realtime; - -import org.skife.config.Config; - -public abstract class DbSegmentPublisherConfig -{ - @Config("druid.metadata.storage.tables.segments") - public abstract String getSegmentTable(); -} diff --git a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java index 46c0fc90d89d..d486ae4fb8e9 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java +++ b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java @@ -41,6 +41,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.BasicMonitorScheduler; import org.apache.druid.java.util.metrics.ClockDriftSafeMonitorScheduler; +import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.java.util.metrics.JvmCpuMonitor; import org.apache.druid.java.util.metrics.JvmMonitor; import org.apache.druid.java.util.metrics.JvmThreadsMonitor; diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java index f77baa481200..3790cf298109 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java @@ -38,7 +38,7 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.io.Closer; -import org.apache.druid.query.DruidProcessingConfig; +import org.apache.druid.query.BrokerParallelMergeConfig; import org.apache.druid.query.Druids; import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; @@ -318,16 +318,10 @@ public long getMaxQueuedBytes() return 0L; } }, - new DruidProcessingConfig() + new BrokerParallelMergeConfig() { @Override - public String getFormatString() - { - return null; - } - - @Override - public int getMergePoolParallelism() + public int getParallelism() { // fixed so same behavior across all test environments return 4; diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java index 95fce2060ecd..29ea0e3ffdc0 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java @@ -34,8 +34,8 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.TestSequence; import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.BrokerParallelMergeConfig; import org.apache.druid.query.DataSource; -import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; @@ -136,7 +136,7 @@ public void testGetQueryRunnerForSegments_singleIntervalLargeSegments() Mockito.mock(CachePopulator.class), new CacheConfig(), Mockito.mock(DruidHttpClientConfig.class), - Mockito.mock(DruidProcessingConfig.class), + Mockito.mock(BrokerParallelMergeConfig.class), ForkJoinPool.commonPool(), queryScheduler, JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER, diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index ca4547858948..aec7b3ae6c53 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -67,8 +67,8 @@ import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.nary.TrinaryFn; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.BrokerParallelMergeConfig; import org.apache.druid.query.BySegmentResultValueClass; -import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Druids; import org.apache.druid.query.FinalizeResultsQueryRunner; import org.apache.druid.query.Query; @@ -2829,16 +2829,10 @@ public long getMaxQueuedBytes() return 0L; } }, - new DruidProcessingConfig() + new BrokerParallelMergeConfig() { @Override - public String getFormatString() - { - return null; - } - - @Override - public int getMergePoolParallelism() + public int getParallelism() { // fixed so same behavior across all test environments return 4; diff --git a/server/src/test/java/org/apache/druid/guice/BrokerProcessingModuleTest.java b/server/src/test/java/org/apache/druid/guice/BrokerProcessingModuleTest.java index 3ce5db12a333..7fca81aaea30 100644 --- a/server/src/test/java/org/apache/druid/guice/BrokerProcessingModuleTest.java +++ b/server/src/test/java/org/apache/druid/guice/BrokerProcessingModuleTest.java @@ -28,7 +28,10 @@ import org.apache.druid.client.cache.CachePopulator; import org.apache.druid.client.cache.CachePopulatorStats; import org.apache.druid.initialization.Initialization; +import org.apache.druid.java.util.common.config.Config; +import org.apache.druid.query.BrokerParallelMergeConfig; import org.apache.druid.query.DruidProcessingConfig; +import org.apache.druid.query.LegacyBrokerParallelMergeConfig; import org.apache.druid.utils.JvmUtils; import org.junit.Assert; import org.junit.Assume; @@ -37,14 +40,14 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import org.skife.config.ConfigurationObjectFactory; + +import java.util.Properties; @RunWith(MockitoJUnitRunner.class) public class BrokerProcessingModuleTest { - private static final boolean INJECT_SERVER_TYPE_CONFIG = true; - @Mock - private DruidProcessingConfig druidProcessingConfig; private Injector injector; private BrokerProcessingModule target; @Mock @@ -56,12 +59,13 @@ public class BrokerProcessingModuleTest public void setUp() { target = new BrokerProcessingModule(); - injector = makeInjector(INJECT_SERVER_TYPE_CONFIG); + injector = makeInjector(new Properties()); } @Test public void testIntermediateResultsPool() { + DruidProcessingConfig druidProcessingConfig = injector.getInstance(DruidProcessingConfig.class); target.getIntermediateResultsPool(druidProcessingConfig); } @@ -69,23 +73,30 @@ public void testIntermediateResultsPool() @Test public void testMergeBufferPool() { + DruidProcessingConfig druidProcessingConfig = injector.getInstance(DruidProcessingConfig.class); target.getMergeBufferPool(druidProcessingConfig); } @Test public void testMergeProcessingPool() { - DruidProcessingConfig config = new DruidProcessingConfig() - { - @Override - public String getFormatString() - { - return "processing-test-%s"; - } - }; - DruidProcessingModule module = new DruidProcessingModule(); + BrokerParallelMergeConfig config = injector.getInstance(BrokerParallelMergeConfig.class); + BrokerProcessingModule module = new BrokerProcessingModule(); module.getMergeProcessingPoolProvider(config); - config.getNumInitalBuffersForIntermediatePool(); + } + + @Test + public void testMergeProcessingPoolLegacyConfigs() + { + Properties props = new Properties(); + props.put("druid.processing.merge.pool.parallelism", "10"); + props.put("druid.processing.merge.pool.defaultMaxQueryParallelism", "10"); + props.put("druid.processing.merge.task.targetRunTimeMillis", "1000"); + Injector gadget = makeInjector(props); + BrokerParallelMergeConfig config = gadget.getInstance(BrokerParallelMergeConfig.class); + Assert.assertEquals(10, config.getParallelism()); + Assert.assertEquals(10, config.getDefaultMaxQueryParallelism()); + Assert.assertEquals(1000, config.getTargetRunTimeMillis()); } @Test @@ -93,7 +104,6 @@ public void testCachePopulatorAsSingleton() { CachePopulator cachePopulator = injector.getInstance(CachePopulator.class); Assert.assertNotNull(cachePopulator); - } @Test(expected = ProvisionException.class) @@ -107,43 +117,40 @@ public void testMemoryCheckThrowsException() catch (UnsupportedOperationException e) { Assume.assumeNoException(e); } + Properties props = new Properties(); + props.setProperty("druid.processing.buffer.sizeBytes", "3GiB"); + Injector injector1 = makeInjector(props); + DruidProcessingConfig processingBufferConfig = injector1.getInstance(DruidProcessingConfig.class); BrokerProcessingModule module = new BrokerProcessingModule(); - module.getMergeBufferPool(new DruidProcessingConfig() - { - @Override - public String getFormatString() - { - return "test"; - } - - @Override - public int intermediateComputeSizeBytes() - { - return Integer.MAX_VALUE; - } - }); + module.getMergeBufferPool(processingBufferConfig); } - private Injector makeInjector(boolean withServerTypeConfig) + private Injector makeInjector(Properties props) { - return Initialization.makeInjectorWithModules( - GuiceInjectors.makeStartupInjector(), (ImmutableList.of(Modules.override( - (binder) -> { - binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test"); - binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); - binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1); - binder.bind(DruidProcessingConfig.class).toInstance(druidProcessingConfig); - }, - target - ).with( - (binder) -> { + + Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), + ImmutableList.of( + Modules.override( + (binder) -> { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); + binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1); + binder.bind(Properties.class).toInstance(props); + ConfigurationObjectFactory factory = Config.createFactory(props); + LegacyBrokerParallelMergeConfig legacyConfig = factory.build(LegacyBrokerParallelMergeConfig.class); + binder.bind(ConfigurationObjectFactory.class).toInstance(factory); + binder.bind(LegacyBrokerParallelMergeConfig.class).toInstance(legacyConfig); + }, + target + ).with((binder) -> { binder.bind(CachePopulatorStats.class).toInstance(cachePopulatorStats); binder.bind(CacheConfig.class).toInstance(cacheConfig); - } + }) ) - ))); + ); + return injector; } - } diff --git a/server/src/test/java/org/apache/druid/guice/QueryableModuleTest.java b/server/src/test/java/org/apache/druid/guice/QueryableModuleTest.java index b51f2c14853d..8fad8924bf88 100644 --- a/server/src/test/java/org/apache/druid/guice/QueryableModuleTest.java +++ b/server/src/test/java/org/apache/druid/guice/QueryableModuleTest.java @@ -77,7 +77,7 @@ private Injector makeInjector(Properties properties) new JacksonModule(), new ConfigModule(), new QueryRunnerFactoryModule(), - new DruidProcessingConfigModule(), + new LegacyBrokerParallelMergeConfigModule(), new BrokerProcessingModule(), new LifecycleModule(), binder -> binder.bind(ServiceEmitter.class).to(NoopServiceEmitter.class), diff --git a/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java b/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java index d07bfdef345b..843a4b031497 100644 --- a/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java +++ b/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java @@ -142,10 +142,7 @@ public void setupTestBase() new ForegroundCachePopulator(objectMapper, new CachePopulatorStats(), 0), new CacheConfig(), new DruidHttpClientConfig(), - QueryStackTests.getProcessingConfig( - USE_PARALLEL_MERGE_POOL_CONFIGURED, - DruidProcessingConfig.DEFAULT_NUM_MERGE_BUFFERS - ), + QueryStackTests.getParallelMergeConfig(USE_PARALLEL_MERGE_POOL_CONFIGURED), ForkJoinPool.commonPool(), QueryStackTests.DEFAULT_NOOP_SCHEDULER, JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER, diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java index 1c23edf3b9a8..540cf0a57ae6 100644 --- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java +++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java @@ -24,6 +24,7 @@ import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.BrokerParallelMergeConfig; import org.apache.druid.query.DataSource; import org.apache.druid.query.DefaultGenericQueryMetricsFactory; import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; @@ -100,6 +101,9 @@ public class QueryStackTests NoQueryLaningStrategy.INSTANCE, new ServerConfig() ); + + public static final int DEFAULT_NUM_MERGE_BUFFERS = -1; + private static final ServiceEmitter EMITTER = new NoopServiceEmitter(); private static final int COMPUTE_BUFFER_SIZE = 10 * 1024 * 1024; @@ -187,6 +191,18 @@ public static LocalQuerySegmentWalker createLocalQuerySegmentWalker( ); } + public static BrokerParallelMergeConfig getParallelMergeConfig( + boolean useParallelMergePoolConfigured + ) + { + return new BrokerParallelMergeConfig() { + @Override + public boolean useParallelMergePool() + { + return useParallelMergePoolConfigured; + } + }; + } public static DruidProcessingConfig getProcessingConfig( boolean useParallelMergePoolConfigured, final int mergeBuffers @@ -221,12 +237,6 @@ public int getNumMergeBuffers() } return mergeBuffers; } - - @Override - public boolean useParallelMergePoolConfigured() - { - return useParallelMergePoolConfigured; - } }; } @@ -256,7 +266,7 @@ public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerat closer, getProcessingConfig( useParallelMergePoolConfigured, - DruidProcessingConfig.DEFAULT_NUM_MERGE_BUFFERS + DEFAULT_NUM_MERGE_BUFFERS ), minTopNThresholdSupplier ); diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java index 6bbcdf7f3a70..dd9c3c3c0b59 100644 --- a/services/src/main/java/org/apache/druid/cli/CliBroker.java +++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java @@ -45,6 +45,7 @@ import org.apache.druid.guice.JoinableFactoryModule; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; +import org.apache.druid.guice.LegacyBrokerParallelMergeConfigModule; import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.QueryRunnerFactoryModule; @@ -108,6 +109,7 @@ protected Set getNodeRoles(Properties properties) protected List getModules() { return ImmutableList.of( + new LegacyBrokerParallelMergeConfigModule(), new BrokerProcessingModule(), new QueryableModule(), new QueryRunnerFactoryModule(), diff --git a/services/src/main/java/org/apache/druid/cli/DumpSegment.java b/services/src/main/java/org/apache/druid/cli/DumpSegment.java index 566f89d5fdf3..afbd58ab9546 100644 --- a/services/src/main/java/org/apache/druid/cli/DumpSegment.java +++ b/services/src/main/java/org/apache/druid/cli/DumpSegment.java @@ -32,7 +32,6 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.google.inject.Binder; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; @@ -55,7 +54,6 @@ import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.DirectQueryProcessingPool; -import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; @@ -81,7 +79,6 @@ import org.apache.druid.segment.SimpleAscendingOffset; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.BaseColumn; -import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnIndexSupplier; import org.apache.druid.segment.column.ColumnType; @@ -733,39 +730,10 @@ protected List getModules() new DruidProcessingModule(), new QueryableModule(), new QueryRunnerFactoryModule(), - new Module() - { - @Override - public void configure(Binder binder) - { - binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/tool"); - binder.bindConstant().annotatedWith(Names.named("servicePort")).to(9999); - binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1); - binder.bind(DruidProcessingConfig.class).toInstance( - new DruidProcessingConfig() - { - @Override - public String getFormatString() - { - return "processing-%s"; - } - - @Override - public int intermediateComputeSizeBytes() - { - return 100 * 1024 * 1024; - } - - @Override - public int getNumThreads() - { - return 1; - } - - } - ); - binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class); - } + binder -> { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/tool"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(9999); + binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1); } ); } diff --git a/services/src/main/java/org/apache/druid/cli/ValidateSegments.java b/services/src/main/java/org/apache/druid/cli/ValidateSegments.java index 925d7c81a801..f9e2ce627b48 100644 --- a/services/src/main/java/org/apache/druid/cli/ValidateSegments.java +++ b/services/src/main/java/org/apache/druid/cli/ValidateSegments.java @@ -23,7 +23,6 @@ import com.github.rvesse.airline.annotations.Command; import com.github.rvesse.airline.annotations.restrictions.Required; import com.google.common.collect.ImmutableList; -import com.google.inject.Binder; import com.google.inject.Injector; import com.google.inject.Module; import com.google.inject.name.Names; @@ -32,9 +31,7 @@ import org.apache.druid.guice.QueryableModule; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.segment.IndexIO; -import org.apache.druid.segment.column.ColumnConfig; import java.io.File; import java.util.List; @@ -85,39 +82,10 @@ protected List getModules() new DruidProcessingModule(), new QueryableModule(), new QueryRunnerFactoryModule(), - new Module() - { - @Override - public void configure(Binder binder) - { - binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/tool"); - binder.bindConstant().annotatedWith(Names.named("servicePort")).to(9999); - binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1); - binder.bind(DruidProcessingConfig.class).toInstance( - new DruidProcessingConfig() - { - @Override - public String getFormatString() - { - return "processing-%s"; - } - - @Override - public int intermediateComputeSizeBytes() - { - return 100 * 1024 * 1024; - } - - @Override - public int getNumThreads() - { - return 1; - } - - } - ); - binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class); - } + binder -> { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/tool"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(9999); + binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1); } ); }