diff --git a/src/commands/cluster.ts b/src/commands/cluster.ts index 419d3a4..45e6fa3 100644 --- a/src/commands/cluster.ts +++ b/src/commands/cluster.ts @@ -1,62 +1,24 @@ import * as vscode from "vscode"; import { dump } from "js-yaml"; -import { Broker, ClientAccessor, SaslOption } from "../client"; +import { Broker, ClientAccessor } from "../client"; import { BrokerItem } from "../explorer/models/brokers"; import { OutputChannelProvider } from "../providers"; import { pickBroker, pickCluster } from "./common"; import { ClusterSettings } from "../settings"; import { KafkaExplorer } from "../explorer"; +import { addClusterWizard } from "../wizards/clusters"; /** * Adds a new cluster to the collection. */ export class AddClusterCommandHandler { - private readonly AuthOptions = ["None", "SASL/PLAIN"]; - constructor(private clusterSettings: ClusterSettings, private explorer: KafkaExplorer) { } async execute(): Promise { - const bootstrap = await vscode.window.showInputBox({ placeHolder: "Broker(s) (localhost:9092,localhost:9093...)", ignoreFocusOut: true }); - - if (!bootstrap) { - return; - } - - const name = await vscode.window.showInputBox({ placeHolder: "Friendly name", ignoreFocusOut: true }); - - if (!name) { - return; - } - - const pickedAuthOption = await vscode.window.showQuickPick(this.AuthOptions, { placeHolder: "Authentication", ignoreFocusOut: true }); - let saslOption: SaslOption | undefined; - - if (pickedAuthOption && pickedAuthOption === this.AuthOptions[1]) { - const username = await vscode.window.showInputBox({ placeHolder: "Username", ignoreFocusOut: true }); - const password = await vscode.window.showInputBox({ placeHolder: "Password", password: true, ignoreFocusOut: true }); - - if (username && password) { - saslOption = { - mechanism: "plain", - username, - password, - }; - } - } - - const sanitizedName = name.replace(/[^a-zA-Z0-9]/g, ""); - const suffix = Buffer.from(bootstrap).toString("base64").replace(/=/g, ""); - - this.clusterSettings.upsert({ - id: `${sanitizedName}-${suffix}`, - bootstrap, - name, - saslOption, - }); - - this.explorer.refresh(); + addClusterWizard(this.clusterSettings, this.explorer); } + } /** diff --git a/src/commands/topics.ts b/src/commands/topics.ts index 9cbccd5..df2e75f 100644 --- a/src/commands/topics.ts +++ b/src/commands/topics.ts @@ -1,81 +1,22 @@ +import { ClusterSettings } from "../settings"; import { dump } from "js-yaml"; import * as vscode from "vscode"; import { Topic, ClientAccessor, Client } from "../client"; import { KafkaExplorer, TopicItem } from "../explorer"; import { OutputChannelProvider } from "../providers"; +import { addTopicWizard } from "../wizards/topics"; import { pickTopicFromSelectedCluster } from "./common"; const AUTO_CREATE_TOPIC_KEY = 'auto.create.topics.enable'; export class CreateTopicCommandHandler { - constructor(private clientAccessor: ClientAccessor, private explorer: KafkaExplorer) { - } - - private validatePositiveNumber(value?: string): string | undefined { - if (!value) { - return "Must be a positive number"; - } - const valueAsNumber = parseInt(value, 10); - - if (isNaN(valueAsNumber) || valueAsNumber < 1) { - return "Must be a positive number"; - } + constructor(private clientAccessor: ClientAccessor, private clusterSettings : ClusterSettings, private explorer: KafkaExplorer) { } async execute(clusterId?: string): Promise { - if (!clusterId) { - return; - } - - const topic = await vscode.window.showInputBox({ placeHolder: "Topic name", ignoreFocusOut: true }); - - if (!topic) { - return; - } - - const partitions = await vscode.window.showInputBox({ - placeHolder: "Number of partitions", - validateInput: this.validatePositiveNumber, - ignoreFocusOut: true - }); - - if (!partitions) { - return; - } - - const replicationFactor = await vscode.window.showInputBox({ - placeHolder: "Replication Factor", - validateInput: this.validatePositiveNumber, - ignoreFocusOut: true - }); - - if (!replicationFactor) { - return; - } - - try { - const client = this.clientAccessor.get(clusterId); - const result = await client.createTopic({ - topic, - partitions: parseInt(partitions, 10), - replicationFactor: parseInt(replicationFactor, 10), - }); - - if (result.length > 0) { - vscode.window.showErrorMessage(result[0].error); - } else { - this.explorer.refresh(); - vscode.window.showInformationMessage(`Topic '${topic}' created successfully`); - } - } catch (error) { - if (error.message) { - vscode.window.showErrorMessage(error.message); - } else { - vscode.window.showErrorMessage(error); - } - } + addTopicWizard(this.clientAccessor, this.clusterSettings, this.explorer, clusterId); } } @@ -164,7 +105,7 @@ export class DeleteTopicCommandHandler { return; } - await client.deleteTopic({topics:[ topicToDelete.id ]}); + await client.deleteTopic({ topics: [topicToDelete.id] }); this.explorer.refresh(); vscode.window.showInformationMessage(`Topic '${topicToDelete.id}' deleted successfully`); } catch (error) { @@ -175,4 +116,4 @@ export class DeleteTopicCommandHandler { } } } -} \ No newline at end of file +} diff --git a/src/constants.ts b/src/constants.ts index e56726b..b4e9d2c 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -3,6 +3,7 @@ import * as path from "path"; import { Context } from "./context"; export const imagesPath = "images"; +export const INPUT_TITLE = 'Kafka Tools'; type DarkLightPath = { light: string; dark: string} @@ -36,7 +37,7 @@ export class Icons { public static get Trash(): DarkLightPath { return getDarkLightPath("trashcan.svg"); - } + } public static get Warning(): string { return getIconPath("warning.svg"); diff --git a/src/extension.ts b/src/extension.ts index 844ad27..5f882d1 100644 --- a/src/extension.ts +++ b/src/extension.ts @@ -50,7 +50,7 @@ export function activate(context: vscode.ExtensionContext): void { context.subscriptions.push(new SelectedClusterStatusBarItem(clusterSettings)); // Commands - const createTopicCommandHandler = new CreateTopicCommandHandler(clientAccessor, explorer); + const createTopicCommandHandler = new CreateTopicCommandHandler(clientAccessor, clusterSettings, explorer); const deleteTopicCommandHandler = new DeleteTopicCommandHandler(clientAccessor, explorer); const produceRecordCommandHandler = new ProduceRecordCommandHandler(clientAccessor, outputChannelProvider, explorer, workspaceSettings); const startConsumerCommandHandler = new StartConsumerCommandHandler(clientAccessor, clusterSettings, consumerCollection, explorer); diff --git a/src/wizards/clusters.ts b/src/wizards/clusters.ts new file mode 100644 index 0000000..b1b5d89 --- /dev/null +++ b/src/wizards/clusters.ts @@ -0,0 +1,134 @@ +import { QuickPickItem, window } from "vscode"; +import { SaslOption } from "../client"; +import { INPUT_TITLE } from "../constants"; +import { KafkaExplorer } from "../explorer/kafkaExplorer"; +import { ClusterSettings } from "../settings/clusters"; +import { MultiStepInput, showErrorMessage, State } from "./multiStepInput"; +import { validateBroker, validateClusterName, validateAuthentificationUserName } from "./validators"; + +const DEFAULT_BROKER = 'localhost:9092'; + +interface AddClusterState extends State { + bootstrap: string; + name: string; + saslOption: SaslOption; +} + +export async function addClusterWizard(clusterSettings: ClusterSettings, explorer: KafkaExplorer): Promise { + + const state: Partial = { + totalSteps: 3 + }; + + async function collectInputs(state: Partial, clusterSettings: ClusterSettings) { + await MultiStepInput.run(input => inputBrokers(input, state, clusterSettings)); + } + + async function inputBrokers(input: MultiStepInput, state: Partial, clusterSettings: ClusterSettings) { + state.bootstrap = await input.showInputBox({ + title: INPUT_TITLE, + step: input.getStepNumber(), + totalSteps: state.totalSteps, + value: state.bootstrap ? state.bootstrap : DEFAULT_BROKER, + prompt: 'Broker(s) (localhost:9092,localhost:9093...)', + validate: validateBroker + }); + return (input: MultiStepInput) => inputClusterName(input, state, clusterSettings); + } + + async function inputClusterName(input: MultiStepInput, state: Partial, clusterSettings: ClusterSettings) { + const existingClusterNames = clusterSettings.getAll().map(cluster => cluster.name); + state.name = await input.showInputBox({ + title: INPUT_TITLE, + step: input.getStepNumber(), + totalSteps: state.totalSteps, + value: state.name || '', + prompt: 'Friendly name', + validationContext: existingClusterNames, + validate: validateClusterName + }); + + return (input: MultiStepInput) => inputAuthentification(input, state); + } + + async function inputAuthentification(input: MultiStepInput, state: Partial) { + const authOptions: QuickPickItem[] = [{ "label": "None" }, { "label": "SASL/PLAIN" }]; + const authentification = (await input.showQuickPick({ + title: INPUT_TITLE, + step: input.getStepNumber(), + totalSteps: state.totalSteps, + placeholder: 'Pick authentification', + items: authOptions, + activeItem: authOptions[0] + })).label; + if (authentification && authentification == authOptions[1].label) { + state.saslOption = { mechanism: 'plain' }; + return (input: MultiStepInput) => inputAuthentificationUserName(input, state); + } + return undefined; + } + + async function inputAuthentificationUserName(input: MultiStepInput, state: Partial) { + if (!state.saslOption) { + return; + } + state.saslOption.username = await input.showInputBox({ + title: INPUT_TITLE, + step: input.getStepNumber(), + totalSteps: state.totalSteps, + value: state.saslOption?.username || '', + prompt: ' Username', + validate: validateAuthentificationUserName + }); + + return (input: MultiStepInput) => inputAuthentificationPassword(input, state); + } + + async function inputAuthentificationPassword(input: MultiStepInput, state: Partial) { + if (!state.saslOption) { + return; + } + state.saslOption.password = await input.showInputBox({ + title: INPUT_TITLE, + step: input.getStepNumber(), + totalSteps: state.totalSteps, + value: state.saslOption.password || '', + prompt: ' Password', + password: true + }); + } + + try { + await collectInputs(state, clusterSettings); + } catch (e) { + showErrorMessage('Error while collecting inputs for creating cluster', e); + return; + } + + const addClusterState: AddClusterState = state as AddClusterState; + const bootstrap = state.bootstrap; + if (!bootstrap) { + return; + } + const name = state.name; + if (!name) { + return; + } + const saslOption = addClusterState.saslOption; + const sanitizedName = name.replace(/[^a-zA-Z0-9]/g, ""); + const suffix = Buffer.from(bootstrap).toString("base64").replace(/=/g, ""); + + try { + clusterSettings.upsert({ + id: `${sanitizedName}-${suffix}`, + bootstrap, + name, + saslOption, + }); + explorer.refresh(); + window.showInformationMessage(`Cluster '${name}' created successfully`); + } + catch (error) { + showErrorMessage(`Error while creating cluster`, error); + } +} diff --git a/src/wizards/multiStepInput.ts b/src/wizards/multiStepInput.ts new file mode 100644 index 0000000..fb41f25 --- /dev/null +++ b/src/wizards/multiStepInput.ts @@ -0,0 +1,263 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +// ------------------------------------------------------- +// Reference: +// https://github.com/microsoft/vscode-extension-samples/blob/master/quickinput-sample/src/multiStepInput.ts +// ------------------------------------------------------- +import { ConfigurationChangeEvent, Disposable, InputBox, QuickInput, QuickInputButton, QuickInputButtons, QuickPick, QuickPickItem, window, workspace } from 'vscode'; + +class InputFlowAction { + static back = new InputFlowAction(); + static cancel = new InputFlowAction(); +} + +type InputStep = (input: MultiStepInput) => Thenable; + +export interface QuickPickParameters { + title: string; + step?: number; + totalSteps?: number; + items: T[]; + activeItem?: T; + placeholder?: string; + buttons?: QuickInputButtonWithCallback[]; + configChanges?: ConfigChangeCallback[]; +} + +export interface InputBoxParameters { + title: string; + step?: number; + totalSteps?: number; + value: string; + placeholder?: string, + prompt: string; + password?: boolean; + validationContext?: any; + validate?: (value: string, validationContext?: any) => Promise; + buttons?: QuickInputButtonWithCallback[]; + configChanges?: ConfigChangeCallback[]; +} + +export interface QuickInputButtonWithCallback extends QuickInputButton { + callback: () => any; +} + +export interface ConfigChangeCallback { + configName: string; + callback: () => any; +} + +export interface State { + totalSteps: number; +} + +export class MultiStepInput { + + static async run(start: InputStep): Promise { + const input = new MultiStepInput(); + return input.stepThrough(start); + } + private current?: QuickInput; + private steps: InputStep[] = []; + + public getStepNumber(): number { + return this.steps.length; + } + + public ignoreStep(): void { + this.steps.pop(); + } + + private async stepThrough(start: InputStep) { + let step: InputStep | void = start; + while (step) { + this.steps.push(step); + if (this.current) { + this.current.enabled = false; + this.current.busy = true; + } + try { + step = await step(this); + } catch (err) { + if (err === InputFlowAction.back) { + this.steps.pop(); + step = this.steps.pop(); + } else if (err === InputFlowAction.cancel) { + step = undefined; + } else { + throw err; + } + } + } + if (this.current) { + this.current.dispose(); + } + } + + async showQuickPick>({ title, step, totalSteps, items, activeItem, placeholder, buttons, configChanges }: P): Promise { + const disposables: Disposable[] = []; + const displaySteps: boolean = typeof step !== 'undefined' && typeof totalSteps !== 'undefined'; + + return await new Promise((resolve, reject) => { + const input: QuickPick = window.createQuickPick(); + input.title = title; + + if (displaySteps) { + input.step = step; + input.totalSteps = totalSteps; + } + + input.totalSteps = totalSteps; + input.placeholder = placeholder; + input.items = items; + if (activeItem) { + input.activeItems = [activeItem]; + } + + input.buttons = [ + ...(this.steps.length > 1 ? [QuickInputButtons.Back] : []), + ...(buttons || []) + ]; + input.ignoreFocusOut = true; + disposables.push( + input.onDidTriggerButton((item: QuickInputButton) => { + disposables.forEach(d => d.dispose()); + if (buttons && buttons.includes(item as QuickInputButtonWithCallback)) { + (item as QuickInputButtonWithCallback).callback(); + // resolve(); + } else if (item === QuickInputButtons.Back) { + reject(InputFlowAction.back); + } else { + resolve(item); + } + }), + input.onDidChangeSelection(items => { + disposables.forEach(d => d.dispose()); + resolve(items[0]); + }), + input.onDidHide(() => { + input.dispose(); + disposables.forEach(d => d.dispose()); + }) + ); + + if (configChanges) { + disposables.push(workspace.onDidChangeConfiguration((event: ConfigurationChangeEvent) => { + const configNames: string[] = configChanges.map((configChange: ConfigChangeCallback) => configChange.configName); + const configName: string | undefined = configNames.find((name: string) => event.affectsConfiguration(name)); + if (!configName) return; + + configChanges.forEach((configChange: ConfigChangeCallback) => { + if (configChange.configName === configName) { + configChange.callback(); + // resolve(); + } + }); + })); + } + + this.current = input; + this.current.show(); + }); + } + + async showInputBox

( + { + title, + step, + totalSteps, + value, + placeholder, + prompt, + password = false, + validationContext, + validate = async function (value: string, validationContext: any) { return undefined; }, + buttons, + configChanges + }: P): Promise { + const disposables: Disposable[] = []; + const displaySteps: boolean = typeof step !== 'undefined' && typeof totalSteps !== 'undefined'; + + return await new Promise((resolve, reject) => { + const input: InputBox = window.createInputBox(); + input.title = title; + input.placeholder = placeholder; + if (displaySteps) { + input.step = step; + input.totalSteps = totalSteps; + } + input.value = value; + input.prompt = prompt; + input.password = password; + input.buttons = [ + ...(this.steps.length > 1 ? [QuickInputButtons.Back] : []), + ...(buttons || []) + ]; + input.ignoreFocusOut = true; + validate(input.value, validationContext) + .then(r => input.validationMessage = r); + disposables.push( + input.onDidTriggerButton((item: QuickInputButton) => { + disposables.forEach(d => d.dispose()); + if (buttons && buttons.includes(item as QuickInputButtonWithCallback)) { + (item as QuickInputButtonWithCallback).callback(); + // resolve(); + } else if (item === QuickInputButtons.Back) { + reject(InputFlowAction.back); + } else { + resolve(item); + } + }), + input.onDidAccept(async () => { + const value = input.value.trim(); + input.enabled = false; + input.busy = true; + if (!(await validate(value, validationContext))) { + resolve(value); + } + input.enabled = true; + input.busy = false; + }), + input.onDidChangeValue(async text => { + const current = validate(text, validationContext); + const validating = current; + const validationMessage = await current; + if (current === validating) { + input.validationMessage = validationMessage; + } + }), + input.onDidHide(() => { + input.dispose(); + disposables.forEach(d => d.dispose()); + }) + ); + + if (configChanges) { + disposables.push(workspace.onDidChangeConfiguration((event: ConfigurationChangeEvent) => { + const configNames: string[] = configChanges.map((configChange: ConfigChangeCallback) => configChange.configName); + const configName: string | undefined = configNames.find((name: string) => event.affectsConfiguration(name)); + if (!configName) return; + + configChanges.forEach((configChange: ConfigChangeCallback) => { + if (configChange.configName === configName) { + configChange.callback(); + // resolve(); + } + }); + })); + } + + this.current = input; + this.current.show(); + }); + } +} + +// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types +export function showErrorMessage(description: string, error: any): void { + const message = description + ':' + (error.message ? error.message : error); + window.showErrorMessage(message); +} diff --git a/src/wizards/topics.ts b/src/wizards/topics.ts new file mode 100644 index 0000000..0f69968 --- /dev/null +++ b/src/wizards/topics.ts @@ -0,0 +1,163 @@ +import { ClusterSettings } from "../settings"; +import { QuickPickItem, window } from "vscode"; +import { ClientAccessor } from "../client"; +import { INPUT_TITLE } from "../constants"; +import { KafkaExplorer } from "../explorer/kafkaExplorer"; +import { MultiStepInput, showErrorMessage, State } from "./multiStepInput"; +import { validateTopicName, validatePartitions, validateReplicationFactor } from "./validators"; + +const DEFAULT_PARTITIONS = 1; +const DEFAULT_REPLICATION_FACTOR = 1; + +interface CreateTopicState extends State { + clusterId: string; + topicName: string; + partitions: string; + replicationFactor: string; +} + +export async function addTopicWizard(clientAccessor: ClientAccessor, clusterSettings: ClusterSettings, explorer: KafkaExplorer, clusterId?: string): Promise { + const clusters = clusterSettings.getAll(); + if (clusters.length == 0) { + window.showErrorMessage('No clusters'); + return; + } + const state: Partial = { + clusterId: clusterId, + totalSteps: clusterId ? 3 : 4 + }; + + try { + await collectInputs(state, clusterSettings, clientAccessor); + } catch (e) { + showErrorMessage('Error while collecting inputs for creating topic', e); + return; + } + const selectedClusterId = state.clusterId; + if (!selectedClusterId) { + return; + } + const topic = state.topicName; + if (!topic) { + return; + } + + const partitions = state.partitions; + if (!partitions) { + return; + } + + const replicationFactor = state.replicationFactor; + if (!replicationFactor) { + return; + } + + const clusterName = clusterSettings.get(selectedClusterId)?.name || selectedClusterId; + try { + const client = clientAccessor.get(selectedClusterId); + const result = await client.createTopic({ + topic, + partitions: parseInt(partitions, 10), + replicationFactor: parseInt(replicationFactor, 10), + }); + + if (result.length > 0) { + showErrorMessage(`Error while creating topic for cluster '${clusterName}'`, result[0].error); + } else { + explorer.refresh(); + window.showInformationMessage(`Topic '${topic}' in cluster '${clusterName}' created successfully`); + } + } catch (error) { + showErrorMessage(`Error while creating topic for cluster '${clusterName}'`, error); + } +} + +async function collectInputs(state: Partial, clusterSettings: ClusterSettings, clientAccessor: ClientAccessor) { + if (state.clusterId) { + await MultiStepInput.run(input => inputTopicName(input, state, clientAccessor)); + } else { + await MultiStepInput.run(input => inputSelectCluster(input, state, clusterSettings, clientAccessor)); + } +} + +async function inputSelectCluster(input: MultiStepInput, state: Partial, clusterSettings: ClusterSettings, clientAccessor: ClientAccessor) { + interface ClusterPickItem extends QuickPickItem { + clusterId: string; + } + const clusters = clusterSettings.getAll(); + const selected = clusterSettings.selected; + let activeClusterItem: ClusterPickItem | undefined; + const clusterItems: ClusterPickItem[] = clusters.map((cluster) => { + const item = { label: cluster.name, clusterId: cluster.id }; + if (selected && cluster == selected) { + activeClusterItem = item; + } + return item; + }); + if (!activeClusterItem) { + activeClusterItem = clusterItems[0]; + } + const selectedCluster: ClusterPickItem = (await input.showQuickPick({ + title: INPUT_TITLE, + step: input.getStepNumber(), + totalSteps: state.totalSteps, + placeholder: 'Pick a cluster', + items: clusterItems, + activeItem: activeClusterItem + })); + state.clusterId = selectedCluster.clusterId; + return (input: MultiStepInput) => inputTopicName(input, state, clientAccessor); +} + +async function inputTopicName(input: MultiStepInput, state: Partial, clientAccessor: ClientAccessor) { + const existingTopicNames = await getExistingTopicNames(clientAccessor, state.clusterId); + if (existingTopicNames === undefined) { + return; + } + state.topicName = await input.showInputBox({ + title: INPUT_TITLE, + step: input.getStepNumber(), + totalSteps: state.totalSteps, + value: state.topicName || '', + prompt: 'Topic name', + validationContext: existingTopicNames, + validate: validateTopicName + }); + return (input: MultiStepInput) => inputPartitions(input, state); +} + +async function getExistingTopicNames(clientAccessor: ClientAccessor, clusterId?: string): Promise { + if (!clusterId) return []; + try { + const client = clientAccessor.get(clusterId); + return (await client.getTopics()).map(topic => topic.id); + } + catch (error) { + showErrorMessage(`Error while getting topics for cluster '${clusterId}'`, error); + return undefined; + } +} + +async function inputPartitions(input: MultiStepInput, state: Partial) { + state.partitions = await input.showInputBox({ + title: INPUT_TITLE, + step: input.getStepNumber(), + totalSteps: state.totalSteps, + value: state.partitions || DEFAULT_PARTITIONS.toString(), + prompt: 'Number of partitions', + validate: validatePartitions + }); + return (input: MultiStepInput) => inputReplicationFactor(input, state); +} + + +async function inputReplicationFactor(input: MultiStepInput, state: Partial) { + state.replicationFactor = await input.showInputBox({ + title: INPUT_TITLE, + step: input.getStepNumber(), + totalSteps: state.totalSteps, + value: state.replicationFactor || DEFAULT_REPLICATION_FACTOR.toString(), + prompt: 'Replication Factor', + validate: validateReplicationFactor + }); +} diff --git a/src/wizards/validators.ts b/src/wizards/validators.ts new file mode 100644 index 0000000..785623f --- /dev/null +++ b/src/wizards/validators.ts @@ -0,0 +1,88 @@ +// ------------------ Cluster validators + +const BROKER_FIELD = 'Broker'; +const CLUSTER_FIELD = 'Cluster name'; +const USERNAME_FIELD = 'User name'; + +export async function validateBroker(broker: string): Promise { + return validateFieldRequired(BROKER_FIELD, broker); +} + +export async function validateClusterName(cluster: string, existingClusterNames: string[]): Promise { + const result = validateFieldRequired(CLUSTER_FIELD, cluster); + if (result) { + return result; + } + return validateFieldUniqueValue(CLUSTER_FIELD, cluster, existingClusterNames); +} + +export async function validateAuthentificationUserName(userName: string): Promise { + return validateFieldRequired(USERNAME_FIELD, userName); +} + +// ------------------ Topic validators + +const TOPIC_FIELD = 'Topic name'; +const PARTITIONS_FIELD = 'Number of partitions'; +const REPLICATION_FACTOR_FIELD = 'Replication Factor'; + +const maxNameLength = 249 +const legalChars = /^[a-zA-Z0-9\\.\\-]*$/; + +export async function validateTopicName(topic: string, existingTopicNames: string[]): Promise { + // See topic name validation rule at https://github.com/apache/kafka/blob/8007211cc982d8458223e866c1ee7d94b69e0249/core/src/main/scala/kafka/common/Topic.scala#L33 + const result = validateFieldRequired(TOPIC_FIELD, topic); + if (result) { + return result; + } + else if (topic == "." || topic == "..") { + return `${TOPIC_FIELD} cannot be '.' or '..'`; + } + else if (topic.length > maxNameLength) { + return `${TOPIC_FIELD} is illegal, cannot be longer than ${maxNameLength} characters`; + } + else if (!legalChars.test(topic)) { + return `${TOPIC_FIELD} '${topic}' is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'`; + } + return validateFieldUniqueValue(TOPIC_FIELD, topic, existingTopicNames); +} + +export async function validatePartitions(partitions: string): Promise { + const result = validateFieldRequired(PARTITIONS_FIELD, partitions); + if (result) { + return result; + } + return validateFieldPositiveNumber(PARTITIONS_FIELD, partitions); +} + +export async function validateReplicationFactor(replicationFactor: string): Promise { + const result = validateFieldRequired(REPLICATION_FACTOR_FIELD, replicationFactor); + if (result) { + return result; + } + return validateFieldPositiveNumber(REPLICATION_FACTOR_FIELD, replicationFactor); +} + +// ------------------ Commons Validators + +function validateFieldRequired(name: string, value: string): string | undefined { + if (value.length <= 0) { + return `${name} is required.`; + } + if (value.trim().length == 0) { + return `${name} cannot be blank.`; + } +} + +function validateFieldUniqueValue(name: string, value: string, values: string[]): string | undefined { + if (values.indexOf(value) != -1) { + return `${name} '${value}' already exists.`; + } +} + +function validateFieldPositiveNumber(name: string, value: string): string | undefined { + const valueAsNumber = parseInt(value, 10); + if (isNaN(valueAsNumber) || valueAsNumber < 1) { + return `${name} must be a positive number.`; + } +}