Skip to content

Commit

Permalink
Cluster provider API extension
Browse files Browse the repository at this point in the history
Fixes jlandersen#123

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr committed Mar 8, 2021
1 parent 7a0914c commit c9096ef
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 43 deletions.
32 changes: 22 additions & 10 deletions src/client/client.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
import { Admin, ConfigResourceTypes, Kafka, Producer } from "kafkajs";

import { Disposable } from "vscode";
import { getClusterProvider } from "../kafka-extensions/registry";
import { WorkspaceSettings } from "../settings";

export interface ConnectionOptions {
clusterProviderId?: string;
bootstrap: string;
saslOption?: SaslOption;
ssl?: boolean;
}

export interface Cluster extends ConnectionOptions {
export interface ClusterIdentifier {
clusterProviderId?: string;
id: string;
name: string;
}

export interface Cluster extends ClusterIdentifier, ConnectionOptions {

}

/**
* The supported SASL mechanisms for authentication.
*/
Expand Down Expand Up @@ -336,20 +343,25 @@ export const createClient = (cluster: Cluster, workspaceSettings: WorkspaceSetti
new KafkaJsClient(cluster, workspaceSettings));

export const createKafka = (connectionOptions: ConnectionOptions): Kafka => {
let kafkaJsClient: Kafka;
const provider = getClusterProvider(connectionOptions.clusterProviderId );
if (!provider) {
throw new Error(`Cannot find cluster provider for '${connectionOptions.clusterProviderId}' ID.`);
}
return provider.createKafka(connectionOptions);
};

export const createDefaultKafka = (connectionOptions: ConnectionOptions): Kafka => {
if (connectionOptions.saslOption && connectionOptions.saslOption.username && connectionOptions.saslOption.password) {
kafkaJsClient = new Kafka({
return new Kafka({
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
ssl: true,
sasl: { mechanism: connectionOptions.saslOption.mechanism, username: connectionOptions.saslOption.username, password: connectionOptions.saslOption.password },
});
} else {
kafkaJsClient = new Kafka({
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
ssl: connectionOptions.ssl
});
}
return kafkaJsClient;
return new Kafka({
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
ssl: connectionOptions.ssl
});
};
10 changes: 9 additions & 1 deletion src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ import { NodeBase } from "./explorer/models/nodeBase";
import * as path from 'path';
import { markdownPreviewProvider } from "./docs/markdownPreviewProvider";
import { KafkaFileCodeLensProvider } from "./kafka-file";
import { ClusterProvider, KafkaApi } from "./kafka-extensions/api";
import { registerClusterProvider } from "./kafka-extensions/registry";

export function activate(context: vscode.ExtensionContext): void {
export function activate(context: vscode.ExtensionContext): KafkaApi {
Context.register(context);

// Settings, data etc.
Expand Down Expand Up @@ -144,6 +146,12 @@ export function activate(context: vscode.ExtensionContext): void {

context.subscriptions.push(vscode.workspace.registerTextDocumentContentProvider(
ConsumerVirtualTextDocumentProvider.SCHEME, consumerVirtualTextDocumentProvider));

return {
registerClusterProvider(provider: ClusterProvider) {
registerClusterProvider(provider);
}
};
}

export function deactivate(): void {
Expand Down
14 changes: 14 additions & 0 deletions src/kafka-extensions/api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Kafka } from "kafkajs";
import { Cluster, ConnectionOptions } from "../client/client";
import { ClusterSettings } from "../settings/clusters";

export interface ClusterProvider {
id: string;
name?: string;
collectClusters(clusterSettings: ClusterSettings): Promise<Cluster[] | undefined>;
createKafka(connectionOptions: ConnectionOptions): Kafka;
}

export interface KafkaApi {
registerClusterProvider(provider: ClusterProvider): void;
}
30 changes: 30 additions & 0 deletions src/kafka-extensions/registry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { Kafka } from "kafkajs";
import { Cluster, ConnectionOptions, createDefaultKafka } from "../client/client";
import { ClusterSettings } from "../settings/clusters";
import { collectDefaultClusters } from "../wizards/clusters";
import { ClusterProvider } from "./api";

const defaultClusterProviderId = 'default';

const providers: Map<string, ClusterProvider> = new Map();

export function registerClusterProvider(provider: ClusterProvider): void {
providers.set(provider.id, provider);
}

export function getClusterProvider(clusterProviderId?: string): ClusterProvider | undefined {
return providers.get(clusterProviderId || defaultClusterProviderId);
}

export function getClusterProviders(): ClusterProvider[] {
return [...providers.values()];
}

class DefaultClusterProvider implements ClusterProvider {
id = defaultClusterProviderId;
name = 'Manual';
createKafka = (connectionOptions: ConnectionOptions): Kafka => createDefaultKafka(connectionOptions);
collectClusters = (clusterSettings: ClusterSettings): Promise<Cluster[] | undefined> => collectDefaultClusters(clusterSettings);
}

registerClusterProvider(new DefaultClusterProvider());
93 changes: 61 additions & 32 deletions src/wizards/clusters.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { QuickPickItem, window } from "vscode";
import { ConnectionOptions, SaslMechanism } from "../client";
import { Cluster, ConnectionOptions, SaslMechanism } from "../client";
import { INPUT_TITLE } from "../constants";
import { KafkaExplorer } from "../explorer/kafkaExplorer";
import { ClusterProvider } from "../kafka-extensions/api";
import { getClusterProviders } from "../kafka-extensions/registry";
import { ClusterSettings } from "../settings/clusters";
import { MultiStepInput, showErrorMessage, State } from "./multiStepInput";
import { validateBroker, validateClusterName, validateAuthentificationUserName } from "./validators";

const DEFAULT_BROKER = 'localhost:9092';

interface AddClusterState extends State, ConnectionOptions {
name: string;
}
Expand All @@ -16,15 +16,58 @@ const DEFAULT_STEPS = 4;

export async function addClusterWizard(clusterSettings: ClusterSettings, explorer: KafkaExplorer): Promise<void> {

async function pickClusterProvider(): Promise<ClusterProvider | undefined> {
const providers = getClusterProviders();
if (providers.length === 1) {
return providers[0];
}

const providerItems: QuickPickItem[] = providers.map(provider => { return { "label": provider.id }; });
const selected = (await window.showQuickPick(providerItems))?.label;
if (!selected) {
return;
}
return providers.find(provider => provider.id === selected);
}

const provider = await pickClusterProvider();
if (!provider) {
return;
}

const clusters = await provider.collectClusters(clusterSettings);
if (!clusters || clusters.length === 0) {
return;
}

try {
for (const cluster of clusters) {
clusterSettings.upsert(cluster);
window.showInformationMessage(`Cluster '${cluster.name}' created successfully`);
}
// Selecting the created cluster is done with TreeView#reveal
// 1. Show the treeview of the explorer (otherwise reveal will not work)
explorer.show();
// 2. the reveal() call must occur within a timeout(),
// while waiting for a fix in https://github.com/microsoft/vscode/issues/114149
setTimeout(() => {
explorer.selectClusterByName(clusters[0].name);
}, 1000);
}
catch (error) {
showErrorMessage(`Error while creating cluster`, error);
}
}

const DEFAULT_BROKER = 'localhost:9092';

export async function collectDefaultClusters(clusterSettings: ClusterSettings): Promise<Cluster[] | undefined> {
const state: Partial<AddClusterState> = {
totalSteps: DEFAULT_STEPS
};

async function collectInputs(state: Partial<AddClusterState>, clusterSettings: ClusterSettings) {
await MultiStepInput.run(input => inputBrokers(input, state, clusterSettings));
return MultiStepInput.run(input => inputBrokers(input, state, clusterSettings));
}

async function inputBrokers(input: MultiStepInput, state: Partial<AddClusterState>, clusterSettings: ClusterSettings) {
Expand Down Expand Up @@ -131,12 +174,11 @@ export async function addClusterWizard(clusterSettings: ClusterSettings, explore
}
}

try {
await collectInputs(state, clusterSettings);
} catch (e) {
showErrorMessage('Error while collecting inputs for creating cluster', e);
return;
}
//try {
await collectInputs(state, clusterSettings);
//} catch (e) {
// showErrorMessage('Error while collecting inputs for creating cluster', e);
//}

const addClusterState: AddClusterState = state as AddClusterState;
const bootstrap = state.bootstrap;
Expand All @@ -151,26 +193,13 @@ export async function addClusterWizard(clusterSettings: ClusterSettings, explore
const sanitizedName = name.replace(/[^a-zA-Z0-9]/g, "");
const suffix = Buffer.from(bootstrap).toString("base64").replace(/=/g, "");

try {
clusterSettings.upsert({
id: `${sanitizedName}-${suffix}`,
bootstrap,
name,
saslOption,
ssl: state.ssl
});
explorer.refresh();
window.showInformationMessage(`Cluster '${name}' created successfully`);
// Selecting the created cluster is done with TreeView#reveal
// 1. Show the treeview of the explorer (otherwise reveal will not work)
explorer.show();
// 2. the reveal() call must occur within a timeout(),
// while waiting for a fix in https://github.com/microsoft/vscode/issues/114149
setTimeout(() => {
explorer.selectClusterByName(name);
}, 1000);
}
catch (error) {
showErrorMessage(`Error while creating cluster`, error);
}
return [{
id: `${sanitizedName}-${suffix}`,
bootstrap,
name,
saslOption,
ssl: state.ssl
}];
}


0 comments on commit c9096ef

Please sign in to comment.