From 111de08ac79a2e5b225134ce38514bbe39a71f1b Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Sat, 17 Mar 2018 23:02:48 -0700 Subject: [PATCH 1/3] KAFKA-6054: Update Kafka Streams metadata to version 3 - adds Streams upgrade tests for 1.1 release - introduces metadata version 3 and 'version probing' --- build.gradle | 12 + gradle/dependencies.gradle | 2 + settings.gradle | 5 +- .../apache/kafka/streams/StreamsConfig.java | 34 ++- .../processor/internals/StreamThread.java | 16 +- .../internals/StreamsPartitionAssignor.java | 116 ++++++++-- .../processor/internals/TaskManager.java | 26 ++- .../internals/assignment/AssignmentInfo.java | 110 +++++++-- .../assignment/SubscriptionInfo.java | 217 +++++++++++++---- .../StreamsPartitionAssignorTest.java | 161 ++++++++++++- .../assignment/AssignmentInfoTest.java | 105 ++++----- .../assignment/SubscriptionInfoTest.java | 96 ++++---- .../streams/tests/StreamsUpgradeTest.java | 218 +++++++++++++++++- .../streams/tests/StreamsUpgradeTest.java | 104 +++++++++ tests/kafkatest/services/streams.py | 6 + .../tests/streams/streams_upgrade_test.py | 122 ++++++++-- tests/kafkatest/version.py | 6 +- 17 files changed, 1117 insertions(+), 239 deletions(-) create mode 100644 streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java diff --git a/build.gradle b/build.gradle index f8daf2fdddc3c..5b0e6496c2ef0 100644 --- a/build.gradle +++ b/build.gradle @@ -1087,6 +1087,18 @@ project(':streams:upgrade-system-tests-10') { } } +project(':streams:upgrade-system-tests-11') { + archivesBaseName = "kafka-streams-upgrade-system-tests-11" + + dependencies { + testCompile libs.kafkaStreams_11 + } + + systemTestLibs { + dependsOn testJar + } +} + project(':jmh-benchmarks') { apply plugin: 'com.github.johnrengelman.shadow' diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index effe763ac451b..a6ef5dddeeca6 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -67,6 +67,7 @@ versions += [ kafka_0102: "0.10.2.1", kafka_0110: "0.11.0.2", kafka_10: "1.0.1", + kafka_11: "1.1.0", lz4: "1.4.1", metrics: "2.2.0", // PowerMock 1.x doesn't support Java 9, so use PowerMock 2.0.0 beta @@ -115,6 +116,7 @@ libs += [ kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102", kafkaStreams_0110: "org.apache.kafka:kafka-streams:$versions.kafka_0110", kafkaStreams_10: "org.apache.kafka:kafka-streams:$versions.kafka_10", + kafkaStreams_11: "org.apache.kafka:kafka-streams:$versions.kafka_11", log4j: "log4j:log4j:$versions.log4j", lz4: "org.lz4:lz4-java:$versions.lz4", metrics: "com.yammer.metrics:metrics-core:$versions.metrics", diff --git a/settings.gradle b/settings.gradle index 03136849fd543..2a7977cfc9343 100644 --- a/settings.gradle +++ b/settings.gradle @@ -15,5 +15,6 @@ include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:test-utils', 'streams:examples', 'streams:upgrade-system-tests-0100', 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102', - 'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'log4j-appender', - 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', 'jmh-benchmarks' + 'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'streams:upgrade-system-tests-11', + 'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', + 'jmh-benchmarks' diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 819bebd43b690..65b1da6dedeef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -172,6 +172,31 @@ public class StreamsConfig extends AbstractConfig {
      */
     public static final String UPGRADE_FROM_0100 = "0.10.0";
 
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.1.x}.
+     */
+    public static final String UPGRADE_FROM_0101 = "0.10.1";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.2.x}.
+     */
+    public static final String UPGRADE_FROM_0102 = "0.10.2";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.11.0.x}.
+     */
+    public static final String UPGRADE_FROM_0110 = "0.11.0";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.0.x}.
+     */
+    public static final String UPGRADE_FROM_10 = "1.0";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.1.x}.
+     */
+    public static final String UPGRADE_FROM_11 = "1.1";
+
     /**
      * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees.
      */
@@ -347,8 +372,9 @@ public class StreamsConfig extends AbstractConfig {
 
     /** {@code upgrade.from} */
     public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
-    public static final String UPGRADE_FROM_DOC = "Allows upgrading from version 0.10.0 to version 0.10.1 (or newer) in a backward compatible way. " +
-        "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\" (for upgrading from 0.10.0.x).";
+    public static final String UPGRADE_FROM_DOC = "Allows upgrading from versions 0.10.0/0.10.1/0.10.2/0.11.0/1.0/1.1 to version 1.2 (or newer) in a backward compatible way. " +
+        "When upgrading from 1.2 to a newer version, it is not required to specify this config. " +
+        "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" + UPGRADE_FROM_0101 + "\", \"" + UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" + UPGRADE_FROM_10 + "\", \"" + UPGRADE_FROM_11 + "\" (for upgrading from the corresponding old version).";
 
     /**
      * {@code value.serde}
@@ -364,7 +390,7 @@ public class StreamsConfig extends AbstractConfig {
 
     /**
      * {@code zookeeper.connect}
-     * @deprecated Kakfa Streams does not use Zookeeper anymore and this parameter will be ignored.
+     * @deprecated Kafka Streams does not use Zookeeper anymore and this parameter will be ignored.
*/ @Deprecated public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect"; @@ -575,7 +601,7 @@ public class StreamsConfig extends AbstractConfig { .define(UPGRADE_FROM_CONFIG, ConfigDef.Type.STRING, null, - in(null, UPGRADE_FROM_0100), + in(null, UPGRADE_FROM_0100, UPGRADE_FROM_0101, UPGRADE_FROM_0102, UPGRADE_FROM_0110, UPGRADE_FROM_10, UPGRADE_FROM_11), Importance.LOW, UPGRADE_FROM_DOC) .define(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index a7e3bcde4e84e..eb407eb9b7d06 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -748,19 +748,26 @@ private void runLoop() { while (isRunning()) { try { recordsProcessedBeforeCommit = runOnce(recordsProcessedBeforeCommit); + if (taskManager.versionProbingFlag) { + taskManager.versionProbingFlag = false; + enforceRebalance(); + } } catch (final TaskMigratedException ignoreAndRejoinGroup) { log.warn("Detected task {} that got migrated to another thread. " + "This implies that this thread missed a rebalance and dropped out of the consumer group. " + "Will try to rejoin the consumer group. Below is the detailed description of the task:\n{}", ignoreAndRejoinGroup.migratedTask().id(), ignoreAndRejoinGroup.migratedTask().toString(">")); - // re-subscribe to enforce a rebalance in the next poll call - consumer.unsubscribe(); - consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); + enforceRebalance(); } } } + private void enforceRebalance() { + consumer.unsubscribe(); + consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); + } + /** * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition @@ -800,6 +807,9 @@ long runOnce(final long recordsProcessedBeforeCommit) { if (records != null && !records.isEmpty() && taskManager.hasActiveRunningTasks()) { streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs); addRecordsToTasks(records); + if (taskManager.versionProbingFlag) { + return 0; + } final long totalProcessed = processAndMaybeCommit(recordsProcessedBeforeCommit); if (totalProcessed > 0) { final long processLatency = computeLatency(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 97771e568795d..0639ae78a9e3c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -159,7 +159,7 @@ public String toString() { } } - private static final Comparator PARTITION_COMPARATOR = new Comparator() { + protected static final Comparator PARTITION_COMPARATOR = new Comparator() { @Override public int compare(final TopicPartition p1, final TopicPartition p2) { @@ -173,13 +173,13 @@ public int compare(final TopicPartition p1, } }; - private String userEndPoint; + protected String userEndPoint; private int numStandbyReplicas; - private TaskManager taskManager; + protected TaskManager taskManager; private PartitionGrouper partitionGrouper; - 
private int userMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;
+    protected int usedSubscriptionMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;
 
     private InternalTopicManager internalTopicManager;
     private CopartitionedTopicsValidator copartitionedTopicsValidator;
@@ -199,10 +199,24 @@ public void configure(final Map<String, ?> configs) {
         final LogContext logContext = new LogContext(logPrefix);
         log = logContext.logger(getClass());
 
-        final String upgradeMode = (String) configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
-        if (StreamsConfig.UPGRADE_FROM_0100.equals(upgradeMode)) {
-            log.info("Downgrading metadata version from 2 to 1 for upgrade from 0.10.0.x.");
-            userMetadataVersion = 1;
+        final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
+        if (upgradeFrom != null) {
+            switch (upgradeFrom) {
+                case StreamsConfig.UPGRADE_FROM_0100:
+                    log.info("Downgrading metadata version from {} to 1 for upgrade from 0.10.0.x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION);
+                    usedSubscriptionMetadataVersion = 1;
+                    break;
+                case StreamsConfig.UPGRADE_FROM_0101:
+                case StreamsConfig.UPGRADE_FROM_0102:
+                case StreamsConfig.UPGRADE_FROM_0110:
+                case StreamsConfig.UPGRADE_FROM_10:
+                case StreamsConfig.UPGRADE_FROM_11:
+                    log.info("Downgrading metadata version from {} to 2 for upgrade from {}.x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION, upgradeFrom);
+                    usedSubscriptionMetadataVersion = 2;
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom);
+            }
         }
 
         final Object o = configs.get(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR);
@@ -263,7 +277,7 @@ public Subscription subscription(final Set<String> topics) {
         final Set<TaskId> standbyTasks = taskManager.cachedTasksIds();
         standbyTasks.removeAll(previousActiveTasks);
         final SubscriptionInfo data = new SubscriptionInfo(
-            userMetadataVersion,
+            usedSubscriptionMetadataVersion,
             taskManager.processId(),
             previousActiveTasks,
             standbyTasks,
@@ -301,6 +315,7 @@ public Map<String, Assignment> assign(final Cluster metadata,
         final Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>();
 
         int minUserMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;
+        int futureMetadataVersion = -1;
         for (final Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
             final String consumerId = entry.getKey();
             final Subscription subscription = entry.getValue();
@@ -308,8 +323,8 @@ public Map<String, Assignment> assign(final Cluster metadata,
             final SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
             final int usedVersion = info.version();
             if (usedVersion > SubscriptionInfo.LATEST_SUPPORTED_VERSION) {
-                throw new IllegalStateException("Unknown metadata version: " + usedVersion
-                    + "; latest supported version: " + SubscriptionInfo.LATEST_SUPPORTED_VERSION);
+                futureMetadataVersion = usedVersion;
+                continue;
             }
             if (usedVersion < minUserMetadataVersion) {
                 minUserMetadataVersion = usedVersion;
@@ -327,6 +342,24 @@ public Map<String, Assignment> assign(final Cluster metadata,
             clientMetadata.addConsumer(consumerId, info);
         }
 
+        if (futureMetadataVersion != -1) {
+            if (minUserMetadataVersion >= 3) {
+                log.info("Received a future (version probing) subscription (version: {}). Sending empty assignment back (with supported version {}).",
+                    futureMetadataVersion,
+                    SubscriptionInfo.LATEST_SUPPORTED_VERSION);
+                return emptyAssignment(subscriptions);
+            } else {
+                throw new IllegalStateException("Received a future (version probing) subscription (version: " + futureMetadataVersion
+                    + ") and an incompatible pre Kafka 1.2 subscription (version: " + minUserMetadataVersion + ") at the same time.");
+            }
+        }
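For context, a sketch (not part of the patch) of how the new config values are meant to be used during the first rolling bounce of an upgrade; the application ID, bootstrap server, and topology are illustrative:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

// First rolling bounce: keep the old metadata version on the wire while old and
// new instances coexist; the switch in configure() above maps "1.1" to version 2.
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "upgrade-demo");       // illustrative
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // illustrative
props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_11);
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("data-topic");                                         // illustrative topology
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Second rolling bounce: remove upgrade.from again; from then on, version probing
// negotiates the newest commonly supported metadata version automatically.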
+
+        if (minUserMetadataVersion < SubscriptionInfo.LATEST_SUPPORTED_VERSION) {
+            log.info("Downgrading metadata to version {}. Latest supported version is {}.",
+                minUserMetadataVersion,
+                SubscriptionInfo.LATEST_SUPPORTED_VERSION);
+        }
+
         log.debug("Constructed client metadata {} from the member subscriptions.", clientsMetadata);
 
         // ---------------- Step Zero ---------------- //
@@ -512,7 +545,7 @@ public Map<String, Assignment> assign(final Cluster metadata,
 
         // construct the global partition assignment per host map
         final Map<HostInfo, Set<TopicPartition>> partitionsByHostState = new HashMap<>();
-        if (minUserMetadataVersion == 2) {
+        if (minUserMetadataVersion == 2 || minUserMetadataVersion == 3) {
             for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
                 final HostInfo hostInfo = entry.getValue().hostInfo;
 
@@ -614,7 +647,19 @@ public void onAssignment(final Assignment assignment) {
         Collections.sort(partitions, PARTITION_COMPARATOR);
 
         final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
-        final int usedVersion = info.version();
+        final int receivedAssignmentMetadataVersion = info.version();
+
+        if (usedSubscriptionMetadataVersion > receivedAssignmentMetadataVersion) {
+            if (receivedAssignmentMetadataVersion >= 3) {
+                log.info("Sent a version {} subscription and got version {} assignment back (successful version probing). " +
+                    "Downgrading subscription metadata to the received version and triggering a new rebalance.",
+                    usedSubscriptionMetadataVersion,
+                    receivedAssignmentMetadataVersion);
+                usedSubscriptionMetadataVersion = receivedAssignmentMetadataVersion;
+                taskManager.versionProbingFlag = true;
+                return;
+            }
+        }
 
         // version 1 field
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -622,7 +667,7 @@ public void onAssignment(final Assignment assignment) {
         final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
         final Map<HostInfo, Set<TopicPartition>> partitionsByHost;
 
-        switch (usedVersion) {
+        switch (receivedAssignmentMetadataVersion) {
             case 1:
                 processVersionOneAssignment(info, partitions, activeTasks);
                 partitionsByHost = Collections.emptyMap();
                 break;
             case 2:
                 processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo);
                 partitionsByHost = info.partitionsByHost();
                 break;
+            case 3:
+                final int latestSupportedVersionGroupLeader = info.latestSupportedVersion();
+                if (latestSupportedVersionGroupLeader > usedSubscriptionMetadataVersion) {
+                    final int newSubscriptionMetadataVersion = Math.min(latestSupportedVersionGroupLeader, SubscriptionInfo.LATEST_SUPPORTED_VERSION);
+                    log.info("Sent a version {} subscription and group leader's latest supported version is {}. 
" + + "Upgrading subscription metadata version to {} for next rebalance.", + usedSubscriptionMetadataVersion, + latestSupportedVersionGroupLeader, + newSubscriptionMetadataVersion); + usedSubscriptionMetadataVersion = newSubscriptionMetadataVersion; + } + processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo); + partitionsByHost = info.partitionsByHost(); + break; default: - throw new IllegalStateException("Unknown metadata version: " + usedVersion + throw new IllegalStateException("Unknown metadata version: " + receivedAssignmentMetadataVersion + "; latest supported version: " + AssignmentInfo.LATEST_SUPPORTED_VERSION); } @@ -667,6 +726,17 @@ private void processVersionOneAssignment(final AssignmentInfo info, } } + private Map emptyAssignment(final Map subscriptions) { + final Map emptyAssignment = new HashMap<>(); + for (final String client : subscriptions.keySet()) { + emptyAssignment.put(client, new Assignment( + Collections.emptyList(), + new AssignmentInfo().encode() + )); + } + return emptyAssignment; + } + private void processVersionTwoAssignment(final AssignmentInfo info, final List partitions, final Map> activeTasks, @@ -684,6 +754,21 @@ private void processVersionTwoAssignment(final AssignmentInfo info, } } + private void processVersionThreeAssignment(final AssignmentInfo info, + final List partitions, + final Map> activeTasks, + final Map topicToPartitionInfo) { + processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo); + } + + // for test + protected void processLatestVersionAssignment(final AssignmentInfo info, + final List partitions, + final Map> activeTasks, + final Map topicToPartitionInfo) { + processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo); + } + /** * Internal helper function that creates a Kafka topic * @@ -818,4 +903,5 @@ void validate(final Set copartitionGroup, void setInternalTopicManager(final InternalTopicManager internalTopicManager) { this.internalTopicManager = internalTopicManager; } + } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 6308ca7fd8020..b6454d6488421 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -42,7 +42,7 @@ import static java.util.Collections.singleton; -class TaskManager { +public class TaskManager { // initialize the task list // activeTasks needs to be concurrent as it can be accessed // by QueryableState @@ -64,6 +64,7 @@ class TaskManager { private Cluster cluster; private Map> assignedActiveTasks; private Map> assignedStandbyTasks; + public boolean versionProbingFlag = false; private Consumer consumer; @@ -95,6 +96,9 @@ class TaskManager { } void createTasks(final Collection assignment) { + if (versionProbingFlag) { + return; + } if (consumer == null) { throw new IllegalStateException(logPrefix + "consumer has not been initialized while adding stream tasks. This should not happen."); } @@ -187,14 +191,14 @@ Set standbyTaskIds() { return standby.allAssignedTaskIds(); } - Set prevActiveTaskIds() { + public Set prevActiveTaskIds() { return active.previousTaskIds(); } /** * Returns ids of tasks whose states are kept on the local storage. 
*/
-    Set<TaskId> cachedTasksIds() {
+    public Set<TaskId> cachedTasksIds() {
         // A client could contain some inactive tasks whose states are still kept on the local storage in the following scenarios:
         // 1) the client is actively maintaining standby tasks by maintaining their states from the change log.
         // 2) the client has just got some tasks migrated out of itself to other clients while these task states
@@ -221,7 +225,7 @@ Set<TaskId> cachedTasksIds() {
         return tasks;
     }
 
-    UUID processId() {
+    public UUID processId() {
         return processId;
     }
 
@@ -312,6 +316,10 @@ void setConsumer(final Consumer<byte[], byte[]> consumer) {
      * @throws TaskMigratedException if the task producer got fenced or consumer discovered changelog offset changes (EOS only)
      */
     boolean updateNewAndRestoringTasks() {
+        if (versionProbingFlag) {
+            return false;
+        }
+
         active.initializeNewTasks();
         standby.initializeNewTasks();
 
@@ -356,21 +364,21 @@ private void assignStandbyPartitions() {
         }
     }
 
-    void setClusterMetadata(final Cluster cluster) {
+    public void setClusterMetadata(final Cluster cluster) {
        this.cluster = cluster;
     }
 
-    void setPartitionsByHostState(final Map<HostInfo, Set<TopicPartition>> partitionsByHostState) {
+    public void setPartitionsByHostState(final Map<HostInfo, Set<TopicPartition>> partitionsByHostState) {
         this.streamsMetadataState.onChange(partitionsByHostState, cluster);
     }
 
-    void setAssignmentMetadata(final Map<TaskId, Set<TopicPartition>> activeTasks,
+    public void setAssignmentMetadata(final Map<TaskId, Set<TopicPartition>> activeTasks,
                                final Map<TaskId, Set<TopicPartition>> standbyTasks) {
         this.assignedActiveTasks = activeTasks;
         this.assignedStandbyTasks = standbyTasks;
     }
 
-    void updateSubscriptionsFromAssignment(List<TopicPartition> partitions) {
+    public void updateSubscriptionsFromAssignment(List<TopicPartition> partitions) {
         if (builder().sourceTopicPattern() != null) {
             final Set<String> assignedTopics = new HashSet<>();
             for (final TopicPartition topicPartition : partitions) {
@@ -385,7 +393,7 @@ void updateSubscriptionsFromAssignment(List<TopicPartition> partitions) {
         }
     }
 
-    void updateSubscriptionsFromMetadata(Set<String> topics) {
+    public void updateSubscriptionsFromMetadata(Set<String> topics) {
         if (builder().sourceTopicPattern() != null) {
             final Collection<String> existingTopics = builder().subscriptionUpdates().getUpdates();
             if (!existingTopics.equals(topics)) {
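Before the AssignmentInfo changes below, a round-trip sketch (not part of the patch) of the new version-3 assignment metadata; it uses only the default constructor and accessors introduced by this diff:

import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;

// Version 3 encodes the used version plus the encoder's latest supported version,
// so a group member can learn the leader's capabilities from the assignment alone.
final AssignmentInfo info = new AssignmentInfo();  // empty assignment, version 3
final AssignmentInfo decoded = AssignmentInfo.decode(info.encode());
// decoded.version() == 3
// decoded.latestSupportedVersion() == AssignmentInfo.LATEST_SUPPORTED_VERSION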
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index c8df7498755bb..12a53afb3c961 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -16,8 +16,8 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.HostInfo;
@@ -30,6 +30,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -40,15 +41,20 @@ public class AssignmentInfo {
 
     private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class);
 
-    public static final int LATEST_SUPPORTED_VERSION = 2;
+    public static final int LATEST_SUPPORTED_VERSION = 3;
+    public static final int UNKNOWN = -1;
 
     private final int usedVersion;
+    private final int latestSupportedVersion;
     private List<TaskId> activeTasks;
     private Map<TaskId, Set<TopicPartition>> standbyTasks;
     private Map<HostInfo, Set<TopicPartition>> partitionsByHost;
 
-    private AssignmentInfo(final int version) {
+    // used for decoding; don't apply version checks
+    private AssignmentInfo(final int version,
+                           final int latestSupportedVersion) {
         this.usedVersion = version;
+        this.latestSupportedVersion = latestSupportedVersion;
     }
 
     public AssignmentInfo(final List<TaskId> activeTasks,
@@ -57,11 +63,33 @@ public AssignmentInfo(final List<TaskId> activeTasks,
         this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState);
     }
 
+    public AssignmentInfo() {
+        this(LATEST_SUPPORTED_VERSION,
+             Collections.<TaskId>emptyList(),
+             Collections.<TaskId, Set<TopicPartition>>emptyMap(),
+             Collections.<HostInfo, Set<TopicPartition>>emptyMap());
+    }
+
     public AssignmentInfo(final int version,
                           final List<TaskId> activeTasks,
                           final Map<TaskId, Set<TopicPartition>> standbyTasks,
                           final Map<HostInfo, Set<TopicPartition>> hostState) {
+        this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState);
+
+        if (version < 1 || version > LATEST_SUPPORTED_VERSION) {
+            throw new IllegalArgumentException("version must be between 1 and " + LATEST_SUPPORTED_VERSION
+                + "; was: " + version);
+        }
+    }
+
+    // for testing only; don't apply version checks
+    AssignmentInfo(final int version,
+                   final int latestSupportedVersion,
+                   final List<TaskId> activeTasks,
+                   final Map<TaskId, Set<TopicPartition>> standbyTasks,
+                   final Map<HostInfo, Set<TopicPartition>> hostState) {
         this.usedVersion = version;
+        this.latestSupportedVersion = latestSupportedVersion;
         this.activeTasks = activeTasks;
         this.standbyTasks = standbyTasks;
         this.partitionsByHost = hostState;
@@ -71,6 +99,10 @@ public int version() {
         return usedVersion;
     }
 
+    public int latestSupportedVersion() {
+        return latestSupportedVersion;
+    }
+
     public List<TaskId> activeTasks() {
         return activeTasks;
     }
@@ -98,6 +130,9 @@ public ByteBuffer encode() {
                 case 2:
                     encodeVersionTwo(out);
                     break;
+                case 3:
+                    encodeVersionThree(out);
+                    break;
                 default:
                     throw new IllegalStateException("Unknown metadata version: " + usedVersion
                         + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
@@ -161,6 +196,13 @@ private void writeTopicPartitions(final DataOutputStream out,
         }
     }
 
+    private void encodeVersionThree(final DataOutputStream out) throws IOException {
+        out.writeInt(usedVersion);
+        out.writeInt(LATEST_SUPPORTED_VERSION);
+        encodeActiveAndStandbyTaskAssignment(out);
+        encodePartitionsByHost(out);
+    }
+
     /**
      * @throws TaskAssignmentException if method fails to decode the data or if the data version is unknown
      */
@@ -169,19 +211,25 @@ public static AssignmentInfo decode(final ByteBuffer data) {
         data.rewind();
 
         try (final DataInputStream in = new DataInputStream(new ByteBufferInputStream(data))) {
-            // decode used version
-            final int usedVersion = in.readInt();
-            final AssignmentInfo assignmentInfo = new AssignmentInfo(usedVersion);
+            final AssignmentInfo assignmentInfo;
 
+            final int usedVersion = in.readInt();
             switch (usedVersion) {
                 case 1:
+                    assignmentInfo = new AssignmentInfo(usedVersion, UNKNOWN);
                     decodeVersionOneData(assignmentInfo, in);
                     break;
                 case 2:
+                    assignmentInfo = new AssignmentInfo(usedVersion, UNKNOWN);
                     decodeVersionTwoData(assignmentInfo, in);
                     break;
+                case 3:
+                    final int latestSupportedVersion = in.readInt();
+                    assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion);
+                    decodeVersionThreeData(assignmentInfo, in);
+                    break;
                 default:
-                    TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode subscription data: " +
+                    TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode assignment data: " +
                         "used version: " + usedVersion + "; latest supported version: " +
LATEST_SUPPORTED_VERSION);
                     log.error(fatalException.getMessage(), fatalException);
                     throw fatalException;
@@ -195,15 +243,23 @@ public static AssignmentInfo decode(final ByteBuffer data) {
 
     private static void decodeVersionOneData(final AssignmentInfo assignmentInfo,
                                              final DataInputStream in) throws IOException {
-        // decode active tasks
-        int count = in.readInt();
+        decodeActiveTasks(assignmentInfo, in);
+        decodeStandbyTasks(assignmentInfo, in);
+        assignmentInfo.partitionsByHost = new HashMap<>();
+    }
+
+    private static void decodeActiveTasks(final AssignmentInfo assignmentInfo,
+                                          final DataInputStream in) throws IOException {
+        final int count = in.readInt();
         assignmentInfo.activeTasks = new ArrayList<>(count);
         for (int i = 0; i < count; i++) {
             assignmentInfo.activeTasks.add(TaskId.readFrom(in));
         }
+    }
 
-        // decode standby tasks
-        count = in.readInt();
+    private static void decodeStandbyTasks(final AssignmentInfo assignmentInfo,
+                                           final DataInputStream in) throws IOException {
+        final int count = in.readInt();
         assignmentInfo.standbyTasks = new HashMap<>(count);
         for (int i = 0; i < count; i++) {
             TaskId id = TaskId.readFrom(in);
@@ -213,9 +269,13 @@ private static void decodeVersionOneData(final AssignmentInfo assignmentInfo,
 
     private static void decodeVersionTwoData(final AssignmentInfo assignmentInfo,
                                              final DataInputStream in) throws IOException {
-        decodeVersionOneData(assignmentInfo, in);
+        decodeActiveTasks(assignmentInfo, in);
+        decodeStandbyTasks(assignmentInfo, in);
+        decodeGlobalAssignmentData(assignmentInfo, in);
+    }
 
-        // decode partitions by host
+    private static void decodeGlobalAssignmentData(final AssignmentInfo assignmentInfo,
+                                                   final DataInputStream in) throws IOException {
         assignmentInfo.partitionsByHost = new HashMap<>();
         final int numEntries = in.readInt();
         for (int i = 0; i < numEntries; i++) {
@@ -233,19 +293,27 @@ private static Set<TopicPartition> readTopicPartitions(final DataInputStream in)
         return partitions;
     }
 
+    private static void decodeVersionThreeData(final AssignmentInfo assignmentInfo,
+                                               final DataInputStream in) throws IOException {
+        decodeActiveTasks(assignmentInfo, in);
+        decodeStandbyTasks(assignmentInfo, in);
+        decodeGlobalAssignmentData(assignmentInfo, in);
+    }
+
     @Override
     public int hashCode() {
-        return usedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode();
+        return usedVersion ^ latestSupportedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode();
     }
 
     @Override
     public boolean equals(final Object o) {
         if (o instanceof AssignmentInfo) {
             final AssignmentInfo other = (AssignmentInfo) o;
-            return this.usedVersion == other.usedVersion &&
-                    this.activeTasks.equals(other.activeTasks) &&
-                    this.standbyTasks.equals(other.standbyTasks) &&
-                    this.partitionsByHost.equals(other.partitionsByHost);
+            return usedVersion == other.usedVersion &&
+                    latestSupportedVersion == other.latestSupportedVersion &&
+                    activeTasks.equals(other.activeTasks) &&
+                    standbyTasks.equals(other.standbyTasks) &&
+                    partitionsByHost.equals(other.partitionsByHost);
         } else {
             return false;
         }
@@ -253,7 +321,11 @@ public boolean equals(final Object o) {
 
     @Override
     public String toString() {
-        return "[version=" + usedVersion + ", active tasks=" + activeTasks.size() + ", standby tasks=" + standbyTasks.size() + "]";
+        return "[version=" + usedVersion
+            + ", supported version=" + latestSupportedVersion
+            + ", active tasks=" + activeTasks
+            + ", standby tasks=" + standbyTasks
+            + ", global assignment=" + partitionsByHost + "]";
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index 7fee90b540213..b5b7cd7fbd23d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -23,6 +23,7 @@
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
@@ -31,16 +32,21 @@ public class SubscriptionInfo {
 
     private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class);
 
-    public static final int LATEST_SUPPORTED_VERSION = 2;
+    public static final int LATEST_SUPPORTED_VERSION = 3;
+    public static final int UNKNOWN = -1;
 
-    private final int usedVersion;
-    private UUID processId;
-    private Set<TaskId> prevTasks;
-    private Set<TaskId> standbyTasks;
-    private String userEndPoint;
+    protected final int usedVersion;
+    protected final int latestSupportedVersion;
+    protected UUID processId;
+    protected Set<TaskId> prevTasks;
+    protected Set<TaskId> standbyTasks;
+    protected String userEndPoint;
 
-    private SubscriptionInfo(final int version) {
+    // used for decoding; don't apply version checks
+    private SubscriptionInfo(final int version,
+                             final int latestSupportedVersion) {
         this.usedVersion = version;
+        this.latestSupportedVersion = latestSupportedVersion;
     }
 
     public SubscriptionInfo(final UUID processId,
@@ -55,7 +61,23 @@ public SubscriptionInfo(final int version,
                             final UUID processId,
                             final Set<TaskId> prevTasks,
                             final Set<TaskId> standbyTasks,
                             final String userEndPoint) {
+        this(version, LATEST_SUPPORTED_VERSION, processId, prevTasks, standbyTasks, userEndPoint);
+
+        if (version < 1 || version > LATEST_SUPPORTED_VERSION) {
+            throw new IllegalArgumentException("version must be between 1 and " + LATEST_SUPPORTED_VERSION
+                + "; was: " + version);
+        }
+    }
+
+    // for testing only; don't apply version checks
+    protected SubscriptionInfo(final int version,
+                               final int latestSupportedVersion,
+                               final UUID processId,
+                               final Set<TaskId> prevTasks,
+                               final Set<TaskId> standbyTasks,
+                               final String userEndPoint) {
         this.usedVersion = version;
+        this.latestSupportedVersion = latestSupportedVersion;
         this.processId = processId;
         this.prevTasks = prevTasks;
         this.standbyTasks = standbyTasks;
@@ -66,6 +88,10 @@ public int version() {
         return usedVersion;
     }
 
+    public int latestSupportedVersion() {
+        return latestSupportedVersion;
+    }
+
     public UUID processId() {
         return processId;
     }
@@ -93,7 +119,10 @@ public ByteBuffer encode() {
                 buf = encodeVersionOne();
                 break;
             case 2:
-                buf = encodeVersionTwo(prepareUserEndPoint());
+                buf = encodeVersionTwo();
+                break;
+            case 3:
+                buf = encodeVersionThree();
                 break;
             default:
                 throw new IllegalStateException("Unknown metadata version: " + usedVersion
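A round-trip sketch (not part of the patch) of the version-3 subscription metadata built by the encode path below; process ID, tasks, and endpoint are illustrative:

import java.util.Collections;
import java.util.UUID;
import org.apache.kafka.streams.processor.TaskId;

// A version-3 subscription carries the sender's latest supported version in
// addition to the used version, enabling the version-probing handshake.
final SubscriptionInfo info = new SubscriptionInfo(
    3,                                        // used version
    UUID.randomUUID(),                        // process ID (illustrative)
    Collections.singleton(new TaskId(0, 0)),  // previously active tasks (illustrative)
    Collections.<TaskId>emptySet(),           // standby tasks
    "localhost:8080");                        // user endpoint (illustrative)
final SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
// decoded.version() == 3
// decoded.latestSupportedVersion() == SubscriptionInfo.LATEST_SUPPORTED_VERSION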
@@ -108,7 +137,9 @@ private ByteBuffer encodeVersionOne() {
         final ByteBuffer buf = ByteBuffer.allocate(getVersionOneByteLength());
 
         buf.putInt(1); // version
-        encodeVersionOneData(buf);
+        encodeClientUUID(buf);
+        encodeTasks(buf, prevTasks);
+        encodeTasks(buf, standbyTasks);
 
         return buf;
     }
@@ -120,18 +151,15 @@ private int getVersionOneByteLength() {
             4 + standbyTasks.size() * 8; // length + standby tasks
     }
 
-    private void encodeVersionOneData(final ByteBuffer buf) {
-        // encode client UUID
+    private void encodeClientUUID(final ByteBuffer buf) {
         buf.putLong(processId.getMostSignificantBits());
         buf.putLong(processId.getLeastSignificantBits());
-        // encode ids of previously running tasks
-        buf.putInt(prevTasks.size());
-        for (TaskId id : prevTasks) {
-            id.writeTo(buf);
-        }
-        // encode ids of cached tasks
-        buf.putInt(standbyTasks.size());
-        for (TaskId id : standbyTasks) {
+    }
+
+    private void encodeTasks(final ByteBuffer buf,
+                             final Collection<TaskId> taskIds) {
+        buf.putInt(taskIds.size());
+        for (TaskId id : taskIds) {
             id.writeTo(buf);
         }
     }
@@ -144,52 +172,105 @@ private byte[] prepareUserEndPoint() {
         }
     }
 
-    private ByteBuffer encodeVersionTwo(final byte[] endPointBytes) {
+    private ByteBuffer encodeVersionTwo() {
+        final byte[] endPointBytes = prepareUserEndPoint();
+
         final ByteBuffer buf = ByteBuffer.allocate(getVersionTwoByteLength(endPointBytes));
 
         buf.putInt(2); // version
-        encodeVersionTwoData(buf, endPointBytes);
+        encodeClientUUID(buf);
+        encodeTasks(buf, prevTasks);
+        encodeTasks(buf, standbyTasks);
+        encodeUserEndPoint(buf, endPointBytes);
 
         return buf;
     }
 
     private int getVersionTwoByteLength(final byte[] endPointBytes) {
-        return getVersionOneByteLength() +
+        return 4 + // version
+               16 + // client ID
+               4 + prevTasks.size() * 8 + // length + prev tasks
+               4 + standbyTasks.size() * 8 + // length + standby tasks
                4 + endPointBytes.length; // length + userEndPoint
     }
 
-    private void encodeVersionTwoData(final ByteBuffer buf,
-                                      final byte[] endPointBytes) {
-        encodeVersionOneData(buf);
+    private void encodeUserEndPoint(final ByteBuffer buf,
+                                    final byte[] endPointBytes) {
         if (endPointBytes != null) {
             buf.putInt(endPointBytes.length);
             buf.put(endPointBytes);
         }
     }
 
+    private ByteBuffer encodeVersionThree() {
+        final byte[] endPointBytes = prepareUserEndPoint();
+
+        final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeByteLength(endPointBytes));
+
+        buf.putInt(3); // used version
+        buf.putInt(3); // latest supported version
+        encodeLatestVersion(buf, endPointBytes);
+
+        return buf;
+    }
+
+    private void encodeLatestVersion(final ByteBuffer buf,
+                                     final byte[] endPointBytes) {
+        encodeClientUUID(buf);
+        encodeTasks(buf, prevTasks);
+        encodeTasks(buf, standbyTasks);
+        encodeUserEndPoint(buf, endPointBytes);
+    }
+
+    // for testing
+    public ByteBuffer encodeFutureVersion(final int futureVersion) {
+        final byte[] endPointBytes = prepareUserEndPoint();
+
+        final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeByteLength(endPointBytes));
+
+        buf.putInt(futureVersion); // used version
+        buf.putInt(futureVersion); // latest supported version
+        encodeLatestVersion(buf, endPointBytes);
+
+        return buf;
+    }
+
+    private int getVersionThreeByteLength(final byte[] endPointBytes) {
+        return 4 + // used version
+               4 + // latest supported version
+               16 + // client ID
+               4 + prevTasks.size() * 8 + // length + prev tasks
+               4 + standbyTasks.size() * 8 + // length + standby tasks
+               4 + endPointBytes.length; // length + userEndPoint
+    }
+
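The decode() method that follows no longer throws on a version from a future release; a sketch of the resulting behavior, mirroring the new shouldAllowToDecodeFutureSupportedVersion test further down:

import java.nio.ByteBuffer;

// Emulate a subscription written by a future release: only the leading version
// number matters, since this code cannot interpret the rest of the payload.
final ByteBuffer future = ByteBuffer.allocate(4);
future.putInt(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1);
final SubscriptionInfo decoded = SubscriptionInfo.decode(future);  // decode() rewinds internally
// decoded.version() == LATEST_SUPPORTED_VERSION + 1
// decoded.latestSupportedVersion() == SubscriptionInfo.UNKNOWN -- the group leader
// uses this to detect a version-probing member and reply with an empty assignment.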
    /**
     * @throws TaskAssignmentException if method fails to decode the data
     */
    public static SubscriptionInfo decode(final ByteBuffer data) {
+        final SubscriptionInfo subscriptionInfo;
+
         // ensure we are at the beginning of the ByteBuffer
         data.rewind();
 
-        // decode used version
         final int usedVersion = data.getInt();
-        final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(usedVersion);
         switch (usedVersion) {
             case 1:
+                subscriptionInfo = new SubscriptionInfo(usedVersion, UNKNOWN);
                 decodeVersionOneData(subscriptionInfo, data);
                 break;
             case 2:
+                subscriptionInfo = new SubscriptionInfo(usedVersion, UNKNOWN);
                 decodeVersionTwoData(subscriptionInfo, data);
                 break;
+            case 3:
+                final int latestSupportedVersion = data.getInt();
+                subscriptionInfo = new SubscriptionInfo(usedVersion, latestSupportedVersion);
+                decodeVersionThreeData(subscriptionInfo, data);
+                break;
             default:
-                TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode subscription data: " +
-                    "used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
-                log.error(fatalException.getMessage(), fatalException);
-                throw fatalException;
+                subscriptionInfo = new SubscriptionInfo(usedVersion, UNKNOWN);
+                log.info("Unable to decode subscription data: used version: {}; latest supported version: {}", usedVersion, LATEST_SUPPORTED_VERSION);
         }
 
         return subscriptionInfo;
@@ -197,30 +278,43 @@ public static SubscriptionInfo decode(final ByteBuffer data) {
 
     private static void decodeVersionOneData(final SubscriptionInfo subscriptionInfo,
                                              final ByteBuffer data) {
-        // decode client UUID
-        subscriptionInfo.processId = new UUID(data.getLong(), data.getLong());
+        decodeClientUUID(subscriptionInfo, data);
 
-        // decode previously active tasks
-        final int numPrevs = data.getInt();
         subscriptionInfo.prevTasks = new HashSet<>();
-        for (int i = 0; i < numPrevs; i++) {
-            TaskId id = TaskId.readFrom(data);
-            subscriptionInfo.prevTasks.add(id);
-        }
+        decodeTasks(subscriptionInfo.prevTasks, data);
 
-        // decode previously cached tasks
-        final int numCached = data.getInt();
         subscriptionInfo.standbyTasks = new HashSet<>();
-        for (int i = 0; i < numCached; i++) {
-            subscriptionInfo.standbyTasks.add(TaskId.readFrom(data));
+        decodeTasks(subscriptionInfo.standbyTasks, data);
+    }
+
+    private static void decodeClientUUID(final SubscriptionInfo subscriptionInfo,
+                                         final ByteBuffer data) {
+        subscriptionInfo.processId = new UUID(data.getLong(), data.getLong());
+    }
+
+    private static void decodeTasks(final Collection<TaskId> taskIds,
+                                    final ByteBuffer data) {
+        final int numTasks = data.getInt();
+        for (int i = 0; i < numTasks; i++) {
+            taskIds.add(TaskId.readFrom(data));
         }
     }
 
     private static void decodeVersionTwoData(final SubscriptionInfo subscriptionInfo,
                                              final ByteBuffer data) {
-        decodeVersionOneData(subscriptionInfo, data);
+        decodeClientUUID(subscriptionInfo, data);
+
+        subscriptionInfo.prevTasks = new HashSet<>();
+        decodeTasks(subscriptionInfo.prevTasks, data);
+
+        subscriptionInfo.standbyTasks = new HashSet<>();
+        decodeTasks(subscriptionInfo.standbyTasks, data);
+
+        decodeUserEndPoint(subscriptionInfo, data);
+    }
 
-        // decode user end point (can be null)
+    private static void decodeUserEndPoint(final SubscriptionInfo subscriptionInfo,
+                                           final ByteBuffer data) {
         int bytesLength = data.getInt();
         if (bytesLength != 0) {
             final byte[] bytes = new byte[bytesLength];
@@ -229,9 +323,28 @@ private static void decodeVersionTwoData(final SubscriptionInfo subscriptionInfo
         }
     }
 
+    private static void decodeVersionThreeData(final SubscriptionInfo subscriptionInfo,
+                                               final ByteBuffer data) {
+        decodeClientUUID(subscriptionInfo, data);
+
+        subscriptionInfo.prevTasks = new HashSet<>();
+        decodeTasks(subscriptionInfo.prevTasks, data);
+
+        subscriptionInfo.standbyTasks = new HashSet<>();
+        decodeTasks(subscriptionInfo.standbyTasks, data);
+
+        decodeUserEndPoint(subscriptionInfo, data);
+    }
+
+    // for testing only
+    public static void decodeLatestVersionData(final SubscriptionInfo subscriptionInfo,
+                                               final ByteBuffer data) {
+        decodeVersionThreeData(subscriptionInfo, data);
+    }
+
     @Override
     public int hashCode() {
-        final int hashCode = usedVersion ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
+        final int
hashCode = usedVersion ^ latestSupportedVersion ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode(); if (userEndPoint == null) { return hashCode; } @@ -243,6 +356,7 @@ public boolean equals(final Object o) { if (o instanceof SubscriptionInfo) { final SubscriptionInfo other = (SubscriptionInfo) o; return this.usedVersion == other.usedVersion && + this.latestSupportedVersion == other.latestSupportedVersion && this.processId.equals(other.processId) && this.prevTasks.equals(other.prevTasks) && this.standbyTasks.equals(other.standbyTasks) && @@ -252,4 +366,13 @@ public boolean equals(final Object o) { } } + @Override + public String toString() { + return "[version=" + usedVersion + + ", supported version=" + latestSupportedVersion + + ", process ID=" + processId + + ", prev tasks=" + prevTasks + + ", standby tasks=" + standbyTasks + + ", user endpoint=" + userEndPoint + "]"; + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index e9ed9682066eb..59b909312e1f2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -46,9 +46,9 @@ import org.apache.kafka.test.MockStateStoreSupplier; import org.easymock.Capture; import org.easymock.EasyMock; -import org.junit.Assert; import org.junit.Test; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -64,6 +64,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; public class StreamsPartitionAssignorTest { @@ -867,9 +868,12 @@ public void shouldMapUserEndPointToTopicPartitions() { final PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1"); final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData()); final Set topicPartitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080)); - assertEquals(Utils.mkSet(new TopicPartition("topic1", 0), + assertEquals( + Utils.mkSet( + new TopicPartition("topic1", 0), new TopicPartition("topic1", 1), - new TopicPartition("topic1", 2)), topicPartitions); + new TopicPartition("topic1", 2)), + topicPartitions); } @Test @@ -881,7 +885,7 @@ public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() { try { configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) "localhost")); - Assert.fail("expected to an exception due to invalid config"); + fail("expected to an exception due to invalid config"); } catch (ConfigException e) { // pass } @@ -893,7 +897,7 @@ public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() { try { configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) "localhost:j87yhk")); - Assert.fail("expected to an exception due to invalid config"); + fail("expected to an exception due to invalid config"); } catch (ConfigException e) { // pass } @@ -1088,21 +1092,36 @@ public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotThreadDataProvider } @Test - public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() { + public void 
shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV1V2() { + shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(1, 2); + } + + @Test + public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV1V3() { + shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(1, 3); + } + + @Test + public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV2V3() { + shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(2, 3); + } + + private void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(final int smallestVersion, + final int otherVersion) { final Map subscriptions = new HashMap<>(); final Set emptyTasks = Collections.emptySet(); subscriptions.put( "consumer1", new PartitionAssignor.Subscription( Collections.singletonList("topic1"), - new SubscriptionInfo(1, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() + new SubscriptionInfo(smallestVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() ) ); subscriptions.put( "consumer2", new PartitionAssignor.Subscription( Collections.singletonList("topic1"), - new SubscriptionInfo(2, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() + new SubscriptionInfo(otherVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() ) ); @@ -1115,12 +1134,12 @@ public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions( final Map assignment = partitionAssignor.assign(metadata, subscriptions); assertThat(assignment.size(), equalTo(2)); - assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).version(), equalTo(1)); - assertThat(AssignmentInfo.decode(assignment.get("consumer2").userData()).version(), equalTo(1)); + assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).version(), equalTo(smallestVersion)); + assertThat(AssignmentInfo.decode(assignment.get("consumer2").userData()).version(), equalTo(smallestVersion)); } @Test - public void shouldDownGradeSubscription() { + public void shouldDownGradeSubscriptionToVersion1() { final Set emptyTasks = Collections.emptySet(); mockTaskManager( @@ -1135,6 +1154,126 @@ public void shouldDownGradeSubscription() { assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(1)); } + @Test + public void shouldDownGradeSubscriptionToVersion2For0101() { + shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0101); + } + + @Test + public void shouldDownGradeSubscriptionToVersion2For0102() { + shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0102); + } + + @Test + public void shouldDownGradeSubscriptionToVersion2For0110() { + shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0110); + } + + @Test + public void shouldDownGradeSubscriptionToVersion2For10() { + shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_10); + } + + @Test + public void shouldDownGradeSubscriptionToVersion2For11() { + shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_11); + } + + private void shouldDownGradeSubscriptionToVersion2(final Object upgradeFromValue) { + final Set emptyTasks = Collections.emptySet(); + + mockTaskManager( + emptyTasks, + emptyTasks, + UUID.randomUUID(), + builder); + configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFromValue)); + + PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1")); + + 
assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(2)); + } + + @Test + public void shouldReturnEmptyAssignmentForAllInstancesIfReceivedSingleFutureVersionSubscription() { + final Map subscriptions = new HashMap<>(); + final Set emptyTasks = Collections.emptySet(); + subscriptions.put( + "consumer1", + new PartitionAssignor.Subscription( + Collections.singletonList("topic1"), + new SubscriptionInfo(UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() + ) + ); + subscriptions.put( + "future-consumer", + new PartitionAssignor.Subscription( + Collections.singletonList("topic1"), + encodeFutureSubscription() + ) + ); + + mockTaskManager( + emptyTasks, + emptyTasks, + UUID.randomUUID(), + builder); + partitionAssignor.configure(configProps()); + final Map assignment = partitionAssignor.assign(metadata, subscriptions); + + assertThat(assignment.size(), equalTo(2)); + assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()), equalTo(new AssignmentInfo())); + assertThat(AssignmentInfo.decode(assignment.get("future-consumer").userData()), equalTo(new AssignmentInfo())); + } + + @Test + public void shouldThrowIfV1SubscriptionAndFutureSubscriptionIsMixed() { + shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(1); + } + + @Test + public void shouldThrowIfV2SubscriptionAndFutureSubscriptionIsMixed() { + shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(2); + } + + private ByteBuffer encodeFutureSubscription() { + final ByteBuffer buf = ByteBuffer.allocate(4 /* version */); + buf.putInt(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1); + return buf; + } + + private void shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(final int oldVersion) { + final Map subscriptions = new HashMap<>(); + final Set emptyTasks = Collections.emptySet(); + subscriptions.put( + "consumer1", + new PartitionAssignor.Subscription( + Collections.singletonList("topic1"), + new SubscriptionInfo(oldVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() + ) + ); + subscriptions.put( + "future-consumer", + new PartitionAssignor.Subscription( + Collections.singletonList("topic1"), + encodeFutureSubscription() + ) + ); + + mockTaskManager( + emptyTasks, + emptyTasks, + UUID.randomUUID(), + builder); + partitionAssignor.configure(configProps()); + + try { + partitionAssignor.assign(metadata, subscriptions); + fail("Should have thrown IllegalStateException"); + } catch (final IllegalStateException expected) { + // pass + } + } private PartitionAssignor.Assignment createAssignment(final Map> firstHostState) { final AssignmentInfo info = new AssignmentInfo(Collections.emptyList(), Collections.>emptyMap(), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java index c1020a98ba9f3..c7382e7671cc2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java @@ -22,85 +22,70 @@ import org.apache.kafka.streams.state.HostInfo; import org.junit.Test; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; 
import java.util.Set; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; public class AssignmentInfoTest { + private final List activeTasks = Arrays.asList( + new TaskId(0, 0), + new TaskId(0, 0), + new TaskId(0, 1), new TaskId(1, 0)); + private final Map> standbyTasks = new HashMap>() { + { + put(new TaskId(1, 1), + Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1))); + put(new TaskId(2, 0), + Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0))); + } + }; + private final Map> globalAssignment = new HashMap>() { + { + put(new HostInfo("localhost", 80), + Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t3", 3))); + } + }; @Test - public void testEncodeDecode() { - List activeTasks = - Arrays.asList(new TaskId(0, 0), new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0)); - Map> standbyTasks = new HashMap<>(); - - standbyTasks.put(new TaskId(1, 1), Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1))); - standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0))); + public void shouldUseLatestSupportedVersionByDefault() { + final AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks, globalAssignment); + assertEquals(AssignmentInfo.LATEST_SUPPORTED_VERSION, info.version()); + } - AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks, new HashMap>()); - AssignmentInfo decoded = AssignmentInfo.decode(info.encode()); + @Test(expected = IllegalArgumentException.class) + public void shouldThrowForUnknownVersion1() { + new AssignmentInfo(0, activeTasks, standbyTasks, globalAssignment); + } - assertEquals(info, decoded); + @Test(expected = IllegalArgumentException.class) + public void shouldThrowForUnknownVersion2() { + new AssignmentInfo(AssignmentInfo.LATEST_SUPPORTED_VERSION + 1, activeTasks, standbyTasks, globalAssignment); } @Test - public void shouldDecodePreviousVersion() throws IOException { - List activeTasks = - Arrays.asList(new TaskId(0, 0), new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0)); - Map> standbyTasks = new HashMap<>(); - - standbyTasks.put(new TaskId(1, 1), Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1))); - standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0))); - final AssignmentInfo oldVersion = new AssignmentInfo(1, activeTasks, standbyTasks, null); - final AssignmentInfo decoded = AssignmentInfo.decode(encodeV1(oldVersion)); - assertEquals(oldVersion.activeTasks(), decoded.activeTasks()); - assertEquals(oldVersion.standbyTasks(), decoded.standbyTasks()); - assertNull(decoded.partitionsByHost()); // should be null as wasn't in V1 - assertEquals(1, decoded.version()); + public void shouldEncodeAndDecodeVersion1() { + final AssignmentInfo info = new AssignmentInfo(1, activeTasks, standbyTasks, globalAssignment); + final AssignmentInfo expectedInfo = new AssignmentInfo(1, AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, Collections.>emptyMap()); + assertEquals(expectedInfo, AssignmentInfo.decode(info.encode())); } - /** - * This is a clone of what the V1 encoding did. 
The encode method has changed for V2 - * so it is impossible to test compatibility without having this - */ - private ByteBuffer encodeV1(AssignmentInfo oldVersion) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream(baos); - // Encode version - out.writeInt(oldVersion.version()); - // Encode active tasks - out.writeInt(oldVersion.activeTasks().size()); - for (TaskId id : oldVersion.activeTasks()) { - id.writeTo(out); - } - // Encode standby tasks - out.writeInt(oldVersion.standbyTasks().size()); - for (Map.Entry> entry : oldVersion.standbyTasks().entrySet()) { - TaskId id = entry.getKey(); - id.writeTo(out); - - Set partitions = entry.getValue(); - out.writeInt(partitions.size()); - for (TopicPartition partition : partitions) { - out.writeUTF(partition.topic()); - out.writeInt(partition.partition()); - } - } - - out.flush(); - out.close(); - - return ByteBuffer.wrap(baos.toByteArray()); + @Test + public void shouldEncodeAndDecodeVersion2() { + final AssignmentInfo info = new AssignmentInfo(2, activeTasks, standbyTasks, globalAssignment); + final AssignmentInfo expectedInfo = new AssignmentInfo(2, AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, globalAssignment); + assertEquals(expectedInfo, AssignmentInfo.decode(info.encode())); + } + @Test + public void shouldEncodeAndDecodeVersion3() { + final AssignmentInfo info = new AssignmentInfo(3, activeTasks, standbyTasks, globalAssignment); + final AssignmentInfo expectedInfo = new AssignmentInfo(3, AssignmentInfo.LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, globalAssignment); + assertEquals(expectedInfo, AssignmentInfo.decode(info.encode())); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java index 633285a2b4ddc..b3126190a3017 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java @@ -21,79 +21,71 @@ import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.UUID; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; public class SubscriptionInfoTest { + private final UUID processId = UUID.randomUUID(); + private final Set activeTasks = new HashSet<>(Arrays.asList( + new TaskId(0, 0), + new TaskId(0, 1), + new TaskId(1, 0))); + private final Set standbyTasks = new HashSet<>(Arrays.asList( + new TaskId(1, 1), + new TaskId(2, 0))); - @Test - public void testEncodeDecode() { - UUID processId = UUID.randomUUID(); + private final static String IGNORED_USER_ENDPOINT = "IGNORED_USER_ENDPOINT"; - Set activeTasks = - new HashSet<>(Arrays.asList(new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0))); - Set standbyTasks = - new HashSet<>(Arrays.asList(new TaskId(1, 1), new TaskId(2, 0))); + @Test + public void shouldUseLatestSupportedVersionByDefault() { + final SubscriptionInfo info = new SubscriptionInfo(processId, activeTasks, standbyTasks, "localhost:80"); + assertEquals(SubscriptionInfo.LATEST_SUPPORTED_VERSION, info.version()); + } - SubscriptionInfo info = new SubscriptionInfo(processId, activeTasks, standbyTasks, null); - SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode()); + 
@Test(expected = IllegalArgumentException.class) + public void shouldThrowForUnknownVersion1() { + new SubscriptionInfo(0, processId, activeTasks, standbyTasks, "localhost:80"); + } - assertEquals(info, decoded); + @Test(expected = IllegalArgumentException.class) + public void shouldThrowForUnknownVersion2() { + new SubscriptionInfo(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1, processId, activeTasks, standbyTasks, "localhost:80"); } @Test - public void shouldEncodeDecodeWithUserEndPoint() { - SubscriptionInfo original = new SubscriptionInfo(UUID.randomUUID(), - Collections.singleton(new TaskId(0, 0)), Collections.emptySet(), "localhost:80"); - SubscriptionInfo decoded = SubscriptionInfo.decode(original.encode()); - assertEquals(original, decoded); + public void shouldEncodeAndDecodeVersion1() { + final SubscriptionInfo info = new SubscriptionInfo(1, processId, activeTasks, standbyTasks, IGNORED_USER_ENDPOINT); + final SubscriptionInfo expectedInfo = new SubscriptionInfo(1, SubscriptionInfo.UNKNOWN, processId, activeTasks, standbyTasks, null); + assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode())); } @Test - public void shouldBeBackwardCompatible() { - UUID processId = UUID.randomUUID(); - - Set activeTasks = - new HashSet<>(Arrays.asList(new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0))); - Set standbyTasks = - new HashSet<>(Arrays.asList(new TaskId(1, 1), new TaskId(2, 0))); + public void shouldEncodeAndDecodeVersion2() { + final SubscriptionInfo info = new SubscriptionInfo(2, processId, activeTasks, standbyTasks, "localhost:80"); + final SubscriptionInfo expectedInfo = new SubscriptionInfo(2, SubscriptionInfo.UNKNOWN, processId, activeTasks, standbyTasks, "localhost:80"); + assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode())); + } - final ByteBuffer v1Encoding = encodePreviousVersion(processId, activeTasks, standbyTasks); - final SubscriptionInfo decode = SubscriptionInfo.decode(v1Encoding); - assertEquals(activeTasks, decode.prevTasks()); - assertEquals(standbyTasks, decode.standbyTasks()); - assertEquals(processId, decode.processId()); - assertNull(decode.userEndPoint()); + @Test + public void shouldEncodeAndDecodeVersion3() { + final SubscriptionInfo info = new SubscriptionInfo(3, processId, activeTasks, standbyTasks, "localhost:80"); + final SubscriptionInfo expectedInfo = new SubscriptionInfo(3, SubscriptionInfo.LATEST_SUPPORTED_VERSION, processId, activeTasks, standbyTasks, "localhost:80"); + assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode())); } - /** - * This is a clone of what the V1 encoding did. 
The encode method has changed for V2 - * so it is impossible to test compatibility without having this - */ - private ByteBuffer encodePreviousVersion(UUID processId, Set prevTasks, Set standbyTasks) { - ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process id */ + 4 + prevTasks.size() * 8 + 4 + standbyTasks.size() * 8); - // version - buf.putInt(1); - // encode client UUID - buf.putLong(processId.getMostSignificantBits()); - buf.putLong(processId.getLeastSignificantBits()); - // encode ids of previously running tasks - buf.putInt(prevTasks.size()); - for (TaskId id : prevTasks) { - id.writeTo(buf); - } - // encode ids of cached tasks - buf.putInt(standbyTasks.size()); - for (TaskId id : standbyTasks) { - id.writeTo(buf); - } - buf.rewind(); + @Test + public void shouldAllowToDecodeFutureSupportedVersion() { + final SubscriptionInfo info = SubscriptionInfo.decode(encodeFutureVersion()); + assertEquals(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1, info.version()); + assertEquals(SubscriptionInfo.UNKNOWN, info.latestSupportedVersion()); + } + private ByteBuffer encodeFutureVersion() { + final ByteBuffer buf = ByteBuffer.allocate(4 /* version */); + buf.putInt(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1); return buf; } } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 69eea0b37c0ef..ed6dc161f8701 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -16,13 +16,41 @@ */ package org.apache.kafka.streams.tests; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.internals.PartitionAssignor; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.TaskAssignmentException; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; +import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; +import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; +import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; +import org.apache.kafka.streams.state.HostInfo; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Properties; +import java.util.Set; +import java.util.UUID; public class StreamsUpgradeTest { @@ -50,9 +78,17 @@ public static void main(final String[] args) throws Exception { config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafka); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + + final KafkaClientSupplier kafkaClientSupplier; + if (streamsProperties.containsKey("test.future.metadata")) { + streamsProperties.remove("test.future.metadata"); + kafkaClientSupplier = new FutureKafkaClientSupplier(); + } else { + kafkaClientSupplier = new DefaultKafkaClientSupplier(); + } config.putAll(streamsProperties); - final KafkaStreams streams = new KafkaStreams(builder.build(), config); + final KafkaStreams streams = new KafkaStreams(builder.build(), config, kafkaClientSupplier); streams.start(); Runtime.getRuntime().addShutdownHook(new Thread() { @@ -66,4 +102,184 @@ public void run() { } }); } + + private static class FutureKafkaClientSupplier extends DefaultKafkaClientSupplier { + @Override + public Consumer getConsumer(final Map config) { + config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, FutureStreamsPartitionAssignor.class.getName()); + return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()); + } + } + + public static class FutureStreamsPartitionAssignor extends StreamsPartitionAssignor { + + public FutureStreamsPartitionAssignor() { + usedSubscriptionMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1; + } + + @Override + public Subscription subscription(final Set topics) { + // Adds the following information to subscription + // 1. Client UUID (a unique id assigned to an instance of KafkaStreams) + // 2. Task ids of previously running tasks + // 3. Task ids of valid local states on the client's state directory. + + final Set previousActiveTasks = taskManager.prevActiveTaskIds(); + final Set standbyTasks = taskManager.cachedTasksIds(); + standbyTasks.removeAll(previousActiveTasks); + final FutureSubscriptionInfo data = new FutureSubscriptionInfo( + usedSubscriptionMetadataVersion, + SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1, + taskManager.processId(), + previousActiveTasks, + standbyTasks, + this.userEndPoint); + + taskManager.updateSubscriptionsFromMetadata(topics); + + return new Subscription(new ArrayList<>(topics), data.encode()); + } + + @Override + public void onAssignment(final PartitionAssignor.Assignment assignment) { + try { + super.onAssignment(assignment); + return; + } catch (final IllegalStateException cannotProcessFutureVersion) { + // continue + } + + final List partitions = new ArrayList<>(assignment.partitions()); + Collections.sort(partitions, PARTITION_COMPARATOR); + + final AssignmentInfo info = AssignmentInfo.decode(assignment.userData()); + final int receivedAssignmentMetadataVersion = info.version(); + + if (receivedAssignmentMetadataVersion > AssignmentInfo.LATEST_SUPPORTED_VERSION + 1) { + throw new IllegalStateException("Unknown metadata version: " + receivedAssignmentMetadataVersion + + "; latest supported version: " + AssignmentInfo.LATEST_SUPPORTED_VERSION + 1); + } + + // version 1 field + final Map> activeTasks = new HashMap<>(); + // version 2 fields + final Map topicToPartitionInfo = new HashMap<>(); + final Map> partitionsByHost; + + processLatestVersionAssignment(info, partitions, activeTasks, topicToPartitionInfo); + partitionsByHost = info.partitionsByHost(); + + taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo)); + taskManager.setPartitionsByHostState(partitionsByHost); + taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks()); + taskManager.updateSubscriptionsFromAssignment(partitions); + } + + @Override + public Map assign(final Cluster metadata, + 
final Map subscriptions) { + final Map assignment = super.assign(metadata, subscriptions); + final Map newAssignment = new HashMap<>(); + + for (final Map.Entry entry : assignment.entrySet()) { + final Assignment singleAssignment = entry.getValue(); + newAssignment.put( + entry.getKey(), + new Assignment(singleAssignment.partitions(), + new FutureAssignmentInfo(singleAssignment.userData()).encode())); + } + + return newAssignment; + } + } + + private static class FutureSubscriptionInfo extends SubscriptionInfo { + // for testing only; don't apply version checks + FutureSubscriptionInfo(final int version, + final int latestSupportedVersion, + final UUID processId, + final Set prevTasks, + final Set standbyTasks, + final String userEndPoint) { + super(version, latestSupportedVersion, processId, prevTasks, standbyTasks, userEndPoint); + } + + public ByteBuffer encode() { + if (usedVersion <= SubscriptionInfo.LATEST_SUPPORTED_VERSION) { + return super.encode(); + } + + final ByteBuffer buf = super.encodeFutureVersion(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1); + buf.rewind(); + return buf; + } + + public static SubscriptionInfo decode(final ByteBuffer data) { + final SubscriptionInfo subscriptionInfo; + + // ensure we are at the beginning of the ByteBuffer + data.rewind(); + + final int usedVersion = data.getInt(); + + if (usedVersion <= SubscriptionInfo.LATEST_SUPPORTED_VERSION) { + subscriptionInfo = SubscriptionInfo.decode(data); + } else { + if (usedVersion == SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1) { + subscriptionInfo = new FutureSubscriptionInfo( + usedVersion, + usedVersion, + null, + null, + null, + null); + SubscriptionInfo.decodeLatestVersionData(subscriptionInfo, data); + } else { + subscriptionInfo = new FutureSubscriptionInfo( + usedVersion, + UNKNOWN, + null, + null, + null, + null); + } + } + + return subscriptionInfo; + } + } + + private static class FutureAssignmentInfo extends AssignmentInfo { + final ByteBuffer originalUserMetadata; + + private FutureAssignmentInfo(final ByteBuffer bytes) { + originalUserMetadata = bytes; + } + + @Override + public ByteBuffer encode() { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + originalUserMetadata.rewind(); + + try (final DataOutputStream out = new DataOutputStream(baos)) { + out.writeInt(originalUserMetadata.getInt()); + originalUserMetadata.getInt(); // discard original supported version + out.writeInt(AssignmentInfo.LATEST_SUPPORTED_VERSION + 1); + + try { + while (true) { + out.write(originalUserMetadata.get()); + } + } catch (final BufferUnderflowException expectedWhenAllDataCopied) { } + + out.flush(); + out.close(); + + return ByteBuffer.wrap(baos.toByteArray()); + } catch (final IOException ex) { + throw new TaskAssignmentException("Failed to encode AssignmentInfo", ex); + } + } + } } diff --git a/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java new file mode 100644 index 0000000000000..a8796cb056a7b --- /dev/null +++ b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+ /**
+ * This test cannot be executed until Kafka 1.1.1 is released
+ */
+ @SuppressWarnings("unchecked")
+ public static void main(final String[] args) throws Exception {
+ if (args.length < 2) {
+ System.err.println("StreamsUpgradeTest requires two arguments (kafka-url, properties-file) but only " + args.length + " provided: "
+ + (args.length > 0 ? args[0] : ""));
+ }
+ final String kafka = args[0];
+ final String propFileName = args.length > 1 ? args[1] : null;
+
+ final Properties streamsProperties = Utils.loadProps(propFileName);
+
+ System.out.println("StreamsTest instance started (StreamsUpgradeTest v1.1)");
+ System.out.println("kafka=" + kafka);
+ System.out.println("props=" + streamsProperties);
+
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KStream dataStream = builder.stream("data");
+ dataStream.process(printProcessorSupplier());
+ dataStream.to("echo");
+
+ final Properties config = new Properties();
+ config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+ config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+ config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+ config.putAll(streamsProperties);
+
+ final KafkaStreams streams = new KafkaStreams(builder.build(), config);
+ streams.start();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ streams.close();
+ System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+ System.out.flush();
+ }
+ });
+ }
+
+ private static ProcessorSupplier printProcessorSupplier() {
+ return new ProcessorSupplier() {
+ public Processor get() {
+ return new AbstractProcessor() {
+ private int numRecordsProcessed = 0;
+
+ @Override
+ public void init(final ProcessorContext context) {
+ System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+ numRecordsProcessed = 0;
+ }
+
+ @Override
+ public void process(final K key, final V value) {
+ numRecordsProcessed++;
+ if (numRecordsProcessed % 100 == 0) {
+ System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+ }
+ }
+
+ @Override
+ public void punctuate(final long timestamp) {}
+
+ @Override
+ public void close() {}
+ };
+ }
+ };
+ }
+}
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index a5be816c737e9..8ce6dc39b8d67 100644
---
a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -413,6 +413,7 @@ def __init__(self, test_context, kafka):
 "org.apache.kafka.streams.tests.StreamsUpgradeTest",
 "")
 self.UPGRADE_FROM = None
+ self.UPGRADE_TO = None
 def set_version(self, kafka_streams_version):
 self.KAFKA_STREAMS_VERSION = kafka_streams_version
@@ -420,10 +421,15 @@ def set_upgrade_from(self, upgrade_from):
 self.UPGRADE_FROM = upgrade_from
+ def set_upgrade_to(self, upgrade_to):
+ self.UPGRADE_TO = upgrade_to
+
 def prop_file(self):
 properties = {STATE_DIR: self.PERSISTENT_ROOT}
 if self.UPGRADE_FROM is not None:
 properties['upgrade.from'] = self.UPGRADE_FROM
+ if self.UPGRADE_TO == "future_version":
+ properties['test.future.metadata'] = "any_value"
 cfg = KafkaConfig(**properties)
 return cfg.render()
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index fa79d571f366d..9859f90a1efd8 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -23,8 +23,16 @@
 import random
 import time
+# broker 0.10.0 is not compatible with newer Kafka Streams versions
 broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), str(DEV_BRANCH)]
-simple_upgrade_versions_metadata_version_2 = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(DEV_VERSION)]
+
+metadata_1_versions = [str(LATEST_0_10_0)]
+metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
+# we can add the following versions to `backward_compatible_metadata_2_versions` after the corresponding
+# bug-fix releases 0.10.1.2, 0.10.2.2, 0.11.0.3, 1.0.2, and 1.1.1 are available:
+# str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)
+backward_compatible_metadata_2_versions = []
+metadata_3_versions = [str(DEV_VERSION)]
 class StreamsUpgradeTest(Test):
 """
@@ -39,6 +47,7 @@ def __init__(self, test_context):
 'echo' : { 'partitions': 5 },
 'data' : { 'partitions': 5 },
 }
+ self.leader = None
 def perform_broker_upgrade(self, to_version):
 self.logger.info("First pass bounce - rolling broker upgrade")
@@ -114,7 +123,7 @@ def test_upgrade_downgrade_brokers(self, from_version, to_version):
 node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
 self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
- @matrix(from_version=simple_upgrade_versions_metadata_version_2, to_version=simple_upgrade_versions_metadata_version_2)
+ @matrix(from_version=metadata_2_versions, to_version=metadata_2_versions)
 def test_simple_upgrade_downgrade(self, from_version, to_version):
 """
 Starts 3 KafkaStreams instances with <from_version>, and upgrades one-by-one to <to_version>
@@ -165,15 +174,12 @@ def test_simple_upgrade_downgrade(self, from_version, to_version):
 self.driver.stop()
- #@parametrize(new_version=str(LATEST_0_10_1)) we cannot run this test until Kafka 0.10.1.2 is released
- #@parametrize(new_version=str(LATEST_0_10_2)) we cannot run this test until Kafka 0.10.2.2 is released
- #@parametrize(new_version=str(LATEST_0_11_0)) we cannot run this test until Kafka 0.11.0.3 is released
- #@parametrize(new_version=str(LATEST_1_0)) we cannot run this test until Kafka 1.0.2 is released
-
#@parametrize(new_version=str(LATEST_1_1)) we cannot run this test until Kafka 1.1.1 is released
- @parametrize(new_version=str(DEV_VERSION))
- def test_metadata_upgrade(self, new_version):
+ @matrix(from_version=metadata_1_versions, to_version=backward_compatible_metadata_2_versions)
+ @matrix(from_version=metadata_1_versions, to_version=metadata_3_versions)
+ @matrix(from_version=metadata_2_versions, to_version=metadata_3_versions)
+ def test_metadata_upgrade(self, from_version, to_version):
 """
- Starts 3 KafkaStreams instances with version 0.10.0, and upgrades one-by-one to <new_version>
+ Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
 """
 self.zk = ZookeeperService(self.test_context, num_nodes=1)
@@ -189,7 +195,7 @@ def test_metadata_upgrade(self, new_version):
 self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
 self.driver.start()
- self.start_all_nodes_with(str(LATEST_0_10_0))
+ self.start_all_nodes_with(from_version)
 self.processors = [self.processor1, self.processor2, self.processor3]
@@ -200,13 +206,13 @@ def test_metadata_upgrade(self, new_version):
 random.shuffle(self.processors)
 for p in self.processors:
 p.CLEAN_NODE_ENABLED = False
- self.do_rolling_bounce(p, "0.10.0", new_version, counter)
+ self.do_rolling_bounce(p, from_version[:-2], to_version, counter)
 counter = counter + 1
 # second rolling bounce
 random.shuffle(self.processors)
 for p in self.processors:
- self.do_rolling_bounce(p, None, new_version, counter)
+ self.do_rolling_bounce(p, None, to_version, counter)
 counter = counter + 1
 # shutdown
@@ -224,6 +230,92 @@ def test_metadata_upgrade(self, from_version, to_version):
 self.driver.stop()
+ def test_version_probing_upgrade(self):
+ """
+ Starts 3 KafkaStreams instances, and upgrades one-by-one to "future version"
+ """
+
+ self.zk = ZookeeperService(self.test_context, num_nodes=1)
+ self.zk.start()
+
+ self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics)
+ self.kafka.start()
+
+ self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
+ self.driver.disable_auto_terminate()
+ self.processor1 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
+ self.processor2 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
+ self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
+
+ self.driver.start()
+ self.start_all_nodes_with("") # run with TRUNK
+
+ self.processors = [self.processor1, self.processor2, self.processor3]
+
+ for p in self.processors:
+ p.CLEAN_NODE_ENABLED = False
+ it = p.node.account.ssh_capture("grep \"Finished assignment for group\" %s" % p.LOG_FILE, allow_fail=True)
+ if it.has_next():
+ if self.leader is not None:
+ raise Exception("Could not uniquely identify leader")
+ self.leader = p
+
+ if self.leader is None:
+ raise Exception("Could not identify leader")
+
+ counter = 1
+ random.seed()
+
+ # rolling bounces
+ random.shuffle(self.processors)
+ first_bounced_processor = None
+ expected_new_leader_processor = None
+ with self.leader.node.account.monitor_log(self.leader.LOG_FILE) as leader_monitor:
+ for p in self.processors:
+ if p == self.leader:
+ continue
+
+ self.do_rolling_bounce(p, None, "future_version", counter)
+ leader_monitor.wait_until("Received a future (version probing) subscription (version: 4).
Sending empty assignment back (with supported version 3).", + timeout_sec=60, + err_msg="Could not detect 'version probing' attempt at leader " + str(self.leader.node.account)) + p.node.account.ssh_capture("grep \"partition.assignment.strategy = [org.apache.kafka.streams.tests.StreamsUpgradeTest$FutureStreamsPartitionAssignor]\" %s" % p.LOG_FILE, allow_fail=False) + p.node.account.ssh_capture("grep \"Sent a version 4 subscription and got version 3 assignment back (successful version probing). Downgrading subscription metadata to received version and trigger new rebalance\" %s" % p.LOG_FILE, allow_fail=False) + + if first_bounced_processor is None: + first_bounced_processor = p + else: + expected_new_leader_processor = p + + counter = counter + 1 + + it = expected_new_leader_processor.node.account.ssh_capture("grep \"Received a future (version probing) subscription (version: 4). Sending empty assignment back (with supported version 3).\" %s" % expected_new_leader_processor.LOG_FILE, allow_fail=True) + if it.has_next(): + print it.next() + raise Exception("Future new leader should receive version probing only after current/old leader is bounced.") + + self.do_rolling_bounce(self.leader, None, "future_version", counter) + prevLeader = self.leader + + expected_new_leader_processor.node.account.ssh_capture("grep \"Received a future (version probing) subscription (version: 4). Sending empty assignment back (with supported version 3).\" %s" % expected_new_leader_processor.LOG_FILE, allow_fail=False) + prevLeader.node.account.ssh_capture("grep \"partition.assignment.strategy = [org.apache.kafka.streams.tests.StreamsUpgradeTest$FutureStreamsPartitionAssignor]\" %s" % prevLeader.LOG_FILE, allow_fail=False) + prevLeader.node.account.ssh_capture("grep \"Sent a version 4 subscription and got version 3 assignment back (successful version probing). 
Downgrading subscription metadata to received version and trigger new rebalance\" %s" % prevLeader.LOG_FILE, allow_fail=False) + + # shutdown + self.driver.stop() + self.driver.wait() + + random.shuffle(self.processors) + for p in self.processors: + node = p.node + with node.account.monitor_log(p.STDOUT_FILE) as monitor: + p.stop() + monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED", + timeout_sec=60, + err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account)) + + self.driver.stop() + def start_all_nodes_with(self, version): # start first with self.prepare_for(self.processor1, version) @@ -321,8 +413,12 @@ def do_rolling_bounce(self, processor, upgrade_from, new_version, counter): if new_version == str(DEV_VERSION): processor.set_version("") # set to TRUNK + elif new_version == "future_version": + processor.set_upgrade_to("future_version") + new_version = str(DEV_VERSION) else: processor.set_version(new_version) + processor.set_upgrade_from(upgrade_from) grep_metadata_error = "grep \"org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode subscription data: version=2\" " diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index 66e5fcf18aabf..7823efac1d4b1 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -63,17 +63,17 @@ def get_version(node=None): DEV_BRANCH = KafkaVersion("dev") DEV_VERSION = KafkaVersion("1.2.0-SNAPSHOT") -# 0.8.2.X versions +# 0.8.2.x versions V_0_8_2_1 = KafkaVersion("0.8.2.1") V_0_8_2_2 = KafkaVersion("0.8.2.2") LATEST_0_8_2 = V_0_8_2_2 -# 0.9.0.X versions +# 0.9.0.x versions V_0_9_0_0 = KafkaVersion("0.9.0.0") V_0_9_0_1 = KafkaVersion("0.9.0.1") LATEST_0_9 = V_0_9_0_1 -# 0.10.0.X versions +# 0.10.0.x versions V_0_10_0_0 = KafkaVersion("0.10.0.0") V_0_10_0_1 = KafkaVersion("0.10.0.1") LATEST_0_10_0 = V_0_10_0_1 From 12becef23c04c9b93b35f125eef72b2d42de43b3 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Mon, 16 Apr 2018 16:21:38 +0200 Subject: [PATCH 2/3] Revert version probing --- .../processor/internals/StreamThread.java | 16 +- .../internals/StreamsPartitionAssignor.java | 84 +------ .../processor/internals/TaskManager.java | 26 +-- .../assignment/SubscriptionInfo.java | 41 +--- .../StreamsPartitionAssignorTest.java | 81 ------- .../assignment/SubscriptionInfoTest.java | 13 -- .../streams/tests/StreamsUpgradeTest.java | 218 +----------------- tests/kafkatest/services/console_consumer.py | 2 +- tests/kafkatest/services/streams.py | 2 - .../tests/streams/streams_upgrade_test.py | 92 +------- 10 files changed, 35 insertions(+), 540 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index eb407eb9b7d06..a7e3bcde4e84e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -748,26 +748,19 @@ private void runLoop() { while (isRunning()) { try { recordsProcessedBeforeCommit = runOnce(recordsProcessedBeforeCommit); - if (taskManager.versionProbingFlag) { - taskManager.versionProbingFlag = false; - enforceRebalance(); - } } catch (final TaskMigratedException ignoreAndRejoinGroup) { log.warn("Detected task {} that got migrated to another thread. " + "This implies that this thread missed a rebalance and dropped out of the consumer group. " + "Will try to rejoin the consumer group. 
Below is the detailed description of the task:\n{}", ignoreAndRejoinGroup.migratedTask().id(), ignoreAndRejoinGroup.migratedTask().toString(">")); - enforceRebalance(); + // re-subscribe to enforce a rebalance in the next poll call + consumer.unsubscribe(); + consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); } } } - private void enforceRebalance() { - consumer.unsubscribe(); - consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); - } - /** * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition @@ -807,9 +800,6 @@ long runOnce(final long recordsProcessedBeforeCommit) { if (records != null && !records.isEmpty() && taskManager.hasActiveRunningTasks()) { streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs); addRecordsToTasks(records); - if (taskManager.versionProbingFlag) { - return 0; - } final long totalProcessed = processAndMaybeCommit(recordsProcessedBeforeCommit); if (totalProcessed > 0) { final long processLatency = computeLatency(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 0639ae78a9e3c..c81105ef821ae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -159,7 +159,7 @@ public String toString() { } } - protected static final Comparator PARTITION_COMPARATOR = new Comparator() { + private static final Comparator PARTITION_COMPARATOR = new Comparator() { @Override public int compare(final TopicPartition p1, final TopicPartition p2) { @@ -173,13 +173,13 @@ public int compare(final TopicPartition p1, } }; - protected String userEndPoint; + private String userEndPoint; private int numStandbyReplicas; - protected TaskManager taskManager; + private TaskManager taskManager; private PartitionGrouper partitionGrouper; - protected int usedSubscriptionMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION; + private int userMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION; private InternalTopicManager internalTopicManager; private CopartitionedTopicsValidator copartitionedTopicsValidator; @@ -204,7 +204,7 @@ public void configure(final Map configs) { switch (upgradeFrom) { case StreamsConfig.UPGRADE_FROM_0100: log.info("Downgrading metadata version from {} to 1 for upgrade from 0.10.0.x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION); - usedSubscriptionMetadataVersion = 1; + userMetadataVersion = 1; break; case StreamsConfig.UPGRADE_FROM_0101: case StreamsConfig.UPGRADE_FROM_0102: @@ -212,7 +212,7 @@ public void configure(final Map configs) { case StreamsConfig.UPGRADE_FROM_10: case StreamsConfig.UPGRADE_FROM_11: log.info("Downgrading metadata version from {} to 2 for upgrade from " + upgradeFrom + ".x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION); - usedSubscriptionMetadataVersion = 2; + userMetadataVersion = 2; break; default: throw new IllegalArgumentException("Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom); @@ -277,7 +277,7 @@ public Subscription subscription(final Set topics) { final Set standbyTasks = taskManager.cachedTasksIds(); standbyTasks.removeAll(previousActiveTasks); final SubscriptionInfo data = new SubscriptionInfo( - 
usedSubscriptionMetadataVersion, + userMetadataVersion, taskManager.processId(), previousActiveTasks, standbyTasks, @@ -315,7 +315,6 @@ public Map assign(final Cluster metadata, final Map clientsMetadata = new HashMap<>(); int minUserMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION; - int futureMetadataVersion = -1; for (final Map.Entry entry : subscriptions.entrySet()) { final String consumerId = entry.getKey(); final Subscription subscription = entry.getValue(); @@ -323,8 +322,8 @@ public Map assign(final Cluster metadata, final SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData()); final int usedVersion = info.version(); if (usedVersion > SubscriptionInfo.LATEST_SUPPORTED_VERSION) { - futureMetadataVersion = usedVersion; - continue; + throw new IllegalStateException("Unknown metadata version: " + usedVersion + + "; latest supported version: " + SubscriptionInfo.LATEST_SUPPORTED_VERSION); } if (usedVersion < minUserMetadataVersion) { minUserMetadataVersion = usedVersion; @@ -342,24 +341,6 @@ public Map assign(final Cluster metadata, clientMetadata.addConsumer(consumerId, info); } - if (futureMetadataVersion != -1) { - if (minUserMetadataVersion >= 3) { - log.info("Received a future (version probing) subscription (version: {}). Sending empty assignment back (with supported version {}).", - futureMetadataVersion, - SubscriptionInfo.LATEST_SUPPORTED_VERSION); - return emptyAssignment(subscriptions); - } else { - throw new IllegalStateException("Received a future (version probing) subscription (version: " + futureMetadataVersion - + ") and an incompatible pre Kafka 1.2 subscription (version: " + minUserMetadataVersion + ") at the same time."); - } - } - - if (minUserMetadataVersion < SubscriptionInfo.LATEST_SUPPORTED_VERSION) { - log.info("Downgrading metadata to version {}. Latest supported version is {}.", - minUserMetadataVersion, - SubscriptionInfo.LATEST_SUPPORTED_VERSION); - } - log.debug("Constructed client metadata {} from the member subscriptions.", clientsMetadata); // ---------------- Step Zero ---------------- // @@ -647,19 +628,7 @@ public void onAssignment(final Assignment assignment) { Collections.sort(partitions, PARTITION_COMPARATOR); final AssignmentInfo info = AssignmentInfo.decode(assignment.userData()); - final int receivedAssignmentMetadataVersion = info.version(); - - if (usedSubscriptionMetadataVersion > receivedAssignmentMetadataVersion) { - if (receivedAssignmentMetadataVersion >= 3) { - log.info("Sent a version {} subscription and got version {} assignment back (successful version probing). 
" + - "Downgrading subscription metadata to received version and trigger new rebalance.", - usedSubscriptionMetadataVersion, - receivedAssignmentMetadataVersion); - usedSubscriptionMetadataVersion = receivedAssignmentMetadataVersion; - taskManager.versionProbingFlag = true; - return; - } - } + final int usedVersion = info.version(); // version 1 field final Map> activeTasks = new HashMap<>(); @@ -667,7 +636,7 @@ public void onAssignment(final Assignment assignment) { final Map topicToPartitionInfo = new HashMap<>(); final Map> partitionsByHost; - switch (receivedAssignmentMetadataVersion) { + switch (usedVersion) { case 1: processVersionOneAssignment(info, partitions, activeTasks); partitionsByHost = Collections.emptyMap(); @@ -677,21 +646,11 @@ public void onAssignment(final Assignment assignment) { partitionsByHost = info.partitionsByHost(); break; case 3: - final int latestSupportedVersionGroupLeader = info.latestSupportedVersion(); - if (latestSupportedVersionGroupLeader > usedSubscriptionMetadataVersion) { - final int newSubscriptionMetadataVersion = Math.min(latestSupportedVersionGroupLeader, SubscriptionInfo.LATEST_SUPPORTED_VERSION); - log.info("Sent a version {} subscription and group leader's latest supported version is {}. " + - "Upgrading subscription metadata version to {} for next rebalance.", - usedSubscriptionMetadataVersion, - latestSupportedVersionGroupLeader, - newSubscriptionMetadataVersion); - usedSubscriptionMetadataVersion = newSubscriptionMetadataVersion; - } processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo); partitionsByHost = info.partitionsByHost(); break; default: - throw new IllegalStateException("Unknown metadata version: " + receivedAssignmentMetadataVersion + throw new IllegalStateException("Unknown metadata version: " + usedVersion + "; latest supported version: " + AssignmentInfo.LATEST_SUPPORTED_VERSION); } @@ -726,17 +685,6 @@ private void processVersionOneAssignment(final AssignmentInfo info, } } - private Map emptyAssignment(final Map subscriptions) { - final Map emptyAssignment = new HashMap<>(); - for (final String client : subscriptions.keySet()) { - emptyAssignment.put(client, new Assignment( - Collections.emptyList(), - new AssignmentInfo().encode() - )); - } - return emptyAssignment; - } - private void processVersionTwoAssignment(final AssignmentInfo info, final List partitions, final Map> activeTasks, @@ -761,14 +709,6 @@ private void processVersionThreeAssignment(final AssignmentInfo info, processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo); } - // for test - protected void processLatestVersionAssignment(final AssignmentInfo info, - final List partitions, - final Map> activeTasks, - final Map topicToPartitionInfo) { - processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo); - } - /** * Internal helper function that creates a Kafka topic * diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index b6454d6488421..6308ca7fd8020 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -42,7 +42,7 @@ import static java.util.Collections.singleton; -public class TaskManager { +class TaskManager { // initialize the task list // activeTasks needs to be concurrent as it can be accessed // by QueryableState @@ 
-64,7 +64,6 @@ public class TaskManager { private Cluster cluster; private Map> assignedActiveTasks; private Map> assignedStandbyTasks; - public boolean versionProbingFlag = false; private Consumer consumer; @@ -96,9 +95,6 @@ public class TaskManager { } void createTasks(final Collection assignment) { - if (versionProbingFlag) { - return; - } if (consumer == null) { throw new IllegalStateException(logPrefix + "consumer has not been initialized while adding stream tasks. This should not happen."); } @@ -191,14 +187,14 @@ Set standbyTaskIds() { return standby.allAssignedTaskIds(); } - public Set prevActiveTaskIds() { + Set prevActiveTaskIds() { return active.previousTaskIds(); } /** * Returns ids of tasks whose states are kept on the local storage. */ - public Set cachedTasksIds() { + Set cachedTasksIds() { // A client could contain some inactive tasks whose states are still kept on the local storage in the following scenarios: // 1) the client is actively maintaining standby tasks by maintaining their states from the change log. // 2) the client has just got some tasks migrated out of itself to other clients while these task states @@ -225,7 +221,7 @@ public Set cachedTasksIds() { return tasks; } - public UUID processId() { + UUID processId() { return processId; } @@ -316,10 +312,6 @@ void setConsumer(final Consumer consumer) { * @throws TaskMigratedException if the task producer got fenced or consumer discovered changelog offset changes (EOS only) */ boolean updateNewAndRestoringTasks() { - if (versionProbingFlag) { - return false; - } - active.initializeNewTasks(); standby.initializeNewTasks(); @@ -364,21 +356,21 @@ private void assignStandbyPartitions() { } } - public void setClusterMetadata(final Cluster cluster) { + void setClusterMetadata(final Cluster cluster) { this.cluster = cluster; } - public void setPartitionsByHostState(final Map> partitionsByHostState) { + void setPartitionsByHostState(final Map> partitionsByHostState) { this.streamsMetadataState.onChange(partitionsByHostState, cluster); } - public void setAssignmentMetadata(final Map> activeTasks, + void setAssignmentMetadata(final Map> activeTasks, final Map> standbyTasks) { this.assignedActiveTasks = activeTasks; this.assignedStandbyTasks = standbyTasks; } - public void updateSubscriptionsFromAssignment(List partitions) { + void updateSubscriptionsFromAssignment(List partitions) { if (builder().sourceTopicPattern() != null) { final Set assignedTopics = new HashSet<>(); for (final TopicPartition topicPartition : partitions) { @@ -393,7 +385,7 @@ public void updateSubscriptionsFromAssignment(List partitions) { } } - public void updateSubscriptionsFromMetadata(Set topics) { + void updateSubscriptionsFromMetadata(Set topics) { if (builder().sourceTopicPattern() != null) { final Collection existingTopics = builder().subscriptionUpdates().getUpdates(); if (!existingTopics.equals(topics)) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java index b5b7cd7fbd23d..41162e3fd73a6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java @@ -35,12 +35,12 @@ public class SubscriptionInfo { public static final int LATEST_SUPPORTED_VERSION = 3; public static final int UNKNOWN = -1; - protected final int usedVersion; - 
protected final int latestSupportedVersion; - protected UUID processId; - protected Set prevTasks; - protected Set standbyTasks; - protected String userEndPoint; + private final int usedVersion; + private final int latestSupportedVersion; + private UUID processId; + private Set prevTasks; + private Set standbyTasks; + private String userEndPoint; // used for decoding; don't apply version checks private SubscriptionInfo(final int version, @@ -207,30 +207,12 @@ private ByteBuffer encodeVersionThree() { final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeByteLength(endPointBytes)); - buf.putInt(3); // version - buf.putInt(3); // version - encodeLatestVersion(buf, endPointBytes); - - return buf; - } - - private void encodeLatestVersion(final ByteBuffer buf, - final byte[] endPointBytes) { + buf.putInt(3); // used version + buf.putInt(3); // supported version encodeClientUUID(buf); encodeTasks(buf, prevTasks); encodeTasks(buf, standbyTasks); encodeUserEndPoint(buf, endPointBytes); - } - - // for testing - public ByteBuffer encodeFutureVersion(int futureVersion) { - final byte[] endPointBytes = prepareUserEndPoint(); - - final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeByteLength(endPointBytes)); - - buf.putInt(futureVersion); // version - buf.putInt(futureVersion); // version - encodeLatestVersion(buf, endPointBytes); return buf; } @@ -336,13 +318,6 @@ private static void decodeVersionThreeData(final SubscriptionInfo subscriptionIn decodeUserEndPoint(subscriptionInfo, data); } - // for testing only - public static void decodeLatestVersionData(final SubscriptionInfo subscriptionInfo, - final ByteBuffer data) { - decodeVersionThreeData(subscriptionInfo, data); - } - - @Override public int hashCode() { final int hashCode = usedVersion ^ latestSupportedVersion ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode(); if (userEndPoint == null) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 59b909312e1f2..4e04b4985ed40 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -48,7 +48,6 @@ import org.easymock.EasyMock; import org.junit.Test; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -1194,86 +1193,6 @@ private void shouldDownGradeSubscriptionToVersion2(final Object upgradeFromValue assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(2)); } - @Test - public void shouldReturnEmptyAssignmentForAllInstancesIfReceivedSingleFutureVersionSubscription() { - final Map subscriptions = new HashMap<>(); - final Set emptyTasks = Collections.emptySet(); - subscriptions.put( - "consumer1", - new PartitionAssignor.Subscription( - Collections.singletonList("topic1"), - new SubscriptionInfo(UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() - ) - ); - subscriptions.put( - "future-consumer", - new PartitionAssignor.Subscription( - Collections.singletonList("topic1"), - encodeFutureSubscription() - ) - ); - - mockTaskManager( - emptyTasks, - emptyTasks, - UUID.randomUUID(), - builder); - partitionAssignor.configure(configProps()); - final Map assignment = partitionAssignor.assign(metadata, subscriptions); - - assertThat(assignment.size(), 
equalTo(2)); - assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()), equalTo(new AssignmentInfo())); - assertThat(AssignmentInfo.decode(assignment.get("future-consumer").userData()), equalTo(new AssignmentInfo())); - } - - @Test - public void shouldThrowIfV1SubscriptionAndFutureSubscriptionIsMixed() { - shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(1); - } - - @Test - public void shouldThrowIfV2SubscriptionAndFutureSubscriptionIsMixed() { - shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(2); - } - - private ByteBuffer encodeFutureSubscription() { - final ByteBuffer buf = ByteBuffer.allocate(4 /* version */); - buf.putInt(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1); - return buf; - } - - private void shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(final int oldVersion) { - final Map subscriptions = new HashMap<>(); - final Set emptyTasks = Collections.emptySet(); - subscriptions.put( - "consumer1", - new PartitionAssignor.Subscription( - Collections.singletonList("topic1"), - new SubscriptionInfo(oldVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() - ) - ); - subscriptions.put( - "future-consumer", - new PartitionAssignor.Subscription( - Collections.singletonList("topic1"), - encodeFutureSubscription() - ) - ); - - mockTaskManager( - emptyTasks, - emptyTasks, - UUID.randomUUID(), - builder); - partitionAssignor.configure(configProps()); - - try { - partitionAssignor.assign(metadata, subscriptions); - fail("Should have thrown IllegalStateException"); - } catch (final IllegalStateException expected) { - // pass - } - } private PartitionAssignor.Assignment createAssignment(final Map> firstHostState) { final AssignmentInfo info = new AssignmentInfo(Collections.emptyList(), Collections.>emptyMap(), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java index b3126190a3017..5a32f3b05aa43 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java @@ -19,7 +19,6 @@ import org.apache.kafka.streams.processor.TaskId; import org.junit.Test; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashSet; import java.util.Set; @@ -76,16 +75,4 @@ public void shouldEncodeAndDecodeVersion3() { assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode())); } - @Test - public void shouldAllowToDecodeFutureSupportedVersion() { - final SubscriptionInfo info = SubscriptionInfo.decode(encodeFutureVersion()); - assertEquals(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1, info.version()); - assertEquals(SubscriptionInfo.UNKNOWN, info.latestSupportedVersion()); - } - - private ByteBuffer encodeFutureVersion() { - final ByteBuffer buf = ByteBuffer.allocate(4 /* version */); - buf.putInt(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1); - return buf; - } } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index ed6dc161f8701..69eea0b37c0ef 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -16,41 +16,13 @@ */ package 
org.apache.kafka.streams.tests; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.errors.TaskAssignmentException; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; -import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; -import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; -import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; -import org.apache.kafka.streams.state.HostInfo; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.BufferUnderflowException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.Properties; -import java.util.Set; -import java.util.UUID; public class StreamsUpgradeTest { @@ -78,17 +50,9 @@ public static void main(final String[] args) throws Exception { config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); - - final KafkaClientSupplier kafkaClientSupplier; - if (streamsProperties.containsKey("test.future.metadata")) { - streamsProperties.remove("test.future.metadata"); - kafkaClientSupplier = new FutureKafkaClientSupplier(); - } else { - kafkaClientSupplier = new DefaultKafkaClientSupplier(); - } config.putAll(streamsProperties); - final KafkaStreams streams = new KafkaStreams(builder.build(), config, kafkaClientSupplier); + final KafkaStreams streams = new KafkaStreams(builder.build(), config); streams.start(); Runtime.getRuntime().addShutdownHook(new Thread() { @@ -102,184 +66,4 @@ public void run() { } }); } - - private static class FutureKafkaClientSupplier extends DefaultKafkaClientSupplier { - @Override - public Consumer getConsumer(final Map config) { - config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, FutureStreamsPartitionAssignor.class.getName()); - return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()); - } - } - - public static class FutureStreamsPartitionAssignor extends StreamsPartitionAssignor { - - public FutureStreamsPartitionAssignor() { - usedSubscriptionMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1; - } - - @Override - public Subscription subscription(final Set topics) { - // Adds the following information to subscription - // 1. Client UUID (a unique id assigned to an instance of KafkaStreams) - // 2. Task ids of previously running tasks - // 3. Task ids of valid local states on the client's state directory. 
- - final Set previousActiveTasks = taskManager.prevActiveTaskIds(); - final Set standbyTasks = taskManager.cachedTasksIds(); - standbyTasks.removeAll(previousActiveTasks); - final FutureSubscriptionInfo data = new FutureSubscriptionInfo( - usedSubscriptionMetadataVersion, - SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1, - taskManager.processId(), - previousActiveTasks, - standbyTasks, - this.userEndPoint); - - taskManager.updateSubscriptionsFromMetadata(topics); - - return new Subscription(new ArrayList<>(topics), data.encode()); - } - - @Override - public void onAssignment(final PartitionAssignor.Assignment assignment) { - try { - super.onAssignment(assignment); - return; - } catch (final IllegalStateException cannotProcessFutureVersion) { - // continue - } - - final List partitions = new ArrayList<>(assignment.partitions()); - Collections.sort(partitions, PARTITION_COMPARATOR); - - final AssignmentInfo info = AssignmentInfo.decode(assignment.userData()); - final int receivedAssignmentMetadataVersion = info.version(); - - if (receivedAssignmentMetadataVersion > AssignmentInfo.LATEST_SUPPORTED_VERSION + 1) { - throw new IllegalStateException("Unknown metadata version: " + receivedAssignmentMetadataVersion - + "; latest supported version: " + AssignmentInfo.LATEST_SUPPORTED_VERSION + 1); - } - - // version 1 field - final Map> activeTasks = new HashMap<>(); - // version 2 fields - final Map topicToPartitionInfo = new HashMap<>(); - final Map> partitionsByHost; - - processLatestVersionAssignment(info, partitions, activeTasks, topicToPartitionInfo); - partitionsByHost = info.partitionsByHost(); - - taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo)); - taskManager.setPartitionsByHostState(partitionsByHost); - taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks()); - taskManager.updateSubscriptionsFromAssignment(partitions); - } - - @Override - public Map assign(final Cluster metadata, - final Map subscriptions) { - final Map assignment = super.assign(metadata, subscriptions); - final Map newAssignment = new HashMap<>(); - - for (final Map.Entry entry : assignment.entrySet()) { - final Assignment singleAssignment = entry.getValue(); - newAssignment.put( - entry.getKey(), - new Assignment(singleAssignment.partitions(), - new FutureAssignmentInfo(singleAssignment.userData()).encode())); - } - - return newAssignment; - } - } - - private static class FutureSubscriptionInfo extends SubscriptionInfo { - // for testing only; don't apply version checks - FutureSubscriptionInfo(final int version, - final int latestSupportedVersion, - final UUID processId, - final Set prevTasks, - final Set standbyTasks, - final String userEndPoint) { - super(version, latestSupportedVersion, processId, prevTasks, standbyTasks, userEndPoint); - } - - public ByteBuffer encode() { - if (usedVersion <= SubscriptionInfo.LATEST_SUPPORTED_VERSION) { - return super.encode(); - } - - final ByteBuffer buf = super.encodeFutureVersion(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1); - buf.rewind(); - return buf; - } - - public static SubscriptionInfo decode(final ByteBuffer data) { - final SubscriptionInfo subscriptionInfo; - - // ensure we are at the beginning of the ByteBuffer - data.rewind(); - - final int usedVersion = data.getInt(); - - if (usedVersion <= SubscriptionInfo.LATEST_SUPPORTED_VERSION) { - subscriptionInfo = SubscriptionInfo.decode(data); - } else { - if (usedVersion == SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1) { - subscriptionInfo = new FutureSubscriptionInfo( - 
usedVersion, - usedVersion, - null, - null, - null, - null); - SubscriptionInfo.decodeLatestVersionData(subscriptionInfo, data); - } else { - subscriptionInfo = new FutureSubscriptionInfo( - usedVersion, - UNKNOWN, - null, - null, - null, - null); - } - } - - return subscriptionInfo; - } - } - - private static class FutureAssignmentInfo extends AssignmentInfo { - final ByteBuffer originalUserMetadata; - - private FutureAssignmentInfo(final ByteBuffer bytes) { - originalUserMetadata = bytes; - } - - @Override - public ByteBuffer encode() { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - - originalUserMetadata.rewind(); - - try (final DataOutputStream out = new DataOutputStream(baos)) { - out.writeInt(originalUserMetadata.getInt()); - originalUserMetadata.getInt(); // discard original supported version - out.writeInt(AssignmentInfo.LATEST_SUPPORTED_VERSION + 1); - - try { - while (true) { - out.write(originalUserMetadata.get()); - } - } catch (final BufferUnderflowException expectedWhenAllDataCopied) { } - - out.flush(); - out.close(); - - return ByteBuffer.wrap(baos.toByteArray()); - } catch (final IOException ex) { - throw new TaskAssignmentException("Failed to encode AssignmentInfo", ex); - } - } - } } diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 64a99f938e6a2..f089879d4c8a7 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -64,7 +64,7 @@ """ -class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService): +class oConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService): # Root directory for persistent output PERSISTENT_ROOT = "/mnt/console_consumer" STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "console_consumer.stdout") diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 8ce6dc39b8d67..bf2fd52427a8c 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -428,8 +428,6 @@ def prop_file(self): properties = {STATE_DIR: self.PERSISTENT_ROOT} if self.UPGRADE_FROM is not None: properties['upgrade.from'] = self.UPGRADE_FROM - if self.UPGRADE_TO == "future_version": - properties['test.future.metadata'] = "any_value" cfg = KafkaConfig(**properties) return cfg.render() diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index 9859f90a1efd8..8b7d7712459a1 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -174,7 +174,7 @@ def test_simple_upgrade_downgrade(self, from_version, to_version): self.driver.stop() - @matrix(from_version=metadata_1_versions, to_version=backward_compatible_metadata_2_versions) + #@matrix(from_version=metadata_1_versions, to_version=backward_compatible_metadata_2_versions) @matrix(from_version=metadata_1_versions, to_version=metadata_3_versions) @matrix(from_version=metadata_2_versions, to_version=metadata_3_versions) def test_metadata_upgrade(self, from_version, to_version): @@ -230,92 +230,6 @@ def test_metadata_upgrade(self, from_version, to_version): self.driver.stop() - def test_version_probing_upgrade(self): - """ - Starts 3 KafkaStreams instances, and upgrades one-by-one to "future version" - """ - - self.zk = ZookeeperService(self.test_context, num_nodes=1) - self.zk.start() - - self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, 
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 64a99f938e6a2..f089879d4c8a7 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -64,7 +64,7 @@
 """
 
-class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService):
+class oConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService):
     # Root directory for persistent output
     PERSISTENT_ROOT = "/mnt/console_consumer"
     STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "console_consumer.stdout")
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 8ce6dc39b8d67..bf2fd52427a8c 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -428,8 +428,6 @@ def prop_file(self):
         properties = {STATE_DIR: self.PERSISTENT_ROOT}
         if self.UPGRADE_FROM is not None:
             properties['upgrade.from'] = self.UPGRADE_FROM
-        if self.UPGRADE_TO == "future_version":
-            properties['test.future.metadata'] = "any_value"
 
         cfg = KafkaConfig(**properties)
         return cfg.render()
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 9859f90a1efd8..8b7d7712459a1 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -174,7 +174,7 @@ def test_simple_upgrade_downgrade(self, from_version, to_version):
 
         self.driver.stop()
 
-    @matrix(from_version=metadata_1_versions, to_version=backward_compatible_metadata_2_versions)
+    #@matrix(from_version=metadata_1_versions, to_version=backward_compatible_metadata_2_versions)
     @matrix(from_version=metadata_1_versions, to_version=metadata_3_versions)
     @matrix(from_version=metadata_2_versions, to_version=metadata_3_versions)
     def test_metadata_upgrade(self, from_version, to_version):
@@ -230,92 +230,6 @@ def test_metadata_upgrade(self, from_version, to_version):
 
         self.driver.stop()
 
-    def test_version_probing_upgrade(self):
-        """
-        Starts 3 KafkaStreams instances, and upgrades one-by-one to "future version"
-        """
-
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-        self.zk.start()
-
-        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics)
-        self.kafka.start()
-
-        self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
-        self.driver.disable_auto_terminate()
-        self.processor1 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
-        self.processor2 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
-        self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
-
-        self.driver.start()
-        self.start_all_nodes_with("")  # run with TRUNK
-
-        self.processors = [self.processor1, self.processor2, self.processor3]
-
-        for p in self.processors:
-            p.CLEAN_NODE_ENABLED = False
-            it = p.node.account.ssh_capture("grep \"Finished assignment for group\" %s" % p.LOG_FILE, allow_fail=True)
-            if it.has_next():
-                if self.leader is not None:
-                    raise Exception("Could not uniquely identify leader")
-                self.leader = p
-
-        if self.leader is None:
-            raise Exception("Could not identify leader")
-
-        counter = 1
-        random.seed()
-
-        # rolling bounces
-        random.shuffle(self.processors)
-        first_bounced_processor = None
-        expected_new_leader_processor = None
-        with self.leader.node.account.monitor_log(self.leader.LOG_FILE) as leader_monitor:
-            for p in self.processors:
-                if p == self.leader:
-                    continue
-
-                self.do_rolling_bounce(p, None, "future_version", counter)
-                leader_monitor.wait_until("Received a future (version probing) subscription (version: 4). Sending empty assignment back (with supported version 3).",
-                                          timeout_sec=60,
-                                          err_msg="Could not detect 'version probing' attempt at leader " + str(self.leader.node.account))
-                p.node.account.ssh_capture("grep \"partition.assignment.strategy = [org.apache.kafka.streams.tests.StreamsUpgradeTest$FutureStreamsPartitionAssignor]\" %s" % p.LOG_FILE, allow_fail=False)
-                p.node.account.ssh_capture("grep \"Sent a version 4 subscription and got version 3 assignment back (successful version probing). Downgrading subscription metadata to received version and trigger new rebalance\" %s" % p.LOG_FILE, allow_fail=False)
-
-                if first_bounced_processor is None:
-                    first_bounced_processor = p
-                else:
-                    expected_new_leader_processor = p
-
-                counter = counter + 1
-
-        it = expected_new_leader_processor.node.account.ssh_capture("grep \"Received a future (version probing) subscription (version: 4). Sending empty assignment back (with supported version 3).\" %s" % expected_new_leader_processor.LOG_FILE, allow_fail=True)
-        if it.has_next():
-            print it.next()
-            raise Exception("Future new leader should receive version probing only after current/old leader is bounced.")
-
-        self.do_rolling_bounce(self.leader, None, "future_version", counter)
-        prevLeader = self.leader
-
-        expected_new_leader_processor.node.account.ssh_capture("grep \"Received a future (version probing) subscription (version: 4). Sending empty assignment back (with supported version 3).\" %s" % expected_new_leader_processor.LOG_FILE, allow_fail=False)
-        prevLeader.node.account.ssh_capture("grep \"partition.assignment.strategy = [org.apache.kafka.streams.tests.StreamsUpgradeTest$FutureStreamsPartitionAssignor]\" %s" % prevLeader.LOG_FILE, allow_fail=False)
-        prevLeader.node.account.ssh_capture("grep \"Sent a version 4 subscription and got version 3 assignment back (successful version probing). Downgrading subscription metadata to received version and trigger new rebalance\" %s" % prevLeader.LOG_FILE, allow_fail=False)
-
-        # shutdown
-        self.driver.stop()
-        self.driver.wait()
-
-        random.shuffle(self.processors)
-        for p in self.processors:
-            node = p.node
-            with node.account.monitor_log(p.STDOUT_FILE) as monitor:
-                p.stop()
-                monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED",
-                                   timeout_sec=60,
-                                   err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on " + str(node.account))
-
-        self.driver.stop()
-
     def start_all_nodes_with(self, version):
         # start first with
         self.prepare_for(self.processor1, version)
@@ -413,12 +327,8 @@ def do_rolling_bounce(self, processor, upgrade_from, new_version, counter):
 
         if new_version == str(DEV_VERSION):
             processor.set_version("")  # set to TRUNK
-        elif new_version == "future_version":
-            processor.set_upgrade_to("future_version")
-            new_version = str(DEV_VERSION)
         else:
             processor.set_version(new_version)
-
         processor.set_upgrade_from(upgrade_from)
 
         grep_metadata_error = "grep \"org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode subscription data: version=2\" "
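Side note: the log lines grepped for above come from the leader side of version probing. Roughly (illustrative names, not the actual StreamsPartitionAssignor code), the leader answers an unreadable future subscription with an empty assignment tagged with its own newest supported version, which the probing member then downgrades to:

    // Illustrative leader-side logic; not the actual Kafka Streams code.
    public class VersionProbingLeaderSketch {
        static final int LATEST_SUPPORTED_VERSION = 3;

        static int assignmentVersionFor(final int subscriptionVersion) {
            if (subscriptionVersion > LATEST_SUPPORTED_VERSION) {
                // this is the line the system test greps the leader's log for
                System.out.println("Received a future (version probing) subscription (version: "
                    + subscriptionVersion + "). Sending empty assignment back (with supported version "
                    + LATEST_SUPPORTED_VERSION + ").");
                return LATEST_SUPPORTED_VERSION; // member downgrades to this and rebalances again
            }
            return subscriptionVersion; // normal case: answer in the version that was asked for
        }

        public static void main(final String[] args) {
            assignmentVersionFor(4);
        }
    }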
From 0e762d6e1f24dc67a738950aa75264801b98a27f Mon Sep 17 00:00:00 2001
From: "Matthias J. Sax"
Date: Tue, 17 Apr 2018 22:46:34 +0200
Subject: [PATCH 3/3] Review comments

---
 .../processor/internals/assignment/AssignmentInfo.java   | 2 +-
 .../processor/internals/assignment/SubscriptionInfo.java | 2 +-
 .../internals/assignment/SubscriptionInfoTest.java       | 2 +-
 tests/kafkatest/services/console_consumer.py             | 2 +-
 tests/kafkatest/services/streams.py                      | 3 ---
 5 files changed, 4 insertions(+), 7 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 12a53afb3c961..3c5cee2bfc387 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -197,7 +197,7 @@ private void writeTopicPartitions(final DataOutputStream out,
     }
 
     private void encodeVersionThree(final DataOutputStream out) throws IOException {
-        out.writeInt(usedVersion);
+        out.writeInt(3);
         out.writeInt(LATEST_SUPPORTED_VERSION);
         encodeActiveAndStandbyTaskAssignment(out);
         encodePartitionsByHost(out);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index 41162e3fd73a6..be709472441d8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -208,7 +208,7 @@ private ByteBuffer encodeVersionThree() {
         final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeByteLength(endPointBytes));
 
         buf.putInt(3); // used version
-        buf.putInt(3); // supported version
+        buf.putInt(LATEST_SUPPORTED_VERSION); // supported version
         encodeClientUUID(buf);
         encodeTasks(buf, prevTasks);
         encodeTasks(buf, standbyTasks);
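Side note: after this fix a version-3 subscription buffer always leads with the literal 3 followed by LATEST_SUPPORTED_VERSION. A simplified sketch of that header layout (the real encoding also carries the prev/standby task sets between the UUID and the endpoint, elided here):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.UUID;

    public class VersionThreeHeaderSketch {
        static final int LATEST_SUPPORTED_VERSION = 3;

        static ByteBuffer encodeHeader(final UUID processId, final String endPoint) {
            final byte[] endPointBytes = endPoint.getBytes(StandardCharsets.UTF_8);
            final ByteBuffer buf = ByteBuffer.allocate(4 + 4 + 16 + 4 + endPointBytes.length);
            buf.putInt(3);                        // used version: always the literal 3 here
            buf.putInt(LATEST_SUPPORTED_VERSION); // supported version: follows the constant
            buf.putLong(processId.getMostSignificantBits());
            buf.putLong(processId.getLeastSignificantBits());
            buf.putInt(endPointBytes.length);     // task sets omitted in this sketch
            buf.put(endPointBytes);
            buf.rewind();
            return buf;
        }

        public static void main(final String[] args) {
            final ByteBuffer buf = encodeHeader(UUID.randomUUID(), "ignoredUserEndpoint:80");
            System.out.println("used=" + buf.getInt() + ", supported=" + buf.getInt()); // used=3, supported=3
        }
    }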
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index 5a32f3b05aa43..e98b8ce072705 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -36,7 +36,7 @@ public class SubscriptionInfoTest {
         new TaskId(1, 1),
         new TaskId(2, 0)));
 
-    private final static String IGNORED_USER_ENDPOINT = "IGNORED_USER_ENDPOINT";
+    private final static String IGNORED_USER_ENDPOINT = "ignoredUserEndpoint:80";
 
     @Test
     public void shouldUseLatestSupportedVersionByDefault() {
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index f089879d4c8a7..64a99f938e6a2 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -64,7 +64,7 @@
 """
 
-class oConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService):
+class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService):
     # Root directory for persistent output
     PERSISTENT_ROOT = "/mnt/console_consumer"
     STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "console_consumer.stdout")
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index bf2fd52427a8c..e0e445de22ae6 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -421,9 +421,6 @@ def set_version(self, kafka_streams_version):
     def set_upgrade_from(self, upgrade_from):
         self.UPGRADE_FROM = upgrade_from
 
-    def set_upgrade_to(self, upgrade_to):
-        self.UPGRADE_TO = upgrade_to
-
     def prop_file(self):
         properties = {STATE_DIR: self.PERSISTENT_ROOT}
         if self.UPGRADE_FROM is not None: