From d3694f4d310af50fbfe02e38209aedb4d24eeaa5 Mon Sep 17 00:00:00 2001 From: Fred Bricon Date: Tue, 16 Mar 2021 11:31:27 +0100 Subject: [PATCH] Only show replication factor input when relevant When creating a new topic from the wizard: - read offsets.topic.replication.factor from broker config to determine default replication factor - fallback to default.replication.factor - fallback to the lowest value between 3 and the number of brokers (looking at you cloudkarafka) The max number of replicas is determined by the number of brokers. If value <= 1, the replication factor input is not shown in the topic wizard If value > max replicas, a validation error is displayed Signed-off-by: Fred Bricon --- CHANGELOG.md | 3 +++ src/client/config.ts | 21 +++++++++++++++ src/commands/topics.ts | 12 ++++----- src/wizards/topics.ts | 57 ++++++++++++++++++++++++++++++++++++--- src/wizards/validators.ts | 13 +++++---- 5 files changed, 90 insertions(+), 16 deletions(-) create mode 100644 src/client/config.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index ddff6a5..991ef15 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,9 @@ All notable changes to Kafka extension will be documented in this file. ### Added - Extension API to contribute clusters. See [#123](https://github.com/jlandersen/vscode-kafka/issues/123). +### Changed +- Improved the "New topic" wizard: the replication factor is now read from the broker configuration. Input will be skipped if value can't be higher than 1. See [#64](https://github.com/jlandersen/vscode-kafka/issues/64). + ## [0.11.0] - 2021-03-08 ### Added - Newly created topic or cluster is automatically selected in the Kafka Explorer. See [#61](https://github.com/jlandersen/vscode-kafka/issues/61). diff --git a/src/client/config.ts b/src/client/config.ts new file mode 100644 index 0000000..05d6606 --- /dev/null +++ b/src/client/config.ts @@ -0,0 +1,21 @@ +/** + * @see https://kafka.apache.org/documentation/#brokerconfigs + */ +export namespace BrokerConfigs { + + /** + * @see https://kafka.apache.org/documentation/#brokerconfigs_default.replication.factor + */ + export const DEFAULT_REPLICATION_FACTOR = 'default.replication.factor'; + + /** + * @see https://kafka.apache.org/documentation/#brokerconfigs_offsets.topic.replication.factor + */ + export const OFFSETS_TOPIC_REPLICATION_FACTOR = 'offsets.topic.replication.factor'; + + /** + * @see https://kafka.apache.org/documentation/#brokerconfigs_auto.create.topics.enable + */ + export const AUTO_CREATE_TOPIC_ENABLE = 'auto.create.topics.enable'; + +} diff --git a/src/commands/topics.ts b/src/commands/topics.ts index 9f4e2cf..586dc1a 100644 --- a/src/commands/topics.ts +++ b/src/commands/topics.ts @@ -1,14 +1,12 @@ -import { ClusterSettings } from "../settings"; -import { dump } from "js-yaml"; import * as vscode from "vscode"; - +import { dump } from "js-yaml"; +import { ClusterSettings } from "../settings"; import { Topic, ClientAccessor } from "../client"; import { KafkaExplorer, TopicItem } from "../explorer"; import { OutputChannelProvider } from "../providers"; import { addTopicWizard } from "../wizards/topics"; import { pickClient, pickTopic } from "./common"; - -const AUTO_CREATE_TOPIC_KEY = 'auto.create.topics.enable'; +import { BrokerConfigs } from "../client/config"; export class CreateTopicCommandHandler { @@ -76,7 +74,7 @@ export class DeleteTopicCommandHandler { if (brokers) { for (let i = 0; i < brokers.length && !autoCreateTopicsEnabled; i++) { const configs = await client?.getBrokerConfigs(brokers[i].id); - const config = configs?.find(ce => ce.configName === AUTO_CREATE_TOPIC_KEY); + const config = configs?.find(ce => ce.configName === BrokerConfigs.AUTO_CREATE_TOPIC_ENABLE); if (config) { autoCreateTopicsEnabled = config.configValue === 'true'; } @@ -85,7 +83,7 @@ export class DeleteTopicCommandHandler { let warning = `Are you sure you want to delete topic '${topicToDelete.id}'?`; if (autoCreateTopicsEnabled) { - warning += ` The cluster is configured with '${AUTO_CREATE_TOPIC_KEY}=true', so the topic might be recreated automatically.`; + warning += ` The cluster is configured with '${BrokerConfigs.AUTO_CREATE_TOPIC_ENABLE}=true', so the topic might be recreated automatically.`; } const deleteConfirmation = await vscode.window.showWarningMessage(warning, 'Cancel', 'Delete'); if (deleteConfirmation !== 'Delete') { diff --git a/src/wizards/topics.ts b/src/wizards/topics.ts index 575905b..d0d9610 100644 --- a/src/wizards/topics.ts +++ b/src/wizards/topics.ts @@ -5,15 +5,16 @@ import { INPUT_TITLE } from "../constants"; import { KafkaExplorer } from "../explorer/kafkaExplorer"; import { MultiStepInput, showErrorMessage, State } from "./multiStepInput"; import { validateTopicName, validatePartitions, validateReplicationFactor } from "./validators"; +import { BrokerConfigs } from "../client/config"; const DEFAULT_PARTITIONS = 1; -const DEFAULT_REPLICATION_FACTOR = 1; - interface CreateTopicState extends State { clusterId: string; topicName: string; partitions: string; replicationFactor: string; + defaultReplicas: number; + maxReplicas: number; } export async function addTopicWizard(clientAccessor: ClientAccessor, clusterSettings: ClusterSettings, explorer: KafkaExplorer, clusterId?: string): Promise { @@ -82,6 +83,12 @@ export async function addTopicWizard(clientAccessor: ClientAccessor, clusterSett async function collectInputs(state: Partial, clusterSettings: ClusterSettings, clientAccessor: ClientAccessor) { if (state.clusterId) { + if (!state.maxReplicas) { + await setDefaultAndMaxReplicas(clientAccessor, state); + if (state.maxReplicas! <= 1) { + state.totalSteps = state.totalSteps! - 1; + } + } await MultiStepInput.run(input => inputTopicName(input, state, clientAccessor)); } else { await MultiStepInput.run(input => inputSelectCluster(input, state, clusterSettings, clientAccessor)); @@ -89,6 +96,8 @@ async function collectInputs(state: Partial, clusterSettings: } async function inputSelectCluster(input: MultiStepInput, state: Partial, clusterSettings: ClusterSettings, clientAccessor: ClientAccessor) { + //reset total steps + state.totalSteps = 4; interface ClusterPickItem extends QuickPickItem { clusterId: string; } @@ -114,6 +123,10 @@ async function inputSelectCluster(input: MultiStepInput, state: Partial inputTopicName(input, state, clientAccessor); } @@ -135,7 +148,7 @@ async function inputTopicName(input: MultiStepInput, state: Partial { - if (!clusterId) {return [];} + if (!clusterId) { return []; } try { const client = clientAccessor.get(clusterId); return (await client.getTopics()).map(topic => topic.id); @@ -160,12 +173,48 @@ async function inputPartitions(input: MultiStepInput, state: Partial) { + if (state.maxReplicas! <= 1) { + state.replicationFactor = state.maxReplicas!.toString(); + return; + } state.replicationFactor = await input.showInputBox({ title: INPUT_TITLE, step: input.getStepNumber(), totalSteps: state.totalSteps, - value: state.replicationFactor || DEFAULT_REPLICATION_FACTOR.toString(), + value: state.replicationFactor || state.defaultReplicas!.toString(), prompt: 'Replication Factor', + validationContext: state.maxReplicas, validate: validateReplicationFactor }); } +async function setDefaultAndMaxReplicas(clientAccessor: ClientAccessor, state: Partial): Promise { + const client = clientAccessor.get(state.clusterId!); + const brokers = await client.getBrokers(); + let defaultReplicationFactor = -1; + let maxReplicas = 1; + if (brokers) { + maxReplicas = brokers.length; + try { + for (let i = 0; i < brokers.length && defaultReplicationFactor < 0; i++) { + const configs = await client.getBrokerConfigs(brokers[i].id); + let config = configs.find(ce => ce.configName === BrokerConfigs.OFFSETS_TOPIC_REPLICATION_FACTOR); + if (config) { + defaultReplicationFactor = parseInt(config.configValue, 10); + } else { + config = configs.find(ce => ce.configName === BrokerConfigs.DEFAULT_REPLICATION_FACTOR); + if (config) { + defaultReplicationFactor = parseInt(config.configValue, 10); + } + } + } + } catch (e) { + console.log(`Failed to read replication factor configuration from broker: ${e.message}`); + } + if (defaultReplicationFactor < 0) { + defaultReplicationFactor = Math.min(3, maxReplicas); + } + } + state.maxReplicas = maxReplicas; + state.defaultReplicas = defaultReplicationFactor; +} + diff --git a/src/wizards/validators.ts b/src/wizards/validators.ts index cd52032..07f9035 100644 --- a/src/wizards/validators.ts +++ b/src/wizards/validators.ts @@ -55,12 +55,12 @@ export async function validatePartitions(partitions: string): Promise { +export async function validateReplicationFactor(replicationFactor: string, max: number): Promise { const result = validateFieldRequired(REPLICATION_FACTOR_FIELD, replicationFactor); if (result) { return result; - } - return validateFieldPositiveNumber(REPLICATION_FACTOR_FIELD, replicationFactor); + } + return validateFieldPositiveNumber(REPLICATION_FACTOR_FIELD, replicationFactor, max); } // ------------------ Commons Validators @@ -80,9 +80,12 @@ function validateFieldUniqueValue(name: string, value: string, values: string[]) } } -function validateFieldPositiveNumber(name: string, value: string): string | undefined { - const valueAsNumber = parseInt(value, 10); +function validateFieldPositiveNumber(name: string, value: string, max?: number): string | undefined { + const valueAsNumber = Number(value); if (isNaN(valueAsNumber) || valueAsNumber < 1) { return `${name} must be a positive number.`; } + if (max && valueAsNumber > max) { + return `${name} can not be greater than ${max}.`; + } }