Skip to content

Commit

Permalink
Merge branch 'main' into dependabot/npm_and_yarn/coral/proxy/nodemon-…
Browse files Browse the repository at this point in the history
…3.1.4
  • Loading branch information
programmiri authored Aug 27, 2024
2 parents 14ae3f9 + 1280f8a commit 9f9c066
Show file tree
Hide file tree
Showing 12 changed files with 1,402 additions and 747 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class TopicContentsController {
value =
"/getTopicContents/{bootstrapServers}/"
+ "{protocol}/{consumerGroupId}/{topicName}/{offsetPosition}/partitionId/{selectedPartitionId}/"
+ "selectedNumberOfOffsets/{selectedNumberOfOffsets}/{clusterIdentification}",
+ "selectedNumberOfOffsets/{selectedNumberOfOffsets}/{clusterIdentification}/rangeOffsets/{rangeOffsetsStart}/{rangeOffsetsEnd}",
method = RequestMethod.GET,
produces = {MediaType.APPLICATION_JSON_VALUE})
public ResponseEntity<Map<Long, String>> getTopicContents(
Expand All @@ -34,7 +34,9 @@ public ResponseEntity<Map<Long, String>> getTopicContents(
@PathVariable String offsetPosition,
@PathVariable Integer selectedPartitionId,
@PathVariable Integer selectedNumberOfOffsets,
@PathVariable String clusterIdentification) {
@PathVariable String clusterIdentification,
@PathVariable Integer rangeOffsetsStart,
@PathVariable Integer rangeOffsetsEnd) {
Map<Long, String> events =
topicContentsService.readEvents(
bootstrapServers,
Expand All @@ -44,6 +46,8 @@ public ResponseEntity<Map<Long, String>> getTopicContents(
offsetPosition,
selectedPartitionId,
selectedNumberOfOffsets,
rangeOffsetsStart,
rangeOffsetsEnd,
"OFFSET_ID",
clusterIdentification);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.aiven.klaw.clusterapi.models.enums;

import lombok.Getter;

@Getter
public enum TopicContentType {
CUSTOM("custom"),
RANGE("range");

private final String value;

TopicContentType(String value) {
this.value = value;
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package io.aiven.klaw.clusterapi.services;

import io.aiven.klaw.clusterapi.models.enums.TopicContentType;
import io.aiven.klaw.clusterapi.utils.ClusterApiUtils;
import java.time.Duration;
import java.util.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
Expand All @@ -17,7 +19,7 @@
@Service
public class TopicContentsService {

public static final String CUSTOM_OFFSET_SELECTION = "custom";
public static final int RANGE_MAX_RECORDS = 100;
public static final int NUMBER_OF_POLLS = 3;
final ClusterApiUtils clusterApiUtils;

Expand All @@ -39,12 +41,14 @@ public Map<Long, String> readEvents(
String offsetPosition,
Integer selectedPartitionId,
Integer selectedNumberOfOffsets,
Integer rangeOffsetsStart,
Integer rangeOffsetsEnd,
String readMessagesType,
String clusterIdentification) {
log.debug(
"readEvents bootStrapServers {}, protocol {}, consumerGroupId {},"
+ " topicName {}, offsetPosition {}, readMessagesType {} clusterIdentification {} selectedPartitionId {}"
+ " selectedNumberOfOffsets {}",
+ " selectedNumberOfOffsets {} rangeOffsetsStart {} rangeOffsetsEnd {}",
bootStrapServers,
protocol,
consumerGroupId,
Expand All @@ -53,11 +57,18 @@ public Map<Long, String> readEvents(
readMessagesType,
clusterIdentification,
selectedPartitionId,
selectedNumberOfOffsets);
selectedNumberOfOffsets,
rangeOffsetsStart,
rangeOffsetsEnd);

Map<Long, String> eventMap = new TreeMap<>();
KafkaConsumer<String, String> consumer;

if (offsetPosition.equals(TopicContentType.RANGE.getValue())
&& (rangeOffsetsStart < 0 || rangeOffsetsEnd < 0)) {
return eventMap;
}

if (consumerGroupId.equals("notdefined")) {
consumer =
getKafkaConsumer(
Expand All @@ -72,7 +83,8 @@ public Map<Long, String> readEvents(
Set<TopicPartition> topicPartitionsSet = consumer.assignment();

Set<TopicPartition> partitionsAssignment = new HashSet<>();
if (offsetPosition.equals(CUSTOM_OFFSET_SELECTION)) {
if (offsetPosition.equals(TopicContentType.CUSTOM.getValue())
|| offsetPosition.equals(TopicContentType.RANGE.getValue())) {
for (TopicPartition tp : topicPartitionsSet) {
if (tp.partition() == selectedPartitionId) {
partitionsAssignment = Collections.singleton(tp);
Expand All @@ -83,7 +95,10 @@ public Map<Long, String> readEvents(
partitionsAssignment = topicPartitionsSet;
}

if (partitionsAssignment.isEmpty()) {
if (partitionsAssignment.isEmpty()
|| (offsetPosition.equals(TopicContentType.RANGE.getValue())
&& rangeOffsetsStart > rangeOffsetsEnd)) {
consumer.close();
return eventMap;
}
consumer.seekToBeginning(partitionsAssignment);
Expand All @@ -93,8 +108,10 @@ public Map<Long, String> readEvents(
for (TopicPartition tp : partitionsAssignment) {
long beginningOffset = consumer.position(tp);
long endOffset = endOffsets.get(tp);
if (offsetPosition.equals(CUSTOM_OFFSET_SELECTION)) {
if (offsetPosition.equals(TopicContentType.CUSTOM.getValue())) {
newOffset = endOffset - selectedNumberOfOffsets;
} else if (offsetPosition.equals(TopicContentType.RANGE.getValue())) {
newOffset = rangeOffsetsStart;
} else {
newOffset = endOffset - Integer.parseInt(offsetPosition);
}
Expand All @@ -107,11 +124,19 @@ public Map<Long, String> readEvents(
}

int i = 0;
boolean exitLoop = false;
do {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(500));
consumerRecords.forEach(record -> eventMap.put(record.offset(), record.value()));
for (ConsumerRecord<String, String> record : consumerRecords) {
eventMap.put(record.offset(), record.value());
if (offsetPosition.equals(TopicContentType.RANGE.getValue())
&& (record.offset() >= rangeOffsetsEnd || eventMap.size() >= RANGE_MAX_RECORDS)) {
exitLoop = true;
break;
}
}
i++;
} while (i != NUMBER_OF_POLLS);
} while (i != NUMBER_OF_POLLS && !exitLoop);

consumer.commitAsync();
consumer.close();
Expand Down
Loading

0 comments on commit 9f9c066

Please sign in to comment.