From 404bdef08db8c88f1b7a921737279feabfd0cb1f Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Mon, 7 Jan 2019 02:03:54 -0500 Subject: [PATCH] MINOR: Remove sleep calls and ignore annotation from streams upgrade test (#6046) The StreamsUpgradeTest::test_upgrade_downgrade_brokers used sleep calls in the test which led to flaky test performance and as a result, we placed an @ignore annotation on the test. This PR uses log events instead of the sleep calls hence we can now remove the @ignore setting. Reviewers: Ewen Cheslack-Postava , Matthias J. Sax , Guozhang Wang --- build.gradle | 12 +++ gradle/dependencies.gradle | 4 +- settings.gradle | 2 +- .../streams/tests/StreamsUpgradeTest.java | 98 +++++++++++++++++++ tests/docker/Dockerfile | 6 +- .../tests/streams/streams_upgrade_test.py | 93 +++++++++++------- tests/kafkatest/version.py | 7 +- vagrant/base.sh | 6 +- 8 files changed, 185 insertions(+), 43 deletions(-) create mode 100644 streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java diff --git a/build.gradle b/build.gradle index 59354dd0bd0a3..65d774d377486 100644 --- a/build.gradle +++ b/build.gradle @@ -1207,6 +1207,18 @@ project(':streams:upgrade-system-tests-20') { } } +project(':streams:upgrade-system-tests-21') { + archivesBaseName = "kafka-streams-upgrade-system-tests-21" + + dependencies { + testCompile libs.kafkaStreams_21 + } + + systemTestLibs { + dependsOn testJar + } +} + project(':jmh-benchmarks') { apply plugin: 'com.github.johnrengelman.shadow' diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 1621be946eeb4..2c2488fd0f7ad 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -71,7 +71,8 @@ versions += [ kafka_0110: "0.11.0.3", kafka_10: "1.0.2", kafka_11: "1.1.1", - kafka_20: "2.0.0", + kafka_20: "2.0.1", + kafka_21: "2.1.0", lz4: "1.5.0", mavenArtifact: "3.6.0", metrics: "2.2.0", @@ -126,6 +127,7 @@ libs += [ kafkaStreams_10: "org.apache.kafka:kafka-streams:$versions.kafka_10", kafkaStreams_11: "org.apache.kafka:kafka-streams:$versions.kafka_11", kafkaStreams_20: "org.apache.kafka:kafka-streams:$versions.kafka_20", + kafkaStreams_21: "org.apache.kafka:kafka-streams:$versions.kafka_21", log4j: "log4j:log4j:$versions.log4j", lz4: "org.lz4:lz4-java:$versions.lz4", metrics: "com.yammer.metrics:metrics-core:$versions.metrics", diff --git a/settings.gradle b/settings.gradle index dff6c2cfcb405..a74df21005e5e 100644 --- a/settings.gradle +++ b/settings.gradle @@ -16,5 +16,5 @@ include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:streams-scala', 'streams:test-utils', 'streams:examples', 'streams:upgrade-system-tests-0100', 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102', 'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'streams:upgrade-system-tests-11', 'streams:upgrade-system-tests-20', - 'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', + 'streams:upgrade-system-tests-21' , 'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', 'connect:basic-auth-extension', 'jmh-benchmarks' diff --git a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java new file mode 100644 index 0000000000000..3e719cf843095 --- /dev/null +++ b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import java.util.Properties; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +public class StreamsUpgradeTest { + + + @SuppressWarnings("unchecked") + public static void main(final String[] args) throws Exception { + if (args.length < 2) { + System.err.println("StreamsUpgradeTest requires three argument (kafka-url, properties-file) but only " + args.length + " provided: " + + (args.length > 0 ? args[0] : "")); + } + final String kafka = args[0]; + final String propFileName = args.length > 1 ? args[1] : null; + + final Properties streamsProperties = Utils.loadProps(propFileName); + + System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.1)"); + System.out.println("kafka=" + kafka); + System.out.println("props=" + streamsProperties); + + final StreamsBuilder builder = new StreamsBuilder(); + final KStream dataStream = builder.stream("data"); + dataStream.process(printProcessorSupplier()); + dataStream.to("echo"); + + final Properties config = new Properties(); + config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); + config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.putAll(streamsProperties); + + final KafkaStreams streams = new KafkaStreams(builder.build(), config); + streams.start(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + streams.close(); + System.out.println("UPGRADE-TEST-CLIENT-CLOSED"); + System.out.flush(); + } + }); + } + + private static ProcessorSupplier printProcessorSupplier() { + return new ProcessorSupplier() { + public Processor get() { + return new AbstractProcessor() { + private int numRecordsProcessed = 0; + + @Override + public void init(final ProcessorContext context) { + System.out.println("initializing processor: topic=data taskId=" + context.taskId()); + numRecordsProcessed = 0; + } + + @Override + public void process(final K key, final V value) { + numRecordsProcessed++; + if (numRecordsProcessed % 100 == 0) { + System.out.println("processed " + numRecordsProcessed + " records from topic=data"); + } + } + + @Override + public void close() {} + }; + } + }; + } +} diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index 7c1efd66df70f..e5cf4392c3e26 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -49,7 +49,8 @@ RUN mkdir -p "/opt/kafka-0.10.2.2" && chmod a+rw /opt/kafka-0.10.2.2 && curl -s RUN mkdir -p "/opt/kafka-0.11.0.3" && chmod a+rw /opt/kafka-0.11.0.3 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.3.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.3" RUN mkdir -p "/opt/kafka-1.0.2" && chmod a+rw /opt/kafka-1.0.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.0.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.0.2" RUN mkdir -p "/opt/kafka-1.1.1" && chmod a+rw /opt/kafka-1.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.1.1" -RUN mkdir -p "/opt/kafka-2.0.0" && chmod a+rw /opt/kafka-2.0.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.0.0" +RUN mkdir -p "/opt/kafka-2.0.1" && chmod a+rw /opt/kafka-2.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.0.1" +RUN mkdir -p "/opt/kafka-2.1.0" && chmod a+rw /opt/kafka-2.1.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.1.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.1.0" # Streams test dependencies RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.1-test.jar" -o /opt/kafka-0.10.0.1/libs/kafka-streams-0.10.0.1-test.jar @@ -58,7 +59,8 @@ RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.2.2-test.jar" -o /opt/kafka-0.10.2 RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.3-test.jar" -o /opt/kafka-0.11.0.3/libs/kafka-streams-0.11.0.3-test.jar RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.0.2-test.jar" -o /opt/kafka-1.0.2/libs/kafka-streams-1.0.2-test.jar RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.1.1-test.jar" -o /opt/kafka-1.1.1/libs/kafka-streams-1.1.1-test.jar -RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.0.0-test.jar" -o /opt/kafka-2.0.0/libs/kafka-streams-2.0.0-test.jar +RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.0.1-test.jar" -o /opt/kafka-2.0.1/libs/kafka-streams-2.0.1-test.jar +RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.1.0-test.jar" -o /opt/kafka-2.1.0/libs/kafka-streams-2.1.0-test.jar # The version of Kibosh to use for testing. # If you update this, also update vagrant/base.sy diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index baf507b3d9e27..57eaad561fdd0 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -15,17 +15,19 @@ import random import time -from ducktape.mark import ignore from ducktape.mark import matrix from ducktape.mark.resource import cluster from ducktape.tests.test import Test +from ducktape.utils.util import wait_until from kafkatest.services.kafka import KafkaService -from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, StreamsUpgradeTestJobRunnerService +from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, \ + StreamsUpgradeTestJobRunnerService from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, DEV_BRANCH, DEV_VERSION, KafkaVersion +from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \ + LATEST_2_0, LATEST_2_1, DEV_BRANCH, DEV_VERSION, KafkaVersion # broker 0.10.0 is not compatible with newer Kafka Streams versions -broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), str(LATEST_2_0), str(DEV_BRANCH)] +broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), str(LATEST_2_0), str(LATEST_2_1), str(DEV_BRANCH)] metadata_1_versions = [str(LATEST_0_10_0)] metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)] @@ -38,7 +40,8 @@ class StreamsUpgradeTest(Test): """ Test upgrading Kafka Streams (all version combination) If metadata was changes, upgrade is more difficult - Metadata version was bumped in 0.10.1.0 + Metadata version was bumped in 0.10.1.0 and + subsequently bumped in 2.0.0 """ def __init__(self, test_context): @@ -50,6 +53,8 @@ def __init__(self, test_context): self.leader = None self.leader_counter = {} + processed_msg = "processed [0-9]* records" + def perform_broker_upgrade(self, to_version): self.logger.info("First pass bounce - rolling broker upgrade") for node in self.kafka.nodes: @@ -57,7 +62,6 @@ def perform_broker_upgrade(self, to_version): node.version = KafkaVersion(to_version) self.kafka.start_node(node) - @ignore @cluster(num_nodes=6) @matrix(from_version=broker_upgrade_versions, to_version=broker_upgrade_versions) def test_upgrade_downgrade_brokers(self, from_version, to_version): @@ -69,6 +73,7 @@ def test_upgrade_downgrade_brokers(self, from_version, to_version): return self.replication = 3 + self.num_kafka_nodes = 3 self.partitions = 1 self.isr = 2 self.topics = { @@ -99,31 +104,48 @@ def test_upgrade_downgrade_brokers(self, from_version, to_version): self.zk.start() # number of nodes needs to be >= 3 for the smoke test - self.kafka = KafkaService(self.test_context, num_nodes=3, + self.kafka = KafkaService(self.test_context, num_nodes=self.num_kafka_nodes, zk=self.zk, version=KafkaVersion(from_version), topics=self.topics) self.kafka.start() # allow some time for topics to be created - time.sleep(10) + wait_until(lambda: self.get_topics_count() >= (len(self.topics) * self.num_kafka_nodes), + timeout_sec=60, + err_msg="Broker did not create all topics in 60 seconds ") self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka) - self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka) - - self.driver.start() - self.processor1.start() - time.sleep(15) - self.perform_broker_upgrade(to_version) + processor = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka) - time.sleep(15) - self.driver.wait() - self.driver.stop() + with self.driver.node.account.monitor_log(self.driver.STDOUT_FILE) as driver_monitor: + self.driver.start() + + with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor: + processor.start() + monitor.wait_until(self.processed_msg, + timeout_sec=60, + err_msg="Never saw output '%s' on" % self.processed_msg + str(processor.node)) + + connected_message = "Discovered group coordinator" + with processor.node.account.monitor_log(processor.LOG_FILE) as log_monitor: + with processor.node.account.monitor_log(processor.STDOUT_FILE) as stdout_monitor: + self.perform_broker_upgrade(to_version) + + log_monitor.wait_until(connected_message, + timeout_sec=120, + err_msg=("Never saw output '%s' on " % connected_message) + str(processor.node.account)) + + stdout_monitor.wait_until(self.processed_msg, + timeout_sec=60, + err_msg="Never saw output '%s' on" % self.processed_msg + str(processor.node.account)) - self.processor1.stop() + driver_monitor.wait_until('ALL-RECORDS-DELIVERED\|PROCESSED-MORE-THAN-GENERATED', + timeout_sec=180, + err_msg="Never saw output '%s' on" % 'ALL-RECORDS-DELIVERED|PROCESSED-MORE-THAN-GENERATED' + str(self.driver.node.account)) - node = self.driver.node - node.account.ssh("grep -E 'ALL-RECORDS-DELIVERED|PROCESSED-MORE-THAN-GENERATED' %s" % self.driver.STDOUT_FILE, allow_fail=False) - self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False) + self.driver.stop() + processor.stop() + processor.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False) @matrix(from_version=metadata_2_versions, to_version=metadata_2_versions) def test_simple_upgrade_downgrade(self, from_version, to_version): @@ -163,7 +185,6 @@ def test_simple_upgrade_downgrade(self, from_version, to_version): # shutdown self.driver.stop() - self.driver.wait() random.shuffle(self.processors) for p in self.processors: @@ -174,8 +195,6 @@ def test_simple_upgrade_downgrade(self, from_version, to_version): timeout_sec=60, err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account)) - self.driver.stop() - @matrix(from_version=metadata_1_versions, to_version=backward_compatible_metadata_2_versions) @matrix(from_version=metadata_1_versions, to_version=metadata_3_or_higher_versions) @matrix(from_version=metadata_2_versions, to_version=metadata_3_or_higher_versions) @@ -219,7 +238,6 @@ def test_metadata_upgrade(self, from_version, to_version): # shutdown self.driver.stop() - self.driver.wait() random.shuffle(self.processors) for p in self.processors: @@ -230,8 +248,6 @@ def test_metadata_upgrade(self, from_version, to_version): timeout_sec=60, err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account)) - self.driver.stop() - def test_version_probing_upgrade(self): """ Starts 3 KafkaStreams instances, and upgrades one-by-one to "future version" @@ -276,7 +292,6 @@ def test_version_probing_upgrade(self): # shutdown self.driver.stop() - self.driver.wait() random.shuffle(self.processors) for p in self.processors: @@ -287,8 +302,6 @@ def test_version_probing_upgrade(self): timeout_sec=60, err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account)) - self.driver.stop() - def update_leader(self): self.leader = None retries = 10 @@ -329,7 +342,7 @@ def start_all_nodes_with(self, version): log_monitor.wait_until(kafka_version_str, timeout_sec=60, err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account)) - monitor.wait_until("processed 100 records from topic", + monitor.wait_until("processed [0-9]* records from topic", timeout_sec=60, err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account)) @@ -343,10 +356,10 @@ def start_all_nodes_with(self, version): log_monitor.wait_until(kafka_version_str, timeout_sec=60, err_msg="Could not detect Kafka Streams version " + version + " " + str(node2.account)) - first_monitor.wait_until("processed 100 records from topic", + first_monitor.wait_until("processed [0-9]* records from topic", timeout_sec=60, err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account)) - second_monitor.wait_until("processed 100 records from topic", + second_monitor.wait_until("processed [0-9]* records from topic", timeout_sec=60, err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account)) @@ -361,13 +374,13 @@ def start_all_nodes_with(self, version): log_monitor.wait_until(kafka_version_str, timeout_sec=60, err_msg="Could not detect Kafka Streams version " + version + " " + str(node3.account)) - first_monitor.wait_until("processed 100 records from topic", + first_monitor.wait_until("processed [0-9]* records from topic", timeout_sec=60, err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account)) - second_monitor.wait_until("processed 100 records from topic", + second_monitor.wait_until("processed [0-9]* records from topic", timeout_sec=60, err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account)) - third_monitor.wait_until("processed 100 records from topic", + third_monitor.wait_until("processed [0-9]* records from topic", timeout_sec=60, err_msg="Never saw output 'processed 100 records from topic' on" + str(node3.account)) @@ -582,3 +595,11 @@ def verify_metadata_no_upgraded_yet(self): found = list(p.node.account.ssh_capture("grep \"Sent a version 4 subscription and group leader.s latest supported version is 5. Upgrading subscription metadata version to 5 for next rebalance.\" " + p.LOG_FILE, allow_fail=True)) if len(found) > 0: raise Exception("Kafka Streams failed with 'group member upgraded to metadata 4 too early'") + + def get_topics_count(self): + count = 0 + for node in self.kafka.nodes: + topic_list = self.kafka.list_topics("placeholder", node) + for topic in topic_list: + count += 1 + return count diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index 6eed81f9c9c72..264eec5402e8e 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -111,4 +111,9 @@ def get_version(node=None): # 2.0.x versions V_2_0_0 = KafkaVersion("2.0.0") -LATEST_2_0 = V_2_0_0 +V_2_0_1 = KafkaVersion("2.0.1") +LATEST_2_0 = V_2_0_1 + +# 2.1.x versions +V_2_1_0 = KafkaVersion("2.1.0") +LATEST_2_1 = V_2_1_0 diff --git a/vagrant/base.sh b/vagrant/base.sh index 4429096ce3ad3..59e890c64dd2a 100755 --- a/vagrant/base.sh +++ b/vagrant/base.sh @@ -120,8 +120,10 @@ get_kafka 1.0.2 2.11 chmod a+rw /opt/kafka-1.0.2 get_kafka 1.1.1 2.11 chmod a+rw /opt/kafka-1.1.1 -get_kafka 2.0.0 2.12 -chmod a+rw /opt/kafka-2.0.0 +get_kafka 2.0.1 2.12 +chmod a+rw /opt/kafka-2.0.1 +get_kafka 2.1.0 2.12 +chmod a+rw /opt/kafka-2.1.0 # For EC2 nodes, we want to use /mnt, which should have the local disk. On local