Cluster provider API extension
Fixes jlandersen#123

Signed-off-by: azerr <azerr@redhat.com>
angelozerr committed Mar 12, 2021
1 parent bf47c3d commit ed10564
Showing 9 changed files with 419 additions and 58 deletions.
14 changes: 14 additions & 0 deletions package.json
@@ -48,6 +48,14 @@
],
"main": "./dist/extension",
"contributes": {
"kafka": {
"clusterProviders": [
{
"id": "vscode-kafka.manual",
"name": "Configure manually"
}
]
},
"configuration": {
"type": "object",
"title": "Kafka",
@@ -182,6 +190,12 @@
"path": "./snippets/consumers.json"
}
],
"jsonValidation": [
{
"fileMatch": "package.json",
"url": "./schemas/package.schema.json"
}
],
"commands": [
{
"command": "vscode-kafka.open.docs.home",
37 changes: 37 additions & 0 deletions schemas/package.schema.json
@@ -0,0 +1,37 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "Kafka contributions to package.json",
"type": "object",
"properties": {
"contributes": {
"type": "object",
"properties": {
"kafka": {
"type": "object",
"markdownDescription": "Kafka extensions",
"properties": {
"clusterProviders": {
"type": "array",
"markdownDescription": "Cluster providers definitions.",
"items": [
{
"type": "object",
"properties": {
"id": {
"type": "string",
"description": "Cluster provider id."
},
"name": {
"type": "string",
"description": "Cluster provider name."
}
}
}
]
}
}
}
}
}
}
}
104 changes: 69 additions & 35 deletions src/client/client.ts
@@ -1,9 +1,11 @@
import { Admin, ConfigResourceTypes, Kafka, Producer } from "kafkajs";
import { Admin, ConfigResourceTypes, Kafka, KafkaConfig, Producer } from "kafkajs";

import { Disposable } from "vscode";
import { getClusterProvider } from "../kafka-extensions/registry";
import { WorkspaceSettings } from "../settings";

export interface ConnectionOptions {
clusterProviderId?: string;
bootstrap: string;
saslOption?: SaslOption;
ssl?: boolean;
@@ -80,7 +82,7 @@ export interface ConsumerGroupMember {

export interface Client extends Disposable {
cluster: Cluster;
producer: Producer;
producer(): Promise<Producer>;
connect(): Promise<void>;
getTopics(): Promise<Topic[]>;
getBrokers(): Promise<Broker[]>;
@@ -102,8 +104,8 @@ class EnsureConnectedDecorator implements Client {
return this.client.cluster;
}

get producer(): any {
return this.client.producer;
public producer(): any {
return this.client.producer();
}

public connect(): Promise<void> {
@@ -176,37 +178,61 @@ class KafkaJsClient implements Client {
public kafkaClient: any;
public kafkaCyclicProducerClient: any;
public kafkaKeyedProducerClient: any;
public producer: Producer;

private kafkaJsClient: Kafka;
private kafkaAdminClient: Admin;
private kafkaProducer: Producer | undefined;
private kafkaJsClient: Kafka | undefined;
private kafkaAdminClient: Admin | undefined;

private metadata: {
topics: Topic[];
brokers: Broker[];
};

// Promise which returns the KafkaJsClient instance when it is ready.
private kafkaPromise: Promise<KafkaJsClient>;

constructor(public readonly cluster: Cluster, workspaceSettings: WorkspaceSettings) {
this.metadata = {
brokers: [],
topics: [],
};
this.kafkaJsClient = createKafka(cluster);
this.kafkaClient = this.kafkaJsClient;
this.kafkaAdminClient = this.kafkaJsClient.admin();
this.producer = this.kafkaJsClient.producer();
// The Kafka client is created asynchronously since external vscode extensions
// can contribute to the creation of the Kafka instance.
this.kafkaPromise = createKafka(cluster)
.then(result => {
this.kafkaJsClient = result;
this.kafkaClient = this.kafkaJsClient;
this.kafkaAdminClient = this.kafkaJsClient.admin();
this.kafkaProducer = this.kafkaJsClient.producer();
return this;
});
}

public async getkafkaAdminClient(): Promise<Admin> {
const admin = (await this.kafkaPromise).kafkaAdminClient;
if (!admin) {
throw new Error('Kafka Admin cannot be null.');
}
return admin;
}

public async producer(): Promise<Producer> {
const producer = (await this.kafkaPromise).kafkaProducer;
if (!producer) {
throw new Error('Producer cannot be null.');
}
return producer;
}

canConnect(): boolean {
return this.kafkaAdminClient !== null;
}

connect(): Promise<void> {
return this.kafkaAdminClient.connect();
async connect(): Promise<void> {
return (await this.getkafkaAdminClient()).connect();
}

async getTopics(): Promise<Topic[]> {
const listTopicsResponse = await this.kafkaAdminClient.fetchTopicMetadata();
const listTopicsResponse = await (await this.getkafkaAdminClient()).fetchTopicMetadata();

this.metadata = {
...this.metadata,
@@ -233,7 +259,7 @@ class KafkaJsClient implements Client {
}

async getBrokers(): Promise<Broker[]> {
const describeClusterResponse = await this.kafkaAdminClient?.describeCluster();
const describeClusterResponse = await (await this.getkafkaAdminClient()).describeCluster();

this.metadata = {
...this.metadata,
@@ -251,7 +277,7 @@ class KafkaJsClient implements Client {
}

async getBrokerConfigs(brokerId: string): Promise<ConfigEntry[]> {
const describeConfigsResponse = await this.kafkaAdminClient.describeConfigs({
const describeConfigsResponse = await (await this.getkafkaAdminClient()).describeConfigs({
includeSynonyms: false,
resources: [
{
@@ -265,7 +291,7 @@ class KafkaJsClient implements Client {
}

async getTopicConfigs(topicId: string): Promise<ConfigEntry[]> {
const describeConfigsResponse = await this.kafkaAdminClient.describeConfigs({
const describeConfigsResponse = await (await this.getkafkaAdminClient()).describeConfigs({
includeSynonyms: false,
resources: [
{
@@ -279,12 +305,12 @@ class KafkaJsClient implements Client {
}

async getConsumerGroupIds(): Promise<string[]> {
const listGroupsResponse = await this.kafkaAdminClient.listGroups();
const listGroupsResponse = await (await this.getkafkaAdminClient()).listGroups();
return Promise.resolve(listGroupsResponse.groups.map((g) => (g.groupId)));
}

async getConsumerGroupDetails(groupId: string): Promise<ConsumerGroup> {
const describeGroupResponse = await this.kafkaAdminClient.describeGroups([groupId]);
const describeGroupResponse = await (await this.getkafkaAdminClient()).describeGroups([groupId]);

const consumerGroup: ConsumerGroup = {
groupId: groupId,
@@ -304,11 +330,11 @@ class KafkaJsClient implements Client {
}

async deleteConsumerGroups(groupIds: string[]): Promise<void> {
await this.kafkaAdminClient.deleteGroups(groupIds);
await (await this.getkafkaAdminClient()).deleteGroups(groupIds);
}

async createTopic(createTopicRequest: CreateTopicRequest): Promise<any[]> {
await this.kafkaAdminClient.createTopics({
await (await this.getkafkaAdminClient()).createTopics({
validateOnly: false,
waitForLeaders: true,
topics: [{
@@ -321,35 +347,43 @@ class KafkaJsClient implements Client {
}

async deleteTopic(deleteTopicRequest: DeleteTopicRequest): Promise<void> {
return await this.kafkaAdminClient.deleteTopics({
return await (await this.getkafkaAdminClient()).deleteTopics({
topics: deleteTopicRequest.topics,
timeout: deleteTopicRequest.timeout
});
}

dispose() {
this.kafkaAdminClient.disconnect();
if (this.kafkaAdminClient) {
this.kafkaAdminClient.disconnect();
}
}
}

export const createClient = (cluster: Cluster, workspaceSettings: WorkspaceSettings): Client => new EnsureConnectedDecorator(
new KafkaJsClient(cluster, workspaceSettings));

export const createKafka = (connectionOptions: ConnectionOptions): Kafka => {
let kafkaJsClient: Kafka;
export const createKafka = async (connectionOptions: ConnectionOptions): Promise<Kafka> => {
const provider = getClusterProvider(connectionOptions.clusterProviderId);
if (!provider) {
throw new Error(`Cannot find cluster provider for '${connectionOptions.clusterProviderId}' ID.`);
}
const kafkaConfig = await provider.createKafkaConfig(connectionOptions) || createDefaultKafkaConfig(connectionOptions);
return new Kafka(kafkaConfig);
};

export const createDefaultKafkaConfig = (connectionOptions: ConnectionOptions): KafkaConfig => {
if (connectionOptions.saslOption && connectionOptions.saslOption.username && connectionOptions.saslOption.password) {
kafkaJsClient = new Kafka({
return {
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
ssl: true,
sasl: { mechanism: connectionOptions.saslOption.mechanism, username: connectionOptions.saslOption.username, password: connectionOptions.saslOption.password },
});
} else {
kafkaJsClient = new Kafka({
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
ssl: connectionOptions.ssl
});
};
}
return kafkaJsClient;
return {
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
ssl: connectionOptions.ssl
};
};
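
For orientation, a minimal usage sketch of the reworked factory (not part of this commit); the broker address is illustrative and the relative import path assumes a caller inside src/client:

import { ConnectionOptions, createKafka } from "./client";

async function openAdmin(): Promise<void> {
    const options: ConnectionOptions = {
        // Optional: when omitted, getClusterProvider() falls back to the built-in
        // 'vscode-kafka.manual' provider and createDefaultKafkaConfig() is used.
        clusterProviderId: "vscode-kafka.manual",
        bootstrap: "localhost:9092",
        ssl: false,
    };
    // createKafka() is now asynchronous: the resolved cluster provider may build the
    // KafkaConfig itself (eg: after an SSO flow), otherwise the default config applies.
    const kafka = await createKafka(options);
    const admin = kafka.admin();
    await admin.connect();
}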
3 changes: 2 additions & 1 deletion src/client/consumer.ts
@@ -69,6 +69,7 @@ export class Consumer implements vscode.Disposable {

const settings = getWorkspaceSettings();
this.options = {
clusterProviderId: cluster.clusterProviderId,
bootstrap: cluster.bootstrap,
saslOption: cluster.saslOption,
consumerGroupId: consumerGroupId,
@@ -93,7 +94,7 @@ export class Consumer implements vscode.Disposable {
const fromOffset = this.options.fromOffset;
const topic = this.options.topicId;

this.kafkaClient = createKafka(this.options);
this.kafkaClient = await createKafka(this.options);
this.consumer = this.kafkaClient.consumer({
groupId: this.options.consumerGroupId, retry: { retries: 3 },
partitionAssigners: [
2 changes: 1 addition & 1 deletion src/commands/producers.ts
@@ -64,7 +64,7 @@ export class ProduceRecordCommandHandler {
return;
}

const producer = client.producer;
const producer = await client.producer();
await producer.connect();

channel.show(false);
6 changes: 5 additions & 1 deletion src/extension.ts
@@ -34,8 +34,10 @@ import { NodeBase } from "./explorer/models/nodeBase";
import * as path from 'path';
import { markdownPreviewProvider } from "./docs/markdownPreviewProvider";
import { KafkaFileCodeLensProvider } from "./kafka-file";
import { getDefaultKafkaExtensionParticipant } from "./kafka-extensions/registry";
import { KafkaExtensionParticipant } from "./kafka-extensions/api";

export function activate(context: vscode.ExtensionContext): void {
export function activate(context: vscode.ExtensionContext): KafkaExtensionParticipant {
Context.register(context);

// Settings, data etc.
@@ -144,6 +146,8 @@ export function activate(context: vscode.ExtensionContext): void {

context.subscriptions.push(vscode.workspace.registerTextDocumentContentProvider(
ConsumerVirtualTextDocumentProvider.SCHEME, consumerVirtualTextDocumentProvider));

return getDefaultKafkaExtensionParticipant();
}

export function deactivate(): void {
31 changes: 31 additions & 0 deletions src/kafka-extensions/api.ts
@@ -0,0 +1,31 @@
import { KafkaConfig } from "kafkajs";
import { Cluster, ConnectionOptions } from "../client/client";
import { ClusterSettings } from "../settings/clusters";

export interface KafkaExtensionParticipant {

getClusterProviderParticipant(clusterProviderId: string) : ClusterProviderParticipant;

}

/**
* The cluster provider participant, returned by KafkaExtensionParticipant#getClusterProviderParticipant(clusterProviderId).
*/
export interface ClusterProviderParticipant {

/**
* Returns the Kafka clusters managed by this participant.
*
* @param clusterSettings the current cluster settings.
*/
configureClusters(clusterSettings: ClusterSettings): Promise<Cluster[] | undefined>;

/**
* Create the KafkaJS client configuration from the given connection options.
* When the participant doesn't implement this method, the KafkaJS client
* configuration is created with the default client configuration factory from vscode-kafka.
*
* @param connectionOptions the Kafka connection options.
*/
createKafkaConfig?(connectionOptions: ConnectionOptions): KafkaConfig;
}
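
To make the contract concrete, a hedged sketch of what a contributing extension might look like (not part of this commit); the extension and provider ids, cluster values, and import paths are hypothetical, and it assumes the api.ts types above are made available to the contributing extension (eg: copied or published):

// package.json of the contributing extension (hypothetical):
// "contributes": { "kafka": { "clusterProviders": [ { "id": "acme.sso", "name": "Acme (SSO)" } ] } }

import { KafkaConfig } from "kafkajs";
import { Cluster, ConnectionOptions } from "../client/client";                  // assumption: types made available
import { ClusterProviderParticipant, KafkaExtensionParticipant } from "./api";  // assumption: types made available
import { ClusterSettings } from "../settings/clusters";                         // assumption: types made available

export function activate(): KafkaExtensionParticipant {
    return {
        getClusterProviderParticipant(clusterProviderId: string): ClusterProviderParticipant {
            return {
                // Clusters returned here are saved in the cluster settings and shown in the Kafka explorer.
                configureClusters: async (clusterSettings: ClusterSettings): Promise<Cluster[] | undefined> => [
                    {
                        id: "acme-dev",
                        name: "Acme (dev)",
                        bootstrap: "broker.dev.acme.example:9092",
                        clusterProviderId: "acme.sso"
                    } as Cluster
                ],
                // Optional: build the KafkaJS configuration (eg: inject an SSO token);
                // when omitted, vscode-kafka falls back to createDefaultKafkaConfig().
                createKafkaConfig: (connectionOptions: ConnectionOptions): KafkaConfig => ({
                    clientId: "vscode-kafka",
                    brokers: connectionOptions.bootstrap.split(",")
                })
            };
        }
    };
}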
181 changes: 181 additions & 0 deletions src/kafka-extensions/registry.ts
@@ -0,0 +1,181 @@
import * as vscode from "vscode";
import { KafkaConfig } from "kafkajs";
import { Cluster, ConnectionOptions, createDefaultKafkaConfig as createDefaultKafkaConfig } from "../client/client";
import { ClusterSettings } from "../settings/clusters";
import { collectDefaultClusters } from "../wizards/clusters";
import { ClusterProviderParticipant, KafkaExtensionParticipant } from "./api";

/**
* A cluster provider is used to:
*
* - collect clusters (eg: create a cluster from a wizard, import clusters from a repository, ...)
* and add them to the Kafka Explorer.
* - create a Kafka client through a custom process (eg: use SSO to connect to the cluster).
*
* Implementing a cluster provider in a custom vscode extension is done in 2 steps:
*
* - define the cluster provider (id, name) in the package.json, in the contributes/kafka/clusterProviders section.
* - return the cluster provider participant (see ClusterProviderParticipant) from the activate() of the extension.
*
*/
export class ClusterProvider {

private processor: ClusterProviderParticipant | undefined;

constructor(private definition: ClusterProviderDefinition, private extensionId: string) {

}

/**
* Returns the cluster provider id.
*/
public get id(): string {
return this.definition.id;
}

/**
* Returns the cluster provider name.
*/
public get name(): string {
return this.definition.name || this.definition.id;
}

/**
* Returns the clusters managed by the provider which must be added to the kafka explorer.
*
* @param clusterSettings the cluster settings.
*/
async collectClusters(clusterSettings: ClusterSettings): Promise<Cluster[] | undefined> {
const processor = await this.getProcessor();
return processor.configureClusters(clusterSettings);
}

/**
* Creates the KafkaJS client configuration from the given connection options.
*
* @param connectionOptions the connection options.
*/
async createKafkaConfig(connectionOptions: ConnectionOptions): Promise<KafkaConfig | undefined> {
const processor = await this.getProcessor();
if (processor.createKafkaConfig) {
return processor.createKafkaConfig(connectionOptions);
}
}

private async getProcessor(): Promise<ClusterProviderParticipant> {
if (this.processor) {
return this.processor;
}
// The cluster provider processor is not loaded yet; try to activate the owning extension.
// The extension's activate() must return a KafkaExtensionParticipant, from which the processor is obtained.
const extension = vscode.extensions.getExtension(this.extensionId);
if (!extension) {
throw new Error(`Error while getting cluster provider processor. Extension ${this.extensionId} is not available.`);
}

// Wait for the extension to be activated to get the processor
const result = await extension.activate();
if (!result) {
throw new Error(`Error while getting cluster provider processor. Extension ${this.extensionId}.activate() should return 'KafkaExtensionParticipant'.`);
}
if ('getClusterProviderParticipant' in result) {
this.processor = (<KafkaExtensionParticipant>result).getClusterProviderParticipant(this.id);
}
if (!this.processor) {
throw new Error(`Error while getting cluster provider processor. Extension ${this.extensionId}.activate() should return 'KafkaExtensionParticipant'.`);
}
return this.processor;
}
}

const defaultClusterProviderId = 'vscode-kafka.manual';

let providers: Map<string, ClusterProvider> = new Map();

export function getClusterProvider(clusterProviderId?: string): ClusterProvider | undefined {
initializeIfNeeded();
return providers.get(clusterProviderId || defaultClusterProviderId);
}

export function getClusterProviders(): ClusterProvider[] {
initializeIfNeeded();
// "Configure manually" provider must be the first
const manual = getClusterProvider(defaultClusterProviderId);
// Other providers must be sorted by name ascending
const others = [...providers.values()]
.filter(provider => provider.id !== defaultClusterProviderId)
.sort(sortByNameAscending);
if (manual) {
return [manual, ...others];
}
return others;
}

function sortByNameAscending(a: ClusterProvider, b: ClusterProvider): -1 | 0 | 1 {
if (a.name.toLowerCase() < b.name.toLowerCase()) { return -1; }
if (a.name.toLowerCase() > b.name.toLowerCase()) { return 1; }
return 0;
}

function initializeIfNeeded() {
if (providers.size === 0) {
providers = collectClusterProviderDefinitions(vscode.extensions.all);
}
}

export interface ClusterProviderDefinition {
id: string;
name?: string;
}

/**
* Collects the cluster providers defined in package.json (see vscode-kafka, which implements the default cluster provider with the 'Manual' wizard).
*
* ```json
* "contributes": {
* "kafka": {
* "clusterProviders": [
* {
* "id": "vscode-kafka.manual",
* "name": "Manual"
* }
* ]
* }
* ```
*
* @param extensions all installed vscode extensions
*
* @returns the map of cluster providers.
*/
export function collectClusterProviderDefinitions(extensions: readonly vscode.Extension<any>[]): Map<string, ClusterProvider> {
const result: Map<string, ClusterProvider> = new Map();
if (extensions && extensions.length) {
for (const extension of extensions) {
const contributesSection = extension.packageJSON['contributes'];
if (contributesSection) {
const kafkaExtension = contributesSection['kafka'];
if (kafkaExtension) {
const clusterProviders = kafkaExtension['clusterProviders'];
if (Array.isArray(clusterProviders) && clusterProviders.length) {
for (const item of clusterProviders) {
const definition = item as ClusterProviderDefinition;
result.set(definition.id, new ClusterProvider(definition, extension.id));
}
}
}
}
}
}
return result;
}

export function getDefaultKafkaExtensionParticipant(): KafkaExtensionParticipant {
return {
getClusterProviderParticipant(clusterProviderId: string): ClusterProviderParticipant {
return {
configureClusters: (clusterSettings: ClusterSettings): Promise<Cluster[] | undefined> => collectDefaultClusters(clusterSettings),
createKafkaConfig: (connectionOptions: ConnectionOptions): KafkaConfig => createDefaultKafkaConfig(connectionOptions)
} as ClusterProviderParticipant;
}
};
}
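
A short, hypothetical sketch of how the registry is consumed (not part of this commit); the 'acme.sso' id is invented and the actual results depend on the installed extensions:

import * as vscode from "vscode";
import { collectClusterProviderDefinitions, getClusterProvider, getClusterProviders } from "./registry";

// Discovery scans the 'contributes.kafka.clusterProviders' section of every installed extension.
const discovered = collectClusterProviderDefinitions(vscode.extensions.all);
console.log([...discovered.keys()]);   // eg: [ "vscode-kafka.manual", "acme.sso" ]

// Lookup without an id falls back to the built-in 'vscode-kafka.manual' provider,
// so clusters saved before this change (without a clusterProviderId) keep working.
const manual = getClusterProvider(undefined);
console.log(manual?.id);               // "vscode-kafka.manual"

// "Configure manually" is listed first, the remaining providers sorted by name ascending.
console.log(getClusterProviders().map(provider => provider.name));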
99 changes: 79 additions & 20 deletions src/wizards/clusters.ts
@@ -1,13 +1,12 @@
import { QuickPickItem, window } from "vscode";
import { ConnectionOptions, SaslMechanism } from "../client";
import { Cluster, ConnectionOptions, SaslMechanism } from "../client";
import { INPUT_TITLE } from "../constants";
import { KafkaExplorer } from "../explorer/kafkaExplorer";
import { ClusterProvider, getClusterProviders } from "../kafka-extensions/registry";
import { ClusterSettings } from "../settings/clusters";
import { MultiStepInput, showErrorMessage, State } from "./multiStepInput";
import { validateBroker, validateClusterName, validateAuthentificationUserName } from "./validators";

const DEFAULT_BROKER = 'localhost:9092';

interface AddClusterState extends State, ConnectionOptions {
name: string;
}
@@ -16,8 +15,79 @@ const DEFAULT_STEPS = 4;

export async function addClusterWizard(clusterSettings: ClusterSettings, explorer: KafkaExplorer): Promise<void> {

async function pickClusterProvider(): Promise<ClusterProvider | undefined> {
const providers = getClusterProviders();
if (providers.length === 1) {
// By default, only the built-in 'Manual' cluster provider from vscode-kafka exists
// (it fills a cluster with a wizard); return it.
return providers[0];
}

const providerItems: QuickPickItem[] = providers
.map(provider => {
return { "label": provider.name };
});
const selected = (await window.showQuickPick(providerItems))?.label;
if (!selected) {
return;
}
return providers.find(provider => provider.name === selected);
}

// Pick the cluster provider, which provides the capability to return a list of clusters to add to the Kafka explorer
// (eg: fill a cluster from a wizard, import clusters from a repository, etc.)
const provider = await pickClusterProvider();
if (!provider) {
return;
}

// Collect clusters...
let clusters: Cluster[] | undefined;
try {
clusters = await provider.collectClusters(clusterSettings);
if (!clusters || clusters.length === 0) {
return;
}
}
catch (error) {
showErrorMessage(`Error while collecting cluster(s)`, error);
return;
}

try {
// Save collected clusters in settings.
let createdClusterNames = '';
for (const cluster of clusters) {
clusterSettings.upsert(cluster);
if (createdClusterNames !== '') {
createdClusterNames += '\', \'';
}
createdClusterNames += cluster.name;
}
window.showInformationMessage(`${clusters.length > 1 ? `${clusters.length} clusters` : 'Cluster'} '${createdClusterNames}' created successfully`);

// Refresh the explorer
explorer.refresh();

// Selecting the created cluster is done with TreeView#reveal
// 1. Show the treeview of the explorer (otherwise reveal will not work)
explorer.show();
// 2. the reveal() call must occur within a timeout(),
// while waiting for a fix in https://github.com/microsoft/vscode/issues/114149
setTimeout(() => {
if (clusters) {
explorer.selectClusterByName(clusters[0].name);
}
}, 1000);
}
catch (error) {
showErrorMessage(`Error while creating cluster`, error);
}
}

const DEFAULT_BROKER = 'localhost:9092';

export async function collectDefaultClusters(clusterSettings: ClusterSettings): Promise<Cluster[] | undefined> {

const state: Partial<AddClusterState> = {
totalSteps: DEFAULT_STEPS
@@ -151,26 +221,15 @@ export async function addClusterWizard(clusterSettings: ClusterSettings, explore
const sanitizedName = name.replace(/[^a-zA-Z0-9]/g, "");
const suffix = Buffer.from(bootstrap).toString("base64").replace(/=/g, "");

try {
clusterSettings.upsert({
return [
{
id: `${sanitizedName}-${suffix}`,
bootstrap,
name,
saslOption,
ssl: state.ssl
});
explorer.refresh();
window.showInformationMessage(`Cluster '${name}' created successfully`);
// Selecting the created cluster is done with TreeView#reveal
// 1. Show the treeview of the explorer (otherwise reveal will not work)
explorer.show();
// 2. the reveal() call must occur within a timeout(),
// while waiting for a fix in https://github.com/microsoft/vscode/issues/114149
setTimeout(() => {
explorer.selectClusterByName(name);
}, 1000);
}
catch (error) {
showErrorMessage(`Error while creating cluster`, error);
}
}
];
}

