From 47b7b64e0fa1ec7e1227a4def8eb01fe9be4202d Mon Sep 17 00:00:00 2001
From: Geoff Anderson
Date: Mon, 20 Jul 2015 16:33:14 -0700
Subject: [PATCH] Created separate tools jar so that the clients package does
 not pull in dependencies on the Jackson JSON tools or argparse4j.

---
 bin/kafka-run-class.sh                        | 10 +++
 build.gradle                                  | 62 ++++++++++++--
 checkstyle/import-control.xml                 |  2 +-
 settings.gradle                               |  3 +-
 .../clients/tools/ProducerPerformance.java    |  0
 .../clients/tools/ThroughputThrottler.java    |  0
 .../clients/tools/VerifiableProducer.java     | 80 ++++++++++++-------
 7 files changed, 117 insertions(+), 40 deletions(-)
 rename {clients => tools}/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java (100%)
 rename {clients => tools}/src/main/java/org/apache/kafka/clients/tools/ThroughputThrottler.java (100%)
 rename {clients => tools}/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java (83%)

diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 8c3fa28614534..ebe74092a23dd 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -65,6 +65,16 @@ do
   CLASSPATH=$CLASSPATH:$file
 done
 
+for file in $base_dir/tools/build/libs/kafka-tools*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
+for file in $base_dir/tools/build/dependant-libs-${SCALA_VERSION}*/*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
 # classpath addition for release
 for file in $base_dir/libs/*.jar;
 do
diff --git a/build.gradle b/build.gradle
index 4fd277e874faa..6a8fc9295ddc2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -203,20 +203,20 @@ for ( sv in ['2_9_1', '2_9_2', '2_10_5', '2_11_6'] ) {
   }
 }
 
-tasks.create(name: "jarAll", dependsOn: ['jar_core_2_9_1', 'jar_core_2_9_2', 'jar_core_2_10_5', 'jar_core_2_11_6', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j-appender:jar']) {
+tasks.create(name: "jarAll", dependsOn: ['jar_core_2_9_1', 'jar_core_2_9_2', 'jar_core_2_10_5', 'jar_core_2_11_6', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j-appender:jar', 'tools:jar']) {
 }
 
-tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_9_1', 'srcJar_2_9_2', 'srcJar_2_10_5', 'srcJar_2_11_6', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'log4j-appender:srcJar']) { }
+tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_9_1', 'srcJar_2_9_2', 'srcJar_2_10_5', 'srcJar_2_11_6', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'log4j-appender:srcJar', 'tools:srcJar']) { }
 
-tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_9_1', 'docsJar_2_9_2', 'docsJar_2_10_5', 'docsJar_2_11_6', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'log4j-appender:docsJar']) { }
+tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_9_1', 'docsJar_2_9_2', 'docsJar_2_10_5', 'docsJar_2_11_6', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'log4j-appender:docsJar', 'tools:docsJar']) { }
 
-tasks.create(name: "testAll", dependsOn: ['test_core_2_9_1', 'test_core_2_9_2', 'test_core_2_10_5', 'test_core_2_11_6', 'clients:test', 'log4j-appender:test']) {
+tasks.create(name: "testAll", dependsOn: ['test_core_2_9_1', 'test_core_2_9_2', 'test_core_2_10_5', 'test_core_2_11_6', 'clients:test', 'log4j-appender:test', 'tools:test']) {
 }
 
 tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_9_1', 'releaseTarGz_2_9_2', 'releaseTarGz_2_10_5', 'releaseTarGz_2_11_6']) {
 }
 
-tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_9_1', 'uploadCoreArchives_2_9_2', 'uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_6', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j-appender:uploadArchives']) {
+tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_9_1', 'uploadCoreArchives_2_9_2', 'uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_6', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j-appender:uploadArchives', 'tools:uploadArchives']) {
 }
 
 project(':core') {
@@ -378,8 +378,6 @@ project(':clients') {
     compile "org.slf4j:slf4j-api:1.7.6"
     compile 'org.xerial.snappy:snappy-java:1.1.1.7'
     compile 'net.jpountz.lz4:lz4:1.2.0'
-    compile 'net.sourceforge.argparse4j:argparse4j:0.5.0'
-    compile 'com.googlecode.json-simple:json-simple:1.1.1'
 
     testCompile 'junit:junit:4.6'
     testRuntime "$slf4jlog4j"
@@ -419,6 +417,56 @@ project(':clients') {
   test.dependsOn('checkstyleMain', 'checkstyleTest')
 }
 
+project(':tools') {
+  apply plugin: 'checkstyle'
+  archivesBaseName = "kafka-tools"
+
+  dependencies {
+    compile project(':clients')
+    compile 'net.sourceforge.argparse4j:argparse4j:0.5.0'
+    compile 'com.fasterxml.jackson.core:jackson-databind:2.5.4'
+    compile "$slf4jlog4j"
+
+    testCompile 'junit:junit:4.6'
+    testCompile project(path: ':clients', configuration: 'archives')
+  }
+
+  task testJar(type: Jar) {
+    classifier = 'test'
+    from sourceSets.test.output
+  }
+
+  test {
+    testLogging {
+      events "passed", "skipped", "failed"
+      exceptionFormat = 'full'
+    }
+  }
+
+  javadoc {
+    include "**/org/apache/kafka/tools/*"
+  }
+
+  tasks.create(name: "copyDependantLibs", type: Copy) {
+    from (configurations.testRuntime) {
+      include('slf4j-log4j12*')
+    }
+    from (configurations.runtime) {
+      exclude('kafka-clients*')
+    }
+    into "$buildDir/dependant-libs-${scalaVersion}"
+  }
+
+  jar {
+    dependsOn 'copyDependantLibs'
+  }
+
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+  }
+  test.dependsOn('checkstyleMain', 'checkstyleTest')
+}
+
 project(':log4j-appender') {
   apply plugin: 'checkstyle'
   archivesBaseName = "kafka-log4j-appender"
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 90d84c2b00dcf..a562eef2d0e93 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -92,7 +92,7 @@
-
+
diff --git a/settings.gradle b/settings.gradle
index 3b6a952cff2b3..1944917408792 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -14,4 +14,5 @@
 // limitations under the License.
 
 apply from: file('scala.gradle')
-include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'log4j-appender'
+include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'tools', 'log4j-appender'
+
diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
similarity index 100%
rename from clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
rename to tools/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ThroughputThrottler.java b/tools/src/main/java/org/apache/kafka/clients/tools/ThroughputThrottler.java
similarity index 100%
rename from clients/src/main/java/org/apache/kafka/clients/tools/ThroughputThrottler.java
rename to tools/src/main/java/org/apache/kafka/clients/tools/ThroughputThrottler.java
diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java
similarity index 83%
rename from clients/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java
rename to tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java
index 7a174289aacba..204166a25e740 100644
--- a/clients/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java
@@ -24,9 +24,12 @@
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.json.simple.JSONObject;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 import static net.sourceforge.argparse4j.impl.Arguments.store;
@@ -190,34 +193,47 @@ public void close() {
      */
     String errorString(Exception e, String key, String value, Long nowMs) {
         assert e != null : "Expected non-null exception.";
-
-        JSONObject obj = new JSONObject();
-        obj.put("class", this.getClass().toString());
-        obj.put("name", "producer_send_error");
+
+        Map errorData = new HashMap<>();
+        errorData.put("class", this.getClass().toString());
+        errorData.put("name", "producer_send_error");
+
+        errorData.put("time_ms", nowMs);
+        errorData.put("exception", e.getClass().toString());
+        errorData.put("message", e.getMessage());
+        errorData.put("topic", this.topic);
+        errorData.put("key", key);
+        errorData.put("value", value);
 
-        obj.put("time_ms", nowMs);
-        obj.put("exception", e.getClass().toString());
-        obj.put("message", e.getMessage());
-        obj.put("topic", this.topic);
-        obj.put("key", key);
-        obj.put("value", value);
-        return obj.toJSONString();
+        return toJsonString(errorData);
     }
 
     String successString(RecordMetadata recordMetadata, String key, String value, Long nowMs) {
         assert recordMetadata != null : "Expected non-null recordMetadata object.";
-
-        JSONObject obj = new JSONObject();
-        obj.put("class", this.getClass().toString());
-        obj.put("name", "producer_send_success");
+
+        Map successData = new HashMap<>();
+        successData.put("class", this.getClass().toString());
+        successData.put("name", "producer_send_success");
+
+        successData.put("time_ms", nowMs);
+        successData.put("topic", this.topic);
+        successData.put("partition", recordMetadata.partition());
+        successData.put("offset", recordMetadata.offset());
+        successData.put("key", key);
+        successData.put("value", value);
 
-        obj.put("time_ms", nowMs);
-        obj.put("topic", this.topic);
-        obj.put("partition", recordMetadata.partition());
-        obj.put("offset", recordMetadata.offset());
-        obj.put("key", key);
-        obj.put("value", value);
-        return obj.toJSONString();
+        return toJsonString(successData);
+    }
+
+    private String toJsonString(Map data) {
+        String json;
+        try {
+            ObjectMapper mapper = new ObjectMapper();
+            json = mapper.writeValueAsString(data);
+        } catch(JsonProcessingException e) {
+            json = "Bad data can't be written as json: " + e.getMessage();
+        }
+        return json;
     }
 
     /** Callback which prints errors to stdout when the producer fails to send. */
@@ -261,14 +277,16 @@ public void run() {
 
                 // Print a summary
                 long stopMs = System.currentTimeMillis();
                 double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
-                JSONObject obj = new JSONObject();
-                obj.put("class", producer.getClass().toString());
-                obj.put("name", "tool_data");
-                obj.put("sent", producer.numSent);
-                obj.put("acked", producer.numAcked);
-                obj.put("target_throughput", producer.throughput);
-                obj.put("avg_throughput", avgThroughput);
-                System.out.println(obj.toJSONString());
+
+                Map data = new HashMap<>();
+                data.put("class", producer.getClass().toString());
+                data.put("name", "tool_data");
+                data.put("sent", producer.numSent);
+                data.put("acked", producer.numAcked);
+                data.put("target_throughput", producer.throughput);
+                data.put("avg_throughput", avgThroughput);
+
+                System.out.println(producer.toJsonString(data));
             }
         });