KAFKA-14580: Moving EndToEndLatency from core to tools module #13095
Thanks for the PR. Can we add a test in that case? We'd want to verify manually that the test matches the previous behavior.
Also, I'm currently focused on completing KAFKA-14470. @mimaison since you fleshed out KAFKA-14525, do you have cycles to do these reviews?
Actually, I pinged too soon :) Before getting it reviewed, I will test locally and also add a couple of tests. Thanks.
Yes I can review this. I started looking at KAFKA-14525 because we were stepping on each other's toes in KAFKA-14470, but we should finish that first. Many of the tests for these commands start full clusters and all that test logic is currently in core. We should be able to move it to server-common but I'm not quite sure if we want to drag many ZooKeeper bits there.
We already depend on core when it comes to the tools test module, so we don't necessarily have to move things for that.
Hi @mimaison, I added a few basic unit tests and updated the system test (end_to_end_latency.py) as needed. I haven't set it up locally, but I am hoping the tests will run from here to validate that the changes work. Thanks!
Looks like there are some more checkstyle failures. Will fix them.
Hi @vamossagar12, thanks for working on this.
I left some comments.
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java
I looked at other classes in tools and saw argparse4j being used in them, so I assumed that this is the direction that has been chosen. Is that not the case? If not, I can revert to using args. Please let me know.
@fvaleri, I removed the code changes related to argparse4j. That way the interface of the tool is exactly the same as it was previously. Thanks
Some more comments.
In general we should be as close as possible to the original code, to avoid having an impact on correctness and/or performance. Any improvement or big refactoring can be discussed in a separate PR.
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java
int numMessages = Integer.parseInt(args[2]);
String acks = args[3];
int messageSizeBytes = Integer.parseInt(args[4]);
String propertiesFile = args.length > 5 ? args[5] : null;
This is not equivalent and there is some missing logic (filter).
Yes that was a miss. Added the filter.
The original code is using Optional, which is a much better approach, and this also requires some changes further down.
Hmm, I can do that, but IMO it's not going to make much difference in terms of readability or other factors. Of course, using Optional would help attain parity with the Scala code, but that's all we get.
It would make the code much more readable IMO and it's a little change.
Yeah, please don't replace Option with null. The equivalent is Optional in such cases.
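A minimal sketch of the Optional-based parsing being asked for here, assuming the properties file is the optional sixth positional argument and that an empty string should be treated as absent (the "filter" mentioned earlier in this thread). The class and method names are hypothetical and are not taken from the PR.

```java
import java.util.Optional;

public class OptionalArgSketch {
    // Hypothetical helper: returns the properties file path only if a
    // non-empty sixth argument was supplied, otherwise Optional.empty().
    public static Optional<String> propertiesFile(String[] args) {
        return args.length > 5
            ? Optional.of(args[5]).filter(s -> !s.isEmpty())
            : Optional.empty();
    }

    public static void main(String[] args) {
        // Illustrative argument lists; the values are made up.
        String[] withFile = {"localhost:9092", "test", "100", "all", "20", "client.properties"};
        String[] withoutFile = {"localhost:9092", "test", "100", "all", "20"};

        System.out.println(propertiesFile(withFile).orElse("<none>"));
        System.out.println(propertiesFile(withoutFile).isPresent());
    }
}
```

Callers further down then use isPresent() or ifPresent(...) on the result instead of a null check, which is the kind of follow-up change the review refers to.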
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java
long begin = System.nanoTime();
//Send message (of random bytes) synchronously then immediately poll for it
producer.send(new ProducerRecord<>(topic, message)).get();
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
Why are you not using the iterator as in the original code?
The reason I am not using the iterator is that ConsumerRecords exposes methods like isEmpty and count, which seemed easier to understand when used in the validate method. Let me know if you think otherwise.
I would prefer to stick with the original code.
I don't think we need to keep the original code when it's strictly worse (for example the original code ends up exhausting the iterator to get the size).
We should keep the original code if there isn't a clear improvement from changing and whatever changes we do should be localized - changes that affect many methods, other files, etc. are best avoided if possible.
Yeah, in this case I feel using the methods provided by ConsumerRecords seems cleaner.
> We should keep the original code if there isn't a clear improvement from changing and whatever changes we do should be localized - changes that affect many methods, other files, etc. are best avoided if possible.
Ack. Will keep that in mind.
> I don't think we need to keep the original code when it's strictly worse (for example the original code ends up exhausting the iterator to get the size).
Fair enough. Thanks.
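To illustrate the point about exhausting the iterator, here is a small sketch that uses a plain java.util.List as a stand-in for ConsumerRecords (an assumption made so the example runs without the Kafka client library; the class and method names are hypothetical). Counting by walking the iterator consumes it, while a direct size or emptiness query leaves the records untouched.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class IteratorCountSketch {
    // Counts elements by draining the iterator; after this call the
    // iterator has no elements left to return.
    public static int countByDraining(Iterator<?> it) {
        int n = 0;
        while (it.hasNext()) {
            it.next();
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        // Stand-in for the records returned by a poll.
        List<String> records = Arrays.asList("m1", "m2");

        Iterator<String> it = records.iterator();
        int drained = countByDraining(it);
        // The iterator is now exhausted, so the first record cannot be read from it.
        System.out.println(drained + " " + it.hasNext());

        // Direct queries do not consume anything.
        System.out.println(records.size() + " " + records.isEmpty());
    }
}
```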
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java
@fvaleri, thanks. I made the changes.
Properties adminProps = loadPropsWithBootstrapServers(propertiesFile, brokers);
Admin adminClient = Admin.create(adminProps);
NewTopic newTopic = new NewTopic(topic, defaultNumPartitions, defaultReplicationFactor);
adminClient.createTopics(Collections.singletonList(newTopic));
You need to remove this line to avoid TopicExistsException when the test topic does not exist. Once you do that, I'll approve.
Sorry that was a miss. Removed it.
Builds on 8 and 17 are ok.
Test failure on 11 is unrelated and it works fine on my machine.
Thanks.
Sorry for the delay. Thanks for the PR, I left a few minor suggestions.
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java
List<TopicPartition> topicPartitions = consumer.
partitionsFor(topic).
stream().map(p -> new TopicPartition(p.topic(), p.partition()))
.collect(Collectors.toList());
Sometimes we put dots at the end of lines and other times at the front. Can you make it consistent in this file?
I made the usage of dots consistent in this block of code. I couldn't find other inconsistencies with respect to dots. There was one for + when used for concatenation, which I have also made consistent.
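As a small illustration of the style point, here is a sketch of a method chain with every dot placed at the start of the continuation line. It uses plain integers and strings as stand-ins for the partition metadata (an assumption so the example has no Kafka dependency); the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class DotStyleSketch {
    // Builds "topic-partition" labels; each chained call starts its own
    // line with a leading dot, so the placement is uniform.
    public static List<String> partitionNames(String topic, List<Integer> partitions) {
        return partitions
            .stream()
            .map(p -> topic + "-" + p)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(partitionNames("test", Arrays.asList(0, 1, 2)));
    }
}
```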
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java
9a89209 to ad4685d
No problem! I addressed the comments. Thanks for the review.
LGTM, thanks for the PR
Actually we also need to update EndToEndLatencyService to point to the new class: https://github.com/apache/kafka/blob/trunk/tests/kafkatest/services/performance/end_to_end_latency.py#L93
Thanks @mimaison. Actually, I lack some context here. Regarding the class, were you referring to this line instead? https://github.com/apache/kafka/blob/trunk/tests/kafkatest/services/performance/end_to_end_latency.py#L127
I simply meant that it looks like some changes in the system tests are required too.
Got it. Thanks for the confirmation.
0ad6b3d to 4ecc30e
@mimaison, I updated the system test to point to the new class. That one place seemed to be the only relevant one in this case.
Do you have test run output that shows it works and that the run time is similar? You can look at what I did for the JmxTool migration, which is also used by the system tests (STs). I would also suggest discarding the first run of such a test, because the test framework needs to start a bunch of containers.
@fvaleri, @mimaison here's a sample run from my local setup:
I stopped after #5 as there was some flakiness.
Hi @vamossagar12,
In end_to_end_latency.py there is a reference to a class that doesn't exist: kafka.tools.TestEndToEndLatency. The class EndToEndLatencyService contained in that file is used in benchmark_test.py and test_performance_services.py.
With benchmark_test I have 1 failure when running the entire suite (62 tests), but it works if I run the failing test in isolation.
kafkatest.benchmarks.core.benchmark_test.Benchmark.test_long_term_producer_throughput.security_protocol=PLAINTEXT.compression_type=none
With sanity_checks I have 3 failures, because it also runs with old versions, where we still have the old package name. For more context see #13233.
kafkatest.sanity_checks.test_performance_services.PerformanceServiceTest.test_version.version=0.9.0.1
kafkatest.sanity_checks.test_performance_services.PerformanceServiceTest.test_version.version=0.9.0.1.new_consumer=False
kafkatest.sanity_checks.test_performance_services.PerformanceServiceTest.test_version.version=1.1.1.new_consumer=False
Thanks @fvaleri for pointing me to the fix you made for the JmxTool issue. I made the changes to also take the version into account when choosing the EndToEndLatency class. I did point out the usage of TestEndToEndLatency here, but I don't have context on it. Also, when I try to run
d237d48 to 4e905a5
@fvaleri, I fixed the above error. It occurred because some of the containers had died. I have a clean run of the system test now:
cc @mimaison
Tests passed.
LGTM. Thanks!
Thanks for the PR. It looks good overall, I left a couple of small comments
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java
…ndToEndLatency.java in benchmark_test.py
Thanks @mimaison. I addressed the comments.
Thanks for the updates! Should we also have a golden path test?
Thanks Michael. I added a happy path testcase.
Tests have passed for one of the builds.
LGTM, thanks for the PR
Move EndToEndLatency to tools