diff --git a/CHANGELOG.md b/CHANGELOG.md index ddff6a5..991ef15 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,9 @@ All notable changes to Kafka extension will be documented in this file. ### Added - Extension API to contribute clusters. See [#123](https://github.com/jlandersen/vscode-kafka/issues/123). +### Changed +- Improved the "New topic" wizard: the replication factor is now read from the broker configuration. Input will be skipped if value can't be higher than 1. See [#64](https://github.com/jlandersen/vscode-kafka/issues/64). + ## [0.11.0] - 2021-03-08 ### Added - Newly created topic or cluster is automatically selected in the Kafka Explorer. See [#61](https://github.com/jlandersen/vscode-kafka/issues/61). diff --git a/src/wizards/topics.ts b/src/wizards/topics.ts index 575905b..b799847 100644 --- a/src/wizards/topics.ts +++ b/src/wizards/topics.ts @@ -7,13 +7,17 @@ import { MultiStepInput, showErrorMessage, State } from "./multiStepInput"; import { validateTopicName, validatePartitions, validateReplicationFactor } from "./validators"; const DEFAULT_PARTITIONS = 1; -const DEFAULT_REPLICATION_FACTOR = 1; + +const REPLICATION_FACTOR_KEY = 'offsets.topic.replication.factor'; +const DEFAULT_REPLICATION_FACTOR_KEY = 'default.replication.factor'; interface CreateTopicState extends State { clusterId: string; topicName: string; partitions: string; replicationFactor: string; + defaultReplicas: number; + maxReplicas: number; } export async function addTopicWizard(clientAccessor: ClientAccessor, clusterSettings: ClusterSettings, explorer: KafkaExplorer, clusterId?: string): Promise { @@ -82,6 +86,12 @@ export async function addTopicWizard(clientAccessor: ClientAccessor, clusterSett async function collectInputs(state: Partial, clusterSettings: ClusterSettings, clientAccessor: ClientAccessor) { if (state.clusterId) { + if (!state.maxReplicas) { + await setDefaultAndMaxReplicas(clientAccessor, state); + if (state.maxReplicas! <= 1) { + state.totalSteps = state.totalSteps! -1; + } + } await MultiStepInput.run(input => inputTopicName(input, state, clientAccessor)); } else { await MultiStepInput.run(input => inputSelectCluster(input, state, clusterSettings, clientAccessor)); @@ -89,6 +99,8 @@ async function collectInputs(state: Partial, clusterSettings: } async function inputSelectCluster(input: MultiStepInput, state: Partial, clusterSettings: ClusterSettings, clientAccessor: ClientAccessor) { + //reset total steps + state.totalSteps = 4; interface ClusterPickItem extends QuickPickItem { clusterId: string; } @@ -114,6 +126,10 @@ async function inputSelectCluster(input: MultiStepInput, state: Partial inputTopicName(input, state, clientAccessor); } @@ -135,7 +151,7 @@ async function inputTopicName(input: MultiStepInput, state: Partial { - if (!clusterId) {return [];} + if (!clusterId) { return []; } try { const client = clientAccessor.get(clusterId); return (await client.getTopics()).map(topic => topic.id); @@ -160,12 +176,48 @@ async function inputPartitions(input: MultiStepInput, state: Partial) { + if (state.maxReplicas! <= 1) { + state.replicationFactor = state.maxReplicas!.toString(); + return; + } state.replicationFactor = await input.showInputBox({ title: INPUT_TITLE, step: input.getStepNumber(), totalSteps: state.totalSteps, - value: state.replicationFactor || DEFAULT_REPLICATION_FACTOR.toString(), + value: state.replicationFactor || state.defaultReplicas!.toString(), prompt: 'Replication Factor', + validationContext: state.maxReplicas, validate: validateReplicationFactor }); } +async function setDefaultAndMaxReplicas(clientAccessor: ClientAccessor, state: Partial): Promise { + const client = clientAccessor.get(state.clusterId!); + const brokers = await client.getBrokers(); + let defaultReplicationFactor = -1; + let maxReplicas = 1; + if (brokers) { + maxReplicas = brokers.length; + try { + for (let i = 0; i < brokers.length && defaultReplicationFactor < 0; i++) { + const configs = await client.getBrokerConfigs(brokers[i].id); + let config = configs.find(ce => ce.configName === REPLICATION_FACTOR_KEY); + if (config) { + defaultReplicationFactor = parseInt(config.configValue, 10); + } else { + config = configs.find(ce => ce.configName === DEFAULT_REPLICATION_FACTOR_KEY); + if (config) { + defaultReplicationFactor = parseInt(config.configValue, 10); + } + } + } + } catch (e) { + console.log(`Failed to read replication factor configuration from broker: `+e.message); + } + if (defaultReplicationFactor < 0) { + defaultReplicationFactor = Math.min(3, maxReplicas); + } + } + state.maxReplicas = maxReplicas; + state.defaultReplicas = defaultReplicationFactor; +} + diff --git a/src/wizards/validators.ts b/src/wizards/validators.ts index cd52032..07f9035 100644 --- a/src/wizards/validators.ts +++ b/src/wizards/validators.ts @@ -55,12 +55,12 @@ export async function validatePartitions(partitions: string): Promise { +export async function validateReplicationFactor(replicationFactor: string, max: number): Promise { const result = validateFieldRequired(REPLICATION_FACTOR_FIELD, replicationFactor); if (result) { return result; - } - return validateFieldPositiveNumber(REPLICATION_FACTOR_FIELD, replicationFactor); + } + return validateFieldPositiveNumber(REPLICATION_FACTOR_FIELD, replicationFactor, max); } // ------------------ Commons Validators @@ -80,9 +80,12 @@ function validateFieldUniqueValue(name: string, value: string, values: string[]) } } -function validateFieldPositiveNumber(name: string, value: string): string | undefined { - const valueAsNumber = parseInt(value, 10); +function validateFieldPositiveNumber(name: string, value: string, max?: number): string | undefined { + const valueAsNumber = Number(value); if (isNaN(valueAsNumber) || valueAsNumber < 1) { return `${name} must be a positive number.`; } + if (max && valueAsNumber > max) { + return `${name} can not be greater than ${max}.`; + } }