Created separate tools jar so that the clients package does not pull in dependencies on the Jackson JSON tools or argparse4j.
Geoff Anderson committed Jul 20, 2015
1 parent a9e6a14 commit 47b7b64
Showing 7 changed files with 117 additions and 40 deletions.
10 changes: 10 additions & 0 deletions bin/kafka-run-class.sh
@@ -65,6 +65,16 @@ do
CLASSPATH=$CLASSPATH:$file
done

for file in $base_dir/tools/build/libs/kafka-tools*.jar;
do
CLASSPATH=$CLASSPATH:$file
done

for file in $base_dir/tools/build/dependant-libs-${SCALA_VERSION}*/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done

# classpath addition for release
for file in $base_dir/libs/*.jar;
do
62 changes: 55 additions & 7 deletions build.gradle
@@ -203,20 +203,20 @@ for ( sv in ['2_9_1', '2_9_2', '2_10_5', '2_11_6'] ) {
}
}

tasks.create(name: "jarAll", dependsOn: ['jar_core_2_9_1', 'jar_core_2_9_2', 'jar_core_2_10_5', 'jar_core_2_11_6', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j-appender:jar']) {
tasks.create(name: "jarAll", dependsOn: ['jar_core_2_9_1', 'jar_core_2_9_2', 'jar_core_2_10_5', 'jar_core_2_11_6', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j-appender:jar', 'tools:jar']) {
}

tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_9_1', 'srcJar_2_9_2', 'srcJar_2_10_5', 'srcJar_2_11_6', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'log4j-appender:srcJar']) { }
tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_9_1', 'srcJar_2_9_2', 'srcJar_2_10_5', 'srcJar_2_11_6', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'log4j-appender:srcJar', 'tools:srcJar']) { }

tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_9_1', 'docsJar_2_9_2', 'docsJar_2_10_5', 'docsJar_2_11_6', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'log4j-appender:docsJar']) { }
tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_9_1', 'docsJar_2_9_2', 'docsJar_2_10_5', 'docsJar_2_11_6', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'log4j-appender:docsJar', 'tools:docsJar']) { }

tasks.create(name: "testAll", dependsOn: ['test_core_2_9_1', 'test_core_2_9_2', 'test_core_2_10_5', 'test_core_2_11_6', 'clients:test', 'log4j-appender:test']) {
tasks.create(name: "testAll", dependsOn: ['test_core_2_9_1', 'test_core_2_9_2', 'test_core_2_10_5', 'test_core_2_11_6', 'clients:test', 'log4j-appender:test', 'tools:test']) {
}

tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_9_1', 'releaseTarGz_2_9_2', 'releaseTarGz_2_10_5', 'releaseTarGz_2_11_6']) {
}

tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_9_1', 'uploadCoreArchives_2_9_2', 'uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_6', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j-appender:uploadArchives']) {
tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_9_1', 'uploadCoreArchives_2_9_2', 'uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_6', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j-appender:uploadArchives', 'tools:uploadArchives']) {
}

project(':core') {
@@ -378,8 +378,6 @@ project(':clients') {
compile "org.slf4j:slf4j-api:1.7.6"
compile 'org.xerial.snappy:snappy-java:1.1.1.7'
compile 'net.jpountz.lz4:lz4:1.2.0'
compile 'net.sourceforge.argparse4j:argparse4j:0.5.0'
compile 'com.googlecode.json-simple:json-simple:1.1.1'

testCompile 'junit:junit:4.6'
testRuntime "$slf4jlog4j"
@@ -419,6 +417,56 @@ project(':clients') {
test.dependsOn('checkstyleMain', 'checkstyleTest')
}

project(':tools') {
apply plugin: 'checkstyle'
archivesBaseName = "kafka-tools"

dependencies {
compile project(':clients')
compile 'net.sourceforge.argparse4j:argparse4j:0.5.0'
compile 'com.fasterxml.jackson.core:jackson-databind:2.5.4'
compile "$slf4jlog4j"

testCompile 'junit:junit:4.6'
testCompile project(path: ':clients', configuration: 'archives')
}

task testJar(type: Jar) {
classifier = 'test'
from sourceSets.test.output
}

test {
testLogging {
events "passed", "skipped", "failed"
exceptionFormat = 'full'
}
}

javadoc {
include "**/org/apache/kafka/tools/*"
}

tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntime) {
include('slf4j-log4j12*')
}
from (configurations.runtime) {
exclude('kafka-clients*')
}
into "$buildDir/dependant-libs-${scalaVersion}"
}

jar {
dependsOn 'copyDependantLibs'
}

checkstyle {
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
}
test.dependsOn('checkstyleMain', 'checkstyleTest')
}

project(':log4j-appender') {
apply plugin: 'checkstyle'
archivesBaseName = "kafka-log4j-appender"
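As context for the argparse4j dependency that now lives only in the new tools module, here is a minimal, hypothetical sketch of the argparse4j 0.5.0 parsing style the tools classes rely on. The class name, program name, and argument below are illustrative only and are not part of this commit.

import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;

import static net.sourceforge.argparse4j.impl.Arguments.store;

public class ArgParseSketch {
    public static void main(String[] args) {
        // Build a parser; names and help text here are illustrative only.
        ArgumentParser parser = ArgumentParsers
                .newArgumentParser("verifiable-producer")
                .defaultHelp(true)
                .description("Produce messages and report status as JSON.");

        parser.addArgument("--topic")
                .action(store())
                .required(true)
                .type(String.class)
                .help("Topic to produce to.");

        try {
            Namespace ns = parser.parseArgs(args);
            System.out.println("topic = " + ns.getString("topic"));
        } catch (ArgumentParserException e) {
            parser.handleError(e);
            System.exit(1);
        }
    }
}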
2 changes: 1 addition & 1 deletion checkstyle/import-control.xml
@@ -92,7 +92,7 @@
<subpackage name="tools">
<allow pkg="org.apache.kafka.clients.producer" />
<allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="org.json.simple" />
<allow pkg="com.fasterxml.jackson.core" />
<allow pkg="net.sourceforge.argparse4j" />
</subpackage>
</subpackage>
3 changes: 2 additions & 1 deletion settings.gradle
@@ -14,4 +14,5 @@
// limitations under the License.

apply from: file('scala.gradle')
include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'log4j-appender'
include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'tools', 'log4j-appender'

@@ -24,9 +24,12 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import org.json.simple.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import static net.sourceforge.argparse4j.impl.Arguments.store;
@@ -190,34 +193,47 @@ public void close() {
*/
String errorString(Exception e, String key, String value, Long nowMs) {
assert e != null : "Expected non-null exception.";

JSONObject obj = new JSONObject();
obj.put("class", this.getClass().toString());
obj.put("name", "producer_send_error");

Map<String, Object> errorData = new HashMap<>();
errorData.put("class", this.getClass().toString());
errorData.put("name", "producer_send_error");

errorData.put("time_ms", nowMs);
errorData.put("exception", e.getClass().toString());
errorData.put("message", e.getMessage());
errorData.put("topic", this.topic);
errorData.put("key", key);
errorData.put("value", value);

obj.put("time_ms", nowMs);
obj.put("exception", e.getClass().toString());
obj.put("message", e.getMessage());
obj.put("topic", this.topic);
obj.put("key", key);
obj.put("value", value);
return obj.toJSONString();
return toJsonString(errorData);
}

String successString(RecordMetadata recordMetadata, String key, String value, Long nowMs) {
assert recordMetadata != null : "Expected non-null recordMetadata object.";

JSONObject obj = new JSONObject();
obj.put("class", this.getClass().toString());
obj.put("name", "producer_send_success");

Map<String, Object> successData = new HashMap<>();
successData.put("class", this.getClass().toString());
successData.put("name", "producer_send_success");

successData.put("time_ms", nowMs);
successData.put("topic", this.topic);
successData.put("partition", recordMetadata.partition());
successData.put("offset", recordMetadata.offset());
successData.put("key", key);
successData.put("value", value);

obj.put("time_ms", nowMs);
obj.put("topic", this.topic);
obj.put("partition", recordMetadata.partition());
obj.put("offset", recordMetadata.offset());
obj.put("key", key);
obj.put("value", value);
return obj.toJSONString();
return toJsonString(successData);
}

private String toJsonString(Map<String, Object> data) {
String json;
try {
ObjectMapper mapper = new ObjectMapper();
json = mapper.writeValueAsString(data);
} catch(JsonProcessingException e) {
json = "Bad data can't be written as json: " + e.getMessage();
}
return json;
}

/** Callback which prints errors to stdout when the producer fails to send. */
@@ -261,14 +277,16 @@ public void run() {
// Print a summary
long stopMs = System.currentTimeMillis();
double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
JSONObject obj = new JSONObject();
obj.put("class", producer.getClass().toString());
obj.put("name", "tool_data");
obj.put("sent", producer.numSent);
obj.put("acked", producer.numAcked);
obj.put("target_throughput", producer.throughput);
obj.put("avg_throughput", avgThroughput);
System.out.println(obj.toJSONString());

Map<String, Object> data = new HashMap<>();
data.put("class", producer.getClass().toString());
data.put("name", "tool_data");
data.put("sent", producer.numSent);
data.put("acked", producer.numAcked);
data.put("target_throughput", producer.throughput);
data.put("avg_throughput", avgThroughput);

System.out.println(producer.toJsonString(data));
}
});

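For reference, a minimal standalone sketch of the Jackson pattern that replaces json-simple above: status fields go into a plain Map and ObjectMapper serializes the map to a JSON string, mirroring the toJsonString helper added in the diff. The class name and the sample field values are hypothetical.

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;

public class JsonExample {
    public static void main(String[] args) {
        Map<String, Object> data = new HashMap<>();
        data.put("name", "producer_send_success");
        data.put("topic", "test-topic");
        data.put("offset", 42L);

        String json;
        try {
            // ObjectMapper renders the map as a JSON object, taking the place
            // of json-simple's JSONObject.toJSONString().
            json = new ObjectMapper().writeValueAsString(data);
        } catch (JsonProcessingException e) {
            json = "Bad data can't be written as json: " + e.getMessage();
        }
        System.out.println(json);
        // Prints something like (key order may vary):
        // {"name":"producer_send_success","topic":"test-topic","offset":42}
    }
}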
