diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignor.java index f1b8e48005f54..57ae9be304714 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignor.java @@ -37,6 +37,7 @@ public interface TaskAssignor extends Configurable { * ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES: multiple KafkaStreams clients assigned with the same active task * ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS: active task and standby task assigned to the same KafkaStreams client * INVALID_STANDBY_TASK: stateless task assigned as a standby task + * MISSING_PROCESS_ID: ProcessId present in the input ApplicationState was not present in the output TaskAssignment * UNKNOWN_PROCESS_ID: unrecognized ProcessId not matching any of the participating consumers * UNKNOWN_TASK_ID: unrecognized TaskId not matching any of the tasks to be assigned */ @@ -45,6 +46,7 @@ enum AssignmentError { ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES, ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS, INVALID_STANDBY_TASK, + MISSING_PROCESS_ID, UNKNOWN_PROCESS_ID, UNKNOWN_TASK_ID } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index f2b8143f107ea..81745a9ee79a7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -39,6 +39,9 @@ import org.apache.kafka.streams.errors.TaskAssignmentException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; +import org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError; import org.apache.kafka.streams.processor.assignment.TaskInfo; import org.apache.kafka.streams.processor.assignment.ProcessId; import org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment; @@ -1540,6 +1543,72 @@ private void maybeScheduleFollowupRebalance(final long encodedNextScheduledRebal } } + private AssignmentError validateTaskAssignment(final ApplicationState applicationState, + final TaskAssignment taskAssignment) { + final Collection assignments = taskAssignment.assignment(); + final Set activeTasksInOutput = new HashSet<>(); + final Set standbyTasksInOutput = new HashSet<>(); + for (final KafkaStreamsAssignment assignment : assignments) { + final Set tasksForAssignment = new HashSet<>(); + for (final KafkaStreamsAssignment.AssignedTask task : assignment.assignment()) { + if (activeTasksInOutput.contains(task.id()) && task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) { + log.error("Assignment is invalid: an active task was assigned multiple times: {}", task.id()); + return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES; + } + + if (tasksForAssignment.contains(task.id())) { + log.error("Assignment is invalid: both an active and standby assignment of a task were assigned to the same client: {}", task.id()); + return AssignmentError.ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS; + } + + tasksForAssignment.add(task.id()); + if (task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) { + activeTasksInOutput.add(task.id()); + } else { + standbyTasksInOutput.add(task.id()); + } + } + } + + for (final TaskInfo task : applicationState.allTasks()) { + if (!task.isStateful() && standbyTasksInOutput.contains(task.id())) { + log.error("Assignment is invalid: a standby task was found for a stateless task: {}", task.id()); + return AssignmentError.INVALID_STANDBY_TASK; + } + } + + final Map clientStates = applicationState.kafkaStreamsStates(false); + final Set clientsInOutput = assignments.stream().map(KafkaStreamsAssignment::processId) + .collect(Collectors.toSet()); + for (final Map.Entry entry : clientStates.entrySet()) { + final ProcessId processIdInInput = entry.getKey(); + if (!clientsInOutput.contains(processIdInInput)) { + log.error("Assignment is invalid: one of the clients has no assignment: {}", processIdInInput.id()); + return AssignmentError.MISSING_PROCESS_ID; + } + } + + for (final ProcessId processIdInOutput : clientsInOutput) { + if (!clientStates.containsKey(processIdInOutput)) { + log.error("Assignment is invalid: one of the clients in the assignment is unknown: {}", processIdInOutput.id()); + return AssignmentError.UNKNOWN_PROCESS_ID; + } + } + + final Set taskIdsInInput = applicationState.allTasks().stream().map(TaskInfo::id) + .collect(Collectors.toSet()); + for (final KafkaStreamsAssignment assignment : assignments) { + for (final KafkaStreamsAssignment.AssignedTask task : assignment.assignment()) { + if (!taskIdsInInput.contains(task.id())) { + log.error("Assignment is invalid: one of the tasks in the assignment is unknown: {}", task.id()); + return AssignmentError.UNKNOWN_TASK_ID; + } + } + } + + return AssignmentError.NONE; + } + /** * Verify that this client's host info was included in the map returned in the assignment, and trigger a * rebalance if not. This may be necessary when using static membership, as a rejoining client will be handed