diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
index 6c9c6aa3d48b..e38ee8862dcb 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
@@ -54,6 +54,7 @@ import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.BrokerParallelMergeConfig;
 import org.apache.druid.query.BySegmentQueryRunner;
 import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
 import org.apache.druid.query.DruidProcessingConfig;
@@ -103,7 +104,6 @@ import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
 import org.apache.druid.segment.generator.SegmentGenerator;
-import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -258,12 +258,6 @@ public int getNumThreads()
       {
         return numProcessingThreads;
       }
-
-      @Override
-      public boolean useParallelMergePool()
-      {
-        return true;
-      }
     };
 
     conglomerate = new DefaultQueryRunnerFactoryConglomerate(
@@ -339,10 +333,15 @@ public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest
         new ForegroundCachePopulator(JSON_MAPPER, new CachePopulatorStats(), 0),
         new CacheConfig(),
         new DruidHttpClientConfig(),
-        processingConfig,
+        new BrokerParallelMergeConfig() {
+          @Override
+          public boolean useParallelMergePool()
+          {
+            return true;
+          }
+        },
         forkJoinPool,
         QueryStackTests.DEFAULT_NOOP_SCHEDULER,
-        JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
         new NoopServiceEmitter()
     );
   }
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
index 7733281908f0..80d208c5a433 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
@@ -106,13 +106,6 @@ public int getNumThreads()
     {
       return 1;
     }
-
-    @Override
-    public boolean useParallelMergePoolConfigured()
-    {
-      return true;
-    }
-
     @Override
     public String getFormatString()
     {
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java
index 98514512e9ab..da42aaeefcdd 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java
@@ -112,13 +112,7 @@ public int getNumThreads()
     {
       return 1;
     }
-
-    @Override
-    public boolean useParallelMergePoolConfigured()
-    {
-      return true;
-    }
-
+
     @Override
     public String getFormatString()
     {
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
index 2945bdb1e569..223048200f59 100644
--- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
@@ -54,7 +54,7 @@ import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.emitter.core.Event;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.BrokerParallelMergeConfig;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunner;
@@ -69,7 +69,6 @@ import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.timeseries.TimeseriesResultValue;
-import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.server.ClientQuerySegmentWalker;
 import org.apache.druid.server.QueryStackTests;
@@ -363,17 +362,9 @@ public long getMaxQueuedBytes()
         {
           return 0L;
         }
       },
-      new DruidProcessingConfig()
-      {
-        @Override
-        public String getFormatString()
-        {
-          return null;
-        }
-      },
+      new BrokerParallelMergeConfig(),
       ForkJoinPool.commonPool(),
       QueryStackTests.DEFAULT_NOOP_SCHEDULER,
-      new JoinableFactoryWrapper(new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())),
       new NoopServiceEmitter()
   );
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index d79d5f3211c4..ecbe45d19221 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -55,8 +55,8 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
index af6f69ab2ec2..c63c74674600 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
@@ -33,9 +33,9 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
index 7874a11c588d..136fb0f7a1f6 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
@@ -30,10 +30,10 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.junit.Assert;
 import org.junit.Test;
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 7ee0ed9f1e43..c00958ef0b90 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -80,6 +80,7 @@ import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.segment.TestHelper;
@@ -89,7 +90,6 @@ import org.apache.druid.segment.indexing.RealtimeIOConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.realtime.FireDepartment;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.security.Action;
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
index eb9715f9c9e4..4aa60ea0327b 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
@@ -36,9 +36,9 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index e4890614085c..127683f8e4fe 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -76,6 +76,7 @@ import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.metadata.EntryExistsException;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -85,7 +86,6 @@ import org.apache.druid.segment.indexing.RealtimeIOConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.realtime.FireDepartment;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.security.Action;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index a72ba05eacad..f8b41fce610f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -4077,13 +4077,13 @@ protected void scheduleReporting(ScheduledExecutorService reportingExec)
     reportingExec.scheduleAtFixedRate(
         this::emitLag,
         ioConfig.getStartDelay().getMillis() + INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS, // wait for tasks to start up
-        spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
+        spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
         TimeUnit.MILLISECONDS
     );
     reportingExec.scheduleAtFixedRate(
         this::emitNoticesQueueSize,
         ioConfig.getStartDelay().getMillis() + INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS, // wait for tasks to start up
-        spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
+        spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
         TimeUnit.MILLISECONDS
     );
   }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index 90b0b705637c..b7d4ba2f2772 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -35,9 +35,9 @@ import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
 import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.NoopTaskAutoScaler;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 
 import javax.annotation.Nullable;
 import java.util.List;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index 05e6401a8f6b..f0b17657d885 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -55,13 +55,13 @@ import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
 import org.joda.time.DateTime;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index c592b9375fda..46f86bacb46b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -69,6 +69,7 @@ import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.metadata.EntryExistsException;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -77,7 +78,6 @@ import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
@@ -102,6 +102,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -501,7 +502,7 @@ public void testIdleStateTransition() throws Exception
     EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
     EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig() {
       @Override
-      public Duration getEmitterPeriod()
+      public Duration getEmissionDuration()
       {
         return new Period("PT1S").toStandardDuration();
       }
@@ -1062,6 +1063,29 @@ public void testStaleOffsetsNegativeLagNotEmitted() throws Exception
     Assert.assertEquals(0, emitter.getEvents().size());
   }
 
+  @Test
+  public void testScheduleReporting()
+  {
+    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+    DruidMonitorSchedulerConfig config = new DruidMonitorSchedulerConfig();
+    EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(config).times(2);
+    ScheduledExecutorService executorService = EasyMock.createMock(ScheduledExecutorService.class);
+    EasyMock.expect(executorService.scheduleWithFixedDelay(EasyMock.anyObject(), EasyMock.eq(86415000L), EasyMock.eq(300000L), EasyMock.eq(TimeUnit.MILLISECONDS))).andReturn(EasyMock.createMock(ScheduledFuture.class)).once();
+    EasyMock.expect(executorService.scheduleAtFixedRate(EasyMock.anyObject(), EasyMock.eq(86425000L), EasyMock.eq(config.getEmissionDuration().getMillis()), EasyMock.eq(TimeUnit.MILLISECONDS))).andReturn(EasyMock.createMock(ScheduledFuture.class)).times(2);
+
+    EasyMock.replay(executorService, spec);
+    final BaseTestSeekableStreamSupervisor supervisor = new BaseTestSeekableStreamSupervisor()
+    {
+      @Override
+      public LagStats computeLagStats()
+      {
+        return new LagStats(0, 0, 0);
+      }
+    };
+    supervisor.scheduleReporting(executorService);
+    EasyMock.verify(executorService, spec);
+  }
+
   private List<Event> filterMetrics(List<Event> events, List<String> whitelist)
   {
     List<Event> result = events.stream()
@@ -1098,7 +1122,7 @@ private void expectEmitterSupervisor(boolean suspended) throws EntryExistsExcept
     EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
     EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig() {
       @Override
-      public Duration getEmitterPeriod()
+      public Duration getEmissionDuration()
       {
         return new Period("PT1S").toStandardDuration();
       }
@@ -1608,13 +1632,13 @@ protected void scheduleReporting(ScheduledExecutorService reportingExec)
     reportingExec.scheduleAtFixedRate(
         this::emitLag,
         ioConfig.getStartDelay().getMillis(),
-        spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
+        spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
         TimeUnit.MILLISECONDS
     );
     reportingExec.scheduleAtFixedRate(
         this::emitNoticesQueueSize,
         ioConfig.getStartDelay().getMillis(),
-        spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
+        spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
         TimeUnit.MILLISECONDS
     );
   }
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java
index 7a2eae93e16a..7931115c6eb2 100644
--- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java
@@ -35,9 +35,9 @@ import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
 import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.guice.AnnouncerModule;
-import org.apache.druid.guice.DruidProcessingConfigModule;
 import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.LegacyBrokerParallelMergeConfigModule;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.guice.PolyBind;
 import org.apache.druid.guice.SQLMetadataStorageDruidModule;
@@ -496,7 +496,7 @@ private static Injector makeInjector(
         new AnnouncerModule(),
         new DiscoveryModule(),
         // Dependencies from other modules
-        new DruidProcessingConfigModule(),
+        new LegacyBrokerParallelMergeConfigModule(),
 
         // Dependencies from other modules
         new StorageNodeModule(),
diff --git a/processing/src/main/java/org/apache/druid/guice/ConfigProvider.java b/processing/src/main/java/org/apache/druid/guice/ConfigProvider.java
index 538f2a61375e..66cc6261c9f3 100644
--- a/processing/src/main/java/org/apache/druid/guice/ConfigProvider.java
+++ b/processing/src/main/java/org/apache/druid/guice/ConfigProvider.java
@@ -31,6 +31,7 @@
 /**
  */
+@Deprecated
 public class ConfigProvider<T> implements Provider<T>
 {
   private static final Logger log = new Logger(ConfigProvider.class);
@@ -40,11 +41,6 @@ public static <T> void bind(Binder binder, Class<T> clazz)
     binder.bind(clazz).toProvider(of(clazz)).in(LazySingleton.class);
   }
 
-  public static <T> void bind(Binder binder, Class<T> clazz, Map<String, String> replacements)
-  {
-    binder.bind(clazz).toProvider(of(clazz, replacements)).in(LazySingleton.class);
-  }
-
   public static <T> Provider<T> of(Class<T> clazz)
   {
     return of(clazz, null);
diff --git a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ExecutorServiceConfig.java b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ExecutorServiceConfig.java
deleted file mode 100644
index 6d6327fbdd28..000000000000
--- a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ExecutorServiceConfig.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.java.util.common.concurrent;
-
-import org.apache.druid.utils.JvmUtils;
-import org.skife.config.Config;
-import org.skife.config.Default;
-
-/**
- */
-public abstract class ExecutorServiceConfig
-{
-  public static final int DEFAULT_NUM_THREADS = -1;
-
-  @Config(value = "${base_path}.formatString")
-  @Default("processing-%s")
-  public abstract String getFormatString();
-
-  public int getNumThreads()
-  {
-    int numThreadsConfigured = getNumThreadsConfigured();
-    if (numThreadsConfigured != DEFAULT_NUM_THREADS) {
-      return numThreadsConfigured;
-    } else {
-      return Math.max(JvmUtils.getRuntimeInfo().getAvailableProcessors() - 1, 1);
-    }
-  }
-
-  /**
-   * Returns the number of threads _explicitly_ configured, or -1 if it is not explicitly configured, that is not
-   * a valid number of threads. To get the configured value or the default (valid) number, use {@link #getNumThreads()}.
-   * This method exists for ability to distinguish between the default value set when there is no explicit config, and
-   * an explicitly configured value.
-   */
-  @Config(value = "${base_path}.numThreads")
-  public int getNumThreadsConfigured()
-  {
-    return DEFAULT_NUM_THREADS;
-  }
-}
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/AllocationMetricCollector.java b/processing/src/main/java/org/apache/druid/java/util/metrics/AllocationMetricCollector.java
index a0a9d7f645fd..1ccb9105620c 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/AllocationMetricCollector.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/AllocationMetricCollector.java
@@ -52,7 +52,7 @@ class AllocationMetricCollector
    * Tests show the call to getThreadAllocatedBytes for a single thread ID out of 500 threads running takes around
    * 9000 ns (in the worst case), which for 500 IDs should take 500*9000/1000/1000 = 4.5 ms to the max.
    * AllocationMetricCollector takes linear time to calculate delta, for 500 threads it's negligible.
-   * See the default emitting period {@link MonitorSchedulerConfig#getEmitterPeriod}.
+   * See the default emitting period {@link DruidMonitorSchedulerConfig#getEmissionDuration}.
    *
    * @return all threads summed allocated bytes delta
    */
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java b/processing/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java
index 0d5c07d49b4b..c57bbc0dd1d0 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java
@@ -34,7 +34,7 @@ public class BasicMonitorScheduler extends MonitorScheduler
   private final ScheduledExecutorService exec;
 
   public BasicMonitorScheduler(
-      MonitorSchedulerConfig config,
+      DruidMonitorSchedulerConfig config,
       ServiceEmitter emitter,
       List<Monitor> monitors,
       ScheduledExecutorService exec
@@ -50,7 +50,7 @@ void startMonitor(Monitor monitor)
     monitor.start();
     ScheduledExecutors.scheduleAtFixedRate(
         exec,
-        getConfig().getEmitterPeriod(),
+        getConfig().getEmissionDuration(),
         () -> {
           if (hasMonitor(monitor) && monitor.monitor(getEmitter())) {
             return Signal.REPEAT;
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java b/processing/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java
index 6fcb24406971..68b7bacedd53 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java
@@ -41,7 +41,7 @@ public class ClockDriftSafeMonitorScheduler extends MonitorScheduler
   private final ExecutorService monitorRunner;
 
   public ClockDriftSafeMonitorScheduler(
-      MonitorSchedulerConfig config,
+      DruidMonitorSchedulerConfig config,
       ServiceEmitter emitter,
       List<Monitor> monitors,
      CronScheduler monitorScheduler,
@@ -57,7 +57,7 @@ public ClockDriftSafeMonitorScheduler(
   void startMonitor(final Monitor monitor)
   {
     monitor.start();
-    long rate = getConfig().getEmitterPeriod().getMillis();
+    long rate = getConfig().getEmissionDuration().getMillis();
     final AtomicReference<Future<?>> futureReference = new AtomicReference<>();
     Future<?> future = monitorScheduler.scheduleAtFixedRate(
         rate,
diff --git a/server/src/main/java/org/apache/druid/server/metrics/DruidMonitorSchedulerConfig.java b/processing/src/main/java/org/apache/druid/java/util/metrics/DruidMonitorSchedulerConfig.java
similarity index 76%
rename from server/src/main/java/org/apache/druid/server/metrics/DruidMonitorSchedulerConfig.java
rename to processing/src/main/java/org/apache/druid/java/util/metrics/DruidMonitorSchedulerConfig.java
index 0e242b1244c1..96a554e6efc3 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/DruidMonitorSchedulerConfig.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/DruidMonitorSchedulerConfig.java
@@ -17,17 +17,15 @@
  * under the License.
  */
 
-package org.apache.druid.server.metrics;
+package org.apache.druid.java.util.metrics;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.java.util.metrics.BasicMonitorScheduler;
-import org.apache.druid.java.util.metrics.MonitorSchedulerConfig;
 import org.joda.time.Duration;
 import org.joda.time.Period;
 
 /**
  */
-public class DruidMonitorSchedulerConfig extends MonitorSchedulerConfig
+public class DruidMonitorSchedulerConfig
 {
   @JsonProperty
   private String schedulerClassName = BasicMonitorScheduler.class.getName();
@@ -40,14 +38,7 @@ public String getSchedulerClassName()
     return schedulerClassName;
   }
 
-  @JsonProperty
-  public Period getEmissionPeriod()
-  {
-    return emissionPeriod;
-  }
-
-  @Override
-  public Duration getEmitterPeriod()
+  public Duration getEmissionDuration()
   {
     return emissionPeriod.toStandardDuration();
   }
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java b/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
index 9e159182a1d2..47bb37d16110 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
@@ -39,7 +39,7 @@ public abstract class MonitorScheduler
 {
   private static final Logger log = new Logger(MonitorScheduler.class);
 
-  private final MonitorSchedulerConfig config;
+  private final DruidMonitorSchedulerConfig config;
   private final ServiceEmitter emitter;
   private final Set<Monitor> monitors;
   private final Object lock = new Object();
@@ -47,7 +47,7 @@ public abstract class MonitorScheduler
   private volatile boolean started = false;
 
   MonitorScheduler(
-      MonitorSchedulerConfig config,
+      DruidMonitorSchedulerConfig config,
       ServiceEmitter emitter,
       List<Monitor> monitors
   )
@@ -135,7 +135,7 @@ boolean hasMonitor(final Monitor monitor)
     }
   }
 
-  MonitorSchedulerConfig getConfig()
+  DruidMonitorSchedulerConfig getConfig()
   {
     return config;
   }
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorSchedulerConfig.java b/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorSchedulerConfig.java
deleted file mode 100644
index 8d0e165e2add..000000000000
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorSchedulerConfig.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.java.util.metrics;
-
-import org.joda.time.Duration;
-import org.skife.config.Config;
-import org.skife.config.Default;
-
-/**
- */
-public abstract class MonitorSchedulerConfig
-{
-  @Config({"org.apache.druid.java.util.metrics.emitter.period", "com.metamx.druid.emitter.period"})
-  @Default("PT60s")
-  public abstract Duration getEmitterPeriod();
-}
diff --git a/processing/src/main/java/org/apache/druid/query/DruidProcessingBufferConfig.java b/processing/src/main/java/org/apache/druid/query/DruidProcessingBufferConfig.java
new file mode 100644
index 000000000000..ac6b63c8ad79
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/DruidProcessingBufferConfig.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.common.config.Configs;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+
+import javax.annotation.Nullable;
+
+public class DruidProcessingBufferConfig
+{
+  public static final HumanReadableBytes DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = HumanReadableBytes.valueOf(-1);
+  public static final int MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = 1024 * 1024 * 1024;
+  public static final int DEFAULT_INITIAL_BUFFERS_FOR_INTERMEDIATE_POOL = 0;
+
+  @JsonProperty
+  private final HumanReadableBytes sizeBytes;
+
+  @JsonProperty
+  private final int poolCacheMaxCount;
+
+  @JsonProperty
+  private final int poolCacheInitialCount;
+
+  @JsonCreator
+  public DruidProcessingBufferConfig(
+      @JsonProperty("sizeBytes") @Nullable HumanReadableBytes sizeBytes,
+      @JsonProperty("poolCacheMaxCount") @Nullable Integer poolCacheMaxCount,
+      @JsonProperty("poolCacheInitialCount") @Nullable Integer poolCacheInitialCount
+  )
+  {
+    this.sizeBytes = Configs.valueOrDefault(sizeBytes, DEFAULT_PROCESSING_BUFFER_SIZE_BYTES);
+    this.poolCacheInitialCount = Configs.valueOrDefault(
+        poolCacheInitialCount,
+        DEFAULT_INITIAL_BUFFERS_FOR_INTERMEDIATE_POOL
+    );
+    this.poolCacheMaxCount = Configs.valueOrDefault(poolCacheMaxCount, Integer.MAX_VALUE);
+  }
+
+  public DruidProcessingBufferConfig()
+  {
+    this(null, null, null);
+  }
+
+  public HumanReadableBytes getBufferSize()
+  {
+    return sizeBytes;
+  }
+
+  public int getPoolCacheMaxCount()
+  {
+    return poolCacheMaxCount;
+  }
+
+  public int getPoolCacheInitialCount()
+  {
+    return poolCacheInitialCount;
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java b/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
index 823c9e71efbc..ca645d16e979 100644
--- a/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
+++ b/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
@@ -19,45 +19,82 @@
 
 package org.apache.druid.query;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.Configs;
 import org.apache.druid.java.util.common.HumanReadableBytes;
 import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig;
-import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.utils.JvmUtils;
-import org.skife.config.Config;
 
+import javax.annotation.Nullable;
 import java.util.concurrent.atomic.AtomicReference;
 
-public abstract class DruidProcessingConfig extends ExecutorServiceConfig implements ColumnConfig
+public class DruidProcessingConfig implements ColumnConfig
 {
   private static final Logger log = new Logger(DruidProcessingConfig.class);
 
-  public static final int DEFAULT_NUM_MERGE_BUFFERS = -1;
-  public static final HumanReadableBytes DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = HumanReadableBytes.valueOf(-1);
-  public static final int MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = 1024 * 1024 * 1024;
-  public static final int DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS = 60_000;
-  public static final int DEFAULT_INITIAL_BUFFERS_FOR_INTERMEDIATE_POOL = 0;
-
-  private AtomicReference<Integer> computedBufferSizeBytes = new AtomicReference<>();
-
-  @Config({"druid.computation.buffer.size", "${base_path}.buffer.sizeBytes"})
-  public HumanReadableBytes intermediateComputeSizeBytesConfigured()
-  {
-    return DEFAULT_PROCESSING_BUFFER_SIZE_BYTES;
-  }
-
-  public int intermediateComputeSizeBytes()
-  {
-    HumanReadableBytes sizeBytesConfigured = intermediateComputeSizeBytesConfigured();
-    if (!DEFAULT_PROCESSING_BUFFER_SIZE_BYTES.equals(sizeBytesConfigured)) {
+  @JsonProperty
+  private final String formatString;
+  @JsonProperty
+  private final int numThreads;
+  @JsonProperty
+  private final int numMergeBuffers;
+  @JsonProperty
+  private final boolean fifo;
+  @JsonProperty
+  private final String tmpDir;
+  @JsonProperty
+  private final DruidProcessingBufferConfig buffer;
+  @JsonProperty
+  private final DruidProcessingIndexesConfig indexes;
+  private final AtomicReference<Integer> computedBufferSizeBytes = new AtomicReference<>();
+  private final boolean numThreadsConfigured;
+  private final boolean numMergeBuffersConfigured;
+
+  @JsonCreator
+  public DruidProcessingConfig(
+      @JsonProperty("formatString") @Nullable String formatString,
+      @JsonProperty("numThreads") @Nullable Integer numThreads,
+      @JsonProperty("numMergeBuffers") @Nullable Integer numMergeBuffers,
+      @JsonProperty("fifo") @Nullable Boolean fifo,
+      @JsonProperty("tmpDir") @Nullable String tmpDir,
+      @JsonProperty("buffer") DruidProcessingBufferConfig buffer,
+      @JsonProperty("indexes") DruidProcessingIndexesConfig indexes
+  )
+  {
+    this.formatString = Configs.valueOrDefault(formatString, "processing-%s");
+    this.numThreads = Configs.valueOrDefault(
+        numThreads,
+        Math.max(JvmUtils.getRuntimeInfo().getAvailableProcessors() - 1, 1)
+    );
+    this.numMergeBuffers = Configs.valueOrDefault(numMergeBuffers, Math.max(2, this.numThreads / 4));
+    this.fifo = fifo == null || fifo;
+    this.tmpDir = Configs.valueOrDefault(tmpDir, System.getProperty("java.io.tmpdir"));
+    this.buffer = Configs.valueOrDefault(buffer, new DruidProcessingBufferConfig());
+    this.indexes = Configs.valueOrDefault(indexes, new DruidProcessingIndexesConfig());
+
+    this.numThreadsConfigured = numThreads != null;
+    this.numMergeBuffersConfigured = numMergeBuffers != null;
+    initializeBufferSize();
+  }
+
+  @VisibleForTesting
+  public DruidProcessingConfig()
+  {
+    this(null, null, null, null, null, null, null);
+  }
+
+  private void initializeBufferSize()
+  {
+    HumanReadableBytes sizeBytesConfigured = this.buffer.getBufferSize();
+    if (!DruidProcessingBufferConfig.DEFAULT_PROCESSING_BUFFER_SIZE_BYTES.equals(sizeBytesConfigured)) {
       if (sizeBytesConfigured.getBytes() > Integer.MAX_VALUE) {
         throw new IAE("druid.processing.buffer.sizeBytes must be less than 2GiB");
       }
-      return sizeBytesConfigured.getBytesInInt();
-    } else if (computedBufferSizeBytes.get() != null) {
-      return computedBufferSizeBytes.get();
+      computedBufferSizeBytes.set(sizeBytesConfigured.getBytesInInt());
     }
 
     long directSizeBytes;
@@ -70,173 +107,90 @@ public int intermediateComputeSizeBytes()
     }
     catch (UnsupportedOperationException e) {
       // max direct memory defaults to max heap size on recent JDK version, unless set explicitly
-      directSizeBytes = computeMaxMemoryFromMaxHeapSize();
+      directSizeBytes = Runtime.getRuntime().maxMemory() / 4;
       log.info("Using up to [%,d] bytes of direct memory for computation buffers.", directSizeBytes);
     }
 
-    int numProcessingThreads = getNumThreads();
-    int numMergeBuffers = getNumMergeBuffers();
-    int totalNumBuffers = numMergeBuffers + numProcessingThreads;
+    int totalNumBuffers = this.numMergeBuffers + this.numThreads;
     int sizePerBuffer = (int) ((double) directSizeBytes / (double) (totalNumBuffers + 1));
 
-    final int computedSizePerBuffer = Math.min(sizePerBuffer, MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES);
+    final int computedSizePerBuffer = Math.min(
+        sizePerBuffer,
+        DruidProcessingBufferConfig.MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES
+    );
     if (computedBufferSizeBytes.compareAndSet(null, computedSizePerBuffer)) {
       log.info(
" + "If you run out of direct memory, you may need to set these parameters explicitly using the guidelines at " + "https://druid.apache.org/docs/latest/operations/basic-cluster-tuning.html#processing-threads-buffers.", computedSizePerBuffer, - numProcessingThreads, - numMergeBuffers + this.numThreads, + this.numMergeBuffers ); } - return computedSizePerBuffer; - } - - public static long computeMaxMemoryFromMaxHeapSize() - { - return Runtime.getRuntime().maxMemory() / 4; } - @Config({"druid.computation.buffer.poolCacheMaxCount", "${base_path}.buffer.poolCacheMaxCount"}) - public int poolCacheMaxCount() + public String getFormatString() { - return Integer.MAX_VALUE; + return formatString; } - @Config({ - "druid.computation.buffer.poolCacheInitialCount", - "${base_path}.buffer.poolCacheInitialCount" - }) - public int getNumInitalBuffersForIntermediatePool() + public int getNumThreads() { - return DEFAULT_INITIAL_BUFFERS_FOR_INTERMEDIATE_POOL; - } - - @Override - @Config(value = "${base_path}.numThreads") - public int getNumThreadsConfigured() - { - return DEFAULT_NUM_THREADS; + return numThreads; } public int getNumMergeBuffers() { - int numMergeBuffersConfigured = getNumMergeBuffersConfigured(); - if (numMergeBuffersConfigured != DEFAULT_NUM_MERGE_BUFFERS) { - return numMergeBuffersConfigured; - } else { - return Math.max(2, getNumThreads() / 4); - } - } - - /** - * Returns the number of merge buffers _explicitly_ configured, or -1 if it is not explicitly configured, that is not - * a valid number of buffers. To get the configured value or the default (valid) number, use {@link - * #getNumMergeBuffers()}. This method exists for ability to distinguish between the default value set when there is - * no explicit config, and an explicitly configured value. 
-   */
-  @Config("${base_path}.numMergeBuffers")
-  public int getNumMergeBuffersConfigured()
-  {
-    return DEFAULT_NUM_MERGE_BUFFERS;
-  }
-
-  @Override
-  @Config(value = "${base_path}.indexes.skipValueRangeIndexScale")
-  public double skipValueRangeIndexScale()
-  {
-    return ColumnConfig.super.skipValueRangeIndexScale();
-  }
-
-  @Override
-  @Config(value = "${base_path}.indexes.skipValuePredicateIndexScale")
-  public double skipValuePredicateIndexScale()
-  {
-    return ColumnConfig.super.skipValuePredicateIndexScale();
+    return numMergeBuffers;
   }
 
-  @Config(value = "${base_path}.fifo")
   public boolean isFifo()
   {
-    return true;
+    return fifo;
   }
 
-  @Config(value = "${base_path}.tmpDir")
   public String getTmpDir()
   {
-    return System.getProperty("java.io.tmpdir");
+    return tmpDir;
   }
 
-  @Config(value = "${base_path}.merge.useParallelMergePool")
-  public boolean useParallelMergePoolConfigured()
+  public int intermediateComputeSizeBytes()
   {
-    return true;
-  }
-
-  public boolean useParallelMergePool()
-  {
-    final boolean useParallelMergePoolConfigured = useParallelMergePoolConfigured();
-    final int parallelism = getMergePoolParallelism();
-    // need at least 3 to do 2 layer merge
-    if (parallelism > 2) {
-      return useParallelMergePoolConfigured;
-    }
-    if (useParallelMergePoolConfigured) {
-      log.debug(
-          "Parallel merge pool is enabled, but there are not enough cores to enable parallel merges: %s",
-          parallelism
-      );
-    }
-    return false;
-  }
-
-  @Config(value = "${base_path}.merge.pool.parallelism")
-  public int getMergePoolParallelismConfigured()
-  {
-    return DEFAULT_NUM_THREADS;
+    return computedBufferSizeBytes.get();
  }
 
-  public int getMergePoolParallelism()
+  public int poolCacheMaxCount()
   {
-    int poolParallelismConfigured = getMergePoolParallelismConfigured();
-    if (poolParallelismConfigured != DEFAULT_NUM_THREADS) {
-      return poolParallelismConfigured;
-    } else {
-      // assume 2 hyper-threads per core, so that this value is probably by default the number of physical cores * 1.5
-      return (int) Math.ceil(JvmUtils.getRuntimeInfo().getAvailableProcessors() * 0.75);
-    }
+    return buffer.getPoolCacheMaxCount();
   }
 
-  @Config(value = "${base_path}.merge.pool.awaitShutdownMillis")
-  public long getMergePoolAwaitShutdownMillis()
+  public int getNumInitalBuffersForIntermediatePool()
   {
-    return DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS;
+    return buffer.getPoolCacheInitialCount();
   }
 
-  @Config(value = "${base_path}.merge.pool.defaultMaxQueryParallelism")
-  public int getMergePoolDefaultMaxQueryParallelism()
+  @Override
+  public double skipValueRangeIndexScale()
   {
-    // assume 2 hyper-threads per core, so that this value is probably by default the number of physical cores
-    return (int) Math.max(JvmUtils.getRuntimeInfo().getAvailableProcessors() * 0.5, 1);
+    return indexes.getSkipValueRangeIndexScale();
   }
 
-  @Config(value = "${base_path}.merge.task.targetRunTimeMillis")
-  public int getMergePoolTargetTaskRunTimeMillis()
+  @Override
+  public double skipValuePredicateIndexScale()
   {
-    return ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS;
+    return indexes.getSkipValuePredicateIndexScale();
   }
 
-  @Config(value = "${base_path}.merge.task.initialYieldNumRows")
-  public int getMergePoolTaskInitialYieldRows()
+  public boolean isNumThreadsConfigured()
   {
-    return ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS;
+    return numThreadsConfigured;
   }
 
-  @Config(value = "${base_path}.merge.task.smallBatchNumRows")
-  public int getMergePoolSmallBatchRows()
+  public boolean isNumMergeBuffersConfigured()
   {
-    return ParallelMergeCombiningSequence.DEFAULT_TASK_SMALL_BATCH_NUM_ROWS;
+    return numMergeBuffersConfigured;
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/DruidProcessingIndexesConfig.java b/processing/src/main/java/org/apache/druid/query/DruidProcessingIndexesConfig.java
new file mode 100644
index 000000000000..6afa20418e9d
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/DruidProcessingIndexesConfig.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.common.config.Configs;
+import org.apache.druid.segment.column.ColumnConfig;
+
+import javax.annotation.Nullable;
+
+public class DruidProcessingIndexesConfig
+{
+  @JsonProperty
+  private final double skipValueRangeIndexScale;
+
+  @JsonProperty
+  private final double skipValuePredicateIndexScale;
+
+  @JsonCreator
+  public DruidProcessingIndexesConfig(
+      @JsonProperty("skipValueRangeIndexScale") @Nullable Double skipValueRangeIndexScale,
+      @JsonProperty("skipValuePredicateIndexScale") @Nullable Double skipValuePredicateIndexScale
+  )
+  {
+    this.skipValueRangeIndexScale = Configs.valueOrDefault(
+        skipValueRangeIndexScale,
+        ColumnConfig.DEFAULT_SKIP_VALUE_RANGE_INDEX_SCALE
+    );
+    this.skipValuePredicateIndexScale = Configs.valueOrDefault(
+        skipValuePredicateIndexScale,
+        ColumnConfig.DEFAULT_SKIP_VALUE_PREDICATE_INDEX_SCALE
+    );
+  }
+
+  public DruidProcessingIndexesConfig()
+  {
+    this(null, null);
+  }
+
+  public double getSkipValueRangeIndexScale()
+  {
+    return skipValueRangeIndexScale;
+  }
+
+  public double getSkipValuePredicateIndexScale()
+  {
+    return skipValuePredicateIndexScale;
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
index 3a9b8818cd0b..b61a78a7c9f6 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
@@ -159,6 +159,7 @@ public static SerializablePairLongString readPair(final ByteBuffer buf, final in
     Long timeValue = copyBuffer.getLong();
     int stringSizeBytes = copyBuffer.getInt();
+
     if (stringSizeBytes >= 0) {
       byte[] valueBytes = new byte[stringSizeBytes];
       copyBuffer.get(valueBytes, 0, stringSizeBytes);
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexIO.java b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
index f2d57a5517e6..dd0ac9ab1177 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexIO.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
@@ -79,6 +79,7 @@
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -276,6 +277,10 @@ private static void validateRowValues(
           throw notEqualValidationException(dim1Name, vals1, vals2);
         }
       }
+    } else if (vals1 instanceof Object[]) {
+      if (!Arrays.deepEquals((Object[]) vals1, (Object[]) vals2)) {
+        throw notEqualValidationException(dim1Name, vals1, vals2);
+      }
     } else {
       if (!Objects.equals(vals1, vals2)) {
         throw notEqualValidationException(dim1Name, vals1, vals2);
diff --git a/processing/src/main/java/org/apache/druid/segment/column/ColumnConfig.java b/processing/src/main/java/org/apache/druid/segment/column/ColumnConfig.java
index ae5ca7ff8a30..62934d3d342f 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/ColumnConfig.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnConfig.java
@@ -26,9 +26,16 @@
 public interface ColumnConfig
 {
-  ColumnConfig DEFAULT = new ColumnConfig()
-  {
-  };
+  /**
+   * This value was chosen by testing bound filters on double columns with a variety of ranges; this ratio of the
+   * number of bitmaps to the total number of rows appeared to be around the threshold where indexes stopped
+   * performing consistently faster than a full scan + value matcher.
+   */
+  double DEFAULT_SKIP_VALUE_RANGE_INDEX_SCALE = 0.08;
+
+  double DEFAULT_SKIP_VALUE_PREDICATE_INDEX_SCALE = 0.08;
+
+  ColumnConfig DEFAULT = new ColumnConfig() {};
 
   ColumnConfig ALWAYS_USE_INDEXES = new ColumnConfig()
   {
@@ -73,10 +80,7 @@ public double skipValuePredicateIndexScale()
    */
   default double skipValueRangeIndexScale()
   {
-    // this value was chosen testing bound filters on double columns with a variety of ranges at which this ratio
-    // of number of bitmaps compared to total number of rows appeared to be around the threshold where indexes stopped
-    // performing consistently faster than a full scan + value matcher
-    return 0.08;
+    return DEFAULT_SKIP_VALUE_RANGE_INDEX_SCALE;
   }
 
   /**
@@ -109,6 +113,6 @@ default double skipValueRangeIndexScale()
    */
   default double skipValuePredicateIndexScale()
   {
-    return 0.08;
+    return DEFAULT_SKIP_VALUE_PREDICATE_INDEX_SCALE;
   }
 }
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java
index 9b87e1cd7b04..ceac1e55644d 100644
--- a/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java
@@ -32,10 +32,10 @@
 public class BasicMonitorSchedulerTest
 {
-  private final MonitorSchedulerConfig config = new MonitorSchedulerConfig()
+  private final DruidMonitorSchedulerConfig config = new DruidMonitorSchedulerConfig()
   {
     @Override
-    public Duration getEmitterPeriod()
+    public Duration getEmissionDuration()
     {
       return Duration.millis(5);
     }
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java
index d7ec354ff198..712fcbfbb62e 100644
--- a/processing/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java
@@ -93,7 +93,7 @@ class Monitor3 extends NoopMonitor
     ExecutorService executor = Mockito.mock(ExecutorService.class);
 
     final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler(
-        Mockito.mock(MonitorSchedulerConfig.class),
+        Mockito.mock(DruidMonitorSchedulerConfig.class),
         Mockito.mock(ServiceEmitter.class),
         ImmutableList.of(monitor1, monitor2),
         CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("monitor-scheduler-test").build(),
@@ -153,8 +153,8 @@ public Future<?> answer(InvocationOnMock invocation) throws Exception
 
     Monitor monitor = Mockito.mock(Monitor.class);
 
-    MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
-    Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
+    DruidMonitorSchedulerConfig config = Mockito.mock(DruidMonitorSchedulerConfig.class);
+    Mockito.when(config.getEmissionDuration()).thenReturn(new org.joda.time.Duration(1000L));
 
     final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler(
         config,
@@ -217,8 +217,8 @@ public Future<?> answer(InvocationOnMock invocation) throws Exception
 
     Monitor monitor = Mockito.mock(Monitor.class);
 
-    MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
-    Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
+    DruidMonitorSchedulerConfig config = Mockito.mock(DruidMonitorSchedulerConfig.class);
+    Mockito.when(config.getEmissionDuration()).thenReturn(new org.joda.time.Duration(1000L));
 
     final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler(
         config,
@@ -248,8 +248,8 @@ public void testStart_UnexpectedExceptionWhileMonitoring() throws InterruptedExc
     Mockito.when(monitor.monitor(ArgumentMatchers.any(ServiceEmitter.class)))
            .thenThrow(new RuntimeException("Test throwing exception while monitoring"));
 
-    MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
-    Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
+    DruidMonitorSchedulerConfig config = Mockito.mock(DruidMonitorSchedulerConfig.class);
+    Mockito.when(config.getEmissionDuration()).thenReturn(new org.joda.time.Duration(1000L));
 
     CountDownLatch latch = new CountDownLatch(1);
     AtomicBoolean monitorResultHolder = new AtomicBoolean(false);
@@ -310,8 +310,8 @@ public void testStart_UnexpectedExceptionWhileScheduling() throws InterruptedExc
   {
     ExecutorService executor = Mockito.mock(ExecutorService.class);
     Monitor monitor = Mockito.mock(Monitor.class);
-    MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
-    Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
+    DruidMonitorSchedulerConfig config = Mockito.mock(DruidMonitorSchedulerConfig.class);
+    Mockito.when(config.getEmissionDuration()).thenReturn(new org.joda.time.Duration(1000L));
 
     CountDownLatch latch = new CountDownLatch(1);
     Mockito.doAnswer(new Answer<Future<?>>()
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
index 40afa8349b35..9f9a67270b30 100644
--- a/processing/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
@@ -34,10 +34,10 @@ public class MonitorSchedulerTest
 
   @Test
IOException { - MonitorSchedulerConfig infiniteFlushDelayConfig = new MonitorSchedulerConfig() + DruidMonitorSchedulerConfig infiniteFlushDelayConfig = new DruidMonitorSchedulerConfig() { @Override - public Duration getEmitterPeriod() + public Duration getEmissionDuration() { return Duration.millis(Long.MAX_VALUE); } diff --git a/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java b/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java index 8d9f71372801..e19fbf755402 100644 --- a/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java +++ b/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java @@ -19,22 +19,18 @@ package org.apache.druid.query; -import com.google.common.collect.ImmutableMap; -import com.google.inject.Guice; import com.google.inject.Injector; -import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.config.Config; +import com.google.inject.ProvisionException; +import org.apache.druid.guice.JsonConfigProvider; +import org.apache.druid.guice.StartupInjectorBuilder; import org.apache.druid.utils.JvmUtils; import org.apache.druid.utils.RuntimeInfo; -import org.hamcrest.CoreMatchers; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import org.skife.config.ConfigurationObjectFactory; -import java.util.Map; import java.util.Properties; /** @@ -46,47 +42,15 @@ public class DruidProcessingConfigTest private static final long DIRECT_SIZE = BUFFER_SIZE * (3L + 2L + 1L); private static final long HEAP_SIZE = BUFFER_SIZE * 2L; - @Rule - public ExpectedException expectedException = ExpectedException.none(); - - private static Injector makeInjector(int numProcessors, long directMemorySize, long heapSize) - { - return makeInjector(numProcessors, directMemorySize, heapSize, new Properties(), null); - } - @AfterClass public static void teardown() { JvmUtils.resetTestsToDefaultRuntimeInfo(); } - private static Injector makeInjector( - int numProcessors, - long directMemorySize, - long heapSize, - Properties props, - Map replacements - ) - { - return Guice.createInjector( - binder -> { - binder.bind(RuntimeInfo.class).toInstance(new MockRuntimeInfo(numProcessors, directMemorySize, heapSize)); - binder.requestStaticInjection(JvmUtils.class); - ConfigurationObjectFactory factory = Config.createFactory(props); - DruidProcessingConfig config; - if (replacements != null) { - config = factory.buildWithReplacements( - DruidProcessingConfig.class, - replacements - ); - } else { - config = factory.build(DruidProcessingConfig.class); - } - binder.bind(ConfigurationObjectFactory.class).toInstance(factory); - binder.bind(DruidProcessingConfig.class).toInstance(config); - } - ); - } + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Test public void testDefaultsMultiProcessor() @@ -124,7 +88,7 @@ public void testDefaultsLargeDirect() DruidProcessingConfig config = injector.getInstance(DruidProcessingConfig.class); Assert.assertEquals( - DruidProcessingConfig.MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES, + DruidProcessingBufferConfig.MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES, config.intermediateComputeSizeBytes() ); } @@ -144,8 +108,7 @@ public void testReplacements() NUM_PROCESSORS, DIRECT_SIZE, HEAP_SIZE, - props, - ImmutableMap.of("base_path", "druid.processing") + props ); DruidProcessingConfig config = injector.getInstance(DruidProcessingConfig.class); 
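The rewritten injector helper in this test reflects the central migration in this patch: DruidProcessingConfig is no longer assembled by config-magic but deserialized by Jackson and bound through JsonConfigProvider, so property parsing and validation now happen when Guice provisions the instance. As a minimal sketch of what the new binding buys, assuming the makeInjector helper from this test (the test name, the property value, and the expected byte count are illustrative, not part of the patch):

  @Test
  public void testHumanReadableSizeBytes()
  {
    // hypothetical test: with Jackson binding, buffer.sizeBytes is backed by
    // HumanReadableBytes, so unit suffixes like MiB/GiB are accepted (the "3GiB"
    // value in BrokerProcessingModuleTest later in this patch relies on the same parsing)
    Properties props = new Properties();
    props.setProperty("druid.processing.buffer.sizeBytes", "500MiB");
    Injector injector = makeInjector(NUM_PROCESSORS, DIRECT_SIZE, HEAP_SIZE, props);
    DruidProcessingConfig config = injector.getInstance(DruidProcessingConfig.class);
    // 500 MiB = 500 * 1024 * 1024 bytes; a malformed value would instead surface as a
    // ProvisionException wrapping the Jackson deserialization failure, as the tests below show
    Assert.assertEquals(500 * 1024 * 1024, config.intermediateComputeSizeBytes());
  }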
@@ -164,14 +127,19 @@ public void testInvalidSizeBytes() Properties props = new Properties(); props.setProperty("druid.processing.buffer.sizeBytes", "-1"); - expectedException.expectCause(CoreMatchers.isA(IAE.class)); - Injector injector = makeInjector( NUM_PROCESSORS, DIRECT_SIZE, HEAP_SIZE, - props, - ImmutableMap.of("base_path", "druid.processing") + props + ); + Throwable t = Assert.assertThrows( + ProvisionException.class, + () -> injector.getInstance(DruidProcessingConfig.class) + ); + Assert.assertTrue( + t.getMessage() + .contains("Cannot construct instance of `org.apache.druid.java.util.common.HumanReadableBytes`, problem: Invalid format of number: -1. Negative value is not allowed.") ); } @@ -184,13 +152,35 @@ public void testSizeBytesUpperLimit() NUM_PROCESSORS, DIRECT_SIZE, HEAP_SIZE, - props, - ImmutableMap.of("base_path", "druid.processing") + props + ); + Throwable t = Assert.assertThrows( + ProvisionException.class, + () -> injector.getInstance(DruidProcessingConfig.class) ); - DruidProcessingConfig config = injector.getInstance(DruidProcessingConfig.class); - expectedException.expectMessage("druid.processing.buffer.sizeBytes must be less than 2GiB"); - config.intermediateComputeSizeBytes(); + Assert.assertTrue(t.getMessage().contains("druid.processing.buffer.sizeBytes must be less than 2GiB")); + } + + private static Injector makeInjector(int numProcessors, long directMemorySize, long heapSize) + { + return makeInjector(numProcessors, directMemorySize, heapSize, new Properties()); + } + private static Injector makeInjector( + int numProcessors, + long directMemorySize, + long heapSize, + Properties props + ) + { + Injector injector = new StartupInjectorBuilder().withProperties(props).add( + binder -> { + binder.bind(RuntimeInfo.class).toInstance(new MockRuntimeInfo(numProcessors, directMemorySize, heapSize)); + binder.requestStaticInjection(JvmUtils.class); + JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class); + } + ).build(); + return injector; } public static class MockRuntimeInfo extends RuntimeInfo diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index 5c0e5efb7ea3..19df276344ef 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -55,9 +55,9 @@ import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.BrokerParallelMergeConfig; import org.apache.druid.query.BySegmentResultValueClass; import org.apache.druid.query.CacheStrategy; -import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Queries; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContext; @@ -75,7 +75,6 @@ import org.apache.druid.query.filter.DimFilterUtils; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.QuerySegmentSpec; -import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.server.QueryResource; import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.coordination.DruidServerMetadata; @@ -124,10 +123,9 @@ public class CachingClusteredClient implements QuerySegmentWalker private final CachePopulator cachePopulator; private final CacheConfig cacheConfig; 
private final DruidHttpClientConfig httpClientConfig; - private final DruidProcessingConfig processingConfig; + private final BrokerParallelMergeConfig parallelMergeConfig; private final ForkJoinPool pool; private final QueryScheduler scheduler; - private final JoinableFactoryWrapper joinableFactoryWrapper; private final ServiceEmitter emitter; @Inject @@ -139,10 +137,9 @@ public CachingClusteredClient( CachePopulator cachePopulator, CacheConfig cacheConfig, @Client DruidHttpClientConfig httpClientConfig, - DruidProcessingConfig processingConfig, + BrokerParallelMergeConfig parallelMergeConfig, @Merging ForkJoinPool pool, QueryScheduler scheduler, - JoinableFactoryWrapper joinableFactoryWrapper, ServiceEmitter emitter ) { @@ -153,10 +150,9 @@ public CachingClusteredClient( this.cachePopulator = cachePopulator; this.cacheConfig = cacheConfig; this.httpClientConfig = httpClientConfig; - this.processingConfig = processingConfig; + this.parallelMergeConfig = parallelMergeConfig; this.pool = pool; this.scheduler = scheduler; - this.joinableFactoryWrapper = joinableFactoryWrapper; this.emitter = emitter; if (cacheConfig.isQueryCacheable(Query.GROUP_BY) && (cacheConfig.isUseCache() || cacheConfig.isPopulateCache())) { @@ -386,7 +382,7 @@ private Sequence merge(List> sequencesByInterval) { BinaryOperator mergeFn = toolChest.createMergeFn(query); final QueryContext queryContext = query.context(); - if (processingConfig.useParallelMergePool() && queryContext.getEnableParallelMerges() && mergeFn != null) { + if (parallelMergeConfig.useParallelMergePool() && queryContext.getEnableParallelMerges() && mergeFn != null) { return new ParallelMergeCombiningSequence<>( pool, sequencesByInterval, @@ -395,10 +391,10 @@ private Sequence merge(List> sequencesByInterval) queryContext.hasTimeout(), queryContext.getTimeout(), queryContext.getPriority(), - queryContext.getParallelMergeParallelism(processingConfig.getMergePoolDefaultMaxQueryParallelism()), - queryContext.getParallelMergeInitialYieldRows(processingConfig.getMergePoolTaskInitialYieldRows()), - queryContext.getParallelMergeSmallBatchRows(processingConfig.getMergePoolSmallBatchRows()), - processingConfig.getMergePoolTargetTaskRunTimeMillis(), + queryContext.getParallelMergeParallelism(parallelMergeConfig.getDefaultMaxQueryParallelism()), + queryContext.getParallelMergeInitialYieldRows(parallelMergeConfig.getInitialYieldNumRows()), + queryContext.getParallelMergeSmallBatchRows(parallelMergeConfig.getSmallBatchNumRows()), + parallelMergeConfig.getTargetRunTimeMillis(), reportMetrics -> { QueryMetrics queryMetrics = queryPlus.getQueryMetrics(); if (queryMetrics != null) { diff --git a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java index d5f18dd1787c..77572bae47a5 100644 --- a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java +++ b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java @@ -39,13 +39,14 @@ import org.apache.druid.guice.annotations.Smile; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.offheap.OffheapBufferGenerator; +import org.apache.druid.query.BrokerParallelMergeConfig; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.ExecutorServiceMonitor; import 
org.apache.druid.query.ForwardingQueryProcessingPool; import org.apache.druid.query.QueryProcessingPool; +import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.server.metrics.MetricsModule; import org.apache.druid.utils.JvmUtils; @@ -67,7 +68,9 @@ public class BrokerProcessingModule implements Module @Override public void configure(Binder binder) { - binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class); + JsonConfigProvider.bind(binder, "druid.processing.merge", BrokerParallelMergeConfig.class); + JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class); + binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class); MetricsModule.register(binder, ExecutorServiceMonitor.class); } @@ -133,14 +136,14 @@ public BlockingPool getMergeBufferPool(DruidProcessingConfig config) @Provides @ManageLifecycle - public LifecycleForkJoinPoolProvider getMergeProcessingPoolProvider(DruidProcessingConfig config) + public LifecycleForkJoinPoolProvider getMergeProcessingPoolProvider(BrokerParallelMergeConfig config) { return new LifecycleForkJoinPoolProvider( - config.getMergePoolParallelism(), + config.getParallelism(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, (t, e) -> log.error(e, "Unhandled exception in thread [%s]", t), true, - config.getMergePoolAwaitShutdownMillis() + config.getAwaitShutdownMillis() ); } diff --git a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java index b02ae366f580..56a0fd0ede6f 100644 --- a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java +++ b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java @@ -38,7 +38,6 @@ import org.apache.druid.guice.annotations.Merging; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.offheap.OffheapBufferGenerator; @@ -47,13 +46,13 @@ import org.apache.druid.query.MetricsEmittingQueryProcessingPool; import org.apache.druid.query.PrioritizedExecutorService; import org.apache.druid.query.QueryProcessingPool; +import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.server.metrics.MetricsModule; import org.apache.druid.utils.JvmUtils; import java.nio.ByteBuffer; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ForkJoinPool; /** */ @@ -64,7 +63,8 @@ public class DruidProcessingModule implements Module @Override public void configure(Binder binder) { - binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class); + JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class); + binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class); MetricsModule.register(binder, ExecutorServiceMonitor.class); } @@ -135,26 +135,6 @@ public BlockingPool getMergeBufferPool(DruidProcessingConfig config) ); } - @Provides - @ManageLifecycle - public LifecycleForkJoinPoolProvider getMergeProcessingPoolProvider(DruidProcessingConfig config) - { - return new LifecycleForkJoinPoolProvider( - config.getMergePoolParallelism(), - ForkJoinPool.defaultForkJoinWorkerThreadFactory, - (t, e) -> log.error(e, "Unhandled exception in thread [%s]", t), - true, - 
config.getMergePoolAwaitShutdownMillis() - ); - } - - @Provides - @Merging - public ForkJoinPool getMergeProcessingPool(LifecycleForkJoinPoolProvider poolProvider) - { - return poolProvider.getPool(); - } - private void verifyDirectMemory(DruidProcessingConfig config) { try { diff --git a/server/src/main/java/org/apache/druid/guice/DruidProcessingConfigModule.java b/server/src/main/java/org/apache/druid/guice/LegacyBrokerParallelMergeConfigModule.java similarity index 58% rename from server/src/main/java/org/apache/druid/guice/DruidProcessingConfigModule.java rename to server/src/main/java/org/apache/druid/guice/LegacyBrokerParallelMergeConfigModule.java index 7e83817de26a..3d44cf23f72b 100644 --- a/server/src/main/java/org/apache/druid/guice/DruidProcessingConfigModule.java +++ b/server/src/main/java/org/apache/druid/guice/LegacyBrokerParallelMergeConfigModule.java @@ -19,17 +19,24 @@ package org.apache.druid.guice; -import com.google.common.collect.ImmutableMap; import com.google.inject.Binder; import com.google.inject.Module; -import org.apache.druid.query.DruidProcessingConfig; +import org.apache.druid.query.LegacyBrokerParallelMergeConfig; -public class DruidProcessingConfigModule implements Module +/** + * Backwards compatibility for Druid 27 and older runtime.properties, keeping the deprecated config paths of + * {@link LegacyBrokerParallelMergeConfig} working in Druid 28. + * {@link org.apache.druid.query.BrokerParallelMergeConfig} has replaced these configs, and will warn when the + * deprecated paths are configured. This module should be removed in Druid 29, along with + * {@link LegacyBrokerParallelMergeConfig} and the config-magic library that makes it work. + */ +@Deprecated +public class LegacyBrokerParallelMergeConfigModule implements Module { @Override public void configure(Binder binder) { - ConfigProvider.bind(binder, DruidProcessingConfig.class, ImmutableMap.of("base_path", "druid.processing")); + ConfigProvider.bind(binder, LegacyBrokerParallelMergeConfig.class); } } diff --git a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java index 59af28a4f541..3b68289c6dd1 100644 --- a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java +++ b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java @@ -29,12 +29,12 @@ import org.apache.druid.guice.annotations.Global; import org.apache.druid.guice.annotations.Merging; import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.ExecutorServiceMonitor; import org.apache.druid.query.ForwardingQueryProcessingPool; import org.apache.druid.query.QueryProcessingPool; +import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.server.metrics.MetricsModule; import java.nio.ByteBuffer; @@ -53,7 +53,8 @@ public class RouterProcessingModule implements Module { @Override public void configure(Binder binder) { - binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class); + JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class); + binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class); MetricsModule.register(binder, ExecutorServiceMonitor.class); } @@ -61,8 +62,8 @@ public void configure(Binder binder) @ManageLifecycle public
QueryProcessingPool getProcessingExecutorPool(DruidProcessingConfig config) { - if (config.getNumThreadsConfigured() != ExecutorServiceConfig.DEFAULT_NUM_THREADS) { - log.error("numThreads[%d] configured, that is ignored on Router", config.getNumThreadsConfigured()); + if (config.isNumThreadsConfigured()) { + log.warn("numThreads[%d] configured, that is ignored on Router", config.getNumThreads()); } return new ForwardingQueryProcessingPool(Execs.dummy()); } @@ -80,10 +81,10 @@ public NonBlockingPool getIntermediateResultsPool() @Merging public BlockingPool getMergeBufferPool(DruidProcessingConfig config) { - if (config.getNumMergeBuffersConfigured() != DruidProcessingConfig.DEFAULT_NUM_MERGE_BUFFERS) { - log.error( + if (config.isNumMergeBuffersConfigured()) { + log.warn( "numMergeBuffers[%d] configured, that is ignored on Router", - config.getNumMergeBuffersConfigured() + config.getNumMergeBuffers() ); } return DummyBlockingPool.instance(); diff --git a/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java b/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java index 842abd0e5347..cc4e0b1e33a4 100644 --- a/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java +++ b/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java @@ -56,7 +56,6 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.server", DruidServerConfig.class); JsonConfigProvider.bind(binder, "druid.segmentCache", SegmentLoaderConfig.class); bindLocationSelectorStrategy(binder); - binder.bind(ServerTypeConfig.class).toProvider(Providers.of(null)); binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class); } diff --git a/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java b/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java index de4f3abbc8d2..d27fdf75b1e6 100644 --- a/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java +++ b/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java @@ -26,7 +26,6 @@ import org.apache.druid.guice.AnnouncerModule; import org.apache.druid.guice.CoordinatorDiscoveryModule; import org.apache.druid.guice.DruidInjectorBuilder; -import org.apache.druid.guice.DruidProcessingConfigModule; import org.apache.druid.guice.DruidSecondaryModule; import org.apache.druid.guice.ExpressionModule; import org.apache.druid.guice.ExtensionsModule; @@ -112,7 +111,6 @@ public CoreInjectorBuilder forServer() new MetricsModule(), new SegmentWriteOutMediumModule(), new ServerModule(), - new DruidProcessingConfigModule(), new StorageNodeModule(), new JettyServerModule(), new ExpressionModule(), diff --git a/server/src/main/java/org/apache/druid/query/BrokerParallelMergeConfig.java b/server/src/main/java/org/apache/druid/query/BrokerParallelMergeConfig.java new file mode 100644 index 000000000000..1f8d25a10c2d --- /dev/null +++ b/server/src/main/java/org/apache/druid/query/BrokerParallelMergeConfig.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; +import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.utils.JvmUtils; + +import javax.annotation.Nullable; + +public class BrokerParallelMergeConfig +{ + private static final Logger LOG = new Logger(BrokerParallelMergeConfig.class); + public static final int DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS = 60_000; + + @JsonProperty + private final boolean useParallelMergePool; + @JsonProperty + private final int parallelism; + @JsonProperty + private final long awaitShutdownMillis; + @JsonProperty + private final int defaultMaxQueryParallelism; + @JsonProperty + private final int targetRunTimeMillis; + @JsonProperty + private final int initialYieldNumRows; + @JsonProperty + private final int smallBatchNumRows; + + @JsonCreator + public BrokerParallelMergeConfig( + @JsonProperty("useParallelMergePool") @Nullable Boolean useParallelMergePool, + @JsonProperty("parallelism") @Nullable Integer parallelism, + @JsonProperty("awaitShutdownMillis") @Nullable Long awaitShutdownMillis, + @JsonProperty("defaultMaxQueryParallelism") @Nullable Integer defaultMaxQueryParallelism, + @JsonProperty("targetRunTimeMillis") @Nullable Integer targetRunTimeMillis, + @JsonProperty("initialYieldNumRows") @Nullable Integer initialYieldNumRows, + @JsonProperty("smallBatchNumRows") @Nullable Integer smallBatchNumRows, + @JacksonInject LegacyBrokerParallelMergeConfig oldConfig + ) + { + if (parallelism == null) { + if (oldConfig == null || oldConfig.getMergePoolParallelism() == null) { + // assume 2 hyper-threads per core, so by default this value is roughly the number of + // physical cores * 1.5 + this.parallelism = (int) Math.ceil(JvmUtils.getRuntimeInfo().getAvailableProcessors() * 0.75); + } else { + warnDeprecated( + "druid.processing.merge.pool.parallelism", + "druid.processing.merge.parallelism" + ); + this.parallelism = oldConfig.getMergePoolParallelism(); + } + } else { + this.parallelism = parallelism; + } + + // need at least 3 threads to do a 2-layer merge + if (this.parallelism > 2) { + this.useParallelMergePool = useParallelMergePool == null || useParallelMergePool; + } else { + if (useParallelMergePool == null || useParallelMergePool) { + LOG.debug( + "Parallel merge pool is enabled, but there are not enough cores to enable parallel merges: %s", + this.parallelism + ); + } + this.useParallelMergePool = false; + } + + if (awaitShutdownMillis == null) { + if (oldConfig == null || oldConfig.getMergePoolAwaitShutdownMillis() == null) { + this.awaitShutdownMillis = DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS; + } else { + warnDeprecated( + "druid.processing.merge.pool.awaitShutdownMillis", + "druid.processing.merge.awaitShutdownMillis" + ); + this.awaitShutdownMillis = oldConfig.getMergePoolAwaitShutdownMillis(); + } + } else {
this.awaitShutdownMillis = awaitShutdownMillis; + } + + if (defaultMaxQueryParallelism == null) { + if (oldConfig == null || oldConfig.getMergePoolDefaultMaxQueryParallelism() == null) { + this.defaultMaxQueryParallelism = (int) Math.max(JvmUtils.getRuntimeInfo().getAvailableProcessors() * 0.5, 1); + } else { + warnDeprecated( + "druid.processing.merge.pool.defaultMaxQueryParallelism", + "druid.processing.merge.defaultMaxQueryParallelism" + ); + this.defaultMaxQueryParallelism = oldConfig.getMergePoolDefaultMaxQueryParallelism(); + } + } else { + this.defaultMaxQueryParallelism = defaultMaxQueryParallelism; + } + + if (targetRunTimeMillis == null) { + if (oldConfig == null || oldConfig.getMergePoolTargetTaskRunTimeMillis() == null) { + this.targetRunTimeMillis = ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS; + } else { + warnDeprecated( + "druid.processing.merge.task.targetRunTimeMillis", + "druid.processing.merge.targetRunTimeMillis" + ); + this.targetRunTimeMillis = oldConfig.getMergePoolTargetTaskRunTimeMillis(); + } + } else { + this.targetRunTimeMillis = targetRunTimeMillis; + } + + if (initialYieldNumRows == null) { + if (oldConfig == null || oldConfig.getMergePoolTaskInitialYieldRows() == null) { + this.initialYieldNumRows = ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS; + } else { + warnDeprecated( + "druid.processing.merge.task.initialYieldNumRows", + "druid.processing.merge.initialYieldNumRows" + ); + this.initialYieldNumRows = oldConfig.getMergePoolTaskInitialYieldRows(); + } + } else { + this.initialYieldNumRows = initialYieldNumRows; + } + + if (smallBatchNumRows == null) { + if (oldConfig == null || oldConfig.getMergePoolSmallBatchRows() == null) { + this.smallBatchNumRows = ParallelMergeCombiningSequence.DEFAULT_TASK_SMALL_BATCH_NUM_ROWS; + } else { + warnDeprecated( + "druid.processing.merge.task.smallBatchNumRows", + "druid.processing.merge.smallBatchNumRows" + ); + this.smallBatchNumRows = oldConfig.getMergePoolSmallBatchRows(); + } + } else { + this.smallBatchNumRows = smallBatchNumRows; + } + } + + @VisibleForTesting + public BrokerParallelMergeConfig() + { + this(null, null, null, null, null, null, null, null); + } + + public boolean useParallelMergePool() + { + return useParallelMergePool; + } + + public int getParallelism() + { + return parallelism; + } + + public long getAwaitShutdownMillis() + { + return awaitShutdownMillis; + } + + public int getDefaultMaxQueryParallelism() + { + return defaultMaxQueryParallelism; + } + + public int getTargetRunTimeMillis() + { + return targetRunTimeMillis; + } + + public int getInitialYieldNumRows() + { + return initialYieldNumRows; + } + + public int getSmallBatchNumRows() + { + return smallBatchNumRows; + } + + private static void warnDeprecated(String oldPath, String newPath) + { + LOG.warn( + "Using deprecated config [%s] which has been replaced by [%s]. This path is deprecated and will be " + + "removed in a future release; please transition to using [%s]", + oldPath, + newPath, + newPath + ); + } +} diff --git a/server/src/main/java/org/apache/druid/query/LegacyBrokerParallelMergeConfig.java b/server/src/main/java/org/apache/druid/query/LegacyBrokerParallelMergeConfig.java new file mode 100644 index 000000000000..25b11ab63d17 --- /dev/null +++ b/server/src/main/java/org/apache/druid/query/LegacyBrokerParallelMergeConfig.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import org.skife.config.Config; + +import javax.annotation.Nullable; + +/** + * Backwards compatibility for Druid 27 and older runtime.properties configs, replaced by + * {@link BrokerParallelMergeConfig} in Druid 28. This config should be removed in Druid 29. + */ +@Deprecated +public abstract class LegacyBrokerParallelMergeConfig +{ + @Nullable + @Config(value = "druid.processing.merge.pool.parallelism") + public Integer getMergePoolParallelism() + { + return null; + } + + @Nullable + @Config(value = "druid.processing.merge.pool.awaitShutdownMillis") + public Long getMergePoolAwaitShutdownMillis() + { + return null; + } + + @Nullable + @Config(value = "druid.processing.merge.pool.defaultMaxQueryParallelism") + public Integer getMergePoolDefaultMaxQueryParallelism() + { + return null; + } + + @Nullable + @Config(value = "druid.processing.merge.task.targetRunTimeMillis") + public Integer getMergePoolTargetTaskRunTimeMillis() + { + return null; + } + + @Nullable + @Config(value = "druid.processing.merge.task.initialYieldNumRows") + public Integer getMergePoolTaskInitialYieldRows() + { + return null; + } + + @Nullable + @Config(value = "druid.processing.merge.task.smallBatchNumRows") + public Integer getMergePoolSmallBatchRows() + { + return null; + } +} diff --git a/server/src/main/java/org/apache/druid/segment/realtime/DbSegmentPublisherConfig.java b/server/src/main/java/org/apache/druid/segment/realtime/DbSegmentPublisherConfig.java deleted file mode 100644 index 1f6bc47dda42..000000000000 --- a/server/src/main/java/org/apache/druid/segment/realtime/DbSegmentPublisherConfig.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.segment.realtime; - -import org.skife.config.Config; - -public abstract class DbSegmentPublisherConfig -{ - @Config("druid.metadata.storage.tables.segments") - public abstract String getSegmentTable(); -} diff --git a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java index 46c0fc90d89d..d486ae4fb8e9 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java +++ b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java @@ -41,6 +41,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.BasicMonitorScheduler; import org.apache.druid.java.util.metrics.ClockDriftSafeMonitorScheduler; +import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.java.util.metrics.JvmCpuMonitor; import org.apache.druid.java.util.metrics.JvmMonitor; import org.apache.druid.java.util.metrics.JvmThreadsMonitor; diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java index f77baa481200..89cb2d76f812 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java @@ -38,7 +38,7 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.io.Closer; -import org.apache.druid.query.DruidProcessingConfig; +import org.apache.druid.query.BrokerParallelMergeConfig; import org.apache.druid.query.Druids; import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; @@ -47,7 +47,6 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.planning.DataSourceAnalysis; -import org.apache.druid.segment.join.JoinableFactoryWrapperTest; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.metrics.NoopServiceEmitter; @@ -318,16 +317,23 @@ public long getMaxQueuedBytes() return 0L; } }, - new DruidProcessingConfig() + new BrokerParallelMergeConfig() { @Override - public String getFormatString() + public boolean useParallelMergePool() { - return null; + return true; + } + + @Override + public int getParallelism() + { + // fixed so same behavior across all test environments + return 4; } @Override - public int getMergePoolParallelism() + public int getDefaultMaxQueryParallelism() { // fixed so same behavior across all test environments return 4; @@ -335,7 +341,6 @@ public int getMergePoolParallelism() }, ForkJoinPool.commonPool(), QueryStackTests.DEFAULT_NOOP_SCHEDULER, - JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER, new NoopServiceEmitter() ); } diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java index 95fce2060ecd..6270f3685406 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java @@ -34,8 +34,8 @@ import org.apache.druid.java.util.common.guava.Sequence; import 
org.apache.druid.java.util.common.guava.TestSequence; import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.BrokerParallelMergeConfig; import org.apache.druid.query.DataSource; -import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; @@ -49,7 +49,6 @@ import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; import org.apache.druid.query.spec.QuerySegmentSpec; import org.apache.druid.segment.TestHelper; -import org.apache.druid.segment.join.JoinableFactoryWrapperTest; import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.coordination.ServerManagerTest; import org.apache.druid.server.coordination.ServerType; @@ -136,10 +135,9 @@ public void testGetQueryRunnerForSegments_singleIntervalLargeSegments() Mockito.mock(CachePopulator.class), new CacheConfig(), Mockito.mock(DruidHttpClientConfig.class), - Mockito.mock(DruidProcessingConfig.class), + Mockito.mock(BrokerParallelMergeConfig.class), ForkJoinPool.commonPool(), queryScheduler, - JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER, new NoopServiceEmitter() ); diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index e083e074acdc..0ebd441360ea 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -67,8 +67,8 @@ import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.nary.TrinaryFn; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.BrokerParallelMergeConfig; import org.apache.druid.query.BySegmentResultValueClass; -import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Druids; import org.apache.druid.query.FinalizeResultsQueryRunner; import org.apache.druid.query.FluentQueryRunner; @@ -121,7 +121,6 @@ import org.apache.druid.query.topn.TopNQueryQueryToolChest; import org.apache.druid.query.topn.TopNResultValue; import org.apache.druid.segment.TestHelper; -import org.apache.druid.segment.join.JoinableFactoryWrapperTest; import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.ServerTestHelper; import org.apache.druid.server.coordination.ServerType; @@ -2829,19 +2828,26 @@ public long getMaxQueuedBytes() return 0L; } }, - new DruidProcessingConfig() + new BrokerParallelMergeConfig() { @Override - public String getFormatString() + public boolean useParallelMergePool() { - return null; + return true; + } + + @Override + public int getParallelism() + { + // fixed so same behavior across all test environments + return 1; } @Override - public int getMergePoolParallelism() + public int getDefaultMaxQueryParallelism() { // fixed so same behavior across all test environments - return 4; + return 1; } }, ForkJoinPool.commonPool(), @@ -2851,7 +2857,6 @@ public int getMergePoolParallelism() NoQueryLaningStrategy.INSTANCE, new ServerConfig() ), - JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER, new NoopServiceEmitter() ); } diff --git a/server/src/test/java/org/apache/druid/guice/BrokerProcessingModuleTest.java b/server/src/test/java/org/apache/druid/guice/BrokerProcessingModuleTest.java index 3ce5db12a333..7fca81aaea30 100644 --- a/server/src/test/java/org/apache/druid/guice/BrokerProcessingModuleTest.java +++ 
b/server/src/test/java/org/apache/druid/guice/BrokerProcessingModuleTest.java @@ -28,7 +28,10 @@ import org.apache.druid.client.cache.CachePopulator; import org.apache.druid.client.cache.CachePopulatorStats; import org.apache.druid.initialization.Initialization; +import org.apache.druid.java.util.common.config.Config; +import org.apache.druid.query.BrokerParallelMergeConfig; import org.apache.druid.query.DruidProcessingConfig; +import org.apache.druid.query.LegacyBrokerParallelMergeConfig; import org.apache.druid.utils.JvmUtils; import org.junit.Assert; import org.junit.Assume; @@ -37,14 +40,14 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import org.skife.config.ConfigurationObjectFactory; + +import java.util.Properties; @RunWith(MockitoJUnitRunner.class) public class BrokerProcessingModuleTest { - private static final boolean INJECT_SERVER_TYPE_CONFIG = true; - @Mock - private DruidProcessingConfig druidProcessingConfig; private Injector injector; private BrokerProcessingModule target; @Mock @@ -56,12 +59,13 @@ public class BrokerProcessingModuleTest public void setUp() { target = new BrokerProcessingModule(); - injector = makeInjector(INJECT_SERVER_TYPE_CONFIG); + injector = makeInjector(new Properties()); } @Test public void testIntermediateResultsPool() { + DruidProcessingConfig druidProcessingConfig = injector.getInstance(DruidProcessingConfig.class); target.getIntermediateResultsPool(druidProcessingConfig); } @@ -69,23 +73,30 @@ public void testIntermediateResultsPool() @Test public void testMergeBufferPool() { + DruidProcessingConfig druidProcessingConfig = injector.getInstance(DruidProcessingConfig.class); target.getMergeBufferPool(druidProcessingConfig); } @Test public void testMergeProcessingPool() { - DruidProcessingConfig config = new DruidProcessingConfig() - { - @Override - public String getFormatString() - { - return "processing-test-%s"; - } - }; - DruidProcessingModule module = new DruidProcessingModule(); + BrokerParallelMergeConfig config = injector.getInstance(BrokerParallelMergeConfig.class); + BrokerProcessingModule module = new BrokerProcessingModule(); module.getMergeProcessingPoolProvider(config); - config.getNumInitalBuffersForIntermediatePool(); + } + + @Test + public void testMergeProcessingPoolLegacyConfigs() + { + Properties props = new Properties(); + props.put("druid.processing.merge.pool.parallelism", "10"); + props.put("druid.processing.merge.pool.defaultMaxQueryParallelism", "10"); + props.put("druid.processing.merge.task.targetRunTimeMillis", "1000"); + Injector gadget = makeInjector(props); + BrokerParallelMergeConfig config = gadget.getInstance(BrokerParallelMergeConfig.class); + Assert.assertEquals(10, config.getParallelism()); + Assert.assertEquals(10, config.getDefaultMaxQueryParallelism()); + Assert.assertEquals(1000, config.getTargetRunTimeMillis()); } @Test @@ -93,7 +104,6 @@ public void testCachePopulatorAsSingleton() { CachePopulator cachePopulator = injector.getInstance(CachePopulator.class); Assert.assertNotNull(cachePopulator); - } @Test(expected = ProvisionException.class) @@ -107,43 +117,40 @@ public void testMemoryCheckThrowsException() catch (UnsupportedOperationException e) { Assume.assumeNoException(e); } + Properties props = new Properties(); + props.setProperty("druid.processing.buffer.sizeBytes", "3GiB"); + Injector injector1 = makeInjector(props); + DruidProcessingConfig processingBufferConfig = injector1.getInstance(DruidProcessingConfig.class); BrokerProcessingModule 
module = new BrokerProcessingModule(); - module.getMergeBufferPool(new DruidProcessingConfig() - { - @Override - public String getFormatString() - { - return "test"; - } - - @Override - public int intermediateComputeSizeBytes() - { - return Integer.MAX_VALUE; - } - }); + module.getMergeBufferPool(processingBufferConfig); } - private Injector makeInjector(boolean withServerTypeConfig) + private Injector makeInjector(Properties props) { - return Initialization.makeInjectorWithModules( - GuiceInjectors.makeStartupInjector(), (ImmutableList.of(Modules.override( - (binder) -> { - binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test"); - binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); - binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1); - binder.bind(DruidProcessingConfig.class).toInstance(druidProcessingConfig); - }, - target - ).with( - (binder) -> { + + Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), + ImmutableList.of( + Modules.override( + (binder) -> { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); + binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1); + binder.bind(Properties.class).toInstance(props); + ConfigurationObjectFactory factory = Config.createFactory(props); + LegacyBrokerParallelMergeConfig legacyConfig = factory.build(LegacyBrokerParallelMergeConfig.class); + binder.bind(ConfigurationObjectFactory.class).toInstance(factory); + binder.bind(LegacyBrokerParallelMergeConfig.class).toInstance(legacyConfig); + }, + target + ).with((binder) -> { binder.bind(CachePopulatorStats.class).toInstance(cachePopulatorStats); binder.bind(CacheConfig.class).toInstance(cacheConfig); - } + }) ) - ))); + ); + return injector; } - } diff --git a/server/src/test/java/org/apache/druid/guice/QueryableModuleTest.java b/server/src/test/java/org/apache/druid/guice/QueryableModuleTest.java index b51f2c14853d..8fad8924bf88 100644 --- a/server/src/test/java/org/apache/druid/guice/QueryableModuleTest.java +++ b/server/src/test/java/org/apache/druid/guice/QueryableModuleTest.java @@ -77,7 +77,7 @@ private Injector makeInjector(Properties properties) new JacksonModule(), new ConfigModule(), new QueryRunnerFactoryModule(), - new DruidProcessingConfigModule(), + new LegacyBrokerParallelMergeConfigModule(), new BrokerProcessingModule(), new LifecycleModule(), binder -> binder.bind(ServiceEmitter.class).to(NoopServiceEmitter.class), diff --git a/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java b/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java index d07bfdef345b..308427d40fa4 100644 --- a/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java +++ b/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java @@ -49,7 +49,6 @@ import org.apache.druid.segment.generator.GeneratorBasicSchemas; import org.apache.druid.segment.generator.GeneratorSchemaInfo; import org.apache.druid.segment.generator.SegmentGenerator; -import org.apache.druid.segment.join.JoinableFactoryWrapperTest; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; @@ -108,7 +107,6 @@ protected QueryRunnerBasedOnClusteredClientTestBase() { conglomerate = 
QueryStackTests.createQueryRunnerFactoryConglomerate( CLOSER, - USE_PARALLEL_MERGE_POOL_CONFIGURED, () -> TopNQueryConfig.DEFAULT_MIN_TOPN_THRESHOLD ); @@ -142,13 +140,9 @@ public void setupTestBase() new ForegroundCachePopulator(objectMapper, new CachePopulatorStats(), 0), new CacheConfig(), new DruidHttpClientConfig(), - QueryStackTests.getProcessingConfig( - USE_PARALLEL_MERGE_POOL_CONFIGURED, - DruidProcessingConfig.DEFAULT_NUM_MERGE_BUFFERS - ), + QueryStackTests.getParallelMergeConfig(USE_PARALLEL_MERGE_POOL_CONFIGURED), ForkJoinPool.commonPool(), QueryStackTests.DEFAULT_NOOP_SCHEDULER, - JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER, new NoopServiceEmitter() ); servers = new ArrayList<>(); diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java index 1c23edf3b9a8..69305adc12db 100644 --- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java +++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java @@ -24,6 +24,7 @@ import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.BrokerParallelMergeConfig; import org.apache.druid.query.DataSource; import org.apache.druid.query.DefaultGenericQueryMetricsFactory; import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; @@ -100,6 +101,9 @@ public class QueryStackTests NoQueryLaningStrategy.INSTANCE, new ServerConfig() ); + + public static final int DEFAULT_NUM_MERGE_BUFFERS = -1; + private static final ServiceEmitter EMITTER = new NoopServiceEmitter(); private static final int COMPUTE_BUFFER_SIZE = 10 * 1024 * 1024; @@ -187,10 +191,19 @@ public static LocalQuerySegmentWalker createLocalQuerySegmentWalker( ); } - public static DruidProcessingConfig getProcessingConfig( - boolean useParallelMergePoolConfigured, - final int mergeBuffers + public static BrokerParallelMergeConfig getParallelMergeConfig( + boolean useParallelMergePoolConfigured ) + { + return new BrokerParallelMergeConfig() { + @Override + public boolean useParallelMergePool() + { + return useParallelMergePoolConfigured; + } + }; + } + public static DruidProcessingConfig getProcessingConfig(final int mergeBuffers) { return new DruidProcessingConfig() { @@ -221,12 +234,6 @@ public int getNumMergeBuffers() } return mergeBuffers; } - - @Override - public boolean useParallelMergePoolConfigured() - { - return useParallelMergePoolConfigured; - } }; } @@ -235,28 +242,18 @@ public boolean useParallelMergePoolConfigured() */ public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(final Closer closer) { - return createQueryRunnerFactoryConglomerate(closer, true, () -> TopNQueryConfig.DEFAULT_MIN_TOPN_THRESHOLD); - } - - public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate( - final Closer closer, - final Supplier minTopNThresholdSupplier - ) - { - return createQueryRunnerFactoryConglomerate(closer, true, minTopNThresholdSupplier); + return createQueryRunnerFactoryConglomerate(closer, () -> TopNQueryConfig.DEFAULT_MIN_TOPN_THRESHOLD); } public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate( final Closer closer, - final boolean useParallelMergePoolConfigured, final Supplier minTopNThresholdSupplier ) { return createQueryRunnerFactoryConglomerate( closer, getProcessingConfig( - useParallelMergePoolConfigured, - 
DruidProcessingConfig.DEFAULT_NUM_MERGE_BUFFERS + DEFAULT_NUM_MERGE_BUFFERS ), minTopNThresholdSupplier ); diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java index 6bbcdf7f3a70..dd9c3c3c0b59 100644 --- a/services/src/main/java/org/apache/druid/cli/CliBroker.java +++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java @@ -45,6 +45,7 @@ import org.apache.druid.guice.JoinableFactoryModule; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; +import org.apache.druid.guice.LegacyBrokerParallelMergeConfigModule; import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.QueryRunnerFactoryModule; @@ -108,6 +109,7 @@ protected Set getNodeRoles(Properties properties) protected List getModules() { return ImmutableList.of( + new LegacyBrokerParallelMergeConfigModule(), new BrokerProcessingModule(), new QueryableModule(), new QueryRunnerFactoryModule(), diff --git a/services/src/main/java/org/apache/druid/cli/DumpSegment.java b/services/src/main/java/org/apache/druid/cli/DumpSegment.java index 566f89d5fdf3..afbd58ab9546 100644 --- a/services/src/main/java/org/apache/druid/cli/DumpSegment.java +++ b/services/src/main/java/org/apache/druid/cli/DumpSegment.java @@ -32,7 +32,6 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.google.inject.Binder; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; @@ -55,7 +54,6 @@ import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.DirectQueryProcessingPool; -import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; @@ -81,7 +79,6 @@ import org.apache.druid.segment.SimpleAscendingOffset; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.BaseColumn; -import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnIndexSupplier; import org.apache.druid.segment.column.ColumnType; @@ -733,39 +730,10 @@ protected List getModules() new DruidProcessingModule(), new QueryableModule(), new QueryRunnerFactoryModule(), - new Module() - { - @Override - public void configure(Binder binder) - { - binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/tool"); - binder.bindConstant().annotatedWith(Names.named("servicePort")).to(9999); - binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1); - binder.bind(DruidProcessingConfig.class).toInstance( - new DruidProcessingConfig() - { - @Override - public String getFormatString() - { - return "processing-%s"; - } - - @Override - public int intermediateComputeSizeBytes() - { - return 100 * 1024 * 1024; - } - - @Override - public int getNumThreads() - { - return 1; - } - - } - ); - binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class); - } + binder -> { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/tool"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(9999); + binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1); } ); } diff --git 
a/services/src/main/java/org/apache/druid/cli/ValidateSegments.java b/services/src/main/java/org/apache/druid/cli/ValidateSegments.java index 925d7c81a801..f9e2ce627b48 100644 --- a/services/src/main/java/org/apache/druid/cli/ValidateSegments.java +++ b/services/src/main/java/org/apache/druid/cli/ValidateSegments.java @@ -23,7 +23,6 @@ import com.github.rvesse.airline.annotations.Command; import com.github.rvesse.airline.annotations.restrictions.Required; import com.google.common.collect.ImmutableList; -import com.google.inject.Binder; import com.google.inject.Injector; import com.google.inject.Module; import com.google.inject.name.Names; @@ -32,9 +31,7 @@ import org.apache.druid.guice.QueryableModule; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.segment.IndexIO; -import org.apache.druid.segment.column.ColumnConfig; import java.io.File; import java.util.List; @@ -85,39 +82,10 @@ protected List getModules() new DruidProcessingModule(), new QueryableModule(), new QueryRunnerFactoryModule(), - new Module() - { - @Override - public void configure(Binder binder) - { - binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/tool"); - binder.bindConstant().annotatedWith(Names.named("servicePort")).to(9999); - binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1); - binder.bind(DruidProcessingConfig.class).toInstance( - new DruidProcessingConfig() - { - @Override - public String getFormatString() - { - return "processing-%s"; - } - - @Override - public int intermediateComputeSizeBytes() - { - return 100 * 1024 * 1024; - } - - @Override - public int getNumThreads() - { - return 1; - } - - } - ); - binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class); - } + binder -> { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/tool"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(9999); + binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1); } ); } diff --git a/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java b/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java index 10027ae73b50..fb440311a6e0 100644 --- a/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java +++ b/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java @@ -25,12 +25,15 @@ import com.google.common.collect.Lists; import com.google.inject.Injector; import com.google.inject.Key; +import com.google.inject.name.Names; import org.apache.druid.collections.bitmap.BitmapFactory; import org.apache.druid.collections.bitmap.ImmutableBitmap; import org.apache.druid.collections.bitmap.RoaringBitmapFactory; import org.apache.druid.common.config.NullHandling; import org.apache.druid.guice.NestedDataModule; +import org.apache.druid.guice.StartupInjectorBuilder; import org.apache.druid.guice.annotations.Json; +import org.apache.druid.initialization.ServerInjectorBuilder; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; @@ -48,6 +51,7 @@ import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnIndexSupplier; 
import org.apache.druid.segment.index.semantic.DictionaryEncodedStringValueIndex; @@ -215,6 +219,22 @@ public void testDumpNestedColumnPath() throws Exception } } + @Test + public void testGetModules() + { + DumpSegment dumpSegment = new DumpSegment(); + Injector injector = ServerInjectorBuilder.makeServerInjector( + new StartupInjectorBuilder().forServer().build(), + Collections.emptySet(), + dumpSegment.getModules() + ); + Assert.assertNotNull(injector.getInstance(ColumnConfig.class)); + Assert.assertEquals("druid/tool", injector.getInstance(Key.get(String.class, Names.named("serviceName")))); + Assert.assertEquals(9999, (int) injector.getInstance(Key.get(Integer.class, Names.named("servicePort")))); + Assert.assertEquals(-1, (int) injector.getInstance(Key.get(Integer.class, Names.named("tlsServicePort")))); + } + + public static List createSegments( AggregationTestHelper helper, TemporaryFolder tempFolder, diff --git a/services/src/test/java/org/apache/druid/cli/ValidateSegmentsTest.java b/services/src/test/java/org/apache/druid/cli/ValidateSegmentsTest.java new file mode 100644 index 000000000000..03ce574196c8 --- /dev/null +++ b/services/src/test/java/org/apache/druid/cli/ValidateSegmentsTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
diff --git a/services/src/test/java/org/apache/druid/cli/ValidateSegmentsTest.java b/services/src/test/java/org/apache/druid/cli/ValidateSegmentsTest.java
new file mode 100644
index 000000000000..03ce574196c8
--- /dev/null
+++ b/services/src/test/java/org/apache/druid/cli/ValidateSegmentsTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.cli;
+
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.name.Names;
+import org.apache.druid.data.input.ResourceInputSource;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.guice.StartupInjectorBuilder;
+import org.apache.druid.initialization.ServerInjectorBuilder;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.query.NestedDataTestUtils;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.segment.IndexBuilder;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+public class ValidateSegmentsTest extends InitializedNullHandlingTest
+{
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Test
+  public void testValidateSegments() throws IOException
+  {
+
+    JsonInputFormat inputFormat = new JsonInputFormat(
+        JSONPathSpec.DEFAULT,
+        null,
+        null,
+        null,
+        null
+    );
+    IndexBuilder bob = IndexBuilder.create()
+        .tmpDir(temporaryFolder.newFolder())
+        .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+        .schema(
+            new IncrementalIndexSchema.Builder()
+                .withTimestampSpec(NestedDataTestUtils.AUTO_SCHEMA.getTimestampSpec())
+                .withDimensionsSpec(NestedDataTestUtils.AUTO_SCHEMA.getDimensionsSpec())
+                .withMetrics(
+                    new CountAggregatorFactory("cnt")
+                )
+                .withRollup(false)
+                .build()
+        )
+        .inputSource(
+            ResourceInputSource.of(
+                NestedDataTestUtils.class.getClassLoader(),
+                NestedDataTestUtils.ALL_TYPES_TEST_DATA_FILE
+            )
+        )
+        .inputFormat(inputFormat)
+        .inputTmpDir(temporaryFolder.newFolder());
+
+    final File segment1 = bob.buildMMappedIndexFile();
+    final File segment2 = bob.buildMMappedIndexFile();
+    final Injector injector = Mockito.mock(Injector.class);
+    Mockito.when(injector.getInstance(IndexIO.class)).thenReturn(bob.getIndexIO());
+    ValidateSegments validator = new ValidateSegments() {
+      @Override
+      public Injector makeInjector()
+      {
+        return injector;
+      }
+    };
+    validator.directories = Arrays.asList(segment1.getAbsolutePath(), segment2.getAbsolutePath());
+    // if this doesn't pass, it throws a runtime exception, which would fail the test
+    validator.run();
+  }
+
+  @Test
+  public void testGetModules()
+  {
+    ValidateSegments validator = new ValidateSegments();
+    Injector injector = ServerInjectorBuilder.makeServerInjector(
+        new StartupInjectorBuilder().forServer().build(),
+        Collections.emptySet(),
+        validator.getModules()
+    );
+    Assert.assertNotNull(injector.getInstance(ColumnConfig.class));
+    Assert.assertEquals("druid/tool", injector.getInstance(Key.get(String.class, Names.named("serviceName"))));
+    Assert.assertEquals(9999, (int) injector.getInstance(Key.get(Integer.class, Names.named("servicePort"))));
+    Assert.assertEquals(-1, (int) injector.getInstance(Key.get(Integer.class, Names.named("tlsServicePort"))));
+  }
+}
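ValidateSegmentsTest stubs the command's makeInjector() seam rather than bootstrapping a full Guice graph. A generic sketch of that Mockito pattern (illustrative, not Druid code), assuming the object under test looks its dependencies up via Injector.getInstance:

import com.google.inject.Injector;
import org.mockito.Mockito;

public class MockInjectorSketch
{
  // Returns an Injector that hands back one canned instance and Mockito
  // defaults for everything else, so a CLI command's run() can execute
  // without any real modules being installed.
  public static <T> Injector injectorReturning(Class<T> type, T instance)
  {
    Injector injector = Mockito.mock(Injector.class);
    Mockito.when(injector.getInstance(type)).thenReturn(instance);
    return injector;
  }
}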
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java
index 0881a969fc68..3467d71bb627 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java
@@ -219,7 +219,7 @@ public QueryRunnerFactoryConglomerate createCongolmerate(
     }
     else {
       return QueryStackTests.createQueryRunnerFactoryConglomerate(
          resourceCloser,
-         QueryStackTests.getProcessingConfig(true, builder.mergeBufferCount)
+         QueryStackTests.getProcessingConfig(builder.mergeBufferCount)
      );
    }
  }
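The last hunk drops the boolean parallel-merge toggle from QueryStackTests.getProcessingConfig, the pool settings having moved to a dedicated broker-side config. For branches that still carry two-arg call sites, a hypothetical bridge overload (not part of this patch; return type assumed) could keep them compiling during the migration:

// Hypothetical shim, not in the patch: forwards the old two-arg call shape
// to the new single-arg method; the ignored flag is now configured through
// the broker's parallel-merge config instead.
@Deprecated
public static DruidProcessingConfig getProcessingConfig(boolean ignoredUseParallelMergePool, int mergeBufferCount)
{
  return getProcessingConfig(mergeBufferCount);
}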