Skip to content

Commit

Permalink
Cluster provider API extension
Browse files Browse the repository at this point in the history
Fixes jlandersen#123

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr committed Mar 10, 2021
1 parent bf47c3d commit ad0cff0
Show file tree
Hide file tree
Showing 7 changed files with 306 additions and 55 deletions.
9 changes: 9 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@
],
"main": "./dist/extension",
"contributes": {
"kafka": {
"clusterProviders": [
{
"id": "vscode-kafka.manual",
"label": "Manual",
"commandId": "vscode-kafka.cluster.providers"
}
]
},
"configuration": {
"type": "object",
"title": "Kafka",
Expand Down
99 changes: 77 additions & 22 deletions src/client/client.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
import { Admin, ConfigResourceTypes, Kafka, Producer } from "kafkajs";

import { Disposable } from "vscode";
import { getClusterProvider } from "../kafka-extensions/registry";
import { WorkspaceSettings } from "../settings";

/**
 * Options required to open a connection to a Kafka cluster.
 */
export interface ConnectionOptions {
// Optional ID of the cluster provider that knows how to create the Kafka
// client for this connection (see getClusterProvider in kafka-extensions/registry).
clusterProviderId?: string;
// Comma-separated list of bootstrap broker addresses (split on ',' by createDefaultKafka).
bootstrap: string;
// SASL credentials; when username/password are set, SSL is forced on (see createDefaultKafka).
saslOption?: SaslOption;
// Whether to connect over SSL when no SASL credentials are provided.
ssl?: boolean;
}

export interface Cluster extends ConnectionOptions {
/**
 * Identifies a configured Kafka cluster (as shown in the explorer / settings).
 */
export interface ClusterIdentifier {
// Optional ID of the cluster provider that contributed this cluster.
clusterProviderId?: string;
// Unique ID of the cluster.
id: string;
// Human-readable display name of the cluster.
name: string;
}

/**
 * A configured cluster: its identity plus everything needed to connect to it.
 */
export interface Cluster extends ClusterIdentifier, ConnectionOptions {

}

/**
* The supported SASL mechanisms for authentication.
*/
Expand Down Expand Up @@ -176,10 +183,9 @@ class KafkaJsClient implements Client {
public kafkaClient: any;
public kafkaCyclicProducerClient: any;
public kafkaKeyedProducerClient: any;
public producer: Producer;

private kafkaJsClient: Kafka;
private kafkaAdminClient: Admin;
private kafkaProducer: Producer | undefined;
private kafkaJsClient: Kafka | undefined;
private kafkaAdminClient: Admin | undefined;

private metadata: {
topics: Topic[];
Expand All @@ -191,21 +197,37 @@ class KafkaJsClient implements Client {
brokers: [],
topics: [],
};
this.kafkaJsClient = createKafka(cluster);
this.kafkaClient = this.kafkaJsClient;
this.kafkaAdminClient = this.kafkaJsClient.admin();
this.producer = this.kafkaJsClient.producer();
createKafka(cluster)
.then(result => {
this.kafkaJsClient = result;
this.kafkaClient = this.kafkaJsClient;
this.kafkaAdminClient = this.kafkaJsClient.admin();
this.kafkaProducer = this.kafkaJsClient.producer();
});
}


/**
 * Gets the kafkajs Producer for this client.
 *
 * The producer is created asynchronously in the constructor (via createKafka),
 * so it is not available until that initialization completes.
 *
 * @throws Error when the client has not finished initializing.
 */
public get producer(): Producer {
    if (this.kafkaProducer) {
        return this.kafkaProducer;
    }
    // Replaces the 'TODO' placeholder with an actionable message.
    throw new Error('Kafka producer is not available yet: the client is still being initialized.');
}

/**
 * Returns true when the admin client has been created, i.e. the asynchronous
 * initialization started in the constructor has completed.
 */
canConnect(): boolean {
    // Bug fix: kafkaAdminClient is typed 'Admin | undefined' and is never null,
    // so the previous '!== null' comparison was always true. Compare against
    // undefined so this actually reports initialization state.
    return this.kafkaAdminClient !== undefined;
}

connect(): Promise<void> {
return this.kafkaAdminClient.connect();
// Connects the underlying kafkajs admin client.
// NOTE(review): the admin client is created asynchronously in the constructor,
// so this resolves silently (without connecting) when initialization has not
// completed yet — confirm callers tolerate that, or surface an error like the
// other admin-backed methods do.
async connect(): Promise<void> {
if (this.kafkaAdminClient) {
return this.kafkaAdminClient.connect();
}
}

async getTopics(): Promise<Topic[]> {
if (!this.kafkaAdminClient) {
throw new Error('TODO');
}
const listTopicsResponse = await this.kafkaAdminClient.fetchTopicMetadata();

this.metadata = {
Expand Down Expand Up @@ -233,7 +255,10 @@ class KafkaJsClient implements Client {
}

async getBrokers(): Promise<Broker[]> {
const describeClusterResponse = await this.kafkaAdminClient?.describeCluster();
if (!this.kafkaAdminClient) {
throw new Error('TODO');
}
const describeClusterResponse = await this.kafkaAdminClient.describeCluster();

this.metadata = {
...this.metadata,
Expand All @@ -251,6 +276,9 @@ class KafkaJsClient implements Client {
}

async getBrokerConfigs(brokerId: string): Promise<ConfigEntry[]> {
if (!this.kafkaAdminClient) {
throw new Error('TODO');
}
const describeConfigsResponse = await this.kafkaAdminClient.describeConfigs({
includeSynonyms: false,
resources: [
Expand All @@ -265,6 +293,9 @@ class KafkaJsClient implements Client {
}

async getTopicConfigs(topicId: string): Promise<ConfigEntry[]> {
if (!this.kafkaAdminClient) {
throw new Error('TODO');
}
const describeConfigsResponse = await this.kafkaAdminClient.describeConfigs({
includeSynonyms: false,
resources: [
Expand All @@ -279,11 +310,17 @@ class KafkaJsClient implements Client {
}

/**
 * Lists the IDs of all consumer groups known to the cluster.
 *
 * @returns the consumer group IDs.
 * @throws Error when the client has not finished initializing.
 */
async getConsumerGroupIds(): Promise<string[]> {
    if (!this.kafkaAdminClient) {
        // Replaces the 'TODO' placeholder with an actionable message.
        throw new Error('Kafka admin client is not available yet: the client is still being initialized.');
    }
    const listGroupsResponse = await this.kafkaAdminClient.listGroups();
    // An async function already wraps the value in a promise; Promise.resolve was redundant.
    return listGroupsResponse.groups.map((g) => g.groupId);
}

async getConsumerGroupDetails(groupId: string): Promise<ConsumerGroup> {
if (!this.kafkaAdminClient) {
throw new Error('TODO');
}
const describeGroupResponse = await this.kafkaAdminClient.describeGroups([groupId]);

const consumerGroup: ConsumerGroup = {
Expand All @@ -304,10 +341,16 @@ class KafkaJsClient implements Client {
}

/**
 * Deletes the given consumer groups.
 *
 * @param groupIds the IDs of the consumer groups to delete.
 * @throws Error when the client has not finished initializing.
 */
async deleteConsumerGroups(groupIds: string[]): Promise<void> {
    if (!this.kafkaAdminClient) {
        // Replaces the 'TODO' placeholder with an actionable message.
        throw new Error('Kafka admin client is not available yet: the client is still being initialized.');
    }
    await this.kafkaAdminClient.deleteGroups(groupIds);
}

async createTopic(createTopicRequest: CreateTopicRequest): Promise<any[]> {
if (!this.kafkaAdminClient) {
throw new Error('TODO');
}
await this.kafkaAdminClient.createTopics({
validateOnly: false,
waitForLeaders: true,
Expand All @@ -321,35 +364,47 @@ class KafkaJsClient implements Client {
}

/**
 * Deletes the topics named in the request.
 *
 * @param deleteTopicRequest the topic names to delete and an optional timeout.
 * @throws Error when the client has not finished initializing.
 */
async deleteTopic(deleteTopicRequest: DeleteTopicRequest): Promise<void> {
    if (!this.kafkaAdminClient) {
        // Replaces the 'TODO' placeholder with an actionable message.
        throw new Error('Kafka admin client is not available yet: the client is still being initialized.');
    }
    return await this.kafkaAdminClient.deleteTopics({
        topics: deleteTopicRequest.topics,
        timeout: deleteTopicRequest.timeout
    });
}

/**
 * Disposes the client by disconnecting the admin client, when one was created.
 * The disconnect promise is not awaited (fire-and-forget).
 */
dispose() {
    this.kafkaAdminClient?.disconnect();
}

/**
 * Creates a Client for the given cluster, wrapped in a decorator that ensures
 * the connection is established before operations are performed.
 */
export const createClient = (cluster: Cluster, workspaceSettings: WorkspaceSettings): Client => new EnsureConnectedDecorator(
new KafkaJsClient(cluster, workspaceSettings));

export const createKafka = (connectionOptions: ConnectionOptions): Kafka => {
let kafkaJsClient: Kafka;
/**
 * Creates a kafkajs Kafka instance for the given connection options by
 * delegating to the registered cluster provider.
 *
 * @param connectionOptions the connection options (including the provider ID).
 * @returns the provider's Kafka instance, or the default kafkajs client when
 *          the provider returns a falsy value.
 * @throws Error when no cluster provider is registered for the given ID.
 */
export const createKafka = async (connectionOptions: ConnectionOptions): Promise<Kafka> => {
const provider = getClusterProvider(connectionOptions.clusterProviderId);
if (!provider) {
throw new Error(`Cannot find cluster provider for '${connectionOptions.clusterProviderId}' ID.`);
//return createDefaultKafka(connectionOptions);
}
// NOTE(review): ClusterProviderProcessor declares createKafka as optional —
// if the registry can return a provider without it, this call would invoke
// undefined and throw a TypeError; confirm the registry guards against that.
const kafka = await provider.createKafka(connectionOptions);
return kafka || createDefaultKafka(connectionOptions);
};

export const createDefaultKafka = (connectionOptions: ConnectionOptions): Kafka => {
if (connectionOptions.saslOption && connectionOptions.saslOption.username && connectionOptions.saslOption.password) {
kafkaJsClient = new Kafka({
return new Kafka({
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
ssl: true,
sasl: { mechanism: connectionOptions.saslOption.mechanism, username: connectionOptions.saslOption.username, password: connectionOptions.saslOption.password },
});
} else {
kafkaJsClient = new Kafka({
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
ssl: connectionOptions.ssl
});
}
return kafkaJsClient;
return new Kafka({
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
ssl: connectionOptions.ssl
});
};
3 changes: 2 additions & 1 deletion src/client/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export class Consumer implements vscode.Disposable {

const settings = getWorkspaceSettings();
this.options = {
clusterProviderId: cluster.clusterProviderId,
bootstrap: cluster.bootstrap,
saslOption: cluster.saslOption,
consumerGroupId: consumerGroupId,
Expand All @@ -93,7 +94,7 @@ export class Consumer implements vscode.Disposable {
const fromOffset = this.options.fromOffset;
const topic = this.options.topicId;

this.kafkaClient = createKafka(this.options);
this.kafkaClient = await createKafka(this.options);
this.consumer = this.kafkaClient.consumer({
groupId: this.options.consumerGroupId, retry: { retries: 3 },
partitionAssigners: [
Expand Down
8 changes: 8 additions & 0 deletions src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import { NodeBase } from "./explorer/models/nodeBase";
import * as path from 'path';
import { markdownPreviewProvider } from "./docs/markdownPreviewProvider";
import { KafkaFileCodeLensProvider } from "./kafka-file";
import { onExtensionChange, registerClusterProcessorCommand } from "./kafka-extensions/registry";

export function activate(context: vscode.ExtensionContext): void {
Context.register(context);
Expand Down Expand Up @@ -144,6 +145,13 @@ export function activate(context: vscode.ExtensionContext): void {

context.subscriptions.push(vscode.workspace.registerTextDocumentContentProvider(
ConsumerVirtualTextDocumentProvider.SCHEME, consumerVirtualTextDocumentProvider));

if (vscode.extensions.onDidChange) {// Theia doesn't support this API yet
context.subscriptions.push(vscode.extensions.onDidChange(() => {
onExtensionChange(vscode.extensions.all);
}));
}
registerClusterProcessorCommand();
}

export function deactivate(): void {
Expand Down
8 changes: 8 additions & 0 deletions src/kafka-extensions/api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { Kafka } from "kafkajs";
import { Cluster, ConnectionOptions } from "../client/client";
import { ClusterSettings } from "../settings/clusters";

/**
 * Contract implemented by extensions that contribute Kafka cluster providers
 * (declared via the "kafka.clusterProviders" contribution in package.json).
 */
export interface ClusterProviderProcessor {
// Collects the clusters contributed by this provider; may resolve to
// undefined when the provider has none to contribute.
collectClusters(clusterSettings: ClusterSettings): Promise<Cluster[] | undefined>;
// Optional factory for a custom kafkajs client; when absent or when it
// returns a falsy value, the default client is used (see client.ts createKafka).
createKafka?(connectionOptions: ConnectionOptions): Kafka;
}
Loading

0 comments on commit ad0cff0

Please sign in to comment.