Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-6145: Pt 1. Bump protocol version and encode task lag map #8121

Merged
merged 24 commits into from
Mar 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,49 +217,17 @@ public List<RebalanceProtocol> supportedProtocols() {
public ByteBuffer subscriptionUserData(final Set<String> topics) {
// Adds the following information to subscription
// 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
// 2. Task ids of previously running tasks
// 3. Task ids of valid local states on the client's state directory.
final Set<TaskId> standbyTasks = taskManager.tasksOnLocalStorage();
final Set<TaskId> activeTasks = prepareForSubscription(taskManager,
topics,
standbyTasks,
rebalanceProtocol);
// 2. Map from task id to its overall lag
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This plus the tech debt cleanup allows for the subscription handling to be greatly simplified, here and below in #assign


handleRebalanceStart(topics);

return new SubscriptionInfo(
usedSubscriptionMetadataVersion,
LATEST_SUPPORTED_VERSION,
taskManager.processId(),
activeTasks,
standbyTasks,
userEndPoint)
.encode();
}

protected static Set<TaskId> prepareForSubscription(final TaskManager taskManager,
final Set<String> topics,
final Set<TaskId> standbyTasks,
final RebalanceProtocol rebalanceProtocol) {
// Any tasks that are not yet running are counted as standby tasks for assignment purposes,
// along with any old tasks for which we still found state on disk
final Set<TaskId> activeTasks;

switch (rebalanceProtocol) {
case EAGER:
// In eager, onPartitionsRevoked is called first and we must get the previously saved running task ids
activeTasks = taskManager.activeTaskIds();
standbyTasks.removeAll(activeTasks);
break;
case COOPERATIVE:
// In cooperative, we will use the encoded ownedPartitions to determine the running tasks
activeTasks = Collections.emptySet();
standbyTasks.removeAll(taskManager.activeTaskIds());
break;
default:
throw new IllegalStateException("Streams partition assignor's rebalance protocol is unknown");
}

taskManager.handleRebalanceStart(topics);

return activeTasks;
userEndPoint,
taskManager.getTaskOffsetSums())
.encode();
}

private Map<String, Assignment> errorAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
Expand Down Expand Up @@ -314,7 +282,6 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
// keep track of any future consumers in a "dummy" Client since we can't decipher their subscription
final UUID futureId = randomUUID();
final ClientMetadata futureClient = new ClientMetadata(null);
clientMetadataMap.put(futureId, futureClient);
ableegoldman marked this conversation as resolved.
Show resolved Hide resolved

int minReceivedMetadataVersion = LATEST_SUPPORTED_VERSION;
int minSupportedMetadataVersion = LATEST_SUPPORTED_VERSION;
Expand All @@ -333,6 +300,9 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
if (usedVersion > LATEST_SUPPORTED_VERSION) {
futureMetadataVersion = usedVersion;
processId = futureId;
if (!clientMetadataMap.containsKey(futureId)) {
clientMetadataMap.put(futureId, futureClient);
}
} else {
processId = info.processId();
}
Expand All @@ -345,7 +315,7 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
clientMetadataMap.put(info.processId(), clientMetadata);
}

// add the consumer and any info its its subscription to the client
// add the consumer and any info in its subscription to the client
clientMetadata.addConsumer(consumerId, subscription.ownedPartitions());
allOwnedPartitions.addAll(subscription.ownedPartitions());
clientMetadata.addPreviousTasks(info);
Expand All @@ -354,7 +324,6 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
final boolean versionProbing;
if (futureMetadataVersion == UNKNOWN) {
versionProbing = false;
clientMetadataMap.remove(futureId);
} else if (minReceivedMetadataVersion >= EARLIEST_PROBEABLE_VERSION) {
versionProbing = true;
log.info("Received a future (version probing) subscription (version: {})."
Expand Down Expand Up @@ -589,12 +558,14 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr

final Map<UUID, ClientState> states = new HashMap<>();
for (final Map.Entry<UUID, ClientMetadata> entry : clientMetadataMap.entrySet()) {
final UUID uuid = entry.getKey();
final ClientState state = entry.getValue().state;
states.put(entry.getKey(), state);
states.put(uuid, state);

// Either the active tasks (eager) OR the owned partitions (cooperative) were encoded in the subscription
// according to the rebalancing protocol, so convert any partitions in a client to tasks where necessary
if (!state.ownedPartitions().isEmpty()) {
// there are two cases where we need to construct the prevTasks from the ownedPartitions:
// 1) COOPERATIVE clients on version 2.4-2.5 do not encode active tasks and rely on ownedPartitions instead
// 2) future client during version probing, when we can't decode the future subscription info's prev tasks
if (!state.ownedPartitions().isEmpty() && (uuid == futureId || state.prevActiveTasks().isEmpty())) {
final Set<TaskId> previousActiveTasks = new HashSet<>();
for (final Map.Entry<TopicPartition, String> partitionEntry : state.ownedPartitions().entrySet()) {
final TopicPartition tp = partitionEntry.getKey();
Expand Down Expand Up @@ -1154,6 +1125,7 @@ public void onAssignment(final Assignment assignment, final ConsumerGroupMetadat
topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
break;
case 6:
case 7:
validateActiveTaskEncoding(partitions, info, logPrefix);

activeTasks = getActiveTasks(partitions, info);
Expand Down Expand Up @@ -1293,7 +1265,12 @@ protected String userEndPoint() {
return userEndPoint;
}

protected TaskManager taskManger() {
/**
 * @return the {@code TaskManager} held in this object's {@code taskManager} field
 */
protected TaskManager taskManager() {
return taskManager;
}

/**
 * Forwards the start of a rebalance to the task manager.
 * Invoked from {@code subscriptionUserData} before the subscription info is built and encoded.
 *
 * @param topics the topics passed through unchanged to {@code taskManager.handleRebalanceStart}
 */
protected void handleRebalanceStart(final Set<String> topics) {
taskManager.handleRebalanceStart(topics);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.util.Set;

public interface Task {

// this must be negative to distinguish a running active task from other kinds of tasks which may be caught up to the same offsets
long LATEST_OFFSET = -2L;

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@

import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
import static org.apache.kafka.streams.processor.internals.Task.State.RESTORING;
import static org.apache.kafka.streams.processor.internals.Task.State.RUNNING;

public class TaskManager {
// initialize the task list
Expand Down Expand Up @@ -354,11 +355,27 @@ void handleLostAll() {
}
}

/**
* @return Map from task id to its total offset summed across all state stores
*/
public Map<TaskId, Long> getTaskOffsetSums() {
final Map<TaskId, Long> taskOffsetSums = new HashMap<>();

for (final TaskId id : tasksOnLocalStorage()) {
if (isRunning(id)) {
taskOffsetSums.put(id, Task.LATEST_OFFSET);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just a tiny bit uncomfortable with re-using that sentinel, because the correctness of our logic depends on the active sentinel being less than the standby sentinel, so it must be less than zero. Do we have a reason to believe that Task.LATEST_OFFSET would never change to a number that would spoil us here, such as zero?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually changed this based on working on the next PR, as Task#changelogOffsets uses this sentinel for exactly the same thing, ie an indicator that the task is running (and active). This is only used in computing the lag info for KIP-535, which has a similar desire to differentiate between a running task that is completely caught up and any other. So, I can't imagine this being changed -- but I can add a comment to the constant explaining it should always be negative (not sure why it's "-2" specifically, as opposed to "-1", do you?)

} else {
taskOffsetSums.put(id, 0L);
}
}
return taskOffsetSums;
}

/**
* Returns ids of tasks whose states are kept on the local storage. This includes active, standby, and previously
* assigned but not yet cleaned up tasks
*/
public Set<TaskId> tasksOnLocalStorage() {
private Set<TaskId> tasksOnLocalStorage() {
// A client could contain some inactive tasks whose states are still kept on the local storage in the following scenarios:
// 1) the client is actively maintaining standby tasks by maintaining their states from the change log.
// 2) the client has just got some tasks migrated out of itself to other clients while these task states
Expand Down Expand Up @@ -472,6 +489,11 @@ private Stream<Task> standbyTaskStream() {
return tasks.values().stream().filter(t -> !t.isActive());
}

/**
 * Tells whether the given id belongs to an active task that is currently in the RUNNING state.
 * An id with no entry in {@code tasks} is reported as not running.
 *
 * @param id the task id to look up in {@code tasks}
 * @return {@code true} only if a task is registered under {@code id}, it is active, and its state is RUNNING
 */
private boolean isRunning(final TaskId id) {
    final Task candidate = tasks.get(id);
    if (candidate == null) {
        return false;
    }
    return candidate.isActive() && candidate.state() == RUNNING;
}

/**
* @throws TaskMigratedException if committing offsets failed (non-EOS)
* or if the task producer got fenced (EOS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ public ByteBuffer encode() {
out.writeInt(errCode);
break;
case 6:
case 7:
out.writeInt(usedVersion);
out.writeInt(commonlySupportedVersion);
encodeActiveAndStandbyTaskAssignment(out);
Expand Down Expand Up @@ -327,6 +328,7 @@ public static AssignmentInfo decode(final ByteBuffer data) {
assignmentInfo.errCode = in.readInt();
break;
case 6:
case 7:
commonlySupportedVersion = in.readInt();
assignmentInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion);
decodeActiveTasks(assignmentInfo, in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
public final class StreamsAssignmentProtocolVersions {
public static final int UNKNOWN = -1;
public static final int EARLIEST_PROBEABLE_VERSION = 3;
public static final int LATEST_SUPPORTED_VERSION = 6;
public static final int LATEST_SUPPORTED_VERSION = 7;

private StreamsAssignmentProtocolVersions() {}
}
Loading