Skip to content

Commit

Permalink
Cluster provider API extension
Browse files Browse the repository at this point in the history
Fixes jlandersen#123

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr committed Mar 13, 2021
1 parent bf47c3d commit 36500cb
Show file tree
Hide file tree
Showing 10 changed files with 428 additions and 58 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Change Log
All notable changes to Kafka extension will be documented in this file.

## [0.12.0]
### Added
- Extension API to provide cluster. See [#123](https://github.com/jlandersen/vscode-kafka/issues/123).

## [0.11.0] - 2021-03-08
### Added
- Newly created topic or cluster is automatically selected in the Kafka Explorer. See [#61](https://github.com/jlandersen/vscode-kafka/issues/61).
Expand Down
14 changes: 14 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@
],
"main": "./dist/extension",
"contributes": {
"kafka": {
"clusterProviders": [
{
"id": "vscode-kafka.manual",
"name": "Configure manually"
}
]
},
"configuration": {
"type": "object",
"title": "Kafka",
Expand Down Expand Up @@ -182,6 +190,12 @@
"path": "./snippets/consumers.json"
}
],
"jsonValidation": [
{
"fileMatch": "package.json",
"url": "./schemas/package.schema.json"
}
],
"commands": [
{
"command": "vscode-kafka.open.docs.home",
Expand Down
37 changes: 37 additions & 0 deletions schemas/package.schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "Kafka contributions to package.json",
"type": "object",
"properties": {
"contributes": {
"type": "object",
"properties": {
"kafka": {
"type": "object",
"markdownDescription": "Kafka extensions",
"properties": {
"clusterProviders": {
"type": "array",
"markdownDescription": "Cluster providers definitions.",
"items": [
{
"type": "object",
"properties": {
"id": {
"type": "string",
"description": "Cluster provider id."
},
"name": {
"type": "string",
"description": "Cluster provider name."
}
}
}
]
}
}
}
}
}
}
}
104 changes: 69 additions & 35 deletions src/client/client.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { Admin, ConfigResourceTypes, Kafka, Producer } from "kafkajs";
import { Admin, ConfigResourceTypes, Kafka, KafkaConfig, Producer } from "kafkajs";

import { Disposable } from "vscode";
import { getClusterProvider } from "../kafka-extensions/registry";
import { WorkspaceSettings } from "../settings";

export interface ConnectionOptions {
clusterProviderId?: string;
bootstrap: string;
saslOption?: SaslOption;
ssl?: boolean;
Expand Down Expand Up @@ -80,7 +82,7 @@ export interface ConsumerGroupMember {

export interface Client extends Disposable {
cluster: Cluster;
producer: Producer;
producer(): Promise<Producer>;
connect(): Promise<void>;
getTopics(): Promise<Topic[]>;
getBrokers(): Promise<Broker[]>;
Expand All @@ -102,8 +104,8 @@ class EnsureConnectedDecorator implements Client {
return this.client.cluster;
}

get producer(): any {
return this.client.producer;
public producer(): any {
return this.client.producer();
}

public connect(): Promise<void> {
Expand Down Expand Up @@ -176,37 +178,61 @@ class KafkaJsClient implements Client {
public kafkaClient: any;
public kafkaCyclicProducerClient: any;
public kafkaKeyedProducerClient: any;
public producer: Producer;

private kafkaJsClient: Kafka;
private kafkaAdminClient: Admin;
private kafkaProducer: Producer | undefined;
private kafkaJsClient: Kafka | undefined;
private kafkaAdminClient: Admin | undefined;

private metadata: {
topics: Topic[];
brokers: Broker[];
};

// Promise which returns the KafkaJsClient instance when it is ready.
private kafkaPromise: Promise<KafkaJsClient>;

constructor(public readonly cluster: Cluster, workspaceSettings: WorkspaceSettings) {
this.metadata = {
brokers: [],
topics: [],
};
this.kafkaJsClient = createKafka(cluster);
this.kafkaClient = this.kafkaJsClient;
this.kafkaAdminClient = this.kafkaJsClient.admin();
this.producer = this.kafkaJsClient.producer();
// The Kafka client is created in asynchronous since external vscode extension
// can contribute to the creation of Kafka instance.
this.kafkaPromise = createKafka(cluster)
.then(result => {
this.kafkaJsClient = result;
this.kafkaClient = this.kafkaJsClient;
this.kafkaAdminClient = this.kafkaJsClient.admin();
this.kafkaProducer = this.kafkaJsClient.producer();
return this;
});
}

public async getkafkaAdminClient(): Promise<Admin> {
const admin = (await this.kafkaPromise).kafkaAdminClient;
if (!admin) {
throw new Error('Kafka Admin cannot be null.');
}
return admin;
}

public async producer(): Promise<Producer> {
const producer = (await this.kafkaPromise).kafkaProducer;
if (!producer) {
throw new Error('Producer cannot be null.');
}
return producer;
}

canConnect(): boolean {
return this.kafkaAdminClient !== null;
}

connect(): Promise<void> {
return this.kafkaAdminClient.connect();
async connect(): Promise<void> {
return (await this.getkafkaAdminClient()).connect();
}

async getTopics(): Promise<Topic[]> {
const listTopicsResponse = await this.kafkaAdminClient.fetchTopicMetadata();
const listTopicsResponse = await (await this.getkafkaAdminClient()).fetchTopicMetadata();

this.metadata = {
...this.metadata,
Expand All @@ -233,7 +259,7 @@ class KafkaJsClient implements Client {
}

async getBrokers(): Promise<Broker[]> {
const describeClusterResponse = await this.kafkaAdminClient?.describeCluster();
const describeClusterResponse = await (await this.getkafkaAdminClient()).describeCluster();

this.metadata = {
...this.metadata,
Expand All @@ -251,7 +277,7 @@ class KafkaJsClient implements Client {
}

async getBrokerConfigs(brokerId: string): Promise<ConfigEntry[]> {
const describeConfigsResponse = await this.kafkaAdminClient.describeConfigs({
const describeConfigsResponse = await (await this.getkafkaAdminClient()).describeConfigs({
includeSynonyms: false,
resources: [
{
Expand All @@ -265,7 +291,7 @@ class KafkaJsClient implements Client {
}

async getTopicConfigs(topicId: string): Promise<ConfigEntry[]> {
const describeConfigsResponse = await this.kafkaAdminClient.describeConfigs({
const describeConfigsResponse = await (await this.getkafkaAdminClient()).describeConfigs({
includeSynonyms: false,
resources: [
{
Expand All @@ -279,12 +305,12 @@ class KafkaJsClient implements Client {
}

async getConsumerGroupIds(): Promise<string[]> {
const listGroupsResponse = await this.kafkaAdminClient.listGroups();
const listGroupsResponse = await (await this.getkafkaAdminClient()).listGroups();
return Promise.resolve(listGroupsResponse.groups.map((g) => (g.groupId)));
}

async getConsumerGroupDetails(groupId: string): Promise<ConsumerGroup> {
const describeGroupResponse = await this.kafkaAdminClient.describeGroups([groupId]);
const describeGroupResponse = await (await this.getkafkaAdminClient()).describeGroups([groupId]);

const consumerGroup: ConsumerGroup = {
groupId: groupId,
Expand All @@ -304,11 +330,11 @@ class KafkaJsClient implements Client {
}

async deleteConsumerGroups(groupIds: string[]): Promise<void> {
await this.kafkaAdminClient.deleteGroups(groupIds);
await (await this.getkafkaAdminClient()).deleteGroups(groupIds);
}

async createTopic(createTopicRequest: CreateTopicRequest): Promise<any[]> {
await this.kafkaAdminClient.createTopics({
await (await this.getkafkaAdminClient()).createTopics({
validateOnly: false,
waitForLeaders: true,
topics: [{
Expand All @@ -321,35 +347,43 @@ class KafkaJsClient implements Client {
}

async deleteTopic(deleteTopicRequest: DeleteTopicRequest): Promise<void> {
return await this.kafkaAdminClient.deleteTopics({
return await (await this.getkafkaAdminClient()).deleteTopics({
topics: deleteTopicRequest.topics,
timeout: deleteTopicRequest.timeout
});
}

dispose() {
this.kafkaAdminClient.disconnect();
if (this.kafkaAdminClient) {
this.kafkaAdminClient.disconnect();
}
}
}

export const createClient = (cluster: Cluster, workspaceSettings: WorkspaceSettings): Client => new EnsureConnectedDecorator(
new KafkaJsClient(cluster, workspaceSettings));

export const createKafka = (connectionOptions: ConnectionOptions): Kafka => {
let kafkaJsClient: Kafka;
export const createKafka = async (connectionOptions: ConnectionOptions): Promise<Kafka> => {
const provider = getClusterProvider(connectionOptions.clusterProviderId);
if (!provider) {
throw new Error(`Cannot find cluster provider for '${connectionOptions.clusterProviderId}' ID.`);
}
const kafkaConfig = await provider.createKafkaConfig(connectionOptions) || createDefaultKafkaConfig(connectionOptions);
return new Kafka(kafkaConfig);
};

export const createDefaultKafkaConfig = (connectionOptions: ConnectionOptions): KafkaConfig => {
if (connectionOptions.saslOption && connectionOptions.saslOption.username && connectionOptions.saslOption.password) {
kafkaJsClient = new Kafka({
return {
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
ssl: true,
sasl: { mechanism: connectionOptions.saslOption.mechanism, username: connectionOptions.saslOption.username, password: connectionOptions.saslOption.password },
});
} else {
kafkaJsClient = new Kafka({
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
ssl: connectionOptions.ssl
});
};
}
return kafkaJsClient;
return {
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
ssl: connectionOptions.ssl
};
};
3 changes: 2 additions & 1 deletion src/client/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export class Consumer implements vscode.Disposable {

const settings = getWorkspaceSettings();
this.options = {
clusterProviderId: cluster.clusterProviderId,
bootstrap: cluster.bootstrap,
saslOption: cluster.saslOption,
consumerGroupId: consumerGroupId,
Expand All @@ -93,7 +94,7 @@ export class Consumer implements vscode.Disposable {
const fromOffset = this.options.fromOffset;
const topic = this.options.topicId;

this.kafkaClient = createKafka(this.options);
this.kafkaClient = await createKafka(this.options);
this.consumer = this.kafkaClient.consumer({
groupId: this.options.consumerGroupId, retry: { retries: 3 },
partitionAssigners: [
Expand Down
2 changes: 1 addition & 1 deletion src/commands/producers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export class ProduceRecordCommandHandler {
return;
}

const producer = client.producer;
const producer = await client.producer();
await producer.connect();

channel.show(false);
Expand Down
6 changes: 5 additions & 1 deletion src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ import { NodeBase } from "./explorer/models/nodeBase";
import * as path from 'path';
import { markdownPreviewProvider } from "./docs/markdownPreviewProvider";
import { KafkaFileCodeLensProvider } from "./kafka-file";
import { getDefaultKafkaExtensionParticipant } from "./kafka-extensions/registry";
import { KafkaExtensionParticipant } from "./kafka-extensions/api";

export function activate(context: vscode.ExtensionContext): void {
export function activate(context: vscode.ExtensionContext): KafkaExtensionParticipant {
Context.register(context);

// Settings, data etc.
Expand Down Expand Up @@ -144,6 +146,8 @@ export function activate(context: vscode.ExtensionContext): void {

context.subscriptions.push(vscode.workspace.registerTextDocumentContentProvider(
ConsumerVirtualTextDocumentProvider.SCHEME, consumerVirtualTextDocumentProvider));

return getDefaultKafkaExtensionParticipant();
}

export function deactivate(): void {
Expand Down
31 changes: 31 additions & 0 deletions src/kafka-extensions/api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { KafkaConfig } from "kafkajs";
import { Cluster, ConnectionOptions } from "../client/client";
import { ClusterSettings } from "../settings/clusters";

export interface KafkaExtensionParticipant {

getClusterProviderParticipant(clusterProviderId: string) : ClusterProviderParticipant;

}

/**
* The kafka extension participant.
*/
export interface ClusterProviderParticipant {

/**
* Returns the Kafka clusters managed by this participant.
*
* @param clusterSettings the current cluster settings.
*/
coonfigureClusters(clusterSettings: ClusterSettings): Promise<Cluster[] | undefined>;

/**
* Create the KafkaJS client configuration from the given connection options.
* When the participant doesn't implement this method, the KafkaJS client
* configuration is created with the default client configuration factory from vscode-kafka.
*
* @param connectionOptions the Kafka connection options.
*/
createKafkaConfig?(connectionOptions: ConnectionOptions): KafkaConfig;
}
Loading

0 comments on commit 36500cb

Please sign in to comment.