
Remove index_realtime and index_realtime_appenderator tasks. #15717

Closed
wants to merge 16 commits
docs/ingestion/tranquility.md (10 changes: 2 additions & 8 deletions)
@@ -22,15 +22,9 @@ title: "Tranquility"
~ under the License.
-->

-[Tranquility](https://github.com/druid-io/tranquility/) is a separately distributed package for pushing
-streams to Druid in real-time.
-
-Tranquility has not been built against a version of Druid later than Druid 0.9.2
-release. It may still work with the latest Druid servers, but not all features and functionality will be available
-due to limitations of older Druid APIs on the Tranquility side.
+[Tranquility](https://github.com/druid-io/tranquility/) was a separately distributed package for pushing
+streams to Druid in real-time. It is not compatible with recent versions of Druid.

For new projects that require streaming ingestion, we recommend using Druid's native support for
[Apache Kafka](../ingestion/kafka-ingestion.md) or
[Amazon Kinesis](../ingestion/kinesis-ingestion.md).
-
-For more details, check out the [Tranquility GitHub page](https://github.com/druid-io/tranquility/).
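For anyone landing here from the removed Tranquility docs: native Kafka ingestion is driven by a supervisor spec POSTed to the Overlord. Below is a minimal sketch, not part of this diff, assuming an Overlord at `localhost:8081`, a Kafka broker at `localhost:9092`, and a topic/datasource named `metrics` (all placeholder values); field names follow the Kafka ingestion docs.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SubmitKafkaSupervisor
{
  public static void main(String[] args) throws Exception
  {
    // Minimal Kafka supervisor spec; every value here is a placeholder.
    String spec = """
        {
          "type": "kafka",
          "spec": {
            "ioConfig": {
              "type": "kafka",
              "topic": "metrics",
              "inputFormat": {"type": "json"},
              "consumerProperties": {"bootstrap.servers": "localhost:9092"},
              "useEarliestOffset": true
            },
            "tuningConfig": {"type": "kafka"},
            "dataSchema": {
              "dataSource": "metrics",
              "timestampSpec": {"column": "timestamp", "format": "iso"},
              "dimensionsSpec": {"useSchemaDiscovery": true},
              "granularitySpec": {"segmentGranularity": "hour", "rollup": false}
            }
          }
        }
        """;

    // Submit the spec to the Overlord's supervisor endpoint.
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8081/druid/indexer/v1/supervisor"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(spec))
        .build();

    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + " " + response.body());
  }
}
```

Kinesis ingestion follows the same shape, with a `kinesis` spec type and a stream-oriented ioConfig.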
@@ -134,7 +134,7 @@ public void setUp()
}

@Test
-public void testCheckSegments() throws IOException
+public void testCheckSegments()
{
List<DataSegment> baseSegments = createBaseSegments();
Set<DataSegment> derivativeSegments = Sets.newHashSet(createDerivativeSegments());
@@ -208,7 +208,7 @@ public void testSubmitTasksFailsIfTaskCannotBeAdded() throws IOException
}

@Test
-public void testCheckSegmentsAndSubmitTasks() throws IOException
+public void testCheckSegmentsAndSubmitTasks()
{
Set<DataSegment> baseSegments = Collections.singleton(createBaseSegments().get(0));
indexerMetadataStorageCoordinator.commitSegments(baseSegments);
@@ -166,13 +166,8 @@ public void testOptimize() throws InterruptedException
Lists.newArrayList("dim1", "dim2", "dim3", "dim4"),
1024 * 1024
);
-try {
-metadataStorageCoordinator.commitSegments(Sets.newHashSet(segment));
-announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
-}
-catch (IOException e) {
-return false;
-}
+metadataStorageCoordinator.commitSegments(Sets.newHashSet(segment));
+announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
return true;
}
);
@@ -191,13 +186,8 @@ public void testOptimize() throws InterruptedException
Lists.newArrayList("dim1", "dim2", "dim3"),
1024
);
-try {
-metadataStorageCoordinator.commitSegments(Sets.newHashSet(segment));
-announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
-}
-catch (IOException e) {
-return false;
-}
+metadataStorageCoordinator.commitSegments(Sets.newHashSet(segment));
+announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
return true;
}
);
@@ -367,7 +367,7 @@ public void testRunAfterDataInserted() throws Exception
// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3));
-Assert.assertTrue(task.getRunner().getFireDepartmentMetrics().isProcessingDone());
+Assert.assertTrue(task.getRunner().getSegmentGenerationMetrics().isProcessingDone());

// Check published metadata and segments in deep storage
assertEqualsExceptVersion(
@@ -505,7 +505,7 @@ public void testRunAfterDataInsertedWithLegacyParser() throws Exception
// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3));
-Assert.assertTrue(task.getRunner().getFireDepartmentMetrics().isProcessingDone());
+Assert.assertTrue(task.getRunner().getSegmentGenerationMetrics().isProcessingDone());

// Check published metadata and segments in deep storage
assertEqualsExceptVersion(
@@ -554,7 +554,7 @@ public void testRunBeforeDataInserted() throws Exception
// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3));
-Assert.assertTrue(task.getRunner().getFireDepartmentMetrics().isProcessingDone());
+Assert.assertTrue(task.getRunner().getSegmentGenerationMetrics().isProcessingDone());

// Check published metadata and segments in deep storage
assertEqualsExceptVersion(
@@ -40,7 +40,7 @@
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.indexing.common.TestUtils;
-import org.apache.druid.indexing.common.task.RealtimeIndexTask;
+import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
@@ -89,9 +89,7 @@
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
@@ -985,15 +983,13 @@ public void testDontKillTasksWithMismatchedType() throws Exception
addSomeEvents(1);

// non KafkaIndexTask (don't kill)
-Task id2 = new RealtimeIndexTask(
+Task id2 = new NoopTask(
"id2",
null,
-new FireDepartment(
-dataSchema,
-new RealtimeIOConfig(null, null),
-null
-),
-null
+dataSchema.getDataSource(),
+100,
+100,
+ImmutableMap.of()
);

List<Task> existingTasks = ImmutableList.of(id2);
@@ -37,7 +37,7 @@
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.indexing.common.TestUtils;
-import org.apache.druid.indexing.common.task.RealtimeIndexTask;
+import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata;
import org.apache.druid.indexing.kinesis.KinesisIndexTask;
@@ -84,9 +84,7 @@
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
@@ -940,15 +938,13 @@ public void testDontKillTasksWithMismatchedType() throws Exception
EasyMock.expectLastCall().anyTimes();

// non KinesisIndexTask (don't kill)
-Task id2 = new RealtimeIndexTask(
+Task id2 = new NoopTask(
"id2",
null,
-new FireDepartment(
-dataSchema,
-new RealtimeIOConfig(null, null),
-null
-),
-null
+dataSchema.getDataSource(),
+100,
+100,
+ImmutableMap.of()
);

List<Task> existingTasks = ImmutableList.of(id2);
@@ -20,15 +20,15 @@
package org.apache.druid.msq.counters;

import org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessor;
-import org.apache.druid.segment.realtime.FireDepartmentMetrics;
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;

/**
-* Wrapper around {@link FireDepartmentMetrics} which updates the progress counters while updating its metrics. This
+* Wrapper around {@link SegmentGenerationMetrics} which updates the progress counters while updating its metrics. This
* is necessary as the {@link org.apache.druid.segment.realtime.appenderator.BatchAppenderator} used by the
* {@link SegmentGeneratorFrameProcessor} is not part of the MSQ extension, and hence,
* cannot update the counters used in MSQ reports as it persists and pushes segments to deep storage.
*/
-public class SegmentGeneratorMetricsWrapper extends FireDepartmentMetrics
+public class SegmentGeneratorMetricsWrapper extends SegmentGenerationMetrics
{
private final SegmentGenerationProgressCounter segmentGenerationProgressCounter;

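Not part of the diff, but the javadoc above describes a simple pattern: subclass the metrics sink and mirror each update into the MSQ progress counter. A self-contained toy sketch of that pattern; `incrementRowsOut` and `ProgressCounter` are hypothetical names, not Druid's actual API:

```java
// Toy model of the wrapper idea: keep normal metrics behavior, and
// additionally surface progress to a counter owned by another module.
class GenerationMetrics
{
  private long rowsOut; // hypothetical metric field

  public void incrementRowsOut(long n)
  {
    rowsOut += n;
  }
}

class ProgressCounter
{
  private long rowsProcessed;

  public synchronized void add(long n)
  {
    rowsProcessed += n;
  }
}

class MetricsWrapper extends GenerationMetrics
{
  private final ProgressCounter counter;

  MetricsWrapper(ProgressCounter counter)
  {
    this.counter = counter;
  }

  @Override
  public void incrementRowsOut(long n)
  {
    super.incrementRowsOut(n); // normal metrics update
    counter.add(n);            // mirrored into the progress counter
  }
}
```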
@@ -19,46 +19,34 @@

package org.apache.druid.indexing.common;

-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.RowIngestionMeters;
-import org.apache.druid.segment.realtime.FireDepartment;
-import org.apache.druid.segment.realtime.RealtimeMetricsMonitor;
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;

public class TaskRealtimeMetricsMonitorBuilder
{
private TaskRealtimeMetricsMonitorBuilder()
{
}

-public static RealtimeMetricsMonitor build(Task task, FireDepartment fireDepartment)
-{
-return new RealtimeMetricsMonitor(
-ImmutableList.of(fireDepartment),
-ImmutableMap.of(
-DruidMetrics.TASK_ID, new String[]{task.getId()},
-DruidMetrics.TASK_TYPE, new String[]{task.getType()}
-)
-);
-}

public static TaskRealtimeMetricsMonitor build(
Task task,
-FireDepartment fireDepartment,
+SegmentGenerationMetrics metrics,
RowIngestionMeters meters
)
{
return new TaskRealtimeMetricsMonitor(
-fireDepartment,
+metrics,
meters,
ImmutableMap.of(
DruidMetrics.DATASOURCE, new String[]{task.getDataSource()},
DruidMetrics.TASK_ID, new String[]{task.getId()},
DruidMetrics.TASK_TYPE, new String[]{task.getType()},
DruidMetrics.GROUP_ID, new String[]{task.getGroupId()}
),
),
);
}
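With the `FireDepartment` overload gone, the single remaining factory takes the task, a `SegmentGenerationMetrics`, and the row meters. A hedged usage sketch based only on the signature above; the surrounding task context and the no-arg `SegmentGenerationMetrics` constructor are assumptions, not confirmed by this diff:

```java
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;

class MonitorUsageSketch
{
  // Assumed context: a streaming task with its row meters already in hand.
  TaskRealtimeMetricsMonitor buildMonitor(Task task, RowIngestionMeters meters)
  {
    SegmentGenerationMetrics segmentMetrics = new SegmentGenerationMetrics();
    return TaskRealtimeMetricsMonitorBuilder.build(
        task,           // supplies datasource, task id/type, and group id dimensions
        segmentMetrics, // replaces the removed FireDepartment parameter
        meters          // row-level ingestion counters
    );
  }
}
```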

This file was deleted.
