Skip to content

Commit

Permalink
Remove index_realtime and index_realtime_appenderator tasks.
Browse files Browse the repository at this point in the history
index_realtime tasks were removed from the documentation in apache#13107. Even
at that time, they weren't really documented per se— just mentioned. They
existed solely to support Tranquility, which is an obsolete ingestion
method that predates migration of Druid to ASF and is no longer being
maintained. Tranquility docs were also de-linked from the sidebars and
the other doc pages in apache#11134. Only a stub remains, so people with
links to the page can see that it's no longer recommended.

index_realtime_appenderator tasks existed in the code base, but were
never documented, nor as far as I am aware were they used for any purpose.

This patch removes both task types completely, as well as removes all
supporting code that was otherwise unused. It also updates the stub
doc for Tranquility to be firmer that it is not compatible. (Previously,
the stub doc said it wasn't recommended, and pointed out that it is
built against an ancient 0.9.2 version of Druid.)
  • Loading branch information
gianm committed Jan 18, 2024
1 parent a3b32fb commit 60ceeed
Show file tree
Hide file tree
Showing 117 changed files with 597 additions and 12,484 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ public void testRunAfterDataInserted() throws Exception
// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3));
Assert.assertTrue(task.getRunner().getFireDepartmentMetrics().isProcessingDone());
Assert.assertTrue(task.getRunner().getSegmentGenerationMetrics().isProcessingDone());

// Check published metadata and segments in deep storage
assertEqualsExceptVersion(
Expand Down Expand Up @@ -505,7 +505,7 @@ public void testRunAfterDataInsertedWithLegacyParser() throws Exception
// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3));
Assert.assertTrue(task.getRunner().getFireDepartmentMetrics().isProcessingDone());
Assert.assertTrue(task.getRunner().getSegmentGenerationMetrics().isProcessingDone());

// Check published metadata and segments in deep storage
assertEqualsExceptVersion(
Expand Down Expand Up @@ -554,7 +554,7 @@ public void testRunBeforeDataInserted() throws Exception
// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3));
Assert.assertTrue(task.getRunner().getFireDepartmentMetrics().isProcessingDone());
Assert.assertTrue(task.getRunner().getSegmentGenerationMetrics().isProcessingDone());

// Check published metadata and segments in deep storage
assertEqualsExceptVersion(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.RealtimeIndexTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
Expand Down Expand Up @@ -89,9 +89,7 @@
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
Expand Down Expand Up @@ -983,15 +981,13 @@ public void testDontKillTasksWithMismatchedType() throws Exception
addSomeEvents(1);

// non KafkaIndexTask (don't kill)
Task id2 = new RealtimeIndexTask(
Task id2 = new NoopTask(
"id2",
null,
new FireDepartment(
dataSchema,
new RealtimeIOConfig(null, null),
null
),
null
dataSchema.getDataSource(),
100,
100,
ImmutableMap.of()
);

List<Task> existingTasks = ImmutableList.of(id2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.RealtimeIndexTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata;
import org.apache.druid.indexing.kinesis.KinesisIndexTask;
Expand Down Expand Up @@ -85,9 +85,7 @@
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
Expand Down Expand Up @@ -942,15 +940,13 @@ public void testDontKillTasksWithMismatchedType() throws Exception
EasyMock.expectLastCall().anyTimes();

// non KinesisIndexTask (don't kill)
Task id2 = new RealtimeIndexTask(
Task id2 = new NoopTask(
"id2",
null,
new FireDepartment(
dataSchema,
new RealtimeIOConfig(null, null),
null
),
null
dataSchema.getDataSource(),
100,
100,
ImmutableMap.of()
);

List<Task> existingTasks = ImmutableList.of(id2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
package org.apache.druid.msq.counters;

import org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessor;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;

/**
* Wrapper around {@link FireDepartmentMetrics} which updates the progress counters while updating its metrics. This
* Wrapper around {@link SegmentGenerationMetrics} which updates the progress counters while updating its metrics. This
* is necessary as the {@link org.apache.druid.segment.realtime.appenderator.BatchAppenderator} used by the
* {@link SegmentGeneratorFrameProcessor} is not part of the MSQ extension, and hence,
* cannot update the counters used in MSQ reports as it persists and pushes segments to deep storage.
*/
public class SegmentGeneratorMetricsWrapper extends FireDepartmentMetrics
public class SegmentGeneratorMetricsWrapper extends SegmentGenerationMetrics
{
private final SegmentGenerationProgressCounter segmentGenerationProgressCounter;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,34 @@

package org.apache.druid.indexing.common;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.RealtimeMetricsMonitor;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;

public class TaskRealtimeMetricsMonitorBuilder
{
private TaskRealtimeMetricsMonitorBuilder()
{
}

public static RealtimeMetricsMonitor build(Task task, FireDepartment fireDepartment)
{
return new RealtimeMetricsMonitor(
ImmutableList.of(fireDepartment),
ImmutableMap.of(
DruidMetrics.TASK_ID, new String[]{task.getId()},
DruidMetrics.TASK_TYPE, new String[]{task.getType()}
)
);
}

public static TaskRealtimeMetricsMonitor build(
Task task,
FireDepartment fireDepartment,
SegmentGenerationMetrics metrics,
RowIngestionMeters meters
)
{
return new TaskRealtimeMetricsMonitor(
fireDepartment,
metrics,
meters,
ImmutableMap.of(
DruidMetrics.DATASOURCE, new String[]{task.getDataSource()},
DruidMetrics.TASK_ID, new String[]{task.getId()},
DruidMetrics.TASK_TYPE, new String[]{task.getType()},
DruidMetrics.GROUP_ID, new String[]{task.getGroupId()}
),
),
task.getContextValue(DruidMetrics.TAGS)
);
}
Expand Down

This file was deleted.

Loading

0 comments on commit 60ceeed

Please sign in to comment.