
KAFKA-6145: Pt 1. Bump protocol version and encode task lag map #8121

Merged: 24 commits, Mar 6, 2020
StreamsPartitionAssignor.java
@@ -217,49 +217,17 @@ public List<RebalanceProtocol> supportedProtocols() {
public ByteBuffer subscriptionUserData(final Set<String> topics) {
// Adds the following information to subscription
// 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
// 2. Task ids of previously running tasks
// 3. Task ids of valid local states on the client's state directory.
final Set<TaskId> standbyTasks = taskManager.tasksOnLocalStorage();
final Set<TaskId> activeTasks = prepareForSubscription(taskManager,
topics,
standbyTasks,
rebalanceProtocol);
// 2. Map from task id to its overall lag
Member Author:
This plus the tech debt cleanup allows for the subscription handling to be greatly simplified, here and below in #assign


taskManager.handleRebalanceStart(topics);

return new SubscriptionInfo(
usedSubscriptionMetadataVersion,
LATEST_SUPPORTED_VERSION,
taskManager.processId(),
activeTasks,
standbyTasks,
userEndPoint)
.encode();
}

protected static Set<TaskId> prepareForSubscription(final TaskManager taskManager,
final Set<String> topics,
final Set<TaskId> standbyTasks,
final RebalanceProtocol rebalanceProtocol) {
// Any tasks that are not yet running are counted as standby tasks for assignment purposes,
// along with any old tasks for which we still found state on disk
final Set<TaskId> activeTasks;

switch (rebalanceProtocol) {
case EAGER:
// In eager, onPartitionsRevoked is called first and we must get the previously saved running task ids
activeTasks = taskManager.activeTaskIds();
standbyTasks.removeAll(activeTasks);
break;
case COOPERATIVE:
// In cooperative, we will use the encoded ownedPartitions to determine the running tasks
activeTasks = Collections.emptySet();
standbyTasks.removeAll(taskManager.activeTaskIds());
break;
default:
throw new IllegalStateException("Streams partition assignor's rebalance protocol is unknown");
}

taskManager.handleRebalanceStart(topics);

return activeTasks;
userEndPoint,
taskManager.getTaskOffsetSums())
.encode();
}
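
For illustration, here is a minimal sketch (not part of this diff) of what a version-7 subscription now carries: the previously encoded active/standby task sets are replaced by a single task-to-offset-sum map, passed to the new SubscriptionInfo constructor shown further down. The process id and endpoint values are placeholders, and the snippet assumes the same types used in this file (TaskId, Task, SubscriptionInfo, ByteBuffer, UUID, Map, HashMap).

// Sketch only: build and encode a version-7 subscription carrying a task lag map.
final Map<TaskId, Long> exampleOffsetSums = new HashMap<>();
exampleOffsetSums.put(new TaskId(0, 0), Task.LATEST_OFFSET); // running active task: negative "caught up" sentinel
exampleOffsetSums.put(new TaskId(0, 1), 1234L);              // standby or dormant task: sum of its store offsets
final ByteBuffer encodedSubscription = new SubscriptionInfo(
    7,                  // usedVersion
    7,                  // latestSupportedVersion
    UUID.randomUUID(),  // placeholder for the client's process id
    null,               // userEndPoint is optional
    exampleOffsetSums
).encode();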

private Map<String, Assignment> errorAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
@@ -314,7 +282,6 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
// keep track of any future consumers in a "dummy" Client since we can't decipher their subscription
final UUID futureId = randomUUID();
final ClientMetadata futureClient = new ClientMetadata(null);
clientMetadataMap.put(futureId, futureClient);
ableegoldman marked this conversation as resolved.

int minReceivedMetadataVersion = LATEST_SUPPORTED_VERSION;
int minSupportedMetadataVersion = LATEST_SUPPORTED_VERSION;
@@ -333,6 +300,9 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
if (usedVersion > LATEST_SUPPORTED_VERSION) {
futureMetadataVersion = usedVersion;
processId = futureId;
if (!clientMetadataMap.containsKey(futureId)) {
clientMetadataMap.put(futureId, futureClient);
}
} else {
processId = info.processId();
}
@@ -345,7 +315,7 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
clientMetadataMap.put(info.processId(), clientMetadata);
}

// add the consumer and any info its its subscription to the client
// add the consumer and any info in its subscription to the client
clientMetadata.addConsumer(consumerId, subscription.ownedPartitions());
allOwnedPartitions.addAll(subscription.ownedPartitions());
clientMetadata.addPreviousTasks(info);
@@ -354,7 +324,6 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
final boolean versionProbing;
if (futureMetadataVersion == UNKNOWN) {
versionProbing = false;
clientMetadataMap.remove(futureId);
} else if (minReceivedMetadataVersion >= EARLIEST_PROBEABLE_VERSION) {
versionProbing = true;
log.info("Received a future (version probing) subscription (version: {})."
@@ -589,12 +558,14 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr

final Map<UUID, ClientState> states = new HashMap<>();
for (final Map.Entry<UUID, ClientMetadata> entry : clientMetadataMap.entrySet()) {
final UUID uuid = entry.getKey();
final ClientState state = entry.getValue().state;
states.put(entry.getKey(), state);
states.put(uuid, state);

// Either the active tasks (eager) OR the owned partitions (cooperative) were encoded in the subscription
// according to the rebalancing protocol, so convert any partitions in a client to tasks where necessary
if (!state.ownedPartitions().isEmpty()) {
// there are two cases where we need to construct the prevTasks from the ownedPartitions:
// 1) COOPERATIVE clients on version 2.4-2.5 do not encode active tasks and rely on ownedPartitions instead
// 2) future client during version probing: we can't decode the future subscription info's prev tasks
if (!state.ownedPartitions().isEmpty() && (uuid == futureId || state.prevActiveTasks().isEmpty())) {
final Set<TaskId> previousActiveTasks = new HashSet<>();
for (final Map.Entry<TopicPartition, String> partitionEntry : state.ownedPartitions().entrySet()) {
final TopicPartition tp = partitionEntry.getKey();
@@ -1154,6 +1125,7 @@ public void onAssignment(final Assignment assignment, final ConsumerGroupMetadat
topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
break;
case 6:
case 7:
validateActiveTaskEncoding(partitions, info, logPrefix);

activeTasks = getActiveTasks(partitions, info);
@@ -1293,7 +1265,8 @@ protected String userEndPoint() {
return userEndPoint;
}

protected TaskManager taskManger() {
protected TaskManager taskManager() {
return taskManager;
}

}
TaskManager.java
@@ -51,6 +51,7 @@

import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
import static org.apache.kafka.streams.processor.internals.Task.State.RESTORING;
import static org.apache.kafka.streams.processor.internals.Task.State.RUNNING;

public class TaskManager {
// initialize the task list
@@ -354,11 +355,27 @@ void handleLostAll() {
}
}

/**
* @return Map from task id to its total offset summed across all state stores
*/
public Map<TaskId, Long> getTaskOffsetSums() {
final Map<TaskId, Long> taskOffsetSums = new HashMap<>();

for (final TaskId id : tasksOnLocalStorage()) {
if (isRunning(id)) {
taskOffsetSums.put(id, Task.LATEST_OFFSET);
Contributor:
I'm just a tiny bit uncomfortable with re-using that sentinel, because the correctness of our logic depends on the active sentinel being less than the standby sentinel, so it must be less than zero. Do we have a reason to believe that Task.LATEST_OFFSET would never change to a number that would spoil us here, such as zero?

Member Author:
I actually changed this based on working on the next PR, as Task#changelogOffsets uses this sentinel for exactly the same thing, i.e. as an indicator that the task is running (and active). It is only used in computing the lag info for KIP-535, which has a similar need to differentiate a running task that is completely caught up from any other. So I can't imagine this being changed -- but I can add a comment to the constant explaining that it should always be negative (not sure why it's "-2" specifically, as opposed to "-1", do you?)

} else {
taskOffsetSums.put(id, 0L);
}
}
return taskOffsetSums;
}
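
To make the sentinel discussion above concrete, here is a hedged sketch of how the receiving side could turn an encoded offset sum into a task lag. This is an assumption about the follow-up work (this PR only encodes the sums), and endOffsetSum is a hypothetical input representing the task's total changelog end offset.

// Hypothetical receiver-side lag computation (assumption, not code in this PR).
// Task.LATEST_OFFSET is negative on purpose: it can never collide with a real
// (non-negative) offset sum, so "running and caught up" stays unambiguous.
static long taskLag(final long offsetSum, final long endOffsetSum) {
    if (offsetSum == Task.LATEST_OFFSET) {
        return 0L;                                    // caught-up active task has no lag
    }
    return Math.max(endOffsetSum - offsetSum, 0L);    // remaining restoration work otherwise
}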

/**
* Returns ids of tasks whose states are kept on the local storage. This includes active, standby, and previously
* assigned but not yet cleaned up tasks
*/
public Set<TaskId> tasksOnLocalStorage() {
private Set<TaskId> tasksOnLocalStorage() {
// A client could contain some inactive tasks whose states are still kept on the local storage in the following scenarios:
// 1) the client is actively maintaining standby tasks by maintaining their states from the change log.
// 2) the client has just got some tasks migrated out of itself to other clients while these task states
@@ -472,6 +489,11 @@ private Stream<Task> standbyTaskStream() {
return tasks.values().stream().filter(t -> !t.isActive());
}

private boolean isRunning(final TaskId id) {
final Task task = tasks.get(id);
return task != null && task.isActive() && task.state() == RUNNING;
}

/**
* @throws TaskMigratedException if committing offsets failed (non-EOS)
* or if the task producer got fenced (EOS)
AssignmentInfo.java
@@ -163,6 +163,7 @@ public ByteBuffer encode() {
out.writeInt(errCode);
break;
case 6:
case 7:
out.writeInt(usedVersion);
out.writeInt(commonlySupportedVersion);
encodeActiveAndStandbyTaskAssignment(out);
@@ -327,6 +328,7 @@ public static AssignmentInfo decode(final ByteBuffer data) {
assignmentInfo.errCode = in.readInt();
break;
case 6:
case 7:
commonlySupportedVersion = in.readInt();
assignmentInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion);
decodeActiveTasks(assignmentInfo, in);
StreamsAssignmentProtocolVersions.java
@@ -19,7 +19,7 @@
public final class StreamsAssignmentProtocolVersions {
public static final int UNKNOWN = -1;
public static final int EARLIEST_PROBEABLE_VERSION = 3;
public static final int LATEST_SUPPORTED_VERSION = 6;
public static final int LATEST_SUPPORTED_VERSION = 7;

private StreamsAssignmentProtocolVersions() {}
}
SubscriptionInfo.java
@@ -16,11 +16,15 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.HashSet;
import java.util.Map;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.internals.generated.SubscriptionInfoData;
import org.apache.kafka.streams.internals.generated.SubscriptionInfoData.TaskOffsetSum;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -41,6 +45,7 @@ public class SubscriptionInfo {
private final SubscriptionInfoData data;
private Set<TaskId> prevTasksCache = null;
private Set<TaskId> standbyTasksCache = null;
private Map<TaskId, Long> taskOffsetSumsCache = null;

static {
// Just statically check to make sure that the generated code always stays in sync with the overall protocol
@@ -69,12 +74,13 @@ private static void validateVersions(final int version, final int latestSupporte
public SubscriptionInfo(final int version,
final int latestSupportedVersion,
final UUID processId,
final Set<TaskId> prevTasks,
final Set<TaskId> standbyTasks,
final String userEndPoint) {
final String userEndPoint,
final Map<TaskId, Long> taskOffsetSums) {
validateVersions(version, latestSupportedVersion);
final SubscriptionInfoData data = new SubscriptionInfoData();
data.setVersion(version);
data.setProcessId(processId);

if (version >= 2) {
data.setUserEndPoint(userEndPoint == null
? new byte[0]
@@ -83,7 +89,38 @@ public SubscriptionInfo(final int version,
if (version >= 3) {
data.setLatestSupportedVersion(latestSupportedVersion);
}
data.setProcessId(processId);
if (version >= 7) {
setTaskOffsetSumDataFromTaskOffsetSumMap(data, taskOffsetSums);
} else {
setPrevAndStandbySetsFromParsedTaskOffsetSumMap(data, taskOffsetSums);
}
this.data = data;
}

private static void setTaskOffsetSumDataFromTaskOffsetSumMap(final SubscriptionInfoData data,
final Map<TaskId, Long> taskOffsetSums) {
data.setTaskOffsetSums(taskOffsetSums.entrySet().stream().map(t -> {
final SubscriptionInfoData.TaskOffsetSum taskOffsetSum = new SubscriptionInfoData.TaskOffsetSum();
taskOffsetSum.setTopicGroupId(t.getKey().topicGroupId);
taskOffsetSum.setPartition(t.getKey().partition);
taskOffsetSum.setOffsetSum(t.getValue());
return taskOffsetSum;
}).collect(Collectors.toList()));
}

private static void setPrevAndStandbySetsFromParsedTaskOffsetSumMap(final SubscriptionInfoData data,
Contributor:
Since we invoke this method from a number of places, should we add a flag and make sure it only sets the state once?

Contributor:
bump

Member Author:
I realized the other callers actually don't need to call this at all, so now these only get called from the constructor

final Map<TaskId, Long> taskOffsetSums) {
final Set<TaskId> prevTasks = new HashSet<>();
final Set<TaskId> standbyTasks = new HashSet<>();

for (final Map.Entry<TaskId, Long> taskOffsetSum : taskOffsetSums.entrySet()) {
if (taskOffsetSum.getValue() == Task.LATEST_OFFSET) {
prevTasks.add(taskOffsetSum.getKey());
} else {
standbyTasks.add(taskOffsetSum.getKey());
}
}

data.setPrevTasks(prevTasks.stream().map(t -> {
final SubscriptionInfoData.TaskId taskId = new SubscriptionInfoData.TaskId();
taskId.setTopicGroupId(t.topicGroupId);
@@ -96,8 +133,6 @@ public SubscriptionInfo(final int version,
taskId.setPartition(t.partition);
return taskId;
}).collect(Collectors.toList()));

this.data = data;
}

private SubscriptionInfo(final SubscriptionInfoData subscriptionInfoData) {
@@ -119,6 +154,10 @@ public UUID processId() {

public Set<TaskId> prevTasks() {
if (prevTasksCache == null) {
// lazily initialize the prev and standby task maps as they may not be needed
if (data.version() >= 7) {
setPrevAndStandbySetsFromParsedTaskOffsetSumMap(data, taskOffsetSums());
}
prevTasksCache = Collections.unmodifiableSet(
data.prevTasks()
.stream()
@@ -131,6 +170,10 @@ public Set<TaskId> prevTasks() {

public Set<TaskId> standbyTasks() {
if (standbyTasksCache == null) {
// lazily initialize the prev and standby task maps as they may not be needed
if (data.version() >= 7) {
setPrevAndStandbySetsFromParsedTaskOffsetSumMap(data, taskOffsetSums());
}
standbyTasksCache = Collections.unmodifiableSet(
data.standbyTasks()
.stream()
@@ -141,12 +184,39 @@ public Set<TaskId> standbyTasks() {
return standbyTasksCache;
}

public Map<TaskId, Long> taskOffsetSums() {
if (taskOffsetSumsCache == null) {
taskOffsetSumsCache = Collections.unmodifiableMap(
data.taskOffsetSums()
.stream()
.collect(Collectors.toMap(t -> new TaskId(t.topicGroupId(), t.partition()), TaskOffsetSum::offsetSum))
);
}
return taskOffsetSumsCache;
}

public String userEndPoint() {
return data.userEndPoint() == null || data.userEndPoint().length == 0
? null
: new String(data.userEndPoint(), StandardCharsets.UTF_8);
}

public static Set<TaskId> getActiveTasksFromTaskOffsetSumMap(final Map<TaskId, Long> taskOffsetSums) {
return taskOffsetSumMapToTaskSet(taskOffsetSums, true);
}

public static Set<TaskId> getStandbyTasksFromTaskOffsetSumMap(final Map<TaskId, Long> taskOffsetSums) {
return taskOffsetSumMapToTaskSet(taskOffsetSums, false);
}

private static Set<TaskId> taskOffsetSumMapToTaskSet(final Map<TaskId, Long> taskOffsetSums,
final boolean getActiveTasks) {
return taskOffsetSums.entrySet().stream()
.filter(t -> getActiveTasks == (t.getValue() == Task.LATEST_OFFSET))
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
}
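
A short usage sketch (an assumed caller, not part of this diff): on the receiving side a decoded version-7 subscription exposes only the offset-sum map, and the old active/standby split can be recovered with the two static helpers above. SubscriptionInfo.decode is the existing decoding entry point; encodedSubscription stands for any subscription user data bytes.

// Sketch only: recover the previous active and standby task sets from a decoded subscription.
final SubscriptionInfo info = SubscriptionInfo.decode(encodedSubscription);
final Map<TaskId, Long> offsetSums = info.taskOffsetSums();
final Set<TaskId> prevActiveTasks = SubscriptionInfo.getActiveTasksFromTaskOffsetSumMap(offsetSums);
final Set<TaskId> prevStandbyTasks = SubscriptionInfo.getStandbyTasksFromTaskOffsetSumMap(offsetSums);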

/**
* @throws TaskAssignmentException if method fails to encode the data
*/