Skip to content

Commit

Permalink
Feature: Add backend for topic events view based on offset range (#2551)
Browse files Browse the repository at this point in the history
* Feature: Add backend for topic events view based on offset range

Signed-off-by: khatib tamal <[email protected]>

* revert accidental change

Signed-off-by: khatib tamal <[email protected]>

* removed slf4j from TopicContentsControllerIT

Signed-off-by: khatib tamal <[email protected]>

* code review changes

Signed-off-by: khatib tamal <[email protected]>

* ran mvn spotless

Signed-off-by: khatib tamal <[email protected]>

* code review changes and ClusterApiControllerIT improvement

Signed-off-by: Khatib Tamal <[email protected]>

---------

Signed-off-by: khatib tamal <[email protected]>
Signed-off-by: Khatib Tamal <[email protected]>
  • Loading branch information
khatibtamal authored Aug 26, 2024
1 parent 019b916 commit 10ef3eb
Show file tree
Hide file tree
Showing 8 changed files with 414 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class TopicContentsController {
value =
"/getTopicContents/{bootstrapServers}/"
+ "{protocol}/{consumerGroupId}/{topicName}/{offsetPosition}/partitionId/{selectedPartitionId}/"
+ "selectedNumberOfOffsets/{selectedNumberOfOffsets}/{clusterIdentification}",
+ "selectedNumberOfOffsets/{selectedNumberOfOffsets}/{clusterIdentification}/rangeOffsets/{rangeOffsetsStart}/{rangeOffsetsEnd}",
method = RequestMethod.GET,
produces = {MediaType.APPLICATION_JSON_VALUE})
public ResponseEntity<Map<Long, String>> getTopicContents(
Expand All @@ -34,7 +34,9 @@ public ResponseEntity<Map<Long, String>> getTopicContents(
@PathVariable String offsetPosition,
@PathVariable Integer selectedPartitionId,
@PathVariable Integer selectedNumberOfOffsets,
@PathVariable String clusterIdentification) {
@PathVariable String clusterIdentification,
@PathVariable Integer rangeOffsetsStart,
@PathVariable Integer rangeOffsetsEnd) {
Map<Long, String> events =
topicContentsService.readEvents(
bootstrapServers,
Expand All @@ -44,6 +46,8 @@ public ResponseEntity<Map<Long, String>> getTopicContents(
offsetPosition,
selectedPartitionId,
selectedNumberOfOffsets,
rangeOffsetsStart,
rangeOffsetsEnd,
"OFFSET_ID",
clusterIdentification);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.aiven.klaw.clusterapi.models.enums;

import lombok.Getter;

@Getter
public enum TopicContentType {
CUSTOM("custom"),
RANGE("range");

private final String value;

TopicContentType(String value) {
this.value = value;
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package io.aiven.klaw.clusterapi.services;

import io.aiven.klaw.clusterapi.models.enums.TopicContentType;
import io.aiven.klaw.clusterapi.utils.ClusterApiUtils;
import java.time.Duration;
import java.util.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
Expand All @@ -17,7 +19,7 @@
@Service
public class TopicContentsService {

public static final String CUSTOM_OFFSET_SELECTION = "custom";
public static final int RANGE_MAX_RECORDS = 100;
public static final int NUMBER_OF_POLLS = 3;
final ClusterApiUtils clusterApiUtils;

Expand All @@ -39,12 +41,14 @@ public Map<Long, String> readEvents(
String offsetPosition,
Integer selectedPartitionId,
Integer selectedNumberOfOffsets,
Integer rangeOffsetsStart,
Integer rangeOffsetsEnd,
String readMessagesType,
String clusterIdentification) {
log.debug(
"readEvents bootStrapServers {}, protocol {}, consumerGroupId {},"
+ " topicName {}, offsetPosition {}, readMessagesType {} clusterIdentification {} selectedPartitionId {}"
+ " selectedNumberOfOffsets {}",
+ " selectedNumberOfOffsets {} rangeOffsetsStart {} rangeOffsetsEnd {}",
bootStrapServers,
protocol,
consumerGroupId,
Expand All @@ -53,11 +57,18 @@ public Map<Long, String> readEvents(
readMessagesType,
clusterIdentification,
selectedPartitionId,
selectedNumberOfOffsets);
selectedNumberOfOffsets,
rangeOffsetsStart,
rangeOffsetsEnd);

Map<Long, String> eventMap = new TreeMap<>();
KafkaConsumer<String, String> consumer;

if (offsetPosition.equals(TopicContentType.RANGE.getValue())
&& (rangeOffsetsStart < 0 || rangeOffsetsEnd < 0)) {
return eventMap;
}

if (consumerGroupId.equals("notdefined")) {
consumer =
getKafkaConsumer(
Expand All @@ -72,7 +83,8 @@ public Map<Long, String> readEvents(
Set<TopicPartition> topicPartitionsSet = consumer.assignment();

Set<TopicPartition> partitionsAssignment = new HashSet<>();
if (offsetPosition.equals(CUSTOM_OFFSET_SELECTION)) {
if (offsetPosition.equals(TopicContentType.CUSTOM.getValue())
|| offsetPosition.equals(TopicContentType.RANGE.getValue())) {
for (TopicPartition tp : topicPartitionsSet) {
if (tp.partition() == selectedPartitionId) {
partitionsAssignment = Collections.singleton(tp);
Expand All @@ -83,7 +95,10 @@ public Map<Long, String> readEvents(
partitionsAssignment = topicPartitionsSet;
}

if (partitionsAssignment.isEmpty()) {
if (partitionsAssignment.isEmpty()
|| (offsetPosition.equals(TopicContentType.RANGE.getValue())
&& rangeOffsetsStart > rangeOffsetsEnd)) {
consumer.close();
return eventMap;
}
consumer.seekToBeginning(partitionsAssignment);
Expand All @@ -93,8 +108,10 @@ public Map<Long, String> readEvents(
for (TopicPartition tp : partitionsAssignment) {
long beginningOffset = consumer.position(tp);
long endOffset = endOffsets.get(tp);
if (offsetPosition.equals(CUSTOM_OFFSET_SELECTION)) {
if (offsetPosition.equals(TopicContentType.CUSTOM.getValue())) {
newOffset = endOffset - selectedNumberOfOffsets;
} else if (offsetPosition.equals(TopicContentType.RANGE.getValue())) {
newOffset = rangeOffsetsStart;
} else {
newOffset = endOffset - Integer.parseInt(offsetPosition);
}
Expand All @@ -107,11 +124,19 @@ public Map<Long, String> readEvents(
}

int i = 0;
boolean exitLoop = false;
do {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(500));
consumerRecords.forEach(record -> eventMap.put(record.offset(), record.value()));
for (ConsumerRecord<String, String> record : consumerRecords) {
eventMap.put(record.offset(), record.value());
if (offsetPosition.equals(TopicContentType.RANGE.getValue())
&& (record.offset() >= rangeOffsetsEnd || eventMap.size() >= RANGE_MAX_RECORDS)) {
exitLoop = true;
break;
}
}
i++;
} while (i != NUMBER_OF_POLLS);
} while (i != NUMBER_OF_POLLS && !exitLoop);

consumer.commitAsync();
consumer.close();
Expand Down
Loading

0 comments on commit 10ef3eb

Please sign in to comment.