-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor for mono #28
Conversation
log.debug("Start getting metrics for kafkaCluster: " + kafkaCluster.getName()); | ||
kafkaService.loadClusterMetrics(kafkaCluster); | ||
zookeeperService.checkZookeeperStatus(kafkaCluster); | ||
Mono.just(true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks strange, is it possible to avoid Mono.just here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed mono from this method
kafkaCluster.getCluster().setStatus(ServerStatus.ONLINE); | ||
loadMetrics(kafkaCluster); | ||
loadTopicsData(kafkaCluster); | ||
Mono.just(false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What sense to use Mono just here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed redundant mono usage there
} | ||
kafkaCluster.setTopics(foundTopics); | ||
ClusterUtil.toMono(adminClient.listTopics(listTopicsOptions).names()) | ||
.map(tl -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
something wrong with idents
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
.values() | ||
.values() | ||
.iterator() | ||
.next() | ||
.get(); | ||
.next()).map(s -> Mono.just(true)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks dirty
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
resolved
@@ -50,26 +47,26 @@ public void loadClusterMetrics(KafkaCluster kafkaCluster) { | |||
|
|||
|
|||
@SneakyThrows | |||
public Mono<ResponseEntity<Topic>> createTopic(KafkaCluster cluster, Mono<TopicFormData> topicFormData) { | |||
public Mono<ResponseEntity<Topic>> createTopic(AdminClient adminClient, KafkaCluster cluster, Mono<TopicFormData> topicFormData) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's remove ResponseEntity from service layer
.values() | ||
.values() | ||
.iterator() | ||
.next() | ||
.get(); | ||
.next()).subscribe(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's pass mono here to upper level
…o refactor-for-mono
|
||
List<Topic> topics = new ArrayList<>(); | ||
private Map<String, TopicDetails> topicDetailsMap = new ConcurrentHashMap<>(); | ||
private List<Topic> topics = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why this one is not final?
@@ -59,14 +60,16 @@ | |||
public Mono<ResponseEntity<Topic>> createTopic(String name, Mono<TopicFormData> topicFormData) { | |||
KafkaCluster cluster = clustersStorage.getClusterByName(name); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Services should not return response entity. This is controller responsibility.
return; | ||
} | ||
|
||
kafkaCluster.getBrokersMetrics().setZooKeeperStatus(ZooKeeperConstants.ONLINE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be immutable
log.debug("Start getting metrics for kafkaCluster: {}", clusterWithId.getKafkaCluster()); | ||
return kafkaService.getUpdatedCluster(clusterWithId) | ||
.map(s -> { | ||
zookeeperService.checkZookeeperStatus(s.getKafkaCluster()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks strange
ac -> | ||
getClusterMetrics(ac, kafkaCluster).flatMap( | ||
metrics -> { | ||
Cluster cluster = kafkaCluster.getCluster(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you should create new instance of cluster
}) | ||
).onErrorResume( | ||
e -> { | ||
Cluster cluster = kafkaCluster.getCluster(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you should create new one
listTopicsOptions.listInternal(true); | ||
return ClusterUtil.toMono(adminClient.listTopics(listTopicsOptions).names()) | ||
.map(tl -> { | ||
kafkaCluster.getCluster().setTopicCount(tl.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do you mutate it?
} | ||
return s; | ||
}) | ||
.flatMap(td -> collectTopicData(cluster, adminClient, td)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
formating?
} | ||
|
||
@SneakyThrows | ||
public static <T> T clone(T subject) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's bad idea
brokersMetrics.setBrokerCount(metrics.getBrokerCount()); | ||
brokersMetrics.activeControllers(metrics.getActiveControllers()); | ||
brokersMetrics.brokerCount(metrics.getBrokerCount()); | ||
resetMetrics(brokersMetrics); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better naming what you exactly reseting
|
||
topic.partitions(partitions); | ||
|
||
if (kafkaCluster.getTopicDetailsMap() == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better to collect on top level
.flatMap(topics -> | ||
loadTopicConfig(ac, topics.stream().map(InternalTopic::getName).collect(Collectors.toList())).collectList() | ||
.map(s -> s.stream().collect(Collectors.toMap(map -> new ArrayList<>(map.entrySet()).get(0).getKey(), | ||
e -> new ArrayList<>(e.entrySet()).get(0).getValue()))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clean up
} | ||
|
||
@SneakyThrows | ||
public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroup (String clusterName) { | ||
public Flux<ConsumerGroup> getConsumerGroup (String clusterName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getConsumerGroup -> getConsumerGroups
Exception lastKafkaException; | ||
Exception lastZookeeperException; | ||
@Builder(toBuilder = true, builderClassName = "KafkaClusterBuilder") | ||
public class KafkaCluster implements Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why Serializable?
NewTopic newTopic = new NewTopic(topicData.getName(), topicData.getPartitions(), topicData.getReplicationFactor().shortValue()); | ||
newTopic.configs(topicData.getConfigs()); | ||
|
||
createTopic(adminClient, newTopic); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you should return mono to next method
partitionDto.partition(partition.partition()); | ||
partitionDto.inSyncReplicasCount(partition.isr().size()); | ||
partitionDto.replicasCount(partition.replicas().size()); | ||
replicas.addAll(partition.replicas().stream().map( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replicas should be initialized per partition
@@ -325,6 +325,16 @@ components: | |||
additionalProperties: | |||
type: string | |||
|
|||
Metrics: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is it here?
kafka-ui-contract/pom.xml
Outdated
@@ -58,6 +58,7 @@ | |||
<modelPackage>com.provectus.kafka.ui.model</modelPackage> | |||
<apiPackage>com.provectus.kafka.ui.api</apiPackage> | |||
<sourceFolder>kafka-ui-contract</sourceFolder> | |||
<serializableModel>true</serializableModel> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
* Refactor for mono (#28) * starting refactor mono * topics and cluster refactored into mono way instead of get * fixed dirty mistakes * param * changed collecttopicdata method to correctly * refactored metrics to copyonwrite methoology * metrics params updated * fixed silly mistake * Let's think immutable * immutable * dumb mistakes fixed * changed to immutable * added new mappers * changed to immutable * typo was fixed * imports were cleared * Refactored * imports were optimized Co-authored-by: Roman Nedzvetskiy <[email protected]> Co-authored-by: German Osin <[email protected]> * need to refactor * issue-31/add-ui-prefix-for-react-routes (#34) * issue-31/add-ui-prefix-for-react-routes * added route for /ui path (#40) Co-authored-by: Roman Nedzvetskiy <[email protected]> Co-authored-by: Roman Nedzvetskiy <[email protected]> Co-authored-by: Roman Nedzvetskiy <[email protected]> * fix-bug-topic-not-creatable-when-no-custom-params (#37) * fix-bug-topic-not-creatable-when-no-custom-params * remove-linter-changes-to-cleanup-PR * Enhancement/improve time to retain usability v2 (#35) * enhancement/improve-time-to-retain-usability * add-btn-controls-for-time-to-retain-for-topics * updating of topic done properly * Added supportedCommands enum in cluster for getting correct method for topics settings update, changed request version from patch to put * AdminClient now boxed into extendedAdminClient * Redundant imports removed * SupportedFeatures method moved to util * Methods of updating cluster separated * Subsribed to change topics methods, replaced path of ts class * fix-routes-for-details (#45) * Added test and changed getAdminClient logic from clusterId to clusterName * Changed updating methods mono logic, added update cluster topic method * Fixed test by removing updateCluster calls, fixed add topic * Removed redundant injection and import * Added behaviour if topic wasn't created * Removed redundant imports * Dublicate lines in ClusterService moved to updateCluster method * CreateTopic method updated -- now returns topicName, update topic method changed -- removed redundant param clusterName Co-authored-by: Roman Nedzvetskiy <[email protected]> Co-authored-by: German Osin <[email protected]> Co-authored-by: Azat Gataullin <[email protected]>
* starting refactor mono * topics and cluster refactored into mono way instead of get * fixed dirty mistakes * param * changed collecttopicdata method to correctly * refactored metrics to copyonwrite methoology * metrics params updated * fixed silly mistake * Let's think immutable * immutable * dumb mistakes fixed * changed to immutable * added new mappers * changed to immutable * typo was fixed * imports were cleared * Refactored * imports were optimized Co-authored-by: Roman Nedzvetskiy <[email protected]> Co-authored-by: German Osin <[email protected]>
No description provided.