diff --git a/package.json b/package.json index 94e88dc..a2de484 100644 --- a/package.json +++ b/package.json @@ -48,6 +48,15 @@ ], "main": "./dist/extension", "contributes": { + "kafka": { + "clusterProviders": [ + { + "id": "vscode-kafka.manual", + "label": "Manual", + "commandId": "vscode-kafka.cluster.providers" + } + ] + }, "configuration": { "type": "object", "title": "Kafka", diff --git a/src/client/client.ts b/src/client/client.ts index 33f0a73..89dd522 100644 --- a/src/client/client.ts +++ b/src/client/client.ts @@ -1,19 +1,26 @@ import { Admin, ConfigResourceTypes, Kafka, Producer } from "kafkajs"; import { Disposable } from "vscode"; +import { getClusterProvider } from "../kafka-extensions/registry"; import { WorkspaceSettings } from "../settings"; export interface ConnectionOptions { + clusterProviderId?: string; bootstrap: string; saslOption?: SaslOption; ssl?: boolean; } -export interface Cluster extends ConnectionOptions { +export interface ClusterIdentifier { + clusterProviderId?: string; id: string; name: string; } +export interface Cluster extends ClusterIdentifier, ConnectionOptions { + +} + /** * The supported SASL mechanisms for authentication. */ @@ -176,10 +183,9 @@ class KafkaJsClient implements Client { public kafkaClient: any; public kafkaCyclicProducerClient: any; public kafkaKeyedProducerClient: any; - public producer: Producer; - - private kafkaJsClient: Kafka; - private kafkaAdminClient: Admin; + private kafkaProducer: Producer | undefined; + private kafkaJsClient: Kafka | undefined; + private kafkaAdminClient: Admin | undefined; private metadata: { topics: Topic[]; @@ -191,21 +197,37 @@ class KafkaJsClient implements Client { brokers: [], topics: [], }; - this.kafkaJsClient = createKafka(cluster); - this.kafkaClient = this.kafkaJsClient; - this.kafkaAdminClient = this.kafkaJsClient.admin(); - this.producer = this.kafkaJsClient.producer(); + createKafka(cluster) + .then(result => { + this.kafkaJsClient = result; + this.kafkaClient = this.kafkaJsClient; + this.kafkaAdminClient = this.kafkaJsClient.admin(); + this.kafkaProducer = this.kafkaJsClient.producer(); + }); + } + + + public get producer() : Producer { + if (this.kafkaProducer) { + return this.kafkaProducer; + } + throw new Error('TODO'); } canConnect(): boolean { return this.kafkaAdminClient !== null; } - connect(): Promise { - return this.kafkaAdminClient.connect(); + async connect(): Promise { + if (this.kafkaAdminClient) { + return this.kafkaAdminClient.connect(); + } } async getTopics(): Promise { + if (!this.kafkaAdminClient) { + throw new Error('TODO'); + } const listTopicsResponse = await this.kafkaAdminClient.fetchTopicMetadata(); this.metadata = { @@ -233,7 +255,10 @@ class KafkaJsClient implements Client { } async getBrokers(): Promise { - const describeClusterResponse = await this.kafkaAdminClient?.describeCluster(); + if (!this.kafkaAdminClient) { + throw new Error('TODO'); + } + const describeClusterResponse = await this.kafkaAdminClient.describeCluster(); this.metadata = { ...this.metadata, @@ -251,6 +276,9 @@ class KafkaJsClient implements Client { } async getBrokerConfigs(brokerId: string): Promise { + if (!this.kafkaAdminClient) { + throw new Error('TODO'); + } const describeConfigsResponse = await this.kafkaAdminClient.describeConfigs({ includeSynonyms: false, resources: [ @@ -265,6 +293,9 @@ class KafkaJsClient implements Client { } async getTopicConfigs(topicId: string): Promise { + if (!this.kafkaAdminClient) { + throw new Error('TODO'); + } const describeConfigsResponse = await this.kafkaAdminClient.describeConfigs({ includeSynonyms: false, resources: [ @@ -279,11 +310,17 @@ class KafkaJsClient implements Client { } async getConsumerGroupIds(): Promise { + if (!this.kafkaAdminClient) { + throw new Error('TODO'); + } const listGroupsResponse = await this.kafkaAdminClient.listGroups(); return Promise.resolve(listGroupsResponse.groups.map((g) => (g.groupId))); } async getConsumerGroupDetails(groupId: string): Promise { + if (!this.kafkaAdminClient) { + throw new Error('TODO'); + } const describeGroupResponse = await this.kafkaAdminClient.describeGroups([groupId]); const consumerGroup: ConsumerGroup = { @@ -304,10 +341,16 @@ class KafkaJsClient implements Client { } async deleteConsumerGroups(groupIds: string[]): Promise { + if (!this.kafkaAdminClient) { + throw new Error('TODO'); + } await this.kafkaAdminClient.deleteGroups(groupIds); } async createTopic(createTopicRequest: CreateTopicRequest): Promise { + if (!this.kafkaAdminClient) { + throw new Error('TODO'); + } await this.kafkaAdminClient.createTopics({ validateOnly: false, waitForLeaders: true, @@ -321,6 +364,9 @@ class KafkaJsClient implements Client { } async deleteTopic(deleteTopicRequest: DeleteTopicRequest): Promise { + if (!this.kafkaAdminClient) { + throw new Error('TODO'); + } return await this.kafkaAdminClient.deleteTopics({ topics: deleteTopicRequest.topics, timeout: deleteTopicRequest.timeout @@ -328,28 +374,37 @@ class KafkaJsClient implements Client { } dispose() { + if (this.kafkaAdminClient) { this.kafkaAdminClient.disconnect(); + } } } export const createClient = (cluster: Cluster, workspaceSettings: WorkspaceSettings): Client => new EnsureConnectedDecorator( new KafkaJsClient(cluster, workspaceSettings)); -export const createKafka = (connectionOptions: ConnectionOptions): Kafka => { - let kafkaJsClient: Kafka; +export const createKafka = async (connectionOptions: ConnectionOptions): Promise => { + const provider = getClusterProvider(connectionOptions.clusterProviderId); + if (!provider) { + throw new Error(`Cannot find cluster provider for '${connectionOptions.clusterProviderId}' ID.`); + //return createDefaultKafka(connectionOptions); + } + const kafka = await provider.createKafka(connectionOptions); + return kafka || createDefaultKafka(connectionOptions); +}; + +export const createDefaultKafka = (connectionOptions: ConnectionOptions): Kafka => { if (connectionOptions.saslOption && connectionOptions.saslOption.username && connectionOptions.saslOption.password) { - kafkaJsClient = new Kafka({ + return new Kafka({ clientId: "vscode-kafka", brokers: connectionOptions.bootstrap.split(","), ssl: true, sasl: { mechanism: connectionOptions.saslOption.mechanism, username: connectionOptions.saslOption.username, password: connectionOptions.saslOption.password }, }); - } else { - kafkaJsClient = new Kafka({ - clientId: "vscode-kafka", - brokers: connectionOptions.bootstrap.split(","), - ssl: connectionOptions.ssl - }); } - return kafkaJsClient; + return new Kafka({ + clientId: "vscode-kafka", + brokers: connectionOptions.bootstrap.split(","), + ssl: connectionOptions.ssl + }); }; diff --git a/src/client/consumer.ts b/src/client/consumer.ts index 4eb76a7..d281f3e 100644 --- a/src/client/consumer.ts +++ b/src/client/consumer.ts @@ -69,6 +69,7 @@ export class Consumer implements vscode.Disposable { const settings = getWorkspaceSettings(); this.options = { + clusterProviderId: cluster.clusterProviderId, bootstrap: cluster.bootstrap, saslOption: cluster.saslOption, consumerGroupId: consumerGroupId, @@ -93,7 +94,7 @@ export class Consumer implements vscode.Disposable { const fromOffset = this.options.fromOffset; const topic = this.options.topicId; - this.kafkaClient = createKafka(this.options); + this.kafkaClient = await createKafka(this.options); this.consumer = this.kafkaClient.consumer({ groupId: this.options.consumerGroupId, retry: { retries: 3 }, partitionAssigners: [ diff --git a/src/extension.ts b/src/extension.ts index 2ac0e32..4286697 100644 --- a/src/extension.ts +++ b/src/extension.ts @@ -34,6 +34,7 @@ import { NodeBase } from "./explorer/models/nodeBase"; import * as path from 'path'; import { markdownPreviewProvider } from "./docs/markdownPreviewProvider"; import { KafkaFileCodeLensProvider } from "./kafka-file"; +import { onExtensionChange, registerClusterProcessorCommand } from "./kafka-extensions/registry"; export function activate(context: vscode.ExtensionContext): void { Context.register(context); @@ -144,6 +145,13 @@ export function activate(context: vscode.ExtensionContext): void { context.subscriptions.push(vscode.workspace.registerTextDocumentContentProvider( ConsumerVirtualTextDocumentProvider.SCHEME, consumerVirtualTextDocumentProvider)); + + if (vscode.extensions.onDidChange) {// Theia doesn't support this API yet + context.subscriptions.push(vscode.extensions.onDidChange(() => { + onExtensionChange(vscode.extensions.all); + })); + } + registerClusterProcessorCommand(); } export function deactivate(): void { diff --git a/src/kafka-extensions/api.ts b/src/kafka-extensions/api.ts new file mode 100644 index 0000000..e9bc33b --- /dev/null +++ b/src/kafka-extensions/api.ts @@ -0,0 +1,8 @@ +import { Kafka } from "kafkajs"; +import { Cluster, ConnectionOptions } from "../client/client"; +import { ClusterSettings } from "../settings/clusters"; + +export interface ClusterProviderProcessor { + collectClusters(clusterSettings: ClusterSettings): Promise; + createKafka?(connectionOptions: ConnectionOptions): Kafka; +} diff --git a/src/kafka-extensions/registry.ts b/src/kafka-extensions/registry.ts new file mode 100644 index 0000000..7fd85ff --- /dev/null +++ b/src/kafka-extensions/registry.ts @@ -0,0 +1,131 @@ +import * as vscode from "vscode"; +import { Kafka } from "kafkajs"; +import { Cluster, ConnectionOptions, createDefaultKafka } from "../client/client"; +import { ClusterSettings } from "../settings/clusters"; +import { collectDefaultClusters } from "../wizards/clusters"; +import { ClusterProviderProcessor } from "./api"; + +export class ClusterProvider { + + private processor: ClusterProviderProcessor | undefined; + + constructor(private definition: ClusterProviderDefinition, private extensionId: string) { + + } + + public get id(): string { + return this.definition.id; + } + + public get label(): string { + return this.definition.label || this.definition.id; + } + + async collectClusters(clusterSettings: ClusterSettings): Promise { + const processor = await this.getProcessor(); + return processor.collectClusters(clusterSettings); + } + + async createKafka(connectionOptions: ConnectionOptions): Promise { + const processor = await this.getProcessor(); + if (processor.createKafka) { + return processor.createKafka(connectionOptions); + } + } + + private async getProcessor(): Promise { + if (this.processor) { + return this.processor; + } + const extension = vscode.extensions.getExtension(this.extensionId); + if (!extension) { + throw new Error('Extension is not started'); + } + + // Wait for extension is activated in order to register the commands. + await extension.activate(); + + const processor = await vscode.commands.executeCommand(this.definition.commandId); + if (!processor) { + throw new Error('Cannot find cluster provider processor'); + } + return this.processor = processor; + } + +} + +const defaultClusterProviderId = 'vscode-kafka.manual'; + +let providers: Map = new Map(); + +export function getClusterProvider(clusterProviderId?: string): ClusterProvider | undefined { + intializeIfNeeded(); + return providers.get(clusterProviderId || defaultClusterProviderId); +} + +export function getClusterProviders(): ClusterProvider[] { + intializeIfNeeded(); + return [...providers.values()]; +} + +function intializeIfNeeded() { + if (providers.size === 0) { + providers = collectClusterProviderDefinitions(vscode.extensions.all); + } +} + +export interface ClusterProviderDefinition { + id: string; + label?: string; + commandId: string; +} + +let existingExtensions: Array; + +export function collectClusterProviderDefinitions(extensions: readonly vscode.Extension[]): Map { + const result: Map = new Map(); + if (extensions && extensions.length) { + for (const extension of extensions) { + const contributesSection = extension.packageJSON['contributes']; + if (contributesSection) { + const kafkaExtension = contributesSection['kafka']; + if (kafkaExtension) { + const clusterProviders = kafkaExtension['clusterProviders']; + if (Array.isArray(clusterProviders) && clusterProviders.length) { + for (const item of clusterProviders) { + const definition = item as ClusterProviderDefinition; + result.set(definition.id, new ClusterProvider(definition, extension.id)); + } + } + } + } + } + } + return result; +} + +export function onExtensionChange(extensions: readonly vscode.Extension[]) { + if (!existingExtensions) { + return; + } + //const oldExtensions = new Set(existingExtensions.slice()); + //const newExtensions = collectClusterProviderDefinitions(extensions); + /*let hasChanged = (oldExtensions.size !== newExtensions.length); + if (!hasChanged) { + for (const newExtension of newExtensions) { + if (!oldExtensions.has(newExtension)) { + hasChanged = true; + break; + } + } + }*/ +} + +export function registerClusterProcessorCommand() { + vscode.commands.registerCommand('vscode-kafka.cluster.providers', () => { + return { + collectClusters: (clusterSettings: ClusterSettings): Promise => collectDefaultClusters(clusterSettings), + createKafka: (connectionOptions: ConnectionOptions): Kafka => createDefaultKafka(connectionOptions) + } as ClusterProviderProcessor; + }); +} diff --git a/src/wizards/clusters.ts b/src/wizards/clusters.ts index ccd6b32..5b08fec 100644 --- a/src/wizards/clusters.ts +++ b/src/wizards/clusters.ts @@ -1,13 +1,12 @@ import { QuickPickItem, window } from "vscode"; -import { ConnectionOptions, SaslMechanism } from "../client"; +import { Cluster, ConnectionOptions, SaslMechanism } from "../client"; import { INPUT_TITLE } from "../constants"; import { KafkaExplorer } from "../explorer/kafkaExplorer"; +import { ClusterProvider, getClusterProviders } from "../kafka-extensions/registry"; import { ClusterSettings } from "../settings/clusters"; import { MultiStepInput, showErrorMessage, State } from "./multiStepInput"; import { validateBroker, validateClusterName, validateAuthentificationUserName } from "./validators"; -const DEFAULT_BROKER = 'localhost:9092'; - interface AddClusterState extends State, ConnectionOptions { name: string; } @@ -16,15 +15,69 @@ const DEFAULT_STEPS = 4; export async function addClusterWizard(clusterSettings: ClusterSettings, explorer: KafkaExplorer): Promise { + async function pickClusterProvider(): Promise { + const providers = getClusterProviders(); + if (providers.length === 1) { + return providers[0]; + } + + const existingLabels: Array = []; + const providerItems: QuickPickItem[] = providers + .map(provider => { + let label = provider.label; + if (existingLabels.indexOf(label) !== -1) { + label += ` [${provider.id}]`; + } else { + existingLabels.push(label); + } + return { "label": label }; + }); + const selected = (await window.showQuickPick(providerItems))?.label; + if (!selected) { + return; + } + return providers.find(provider => provider.label === selected); + } + + const provider = await pickClusterProvider(); + if (!provider) { + return; + } + + const clusters = await provider.collectClusters(clusterSettings); + if (!clusters || clusters.length === 0) { + return; + } + try { + for (const cluster of clusters) { + clusterSettings.upsert(cluster); + window.showInformationMessage(`Cluster '${cluster.name}' created successfully`); + } + explorer.refresh(); + // Selecting the created cluster is done with TreeView#reveal + // 1. Show the treeview of the explorer (otherwise reveal will not work) + explorer.show(); + // 2. the reveal() call must occur within a timeout(), + // while waiting for a fix in https://github.com/microsoft/vscode/issues/114149 + setTimeout(() => { + explorer.selectClusterByName(clusters[0].name); + }, 1000); + } + catch (error) { + showErrorMessage(`Error while creating cluster`, error); + } +} +const DEFAULT_BROKER = 'localhost:9092'; +export async function collectDefaultClusters(clusterSettings: ClusterSettings): Promise { const state: Partial = { totalSteps: DEFAULT_STEPS }; async function collectInputs(state: Partial, clusterSettings: ClusterSettings) { - await MultiStepInput.run(input => inputBrokers(input, state, clusterSettings)); + return MultiStepInput.run(input => inputBrokers(input, state, clusterSettings)); } async function inputBrokers(input: MultiStepInput, state: Partial, clusterSettings: ClusterSettings) { @@ -131,12 +184,11 @@ export async function addClusterWizard(clusterSettings: ClusterSettings, explore } } - try { - await collectInputs(state, clusterSettings); - } catch (e) { - showErrorMessage('Error while collecting inputs for creating cluster', e); - return; - } + //try { + await collectInputs(state, clusterSettings); + //} catch (e) { + // showErrorMessage('Error while collecting inputs for creating cluster', e); + //} const addClusterState: AddClusterState = state as AddClusterState; const bootstrap = state.bootstrap; @@ -151,26 +203,13 @@ export async function addClusterWizard(clusterSettings: ClusterSettings, explore const sanitizedName = name.replace(/[^a-zA-Z0-9]/g, ""); const suffix = Buffer.from(bootstrap).toString("base64").replace(/=/g, ""); - try { - clusterSettings.upsert({ - id: `${sanitizedName}-${suffix}`, - bootstrap, - name, - saslOption, - ssl: state.ssl - }); - explorer.refresh(); - window.showInformationMessage(`Cluster '${name}' created successfully`); - // Selecting the created cluster is done with TreeView#reveal - // 1. Show the treeview of the explorer (otherwise reveal will not work) - explorer.show(); - // 2. the reveal() call must occur within a timeout(), - // while waiting for a fix in https://github.com/microsoft/vscode/issues/114149 - setTimeout(() => { - explorer.selectClusterByName(name); - }, 1000); - } - catch (error) { - showErrorMessage(`Error while creating cluster`, error); - } + return [{ + id: `${sanitizedName}-${suffix}`, + bootstrap, + name, + saslOption, + ssl: state.ssl + }]; } + +