Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deprecate config-magic in favor of json configuration stuff #14695

Merged
merged 9 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.BrokerParallelMergeConfig;
import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DruidProcessingConfig;
Expand Down Expand Up @@ -103,7 +104,6 @@
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.generator.SegmentGenerator;
import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.metrics.NoopServiceEmitter;
Expand Down Expand Up @@ -258,12 +258,6 @@ public int getNumThreads()
{
return numProcessingThreads;
}

@Override
public boolean useParallelMergePool()
{
return true;
}
};

conglomerate = new DefaultQueryRunnerFactoryConglomerate(
Expand Down Expand Up @@ -339,10 +333,15 @@ public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest
new ForegroundCachePopulator(JSON_MAPPER, new CachePopulatorStats(), 0),
new CacheConfig(),
new DruidHttpClientConfig(),
processingConfig,
new BrokerParallelMergeConfig() {
@Override
public boolean useParallelMergePool()
{
return true;
}
},
forkJoinPool,
QueryStackTests.DEFAULT_NOOP_SCHEDULER,
JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
new NoopServiceEmitter()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,6 @@ public int getNumThreads()
{
return 1;
}

@Override
public boolean useParallelMergePoolConfigured()
{
return true;
}

@Override
public String getFormatString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,7 @@ public int getNumThreads()
{
return 1;
}

@Override
public boolean useParallelMergePoolConfigured()
{
return true;
}


@Override
public String getFormatString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.BrokerParallelMergeConfig;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
Expand All @@ -69,7 +69,6 @@
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.server.ClientQuerySegmentWalker;
import org.apache.druid.server.QueryStackTests;
Expand Down Expand Up @@ -363,17 +362,9 @@ public long getMaxQueuedBytes()
return 0L;
}
},
new DruidProcessingConfig()
{
@Override
public String getFormatString()
{
return null;
}
},
new BrokerParallelMergeConfig(),
ForkJoinPool.commonPool(),
QueryStackTests.DEFAULT_NOOP_SCHEDULER,
new JoinableFactoryWrapper(new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())),
new NoopServiceEmitter()
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.junit.Assert;
import org.junit.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
Expand All @@ -89,7 +90,6 @@
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.Action;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
Expand All @@ -85,7 +86,6 @@
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.Action;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4077,13 +4077,13 @@ protected void scheduleReporting(ScheduledExecutorService reportingExec)
reportingExec.scheduleAtFixedRate(
this::emitLag,
ioConfig.getStartDelay().getMillis() + INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS, // wait for tasks to start up
spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
TimeUnit.MILLISECONDS
);
reportingExec.scheduleAtFixedRate(
this::emitNoticesQueueSize,
ioConfig.getStartDelay().getMillis() + INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS, // wait for tasks to start up
spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
TimeUnit.MILLISECONDS
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.NoopTaskAutoScaler;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;

import javax.annotation.Nullable;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.joda.time.DateTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.AggregatorFactory;
Expand All @@ -77,7 +78,6 @@
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
Expand All @@ -102,6 +102,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -501,7 +502,7 @@ public void testIdleStateTransition() throws Exception
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig() {
@Override
public Duration getEmitterPeriod()
public Duration getEmissionDuration()
{
return new Period("PT1S").toStandardDuration();
}
Expand Down Expand Up @@ -1062,6 +1063,29 @@ public void testStaleOffsetsNegativeLagNotEmitted() throws Exception
Assert.assertEquals(0, emitter.getEvents().size());
}

@Test
public void testScheduleReporting()
{
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
DruidMonitorSchedulerConfig config = new DruidMonitorSchedulerConfig();
EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(config).times(2);
ScheduledExecutorService executorService = EasyMock.createMock(ScheduledExecutorService.class);
EasyMock.expect(executorService.scheduleWithFixedDelay(EasyMock.anyObject(), EasyMock.eq(86415000L), EasyMock.eq(300000L), EasyMock.eq(TimeUnit.MILLISECONDS))).andReturn(EasyMock.createMock(ScheduledFuture.class)).once();
EasyMock.expect(executorService.scheduleAtFixedRate(EasyMock.anyObject(), EasyMock.eq(86425000L), EasyMock.eq(config.getEmissionDuration().getMillis()), EasyMock.eq(TimeUnit.MILLISECONDS))).andReturn(EasyMock.createMock(ScheduledFuture.class)).times(2);

EasyMock.replay(executorService, spec);
final BaseTestSeekableStreamSupervisor supervisor = new BaseTestSeekableStreamSupervisor()
{
@Override
public LagStats computeLagStats()
{
return new LagStats(0, 0, 0);
}
};
supervisor.scheduleReporting(executorService);
EasyMock.verify(executorService, spec);
}

private List<Event> filterMetrics(List<Event> events, List<String> whitelist)
{
List<Event> result = events.stream()
Expand Down Expand Up @@ -1098,7 +1122,7 @@ private void expectEmitterSupervisor(boolean suspended) throws EntryExistsExcept
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig() {
@Override
public Duration getEmitterPeriod()
public Duration getEmissionDuration()
{
return new Period("PT1S").toStandardDuration();
}
Expand Down Expand Up @@ -1608,13 +1632,13 @@ protected void scheduleReporting(ScheduledExecutorService reportingExec)
reportingExec.scheduleAtFixedRate(
this::emitLag,
ioConfig.getStartDelay().getMillis(),
spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
TimeUnit.MILLISECONDS
);
reportingExec.scheduleAtFixedRate(
this::emitNoticesQueueSize,
ioConfig.getStartDelay().getMillis(),
spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
TimeUnit.MILLISECONDS
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.AnnouncerModule;
import org.apache.druid.guice.DruidProcessingConfigModule;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LegacyBrokerParallelMergeConfigModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.SQLMetadataStorageDruidModule;
Expand Down Expand Up @@ -496,7 +496,7 @@ private static Injector makeInjector(
new AnnouncerModule(),
new DiscoveryModule(),
// Dependencies from other modules
new DruidProcessingConfigModule(),
new LegacyBrokerParallelMergeConfigModule(),
// Dependencies from other modules
new StorageNodeModule(),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
/**
*
*/
@Deprecated
public class ConfigProvider<T> implements Provider<T>
{
private static final Logger log = new Logger(ConfigProvider.class);
Expand All @@ -40,11 +41,6 @@ public static <T> void bind(Binder binder, Class<T> clazz)
binder.bind(clazz).toProvider(of(clazz)).in(LazySingleton.class);
}

public static <T> void bind(Binder binder, Class<T> clazz, Map<String, String> replacements)
{
binder.bind(clazz).toProvider(of(clazz, replacements)).in(LazySingleton.class);
}

public static <T> Provider<T> of(Class<T> clazz)
{
return of(clazz, null);
Expand Down
Loading