Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-9274: fix incorrect default value for task.timeout.ms config #9385

Merged
merged 2 commits into from
Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ public class StreamsConfig extends AbstractConfig {
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(TASK_TIMEOUT_MS_CONFIG,
Type.LONG,
Duration.ofSeconds(5L).toMillis(),
Duration.ofMinutes(5L).toMillis(),
atLeast(0L),
Importance.MEDIUM,
TASK_TIMEOUT_MS_DOC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,29 @@
*/
package org.apache.kafka.streams.processor.internals;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.kafka.streams.processor.internals.Task.State.CLOSED;
import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;

public abstract class AbstractTask implements Task {
private final static long NO_DEADLINE = -1L;

private Task.State state = CREATED;
private long deadlineMs = NO_DEADLINE;
protected Set<TopicPartition> inputPartitions;

/**
Expand All @@ -47,17 +53,20 @@ public abstract class AbstractTask implements Task {
protected final ProcessorTopology topology;
protected final StateDirectory stateDirectory;
protected final ProcessorStateManager stateMgr;
private final long taskTimeoutMs;

AbstractTask(final TaskId id,
final ProcessorTopology topology,
final StateDirectory stateDirectory,
final ProcessorStateManager stateMgr,
final Set<TopicPartition> inputPartitions) {
final Set<TopicPartition> inputPartitions,
final long taskTimeoutMs) {
this.id = id;
this.stateMgr = stateMgr;
this.topology = topology;
this.inputPartitions = inputPartitions;
this.stateDirectory = stateDirectory;
this.taskTimeoutMs = taskTimeoutMs;
}

/**
Expand Down Expand Up @@ -137,4 +146,46 @@ public void update(final Set<TopicPartition> topicPartitions, final Map<String,
this.inputPartitions = topicPartitions;
topology.updateSourceTopics(nodeToSourceTopics);
}

/**
 * Starts the task-timeout clock on the first call and fails on a later call once the deadline has passed.
 *
 * <p>If no deadline is set, one is set to {@code currentWallClockMs + taskTimeoutMs}. Otherwise, if
 * {@code currentWallClockMs} is strictly greater than the deadline, a {@link TimeoutException} is thrown.
 * In every non-throwing case the remaining time is logged at debug level.
 *
 * @param currentWallClockMs the current wall-clock time in milliseconds
 * @param timeoutException   the timeout that triggered this call, or {@code null} if there is none;
 *                           when present it is attached as the cause of the thrown exception and logged
 * @param log                logger used for the debug output
 * @throws TimeoutException if the deadline set by an earlier call has been exceeded
 */
void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
                                 final TimeoutException timeoutException,
                                 final Logger log) throws StreamsException {
    if (deadlineMs == NO_DEADLINE) {
        // Saturate instead of overflowing: a very large timeout (e.g. Long.MAX_VALUE) would otherwise wrap
        // to a negative deadline and make the very next call throw.
        // Assumes taskTimeoutMs is non-negative.
        final boolean wouldOverflow = currentWallClockMs > Long.MAX_VALUE - taskTimeoutMs;
        deadlineMs = wouldOverflow ? Long.MAX_VALUE : currentWallClockMs + taskTimeoutMs;
    } else if (currentWallClockMs > deadlineMs) {
        // (deadlineMs - taskTimeoutMs) is the time the clock was started, so this is the total elapsed time.
        final String errorMessage = String.format(
            "Task %s did not make progress within %d ms. Adjust `%s` if needed.",
            id,
            currentWallClockMs - deadlineMs + taskTimeoutMs,
            StreamsConfig.TASK_TIMEOUT_MS_CONFIG
        );

        if (timeoutException != null) {
            throw new TimeoutException(errorMessage, timeoutException);
        } else {
            throw new TimeoutException(errorMessage);
        }
    }

    if (timeoutException != null) {
        // The trailing Throwable has no placeholder; it is passed to the logger as the exception argument.
        log.debug(
            "Timeout exception. Remaining time to deadline {}; retrying.",
            deadlineMs - currentWallClockMs,
            timeoutException
        );
    } else {
        log.debug(
            "Task did not make progress. Remaining time to deadline {}; retrying.",
            deadlineMs - currentWallClockMs
        );
    }
}

/**
 * Resets the task-timeout deadline, if one is currently set.
 *
 * <p>Does nothing (and logs nothing) when no deadline is set.
 *
 * @param log logger used for the debug message emitted when a deadline is actually cleared
 */
void clearTaskTimeout(final Logger log) {
    final boolean deadlineIsSet = deadlineMs != NO_DEADLINE;
    if (!deadlineIsSet) {
        return;
    }

    log.debug("Clearing task timeout.");
    deadlineMs = NO_DEADLINE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsConfig;
Expand Down Expand Up @@ -64,7 +65,7 @@ public class StandbyTask extends AbstractTask implements Task {
final StateDirectory stateDirectory,
final ThreadCache cache,
final InternalProcessorContext processorContext) {
super(id, topology, stateDirectory, stateMgr, partitions);
super(id, topology, stateDirectory, stateMgr, partitions, config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG));
this.processorContext = processorContext;
this.streamsMetrics = streamsMetrics;
processorContext.transitionToStandby(cache);
Expand Down Expand Up @@ -286,6 +287,17 @@ public void addRecords(final TopicPartition partition, final Iterable<ConsumerRe
throw new IllegalStateException("Attempted to add records to task " + id() + " for invalid input partition " + partition);
}

/**
 * Forwards to the three-argument overload, supplying this task's {@code log}.
 *
 * @param currentWallClockMs passed through unchanged
 * @param timeoutException   passed through unchanged
 */
@Override
public void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
final TimeoutException timeoutException) throws StreamsException {
maybeInitTaskTimeoutOrThrow(currentWallClockMs, timeoutException, log);
}

/**
 * Forwards to the one-argument overload, supplying this task's {@code log}.
 */
@Override
public void clearTaskTimeout() {
clearTaskTimeout(log);
}

InternalProcessorContext processorContext() {
return processorContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public StreamTask(final TaskId id,
final ProcessorStateManager stateMgr,
final RecordCollector recordCollector,
final InternalProcessorContext processorContext) {
super(id, topology, stateDirectory, stateMgr, partitions);
super(id, topology, stateDirectory, stateMgr, partitions, config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG));
this.mainConsumer = mainConsumer;

this.processorContext = processorContext;
Expand Down Expand Up @@ -992,6 +992,17 @@ public boolean commitRequested() {
return commitRequested;
}

/**
 * Forwards to the three-argument overload, supplying this task's {@code log}.
 *
 * @param currentWallClockMs passed through unchanged
 * @param timeoutException   passed through unchanged
 */
@Override
public void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
final TimeoutException timeoutException) throws StreamsException {
maybeInitTaskTimeoutOrThrow(currentWallClockMs, timeoutException, log);
}

/**
 * Forwards to the one-argument overload, supplying this task's {@code log}.
 */
@Override
public void clearTaskTimeout() {
clearTaskTimeout(log);
}

static String encodeTimestamp(final long partitionTime) {
final ByteBuffer buffer = ByteBuffer.allocate(9);
buffer.put(LATEST_MAGIC_BYTE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
*/
package org.apache.kafka.streams.processor.internals;

import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateStore;
Expand All @@ -29,6 +29,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -205,4 +206,8 @@ default boolean maybePunctuateSystemTime() {
return false;
}

/**
 * Hook for tracking a per-task timeout.
 *
 * <p>NOTE(review): the exact contract is defined by the implementations; presumably the first call starts
 * the timeout clock and later calls fail once the timeout has elapsed — confirm against the implementing classes.
 *
 * @param currentWallClockMs the current wall-clock time in milliseconds
 * @param timeoutException   the timeout that triggered this call
 */
void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
final TimeoutException timeoutException) throws StreamsException;

/**
 * Counterpart to {@code maybeInitTaskTimeoutOrThrow}.
 *
 * <p>NOTE(review): presumably resets any timeout tracking started by that method — confirm against the
 * implementing classes.
 */
void clearTaskTimeout();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
Expand Down Expand Up @@ -48,9 +49,11 @@
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -557,6 +560,34 @@ public void shouldAlwaysSuspendRunningTasks() {
assertThat(task.state(), equalTo(SUSPENDED));
}

/**
 * Verifies that the first call starts the timeout clock, that a call exactly at the deadline still
 * succeeds, and that a call one millisecond past the deadline throws.
 */
@Test
public void shouldInitTaskTimeoutAndEventuallyThrow() {
    EasyMock.replay(stateManager);

    // Name the logger after the running test class; the original used StreamTaskTest.class, which
    // attributes log output from this test to an unrelated class.
    final Logger log = new LogContext().logger(getClass());
    task = createStandbyTask();

    // assumes createStandbyTask() yields a task with a 5-minute task timeout — TODO confirm
    final long timeoutMs = Duration.ofMinutes(5).toMillis();

    task.maybeInitTaskTimeoutOrThrow(0L, null, log);
    task.maybeInitTaskTimeoutOrThrow(timeoutMs, null, log);

    assertThrows(
        TimeoutException.class,
        () -> task.maybeInitTaskTimeoutOrThrow(timeoutMs + 1L, null, log)
    );
}

/**
 * Verifies that clearing the timeout resets the deadline: a call made after the original deadline
 * would have passed must not throw, which is the implicit assertion of this test.
 *
 * <p>NOTE(review): the method name contains a typo ("CLear"); left unchanged here so the test's
 * identifier stays stable.
 */
@Test
public void shouldCLearTaskTimeout() {
    EasyMock.replay(stateManager);

    // Name the logger after the running test class; the original used StreamTaskTest.class, which
    // attributes log output from this test to an unrelated class.
    final Logger log = new LogContext().logger(getClass());
    task = createStandbyTask();

    // assumes createStandbyTask() yields a task with a 5-minute task timeout — TODO confirm
    final long pastOriginalDeadlineMs = Duration.ofMinutes(5).toMillis() + 1L;

    task.maybeInitTaskTimeoutOrThrow(0L, null, log);
    task.clearTaskTimeout(log);
    task.maybeInitTaskTimeoutOrThrow(pastOriginalDeadlineMs, null, log);
}

private StandbyTask createStandbyTask() {

final ThreadCache cache = new ThreadCache(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
Expand Down Expand Up @@ -68,6 +69,7 @@
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -2073,6 +2075,30 @@ public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() {
"are added in the same order."));
}

/**
 * Verifies that the first call starts the timeout clock, that a call exactly at the deadline still
 * succeeds, and that a call one millisecond past the deadline throws.
 */
@Test
public void shouldInitTaskTimeoutAndEventuallyThrow() {
    final Logger log = new LogContext().logger(StreamTaskTest.class);
    task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);

    // assumes createConfig(...) leaves the task timeout at 5 minutes — TODO confirm
    final long timeoutMs = Duration.ofMinutes(5).toMillis();
    final long justPastDeadlineMs = timeoutMs + 1L;

    task.maybeInitTaskTimeoutOrThrow(0L, null, log);
    task.maybeInitTaskTimeoutOrThrow(timeoutMs, null, log);

    assertThrows(
        TimeoutException.class,
        () -> task.maybeInitTaskTimeoutOrThrow(justPastDeadlineMs, null, log)
    );
}

/**
 * Verifies that clearing the timeout resets the deadline: a call made after the original deadline
 * would have passed must not throw, which is the implicit assertion of this test.
 */
@Test
public void shouldCLearTaskTimeout() {
    final Logger log = new LogContext().logger(StreamTaskTest.class);
    task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);

    // assumes createConfig(...) leaves the task timeout at 5 minutes — TODO confirm
    final long pastOriginalDeadlineMs = Duration.ofMinutes(5).toMillis() + 1L;

    task.maybeInitTaskTimeoutOrThrow(0L, null, log);
    task.clearTaskTimeout(log);
    task.maybeInitTaskTimeoutOrThrow(pastOriginalDeadlineMs, null, log);
}

private List<MetricName> getTaskMetrics() {
return metrics.metrics().keySet().stream().filter(m -> m.tags().containsKey("task-id")).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2692,7 +2692,7 @@ private static class StateMachineTask extends AbstractTask implements Task {
final Set<TopicPartition> partitions,
final boolean active,
final ProcessorStateManager processorStateManager) {
super(id, null, null, processorStateManager, partitions);
super(id, null, null, processorStateManager, partitions, 0L);
this.active = active;
}

Expand Down Expand Up @@ -2765,6 +2765,13 @@ public void resume() {
}
}

/**
 * No-op: this implementation ignores both arguments and never throws.
 */
@Override
public void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
                                        final TimeoutException timeoutException) throws StreamsException {
    // intentionally empty
}

/**
 * No-op: this implementation has an empty body.
 */
@Override
public void clearTaskTimeout() {}

@Override
public void closeClean() {
transitionTo(State.CLOSED);
Expand Down