From 51a94fd6ece926bcdd864af353efcf4c4d1b8ad8 Mon Sep 17 00:00:00 2001 From: Geoff Anderson Date: Thu, 4 Jun 2015 13:55:02 -0700 Subject: [PATCH] Use argparse4j instead of joptsimple. ThroughputThrottler now has more intuitive behavior when targetThroughput is 0. --- build.gradle | 2 +- checkstyle/import-control.xml | 2 +- .../clients/tools/ThroughputThrottler.java | 25 ++- .../clients/tools/VerifiableProducer.java | 203 ++++++++++-------- 4 files changed, 140 insertions(+), 92 deletions(-) diff --git a/build.gradle b/build.gradle index 5941d02672125..516a1393f868a 100644 --- a/build.gradle +++ b/build.gradle @@ -352,7 +352,7 @@ project(':clients') { compile "org.slf4j:slf4j-api:1.7.6" compile 'org.xerial.snappy:snappy-java:1.1.1.6' compile 'net.jpountz.lz4:lz4:1.2.0' - compile 'net.sf.jopt-simple:jopt-simple:4.8' + compile 'net.sourceforge.argparse4j:argparse4j:0.5.0' compile 'com.googlecode.json-simple:json-simple:1.1.1' testCompile 'com.novocode:junit-interface:0.9' diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 68f0058db3e25..f27da4d1bc69e 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -93,7 +93,7 @@ - + diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ThroughputThrottler.java b/clients/src/main/java/org/apache/kafka/clients/tools/ThroughputThrottler.java index 03fa7b6b1e930..06c443f576cf9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/tools/ThroughputThrottler.java +++ b/clients/src/main/java/org/apache/kafka/clients/tools/ThroughputThrottler.java @@ -21,8 +21,9 @@ /** * This class helps producers throttle throughput. * - * The resulting average throughput will be approximately - * min(targetThroughput, maximumPossibleThroughput) + * If targetThroughput >= 0, the resulting average throughput will be approximately + * min(targetThroughput, maximumPossibleThroughput). If targetThroughput < 0, + * no throttling will occur. * * To use, do this between successive send attempts: *
@@ -53,7 +54,9 @@ public class ThroughputThrottler {
     public ThroughputThrottler(long targetThroughput, long startMs) {
         this.startMs = startMs;
         this.targetThroughput = targetThroughput;
-        this.sleepTimeNs = NS_PER_SEC / targetThroughput;
+        this.sleepTimeNs = targetThroughput > 0 ?
+                           NS_PER_SEC / targetThroughput : 
+                           Long.MAX_VALUE;
     }
 
     /**
@@ -63,7 +66,7 @@ public ThroughputThrottler(long targetThroughput, long startMs) {
      * @return
      */
     public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
-        if (this.targetThroughput <= 0) {
+        if (this.targetThroughput < 0) {
             // No throttling in this case
             return false;
         }
@@ -72,7 +75,21 @@ public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
         return elapsedMs > 0 && (amountSoFar / elapsedMs) > this.targetThroughput;
     }
 
+    /**
+     * Occasionally blocks for small amounts of time to achieve targetThroughput.
+     * 
+     * Note that if targetThroughput is 0, this will block extremely aggressively.
+     */
     public void throttle() {
+        if (targetThroughput == 0) {
+            try {
+                Thread.sleep(Long.MAX_VALUE);
+            } catch (InterruptedException e) {
+                // do nothing
+            }
+            return;
+        }
+        
         // throttle throughput by sleeping, on average,
         // (1 / this.throughput) seconds between "things sent"
         sleepDeficitNs += sleepTimeNs;
diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java b/clients/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java
index 81ffd06d5293d..c12a61993a5af 100644
--- a/clients/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java
@@ -27,15 +27,14 @@
 import org.json.simple.JSONObject;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Properties;
 
-import joptsimple.ArgumentAcceptingOptionSpec;
-import joptsimple.OptionParser;
-import joptsimple.OptionSet;
-import joptsimple.OptionSpec;
-import joptsimple.OptionSpecBuilder;
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
 
 /**
  * Primarily intended for use with system testing, this producer prints metadata
@@ -51,12 +50,8 @@
  * whether a given line is valid JSON.
  */
 public class VerifiableProducer {
-
-    OptionParser commandLineParser;
-    Map> commandLineOptions = new HashMap>();
-  
+    
     String topic;
-    private Properties producerProps = new Properties();
     private Producer producer;
     // If maxMessages < 0, produce until the process is killed externally
     private long maxMessages = -1;
@@ -66,85 +61,114 @@ public class VerifiableProducer {
     
     // Number of send attempts
     private long numSent = 0;
+    
+    // Throttle message throughput if this is set >= 0
     private long throughput;
+    
+    // Hook to trigger producing thread to stop sending messages
+    private boolean stopProducing = false;
+
+    public VerifiableProducer(
+            Properties producerProps, String topic, int throughput, int maxMessages) {
 
-    /** Construct with command-line arguments */
-    public VerifiableProducer(String[] args) throws IOException {
-        this.configureParser();
-        this.parseCommandLineArgs(args);
+        this.topic = topic;
+        this.throughput = throughput;
+        this.maxMessages = maxMessages;
         this.producer = new KafkaProducer(producerProps);
     }
-  
-    /** Set up the command-line options. */
-    private void configureParser() {
-        this.commandLineParser = new OptionParser();
-        ArgumentAcceptingOptionSpec topicOpt = commandLineParser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
-                .withRequiredArg()
-                .required()
-                .describedAs("topic")
-                .ofType(String.class);
-        commandLineOptions.put("topic", topicOpt);
-    
-        ArgumentAcceptingOptionSpec  brokerListOpt = commandLineParser.accepts("broker-list", "REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
-                .withRequiredArg()
-                .required()
-                .describedAs("broker-list")
-                .ofType(String.class);
-        commandLineOptions.put("broker-list", brokerListOpt);
-    
-    
-        ArgumentAcceptingOptionSpec  numMessagesOpt = commandLineParser.accepts("max-messages", "Produce this many messages. Default: -1, produces messages until the process is killed externally.")
-                .withOptionalArg()
-                .defaultsTo("-1")
-                .describedAs("max-messages")
-                .ofType(String.class);
-        commandLineOptions.put("max-messages", numMessagesOpt);
 
-        ArgumentAcceptingOptionSpec  throughputOpt = commandLineParser.accepts("throughput", "Average message throughput, in messages/sec. Default: -1, results in no throttling.")
-                .withOptionalArg()
-                .defaultsTo("-1")
-                .describedAs("throughput")
-                .ofType(String.class);
-        commandLineOptions.put("throughput", throughputOpt);
+    /** Get the command-line argument parser. */
+    private static ArgumentParser argParser() {
+        ArgumentParser parser = ArgumentParsers
+                .newArgumentParser("verifiable-producer")
+                .defaultHelp(true)
+                .description("This tool produces increasing integers to the specified topic and prints JSON metadata to stdout on each \"send\" request, making externally visible which messages have been acked and which have not.");
 
-        ArgumentAcceptingOptionSpec  acksOpt = commandLineParser.accepts("acks", "number of acks required. Default: -1")
-                .withOptionalArg()
-                .defaultsTo("-1")
-                .describedAs("acks")
-                .ofType(String.class);
-        commandLineOptions.put("acks", acksOpt);
-    
-        OptionSpecBuilder helpOpt = commandLineParser.accepts("help", "Print this message.");
-        commandLineOptions.put("help", helpOpt);
+        parser.addArgument("--topic")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("TOPIC")
+                .help("Produce messages to this topic.");
+
+        parser.addArgument("--broker-list")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
+                .dest("brokerList")
+                .help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
+        
+        parser.addArgument("--max-messages")
+                .action(store())
+                .required(false)
+                .setDefault(-1)
+                .type(Integer.class)
+                .metavar("MAX-MESSAGES")
+                .dest("maxMessages")
+                .help("Produce this many messages. If -1, produce messages until the process is killed externally.");
+
+        parser.addArgument("--throughput")
+                .action(store())
+                .required(false)
+                .setDefault(-1)
+                .type(Integer.class)
+                .metavar("THROUGHPUT")
+                .help("If set >= 0, throttle maximum message throughput to *approximately* THROUGHPUT messages/sec.");
+
+        parser.addArgument("--acks")
+                .action(store())
+                .required(false)
+                .setDefault(-1)
+                .type(Integer.class)
+                .choices(0, 1, -1)
+                .metavar("ACKS")
+                .help("Acks required on each produced message. See Kafka docs on request.required.acks for details.");
+
+        return parser;
     }
   
-    /** Validate command-line arguments and parse into properties object. */
-    public void parseCommandLineArgs(String[] args) throws IOException {
+    /** Construct a VerifiableProducer object from command-line arguments. */
+    public static VerifiableProducer createFromArgs(String[] args) {
+        ArgumentParser parser = argParser();
+        VerifiableProducer producer = null;
+        
+        try {
+            Namespace res;
+            res = parser.parseArgs(args);
+            System.out.println(res);
+            System.out.println(res.getString("brokerList"));
+            
+            
+            int maxMessages = res.getInt("maxMessages");
+            String topic = res.getString("topic");
+            int throughput = res.getInt("throughput");
 
-        OptionSet options = commandLineParser.parse(args);
-        if (options.has(commandLineOptions.get("help"))) {
-            commandLineParser.printHelpOn(System.out);
-            System.exit(0);
-        }
+            Properties producerProps = new Properties();
+            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
+            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                              "org.apache.kafka.common.serialization.StringSerializer");
+            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                              "org.apache.kafka.common.serialization.StringSerializer");
+            producerProps.put(ProducerConfig.ACKS_CONFIG, Integer.toString(res.getInt("acks")));
+            // No producer retries
+            producerProps.put("retries", "0");
 
-        this.maxMessages = Integer.parseInt((String) options.valueOf("max-messages"));
-        this.topic = (String) options.valueOf("topic");
-        this.throughput = Long.parseLong((String) options.valueOf("throughput"));
-    
-        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf("broker-list"));
-        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                          "org.apache.kafka.common.serialization.StringSerializer");
-        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                          "org.apache.kafka.common.serialization.StringSerializer");
-        producerProps.put(ProducerConfig.ACKS_CONFIG, options.valueOf("acks"));
-    
-        // No producer retries
-        producerProps.put("retries", "0");
+            producer = new VerifiableProducer(producerProps, topic, throughput, maxMessages);
+        } catch (ArgumentParserException e) {
+            if (args.length == 0) {
+                parser.printHelp();
+                System.exit(0);
+            } else {
+                parser.handleError(e);
+                System.exit(1);
+            }
+        }
+        
+        return producer;
     }
   
-    /**
-     * Produce a message with given value and no key.
-     */
+    /** Produce a message with given key and value. */
     public void send(String key, String value) {
         ProducerRecord record = new ProducerRecord(topic, key, value);
         numSent++;
@@ -158,7 +182,7 @@ public void send(String key, String value) {
         }
     }
   
-    /** Need to close the producer to flush any remaining messages if we're in async mode. */
+    /** Close the producer to flush any remaining messages. */
     public void close() {
         producer.close();
     }
@@ -199,9 +223,7 @@ String successString(RecordMetadata recordMetadata, String key, String value, Lo
         return obj.toJSONString();
     }
   
-    /**
-     * Callback which prints errors to stdout when the producer fails to send.
-     */
+    /** Callback which prints errors to stdout when the producer fails to send. */
     private class PrintInfoCallback implements Callback {
         
         private String key;
@@ -226,18 +248,22 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
   
     public static void main(String[] args) throws IOException {
         
-        final VerifiableProducer producer = new VerifiableProducer(args);
+        final VerifiableProducer producer = createFromArgs(args);
         final long startMs = System.currentTimeMillis();
         boolean infinite = producer.maxMessages < 0;
 
         Runtime.getRuntime().addShutdownHook(new Thread() {
             @Override
             public void run() {
+                // Trigger main thread to stop producing messages
+                producer.stopProducing = true;
+                
+                // Flush any remaining messages
                 producer.close();
 
+                // Print a summary
                 long stopMs = System.currentTimeMillis();
                 double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
-
                 JSONObject obj = new JSONObject();
                 obj.put("class", producer.getClass().toString());
                 obj.put("name", "tool_data");
@@ -248,14 +274,19 @@ public void run() {
                 System.out.println(obj.toJSONString());
             }
         });
-        
+
         ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
         for (int i = 0; i < producer.maxMessages || infinite; i++) {
+            if (producer.stopProducing) {
+                break;
+            }
             long sendStartMs = System.currentTimeMillis();
             producer.send(null, String.format("%d", i));
+            
             if (throttler.shouldThrottle(i, sendStartMs)) {
                 throttler.throttle();
             }
         }
     }
+        
 }