Skip to content

Commit

Permalink
Only show replication factor input when relevant
Browse files Browse the repository at this point in the history
When creating a new topic from the wizard

- read offsets.topic.replication.factor from broker config to determine max replication factor
- fallback to default.replication.factor
- fallback to the lowest value between 3 and the number of brokers (looking at you cloudkarafka)

If value <= 1, the replication factor input is not shown in the topic wizard

Fixes jlandersen#64

Signed-off-by: Fred Bricon <[email protected]>
  • Loading branch information
fbricon committed Mar 16, 2021
1 parent d63a257 commit f6efaac
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 8 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ All notable changes to Kafka extension will be documented in this file.
### Added
- Extension API to contribute clusters. See [#123](https://github.com/jlandersen/vscode-kafka/issues/123).

### Changed
- Improved the "New topic" wizard: the replication factor is now read from the broker configuration. Input will be skipped if value can't be higher than 1. See [#64](https://github.com/jlandersen/vscode-kafka/issues/64).

## [0.11.0] - 2021-03-08
### Added
- Newly created topic or cluster is automatically selected in the Kafka Explorer. See [#61](https://github.com/jlandersen/vscode-kafka/issues/61).
Expand Down
58 changes: 55 additions & 3 deletions src/wizards/topics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ import { MultiStepInput, showErrorMessage, State } from "./multiStepInput";
import { validateTopicName, validatePartitions, validateReplicationFactor } from "./validators";

const DEFAULT_PARTITIONS = 1;
const DEFAULT_REPLICATION_FACTOR = 1;

const REPLICATION_FACTOR_KEY = 'offsets.topic.replication.factor';
const DEFAULT_REPLICATION_FACTOR_KEY = 'default.replication.factor';

interface CreateTopicState extends State {
clusterId: string;
topicName: string;
partitions: string;
replicationFactor: string;
defaultReplicas: number;
maxReplicas: number;
}

export async function addTopicWizard(clientAccessor: ClientAccessor, clusterSettings: ClusterSettings, explorer: KafkaExplorer, clusterId?: string): Promise<void> {
Expand Down Expand Up @@ -82,13 +86,21 @@ export async function addTopicWizard(clientAccessor: ClientAccessor, clusterSett

async function collectInputs(state: Partial<CreateTopicState>, clusterSettings: ClusterSettings, clientAccessor: ClientAccessor) {
if (state.clusterId) {
if (!state.maxReplicas) {
await setDefaultAndMaxReplicas(clientAccessor, state);
if (state.maxReplicas! <= 1) {
state.totalSteps = state.totalSteps! -1;
}
}
await MultiStepInput.run(input => inputTopicName(input, state, clientAccessor));
} else {
await MultiStepInput.run(input => inputSelectCluster(input, state, clusterSettings, clientAccessor));
}
}

async function inputSelectCluster(input: MultiStepInput, state: Partial<CreateTopicState>, clusterSettings: ClusterSettings, clientAccessor: ClientAccessor) {
//reset total steps
state.totalSteps = 4;
interface ClusterPickItem extends QuickPickItem {
clusterId: string;
}
Expand All @@ -114,6 +126,10 @@ async function inputSelectCluster(input: MultiStepInput, state: Partial<CreateTo
activeItem: activeClusterItem
}));
state.clusterId = selectedCluster.clusterId;
await setDefaultAndMaxReplicas(clientAccessor, state);
if (state.maxReplicas! <= 1) {
state.totalSteps = state.totalSteps! -1;
}
return (input: MultiStepInput) => inputTopicName(input, state, clientAccessor);
}

Expand All @@ -135,7 +151,7 @@ async function inputTopicName(input: MultiStepInput, state: Partial<CreateTopicS
}

async function getExistingTopicNames(clientAccessor: ClientAccessor, clusterId?: string): Promise<string[] | undefined> {
if (!clusterId) {return [];}
if (!clusterId) { return []; }
try {
const client = clientAccessor.get(clusterId);
return (await client.getTopics()).map(topic => topic.id);
Expand All @@ -160,12 +176,48 @@ async function inputPartitions(input: MultiStepInput, state: Partial<CreateTopic


async function inputReplicationFactor(input: MultiStepInput, state: Partial<CreateTopicState>) {
if (state.maxReplicas! <= 1) {
state.replicationFactor = state.maxReplicas!.toString();
return;
}
state.replicationFactor = await input.showInputBox({
title: INPUT_TITLE,
step: input.getStepNumber(),
totalSteps: state.totalSteps,
value: state.replicationFactor || DEFAULT_REPLICATION_FACTOR.toString(),
value: state.replicationFactor || state.defaultReplicas!.toString(),
prompt: 'Replication Factor',
validationContext: state.maxReplicas,
validate: validateReplicationFactor
});
}
async function setDefaultAndMaxReplicas(clientAccessor: ClientAccessor, state: Partial<CreateTopicState>): Promise<void> {
const client = clientAccessor.get(state.clusterId!);
const brokers = await client.getBrokers();
let defaultReplicationFactor = -1;
let maxReplicas = 1;
if (brokers) {
maxReplicas = brokers.length;
try {
for (let i = 0; i < brokers.length && defaultReplicationFactor < 0; i++) {
const configs = await client.getBrokerConfigs(brokers[i].id);
let config = configs.find(ce => ce.configName === REPLICATION_FACTOR_KEY);
if (config) {
defaultReplicationFactor = parseInt(config.configValue, 10);
} else {
config = configs.find(ce => ce.configName === DEFAULT_REPLICATION_FACTOR_KEY);
if (config) {
defaultReplicationFactor = parseInt(config.configValue, 10);
}
}
}
} catch (e) {
console.log(`Failed to read replication factor configuration from broker: `+e.message);
}
if (defaultReplicationFactor < 0) {
defaultReplicationFactor = Math.min(3, maxReplicas);
}
}
state.maxReplicas = maxReplicas;
state.defaultReplicas = defaultReplicationFactor;
}

13 changes: 8 additions & 5 deletions src/wizards/validators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ export async function validatePartitions(partitions: string): Promise<string | u
return validateFieldPositiveNumber(PARTITIONS_FIELD, partitions);
}

export async function validateReplicationFactor(replicationFactor: string): Promise<string | undefined> {
export async function validateReplicationFactor(replicationFactor: string, max: number): Promise<string | undefined> {
const result = validateFieldRequired(REPLICATION_FACTOR_FIELD, replicationFactor);
if (result) {
return result;
}
return validateFieldPositiveNumber(REPLICATION_FACTOR_FIELD, replicationFactor);
}
return validateFieldPositiveNumber(REPLICATION_FACTOR_FIELD, replicationFactor, max);
}

// ------------------ Commons Validators
Expand All @@ -80,9 +80,12 @@ function validateFieldUniqueValue(name: string, value: string, values: string[])
}
}

function validateFieldPositiveNumber(name: string, value: string): string | undefined {
const valueAsNumber = parseInt(value, 10);
function validateFieldPositiveNumber(name: string, value: string, max?: number): string | undefined {
const valueAsNumber = Number(value);
if (isNaN(valueAsNumber) || valueAsNumber < 1) {
return `${name} must be a positive number.`;
}
if (max && valueAsNumber > max) {
return `${name} can not be greater than ${max}.`;
}
}

0 comments on commit f6efaac

Please sign in to comment.