diff --git a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java index 2141de344a3d..cc822e556342 100644 --- a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java +++ b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java @@ -149,7 +149,8 @@ private static void usage() { + "'delimited') " + newLine + "topic= " + newLine + "key= " + newLine - + "[iterations= (defaults to 1,000,000)] " + newLine + + "[iterations= (if no value is specified, datagen will produce " + + "indefinitely)] " + newLine + "[maxInterval= (defaults to 500)] " + newLine + "[propertiesFile=] " + newLine + "[nThreads=] " + newLine @@ -266,7 +267,7 @@ private Builder() { valueFormat = null; topicName = null; keyName = null; - iterations = 1000000; + iterations = -1; maxInterval = -1; schemaRegistryUrl = "http://localhost:8081"; propertiesFile = null; diff --git a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java index d7cf72105d6b..3be3bdb51127 100644 --- a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java +++ b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java @@ -79,33 +79,65 @@ public void populateTopic( valueSerializer ); - for (int i = 0; i < messageCount; i++) { - rateLimiter.ifPresent(RateLimiter::acquire); + if (messageCount != -1) { + for (int i = 0; i < messageCount; i++) { + produceOne( + rowGenerator, + producer, + kafkaTopicName, + maxInterval, + printRows, + rateLimiter + ); + } + } else { + while (true) { + produceOne( + rowGenerator, + producer, + kafkaTopicName, + maxInterval, + printRows, + rateLimiter + ); + } + } - final Pair genericRowPair = rowGenerator.generateRow(); + producer.flush(); + producer.close(); + } - final ProducerRecord producerRecord = new ProducerRecord<>( - kafkaTopicName, - genericRowPair.getLeft(), - genericRowPair.getRight() - ); + private void produceOne( + final RowGenerator rowGenerator, + final KafkaProducer producer, + final String kafkaTopicName, + final long maxInterval, + final boolean printRows, + final Optional rateLimiter + ) { + rateLimiter.ifPresent(RateLimiter::acquire); - producer.send(producerRecord, - new LoggingCallback(kafkaTopicName, - genericRowPair.getLeft(), - genericRowPair.getRight(), - printRows)); + final Pair genericRowPair = rowGenerator.generateRow(); - try { - final long interval = maxInterval < 0 ? INTER_MESSAGE_MAX_INTERVAL : maxInterval; + final ProducerRecord producerRecord = new ProducerRecord<>( + kafkaTopicName, + genericRowPair.getLeft(), + genericRowPair.getRight() + ); - Thread.sleep((long) (interval * Math.random())); - } catch (final InterruptedException e) { - // Ignore the exception. - } + producer.send(producerRecord, + new LoggingCallback(kafkaTopicName, + genericRowPair.getLeft(), + genericRowPair.getRight(), + printRows)); + + try { + final long interval = maxInterval < 0 ? INTER_MESSAGE_MAX_INTERVAL : maxInterval; + + Thread.sleep((long) (interval * Math.random())); + } catch (final InterruptedException e) { + // Ignore the exception. } - producer.flush(); - producer.close(); } private Serializer getKeySerializer() {