-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: enhance datagen for use as a load generator #3230
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @rodesai -- very cool features!!
private static int parseNumThreads(final String numThreadsString) { | ||
try { | ||
final int result = Integer.valueOf(numThreadsString, 10); | ||
if (result < 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (result < 0) { | |
if (result <= 0) { |
And similarly for message rate below.
@@ -122,6 +157,9 @@ private static void usage() { | |||
private final long maxInterval; | |||
private final String schemaRegistryUrl; | |||
private final InputStream propertiesFile; | |||
private final int numThreads; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add these into the usage()
string above, so users will know we've added awesome new features :)
|
||
for (final Thread t : threads) { | ||
try { | ||
t.join(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the purpose of this call? I'm having trouble making sense of the docs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It blocks waiting for the thread to exit. Otherwise, the program will just exit without producing the records that the user asked it to.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only reason producer threads would exit is if they're interrupted, right? So DataGen
will always exit with exit code 1? (Not saying there's anything wrong with this, just clarifying my understanding.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no they could just finish producing the requested number of records.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, good call. That means this PR also changes the meaning of iterations
from the total number of messages produced to the number of messages produced per thread.
import java.time.Instant; | ||
import java.util.Objects; | ||
|
||
public class TokenBucket { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use the Guava RateLimiter
? https://guava.dev/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No good reason. I threw this together without much thought like a year ago to do ksql benchmarking. I agree it's better to just use that.
Resurrecting some ancient enhancements to datagen so that we can use it to generate load: - Add a flag to disable printing each row - Add a flag to control the number of threads producing data - Add a flag to control the total message rate (msgs/second) across all the threads. The rate limiting is implemented using a token bucket.
e4f996c
to
9295204
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Woo -- hooray for refactors! LGTM :)
ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java
Outdated
Show resolved
Hide resolved
.put("schemaRegistryUrl", (builder, argVal) -> builder.schemaRegistryUrl = argVal) | ||
.put("propertiesFile", | ||
(builder, argVal) -> builder.propertiesFile = toFileInputStream(argVal)) | ||
(builder, argVal) -> builder.propertiesFile = toFileInputStream(argVal).get()) | ||
.put("msgRate", (builder, argVal) -> builder.msgRate = parseMsgRate(argVal)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like you intended to replace this (and parseNumThreads
below) with parseInt
?
b2287fe
to
a434156
Compare
Resurrecting some ancient enhancements to datagen so that we can use it
to generate load:
threads. The rate limiting is implemented using a token bucket.