Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only show replication factor input when relevant #130

Merged
merged 1 commit into from
Mar 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ All notable changes to Kafka extension will be documented in this file.
### Added
- Extension API to contribute clusters. See [#123](https://github.com/jlandersen/vscode-kafka/issues/123).

### Changed
- Improved the "New topic" wizard: the replication factor is now read from the broker configuration. Input will be skipped if value can't be higher than 1. See [#64](https://github.com/jlandersen/vscode-kafka/issues/64).

## [0.11.0] - 2021-03-08
### Added
- Newly created topic or cluster is automatically selected in the Kafka Explorer. See [#61](https://github.com/jlandersen/vscode-kafka/issues/61).
Expand Down
21 changes: 21 additions & 0 deletions src/client/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/**
* @see https://kafka.apache.org/documentation/#brokerconfigs
*/
export namespace BrokerConfigs {

/**
* @see https://kafka.apache.org/documentation/#brokerconfigs_default.replication.factor
*/
export const DEFAULT_REPLICATION_FACTOR = 'default.replication.factor';

/**
* @see https://kafka.apache.org/documentation/#brokerconfigs_offsets.topic.replication.factor
*/
export const OFFSETS_TOPIC_REPLICATION_FACTOR = 'offsets.topic.replication.factor';

/**
* @see https://kafka.apache.org/documentation/#brokerconfigs_auto.create.topics.enable
*/
export const AUTO_CREATE_TOPIC_ENABLE = 'auto.create.topics.enable';

}
12 changes: 5 additions & 7 deletions src/commands/topics.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import { ClusterSettings } from "../settings";
import { dump } from "js-yaml";
import * as vscode from "vscode";

import { dump } from "js-yaml";
import { ClusterSettings } from "../settings";
import { Topic, ClientAccessor } from "../client";
import { KafkaExplorer, TopicItem } from "../explorer";
import { OutputChannelProvider } from "../providers";
import { addTopicWizard } from "../wizards/topics";
import { pickClient, pickTopic } from "./common";

const AUTO_CREATE_TOPIC_KEY = 'auto.create.topics.enable';
import { BrokerConfigs } from "../client/config";

export class CreateTopicCommandHandler {

Expand Down Expand Up @@ -76,7 +74,7 @@ export class DeleteTopicCommandHandler {
if (brokers) {
for (let i = 0; i < brokers.length && !autoCreateTopicsEnabled; i++) {
const configs = await client?.getBrokerConfigs(brokers[i].id);
const config = configs?.find(ce => ce.configName === AUTO_CREATE_TOPIC_KEY);
const config = configs?.find(ce => ce.configName === BrokerConfigs.AUTO_CREATE_TOPIC_ENABLE);
if (config) {
autoCreateTopicsEnabled = config.configValue === 'true';
}
Expand All @@ -85,7 +83,7 @@ export class DeleteTopicCommandHandler {

let warning = `Are you sure you want to delete topic '${topicToDelete.id}'?`;
if (autoCreateTopicsEnabled) {
warning += ` The cluster is configured with '${AUTO_CREATE_TOPIC_KEY}=true', so the topic might be recreated automatically.`;
warning += ` The cluster is configured with '${BrokerConfigs.AUTO_CREATE_TOPIC_ENABLE}=true', so the topic might be recreated automatically.`;
}
const deleteConfirmation = await vscode.window.showWarningMessage(warning, 'Cancel', 'Delete');
if (deleteConfirmation !== 'Delete') {
Expand Down
57 changes: 53 additions & 4 deletions src/wizards/topics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import { INPUT_TITLE } from "../constants";
import { KafkaExplorer } from "../explorer/kafkaExplorer";
import { MultiStepInput, showErrorMessage, State } from "./multiStepInput";
import { validateTopicName, validatePartitions, validateReplicationFactor } from "./validators";
import { BrokerConfigs } from "../client/config";

const DEFAULT_PARTITIONS = 1;
const DEFAULT_REPLICATION_FACTOR = 1;

interface CreateTopicState extends State {
clusterId: string;
topicName: string;
partitions: string;
replicationFactor: string;
defaultReplicas: number;
maxReplicas: number;
}

export async function addTopicWizard(clientAccessor: ClientAccessor, clusterSettings: ClusterSettings, explorer: KafkaExplorer, clusterId?: string): Promise<void> {
Expand Down Expand Up @@ -82,13 +83,21 @@ export async function addTopicWizard(clientAccessor: ClientAccessor, clusterSett

async function collectInputs(state: Partial<CreateTopicState>, clusterSettings: ClusterSettings, clientAccessor: ClientAccessor) {
if (state.clusterId) {
if (!state.maxReplicas) {
await setDefaultAndMaxReplicas(clientAccessor, state);
if (state.maxReplicas! <= 1) {
state.totalSteps = state.totalSteps! - 1;
}
}
await MultiStepInput.run(input => inputTopicName(input, state, clientAccessor));
} else {
await MultiStepInput.run(input => inputSelectCluster(input, state, clusterSettings, clientAccessor));
}
}

async function inputSelectCluster(input: MultiStepInput, state: Partial<CreateTopicState>, clusterSettings: ClusterSettings, clientAccessor: ClientAccessor) {
//reset total steps
state.totalSteps = 4;
interface ClusterPickItem extends QuickPickItem {
clusterId: string;
}
Expand All @@ -114,6 +123,10 @@ async function inputSelectCluster(input: MultiStepInput, state: Partial<CreateTo
activeItem: activeClusterItem
}));
state.clusterId = selectedCluster.clusterId;
await setDefaultAndMaxReplicas(clientAccessor, state);
if (state.maxReplicas! <= 1) {
state.totalSteps = state.totalSteps! - 1;
}
return (input: MultiStepInput) => inputTopicName(input, state, clientAccessor);
}

Expand All @@ -135,7 +148,7 @@ async function inputTopicName(input: MultiStepInput, state: Partial<CreateTopicS
}

async function getExistingTopicNames(clientAccessor: ClientAccessor, clusterId?: string): Promise<string[] | undefined> {
if (!clusterId) {return [];}
if (!clusterId) { return []; }
try {
const client = clientAccessor.get(clusterId);
return (await client.getTopics()).map(topic => topic.id);
Expand All @@ -160,12 +173,48 @@ async function inputPartitions(input: MultiStepInput, state: Partial<CreateTopic


async function inputReplicationFactor(input: MultiStepInput, state: Partial<CreateTopicState>) {
if (state.maxReplicas! <= 1) {
state.replicationFactor = state.maxReplicas!.toString();
return;
}
state.replicationFactor = await input.showInputBox({
title: INPUT_TITLE,
step: input.getStepNumber(),
totalSteps: state.totalSteps,
value: state.replicationFactor || DEFAULT_REPLICATION_FACTOR.toString(),
value: state.replicationFactor || state.defaultReplicas!.toString(),
prompt: 'Replication Factor',
validationContext: state.maxReplicas,
validate: validateReplicationFactor
});
}
async function setDefaultAndMaxReplicas(clientAccessor: ClientAccessor, state: Partial<CreateTopicState>): Promise<void> {
const client = clientAccessor.get(state.clusterId!);
const brokers = await client.getBrokers();
let defaultReplicationFactor = -1;
let maxReplicas = 1;
if (brokers) {
maxReplicas = brokers.length;
try {
for (let i = 0; i < brokers.length && defaultReplicationFactor < 0; i++) {
const configs = await client.getBrokerConfigs(brokers[i].id);
let config = configs.find(ce => ce.configName === BrokerConfigs.OFFSETS_TOPIC_REPLICATION_FACTOR);
if (config) {
defaultReplicationFactor = parseInt(config.configValue, 10);
} else {
config = configs.find(ce => ce.configName === BrokerConfigs.DEFAULT_REPLICATION_FACTOR);
if (config) {
defaultReplicationFactor = parseInt(config.configValue, 10);
}
}
}
} catch (e) {
console.log(`Failed to read replication factor configuration from broker: ${e.message}`);
}
if (defaultReplicationFactor < 0) {
defaultReplicationFactor = Math.min(3, maxReplicas);
}
}
state.maxReplicas = maxReplicas;
state.defaultReplicas = defaultReplicationFactor;
}

13 changes: 8 additions & 5 deletions src/wizards/validators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ export async function validatePartitions(partitions: string): Promise<string | u
return validateFieldPositiveNumber(PARTITIONS_FIELD, partitions);
}

export async function validateReplicationFactor(replicationFactor: string): Promise<string | undefined> {
export async function validateReplicationFactor(replicationFactor: string, max: number): Promise<string | undefined> {
const result = validateFieldRequired(REPLICATION_FACTOR_FIELD, replicationFactor);
if (result) {
return result;
}
return validateFieldPositiveNumber(REPLICATION_FACTOR_FIELD, replicationFactor);
}
return validateFieldPositiveNumber(REPLICATION_FACTOR_FIELD, replicationFactor, max);
}

// ------------------ Commons Validators
Expand All @@ -80,9 +80,12 @@ function validateFieldUniqueValue(name: string, value: string, values: string[])
}
}

function validateFieldPositiveNumber(name: string, value: string): string | undefined {
const valueAsNumber = parseInt(value, 10);
function validateFieldPositiveNumber(name: string, value: string, max?: number): string | undefined {
const valueAsNumber = Number(value);
if (isNaN(valueAsNumber) || valueAsNumber < 1) {
return `${name} must be a positive number.`;
}
if (max && valueAsNumber > max) {
return `${name} can not be greater than ${max}.`;
}
}