diff --git a/src/client/client.ts b/src/client/client.ts index 33f0a73..b758f1a 100644 --- a/src/client/client.ts +++ b/src/client/client.ts @@ -1,19 +1,26 @@ import { Admin, ConfigResourceTypes, Kafka, Producer } from "kafkajs"; import { Disposable } from "vscode"; +import { getClusterProvider } from "../kafka-extensions/registry"; import { WorkspaceSettings } from "../settings"; export interface ConnectionOptions { + clusterProviderId?: string; bootstrap: string; saslOption?: SaslOption; ssl?: boolean; } -export interface Cluster extends ConnectionOptions { +export interface ClusterIdentifier { + clusterProviderId?: string; id: string; name: string; } +export interface Cluster extends ClusterIdentifier, ConnectionOptions { + +} + /** * The supported SASL mechanisms for authentication. */ @@ -336,20 +343,9 @@ export const createClient = (cluster: Cluster, workspaceSettings: WorkspaceSetti new KafkaJsClient(cluster, workspaceSettings)); export const createKafka = (connectionOptions: ConnectionOptions): Kafka => { - let kafkaJsClient: Kafka; - if (connectionOptions.saslOption && connectionOptions.saslOption.username && connectionOptions.saslOption.password) { - kafkaJsClient = new Kafka({ - clientId: "vscode-kafka", - brokers: connectionOptions.bootstrap.split(","), - ssl: true, - sasl: { mechanism: connectionOptions.saslOption.mechanism, username: connectionOptions.saslOption.username, password: connectionOptions.saslOption.password }, - }); - } else { - kafkaJsClient = new Kafka({ - clientId: "vscode-kafka", - brokers: connectionOptions.bootstrap.split(","), - ssl: connectionOptions.ssl - }); + const provider = getClusterProvider(connectionOptions.clusterProviderId || 'default'); + if (!provider) { + throw new Error(`Cannot find cluster provider for '${connectionOptions.clusterProviderId}' ID.`); } - return kafkaJsClient; + return provider.clientFactory.createKafka(connectionOptions); }; diff --git a/src/extension.ts b/src/extension.ts index 2ac0e32..6ef45f7 100644 --- a/src/extension.ts +++ b/src/extension.ts @@ -34,8 +34,10 @@ import { NodeBase } from "./explorer/models/nodeBase"; import * as path from 'path'; import { markdownPreviewProvider } from "./docs/markdownPreviewProvider"; import { KafkaFileCodeLensProvider } from "./kafka-file"; +import { ClusterProvider, KafkaApi } from "./kafka-extensions/api"; +import { registerClusterProvider } from "./kafka-extensions/registry"; -export function activate(context: vscode.ExtensionContext): void { +export function activate(context: vscode.ExtensionContext): KafkaApi { Context.register(context); // Settings, data etc. @@ -144,6 +146,12 @@ export function activate(context: vscode.ExtensionContext): void { context.subscriptions.push(vscode.workspace.registerTextDocumentContentProvider( ConsumerVirtualTextDocumentProvider.SCHEME, consumerVirtualTextDocumentProvider)); + + return { + registerClusterProvider(provider: ClusterProvider) { + registerClusterProvider(provider); + } + }; } export function deactivate(): void { diff --git a/src/kafka-extensions/api.ts b/src/kafka-extensions/api.ts new file mode 100644 index 0000000..d439303 --- /dev/null +++ b/src/kafka-extensions/api.ts @@ -0,0 +1,21 @@ +import { Kafka } from "kafkajs"; +import { ConnectionOptions } from "../client/client"; + +export interface KafkaClientFactory { + + createKafka(connectionOptions: ConnectionOptions): Kafka; + +} + +export interface ClusterProvider { + + id: string; + + clientFactory: KafkaClientFactory; + +} + +export interface KafkaApi { + + registerClusterProvider(provider: ClusterProvider): void; +} diff --git a/src/kafka-extensions/registry.ts b/src/kafka-extensions/registry.ts new file mode 100644 index 0000000..f7dee29 --- /dev/null +++ b/src/kafka-extensions/registry.ts @@ -0,0 +1,41 @@ +import { Kafka } from "kafkajs"; +import { ConnectionOptions } from "../client/client"; +import { ClusterProvider, KafkaClientFactory } from "./api"; + +const providers: Map = new Map(); + +export function registerClusterProvider(provider: ClusterProvider): void { + providers.set(provider.id, provider); +} + +export function getClusterProvider(clusterProviderId: string): ClusterProvider | undefined { + return providers.get(clusterProviderId); +} + +class DefaultKafkaClientFactory implements KafkaClientFactory { + + createKafka(connectionOptions: ConnectionOptions): Kafka { + if (connectionOptions.saslOption && connectionOptions.saslOption.username && connectionOptions.saslOption.password) { + return new Kafka({ + clientId: "vscode-kafka", + brokers: connectionOptions.bootstrap.split(","), + ssl: true, + sasl: { mechanism: connectionOptions.saslOption.mechanism, username: connectionOptions.saslOption.username, password: connectionOptions.saslOption.password }, + }); + } + return new Kafka({ + clientId: "vscode-kafka", + brokers: connectionOptions.bootstrap.split(","), + ssl: connectionOptions.ssl + }); + + } +} + +class DefaultClusterProvider implements ClusterProvider { + id = 'default'; + label = 'Manual'; + clientFactory = new DefaultKafkaClientFactory(); +} + +registerClusterProvider(new DefaultClusterProvider());