Skip to content

Commit

Permalink
Cluster provider API extension
Browse files Browse the repository at this point in the history
Fixes jlandersen#123

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr committed Mar 11, 2021
1 parent bf47c3d commit 8cd5f5a
Show file tree
Hide file tree
Showing 8 changed files with 383 additions and 56 deletions.
14 changes: 14 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@
],
"main": "./dist/extension",
"contributes": {
"kafka": {
"clusterProviders": [
{
"id": "vscode-kafka.manual",
"label": "Manual"
}
]
},
"configuration": {
"type": "object",
"title": "Kafka",
Expand Down Expand Up @@ -182,6 +190,12 @@
"path": "./snippets/consumers.json"
}
],
"jsonValidation": [
{
"fileMatch": "package.json",
"url": "./schemas/package.schema.json"
}
],
"commands": [
{
"command": "vscode-kafka.open.docs.home",
Expand Down
37 changes: 37 additions & 0 deletions schemas/package.schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "Kafka contributions to package.json",
"type": "object",
"properties": {
"contributes": {
"type": "object",
"properties": {
"kafka": {
"type": "object",
"markdownDescription": "Kafka extensions",
"properties": {
"clusterProviders": {
"type": "array",
"markdownDescription": "Cluster providers definitions.",
"items": [
{
"type": "object",
"properties": {
"id": {
"type": "string",
"description": "Cluster provider id."
},
"label": {
"type": "string",
"description": "Cluster provider label."
}
}
}
]
}
}
}
}
}
}
}
94 changes: 72 additions & 22 deletions src/client/client.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { Admin, ConfigResourceTypes, Kafka, Producer } from "kafkajs";

import { Disposable } from "vscode";
import { getClusterProvider } from "../kafka-extensions/registry";
import { WorkspaceSettings } from "../settings";

export interface ConnectionOptions {
clusterProviderId?: string;
bootstrap: string;
saslOption?: SaslOption;
ssl?: boolean;
Expand Down Expand Up @@ -176,10 +178,9 @@ class KafkaJsClient implements Client {
public kafkaClient: any;
public kafkaCyclicProducerClient: any;
public kafkaKeyedProducerClient: any;
public producer: Producer;

private kafkaJsClient: Kafka;
private kafkaAdminClient: Admin;
private kafkaProducer: Producer | undefined;
private kafkaJsClient: Kafka | undefined;
private kafkaAdminClient: Admin | undefined;

private metadata: {
topics: Topic[];
Expand All @@ -191,21 +192,38 @@ class KafkaJsClient implements Client {
brokers: [],
topics: [],
};
this.kafkaJsClient = createKafka(cluster);
this.kafkaClient = this.kafkaJsClient;
this.kafkaAdminClient = this.kafkaJsClient.admin();
this.producer = this.kafkaJsClient.producer();
// The Kafka client is created in asynchronous since external vscode extension
// can contribute to the creation of Kafka instance.
createKafka(cluster)
.then(result => {
this.kafkaJsClient = result;
this.kafkaClient = this.kafkaJsClient;
this.kafkaAdminClient = this.kafkaJsClient.admin();
this.kafkaProducer = this.kafkaJsClient.producer();
});
}

public get producer(): Producer {
if (this.kafkaProducer) {
return this.kafkaProducer;
}
throw new Error('Impossible to get producer. Kafka client is not connected!');
}

canConnect(): boolean {
return this.kafkaAdminClient !== null;
}

connect(): Promise<void> {
return this.kafkaAdminClient.connect();
async connect(): Promise<void> {
if (this.kafkaAdminClient) {
return this.kafkaAdminClient.connect();
}
}

async getTopics(): Promise<Topic[]> {
if (!this.kafkaAdminClient) {
throw new Error('Impossible to get topics. Kafka client is not connected!');
}
const listTopicsResponse = await this.kafkaAdminClient.fetchTopicMetadata();

this.metadata = {
Expand Down Expand Up @@ -233,7 +251,10 @@ class KafkaJsClient implements Client {
}

async getBrokers(): Promise<Broker[]> {
const describeClusterResponse = await this.kafkaAdminClient?.describeCluster();
if (!this.kafkaAdminClient) {
throw new Error('Impossible to get brokers. Kafka client is not connected!');
}
const describeClusterResponse = await this.kafkaAdminClient.describeCluster();

this.metadata = {
...this.metadata,
Expand All @@ -251,6 +272,9 @@ class KafkaJsClient implements Client {
}

async getBrokerConfigs(brokerId: string): Promise<ConfigEntry[]> {
if (!this.kafkaAdminClient) {
throw new Error('Impossible to get broker config. Kafka client is not connected!');
}
const describeConfigsResponse = await this.kafkaAdminClient.describeConfigs({
includeSynonyms: false,
resources: [
Expand All @@ -265,6 +289,9 @@ class KafkaJsClient implements Client {
}

async getTopicConfigs(topicId: string): Promise<ConfigEntry[]> {
if (!this.kafkaAdminClient) {
throw new Error('Impossible to get topic configs. Kafka client is not connected!');
}
const describeConfigsResponse = await this.kafkaAdminClient.describeConfigs({
includeSynonyms: false,
resources: [
Expand All @@ -279,11 +306,17 @@ class KafkaJsClient implements Client {
}

async getConsumerGroupIds(): Promise<string[]> {
if (!this.kafkaAdminClient) {
throw new Error('Impossible to get consumer group ids. Kafka client is not connected!');
}
const listGroupsResponse = await this.kafkaAdminClient.listGroups();
return Promise.resolve(listGroupsResponse.groups.map((g) => (g.groupId)));
}

async getConsumerGroupDetails(groupId: string): Promise<ConsumerGroup> {
if (!this.kafkaAdminClient) {
throw new Error('Impossible to get consumer group details. Kafka client is not connected!');
}
const describeGroupResponse = await this.kafkaAdminClient.describeGroups([groupId]);

const consumerGroup: ConsumerGroup = {
Expand All @@ -304,10 +337,16 @@ class KafkaJsClient implements Client {
}

async deleteConsumerGroups(groupIds: string[]): Promise<void> {
if (!this.kafkaAdminClient) {
throw new Error('Impossible to delete consumer groups. Kafka client is not connected!');
}
await this.kafkaAdminClient.deleteGroups(groupIds);
}

async createTopic(createTopicRequest: CreateTopicRequest): Promise<any[]> {
if (!this.kafkaAdminClient) {
throw new Error('Impossible to create topic. Kafka client is not connected!');
}
await this.kafkaAdminClient.createTopics({
validateOnly: false,
waitForLeaders: true,
Expand All @@ -321,35 +360,46 @@ class KafkaJsClient implements Client {
}

async deleteTopic(deleteTopicRequest: DeleteTopicRequest): Promise<void> {
if (!this.kafkaAdminClient) {
throw new Error('Impossible to delete topic. Kafka client is not connected!');
}
return await this.kafkaAdminClient.deleteTopics({
topics: deleteTopicRequest.topics,
timeout: deleteTopicRequest.timeout
});
}

dispose() {
this.kafkaAdminClient.disconnect();
if (this.kafkaAdminClient) {
this.kafkaAdminClient.disconnect();
}
}
}

export const createClient = (cluster: Cluster, workspaceSettings: WorkspaceSettings): Client => new EnsureConnectedDecorator(
new KafkaJsClient(cluster, workspaceSettings));

export const createKafka = (connectionOptions: ConnectionOptions): Kafka => {
let kafkaJsClient: Kafka;
export const createKafka = async (connectionOptions: ConnectionOptions): Promise<Kafka> => {
const provider = getClusterProvider(connectionOptions.clusterProviderId);
if (!provider) {
throw new Error(`Cannot find cluster provider for '${connectionOptions.clusterProviderId}' ID.`);
}
const kafka = await provider.createKafka(connectionOptions);
return kafka || createDefaultKafka(connectionOptions);
};

export const createDefaultKafka = (connectionOptions: ConnectionOptions): Kafka => {
if (connectionOptions.saslOption && connectionOptions.saslOption.username && connectionOptions.saslOption.password) {
kafkaJsClient = new Kafka({
return new Kafka({
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
ssl: true,
sasl: { mechanism: connectionOptions.saslOption.mechanism, username: connectionOptions.saslOption.username, password: connectionOptions.saslOption.password },
});
} else {
kafkaJsClient = new Kafka({
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
ssl: connectionOptions.ssl
});
}
return kafkaJsClient;
return new Kafka({
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
ssl: connectionOptions.ssl
});
};
3 changes: 2 additions & 1 deletion src/client/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export class Consumer implements vscode.Disposable {

const settings = getWorkspaceSettings();
this.options = {
clusterProviderId: cluster.clusterProviderId,
bootstrap: cluster.bootstrap,
saslOption: cluster.saslOption,
consumerGroupId: consumerGroupId,
Expand All @@ -93,7 +94,7 @@ export class Consumer implements vscode.Disposable {
const fromOffset = this.options.fromOffset;
const topic = this.options.topicId;

this.kafkaClient = createKafka(this.options);
this.kafkaClient = await createKafka(this.options);
this.consumer = this.kafkaClient.consumer({
groupId: this.options.consumerGroupId, retry: { retries: 3 },
partitionAssigners: [
Expand Down
6 changes: 5 additions & 1 deletion src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ import { NodeBase } from "./explorer/models/nodeBase";
import * as path from 'path';
import { markdownPreviewProvider } from "./docs/markdownPreviewProvider";
import { KafkaFileCodeLensProvider } from "./kafka-file";
import { getDefaultClusterProcessorCommand } from "./kafka-extensions/registry";
import { ClusterProviderProcessor } from "./kafka-extensions/api";

export function activate(context: vscode.ExtensionContext): void {
export function activate(context: vscode.ExtensionContext): ClusterProviderProcessor {
Context.register(context);

// Settings, data etc.
Expand Down Expand Up @@ -144,6 +146,8 @@ export function activate(context: vscode.ExtensionContext): void {

context.subscriptions.push(vscode.workspace.registerTextDocumentContentProvider(
ConsumerVirtualTextDocumentProvider.SCHEME, consumerVirtualTextDocumentProvider));

return getDefaultClusterProcessorCommand();
}

export function deactivate(): void {
Expand Down
25 changes: 25 additions & 0 deletions src/kafka-extensions/api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { Kafka } from "kafkajs";
import { Cluster, ConnectionOptions } from "../client/client";
import { ClusterSettings } from "../settings/clusters";

/**
* The cluster provider processor.
*/
export interface ClusterProviderProcessor {

/**
* Returns the clusters managed by the processor which must be added to the kafka explorer.
*
* @param clusterSettings the cluster settings.
*/
collectClusters(clusterSettings: ClusterSettings): Promise<Cluster[] | undefined>;

/**
* Create the Kafka JS client instance fromthe given connection options.
* When processor doesn't implement this method, the Kafka JS client
* instance is created with the default client factory from the vscode-kafka.
*
* @param connectionOptions the connection options.
*/
createKafka?(connectionOptions: ConnectionOptions): Kafka;
}
Loading

0 comments on commit 8cd5f5a

Please sign in to comment.