From 11f310d0a0021d0f4d04a802f4995bb9929db420 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 6 Oct 2020 13:28:11 -0700 Subject: [PATCH 1/2] KAFKA-9274: fix incorrect default value for `task.timeout.ms` config - also add handler method to trigger/reset the timeout on a task --- .../apache/kafka/streams/StreamsConfig.java | 2 +- .../processor/internals/AbstractTask.java | 61 +++++++++++++++++-- .../processor/internals/StandbyTask.java | 2 +- .../processor/internals/StreamTask.java | 2 +- .../streams/processor/internals/Task.java | 9 ++- .../processor/internals/StandbyTaskTest.java | 31 ++++++++++ .../processor/internals/StreamTaskTest.java | 26 ++++++++ 7 files changed, 125 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index dc34f9a0c3d4..1fe85cd3d9c4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -694,7 +694,7 @@ public class StreamsConfig extends AbstractConfig { CommonClientConfigs.SECURITY_PROTOCOL_DOC) .define(TASK_TIMEOUT_MS_CONFIG, Type.LONG, - Duration.ofSeconds(5L).toMillis(), + Duration.ofMinutes(5L).toMillis(), atLeast(0L), Importance.MEDIUM, TASK_TIMEOUT_MS_DOC) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index e933e09a07ca..edad1c0dd665 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -16,23 +16,29 @@ */ package org.apache.kafka.streams.processor.internals; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import org.slf4j.Logger; import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Set; import static org.apache.kafka.streams.processor.internals.Task.State.CLOSED; import static org.apache.kafka.streams.processor.internals.Task.State.CREATED; public abstract class AbstractTask implements Task { + private final static long NO_DEADLINE = -1L; + private Task.State state = CREATED; + private long deadlineMs = NO_DEADLINE; protected Set inputPartitions; /** @@ -47,17 +53,20 @@ public abstract class AbstractTask implements Task { protected final ProcessorTopology topology; protected final StateDirectory stateDirectory; protected final ProcessorStateManager stateMgr; + private final long taskTimeoutMs; AbstractTask(final TaskId id, final ProcessorTopology topology, final StateDirectory stateDirectory, final ProcessorStateManager stateMgr, - final Set inputPartitions) { + final Set inputPartitions, + final long taskTimeoutMs) { this.id = id; this.stateMgr = stateMgr; this.topology = topology; this.inputPartitions = inputPartitions; this.stateDirectory = stateDirectory; + this.taskTimeoutMs = taskTimeoutMs; } /** @@ -137,4 +146,48 @@ public void update(final Set topicPartitions, final Map deadlineMs) { + final String errorMessage = String.format( + "Task %s did not make progress within %d ms. Adjust `%s` if needed.", + id, + currentWallClockMs - deadlineMs + taskTimeoutMs, + StreamsConfig.TASK_TIMEOUT_MS_CONFIG + ); + + if (timeoutException != null) { + throw new TimeoutException(errorMessage, timeoutException); + } else { + throw new TimeoutException(errorMessage); + } + } + + if (timeoutException != null) { + log.debug( + "Timeout exception. Remaining time to deadline {}; retrying.", + deadlineMs - currentWallClockMs, + timeoutException + ); + } else { + log.debug( + "Task did not make progress. Remaining time to deadline {}; retrying.", + deadlineMs - currentWallClockMs + ); + } + + } + + @Override + public void clearTaskTimeout(final Logger log) { + if (deadlineMs != NO_DEADLINE) { + log.debug("Clearing task timeout."); + deadlineMs = NO_DEADLINE; + } + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index b5b331b12951..9dcccb7246a6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -64,7 +64,7 @@ public class StandbyTask extends AbstractTask implements Task { final StateDirectory stateDirectory, final ThreadCache cache, final InternalProcessorContext processorContext) { - super(id, topology, stateDirectory, stateMgr, partitions); + super(id, topology, stateDirectory, stateMgr, partitions, config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG)); this.processorContext = processorContext; this.streamsMetrics = streamsMetrics; processorContext.transitionToStandby(cache); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index e09f33c892a0..46f837dc73dc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -118,7 +118,7 @@ public StreamTask(final TaskId id, final ProcessorStateManager stateMgr, final RecordCollector recordCollector, final InternalProcessorContext processorContext) { - super(id, topology, stateDirectory, stateMgr, partitions); + super(id, topology, stateDirectory, stateMgr, partitions, config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG)); this.mainConsumer = mainConsumer; this.processorContext = processorContext; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java index 1897397ada0f..3ee1d2891d1b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java @@ -16,19 +16,21 @@ */ package org.apache.kafka.streams.processor.internals; -import java.util.List; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import org.slf4j.Logger; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -205,4 +207,9 @@ default boolean maybePunctuateSystemTime() { return false; } + void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs, + final TimeoutException timeoutException, + final Logger log) throws StreamsException; + + void clearTaskTimeout(final Logger log); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index a7dc96375eb7..0b9067c7f341 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; @@ -48,9 +49,11 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.slf4j.Logger; import java.io.File; import java.io.IOException; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -557,6 +560,34 @@ public void shouldAlwaysSuspendRunningTasks() { assertThat(task.state(), equalTo(SUSPENDED)); } + @Test + public void shouldInitTaskTimeoutAndEventuallyThrow() { + EasyMock.replay(stateManager); + + final Logger log = new LogContext().logger(StreamTaskTest.class); + task = createStandbyTask(); + + task.maybeInitTaskTimeoutOrThrow(0L, null, log); + task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).toMillis(), null, log); + + assertThrows( + TimeoutException.class, + () -> task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null, log) + ); + } + + @Test + public void shouldCLearTaskTimeout() { + EasyMock.replay(stateManager); + + final Logger log = new LogContext().logger(StreamTaskTest.class); + task = createStandbyTask(); + + task.maybeInitTaskTimeoutOrThrow(0L, null, log); + task.clearTaskTimeout(log); + task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null, log); + } + private StandbyTask createStandbyTask() { final ThreadCache cache = new ThreadCache( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 30e20b068da8..22d34944505f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -39,6 +39,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; @@ -68,6 +69,7 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.slf4j.Logger; import java.io.File; import java.io.IOException; @@ -2073,6 +2075,30 @@ public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() { "are added in the same order.")); } + @Test + public void shouldInitTaskTimeoutAndEventuallyThrow() { + final Logger log = new LogContext().logger(StreamTaskTest.class); + task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST); + + task.maybeInitTaskTimeoutOrThrow(0L, null, log); + task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).toMillis(), null, log); + + assertThrows( + TimeoutException.class, + () -> task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null, log) + ); + } + + @Test + public void shouldCLearTaskTimeout() { + final Logger log = new LogContext().logger(StreamTaskTest.class); + task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST); + + task.maybeInitTaskTimeoutOrThrow(0L, null, log); + task.clearTaskTimeout(log); + task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null, log); + } + private List getTaskMetrics() { return metrics.metrics().keySet().stream().filter(m -> m.tags().containsKey("task-id")).collect(Collectors.toList()); } From 57a5ae6b559ff38e73c1d324695c7d2f6f8d1133 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 6 Oct 2020 14:29:30 -0700 Subject: [PATCH 2/2] - Avoid passing loggers - add missing methods to `StateMachineTask` --- .../streams/processor/internals/AbstractTask.java | 10 ++++------ .../streams/processor/internals/StandbyTask.java | 12 ++++++++++++ .../streams/processor/internals/StreamTask.java | 11 +++++++++++ .../kafka/streams/processor/internals/Task.java | 6 ++---- .../streams/processor/internals/TaskManagerTest.java | 9 ++++++++- 5 files changed, 37 insertions(+), 11 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index edad1c0dd665..ecf5ec192141 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -147,10 +147,9 @@ public void update(final Set topicPartitions, final Map deadlineMs) { @@ -183,8 +182,7 @@ public void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs, } - @Override - public void clearTaskTimeout(final Logger log) { + void clearTaskTimeout(final Logger log) { if (deadlineMs != NO_DEADLINE) { log.debug("Clearing task timeout."); deadlineMs = NO_DEADLINE; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 9dcccb7246a6..75d3dbb7b323 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.StreamsConfig; @@ -286,6 +287,17 @@ public void addRecords(final TopicPartition partition, final Iterable partitions, final boolean active, final ProcessorStateManager processorStateManager) { - super(id, null, null, processorStateManager, partitions); + super(id, null, null, processorStateManager, partitions, 0L); this.active = active; } @@ -2765,6 +2765,13 @@ public void resume() { } } + @Override + public void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs, + final TimeoutException timeoutException) throws StreamsException {}; + + @Override + public void clearTaskTimeout() {} + @Override public void closeClean() { transitionTo(State.CLOSED);