Skip to content

Commit

Permalink
Only show replication factor input when relevant
Browse files Browse the repository at this point in the history
When creating a new topic from the wizard

- read offsets.topic.replication.factor from broker config to determine max replication factor
- fallback to default.replication.factor
- fallback to the lowest value between 3 and the number of brokers (looking at you cloudkarafka)

If value <= 1, the replication factor input is not shown in the topic wizard

Fixes #64

Signed-off-by: Fred Bricon <[email protected]>
  • Loading branch information
fbricon committed Mar 16, 2021
1 parent d63a257 commit 0a199c2
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 3 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ All notable changes to Kafka extension will be documented in this file.
### Added
- Extension API to contribute clusters. See [#123](https://github.com/jlandersen/vscode-kafka/issues/123).

### Changed
- Improved the "New topic" wizard: the replication factor is now read from the broker configuration. Input will be skipped if value can't be higher than 1. See [#64](https://github.com/jlandersen/vscode-kafka/issues/64).

## [0.11.0] - 2021-03-08
### Added
- Newly created topic or cluster is automatically selected in the Kafka Explorer. See [#61](https://github.com/jlandersen/vscode-kafka/issues/61).
Expand Down
53 changes: 50 additions & 3 deletions src/wizards/topics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ import { MultiStepInput, showErrorMessage, State } from "./multiStepInput";
import { validateTopicName, validatePartitions, validateReplicationFactor } from "./validators";

const DEFAULT_PARTITIONS = 1;
const DEFAULT_REPLICATION_FACTOR = 1;

const REPLICATION_FACTOR_KEY = 'offsets.topic.replication.factor';
const DEFAULT_REPLICATION_FACTOR_KEY = 'default.replication.factor';

interface CreateTopicState extends State {
clusterId: string;
topicName: string;
partitions: string;
replicationFactor: string;
maxReplicas: number;
}

export async function addTopicWizard(clientAccessor: ClientAccessor, clusterSettings: ClusterSettings, explorer: KafkaExplorer, clusterId?: string): Promise<void> {
Expand Down Expand Up @@ -82,13 +85,21 @@ export async function addTopicWizard(clientAccessor: ClientAccessor, clusterSett

async function collectInputs(state: Partial<CreateTopicState>, clusterSettings: ClusterSettings, clientAccessor: ClientAccessor) {
if (state.clusterId) {
if (!state.maxReplicas) {
state.maxReplicas = await getMaxReplicas(clientAccessor, state.clusterId);
if (state.maxReplicas <= 1) {
state.totalSteps = state.totalSteps! -1;
}
}
await MultiStepInput.run(input => inputTopicName(input, state, clientAccessor));
} else {
await MultiStepInput.run(input => inputSelectCluster(input, state, clusterSettings, clientAccessor));
}
}

async function inputSelectCluster(input: MultiStepInput, state: Partial<CreateTopicState>, clusterSettings: ClusterSettings, clientAccessor: ClientAccessor) {
//reset total steps
state.totalSteps = 4;
interface ClusterPickItem extends QuickPickItem {
clusterId: string;
}
Expand All @@ -114,6 +125,10 @@ async function inputSelectCluster(input: MultiStepInput, state: Partial<CreateTo
activeItem: activeClusterItem
}));
state.clusterId = selectedCluster.clusterId;
state.maxReplicas = await getMaxReplicas(clientAccessor, state.clusterId!);
if (state.maxReplicas <= 1) {
state.totalSteps = state.totalSteps! -1;
}
return (input: MultiStepInput) => inputTopicName(input, state, clientAccessor);
}

Expand All @@ -135,7 +150,7 @@ async function inputTopicName(input: MultiStepInput, state: Partial<CreateTopicS
}

async function getExistingTopicNames(clientAccessor: ClientAccessor, clusterId?: string): Promise<string[] | undefined> {
if (!clusterId) {return [];}
if (!clusterId) { return []; }
try {
const client = clientAccessor.get(clusterId);
return (await client.getTopics()).map(topic => topic.id);
Expand All @@ -160,12 +175,44 @@ async function inputPartitions(input: MultiStepInput, state: Partial<CreateTopic


async function inputReplicationFactor(input: MultiStepInput, state: Partial<CreateTopicState>) {
if (state.maxReplicas! <= 1) {
state.replicationFactor = state.maxReplicas!.toString();
return;
}
state.replicationFactor = await input.showInputBox({
title: INPUT_TITLE,
step: input.getStepNumber(),
totalSteps: state.totalSteps,
value: state.replicationFactor || DEFAULT_REPLICATION_FACTOR.toString(),
value: state.replicationFactor || state.maxReplicas!.toString(),
prompt: 'Replication Factor',
validate: validateReplicationFactor
});
}
async function getMaxReplicas(clientAccessor: ClientAccessor, clusterId: string): Promise<number> {
const client = clientAccessor.get(clusterId);
const brokers = await client.getBrokers();
let replicationFactor = -1;
if (brokers) {
try {
for (let i = 0; i < brokers.length && replicationFactor < 0; i++) {
const configs = await client.getBrokerConfigs(brokers[i].id);
let config = configs.find(ce => ce.configName === REPLICATION_FACTOR_KEY);
if (config) {
replicationFactor = parseInt(config.configValue, 10);
} else {
config = configs.find(ce => ce.configName === DEFAULT_REPLICATION_FACTOR_KEY);
if (config) {
replicationFactor = parseInt(config.configValue, 10);
}
}
}
} catch (e) {
console.log(`Failed to read replication factor configuration from broker: `+e.message);
}
if (replicationFactor < 0) {
replicationFactor = Math.min(3, brokers.length);
}
}
return replicationFactor;
}

0 comments on commit 0a199c2

Please sign in to comment.