From 0a199c2114f764448273d033952f76747a1d4510 Mon Sep 17 00:00:00 2001 From: Fred Bricon Date: Tue, 16 Mar 2021 11:31:27 +0100 Subject: [PATCH] Only show replication factor input when relevant When creating a new topic from the wizard - read offsets.topic.replication.factor from broker config to determine max replication factor - fallback to default.replication.factor - fallback to the lowest value between 3 and the number of brokers (looking at you cloudkarafka) If value <= 1, the replication factor input is not shown in the topic wizard Fixes #64 Signed-off-by: Fred Bricon --- CHANGELOG.md | 3 +++ src/wizards/topics.ts | 53 ++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 53 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ddff6a5..991ef15 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,9 @@ All notable changes to Kafka extension will be documented in this file. ### Added - Extension API to contribute clusters. See [#123](https://github.com/jlandersen/vscode-kafka/issues/123). +### Changed +- Improved the "New topic" wizard: the replication factor is now read from the broker configuration. Input will be skipped if value can't be higher than 1. See [#64](https://github.com/jlandersen/vscode-kafka/issues/64). + ## [0.11.0] - 2021-03-08 ### Added - Newly created topic or cluster is automatically selected in the Kafka Explorer. See [#61](https://github.com/jlandersen/vscode-kafka/issues/61). diff --git a/src/wizards/topics.ts b/src/wizards/topics.ts index 575905b..2252374 100644 --- a/src/wizards/topics.ts +++ b/src/wizards/topics.ts @@ -7,13 +7,16 @@ import { MultiStepInput, showErrorMessage, State } from "./multiStepInput"; import { validateTopicName, validatePartitions, validateReplicationFactor } from "./validators"; const DEFAULT_PARTITIONS = 1; -const DEFAULT_REPLICATION_FACTOR = 1; + +const REPLICATION_FACTOR_KEY = 'offsets.topic.replication.factor'; +const DEFAULT_REPLICATION_FACTOR_KEY = 'default.replication.factor'; interface CreateTopicState extends State { clusterId: string; topicName: string; partitions: string; replicationFactor: string; + maxReplicas: number; } export async function addTopicWizard(clientAccessor: ClientAccessor, clusterSettings: ClusterSettings, explorer: KafkaExplorer, clusterId?: string): Promise { @@ -82,6 +85,12 @@ export async function addTopicWizard(clientAccessor: ClientAccessor, clusterSett async function collectInputs(state: Partial, clusterSettings: ClusterSettings, clientAccessor: ClientAccessor) { if (state.clusterId) { + if (!state.maxReplicas) { + state.maxReplicas = await getMaxReplicas(clientAccessor, state.clusterId); + if (state.maxReplicas <= 1) { + state.totalSteps = state.totalSteps! -1; + } + } await MultiStepInput.run(input => inputTopicName(input, state, clientAccessor)); } else { await MultiStepInput.run(input => inputSelectCluster(input, state, clusterSettings, clientAccessor)); @@ -89,6 +98,8 @@ async function collectInputs(state: Partial, clusterSettings: } async function inputSelectCluster(input: MultiStepInput, state: Partial, clusterSettings: ClusterSettings, clientAccessor: ClientAccessor) { + //reset total steps + state.totalSteps = 4; interface ClusterPickItem extends QuickPickItem { clusterId: string; } @@ -114,6 +125,10 @@ async function inputSelectCluster(input: MultiStepInput, state: Partial inputTopicName(input, state, clientAccessor); } @@ -135,7 +150,7 @@ async function inputTopicName(input: MultiStepInput, state: Partial { - if (!clusterId) {return [];} + if (!clusterId) { return []; } try { const client = clientAccessor.get(clusterId); return (await client.getTopics()).map(topic => topic.id); @@ -160,12 +175,44 @@ async function inputPartitions(input: MultiStepInput, state: Partial) { + if (state.maxReplicas! <= 1) { + state.replicationFactor = state.maxReplicas!.toString(); + return; + } state.replicationFactor = await input.showInputBox({ title: INPUT_TITLE, step: input.getStepNumber(), totalSteps: state.totalSteps, - value: state.replicationFactor || DEFAULT_REPLICATION_FACTOR.toString(), + value: state.replicationFactor || state.maxReplicas!.toString(), prompt: 'Replication Factor', validate: validateReplicationFactor }); } +async function getMaxReplicas(clientAccessor: ClientAccessor, clusterId: string): Promise { + const client = clientAccessor.get(clusterId); + const brokers = await client.getBrokers(); + let replicationFactor = -1; + if (brokers) { + try { + for (let i = 0; i < brokers.length && replicationFactor < 0; i++) { + const configs = await client.getBrokerConfigs(brokers[i].id); + let config = configs.find(ce => ce.configName === REPLICATION_FACTOR_KEY); + if (config) { + replicationFactor = parseInt(config.configValue, 10); + } else { + config = configs.find(ce => ce.configName === DEFAULT_REPLICATION_FACTOR_KEY); + if (config) { + replicationFactor = parseInt(config.configValue, 10); + } + } + } + } catch (e) { + console.log(`Failed to read replication factor configuration from broker: `+e.message); + } + if (replicationFactor < 0) { + replicationFactor = Math.min(3, brokers.length); + } + } + return replicationFactor; +} +