diff --git a/src/wizards/topics.ts b/src/wizards/topics.ts index 575905b..2252374 100644 --- a/src/wizards/topics.ts +++ b/src/wizards/topics.ts @@ -7,13 +7,16 @@ import { MultiStepInput, showErrorMessage, State } from "./multiStepInput"; import { validateTopicName, validatePartitions, validateReplicationFactor } from "./validators"; const DEFAULT_PARTITIONS = 1; -const DEFAULT_REPLICATION_FACTOR = 1; + +const REPLICATION_FACTOR_KEY = 'offsets.topic.replication.factor'; +const DEFAULT_REPLICATION_FACTOR_KEY = 'default.replication.factor'; interface CreateTopicState extends State { clusterId: string; topicName: string; partitions: string; replicationFactor: string; + maxReplicas: number; } export async function addTopicWizard(clientAccessor: ClientAccessor, clusterSettings: ClusterSettings, explorer: KafkaExplorer, clusterId?: string): Promise { @@ -82,6 +85,12 @@ export async function addTopicWizard(clientAccessor: ClientAccessor, clusterSett async function collectInputs(state: Partial, clusterSettings: ClusterSettings, clientAccessor: ClientAccessor) { if (state.clusterId) { + if (!state.maxReplicas) { + state.maxReplicas = await getMaxReplicas(clientAccessor, state.clusterId); + if (state.maxReplicas <= 1) { + state.totalSteps = state.totalSteps! -1; + } + } await MultiStepInput.run(input => inputTopicName(input, state, clientAccessor)); } else { await MultiStepInput.run(input => inputSelectCluster(input, state, clusterSettings, clientAccessor)); @@ -89,6 +98,8 @@ async function collectInputs(state: Partial, clusterSettings: } async function inputSelectCluster(input: MultiStepInput, state: Partial, clusterSettings: ClusterSettings, clientAccessor: ClientAccessor) { + //reset total steps + state.totalSteps = 4; interface ClusterPickItem extends QuickPickItem { clusterId: string; } @@ -114,6 +125,10 @@ async function inputSelectCluster(input: MultiStepInput, state: Partial inputTopicName(input, state, clientAccessor); } @@ -135,7 +150,7 @@ async function inputTopicName(input: MultiStepInput, state: Partial { - if (!clusterId) {return [];} + if (!clusterId) { return []; } try { const client = clientAccessor.get(clusterId); return (await client.getTopics()).map(topic => topic.id); @@ -160,12 +175,44 @@ async function inputPartitions(input: MultiStepInput, state: Partial) { + if (state.maxReplicas! <= 1) { + state.replicationFactor = state.maxReplicas!.toString(); + return; + } state.replicationFactor = await input.showInputBox({ title: INPUT_TITLE, step: input.getStepNumber(), totalSteps: state.totalSteps, - value: state.replicationFactor || DEFAULT_REPLICATION_FACTOR.toString(), + value: state.replicationFactor || state.maxReplicas!.toString(), prompt: 'Replication Factor', validate: validateReplicationFactor }); } +async function getMaxReplicas(clientAccessor: ClientAccessor, clusterId: string): Promise { + const client = clientAccessor.get(clusterId); + const brokers = await client.getBrokers(); + let replicationFactor = -1; + if (brokers) { + try { + for (let i = 0; i < brokers.length && replicationFactor < 0; i++) { + const configs = await client.getBrokerConfigs(brokers[i].id); + let config = configs.find(ce => ce.configName === REPLICATION_FACTOR_KEY); + if (config) { + replicationFactor = parseInt(config.configValue, 10); + } else { + config = configs.find(ce => ce.configName === DEFAULT_REPLICATION_FACTOR_KEY); + if (config) { + replicationFactor = parseInt(config.configValue, 10); + } + } + } + } catch (e) { + console.log(`Failed to read replication factor configuration from broker: `+e.message); + } + if (replicationFactor < 0) { + replicationFactor = Math.min(3, brokers.length); + } + } + return replicationFactor; +} +