Skip to content

Commit

Permalink
KAFKA-6054: Update Kafka Streams metadata to version 3 (apache#4880)
Browse files Browse the repository at this point in the history
 - adds Streams upgrade tests for 1.1 release
 - introduces metadata version 3

Reviewers: John Roesler <[email protected]>, Guozhang Wang <[email protected]>
  • Loading branch information
mjsax authored and ying-zheng committed Jul 6, 2018
1 parent afd8bf0 commit 4897bc9
Show file tree
Hide file tree
Showing 14 changed files with 586 additions and 216 deletions.
12 changes: 12 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,18 @@ project(':streams:upgrade-system-tests-10') {
}
}

project(':streams:upgrade-system-tests-11') {
archivesBaseName = "kafka-streams-upgrade-system-tests-11"

dependencies {
testCompile libs.kafkaStreams_11
}

systemTestLibs {
dependsOn testJar
}
}

project(':jmh-benchmarks') {

apply plugin: 'com.github.johnrengelman.shadow'
Expand Down
2 changes: 2 additions & 0 deletions gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ versions += [
kafka_0102: "0.10.2.1",
kafka_0110: "0.11.0.2",
kafka_10: "1.0.1",
kafka_11: "1.1.0",
lz4: "1.4.1",
metrics: "2.2.0",
// PowerMock 1.x doesn't support Java 9, so use PowerMock 2.0.0 beta
Expand Down Expand Up @@ -115,6 +116,7 @@ libs += [
kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102",
kafkaStreams_0110: "org.apache.kafka:kafka-streams:$versions.kafka_0110",
kafkaStreams_10: "org.apache.kafka:kafka-streams:$versions.kafka_10",
kafkaStreams_11: "org.apache.kafka:kafka-streams:$versions.kafka_11",
log4j: "log4j:log4j:$versions.log4j",
lz4: "org.lz4:lz4-java:$versions.lz4",
metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
Expand Down
5 changes: 3 additions & 2 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@

include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:test-utils', 'streams:examples',
'streams:upgrade-system-tests-0100', 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102',
'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'log4j-appender',
'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', 'jmh-benchmarks'
'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'streams:upgrade-system-tests-11',
'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file',
'jmh-benchmarks'
34 changes: 30 additions & 4 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,31 @@ public class StreamsConfig extends AbstractConfig {
*/
public static final String UPGRADE_FROM_0100 = "0.10.0";

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.1.x}.
*/
public static final String UPGRADE_FROM_0101 = "0.10.1";

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.2.x}.
*/
public static final String UPGRADE_FROM_0102 = "0.10.2";

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.11.0.x}.
*/
public static final String UPGRADE_FROM_0110 = "0.11.0";

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.0.x}.
*/
public static final String UPGRADE_FROM_10 = "1.0";

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.1.x}.
*/
public static final String UPGRADE_FROM_11 = "1.1";

/**
* Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees.
*/
Expand Down Expand Up @@ -347,8 +372,9 @@ public class StreamsConfig extends AbstractConfig {

/** {@code upgrade.from} */
public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
public static final String UPGRADE_FROM_DOC = "Allows upgrading from version 0.10.0 to version 0.10.1 (or newer) in a backward compatible way. " +
"Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\" (for upgrading from 0.10.0.x).";
public static final String UPGRADE_FROM_DOC = "Allows upgrading from versions 0.10.0/0.10.1/0.10.2/0.11.0/1.0/1.1 to version 1.2 (or newer) in a backward compatible way. " +
"When upgrading from 1.2 to a newer version it is not required to specify this config." +
"Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" + UPGRADE_FROM_0101 + "\", \"" + UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" + UPGRADE_FROM_10 + "\", \"" + UPGRADE_FROM_11 + "\" (for upgrading from the corresponding old version).";

/**
* {@code value.serde}
Expand All @@ -364,7 +390,7 @@ public class StreamsConfig extends AbstractConfig {

/**
* {@code zookeeper.connect}
* @deprecated Kakfa Streams does not use Zookeeper anymore and this parameter will be ignored.
* @deprecated Kafka Streams does not use Zookeeper anymore and this parameter will be ignored.
*/
@Deprecated
public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
Expand Down Expand Up @@ -575,7 +601,7 @@ public class StreamsConfig extends AbstractConfig {
.define(UPGRADE_FROM_CONFIG,
ConfigDef.Type.STRING,
null,
in(null, UPGRADE_FROM_0100),
in(null, UPGRADE_FROM_0100, UPGRADE_FROM_0101, UPGRADE_FROM_0102, UPGRADE_FROM_0110, UPGRADE_FROM_10, UPGRADE_FROM_11),
Importance.LOW,
UPGRADE_FROM_DOC)
.define(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,24 @@ public void configure(final Map<String, ?> configs) {
final LogContext logContext = new LogContext(logPrefix);
log = logContext.logger(getClass());

final String upgradeMode = (String) configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
if (StreamsConfig.UPGRADE_FROM_0100.equals(upgradeMode)) {
log.info("Downgrading metadata version from 2 to 1 for upgrade from 0.10.0.x.");
userMetadataVersion = 1;
final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
if (upgradeFrom != null) {
switch (upgradeFrom) {
case StreamsConfig.UPGRADE_FROM_0100:
log.info("Downgrading metadata version from {} to 1 for upgrade from 0.10.0.x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION);
userMetadataVersion = 1;
break;
case StreamsConfig.UPGRADE_FROM_0101:
case StreamsConfig.UPGRADE_FROM_0102:
case StreamsConfig.UPGRADE_FROM_0110:
case StreamsConfig.UPGRADE_FROM_10:
case StreamsConfig.UPGRADE_FROM_11:
log.info("Downgrading metadata version from {} to 2 for upgrade from " + upgradeFrom + ".x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION);
userMetadataVersion = 2;
break;
default:
throw new IllegalArgumentException("Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom);
}
}

final Object o = configs.get(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR);
Expand Down Expand Up @@ -512,7 +526,7 @@ public Map<String, Assignment> assign(final Cluster metadata,

// construct the global partition assignment per host map
final Map<HostInfo, Set<TopicPartition>> partitionsByHostState = new HashMap<>();
if (minUserMetadataVersion == 2) {
if (minUserMetadataVersion == 2 || minUserMetadataVersion == 3) {
for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
final HostInfo hostInfo = entry.getValue().hostInfo;

Expand Down Expand Up @@ -631,6 +645,10 @@ public void onAssignment(final Assignment assignment) {
processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo);
partitionsByHost = info.partitionsByHost();
break;
case 3:
processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo);
partitionsByHost = info.partitionsByHost();
break;
default:
throw new IllegalStateException("Unknown metadata version: " + usedVersion
+ "; latest supported version: " + AssignmentInfo.LATEST_SUPPORTED_VERSION);
Expand Down Expand Up @@ -684,6 +702,13 @@ private void processVersionTwoAssignment(final AssignmentInfo info,
}
}

private void processVersionThreeAssignment(final AssignmentInfo info,
final List<TopicPartition> partitions,
final Map<TaskId, Set<TopicPartition>> activeTasks,
final Map<TopicPartition, PartitionInfo> topicToPartitionInfo) {
processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo);
}

/**
* Internal helper function that creates a Kafka topic
*
Expand Down Expand Up @@ -818,4 +843,5 @@ void validate(final Set<String> copartitionGroup,
void setInternalTopicManager(final InternalTopicManager internalTopicManager) {
this.internalTopicManager = internalTopicManager;
}

}
Loading

0 comments on commit 4897bc9

Please sign in to comment.