-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Backend topics changing #44
Backend topics changing #44
Conversation
* starting refactor mono * topics and cluster refactored into mono way instead of get * fixed dirty mistakes * param * changed collecttopicdata method to correctly * refactored metrics to copyonwrite methoology * metrics params updated * fixed silly mistake * Let's think immutable * immutable * dumb mistakes fixed * changed to immutable * added new mappers * changed to immutable * typo was fixed * imports were cleared * Refactored * imports were optimized Co-authored-by: Roman Nedzvetskiy <[email protected]> Co-authored-by: German Osin <[email protected]>
* issue-31/add-ui-prefix-for-react-routes * added route for /ui path (#40) Co-authored-by: Roman Nedzvetskiy <[email protected]> Co-authored-by: Roman Nedzvetskiy <[email protected]> Co-authored-by: Roman Nedzvetskiy <[email protected]>
* fix-bug-topic-not-creatable-when-no-custom-params * remove-linter-changes-to-cleanup-PR
* enhancement/improve-time-to-retain-usability * add-btn-controls-for-time-to-retain-for-topics
@@ -62,6 +63,14 @@ | |||
return kafkaService.createTopic(cluster, topicFormData); | |||
} | |||
|
|||
public Mono<ResponseEntity<Topic>> updateTopic(String clusterName, String topicName, Mono<TopicFormData> topicFormData) { | |||
KafkaCluster cluster = clustersStorage.getClusterByName(clusterName); | |||
if (cluster == null) return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is bad idea
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed to return Mono.error
public Mono<ResponseEntity<Topic>> updateTopic(String clusterName, String topicName, Mono<TopicFormData> topicFormData) { | ||
KafkaCluster cluster = clustersStorage.getClusterByName(clusterName); | ||
if (cluster == null) return null; | ||
return ClusterUtil.toMono(cluster.getAdminClient().describeCluster().controller()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We talked to move this logic outside
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved to controller level
@@ -72,6 +77,27 @@ public void loadClusterMetrics(KafkaCluster kafkaCluster) { | |||
); | |||
} | |||
|
|||
public Mono<Topic> updateTopic(KafkaCluster cluster, String topicName, TopicFormData topicFormData, Integer id) { | |||
ConfigResource topicCR = new ConfigResource(ConfigResource.Type.TOPIC, topicName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please refactor it
List<ConfigResource> brokerCR = Collections.singletonList(new ConfigResource(ConfigResource.Type.BROKER, id.toString())); | ||
return ClusterUtil.toMono(cluster.getAdminClient().describeConfigs(brokerCR).all()) | ||
.flatMap(c -> { | ||
if (oldClusterVersion(c, cluster)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we could cache cluster version on adminclient cache level.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cluster supported commands enum added to KafkaCluster object
@@ -7,4 +7,7 @@ private ZooKeeperConstants() {} | |||
public static int ONLINE = 1; | |||
public static int OFFLINE = 0; | |||
|
|||
public static int CONNECTION_TIMEOUT_MS = 1000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move to config
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved
@@ -141,6 +141,34 @@ paths: | |||
application/json: | |||
schema: | |||
$ref: '#/components/schemas/TopicDetails' | |||
patch: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not PUT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed to put
…r topics settings update, changed request version from patch to put
@@ -31,4 +39,22 @@ public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c, K | |||
consumerGroup.setNumTopics(topics.size()); | |||
return consumerGroup; | |||
} | |||
|
|||
public static void setSupportedCommands(KafkaCluster cluster, Map<ConfigResource, Config> configs) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this method should provide immutable list of supported features
public Mono<Topic> updateTopic(KafkaCluster cluster, String topicName, TopicFormData topicFormData, Integer id) { | ||
ConfigResource topicCR = new ConfigResource(ConfigResource.Type.TOPIC, topicName); | ||
List<ConfigResource> brokerCR = Collections.singletonList(new ConfigResource(ConfigResource.Type.BROKER, id.toString())); | ||
return ClusterUtil.toMono(cluster.getAdminClient().describeConfigs(brokerCR).all()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can do it only once on adminclient init
ClusterUtil.setSupportedCommands(cluster, c); | ||
} | ||
if (cluster.getSupportedCommands().contains(SupportedCommands.INCREMENTAL_ALTER_CONFIGS)) { | ||
List<AlterConfigOp> listOp = topicFormData.getConfigs().entrySet().stream() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll separate these methods for clear code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
separated
} | ||
|
||
@SneakyThrows | ||
private String getClusterId(KafkaCluster kafkaCluster) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Blocking method?
…hod changed -- removed redundant param clusterName
No description provided.