KAFKA-2276; KIP-25 initial patch
Initial patch for KIP-25

Note: do *not* use pip to install ducktape. Instead:

```
$ git clone git@github.com:confluentinc/ducktape.git
$ cd ducktape
$ python setup.py install
```
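
With ducktape installed, the system tests added under `tests/` can be launched with the `ducktape` runner. A minimal sketch (the exact test paths are assumptions based on this patch's layout):

```
$ cd tests
$ ducktape kafkatest/tests                      # run every test in the suite
$ ducktape kafkatest/tests/benchmark_test.py    # or a single test module
```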

Author: Geoff Anderson <[email protected]>
Author: Geoff <[email protected]>
Author: Liquan Pei <[email protected]>

Reviewers: Ewen, Gwen, Jun, Guozhang

Closes #70 from granders/KAFKA-2276 and squashes the following commits:

a62fb6c [Geoff Anderson] fixed checkstyle errors
a70f0f8 [Geoff Anderson] Merged in upstream trunk.
8b62019 [Geoff Anderson] Merged in upstream trunk.
47b7b64 [Geoff Anderson] Created separate tools jar so that the clients package does not pull in dependencies on the Jackson JSON tools or argparse4j.
a9e6a14 [Geoff Anderson] Merged in upstream changes
d18db7b [Geoff Anderson] fixed :rat errors (needed to add licenses)
321fdf8 [Geoff Anderson] Ignore tests/ and vagrant/ directories when running rat build task
795fc75 [Geoff Anderson] Merged in changes from upstream trunk.
1d93f06 [Geoff Anderson] Updated provisioning to use java 7 in light of KAFKA-2316
2ea4e29 [Geoff Anderson] Tweaked README, changed default log collection behavior on VerifiableProducer
0eb6fdc [Geoff Anderson] Merged in system-tests
69dd7be [Geoff Anderson] Merged in trunk
4034dd6 [Geoff Anderson] Merged in upstream trunk
ede6450 [Geoff] Merge pull request #4 from confluentinc/move_muckrake
7751545 [Geoff Anderson] Corrected license headers
e6d532f [Geoff Anderson] java 7 -> java 6
8c61e2d [Geoff Anderson] Reverted jdk back to 6
f14c507 [Geoff Anderson] Removed mode = "test" from Vagrantfile and Vagrantfile.local examples. Updated testing README to clarify aws setup.
98b7253 [Geoff Anderson] Updated consumer tests to pre-populate kafka logs
e6a41f1 [Geoff Anderson] removed stray println
b15b24f [Geoff Anderson] leftover KafkaBenchmark in super call
0f75187 [Geoff Anderson] Removed stray allow_fail. kafka_benchmark_test -> benchmark_test
f469f84 [Geoff Anderson] Tweaked readme, added example Vagrantfile.local
3d73857 [Geoff Anderson] Merged downstream changes
42dcdb1 [Geoff Anderson] Tweaked behavior of stop_node, clean_node to generally fail fast
7f7c3e0 [Geoff Anderson] Updated setup.py for kafkatest
c60125c [Geoff Anderson] TestEndToEndLatency -> EndToEndLatency
4f476fe [Geoff Anderson] Moved aws scripts to vagrant directory
5af88fc [Geoff Anderson] Updated README to include aws quickstart
e5edf03 [Geoff Anderson] Updated example aws Vagrantfile.local
96533c3 [Geoff] Update aws-access-keys-commands
25a413d [Geoff] Update aws-example-Vagrantfile.local
884b20e [Geoff Anderson] Moved a bunch of files to kafkatest directory
fc7c81c [Geoff Anderson] added setup.py
632be12 [Geoff] Merge pull request #3 from confluentinc/verbose-client
51a94fd [Geoff Anderson] Use argparse4j instead of joptsimple. ThroughputThrottler now has more intuitive behavior when targetThroughput is 0.
a80a428 [Geoff Anderson] Added shell program for VerifiableProducer.
d586fb0 [Geoff Anderson] Updated comments to reflect that throttler is not message-specific
6842ed1 [Geoff Anderson] left out a file from last commit
1228eef [Geoff Anderson] Renamed throttler
9100417 [Geoff Anderson] Updated command-line options for VerifiableProducer. Extracted throughput logic to make it reusable.
0a5de8e [Geoff Anderson] Fixed checkstyle errors. Changed name to VerifiableProducer. Added synchronization for thread safety on println statements.
475423b [Geoff Anderson] Convert class to string before adding to json object.
bc009f2 [Geoff Anderson] Got rid of VerboseProducer in core (moved to clients)
c0526fe [Geoff Anderson] Updates per review comments.
8b4b1f2 [Geoff Anderson] Minor updates to VerboseProducer
2777712 [Geoff Anderson] Added some metadata to producer output.
da94b8c [Geoff Anderson] Added number of messages option.
07cd1c6 [Geoff Anderson] Added simple producer which prints status of produced messages to stdout.
a278988 [Geoff Anderson] fixed typos
f1914c3 [Liquan Pei] Merge pull request #2 from confluentinc/system_tests
81e4156 [Liquan Pei] Bootstrap Kafka system tests
Geoff Anderson authored and guozhangwang committed Jul 29, 2015
1 parent f4101ab commit e43c9af
Showing 34 changed files with 2,322 additions and 38 deletions.
5 changes: 5 additions & 0 deletions .gitignore
@@ -29,3 +29,8 @@ config/server-*
config/zookeeper-*
core/data/*
gradle/wrapper/*

results
tests/results
.ducktape
tests/.ducktape
39 changes: 38 additions & 1 deletion Vagrantfile
@@ -31,6 +31,7 @@ ram_megabytes = 1280
# EC2
ec2_access_key = ENV['AWS_ACCESS_KEY']
ec2_secret_key = ENV['AWS_SECRET_KEY']
ec2_session_token = ENV['AWS_SESSION_TOKEN']
ec2_keypair_name = nil
ec2_keypair_file = nil

@@ -50,6 +51,24 @@ if File.exists?(local_config_file) then
eval(File.read(local_config_file), binding, "Vagrantfile.local")
end

# This is a horrible hack to work around bad interactions between
# vagrant-hostmanager and vagrant-aws/vagrant's implementation. Hostmanager
# wants to update the /etc/hosts entries, but tries to do so even on nodes that
# aren't up (e.g. even when all nodes are stopped and you run vagrant
# destroy). Because of the way the underlying code in vagrant works, it still
# tries to communicate with the node and has to wait for a very long
# timeout. This modifies the update to check for hosts that are not created or
# stopped, skipping the update in that case since it's impossible to update
# nodes in that state.
Object.const_get("VagrantPlugins").const_get("HostManager").const_get("HostsFile").class_eval do
alias_method :old_update_guest, :update_guest
def update_guest(machine)
state_id = machine.state.id
return if state_id == :not_created || state_id == :stopped
old_update_guest(machine)
end
end

# TODO(ksweeney): RAM requirements are not empirical and can probably be significantly lowered.
Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
config.hostmanager.enabled = true
@@ -85,13 +104,31 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
override.vm.box = "dummy"
override.vm.box_url = "https://github.com/mitchellh/vagrant-aws/raw/master/dummy.box"

override.hostmanager.ignore_private_ip = true
cached_addresses = {}
# Use a custom resolver that SSH's into the machine and finds the IP address
# directly. This lets us get at the private IP address directly, avoiding
# some issues with using the default IP resolver, which uses the public IP
# address.
override.hostmanager.ip_resolver = proc do |vm, resolving_vm|
if !cached_addresses.has_key?(vm.name)
state_id = vm.state.id
if state_id != :not_created && state_id != :stopped && vm.communicate.ready?
vm.communicate.execute("/sbin/ifconfig eth0 | grep 'inet addr' | tail -n 1 | egrep -o '[0-9\.]+' | head -n 1 2>&1") do |type, contents|
cached_addresses[vm.name] = contents.split("\n").first[/(\d+\.\d+\.\d+\.\d+)/, 1]
end
else
cached_addresses[vm.name] = nil
end
end
cached_addresses[vm.name]
end

override.ssh.username = ec2_user
override.ssh.private_key_path = ec2_keypair_file

aws.access_key_id = ec2_access_key
aws.secret_access_key = ec2_secret_key
aws.session_token = ec2_session_token
aws.keypair_name = ec2_keypair_name

aws.region = ec2_region
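
The new `ec2_session_token` variable means temporary AWS credentials can be supplied entirely through the environment before bringing up the cluster; a sketch (values are placeholders):

```
$ export AWS_ACCESS_KEY=<access-key>
$ export AWS_SECRET_KEY=<secret-key>
$ export AWS_SESSION_TOKEN=<session-token>   # only required for temporary credentials
$ vagrant up --provider=aws
```
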
10 changes: 10 additions & 0 deletions bin/kafka-run-class.sh
@@ -65,6 +65,16 @@ do
CLASSPATH=$CLASSPATH:$file
done

for file in $base_dir/tools/build/libs/kafka-tools*.jar;
do
CLASSPATH=$CLASSPATH:$file
done

for file in $base_dir/tools/build/dependant-libs-${SCALA_VERSION}*/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done

# classpath addition for release
for file in $base_dir/libs/*.jar;
do
20 changes: 20 additions & 0 deletions bin/kafka-verifiable-producer.sh
@@ -0,0 +1,20 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.clients.tools.VerifiableProducer $@
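
The wrapper script above invokes the new `org.apache.kafka.clients.tools.VerifiableProducer`, which prints the status of each produced message to stdout as JSON. A rough usage sketch (option names such as `--topic`, `--broker-list`, and `--max-messages` are assumptions based on the commit description, not verified against the final argparse4j definitions):

```
$ bin/kafka-verifiable-producer.sh --topic test --broker-list localhost:9092 --max-messages 1000
```
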
60 changes: 55 additions & 5 deletions build.gradle
@@ -204,20 +204,20 @@ for ( sv in ['2_10_5', '2_11_7'] ) {
}
}

tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j-appender:jar']) {
tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j-appender:jar', 'tools:jar']) {
}

tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_10_5', 'srcJar_2_11_7', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'log4j-appender:srcJar']) { }
tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_10_5', 'srcJar_2_11_7', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'log4j-appender:srcJar', 'tools:srcJar']) { }

tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_10_5', 'docsJar_2_11_7', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'log4j-appender:docsJar']) { }
tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_10_5', 'docsJar_2_11_7', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'log4j-appender:docsJar', 'tools:docsJar']) { }

tasks.create(name: "testAll", dependsOn: ['test_core_2_10_5', 'test_core_2_11_7', 'clients:test', 'log4j-appender:test']) {
tasks.create(name: "testAll", dependsOn: ['test_core_2_10_5', 'test_core_2_11_7', 'clients:test', 'log4j-appender:test', 'tools:test']) {
}

tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_10_5', 'releaseTarGz_2_11_7']) {
}

tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_7', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j-appender:uploadArchives']) {
tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_7', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j-appender:uploadArchives', 'tools:uploadArchives']) {
}

project(':core') {
@@ -413,6 +413,56 @@ project(':clients') {
test.dependsOn('checkstyleMain', 'checkstyleTest')
}

project(':tools') {
apply plugin: 'checkstyle'
archivesBaseName = "kafka-tools"

dependencies {
compile project(':clients')
compile 'net.sourceforge.argparse4j:argparse4j:0.5.0'
compile 'com.fasterxml.jackson.core:jackson-databind:2.5.4'
compile "$slf4jlog4j"

testCompile 'junit:junit:4.6'
testCompile project(path: ':clients', configuration: 'archives')
}

task testJar(type: Jar) {
classifier = 'test'
from sourceSets.test.output
}

test {
testLogging {
events "passed", "skipped", "failed"
exceptionFormat = 'full'
}
}

javadoc {
include "**/org/apache/kafka/tools/*"
}

tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntime) {
include('slf4j-log4j12*')
}
from (configurations.runtime) {
exclude('kafka-clients*')
}
into "$buildDir/dependant-libs-${scalaVersion}"
}

jar {
dependsOn 'copyDependantLibs'
}

checkstyle {
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
}
test.dependsOn('checkstyleMain', 'checkstyleTest')
}

project(':log4j-appender') {
apply plugin: 'checkstyle'
archivesBaseName = "kafka-log4j-appender"
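
With the `tools` module wired into the aggregate tasks above, the new jar can be built on its own or via the umbrella tasks; for example:

```
$ ./gradlew tools:jar   # build only the new kafka-tools jar
$ ./gradlew jarAll      # build all jars, which now includes tools:jar
```
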
2 changes: 2 additions & 0 deletions checkstyle/import-control.xml
@@ -92,6 +92,8 @@
<subpackage name="tools">
<allow pkg="org.apache.kafka.clients.producer" />
<allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="com.fasterxml.jackson" />
<allow pkg="net.sourceforge.argparse4j" />
</subpackage>
</subpackage>

core/src/main/scala/kafka/tools/TestEndToEndLatency.scala → EndToEndLatency.scala
@@ -17,13 +17,14 @@

package kafka.tools

import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer}
import java.util.{Arrays, Properties}

import kafka.consumer._
import java.util.Properties
import java.util.Arrays
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import scala.Option.option2Iterable

object TestEndToEndLatency {
object EndToEndLatency {
def main(args: Array[String]) {
if (args.length != 6) {
System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages consumer_fetch_max_wait producer_acks")
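
After the rename, the tool is invoked as `kafka.tools.EndToEndLatency`; a sketch following the usage string above (broker and ZooKeeper addresses are placeholders):

```
$ bin/kafka-run-class.sh kafka.tools.EndToEndLatency localhost:9092 localhost:2181 test 10000 100 1
```
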
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -115,9 +115,9 @@ object ProducerPerformance extends Logging {
.defaultsTo(0)
val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
"set, the csv metrics will be outputed here")
"set, the csv metrics will be output here")
.withRequiredArg
.describedAs("metrics dictory")
.describedAs("metrics directory")
.ofType(classOf[java.lang.String])
val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.")

3 changes: 2 additions & 1 deletion settings.gradle
@@ -14,4 +14,5 @@
// limitations under the License.

apply from: file('scala.gradle')
include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'log4j-appender'
include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'tools', 'log4j-appender'

11 changes: 11 additions & 0 deletions tests/.gitignore
@@ -0,0 +1,11 @@
Vagrantfile.local

.idea/

*.pyc
*.ipynb

.DS_Store

.ducktape
results/
(diffs for the remaining changed files not shown)
