KAFKA-15344: Streams task should cache consumer nextOffsets (#17091)

This PR augments Streams messages with the leader epoch. When a task's buffer queues are empty, the last offset and leader epoch are retrieved from the stream task's cache of nextOffsets.

Co-authored-by: Lucas Brutschy <[email protected]>
Reviewers: Lucas Brutschy <[email protected]>, Matthias J. Sax <[email protected]>
aliehsaeedii and lucasbru authored Oct 29, 2024
1 parent 8c071b0 commit 4817eb9
Showing 17 changed files with 268 additions and 83 deletions.
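
At a glance, the change wires the consumer's next fetch positions from the poll loop through the TaskManager into each StreamTask, which caches them per partition and falls back to that cache when committing a partition whose buffer queue is empty. The sketch below illustrates the caching idea only; the class and member names are simplified stand-ins, not the actual StreamTask code.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Simplified stand-in for the per-task cache introduced by this commit.
class NextOffsetCacheSketch {

    private final Map<TopicPartition, OffsetAndMetadata> nextOffsets = new HashMap<>();

    // Called from the poll loop with the entries of ConsumerRecords#nextOffsets().
    void updateNextOffsets(final TopicPartition partition, final OffsetAndMetadata offsetAndMetadata) {
        nextOffsets.put(partition, offsetAndMetadata);
    }

    // Called at commit time when the buffer for this partition is empty: instead of asking the
    // consumer for its position, reuse the cached offset and leader epoch.
    OffsetAndMetadata committableOffsetFor(final TopicPartition partition, final String encodedMetadata) {
        final OffsetAndMetadata cached = nextOffsets.get(partition);
        if (cached == null) {
            throw new IllegalStateException("Task does not know the partition: " + partition);
        }
        return new OffsetAndMetadata(cached.offset(), cached.leaderEpoch(), encodedMetadata);
    }
}

Compared with the previous consumer.position() fallback, the cached entry also carries the leader epoch, which is what lets a consumer resuming from the committed offset detect log truncation.
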
@@ -19,6 +19,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

@@ -56,6 +57,8 @@ abstract class AbstractPartitionGroup {

abstract Long headRecordOffset(final TopicPartition partition);

abstract Optional<Integer> headRecordLeaderEpoch(final TopicPartition partition);

abstract int numBuffered();

abstract int numBuffered(TopicPartition tp);
@@ -30,6 +30,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.PriorityQueue;
import java.util.Set;
@@ -322,6 +323,18 @@ Long headRecordOffset(final TopicPartition partition) {
return recordQueue.headRecordOffset();
}

@Override
Optional<Integer> headRecordLeaderEpoch(final TopicPartition partition) {
final RecordQueue recordQueue = partitionQueues.get(partition);

if (recordQueue == null) {
throw new IllegalStateException("Partition " + partition + " not found.");
}

return recordQueue.headRecordLeaderEpoch();
}


/**
* @throws IllegalStateException if the record's partition does not belong to this partition group
*/
@@ -149,6 +149,11 @@ public void addRecords(final TopicPartition partition, final Iterable<ConsumerRe
throw new UnsupportedOperationException("This task is read-only");
}

@Override
public void updateNextOffsets(final TopicPartition partition, final OffsetAndMetadata offsetAndMetadata) {
throw new UnsupportedOperationException("This task is read-only");
}

@Override
public boolean process(final long wallClockTime) {
throw new UnsupportedOperationException("This task is read-only");
@@ -29,7 +29,6 @@
import org.slf4j.Logger;

import java.util.Objects;
- import java.util.Optional;

import static org.apache.kafka.streams.StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;

@@ -69,7 +68,7 @@ ConsumerRecord<Object, Object> deserialize(final ProcessorContext<?, ?> processo
sourceNode.deserializeKey(rawRecord.topic(), rawRecord.headers(), rawRecord.key()),
sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value()),
rawRecord.headers(),
- Optional.empty()
+ rawRecord.leaderEpoch()
);
} catch (final RuntimeException deserializationException) {
handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor, sourceNode().name());
@@ -30,6 +30,7 @@
import org.slf4j.Logger;

import java.util.ArrayDeque;
import java.util.Optional;

import static org.apache.kafka.streams.processor.internals.ClientUtils.consumerRecordSizeInBytes;

@@ -181,6 +182,17 @@ public Long headRecordOffset() {
return headRecord == null ? null : headRecord.offset();
}

/**
* Returns the leader epoch of the head record if it exists.
*
* @return An Optional containing the leader epoch of the head record, or null if the queue is empty; Optional.empty()
*         is reserved for the case when the leader epoch is not set for the head record of the queue.
*/
@SuppressWarnings("OptionalAssignedToNull")
public Optional<Integer> headRecordLeaderEpoch() {
return headRecord == null ? null : headRecord.leaderEpoch();
}
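
A note on the three-way return value above: null means the queue itself is empty, while Optional.empty() means the head record exists but carries no leader epoch. A small hypothetical helper, only to spell out how a caller is meant to read it (the real call site is PartitionGroup#headRecordLeaderEpoch above):

import java.util.Optional;

final class HeadEpochInterpretation {

    // Hypothetical helper for illustration; mirrors the tri-state contract of headRecordLeaderEpoch().
    @SuppressWarnings("OptionalAssignedToNull")
    static String describe(final Optional<Integer> headRecordLeaderEpoch) {
        if (headRecordLeaderEpoch == null) {
            return "queue is empty, fall back to the task's cached next offset and epoch";
        } else if (headRecordLeaderEpoch.isPresent()) {
            return "head record has leader epoch " + headRecordLeaderEpoch.get();
        } else {
            return "head record present, but its leader epoch is unknown";
        }
    }
}
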

/**
* Clear the fifo queue of its elements
*/
@@ -19,6 +19,8 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;

import java.util.Optional;

public class StampedRecord extends Stamped<ConsumerRecord<?, ?>> {

public StampedRecord(final ConsumerRecord<?, ?> record, final long timestamp) {
@@ -45,6 +47,10 @@ public long offset() {
return value.offset();
}

public Optional<Integer> leaderEpoch() {
return value.leaderEpoch();
}

public Headers headers() {
return value.headers();
}
@@ -88,6 +88,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
private final RecordCollector recordCollector;
private final AbstractPartitionGroup.RecordInfo recordInfo;
private final Map<TopicPartition, Long> consumedOffsets;
private final Map<TopicPartition, OffsetAndMetadata> nextOffsetsAndMetadataToBeConsumed = new HashMap<>();
private final Map<TopicPartition, Long> committedOffsets;
private final Map<TopicPartition, Long> highWatermark;
private final Set<TopicPartition> resetOffsetsForPartitions;
@@ -462,23 +463,27 @@ public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
}
}

- private Long findOffset(final TopicPartition partition) {
+ private OffsetAndMetadata findOffsetAndMetadata(final TopicPartition partition) {
Long offset = partitionGroup.headRecordOffset(partition);
+ Optional<Integer> leaderEpoch = partitionGroup.headRecordLeaderEpoch(partition);
+ final long partitionTime = partitionGroup.partitionTimestamp(partition);
if (offset == null) {
- try {
- offset = mainConsumer.position(partition);
- } catch (final TimeoutException error) {
- // the `consumer.position()` call should never block, because we know that we did process data
- // for the requested partition and thus the consumer should have a valid local position
- // that it can return immediately
-
- // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
- throw new IllegalStateException(error);
- } catch (final KafkaException fatal) {
- throw new StreamsException(fatal);
- }
+ if (nextOffsetsAndMetadataToBeConsumed.containsKey(partition)) {
+ final OffsetAndMetadata offsetAndMetadata = nextOffsetsAndMetadataToBeConsumed.get(partition);
+ offset = offsetAndMetadata.offset();
+ leaderEpoch = offsetAndMetadata.leaderEpoch();
+ } else {
+ // This indicates a bug and thus we rethrow it as fatal `IllegalStateException`
+ throw new IllegalStateException("Stream task " + id + " does not know the partition: " + partition);
+ }
}
- return offset;
+ return new OffsetAndMetadata(offset,
+ leaderEpoch,
+ new TopicPartitionMetadata(partitionTime, processorContext.processorMetadata()).encode());
}
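
For orientation: the OffsetAndMetadata assembled here now carries the leader epoch alongside the offset and the encoded TopicPartitionMetadata, so the position that eventually gets committed can be validated against the partition's leader epoch. A hedged sketch of the committed payload shape, with hypothetical values; Streams' real commit path goes through its own commit machinery rather than a direct call like this:

import java.util.Map;
import java.util.Optional;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class CommitWithEpochSketch {

    // The topic name, offset 42, epoch 5, and metadata string are hypothetical placeholders.
    static void commit(final Consumer<byte[], byte[]> mainConsumer) {
        mainConsumer.commitSync(Map.of(
            new TopicPartition("input-topic", 0),
            new OffsetAndMetadata(42L, Optional.of(5), "encoded-partition-time-and-processor-metadata")));
    }
}
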

private Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
@@ -493,7 +498,6 @@ private Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {

case RUNNING:
case SUSPENDED:
- final Map<TopicPartition, Long> partitionTimes = extractPartitionTimes();

// If there's processor metadata to be committed, we need to commit it to all
// input partitions
@@ -502,10 +506,7 @@ private Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
committableOffsets = new HashMap<>(partitionsNeedCommit.size());

for (final TopicPartition partition : partitionsNeedCommit) {
- final Long offset = findOffset(partition);
- final long partitionTime = partitionTimes.get(partition);
- committableOffsets.put(partition, new OffsetAndMetadata(offset,
- new TopicPartitionMetadata(partitionTime, processorContext.processorMetadata()).encode()));
+ committableOffsets.put(partition, findOffsetAndMetadata(partition));
}
break;

@@ -561,13 +562,6 @@ private void clearCommitStatuses() {
processorContext.processorMetadata().setNeedsCommit(false);
}

- private Map<TopicPartition, Long> extractPartitionTimes() {
- final Map<TopicPartition, Long> partitionTimes = new HashMap<>();
- for (final TopicPartition partition : partitionGroup.partitions()) {
- partitionTimes.put(partition, partitionGroup.partitionTimestamp(partition));
- }
- return partitionTimes;
- }

@Override
public void closeClean() {
@@ -1125,6 +1119,10 @@ public void addRecords(final TopicPartition partition, final Iterable<ConsumerRe
}
}

public void updateNextOffsets(final TopicPartition partition, final OffsetAndMetadata offsetAndMetadata) {
nextOffsetsAndMetadataToBeConsumed.put(partition, offsetAndMetadata);
}

/**
* Schedules a punctuation for the processor
*
@@ -1240,6 +1240,9 @@ private long pollPhase() {
pollRecordsSensor.record(numRecords, now);
taskManager.addRecordsToTasks(records);
}
if (!records.nextOffsets().isEmpty()) {
taskManager.updateNextOffsets(records.nextOffsets());
}
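
The forwarding added above assumes that ConsumerRecords#nextOffsets() reports, per polled partition, the offset the consumer will fetch next together with the leader epoch it last observed; as the placement after the preceding block suggests, those positions are propagated to the tasks even when the poll itself added no records. A hedged sketch of reading that map (assumed semantics, simplified logging):

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class NextOffsetsDump {

    // Assumed semantics of nextOffsets(); the exact contract is defined by the consumer, not by this PR.
    static void dump(final ConsumerRecords<byte[], byte[]> records) {
        final Map<TopicPartition, OffsetAndMetadata> next = records.nextOffsets();
        next.forEach((partition, offsetAndMetadata) ->
            System.out.printf("%s -> next offset %d, leader epoch %s%n",
                partition, offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch()));
    }
}
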

while (!nonFatalExceptionsToHandle.isEmpty()) {
streamsUncaughtExceptionHandler.accept(nonFatalExceptionsToHandle.poll(), true);
@@ -19,6 +19,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

@@ -70,6 +71,11 @@ synchronized Long headRecordOffset(final TopicPartition partition) {
return wrapped.headRecordOffset(partition);
}

@Override
Optional<Integer> headRecordLeaderEpoch(final TopicPartition partition) {
return Optional.empty();
}

@Override
synchronized int numBuffered() {
return wrapped.numBuffered();
@@ -177,6 +177,9 @@ default void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsFor

void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records);

default void updateNextOffsets(final TopicPartition partition, final OffsetAndMetadata offsetAndMetadata) {
}

default boolean process(final long wallClockTime) {
return false;
}
@@ -1805,16 +1805,32 @@ public void signalTaskExecutors() {
*/
void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
for (final TopicPartition partition : records.partitions()) {
- final Task activeTask = tasks.activeTasksForInputPartition(partition);
-
- if (activeTask == null) {
- log.error("Unable to locate active task for received-record partition {}. Current tasks: {}",
- partition, toString(">"));
- throw new NullPointerException("Task was unexpectedly missing for partition " + partition);
- }
-
- activeTask.addRecords(partition, records.records(partition));
+ final Task activeTask = getActiveTask(partition);
+ activeTask.addRecords(partition, records.records(partition));
}
}

+ /**
+ * Update the next offsets for each task
+ *
+ * @param nextOffsets A map of offsets keyed by partition
+ */
+ void updateNextOffsets(final Map<TopicPartition, OffsetAndMetadata> nextOffsets) {
+ for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : nextOffsets.entrySet()) {
+ final Task activeTask = getActiveTask(entry.getKey());
+ activeTask.updateNextOffsets(entry.getKey(), entry.getValue());
+ }
+ }
+
+ private Task getActiveTask(final TopicPartition partition) {
+ final Task activeTask = tasks.activeTasksForInputPartition(partition);
+
+ if (activeTask == null) {
+ log.error("Unable to locate active task for received-record partition {}. Current tasks: {}",
+ partition, toString(">"));
+ throw new NullPointerException("Task was unexpectedly missing for partition " + partition);
+ }
+ return activeTask;
+ }

private void maybeLockTasks(final Set<TaskId> ids) {