From 81e41562f3836e95e89e12f215c82b1b2d505381 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Fri, 24 Apr 2015 09:32:54 +0800 Subject: [PATCH] Bootstrap Kafka system tests --- .gitignore | 5 + Vagrantfile | 51 +++++- tests/.gitignore | 11 ++ tests/README.md | 44 +++++ tests/__init__.py | 1 + tests/aws/aws-access-keys-commands | 3 + tests/aws/aws-example-Vagrantfile.local | 9 + tests/aws/aws-init.sh | 57 +++++++ tests/services/__init__.py | 0 tests/services/kafka_service.py | 212 ++++++++++++++++++++++++ tests/services/performance.py | 189 +++++++++++++++++++++ tests/services/zookeeper_service.py | 75 +++++++++ tests/templates/kafka.properties | 121 ++++++++++++++ tests/tests/__init__.py | 0 tests/tests/kafka_benchmark_test.py | 193 +++++++++++++++++++++ tests/tests/test.py | 51 ++++++ vagrant/base.sh | 9 + 17 files changed, 1028 insertions(+), 3 deletions(-) create mode 100644 tests/.gitignore create mode 100644 tests/README.md create mode 100644 tests/__init__.py create mode 100644 tests/aws/aws-access-keys-commands create mode 100644 tests/aws/aws-example-Vagrantfile.local create mode 100755 tests/aws/aws-init.sh create mode 100644 tests/services/__init__.py create mode 100644 tests/services/kafka_service.py create mode 100644 tests/services/performance.py create mode 100644 tests/services/zookeeper_service.py create mode 100644 tests/templates/kafka.properties create mode 100644 tests/tests/__init__.py create mode 100644 tests/tests/kafka_benchmark_test.py create mode 100644 tests/tests/test.py diff --git a/.gitignore b/.gitignore index 1f3ba7dbbf524..4aae6e76b96a2 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,8 @@ config/server-* config/zookeeper-* core/data/* gradle/wrapper/* + +results +tests/results +.ducktape +tests/.ducktape diff --git a/Vagrantfile b/Vagrantfile index 55c67ddda4581..6cd6bc083f1a4 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -20,16 +20,20 @@ require 'socket' # Vagrantfile API/syntax version. Don't touch unless you know what you're doing! VAGRANTFILE_API_VERSION = "2" +# Mode +mode = "kafka_cluster" + # General config enable_dns = false +ram_megabytes = 1280 +num_workers = 0 # Generic workers that get the code, but don't start any services num_zookeepers = 1 num_brokers = 3 -num_workers = 0 # Generic workers that get the code, but don't start any services -ram_megabytes = 1280 # EC2 ec2_access_key = ENV['AWS_ACCESS_KEY'] ec2_secret_key = ENV['AWS_SECRET_KEY'] +ec2_session_token = ENV['AWS_SESSION_TOKEN'] ec2_keypair_name = nil ec2_keypair_file = nil @@ -49,6 +53,29 @@ if File.exists?(local_config_file) then eval(File.read(local_config_file), binding, "Vagrantfile.local") end +if mode == "test" + num_zookeepers = 0 + num_brokers = 0 +end + +# This is a horrible hack to work around bad interactions between +# vagrant-hostmanager and vagrant-aws/vagrant's implementation. Hostmanager +# wants to update the /etc/hosts entries, but tries to do so even on nodes that +# aren't up (e.g. even when all nodes are stopped and you run vagrant +# destroy). Because of the way the underlying code in vagrant works, it still +# tries to communicate with the node and has to wait for a very long +# timeout. This modifies the update to check for hosts that are not created or +# stopped, skipping the update in that case since it's impossible to update +# nodes in that state. 
+Object.const_get("VagrantPlugins").const_get("HostManager").const_get("HostsFile").class_eval do
+  alias_method :old_update_guest, :update_guest
+
+  def update_guest(machine)
+    state_id = machine.state.id
+    return if state_id == :not_created || state_id == :stopped
+    old_update_guest(machine)
+  end
+end
+
 # TODO(ksweeney): RAM requirements are not empirical and can probably be significantly lowered.
 Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
   config.hostmanager.enabled = true
@@ -84,13 +111,31 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
     override.vm.box = "dummy"
     override.vm.box_url = "https://github.com/mitchellh/vagrant-aws/raw/master/dummy.box"
 
-    override.hostmanager.ignore_private_ip = true
+    cached_addresses = {}
+    # Use a custom resolver that SSHes into the machine and reads its IP
+    # address directly. This lets us get at the private IP address, avoiding
+    # issues with the default IP resolver, which uses the public IP address.
+    override.hostmanager.ip_resolver = proc do |vm, resolving_vm|
+      if !cached_addresses.has_key?(vm.name)
+        state_id = vm.state.id
+        if state_id != :not_created && state_id != :stopped && vm.communicate.ready?
+          vm.communicate.execute("/sbin/ifconfig eth0 | grep 'inet addr' | tail -n 1 | egrep -o '[0-9\.]+' | head -n 1 2>&1") do |type, contents|
+            cached_addresses[vm.name] = contents.split("\n").first[/(\d+\.\d+\.\d+\.\d+)/, 1]
+          end
+        else
+          cached_addresses[vm.name] = nil
+        end
+      end
+      cached_addresses[vm.name]
+    end
 
     override.ssh.username = ec2_user
     override.ssh.private_key_path = ec2_keypair_file
 
     aws.access_key_id = ec2_access_key
     aws.secret_access_key = ec2_secret_key
+    aws.session_token = ec2_session_token
     aws.keypair_name = ec2_keypair_name
     aws.region = ec2_region
diff --git a/tests/.gitignore b/tests/.gitignore
new file mode 100644
index 0000000000000..b218b83c4edee
--- /dev/null
+++ b/tests/.gitignore
@@ -0,0 +1,11 @@
+Vagrantfile.local
+
+.idea/
+
+*.pyc
+*.ipynb
+
+.DS_Store
+
+.ducktape
+results/
diff --git a/tests/README.md b/tests/README.md
new file mode 100644
index 0000000000000..dc146b333641a
--- /dev/null
+++ b/tests/README.md
@@ -0,0 +1,44 @@
+System Integration & Performance Testing
+========================================
+
+This directory contains Kafka system integration and performance tests.
+[Ducktape](https://github.com/confluentinc/ducktape) is used to run the tests.
+
+Ducktape is a distributed testing framework that provides a test runner, a
+result reporter, and utilities for bringing services up and tearing them
+down. It automatically discovers tests in a directory and generates an HTML
+report for each run.
+
+To run the tests:
+
+1. Build a specific branch of Kafka
+
+       $ cd kafka
+       $ git checkout $BRANCH
+       $ gradle
+       $ ./gradlew jar
+
+2. Set up a testing cluster. You can use Vagrant to create a cluster of local
+   VMs or one on EC2. Configure your Vagrant setup by creating the file
+   `Vagrantfile.local` in the directory of your Kafka checkout. At a minimum,
+   you *MUST* set `mode = "test"` and set `num_workers` high enough for the
+   test you're trying to run. If you run on AWS, you also need to set
+   `enable_dns = true`.
+
+3. Bring up the cluster, making sure you have enough workers. For Vagrant,
+   use `vagrant up`. If you want to run on AWS, use `vagrant up
+   --provider=aws --no-parallel`.
+
+4. Install ducktape:
+
+       $ git clone https://github.com/confluentinc/ducktape
+       $ cd ducktape
+       $ python setup.py install
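+
+   As a quick sanity check (assuming the install placed the `ducktape`
+   command on your PATH), print its usage:
+
+       $ ducktape --help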
+
+5. Run the system tests using ducktape; you can view results in the `results`
+   directory.
+
+       $ cd tests
+       $ ducktape tests
+
+6. To iterate/run again if you made any changes:
+
+       $ cd kafka
+       $ ./gradlew jar
+       $ vagrant rsync # Re-syncs build output to cluster
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000000000..8b137891791fe
--- /dev/null
+++ b/tests/__init__.py
@@ -0,0 +1 @@
+
diff --git a/tests/aws/aws-access-keys-commands b/tests/aws/aws-access-keys-commands
new file mode 100644
index 0000000000000..d4aa263e89ad7
--- /dev/null
+++ b/tests/aws/aws-access-keys-commands
@@ -0,0 +1,3 @@
+export AWS_ACCESS_KEY=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/ducttape-master | grep AccessKeyId | awk -F\" '{ print $4 }'`
+export AWS_SECRET_KEY=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/ducttape-master | grep SecretAccessKey | awk -F\" '{ print $4 }'`
+export AWS_SESSION_TOKEN=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/ducttape-master | grep Token | awk -F\" '{ print $4 }'`
diff --git a/tests/aws/aws-example-Vagrantfile.local b/tests/aws/aws-example-Vagrantfile.local
new file mode 100644
index 0000000000000..7f9bbd4192dde
--- /dev/null
+++ b/tests/aws/aws-example-Vagrantfile.local
@@ -0,0 +1,9 @@
+ec2_instance_type = "m3.medium"
+enable_dns = true
+mode = "test"
+num_workers = 1
+ec2_keypair_name =
+ec2_keypair_file =
+ec2_security_groups = ['ducttape-insecure']
+ec2_region = 'us-west-2'
+ec2_ami = "ami-29ebb519"
diff --git a/tests/aws/aws-init.sh b/tests/aws/aws-init.sh
new file mode 100755
index 0000000000000..027aa90748b79
--- /dev/null
+++ b/tests/aws/aws-init.sh
@@ -0,0 +1,57 @@
+#!/bin/bash
+
+# This script should be run once on your AWS test driver machine before
+# attempting to run any ducktape tests.
+
+# Install dependencies
+sudo apt-get install -y maven openjdk-6-jdk build-essential \
+    ruby-dev zlib1g-dev realpath python-setuptools
+
+base_dir=`dirname $0`/..
+
+if [ -z `which vagrant` ]; then
+    echo "Installing vagrant..."
+    wget https://dl.bintray.com/mitchellh/vagrant/vagrant_1.7.2_x86_64.deb
+    sudo dpkg -i vagrant_1.7.2_x86_64.deb
+    rm -f vagrant_1.7.2_x86_64.deb
+fi
+
+# Install necessary vagrant plugins
+# Note: Do NOT install vagrant-cachier since it doesn't work on AWS and only
+# adds log noise
+vagrant_plugins="vagrant-aws vagrant-hostmanager"
+existing=`vagrant plugin list`
+for plugin in $vagrant_plugins; do
+    echo $existing | grep $plugin > /dev/null
+    if [ $? != 0 ]; then
+        vagrant plugin install $plugin
+    fi
+done
+
+# Create Vagrantfile.local as a convenience
+if [ ! -e "$base_dir/Vagrantfile.local" ]; then
+    cp $base_dir/aws/aws-example-Vagrantfile.local $base_dir/Vagrantfile.local
+fi
+
+gradle="gradle-2.2.1"
+if [ -z `which gradle` ] && [ ! -d $base_dir/$gradle ]; then
+    if [ ! -e $gradle-bin.zip ]; then
+        wget https://services.gradle.org/distributions/$gradle-bin.zip
+    fi
+    unzip $gradle-bin.zip
+    rm -rf $gradle-bin.zip
+    mv $gradle $base_dir/$gradle
+fi
+
+# Ensure aws access keys are in the environment when we use an EC2 driver machine
+LOCAL_HOSTNAME=$(hostname -d)
+if [[ ${LOCAL_HOSTNAME} =~ .*\.compute\.internal ]]; then
+    grep "AWS ACCESS KEYS" ~/.bashrc > /dev/null
+    if [ $? != 0 ]; then
+        echo "# --- AWS ACCESS KEYS ---" >> ~/.bashrc
+        echo ". 
`realpath $base_dir/aws/aws-access-keys-commands`" >> ~/.bashrc + echo "# -----------------------" >> ~/.bashrc + source ~/.bashrc + fi +fi + diff --git a/tests/services/__init__.py b/tests/services/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/tests/services/kafka_service.py b/tests/services/kafka_service.py new file mode 100644 index 0000000000000..dfcd5f8356850 --- /dev/null +++ b/tests/services/kafka_service.py @@ -0,0 +1,212 @@ +# Copyright 2014 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.services.service import Service +import time, re, json + + +class KafkaService(Service): + def __init__(self, service_context, zk, topics=None): + """ + :type service_context ducktape.services.service.ServiceContext + :type zk: ZookeeperService + :type topics: dict + """ + super(KafkaService, self).__init__(service_context) + self.zk = zk + self.topics = topics + + def start(self): + super(KafkaService, self).start() + + # Start all nodes in this Kafka service + for idx, node in enumerate(self.nodes, 1): + self.logger.info("Starting Kafka node %d on %s", idx, node.account.hostname) + self._stop_and_clean(node, allow_fail=True) + self.start_node(node) + + # wait for start up + time.sleep(6) + + # Create topics if necessary + if self.topics is not None: + for topic, topic_cfg in self.topics.items(): + if topic_cfg is None: + topic_cfg = {} + + topic_cfg["topic"] = topic + self.create_topic(topic_cfg) + + def create_topic(self, topic_cfg): + node = self.nodes[0] # any node is fine here + self.logger.info("Creating topic %s with settings %s", topic_cfg["topic"], topic_cfg) + + cmd = "/opt/kafka/bin/kafka-topics.sh --zookeeper %(zk_connect)s --create "\ + "--topic %(topic)s --partitions %(partitions)d --replication-factor %(replication)d" % { + 'zk_connect': self.zk.connect_setting(), + 'topic': topic_cfg.get("topic"), + 'partitions': topic_cfg.get('partitions', 1), + 'replication': topic_cfg.get('replication-factor', 1) + } + + if "configs" in topic_cfg.keys() and topic_cfg["configs"] is not None: + for config_name, config_value in topic_cfg["configs"].items(): + cmd += " --config %s=%s" % (config_name, str(config_value)) + + self.logger.info("Running topic creation command...\n%s" % cmd) + node.account.ssh(cmd) + + time.sleep(1) + self.logger.info("Checking to see if topic was properly created...\n%s" % cmd) + for line in self.describe_topic(topic_cfg["topic"]).split("\n"): + self.logger.info(line) + + def describe_topic(self, topic): + node = self.nodes[0] + cmd = "/opt/kafka/bin/kafka-topics.sh --zookeeper %s --topic %s --describe" % \ + (self.zk.connect_setting(), topic) + output = "" + for line in node.account.ssh_capture(cmd): + output += line + return output + + def verify_reassign_partitions(self, reassignment): + """Run the reassign partitions admin tool in "verify" mode + """ + node = self.nodes[0] + json_file = "/tmp/" + str(time.time()) + "_reassign.json" + + # reassignment to json + json_str = json.dumps(reassignment) + 
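+        # json.dumps is applied twice on purpose: the second call serializes
+        # the JSON *string*, wrapping it in double quotes and escaping the
+        # inner quotes so that it survives the shell `echo` used below to
+        # write the file. For example:
+        #   json.dumps({"version": 1})              ->  {"version": 1}
+        #   json.dumps(json.dumps({"version": 1}))  ->  "{\"version\": 1}"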
        json_str = json.dumps(json_str)
+
+        # create command
+        cmd = "echo %s > %s && " % (json_str, json_file)
+        cmd += "/opt/kafka/bin/kafka-reassign-partitions.sh "\
+                "--zookeeper %(zk_connect)s "\
+                "--reassignment-json-file %(reassignment_file)s "\
+                "--verify" % {'zk_connect': self.zk.connect_setting(),
+                              'reassignment_file': json_file}
+        cmd += " && sleep 1 && rm -f %s" % json_file
+
+        # send command
+        self.logger.info("Verifying partition reassignment...")
+        self.logger.debug(cmd)
+        output = ""
+        for line in node.account.ssh_capture(cmd):
+            output += line
+
+        self.logger.debug(output)
+
+        if re.match(".*is in progress.*", output) is not None:
+            return False
+
+        return True
+
+    def execute_reassign_partitions(self, reassignment):
+        """Run the reassign partitions admin tool in "execute" mode
+        """
+        node = self.nodes[0]
+        json_file = "/tmp/" + str(time.time()) + "_reassign.json"
+
+        # reassignment to json
+        json_str = json.dumps(reassignment)
+        json_str = json.dumps(json_str)
+
+        # create command
+        cmd = "echo %s > %s && " % (json_str, json_file)
+        cmd += "/opt/kafka/bin/kafka-reassign-partitions.sh "\
+                "--zookeeper %(zk_connect)s "\
+                "--reassignment-json-file %(reassignment_file)s "\
+                "--execute" % {'zk_connect': self.zk.connect_setting(),
+                               'reassignment_file': json_file}
+        cmd += " && sleep 1 && rm -f %s" % json_file
+
+        # send command
+        self.logger.info("Executing partition reassignment...")
+        self.logger.debug(cmd)
+        output = ""
+        for line in node.account.ssh_capture(cmd):
+            output += line
+
+        self.logger.debug("Output from partition reassignment:")
+        self.logger.debug(output)
+
+    def stop(self):
+        """If the service left any running processes or data, clean them up."""
+        super(KafkaService, self).stop()
+
+        for idx, node in enumerate(self.nodes, 1):
+            self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
+            self._stop_and_clean(node, allow_fail=True)
+            node.free()
+
+    def _stop_and_clean(self, node, allow_fail=False):
+        node.account.ssh("/opt/kafka/bin/kafka-server-stop.sh", allow_fail=allow_fail)
+        time.sleep(5)  # the stop script doesn't wait
+        node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log")
+
+    def stop_node(self, node, clean_shutdown=True, allow_fail=True):
+        node.account.kill_process("kafka", clean_shutdown, allow_fail)
+
+    def start_node(self, node, config=None):
+        if config is None:
+            template = open('templates/kafka.properties').read()
+            template_params = {
+                'broker_id': self.idx(node),
+                'hostname': node.account.hostname,
+                'zk_connect': self.zk.connect_setting()
+            }
+            config = template % template_params
+
+        node.account.create_file("/mnt/kafka.properties", config)
+        cmd = "/opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log &"
+        self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
+        node.account.ssh(cmd)
+
+    def restart_node(self, node, wait_sec=0, clean_shutdown=True):
+        self.stop_node(node, clean_shutdown, allow_fail=True)
+        time.sleep(wait_sec)
+        self.start_node(node)
+
+    def get_leader_node(self, topic, partition=0):
+        """Get the leader replica for the given topic and partition.
+        """
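+        # The state znode fetched below contains JSON of the form (field
+        # values illustrative):
+        #   {"controller_epoch":1,"leader":2,"version":1,"leader_epoch":0,"isr":[1,2,3]}
+        # The "leader" field is the broker id we map back to a node.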
+        cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.ZooKeeperMainWrapper -server %s " \
+              % self.zk.connect_setting()
+        cmd += "get /brokers/topics/%s/partitions/%d/state" % (topic, partition)
+        self.logger.debug(cmd)
+
+        node = self.nodes[0]
+        self.logger.debug("Querying zookeeper to find leader replica for topic %s:\n%s" % (topic, cmd))
+        partition_state = None
+        for line in node.account.ssh_capture(cmd):
+            match = re.match("^({.+})$", line)
+            if match is not None:
+                partition_state = match.groups()[0]
+                break
+
+        if partition_state is None:
+            raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
+
+        partition_state = json.loads(partition_state)
+        self.logger.info(partition_state)
+
+        leader_idx = int(partition_state["leader"])
+        self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx))
+        return self.get_node(leader_idx)
+
+    def bootstrap_servers(self):
+        return ','.join([node.account.hostname + ":9092" for node in self.nodes])
\ No newline at end of file
diff --git a/tests/services/performance.py b/tests/services/performance.py
new file mode 100644
index 0000000000000..9478b5ef956af
--- /dev/null
+++ b/tests/services/performance.py
@@ -0,0 +1,189 @@
+# Copyright 2014 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
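+
+# Design note: PerformanceService runs one daemon worker thread per allocated
+# node. Subclasses implement _worker(idx, node), which drives the relevant
+# perf tool over SSH and deposits a parsed summary into self.results[idx-1]
+# (and, when enabled, periodic samples into self.stats[idx-1]). start()
+# spawns the threads, wait() joins them, and stop() frees the nodes, so it
+# must only be called after wait().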
+ +from ducktape.services.service import Service +import threading + + +class PerformanceService(Service): + def __init__(self, service_context): + super(PerformanceService, self).__init__(service_context) + + def start(self): + super(PerformanceService, self).start() + self.worker_threads = [] + self.results = [None] * len(self.nodes) + self.stats = [[] for x in range(len(self.nodes))] + for idx,node in enumerate(self.nodes,1): + self.logger.info("Running %s node %d on %s", self.__class__.__name__, idx, node.account.hostname) + worker = threading.Thread( + name=self.__class__.__name__ + "-worker-" + str(idx), + target=self._worker, + args=(idx,node) + ) + worker.daemon = True + worker.start() + self.worker_threads.append(worker) + + def wait(self): + super(PerformanceService, self).wait() + for idx,worker in enumerate(self.worker_threads,1): + self.logger.debug("Waiting for %s worker %d to finish", self.__class__.__name__, idx) + worker.join() + self.worker_threads = None + + def stop(self): + super(PerformanceService, self).stop() + assert self.worker_threads is None, "%s.stop should only be called after wait" % self.__class__.__name__ + for idx,node in enumerate(self.nodes,1): + self.logger.debug("Stopping %s node %d on %s", self.__class__.__name__, idx, node.account.hostname) + node.free() + + +class ProducerPerformanceService(PerformanceService): + def __init__(self, service_context, kafka, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False): + super(ProducerPerformanceService, self).__init__(service_context) + self.kafka = kafka + self.args = { + 'topic': topic, + 'num_records': num_records, + 'record_size': record_size, + 'throughput': throughput + } + self.settings = settings + self.intermediate_stats = intermediate_stats + + def _worker(self, idx, node): + args = self.args.copy() + args.update({'bootstrap_servers': self.kafka.bootstrap_servers()}) + cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance "\ + "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s" % args + + for key,value in self.settings.items(): + cmd += " %s=%s" % (str(key), str(value)) + self.logger.debug("Producer performance %d command: %s", idx, cmd) + def parse_stats(line): + parts = line.split(',') + return { + 'records': int(parts[0].split()[0]), + 'records_per_sec': float(parts[1].split()[0]), + 'mbps': float(parts[1].split('(')[1].split()[0]), + 'latency_avg_ms': float(parts[2].split()[0]), + 'latency_max_ms': float(parts[3].split()[0]), + 'latency_50th_ms': float(parts[4].split()[0]), + 'latency_95th_ms': float(parts[5].split()[0]), + 'latency_99th_ms': float(parts[6].split()[0]), + 'latency_999th_ms': float(parts[7].split()[0]), + } + last = None + for line in node.account.ssh_capture(cmd): + self.logger.debug("Producer performance %d: %s", idx, line.strip()) + if self.intermediate_stats: + try: + self.stats[idx-1].append(parse_stats(line)) + except: + # Sometimes there are extraneous log messages + pass + last = line + try: + self.results[idx-1] = parse_stats(last) + except: + self.logger.error("Bad last line: %s", last) + + +class ConsumerPerformanceService(PerformanceService): + def __init__(self, service_context, kafka, topic, num_records, throughput, threads=1, settings={}): + super(ConsumerPerformanceService, self).__init__(service_context) + self.kafka = kafka + self.args = { + 'topic': topic, + 'num_records': num_records, + 'throughput': throughput, + 'threads': threads, + } + 
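+        # `settings` is a free-form dict of client properties; each key=value
+        # pair is appended verbatim to the perf tool command line in _worker().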
self.settings = settings + + def _worker(self, idx, node): + args = self.args.copy() + args.update({'zk_connect': self.kafka.zk.connect_setting()}) + cmd = "/opt/kafka/bin/kafka-consumer-perf-test.sh "\ + "--topic %(topic)s --messages %(num_records)d --zookeeper %(zk_connect)s" % args + for key,value in self.settings.items(): + cmd += " %s=%s" % (str(key), str(value)) + self.logger.debug("Consumer performance %d command: %s", idx, cmd) + last = None + for line in node.account.ssh_capture(cmd): + self.logger.debug("Consumer performance %d: %s", idx, line.strip()) + last = line + # Parse and save the last line's information + parts = last.split(',') + self.results[idx-1] = { + 'total_mb': float(parts[3]), + 'mbps': float(parts[4]), + 'records_per_sec': float(parts[6]), + } + + +class EndToEndLatencyService(PerformanceService): + def __init__(self, service_context, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1): + super(EndToEndLatencyService, self).__init__(service_context) + self.kafka = kafka + self.args = { + 'topic': topic, + 'num_records': num_records, + 'consumer_fetch_max_wait': consumer_fetch_max_wait, + 'acks': acks + } + + def _worker(self, idx, node): + args = self.args.copy() + args.update({ + 'zk_connect': self.kafka.zk.connect_setting(), + 'bootstrap_servers': self.kafka.bootstrap_servers(), + }) + cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.TestEndToEndLatency "\ + "%(bootstrap_servers)s %(zk_connect)s %(topic)s %(num_records)d "\ + "%(consumer_fetch_max_wait)d %(acks)d" % args + self.logger.debug("End-to-end latency %d command: %s", idx, cmd) + results = {} + for line in node.account.ssh_capture(cmd): + self.logger.debug("End-to-end latency %d: %s", idx, line.strip()) + if line.startswith("Avg latency:"): + results['latency_avg_ms'] = float(line.split()[2]) + if line.startswith("Percentiles"): + results['latency_50th_ms'] = float(line.split()[3][:-1]) + results['latency_99th_ms'] = float(line.split()[6][:-1]) + results['latency_999th_ms'] = float(line.split()[9]) + self.results[idx-1] = results + + +def parse_performance_output(summary): + parts = summary.split(',') + results = { + 'records': int(parts[0].split()[0]), + 'records_per_sec': float(parts[1].split()[0]), + 'mbps': float(parts[1].split('(')[1].split()[0]), + 'latency_avg_ms': float(parts[2].split()[0]), + 'latency_max_ms': float(parts[3].split()[0]), + 'latency_50th_ms': float(parts[4].split()[0]), + 'latency_95th_ms': float(parts[5].split()[0]), + 'latency_99th_ms': float(parts[6].split()[0]), + 'latency_999th_ms': float(parts[7].split()[0]), + } + # To provide compatibility with ConsumerPerformanceService + results['total_mb'] = results['mbps'] * (results['records'] / results['records_per_sec']) + results['rate_mbps'] = results['mbps'] + results['rate_mps'] = results['records_per_sec'] + + return results diff --git a/tests/services/zookeeper_service.py b/tests/services/zookeeper_service.py new file mode 100644 index 0000000000000..efebe848257fe --- /dev/null +++ b/tests/services/zookeeper_service.py @@ -0,0 +1,75 @@ +# Copyright 2014 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.services.service import Service +import time + + +class ZookeeperService(Service): + def __init__(self, service_context): + """ + :type service_context ducktape.services.service.ServiceContext + """ + super(ZookeeperService, self).__init__(service_context) + self.logs = {"zk_log": "/mnt/zk.log"} + + def start(self): + super(ZookeeperService, self).start() + config = """ +dataDir=/mnt/zookeeper +clientPort=2181 +maxClientCnxns=0 +initLimit=5 +syncLimit=2 +quorumListenOnAllIPs=true +""" + for idx, node in enumerate(self.nodes, 1): + template_params = { 'idx': idx, 'host': node.account.hostname } + config += "server.%(idx)d=%(host)s:2888:3888\n" % template_params + + for idx, node in enumerate(self.nodes, 1): + self.logger.info("Starting ZK node %d on %s", idx, node.account.hostname) + self._stop_and_clean(node, allow_fail=True) + node.account.ssh("mkdir -p /mnt/zookeeper") + node.account.ssh("echo %d > /mnt/zookeeper/myid" % idx) + node.account.create_file("/mnt/zookeeper.properties", config) + node.account.ssh( + "/opt/kafka/bin/zookeeper-server-start.sh /mnt/zookeeper.properties 1>> %(zk_log)s 2>> %(zk_log)s &" + % self.logs) + time.sleep(5) # give it some time to start + + def stop_node(self, node, allow_fail=True): + idx = self.idx(node) + self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname)) + node.account.ssh("ps ax | grep -i 'zookeeper' | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM", + allow_fail=allow_fail) + + def clean_node(self, node, allow_fail=True): + node.account.ssh("rm -rf /mnt/zookeeper /mnt/zookeeper.properties /mnt/zk.log", allow_fail=allow_fail) + + def stop(self): + """If the service left any running processes or data, clean them up.""" + super(ZookeeperService, self).stop() + + for idx, node in enumerate(self.nodes, 1): + self.stop_node(node, allow_fail=False) + self.clean_node(node) + node.free() + + def _stop_and_clean(self, node, allow_fail=False): + self.stop_node(node, allow_fail) + self.clean_node(node, allow_fail) + + def connect_setting(self): + return ','.join([node.account.hostname + ':2181' for node in self.nodes]) diff --git a/tests/templates/kafka.properties b/tests/templates/kafka.properties new file mode 100644 index 0000000000000..fe0636e045e2a --- /dev/null +++ b/tests/templates/kafka.properties @@ -0,0 +1,121 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=%(broker_id)d
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=9092
+
+# Hostname the broker will bind to. If not set, the server will bind to all interfaces
+#host.name=localhost
+
+# Hostname the broker will advertise to producers and consumers. If not set, it uses the
+# value for "host.name" if configured. Otherwise, it will use the value returned from
+# java.net.InetAddress.getCanonicalHostName().
+advertised.host.name=%(hostname)s
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=
+
+# The number of threads handling network requests
+num.network.threads=3
+
+# The number of threads doing disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=65536
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs=/mnt/kafka-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in a RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
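+
+# Illustrative (non-default) example: to bound unflushed data per log to
+# roughly one second or 10000 messages, whichever comes first, uncomment both
+# settings above:
+#
+#   log.flush.interval.messages=10000
+#   log.flush.interval.ms=1000
+#
+# The stock settings leave flushing to the OS page cache and rely on
+# replication for durability.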
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=300000
+
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma-separated list of host:port pairs, each corresponding to a zk
+# server, e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=%(zk_connect)s
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=2000
diff --git a/tests/tests/__init__.py b/tests/tests/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/tests/tests/kafka_benchmark_test.py b/tests/tests/kafka_benchmark_test.py
new file mode 100644
index 0000000000000..5ea0913c054e0
--- /dev/null
+++ b/tests/tests/kafka_benchmark_test.py
@@ -0,0 +1,193 @@
+# Copyright 2014 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.service import Service
+
+from tests.test import KafkaTest
+from services.performance import ProducerPerformanceService, ConsumerPerformanceService, \
+    EndToEndLatencyService
+
+
+class KafkaBenchmark(KafkaTest):
+    '''A benchmark of Kafka producer/consumer performance.
This replicates the test + run here: + https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines + ''' + def __init__(self, test_context): + super(KafkaBenchmark, self).__init__(test_context, num_zk=1, num_brokers=3, topics={ + 'test-rep-one' : { 'partitions': 6, 'replication-factor': 1 }, + 'test-rep-three' : { 'partitions': 6, 'replication-factor': 3 } + }) + + def run(self): + msgs_default = 50000000 + msgs_large = 100000000 + msg_size_default = 100 + batch_size = 8*1024 + buffer_memory = 64*1024*1024 + msg_sizes = [10, 100, 1000, 10000, 100000] + target_data_size = 1024*1024*1024 + target_data_size_gb = target_data_size/float(1024*1024*1024) + # These settings will work in the default local Vagrant VMs, useful for testing + if False: + msgs_default = 1000000 + msgs_large = 10000000 + msg_size_default = 100 + batch_size = 8*1024 + buffer_memory = 64*1024*1024 + msg_sizes = [10, 100, 1000, 10000, 100000] + target_data_size = 128*1024*1024 + target_data_size_gb = target_data_size/float(1024*1024*1024) + + # PRODUCER TESTS + + self.logger.info("BENCHMARK: Single producer, no replication") + single_no_rep = ProducerPerformanceService( + self.service_context(1), self.kafka, + topic="test-rep-one", num_records=msgs_default, record_size=msg_size_default, throughput=-1, + settings={'acks':1, 'batch.size':batch_size, 'buffer.memory':buffer_memory} + ) + single_no_rep.run() + + self.logger.info("BENCHMARK: Single producer, async 3x replication") + single_rep_async = ProducerPerformanceService( + self.service_context(1), self.kafka, + topic="test-rep-three", num_records=msgs_default, record_size=msg_size_default, throughput=-1, + settings={'acks':1, 'batch.size':batch_size, 'buffer.memory':buffer_memory} + ) + single_rep_async.run() + + self.logger.info("BENCHMARK: Single producer, sync 3x replication") + single_rep_sync = ProducerPerformanceService( + self.service_context(1), self.kafka, + topic="test-rep-three", num_records=msgs_default, record_size=msg_size_default, throughput=-1, + settings={'acks':-1, 'batch.size':batch_size, 'buffer.memory':buffer_memory} + ) + single_rep_sync.run() + + self.logger.info("BENCHMARK: Three producers, async 3x replication") + three_rep_async = ProducerPerformanceService( + self.service_context(3), self.kafka, + topic="test-rep-three", num_records=msgs_default, record_size=msg_size_default, throughput=-1, + settings={'acks':1, 'batch.size':batch_size, 'buffer.memory':buffer_memory} + ) + three_rep_async.run() + + + msg_size_perf = {} + for msg_size in msg_sizes: + self.logger.info("BENCHMARK: Message size %d (%f GB total, single producer, async 3x replication)", msg_size, target_data_size_gb) + # Always generate the same total amount of data + nrecords = int(target_data_size / msg_size) + perf = ProducerPerformanceService( + self.service_context(1), self.kafka, + topic="test-rep-three", num_records=nrecords, record_size=msg_size, throughput=-1, + settings={'acks':1, 'batch.size':batch_size, 'buffer.memory':buffer_memory} + ) + perf.run() + msg_size_perf[msg_size] = perf + + # CONSUMER TESTS + + # All consumer tests use the messages from the first benchmark, so + # they'll get messages of the default message size + self.logger.info("BENCHMARK: Single consumer") + single_consumer = ConsumerPerformanceService( + self.service_context(1), self.kafka, + topic="test-rep-three", num_records=msgs_default, throughput=-1, threads=1 + ) + single_consumer.run() + + self.logger.info("BENCHMARK: Three consumers") + 
three_consumers = ConsumerPerformanceService( + self.service_context(3), self.kafka, + topic="test-rep-three", num_records=msgs_default, throughput=-1, threads=1 + ) + three_consumers.run() + + # PRODUCER + CONSUMER TEST + self.logger.info("BENCHMARK: Producer + Consumer") + pc_producer = ProducerPerformanceService( + self.service_context(1), self.kafka, + topic="test-rep-three", num_records=msgs_default, record_size=msg_size_default, throughput=-1, + settings={'acks':1, 'batch.size':batch_size, 'buffer.memory':buffer_memory} + ) + pc_consumer = ConsumerPerformanceService( + self.service_context(1), self.kafka, + topic="test-rep-three", num_records=msgs_default, throughput=-1, threads=1 + ) + Service.run_parallel(pc_producer, pc_consumer) + + # END TO END LATENCY TEST + self.logger.info("BENCHMARK: End to end latency") + e2e_latency = EndToEndLatencyService( + self.service_context(1), self.kafka, + topic="test-rep-three", num_records=10000 + ) + e2e_latency.run() + + + # LONG TERM THROUGHPUT TEST + + # Because of how much space this ends up using, we clear out the + # existing cluster to start from a clean slate. This also keeps us from + # running out of space with limited disk space. + self.tearDown() + self.setUp() + self.logger.info("BENCHMARK: Long production") + throughput_perf = ProducerPerformanceService( + self.service_context(1), self.kafka, + topic="test-rep-three", num_records=msgs_large, record_size=msg_size_default, throughput=-1, + settings={'acks':1, 'batch.size':batch_size, 'buffer.memory':buffer_memory}, + intermediate_stats=True + ) + throughput_perf.run() + + # Summarize, extracting just the key info. With multiple + # producers/consumers, we display the aggregate value + def throughput(perf): + aggregate_rate = sum([r['records_per_sec'] for r in perf.results]) + aggregate_mbps = sum([r['mbps'] for r in perf.results]) + return "%f rec/sec (%f MB/s)" % (aggregate_rate, aggregate_mbps) + self.logger.info("=================") + self.logger.info("BENCHMARK RESULTS") + self.logger.info("=================") + self.logger.info("Single producer, no replication: %s", throughput(single_no_rep)) + self.logger.info("Single producer, async 3x replication: %s", throughput(single_rep_async)) + self.logger.info("Single producer, sync 3x replication: %s", throughput(single_rep_sync)) + self.logger.info("Three producers, async 3x replication: %s", throughput(three_rep_async)) + self.logger.info("Message size:") + for msg_size in msg_sizes: + self.logger.info(" %d: %s", msg_size, throughput(msg_size_perf[msg_size])) + self.logger.info("Throughput over long run, data > memory:") + # FIXME we should be generating a graph too + # Try to break it into 5 blocks, but fall back to a smaller number if + # there aren't even 5 elements + block_size = max(len(throughput_perf.stats[0]) / 5, 1) + nblocks = len(throughput_perf.stats[0]) / block_size + for i in range(nblocks): + subset = throughput_perf.stats[0][i*block_size:min((i+1)*block_size,len(throughput_perf.stats[0]))] + if len(subset) == 0: + self.logger.info(" Time block %d: (empty)", i) + else: + self.logger.info(" Time block %d: %f rec/sec (%f MB/s)", i, + sum([stat['records_per_sec'] for stat in subset])/float(len(subset)), + sum([stat['mbps'] for stat in subset])/float(len(subset)) + ) + self.logger.info("Single consumer: %s", throughput(single_consumer)) + self.logger.info("Three consumers: %s", throughput(three_consumers)) + self.logger.info("Producer + consumer:") + self.logger.info(" Producer: %s", throughput(pc_producer)) + 
self.logger.info(" Consumer: %s", throughput(pc_producer)) + self.logger.info("End-to-end latency: median %f ms, 99%% %f ms, 99.9%% %f ms", e2e_latency.results[0]['latency_50th_ms'], e2e_latency.results[0]['latency_99th_ms'], e2e_latency.results[0]['latency_999th_ms']) diff --git a/tests/tests/test.py b/tests/tests/test.py new file mode 100644 index 0000000000000..3ac511f1e5880 --- /dev/null +++ b/tests/tests/test.py @@ -0,0 +1,51 @@ +# Copyright 2014 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.tests.test import Test +from ducktape.services.service import ServiceContext + +from services.zookeeper_service import ZookeeperService +from services.kafka_service import KafkaService + + +class KafkaTest(Test): + """ + Helper class that managest setting up a Kafka cluster. Use this if the + default settings for Kafka are sufficient for your test; any customization + needs to be done manually. Your run() method should call tearDown and + setUp. The Zookeeper and Kafka services are available as the fields + KafkaTest.zk and KafkaTest.kafka. + + + """ + def __init__(self, test_context, num_zk, num_brokers, topics=None): + super(KafkaTest, self).__init__(test_context) + self.num_zk = num_zk + self.num_brokers = num_brokers + self.topics = topics + + def min_cluster_size(self): + return self.num_zk + self.num_brokers + + def setUp(self): + self.zk = ZookeeperService(ServiceContext(self.cluster, self.num_zk, self.logger)) + self.kafka = KafkaService( + ServiceContext(self.cluster, self.num_brokers, self.logger), + self.zk, topics=self.topics) + self.zk.start() + self.kafka.start() + + def tearDown(self): + self.kafka.stop() + self.zk.stop() diff --git a/vagrant/base.sh b/vagrant/base.sh index 6f28dfed67877..133f10a95622c 100644 --- a/vagrant/base.sh +++ b/vagrant/base.sh @@ -41,3 +41,12 @@ chmod a+rw /opt if [ ! -e /opt/kafka ]; then ln -s /vagrant /opt/kafka fi + +# For EC2 nodes, we want to use /mnt, which should have the local disk. On local +# VMs, we can just create it if it doesn't exist and use it like we'd use +# /tmp. Eventually, we'd like to also support more directories, e.g. when EC2 +# instances have multiple local disks. +if [ ! -e /mnt ]; then + mkdir /mnt +fi +chmod a+rwx /mnt