diff --git a/build.gradle b/build.gradle index 69f560ec38fef..8b07a9584c733 100644 --- a/build.gradle +++ b/build.gradle @@ -1087,6 +1087,18 @@ project(':streams:upgrade-system-tests-10') { } } +project(':streams:upgrade-system-tests-11') { + archivesBaseName = "kafka-streams-upgrade-system-tests-11" + + dependencies { + testCompile libs.kafkaStreams_11 + } + + systemTestLibs { + dependsOn testJar + } +} + project(':jmh-benchmarks') { apply plugin: 'com.github.johnrengelman.shadow' diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index d6beba958ddaa..e5f2958ff1086 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -67,6 +67,7 @@ versions += [ kafka_0102: "0.10.2.1", kafka_0110: "0.11.0.2", kafka_10: "1.0.1", + kafka_11: "1.1.0", lz4: "1.4.1", metrics: "2.2.0", // PowerMock 1.x doesn't support Java 9, so use PowerMock 2.0.0 beta @@ -115,6 +116,7 @@ libs += [ kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102", kafkaStreams_0110: "org.apache.kafka:kafka-streams:$versions.kafka_0110", kafkaStreams_10: "org.apache.kafka:kafka-streams:$versions.kafka_10", + kafkaStreams_11: "org.apache.kafka:kafka-streams:$versions.kafka_11", log4j: "log4j:log4j:$versions.log4j", lz4: "org.lz4:lz4-java:$versions.lz4", metrics: "com.yammer.metrics:metrics-core:$versions.metrics", diff --git a/settings.gradle b/settings.gradle index 03136849fd543..2a7977cfc9343 100644 --- a/settings.gradle +++ b/settings.gradle @@ -15,5 +15,6 @@ include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:test-utils', 'streams:examples', 'streams:upgrade-system-tests-0100', 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102', - 'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'log4j-appender', - 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', 'jmh-benchmarks' + 'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'streams:upgrade-system-tests-11', + 'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', + 'jmh-benchmarks' diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 819bebd43b690..65b1da6dedeef 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -172,6 +172,31 @@ public class StreamsConfig extends AbstractConfig { */ public static final String UPGRADE_FROM_0100 = "0.10.0"; + /** + * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.1.x}. + */ + public static final String UPGRADE_FROM_0101 = "0.10.1"; + + /** + * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.2.x}. + */ + public static final String UPGRADE_FROM_0102 = "0.10.2"; + + /** + * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.11.0.x}. + */ + public static final String UPGRADE_FROM_0110 = "0.11.0"; + + /** + * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.0.x}. + */ + public static final String UPGRADE_FROM_10 = "1.0"; + + /** + * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.1.x}. + */ + public static final String UPGRADE_FROM_11 = "1.1"; + /** * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees. */ @@ -347,8 +372,9 @@ public class StreamsConfig extends AbstractConfig { /** {@code upgrade.from} */ public static final String UPGRADE_FROM_CONFIG = "upgrade.from"; - public static final String UPGRADE_FROM_DOC = "Allows upgrading from version 0.10.0 to version 0.10.1 (or newer) in a backward compatible way. " + - "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\" (for upgrading from 0.10.0.x)."; + public static final String UPGRADE_FROM_DOC = "Allows upgrading from versions 0.10.0/0.10.1/0.10.2/0.11.0/1.0/1.1 to version 1.2 (or newer) in a backward compatible way. " + + "When upgrading from 1.2 to a newer version it is not required to specify this config." + + "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" + UPGRADE_FROM_0101 + "\", \"" + UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" + UPGRADE_FROM_10 + "\", \"" + UPGRADE_FROM_11 + "\" (for upgrading from the corresponding old version)."; /** * {@code value.serde} @@ -364,7 +390,7 @@ public class StreamsConfig extends AbstractConfig { /** * {@code zookeeper.connect} - * @deprecated Kakfa Streams does not use Zookeeper anymore and this parameter will be ignored. + * @deprecated Kafka Streams does not use Zookeeper anymore and this parameter will be ignored. */ @Deprecated public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect"; @@ -575,7 +601,7 @@ public class StreamsConfig extends AbstractConfig { .define(UPGRADE_FROM_CONFIG, ConfigDef.Type.STRING, null, - in(null, UPGRADE_FROM_0100), + in(null, UPGRADE_FROM_0100, UPGRADE_FROM_0101, UPGRADE_FROM_0102, UPGRADE_FROM_0110, UPGRADE_FROM_10, UPGRADE_FROM_11), Importance.LOW, UPGRADE_FROM_DOC) .define(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 97771e568795d..c81105ef821ae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -199,10 +199,24 @@ public void configure(final Map configs) { final LogContext logContext = new LogContext(logPrefix); log = logContext.logger(getClass()); - final String upgradeMode = (String) configs.get(StreamsConfig.UPGRADE_FROM_CONFIG); - if (StreamsConfig.UPGRADE_FROM_0100.equals(upgradeMode)) { - log.info("Downgrading metadata version from 2 to 1 for upgrade from 0.10.0.x."); - userMetadataVersion = 1; + final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG); + if (upgradeFrom != null) { + switch (upgradeFrom) { + case StreamsConfig.UPGRADE_FROM_0100: + log.info("Downgrading metadata version from {} to 1 for upgrade from 0.10.0.x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION); + userMetadataVersion = 1; + break; + case StreamsConfig.UPGRADE_FROM_0101: + case StreamsConfig.UPGRADE_FROM_0102: + case StreamsConfig.UPGRADE_FROM_0110: + case StreamsConfig.UPGRADE_FROM_10: + case StreamsConfig.UPGRADE_FROM_11: + log.info("Downgrading metadata version from {} to 2 for upgrade from " + upgradeFrom + ".x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION); + userMetadataVersion = 2; + break; + default: + throw new IllegalArgumentException("Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom); + } } final Object o = configs.get(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR); @@ -512,7 +526,7 @@ public Map assign(final Cluster metadata, // construct the global partition assignment per host map final Map> partitionsByHostState = new HashMap<>(); - if (minUserMetadataVersion == 2) { + if (minUserMetadataVersion == 2 || minUserMetadataVersion == 3) { for (final Map.Entry entry : clientsMetadata.entrySet()) { final HostInfo hostInfo = entry.getValue().hostInfo; @@ -631,6 +645,10 @@ public void onAssignment(final Assignment assignment) { processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo); partitionsByHost = info.partitionsByHost(); break; + case 3: + processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo); + partitionsByHost = info.partitionsByHost(); + break; default: throw new IllegalStateException("Unknown metadata version: " + usedVersion + "; latest supported version: " + AssignmentInfo.LATEST_SUPPORTED_VERSION); @@ -684,6 +702,13 @@ private void processVersionTwoAssignment(final AssignmentInfo info, } } + private void processVersionThreeAssignment(final AssignmentInfo info, + final List partitions, + final Map> activeTasks, + final Map topicToPartitionInfo) { + processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo); + } + /** * Internal helper function that creates a Kafka topic * @@ -818,4 +843,5 @@ void validate(final Set copartitionGroup, void setInternalTopicManager(final InternalTopicManager internalTopicManager) { this.internalTopicManager = internalTopicManager; } + } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java index c8df7498755bb..3c5cee2bfc387 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java @@ -16,8 +16,8 @@ */ package org.apache.kafka.streams.processor.internals.assignment; -import org.apache.kafka.common.utils.ByteBufferInputStream; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.ByteBufferInputStream; import org.apache.kafka.streams.errors.TaskAssignmentException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.HostInfo; @@ -30,6 +30,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -40,15 +41,20 @@ public class AssignmentInfo { private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class); - public static final int LATEST_SUPPORTED_VERSION = 2; + public static final int LATEST_SUPPORTED_VERSION = 3; + public static final int UNKNOWN = -1; private final int usedVersion; + private final int latestSupportedVersion; private List activeTasks; private Map> standbyTasks; private Map> partitionsByHost; - private AssignmentInfo(final int version) { + // used for decoding; don't apply version checks + private AssignmentInfo(final int version, + final int latestSupportedVersion) { this.usedVersion = version; + this.latestSupportedVersion = latestSupportedVersion; } public AssignmentInfo(final List activeTasks, @@ -57,11 +63,33 @@ public AssignmentInfo(final List activeTasks, this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState); } + public AssignmentInfo() { + this(LATEST_SUPPORTED_VERSION, + Collections.emptyList(), + Collections.>emptyMap(), + Collections.>emptyMap()); + } + public AssignmentInfo(final int version, final List activeTasks, final Map> standbyTasks, final Map> hostState) { + this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState); + + if (version < 1 || version > LATEST_SUPPORTED_VERSION) { + throw new IllegalArgumentException("version must be between 1 and " + LATEST_SUPPORTED_VERSION + + "; was: " + version); + } + } + + // for testing only; don't apply version checks + AssignmentInfo(final int version, + final int latestSupportedVersion, + final List activeTasks, + final Map> standbyTasks, + final Map> hostState) { this.usedVersion = version; + this.latestSupportedVersion = latestSupportedVersion; this.activeTasks = activeTasks; this.standbyTasks = standbyTasks; this.partitionsByHost = hostState; @@ -71,6 +99,10 @@ public int version() { return usedVersion; } + public int latestSupportedVersion() { + return latestSupportedVersion; + } + public List activeTasks() { return activeTasks; } @@ -98,6 +130,9 @@ public ByteBuffer encode() { case 2: encodeVersionTwo(out); break; + case 3: + encodeVersionThree(out); + break; default: throw new IllegalStateException("Unknown metadata version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION); @@ -161,6 +196,13 @@ private void writeTopicPartitions(final DataOutputStream out, } } + private void encodeVersionThree(final DataOutputStream out) throws IOException { + out.writeInt(3); + out.writeInt(LATEST_SUPPORTED_VERSION); + encodeActiveAndStandbyTaskAssignment(out); + encodePartitionsByHost(out); + } + /** * @throws TaskAssignmentException if method fails to decode the data or if the data version is unknown */ @@ -169,19 +211,25 @@ public static AssignmentInfo decode(final ByteBuffer data) { data.rewind(); try (final DataInputStream in = new DataInputStream(new ByteBufferInputStream(data))) { - // decode used version - final int usedVersion = in.readInt(); - final AssignmentInfo assignmentInfo = new AssignmentInfo(usedVersion); + final AssignmentInfo assignmentInfo; + final int usedVersion = in.readInt(); switch (usedVersion) { case 1: + assignmentInfo = new AssignmentInfo(usedVersion, UNKNOWN); decodeVersionOneData(assignmentInfo, in); break; case 2: + assignmentInfo = new AssignmentInfo(usedVersion, UNKNOWN); decodeVersionTwoData(assignmentInfo, in); break; + case 3: + final int latestSupportedVersion = in.readInt(); + assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion); + decodeVersionThreeData(assignmentInfo, in); + break; default: - TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode subscription data: " + + TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode assignment data: " + "used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION); log.error(fatalException.getMessage(), fatalException); throw fatalException; @@ -195,15 +243,23 @@ public static AssignmentInfo decode(final ByteBuffer data) { private static void decodeVersionOneData(final AssignmentInfo assignmentInfo, final DataInputStream in) throws IOException { - // decode active tasks - int count = in.readInt(); + decodeActiveTasks(assignmentInfo, in); + decodeStandbyTasks(assignmentInfo, in); + assignmentInfo.partitionsByHost = new HashMap<>(); + } + + private static void decodeActiveTasks(final AssignmentInfo assignmentInfo, + final DataInputStream in) throws IOException { + final int count = in.readInt(); assignmentInfo.activeTasks = new ArrayList<>(count); for (int i = 0; i < count; i++) { assignmentInfo.activeTasks.add(TaskId.readFrom(in)); } + } - // decode standby tasks - count = in.readInt(); + private static void decodeStandbyTasks(final AssignmentInfo assignmentInfo, + final DataInputStream in) throws IOException { + final int count = in.readInt(); assignmentInfo.standbyTasks = new HashMap<>(count); for (int i = 0; i < count; i++) { TaskId id = TaskId.readFrom(in); @@ -213,9 +269,13 @@ private static void decodeVersionOneData(final AssignmentInfo assignmentInfo, private static void decodeVersionTwoData(final AssignmentInfo assignmentInfo, final DataInputStream in) throws IOException { - decodeVersionOneData(assignmentInfo, in); + decodeActiveTasks(assignmentInfo, in); + decodeStandbyTasks(assignmentInfo, in); + decodeGlobalAssignmentData(assignmentInfo, in); + } - // decode partitions by host + private static void decodeGlobalAssignmentData(final AssignmentInfo assignmentInfo, + final DataInputStream in) throws IOException { assignmentInfo.partitionsByHost = new HashMap<>(); final int numEntries = in.readInt(); for (int i = 0; i < numEntries; i++) { @@ -233,19 +293,27 @@ private static Set readTopicPartitions(final DataInputStream in) return partitions; } + private static void decodeVersionThreeData(final AssignmentInfo assignmentInfo, + final DataInputStream in) throws IOException { + decodeActiveTasks(assignmentInfo, in); + decodeStandbyTasks(assignmentInfo, in); + decodeGlobalAssignmentData(assignmentInfo, in); + } + @Override public int hashCode() { - return usedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode(); + return usedVersion ^ latestSupportedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode(); } @Override public boolean equals(final Object o) { if (o instanceof AssignmentInfo) { final AssignmentInfo other = (AssignmentInfo) o; - return this.usedVersion == other.usedVersion && - this.activeTasks.equals(other.activeTasks) && - this.standbyTasks.equals(other.standbyTasks) && - this.partitionsByHost.equals(other.partitionsByHost); + return usedVersion == other.usedVersion && + latestSupportedVersion == other.latestSupportedVersion && + activeTasks.equals(other.activeTasks) && + standbyTasks.equals(other.standbyTasks) && + partitionsByHost.equals(other.partitionsByHost); } else { return false; } @@ -253,7 +321,11 @@ public boolean equals(final Object o) { @Override public String toString() { - return "[version=" + usedVersion + ", active tasks=" + activeTasks.size() + ", standby tasks=" + standbyTasks.size() + "]"; + return "[version=" + usedVersion + + ", supported version=" + latestSupportedVersion + + ", active tasks=" + activeTasks + + ", standby tasks=" + standbyTasks + + ", global assignment=" + partitionsByHost + "]"; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java index 7fee90b540213..be709472441d8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.nio.charset.Charset; +import java.util.Collection; import java.util.HashSet; import java.util.Set; import java.util.UUID; @@ -31,16 +32,21 @@ public class SubscriptionInfo { private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class); - public static final int LATEST_SUPPORTED_VERSION = 2; + public static final int LATEST_SUPPORTED_VERSION = 3; + public static final int UNKNOWN = -1; private final int usedVersion; + private final int latestSupportedVersion; private UUID processId; private Set prevTasks; private Set standbyTasks; private String userEndPoint; - private SubscriptionInfo(final int version) { + // used for decoding; don't apply version checks + private SubscriptionInfo(final int version, + final int latestSupportedVersion) { this.usedVersion = version; + this.latestSupportedVersion = latestSupportedVersion; } public SubscriptionInfo(final UUID processId, @@ -55,7 +61,23 @@ public SubscriptionInfo(final int version, final Set prevTasks, final Set standbyTasks, final String userEndPoint) { + this(version, LATEST_SUPPORTED_VERSION, processId, prevTasks, standbyTasks, userEndPoint); + + if (version < 1 || version > LATEST_SUPPORTED_VERSION) { + throw new IllegalArgumentException("version must be between 1 and " + LATEST_SUPPORTED_VERSION + + "; was: " + version); + } + } + + // for testing only; don't apply version checks + protected SubscriptionInfo(final int version, + final int latestSupportedVersion, + final UUID processId, + final Set prevTasks, + final Set standbyTasks, + final String userEndPoint) { this.usedVersion = version; + this.latestSupportedVersion = latestSupportedVersion; this.processId = processId; this.prevTasks = prevTasks; this.standbyTasks = standbyTasks; @@ -66,6 +88,10 @@ public int version() { return usedVersion; } + public int latestSupportedVersion() { + return latestSupportedVersion; + } + public UUID processId() { return processId; } @@ -93,7 +119,10 @@ public ByteBuffer encode() { buf = encodeVersionOne(); break; case 2: - buf = encodeVersionTwo(prepareUserEndPoint()); + buf = encodeVersionTwo(); + break; + case 3: + buf = encodeVersionThree(); break; default: throw new IllegalStateException("Unknown metadata version: " + usedVersion @@ -108,7 +137,9 @@ private ByteBuffer encodeVersionOne() { final ByteBuffer buf = ByteBuffer.allocate(getVersionOneByteLength()); buf.putInt(1); // version - encodeVersionOneData(buf); + encodeClientUUID(buf); + encodeTasks(buf, prevTasks); + encodeTasks(buf, standbyTasks); return buf; } @@ -120,18 +151,15 @@ private int getVersionOneByteLength() { 4 + standbyTasks.size() * 8; // length + standby tasks } - private void encodeVersionOneData(final ByteBuffer buf) { - // encode client UUID + private void encodeClientUUID(final ByteBuffer buf) { buf.putLong(processId.getMostSignificantBits()); buf.putLong(processId.getLeastSignificantBits()); - // encode ids of previously running tasks - buf.putInt(prevTasks.size()); - for (TaskId id : prevTasks) { - id.writeTo(buf); - } - // encode ids of cached tasks - buf.putInt(standbyTasks.size()); - for (TaskId id : standbyTasks) { + } + + private void encodeTasks(final ByteBuffer buf, + final Collection taskIds) { + buf.putInt(taskIds.size()); + for (TaskId id : taskIds) { id.writeTo(buf); } } @@ -144,52 +172,87 @@ private byte[] prepareUserEndPoint() { } } - private ByteBuffer encodeVersionTwo(final byte[] endPointBytes) { + private ByteBuffer encodeVersionTwo() { + final byte[] endPointBytes = prepareUserEndPoint(); + final ByteBuffer buf = ByteBuffer.allocate(getVersionTwoByteLength(endPointBytes)); buf.putInt(2); // version - encodeVersionTwoData(buf, endPointBytes); + encodeClientUUID(buf); + encodeTasks(buf, prevTasks); + encodeTasks(buf, standbyTasks); + encodeUserEndPoint(buf, endPointBytes); return buf; } private int getVersionTwoByteLength(final byte[] endPointBytes) { - return getVersionOneByteLength() + + return 4 + // version + 16 + // client ID + 4 + prevTasks.size() * 8 + // length + prev tasks + 4 + standbyTasks.size() * 8 + // length + standby tasks 4 + endPointBytes.length; // length + userEndPoint } - private void encodeVersionTwoData(final ByteBuffer buf, - final byte[] endPointBytes) { - encodeVersionOneData(buf); + private void encodeUserEndPoint(final ByteBuffer buf, + final byte[] endPointBytes) { if (endPointBytes != null) { buf.putInt(endPointBytes.length); buf.put(endPointBytes); } } + private ByteBuffer encodeVersionThree() { + final byte[] endPointBytes = prepareUserEndPoint(); + + final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeByteLength(endPointBytes)); + + buf.putInt(3); // used version + buf.putInt(LATEST_SUPPORTED_VERSION); // supported version + encodeClientUUID(buf); + encodeTasks(buf, prevTasks); + encodeTasks(buf, standbyTasks); + encodeUserEndPoint(buf, endPointBytes); + + return buf; + } + + private int getVersionThreeByteLength(final byte[] endPointBytes) { + return 4 + // used version + 4 + // latest supported version version + 16 + // client ID + 4 + prevTasks.size() * 8 + // length + prev tasks + 4 + standbyTasks.size() * 8 + // length + standby tasks + 4 + endPointBytes.length; // length + userEndPoint + } + /** * @throws TaskAssignmentException if method fails to decode the data */ public static SubscriptionInfo decode(final ByteBuffer data) { + final SubscriptionInfo subscriptionInfo; + // ensure we are at the beginning of the ByteBuffer data.rewind(); - // decode used version final int usedVersion = data.getInt(); - final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(usedVersion); - switch (usedVersion) { case 1: + subscriptionInfo = new SubscriptionInfo(usedVersion, UNKNOWN); decodeVersionOneData(subscriptionInfo, data); break; case 2: + subscriptionInfo = new SubscriptionInfo(usedVersion, UNKNOWN); decodeVersionTwoData(subscriptionInfo, data); break; + case 3: + final int latestSupportedVersion = data.getInt(); + subscriptionInfo = new SubscriptionInfo(usedVersion, latestSupportedVersion); + decodeVersionThreeData(subscriptionInfo, data); + break; default: - TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode subscription data: " + - "used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION); - log.error(fatalException.getMessage(), fatalException); - throw fatalException; + subscriptionInfo = new SubscriptionInfo(usedVersion, UNKNOWN); + log.info("Unable to decode subscription data: used version: {}; latest supported version: {}", usedVersion, LATEST_SUPPORTED_VERSION); } return subscriptionInfo; @@ -197,30 +260,43 @@ public static SubscriptionInfo decode(final ByteBuffer data) { private static void decodeVersionOneData(final SubscriptionInfo subscriptionInfo, final ByteBuffer data) { - // decode client UUID - subscriptionInfo.processId = new UUID(data.getLong(), data.getLong()); + decodeClientUUID(subscriptionInfo, data); - // decode previously active tasks - final int numPrevs = data.getInt(); subscriptionInfo.prevTasks = new HashSet<>(); - for (int i = 0; i < numPrevs; i++) { - TaskId id = TaskId.readFrom(data); - subscriptionInfo.prevTasks.add(id); - } + decodeTasks(subscriptionInfo.prevTasks, data); - // decode previously cached tasks - final int numCached = data.getInt(); subscriptionInfo.standbyTasks = new HashSet<>(); - for (int i = 0; i < numCached; i++) { - subscriptionInfo.standbyTasks.add(TaskId.readFrom(data)); + decodeTasks(subscriptionInfo.standbyTasks, data); + } + + private static void decodeClientUUID(final SubscriptionInfo subscriptionInfo, + final ByteBuffer data) { + subscriptionInfo.processId = new UUID(data.getLong(), data.getLong()); + } + + private static void decodeTasks(final Collection taskIds, + final ByteBuffer data) { + final int numPrevs = data.getInt(); + for (int i = 0; i < numPrevs; i++) { + taskIds.add(TaskId.readFrom(data)); } } private static void decodeVersionTwoData(final SubscriptionInfo subscriptionInfo, final ByteBuffer data) { - decodeVersionOneData(subscriptionInfo, data); + decodeClientUUID(subscriptionInfo, data); + + subscriptionInfo.prevTasks = new HashSet<>(); + decodeTasks(subscriptionInfo.prevTasks, data); - // decode user end point (can be null) + subscriptionInfo.standbyTasks = new HashSet<>(); + decodeTasks(subscriptionInfo.standbyTasks, data); + + decodeUserEndPoint(subscriptionInfo, data); + } + + private static void decodeUserEndPoint(final SubscriptionInfo subscriptionInfo, + final ByteBuffer data) { int bytesLength = data.getInt(); if (bytesLength != 0) { final byte[] bytes = new byte[bytesLength]; @@ -229,9 +305,21 @@ private static void decodeVersionTwoData(final SubscriptionInfo subscriptionInfo } } - @Override + private static void decodeVersionThreeData(final SubscriptionInfo subscriptionInfo, + final ByteBuffer data) { + decodeClientUUID(subscriptionInfo, data); + + subscriptionInfo.prevTasks = new HashSet<>(); + decodeTasks(subscriptionInfo.prevTasks, data); + + subscriptionInfo.standbyTasks = new HashSet<>(); + decodeTasks(subscriptionInfo.standbyTasks, data); + + decodeUserEndPoint(subscriptionInfo, data); + } + public int hashCode() { - final int hashCode = usedVersion ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode(); + final int hashCode = usedVersion ^ latestSupportedVersion ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode(); if (userEndPoint == null) { return hashCode; } @@ -243,6 +331,7 @@ public boolean equals(final Object o) { if (o instanceof SubscriptionInfo) { final SubscriptionInfo other = (SubscriptionInfo) o; return this.usedVersion == other.usedVersion && + this.latestSupportedVersion == other.latestSupportedVersion && this.processId.equals(other.processId) && this.prevTasks.equals(other.prevTasks) && this.standbyTasks.equals(other.standbyTasks) && @@ -252,4 +341,13 @@ public boolean equals(final Object o) { } } + @Override + public String toString() { + return "[version=" + usedVersion + + ", supported version=" + latestSupportedVersion + + ", process ID=" + processId + + ", prev tasks=" + prevTasks + + ", standby tasks=" + standbyTasks + + ", user endpoint=" + userEndPoint + "]"; + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index e9ed9682066eb..4e04b4985ed40 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -46,7 +46,6 @@ import org.apache.kafka.test.MockStateStoreSupplier; import org.easymock.Capture; import org.easymock.EasyMock; -import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; @@ -64,6 +63,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; public class StreamsPartitionAssignorTest { @@ -867,9 +867,12 @@ public void shouldMapUserEndPointToTopicPartitions() { final PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1"); final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData()); final Set topicPartitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080)); - assertEquals(Utils.mkSet(new TopicPartition("topic1", 0), + assertEquals( + Utils.mkSet( + new TopicPartition("topic1", 0), new TopicPartition("topic1", 1), - new TopicPartition("topic1", 2)), topicPartitions); + new TopicPartition("topic1", 2)), + topicPartitions); } @Test @@ -881,7 +884,7 @@ public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() { try { configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) "localhost")); - Assert.fail("expected to an exception due to invalid config"); + fail("expected to an exception due to invalid config"); } catch (ConfigException e) { // pass } @@ -893,7 +896,7 @@ public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() { try { configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) "localhost:j87yhk")); - Assert.fail("expected to an exception due to invalid config"); + fail("expected to an exception due to invalid config"); } catch (ConfigException e) { // pass } @@ -1088,21 +1091,36 @@ public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotThreadDataProvider } @Test - public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() { + public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV1V2() { + shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(1, 2); + } + + @Test + public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV1V3() { + shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(1, 3); + } + + @Test + public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV2V3() { + shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(2, 3); + } + + private void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(final int smallestVersion, + final int otherVersion) { final Map subscriptions = new HashMap<>(); final Set emptyTasks = Collections.emptySet(); subscriptions.put( "consumer1", new PartitionAssignor.Subscription( Collections.singletonList("topic1"), - new SubscriptionInfo(1, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() + new SubscriptionInfo(smallestVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() ) ); subscriptions.put( "consumer2", new PartitionAssignor.Subscription( Collections.singletonList("topic1"), - new SubscriptionInfo(2, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() + new SubscriptionInfo(otherVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() ) ); @@ -1115,12 +1133,12 @@ public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions( final Map assignment = partitionAssignor.assign(metadata, subscriptions); assertThat(assignment.size(), equalTo(2)); - assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).version(), equalTo(1)); - assertThat(AssignmentInfo.decode(assignment.get("consumer2").userData()).version(), equalTo(1)); + assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).version(), equalTo(smallestVersion)); + assertThat(AssignmentInfo.decode(assignment.get("consumer2").userData()).version(), equalTo(smallestVersion)); } @Test - public void shouldDownGradeSubscription() { + public void shouldDownGradeSubscriptionToVersion1() { final Set emptyTasks = Collections.emptySet(); mockTaskManager( @@ -1135,6 +1153,46 @@ public void shouldDownGradeSubscription() { assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(1)); } + @Test + public void shouldDownGradeSubscriptionToVersion2For0101() { + shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0101); + } + + @Test + public void shouldDownGradeSubscriptionToVersion2For0102() { + shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0102); + } + + @Test + public void shouldDownGradeSubscriptionToVersion2For0110() { + shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0110); + } + + @Test + public void shouldDownGradeSubscriptionToVersion2For10() { + shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_10); + } + + @Test + public void shouldDownGradeSubscriptionToVersion2For11() { + shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_11); + } + + private void shouldDownGradeSubscriptionToVersion2(final Object upgradeFromValue) { + final Set emptyTasks = Collections.emptySet(); + + mockTaskManager( + emptyTasks, + emptyTasks, + UUID.randomUUID(), + builder); + configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFromValue)); + + PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1")); + + assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(2)); + } + private PartitionAssignor.Assignment createAssignment(final Map> firstHostState) { final AssignmentInfo info = new AssignmentInfo(Collections.emptyList(), Collections.>emptyMap(), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java index c1020a98ba9f3..c7382e7671cc2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java @@ -22,85 +22,70 @@ import org.apache.kafka.streams.state.HostInfo; import org.junit.Test; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; public class AssignmentInfoTest { + private final List activeTasks = Arrays.asList( + new TaskId(0, 0), + new TaskId(0, 0), + new TaskId(0, 1), new TaskId(1, 0)); + private final Map> standbyTasks = new HashMap>() { + { + put(new TaskId(1, 1), + Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1))); + put(new TaskId(2, 0), + Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0))); + } + }; + private final Map> globalAssignment = new HashMap>() { + { + put(new HostInfo("localhost", 80), + Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t3", 3))); + } + }; @Test - public void testEncodeDecode() { - List activeTasks = - Arrays.asList(new TaskId(0, 0), new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0)); - Map> standbyTasks = new HashMap<>(); - - standbyTasks.put(new TaskId(1, 1), Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1))); - standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0))); + public void shouldUseLatestSupportedVersionByDefault() { + final AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks, globalAssignment); + assertEquals(AssignmentInfo.LATEST_SUPPORTED_VERSION, info.version()); + } - AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks, new HashMap>()); - AssignmentInfo decoded = AssignmentInfo.decode(info.encode()); + @Test(expected = IllegalArgumentException.class) + public void shouldThrowForUnknownVersion1() { + new AssignmentInfo(0, activeTasks, standbyTasks, globalAssignment); + } - assertEquals(info, decoded); + @Test(expected = IllegalArgumentException.class) + public void shouldThrowForUnknownVersion2() { + new AssignmentInfo(AssignmentInfo.LATEST_SUPPORTED_VERSION + 1, activeTasks, standbyTasks, globalAssignment); } @Test - public void shouldDecodePreviousVersion() throws IOException { - List activeTasks = - Arrays.asList(new TaskId(0, 0), new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0)); - Map> standbyTasks = new HashMap<>(); - - standbyTasks.put(new TaskId(1, 1), Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1))); - standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0))); - final AssignmentInfo oldVersion = new AssignmentInfo(1, activeTasks, standbyTasks, null); - final AssignmentInfo decoded = AssignmentInfo.decode(encodeV1(oldVersion)); - assertEquals(oldVersion.activeTasks(), decoded.activeTasks()); - assertEquals(oldVersion.standbyTasks(), decoded.standbyTasks()); - assertNull(decoded.partitionsByHost()); // should be null as wasn't in V1 - assertEquals(1, decoded.version()); + public void shouldEncodeAndDecodeVersion1() { + final AssignmentInfo info = new AssignmentInfo(1, activeTasks, standbyTasks, globalAssignment); + final AssignmentInfo expectedInfo = new AssignmentInfo(1, AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, Collections.>emptyMap()); + assertEquals(expectedInfo, AssignmentInfo.decode(info.encode())); } - /** - * This is a clone of what the V1 encoding did. The encode method has changed for V2 - * so it is impossible to test compatibility without having this - */ - private ByteBuffer encodeV1(AssignmentInfo oldVersion) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream(baos); - // Encode version - out.writeInt(oldVersion.version()); - // Encode active tasks - out.writeInt(oldVersion.activeTasks().size()); - for (TaskId id : oldVersion.activeTasks()) { - id.writeTo(out); - } - // Encode standby tasks - out.writeInt(oldVersion.standbyTasks().size()); - for (Map.Entry> entry : oldVersion.standbyTasks().entrySet()) { - TaskId id = entry.getKey(); - id.writeTo(out); - - Set partitions = entry.getValue(); - out.writeInt(partitions.size()); - for (TopicPartition partition : partitions) { - out.writeUTF(partition.topic()); - out.writeInt(partition.partition()); - } - } - - out.flush(); - out.close(); - - return ByteBuffer.wrap(baos.toByteArray()); + @Test + public void shouldEncodeAndDecodeVersion2() { + final AssignmentInfo info = new AssignmentInfo(2, activeTasks, standbyTasks, globalAssignment); + final AssignmentInfo expectedInfo = new AssignmentInfo(2, AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, globalAssignment); + assertEquals(expectedInfo, AssignmentInfo.decode(info.encode())); + } + @Test + public void shouldEncodeAndDecodeVersion3() { + final AssignmentInfo info = new AssignmentInfo(3, activeTasks, standbyTasks, globalAssignment); + final AssignmentInfo expectedInfo = new AssignmentInfo(3, AssignmentInfo.LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, globalAssignment); + assertEquals(expectedInfo, AssignmentInfo.decode(info.encode())); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java index 633285a2b4ddc..e98b8ce072705 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java @@ -19,81 +19,60 @@ import org.apache.kafka.streams.processor.TaskId; import org.junit.Test; -import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.UUID; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; public class SubscriptionInfoTest { + private final UUID processId = UUID.randomUUID(); + private final Set activeTasks = new HashSet<>(Arrays.asList( + new TaskId(0, 0), + new TaskId(0, 1), + new TaskId(1, 0))); + private final Set standbyTasks = new HashSet<>(Arrays.asList( + new TaskId(1, 1), + new TaskId(2, 0))); - @Test - public void testEncodeDecode() { - UUID processId = UUID.randomUUID(); + private final static String IGNORED_USER_ENDPOINT = "ignoredUserEndpoint:80"; - Set activeTasks = - new HashSet<>(Arrays.asList(new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0))); - Set standbyTasks = - new HashSet<>(Arrays.asList(new TaskId(1, 1), new TaskId(2, 0))); + @Test + public void shouldUseLatestSupportedVersionByDefault() { + final SubscriptionInfo info = new SubscriptionInfo(processId, activeTasks, standbyTasks, "localhost:80"); + assertEquals(SubscriptionInfo.LATEST_SUPPORTED_VERSION, info.version()); + } - SubscriptionInfo info = new SubscriptionInfo(processId, activeTasks, standbyTasks, null); - SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode()); + @Test(expected = IllegalArgumentException.class) + public void shouldThrowForUnknownVersion1() { + new SubscriptionInfo(0, processId, activeTasks, standbyTasks, "localhost:80"); + } - assertEquals(info, decoded); + @Test(expected = IllegalArgumentException.class) + public void shouldThrowForUnknownVersion2() { + new SubscriptionInfo(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1, processId, activeTasks, standbyTasks, "localhost:80"); } @Test - public void shouldEncodeDecodeWithUserEndPoint() { - SubscriptionInfo original = new SubscriptionInfo(UUID.randomUUID(), - Collections.singleton(new TaskId(0, 0)), Collections.emptySet(), "localhost:80"); - SubscriptionInfo decoded = SubscriptionInfo.decode(original.encode()); - assertEquals(original, decoded); + public void shouldEncodeAndDecodeVersion1() { + final SubscriptionInfo info = new SubscriptionInfo(1, processId, activeTasks, standbyTasks, IGNORED_USER_ENDPOINT); + final SubscriptionInfo expectedInfo = new SubscriptionInfo(1, SubscriptionInfo.UNKNOWN, processId, activeTasks, standbyTasks, null); + assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode())); } @Test - public void shouldBeBackwardCompatible() { - UUID processId = UUID.randomUUID(); - - Set activeTasks = - new HashSet<>(Arrays.asList(new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0))); - Set standbyTasks = - new HashSet<>(Arrays.asList(new TaskId(1, 1), new TaskId(2, 0))); - - final ByteBuffer v1Encoding = encodePreviousVersion(processId, activeTasks, standbyTasks); - final SubscriptionInfo decode = SubscriptionInfo.decode(v1Encoding); - assertEquals(activeTasks, decode.prevTasks()); - assertEquals(standbyTasks, decode.standbyTasks()); - assertEquals(processId, decode.processId()); - assertNull(decode.userEndPoint()); + public void shouldEncodeAndDecodeVersion2() { + final SubscriptionInfo info = new SubscriptionInfo(2, processId, activeTasks, standbyTasks, "localhost:80"); + final SubscriptionInfo expectedInfo = new SubscriptionInfo(2, SubscriptionInfo.UNKNOWN, processId, activeTasks, standbyTasks, "localhost:80"); + assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode())); } - /** - * This is a clone of what the V1 encoding did. The encode method has changed for V2 - * so it is impossible to test compatibility without having this - */ - private ByteBuffer encodePreviousVersion(UUID processId, Set prevTasks, Set standbyTasks) { - ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process id */ + 4 + prevTasks.size() * 8 + 4 + standbyTasks.size() * 8); - // version - buf.putInt(1); - // encode client UUID - buf.putLong(processId.getMostSignificantBits()); - buf.putLong(processId.getLeastSignificantBits()); - // encode ids of previously running tasks - buf.putInt(prevTasks.size()); - for (TaskId id : prevTasks) { - id.writeTo(buf); - } - // encode ids of cached tasks - buf.putInt(standbyTasks.size()); - for (TaskId id : standbyTasks) { - id.writeTo(buf); - } - buf.rewind(); - - return buf; + @Test + public void shouldEncodeAndDecodeVersion3() { + final SubscriptionInfo info = new SubscriptionInfo(3, processId, activeTasks, standbyTasks, "localhost:80"); + final SubscriptionInfo expectedInfo = new SubscriptionInfo(3, SubscriptionInfo.LATEST_SUPPORTED_VERSION, processId, activeTasks, standbyTasks, "localhost:80"); + assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode())); } + } diff --git a/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java new file mode 100644 index 0000000000000..a8796cb056a7b --- /dev/null +++ b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +import java.util.Properties; + +public class StreamsUpgradeTest { + + /** + * This test cannot be executed, as long as Kafka 1.1.1 is not released + */ + @SuppressWarnings("unchecked") + public static void main(final String[] args) throws Exception { + if (args.length < 2) { + System.err.println("StreamsUpgradeTest requires three argument (kafka-url, properties-file) but only " + args.length + " provided: " + + (args.length > 0 ? args[0] : "")); + } + final String kafka = args[0]; + final String propFileName = args.length > 1 ? args[1] : null; + + final Properties streamsProperties = Utils.loadProps(propFileName); + + System.out.println("StreamsTest instance started (StreamsUpgradeTest v1.1)"); + System.out.println("kafka=" + kafka); + System.out.println("props=" + streamsProperties); + + final StreamsBuilder builder = new StreamsBuilder(); + final KStream dataStream = builder.stream("data"); + dataStream.process(printProcessorSupplier()); + dataStream.to("echo"); + + final Properties config = new Properties(); + config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); + config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.putAll(streamsProperties); + + final KafkaStreams streams = new KafkaStreams(builder.build(), config); + streams.start(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + streams.close(); + System.out.println("UPGRADE-TEST-CLIENT-CLOSED"); + System.out.flush(); + } + }); + } + + private static ProcessorSupplier printProcessorSupplier() { + return new ProcessorSupplier() { + public Processor get() { + return new AbstractProcessor() { + private int numRecordsProcessed = 0; + + @Override + public void init(final ProcessorContext context) { + System.out.println("initializing processor: topic=data taskId=" + context.taskId()); + numRecordsProcessed = 0; + } + + @Override + public void process(final K key, final V value) { + numRecordsProcessed++; + if (numRecordsProcessed % 100 == 0) { + System.out.println("processed " + numRecordsProcessed + " records from topic=data"); + } + } + + @Override + public void punctuate(final long timestamp) {} + + @Override + public void close() {} + }; + } + }; + } +} diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 796ca31ea4cd1..a4b902a322c45 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -412,6 +412,7 @@ def __init__(self, test_context, kafka): "org.apache.kafka.streams.tests.StreamsUpgradeTest", "") self.UPGRADE_FROM = None + self.UPGRADE_TO = None def set_version(self, kafka_streams_version): self.KAFKA_STREAMS_VERSION = kafka_streams_version diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index fa79d571f366d..8b7d7712459a1 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -23,8 +23,16 @@ import random import time +# broker 0.10.0 is not compatible with newer Kafka Streams versions broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), str(DEV_BRANCH)] -simple_upgrade_versions_metadata_version_2 = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(DEV_VERSION)] + +metadata_1_versions = [str(LATEST_0_10_0)] +metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)] +# we can add the following versions to `backward_compatible_metadata_2_versions` after the corresponding +# bug-fix release 0.10.1.2, 0.10.2.2, 0.11.0.3, 1.0.2, and 1.1.1 are available: +# str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1) +backward_compatible_metadata_2_versions = [] +metadata_3_versions = [str(DEV_VERSION)] class StreamsUpgradeTest(Test): """ @@ -39,6 +47,7 @@ def __init__(self, test_context): 'echo' : { 'partitions': 5 }, 'data' : { 'partitions': 5 }, } + self.leader = None def perform_broker_upgrade(self, to_version): self.logger.info("First pass bounce - rolling broker upgrade") @@ -114,7 +123,7 @@ def test_upgrade_downgrade_brokers(self, from_version, to_version): node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False) self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False) - @matrix(from_version=simple_upgrade_versions_metadata_version_2, to_version=simple_upgrade_versions_metadata_version_2) + @matrix(from_version=metadata_2_versions, to_version=metadata_2_versions) def test_simple_upgrade_downgrade(self, from_version, to_version): """ Starts 3 KafkaStreams instances with , and upgrades one-by-one to @@ -165,15 +174,12 @@ def test_simple_upgrade_downgrade(self, from_version, to_version): self.driver.stop() - #@parametrize(new_version=str(LATEST_0_10_1)) we cannot run this test until Kafka 0.10.1.2 is released - #@parametrize(new_version=str(LATEST_0_10_2)) we cannot run this test until Kafka 0.10.2.2 is released - #@parametrize(new_version=str(LATEST_0_11_0)) we cannot run this test until Kafka 0.11.0.3 is released - #@parametrize(new_version=str(LATEST_1_0)) we cannot run this test until Kafka 1.0.2 is released - #@parametrize(new_version=str(LATEST_1_1)) we cannot run this test until Kafka 1.1.1 is released - @parametrize(new_version=str(DEV_VERSION)) - def test_metadata_upgrade(self, new_version): + #@matrix(from_version=metadata_1_versions, to_version=backward_compatible_metadata_2_versions) + @matrix(from_version=metadata_1_versions, to_version=metadata_3_versions) + @matrix(from_version=metadata_2_versions, to_version=metadata_3_versions) + def test_metadata_upgrade(self, from_version, to_version): """ - Starts 3 KafkaStreams instances with version 0.10.0, and upgrades one-by-one to + Starts 3 KafkaStreams instances with version and upgrades one-by-one to """ self.zk = ZookeeperService(self.test_context, num_nodes=1) @@ -189,7 +195,7 @@ def test_metadata_upgrade(self, new_version): self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka) self.driver.start() - self.start_all_nodes_with(str(LATEST_0_10_0)) + self.start_all_nodes_with(from_version) self.processors = [self.processor1, self.processor2, self.processor3] @@ -200,13 +206,13 @@ def test_metadata_upgrade(self, new_version): random.shuffle(self.processors) for p in self.processors: p.CLEAN_NODE_ENABLED = False - self.do_rolling_bounce(p, "0.10.0", new_version, counter) + self.do_rolling_bounce(p, from_version[:-2], to_version, counter) counter = counter + 1 # second rolling bounce random.shuffle(self.processors) for p in self.processors: - self.do_rolling_bounce(p, None, new_version, counter) + self.do_rolling_bounce(p, None, to_version, counter) counter = counter + 1 # shutdown diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index 66e5fcf18aabf..7823efac1d4b1 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -63,17 +63,17 @@ def get_version(node=None): DEV_BRANCH = KafkaVersion("dev") DEV_VERSION = KafkaVersion("1.2.0-SNAPSHOT") -# 0.8.2.X versions +# 0.8.2.x versions V_0_8_2_1 = KafkaVersion("0.8.2.1") V_0_8_2_2 = KafkaVersion("0.8.2.2") LATEST_0_8_2 = V_0_8_2_2 -# 0.9.0.X versions +# 0.9.0.x versions V_0_9_0_0 = KafkaVersion("0.9.0.0") V_0_9_0_1 = KafkaVersion("0.9.0.1") LATEST_0_9 = V_0_9_0_1 -# 0.10.0.X versions +# 0.10.0.x versions V_0_10_0_0 = KafkaVersion("0.10.0.0") V_0_10_0_1 = KafkaVersion("0.10.0.1") LATEST_0_10_0 = V_0_10_0_1