
Feature: Add backend for topic events view based on offset range #2551

Merged

Conversation

khatibtamal
Contributor

Linked issue

Resolves: #1987

What kind of change does this PR introduce?

  • Bug fix
  • New feature
  • Refactor
  • Docs update
  • CI update

What is the current behavior?

Currently it is only possible to view the last few selected offsets of partitions.

What is the new behavior?

Backend infrastructure for viewing topic events based on an offset range.

Other information

This PR does not affect anything in terms of front-end usage yet; a front-end PR needs to be created and merged after this one.

Requirements (all must be checked before review)

  • The pull request title follows our guidelines
  • Tests for the changes have been added (if relevant)
  • The latest changes from the main branch have been pulled
  • pnpm lint has been run successfully

@@ -72,7 +79,8 @@ public Map<Long, String> readEvents(
     Set<TopicPartition> topicPartitionsSet = consumer.assignment();

     Set<TopicPartition> partitionsAssignment = new HashSet<>();
-    if (offsetPosition.equals(CUSTOM_OFFSET_SELECTION)) {
+    if (offsetPosition.equals(CUSTOM_OFFSET_SELECTION)
+        || offsetPosition.equals(RANGE_OFFSET_SELECTION)) {
Contributor

In case of RANGE_OFFSET_SELECTION, we should check if start and end offsets are present.

Contributor Author


From my testing, if the URL pattern is not respected then there is a 403; that's why I had to change ClusterApiService.java. So it is impossible for these values to be missing entirely. Therefore I made changes to ensure that they are not negative.
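The non-negativity check the author describes could look roughly like this. This is a hypothetical sketch, not the PR's actual code; the helper name `isValidRange` and the boolean-return design are assumptions:

```java
public class RangeOffsetValidation {
    // Hypothetical helper (illustrative only): a range request is usable
    // only when both offsets are non-negative and start does not exceed end.
    static boolean isValidRange(long rangeOffsetsStart, long rangeOffsetsEnd) {
        return rangeOffsetsStart >= 0
            && rangeOffsetsEnd >= 0
            && rangeOffsetsStart <= rangeOffsetsEnd;
    }

    public static void main(String[] args) {
        System.out.println(isValidRange(2, 10));  // true
        System.out.println(isValidRange(-1, 10)); // false: negative start
        System.out.println(isValidRange(10, 2));  // false: inverted range
    }
}
```

Because the URL pattern guarantees the path variables are present, a presence check is unnecessary and only the value check remains.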

@@ -18,6 +19,8 @@
 public class TopicContentsService {

   public static final String CUSTOM_OFFSET_SELECTION = "custom";
+  public static final String RANGE_OFFSET_SELECTION = "range";
Contributor


how about having an enum for these two, custom/range?

Contributor Author


done
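The enum the reviewer suggested could be sketched as follows. The names `OffsetSelectionDemo` and `OffsetSelection` are illustrative assumptions; the merged PR's actual enum may be named and shaped differently:

```java
public class OffsetSelectionDemo {
    // Replaces the raw string constants "custom" and "range" with a
    // type-safe enum that still carries the string used in the URL.
    enum OffsetSelection {
        CUSTOM("custom"),
        RANGE("range");

        private final String value;

        OffsetSelection(String value) {
            this.value = value;
        }

        String getValue() {
            return value;
        }
    }

    public static void main(String[] args) {
        // Comparisons become enum identity checks instead of string equality.
        System.out.println(OffsetSelection.RANGE.getValue()); // prints "range"
    }
}
```

An enum also lets the same constants be reused from the integration test instead of redeclaring the strings there, which is what the later review comments ask for.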

@@ -23,7 +23,7 @@ public class TopicContentsController {
       value =
           "/getTopicContents/{bootstrapServers}/"
               + "{protocol}/{consumerGroupId}/{topicName}/{offsetPosition}/partitionId/{selectedPartitionId}/"
-              + "selectedNumberOfOffsets/{selectedNumberOfOffsets}/{clusterIdentification}",
+              + "selectedNumberOfOffsets/{selectedNumberOfOffsets}/{clusterIdentification}/rangeOffsets/{rangeOffsetsStart}/{rangeOffsetsEnd}",
Contributor

@muralibasani Aug 19, 2024


Can offsetPosition be custom, range, or an id from the FE, so we don't introduce this action in the URL?

Contributor Author


When I work on the FE I will make appropriate changes so that it sends offsetPosition as one of the two existing values we have (number and custom), plus range if that's what the user selects. I preferred to keep this PR BE-only first; then I can work on the FE and make another PR.

As far as the changes to the URL go, I was a little confused about how to approach the problem. In the current behavior, offsetPosition is used in two cases:

  1. offsetPosition can be a number. In this case we ignore the selectedNumberOfOffsets and selectedPartitionId values in the URL; using the offsetPosition number we seek to the beginning of the topic and pull all data for all partitions up to the given offsetPosition number.
  2. offsetPosition is the string value custom. In this case we use the provided values of selectedNumberOfOffsets and selectedPartitionId to pull data only from the provided partition, up to the number provided in selectedNumberOfOffsets.

Therefore I felt I had the following options:

  1. Reuse the existing URL and try to do the range. In this case I would have to figure out how to reuse offsetPosition, selectedNumberOfOffsets, and selectedPartitionId. If offsetPosition is used to provide the value range, that leaves only the other two for the start and end of the range, so the data would be for ALL partitions.
  2. Make some changes to the URL to accommodate the range.

Would you prefer that we keep the existing URL exactly the same?

Contributor


How about offsetPosition = { number, custom, range }?
Existing situation: if it's number or custom, nothing changes in the URL.

If it's range, we need partitionId, start and end offsets.
Looks good the way you have done it.
"/getTopicContents/{bootstrapServers}/"
+ "{protocol}/{consumerGroupId}/{topicName}/{offsetPosition}/partitionId/{selectedPartitionId}/"
+ "selectedNumberOfOffsets/{selectedNumberOfOffsets}/{clusterIdentification}/"
+ "rangeOffsets/{rangeOffsetsStart}/{rangeOffsetsEnd}",

for (ConsumerRecord<String, String> record : consumerRecords) {
  eventMap.put(record.offset(), record.value());
  if (offsetPosition.equals(RANGE_OFFSET_SELECTION)
      && (record.offset() >= rangeOffsetsEnd || eventMap.size() >= RANGE_MAX_RECORDS)) {
Contributor


don't we need range offsets start position ?

Contributor Author


I followed the existing pattern and used rangeOffsetsStart here, as we seek to the start position.

Inside the loop we check rangeOffsetsEnd only if the selection is range; this ensures that the existing default behavior is completely unchanged.
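The termination logic under discussion can be illustrated without a broker. This is a simplified sketch: in the actual PR the loop iterates over `ConsumerRecords` returned by `poll()` after seeking to `rangeOffsetsStart`, whereas here a plain map stands in for the partition's data, and the constant value of `RANGE_MAX_RECORDS` is an assumption:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RangeLoopSketch {
    // Assumed cap on how many records a range request may return.
    static final int RANGE_MAX_RECORDS = 100;

    // Simulates consuming after a seek to rangeOffsetsStart: collect values
    // until the end offset is reached or the record cap is hit.
    static Map<Long, String> readRange(Map<Long, String> partitionData,
                                       long rangeOffsetsStart, long rangeOffsetsEnd) {
        Map<Long, String> eventMap = new LinkedHashMap<>();
        for (Map.Entry<Long, String> record : partitionData.entrySet()) {
            long offset = record.getKey();
            if (offset < rangeOffsetsStart) {
                continue; // the real code seeks past these instead of skipping
            }
            eventMap.put(offset, record.getValue());
            if (offset >= rangeOffsetsEnd || eventMap.size() >= RANGE_MAX_RECORDS) {
                break; // end of requested range, or safety cap reached
            }
        }
        return eventMap;
    }

    public static void main(String[] args) {
        Map<Long, String> data = new LinkedHashMap<>();
        for (long i = 0; i < 10; i++) data.put(i, "event-" + i);
        System.out.println(readRange(data, 2, 5).keySet()); // prints [2, 3, 4, 5]
    }
}
```

The start offset only determines where consumption begins; once consuming, only the end offset and the cap can stop the loop, which matches the author's explanation.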

Contributor


looks good

Contributor

@muralibasani left a comment


@khatibtamal thanks for the PR. I have not tested it locally yet, but have a couple of comments. And what should the frontend changes look like?

@khatibtamal
Contributor Author

> […] And what should the frontend changes look like?

@muralibasani thanks for the review. I tested this locally using Postman, and created TopicContentsControllerIT.java to thoroughly test all the new and legacy cases. As for the front end, I was hoping to first merge the BE and then do the FE in another PR. My understanding of this project's architecture is that the FE never communicates directly with cluster-api; it communicates via core, and the change I made in core to ClusterApiService.java should ensure that even if this PR is merged, there is no impact on FE usage or existing behavior.
Do you prefer the FE to be done in the same PR?

@muralibasani
Contributor

> […] My understanding of the architecture for this project is that the FE never directly communicates with cluster-api […] Do you prefer the FE to be done in the same PR?

Hi @khatibtamal, that's correct, the FE never directly communicates with cluster-api.
Let's make the FE changes in a different PR.

Contributor

@muralibasani left a comment


Thanks for your comments. Looks good.
After the enum changes, will test locally.

@khatibtamal
Contributor Author

@muralibasani pushed changes

@muralibasani muralibasani force-pushed the 1987-view-topic-events-range-backend branch from f8cae7d to 44d2450 Compare August 21, 2024 07:22
public class TopicContentsControllerIT {

public static final String CUSTOM_SELECTION = "custom";
public static final String RANGE_SELECTION = "range";
Contributor


Can we reuse the enum here ?

Contributor Author


done

topics = {TopicContentsControllerIT.TEST_TOPIC_NAME})
public class TopicContentsControllerIT {

public static final String CUSTOM_SELECTION = "custom";
Contributor


Can we reuse the enum here ?


@Test
@Order(13)
void getTopicContentsWhenRangeAndTotalOffsetsLargerThanMax() throws Exception {
Contributor


Beautiful tests, I love them.

Contributor Author


Thanks!

@khatibtamal
Contributor Author

khatibtamal commented Aug 24, 2024

@muralibasani I had to make some changes in ClusterApiController.java in my last commit.

This is because the build was failing locally. After investigating, I found it was because of embeddedKafkaBroker, which was being initialized in a dynamicPropertySource method, and the broker was not being destroyed after the tests ended. This caused a port conflict with the newly added tests in TopicContentsControllerIT, as port 9092 was never released.

Since EmbeddedKafkaZKBroker is implemented with the intention of being used as a Spring bean (it extends EmbeddedKafkaBroker, which implements DisposableBean and InitializingBean), I removed the code for its manual instantiation and relied on the bean provided by the EmbeddedKafka annotation. This way Spring takes care of initialization and disposal, ensuring the context gets properly created and destroyed for the tests in the class.
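The pattern described above — letting the annotation manage the broker lifecycle instead of instantiating it manually — looks roughly like the following. This is a configuration sketch assuming spring-kafka's test support is on the classpath; the annotation attributes shown are illustrative, not the PR's exact code:

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka(
    partitions = 1,
    topics = {TopicContentsControllerIT.TEST_TOPIC_NAME})
public class TopicContentsControllerIT {

  public static final String TEST_TOPIC_NAME = "testtopic";

  // Injected from the @EmbeddedKafka-managed context; no manual
  // `new EmbeddedKafkaZKBroker(...)` in a dynamicPropertySource method,
  // so Spring creates and disposes of the broker with the test context.
  @Autowired private EmbeddedKafkaBroker embeddedKafkaBroker;
}
```

Because the broker is a context-managed bean, its DisposableBean callback runs when the context shuts down, which releases the port and avoids the conflict described above.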

Contributor

@muralibasani left a comment


Looks good. Approved.

@muralibasani muralibasani merged commit 10ef3eb into Aiven-Open:main Aug 26, 2024
29 checks passed
@muralibasani
Contributor

> @muralibasani I had to make some changes in ClusterApiController.java in my last commit. […]

I assume you mean the changes to ClusterApiControllerIT ?

@khatibtamal
Contributor Author

> ClusterApiController.java

Yes, ClusterApiController.java. Sorry about the first link. Thanks for the review. Should I open another issue with the front-end requirements, or just reopen #1987? Thanks.

@muralibasani
Contributor

> […] Should I open another issue with the front-end requirements, or just reopen #1987?

Yes pls, now you have the full context. Thanks again.

Successfully merging this pull request may close these issues.

Option to view topic events based on offset range for a partition