diff --git a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/controller/ClusterApiController.java b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/controller/ClusterApiController.java index bf45cd3d51..f9da66ce4f 100644 --- a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/controller/ClusterApiController.java +++ b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/controller/ClusterApiController.java @@ -3,8 +3,8 @@ import io.aiven.klaw.clusterapi.models.ApiResponse; import io.aiven.klaw.clusterapi.models.ClusterAclRequest; import io.aiven.klaw.clusterapi.models.ClusterTopicRequest; +import io.aiven.klaw.clusterapi.models.LoadTopicsResponse; import io.aiven.klaw.clusterapi.models.ServiceAccountDetails; -import io.aiven.klaw.clusterapi.models.TopicConfig; import io.aiven.klaw.clusterapi.models.enums.AclType; import io.aiven.klaw.clusterapi.models.enums.AclsNativeType; import io.aiven.klaw.clusterapi.models.enums.ApiResultStatus; @@ -85,21 +85,22 @@ public ResponseEntity getStatus( "/getTopics/{bootstrapServers}/{protocol}/{clusterName}/topicsNativeType/{aclsNativeType}/resetCache/{resetCache}", method = RequestMethod.GET, produces = {MediaType.APPLICATION_JSON_VALUE}) - public ResponseEntity> getTopics( + public ResponseEntity getTopics( @PathVariable String bootstrapServers, @Valid @PathVariable KafkaSupportedProtocol protocol, @PathVariable String clusterName, @PathVariable String aclsNativeType, @PathVariable boolean resetCache) throws Exception { - Set topics; + LoadTopicsResponse loadTopicsResponse; if (AclsNativeType.CONFLUENT_CLOUD.name().equals(aclsNativeType)) { - topics = confluentCloudApiService.listTopics(bootstrapServers, protocol, clusterName); + loadTopicsResponse = + confluentCloudApiService.listTopics(bootstrapServers, protocol, clusterName); } else { - topics = + loadTopicsResponse = apacheKafkaTopicService.loadTopics(bootstrapServers, protocol, clusterName, resetCache); } - return new ResponseEntity<>(topics, HttpStatus.OK); + return new ResponseEntity<>(loadTopicsResponse, HttpStatus.OK); } @RequestMapping( diff --git a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/models/SchemaInfoCacheKeySet.java b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/models/ClusterKeyIdentifier.java similarity index 90% rename from cluster-api/src/main/java/io/aiven/klaw/clusterapi/models/SchemaInfoCacheKeySet.java rename to cluster-api/src/main/java/io/aiven/klaw/clusterapi/models/ClusterKeyIdentifier.java index b0d0deb2fd..2aaa30cf52 100644 --- a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/models/SchemaInfoCacheKeySet.java +++ b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/models/ClusterKeyIdentifier.java @@ -8,7 +8,7 @@ @Data @AllArgsConstructor @NoArgsConstructor -public class SchemaInfoCacheKeySet { +public class ClusterKeyIdentifier { private String bootstrapServers; private KafkaSupportedProtocol protocol; private String clusterIdentification; diff --git a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/models/LoadTopicsResponse.java b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/models/LoadTopicsResponse.java new file mode 100644 index 0000000000..c745847c11 --- /dev/null +++ b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/models/LoadTopicsResponse.java @@ -0,0 +1,18 @@ +package io.aiven.klaw.clusterapi.models; + +import jakarta.validation.constraints.NotNull; +import java.util.Set; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; + +@Builder +@Getter +@NoArgsConstructor +@AllArgsConstructor +public class LoadTopicsResponse { + @NotNull private boolean loadingInProgress; + + private Set topicConfigSet; +} diff --git a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/ApacheKafkaTopicService.java b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/ApacheKafkaTopicService.java index 1043a5b24a..8e7c0ad1cc 100644 --- a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/ApacheKafkaTopicService.java +++ b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/ApacheKafkaTopicService.java @@ -1,8 +1,9 @@ package io.aiven.klaw.clusterapi.services; import io.aiven.klaw.clusterapi.models.ApiResponse; +import io.aiven.klaw.clusterapi.models.ClusterKeyIdentifier; import io.aiven.klaw.clusterapi.models.ClusterTopicRequest; -import io.aiven.klaw.clusterapi.models.SchemaInfoCacheKeySet; +import io.aiven.klaw.clusterapi.models.LoadTopicsResponse; import io.aiven.klaw.clusterapi.models.TopicConfig; import io.aiven.klaw.clusterapi.models.enums.ApiResultStatus; import io.aiven.klaw.clusterapi.models.enums.KafkaSupportedProtocol; @@ -46,16 +47,18 @@ public class ApacheKafkaTopicService { private final SchemaService schemaService; - private static Map> cachedTopics = new HashMap<>(); + private static Map> cachedTopics = new HashMap<>(); - private static Set topicCacheKeySets = new HashSet<>(); + private static Set topicCacheKeySets = new HashSet<>(); + + private static Map topicsLoadingStatusOfClusters = new HashMap<>(); public ApacheKafkaTopicService(ClusterApiUtils clusterApiUtils, SchemaService schemaService) { this.clusterApiUtils = clusterApiUtils; this.schemaService = schemaService; } - public synchronized Set loadTopics( + public synchronized LoadTopicsResponse loadTopics( String environment, KafkaSupportedProtocol protocol, String clusterIdentification, @@ -69,24 +72,37 @@ public synchronized Set loadTopics( throw new Exception("Cannot connect to cluster."); } - SchemaInfoCacheKeySet schemaInfoCacheKeySet = - new SchemaInfoCacheKeySet(environment, protocol, clusterIdentification); + ClusterKeyIdentifier clusterKeyIdentifier = + new ClusterKeyIdentifier(environment, protocol, clusterIdentification); - if (resetCache) { - loadTopicsForCache(client, topics, schemaInfoCacheKeySet); + boolean topicsLoadingStatus = false; + if (topicsLoadingStatusOfClusters.containsKey(clusterKeyIdentifier)) { + topicsLoadingStatus = topicsLoadingStatusOfClusters.get(clusterKeyIdentifier); } else { - if (cachedTopics.containsKey(schemaInfoCacheKeySet)) { - return cachedTopics.get(schemaInfoCacheKeySet); - } else { - loadTopicsForCache(client, topics, schemaInfoCacheKeySet); - } + topicsLoadingStatusOfClusters.put(clusterKeyIdentifier, false); + } + + if (topicsLoadingStatus) { + return LoadTopicsResponse.builder() + .loadingInProgress(true) + .topicConfigSet(new HashSet<>()) + .build(); + } else { + topicsLoadingStatusOfClusters.put(clusterKeyIdentifier, true); } - return topics; + if (resetCache || !cachedTopics.containsKey(clusterKeyIdentifier)) { + loadTopicsForCache(client, topics, clusterKeyIdentifier); + } else { + topics = cachedTopics.get(clusterKeyIdentifier); + } + + topicsLoadingStatusOfClusters.put(clusterKeyIdentifier, false); + return LoadTopicsResponse.builder().loadingInProgress(false).topicConfigSet(topics).build(); } private void loadTopicsForCache( - AdminClient client, Set topics, SchemaInfoCacheKeySet schemaInfoCacheKeySet) { + AdminClient client, Set topics, ClusterKeyIdentifier clusterKeyIdentifier) { try { Map topicDescriptionsPerAdminClient = loadTopicDescriptionsMap(client); @@ -107,17 +123,16 @@ private void loadTopicsForCache( topicConfig.setPartitions("" + topicDescription.partitions().size()); topics.add(topicConfig); } - + updateCache(clusterKeyIdentifier, topics); } catch (InterruptedException | ExecutionException | TimeoutException e) { log.error("Exception:", e); } - updateCache(schemaInfoCacheKeySet, topics); } private void updateCache( - SchemaInfoCacheKeySet schemaInfoCacheKeySet, Set topicConfigSet) { - cachedTopics.put(schemaInfoCacheKeySet, topicConfigSet); - topicCacheKeySets.add(schemaInfoCacheKeySet); + ClusterKeyIdentifier clusterKeyIdentifier, Set topicConfigSet) { + cachedTopics.put(clusterKeyIdentifier, topicConfigSet); + topicCacheKeySets.add(clusterKeyIdentifier); } private Map loadTopicDescriptionsMap(AdminClient client) @@ -323,16 +338,16 @@ public synchronized ApiResponse deleteTopic(ClusterTopicRequest clusterTopicRequ zone = "${klaw.topics.cron.expression.timezone:UTC}") public void resetTopicsCacheScheduler() { topicCacheKeySets.forEach( - schemaInfoCacheKeySet -> { + clusterKeyIdentifier -> { try { - log.info("Loading topics {}", schemaInfoCacheKeySet); + log.info("Loading topics {}", clusterKeyIdentifier); loadTopics( - schemaInfoCacheKeySet.getBootstrapServers(), - schemaInfoCacheKeySet.getProtocol(), - schemaInfoCacheKeySet.getClusterIdentification(), + clusterKeyIdentifier.getBootstrapServers(), + clusterKeyIdentifier.getProtocol(), + clusterKeyIdentifier.getClusterIdentification(), true); } catch (Exception e) { - log.error("Error while loading topics {}", schemaInfoCacheKeySet); + log.error("Error while loading topics {}", clusterKeyIdentifier); } }); } diff --git a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/ConfluentCloudApiService.java b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/ConfluentCloudApiService.java index a626ca32c9..29fbbda5e7 100644 --- a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/ConfluentCloudApiService.java +++ b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/ConfluentCloudApiService.java @@ -4,6 +4,7 @@ import io.aiven.klaw.clusterapi.models.ApiResponse; import io.aiven.klaw.clusterapi.models.ClusterAclRequest; import io.aiven.klaw.clusterapi.models.ClusterTopicRequest; +import io.aiven.klaw.clusterapi.models.LoadTopicsResponse; import io.aiven.klaw.clusterapi.models.TopicConfig; import io.aiven.klaw.clusterapi.models.confluentcloud.AclObject; import io.aiven.klaw.clusterapi.models.confluentcloud.Config; @@ -57,12 +58,14 @@ public class ConfluentCloudApiService { private final Environment env; final ClusterApiUtils clusterApiUtils; + private static boolean topicsLoadingStatus; + public ConfluentCloudApiService(Environment env, ClusterApiUtils clusterApiUtils) { this.env = env; this.clusterApiUtils = clusterApiUtils; } - public Set listTopics( + public LoadTopicsResponse listTopics( String restApiHost, KafkaSupportedProtocol protocol, String clusterIdentification) throws Exception { RestTemplate restTemplate = getRestTemplate(); @@ -83,7 +86,10 @@ public Set listTopics( List topicsListUpdated = processListTopicsResponse(responseEntity); - return new HashSet<>(topicsListUpdated); + return LoadTopicsResponse.builder() + .loadingInProgress(topicsLoadingStatus) + .topicConfigSet(new HashSet<>(topicsListUpdated)) + .build(); } catch (RestClientException e) { log.error("Exception:", e); throw new Exception("Error in listing topics : " + e.getMessage()); diff --git a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/SchemaService.java b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/SchemaService.java index f6c46100ff..f1a9057f02 100644 --- a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/SchemaService.java +++ b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/SchemaService.java @@ -1,12 +1,12 @@ package io.aiven.klaw.clusterapi.services; import io.aiven.klaw.clusterapi.models.ApiResponse; +import io.aiven.klaw.clusterapi.models.ClusterKeyIdentifier; import io.aiven.klaw.clusterapi.models.ClusterSchemaRequest; import io.aiven.klaw.clusterapi.models.ClusterTopicRequest; import io.aiven.klaw.clusterapi.models.RegisterSchemaCustomResponse; import io.aiven.klaw.clusterapi.models.RegisterSchemaResponse; import io.aiven.klaw.clusterapi.models.SchemaCompatibilityCheckResponse; -import io.aiven.klaw.clusterapi.models.SchemaInfoCacheKeySet; import io.aiven.klaw.clusterapi.models.SchemaInfoOfTopic; import io.aiven.klaw.clusterapi.models.SchemasInfoOfClusterResponse; import io.aiven.klaw.clusterapi.models.enums.ApiResultStatus; @@ -60,7 +60,7 @@ public class SchemaService { private static Map schemasInfoOfClusterResponseMap = new HashMap<>(); - private static Map schemasInfoCacheKeySetMap = new HashMap<>(); + private static Map schemasInfoCacheKeySetMap = new HashMap<>(); public static final String SCHEMA_REGISTRY_CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"; @@ -703,11 +703,11 @@ private static void updateCache( String schemasVersionsStorageKey, SchemasInfoOfClusterResponse schemasInfoOfClusterResponse) { schemasInfoOfClusterResponseMap.put(schemasVersionsStorageKey, schemasInfoOfClusterResponse); - SchemaInfoCacheKeySet schemaInfoCacheKeySet = new SchemaInfoCacheKeySet(); - schemaInfoCacheKeySet.setProtocol(protocol); - schemaInfoCacheKeySet.setClusterIdentification(clusterIdentification); - schemaInfoCacheKeySet.setBootstrapServers(bootstrapServers); - schemasInfoCacheKeySetMap.put(schemasVersionsStorageKey, schemaInfoCacheKeySet); + ClusterKeyIdentifier clusterKeyIdentifier = new ClusterKeyIdentifier(); + clusterKeyIdentifier.setProtocol(protocol); + clusterKeyIdentifier.setClusterIdentification(clusterIdentification); + clusterKeyIdentifier.setBootstrapServers(bootstrapServers); + schemasInfoCacheKeySetMap.put(schemasVersionsStorageKey, clusterKeyIdentifier); } private SchemasInfoOfClusterResponse handleInterimUpdatesOnSchemas( @@ -769,13 +769,13 @@ private void updateSchemaCache( zone = "${klaw.schemainfo.cron.expression.timezone:UTC}") public void resetSchemaCacheScheduler() { for (String schemasVersionsStorageKey : schemasInfoCacheKeySetMap.keySet()) { - SchemaInfoCacheKeySet schemaInfoCacheKeySet = + ClusterKeyIdentifier clusterKeyIdentifier = schemasInfoCacheKeySetMap.get(schemasVersionsStorageKey); schemasInfoOfClusterResponseMap.remove(schemasVersionsStorageKey); loadAllSchemasInfoFromCluster( - schemaInfoCacheKeySet.getBootstrapServers(), - schemaInfoCacheKeySet.getProtocol(), - schemaInfoCacheKeySet.getClusterIdentification(), + clusterKeyIdentifier.getBootstrapServers(), + clusterKeyIdentifier.getProtocol(), + clusterKeyIdentifier.getClusterIdentification(), false, SchemaCacheUpdateType.NONE, null); diff --git a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/UtilComponentsService.java b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/UtilComponentsService.java index 0326515127..54e195fe88 100644 --- a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/UtilComponentsService.java +++ b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/services/UtilComponentsService.java @@ -70,8 +70,8 @@ private ClusterStatus getStatusKafka( String environment, KafkaSupportedProtocol protocol, String clusterName, String kafkaFlavor) { try { if (kafkaFlavor != null && kafkaFlavor.equals("Confluent Cloud")) { - if (confluentCloudApiService.listTopics(environment, protocol, clusterName).size() >= 0) - return ClusterStatus.ONLINE; + confluentCloudApiService.listTopics(environment, protocol, clusterName); + return ClusterStatus.ONLINE; } else { AdminClient client = clusterApiUtils.getAdminClient(environment, protocol, clusterName); if (client != null) { diff --git a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/utils/ClusterApiUtils.java b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/utils/ClusterApiUtils.java index b98f0e3da3..5fe1946e57 100644 --- a/cluster-api/src/main/java/io/aiven/klaw/clusterapi/utils/ClusterApiUtils.java +++ b/cluster-api/src/main/java/io/aiven/klaw/clusterapi/utils/ClusterApiUtils.java @@ -12,6 +12,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.clients.CommonClientConfigs; @@ -42,6 +43,7 @@ public class ClusterApiUtils { public static final String KAFKA_SR_CREDENTIALS_PROPERTY_SFX = ".klaw.schemaregistry.credentials"; public static final String KAFKA_CONFLUENT_CLOUD_CREDENTIALS_PROPERTY_SFX = ".klaw.confluentcloud.credentials"; + private static final long TIME_OUT_SECS_FOR_TOPICS = 5; private static MessageDigest messageDigest; static { @@ -181,7 +183,8 @@ public AdminClient getAdminClient( } try { - adminClient.listTopics().names().get(); + + adminClient.listTopics().names().get(TIME_OUT_SECS_FOR_TOPICS, TimeUnit.SECONDS); if (!adminClientsMap.containsKey(adminClientKey)) { adminClientsMap.put(adminClientKey, adminClient); } diff --git a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/ClusterApiConfluentCloudControllerIT.java b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/ClusterApiConfluentCloudControllerIT.java index 85d095dcd1..b9cdb22ddf 100644 --- a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/ClusterApiConfluentCloudControllerIT.java +++ b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/ClusterApiConfluentCloudControllerIT.java @@ -8,6 +8,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import io.aiven.klaw.clusterapi.models.LoadTopicsResponse; +import io.aiven.klaw.clusterapi.models.TopicConfig; import io.aiven.klaw.clusterapi.models.confluentcloud.ListAclsResponse; import io.aiven.klaw.clusterapi.models.confluentcloud.ListTopicsResponse; import io.aiven.klaw.clusterapi.models.enums.ClusterStatus; @@ -148,13 +150,18 @@ public void getTopics() throws Exception { .andReturn() .getResponse(); - Set> listTopicsSet = + LoadTopicsResponse loadTopicsResponse = OBJECT_MAPPER.readValue(response.getContentAsString(), new TypeReference<>() {}); - assertThat(listTopicsSet).hasSize(2); // two topics - assertThat(listTopicsSet.stream().toList().get(0)) - .hasSize(3); // topicName, partitions, replication factor - assertThat(listTopicsSet.stream().toList().get(0)) - .containsKeys("topicName", "partitions", "replicationFactor"); + assertThat(loadTopicsResponse.getTopicConfigSet()).hasSize(2); // two topics + assertThat(loadTopicsResponse.getTopicConfigSet()) + .extracting(TopicConfig::getTopicName) + .containsExactlyInAnyOrder("testtopic1", "testtopic2"); + assertThat(loadTopicsResponse.getTopicConfigSet()) + .extracting(TopicConfig::getPartitions) + .containsExactlyInAnyOrder("2", "4"); + assertThat(loadTopicsResponse.getTopicConfigSet()) + .extracting(TopicConfig::getReplicationFactor) + .containsExactlyInAnyOrder("2", "3"); } @Test diff --git a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/UtilMethods.java b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/UtilMethods.java index 228336fa44..1c6ac6e18c 100644 --- a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/UtilMethods.java +++ b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/UtilMethods.java @@ -5,6 +5,7 @@ import io.aiven.klaw.clusterapi.models.ClusterAclRequest; import io.aiven.klaw.clusterapi.models.ClusterSchemaRequest; import io.aiven.klaw.clusterapi.models.ClusterTopicRequest; +import io.aiven.klaw.clusterapi.models.LoadTopicsResponse; import io.aiven.klaw.clusterapi.models.TopicConfig; import io.aiven.klaw.clusterapi.models.confluentcloud.AclObject; import io.aiven.klaw.clusterapi.models.confluentcloud.ListAclsResponse; @@ -65,7 +66,7 @@ public Set> getAcls() { return aclsSet; } - public Set getTopics() { + public LoadTopicsResponse getTopics() { Set topicsSet = new HashSet<>(); TopicConfig hashMap = new TopicConfig(); hashMap.setTopicName("testtopic1"); @@ -73,7 +74,7 @@ public Set getTopics() { hashMap.setPartitions("2"); hashMap.setReplicationFactor("1"); topicsSet.add(hashMap); - return topicsSet; + return LoadTopicsResponse.builder().loadingInProgress(false).topicConfigSet(topicsSet).build(); } public MultiValueMap getMappedValuesTopic() { diff --git a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/controller/ClusterApiControllerTest.java b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/controller/ClusterApiControllerTest.java index 9962c1e841..931c003094 100644 --- a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/controller/ClusterApiControllerTest.java +++ b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/controller/ClusterApiControllerTest.java @@ -129,7 +129,7 @@ public void getTopics() throws Exception { mvc.perform(get(urlTemplate)) .andExpect(status().isOk()) .andExpect(content().contentType(MediaType.APPLICATION_JSON)) - .andExpect(jsonPath("$", hasSize(1))); + .andExpect(jsonPath("$.topicConfigSet", hasSize(1))); } @Test diff --git a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/services/ApacheKafkaTopicServiceTest.java b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/services/ApacheKafkaTopicServiceTest.java index 24f2ecea12..287c96974f 100644 --- a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/services/ApacheKafkaTopicServiceTest.java +++ b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/services/ApacheKafkaTopicServiceTest.java @@ -7,7 +7,7 @@ import io.aiven.klaw.clusterapi.constants.TestConstants; import io.aiven.klaw.clusterapi.models.ApiResponse; import io.aiven.klaw.clusterapi.models.ClusterTopicRequest; -import io.aiven.klaw.clusterapi.models.TopicConfig; +import io.aiven.klaw.clusterapi.models.LoadTopicsResponse; import io.aiven.klaw.clusterapi.models.enums.ApiResultStatus; import io.aiven.klaw.clusterapi.models.enums.KafkaSupportedProtocol; import io.aiven.klaw.clusterapi.utils.ClusterApiUtils; @@ -114,11 +114,11 @@ void loadTopics() throws Exception { .thenReturn(KafkaFuture.completedFuture(topicDescriptionsPerAdminClient)); Mockito.when(topicDescription.partitions()).thenReturn(List.of(topicPartitionInfo)); - Set topicConfigs = + LoadTopicsResponse topicConfigs = apacheKafkaTopicService.loadTopics( TestConstants.ENVIRONMENT, protocol, TestConstants.CLUSTER_IDENTIFICATION, false); - Assertions.assertThat(topicConfigs.size()).isEqualTo(1); + Assertions.assertThat(topicConfigs.getTopicConfigSet().size()).isEqualTo(1); } @Test diff --git a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/services/ConfluentCloudApiServiceTest.java b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/services/ConfluentCloudApiServiceTest.java index 4b38a1b20f..bc65eec9ad 100644 --- a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/services/ConfluentCloudApiServiceTest.java +++ b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/services/ConfluentCloudApiServiceTest.java @@ -11,7 +11,7 @@ import io.aiven.klaw.clusterapi.models.ApiResponse; import io.aiven.klaw.clusterapi.models.ClusterAclRequest; import io.aiven.klaw.clusterapi.models.ClusterTopicRequest; -import io.aiven.klaw.clusterapi.models.TopicConfig; +import io.aiven.klaw.clusterapi.models.LoadTopicsResponse; import io.aiven.klaw.clusterapi.models.confluentcloud.ListAclsResponse; import io.aiven.klaw.clusterapi.models.confluentcloud.ListTopicsResponse; import io.aiven.klaw.clusterapi.models.confluentcloud.TopicCreateRequest; @@ -66,14 +66,17 @@ public void listTopics() throws Exception { any(), (ParameterizedTypeReference) any())) .thenReturn(listTopicsResponseResponseEntity); - Set listTopicsSet = + LoadTopicsResponse listTopicsSet = confluentCloudApiService.listTopics( "localhost:443", KafkaSupportedProtocol.SSL, CLUSTER_ID); - assertThat(listTopicsSet).hasSize(2); // two topics - assertThat(listTopicsSet.stream().toList().get(0).getTopicName()).isNotNull(); - assertThat(listTopicsSet.stream().toList().get(0).getPartitions()).isNotNull(); - assertThat(listTopicsSet.stream().toList().get(0).getReplicationFactor()).isNotNull(); + assertThat(listTopicsSet.getTopicConfigSet()).hasSize(2); // two topics + assertThat(listTopicsSet.getTopicConfigSet().stream().toList().get(0).getTopicName()) + .isNotNull(); + assertThat(listTopicsSet.getTopicConfigSet().stream().toList().get(0).getPartitions()) + .isNotNull(); + assertThat(listTopicsSet.getTopicConfigSet().stream().toList().get(0).getReplicationFactor()) + .isNotNull(); } @Test diff --git a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/utils/GetAdminClientTest.java b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/utils/GetAdminClientTest.java index 05dea8bb9e..37fd3cc334 100644 --- a/cluster-api/src/test/java/io/aiven/klaw/clusterapi/utils/GetAdminClientTest.java +++ b/cluster-api/src/test/java/io/aiven/klaw/clusterapi/utils/GetAdminClientTest.java @@ -2,6 +2,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.when; @@ -59,7 +60,7 @@ public void getAdminClient1() throws Exception { when(adminClient.listTopics()).thenReturn(listTopicsResult); when(listTopicsResult.names()).thenReturn(kafkaFuture); Set setStr = new HashSet<>(); - when(kafkaFuture.get()).thenReturn(setStr); + when(kafkaFuture.get(anyLong(), any())).thenReturn(setStr); AdminClient result = getAdminClient.getAdminClient(LOCALHOST_9092, KafkaSupportedProtocol.PLAINTEXT, ""); diff --git a/core/src/main/java/io/aiven/klaw/controller/TopicSyncController.java b/core/src/main/java/io/aiven/klaw/controller/TopicSyncController.java index eceb826cce..f2c3a95fbd 100644 --- a/core/src/main/java/io/aiven/klaw/controller/TopicSyncController.java +++ b/core/src/main/java/io/aiven/klaw/controller/TopicSyncController.java @@ -106,7 +106,8 @@ public ResponseEntity getSyncTopics( currentPage, topicNameSearch, showAllTopics, - Boolean.parseBoolean(isBulkOption)), + Boolean.parseBoolean(isBulkOption), + resetTopicsCache), HttpStatus.OK); } } diff --git a/core/src/main/java/io/aiven/klaw/model/cluster/LoadTopicsResponse.java b/core/src/main/java/io/aiven/klaw/model/cluster/LoadTopicsResponse.java new file mode 100644 index 0000000000..e2e9abe7bf --- /dev/null +++ b/core/src/main/java/io/aiven/klaw/model/cluster/LoadTopicsResponse.java @@ -0,0 +1,19 @@ +package io.aiven.klaw.model.cluster; + +import io.aiven.klaw.model.response.TopicConfig; +import jakarta.validation.constraints.NotNull; +import java.util.Set; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; + +@Builder +@Getter +@NoArgsConstructor +@AllArgsConstructor +public class LoadTopicsResponse { + @NotNull private boolean loadingInProgress; + + private Set topicConfigSet; +} diff --git a/core/src/main/java/io/aiven/klaw/model/response/SyncTopicsList.java b/core/src/main/java/io/aiven/klaw/model/response/SyncTopicsList.java index 84a94ffc17..74a26bc323 100644 --- a/core/src/main/java/io/aiven/klaw/model/response/SyncTopicsList.java +++ b/core/src/main/java/io/aiven/klaw/model/response/SyncTopicsList.java @@ -8,4 +8,5 @@ public class SyncTopicsList { private List resultSet; private int allTopicsCount; private int allTopicWarningsCount; + private boolean topicsLoadingStatus; } diff --git a/core/src/main/java/io/aiven/klaw/service/ClusterApiService.java b/core/src/main/java/io/aiven/klaw/service/ClusterApiService.java index a7a326d2b7..8764afa068 100644 --- a/core/src/main/java/io/aiven/klaw/service/ClusterApiService.java +++ b/core/src/main/java/io/aiven/klaw/service/ClusterApiService.java @@ -17,6 +17,7 @@ import io.aiven.klaw.model.cluster.ClusterSchemaRequest; import io.aiven.klaw.model.cluster.ClusterTopicRequest; import io.aiven.klaw.model.cluster.ConnectorsStatus; +import io.aiven.klaw.model.cluster.LoadTopicsResponse; import io.aiven.klaw.model.cluster.SchemasInfoOfClusterResponse; import io.aiven.klaw.model.cluster.consumergroup.ResetConsumerGroupOffsetsRequest; import io.aiven.klaw.model.enums.AclPatternType; @@ -31,7 +32,6 @@ import io.aiven.klaw.model.requests.KafkaConnectorRestartModel; import io.aiven.klaw.model.response.OffsetDetails; import io.aiven.klaw.model.response.ServiceAccountDetails; -import io.aiven.klaw.model.response.TopicConfig; import io.jsonwebtoken.Jwts; import io.jsonwebtoken.SignatureAlgorithm; import jakarta.annotation.PostConstruct; @@ -355,7 +355,7 @@ else if (KafkaFlavors.CONFLUENT_CLOUD.value.equals(kwClusters.getKafkaFlavor())) return aclListOriginal; } - public List getAllTopics( + public LoadTopicsResponse getAllTopics( String bootstrapHost, KafkaSupportedProtocol protocol, String clusterIdentification, @@ -365,7 +365,8 @@ public List getAllTopics( throws Exception { log.info("getAllTopics {} {}", bootstrapHost, protocol); getClusterApiProperties(tenantId); - List topicsList; + + LoadTopicsResponse loadTopicsResponse; String aclsNativeType = AclsNativeType.NATIVE.value; if (KafkaFlavors.CONFLUENT_CLOUD.value.equals(kafkaFlavors)) { @@ -387,17 +388,17 @@ public List getAllTopics( String.valueOf(resetTopicsCache)); HttpEntity entity = getHttpEntity(); - ResponseEntity> s = + ResponseEntity s = getRestTemplate(null) .exchange( uriGetTopicsFull, HttpMethod.GET, entity, new ParameterizedTypeReference<>() {}); - topicsList = new ArrayList<>(Objects.requireNonNull(s.getBody())); + loadTopicsResponse = Objects.requireNonNull(s.getBody()); } catch (Exception e) { log.error("Error from getAllTopics", e); throw new KlawException(CLUSTER_API_ERR_104); } - return topicsList; + return loadTopicsResponse; } public String approveConnectorRequests( diff --git a/core/src/main/java/io/aiven/klaw/service/TopicSyncControllerService.java b/core/src/main/java/io/aiven/klaw/service/TopicSyncControllerService.java index 8727b03e48..340014c4ed 100644 --- a/core/src/main/java/io/aiven/klaw/service/TopicSyncControllerService.java +++ b/core/src/main/java/io/aiven/klaw/service/TopicSyncControllerService.java @@ -32,6 +32,7 @@ import io.aiven.klaw.model.SyncTopicUpdates; import io.aiven.klaw.model.SyncTopicsBulk; import io.aiven.klaw.model.TopicInfo; +import io.aiven.klaw.model.cluster.LoadTopicsResponse; import io.aiven.klaw.model.enums.AclType; import io.aiven.klaw.model.enums.ApiResultStatus; import io.aiven.klaw.model.enums.EntityType; @@ -49,6 +50,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.regex.Matcher; @@ -99,7 +101,7 @@ public void getReconTopicsScheduled() { reconStr = new StringBuilder(); try { List results = - getReconTopics(envStr, "-1", "", null, "false", false).getResultSet(); + getReconTopics(envStr, "-1", "", null, "false", false, false).getResultSet(); for (TopicSyncResponseModel topicRequestModel : results) { reconStr @@ -130,14 +132,21 @@ public SyncTopicsList getReconTopics( String currentPage, String topicNameSearch, String showAllTopics, - boolean isBulkOption) + boolean isBulkOption, + boolean resetTopicsCache) throws Exception { SyncTopicsList syncTopicsList = new SyncTopicsList(); - List topicRequestModelList = + SyncTopicsList loadTopicsResponse = getSyncTopics( - envId, pageNo, currentPage, topicNameSearch, showAllTopics, isBulkOption, false) - .getResultSet(); + envId, + pageNo, + currentPage, + topicNameSearch, + showAllTopics, + isBulkOption, + resetTopicsCache); + List topicRequestModelList = loadTopicsResponse.getResultSet(); topicRequestModelList = topicRequestModelList.stream() @@ -175,6 +184,7 @@ public SyncTopicsList getReconTopics( syncTopicsList.setAllTopicWarningsCount( Long.valueOf(topicRequestModelList.stream().filter(req -> !req.isValidatedTopic()).count()) .intValue()); + syncTopicsList.setTopicsLoadingStatus(loadTopicsResponse.isTopicsLoadingStatus()); return syncTopicsList; } @@ -198,12 +208,13 @@ public SyncTopicsList getSyncTopics( } } - List topicFilteredList = + LoadTopicsResponse loadTopicsResponse = getTopicsFromKafkaCluster(env, topicNameSearch, resetTopicsCache); + syncTopicsList.setTopicsLoadingStatus(loadTopicsResponse.isLoadingInProgress()); List topicsList; topicsList = - topicFilteredList.stream() + loadTopicsResponse.getTopicConfigSet().stream() .sorted(new TopicControllerService.TopicNameSyncComparator()) .collect(Collectors.toList()); @@ -820,10 +831,10 @@ public ApiResponse updateSyncTopicsBulk(SyncTopicsBulk syncTopicsBulk) throws Kl } } else { try { - List topicsMap = + LoadTopicsResponse loadTopicsResponse = getTopicsFromKafkaCluster( syncTopicsBulk.getSourceEnv(), syncTopicsBulk.getTopicSearchFilter(), false); - for (TopicConfig hashMap : topicsMap) { + for (TopicConfig hashMap : loadTopicsResponse.getTopicConfigSet()) { invokeUpdateSyncAllTopics(syncTopicsBulk, logArray, hashMap); } } catch (Exception e) { @@ -864,7 +875,7 @@ private void invokeUpdateSyncAllTopics( } } - private List getTopicsFromKafkaCluster( + private LoadTopicsResponse getTopicsFromKafkaCluster( String env, String topicNameSearch, boolean resetTopicsCache) throws Exception { if (topicNameSearch != null) { topicNameSearch = topicNameSearch.trim(); @@ -876,7 +887,7 @@ private List getTopicsFromKafkaCluster( .getClusters(KafkaClustersType.KAFKA, tenantId) .get(envSelected.getClusterId()); - List topicsList = + LoadTopicsResponse loadTopicsResponse = clusterApiService.getAllTopics( kwClusters.getBootstrapServers(), kwClusters.getProtocol(), @@ -887,17 +898,23 @@ private List getTopicsFromKafkaCluster( topicCounter = 0; - List topicFilteredList = topicsList; + Set topicFilteredList = loadTopicsResponse.getTopicConfigSet(); // Filter topics on topic name for search - if (topicNameSearch != null && topicNameSearch.length() > 0) { + if (topicNameSearch != null + && topicNameSearch.length() > 0 + && !loadTopicsResponse.isLoadingInProgress()) { final String topicSearchFilter = topicNameSearch; topicFilteredList = - topicsList.stream() + loadTopicsResponse.getTopicConfigSet().stream() .filter(topic -> topic.getTopicName().contains(topicSearchFilter)) - .collect(Collectors.toList()); + .collect(Collectors.toSet()); } - return topicFilteredList; + + return LoadTopicsResponse.builder() + .topicConfigSet(topicFilteredList) + .loadingInProgress(loadTopicsResponse.isLoadingInProgress()) + .build(); } private void invokeUpdateSync( diff --git a/core/src/main/resources/static/js/synchronizeTopics.js b/core/src/main/resources/static/js/synchronizeTopics.js index 2a68d21b2c..182755bca9 100644 --- a/core/src/main/resources/static/js/synchronizeTopics.js +++ b/core/src/main/resources/static/js/synchronizeTopics.js @@ -387,12 +387,18 @@ app.controller("synchronizeTopicsCtrl", function($scope, $http, $location, $wind }).success(function(output) { $scope.ShowSpinnerStatusTopics = false; $scope.resultBrowse = output["resultSet"]; - if($scope.resultBrowse != null && $scope.resultBrowse.length != 0){ + if($scope.resultBrowse != null && $scope.resultBrowse.length !== 0){ $scope.resultPages = $scope.resultBrowse[0].allPageNos; $scope.resultPageSelected = pageNoSelected; + $scope.topicsLoadingStatus = output["topicsLoadingStatus"]; $scope.currentPageSelected = $scope.resultBrowse[0].currentPage; } - $scope.alert = ""; + if($scope.topicsLoadingStatus){ + $scope.alert = "Topics are being loaded. Please retry after a few minutes."; + }else{ + $scope.alert = ""; + } + }).error( function(error) { @@ -404,6 +410,13 @@ app.controller("synchronizeTopicsCtrl", function($scope, $http, $location, $wind }; + $scope.resetBulkTopicCacheCheck = function(resetTopicsCache){ + $scope.resetTopicsCache = resetTopicsCache; + if($scope.getTopicsBulk.envName){ + $scope.getTopicsBulk(1); + } + } + $scope.getTopicsBulk = function(pageNoSelected) { if(!$scope.getTopicsBulk.envName) @@ -426,19 +439,24 @@ app.controller("synchronizeTopicsCtrl", function($scope, $http, $location, $wind params: {'env' : $scope.getTopicsBulk.envName, 'topicnamesearch' : $scope.getTopicsBulk.topicnamesearch, 'showAllTopics' : "" + $scope.showAllTopics, + 'resetTopicsCache' : $scope.resetTopicsCache, 'isBulkOption' : "true", 'pageNo' : pageNoSelected, 'currentPage' : $scope.currentPageSelectedBulk } }).success(function(output) { $scope.ShowSpinnerStatusTopicsBulk = false; $scope.resultBrowseBulk = output["resultSet"]; - if($scope.resultBrowseBulk != null && $scope.resultBrowseBulk.length != 0){ + if($scope.resultBrowseBulk != null && $scope.resultBrowseBulk.length !== 0){ $scope.allTopicsCount = output["allTopicsCount"]; + $scope.topicsLoadingStatus = output["topicsLoadingStatus"]; $scope.allTopicWarningsCount = output["allTopicWarningsCount"]; $scope.resultPagesBulk = $scope.resultBrowseBulk[0].allPageNos; $scope.resultPageSelectedBulk = pageNoSelected; $scope.currentPageSelectedBulk = $scope.resultBrowseBulk[0].currentPage; } + if($scope.topicsLoadingStatus){ + $scope.alert = "Topics are being loaded from cluster to cache. Please retry after a few minutes."; + } }).error( function(error) { diff --git a/core/src/main/resources/templates/synchronizeTopics.html b/core/src/main/resources/templates/synchronizeTopics.html index babb03b0eb..477d9d9fd4 100644 --- a/core/src/main/resources/templates/synchronizeTopics.html +++ b/core/src/main/resources/templates/synchronizeTopics.html @@ -707,6 +707,33 @@

Synchronize Topics - From Cluster

+
+
+ + + + + +
+ Note : Topics Cache reset happens at a scheduled interval, but if you would like to retrieve current state of topics, + select this checkbox. +
+
+ + + + + + +
Reset Topics Cache.
+ +
+
+
diff --git a/core/src/test/java/io/aiven/klaw/UtilMethods.java b/core/src/test/java/io/aiven/klaw/UtilMethods.java index f8bfef1fa5..2303869792 100644 --- a/core/src/test/java/io/aiven/klaw/UtilMethods.java +++ b/core/src/test/java/io/aiven/klaw/UtilMethods.java @@ -22,6 +22,7 @@ import io.aiven.klaw.model.charts.Options; import io.aiven.klaw.model.charts.TeamOverview; import io.aiven.klaw.model.charts.Title; +import io.aiven.klaw.model.cluster.LoadTopicsResponse; import io.aiven.klaw.model.cluster.SchemaInfoOfTopic; import io.aiven.klaw.model.cluster.SchemasInfoOfClusterResponse; import io.aiven.klaw.model.cluster.consumergroup.OffsetResetType; @@ -780,8 +781,8 @@ public AclRequestsModel getAivenAclRequestModel(String topic) { return aclRequest; } - public List getClusterApiTopics(String topicPrefix, int size) { - List listTopics = new ArrayList<>(); + public LoadTopicsResponse getClusterApiTopics(String topicPrefix, int size) { + Set listTopics = new HashSet<>(); TopicConfig hashMap; for (int i = 0; i < size; i++) { hashMap = new TopicConfig(); @@ -790,7 +791,7 @@ public List getClusterApiTopics(String topicPrefix, int size) { hashMap.setPartitions("2"); listTopics.add(hashMap); } - return listTopics; + return LoadTopicsResponse.builder().loadingInProgress(false).topicConfigSet(listTopics).build(); } public List getSyncTopicUpdates() { diff --git a/core/src/test/java/io/aiven/klaw/service/ClusterApiServiceTest.java b/core/src/test/java/io/aiven/klaw/service/ClusterApiServiceTest.java index 44c5b68a62..a234e65495 100644 --- a/core/src/test/java/io/aiven/klaw/service/ClusterApiServiceTest.java +++ b/core/src/test/java/io/aiven/klaw/service/ClusterApiServiceTest.java @@ -20,6 +20,7 @@ import io.aiven.klaw.helpers.db.rdbms.HandleDbRequestsJdbc; import io.aiven.klaw.model.ApiResponse; import io.aiven.klaw.model.cluster.ClusterSchemaRequest; +import io.aiven.klaw.model.cluster.LoadTopicsResponse; import io.aiven.klaw.model.enums.AclIPPrincipleType; import io.aiven.klaw.model.enums.ApiResultStatus; import io.aiven.klaw.model.enums.ClusterStatus; @@ -178,8 +179,10 @@ public void getAclsFailure() { @Test @Order(5) public void getAllTopicsSuccess() throws Exception { - Set topicsList = getTopics(); - ResponseEntity response = new ResponseEntity<>(topicsList, HttpStatus.OK); + Set topicsList = getTopics(); + LoadTopicsResponse loadTopicsResponse = + LoadTopicsResponse.builder().topicConfigSet(topicsList).build(); + ResponseEntity response = new ResponseEntity<>(loadTopicsResponse, HttpStatus.OK); when(restTemplate.exchange( Mockito.anyString(), @@ -188,9 +191,11 @@ public void getAllTopicsSuccess() throws Exception { (ParameterizedTypeReference) any())) .thenReturn(response); - List result = - clusterApiService.getAllTopics("", KafkaSupportedProtocol.PLAINTEXT, "", "", 1, false); - assertThat(result).isEqualTo(new ArrayList<>(topicsList)); + Set result = + clusterApiService + .getAllTopics("", KafkaSupportedProtocol.PLAINTEXT, "", "", 1, false) + .getTopicConfigSet(); + assertThat(result).isEqualTo(topicsList); } @Test @@ -513,10 +518,15 @@ public void approveConnectorRequests_ISE() throws KlawException, KlawRestExcepti assertThat(Objects.requireNonNull(response1)).isEqualTo(FAILED_TO_EXECUTE_SUCCESSFULLY); } - private Set getTopics() { - Set topicsList = new HashSet<>(); - topicsList.add("topic1"); - topicsList.add("topic2"); + private Set getTopics() { + Set topicsList = new HashSet<>(); + TopicConfig tc1 = new TopicConfig(); + tc1.setTopicName("topic1"); + topicsList.add(tc1); + + TopicConfig tc2 = new TopicConfig(); + tc2.setTopicName("topic2"); + topicsList.add(tc2); return topicsList; } diff --git a/core/src/test/java/io/aiven/klaw/service/TopicSyncControllerServiceTest.java b/core/src/test/java/io/aiven/klaw/service/TopicSyncControllerServiceTest.java index b8832a4c2c..140958bf3d 100644 --- a/core/src/test/java/io/aiven/klaw/service/TopicSyncControllerServiceTest.java +++ b/core/src/test/java/io/aiven/klaw/service/TopicSyncControllerServiceTest.java @@ -25,6 +25,7 @@ import io.aiven.klaw.model.KwTenantConfigModel; import io.aiven.klaw.model.SyncBackTopics; import io.aiven.klaw.model.SyncTopicUpdates; +import io.aiven.klaw.model.cluster.LoadTopicsResponse; import io.aiven.klaw.model.enums.ApiResultStatus; import io.aiven.klaw.model.enums.KafkaClustersType; import io.aiven.klaw.model.enums.KafkaFlavors; @@ -41,6 +42,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; @@ -246,21 +248,17 @@ public void getSyncTopics() throws Exception { stubUserInfo(); when(manageDatabase.getKafkaEnvList(anyInt())).thenReturn(utilMethods.getEnvLists()); when(clusterApiService.getAllTopics( - anyString(), - any(KafkaSupportedProtocol.class), - anyString(), - anyString(), - anyInt(), - anyBoolean())) + anyString(), any(), anyString(), anyString(), anyInt(), anyBoolean())) .thenReturn(utilMethods.getClusterApiTopics("topic", 10)); when(handleDbRequests.getAllTeamsOfUsers(anyString(), anyInt())) .thenReturn(getAvailableTeams()); - when(manageDatabase.getClusters(any(KafkaClustersType.class), anyInt())) - .thenReturn(clustersHashMap); + when(manageDatabase.getClusters(KafkaClustersType.KAFKA, 1)).thenReturn(clustersHashMap); when(clustersHashMap.get(any())).thenReturn(kwClusters); when(kwClusters.getBootstrapServers()).thenReturn("clusters"); when(kwClusters.getProtocol()).thenReturn(KafkaSupportedProtocol.PLAINTEXT); when(kwClusters.getClusterName()).thenReturn("cluster"); + when(kwClusters.getClusterId()).thenReturn(1); + when(kwClusters.getKafkaFlavor()).thenReturn(""); SyncTopicsList topicRequests = topicSyncControllerService.getSyncTopics( @@ -576,7 +574,7 @@ public void getReconSyncList_noValidationSet_base() throws Exception { when(manageDatabase.getTeamNameFromTeamId(eq(101), eq(10))).thenReturn("Team1"); SyncTopicsList syncTopics = - topicSyncControllerService.getReconTopics("1", "1", "", null, "false", false); + topicSyncControllerService.getReconTopics("1", "1", "", null, "false", false, false); // 14 in the DB and 14 in the cluster means we return 0 here. assertThat(syncTopics.getResultSet()).hasSize(0); @@ -645,7 +643,7 @@ public void getReconSyncList_noValidationSet_FiveNotSynched() throws Exception { when(manageDatabase.getTeamNameFromTeamId(eq(101), eq(10))).thenReturn("Team1"); SyncTopicsList syncTopics = - topicSyncControllerService.getReconTopics("1", "1", "", null, "false", false); + topicSyncControllerService.getReconTopics("1", "1", "", null, "false", false, false); // 14 in the DB and 14 in the cluster means we return 0 here. assertThat(syncTopics.getResultSet()).hasSize(5); @@ -718,7 +716,7 @@ public void getReconSyncList_noValidationSet_FourDeletedFromCluster() throws Exc when(manageDatabase.getTeamNameFromTeamId(eq(101), eq(10))).thenReturn("Team1"); SyncTopicsList syncTopics = - topicSyncControllerService.getReconTopics("1", "1", "", null, "false", false); + topicSyncControllerService.getReconTopics("1", "1", "", null, "false", false, false); // 14 in the DB and 10 in the cluster i am expecting the difference to be returned assertThat(syncTopics.getResultSet()).hasSize(4); @@ -901,7 +899,7 @@ public void getReconSyncList_ValidationOn( SyncTopicsList syncTopics = topicSyncControllerService.getReconTopics( - String.valueOf(environment), "1", "", null, "false", false); + String.valueOf(environment), "1", "", null, "false", false, false); // With 12 existing in the DB and 15 on the cluster the missing 2 are returned assertThat(syncTopics.getResultSet()).hasSize(expectedReturned); @@ -1130,7 +1128,7 @@ public void getSyncList_FailedValidationOn_ReplicationAndPartitionsAreNull() thr assertThat(actualStringValidation).isEqualTo(3); } - private List generateClusterTopics(int numberOfTopics) { + private LoadTopicsResponse generateClusterTopics(int numberOfTopics) { String[] topicNames = new String[numberOfTopics]; for (int i = 0; i < numberOfTopics; i++) { topicNames[i] = "Topic" + i; @@ -1138,18 +1136,17 @@ private List generateClusterTopics(int numberOfTopics) { return generateClusterTopics(topicNames); } - private List generateClusterTopics(String... topicNames) { - - List topics = new ArrayList<>(); + private LoadTopicsResponse generateClusterTopics(String... topicNames) { + Set topics = new HashSet<>(); - for (int i = 0; i < topicNames.length; i++) { + for (String topicName : topicNames) { TopicConfig topic = new TopicConfig(); - topic.setTopicName(topicNames[i]); + topic.setTopicName(topicName); topic.setPartitions("9"); topic.setReplicationFactor("3"); topics.add(topic); } - return topics; + return LoadTopicsResponse.builder().loadingInProgress(false).topicConfigSet(topics).build(); } private Map getKwClusters(int numberOfClusters) { diff --git a/openapi.yaml b/openapi.yaml index a9f6486d7a..1e13a5c70b 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -3410,6 +3410,14 @@ "type" : "string", "default" : "false" } + }, { + "name" : "resetTopicsCache", + "in" : "query", + "required" : false, + "schema" : { + "type" : "boolean", + "default" : false + } }, { "name" : "isBulkOption", "in" : "query", @@ -8150,6 +8158,9 @@ "allTopicWarningsCount" : { "type" : "integer", "format" : "int32" + }, + "topicsLoadingStatus" : { + "type" : "boolean" } } },