Skip to content

Commit

Permalink
Cluster provider API extension
Browse files Browse the repository at this point in the history
Fixes jlandersen#123

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr committed Mar 8, 2021
1 parent 7a0914c commit 109dfd8
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 17 deletions.
28 changes: 12 additions & 16 deletions src/client/client.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
import { Admin, ConfigResourceTypes, Kafka, Producer } from "kafkajs";

import { Disposable } from "vscode";
import { getClusterProvider } from "../kafka-extensions/registry";
import { WorkspaceSettings } from "../settings";

export interface ConnectionOptions {
clusterProviderId?: string;
bootstrap: string;
saslOption?: SaslOption;
ssl?: boolean;
}

export interface Cluster extends ConnectionOptions {
export interface ClusterIdentifier {
clusterProviderId?: string;
id: string;
name: string;
}

export interface Cluster extends ClusterIdentifier, ConnectionOptions {

}

/**
* The supported SASL mechanisms for authentication.
*/
Expand Down Expand Up @@ -336,20 +343,9 @@ export const createClient = (cluster: Cluster, workspaceSettings: WorkspaceSetti
new KafkaJsClient(cluster, workspaceSettings));

export const createKafka = (connectionOptions: ConnectionOptions): Kafka => {
let kafkaJsClient: Kafka;
if (connectionOptions.saslOption && connectionOptions.saslOption.username && connectionOptions.saslOption.password) {
kafkaJsClient = new Kafka({
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
ssl: true,
sasl: { mechanism: connectionOptions.saslOption.mechanism, username: connectionOptions.saslOption.username, password: connectionOptions.saslOption.password },
});
} else {
kafkaJsClient = new Kafka({
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
ssl: connectionOptions.ssl
});
const provider = getClusterProvider(connectionOptions.clusterProviderId || 'default');
if (!provider) {
throw new Error(`Cannot find cluster provider for '${connectionOptions.clusterProviderId}' ID.`);
}
return kafkaJsClient;
return provider.clientFactory.createKafka(connectionOptions);
};
10 changes: 9 additions & 1 deletion src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ import { NodeBase } from "./explorer/models/nodeBase";
import * as path from 'path';
import { markdownPreviewProvider } from "./docs/markdownPreviewProvider";
import { KafkaFileCodeLensProvider } from "./kafka-file";
import { ClusterProvider, KafkaApi } from "./kafka-extensions/api";
import { registerClusterProvider } from "./kafka-extensions/registry";

export function activate(context: vscode.ExtensionContext): void {
export function activate(context: vscode.ExtensionContext): KafkaApi {
Context.register(context);

// Settings, data etc.
Expand Down Expand Up @@ -144,6 +146,12 @@ export function activate(context: vscode.ExtensionContext): void {

context.subscriptions.push(vscode.workspace.registerTextDocumentContentProvider(
ConsumerVirtualTextDocumentProvider.SCHEME, consumerVirtualTextDocumentProvider));

return {
registerClusterProvider(provider: ClusterProvider) {
registerClusterProvider(provider);
}
};
}

export function deactivate(): void {
Expand Down
21 changes: 21 additions & 0 deletions src/kafka-extensions/api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { Kafka } from "kafkajs";
import { ConnectionOptions } from "../client/client";

export interface KafkaClientFactory {

createKafka(connectionOptions: ConnectionOptions): Kafka;

}

export interface ClusterProvider {

id: string;

clientFactory: KafkaClientFactory;

}

export interface KafkaApi {

registerClusterProvider(provider: ClusterProvider): void;
}
41 changes: 41 additions & 0 deletions src/kafka-extensions/registry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { Kafka } from "kafkajs";
import { ConnectionOptions } from "../client/client";
import { ClusterProvider, KafkaClientFactory } from "./api";

const providers: Map<string, ClusterProvider> = new Map();

export function registerClusterProvider(provider: ClusterProvider): void {
providers.set(provider.id, provider);
}

export function getClusterProvider(clusterProviderId: string): ClusterProvider | undefined {
return providers.get(clusterProviderId);
}

class DefaultKafkaClientFactory implements KafkaClientFactory {

createKafka(connectionOptions: ConnectionOptions): Kafka {
if (connectionOptions.saslOption && connectionOptions.saslOption.username && connectionOptions.saslOption.password) {
return new Kafka({
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
ssl: true,
sasl: { mechanism: connectionOptions.saslOption.mechanism, username: connectionOptions.saslOption.username, password: connectionOptions.saslOption.password },
});
}
return new Kafka({
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
ssl: connectionOptions.ssl
});

}
}

class DefaultClusterProvider implements ClusterProvider {
id = 'default';
label = 'Manual';
clientFactory = new DefaultKafkaClientFactory();
}

registerClusterProvider(new DefaultClusterProvider());

0 comments on commit 109dfd8

Please sign in to comment.