Revert "add kinesis lag metric (apache#9509)" (#88)
This reverts commit 142742f.
jihoonson authored May 5, 2020
1 parent ddefeec commit 05fbcab
Showing 17 changed files with 159 additions and 854 deletions.
KafkaSupervisor.java
@@ -50,20 +50,23 @@
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
@@ -82,6 +85,9 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
};

private static final EmittingLogger log = new EmittingLogger(KafkaSupervisor.class);
private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000;
private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000;
private static final long INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS = 25000;
private static final Long NOT_SET = -1L;
private static final Long END_OF_PARTITION = Long.MAX_VALUE;

@@ -126,6 +132,28 @@ protected RecordSupplier<Integer, Long> setupRecordSupplier()
return new KafkaRecordSupplier(spec.getIoConfig().getConsumerProperties(), sortingMapper);
}

@Override
protected void scheduleReporting(ScheduledExecutorService reportingExec)
{
KafkaSupervisorIOConfig ioConfig = spec.getIoConfig();
KafkaSupervisorTuningConfig tuningConfig = spec.getTuningConfig();
reportingExec.scheduleAtFixedRate(
updateCurrentAndLatestOffsets(),
ioConfig.getStartDelay().getMillis() + INITIAL_GET_OFFSET_DELAY_MILLIS, // wait for tasks to start up
Math.max(
tuningConfig.getOffsetFetchPeriod().getMillis(), MINIMUM_GET_OFFSET_PERIOD_MILLIS
),
TimeUnit.MILLISECONDS
);

reportingExec.scheduleAtFixedRate(
emitLag(),
ioConfig.getStartDelay().getMillis() + INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS, // wait for tasks to start up
monitorSchedulerConfig.getEmitterPeriod().getMillis(),
TimeUnit.MILLISECONDS
);
}
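
The restored scheduleReporting wires two periodic jobs onto the supervisor's reporting executor: an offset refresh whose period is clamped to the 5-second MINIMUM_GET_OFFSET_PERIOD_MILLIS floor, and the lag emission, which starts 10 seconds later so the first offset refresh has had a chance to run. A minimal standalone sketch of the same ScheduledExecutorService pattern, with stand-in Runnables and assumed delay/period values:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ReportingSchedulerSketch
{
  public static void main(String[] args)
  {
    ScheduledExecutorService reportingExec = Executors.newSingleThreadScheduledExecutor();
    long startDelayMillis = 5_000L; // stand-in for ioConfig.getStartDelay().getMillis()

    // Offset refresh: the initial delay leaves time for tasks to start up;
    // the period is never allowed below the 5 s minimum.
    reportingExec.scheduleAtFixedRate(
        () -> System.out.println("update current and latest offsets"),
        startDelayMillis + 15_000L,   // INITIAL_GET_OFFSET_DELAY_MILLIS
        Math.max(30_000L, 5_000L),    // offsetFetchPeriod vs. MINIMUM_GET_OFFSET_PERIOD_MILLIS
        TimeUnit.MILLISECONDS
    );

    // Lag emission starts another 10 s later, after offsets have been fetched at least once.
    reportingExec.scheduleAtFixedRate(
        () -> System.out.println("emit lag metrics"),
        startDelayMillis + 25_000L,   // INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS
        60_000L,                      // stand-in for monitorSchedulerConfig.getEmitterPeriod().getMillis()
        TimeUnit.MILLISECONDS
    );
  }
}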

@Override
protected int getTaskGroupIdForPartition(Integer partitionId)
{
@@ -151,7 +179,7 @@ protected SeekableStreamSupervisorReportPayload<Integer, Long> createReportPayload
)
{
KafkaSupervisorIOConfig ioConfig = spec.getIoConfig();
Map<Integer, Long> partitionLag = getRecordLagPerPartition(getHighestCurrentOffsets());
Map<Integer, Long> partitionLag = getLagPerPartition(getHighestCurrentOffsets());
return new KafkaSupervisorReportPayload(
spec.getDataSchema().getDataSource(),
ioConfig.getTopic(),
@@ -239,38 +267,11 @@ protected List<SeekableStreamIndexTask<Integer, Long>> createIndexTasks(
return taskList;
}

@Override
protected Map<Integer, Long> getPartitionRecordLag()
{
Map<Integer, Long> highestCurrentOffsets = getHighestCurrentOffsets();

if (latestSequenceFromStream == null) {
return null;
}

if (!latestSequenceFromStream.keySet().equals(highestCurrentOffsets.keySet())) {
log.warn(
"Lag metric: Kafka partitions %s do not match task partitions %s",
latestSequenceFromStream.keySet(),
highestCurrentOffsets.keySet()
);
}

return getRecordLagPerPartition(highestCurrentOffsets);
}

@Nullable
@Override
protected Map<Integer, Long> getPartitionTimeLag()
{
// time lag not currently supported with kafka
return null;
}

@Override
// suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here
@SuppressWarnings("SSBasedInspection")
protected Map<Integer, Long> getRecordLagPerPartition(Map<Integer, Long> currentOffsets)
protected Map<Integer, Long> getLagPerPartition(Map<Integer, Long> currentOffsets)
{
return currentOffsets
.entrySet()
@@ -287,12 +288,6 @@ protected Map<Integer, Long> getRecordLagPerPartition(Map<Integer, Long> currentOffsets
);
}
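
The collapsed body above maps each partition's highest processed offset to its record lag. As emitLag below implies, a partition's lag is the latest offset observed on the stream minus the highest offset the tasks have read. An illustrative sketch of that computation (names hypothetical, and assuming an unknown latest offset counts as zero lag):

import java.util.HashMap;
import java.util.Map;

final class PartitionLagSketch
{
  // lag = latest offset on the stream - highest offset processed, floored at zero
  static Map<Integer, Long> lagPerPartition(
      Map<Integer, Long> latestSequenceFromStream,
      Map<Integer, Long> currentOffsets
  )
  {
    Map<Integer, Long> lag = new HashMap<>();
    for (Map.Entry<Integer, Long> e : currentOffsets.entrySet()) {
      Long latest = latestSequenceFromStream.get(e.getKey());
      lag.put(e.getKey(), latest == null ? 0L : Math.max(0L, latest - e.getValue()));
    }
    return lag;
  }
}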

@Override
protected Map<Integer, Long> getTimeLagPerPartition(Map<Integer, Long> currentOffsets)
{
return null;
}

@Override
protected KafkaDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map<Integer, Long> map)
{
@@ -305,6 +300,51 @@ protected OrderedSequenceNumber<Long> makeSequenceNumber(Long seq, boolean isExclusive
return KafkaSequenceNumber.of(seq);
}

private Runnable emitLag()
{
return () -> {
try {
Map<Integer, Long> highestCurrentOffsets = getHighestCurrentOffsets();
String dataSource = spec.getDataSchema().getDataSource();

if (latestSequenceFromStream == null) {
throw new ISE("Latest offsets from Kafka have not been fetched");
}

if (!latestSequenceFromStream.keySet().equals(highestCurrentOffsets.keySet())) {
log.warn(
"Lag metric: Kafka partitions %s do not match task partitions %s",
latestSequenceFromStream.keySet(),
highestCurrentOffsets.keySet()
);
}

Map<Integer, Long> partitionLags = getLagPerPartition(highestCurrentOffsets);
long maxLag = 0, totalLag = 0, avgLag;
for (long lag : partitionLags.values()) {
if (lag > maxLag) {
maxLag = lag;
}
totalLag += lag;
}
avgLag = partitionLags.size() == 0 ? 0 : totalLag / partitionLags.size();

emitter.emit(
ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/lag", totalLag)
);
emitter.emit(
ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/maxLag", maxLag)
);
emitter.emit(
ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build("ingest/kafka/avgLag", avgLag)
);
}
catch (Exception e) {
log.warn(e, "Unable to compute Kafka lag");
}
};
}
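
To make the aggregation concrete: hypothetical per-partition lags of {0: 120, 1: 30, 2: 0} would emit ingest/kafka/lag = 150, ingest/kafka/maxLag = 120, and ingest/kafka/avgLag = 50 (integer division of the total by the three partitions).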

@Override
protected Long getNotSetMarker()
{
KafkaSupervisorReportPayload.java
@@ -55,8 +55,6 @@ public KafkaSupervisorReportPayload(
latestOffsets,
minimumLag,
aggregateLag,
null,
null,
offsetsLastUpdated,
suspended,
healthy,
KafkaSupervisorTuningConfig.java
@@ -34,6 +34,8 @@
public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
implements SeekableStreamSupervisorTuningConfig
{
private static final String DEFAULT_OFFSET_FETCH_PERIOD = "PT30S";

private final Integer workerThreads;
private final Integer chatThreads;
private final Long chatRetries;
@@ -179,7 +181,6 @@ public Duration getRepartitionTransitionDuration()
);
}

@Override
@JsonProperty
public Duration getOffsetFetchPeriod()
{
KafkaSupervisorTest.java
@@ -1371,7 +1371,7 @@ public void testDiscoverExistingPublishingTask() throws Exception

supervisor.start();
supervisor.runInternal();
supervisor.updateCurrentAndLatestOffsets();
supervisor.updateCurrentAndLatestOffsets().run();
SupervisorReport<KafkaSupervisorReportPayload> report = supervisor.getStatus();
verifyAll();
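
A note on the restored .run() calls in these tests: updateCurrentAndLatestOffsets() returns a Runnable in the reverted-to code (scheduleReporting above hands it straight to scheduleAtFixedRate), so the tests execute it in place via run().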

@@ -1481,7 +1481,7 @@ public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation()

supervisor.start();
supervisor.runInternal();
supervisor.updateCurrentAndLatestOffsets();
supervisor.updateCurrentAndLatestOffsets().run();
SupervisorReport<KafkaSupervisorReportPayload> report = supervisor.getStatus();
verifyAll();

@@ -1625,7 +1625,7 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception

supervisor.start();
supervisor.runInternal();
supervisor.updateCurrentAndLatestOffsets();
supervisor.updateCurrentAndLatestOffsets().run();
SupervisorReport<KafkaSupervisorReportPayload> report = supervisor.getStatus();
verifyAll();

KinesisRecordSupplier.java
@@ -44,11 +44,9 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.common.aws.AWSCredentialsUtils;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisor;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
@@ -115,9 +113,9 @@ private class PartitionResource
private volatile boolean started;
private volatile boolean stopRequested;

private volatile long currentLagMillis;

PartitionResource(StreamPartition<String> streamPartition)
PartitionResource(
StreamPartition<String> streamPartition
)
{
this.streamPartition = streamPartition;
}
@@ -150,53 +148,6 @@ void stopBackgroundFetch()
stopRequested = true;
}

long getPartitionTimeLag()
{
return currentLagMillis;
}

long getPartitionTimeLag(String offset)
{
// if not started (fetching records in background), fetch lag ourself with a throw-away iterator
if (!started) {
try {
final String iteratorType;
final String offsetToUse;
if (offset == null || KinesisSupervisor.NOT_SET.equals(offset)) {
// this should probably check whether we will start processing from earliest or latest rather than assuming earliest
// if latest, we could skip this because the latest offset cannot be behind itself, so lag is 0.
iteratorType = ShardIteratorType.TRIM_HORIZON.toString();
offsetToUse = null;
} else {
iteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER.toString();
offsetToUse = offset;
}
String shardIterator = kinesis.getShardIterator(
streamPartition.getStream(),
streamPartition.getPartitionId(),
iteratorType,
offsetToUse
).getShardIterator();

GetRecordsResult recordsResult = kinesis.getRecords(
new GetRecordsRequest().withShardIterator(shardIterator).withLimit(recordsPerFetch)
);

currentLagMillis = recordsResult.getMillisBehindLatest();
return currentLagMillis;
}
catch (Exception ex) {
// eat it
log.warn(
ex,
"Failed to determine partition lag for partition %s of stream %s",
streamPartition.getPartitionId(),
streamPartition.getStream()
);
}
}
return currentLagMillis;
}
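
The removed probe above measures time lag by opening a throw-away shard iterator and reading MillisBehindLatest from a single GetRecords call. A minimal standalone sketch of that AWS SDK v1 pattern, mirroring the iterator-type selection in the removed code (class name and record limit are assumptions):

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ShardIteratorType;

final class KinesisLagProbeSketch
{
  // How many milliseconds the given position is behind the tip of the shard,
  // determined with one throw-away iterator and one GetRecords call.
  static long millisBehindLatest(AmazonKinesis kinesis, String stream, String shardId, String sequenceNumber)
  {
    final String iteratorType;
    final String offsetToUse;
    if (sequenceNumber == null) {
      // no known position: probe from the oldest retained record
      iteratorType = ShardIteratorType.TRIM_HORIZON.toString();
      offsetToUse = null;
    } else {
      iteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER.toString();
      offsetToUse = sequenceNumber;
    }

    String shardIterator = kinesis.getShardIterator(stream, shardId, iteratorType, offsetToUse).getShardIterator();
    GetRecordsResult result = kinesis.getRecords(
        new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1000) // assumed limit
    );
    return result.getMillisBehindLatest();
  }
}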

private Runnable getRecordRunnable()
{
@@ -240,14 +191,11 @@ private Runnable getRecordRunnable()
recordsResult = kinesis.getRecords(new GetRecordsRequest().withShardIterator(
shardIterator).withLimit(recordsPerFetch));

currentLagMillis = recordsResult.getMillisBehindLatest();

// list will come back empty if there are no records
for (Record kinesisRecord : recordsResult.getRecords()) {

final List<byte[]> data;


if (deaggregate) {
if (deaggregateHandle == null || getDataHandle == null) {
throw new ISE("deaggregateHandle or getDataHandle is null!");
@@ -689,27 +637,6 @@ public void close()
this.closed = true;
}

// this is only used for tests
@VisibleForTesting
Map<String, Long> getPartitionTimeLag()
{
return partitionResources.entrySet()
.stream()
.collect(
Collectors.toMap(k -> k.getKey().getPartitionId(), k -> k.getValue().getPartitionTimeLag())
);
}

public Map<String, Long> getPartitionTimeLag(Map<String, String> currentOffsets)
{
Map<String, Long> partitionLag = Maps.newHashMapWithExpectedSize(currentOffsets.size());
for (Map.Entry<StreamPartition<String>, PartitionResource> partition : partitionResources.entrySet()) {
final String partitionId = partition.getKey().getPartitionId();
partitionLag.put(partitionId, partition.getValue().getPartitionTimeLag(currentOffsets.get(partitionId)));
}
return partitionLag;
}

private void seekInternal(StreamPartition<String> partition, String sequenceNumber, ShardIteratorType iteratorEnum)
{
PartitionResource resource = partitionResources.get(partition);
