Skip to content

Commit

Permalink
Cluster provider API extension
Browse files Browse the repository at this point in the history
Fixes jlandersen#123

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr committed Mar 11, 2021
1 parent bf47c3d commit e2eefef
Show file tree
Hide file tree
Showing 7 changed files with 302 additions and 55 deletions.
9 changes: 9 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@
],
"main": "./dist/extension",
"contributes": {
"kafka": {
"clusterProviders": [
{
"id": "vscode-kafka.manual",
"label": "Manual",
"commandId": "vscode-kafka.cluster.providers"
}
]
},
"configuration": {
"type": "object",
"title": "Kafka",
Expand Down
95 changes: 73 additions & 22 deletions src/client/client.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { Admin, ConfigResourceTypes, Kafka, Producer } from "kafkajs";

import { Disposable } from "vscode";
import { getClusterProvider } from "../kafka-extensions/registry";
import { WorkspaceSettings } from "../settings";

export interface ConnectionOptions {
clusterProviderId?: string;
bootstrap: string;
saslOption?: SaslOption;
ssl?: boolean;
Expand Down Expand Up @@ -176,10 +178,9 @@ class KafkaJsClient implements Client {
public kafkaClient: any;
public kafkaCyclicProducerClient: any;
public kafkaKeyedProducerClient: any;
public producer: Producer;

private kafkaJsClient: Kafka;
private kafkaAdminClient: Admin;
private kafkaProducer: Producer | undefined;
private kafkaJsClient: Kafka | undefined;
private kafkaAdminClient: Admin | undefined;

private metadata: {
topics: Topic[];
Expand All @@ -191,21 +192,39 @@ class KafkaJsClient implements Client {
brokers: [],
topics: [],
};
this.kafkaJsClient = createKafka(cluster);
this.kafkaClient = this.kafkaJsClient;
this.kafkaAdminClient = this.kafkaJsClient.admin();
this.producer = this.kafkaJsClient.producer();
// The Kafka client is created in asynchronous since external vscode extension
// can contribute to the creation of Kafka instance.
createKafka(cluster)
.then(result => {
this.kafkaJsClient = result;
this.kafkaClient = this.kafkaJsClient;
this.kafkaAdminClient = this.kafkaJsClient.admin();
this.kafkaProducer = this.kafkaJsClient.producer();
});
}


public get producer(): Producer {
if (this.kafkaProducer) {
return this.kafkaProducer;
}
throw new Error('Impossible to get producer. Kafka client is not connected!');
}

canConnect(): boolean {
return this.kafkaAdminClient !== null;
}

connect(): Promise<void> {
return this.kafkaAdminClient.connect();
async connect(): Promise<void> {
if (this.kafkaAdminClient) {
return this.kafkaAdminClient.connect();
}
}

async getTopics(): Promise<Topic[]> {
if (!this.kafkaAdminClient) {
throw new Error('Impossible to get topics. Kafka client is not connected!');
}
const listTopicsResponse = await this.kafkaAdminClient.fetchTopicMetadata();

this.metadata = {
Expand Down Expand Up @@ -233,7 +252,10 @@ class KafkaJsClient implements Client {
}

async getBrokers(): Promise<Broker[]> {
const describeClusterResponse = await this.kafkaAdminClient?.describeCluster();
if (!this.kafkaAdminClient) {
throw new Error('Impossible to get brokers. Kafka client is not connected!');
}
const describeClusterResponse = await this.kafkaAdminClient.describeCluster();

this.metadata = {
...this.metadata,
Expand All @@ -251,6 +273,9 @@ class KafkaJsClient implements Client {
}

async getBrokerConfigs(brokerId: string): Promise<ConfigEntry[]> {
if (!this.kafkaAdminClient) {
throw new Error('Impossible to get broker config. Kafka client is not connected!');
}
const describeConfigsResponse = await this.kafkaAdminClient.describeConfigs({
includeSynonyms: false,
resources: [
Expand All @@ -265,6 +290,9 @@ class KafkaJsClient implements Client {
}

async getTopicConfigs(topicId: string): Promise<ConfigEntry[]> {
if (!this.kafkaAdminClient) {
throw new Error('Impossible to get topic configs. Kafka client is not connected!');
}
const describeConfigsResponse = await this.kafkaAdminClient.describeConfigs({
includeSynonyms: false,
resources: [
Expand All @@ -279,11 +307,17 @@ class KafkaJsClient implements Client {
}

async getConsumerGroupIds(): Promise<string[]> {
if (!this.kafkaAdminClient) {
throw new Error('Impossible to get consumer group ids. Kafka client is not connected!');
}
const listGroupsResponse = await this.kafkaAdminClient.listGroups();
return Promise.resolve(listGroupsResponse.groups.map((g) => (g.groupId)));
}

async getConsumerGroupDetails(groupId: string): Promise<ConsumerGroup> {
if (!this.kafkaAdminClient) {
throw new Error('Impossible to get consumer group details. Kafka client is not connected!');
}
const describeGroupResponse = await this.kafkaAdminClient.describeGroups([groupId]);

const consumerGroup: ConsumerGroup = {
Expand All @@ -304,10 +338,16 @@ class KafkaJsClient implements Client {
}

async deleteConsumerGroups(groupIds: string[]): Promise<void> {
if (!this.kafkaAdminClient) {
throw new Error('Impossible to delete consumer groups. Kafka client is not connected!');
}
await this.kafkaAdminClient.deleteGroups(groupIds);
}

async createTopic(createTopicRequest: CreateTopicRequest): Promise<any[]> {
if (!this.kafkaAdminClient) {
throw new Error('Impossible to create topic. Kafka client is not connected!');
}
await this.kafkaAdminClient.createTopics({
validateOnly: false,
waitForLeaders: true,
Expand All @@ -321,35 +361,46 @@ class KafkaJsClient implements Client {
}

async deleteTopic(deleteTopicRequest: DeleteTopicRequest): Promise<void> {
if (!this.kafkaAdminClient) {
throw new Error('Impossible to delete topic. Kafka client is not connected!');
}
return await this.kafkaAdminClient.deleteTopics({
topics: deleteTopicRequest.topics,
timeout: deleteTopicRequest.timeout
});
}

dispose() {
this.kafkaAdminClient.disconnect();
if (this.kafkaAdminClient) {
this.kafkaAdminClient.disconnect();
}
}
}

export const createClient = (cluster: Cluster, workspaceSettings: WorkspaceSettings): Client => new EnsureConnectedDecorator(
new KafkaJsClient(cluster, workspaceSettings));

export const createKafka = (connectionOptions: ConnectionOptions): Kafka => {
let kafkaJsClient: Kafka;
export const createKafka = async (connectionOptions: ConnectionOptions): Promise<Kafka> => {
const provider = getClusterProvider(connectionOptions.clusterProviderId);
if (!provider) {
throw new Error(`Cannot find cluster provider for '${connectionOptions.clusterProviderId}' ID.`);
}
const kafka = await provider.createKafka(connectionOptions);
return kafka || createDefaultKafka(connectionOptions);
};

export const createDefaultKafka = (connectionOptions: ConnectionOptions): Kafka => {
if (connectionOptions.saslOption && connectionOptions.saslOption.username && connectionOptions.saslOption.password) {
kafkaJsClient = new Kafka({
return new Kafka({
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
ssl: true,
sasl: { mechanism: connectionOptions.saslOption.mechanism, username: connectionOptions.saslOption.username, password: connectionOptions.saslOption.password },
});
} else {
kafkaJsClient = new Kafka({
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
ssl: connectionOptions.ssl
});
}
return kafkaJsClient;
return new Kafka({
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
ssl: connectionOptions.ssl
});
};
3 changes: 2 additions & 1 deletion src/client/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export class Consumer implements vscode.Disposable {

const settings = getWorkspaceSettings();
this.options = {
clusterProviderId: cluster.clusterProviderId,
bootstrap: cluster.bootstrap,
saslOption: cluster.saslOption,
consumerGroupId: consumerGroupId,
Expand All @@ -93,7 +94,7 @@ export class Consumer implements vscode.Disposable {
const fromOffset = this.options.fromOffset;
const topic = this.options.topicId;

this.kafkaClient = createKafka(this.options);
this.kafkaClient = await createKafka(this.options);
this.consumer = this.kafkaClient.consumer({
groupId: this.options.consumerGroupId, retry: { retries: 3 },
partitionAssigners: [
Expand Down
8 changes: 8 additions & 0 deletions src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import { NodeBase } from "./explorer/models/nodeBase";
import * as path from 'path';
import { markdownPreviewProvider } from "./docs/markdownPreviewProvider";
import { KafkaFileCodeLensProvider } from "./kafka-file";
import { onExtensionChange, registerClusterProcessorCommand } from "./kafka-extensions/registry";

export function activate(context: vscode.ExtensionContext): void {
Context.register(context);
Expand Down Expand Up @@ -144,6 +145,13 @@ export function activate(context: vscode.ExtensionContext): void {

context.subscriptions.push(vscode.workspace.registerTextDocumentContentProvider(
ConsumerVirtualTextDocumentProvider.SCHEME, consumerVirtualTextDocumentProvider));

if (vscode.extensions.onDidChange) {// Theia doesn't support this API yet
context.subscriptions.push(vscode.extensions.onDidChange(() => {
onExtensionChange(vscode.extensions.all);
}));
}
registerClusterProcessorCommand(context);
}

export function deactivate(): void {
Expand Down
8 changes: 8 additions & 0 deletions src/kafka-extensions/api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { Kafka } from "kafkajs";
import { Cluster, ConnectionOptions } from "../client/client";
import { ClusterSettings } from "../settings/clusters";

export interface ClusterProviderProcessor {
collectClusters(clusterSettings: ClusterSettings): Promise<Cluster[] | undefined>;
createKafka?(connectionOptions: ConnectionOptions): Kafka;
}
131 changes: 131 additions & 0 deletions src/kafka-extensions/registry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import * as vscode from "vscode";
import { Kafka } from "kafkajs";
import { Cluster, ConnectionOptions, createDefaultKafka } from "../client/client";
import { ClusterSettings } from "../settings/clusters";
import { collectDefaultClusters } from "../wizards/clusters";
import { ClusterProviderProcessor } from "./api";

export class ClusterProvider {

private processor: ClusterProviderProcessor | undefined;

constructor(private definition: ClusterProviderDefinition, private extensionId: string) {

}

public get id(): string {
return this.definition.id;
}

public get label(): string {
return this.definition.label || this.definition.id;
}

async collectClusters(clusterSettings: ClusterSettings): Promise<Cluster[] | undefined> {
const processor = await this.getProcessor();
return processor.collectClusters(clusterSettings);
}

async createKafka(connectionOptions: ConnectionOptions): Promise<Kafka | undefined> {
const processor = await this.getProcessor();
if (processor.createKafka) {
return processor.createKafka(connectionOptions);
}
}

private async getProcessor(): Promise<ClusterProviderProcessor> {
if (this.processor) {
return this.processor;
}
const extension = vscode.extensions.getExtension(this.extensionId);
if (!extension) {
throw new Error('Extension is not started');
}

// Wait for extension is activated in order to register the commands.
await extension.activate();

const processor = await vscode.commands.executeCommand(this.definition.commandId);
if (!processor) {
throw new Error('Cannot find cluster provider processor');
}
return this.processor = <ClusterProviderProcessor>processor;
}

}

const defaultClusterProviderId = 'vscode-kafka.manual';

let providers: Map<string, ClusterProvider> = new Map();

export function getClusterProvider(clusterProviderId?: string): ClusterProvider | undefined {
intializeIfNeeded();
return providers.get(clusterProviderId || defaultClusterProviderId);
}

export function getClusterProviders(): ClusterProvider[] {
intializeIfNeeded();
return [...providers.values()];
}

function intializeIfNeeded() {
if (providers.size === 0) {
providers = collectClusterProviderDefinitions(vscode.extensions.all);
}
}

export interface ClusterProviderDefinition {
id: string;
label?: string;
commandId: string;
}

let existingExtensions: Array<ClusterProviderDefinition>;

export function collectClusterProviderDefinitions(extensions: readonly vscode.Extension<any>[]): Map<string, ClusterProvider> {
const result: Map<string, ClusterProvider> = new Map();
if (extensions && extensions.length) {
for (const extension of extensions) {
const contributesSection = extension.packageJSON['contributes'];
if (contributesSection) {
const kafkaExtension = contributesSection['kafka'];
if (kafkaExtension) {
const clusterProviders = kafkaExtension['clusterProviders'];
if (Array.isArray(clusterProviders) && clusterProviders.length) {
for (const item of clusterProviders) {
const definition = item as ClusterProviderDefinition;
result.set(definition.id, new ClusterProvider(definition, extension.id));
}
}
}
}
}
}
return result;
}

export function onExtensionChange(extensions: readonly vscode.Extension<any>[]) {
if (!existingExtensions) {
return;
}
//const oldExtensions = new Set(existingExtensions.slice());
//const newExtensions = collectClusterProviderDefinitions(extensions);
/*let hasChanged = (oldExtensions.size !== newExtensions.length);
if (!hasChanged) {
for (const newExtension of newExtensions) {
if (!oldExtensions.has(newExtension)) {
hasChanged = true;
break;
}
}
}*/
}

export function registerClusterProcessorCommand(context: vscode.ExtensionContext) {
context.subscriptions.push(vscode.commands.registerCommand('vscode-kafka.cluster.providers', () => {
return {
collectClusters: (clusterSettings: ClusterSettings): Promise<Cluster[] | undefined> => collectDefaultClusters(clusterSettings),
createKafka: (connectionOptions: ConnectionOptions): Kafka => createDefaultKafka(connectionOptions)
} as ClusterProviderProcessor;
}));
}
Loading

0 comments on commit e2eefef

Please sign in to comment.