Updating FE, BE with cache enabled calls for sync topics
Signed-off-by: muralibasani <[email protected]>
muralibasani committed Oct 13, 2023
1 parent dc715fd commit a565f29
Showing 25 changed files with 283 additions and 125 deletions.
---- changed file ----
@@ -3,8 +3,8 @@
import io.aiven.klaw.clusterapi.models.ApiResponse;
import io.aiven.klaw.clusterapi.models.ClusterAclRequest;
import io.aiven.klaw.clusterapi.models.ClusterTopicRequest;
import io.aiven.klaw.clusterapi.models.LoadTopicsResponse;
import io.aiven.klaw.clusterapi.models.ServiceAccountDetails;
import io.aiven.klaw.clusterapi.models.TopicConfig;
import io.aiven.klaw.clusterapi.models.enums.AclType;
import io.aiven.klaw.clusterapi.models.enums.AclsNativeType;
import io.aiven.klaw.clusterapi.models.enums.ApiResultStatus;
@@ -85,21 +85,22 @@ public ResponseEntity<ClusterStatus> getStatus(
"/getTopics/{bootstrapServers}/{protocol}/{clusterName}/topicsNativeType/{aclsNativeType}/resetCache/{resetCache}",
method = RequestMethod.GET,
produces = {MediaType.APPLICATION_JSON_VALUE})
public ResponseEntity<Set<TopicConfig>> getTopics(
public ResponseEntity<LoadTopicsResponse> getTopics(
@PathVariable String bootstrapServers,
@Valid @PathVariable KafkaSupportedProtocol protocol,
@PathVariable String clusterName,
@PathVariable String aclsNativeType,
@PathVariable boolean resetCache)
throws Exception {
Set<TopicConfig> topics;
LoadTopicsResponse loadTopicsResponse;
if (AclsNativeType.CONFLUENT_CLOUD.name().equals(aclsNativeType)) {
topics = confluentCloudApiService.listTopics(bootstrapServers, protocol, clusterName);
loadTopicsResponse =
confluentCloudApiService.listTopics(bootstrapServers, protocol, clusterName);
} else {
topics =
loadTopicsResponse =
apacheKafkaTopicService.loadTopics(bootstrapServers, protocol, clusterName, resetCache);
}
return new ResponseEntity<>(topics, HttpStatus.OK);
return new ResponseEntity<>(loadTopicsResponse, HttpStatus.OK);
}

@RequestMapping(
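Since getTopics now returns a LoadTopicsResponse wrapper instead of a bare Set<TopicConfig>, callers have to check loadingInProgress and retry while the cluster API is still filling its cache. A minimal client-side polling sketch, assuming Spring's RestTemplate; the clusterApiUrl base, retry count and sleep interval are illustrative and not part of this change:

  // Hypothetical polling helper around the updated endpoint.
  Set<TopicConfig> fetchTopics(
      RestTemplate restTemplate,
      String clusterApiUrl,
      String bootstrapServers,
      String protocol,
      String clusterName,
      String aclsNativeType)
      throws InterruptedException {
    for (int attempt = 0; attempt < 10; attempt++) {
      LoadTopicsResponse response =
          restTemplate.getForObject(
              clusterApiUrl
                  + "/getTopics/" + bootstrapServers + "/" + protocol + "/" + clusterName
                  + "/topicsNativeType/" + aclsNativeType + "/resetCache/false",
              LoadTopicsResponse.class);
      if (response != null && !response.isLoadingInProgress()) {
        return response.getTopicConfigSet(); // cache is ready, topics are populated
      }
      Thread.sleep(2_000); // back off while another request is still loading topics for this cluster
    }
    return Set.of(); // give up after a bounded number of attempts
  }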
---- changed file ----
@@ -8,7 +8,7 @@
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SchemaInfoCacheKeySet {
public class ClusterKeyIdentifier {
private String bootstrapServers;
private KafkaSupportedProtocol protocol;
private String clusterIdentification;
---- changed file ----
@@ -0,0 +1,18 @@
package io.aiven.klaw.clusterapi.models;

import jakarta.validation.constraints.NotNull;
import java.util.Set;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;

@Builder
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class LoadTopicsResponse {
@NotNull private boolean loadingInProgress;

private Set<TopicConfig> topicConfigSet;
}
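The new wrapper pairs the topic set with a flag telling callers whether a load is still running for that cluster. A small usage sketch of the Lombok-generated builder and accessors; cachedTopicConfigs is just an assumed, pre-populated Set<TopicConfig>:

  // While a load is in progress the topic set is intentionally empty.
  LoadTopicsResponse inProgress =
      LoadTopicsResponse.builder()
          .loadingInProgress(true)
          .topicConfigSet(new HashSet<>())
          .build();

  // Once loading has finished, the cached topics are returned with loadingInProgress=false.
  LoadTopicsResponse ready =
      LoadTopicsResponse.builder()
          .loadingInProgress(false)
          .topicConfigSet(cachedTopicConfigs)
          .build();

  boolean stillLoading = inProgress.isLoadingInProgress(); // true
  Set<TopicConfig> topics = ready.getTopicConfigSet();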
---- changed file ----
@@ -1,8 +1,9 @@
package io.aiven.klaw.clusterapi.services;

import io.aiven.klaw.clusterapi.models.ApiResponse;
import io.aiven.klaw.clusterapi.models.ClusterKeyIdentifier;
import io.aiven.klaw.clusterapi.models.ClusterTopicRequest;
import io.aiven.klaw.clusterapi.models.SchemaInfoCacheKeySet;
import io.aiven.klaw.clusterapi.models.LoadTopicsResponse;
import io.aiven.klaw.clusterapi.models.TopicConfig;
import io.aiven.klaw.clusterapi.models.enums.ApiResultStatus;
import io.aiven.klaw.clusterapi.models.enums.KafkaSupportedProtocol;
@@ -46,16 +47,18 @@ public class ApacheKafkaTopicService {

private final SchemaService schemaService;

private static Map<SchemaInfoCacheKeySet, Set<TopicConfig>> cachedTopics = new HashMap<>();
private static Map<ClusterKeyIdentifier, Set<TopicConfig>> cachedTopics = new HashMap<>();

private static Set<SchemaInfoCacheKeySet> topicCacheKeySets = new HashSet<>();
private static Set<ClusterKeyIdentifier> topicCacheKeySets = new HashSet<>();

private static Map<ClusterKeyIdentifier, Boolean> topicsLoadingStatusOfClusters = new HashMap<>();

public ApacheKafkaTopicService(ClusterApiUtils clusterApiUtils, SchemaService schemaService) {
this.clusterApiUtils = clusterApiUtils;
this.schemaService = schemaService;
}

public synchronized Set<TopicConfig> loadTopics(
public synchronized LoadTopicsResponse loadTopics(
String environment,
KafkaSupportedProtocol protocol,
String clusterIdentification,
@@ -69,24 +72,37 @@ public synchronized Set<TopicConfig> loadTopics(
throw new Exception("Cannot connect to cluster.");
}

SchemaInfoCacheKeySet schemaInfoCacheKeySet =
new SchemaInfoCacheKeySet(environment, protocol, clusterIdentification);
ClusterKeyIdentifier clusterKeyIdentifier =
new ClusterKeyIdentifier(environment, protocol, clusterIdentification);

if (resetCache) {
loadTopicsForCache(client, topics, schemaInfoCacheKeySet);
boolean topicsLoadingStatus = false;
if (topicsLoadingStatusOfClusters.containsKey(clusterKeyIdentifier)) {
topicsLoadingStatus = topicsLoadingStatusOfClusters.get(clusterKeyIdentifier);
} else {
if (cachedTopics.containsKey(schemaInfoCacheKeySet)) {
return cachedTopics.get(schemaInfoCacheKeySet);
} else {
loadTopicsForCache(client, topics, schemaInfoCacheKeySet);
}
topicsLoadingStatusOfClusters.put(clusterKeyIdentifier, false);
}

if (topicsLoadingStatus) {
return LoadTopicsResponse.builder()
.loadingInProgress(true)
.topicConfigSet(new HashSet<>())
.build();
} else {
topicsLoadingStatusOfClusters.put(clusterKeyIdentifier, true);
}

return topics;
if (resetCache || !cachedTopics.containsKey(clusterKeyIdentifier)) {
loadTopicsForCache(client, topics, clusterKeyIdentifier);
} else {
topics = cachedTopics.get(clusterKeyIdentifier);
}

topicsLoadingStatusOfClusters.put(clusterKeyIdentifier, false);
return LoadTopicsResponse.builder().loadingInProgress(false).topicConfigSet(topics).build();
}

private void loadTopicsForCache(
AdminClient client, Set<TopicConfig> topics, SchemaInfoCacheKeySet schemaInfoCacheKeySet) {
AdminClient client, Set<TopicConfig> topics, ClusterKeyIdentifier clusterKeyIdentifier) {
try {
Map<String, TopicDescription> topicDescriptionsPerAdminClient =
loadTopicDescriptionsMap(client);
@@ -107,17 +123,16 @@ private void loadTopicsForCache(
topicConfig.setPartitions("" + topicDescription.partitions().size());
topics.add(topicConfig);
}

updateCache(clusterKeyIdentifier, topics);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Exception:", e);
}
updateCache(schemaInfoCacheKeySet, topics);
}

private void updateCache(
SchemaInfoCacheKeySet schemaInfoCacheKeySet, Set<TopicConfig> topicConfigSet) {
cachedTopics.put(schemaInfoCacheKeySet, topicConfigSet);
topicCacheKeySets.add(schemaInfoCacheKeySet);
ClusterKeyIdentifier clusterKeyIdentifier, Set<TopicConfig> topicConfigSet) {
cachedTopics.put(clusterKeyIdentifier, topicConfigSet);
topicCacheKeySets.add(clusterKeyIdentifier);
}

private Map<String, TopicDescription> loadTopicDescriptionsMap(AdminClient client)
@@ -323,16 +338,16 @@ public synchronized ApiResponse deleteTopic(ClusterTopicRequest clusterTopicRequ
zone = "${klaw.topics.cron.expression.timezone:UTC}")
public void resetTopicsCacheScheduler() {
topicCacheKeySets.forEach(
schemaInfoCacheKeySet -> {
clusterKeyIdentifier -> {
try {
log.info("Loading topics {}", schemaInfoCacheKeySet);
log.info("Loading topics {}", clusterKeyIdentifier);
loadTopics(
schemaInfoCacheKeySet.getBootstrapServers(),
schemaInfoCacheKeySet.getProtocol(),
schemaInfoCacheKeySet.getClusterIdentification(),
clusterKeyIdentifier.getBootstrapServers(),
clusterKeyIdentifier.getProtocol(),
clusterKeyIdentifier.getClusterIdentification(),
true);
} catch (Exception e) {
log.error("Error while loading topics {}", schemaInfoCacheKeySet);
log.error("Error while loading topics {}", clusterKeyIdentifier);
}
});
}
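The reworked loadTopics keeps a per-cluster boolean so that only one request at a time performs the (potentially slow) topic load, while other requests get an immediate loadingInProgress=true response and can retry later. A simplified, self-contained sketch of that guard pattern; it uses ConcurrentHashMap and putIfAbsent rather than the plain HashMaps and synchronized method in the diff, and loadFromCluster is a hypothetical stand-in for the AdminClient-backed load:

  private final Map<ClusterKeyIdentifier, Boolean> loadingStatus = new ConcurrentHashMap<>();
  private final Map<ClusterKeyIdentifier, Set<TopicConfig>> cache = new ConcurrentHashMap<>();

  LoadTopicsResponse load(ClusterKeyIdentifier key, boolean resetCache) throws Exception {
    // Atomically claim the "loading" slot; a TRUE previous value means another
    // request is already loading topics for this cluster.
    if (Boolean.TRUE.equals(loadingStatus.putIfAbsent(key, true))) {
      return LoadTopicsResponse.builder()
          .loadingInProgress(true)
          .topicConfigSet(new HashSet<>())
          .build();
    }
    try {
      Set<TopicConfig> topics =
          (resetCache || !cache.containsKey(key))
              ? loadFromCluster(key) // hypothetical helper wrapping the AdminClient call
              : cache.get(key);
      cache.put(key, topics);
      return LoadTopicsResponse.builder().loadingInProgress(false).topicConfigSet(topics).build();
    } finally {
      loadingStatus.remove(key); // always release the slot, even if loading fails
    }
  }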
---- changed file ----
@@ -4,6 +4,7 @@
import io.aiven.klaw.clusterapi.models.ApiResponse;
import io.aiven.klaw.clusterapi.models.ClusterAclRequest;
import io.aiven.klaw.clusterapi.models.ClusterTopicRequest;
import io.aiven.klaw.clusterapi.models.LoadTopicsResponse;
import io.aiven.klaw.clusterapi.models.TopicConfig;
import io.aiven.klaw.clusterapi.models.confluentcloud.AclObject;
import io.aiven.klaw.clusterapi.models.confluentcloud.Config;
@@ -57,12 +58,14 @@ public class ConfluentCloudApiService {
private final Environment env;
final ClusterApiUtils clusterApiUtils;

private static boolean topicsLoadingStatus;

public ConfluentCloudApiService(Environment env, ClusterApiUtils clusterApiUtils) {
this.env = env;
this.clusterApiUtils = clusterApiUtils;
}

public Set<TopicConfig> listTopics(
public LoadTopicsResponse listTopics(
String restApiHost, KafkaSupportedProtocol protocol, String clusterIdentification)
throws Exception {
RestTemplate restTemplate = getRestTemplate();
@@ -83,7 +86,10 @@ public Set<TopicConfig> listTopics(

List<TopicConfig> topicsListUpdated = processListTopicsResponse(responseEntity);

return new HashSet<>(topicsListUpdated);
return LoadTopicsResponse.builder()
.loadingInProgress(topicsLoadingStatus)
.topicConfigSet(new HashSet<>(topicsListUpdated))
.build();
} catch (RestClientException e) {
log.error("Exception:", e);
throw new Exception("Error in listing topics : " + e.getMessage());
---- changed file ----
@@ -1,12 +1,12 @@
package io.aiven.klaw.clusterapi.services;

import io.aiven.klaw.clusterapi.models.ApiResponse;
import io.aiven.klaw.clusterapi.models.ClusterKeyIdentifier;
import io.aiven.klaw.clusterapi.models.ClusterSchemaRequest;
import io.aiven.klaw.clusterapi.models.ClusterTopicRequest;
import io.aiven.klaw.clusterapi.models.RegisterSchemaCustomResponse;
import io.aiven.klaw.clusterapi.models.RegisterSchemaResponse;
import io.aiven.klaw.clusterapi.models.SchemaCompatibilityCheckResponse;
import io.aiven.klaw.clusterapi.models.SchemaInfoCacheKeySet;
import io.aiven.klaw.clusterapi.models.SchemaInfoOfTopic;
import io.aiven.klaw.clusterapi.models.SchemasInfoOfClusterResponse;
import io.aiven.klaw.clusterapi.models.enums.ApiResultStatus;
@@ -60,7 +60,7 @@ public class SchemaService {
private static Map<String, SchemasInfoOfClusterResponse> schemasInfoOfClusterResponseMap =
new HashMap<>();

private static Map<String, SchemaInfoCacheKeySet> schemasInfoCacheKeySetMap = new HashMap<>();
private static Map<String, ClusterKeyIdentifier> schemasInfoCacheKeySetMap = new HashMap<>();

public static final String SCHEMA_REGISTRY_CONTENT_TYPE =
"application/vnd.schemaregistry.v1+json";
@@ -703,11 +703,11 @@ private static void updateCache(
String schemasVersionsStorageKey,
SchemasInfoOfClusterResponse schemasInfoOfClusterResponse) {
schemasInfoOfClusterResponseMap.put(schemasVersionsStorageKey, schemasInfoOfClusterResponse);
SchemaInfoCacheKeySet schemaInfoCacheKeySet = new SchemaInfoCacheKeySet();
schemaInfoCacheKeySet.setProtocol(protocol);
schemaInfoCacheKeySet.setClusterIdentification(clusterIdentification);
schemaInfoCacheKeySet.setBootstrapServers(bootstrapServers);
schemasInfoCacheKeySetMap.put(schemasVersionsStorageKey, schemaInfoCacheKeySet);
ClusterKeyIdentifier clusterKeyIdentifier = new ClusterKeyIdentifier();
clusterKeyIdentifier.setProtocol(protocol);
clusterKeyIdentifier.setClusterIdentification(clusterIdentification);
clusterKeyIdentifier.setBootstrapServers(bootstrapServers);
schemasInfoCacheKeySetMap.put(schemasVersionsStorageKey, clusterKeyIdentifier);
}

private SchemasInfoOfClusterResponse handleInterimUpdatesOnSchemas(
@@ -769,13 +769,13 @@ private void updateSchemaCache(
zone = "${klaw.schemainfo.cron.expression.timezone:UTC}")
public void resetSchemaCacheScheduler() {
for (String schemasVersionsStorageKey : schemasInfoCacheKeySetMap.keySet()) {
SchemaInfoCacheKeySet schemaInfoCacheKeySet =
ClusterKeyIdentifier clusterKeyIdentifier =
schemasInfoCacheKeySetMap.get(schemasVersionsStorageKey);
schemasInfoOfClusterResponseMap.remove(schemasVersionsStorageKey);
loadAllSchemasInfoFromCluster(
schemaInfoCacheKeySet.getBootstrapServers(),
schemaInfoCacheKeySet.getProtocol(),
schemaInfoCacheKeySet.getClusterIdentification(),
clusterKeyIdentifier.getBootstrapServers(),
clusterKeyIdentifier.getProtocol(),
clusterKeyIdentifier.getClusterIdentification(),
false,
SchemaCacheUpdateType.NONE,
null);
---- changed file ----
@@ -70,8 +70,8 @@ private ClusterStatus getStatusKafka(
String environment, KafkaSupportedProtocol protocol, String clusterName, String kafkaFlavor) {
try {
if (kafkaFlavor != null && kafkaFlavor.equals("Confluent Cloud")) {
if (confluentCloudApiService.listTopics(environment, protocol, clusterName).size() >= 0)
return ClusterStatus.ONLINE;
confluentCloudApiService.listTopics(environment, protocol, clusterName);
return ClusterStatus.ONLINE;
} else {
AdminClient client = clusterApiUtils.getAdminClient(environment, protocol, clusterName);
if (client != null) {
---- changed file ----
@@ -12,6 +12,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.CommonClientConfigs;
@@ -42,6 +43,7 @@ public class ClusterApiUtils {
public static final String KAFKA_SR_CREDENTIALS_PROPERTY_SFX = ".klaw.schemaregistry.credentials";
public static final String KAFKA_CONFLUENT_CLOUD_CREDENTIALS_PROPERTY_SFX =
".klaw.confluentcloud.credentials";
private static final long TIME_OUT_SECS_FOR_TOPICS = 5;
private static MessageDigest messageDigest;

static {
@@ -181,7 +183,8 @@ public AdminClient getAdminClient(
}

try {
adminClient.listTopics().names().get();

adminClient.listTopics().names().get(TIME_OUT_SECS_FOR_TOPICS, TimeUnit.SECONDS);
if (!adminClientsMap.containsKey(adminClientKey)) {
adminClientsMap.put(adminClientKey, adminClient);
}
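The connection check in getAdminClient now bounds listTopics() with a five-second timeout instead of blocking indefinitely on an unreachable cluster. A standalone sketch of the same Kafka AdminClient pattern; the broker address and printed messages are illustrative:

  import java.util.Properties;
  import java.util.Set;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.TimeoutException;
  import org.apache.kafka.clients.admin.AdminClient;
  import org.apache.kafka.clients.admin.AdminClientConfig;

  public class AdminClientTimeoutSketch {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
      Properties props = new Properties();
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative broker
      try (AdminClient client = AdminClient.create(props)) {
        try {
          // Fail fast if the cluster is unreachable instead of hanging the request thread.
          Set<String> names = client.listTopics().names().get(5, TimeUnit.SECONDS);
          System.out.println("Cluster reachable, topics: " + names.size());
        } catch (TimeoutException e) {
          System.out.println("Cluster did not respond within 5 seconds");
        }
      }
    }
  }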
---- changed file ----
@@ -8,6 +8,8 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.aiven.klaw.clusterapi.models.LoadTopicsResponse;
import io.aiven.klaw.clusterapi.models.TopicConfig;
import io.aiven.klaw.clusterapi.models.confluentcloud.ListAclsResponse;
import io.aiven.klaw.clusterapi.models.confluentcloud.ListTopicsResponse;
import io.aiven.klaw.clusterapi.models.enums.ClusterStatus;
@@ -148,13 +150,18 @@ public void getTopics() throws Exception {
.andReturn()
.getResponse();

Set<Map<String, String>> listTopicsSet =
LoadTopicsResponse loadTopicsResponse =
OBJECT_MAPPER.readValue(response.getContentAsString(), new TypeReference<>() {});
assertThat(listTopicsSet).hasSize(2); // two topics
assertThat(listTopicsSet.stream().toList().get(0))
.hasSize(3); // topicName, partitions, replication factor
assertThat(listTopicsSet.stream().toList().get(0))
.containsKeys("topicName", "partitions", "replicationFactor");
assertThat(loadTopicsResponse.getTopicConfigSet()).hasSize(2); // two topics
assertThat(loadTopicsResponse.getTopicConfigSet())
.extracting(TopicConfig::getTopicName)
.containsExactlyInAnyOrder("testtopic1", "testtopic2");
assertThat(loadTopicsResponse.getTopicConfigSet())
.extracting(TopicConfig::getPartitions)
.containsExactlyInAnyOrder("2", "4");
assertThat(loadTopicsResponse.getTopicConfigSet())
.extracting(TopicConfig::getReplicationFactor)
.containsExactlyInAnyOrder("2", "3");
}

@Test
---- changed file ----
@@ -5,6 +5,7 @@
import io.aiven.klaw.clusterapi.models.ClusterAclRequest;
import io.aiven.klaw.clusterapi.models.ClusterSchemaRequest;
import io.aiven.klaw.clusterapi.models.ClusterTopicRequest;
import io.aiven.klaw.clusterapi.models.LoadTopicsResponse;
import io.aiven.klaw.clusterapi.models.TopicConfig;
import io.aiven.klaw.clusterapi.models.confluentcloud.AclObject;
import io.aiven.klaw.clusterapi.models.confluentcloud.ListAclsResponse;
@@ -65,15 +66,15 @@ public Set<Map<String, String>> getAcls() {
return aclsSet;
}

public Set<TopicConfig> getTopics() {
public LoadTopicsResponse getTopics() {
Set<TopicConfig> topicsSet = new HashSet<>();
TopicConfig hashMap = new TopicConfig();
hashMap.setTopicName("testtopic1");

hashMap.setPartitions("2");
hashMap.setReplicationFactor("1");
topicsSet.add(hashMap);
return topicsSet;
return LoadTopicsResponse.builder().loadingInProgress(false).topicConfigSet(topicsSet).build();
}

public MultiValueMap<String, String> getMappedValuesTopic() {
---- changed file ----
@@ -129,7 +129,7 @@ public void getTopics() throws Exception {
mvc.perform(get(urlTemplate))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andExpect(jsonPath("$", hasSize(1)));
.andExpect(jsonPath("$.topicConfigSet", hasSize(1)));
}

@Test
(remaining changed files not shown)
