From 8cd5f5a5f34211b793e1c24bd4bbb108e59af2a1 Mon Sep 17 00:00:00 2001 From: angelozerr Date: Mon, 8 Mar 2021 15:31:11 +0100 Subject: [PATCH] Cluster provider API extension Fixes #123 Signed-off-by: azerr --- package.json | 14 +++ schemas/package.schema.json | 37 ++++++++ src/client/client.ts | 94 +++++++++++++----- src/client/consumer.ts | 3 +- src/extension.ts | 6 +- src/kafka-extensions/api.ts | 25 +++++ src/kafka-extensions/registry.ts | 157 +++++++++++++++++++++++++++++++ src/wizards/clusters.ts | 103 +++++++++++++------- 8 files changed, 383 insertions(+), 56 deletions(-) create mode 100644 schemas/package.schema.json create mode 100644 src/kafka-extensions/api.ts create mode 100644 src/kafka-extensions/registry.ts diff --git a/package.json b/package.json index 94e88dc..8f016a6 100644 --- a/package.json +++ b/package.json @@ -48,6 +48,14 @@ ], "main": "./dist/extension", "contributes": { + "kafka": { + "clusterProviders": [ + { + "id": "vscode-kafka.manual", + "label": "Manual" + } + ] + }, "configuration": { "type": "object", "title": "Kafka", @@ -182,6 +190,12 @@ "path": "./snippets/consumers.json" } ], + "jsonValidation": [ + { + "fileMatch": "package.json", + "url": "./schemas/package.schema.json" + } + ], "commands": [ { "command": "vscode-kafka.open.docs.home", diff --git a/schemas/package.schema.json b/schemas/package.schema.json new file mode 100644 index 0000000..7411d51 --- /dev/null +++ b/schemas/package.schema.json @@ -0,0 +1,37 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "title": "Kafka contributions to package.json", + "type": "object", + "properties": { + "contributes": { + "type": "object", + "properties": { + "kafka": { + "type": "object", + "markdownDescription": "Kafka extensions", + "properties": { + "clusterProviders": { + "type": "array", + "markdownDescription": "Cluster providers definitions.", + "items": [ + { + "type": "object", + "properties": { + "id": { + "type": "string", + "description": "Cluster provider id." + }, + "label": { + "type": "string", + "description": "Cluster provider label." + } + } + } + ] + } + } + } + } + } + } +} diff --git a/src/client/client.ts b/src/client/client.ts index 33f0a73..00fcf4e 100644 --- a/src/client/client.ts +++ b/src/client/client.ts @@ -1,9 +1,11 @@ import { Admin, ConfigResourceTypes, Kafka, Producer } from "kafkajs"; import { Disposable } from "vscode"; +import { getClusterProvider } from "../kafka-extensions/registry"; import { WorkspaceSettings } from "../settings"; export interface ConnectionOptions { + clusterProviderId?: string; bootstrap: string; saslOption?: SaslOption; ssl?: boolean; @@ -176,10 +178,9 @@ class KafkaJsClient implements Client { public kafkaClient: any; public kafkaCyclicProducerClient: any; public kafkaKeyedProducerClient: any; - public producer: Producer; - - private kafkaJsClient: Kafka; - private kafkaAdminClient: Admin; + private kafkaProducer: Producer | undefined; + private kafkaJsClient: Kafka | undefined; + private kafkaAdminClient: Admin | undefined; private metadata: { topics: Topic[]; @@ -191,21 +192,38 @@ class KafkaJsClient implements Client { brokers: [], topics: [], }; - this.kafkaJsClient = createKafka(cluster); - this.kafkaClient = this.kafkaJsClient; - this.kafkaAdminClient = this.kafkaJsClient.admin(); - this.producer = this.kafkaJsClient.producer(); + // The Kafka client is created in asynchronous since external vscode extension + // can contribute to the creation of Kafka instance. + createKafka(cluster) + .then(result => { + this.kafkaJsClient = result; + this.kafkaClient = this.kafkaJsClient; + this.kafkaAdminClient = this.kafkaJsClient.admin(); + this.kafkaProducer = this.kafkaJsClient.producer(); + }); + } + + public get producer(): Producer { + if (this.kafkaProducer) { + return this.kafkaProducer; + } + throw new Error('Impossible to get producer. Kafka client is not connected!'); } canConnect(): boolean { return this.kafkaAdminClient !== null; } - connect(): Promise { - return this.kafkaAdminClient.connect(); + async connect(): Promise { + if (this.kafkaAdminClient) { + return this.kafkaAdminClient.connect(); + } } async getTopics(): Promise { + if (!this.kafkaAdminClient) { + throw new Error('Impossible to get topics. Kafka client is not connected!'); + } const listTopicsResponse = await this.kafkaAdminClient.fetchTopicMetadata(); this.metadata = { @@ -233,7 +251,10 @@ class KafkaJsClient implements Client { } async getBrokers(): Promise { - const describeClusterResponse = await this.kafkaAdminClient?.describeCluster(); + if (!this.kafkaAdminClient) { + throw new Error('Impossible to get brokers. Kafka client is not connected!'); + } + const describeClusterResponse = await this.kafkaAdminClient.describeCluster(); this.metadata = { ...this.metadata, @@ -251,6 +272,9 @@ class KafkaJsClient implements Client { } async getBrokerConfigs(brokerId: string): Promise { + if (!this.kafkaAdminClient) { + throw new Error('Impossible to get broker config. Kafka client is not connected!'); + } const describeConfigsResponse = await this.kafkaAdminClient.describeConfigs({ includeSynonyms: false, resources: [ @@ -265,6 +289,9 @@ class KafkaJsClient implements Client { } async getTopicConfigs(topicId: string): Promise { + if (!this.kafkaAdminClient) { + throw new Error('Impossible to get topic configs. Kafka client is not connected!'); + } const describeConfigsResponse = await this.kafkaAdminClient.describeConfigs({ includeSynonyms: false, resources: [ @@ -279,11 +306,17 @@ class KafkaJsClient implements Client { } async getConsumerGroupIds(): Promise { + if (!this.kafkaAdminClient) { + throw new Error('Impossible to get consumer group ids. Kafka client is not connected!'); + } const listGroupsResponse = await this.kafkaAdminClient.listGroups(); return Promise.resolve(listGroupsResponse.groups.map((g) => (g.groupId))); } async getConsumerGroupDetails(groupId: string): Promise { + if (!this.kafkaAdminClient) { + throw new Error('Impossible to get consumer group details. Kafka client is not connected!'); + } const describeGroupResponse = await this.kafkaAdminClient.describeGroups([groupId]); const consumerGroup: ConsumerGroup = { @@ -304,10 +337,16 @@ class KafkaJsClient implements Client { } async deleteConsumerGroups(groupIds: string[]): Promise { + if (!this.kafkaAdminClient) { + throw new Error('Impossible to delete consumer groups. Kafka client is not connected!'); + } await this.kafkaAdminClient.deleteGroups(groupIds); } async createTopic(createTopicRequest: CreateTopicRequest): Promise { + if (!this.kafkaAdminClient) { + throw new Error('Impossible to create topic. Kafka client is not connected!'); + } await this.kafkaAdminClient.createTopics({ validateOnly: false, waitForLeaders: true, @@ -321,6 +360,9 @@ class KafkaJsClient implements Client { } async deleteTopic(deleteTopicRequest: DeleteTopicRequest): Promise { + if (!this.kafkaAdminClient) { + throw new Error('Impossible to delete topic. Kafka client is not connected!'); + } return await this.kafkaAdminClient.deleteTopics({ topics: deleteTopicRequest.topics, timeout: deleteTopicRequest.timeout @@ -328,28 +370,36 @@ class KafkaJsClient implements Client { } dispose() { - this.kafkaAdminClient.disconnect(); + if (this.kafkaAdminClient) { + this.kafkaAdminClient.disconnect(); + } } } export const createClient = (cluster: Cluster, workspaceSettings: WorkspaceSettings): Client => new EnsureConnectedDecorator( new KafkaJsClient(cluster, workspaceSettings)); -export const createKafka = (connectionOptions: ConnectionOptions): Kafka => { - let kafkaJsClient: Kafka; +export const createKafka = async (connectionOptions: ConnectionOptions): Promise => { + const provider = getClusterProvider(connectionOptions.clusterProviderId); + if (!provider) { + throw new Error(`Cannot find cluster provider for '${connectionOptions.clusterProviderId}' ID.`); + } + const kafka = await provider.createKafka(connectionOptions); + return kafka || createDefaultKafka(connectionOptions); +}; + +export const createDefaultKafka = (connectionOptions: ConnectionOptions): Kafka => { if (connectionOptions.saslOption && connectionOptions.saslOption.username && connectionOptions.saslOption.password) { - kafkaJsClient = new Kafka({ + return new Kafka({ clientId: "vscode-kafka", brokers: connectionOptions.bootstrap.split(","), ssl: true, sasl: { mechanism: connectionOptions.saslOption.mechanism, username: connectionOptions.saslOption.username, password: connectionOptions.saslOption.password }, }); - } else { - kafkaJsClient = new Kafka({ - clientId: "vscode-kafka", - brokers: connectionOptions.bootstrap.split(","), - ssl: connectionOptions.ssl - }); } - return kafkaJsClient; + return new Kafka({ + clientId: "vscode-kafka", + brokers: connectionOptions.bootstrap.split(","), + ssl: connectionOptions.ssl + }); }; diff --git a/src/client/consumer.ts b/src/client/consumer.ts index 4eb76a7..d281f3e 100644 --- a/src/client/consumer.ts +++ b/src/client/consumer.ts @@ -69,6 +69,7 @@ export class Consumer implements vscode.Disposable { const settings = getWorkspaceSettings(); this.options = { + clusterProviderId: cluster.clusterProviderId, bootstrap: cluster.bootstrap, saslOption: cluster.saslOption, consumerGroupId: consumerGroupId, @@ -93,7 +94,7 @@ export class Consumer implements vscode.Disposable { const fromOffset = this.options.fromOffset; const topic = this.options.topicId; - this.kafkaClient = createKafka(this.options); + this.kafkaClient = await createKafka(this.options); this.consumer = this.kafkaClient.consumer({ groupId: this.options.consumerGroupId, retry: { retries: 3 }, partitionAssigners: [ diff --git a/src/extension.ts b/src/extension.ts index 2ac0e32..a2eae0e 100644 --- a/src/extension.ts +++ b/src/extension.ts @@ -34,8 +34,10 @@ import { NodeBase } from "./explorer/models/nodeBase"; import * as path from 'path'; import { markdownPreviewProvider } from "./docs/markdownPreviewProvider"; import { KafkaFileCodeLensProvider } from "./kafka-file"; +import { getDefaultClusterProcessorCommand } from "./kafka-extensions/registry"; +import { ClusterProviderProcessor } from "./kafka-extensions/api"; -export function activate(context: vscode.ExtensionContext): void { +export function activate(context: vscode.ExtensionContext): ClusterProviderProcessor { Context.register(context); // Settings, data etc. @@ -144,6 +146,8 @@ export function activate(context: vscode.ExtensionContext): void { context.subscriptions.push(vscode.workspace.registerTextDocumentContentProvider( ConsumerVirtualTextDocumentProvider.SCHEME, consumerVirtualTextDocumentProvider)); + + return getDefaultClusterProcessorCommand(); } export function deactivate(): void { diff --git a/src/kafka-extensions/api.ts b/src/kafka-extensions/api.ts new file mode 100644 index 0000000..7aec419 --- /dev/null +++ b/src/kafka-extensions/api.ts @@ -0,0 +1,25 @@ +import { Kafka } from "kafkajs"; +import { Cluster, ConnectionOptions } from "../client/client"; +import { ClusterSettings } from "../settings/clusters"; + +/** + * The cluster provider processor. + */ +export interface ClusterProviderProcessor { + + /** + * Returns the clusters managed by the processor which must be added to the kafka explorer. + * + * @param clusterSettings the cluster settings. + */ + collectClusters(clusterSettings: ClusterSettings): Promise; + + /** + * Create the Kafka JS client instance fromthe given connection options. + * When processor doesn't implement this method, the Kafka JS client + * instance is created with the default client factory from the vscode-kafka. + * + * @param connectionOptions the connection options. + */ + createKafka?(connectionOptions: ConnectionOptions): Kafka; +} diff --git a/src/kafka-extensions/registry.ts b/src/kafka-extensions/registry.ts new file mode 100644 index 0000000..15cf26e --- /dev/null +++ b/src/kafka-extensions/registry.ts @@ -0,0 +1,157 @@ +import * as vscode from "vscode"; +import { Kafka } from "kafkajs"; +import { Cluster, ConnectionOptions, createDefaultKafka } from "../client/client"; +import { ClusterSettings } from "../settings/clusters"; +import { collectDefaultClusters } from "../wizards/clusters"; +import { ClusterProviderProcessor } from "./api"; + +/** + * Cluster provider is used to: + * + * - collect clusters (eg: create a cluster from a wizard, import clusters from a repository, ...) + * and add thento the Kafka Explorer. + * - create a Kafka client from a complex process (eg : use SSO to connect to the cluster) + * + * Implementing a cluster provider in custom vscode extension is done in 2 steps: + * + * - define the cluster provider (id, label) in the package.json in the contributes/kafka/clusterProviders section. + * - return the cluster provider processor (See ClusterProviderProcessor) to use in the activate() of the extension. + * + */ +export class ClusterProvider { + + private processor: ClusterProviderProcessor | undefined; + + constructor(private definition: ClusterProviderDefinition, private extensionId: string) { + + } + + /** + * Returns the cluster provider id. + */ + public get id(): string { + return this.definition.id; + } + + /** + * Returns the cluster provider label. + */ + public get label(): string { + return this.definition.label || this.definition.id; + } + + /** + * Returns the clusters managed by the provider which must be added to the kafka explorer. + * + * @param clusterSettings the cluster settings. + */ + async collectClusters(clusterSettings: ClusterSettings): Promise { + const processor = await this.getProcessor(); + return processor.collectClusters(clusterSettings); + } + + /** + * Create the Kafka JS client instance from the given connection options. + * + * @param connectionOptions the connection options. + */ + async createKafka(connectionOptions: ConnectionOptions): Promise { + const processor = await this.getProcessor(); + if (processor.createKafka) { + return processor.createKafka(connectionOptions); + } + } + + private async getProcessor(): Promise { + if (this.processor) { + return this.processor; + } + // The cluster provider processor is not already loaded, try to activate the owner extension. + // The return of the extension activate() method must return the cluster provider processor. + const extension = vscode.extensions.getExtension(this.extensionId); + if (!extension) { + throw new Error(`Error while getting cluster provider processor. Extension ${this.extensionId} doesn't exists`); + } + + // Wait for extension is activated to get the processor + const result = await extension.activate(); + this.processor = result; + if (!this.processor) { + throw new Error(`Error while getting cluster provider processor. Extension ${this.extensionId}.activate should return the processor`); + } + return this.processor; + } +} + +const defaultClusterProviderId = 'vscode-kafka.manual'; + +let providers: Map = new Map(); + +export function getClusterProvider(clusterProviderId?: string): ClusterProvider | undefined { + intializeIfNeeded(); + return providers.get(clusterProviderId || defaultClusterProviderId); +} + +export function getClusterProviders(): ClusterProvider[] { + intializeIfNeeded(); + return [...providers.values()]; +} + +function intializeIfNeeded() { + if (providers.size === 0) { + providers = collectClusterProviderDefinitions(vscode.extensions.all); + } +} + +export interface ClusterProviderDefinition { + id: string; + label?: string; +} + +/** + * Collect cluster providers defined in package.json (see vscode-kafka which implements default cluster provider with 'Manual' wizard.) + * + * ```json + * "contributes": { + * "kafka": { + * "clusterProviders": [ + * { + * "id": "vscode-kafka.manual", + * "label": "Manual" + * } + * ] + * } + * ``` + * + * @param extensions all installed vscode extensions + * + * @returns the map of cluster providers. + */ +export function collectClusterProviderDefinitions(extensions: readonly vscode.Extension[]): Map { + const result: Map = new Map(); + if (extensions && extensions.length) { + for (const extension of extensions) { + const contributesSection = extension.packageJSON['contributes']; + if (contributesSection) { + const kafkaExtension = contributesSection['kafka']; + if (kafkaExtension) { + const clusterProviders = kafkaExtension['clusterProviders']; + if (Array.isArray(clusterProviders) && clusterProviders.length) { + for (const item of clusterProviders) { + const definition = item as ClusterProviderDefinition; + result.set(definition.id, new ClusterProvider(definition, extension.id)); + } + } + } + } + } + } + return result; +} + +export function getDefaultClusterProcessorCommand(): ClusterProviderProcessor { + return { + collectClusters: (clusterSettings: ClusterSettings): Promise => collectDefaultClusters(clusterSettings), + createKafka: (connectionOptions: ConnectionOptions): Kafka => createDefaultKafka(connectionOptions) + } as ClusterProviderProcessor; +} diff --git a/src/wizards/clusters.ts b/src/wizards/clusters.ts index ccd6b32..5b08fec 100644 --- a/src/wizards/clusters.ts +++ b/src/wizards/clusters.ts @@ -1,13 +1,12 @@ import { QuickPickItem, window } from "vscode"; -import { ConnectionOptions, SaslMechanism } from "../client"; +import { Cluster, ConnectionOptions, SaslMechanism } from "../client"; import { INPUT_TITLE } from "../constants"; import { KafkaExplorer } from "../explorer/kafkaExplorer"; +import { ClusterProvider, getClusterProviders } from "../kafka-extensions/registry"; import { ClusterSettings } from "../settings/clusters"; import { MultiStepInput, showErrorMessage, State } from "./multiStepInput"; import { validateBroker, validateClusterName, validateAuthentificationUserName } from "./validators"; -const DEFAULT_BROKER = 'localhost:9092'; - interface AddClusterState extends State, ConnectionOptions { name: string; } @@ -16,15 +15,69 @@ const DEFAULT_STEPS = 4; export async function addClusterWizard(clusterSettings: ClusterSettings, explorer: KafkaExplorer): Promise { + async function pickClusterProvider(): Promise { + const providers = getClusterProviders(); + if (providers.length === 1) { + return providers[0]; + } + + const existingLabels: Array = []; + const providerItems: QuickPickItem[] = providers + .map(provider => { + let label = provider.label; + if (existingLabels.indexOf(label) !== -1) { + label += ` [${provider.id}]`; + } else { + existingLabels.push(label); + } + return { "label": label }; + }); + const selected = (await window.showQuickPick(providerItems))?.label; + if (!selected) { + return; + } + return providers.find(provider => provider.label === selected); + } + + const provider = await pickClusterProvider(); + if (!provider) { + return; + } + + const clusters = await provider.collectClusters(clusterSettings); + if (!clusters || clusters.length === 0) { + return; + } + try { + for (const cluster of clusters) { + clusterSettings.upsert(cluster); + window.showInformationMessage(`Cluster '${cluster.name}' created successfully`); + } + explorer.refresh(); + // Selecting the created cluster is done with TreeView#reveal + // 1. Show the treeview of the explorer (otherwise reveal will not work) + explorer.show(); + // 2. the reveal() call must occur within a timeout(), + // while waiting for a fix in https://github.com/microsoft/vscode/issues/114149 + setTimeout(() => { + explorer.selectClusterByName(clusters[0].name); + }, 1000); + } + catch (error) { + showErrorMessage(`Error while creating cluster`, error); + } +} +const DEFAULT_BROKER = 'localhost:9092'; +export async function collectDefaultClusters(clusterSettings: ClusterSettings): Promise { const state: Partial = { totalSteps: DEFAULT_STEPS }; async function collectInputs(state: Partial, clusterSettings: ClusterSettings) { - await MultiStepInput.run(input => inputBrokers(input, state, clusterSettings)); + return MultiStepInput.run(input => inputBrokers(input, state, clusterSettings)); } async function inputBrokers(input: MultiStepInput, state: Partial, clusterSettings: ClusterSettings) { @@ -131,12 +184,11 @@ export async function addClusterWizard(clusterSettings: ClusterSettings, explore } } - try { - await collectInputs(state, clusterSettings); - } catch (e) { - showErrorMessage('Error while collecting inputs for creating cluster', e); - return; - } + //try { + await collectInputs(state, clusterSettings); + //} catch (e) { + // showErrorMessage('Error while collecting inputs for creating cluster', e); + //} const addClusterState: AddClusterState = state as AddClusterState; const bootstrap = state.bootstrap; @@ -151,26 +203,13 @@ export async function addClusterWizard(clusterSettings: ClusterSettings, explore const sanitizedName = name.replace(/[^a-zA-Z0-9]/g, ""); const suffix = Buffer.from(bootstrap).toString("base64").replace(/=/g, ""); - try { - clusterSettings.upsert({ - id: `${sanitizedName}-${suffix}`, - bootstrap, - name, - saslOption, - ssl: state.ssl - }); - explorer.refresh(); - window.showInformationMessage(`Cluster '${name}' created successfully`); - // Selecting the created cluster is done with TreeView#reveal - // 1. Show the treeview of the explorer (otherwise reveal will not work) - explorer.show(); - // 2. the reveal() call must occur within a timeout(), - // while waiting for a fix in https://github.com/microsoft/vscode/issues/114149 - setTimeout(() => { - explorer.selectClusterByName(name); - }, 1000); - } - catch (error) { - showErrorMessage(`Error while creating cluster`, error); - } + return [{ + id: `${sanitizedName}-${suffix}`, + bootstrap, + name, + saslOption, + ssl: state.ssl + }]; } + +