Skip to content

Commit

Permalink
Use argparse4j instead of joptsimple. ThroughputThrottler now has mor…
Browse files Browse the repository at this point in the history
…e intuitive behavior when targetThroughput is 0.
  • Loading branch information
Geoff Anderson committed Jun 4, 2015
1 parent a80a428 commit 51a94fd
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 92 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ project(':clients') {
compile "org.slf4j:slf4j-api:1.7.6"
compile 'org.xerial.snappy:snappy-java:1.1.1.6'
compile 'net.jpountz.lz4:lz4:1.2.0'
compile 'net.sf.jopt-simple:jopt-simple:4.8'
compile 'net.sourceforge.argparse4j:argparse4j:0.5.0'
compile 'com.googlecode.json-simple:json-simple:1.1.1'

testCompile 'com.novocode:junit-interface:0.9'
Expand Down
2 changes: 1 addition & 1 deletion checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
<allow pkg="org.apache.kafka.clients.producer" />
<allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="org.json.simple" />
<allow pkg="joptsimple" />
<allow pkg="net.sourceforge.argparse4j" />
</subpackage>
</subpackage>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
/**
* This class helps producers throttle throughput.
*
* The resulting average throughput will be approximately
* min(targetThroughput, maximumPossibleThroughput)
* If targetThroughput >= 0, the resulting average throughput will be approximately
* min(targetThroughput, maximumPossibleThroughput). If targetThroughput < 0,
* no throttling will occur.
*
* To use, do this between successive send attempts:
* <pre>
Expand Down Expand Up @@ -53,7 +54,9 @@ public class ThroughputThrottler {
public ThroughputThrottler(long targetThroughput, long startMs) {
this.startMs = startMs;
this.targetThroughput = targetThroughput;
this.sleepTimeNs = NS_PER_SEC / targetThroughput;
this.sleepTimeNs = targetThroughput > 0 ?
NS_PER_SEC / targetThroughput :
Long.MAX_VALUE;
}

/**
Expand All @@ -63,7 +66,7 @@ public ThroughputThrottler(long targetThroughput, long startMs) {
* @return
*/
public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
if (this.targetThroughput <= 0) {
if (this.targetThroughput < 0) {
// No throttling in this case
return false;
}
Expand All @@ -72,7 +75,21 @@ public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
return elapsedMs > 0 && (amountSoFar / elapsedMs) > this.targetThroughput;
}

/**
* Occasionally blocks for small amounts of time to achieve targetThroughput.
*
* Note that if targetThroughput is 0, this will block extremely aggressively.
*/
public void throttle() {
if (targetThroughput == 0) {
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
// do nothing
}
return;
}

// throttle throughput by sleeping, on average,
// (1 / this.throughput) seconds between "things sent"
sleepDeficitNs += sleepTimeNs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,14 @@
import org.json.simple.JSONObject;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import static net.sourceforge.argparse4j.impl.Arguments.store;

import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;

/**
* Primarily intended for use with system testing, this producer prints metadata
Expand All @@ -51,12 +50,8 @@
* whether a given line is valid JSON.
*/
public class VerifiableProducer {

OptionParser commandLineParser;
Map<String, OptionSpec<?>> commandLineOptions = new HashMap<String, OptionSpec<?>>();


String topic;
private Properties producerProps = new Properties();
private Producer<String, String> producer;
// If maxMessages < 0, produce until the process is killed externally
private long maxMessages = -1;
Expand All @@ -66,85 +61,114 @@ public class VerifiableProducer {

// Number of send attempts
private long numSent = 0;

// Throttle message throughput if this is set >= 0
private long throughput;

// Hook to trigger producing thread to stop sending messages
private boolean stopProducing = false;

public VerifiableProducer(
Properties producerProps, String topic, int throughput, int maxMessages) {

/** Construct with command-line arguments */
public VerifiableProducer(String[] args) throws IOException {
this.configureParser();
this.parseCommandLineArgs(args);
this.topic = topic;
this.throughput = throughput;
this.maxMessages = maxMessages;
this.producer = new KafkaProducer<String, String>(producerProps);
}

/** Set up the command-line options. */
private void configureParser() {
this.commandLineParser = new OptionParser();
ArgumentAcceptingOptionSpec<String> topicOpt = commandLineParser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
.withRequiredArg()
.required()
.describedAs("topic")
.ofType(String.class);
commandLineOptions.put("topic", topicOpt);

ArgumentAcceptingOptionSpec<String> brokerListOpt = commandLineParser.accepts("broker-list", "REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
.withRequiredArg()
.required()
.describedAs("broker-list")
.ofType(String.class);
commandLineOptions.put("broker-list", brokerListOpt);


ArgumentAcceptingOptionSpec<String> numMessagesOpt = commandLineParser.accepts("max-messages", "Produce this many messages. Default: -1, produces messages until the process is killed externally.")
.withOptionalArg()
.defaultsTo("-1")
.describedAs("max-messages")
.ofType(String.class);
commandLineOptions.put("max-messages", numMessagesOpt);

ArgumentAcceptingOptionSpec<String> throughputOpt = commandLineParser.accepts("throughput", "Average message throughput, in messages/sec. Default: -1, results in no throttling.")
.withOptionalArg()
.defaultsTo("-1")
.describedAs("throughput")
.ofType(String.class);
commandLineOptions.put("throughput", throughputOpt);
/** Get the command-line argument parser. */
private static ArgumentParser argParser() {
ArgumentParser parser = ArgumentParsers
.newArgumentParser("verifiable-producer")
.defaultHelp(true)
.description("This tool produces increasing integers to the specified topic and prints JSON metadata to stdout on each \"send\" request, making externally visible which messages have been acked and which have not.");

ArgumentAcceptingOptionSpec<String> acksOpt = commandLineParser.accepts("acks", "number of acks required. Default: -1")
.withOptionalArg()
.defaultsTo("-1")
.describedAs("acks")
.ofType(String.class);
commandLineOptions.put("acks", acksOpt);

OptionSpecBuilder helpOpt = commandLineParser.accepts("help", "Print this message.");
commandLineOptions.put("help", helpOpt);
parser.addArgument("--topic")
.action(store())
.required(true)
.type(String.class)
.metavar("TOPIC")
.help("Produce messages to this topic.");

parser.addArgument("--broker-list")
.action(store())
.required(true)
.type(String.class)
.metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
.dest("brokerList")
.help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");

parser.addArgument("--max-messages")
.action(store())
.required(false)
.setDefault(-1)
.type(Integer.class)
.metavar("MAX-MESSAGES")
.dest("maxMessages")
.help("Produce this many messages. If -1, produce messages until the process is killed externally.");

parser.addArgument("--throughput")
.action(store())
.required(false)
.setDefault(-1)
.type(Integer.class)
.metavar("THROUGHPUT")
.help("If set >= 0, throttle maximum message throughput to *approximately* THROUGHPUT messages/sec.");

parser.addArgument("--acks")
.action(store())
.required(false)
.setDefault(-1)
.type(Integer.class)
.choices(0, 1, -1)
.metavar("ACKS")
.help("Acks required on each produced message. See Kafka docs on request.required.acks for details.");

return parser;
}

/** Validate command-line arguments and parse into properties object. */
public void parseCommandLineArgs(String[] args) throws IOException {
/** Construct a VerifiableProducer object from command-line arguments. */
public static VerifiableProducer createFromArgs(String[] args) {
ArgumentParser parser = argParser();
VerifiableProducer producer = null;

try {
Namespace res;
res = parser.parseArgs(args);
System.out.println(res);
System.out.println(res.getString("brokerList"));


int maxMessages = res.getInt("maxMessages");
String topic = res.getString("topic");
int throughput = res.getInt("throughput");

OptionSet options = commandLineParser.parse(args);
if (options.has(commandLineOptions.get("help"))) {
commandLineParser.printHelpOn(System.out);
System.exit(0);
}
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.ACKS_CONFIG, Integer.toString(res.getInt("acks")));
// No producer retries
producerProps.put("retries", "0");

this.maxMessages = Integer.parseInt((String) options.valueOf("max-messages"));
this.topic = (String) options.valueOf("topic");
this.throughput = Long.parseLong((String) options.valueOf("throughput"));

producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf("broker-list"));
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.ACKS_CONFIG, options.valueOf("acks"));

// No producer retries
producerProps.put("retries", "0");
producer = new VerifiableProducer(producerProps, topic, throughput, maxMessages);
} catch (ArgumentParserException e) {
if (args.length == 0) {
parser.printHelp();
System.exit(0);
} else {
parser.handleError(e);
System.exit(1);
}
}

return producer;
}

/**
* Produce a message with given value and no key.
*/
/** Produce a message with given key and value. */
public void send(String key, String value) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
numSent++;
Expand All @@ -158,7 +182,7 @@ public void send(String key, String value) {
}
}

/** Need to close the producer to flush any remaining messages if we're in async mode. */
/** Close the producer to flush any remaining messages. */
public void close() {
producer.close();
}
Expand Down Expand Up @@ -199,9 +223,7 @@ String successString(RecordMetadata recordMetadata, String key, String value, Lo
return obj.toJSONString();
}

/**
* Callback which prints errors to stdout when the producer fails to send.
*/
/** Callback which prints errors to stdout when the producer fails to send. */
private class PrintInfoCallback implements Callback {

private String key;
Expand All @@ -226,18 +248,22 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {

public static void main(String[] args) throws IOException {

final VerifiableProducer producer = new VerifiableProducer(args);
final VerifiableProducer producer = createFromArgs(args);
final long startMs = System.currentTimeMillis();
boolean infinite = producer.maxMessages < 0;

Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Trigger main thread to stop producing messages
producer.stopProducing = true;

// Flush any remaining messages
producer.close();

// Print a summary
long stopMs = System.currentTimeMillis();
double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));

JSONObject obj = new JSONObject();
obj.put("class", producer.getClass().toString());
obj.put("name", "tool_data");
Expand All @@ -248,14 +274,19 @@ public void run() {
System.out.println(obj.toJSONString());
}
});

ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
for (int i = 0; i < producer.maxMessages || infinite; i++) {
if (producer.stopProducing) {
break;
}
long sendStartMs = System.currentTimeMillis();
producer.send(null, String.format("%d", i));

if (throttler.shouldThrottle(i, sendStartMs)) {
throttler.throttle();
}
}
}

}

0 comments on commit 51a94fd

Please sign in to comment.