diff --git a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
new file mode 100755
index 0000000000000..3d25e9c053af0
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.util.{Arrays, Properties}
+
+import kafka.consumer._
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+
+import scala.Option.option2Iterable
+
+object TestEndToEndLatency {
+  def main(args: Array[String]) {
+    if (args.length != 6) {
+      System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages consumer_fetch_max_wait producer_acks")
+      System.exit(1)
+    }
+
+    val brokerList = args(0)
+    val zkConnect = args(1)
+    val topic = args(2)
+    val numMessages = args(3).toInt
+    val consumerFetchMaxWait = args(4).toInt
+    val producerAcks = args(5).toInt
+
+    val consumerProps = new Properties()
+    consumerProps.put("group.id", topic)
+    consumerProps.put("auto.commit.enable", "false")
+    consumerProps.put("auto.offset.reset", "largest")
+    consumerProps.put("zookeeper.connect", zkConnect)
+    consumerProps.put("fetch.wait.max.ms", consumerFetchMaxWait.toString)
+    consumerProps.put("socket.timeout.ms", 1201000.toString)
+
+    val config = new ConsumerConfig(consumerProps)
+    val connector = Consumer.create(config)
+    val stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head
+    val iter = stream.iterator
+
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0")
+    producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
+    producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString)
+    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
+
+    // make sure the consumer fetcher has started before sending data since otherwise
+    // the consumption from the tail will skip the first message and hence be blocked
+    Thread.sleep(5000)
+
+    val message = "hello there beautiful".getBytes
+    var totalTime = 0.0
+    val latencies = new Array[Long](numMessages)
+    for (i <- 0 until numMessages) {
+      val begin = System.nanoTime
+      producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message))
+      val received = iter.next
+      val elapsed = System.nanoTime - begin
+      // poor man's progress bar
+      if (i % 1000 == 0)
println(i + "\t" + elapsed / 1000.0 / 1000.0) + totalTime += elapsed + latencies(i) = (elapsed / 1000 / 1000) + } + println("Avg latency: %.4f ms\n".format(totalTime / numMessages / 1000.0 / 1000.0)) + Arrays.sort(latencies) + val p50 = latencies((latencies.length * 0.5).toInt) + val p99 = latencies((latencies.length * 0.99).toInt) + val p999 = latencies((latencies.length * 0.999).toInt) + println("Percentiles: 50th = %d, 99th = %d, 99.9th = %d".format(p50, p99, p999)) + producer.close() + connector.commitOffsets(true) + connector.shutdown() + System.exit(0) + } +} \ No newline at end of file diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py new file mode 100644 index 0000000000000..1896e9e3bf471 --- /dev/null +++ b/tests/kafkatest/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2015 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/kafkatest/services/__init__.py b/tests/kafkatest/services/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py new file mode 100644 index 0000000000000..e910a520e6cc9 --- /dev/null +++ b/tests/kafkatest/services/console_consumer.py @@ -0,0 +1,142 @@ +# Copyright 2015 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.services.background_thread import BackgroundThreadService + + +def is_int(msg): + """Default method used to check whether text pulled from console consumer is a message. + + return int or None + """ + try: + return int(msg) + except: + return None + + +""" +0.8.2.1 ConsoleConsumer options + +The console consumer is a tool that reads data from Kafka and outputs it to standard output. +Option Description +------ ----------- +--blacklist Blacklist of topics to exclude from + consumption. +--consumer.config Consumer config properties file. +--csv-reporter-enabled If set, the CSV metrics reporter will + be enabled +--delete-consumer-offsets If specified, the consumer path in + zookeeper is deleted when starting up +--formatter The name of a class to use for + formatting kafka messages for + display. (default: kafka.tools. + DefaultMessageFormatter) +--from-beginning If the consumer does not already have + an established offset to consume + from, start with the earliest + message present in the log rather + than the latest message. +--max-messages The maximum number of messages to + consume before exiting. If not set, + consumption is continual. 
+--metrics-dir               If csv-reporter-enabled is set, and
+                              this parameter is set, the csv
+                              metrics will be output here
+--property
+--skip-message-on-error     If there is an error when processing a
+                              message, skip it instead of halt.
+--topic                     The topic id to consume on.
+--whitelist                 Whitelist of topics to include for
+                              consumption.
+--zookeeper                 REQUIRED: The connection string for
+                              the zookeeper connection in the form
+                              host:port. Multiple URLs can be
+                              given to allow fail-over.
+"""
+
+
+class ConsoleConsumer(BackgroundThreadService):
+    logs = {
+        "consumer_log": {
+            "path": "/mnt/consumer.log",
+            "collect_default": True}
+    }
+
+    def __init__(self, context, num_nodes, kafka, topic, message_validator=is_int, from_beginning=True, consumer_timeout_ms=None):
+        """
+        Args:
+            context:             standard context
+            num_nodes:           number of nodes to use (this should be 1)
+            kafka:               kafka service
+            topic:               consume from this topic
+            message_validator:   function which returns message or None
+            from_beginning:      consume from beginning if True, else from the end
+            consumer_timeout_ms: corresponds to consumer.timeout.ms. consumer process ends if time between
+                                 successively consumed messages exceeds this timeout. Setting this and
+                                 waiting for the consumer to stop is a pretty good way to consume all messages
+                                 in a topic.
+        """
+        super(ConsoleConsumer, self).__init__(context, num_nodes)
+        self.kafka = kafka
+        self.args = {
+            'topic': topic,
+        }
+
+        self.consumer_timeout_ms = consumer_timeout_ms
+
+        self.from_beginning = from_beginning
+        self.message_validator = message_validator
+        self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}
+
+    @property
+    def start_cmd(self):
+        args = self.args.copy()
+        args.update({'zk_connect': self.kafka.zk.connect_setting()})
+        cmd = "/opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s --zookeeper %(zk_connect)s" \
+              " --consumer.config /mnt/console_consumer.properties" % args
+
+        if self.from_beginning:
+            cmd += " --from-beginning"
+
+        cmd += " 2>> /mnt/consumer.log | tee -a /mnt/consumer.log &"
+        return cmd
+
+    def _worker(self, idx, node):
+        # form config file
+        if self.consumer_timeout_ms is not None:
+            prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms)
+        else:
+            prop_file = self.render('console_consumer.properties')
+        node.account.create_file("/mnt/console_consumer.properties", prop_file)
+
+        # Run and capture output
+        cmd = self.start_cmd
+        self.logger.debug("Console consumer %d command: %s", idx, cmd)
+        for line in node.account.ssh_capture(cmd):
+            msg = line.strip()
+            msg = self.message_validator(msg)
+            if msg is not None:
+                self.logger.debug("consumed a message: " + str(msg))
+                self.messages_consumed[idx].append(msg)
+
+    def start_node(self, node):
+        super(ConsoleConsumer, self).start_node(node)
+
+    def stop_node(self, node):
+        node.account.kill_process("java", allow_fail=True)
+
+    def clean_node(self, node):
+        node.account.ssh("rm -rf /mnt/console_consumer.properties /mnt/consumer.log", allow_fail=True)
+
diff --git a/tests/kafkatest/services/kafka.py b/tests/kafkatest/services/kafka.py
new file mode 100644
index 0000000000000..976fe39d4a5a5
--- /dev/null
+++ b/tests/kafkatest/services/kafka.py
@@ -0,0 +1,224 @@
+# Copyright 2014 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.service import Service
+
+import json
+import re
+import signal
+import time
+
+
+class KafkaService(Service):
+
+    logs = {
+        "kafka_log": {
+            "path": "/mnt/kafka.log",
+            "collect_default": True},
+        "kafka_data": {
+            "path": "/mnt/kafka-logs",
+            "collect_default": False}
+    }
+
+    def __init__(self, context, num_nodes, zk, topics=None):
+        """
+        :type context
+        :type zk: ZookeeperService
+        :type topics: dict
+        """
+        super(KafkaService, self).__init__(context, num_nodes)
+        self.zk = zk
+        self.topics = topics
+
+    def start(self):
+        super(KafkaService, self).start()
+
+        # Create topics if necessary
+        if self.topics is not None:
+            for topic, topic_cfg in self.topics.items():
+                if topic_cfg is None:
+                    topic_cfg = {}
+
+                topic_cfg["topic"] = topic
+                self.create_topic(topic_cfg)
+
+    def start_node(self, node):
+        props_file = self.render('kafka.properties', node=node, broker_id=self.idx(node))
+        node.account.create_file("/mnt/kafka.properties", props_file)
+
+        cmd = "/opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log & echo $! > /mnt/kafka.pid"
+        self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
+        node.account.ssh(cmd)
+        time.sleep(5)
+        if len(self.pids(node)) == 0:
+            raise Exception("No process ids recorded on node %s" % str(node))
+
+    def pids(self, node):
+        """Return process ids associated with running processes on the given node."""
+        try:
+            return [pid for pid in node.account.ssh_capture("cat /mnt/kafka.pid", callback=int)]
+        except:
+            return []
+
+    def signal_node(self, node, sig=signal.SIGTERM):
+        pids = self.pids(node)
+        for pid in pids:
+            node.account.signal(pid, sig)
+
+    def signal_leader(self, topic, partition=0, sig=signal.SIGTERM):
+        leader = self.leader(topic, partition)
+        self.signal_node(leader, sig)
+
+    def stop_node(self, node, clean_shutdown=True, allow_fail=True):
+        pids = self.pids(node)
+        sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
+
+        for pid in pids:
+            node.account.signal(pid, sig, allow_fail=allow_fail)
+
+        node.account.ssh("rm -f /mnt/kafka.pid", allow_fail=True)
+
+    def clean_node(self, node):
+        node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log")
+
+    def create_topic(self, topic_cfg):
+        node = self.nodes[0]  # any node is fine here
+        self.logger.info("Creating topic %s with settings %s", topic_cfg["topic"], topic_cfg)
+
+        cmd = "/opt/kafka/bin/kafka-topics.sh --zookeeper %(zk_connect)s --create "\
+              "--topic %(topic)s --partitions %(partitions)d --replication-factor %(replication)d" % {
+                  'zk_connect': self.zk.connect_setting(),
+                  'topic': topic_cfg.get("topic"),
+                  'partitions': topic_cfg.get('partitions', 1),
+                  'replication': topic_cfg.get('replication-factor', 1)
+              }
+
+        if "configs" in topic_cfg.keys() and topic_cfg["configs"] is not None:
+            for config_name, config_value in topic_cfg["configs"].items():
+                cmd += " --config %s=%s" % (config_name, str(config_value))
+
+        self.logger.info("Running topic creation command...\n%s" % cmd)
+        node.account.ssh(cmd)
+
+        time.sleep(1)
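+        # The CLI call above returns before topic metadata has necessarily propagated,
+        # so pause briefly, then describe the topic and log what was actually registered.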
self.logger.info("Checking to see if topic was properly created...\n%s" % cmd) + for line in self.describe_topic(topic_cfg["topic"]).split("\n"): + self.logger.info(line) + + def describe_topic(self, topic): + node = self.nodes[0] + cmd = "/opt/kafka/bin/kafka-topics.sh --zookeeper %s --topic %s --describe" % \ + (self.zk.connect_setting(), topic) + output = "" + for line in node.account.ssh_capture(cmd): + output += line + return output + + def verify_reassign_partitions(self, reassignment): + """Run the reassign partitions admin tool in "verify" mode + """ + node = self.nodes[0] + json_file = "/tmp/" + str(time.time()) + "_reassign.json" + + # reassignment to json + json_str = json.dumps(reassignment) + json_str = json.dumps(json_str) + + # create command + cmd = "echo %s > %s && " % (json_str, json_file) + cmd += "/opt/kafka/bin/kafka-reassign-partitions.sh "\ + "--zookeeper %(zk_connect)s "\ + "--reassignment-json-file %(reassignment_file)s "\ + "--verify" % {'zk_connect': self.zk.connect_setting(), + 'reassignment_file': json_file} + cmd += " && sleep 1 && rm -f %s" % json_file + + # send command + self.logger.info("Verifying parition reassignment...") + self.logger.debug(cmd) + output = "" + for line in node.account.ssh_capture(cmd): + output += line + + self.logger.debug(output) + + if re.match(".*is in progress.*", output) is not None: + return False + + return True + + def execute_reassign_partitions(self, reassignment): + """Run the reassign partitions admin tool in "verify" mode + """ + node = self.nodes[0] + json_file = "/tmp/" + str(time.time()) + "_reassign.json" + + # reassignment to json + json_str = json.dumps(reassignment) + json_str = json.dumps(json_str) + + # create command + cmd = "echo %s > %s && " % (json_str, json_file) + cmd += "/opt/kafka/bin/kafka-reassign-partitions.sh "\ + "--zookeeper %(zk_connect)s "\ + "--reassignment-json-file %(reassignment_file)s "\ + "--execute" % {'zk_connect': self.zk.connect_setting(), + 'reassignment_file': json_file} + cmd += " && sleep 1 && rm -f %s" % json_file + + # send command + self.logger.info("Executing parition reassignment...") + self.logger.debug(cmd) + output = "" + for line in node.account.ssh_capture(cmd): + output += line + + self.logger.debug("Verify partition reassignment:") + self.logger.debug(output) + + def restart_node(self, node, wait_sec=0, clean_shutdown=True): + """Restart the given node, waiting wait_sec in between stopping and starting up again.""" + self.stop_node(node, clean_shutdown, allow_fail=True) + time.sleep(wait_sec) + self.start_node(node) + + def leader(self, topic, partition=0): + """ Get the leader replica for the given topic and partition. + """ + cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.ZooKeeperMainWrapper -server %s " \ + % self.zk.connect_setting() + cmd += "get /brokers/topics/%s/partitions/%d/state" % (topic, partition) + self.logger.debug(cmd) + + node = self.nodes[0] + self.logger.debug("Querying zookeeper to find leader replica for topic %s: \n%s" % (cmd, topic)) + partition_state = None + for line in node.account.ssh_capture(cmd): + match = re.match("^({.+})$", line) + if match is not None: + partition_state = match.groups()[0] + break + + if partition_state is None: + raise Exception("Error finding partition state for topic %s and partition %d." 
% (topic, partition)) + + partition_state = json.loads(partition_state) + self.logger.info(partition_state) + + leader_idx = int(partition_state["leader"]) + self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx)) + return self.get_node(leader_idx) + + def bootstrap_servers(self): + return ','.join([node.account.hostname + ":9092" for node in self.nodes]) \ No newline at end of file diff --git a/tests/kafkatest/services/performance.py b/tests/kafkatest/services/performance.py new file mode 100644 index 0000000000000..423d60a8e5b05 --- /dev/null +++ b/tests/kafkatest/services/performance.py @@ -0,0 +1,167 @@ +# Copyright 2014 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.services.background_thread import BackgroundThreadService + + +class PerformanceService(BackgroundThreadService): + def __init__(self, context, num_nodes): + super(PerformanceService, self).__init__(context, num_nodes) + self.results = [None] * self.num_nodes + self.stats = [[] for x in range(self.num_nodes)] + + +class ProducerPerformanceService(PerformanceService): + def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False): + super(ProducerPerformanceService, self).__init__(context, num_nodes) + self.kafka = kafka + self.args = { + 'topic': topic, + 'num_records': num_records, + 'record_size': record_size, + 'throughput': throughput + } + self.settings = settings + self.intermediate_stats = intermediate_stats + + def _worker(self, idx, node): + args = self.args.copy() + args.update({'bootstrap_servers': self.kafka.bootstrap_servers()}) + cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance "\ + "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s" % args + + for key,value in self.settings.items(): + cmd += " %s=%s" % (str(key), str(value)) + self.logger.debug("Producer performance %d command: %s", idx, cmd) + + def parse_stats(line): + parts = line.split(',') + return { + 'records': int(parts[0].split()[0]), + 'records_per_sec': float(parts[1].split()[0]), + 'mbps': float(parts[1].split('(')[1].split()[0]), + 'latency_avg_ms': float(parts[2].split()[0]), + 'latency_max_ms': float(parts[3].split()[0]), + 'latency_50th_ms': float(parts[4].split()[0]), + 'latency_95th_ms': float(parts[5].split()[0]), + 'latency_99th_ms': float(parts[6].split()[0]), + 'latency_999th_ms': float(parts[7].split()[0]), + } + last = None + for line in node.account.ssh_capture(cmd): + self.logger.debug("Producer performance %d: %s", idx, line.strip()) + if self.intermediate_stats: + try: + self.stats[idx-1].append(parse_stats(line)) + except: + # Sometimes there are extraneous log messages + pass + last = line + try: + self.results[idx-1] = parse_stats(last) + except: + self.logger.error("Bad last line: %s", last) + + +class ConsumerPerformanceService(PerformanceService): + def __init__(self, context, num_nodes, kafka, 
topic, num_records, throughput, threads=1, settings={}): + super(ConsumerPerformanceService, self).__init__(context, num_nodes) + self.kafka = kafka + self.args = { + 'topic': topic, + 'num_records': num_records, + 'throughput': throughput, + 'threads': threads, + } + self.settings = settings + + def _worker(self, idx, node): + args = self.args.copy() + args.update({'zk_connect': self.kafka.zk.connect_setting()}) + cmd = "/opt/kafka/bin/kafka-consumer-perf-test.sh "\ + "--topic %(topic)s --messages %(num_records)d --zookeeper %(zk_connect)s" % args + for key,value in self.settings.items(): + cmd += " %s=%s" % (str(key), str(value)) + self.logger.debug("Consumer performance %d command: %s", idx, cmd) + last = None + for line in node.account.ssh_capture(cmd): + self.logger.debug("Consumer performance %d: %s", idx, line.strip()) + last = line + # Parse and save the last line's information + parts = last.split(',') + + print "=" * 20 + print "ConsumerPerformanceService data:" + print parts + print "-" * 20 + + self.results[idx-1] = { + 'total_mb': float(parts[3]), + 'mbps': float(parts[4]), + 'records_per_sec': float(parts[5]), + } + + +class EndToEndLatencyService(PerformanceService): + def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1): + super(EndToEndLatencyService, self).__init__(context, num_nodes) + self.kafka = kafka + self.args = { + 'topic': topic, + 'num_records': num_records, + 'consumer_fetch_max_wait': consumer_fetch_max_wait, + 'acks': acks + } + + def _worker(self, idx, node): + args = self.args.copy() + args.update({ + 'zk_connect': self.kafka.zk.connect_setting(), + 'bootstrap_servers': self.kafka.bootstrap_servers(), + }) + cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.TestEndToEndLatency "\ + "%(bootstrap_servers)s %(zk_connect)s %(topic)s %(num_records)d "\ + "%(consumer_fetch_max_wait)d %(acks)d" % args + self.logger.debug("End-to-end latency %d command: %s", idx, cmd) + results = {} + for line in node.account.ssh_capture(cmd): + self.logger.debug("End-to-end latency %d: %s", idx, line.strip()) + if line.startswith("Avg latency:"): + results['latency_avg_ms'] = float(line.split()[2]) + if line.startswith("Percentiles"): + results['latency_50th_ms'] = float(line.split()[3][:-1]) + results['latency_99th_ms'] = float(line.split()[6][:-1]) + results['latency_999th_ms'] = float(line.split()[9]) + self.results[idx-1] = results + + +def parse_performance_output(summary): + parts = summary.split(',') + results = { + 'records': int(parts[0].split()[0]), + 'records_per_sec': float(parts[1].split()[0]), + 'mbps': float(parts[1].split('(')[1].split()[0]), + 'latency_avg_ms': float(parts[2].split()[0]), + 'latency_max_ms': float(parts[3].split()[0]), + 'latency_50th_ms': float(parts[4].split()[0]), + 'latency_95th_ms': float(parts[5].split()[0]), + 'latency_99th_ms': float(parts[6].split()[0]), + 'latency_999th_ms': float(parts[7].split()[0]), + } + # To provide compatibility with ConsumerPerformanceService + results['total_mb'] = results['mbps'] * (results['records'] / results['records_per_sec']) + results['rate_mbps'] = results['mbps'] + results['rate_mps'] = results['records_per_sec'] + + return results diff --git a/tests/kafkatest/services/templates/console_consumer.properties b/tests/kafkatest/services/templates/console_consumer.properties new file mode 100644 index 0000000000000..9179a199fd496 --- /dev/null +++ b/tests/kafkatest/services/templates/console_consumer.properties @@ -0,0 +1,3 @@ +{% if consumer_timeout_ms is 
defined %} +consumer.timeout.ms={{ consumer_timeout_ms }} +{% endif %} \ No newline at end of file diff --git a/tests/kafkatest/services/templates/kafka.properties b/tests/kafkatest/services/templates/kafka.properties new file mode 100644 index 0000000000000..db1077a4a4eb8 --- /dev/null +++ b/tests/kafkatest/services/templates/kafka.properties @@ -0,0 +1,121 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id={{ broker_id }} + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port=9092 + +# Hostname the broker will bind to. If not set, the server will bind to all interfaces +#host.name=localhost + +# Hostname the broker will advertise to producers and consumers. If not set, it uses the +# value for "host.name" if configured. Otherwise, it will use the value returned from +# java.net.InetAddress.getCanonicalHostName(). +advertised.host.name={{ node.account.hostname }} + +# The port to publish to ZooKeeper for clients to use. If this is not set, +# it will publish the same port that the broker binds to. +#advertised.port= + +# The number of threads handling network requests +num.network.threads=3 + +# The number of threads doing disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=102400 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=65536 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma seperated list of directories under which to store log files +log.dirs=/mnt/kafka-logs + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions=1 + +# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. +# This value is recommended to be increased for installations with data dirs located in RAID array. +num.recovery.threads.per.data.dir=1 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. 
Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +#log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +#log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.segment.bytes=1073741824 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=300000 + +# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. +# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. +log.cleaner.enable=false + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. +zookeeper.connect={{ zk.connect_setting() }} + +# Timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=2000 diff --git a/tests/kafkatest/services/templates/zookeeper.properties b/tests/kafkatest/services/templates/zookeeper.properties new file mode 100644 index 0000000000000..740c30aa32c49 --- /dev/null +++ b/tests/kafkatest/services/templates/zookeeper.properties @@ -0,0 +1,9 @@ +dataDir=/mnt/zookeeper +clientPort=2181 +maxClientCnxns=0 +initLimit=5 +syncLimit=2 +quorumListenOnAllIPs=true +{% for node in nodes %} +server.{{ loop.index }}={{ node.account.hostname }}:2888:3888 +{% endfor %} \ No newline at end of file diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py new file mode 100644 index 0000000000000..88455601dfae7 --- /dev/null +++ b/tests/kafkatest/services/verifiable_producer.py @@ -0,0 +1,108 @@ +# Copyright 2015 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.background_thread import BackgroundThreadService
+
+import json
+
+
+class VerifiableProducer(BackgroundThreadService):
+
+    logs = {
+        "producer_log": {
+            "path": "/mnt/producer.log",
+            "collect_default": True}
+    }
+
+    def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000):
+        super(VerifiableProducer, self).__init__(context, num_nodes)
+
+        self.kafka = kafka
+        self.topic = topic
+        self.max_messages = max_messages
+        self.throughput = throughput
+
+        self.acked_values = []
+        self.not_acked_values = []
+
+    def _worker(self, idx, node):
+        cmd = self.start_cmd
+        self.logger.debug("Verifiable producer %d command: %s" % (idx, cmd))
+
+        for line in node.account.ssh_capture(cmd):
+            line = line.strip()
+
+            data = self.try_parse_json(line)
+            if data is not None:
+
+                self.logger.debug("VerifiableProducer: " + str(data))
+
+                with self.lock:
+                    if data["name"] == "producer_send_error":
+                        data["node"] = idx
+                        self.not_acked_values.append(int(data["value"]))
+
+                    elif data["name"] == "producer_send_success":
+                        self.acked_values.append(int(data["value"]))
+
+    @property
+    def start_cmd(self):
+        cmd = "/opt/kafka/bin/kafka-verifiable-producer.sh" \
+              " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers())
+        if self.max_messages > 0:
+            cmd += " --max-messages %s" % str(self.max_messages)
+        if self.throughput > 0:
+            cmd += " --throughput %s" % str(self.throughput)
+
+        cmd += " 2>> /mnt/producer.log | tee -a /mnt/producer.log &"
+        return cmd
+
+    @property
+    def acked(self):
+        with self.lock:
+            return self.acked_values
+
+    @property
+    def not_acked(self):
+        with self.lock:
+            return self.not_acked_values
+
+    @property
+    def num_acked(self):
+        with self.lock:
+            return len(self.acked_values)
+
+    @property
+    def num_not_acked(self):
+        with self.lock:
+            return len(self.not_acked_values)
+
+    def stop_node(self, node):
+        node.account.kill_process("VerifiableProducer", allow_fail=False)
+        # block until the corresponding thread exits
+        if len(self.worker_threads) >= self.idx(node):
+            # Need to guard this because stop is preemptively called before the worker threads are added and started
+            self.worker_threads[self.idx(node) - 1].join()
+
+    def clean_node(self, node):
+        node.account.ssh("rm -rf /mnt/producer.log")
+
+    def try_parse_json(self, string):
+        """Try to parse a string as json. Return None if not parseable."""
+        try:
+            record = json.loads(string)
+            return record
+        except ValueError:
+            self.logger.debug("Could not parse as json: %s" % str(string))
+            return None
diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py
new file mode 100644
index 0000000000000..4c7e5a78f5fe9
--- /dev/null
+++ b/tests/kafkatest/services/zookeeper.py
@@ -0,0 +1,62 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.service import Service
+
+import time
+
+
+class ZookeeperService(Service):
+
+    logs = {
+        "zk_log": {
+            "path": "/mnt/zk.log",
+            "collect_default": True}
+    }
+
+    def __init__(self, context, num_nodes):
+        """
+        :type context
+        """
+        super(ZookeeperService, self).__init__(context, num_nodes)
+
+    def start_node(self, node):
+        idx = self.idx(node)
+        self.logger.info("Starting ZK node %d on %s", idx, node.account.hostname)
+
+        node.account.ssh("mkdir -p /mnt/zookeeper")
+        node.account.ssh("echo %d > /mnt/zookeeper/myid" % idx)
+        node.account.create_file("/mnt/zookeeper.properties", self.render('zookeeper.properties'))
+
+        node.account.ssh(
+            "/opt/kafka/bin/zookeeper-server-start.sh /mnt/zookeeper.properties 1>> %(path)s 2>> %(path)s &"
+            % self.logs["zk_log"])
+
+        time.sleep(5)  # give it some time to start
+
+    def stop_node(self, node, allow_fail=True):
+        # This uses Kafka-REST's stop service script because it's better behaved
+        # (knows how to wait) and sends SIGTERM instead of
+        # zookeeper-stop-server.sh's SIGINT. We don't actually care about clean
+        # shutdown here, so it's ok to use the bigger hammer
+        idx = self.idx(node)
+        self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
+        node.account.ssh("/opt/kafka-rest/bin/kafka-rest-stop-service zookeeper", allow_fail=allow_fail)
+
+    def clean_node(self, node, allow_fail=True):
+        self.logger.info("Cleaning ZK node %d on %s", self.idx(node), node.account.hostname)
+        node.account.ssh("rm -rf /mnt/zookeeper /mnt/zookeeper.properties /mnt/zk.log", allow_fail=allow_fail)
+
+    def connect_setting(self):
+        return ','.join([node.account.hostname + ':2181' for node in self.nodes])
diff --git a/tests/kafkatest/tests/__init__.py b/tests/kafkatest/tests/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/tests/kafkatest/tests/kafka_benchmark_test.py b/tests/kafkatest/tests/kafka_benchmark_test.py
new file mode 100644
index 0000000000000..e05e57e44bce3
--- /dev/null
+++ b/tests/kafkatest/tests/kafka_benchmark_test.py
@@ -0,0 +1,259 @@
+# Copyright 2014 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.service import Service
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService
+
+
+class KafkaBenchmark(KafkaTest):
+    '''A benchmark of Kafka producer/consumer performance. This replicates the test
+    run here:
+    https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
+    '''
+    def __init__(self, test_context):
+        super(KafkaBenchmark, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
+            'test-rep-one': {'partitions': 6, 'replication-factor': 1},
+            'test-rep-three': {'partitions': 6, 'replication-factor': 3}
+        })
+
+        if True:
+            # Works on both aws and local
+            self.msgs = 1000000
+            self.msgs_default = 1000000
+        else:
+            # Can use locally on Vagrant VMs, but may use too much memory for aws
+            self.msgs = 50000000
+            self.msgs_default = 50000000
+
+        self.msgs_large = 10000000
+        self.msg_size_default = 100
+        self.batch_size = 8*1024
+        self.buffer_memory = 64*1024*1024
+        self.msg_sizes = [10, 100, 1000, 10000, 100000]
+        self.target_data_size = 128*1024*1024
+        self.target_data_size_gb = self.target_data_size/float(1024*1024*1024)
+
+    def test_single_producer_no_replication(self):
+        self.logger.info("BENCHMARK: Single producer, no replication")
+        self.perf = ProducerPerformanceService(
+            self.test_context, 1, self.kafka,
+            topic="test-rep-one", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
+            settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
+        )
+        self.perf.run()
+        data = compute_throughput(self.perf)
+        self.logger.info("Single producer, no replication: %s", str(data))
+        return data
+
+    def test_single_producer_replication(self):
+        self.logger.info("BENCHMARK: Single producer, async 3x replication")
+        self.perf = ProducerPerformanceService(
+            self.test_context, 1, self.kafka,
+            topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
+            settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
+        )
+        self.perf.run()
+        data = compute_throughput(self.perf)
+        self.logger.info("Single producer, async 3x replication: %s" % str(data))
+        return data
+
+    def test_single_producer_sync(self):
+        self.logger.info("BENCHMARK: Single producer, sync 3x replication")
+        self.perf = ProducerPerformanceService(
+            self.test_context, 1, self.kafka,
+            topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
+            settings={'acks': -1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
+        )
+        self.perf.run()
+
+        data = compute_throughput(self.perf)
+        self.logger.info("Single producer, sync 3x replication: %s" % data)
+        return data
+
+    def test_three_producers_async(self):
+        self.logger.info("BENCHMARK: Three producers, async 3x replication")
+        self.perf = ProducerPerformanceService(
+            self.test_context, 3, self.kafka,
+            topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
+            settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
+        )
+        self.perf.run()
+
+        data = compute_throughput(self.perf)
+        self.logger.info("Three producers, async 3x replication: %s" % data)
+        return data
+
+    def test_multiple_message_size(self):
+        # TODO this would be a great place to use parametrization
+        self.perfs = {}
+        for msg_size in self.msg_sizes:
+            self.logger.info("BENCHMARK: Message size %d (%f GB total, single producer, async 3x replication)", msg_size, self.target_data_size_gb)
+            # Always generate the same total amount of data
+            nrecords = int(self.target_data_size / msg_size)
+            self.perfs["perf-" + str(msg_size)] = ProducerPerformanceService(
+                self.test_context, 1, self.kafka,
+                topic="test-rep-three", num_records=nrecords, record_size=msg_size, throughput=-1,
+                settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
+            )
+
+        self.msg_size_perf = {}
+        for msg_size in self.msg_sizes:
+            perf = self.perfs["perf-" + str(msg_size)]
+            perf.run()
+            self.msg_size_perf[msg_size] = perf
+
+        summary = ["Message size:"]
+        data = {}
+        for msg_size in self.msg_sizes:
+            datum = compute_throughput(self.msg_size_perf[msg_size])
+            summary.append(" %d: %s" % (msg_size, datum))
+            data[msg_size] = datum
+        self.logger.info("\n".join(summary))
+        return data
+
+    def test_long_term_throughput(self):
+        self.logger.info("BENCHMARK: Long production")
+        self.perf = ProducerPerformanceService(
+            self.test_context, 1, self.kafka,
+            topic="test-rep-three", num_records=self.msgs_large, record_size=self.msg_size_default, throughput=-1,
+            settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory},
+            intermediate_stats=True
+        )
+        self.perf.run()
+
+        summary = ["Throughput over long run, data > memory:"]
+        data = {}
+        # FIXME we should be generating a graph too
+        # Try to break it into 5 blocks, but fall back to a smaller number if
+        # there aren't even 5 elements
+        block_size = max(len(self.perf.stats[0]) / 5, 1)
+        nblocks = len(self.perf.stats[0]) / block_size
+        for i in range(nblocks):
+            subset = self.perf.stats[0][i*block_size:min((i+1)*block_size, len(self.perf.stats[0]))]
+            if len(subset) == 0:
+                summary.append(" Time block %d: (empty)" % i)
+                data[i] = None
+            else:
+                records_per_sec = sum([stat['records_per_sec'] for stat in subset])/float(len(subset))
+                mb_per_sec = sum([stat['mbps'] for stat in subset])/float(len(subset))
+
+                summary.append(" Time block %d: %f rec/sec (%f MB/s)" % (i, records_per_sec, mb_per_sec))
+                data[i] = throughput(records_per_sec, mb_per_sec)
+
+        self.logger.info("\n".join(summary))
+        return data
+
+    def test_end_to_end_latency(self):
+        self.logger.info("BENCHMARK: End to end latency")
+        self.perf = EndToEndLatencyService(
+            self.test_context, 1, self.kafka,
+            topic="test-rep-three", num_records=10000
+        )
+        self.perf.run()
+
+        data = latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
+        self.logger.info("End-to-end latency: %s" % str(data))
+        return data
+
+    def test_producer_and_consumer(self):
+        self.logger.info("BENCHMARK: Producer + Consumer")
+        self.producer = ProducerPerformanceService(
+            self.test_context, 1, self.kafka,
+            topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
+            settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
+        )
+
+        self.consumer = ConsumerPerformanceService(
+            self.test_context, 1, self.kafka,
+            topic="test-rep-three", num_records=self.msgs_default, throughput=-1, threads=1
+        )
+
+        Service.run_parallel(self.producer, self.consumer)
+
+        data = {
+            "producer": compute_throughput(self.producer),
+            "consumer": compute_throughput(self.consumer)
+        }
+        summary = [
+            "Producer + consumer:",
+            str(data)]
+        self.logger.info("\n".join(summary))
+        return data
+
+    def test_single_consumer(self):
+        # All consumer tests use the messages from the first benchmark, so
+        # they'll get messages of the default message size
+        self.logger.info("BENCHMARK: Single consumer")
+        self.perf = ConsumerPerformanceService(
+            self.test_context, 1, self.kafka,
+            topic="test-rep-three", num_records=self.msgs_default, throughput=-1, threads=1
+        )
+        self.perf.run()
+
+        data = compute_throughput(self.perf)
+        self.logger.info("Single consumer: %s" % data)
+        return data
+
+    def test_three_consumers(self):
+        self.logger.info("BENCHMARK: Three consumers")
+        self.perf = ConsumerPerformanceService(
+            self.test_context, 3, self.kafka,
+            topic="test-rep-three", num_records=self.msgs_default, throughput=-1, threads=1
+        )
+        self.perf.run()
+
+        data = compute_throughput(self.perf)
+        self.logger.info("Three consumers: %s", data)
+        return data
+
+
+def throughput(records_per_sec, mb_per_sec):
+    """Helper method to ensure uniform representation of throughput data"""
+    return {
+        "records_per_sec": records_per_sec,
+        "mb_per_sec": mb_per_sec
+    }
+
+
+def latency(latency_50th_ms, latency_99th_ms, latency_999th_ms):
+    """Helper method to ensure uniform representation of latency data"""
+    return {
+        "latency_50th_ms": latency_50th_ms,
+        "latency_99th_ms": latency_99th_ms,
+        "latency_999th_ms": latency_999th_ms
+    }
+
+
+def compute_throughput(perf):
+    """Helper method for computing throughput after running a performance service."""
+    aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
+    aggregate_mbps = sum([r['mb_per_sec'] for r in perf.results])
+
+    return throughput(aggregate_rate, aggregate_mbps)
diff --git a/tests/kafkatest/tests/kafka_test.py b/tests/kafkatest/tests/kafka_test.py
new file mode 100644
index 0000000000000..3cbdf2f622808
--- /dev/null
+++ b/tests/kafkatest/tests/kafka_test.py
@@ -0,0 +1,44 @@
+# Copyright 2014 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+
+
+class KafkaTest(Test):
+    """
+    Helper class that manages setting up a Kafka cluster. Use this if the
+    default settings for Kafka are sufficient for your test; any customization
+    needs to be done manually. Your run() method should call tearDown and
+    setUp. The Zookeeper and Kafka services are available as the fields
+    KafkaTest.zk and KafkaTest.kafka.
+    """
+    def __init__(self, test_context, num_zk, num_brokers, topics=None):
+        super(KafkaTest, self).__init__(test_context)
+        self.num_zk = num_zk
+        self.num_brokers = num_brokers
+        self.topics = topics
+
+        self.zk = ZookeeperService(test_context, self.num_zk)
+
+        self.kafka = KafkaService(
+            test_context, self.num_brokers,
+            self.zk, topics=self.topics)
+
+    def setUp(self):
+        self.zk.start()
+        self.kafka.start()
\ No newline at end of file
diff --git a/tests/kafkatest/tests/replication_test.py b/tests/kafkatest/tests/replication_test.py
new file mode 100644
index 0000000000000..91c031e2ca0ba
--- /dev/null
+++ b/tests/kafkatest/tests/replication_test.py
@@ -0,0 +1,159 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+
+import signal
+import time
+
+
+class ReplicationTest(Test):
+    """Replication tests.
+    These tests verify that replication provides simple durability guarantees by checking that data acked by
+    brokers is still available for consumption in the face of various failure scenarios."""
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(ReplicationTest, self).__init__(test_context=test_context)
+
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
+            "partitions": 3,
+            "replication-factor": 3,
+            "min.insync.replicas": 2}
+        })
+        self.producer_throughput = 10000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+    def setUp(self):
+        self.zk.start()
+        self.kafka.start()
+
+    def min_cluster_size(self):
+        """Override this since we're adding services outside of the constructor"""
+        return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers
+
+    def run_with_failure(self, failure):
+        """This is the top-level test template.
+
+        The steps are:
+            Produce messages in the background while driving some failure condition
+            When done driving failures, immediately stop producing
+            Consume all messages
+            Validate that messages acked by brokers were consumed
+
+        Note that consuming is a bit tricky, at least with console consumer. The goal is to consume all messages
+        (foreach partition) in the topic. In this case, waiting for the last message may cause the consumer to stop
+        too soon since console consumer is consuming multiple partitions from a single thread and therefore we lose
+        ordering guarantees.
+
+        Waiting on a count of consumed messages can be unreliable: if we stop consuming when num_consumed == num_acked,
+        we might exit early if some messages are duplicated (though not an issue here since producer retries==0)
+
+        Therefore rely here on the consumer.timeout.ms setting which times out on the interval between successively
+        consumed messages. Since we run the producer to completion before running the consumer, this is a reliable
+        indicator that nothing is left to consume.
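+
+        In outline (the names below mirror the code in this method, not a public API):
+
+            producer.start()                           # produce in a background thread
+            wait_until(lambda: producer.num_acked > 5, timeout_sec=5)
+            failure()                                  # e.g. kill or bounce the leader
+            producer.stop()
+            consumer.start(); consumer.wait()          # exits via consumer.timeout.ms
+            assert set(consumer.messages_consumed[1]) >= set(producer.acked)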
+ + """ + self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput) + self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=3000) + + # Produce in a background thread while driving broker failures + self.producer.start() + if not wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5): + raise RuntimeError("Producer failed to start in a reasonable amount of time.") + failure() + self.producer.stop() + + self.acked = self.producer.acked + self.not_acked = self.producer.not_acked + self.logger.info("num not acked: %d" % self.producer.num_not_acked) + self.logger.info("num acked: %d" % self.producer.num_acked) + + # Consume all messages + self.consumer.start() + self.consumer.wait() + self.consumed = self.consumer.messages_consumed[1] + self.logger.info("num consumed: %d" % len(self.consumed)) + + # Check produced vs consumed + self.validate() + + def clean_shutdown(self): + """Discover leader node for our topic and shut it down cleanly.""" + self.kafka.signal_leader(self.topic, partition=0, sig=signal.SIGTERM) + + def hard_shutdown(self): + """Discover leader node for our topic and shut it down with a hard kill.""" + self.kafka.signal_leader(self.topic, partition=0, sig=signal.SIGKILL) + + def clean_bounce(self): + """Chase the leader of one partition and restart it cleanly.""" + for i in range(5): + prev_leader_node = self.kafka.leader(topic=self.topic, partition=0) + self.kafka.restart_node(prev_leader_node, wait_sec=5, clean_shutdown=True) + + def hard_bounce(self): + """Chase the leader and restart it cleanly.""" + for i in range(5): + prev_leader_node = self.kafka.leader(topic=self.topic, partition=0) + self.kafka.restart_node(prev_leader_node, wait_sec=5, clean_shutdown=False) + + # Wait long enough for previous leader to probably be awake again + time.sleep(6) + + def validate(self): + """Check that produced messages were consumed.""" + + success = True + msg = "" + + if len(set(self.consumed)) != len(self.consumed): + # There are duplicates. This is ok, so report it but don't fail the test + msg += "There are duplicate messages in the log\n" + + if not set(self.consumed).issuperset(set(self.acked)): + # Every acked message must appear in the logs. I.e. consumed messages must be superset of acked messages. + acked_minus_consumed = set(self.producer.acked) - set(self.consumed) + success = False + msg += "At least one acked message did not appear in the consumed messages. acked_minus_consumed: " + str(acked_minus_consumed) + + if not success: + # Collect all the data logs if there was a failure + self.mark_for_collect(self.kafka) + + assert success, msg + + def test_clean_shutdown(self): + self.run_with_failure(self.clean_shutdown) + + def test_hard_shutdown(self): + self.run_with_failure(self.hard_shutdown) + + def test_clean_bounce(self): + self.run_with_failure(self.clean_bounce) + + def test_hard_bounce(self): + self.run_with_failure(self.hard_bounce) + + + diff --git a/tests/setup.py b/tests/setup.py new file mode 100644 index 0000000000000..8cf40b2a5186b --- /dev/null +++ b/tests/setup.py @@ -0,0 +1,10 @@ +from setuptools import find_packages, setup + +setup(name="kafkatest", + version="0.1", + description="System tests for Apache Kafka", + author="Ewen Cheslack-Postava , Geoff Anderson ", + platforms=["any"], + license="apache2.0", + packages=find_packages(), + )