MINOR: Remove sleep calls and ignore annotation from streams upgrade test (apache#6046)

StreamsUpgradeTest::test_upgrade_downgrade_brokers relied on sleep calls to wait for the brokers and the Streams client to make progress, which made the test flaky, so it had been annotated with @ignore. This PR waits on log events instead of sleeping, which lets us remove the @ignore annotation.
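The pattern behind the fix, sketched below for illustration: rather than sleeping for a fixed interval and hoping the service has made progress, the test opens a monitor on the service's log before triggering an action, then blocks until an expected message appears or a timeout fires. The wait_for_progress helper and its trigger_action parameter are illustrative stand-ins, not code from this PR; the monitor_log/wait_until calls mirror how the updated test uses ducktape.

# Hedged sketch of the sleep-to-log-event pattern used in this PR.
# `service` stands in for any ducktape service exposing monitor_log();
# the log file and expected message are illustrative placeholders.
def wait_for_progress(service, trigger_action):
    # Open the log monitor *before* triggering the action so the
    # expected message cannot slip past between trigger and wait.
    with service.node.account.monitor_log(service.STDOUT_FILE) as monitor:
        trigger_action()
        # Blocks until the pattern shows up, or fails fast with a clear
        # error -- unlike time.sleep(15), which always pays the full
        # delay and can still race on slow machines.
        monitor.wait_until("processed [0-9]* records",
                           timeout_sec=60,
                           err_msg="Never saw processing output on " + str(service.node.account))

Opening the monitor before the trigger matters: it closes the race where the message is logged between the action and the start of the wait.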

Reviewers: Ewen Cheslack-Postava <[email protected]>, Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]>
bbejeck authored and guozhangwang committed Jan 7, 2019
1 parent 1f3cb6c commit 404bdef
Showing 8 changed files with 185 additions and 43 deletions.
12 changes: 12 additions & 0 deletions build.gradle
@@ -1207,6 +1207,18 @@ project(':streams:upgrade-system-tests-20') {
  }
}

+project(':streams:upgrade-system-tests-21') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-21"
+
+  dependencies {
+    testCompile libs.kafkaStreams_21
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}

project(':jmh-benchmarks') {

apply plugin: 'com.github.johnrengelman.shadow'
4 changes: 3 additions & 1 deletion gradle/dependencies.gradle
@@ -71,7 +71,8 @@ versions += [
  kafka_0110: "0.11.0.3",
  kafka_10: "1.0.2",
  kafka_11: "1.1.1",
-  kafka_20: "2.0.0",
+  kafka_20: "2.0.1",
+  kafka_21: "2.1.0",
  lz4: "1.5.0",
  mavenArtifact: "3.6.0",
  metrics: "2.2.0",
@@ -126,6 +127,7 @@ libs += [
  kafkaStreams_10: "org.apache.kafka:kafka-streams:$versions.kafka_10",
  kafkaStreams_11: "org.apache.kafka:kafka-streams:$versions.kafka_11",
  kafkaStreams_20: "org.apache.kafka:kafka-streams:$versions.kafka_20",
+  kafkaStreams_21: "org.apache.kafka:kafka-streams:$versions.kafka_21",
  log4j: "log4j:log4j:$versions.log4j",
  lz4: "org.lz4:lz4-java:$versions.lz4",
  metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
2 changes: 1 addition & 1 deletion settings.gradle
@@ -16,5 +16,5 @@
include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:streams-scala', 'streams:test-utils', 'streams:examples',
        'streams:upgrade-system-tests-0100', 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102',
        'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'streams:upgrade-system-tests-11', 'streams:upgrade-system-tests-20',
-        'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file',
+        'streams:upgrade-system-tests-21', 'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file',
        'connect:basic-auth-extension', 'jmh-benchmarks'
98 changes: 98 additions & 0 deletions streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;

import java.util.Properties;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;

public class StreamsUpgradeTest {

    @SuppressWarnings("unchecked")
    public static void main(final String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("StreamsUpgradeTest requires two arguments (kafka-url, properties-file) but only " + args.length + " provided: "
                + (args.length > 0 ? args[0] : ""));
        }
        final String kafka = args[0];
        final String propFileName = args.length > 1 ? args[1] : null;

        final Properties streamsProperties = Utils.loadProps(propFileName);

        System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.1)");
        System.out.println("kafka=" + kafka);
        System.out.println("props=" + streamsProperties);

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream dataStream = builder.stream("data");
        dataStream.process(printProcessorSupplier());
        dataStream.to("echo");

        final Properties config = new Properties();
        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        config.putAll(streamsProperties);

        final KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                streams.close();
                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
                System.out.flush();
            }
        });
    }

    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
        return new ProcessorSupplier<K, V>() {
            public Processor<K, V> get() {
                return new AbstractProcessor<K, V>() {
                    private int numRecordsProcessed = 0;

                    @Override
                    public void init(final ProcessorContext context) {
                        System.out.println("initializing processor: topic=data taskId=" + context.taskId());
                        numRecordsProcessed = 0;
                    }

                    @Override
                    public void process(final K key, final V value) {
                        numRecordsProcessed++;
                        if (numRecordsProcessed % 100 == 0) {
                            System.out.println("processed " + numRecordsProcessed + " records from topic=data");
                        }
                    }

                    @Override
                    public void close() {}
                };
            }
        };
    }
}
6 changes: 4 additions & 2 deletions tests/docker/Dockerfile
@@ -49,7 +49,8 @@
RUN mkdir -p "/opt/kafka-0.10.2.2" && chmod a+rw /opt/kafka-0.10.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.2"
RUN mkdir -p "/opt/kafka-0.11.0.3" && chmod a+rw /opt/kafka-0.11.0.3 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.3.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.3"
RUN mkdir -p "/opt/kafka-1.0.2" && chmod a+rw /opt/kafka-1.0.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.0.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.0.2"
RUN mkdir -p "/opt/kafka-1.1.1" && chmod a+rw /opt/kafka-1.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.1.1"
RUN mkdir -p "/opt/kafka-2.0.0" && chmod a+rw /opt/kafka-2.0.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.0.0"
RUN mkdir -p "/opt/kafka-2.0.1" && chmod a+rw /opt/kafka-2.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.0.1"
RUN mkdir -p "/opt/kafka-2.1.0" && chmod a+rw /opt/kafka-2.1.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.1.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.1.0"

# Streams test dependencies
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.1-test.jar" -o /opt/kafka-0.10.0.1/libs/kafka-streams-0.10.0.1-test.jar
@@ -58,7 +59,8 @@
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.2.2-test.jar" -o /opt/kafka-0.10.2.2/libs/kafka-streams-0.10.2.2-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.3-test.jar" -o /opt/kafka-0.11.0.3/libs/kafka-streams-0.11.0.3-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.0.2-test.jar" -o /opt/kafka-1.0.2/libs/kafka-streams-1.0.2-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.1.1-test.jar" -o /opt/kafka-1.1.1/libs/kafka-streams-1.1.1-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.0.0-test.jar" -o /opt/kafka-2.0.0/libs/kafka-streams-2.0.0-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.0.1-test.jar" -o /opt/kafka-2.0.1/libs/kafka-streams-2.0.1-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.1.0-test.jar" -o /opt/kafka-2.1.0/libs/kafka-streams-2.1.0-test.jar

# The version of Kibosh to use for testing.
# If you update this, also update vagrant/base.sh
93 changes: 57 additions & 36 deletions tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -15,17 +15,19 @@

import random
import time
-from ducktape.mark import ignore
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
from kafkatest.services.kafka import KafkaService
-from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, StreamsUpgradeTestJobRunnerService
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, \
+    StreamsUpgradeTestJobRunnerService
from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, DEV_BRANCH, DEV_VERSION, KafkaVersion
+from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
+    LATEST_2_0, LATEST_2_1, DEV_BRANCH, DEV_VERSION, KafkaVersion

# broker 0.10.0 is not compatible with newer Kafka Streams versions
-broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), str(LATEST_2_0), str(DEV_BRANCH)]
+broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), str(LATEST_2_0), str(LATEST_2_1), str(DEV_BRANCH)]

metadata_1_versions = [str(LATEST_0_10_0)]
metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
@@ -38,7 +40,8 @@ class StreamsUpgradeTest(Test):
"""
Test upgrading Kafka Streams (all version combination)
If metadata was changes, upgrade is more difficult
Metadata version was bumped in 0.10.1.0
Metadata version was bumped in 0.10.1.0 and
subsequently bumped in 2.0.0
"""

    def __init__(self, test_context):
@@ -50,14 +53,15 @@ def __init__(self, test_context):
        self.leader = None
        self.leader_counter = {}

+    processed_msg = "processed [0-9]* records"

    def perform_broker_upgrade(self, to_version):
        self.logger.info("First pass bounce - rolling broker upgrade")
        for node in self.kafka.nodes:
            self.kafka.stop_node(node)
            node.version = KafkaVersion(to_version)
            self.kafka.start_node(node)

-    @ignore
    @cluster(num_nodes=6)
    @matrix(from_version=broker_upgrade_versions, to_version=broker_upgrade_versions)
    def test_upgrade_downgrade_brokers(self, from_version, to_version):
@@ -69,6 +73,7 @@ def test_upgrade_downgrade_brokers(self, from_version, to_version):
            return

        self.replication = 3
+        self.num_kafka_nodes = 3
        self.partitions = 1
        self.isr = 2
        self.topics = {
@@ -99,31 +104,48 @@ def test_upgrade_downgrade_brokers(self, from_version, to_version):
        self.zk.start()

        # number of nodes needs to be >= 3 for the smoke test
-        self.kafka = KafkaService(self.test_context, num_nodes=3,
+        self.kafka = KafkaService(self.test_context, num_nodes=self.num_kafka_nodes,
                                  zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
        self.kafka.start()

-        # allow some time for topics to be created
-        time.sleep(10)
+        wait_until(lambda: self.get_topics_count() >= (len(self.topics) * self.num_kafka_nodes),
+                   timeout_sec=60,
+                   err_msg="Broker did not create all topics in 60 seconds ")

        self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
-        self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
-
-        self.driver.start()
-        self.processor1.start()
-        time.sleep(15)
-
-        self.perform_broker_upgrade(to_version)
+        processor = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)

-        time.sleep(15)
-        self.driver.wait()
-        self.driver.stop()
+        with self.driver.node.account.monitor_log(self.driver.STDOUT_FILE) as driver_monitor:
+            self.driver.start()
+
+            with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+                processor.start()
+                monitor.wait_until(self.processed_msg,
+                                   timeout_sec=60,
+                                   err_msg="Never saw output '%s' on" % self.processed_msg + str(processor.node))
+
+            connected_message = "Discovered group coordinator"
+            with processor.node.account.monitor_log(processor.LOG_FILE) as log_monitor:
+                with processor.node.account.monitor_log(processor.STDOUT_FILE) as stdout_monitor:
+                    self.perform_broker_upgrade(to_version)
+
+                    log_monitor.wait_until(connected_message,
+                                           timeout_sec=120,
+                                           err_msg=("Never saw output '%s' on " % connected_message) + str(processor.node.account))
+
+                    stdout_monitor.wait_until(self.processed_msg,
+                                              timeout_sec=60,
+                                              err_msg="Never saw output '%s' on" % self.processed_msg + str(processor.node.account))

-        self.processor1.stop()
+            driver_monitor.wait_until('ALL-RECORDS-DELIVERED\|PROCESSED-MORE-THAN-GENERATED',
+                                      timeout_sec=180,
+                                      err_msg="Never saw output '%s' on" % 'ALL-RECORDS-DELIVERED|PROCESSED-MORE-THAN-GENERATED' + str(self.driver.node.account))

-        node = self.driver.node
-        node.account.ssh("grep -E 'ALL-RECORDS-DELIVERED|PROCESSED-MORE-THAN-GENERATED' %s" % self.driver.STDOUT_FILE, allow_fail=False)
-        self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
+        self.driver.stop()
+        processor.stop()
+        processor.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)

    @matrix(from_version=metadata_2_versions, to_version=metadata_2_versions)
    def test_simple_upgrade_downgrade(self, from_version, to_version):
@@ -163,7 +185,6 @@ def test_simple_upgrade_downgrade(self, from_version, to_version):

        # shutdown
        self.driver.stop()
-        self.driver.wait()

        random.shuffle(self.processors)
        for p in self.processors:
@@ -174,8 +195,6 @@ def test_simple_upgrade_downgrade(self, from_version, to_version):
                                   timeout_sec=60,
                                   err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))

-        self.driver.stop()

    @matrix(from_version=metadata_1_versions, to_version=backward_compatible_metadata_2_versions)
    @matrix(from_version=metadata_1_versions, to_version=metadata_3_or_higher_versions)
    @matrix(from_version=metadata_2_versions, to_version=metadata_3_or_higher_versions)
@@ -219,7 +238,6 @@ def test_metadata_upgrade(self, from_version, to_version):

        # shutdown
        self.driver.stop()
-        self.driver.wait()

        random.shuffle(self.processors)
        for p in self.processors:
@@ -230,8 +248,6 @@ def test_metadata_upgrade(self, from_version, to_version):
                                   timeout_sec=60,
                                   err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))

-        self.driver.stop()

    def test_version_probing_upgrade(self):
        """
        Starts 3 KafkaStreams instances, and upgrades one-by-one to "future version"
@@ -276,7 +292,6 @@ def test_version_probing_upgrade(self):

        # shutdown
        self.driver.stop()
-        self.driver.wait()

        random.shuffle(self.processors)
        for p in self.processors:
@@ -287,8 +302,6 @@ def test_version_probing_upgrade(self):
                                   timeout_sec=60,
                                   err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))

-        self.driver.stop()

    def update_leader(self):
        self.leader = None
        retries = 10
@@ -329,7 +342,7 @@ def start_all_nodes_with(self, version):
                log_monitor.wait_until(kafka_version_str,
                                       timeout_sec=60,
                                       err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account))
-                monitor.wait_until("processed 100 records from topic",
+                monitor.wait_until("processed [0-9]* records from topic",
                                   timeout_sec=60,
                                   err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))

@@ -343,10 +356,10 @@ def start_all_nodes_with(self, version):
                    log_monitor.wait_until(kafka_version_str,
                                           timeout_sec=60,
                                           err_msg="Could not detect Kafka Streams version " + version + " " + str(node2.account))
-                    first_monitor.wait_until("processed 100 records from topic",
+                    first_monitor.wait_until("processed [0-9]* records from topic",
                                             timeout_sec=60,
                                             err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
-                    second_monitor.wait_until("processed 100 records from topic",
+                    second_monitor.wait_until("processed [0-9]* records from topic",
                                              timeout_sec=60,
                                              err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))

@@ -361,13 +374,13 @@ def start_all_nodes_with(self, version):
                        log_monitor.wait_until(kafka_version_str,
                                               timeout_sec=60,
                                               err_msg="Could not detect Kafka Streams version " + version + " " + str(node3.account))
-                        first_monitor.wait_until("processed 100 records from topic",
+                        first_monitor.wait_until("processed [0-9]* records from topic",
                                                 timeout_sec=60,
                                                 err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
-                        second_monitor.wait_until("processed 100 records from topic",
+                        second_monitor.wait_until("processed [0-9]* records from topic",
                                                  timeout_sec=60,
                                                  err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))
-                        third_monitor.wait_until("processed 100 records from topic",
+                        third_monitor.wait_until("processed [0-9]* records from topic",
                                                 timeout_sec=60,
                                                 err_msg="Never saw output 'processed 100 records from topic' on" + str(node3.account))

@@ -582,3 +595,11 @@ def verify_metadata_no_upgraded_yet(self):
            found = list(p.node.account.ssh_capture("grep \"Sent a version 4 subscription and group leader.s latest supported version is 5. Upgrading subscription metadata version to 5 for next rebalance.\" " + p.LOG_FILE, allow_fail=True))
            if len(found) > 0:
                raise Exception("Kafka Streams failed with 'group member upgraded to metadata 4 too early'")

+    def get_topics_count(self):
+        count = 0
+        for node in self.kafka.nodes:
+            topic_list = self.kafka.list_topics("placeholder", node)
+            for topic in topic_list:
+                count += 1
+        return count