Skip to content

Commit

Permalink
Realtime cleanup and renaming
Browse files Browse the repository at this point in the history
changes:
* `FireHydrant` is now `PartialSegment`. This name much more clearly describes what this class does, and with all the other fireman terminology removed it didn't even fit a theme anymore.
* `Sink` is now `AppendableSegment`. This name also much more clearly describes what this class does, and is composed of `PartialSegments` per the previous `FireHydrant` rename.
* Additionally, `SinkQuerySegmentWalker` -> `AppendableSegmentQuerySegmentWalker`, and `SinkQueryRunner` -> `AppendableSegmentQueryRunner`
* Remove `Firehose`, `IngestSegmentFirehose` was only used by Hadoop indexing `DruidRecordReader`, moved to internal class of `DruidRecordReader` as `SegmentReader`
* Remove `FirehoseFactory` and remaining implementations, after apache#16602 they were no longer used
  • Loading branch information
clintropolis committed Jun 25, 2024
1 parent 37a50e6 commit e5e4938
Show file tree
Hide file tree
Showing 151 changed files with 1,594 additions and 4,344 deletions.
1 change: 0 additions & 1 deletion docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,6 @@ Metric monitoring is an essential part of Druid operations. The following monito
|`org.apache.druid.java.util.metrics.CgroupCpuSetMonitor`|Reports CPU core/HT and memory node allocations as per the `cpuset` cgroup.|
|`org.apache.druid.java.util.metrics.CgroupDiskMonitor`|Reports disk statistic as per the blkio cgroup.|
|`org.apache.druid.java.util.metrics.CgroupMemoryMonitor`|Reports memory statistic as per the memory cgroup.|
|`org.apache.druid.server.metrics.EventReceiverFirehoseMonitor`|Reports how many events have been queued in the EventReceiverFirehose.|
|`org.apache.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical services. Available only on Historical services.|
|`org.apache.druid.server.metrics.SegmentStatsMonitor` | **EXPERIMENTAL** Reports statistics about segments on Historical services. Available only on Historical services. Not to be used when lazy loading is configured.|
|`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.|
Expand Down
2 changes: 1 addition & 1 deletion docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
|`ingest/merge/time`|Milliseconds spent merging intermediate segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the configuration. Generally a few minutes at most.|
|`ingest/merge/cpu`|CPU time in Nanoseconds spent on merging intermediate segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the configuration. Generally a few minutes at most.|
|`ingest/handoff/count`|Number of handoffs that happened.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Varies. Generally greater than 0 once every segment granular period if cluster operating normally.|
|`ingest/sink/count`|Number of sinks not handed off.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|1~3|
|`ingest/sink/count`|Number of appendable segments not handed off.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|1~3|
|`ingest/events/messageGap`|Time gap in milliseconds between the latest ingested event timestamp and the current system timestamp of metrics emission. If the value is increasing but lag is low, Druid may not be receiving new data. This metric is reset as new tasks spawn up.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Greater than 0, depends on the time carried in event.|
|`ingest/notices/queueSize`|Number of pending notices to be processed by the coordinator.|`dataSource`, `tags`|Typically 0 and occasionally in lower single digits. Should not be a very high number. |
|`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`dataSource`, `tags`| < 1s |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public static Task getTask()
null
),
new IndexTask.IndexIOConfig(
null,
new LocalInputSource(new File("lol"), "rofl"),
new NoopInputFormat(),
true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@

package org.apache.druid.k8s.overlord.taskadapter;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
Expand Down Expand Up @@ -83,9 +81,6 @@ public void setup()
{
TestUtils utils = new TestUtils();
jsonMapper = utils.getTestObjectMapper();
for (Module jacksonModule : new FirehoseModule().getJacksonModules()) {
jsonMapper.registerModule(jacksonModule);
}
jsonMapper.registerSubtypes(
new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
new NamedType(IndexTask.IndexTuningConfig.class, "index")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.druid.k8s.overlord.taskadapter;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Joiner;
Expand All @@ -44,7 +43,6 @@
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
Expand Down Expand Up @@ -101,9 +99,6 @@ public K8sTaskAdapterTest()
{
TestUtils utils = new TestUtils();
jsonMapper = utils.getTestObjectMapper();
for (Module jacksonModule : new FirehoseModule().getJacksonModules()) {
jsonMapper.registerModule(jacksonModule);
}
jsonMapper.registerSubtypes(
new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
new NamedType(IndexTask.IndexTuningConfig.class, "index")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.druid.k8s.overlord.taskadapter;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
Expand All @@ -28,7 +27,6 @@
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
Expand Down Expand Up @@ -67,9 +65,6 @@ public void setup()
{
TestUtils utils = new TestUtils();
jsonMapper = utils.getTestObjectMapper();
for (Module jacksonModule : new FirehoseModule().getJacksonModules()) {
jsonMapper.registerModule(jacksonModule);
}
jsonMapper.registerSubtypes(
new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
new NamedType(IndexTask.IndexTuningConfig.class, "index")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@

package org.apache.druid.k8s.overlord.taskadapter;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
Expand Down Expand Up @@ -66,9 +64,6 @@ public void setup()
{
TestUtils utils = new TestUtils();
jsonMapper = utils.getTestObjectMapper();
for (Module jacksonModule : new FirehoseModule().getJacksonModules()) {
jsonMapper.registerModule(jacksonModule);
}
jsonMapper.registerSubtypes(
new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
new NamedType(IndexTask.IndexTuningConfig.class, "index")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.ChatHandlerProvider;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.segment.realtime.ChatHandlerProvider;
import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
import org.apache.druid.server.security.AuthorizerMapper;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.ChatHandlerProvider;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.segment.realtime.ChatHandlerProvider;
import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import org.apache.druid.query.QueryContext;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.ChatHandler;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.indexing.IndexerResourcePermissionMapper;
import org.apache.druid.msq.rpc.ControllerResource;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.ChatHandler;
import org.apache.druid.server.security.AuthorizerMapper;

public class ControllerChatHandler extends ControllerResource implements ChatHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
import org.apache.druid.msq.statistics.serde.ClusterByStatisticsSnapshotSerde;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlers;
import org.apache.druid.segment.realtime.ChatHandler;
import org.apache.druid.segment.realtime.ChatHandlers;
import org.apache.druid.server.security.Action;
import org.apache.druid.utils.CloseableUtils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.incremental.NoopRowIngestionMeters;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
Expand Down
Loading

0 comments on commit e5e4938

Please sign in to comment.