From a919ee012980ac2f38dc5bcf381eb450d70771a1 Mon Sep 17 00:00:00 2001 From: angelozerr Date: Mon, 8 Mar 2021 15:31:11 +0100 Subject: [PATCH] Cluster provider API extension Fixes #123 Signed-off-by: azerr --- package.json | 14 +++ schemas/package.schema.json | 37 +++++++ src/client/client.ts | 104 ++++++++++++------ src/client/consumer.ts | 3 +- src/commands/producers.ts | 2 +- src/extension.ts | 6 +- src/kafka-extensions/api.ts | 31 ++++++ src/kafka-extensions/registry.ts | 179 +++++++++++++++++++++++++++++++ src/wizards/clusters.ts | 99 +++++++++++++---- 9 files changed, 417 insertions(+), 58 deletions(-) create mode 100644 schemas/package.schema.json create mode 100644 src/kafka-extensions/api.ts create mode 100644 src/kafka-extensions/registry.ts diff --git a/package.json b/package.json index 94e88dc..0c57603 100644 --- a/package.json +++ b/package.json @@ -48,6 +48,14 @@ ], "main": "./dist/extension", "contributes": { + "kafka": { + "clusterProviders": [ + { + "id": "vscode-kafka.manual", + "name": "Configure manually" + } + ] + }, "configuration": { "type": "object", "title": "Kafka", @@ -182,6 +190,12 @@ "path": "./snippets/consumers.json" } ], + "jsonValidation": [ + { + "fileMatch": "package.json", + "url": "./schemas/package.schema.json" + } + ], "commands": [ { "command": "vscode-kafka.open.docs.home", diff --git a/schemas/package.schema.json b/schemas/package.schema.json new file mode 100644 index 0000000..7e5f92e --- /dev/null +++ b/schemas/package.schema.json @@ -0,0 +1,37 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "title": "Kafka contributions to package.json", + "type": "object", + "properties": { + "contributes": { + "type": "object", + "properties": { + "kafka": { + "type": "object", + "markdownDescription": "Kafka extensions", + "properties": { + "clusterProviders": { + "type": "array", + "markdownDescription": "Cluster providers definitions.", + "items": [ + { + "type": "object", + "properties": { + "id": { + "type": "string", + "description": "Cluster provider id." + }, + "name": { + "type": "string", + "description": "Cluster provider name." + } + } + } + ] + } + } + } + } + } + } +} diff --git a/src/client/client.ts b/src/client/client.ts index 33f0a73..03fe0e4 100644 --- a/src/client/client.ts +++ b/src/client/client.ts @@ -1,9 +1,11 @@ -import { Admin, ConfigResourceTypes, Kafka, Producer } from "kafkajs"; +import { Admin, ConfigResourceTypes, Kafka, KafkaConfig, Producer } from "kafkajs"; import { Disposable } from "vscode"; +import { getClusterProvider } from "../kafka-extensions/registry"; import { WorkspaceSettings } from "../settings"; export interface ConnectionOptions { + clusterProviderId?: string; bootstrap: string; saslOption?: SaslOption; ssl?: boolean; @@ -80,7 +82,7 @@ export interface ConsumerGroupMember { export interface Client extends Disposable { cluster: Cluster; - producer: Producer; + producer(): Promise; connect(): Promise; getTopics(): Promise; getBrokers(): Promise; @@ -102,8 +104,8 @@ class EnsureConnectedDecorator implements Client { return this.client.cluster; } - get producer(): any { - return this.client.producer; + public producer(): any { + return this.client.producer(); } public connect(): Promise { @@ -176,37 +178,61 @@ class KafkaJsClient implements Client { public kafkaClient: any; public kafkaCyclicProducerClient: any; public kafkaKeyedProducerClient: any; - public producer: Producer; - - private kafkaJsClient: Kafka; - private kafkaAdminClient: Admin; + private kafkaProducer: Producer | undefined; + private kafkaJsClient: Kafka | undefined; + private kafkaAdminClient: Admin | undefined; private metadata: { topics: Topic[]; brokers: Broker[]; }; + // Promise which returns the KafkaJsClient instance when it is ready. + private kafkaPromise: Promise; + constructor(public readonly cluster: Cluster, workspaceSettings: WorkspaceSettings) { this.metadata = { brokers: [], topics: [], }; - this.kafkaJsClient = createKafka(cluster); - this.kafkaClient = this.kafkaJsClient; - this.kafkaAdminClient = this.kafkaJsClient.admin(); - this.producer = this.kafkaJsClient.producer(); + // The Kafka client is created in asynchronous since external vscode extension + // can contribute to the creation of Kafka instance. + this.kafkaPromise = createKafka(cluster) + .then(result => { + this.kafkaJsClient = result; + this.kafkaClient = this.kafkaJsClient; + this.kafkaAdminClient = this.kafkaJsClient.admin(); + this.kafkaProducer = this.kafkaJsClient.producer(); + return this; + }); + } + + public async getkafkaAdminClient(): Promise { + const admin = (await this.kafkaPromise).kafkaAdminClient; + if (!admin) { + throw new Error('Kafka Admin cannot be null.'); + } + return admin; + } + + public async producer(): Promise { + const producer = (await this.kafkaPromise).kafkaProducer; + if (!producer) { + throw new Error('Producer cannot be null.'); + } + return producer; } canConnect(): boolean { return this.kafkaAdminClient !== null; } - connect(): Promise { - return this.kafkaAdminClient.connect(); + async connect(): Promise { + return (await this.getkafkaAdminClient()).connect(); } async getTopics(): Promise { - const listTopicsResponse = await this.kafkaAdminClient.fetchTopicMetadata(); + const listTopicsResponse = await (await this.getkafkaAdminClient()).fetchTopicMetadata(); this.metadata = { ...this.metadata, @@ -233,7 +259,7 @@ class KafkaJsClient implements Client { } async getBrokers(): Promise { - const describeClusterResponse = await this.kafkaAdminClient?.describeCluster(); + const describeClusterResponse = await (await this.getkafkaAdminClient()).describeCluster(); this.metadata = { ...this.metadata, @@ -251,7 +277,7 @@ class KafkaJsClient implements Client { } async getBrokerConfigs(brokerId: string): Promise { - const describeConfigsResponse = await this.kafkaAdminClient.describeConfigs({ + const describeConfigsResponse = await (await this.getkafkaAdminClient()).describeConfigs({ includeSynonyms: false, resources: [ { @@ -265,7 +291,7 @@ class KafkaJsClient implements Client { } async getTopicConfigs(topicId: string): Promise { - const describeConfigsResponse = await this.kafkaAdminClient.describeConfigs({ + const describeConfigsResponse = await (await this.getkafkaAdminClient()).describeConfigs({ includeSynonyms: false, resources: [ { @@ -279,12 +305,12 @@ class KafkaJsClient implements Client { } async getConsumerGroupIds(): Promise { - const listGroupsResponse = await this.kafkaAdminClient.listGroups(); + const listGroupsResponse = await (await this.getkafkaAdminClient()).listGroups(); return Promise.resolve(listGroupsResponse.groups.map((g) => (g.groupId))); } async getConsumerGroupDetails(groupId: string): Promise { - const describeGroupResponse = await this.kafkaAdminClient.describeGroups([groupId]); + const describeGroupResponse = await (await this.getkafkaAdminClient()).describeGroups([groupId]); const consumerGroup: ConsumerGroup = { groupId: groupId, @@ -304,11 +330,11 @@ class KafkaJsClient implements Client { } async deleteConsumerGroups(groupIds: string[]): Promise { - await this.kafkaAdminClient.deleteGroups(groupIds); + await (await this.getkafkaAdminClient()).deleteGroups(groupIds); } async createTopic(createTopicRequest: CreateTopicRequest): Promise { - await this.kafkaAdminClient.createTopics({ + await (await this.getkafkaAdminClient()).createTopics({ validateOnly: false, waitForLeaders: true, topics: [{ @@ -321,35 +347,43 @@ class KafkaJsClient implements Client { } async deleteTopic(deleteTopicRequest: DeleteTopicRequest): Promise { - return await this.kafkaAdminClient.deleteTopics({ + return await (await this.getkafkaAdminClient()).deleteTopics({ topics: deleteTopicRequest.topics, timeout: deleteTopicRequest.timeout }); } dispose() { - this.kafkaAdminClient.disconnect(); + if (this.kafkaAdminClient) { + this.kafkaAdminClient.disconnect(); + } } } export const createClient = (cluster: Cluster, workspaceSettings: WorkspaceSettings): Client => new EnsureConnectedDecorator( new KafkaJsClient(cluster, workspaceSettings)); -export const createKafka = (connectionOptions: ConnectionOptions): Kafka => { - let kafkaJsClient: Kafka; +export const createKafka = async (connectionOptions: ConnectionOptions): Promise => { + const provider = getClusterProvider(connectionOptions.clusterProviderId); + if (!provider) { + throw new Error(`Cannot find cluster provider for '${connectionOptions.clusterProviderId}' ID.`); + } + const kafkaConfig = await provider.createKafkaConfig(connectionOptions) || createDefaultKafkaConfig(connectionOptions); + return new Kafka(kafkaConfig); +}; + +export const createDefaultKafkaConfig = (connectionOptions: ConnectionOptions): KafkaConfig => { if (connectionOptions.saslOption && connectionOptions.saslOption.username && connectionOptions.saslOption.password) { - kafkaJsClient = new Kafka({ + return { clientId: "vscode-kafka", brokers: connectionOptions.bootstrap.split(","), ssl: true, sasl: { mechanism: connectionOptions.saslOption.mechanism, username: connectionOptions.saslOption.username, password: connectionOptions.saslOption.password }, - }); - } else { - kafkaJsClient = new Kafka({ - clientId: "vscode-kafka", - brokers: connectionOptions.bootstrap.split(","), - ssl: connectionOptions.ssl - }); + }; } - return kafkaJsClient; + return { + clientId: "vscode-kafka", + brokers: connectionOptions.bootstrap.split(","), + ssl: connectionOptions.ssl + }; }; diff --git a/src/client/consumer.ts b/src/client/consumer.ts index 4eb76a7..d281f3e 100644 --- a/src/client/consumer.ts +++ b/src/client/consumer.ts @@ -69,6 +69,7 @@ export class Consumer implements vscode.Disposable { const settings = getWorkspaceSettings(); this.options = { + clusterProviderId: cluster.clusterProviderId, bootstrap: cluster.bootstrap, saslOption: cluster.saslOption, consumerGroupId: consumerGroupId, @@ -93,7 +94,7 @@ export class Consumer implements vscode.Disposable { const fromOffset = this.options.fromOffset; const topic = this.options.topicId; - this.kafkaClient = createKafka(this.options); + this.kafkaClient = await createKafka(this.options); this.consumer = this.kafkaClient.consumer({ groupId: this.options.consumerGroupId, retry: { retries: 3 }, partitionAssigners: [ diff --git a/src/commands/producers.ts b/src/commands/producers.ts index c8dcaa7..9ed7100 100644 --- a/src/commands/producers.ts +++ b/src/commands/producers.ts @@ -64,7 +64,7 @@ export class ProduceRecordCommandHandler { return; } - const producer = client.producer; + const producer = await client.producer(); await producer.connect(); channel.show(false); diff --git a/src/extension.ts b/src/extension.ts index 2ac0e32..a2eae0e 100644 --- a/src/extension.ts +++ b/src/extension.ts @@ -34,8 +34,10 @@ import { NodeBase } from "./explorer/models/nodeBase"; import * as path from 'path'; import { markdownPreviewProvider } from "./docs/markdownPreviewProvider"; import { KafkaFileCodeLensProvider } from "./kafka-file"; +import { getDefaultClusterProcessorCommand } from "./kafka-extensions/registry"; +import { ClusterProviderProcessor } from "./kafka-extensions/api"; -export function activate(context: vscode.ExtensionContext): void { +export function activate(context: vscode.ExtensionContext): ClusterProviderProcessor { Context.register(context); // Settings, data etc. @@ -144,6 +146,8 @@ export function activate(context: vscode.ExtensionContext): void { context.subscriptions.push(vscode.workspace.registerTextDocumentContentProvider( ConsumerVirtualTextDocumentProvider.SCHEME, consumerVirtualTextDocumentProvider)); + + return getDefaultClusterProcessorCommand(); } export function deactivate(): void { diff --git a/src/kafka-extensions/api.ts b/src/kafka-extensions/api.ts new file mode 100644 index 0000000..8ec1da4 --- /dev/null +++ b/src/kafka-extensions/api.ts @@ -0,0 +1,31 @@ +import { KafkaConfig } from "kafkajs"; +import { Cluster, ConnectionOptions } from "../client/client"; +import { ClusterSettings } from "../settings/clusters"; + +export interface ClusterProviderProcessorProvider { + + getClusterProviderProcessor(clusterProviderId: string) : ClusterProviderProcessor; + +} + +/** + * The cluster provider processor. + */ +export interface ClusterProviderProcessor { + + /** + * Returns the clusters managed by the processor which must be added to the kafka explorer. + * + * @param clusterSettings the cluster settings. + */ + collectClusters(clusterSettings: ClusterSettings): Promise; + + /** + * Create the Kafka JS client configuration from the given connection options. + * When processor doesn't implement this method, the Kafka JS client + * configuration is created with the default client configuration factory from the vscode-kafka. + * + * @param connectionOptions the connection options. + */ + createKafkaConfig?(connectionOptions: ConnectionOptions): KafkaConfig; +} diff --git a/src/kafka-extensions/registry.ts b/src/kafka-extensions/registry.ts new file mode 100644 index 0000000..c289110 --- /dev/null +++ b/src/kafka-extensions/registry.ts @@ -0,0 +1,179 @@ +import * as vscode from "vscode"; +import { KafkaConfig } from "kafkajs"; +import { Cluster, ConnectionOptions, createDefaultKafkaConfig as createDefaultKafkaConfig } from "../client/client"; +import { ClusterSettings } from "../settings/clusters"; +import { collectDefaultClusters } from "../wizards/clusters"; +import { ClusterProviderProcessor, ClusterProviderProcessorProvider } from "./api"; + +/** + * Cluster provider is used to: + * + * - collect clusters (eg: create a cluster from a wizard, import clusters from a repository, ...) + * and add thento the Kafka Explorer. + * - create a Kafka client from a complex process (eg : use SSO to connect to the cluster) + * + * Implementing a cluster provider in custom vscode extension is done in 2 steps: + * + * - define the cluster provider (id, name) in the package.json in the contributes/kafka/clusterProviders section. + * - return the cluster provider processor (See ClusterProviderProcessor) to use in the activate() of the extension. + * + */ +export class ClusterProvider { + + private processor: ClusterProviderProcessor | undefined; + + constructor(private definition: ClusterProviderDefinition, private extensionId: string) { + + } + + /** + * Returns the cluster provider id. + */ + public get id(): string { + return this.definition.id; + } + + /** + * Returns the cluster provider name. + */ + public get name(): string { + return this.definition.name || this.definition.id; + } + + /** + * Returns the clusters managed by the provider which must be added to the kafka explorer. + * + * @param clusterSettings the cluster settings. + */ + async collectClusters(clusterSettings: ClusterSettings): Promise { + const processor = await this.getProcessor(); + return processor.collectClusters(clusterSettings); + } + + /** + * Create the Kafka JS client instance from the given connection options. + * + * @param connectionOptions the connection options. + */ + async createKafkaConfig(connectionOptions: ConnectionOptions): Promise { + const processor = await this.getProcessor(); + if (processor.createKafkaConfig) { + return processor.createKafkaConfig(connectionOptions); + } + } + + private async getProcessor(): Promise { + if (this.processor) { + return this.processor; + } + // The cluster provider processor is not already loaded, try to activate the owner extension. + // The return of the extension activate() method must return the cluster provider processor. + const extension = vscode.extensions.getExtension(this.extensionId); + if (!extension) { + throw new Error(`Error while getting cluster provider processor. Extension ${this.extensionId} is not available.`); + } + + // Wait for extension is activated to get the processor + const result = await extension.activate(); + if (!result) { + throw new Error(`Error while getting cluster provider processor. Extension ${this.extensionId}.activate() should return 'ClusterProviderProcessor' or 'ClusterProviderProcessorProvider'.`); + } + if ('getClusterProviderProcessor' in result) { + this.processor = (result).getClusterProviderProcessor(this.id); + } else { + this.processor = result; + } + if (!this.processor) { + throw new Error(`Error while getting cluster provider processor. Extension ${this.extensionId}.activate() should return 'ClusterProviderProcessor' or 'ClusterProviderProcessorProvider'.`); + } + return this.processor; + } +} + +const defaultClusterProviderId = 'vscode-kafka.manual'; + +let providers: Map = new Map(); + +export function getClusterProvider(clusterProviderId?: string): ClusterProvider | undefined { + intializeIfNeeded(); + return providers.get(clusterProviderId || defaultClusterProviderId); +} + +export function getClusterProviders(): ClusterProvider[] { + intializeIfNeeded(); + // "Configure manually" provider must be the first + const manual = getClusterProvider(defaultClusterProviderId); + // Other providers must be sorted by name ascending + const others = [...providers.values()] + .filter(provider => provider.id !== defaultClusterProviderId) + .sort(sortByNameAscending); + if (manual) { + return [manual, ...others]; + } + return others; +} + +function sortByNameAscending(a: ClusterProvider, b: ClusterProvider): -1 | 0 | 1 { + if (a.name.toLowerCase() < b.name.toLowerCase()) { return -1; } + if (a.name.toLowerCase() > b.name.toLowerCase()) { return 1; } + return 0; +} + +function intializeIfNeeded() { + if (providers.size === 0) { + providers = collectClusterProviderDefinitions(vscode.extensions.all); + } +} + +export interface ClusterProviderDefinition { + id: string; + name?: string; +} + +/** + * Collect cluster providers defined in package.json (see vscode-kafka which implements default cluster provider with 'Manual' wizard.) + * + * ```json + * "contributes": { + * "kafka": { + * "clusterProviders": [ + * { + * "id": "vscode-kafka.manual", + * "name": "Manual" + * } + * ] + * } + * ``` + * + * @param extensions all installed vscode extensions + * + * @returns the map of cluster providers. + */ +export function collectClusterProviderDefinitions(extensions: readonly vscode.Extension[]): Map { + const result: Map = new Map(); + if (extensions && extensions.length) { + for (const extension of extensions) { + const contributesSection = extension.packageJSON['contributes']; + if (contributesSection) { + const kafkaExtension = contributesSection['kafka']; + if (kafkaExtension) { + const clusterProviders = kafkaExtension['clusterProviders']; + if (Array.isArray(clusterProviders) && clusterProviders.length) { + for (const item of clusterProviders) { + const definition = item as ClusterProviderDefinition; + result.set(definition.id, new ClusterProvider(definition, extension.id)); + } + } + } + } + } + } + return result; +} + +export function getDefaultClusterProcessorCommand(): ClusterProviderProcessor { + return { + collectClusters: (clusterSettings: ClusterSettings): Promise => collectDefaultClusters(clusterSettings), + createKafkaConfig: (connectionOptions: ConnectionOptions): KafkaConfig => createDefaultKafkaConfig(connectionOptions) + } as ClusterProviderProcessor; +} diff --git a/src/wizards/clusters.ts b/src/wizards/clusters.ts index ccd6b32..ecbe617 100644 --- a/src/wizards/clusters.ts +++ b/src/wizards/clusters.ts @@ -1,13 +1,12 @@ import { QuickPickItem, window } from "vscode"; -import { ConnectionOptions, SaslMechanism } from "../client"; +import { Cluster, ConnectionOptions, SaslMechanism } from "../client"; import { INPUT_TITLE } from "../constants"; import { KafkaExplorer } from "../explorer/kafkaExplorer"; +import { ClusterProvider, getClusterProviders } from "../kafka-extensions/registry"; import { ClusterSettings } from "../settings/clusters"; import { MultiStepInput, showErrorMessage, State } from "./multiStepInput"; import { validateBroker, validateClusterName, validateAuthentificationUserName } from "./validators"; -const DEFAULT_BROKER = 'localhost:9092'; - interface AddClusterState extends State, ConnectionOptions { name: string; } @@ -16,8 +15,79 @@ const DEFAULT_STEPS = 4; export async function addClusterWizard(clusterSettings: ClusterSettings, explorer: KafkaExplorer): Promise { + async function pickClusterProvider(): Promise { + const providers = getClusterProviders(); + if (providers.length === 1) { + // By default, it exists the default cluster provider 'Manual' from vscode-kafka + // to fill a cluster with a wizard, return it. + return providers[0]; + } + + const providerItems: QuickPickItem[] = providers + .map(provider => { + return { "label": provider.name }; + }); + const selected = (await window.showQuickPick(providerItems))?.label; + if (!selected) { + return; + } + return providers.find(provider => provider.name === selected); + } + + // Pick the cluster provider which provides teh capability to return a list of cluster to add to the kafka explorer + // eg (fill clust from a wizard, import clusters from a repository, etc) + const provider = await pickClusterProvider(); + if (!provider) { + return; + } + + // Collect clusters... + let clusters: Cluster[] | undefined; + try { + clusters = await provider.collectClusters(clusterSettings); + if (!clusters || clusters.length === 0) { + return; + } + } + catch (error) { + showErrorMessage(`Error while collecting cluster(s)`, error); + return; + } + + try { + // Save collected clusters in settings. + let createdClusterNames = ''; + for (const cluster of clusters) { + clusterSettings.upsert(cluster); + if (createdClusterNames !== '') { + createdClusterNames += '\', \''; + } + createdClusterNames += cluster.name; + } + window.showInformationMessage(`${clusters.length > 1 ? `${clusters.length} clusters` : 'Cluster'} '${createdClusterNames}' created successfully`); + + // Refresh the explorer + explorer.refresh(); + + // Selecting the created cluster is done with TreeView#reveal + // 1. Show the treeview of the explorer (otherwise reveal will not work) + explorer.show(); + // 2. the reveal() call must occur within a timeout(), + // while waiting for a fix in https://github.com/microsoft/vscode/issues/114149 + setTimeout(() => { + if (clusters) { + explorer.selectClusterByName(clusters[0].name); + } + }, 1000); + } + catch (error) { + showErrorMessage(`Error while creating cluster`, error); + } +} +const DEFAULT_BROKER = 'localhost:9092'; +export async function collectDefaultClusters(clusterSettings: ClusterSettings): Promise { const state: Partial = { totalSteps: DEFAULT_STEPS @@ -151,26 +221,15 @@ export async function addClusterWizard(clusterSettings: ClusterSettings, explore const sanitizedName = name.replace(/[^a-zA-Z0-9]/g, ""); const suffix = Buffer.from(bootstrap).toString("base64").replace(/=/g, ""); - try { - clusterSettings.upsert({ + return [ + { id: `${sanitizedName}-${suffix}`, bootstrap, name, saslOption, ssl: state.ssl - }); - explorer.refresh(); - window.showInformationMessage(`Cluster '${name}' created successfully`); - // Selecting the created cluster is done with TreeView#reveal - // 1. Show the treeview of the explorer (otherwise reveal will not work) - explorer.show(); - // 2. the reveal() call must occur within a timeout(), - // while waiting for a fix in https://github.com/microsoft/vscode/issues/114149 - setTimeout(() => { - explorer.selectClusterByName(name); - }, 1000); - } - catch (error) { - showErrorMessage(`Error while creating cluster`, error); - } + } + ]; } + +