Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor for mono #28

Merged
merged 20 commits into from
Apr 24, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package com.provectus.kafka.ui.cluster;

import com.provectus.kafka.ui.cluster.model.ClustersStorage;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.cluster.service.MetricsUpdateService;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.util.ArrayList;

@Component
@RequiredArgsConstructor
Expand All @@ -19,8 +22,9 @@ public class ClustersMetricsScheduler {

@Scheduled(fixedRate = 30000)
public void updateMetrics() {
for (KafkaCluster kafkaCluster : clustersStorage.getKafkaClusters()) {
metricsUpdateService.updateMetrics(kafkaCluster);
}
Flux.range(0, clustersStorage.getKafkaClusters().size())
.subscribeOn(Schedulers.parallel())
.doOnNext(s -> metricsUpdateService.updateMetrics(new ArrayList<>(clustersStorage.getKafkaClusters()).get(s)))
.subscribe();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public Mono<ResponseEntity<Flux<TopicConfig>>> getTopicConfigs(String name, Stri
public Mono<ResponseEntity<Topic>> createTopic(String name, Mono<TopicFormData> topicFormData) {
KafkaCluster cluster = clustersStorage.getClusterByName(name);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Services should not return response entity. This is controller responsibility.

if (cluster == null) return null;
return kafkaService.createTopic(cluster, topicFormData);
return kafkaService.createTopic(cluster.getAdminClient(), cluster, topicFormData);
}

@SneakyThrows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.provectus.kafka.ui.zookeeper.ZookeeperService;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
Expand All @@ -16,9 +15,8 @@ public class MetricsUpdateService {
private final KafkaService kafkaService;
private final ZookeeperService zookeeperService;

@Async
public void updateMetrics(KafkaCluster kafkaCluster) {
log.debug("Start getting metrics for kafkaCluster: " + kafkaCluster.getName());
log.debug("Start getting metrics for kafkaCluster: {}", kafkaCluster.getName());
kafkaService.loadClusterMetrics(kafkaCluster);
zookeeperService.checkZookeeperStatus(kafkaCluster);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.provectus.kafka.ui.kafka;

import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
import com.provectus.kafka.ui.model.*;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
Expand All @@ -10,8 +11,8 @@
import org.apache.kafka.common.config.ConfigResource;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.*;
Expand All @@ -25,23 +26,19 @@
public class KafkaService {

@SneakyThrows
@Async
public void loadClusterMetrics(KafkaCluster kafkaCluster) {
log.debug("Start getting Kafka metrics for cluster: " + kafkaCluster.getName());
boolean isConnected = false;
var isConnected = false;
log.debug("Start getting metrics for kafkaCluster: {}", kafkaCluster.getName());
if (kafkaCluster.getAdminClient() != null) {
isConnected = isAdminClientConnected(kafkaCluster);
}
if (kafkaCluster.getAdminClient() == null || !isConnected) {
isConnected = createAdminClient(kafkaCluster);
}

if (!isConnected) {
kafkaCluster.getCluster().setStatus(ServerStatus.OFFLINE);

return;
}

kafkaCluster.getCluster().setId(kafkaCluster.getId());
kafkaCluster.getCluster().setStatus(ServerStatus.ONLINE);
loadMetrics(kafkaCluster);
Expand All @@ -50,26 +47,26 @@ public void loadClusterMetrics(KafkaCluster kafkaCluster) {


@SneakyThrows
public Mono<ResponseEntity<Topic>> createTopic(KafkaCluster cluster, Mono<TopicFormData> topicFormData) {
public Mono<ResponseEntity<Topic>> createTopic(AdminClient adminClient, KafkaCluster cluster, Mono<TopicFormData> topicFormData) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's remove ResponseEntity from service layer

return topicFormData.flatMap(
topicData -> {
AdminClient adminClient = cluster.getAdminClient();
NewTopic newTopic = new NewTopic(topicData.getName(), topicData.getPartitions(), topicData.getReplicationFactor().shortValue());
newTopic.configs(topicData.getConfigs());

createTopic(adminClient, newTopic);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should return mono to next method


return topicFormData;
}).flatMap(topicData -> {
DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(Collections.singletonList(topicData.getName()));
Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
var entry = topicDescriptionFuturesMap.entrySet().iterator().next();
var topicDescription = getTopicDescription(entry);
if (topicDescription == null) return Mono.error(new RuntimeException("Can't find created topic"));

Topic topic = collectTopicData(cluster, topicDescription);
return topicDescription;
}).flatMap(td ->
collectTopicData(cluster, td))
.map(topic -> {
cluster.getTopics().add(topic);
return Mono.just(new ResponseEntity<>(topic, HttpStatus.CREATED));
}
);
return new ResponseEntity<>(topic, HttpStatus.CREATED);
});
}

@SneakyThrows
Expand Down Expand Up @@ -113,21 +110,20 @@ private void loadTopicsData(KafkaCluster kafkaCluster) {
AdminClient adminClient = kafkaCluster.getAdminClient();
ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
listTopicsOptions.listInternal(true);
var topicListings = adminClient.listTopics(listTopicsOptions).names().get();
kafkaCluster.getCluster().setTopicCount(topicListings.size());

DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(topicListings);
Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
List<Topic> foundTopics = new ArrayList<>();
resetMetrics(kafkaCluster);

for (var entry : topicDescriptionFuturesMap.entrySet()) {
var topicDescription = getTopicDescription(entry);
if (topicDescription == null) continue;
Topic topic = collectTopicData(kafkaCluster, topicDescription);
foundTopics.add(topic);
}
kafkaCluster.setTopics(foundTopics);
ClusterUtil.toMono(adminClient.listTopics(listTopicsOptions).names())
.map(tl -> {
kafkaCluster.getCluster().setTopicCount(tl.size());
DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(tl);
Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
resetMetrics(kafkaCluster);
return topicDescriptionFuturesMap.entrySet();
})
.flatMapMany(Flux::fromIterable)
.flatMap(s -> ClusterUtil.toMono(s.getValue()))
.flatMap(e -> collectTopicData(kafkaCluster, e))
.collectList()
.doOnNext(kafkaCluster::setTopics)
.subscribe();
}

private void resetMetrics(KafkaCluster kafkaCluster) {
Expand All @@ -138,7 +134,7 @@ private void resetMetrics(KafkaCluster kafkaCluster) {
kafkaCluster.getBrokersMetrics().setOutOfSyncReplicasCount(0);
}

private Topic collectTopicData(KafkaCluster kafkaCluster, TopicDescription topicDescription) {
private Mono<Topic> collectTopicData(KafkaCluster kafkaCluster, TopicDescription topicDescription) {
var topic = new Topic();
topic.setInternal(topicDescription.isInternal());
topic.setName(topicDescription.name());
Expand Down Expand Up @@ -197,74 +193,73 @@ private Topic collectTopicData(KafkaCluster kafkaCluster, TopicDescription topic
? topicDescription.partitions().get(0).replicas().size()
: null);
topicDetails.setUnderReplicatedPartitions(urpCount);

loadTopicConfig(kafkaCluster, topicDescription.name());

return topic;
return loadTopicConfig(kafkaCluster, topic.getName()).map(l -> topic);
}

private TopicDescription getTopicDescription(Map.Entry<String, KafkaFuture<TopicDescription>> entry) {
try {
return entry.getValue().get();
} catch (Exception e) {
log.error("Can't get topic with name: " + entry.getKey(), e);

return null;
}
private Mono<TopicDescription> getTopicDescription(Map.Entry<String, KafkaFuture<TopicDescription>> entry) {
return ClusterUtil.toMono(entry.getValue())
.onErrorResume(e -> {
log.error("Can't get topic with name: " + entry.getKey());
return Mono.empty();
});
}

private void loadMetrics(KafkaCluster kafkaCluster) throws InterruptedException, java.util.concurrent.ExecutionException {
private void loadMetrics(KafkaCluster kafkaCluster) {
AdminClient adminClient = kafkaCluster.getAdminClient();
int brokerCount = adminClient.describeCluster().nodes().get().size();
kafkaCluster.getCluster().setBrokerCount(brokerCount);
kafkaCluster.getBrokersMetrics().setBrokerCount(brokerCount);
kafkaCluster.getBrokersMetrics().setActiveControllers(adminClient.describeCluster().controller().get() != null ? 1 : 0);

for (Map.Entry<MetricName, ? extends Metric> metricNameEntry : adminClient.metrics().entrySet()) {
if (metricNameEntry.getKey().name().equals(IN_BYTE_PER_SEC_METRIC)
&& metricNameEntry.getKey().description().equals(IN_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
kafkaCluster.getCluster().setBytesInPerSec((int) Math.round((double) metricNameEntry.getValue().metricValue()));
}
if (metricNameEntry.getKey().name().equals(OUT_BYTE_PER_SEC_METRIC)
&& metricNameEntry.getKey().description().equals(OUT_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
kafkaCluster.getCluster().setBytesOutPerSec((int) Math.round((double) metricNameEntry.getValue().metricValue()));
ClusterUtil.toMono(adminClient.describeCluster().nodes()).flatMap(brokers -> {
var brokerCount = brokers.size();
kafkaCluster.getCluster().setBrokerCount(brokerCount);
kafkaCluster.getBrokersMetrics().setBrokerCount(brokerCount);
return ClusterUtil.toMono(adminClient.describeCluster().controller());
}).doOnNext(c -> {
kafkaCluster.getBrokersMetrics().setActiveControllers(c != null ? 1 : 0);
for (Map.Entry<MetricName, ? extends Metric> metricNameEntry : adminClient.metrics().entrySet()) {
if (metricNameEntry.getKey().name().equals(IN_BYTE_PER_SEC_METRIC)
&& metricNameEntry.getKey().description().equals(IN_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
kafkaCluster.getCluster().setBytesInPerSec((int) Math.round((double) metricNameEntry.getValue().metricValue()));
}
if (metricNameEntry.getKey().name().equals(OUT_BYTE_PER_SEC_METRIC)
&& metricNameEntry.getKey().description().equals(OUT_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
kafkaCluster.getCluster().setBytesOutPerSec((int) Math.round((double) metricNameEntry.getValue().metricValue()));
}
}
}
}).subscribe();
}

@SneakyThrows
private void loadTopicConfig(KafkaCluster kafkaCluster, String topicName) {
private Mono<List<TopicConfig>> loadTopicConfig(KafkaCluster kafkaCluster, String topicName) {
AdminClient adminClient = kafkaCluster.getAdminClient();

Set<ConfigResource> resources = Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, topicName));
final Map<ConfigResource, Config> configs = adminClient.describeConfigs(resources).all().get();

if (configs.isEmpty()) return;

Collection<ConfigEntry> entries = configs.values().iterator().next().entries();
List<TopicConfig> topicConfigs = new ArrayList<>();
for (ConfigEntry entry : entries) {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setName(entry.name());
topicConfig.setValue(entry.value());
if (topicConfig.getName().equals(MESSAGE_FORMAT_VERSION_CONFIG)) {
topicConfig.setDefaultValue(topicConfig.getValue());
} else {
topicConfig.setDefaultValue(TOPIC_DEFAULT_CONFIGS.get(entry.name()));
}
topicConfigs.add(topicConfig);
}

kafkaCluster.getTopicConfigsMap().put(topicName, topicConfigs);
return ClusterUtil.toMono(adminClient.describeConfigs(resources).all())
.map(configs -> {

if (!configs.isEmpty()) return Collections.emptyList();

Collection<ConfigEntry> entries = configs.values().iterator().next().entries();
List<TopicConfig> topicConfigs = new ArrayList<>();
for (ConfigEntry entry : entries) {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setName(entry.name());
topicConfig.setValue(entry.value());
if (topicConfig.getName().equals(MESSAGE_FORMAT_VERSION_CONFIG)) {
topicConfig.setDefaultValue(topicConfig.getValue());
} else {
topicConfig.setDefaultValue(TOPIC_DEFAULT_CONFIGS.get(entry.name()));
}
topicConfigs.add(topicConfig);
}

return kafkaCluster.getTopicConfigsMap().put(topicName, topicConfigs);
});
}

@SneakyThrows
private void createTopic(AdminClient adminClient, NewTopic newTopic) {
adminClient.createTopics(Collections.singletonList(newTopic))
ClusterUtil.toMono(adminClient.createTopics(Collections.singletonList(newTopic))
.values()
.values()
.iterator()
.next()
.get();
.next()).subscribe();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's pass mono here to upper level

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,26 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.I0Itec.zkclient.ZkClient;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
@Log4j2
public class ZookeeperService {

@Async
public void checkZookeeperStatus(KafkaCluster kafkaCluster) {
log.debug("Start getting Zookeeper metrics for kafkaCluster: " + kafkaCluster.getName());
boolean isConnected = false;
var isConnected = false;
log.debug("Start getting Zookeeper metrics for kafkaCluster: {}", kafkaCluster.getName());
if (kafkaCluster.getZkClient() != null) {
isConnected = isZkClientConnected(kafkaCluster);
}
if (kafkaCluster.getZkClient() == null || !isConnected) {
isConnected = createZookeeperConnection(kafkaCluster);
}

if (!isConnected) {
kafkaCluster.getBrokersMetrics().setZooKeeperStatus(ZooKeeperConstants.OFFLINE);

return;
}

kafkaCluster.getBrokersMetrics().setZooKeeperStatus(ZooKeeperConstants.ONLINE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be immutable

}

Expand Down