Skip to content

Commit

Permalink
KAFKA-6054: Add 'version probing' to Kafka Streams rebalance (apache#…
Browse files Browse the repository at this point in the history
…4636)

implements KIP-268

Reviewers: Bill Bejeck <[email protected]>, John Roesler <[email protected]>, Guozhang Wang <[email protected]>
  • Loading branch information
mjsax authored and ying-zheng committed Jul 6, 2018
1 parent 27c98a2 commit c116408
Show file tree
Hide file tree
Showing 16 changed files with 1,219 additions and 284 deletions.
108 changes: 91 additions & 17 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ abstract class AssignedTasks<T extends Task> {
private final Logger log;
private final String taskTypeName;
private final TaskAction<T> commitAction;
private Map<TaskId, T> created = new HashMap<>();
private Map<TaskId, T> suspended = new HashMap<>();
private Map<TaskId, T> restoring = new HashMap<>();
private Set<TopicPartition> restoredPartitions = new HashSet<>();
private Set<TaskId> previousActiveTasks = new HashSet<>();
private final Map<TaskId, T> created = new HashMap<>();
private final Map<TaskId, T> suspended = new HashMap<>();
private final Map<TaskId, T> restoring = new HashMap<>();
private final Set<TopicPartition> restoredPartitions = new HashSet<>();
private final Set<TaskId> previousActiveTasks = new HashSet<>();
// IQ may access this map.
Map<TaskId, T> running = new ConcurrentHashMap<>();
private Map<TopicPartition, T> runningByPartition = new HashMap<>();
Map<TopicPartition, T> restoringByPartition = new HashMap<>();
final Map<TaskId, T> running = new ConcurrentHashMap<>();
private final Map<TopicPartition, T> runningByPartition = new HashMap<>();
final Map<TopicPartition, T> restoringByPartition = new HashMap<>();

AssignedTasks(final LogContext logContext,
final String taskTypeName) {
Expand Down Expand Up @@ -176,7 +176,7 @@ private RuntimeException closeNonRunningTasks(final Collection<T> tasks) {

private RuntimeException suspendTasks(final Collection<T> tasks) {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
for (Iterator<T> it = tasks.iterator(); it.hasNext(); ) {
for (final Iterator<T> it = tasks.iterator(); it.hasNext(); ) {
final T task = it.next();
try {
task.suspend();
Expand Down Expand Up @@ -249,10 +249,10 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition>

private void addToRestoring(final T task) {
restoring.put(task.id(), task);
for (TopicPartition topicPartition : task.partitions()) {
for (final TopicPartition topicPartition : task.partitions()) {
restoringByPartition.put(topicPartition, task);
}
for (TopicPartition topicPartition : task.changelogPartitions()) {
for (final TopicPartition topicPartition : task.changelogPartitions()) {
restoringByPartition.put(topicPartition, task);
}
}
Expand All @@ -264,10 +264,10 @@ private void transitionToRunning(final T task) {
log.debug("transitioning {} {} to running", taskTypeName, task.id());
running.put(task.id(), task);
task.initializeTopology();
for (TopicPartition topicPartition : task.partitions()) {
for (final TopicPartition topicPartition : task.partitions()) {
runningByPartition.put(topicPartition, task);
}
for (TopicPartition topicPartition : task.changelogPartitions()) {
for (final TopicPartition topicPartition : task.changelogPartitions()) {
runningByPartition.put(topicPartition, task);
}
}
Expand Down Expand Up @@ -356,7 +356,7 @@ int commit() {
void applyToRunningTasks(final TaskAction<T> action) {
RuntimeException firstException = null;

for (Iterator<T> it = running().iterator(); it.hasNext(); ) {
for (final Iterator<T> it = running().iterator(); it.hasNext(); ) {
final T task = it.next();
try {
action.apply(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.singleton;

public class StreamThread extends Thread {

private final static int UNLIMITED_RECORDS = -1;
private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
private final static AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);

/**
* Stream thread states are the possible states that a stream thread can be in.
Expand Down Expand Up @@ -264,7 +265,9 @@ public void onPartitionsAssigned(final Collection<TopicPartition> assignment) {
if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) {
return;
}
taskManager.createTasks(assignment);
if (!streamThread.versionProbingFlag.get()) {
taskManager.createTasks(assignment);
}
} catch (final Throwable t) {
log.error(
"Error caught during partition assignment, " +
Expand Down Expand Up @@ -298,7 +301,11 @@ public void onPartitionsRevoked(final Collection<TopicPartition> assignment) {
final long start = time.milliseconds();
try {
// suspend active tasks
taskManager.suspendTasksAndState();
if (streamThread.versionProbingFlag.get()) {
streamThread.versionProbingFlag.set(false);
} else {
taskManager.suspendTasksAndState();
}
} catch (final Throwable t) {
log.error(
"Error caught during partition revocation, " +
Expand Down Expand Up @@ -555,6 +562,7 @@ static class StreamsMetricsThreadImpl extends StreamsMetricsImpl {
private final String logPrefix;
private final TaskManager taskManager;
private final StreamsMetricsThreadImpl streamsMetrics;
private final AtomicBoolean versionProbingFlag;

private long lastCommitMs;
private long timerStartedMs;
Expand Down Expand Up @@ -647,6 +655,8 @@ public static StreamThread create(final InternalTopologyBuilder builder,
final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(applicationId, threadClientId);
consumerConfigs.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
final AtomicBoolean versionProbingFlag = new AtomicBoolean();
consumerConfigs.put(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG, versionProbingFlag);
String originalReset = null;
if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) {
originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
Expand All @@ -666,7 +676,8 @@ public static StreamThread create(final InternalTopologyBuilder builder,
streamsMetrics,
builder,
threadClientId,
logContext);
logContext,
versionProbingFlag);
}

public StreamThread(final Time time,
Expand All @@ -679,7 +690,8 @@ public StreamThread(final Time time,
final StreamsMetricsThreadImpl streamsMetrics,
final InternalTopologyBuilder builder,
final String threadClientId,
final LogContext logContext) {
final LogContext logContext,
final AtomicBoolean versionProbingFlag) {
super(threadClientId);

this.stateLock = new Object();
Expand All @@ -696,6 +708,7 @@ public StreamThread(final Time time,
this.restoreConsumer = restoreConsumer;
this.consumer = consumer;
this.originalReset = originalReset;
this.versionProbingFlag = versionProbingFlag;

this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
Expand Down Expand Up @@ -750,19 +763,26 @@ private void runLoop() {
while (isRunning()) {
try {
recordsProcessedBeforeCommit = runOnce(recordsProcessedBeforeCommit);
if (versionProbingFlag.get()) {
log.info("Version probing detected. Triggering new rebalance.");
enforceRebalance();
}
} catch (final TaskMigratedException ignoreAndRejoinGroup) {
log.warn("Detected task {} that got migrated to another thread. " +
"This implies that this thread missed a rebalance and dropped out of the consumer group. " +
"Will try to rejoin the consumer group. Below is the detailed description of the task:\n{}",
ignoreAndRejoinGroup.migratedTask().id(), ignoreAndRejoinGroup.migratedTask().toString(">"));

// re-subscribe to enforce a rebalance in the next poll call
consumer.unsubscribe();
consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
enforceRebalance();
}
}
}

private void enforceRebalance() {
consumer.unsubscribe();
consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
}

/**
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException If the store's change log does not contain the partition
Expand Down
Loading

0 comments on commit c116408

Please sign in to comment.