Skip to content

Commit

Permalink
Only show replication factor input when relevant
Browse files Browse the repository at this point in the history
When creating a new topic from the wizard:

 - read offsets.topic.replication.factor from broker config to determine default replication factor
 - fallback to default.replication.factor
 - fallback to the lowest value between 3 and the number of brokers (looking at you cloudkarafka)

The max number of replicas is determined by the number of brokers.

If value <= 1, the replication factor input is not shown in the topic wizard
If value > max replicas, a validation error is displayed

Signed-off-by: Fred Bricon <[email protected]>
  • Loading branch information
fbricon committed Mar 17, 2021
1 parent d63a257 commit e99a28d
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 16 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ All notable changes to Kafka extension will be documented in this file.
### Added
- Extension API to contribute clusters. See [#123](https://github.com/jlandersen/vscode-kafka/issues/123).

### Changed
- Improved the "New topic" wizard: the replication factor is now read from the broker configuration. Input will be skipped if value can't be higher than 1. See [#64](https://github.com/jlandersen/vscode-kafka/issues/64).

## [0.11.0] - 2021-03-08
### Added
- Newly created topic or cluster is automatically selected in the Kafka Explorer. See [#61](https://github.com/jlandersen/vscode-kafka/issues/61).
Expand Down
21 changes: 21 additions & 0 deletions src/client/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/**
* @see https://kafka.apache.org/documentation/#brokerconfigs
*/
export namespace BrokerConfigs {

/**
* @see https://kafka.apache.org/documentation/#brokerconfigs_default.replication.factor
*/
export const DEFAULT_REPLICATION_FACTOR = 'default.replication.factor';

/**
* @see https://kafka.apache.org/documentation/#brokerconfigs_offsets.topic.replication.factor
*/
export const OFFSETS_TOPIC_REPLICATION_FACTOR = 'offsets.topic.replication.factor';

/**
* @see https://kafka.apache.org/documentation/#brokerconfigs_auto.create.topics.enable
*/
export const AUTO_CREATE_TOPIC_ENABLE = 'auto.create.topics.enable';

}
12 changes: 5 additions & 7 deletions src/commands/topics.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import { ClusterSettings } from "../settings";
import { dump } from "js-yaml";
import * as vscode from "vscode";

import { dump } from "js-yaml";
import { ClusterSettings } from "../settings";
import { Topic, ClientAccessor } from "../client";
import { KafkaExplorer, TopicItem } from "../explorer";
import { OutputChannelProvider } from "../providers";
import { addTopicWizard } from "../wizards/topics";
import { pickClient, pickTopic } from "./common";

const AUTO_CREATE_TOPIC_KEY = 'auto.create.topics.enable';
import { BrokerConfigs } from "../client/config";

export class CreateTopicCommandHandler {

Expand Down Expand Up @@ -76,7 +74,7 @@ export class DeleteTopicCommandHandler {
if (brokers) {
for (let i = 0; i < brokers.length && !autoCreateTopicsEnabled; i++) {
const configs = await client?.getBrokerConfigs(brokers[i].id);
const config = configs?.find(ce => ce.configName === AUTO_CREATE_TOPIC_KEY);
const config = configs?.find(ce => ce.configName === BrokerConfigs.AUTO_CREATE_TOPIC_ENABLE);
if (config) {
autoCreateTopicsEnabled = config.configValue === 'true';
}
Expand All @@ -85,7 +83,7 @@ export class DeleteTopicCommandHandler {

let warning = `Are you sure you want to delete topic '${topicToDelete.id}'?`;
if (autoCreateTopicsEnabled) {
warning += ` The cluster is configured with '${AUTO_CREATE_TOPIC_KEY}=true', so the topic might be recreated automatically.`;
warning += ` The cluster is configured with '${BrokerConfigs.AUTO_CREATE_TOPIC_ENABLE}=true', so the topic might be recreated automatically.`;
}
const deleteConfirmation = await vscode.window.showWarningMessage(warning, 'Cancel', 'Delete');
if (deleteConfirmation !== 'Delete') {
Expand Down
57 changes: 53 additions & 4 deletions src/wizards/topics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import { INPUT_TITLE } from "../constants";
import { KafkaExplorer } from "../explorer/kafkaExplorer";
import { MultiStepInput, showErrorMessage, State } from "./multiStepInput";
import { validateTopicName, validatePartitions, validateReplicationFactor } from "./validators";
import { BrokerConfigs } from "../client/config";

const DEFAULT_PARTITIONS = 1;
const DEFAULT_REPLICATION_FACTOR = 1;

interface CreateTopicState extends State {
clusterId: string;
topicName: string;
partitions: string;
replicationFactor: string;
defaultReplicas: number;
maxReplicas: number;
}

export async function addTopicWizard(clientAccessor: ClientAccessor, clusterSettings: ClusterSettings, explorer: KafkaExplorer, clusterId?: string): Promise<void> {
Expand Down Expand Up @@ -82,13 +83,21 @@ export async function addTopicWizard(clientAccessor: ClientAccessor, clusterSett

async function collectInputs(state: Partial<CreateTopicState>, clusterSettings: ClusterSettings, clientAccessor: ClientAccessor) {
if (state.clusterId) {
if (!state.maxReplicas) {
await setDefaultAndMaxReplicas(clientAccessor, state);
if (state.maxReplicas! <= 1) {
state.totalSteps = state.totalSteps! - 1;
}
}
await MultiStepInput.run(input => inputTopicName(input, state, clientAccessor));
} else {
await MultiStepInput.run(input => inputSelectCluster(input, state, clusterSettings, clientAccessor));
}
}

async function inputSelectCluster(input: MultiStepInput, state: Partial<CreateTopicState>, clusterSettings: ClusterSettings, clientAccessor: ClientAccessor) {
//reset total steps
state.totalSteps = 4;
interface ClusterPickItem extends QuickPickItem {
clusterId: string;
}
Expand All @@ -114,6 +123,10 @@ async function inputSelectCluster(input: MultiStepInput, state: Partial<CreateTo
activeItem: activeClusterItem
}));
state.clusterId = selectedCluster.clusterId;
await setDefaultAndMaxReplicas(clientAccessor, state);
if (state.maxReplicas! <= 1) {
state.totalSteps = state.totalSteps! - 1;
}
return (input: MultiStepInput) => inputTopicName(input, state, clientAccessor);
}

Expand All @@ -135,7 +148,7 @@ async function inputTopicName(input: MultiStepInput, state: Partial<CreateTopicS
}

async function getExistingTopicNames(clientAccessor: ClientAccessor, clusterId?: string): Promise<string[] | undefined> {
if (!clusterId) {return [];}
if (!clusterId) { return []; }
try {
const client = clientAccessor.get(clusterId);
return (await client.getTopics()).map(topic => topic.id);
Expand All @@ -160,12 +173,48 @@ async function inputPartitions(input: MultiStepInput, state: Partial<CreateTopic


async function inputReplicationFactor(input: MultiStepInput, state: Partial<CreateTopicState>) {
if (state.maxReplicas! <= 1) {
state.replicationFactor = state.maxReplicas!.toString();
return;
}
state.replicationFactor = await input.showInputBox({
title: INPUT_TITLE,
step: input.getStepNumber(),
totalSteps: state.totalSteps,
value: state.replicationFactor || DEFAULT_REPLICATION_FACTOR.toString(),
value: state.replicationFactor || state.defaultReplicas!.toString(),
prompt: 'Replication Factor',
validationContext: state.maxReplicas,
validate: validateReplicationFactor
});
}
async function setDefaultAndMaxReplicas(clientAccessor: ClientAccessor, state: Partial<CreateTopicState>): Promise<void> {
const client = clientAccessor.get(state.clusterId!);
const brokers = await client.getBrokers();
let defaultReplicationFactor = -1;
let maxReplicas = 1;
if (brokers) {
maxReplicas = brokers.length;
try {
for (let i = 0; i < brokers.length && defaultReplicationFactor < 0; i++) {
const configs = await client.getBrokerConfigs(brokers[i].id);
let config = configs.find(ce => ce.configName === BrokerConfigs.OFFSETS_TOPIC_REPLICATION_FACTOR);
if (config) {
defaultReplicationFactor = parseInt(config.configValue, 10);
} else {
config = configs.find(ce => ce.configName === BrokerConfigs.DEFAULT_REPLICATION_FACTOR);
if (config) {
defaultReplicationFactor = parseInt(config.configValue, 10);
}
}
}
} catch (e) {
console.log(`Failed to read replication factor configuration from broker: ${e.message}`);
}
if (defaultReplicationFactor < 0) {
defaultReplicationFactor = Math.min(3, maxReplicas);
}
}
state.maxReplicas = maxReplicas;
state.defaultReplicas = defaultReplicationFactor;
}

13 changes: 8 additions & 5 deletions src/wizards/validators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ export async function validatePartitions(partitions: string): Promise<string | u
return validateFieldPositiveNumber(PARTITIONS_FIELD, partitions);
}

export async function validateReplicationFactor(replicationFactor: string): Promise<string | undefined> {
export async function validateReplicationFactor(replicationFactor: string, max: number): Promise<string | undefined> {
const result = validateFieldRequired(REPLICATION_FACTOR_FIELD, replicationFactor);
if (result) {
return result;
}
return validateFieldPositiveNumber(REPLICATION_FACTOR_FIELD, replicationFactor);
}
return validateFieldPositiveNumber(REPLICATION_FACTOR_FIELD, replicationFactor, max);
}

// ------------------ Commons Validators
Expand All @@ -80,9 +80,12 @@ function validateFieldUniqueValue(name: string, value: string, values: string[])
}
}

function validateFieldPositiveNumber(name: string, value: string): string | undefined {
const valueAsNumber = parseInt(value, 10);
function validateFieldPositiveNumber(name: string, value: string, max?: number): string | undefined {
const valueAsNumber = Number(value);
if (isNaN(valueAsNumber) || valueAsNumber < 1) {
return `${name} must be a positive number.`;
}
if (max && valueAsNumber > max) {
return `${name} can not be greater than ${max}.`;
}
}

0 comments on commit e99a28d

Please sign in to comment.