diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index ba12128ab1e1..07b43c7a0aa2 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -50,13 +50,14 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; import org.joda.time.DateTime; -import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -64,6 +65,8 @@ import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** @@ -82,6 +85,9 @@ public class KafkaSupervisor extends SeekableStreamSupervisor }; private static final EmittingLogger log = new EmittingLogger(KafkaSupervisor.class); + private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000; + private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000; + private static final long INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS = 25000; private static final Long NOT_SET = -1L; private 
static final Long END_OF_PARTITION = Long.MAX_VALUE; @@ -126,6 +132,28 @@ protected RecordSupplier setupRecordSupplier() return new KafkaRecordSupplier(spec.getIoConfig().getConsumerProperties(), sortingMapper); } + @Override + protected void scheduleReporting(ScheduledExecutorService reportingExec) + { + KafkaSupervisorIOConfig ioConfig = spec.getIoConfig(); + KafkaSupervisorTuningConfig tuningConfig = spec.getTuningConfig(); + reportingExec.scheduleAtFixedRate( + updateCurrentAndLatestOffsets(), + ioConfig.getStartDelay().getMillis() + INITIAL_GET_OFFSET_DELAY_MILLIS, // wait for tasks to start up + Math.max( + tuningConfig.getOffsetFetchPeriod().getMillis(), MINIMUM_GET_OFFSET_PERIOD_MILLIS + ), + TimeUnit.MILLISECONDS + ); + + reportingExec.scheduleAtFixedRate( + emitLag(), + ioConfig.getStartDelay().getMillis() + INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS, // wait for tasks to start up + monitorSchedulerConfig.getEmitterPeriod().getMillis(), + TimeUnit.MILLISECONDS + ); + } + @Override protected int getTaskGroupIdForPartition(Integer partitionId) { @@ -151,7 +179,7 @@ protected SeekableStreamSupervisorReportPayload createReportPaylo ) { KafkaSupervisorIOConfig ioConfig = spec.getIoConfig(); - Map partitionLag = getRecordLagPerPartition(getHighestCurrentOffsets()); + Map partitionLag = getLagPerPartition(getHighestCurrentOffsets()); return new KafkaSupervisorReportPayload( spec.getDataSchema().getDataSource(), ioConfig.getTopic(), @@ -239,38 +267,11 @@ protected List> createIndexTasks( return taskList; } - @Override - protected Map getPartitionRecordLag() - { - Map highestCurrentOffsets = getHighestCurrentOffsets(); - - if (latestSequenceFromStream == null) { - return null; - } - - if (!latestSequenceFromStream.keySet().equals(highestCurrentOffsets.keySet())) { - log.warn( - "Lag metric: Kafka partitions %s do not match task partitions %s", - latestSequenceFromStream.keySet(), - highestCurrentOffsets.keySet() - ); - } - - return 
getRecordLagPerPartition(highestCurrentOffsets); - } - - @Nullable - @Override - protected Map getPartitionTimeLag() - { - // time lag not currently support with kafka - return null; - } @Override // suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here @SuppressWarnings("SSBasedInspection") - protected Map getRecordLagPerPartition(Map currentOffsets) + protected Map getLagPerPartition(Map currentOffsets) { return currentOffsets .entrySet() @@ -287,12 +288,6 @@ protected Map getRecordLagPerPartition(Map current ); } - @Override - protected Map getTimeLagPerPartition(Map currentOffsets) - { - return null; - } - @Override protected KafkaDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map map) { @@ -305,6 +300,51 @@ protected OrderedSequenceNumber makeSequenceNumber(Long seq, boolean isExc return KafkaSequenceNumber.of(seq); } + private Runnable emitLag() + { + return () -> { + try { + Map highestCurrentOffsets = getHighestCurrentOffsets(); + String dataSource = spec.getDataSchema().getDataSource(); + + if (latestSequenceFromStream == null) { + throw new ISE("Latest offsets from Kafka have not been fetched"); + } + + if (!latestSequenceFromStream.keySet().equals(highestCurrentOffsets.keySet())) { + log.warn( + "Lag metric: Kafka partitions %s do not match task partitions %s", + latestSequenceFromStream.keySet(), + highestCurrentOffsets.keySet() + ); + } + + Map partitionLags = getLagPerPartition(highestCurrentOffsets); + long maxLag = 0, totalLag = 0, avgLag; + for (long lag : partitionLags.values()) { + if (lag > maxLag) { + maxLag = lag; + } + totalLag += lag; + } + avgLag = partitionLags.size() == 0 ? 
0 : totalLag / partitionLags.size(); + + emitter.emit( + ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/lag", totalLag) + ); + emitter.emit( + ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/maxLag", maxLag) + ); + emitter.emit( + ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/avgLag", avgLag) + ); + } + catch (Exception e) { + log.warn(e, "Unable to compute Kafka lag"); + } + }; + } + @Override protected Long getNotSetMarker() { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java index d7e639caea90..768468c933c6 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java @@ -55,8 +55,6 @@ public KafkaSupervisorReportPayload( latestOffsets, minimumLag, aggregateLag, - null, - null, offsetsLastUpdated, suspended, healthy, diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index 2dca7f6edfc5..5c00988e8601 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -34,6 +34,8 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig implements 
SeekableStreamSupervisorTuningConfig { + private static final String DEFAULT_OFFSET_FETCH_PERIOD = "PT30S"; + private final Integer workerThreads; private final Integer chatThreads; private final Long chatRetries; @@ -179,7 +181,6 @@ public Duration getRepartitionTransitionDuration() ); } - @Override @JsonProperty public Duration getOffsetFetchPeriod() { diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index a29fab1a5043..0b29b65a628b 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -1371,7 +1371,7 @@ public void testDiscoverExistingPublishingTask() throws Exception supervisor.start(); supervisor.runInternal(); - supervisor.updateCurrentAndLatestOffsets(); + supervisor.updateCurrentAndLatestOffsets().run(); SupervisorReport report = supervisor.getStatus(); verifyAll(); @@ -1481,7 +1481,7 @@ public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() supervisor.start(); supervisor.runInternal(); - supervisor.updateCurrentAndLatestOffsets(); + supervisor.updateCurrentAndLatestOffsets().run(); SupervisorReport report = supervisor.getStatus(); verifyAll(); @@ -1625,7 +1625,7 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception supervisor.start(); supervisor.runInternal(); - supervisor.updateCurrentAndLatestOffsets(); + supervisor.updateCurrentAndLatestOffsets().run(); SupervisorReport report = supervisor.getStatus(); verifyAll(); diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index e7ec59a73445..b71f485fe263 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -44,11 +44,9 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; import com.google.common.collect.Queues; import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.common.aws.AWSCredentialsUtils; -import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisor; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.common.StreamException; @@ -115,9 +113,9 @@ private class PartitionResource private volatile boolean started; private volatile boolean stopRequested; - private volatile long currentLagMillis; - - PartitionResource(StreamPartition streamPartition) + PartitionResource( + StreamPartition streamPartition + ) { this.streamPartition = streamPartition; } @@ -150,53 +148,6 @@ void stopBackgroundFetch() stopRequested = true; } - long getPartitionTimeLag() - { - return currentLagMillis; - } - - long getPartitionTimeLag(String offset) - { - // if not started (fetching records in background), fetch lag ourself with a throw-away iterator - if (!started) { - try { - final String iteratorType; - final String offsetToUse; - if (offset == null || KinesisSupervisor.NOT_SET.equals(offset)) { - // this should probably check if will start processing earliest or latest rather than assuming earliest - // if latest we could skip this because latest will not be behind latest so lag is 
0. - iteratorType = ShardIteratorType.TRIM_HORIZON.toString(); - offsetToUse = null; - } else { - iteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER.toString(); - offsetToUse = offset; - } - String shardIterator = kinesis.getShardIterator( - streamPartition.getStream(), - streamPartition.getPartitionId(), - iteratorType, - offsetToUse - ).getShardIterator(); - - GetRecordsResult recordsResult = kinesis.getRecords( - new GetRecordsRequest().withShardIterator(shardIterator).withLimit(recordsPerFetch) - ); - - currentLagMillis = recordsResult.getMillisBehindLatest(); - return currentLagMillis; - } - catch (Exception ex) { - // eat it - log.warn( - ex, - "Failed to determine partition lag for partition %s of stream %s", - streamPartition.getPartitionId(), - streamPartition.getStream() - ); - } - } - return currentLagMillis; - } private Runnable getRecordRunnable() { @@ -240,14 +191,11 @@ private Runnable getRecordRunnable() recordsResult = kinesis.getRecords(new GetRecordsRequest().withShardIterator( shardIterator).withLimit(recordsPerFetch)); - currentLagMillis = recordsResult.getMillisBehindLatest(); - // list will come back empty if there are no records for (Record kinesisRecord : recordsResult.getRecords()) { final List data; - if (deaggregate) { if (deaggregateHandle == null || getDataHandle == null) { throw new ISE("deaggregateHandle or getDataHandle is null!"); @@ -689,27 +637,6 @@ public void close() this.closed = true; } - // this is only used for tests - @VisibleForTesting - Map getPartitionTimeLag() - { - return partitionResources.entrySet() - .stream() - .collect( - Collectors.toMap(k -> k.getKey().getPartitionId(), k -> k.getValue().getPartitionTimeLag()) - ); - } - - public Map getPartitionTimeLag(Map currentOffsets) - { - Map partitionLag = Maps.newHashMapWithExpectedSize(currentOffsets.size()); - for (Map.Entry, PartitionResource> partition : partitionResources.entrySet()) { - final String partitionId = partition.getKey().getPartitionId(); - 
partitionLag.put(partitionId, partition.getValue().getPartitionTimeLag(currentOffsets.get(partitionId))); - } - return partitionLag; - } - private void seekInternal(StreamPartition partition, String sequenceNumber, ShardIteratorType iteratorEnum) { PartitionResource resource = partitionResources.get(partition); diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index c789fc7f3ac7..ddec6f74251f 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -64,6 +64,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ScheduledExecutorService; /** * Supervisor responsible for managing the KinesisIndexTask for a single dataSource. At a high level, the class accepts a @@ -72,6 +73,8 @@ * and the list of running indexing tasks and ensures that all partitions are being read from and that there are enough * tasks to satisfy the desired number of replicas. As tasks complete, new tasks are queued to process the next range of * Kinesis sequences. + *
<p>
+ * the Kinesis supervisor does not yet support lag calculations */ public class KinesisSupervisor extends SeekableStreamSupervisor { @@ -82,10 +85,9 @@ public class KinesisSupervisor extends SeekableStreamSupervisor { }; - public static final String NOT_SET = "-1"; + private static final String NOT_SET = "-1"; private final KinesisSupervisorSpec spec; private final AWSCredentialsConfig awsCredentialsConfig; - private volatile Map currentPartitionTimeLag; public KinesisSupervisor( final TaskStorage taskStorage, @@ -112,7 +114,6 @@ public KinesisSupervisor( this.spec = spec; this.awsCredentialsConfig = awsCredentialsConfig; - this.currentPartitionTimeLag = null; } @Override @@ -214,6 +215,12 @@ protected RecordSupplier setupRecordSupplier() ); } + @Override + protected void scheduleReporting(ScheduledExecutorService reportingExec) + { + // not yet implemented, see issue #6739 + } + /** * We hash the shard ID string, and then use the first four bytes of the hash as an int % task count */ @@ -270,7 +277,6 @@ protected SeekableStreamSupervisorReportPayload createReportPayl ) { KinesisSupervisorIOConfig ioConfig = spec.getIoConfig(); - Map partitionLag = getTimeLagPerPartition(getHighestCurrentOffsets()); return new KinesisSupervisorReportPayload( spec.getDataSchema().getDataSource(), ioConfig.getStream(), @@ -281,26 +287,17 @@ protected SeekableStreamSupervisorReportPayload createReportPayl stateManager.isHealthy(), stateManager.getSupervisorState().getBasicState(), stateManager.getSupervisorState(), - stateManager.getExceptionEvents(), - includeOffsets ? partitionLag : null, - includeOffsets ? partitionLag.values().stream().mapToLong(x -> Math.max(x, 0)).sum() : null + stateManager.getExceptionEvents() ); } - // not yet supported, will be implemented in the future maybe? 
need a way to get record count between current - // sequence and latest sequence + // not yet supported, will be implemented in the future @Override - protected Map getRecordLagPerPartition(Map currentOffsets) + protected Map getLagPerPartition(Map currentOffsets) { return ImmutableMap.of(); } - @Override - protected Map getTimeLagPerPartition(Map currentOffsets) - { - return ((KinesisRecordSupplier) recordSupplier).getPartitionTimeLag(currentOffsets); - } - @Override protected SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( String stream, @@ -318,24 +315,10 @@ protected OrderedSequenceNumber makeSequenceNumber(String seq, boolean i @Override protected void updateLatestSequenceFromStream( - RecordSupplier recordSupplier, - Set> streamPartitions + RecordSupplier recordSupplier, Set> streamPartitions ) { - KinesisRecordSupplier supplier = (KinesisRecordSupplier) recordSupplier; - currentPartitionTimeLag = supplier.getPartitionTimeLag(getHighestCurrentOffsets()); - } - - @Override - protected Map getPartitionRecordLag() - { - return null; - } - - @Override - protected Map getPartitionTimeLag() - { - return currentPartitionTimeLag; + // do nothing } @Override @@ -474,4 +457,5 @@ private SeekableStreamDataSourceMetadata createDataSourceMetadat return new KinesisDataSourceMetadata(newSequences); } + } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorReportPayload.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorReportPayload.java index 89527a17bcd1..9a4ee86937df 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorReportPayload.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorReportPayload.java @@ -22,9 +22,8 @@ import 
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload; -import javax.annotation.Nullable; +import java.util.Collections; import java.util.List; -import java.util.Map; public class KinesisSupervisorReportPayload extends SeekableStreamSupervisorReportPayload { @@ -38,9 +37,7 @@ public KinesisSupervisorReportPayload( boolean healthy, SupervisorStateManager.State state, SupervisorStateManager.State detailedState, - List recentErrors, - @Nullable Map minimumLagMillis, - @Nullable Long aggregateLagMillis + List recentErrors ) { super( @@ -49,12 +46,10 @@ public KinesisSupervisorReportPayload( partitions, replicas, durationSeconds, + Collections.emptyMap(), + Collections.emptyMap(), null, null, - null, - minimumLagMillis, - aggregateLagMillis, - null, suspended, healthy, state, @@ -79,8 +74,7 @@ public String toString() ", state=" + getState() + ", detailedState=" + getDetailedState() + ", recentErrors=" + getRecentErrors() + - (getMinimumLagMillis() != null ? ", minimumLagMillis=" + getMinimumLagMillis() : "") + - (getAggregateLagMillis() != null ? 
", aggregateLagMillis=" + getAggregateLagMillis() : "") + '}'; } + } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java index 8c1a5fa16701..bc3bbd2314a1 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java @@ -39,7 +39,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig private final Duration httpTimeout; private final Duration shutdownTimeout; private final Duration repartitionTransitionDuration; - private final Duration offsetFetchPeriod; public static KinesisSupervisorTuningConfig defaultConfig() { @@ -74,7 +73,6 @@ public static KinesisSupervisorTuningConfig defaultConfig() null, null, null, - null, null ); } @@ -110,8 +108,7 @@ public KinesisSupervisorTuningConfig( @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll, @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod, - @JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration, - @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod + @JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration ) { super( @@ -154,10 +151,6 @@ public KinesisSupervisorTuningConfig( repartitionTransitionDuration, DEFAULT_REPARTITION_TRANSITION_DURATION ); - this.offsetFetchPeriod = SeekableStreamSupervisorTuningConfig.defaultDuration( - offsetFetchPeriod, - DEFAULT_OFFSET_FETCH_PERIOD - ); } @Override @@ -201,13 +194,6 @@ public Duration 
getRepartitionTransitionDuration() return repartitionTransitionDuration; } - @Override - @JsonProperty - public Duration getOffsetFetchPeriod() - { - return offsetFetchPeriod; - } - @Override public String toString() { @@ -275,4 +261,5 @@ public KinesisIndexTaskTuningConfig convertToTaskTuningConfig() getIntermediateHandoffPeriod() ); } + } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 03302c0538a4..7867522a54eb 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -2034,13 +2034,8 @@ public void testRestoreAfterPersistingSequences() throws Exception .andReturn(Collections.emptyList()) .anyTimes(); - EasyMock.expect(recordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(null) - .anyTimes(); - replayAll(); - final KinesisIndexTask task1 = createTask( "task1", new KinesisIndexTaskIOConfig( diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java index 15e56c032dff..13cfd74c7227 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java @@ -312,7 +312,6 @@ public void testConvert() null, null, null, - null, null ); KinesisIndexTaskTuningConfig copy = (KinesisIndexTaskTuningConfig) original.convertToTaskTuningConfig(); diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java index 76de9a95139b..3bb675e464eb 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java @@ -48,7 +48,6 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -60,15 +59,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport private static final String SHARD_ID0 = "0"; private static final String SHARD1_ITERATOR = "1"; private static final String SHARD0_ITERATOR = "0"; - - private static final Long SHARD0_LAG_MILLIS = 100L; - private static final Long SHARD1_LAG_MILLIS = 200L; - private static Map SHARDS_LAG_MILLIS = - ImmutableMap.of(SHARD_ID0, SHARD0_LAG_MILLIS, SHARD_ID1, SHARD1_LAG_MILLIS); - private static final List SHARD0_RECORDS = ImmutableList.of( - new Record().withData(jb("2008", "a", "y", "10", "20.0", "1.0")).withSequenceNumber("0"), - new Record().withData(jb("2009", "b", "y", "10", "20.0", "1.0")).withSequenceNumber("1") - ); private static final List SHARD1_RECORDS = ImmutableList.of( new Record().withData(jb("2011", "d", "y", "10", "20.0", "1.0")).withSequenceNumber("0"), new Record().withData(jb("2011", "e", "y", "10", "20.0", "1.0")).withSequenceNumber("1"), @@ -81,6 +71,10 @@ public class KinesisRecordSupplierTest extends EasyMockSupport new Record().withData(jb("2012", "g", "y", "10", "20.0", "1.0")).withSequenceNumber("8"), new Record().withData(jb("2011", "h", "y", "10", "20.0", "1.0")).withSequenceNumber("9") ); + private static final List SHARD0_RECORDS = ImmutableList.of( + new 
Record().withData(jb("2008", "a", "y", "10", "20.0", "1.0")).withSequenceNumber("0"), + new Record().withData(jb("2009", "b", "y", "10", "20.0", "1.0")).withSequenceNumber("1") + ); private static final List ALL_RECORDS = ImmutableList.builder() .addAll(SHARD0_RECORDS.stream() .map(x -> new OrderedPartitionableRecord<>( @@ -110,7 +104,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport .toList())) .build(); - private static ByteBuffer jb(String timestamp, String dim1, String dim2, String dimLong, String dimFloat, String met1) { try { @@ -269,8 +262,6 @@ public void testPoll() throws InterruptedException EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once(); EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes(); EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes(); - EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once(); - EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once(); replayAll(); @@ -308,7 +299,6 @@ public void testPoll() throws InterruptedException Assert.assertEquals(partitions, recordSupplier.getAssignment()); Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS)); - Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionTimeLag()); } @Test @@ -345,8 +335,6 @@ public void testSeek() EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS.subList(2, SHARD1_RECORDS.size())).once(); EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes(); EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes(); - EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once(); - EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once(); replayAll(); @@ -386,7 +374,7 @@ public void testSeek() Assert.assertEquals(9, 
polledRecords.size()); Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS.subList(4, 12))); Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS.subList(1, 2))); - Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionTimeLag()); + } @@ -511,8 +499,6 @@ public void testPollAfterSeek() EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD1_RECORDS.subList(7, SHARD1_RECORDS.size())).once(); EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes(); EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes(); - EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once(); - EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once(); replayAll(); @@ -548,9 +534,6 @@ public void testPollAfterSeek() firstRecord ); - // only one partition in this test. first results come from getRecordsResult1, which has SHARD1_LAG_MILLIS - Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD1_LAG_MILLIS), recordSupplier.getPartitionTimeLag()); - recordSupplier.seek(StreamPartition.of(STREAM, SHARD_ID1), "7"); recordSupplier.start(); @@ -558,12 +541,9 @@ public void testPollAfterSeek() Thread.sleep(100); } - OrderedPartitionableRecord record2 = recordSupplier.poll(POLL_TIMEOUT_MILLIS).get(0); Assert.assertEquals(ALL_RECORDS.get(9), record2); - // only one partition in this test. 
second results come from getRecordsResult0, which has SHARD0_LAG_MILLIS - Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD0_LAG_MILLIS), recordSupplier.getPartitionTimeLag()); verifyAll(); } @@ -601,8 +581,6 @@ public void testPollDeaggregate() throws InterruptedException EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once(); EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes(); EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes(); - EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once(); - EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once(); replayAll(); @@ -640,83 +618,6 @@ public void testPollDeaggregate() throws InterruptedException Assert.assertEquals(partitions, recordSupplier.getAssignment()); Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS)); - Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionTimeLag()); - } - - - @Test - public void getPartitionTimeLag() throws InterruptedException - { - EasyMock.expect(kinesis.getShardIterator( - EasyMock.anyObject(), - EasyMock.eq(SHARD_ID0), - EasyMock.anyString(), - EasyMock.anyString() - )).andReturn( - getShardIteratorResult0).anyTimes(); - - EasyMock.expect(kinesis.getShardIterator( - EasyMock.anyObject(), - EasyMock.eq(SHARD_ID1), - EasyMock.anyString(), - EasyMock.anyString() - )).andReturn( - getShardIteratorResult1).anyTimes(); - - EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes(); - EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes(); - EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch))) - .andReturn(getRecordsResult0) - .anyTimes(); - EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, recordsPerFetch))) - .andReturn(getRecordsResult1) - .anyTimes(); - 
EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).once(); - EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once(); - EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes(); - EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes(); - EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once(); - EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once(); - - replayAll(); - - Set> partitions = ImmutableSet.of( - StreamPartition.of(STREAM, SHARD_ID0), - StreamPartition.of(STREAM, SHARD_ID1) - ); - - - recordSupplier = new KinesisRecordSupplier( - kinesis, - recordsPerFetch, - 0, - 2, - true, - 100, - 5000, - 5000, - 60000, - 100 - ); - - recordSupplier.assign(partitions); - recordSupplier.seekToEarliest(partitions); - recordSupplier.start(); - - for (int i = 0; i < 10 && recordSupplier.bufferSize() < 12; i++) { - Thread.sleep(100); - } - - Map offsts = ImmutableMap.of( - SHARD_ID1, SHARD1_RECORDS.get(0).getSequenceNumber(), - SHARD_ID0, SHARD0_RECORDS.get(0).getSequenceNumber() - ); - Map timeLag = recordSupplier.getPartitionTimeLag(offsts); - - verifyAll(); - - Assert.assertEquals(partitions, recordSupplier.getAssignment()); - Assert.assertEquals(SHARDS_LAG_MILLIS, timeLag); } /** @@ -736,5 +637,4 @@ private static byte[] toByteArray(final ByteBuffer buffer) return retVal; } } - } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 94ca6bf693b0..ad267e8a1eaa 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -128,7 +128,7 @@ public class KinesisSupervisorTest extends EasyMockSupport private static final StreamPartition SHARD0_PARTITION = StreamPartition.of(STREAM, SHARD_ID0); private static final StreamPartition SHARD1_PARTITION = StreamPartition.of(STREAM, SHARD_ID1); private static final StreamPartition SHARD2_PARTITION = StreamPartition.of(STREAM, SHARD_ID2); - private static final Map TIME_LAG = ImmutableMap.of(SHARD_ID1, 9000L, SHARD_ID0, 1234L); + private static DataSchema dataSchema; private KinesisRecordSupplier supervisorRecordSupplier; @@ -198,7 +198,6 @@ public void setupTest() null, null, null, - null, null ); rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); @@ -231,9 +230,6 @@ public void testNoInitialState() throws Exception EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -301,9 +297,6 @@ public void testMultiTask() throws Exception EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -363,9 +356,6 @@ public void 
testReplicas() throws Exception EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -443,9 +433,6 @@ public void testLateMessageRejectionPeriod() throws Exception EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -498,9 +485,6 @@ public void testEarlyMessageRejectionPeriod() throws Exception EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -560,9 +544,6 @@ public void testDatasourceMetadata() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); 
EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -660,9 +641,6 @@ public void testDontKillTasksWithMismatchedType() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); // non KinesisIndexTask (don't kill) Task id2 = new RealtimeIndexTask( @@ -729,9 +707,6 @@ public void testKillBadPartitionAssignment() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Task id1 = createKinesisIndexTask( "id1", @@ -844,9 +819,7 @@ public void testRequeueTaskWhenFailed() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); + Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -958,9 +931,6 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception 
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); DateTime now = DateTimes.nowUtc(); DateTime maxi = now.plusMinutes(60); @@ -1098,9 +1068,6 @@ public void testQueueNextTasksOnSuccess() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -1227,9 +1194,6 @@ public void testBeginPublishAndQueueNextTasks() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); final Capture firstTasks = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -1341,7 +1305,6 @@ public void testBeginPublishAndQueueNextTasks() throws Exception public void testDiscoverExistingPublishingTask() throws Exception { final TaskLocation location = new TaskLocation("testHost", 1234, -1); - final Map timeLag = ImmutableMap.of(SHARD_ID1, 0L, SHARD_ID0, 20000000L); supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); @@ -1360,9 +1323,6 @@ 
public void testDiscoverExistingPublishingTask() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(timeLag) - .atLeastOnce(); Task task = createKinesisIndexTask( "id1", @@ -1434,7 +1394,7 @@ public void testDiscoverExistingPublishingTask() throws Exception supervisor.start(); supervisor.runInternal(); - supervisor.updateCurrentAndLatestOffsets(); + supervisor.updateCurrentAndLatestOffsets().run(); SupervisorReport report = supervisor.getStatus(); verifyAll(); @@ -1452,9 +1412,6 @@ public void testDiscoverExistingPublishingTask() throws Exception Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState()); Assert.assertEquals(0, payload.getRecentErrors().size()); - Assert.assertEquals(timeLag, payload.getMinimumLagMillis()); - Assert.assertEquals(20000000L, (long) payload.getAggregateLagMillis()); - TaskReportData publishingReport = payload.getPublishingTasks().get(0); Assert.assertEquals("id1", publishingReport.getId()); @@ -1506,7 +1463,6 @@ public void testDiscoverExistingPublishingTask() throws Exception public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() throws Exception { final TaskLocation location = new TaskLocation("testHost", 1234, -1); - final Map timeLag = ImmutableMap.of(SHARD_ID1, 9000L, SHARD_ID0, 1234L); supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); supervisorRecordSupplier.assign(EasyMock.anyObject()); @@ -1524,9 +1480,6 @@ public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), 
EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(timeLag) - .atLeastOnce(); Task task = createKinesisIndexTask( "id1", @@ -1587,7 +1540,7 @@ public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() supervisor.start(); supervisor.runInternal(); - supervisor.updateCurrentAndLatestOffsets(); + supervisor.updateCurrentAndLatestOffsets().run(); SupervisorReport report = supervisor.getStatus(); verifyAll(); @@ -1604,9 +1557,6 @@ public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() Assert.assertEquals(1, payload.getPublishingTasks().size()); Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState()); Assert.assertEquals(0, payload.getRecentErrors().size()); - Assert.assertEquals(timeLag, payload.getMinimumLagMillis()); - Assert.assertEquals(9000L + 1234L, (long) payload.getAggregateLagMillis()); - TaskReportData publishingReport = payload.getPublishingTasks().get(0); @@ -1661,7 +1611,6 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception final TaskLocation location1 = new TaskLocation("testHost", 1234, -1); final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); - final Map timeLag = ImmutableMap.of(SHARD_ID0, 100L, SHARD_ID1, 200L); supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); @@ -1680,9 +1629,6 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(timeLag) - .atLeastOnce(); Task id1 = createKinesisIndexTask( "id1", DATASOURCE, 
@@ -1787,7 +1733,7 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception supervisor.start(); supervisor.runInternal(); - supervisor.updateCurrentAndLatestOffsets(); + supervisor.updateCurrentAndLatestOffsets().run(); SupervisorReport report = supervisor.getStatus(); verifyAll(); @@ -1822,7 +1768,6 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception SHARD_ID0, "1" ), activeReport.getCurrentOffsets()); - Assert.assertEquals(timeLag, activeReport.getLagMillis()); Assert.assertEquals("id1", publishingReport.getId()); Assert.assertEquals(ImmutableMap.of( @@ -1858,9 +1803,6 @@ public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -1943,9 +1885,6 @@ public void testKillUnresponsiveTasksWhilePausing() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -2055,10 +1994,6 @@ public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); 
EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); - Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); @@ -2206,9 +2141,6 @@ public void testStopGracefully() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Task id1 = createKinesisIndexTask( "id1", @@ -2617,9 +2549,6 @@ public void testResetRunningTasks() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Task id1 = createKinesisIndexTask( "id1", @@ -2846,10 +2775,6 @@ public void testNoDataIngestionTasks() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); - EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); 
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); @@ -3004,9 +2929,6 @@ public void testCheckpointForInactiveTaskGroup() throws InterruptedException EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -3260,9 +3182,7 @@ public void testSuspendedRunningTasks() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); + Task id1 = createKinesisIndexTask( "id1", @@ -3511,9 +3431,6 @@ public void testDoNotKillCompatibleTasks() throws InterruptedException, EntryExi EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Task task = createKinesisIndexTask( "id2", @@ -3610,9 +3527,6 @@ public void testKillIncompatibleTasks() throws InterruptedException, EntryExists EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), 
EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Task task = createKinesisIndexTask( "id1", @@ -3729,7 +3643,6 @@ public void testIsTaskCurrent() null, 42, // This property is different from tuningConfig null, - null, null ); @@ -3855,9 +3768,6 @@ private List testShardSplitPhaseOne() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -3966,9 +3876,6 @@ private List testShardSplitPhaseTwo(List phaseOneTasks) throws Excep supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture postSplitCaptured = Capture.newInstance(CaptureType.ALL); @@ -4144,9 +4051,6 @@ private void testShardSplitPhaseThree(List phaseTwoTasks) throws Exception supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture postSplitCaptured = Capture.newInstance(CaptureType.ALL); @@ -4310,9 +4214,6 @@ private List testShardMergePhaseOne() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), 
EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -4432,9 +4333,6 @@ private List testShardMergePhaseTwo(List phaseOneTasks) throws Excep supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture postMergeCaptured = Capture.newInstance(CaptureType.ALL); @@ -4589,9 +4487,6 @@ private void testShardMergePhaseThree(List phaseTwoTasks) throws Exception supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture postSplitCaptured = Capture.newInstance(CaptureType.ALL); @@ -4799,7 +4694,6 @@ public KinesisIndexTaskClient build( null, null, null, - null, null ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 682a560e8eef..3734085ddc64 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -77,8 +77,6 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; -import 
org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.EntryExistsException; import org.apache.druid.segment.indexing.DataSchema; import org.joda.time.DateTime; @@ -108,7 +106,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiConsumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -131,10 +128,6 @@ public abstract class SeekableStreamSupervisor recordSupplier; + private volatile RecordSupplier recordSupplier; private volatile boolean started = false; private volatile boolean stopped = false; private volatile boolean lifecycleStarted = false; - private final ServiceEmitter emitter; public SeekableStreamSupervisor( final String supervisorId, @@ -510,7 +502,6 @@ public SeekableStreamSupervisor( this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator; this.sortingMapper = mapper.copy().configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true); this.spec = spec; - this.emitter = spec.getEmitter(); this.rowIngestionMetersFactory = rowIngestionMetersFactory; this.useExclusiveStartingSequence = useExclusiveStartingSequence; this.dataSource = spec.getDataSchema().getDataSource(); @@ -848,8 +839,7 @@ private SupervisorReport { + try { + updateCurrentOffsets(); + updateLatestOffsetsFromStream(); + sequenceLastUpdated = DateTimes.nowUtc(); + } + catch (Exception e) { + log.warn(e, "Exception while getting current/latest sequences"); + } + }; } private void updateCurrentOffsets() throws InterruptedException, ExecutionException, TimeoutException @@ -3167,35 +3158,18 @@ protected abstract void updateLatestSequenceFromStream( Set> partitions ); - /** - * Gets 'lag' of currently processed offset behind latest offset as a measure of difference between offsets. 
- */ - @Nullable - protected abstract Map getPartitionRecordLag(); - - /** - * Gets 'lag' of currently processed offset behind latest offset as a measure of the difference in time inserted. - */ - @Nullable - protected abstract Map getPartitionTimeLag(); - protected Map getHighestCurrentOffsets() { - if (!spec.isSuspended() || activelyReadingTaskGroups.size() > 0 || pendingCompletionTaskGroups.size() > 0) { - return activelyReadingTaskGroups - .values() - .stream() - .flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()) - .flatMap(taskData -> taskData.getValue().currentSequences.entrySet().stream()) - .collect(Collectors.toMap( - Entry::getKey, - Entry::getValue, - (v1, v2) -> makeSequenceNumber(v1).compareTo(makeSequenceNumber(v2)) > 0 ? v1 : v2 - )); - } else { - // if supervisor is suspended, no tasks are likely running so use offsets in metadata, if exist - return getOffsetsFromMetadataStorage(); - } + return activelyReadingTaskGroups + .values() + .stream() + .flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()) + .flatMap(taskData -> taskData.getValue().currentSequences.entrySet().stream()) + .collect(Collectors.toMap( + Entry::getKey, + Entry::getValue, + (v1, v2) -> makeSequenceNumber(v1).compareTo(makeSequenceNumber(v2)) > 0 ? 
v1 : v2 + )); } private OrderedSequenceNumber makeSequenceNumber(SequenceOffsetType seq) @@ -3378,42 +3352,18 @@ protected abstract OrderedSequenceNumber makeSequenceNumber( ); /** - * default implementation, schedules periodic fetch of latest offsets and {@link #emitLag} reporting for Kafka and Kinesis + * schedules periodic emitLag() reporting for Kafka, not yet implemented in Kinesis, + * but will be in the future */ - protected void scheduleReporting(ScheduledExecutorService reportingExec) - { - SeekableStreamSupervisorIOConfig ioConfig = spec.getIoConfig(); - SeekableStreamSupervisorTuningConfig tuningConfig = spec.getTuningConfig(); - reportingExec.scheduleAtFixedRate( - this::updateCurrentAndLatestOffsets, - ioConfig.getStartDelay().getMillis() + INITIAL_GET_OFFSET_DELAY_MILLIS, // wait for tasks to start up - Math.max( - tuningConfig.getOffsetFetchPeriod().getMillis(), MINIMUM_GET_OFFSET_PERIOD_MILLIS - ), - TimeUnit.MILLISECONDS - ); - - reportingExec.scheduleAtFixedRate( - this::emitLag, - ioConfig.getStartDelay().getMillis() + INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS, // wait for tasks to start up - spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(), - TimeUnit.MILLISECONDS - ); - } + protected abstract void scheduleReporting(ScheduledExecutorService reportingExec); /** - * calculate lag per partition for kafka as a measure of message count, kinesis implementation returns an empty + * calculate lag per partition for kafka, kinesis implementation returns an empty * map * * @return map of partition id -> lag */ - protected abstract Map getRecordLagPerPartition( - Map currentOffsets - ); - - protected abstract Map getTimeLagPerPartition( - Map currentOffsets - ); + protected abstract Map getLagPerPartition(Map currentOffsets); /** * returns an instance of a specific Kinesis/Kafka recordSupplier @@ -3447,61 +3397,6 @@ private boolean checkOffsetAvailability( && makeSequenceNumber(earliestOffset).compareTo(makeSequenceNumber(offsetFromMetadata)) <= 
0; } - protected void emitLag() - { - if (spec.isSuspended()) { - // don't emit metrics if supervisor is suspended (lag should still available in status report) - return; - } - try { - Map partitionRecordLags = getPartitionRecordLag(); - Map partitionTimeLags = getPartitionTimeLag(); - - if (partitionRecordLags == null && partitionTimeLags == null) { - throw new ISE("Latest offsets have not been fetched"); - } - final String type = spec.getType(); - - BiConsumer, String> emitFn = (partitionLags, suffix) -> { - if (partitionLags == null) { - return; - } - - long maxLag = 0, totalLag = 0, avgLag; - for (long lag : partitionLags.values()) { - if (lag > maxLag) { - maxLag = lag; - } - totalLag += lag; - } - avgLag = partitionLags.size() == 0 ? 0 : totalLag / partitionLags.size(); - - emitter.emit( - ServiceMetricEvent.builder() - .setDimension("dataSource", dataSource) - .build(StringUtils.format("ingest/%s/lag%s", type, suffix), totalLag) - ); - emitter.emit( - ServiceMetricEvent.builder() - .setDimension("dataSource", dataSource) - .build(StringUtils.format("ingest/%s/maxLag%s", type, suffix), maxLag) - ); - emitter.emit( - ServiceMetricEvent.builder() - .setDimension("dataSource", dataSource) - .build(StringUtils.format("ingest/%s/avgLag%s", type, suffix), avgLag) - ); - }; - - // this should probably really be /count or /records or something.. but keeping like this for backwards compat - emitFn.accept(partitionRecordLags, ""); - emitFn.accept(partitionTimeLags, "/time"); - } - catch (Exception e) { - log.warn(e, "Unable to compute lag"); - } - } - /** * a special sequence number that is used to indicate that the sequence offset * for a particular partition has not yet been calculated by the supervisor. 
When diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorReportPayload.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorReportPayload.java index 7d69c0c8ee79..7436488cf116 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorReportPayload.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorReportPayload.java @@ -41,10 +41,8 @@ public abstract class SeekableStreamSupervisorReportPayload activeTasks; private final List publishingTasks; private final Map latestOffsets; - private final Map minimumLag; + private final Map minimumLag; private final Long aggregateLag; - private final Map minimumLagMillis; - private final Long aggregateLagMillis; private final DateTime offsetsLastUpdated; private final boolean suspended; private final boolean healthy; @@ -59,10 +57,8 @@ public SeekableStreamSupervisorReportPayload( int replicas, long durationSeconds, @Nullable Map latestOffsets, - @Nullable Map minimumLag, + @Nullable Map minimumLag, @Nullable Long aggregateLag, - @Nullable Map minimumLagMillis, - @Nullable Long aggregateLagMillis, @Nullable DateTime offsetsLastUpdated, boolean suspended, boolean healthy, @@ -81,8 +77,6 @@ public SeekableStreamSupervisorReportPayload( this.latestOffsets = latestOffsets; this.minimumLag = minimumLag; this.aggregateLag = aggregateLag; - this.minimumLagMillis = minimumLagMillis; - this.aggregateLagMillis = aggregateLagMillis; this.offsetsLastUpdated = offsetsLastUpdated; this.suspended = suspended; this.healthy = healthy; @@ -163,7 +157,7 @@ public Map getLatestOffsets() } @JsonProperty - public Map getMinimumLag() + public Map getMinimumLag() { return minimumLag; } @@ -174,19 +168,6 @@ public Long getAggregateLag() return aggregateLag; } - @JsonProperty - public Long 
getAggregateLagMillis() - { - return aggregateLagMillis; - } - - - @JsonProperty - public Map getMinimumLagMillis() - { - return minimumLagMillis; - } - @JsonProperty public DateTime getOffsetsLastUpdated() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTuningConfig.java index 733f1258ccbf..381a5639644e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTuningConfig.java @@ -26,7 +26,7 @@ public interface SeekableStreamSupervisorTuningConfig { - String DEFAULT_OFFSET_FETCH_PERIOD = "PT30S"; + int DEFAULT_CHAT_RETRIES = 8; String DEFAULT_HTTP_TIMEOUT = "PT10S"; String DEFAULT_SHUTDOWN_TIMEOUT = "PT80S"; @@ -55,8 +55,5 @@ static Duration defaultDuration(final Period period, final String theDefault) @JsonProperty Duration getRepartitionTransitionDuration(); - @JsonProperty - Duration getOffsetFetchPeriod(); - SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/TaskReportData.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/TaskReportData.java index 3edf7124fd29..0b18e58ee678 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/TaskReportData.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/TaskReportData.java @@ -34,8 +34,7 @@ public class TaskReportData private final Long remainingSeconds; private final TaskType type; private final Map currentOffsets; - private final Map lag; - private final Map lagMillis; + private final Map lag; public 
TaskReportData( String id, @@ -44,8 +43,7 @@ public TaskReportData( @Nullable DateTime startTime, Long remainingSeconds, TaskType type, - @Nullable Map lag, - @Nullable Map lagMillis + @Nullable Map lag ) { this.id = id; @@ -55,7 +53,6 @@ public TaskReportData( this.remainingSeconds = remainingSeconds; this.type = type; this.lag = lag; - this.lagMillis = lagMillis; } @JsonProperty @@ -98,18 +95,11 @@ public TaskType getType() @JsonProperty @JsonInclude(JsonInclude.Include.NON_NULL) - public Map getLag() + public Map getLag() { return lag; } - @JsonProperty - @JsonInclude(JsonInclude.Include.NON_NULL) - public Map getLagMillis() - { - return lagMillis; - } - @Override public String toString() { @@ -120,7 +110,6 @@ public String toString() ", startTime=" + startTime + ", remainingSeconds=" + remainingSeconds + (lag != null ? ", lag=" + lag : "") + - (lagMillis != null ? ", lagMillis=" + lagMillis : "") + '}'; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 2e5d74af439e..57aaec30f062 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -61,16 +61,12 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.parsers.JSONPathSpec; -import org.apache.druid.java.util.emitter.core.Event; -import org.apache.druid.metadata.EntryExistsException; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.segment.TestHelper; import 
org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; -import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; -import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.security.AuthorizerMapper; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; @@ -89,10 +85,8 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; public class SeekableStreamSupervisorStateTest extends EasyMockSupport { @@ -116,8 +110,6 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport private RowIngestionMetersFactory rowIngestionMetersFactory; private SupervisorStateManagerConfig supervisorConfig; - private TestEmitter emitter; - @Before public void setupTest() { @@ -135,14 +127,11 @@ public void setupTest() supervisorConfig = new SupervisorStateManagerConfig(); - emitter = new TestEmitter(); - EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig()).anyTimes(); EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); - EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); EasyMock.expect(taskClientFactory.build( EasyMock.anyObject(), @@ -563,173 +552,6 @@ public void testStopping() throws Exception verifyAll(); } - - @Test - public void testEmitBothLag() throws Exception - { - expectEmitterSupervisor(false); - - CountDownLatch latch = new CountDownLatch(1); - TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor( - latch, - 
ImmutableMap.of("1", 100L, "2", 250L, "3", 500L), - ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L) - ); - - - supervisor.start(); - - Assert.assertTrue(supervisor.stateManager.isHealthy()); - Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState()); - Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState()); - Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty()); - Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun()); - - - latch.await(); - Assert.assertEquals(6, emitter.getEvents().size()); - Assert.assertEquals("ingest/test/lag", emitter.getEvents().get(0).toMap().get("metric")); - Assert.assertEquals(850L, emitter.getEvents().get(0).toMap().get("value")); - Assert.assertEquals("ingest/test/maxLag", emitter.getEvents().get(1).toMap().get("metric")); - Assert.assertEquals(500L, emitter.getEvents().get(1).toMap().get("value")); - Assert.assertEquals("ingest/test/avgLag", emitter.getEvents().get(2).toMap().get("metric")); - Assert.assertEquals(283L, emitter.getEvents().get(2).toMap().get("value")); - Assert.assertEquals("ingest/test/lag/time", emitter.getEvents().get(3).toMap().get("metric")); - Assert.assertEquals(45000L, emitter.getEvents().get(3).toMap().get("value")); - Assert.assertEquals("ingest/test/maxLag/time", emitter.getEvents().get(4).toMap().get("metric")); - Assert.assertEquals(20000L, emitter.getEvents().get(4).toMap().get("value")); - Assert.assertEquals("ingest/test/avgLag/time", emitter.getEvents().get(5).toMap().get("metric")); - Assert.assertEquals(15000L, emitter.getEvents().get(5).toMap().get("value")); - verifyAll(); - } - - @Test - public void testEmitRecordLag() throws Exception - { - expectEmitterSupervisor(false); - - CountDownLatch latch = new CountDownLatch(1); - TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor( - latch, - ImmutableMap.of("1", 100L, "2", 250L, "3", 
500L), - null - ); - - - supervisor.start(); - - Assert.assertTrue(supervisor.stateManager.isHealthy()); - Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState()); - Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState()); - Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty()); - Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun()); - - - latch.await(); - Assert.assertEquals(3, emitter.getEvents().size()); - Assert.assertEquals("ingest/test/lag", emitter.getEvents().get(0).toMap().get("metric")); - Assert.assertEquals(850L, emitter.getEvents().get(0).toMap().get("value")); - Assert.assertEquals("ingest/test/maxLag", emitter.getEvents().get(1).toMap().get("metric")); - Assert.assertEquals(500L, emitter.getEvents().get(1).toMap().get("value")); - Assert.assertEquals("ingest/test/avgLag", emitter.getEvents().get(2).toMap().get("metric")); - Assert.assertEquals(283L, emitter.getEvents().get(2).toMap().get("value")); - verifyAll(); - } - - @Test - public void testEmitTimeLag() throws Exception - { - expectEmitterSupervisor(false); - - CountDownLatch latch = new CountDownLatch(1); - TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor( - latch, - null, - ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L) - ); - - - supervisor.start(); - - Assert.assertTrue(supervisor.stateManager.isHealthy()); - Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState()); - Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState()); - Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty()); - Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun()); - - - latch.await(); - Assert.assertEquals(3, emitter.getEvents().size()); - Assert.assertEquals("ingest/test/lag/time", emitter.getEvents().get(0).toMap().get("metric")); - 
Assert.assertEquals(45000L, emitter.getEvents().get(0).toMap().get("value")); - Assert.assertEquals("ingest/test/maxLag/time", emitter.getEvents().get(1).toMap().get("metric")); - Assert.assertEquals(20000L, emitter.getEvents().get(1).toMap().get("value")); - Assert.assertEquals("ingest/test/avgLag/time", emitter.getEvents().get(2).toMap().get("metric")); - Assert.assertEquals(15000L, emitter.getEvents().get(2).toMap().get("value")); - verifyAll(); - } - - @Test - public void testEmitNoLagWhenSuspended() throws Exception - { - expectEmitterSupervisor(true); - - CountDownLatch latch = new CountDownLatch(1); - TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor( - latch, - ImmutableMap.of("1", 100L, "2", 250L, "3", 500L), - ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L) - ); - - - supervisor.start(); - supervisor.runInternal(); - - Assert.assertTrue(supervisor.stateManager.isHealthy()); - Assert.assertEquals(BasicState.SUSPENDED, supervisor.stateManager.getSupervisorState()); - Assert.assertEquals(BasicState.SUSPENDED, supervisor.stateManager.getSupervisorState().getBasicState()); - Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty()); - - - latch.await(); - Assert.assertEquals(0, emitter.getEvents().size()); - verifyAll(); - } - - private void expectEmitterSupervisor(boolean suspended) throws EntryExistsException - { - spec = createMock(SeekableStreamSupervisorSpec.class); - EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); - - EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); - EasyMock.expect(spec.getIoConfig()).andReturn(new SeekableStreamSupervisorIOConfig( - "stream", - new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of()), - 1, - 1, - new Period("PT1H"), - new Period("PT1S"), - new Period("PT30S"), - false, - new Period("PT30M"), - null, - null, null - ) - { - }).anyTimes(); - 
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); - EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); - EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig()).anyTimes(); - EasyMock.expect(spec.isSuspended()).andReturn(suspended).anyTimes(); - EasyMock.expect(spec.getType()).andReturn("test").anyTimes(); - - EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); - EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes(); - - replayAll(); - } - private static DataSchema getDataSchema() { List dimensions = new ArrayList<>(); @@ -813,12 +635,6 @@ public Duration getRepartitionTransitionDuration() return new Period("PT2M").toStandardDuration(); } - @Override - public Duration getOffsetFetchPeriod() - { - return new Period("PT5M").toStandardDuration(); - } - @Override public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() { @@ -909,9 +725,9 @@ public String getType() } } - private abstract class BaseTestSeekableStreamSupervisor extends SeekableStreamSupervisor + private class TestSeekableStreamSupervisor extends SeekableStreamSupervisor { - private BaseTestSeekableStreamSupervisor() + private TestSeekableStreamSupervisor() { super( "testSupervisorId", @@ -940,20 +756,6 @@ protected void updateLatestSequenceFromStream( // do nothing } - @Nullable - @Override - protected Map getPartitionRecordLag() - { - return null; - } - - @Nullable - @Override - protected Map getPartitionTimeLag() - { - return null; - } - @Override protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( int groupId, @@ -1048,13 +850,13 @@ public int compareTo(OrderedSequenceNumber o) } @Override - protected Map getRecordLagPerPartition(Map currentOffsets) + protected void scheduleReporting(ScheduledExecutorService 
reportingExec) { - return null; + // do nothing } @Override - protected Map getTimeLagPerPartition(Map currentOffsets) + protected Map getLagPerPartition(Map currentOffsets) { return null; } @@ -1062,7 +864,7 @@ protected Map getTimeLagPerPartition(Map currentOf @Override protected RecordSupplier setupRecordSupplier() { - return SeekableStreamSupervisorStateTest.this.recordSupplier; + return recordSupplier; } @Override @@ -1081,8 +883,6 @@ protected SeekableStreamSupervisorReportPayload createReportPayl null, null, null, - null, - null, false, true, null, @@ -1123,80 +923,4 @@ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() return false; } } - - private class TestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor - { - @Override - protected void scheduleReporting(ScheduledExecutorService reportingExec) - { - // do nothing - } - } - - private class TestEmittingTestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor - { - private final CountDownLatch latch; - private final Map partitionsRecordLag; - private final Map partitionsTimeLag; - - TestEmittingTestSeekableStreamSupervisor( - CountDownLatch latch, - Map partitionsRecordLag, - Map partitionsTimeLag - ) - { - this.latch = latch; - this.partitionsRecordLag = partitionsRecordLag; - this.partitionsTimeLag = partitionsTimeLag; - } - - @Nullable - @Override - protected Map getPartitionRecordLag() - { - return partitionsRecordLag; - } - - @Nullable - @Override - protected Map getPartitionTimeLag() - { - return partitionsTimeLag; - } - - @Override - protected void emitLag() - { - super.emitLag(); - latch.countDown(); - } - - @Override - protected void scheduleReporting(ScheduledExecutorService reportingExec) - { - SeekableStreamSupervisorIOConfig ioConfig = spec.getIoConfig(); - reportingExec.scheduleAtFixedRate( - this::emitLag, - ioConfig.getStartDelay().getMillis(), - spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(), - TimeUnit.MILLISECONDS - ); - } 
- } - - private static class TestEmitter extends NoopServiceEmitter - { - private final List events = new ArrayList<>(); - - @Override - public void emit(Event event) - { - events.add(event); - } - - public List getEvents() - { - return events; - } - } }