Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-6054: Add 'version probing' to Kafka Streams rebalance #4636

Merged
merged 19 commits into from
May 31, 2018
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 91 additions & 17 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ abstract class AssignedTasks<T extends Task> {
private final Logger log;
private final String taskTypeName;
private final TaskAction<T> commitAction;
private Map<TaskId, T> created = new HashMap<>();
private Map<TaskId, T> suspended = new HashMap<>();
private Map<TaskId, T> restoring = new HashMap<>();
private Set<TopicPartition> restoredPartitions = new HashSet<>();
private Set<TaskId> previousActiveTasks = new HashSet<>();
private final Map<TaskId, T> created = new HashMap<>();
private final Map<TaskId, T> suspended = new HashMap<>();
private final Map<TaskId, T> restoring = new HashMap<>();
private final Set<TopicPartition> restoredPartitions = new HashSet<>();
private final Set<TaskId> previousActiveTasks = new HashSet<>();
// IQ may access this map.
Map<TaskId, T> running = new ConcurrentHashMap<>();
private Map<TopicPartition, T> runningByPartition = new HashMap<>();
Map<TopicPartition, T> restoringByPartition = new HashMap<>();
final Map<TaskId, T> running = new ConcurrentHashMap<>();
private final Map<TopicPartition, T> runningByPartition = new HashMap<>();
final Map<TopicPartition, T> restoringByPartition = new HashMap<>();

AssignedTasks(final LogContext logContext,
final String taskTypeName) {
Expand Down Expand Up @@ -176,7 +176,7 @@ private RuntimeException closeNonRunningTasks(final Collection<T> tasks) {

private RuntimeException suspendTasks(final Collection<T> tasks) {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
for (Iterator<T> it = tasks.iterator(); it.hasNext(); ) {
for (final Iterator<T> it = tasks.iterator(); it.hasNext(); ) {
final T task = it.next();
try {
task.suspend();
Expand Down Expand Up @@ -249,10 +249,10 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition>

private void addToRestoring(final T task) {
restoring.put(task.id(), task);
for (TopicPartition topicPartition : task.partitions()) {
for (final TopicPartition topicPartition : task.partitions()) {
restoringByPartition.put(topicPartition, task);
}
for (TopicPartition topicPartition : task.changelogPartitions()) {
for (final TopicPartition topicPartition : task.changelogPartitions()) {
restoringByPartition.put(topicPartition, task);
}
}
Expand All @@ -264,10 +264,10 @@ private void transitionToRunning(final T task) {
log.debug("transitioning {} {} to running", taskTypeName, task.id());
running.put(task.id(), task);
task.initializeTopology();
for (TopicPartition topicPartition : task.partitions()) {
for (final TopicPartition topicPartition : task.partitions()) {
runningByPartition.put(topicPartition, task);
}
for (TopicPartition topicPartition : task.changelogPartitions()) {
for (final TopicPartition topicPartition : task.changelogPartitions()) {
runningByPartition.put(topicPartition, task);
}
}
Expand Down Expand Up @@ -356,7 +356,7 @@ int commit() {
void applyToRunningTasks(final TaskAction<T> action) {
RuntimeException firstException = null;

for (Iterator<T> it = running().iterator(); it.hasNext(); ) {
for (final Iterator<T> it = running().iterator(); it.hasNext(); ) {
final T task = it.next();
try {
action.apply(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.singleton;

public class StreamThread extends Thread {

private final static int UNLIMITED_RECORDS = -1;
private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
private final static AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);

/**
* Stream thread states are the possible states that a stream thread can be in.
Expand Down Expand Up @@ -264,7 +265,9 @@ public void onPartitionsAssigned(final Collection<TopicPartition> assignment) {
if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) {
return;
}
taskManager.createTasks(assignment);
if (!streamThread.versionProbingFlag.get()) {
taskManager.createTasks(assignment);
}
} catch (final Throwable t) {
log.error(
"Error caught during partition assignment, " +
Expand Down Expand Up @@ -298,7 +301,11 @@ public void onPartitionsRevoked(final Collection<TopicPartition> assignment) {
final long start = time.milliseconds();
try {
// suspend active tasks
taskManager.suspendTasksAndState();
if (streamThread.versionProbingFlag.get()) {
streamThread.versionProbingFlag.set(false);
} else {
taskManager.suspendTasksAndState();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm probably missing something and brought this up before, but above in onPartitionsAssigned we create tasks with the assignment when not version probing. But in onPartitionsRevoked if we are version probing we flip the version probing flag, hence on assignment we create tasks. Why don't we flip the version probing flag in onPartitionedAssigned as an else statement on line 270 so we are only every suspending and creating tasks during non-version probing rebalances?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onPartitionsAssigned: if version probing flag is set, it means assignment is empty and we want to trigger a new rebalance. If we call taskManager.createTasks(assignment);, we would close suspended task and that is what we do not want to do at this point, because we hope to get those task assigned after the second rebalance.

onPartitionsRevoked: if version probing flag is set, we don't want to suspend tasks either. Tasks are already suspended but if we call taskManager.suspendTasksAndState(); again, we loose the information about currently suspended tasks (but we need to keep this information; ie, we avoid an incorrect internal metadata update here).

The flow is the following:

  • trigger first rebalance
  • onPartitionsRevoke -> version probing flag not set: suspend tasks regularly
  • onPartitionAssigned -> version probing flag set by StreamsPartitionsAssignor: we skip task creation as we will rebalance again (we cannot reset the flag here, because we need it in the next step)
  • trigger second rebalance
  • onPartitionsRevoke -> version probing flag is still set; we can reset the flag and skip suspending tasks to preserve metadata
  • onPartitionAssigned -> version probing flag not set: we do regular assignment and start processing

Does this make sense?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep - thanks for the clarification

}
} catch (final Throwable t) {
log.error(
"Error caught during partition revocation, " +
Expand Down Expand Up @@ -555,6 +562,7 @@ static class StreamsMetricsThreadImpl extends StreamsMetricsImpl {
private final String logPrefix;
private final TaskManager taskManager;
private final StreamsMetricsThreadImpl streamsMetrics;
private final AtomicBoolean versionProbingFlag;

private long lastCommitMs;
private long timerStartedMs;
Expand Down Expand Up @@ -647,6 +655,8 @@ public static StreamThread create(final InternalTopologyBuilder builder,
final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(applicationId, threadClientId);
consumerConfigs.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
final AtomicBoolean versionProbingFlag = new AtomicBoolean();
consumerConfigs.put(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG, versionProbingFlag);
String originalReset = null;
if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) {
originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
Expand All @@ -666,7 +676,8 @@ public static StreamThread create(final InternalTopologyBuilder builder,
streamsMetrics,
builder,
threadClientId,
logContext);
logContext,
versionProbingFlag);
}

public StreamThread(final Time time,
Expand All @@ -679,7 +690,8 @@ public StreamThread(final Time time,
final StreamsMetricsThreadImpl streamsMetrics,
final InternalTopologyBuilder builder,
final String threadClientId,
final LogContext logContext) {
final LogContext logContext,
final AtomicBoolean versionProbingFlag) {
super(threadClientId);

this.stateLock = new Object();
Expand All @@ -696,6 +708,7 @@ public StreamThread(final Time time,
this.restoreConsumer = restoreConsumer;
this.consumer = consumer;
this.originalReset = originalReset;
this.versionProbingFlag = versionProbingFlag;

this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
Expand Down Expand Up @@ -750,19 +763,26 @@ private void runLoop() {
while (isRunning()) {
try {
recordsProcessedBeforeCommit = runOnce(recordsProcessedBeforeCommit);
if (versionProbingFlag.get()) {
log.info("Version probing detected. Triggering new rebalance.");
enforceRebalance();
}
} catch (final TaskMigratedException ignoreAndRejoinGroup) {
log.warn("Detected task {} that got migrated to another thread. " +
"This implies that this thread missed a rebalance and dropped out of the consumer group. " +
"Will try to rejoin the consumer group. Below is the detailed description of the task:\n{}",
ignoreAndRejoinGroup.migratedTask().id(), ignoreAndRejoinGroup.migratedTask().toString(">"));

// re-subscribe to enforce a rebalance in the next poll call
consumer.unsubscribe();
consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
enforceRebalance();
}
}
}

private void enforceRebalance() {
consumer.unsubscribe();
consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
}

/**
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException If the store's change log does not contain the partition
Expand Down
Loading