From 2fa0a02c325ca999d389f5483e4cdb25b48c8114 Mon Sep 17 00:00:00 2001 From: angelozerr Date: Fri, 19 Mar 2021 17:01:11 +0100 Subject: [PATCH] Execute Produce code lens in kafka file with progress Fixes #117 Signed-off-by: azerr --- src/client/client.ts | 7 + src/client/consumer.ts | 9 +- src/client/producer.ts | 219 +++++++++++++++++++++++++++++ src/commands/producers.ts | 101 ++++++++----- src/extension.ts | 8 +- src/kafka-file/codeLensProvider.ts | 49 +++++-- 6 files changed, 335 insertions(+), 58 deletions(-) create mode 100644 src/client/producer.ts diff --git a/src/client/client.ts b/src/client/client.ts index 03fe0e4..4a1a856 100644 --- a/src/client/client.ts +++ b/src/client/client.ts @@ -387,3 +387,10 @@ export const createDefaultKafkaConfig = (connectionOptions: ConnectionOptions): ssl: connectionOptions.ssl }; }; + +export function addQueryParameter(query: string, name: string, value?: string): string { + if (value === undefined) { + return query; + } + return `${query}${query.length > 0 ? '&' : '?'}${name}=${value}`; +} diff --git a/src/client/consumer.ts b/src/client/consumer.ts index 44159d1..79d1205 100644 --- a/src/client/consumer.ts +++ b/src/client/consumer.ts @@ -2,7 +2,7 @@ import { Kafka, Consumer as KafkaJsConsumer, PartitionAssigner, Assignment, Part import { URLSearchParams } from "url"; import * as vscode from "vscode"; import { getWorkspaceSettings, InitialConsumerOffset, ClusterSettings } from "../settings"; -import { ConnectionOptions, createKafka } from "./client"; +import { addQueryParameter, ConnectionOptions, createKafka } from "./client"; import { deserialize, MessageFormat, SerializationdResult } from "./serialization"; interface ConsumerOptions extends ConnectionOptions { @@ -378,13 +378,6 @@ export function createConsumerUri(info: ConsumerInfoUri): vscode.Uri { return vscode.Uri.parse(path + query); } -function addQueryParameter(query: string, name: string, value?: string): string { - if (value === undefined) { - return query; - } - return `${query}${query.length > 0 ? '&' : '?'}${name}=${value}`; -} - export function extractConsumerInfoUri(uri: vscode.Uri): ConsumerInfoUri { const [clusterId, consumerGroupId] = uri.path.split("/"); const urlParams = new URLSearchParams(uri.query); diff --git a/src/client/producer.ts b/src/client/producer.ts new file mode 100644 index 0000000..1d48edb --- /dev/null +++ b/src/client/producer.ts @@ -0,0 +1,219 @@ +import { ProducerRecord } from "kafkajs"; +import * as vscode from "vscode"; +import { addQueryParameter, ClientAccessor } from "."; +import { Producer as KafkaJSProducer } from "kafkajs"; + + +export enum ProducerLaunchState { + none, + connecting, + connected, + sending, + sent, + disconnecting, + disconnected +} + +export interface ProducerCollectionChangedEvent { + producers: Producer[]; +} + +export class Producer implements vscode.Disposable { + + public state: ProducerLaunchState = ProducerLaunchState.none; + + private producer: KafkaJSProducer | undefined; + + constructor(public uri: vscode.Uri, private clientAccessor: ClientAccessor) { + + } + + async start(): Promise { + const [clusterId] = this.uri.path.split("/"); + const client = this.clientAccessor.get(clusterId); + this.producer = await client.producer(); + await this.producer.connect(); + } + + async send(record: ProducerRecord) { + if (this.producer) { + await this.producer.send(record); + } + } + + async dispose(): Promise { + if (this.producer) { + await this.producer.disconnect(); + this.producer = undefined; + } + } +} + +/** + * A collection of producers. + */ +export class ProducerCollection implements vscode.Disposable { + + private producers: { [id: string]: Producer } = {}; + + private onDidChangeCollectionEmitter = new vscode.EventEmitter(); + + public onDidChangeCollection = this.onDidChangeCollectionEmitter.event; + + constructor(private clientAccessor: ClientAccessor) { + + } + /** + * Creates a new producer for a provided uri. + */ + async create(uri: vscode.Uri): Promise { + + // Create the producer + const producer = new Producer(uri, this.clientAccessor); + this.producers[uri.toString()] = producer; + + // Fire an event to notify that producer is connecting + producer.state = ProducerLaunchState.connecting; + this.onDidChangeCollectionEmitter.fire({ + producers: [producer] + }); + + // Start the producer + try { + await producer.start(); + } + catch (e) { + delete this.producers[uri.toString()]; + producer.state = ProducerLaunchState.none; + this.onDidChangeCollectionEmitter.fire({ + producers: [producer] + }); + throw e; + } + + // Fire an event to notify that producer is connected + producer.state = ProducerLaunchState.connected; + this.onDidChangeCollectionEmitter.fire({ + producers: [producer] + }); + + return producer; + } + + dispose(): void { + this.disposeProducers(); + this.onDidChangeCollectionEmitter.dispose(); + } + + disposeProducers(): void { + Object.keys(this.producers).forEach((key) => { + this.producers[key].dispose(); + }); + + this.producers = {}; + } + + /** + * Retrieve the number of active producers + */ + length(): number { + return Object.keys(this.producers).length; + } + + /** + * Retrieve an existing producer if exists. + */ + get(uri: vscode.Uri): Producer | null { + if (!this.has(uri)) { + return null; + } + + return this.producers[uri.toString()]; + } + + /** + * Retrieve all producers + */ + getAll(): Producer[] { + return Object.keys(this.producers).map((c) => this.producers[c]); + } + + /** + * Check whether a producer exists. + */ + has(uri: vscode.Uri): boolean { + return this.producers.hasOwnProperty(uri.toString()); + } + + async send(uri: vscode.Uri, record: ProducerRecord): Promise { + const producer = this.get(uri); + + if (producer === null) { + return; + } + + // Fire an event to notify that producer message is sending + producer.state = ProducerLaunchState.sending; + this.onDidChangeCollectionEmitter.fire({ + producers: [producer] + }); + + await producer.dispose(); + + // Fire an event to notify that producer message is sent + producer.state = ProducerLaunchState.sent; + this.onDidChangeCollectionEmitter.fire({ + producers: [producer] + }); + } + + /** + * Disconnect an existing producer if exists. + */ + async disconnect(uri: vscode.Uri): Promise { + const producer = this.get(uri); + + if (producer === null) { + return; + } + + // Fire an event to notify that producer is disconnecting + producer.state = ProducerLaunchState.disconnecting; + this.onDidChangeCollectionEmitter.fire({ + producers: [producer] + }); + + await producer.dispose(); + + // Fire an event to notify that producer is disconnected + producer.state = ProducerLaunchState.disconnected; + this.onDidChangeCollectionEmitter.fire({ + producers: [producer] + }); + + delete this.producers[uri.toString()]; + + } +} + +// ---------- Producer URI utilities + +export interface ProducerInfoUri { + clusterId: string; + topicId?: string; + key?: string; + value: string; +} + +const TOPIC_QUERY_PARAMETER = 'topic'; +const KEY_QUERY_PARAMETER = 'key'; +const VALUE_QUERY_PARAMETER = 'value'; + +export function createProducerUri(info: ProducerInfoUri): vscode.Uri { + const path = `kafka:${info.clusterId}`; + let query = ''; + query = addQueryParameter(query, TOPIC_QUERY_PARAMETER, info.topicId); + query = addQueryParameter(query, KEY_QUERY_PARAMETER, info.key); + query = addQueryParameter(query, VALUE_QUERY_PARAMETER, info.value); + return vscode.Uri.parse(path + query); +} diff --git a/src/commands/producers.ts b/src/commands/producers.ts index 8062743..4ce0d98 100644 --- a/src/commands/producers.ts +++ b/src/commands/producers.ts @@ -1,3 +1,4 @@ +import * as vscode from "vscode"; import * as faker from "faker"; import { performance } from "perf_hooks"; @@ -7,11 +8,10 @@ import { KafkaExplorer } from "../explorer"; import { WorkspaceSettings } from "../settings"; import { pickClient } from "./common"; import { MessageFormat, serialize } from "../client/serialization"; +import { createProducerUri, ProducerCollection, ProducerInfoUri, ProducerLaunchState } from "../client/producer"; +import { ProducerRecord } from "kafkajs"; -export interface ProduceRecordCommand { - topicId?: string; - key?: string; - value: string; +export interface ProduceRecordCommand extends ProducerInfoUri { messageKeyFormat?: MessageFormat; messageValueFormat?: MessageFormat; } @@ -22,6 +22,7 @@ export class ProduceRecordCommandHandler { constructor( private clientAccessor: ClientAccessor, + private producerCollection: ProducerCollection, private channelProvider: OutputChannelProvider, private explorer: KafkaExplorer, private settings: WorkspaceSettings @@ -67,35 +68,69 @@ export class ProduceRecordCommandHandler { }; }); - const producer = await client.producer(); - await producer.connect(); - - channel.appendLine(`Producing record(s)`); - const startOperation = performance.now(); - - try { - await producer.send({ - topic: topicId, - messages: messages, - }); - - - const finishedOperation = performance.now(); - const elapsed = (finishedOperation - startOperation).toFixed(2); - - channel.appendLine(`Produced ${times} record(s) (${elapsed}ms)`); - - this.explorer.refresh(); - } catch (error) { - const finishedOperation = performance.now(); - const elapsed = (finishedOperation - startOperation).toFixed(2); - channel.appendLine(`Failed to produce record(s) (${elapsed}ms)`); + command.clusterId = client.cluster.id; + const producerUri = createProducerUri(command); + const record = { + topic: topicId, + messages: messages, + }; + // Start the producer + await startProducerWithProgress(producerUri, record, this.producerCollection, channel, times, this.explorer); + } +} - if (error.message) { - channel.appendLine(`Error: ${error.message}`); - } else { - channel.appendLine(`Error: ${error}`); - } - } +async function startProducerWithProgress(producerUri: vscode.Uri, record: ProducerRecord, producerCollection: ProducerCollection, channel: vscode.OutputChannel, times: number, explorer?: KafkaExplorer) { + const producer = producerCollection.get(producerUri); + if (producer && producer.state === ProducerLaunchState.connecting) { + vscode.window.showErrorMessage(`The producer cannot be started because it is producing.`); + return; } + await vscode.window.withProgress({ + location: vscode.ProgressLocation.Window, + title: `Starting producer '${producerUri}'.`, + cancellable: false + }, (progress, token) => { + return new Promise((resolve, reject) => { + // 1. Connect the producer + producerCollection + .create(producerUri) + .then(producer => { + + channel.appendLine(`Producing record(s)`); + const startOperation = performance.now(); + + // 2. Send the producer record. + producer + .send(record) + .then(() => { + + const finishedOperation = performance.now(); + const elapsed = (finishedOperation - startOperation).toFixed(2); + + channel.appendLine(`Produced ${times} record(s) (${elapsed}ms)`); + + if (explorer) { + explorer.refresh(); + } + + // 3. Disconnect the producer + producerCollection + .disconnect(producerUri) + .then(() => resolve(producer)); + }) + .catch(error => { + const finishedOperation = performance.now(); + const elapsed = (finishedOperation - startOperation).toFixed(2); + channel.appendLine(`Failed to produce record(s) (${elapsed}ms)`); + if (error.message) { + channel.appendLine(`Error: ${error.message}`); + } else { + channel.appendLine(`Error: ${error}`); + } + reject(error); + }); + }) + .catch(error => reject(error)); + }); + }); } diff --git a/src/extension.ts b/src/extension.ts index 5b4f1ef..e2d63ef 100644 --- a/src/extension.ts +++ b/src/extension.ts @@ -36,6 +36,7 @@ import { markdownPreviewProvider } from "./docs/markdownPreviewProvider"; import { KafkaFileCodeLensProvider } from "./kafka-file"; import { getDefaultKafkaExtensionParticipant } from "./kafka-extensions/registry"; import { KafkaExtensionParticipant } from "./kafka-extensions/api"; +import { ProducerCollection } from "./client/producer"; export function activate(context: vscode.ExtensionContext): KafkaExtensionParticipant { Context.register(context); @@ -48,9 +49,10 @@ export function activate(context: vscode.ExtensionContext): KafkaExtensionPartic const clusterSettings = getClusterSettings(); const clientAccessor = getClientAccessor(); const consumerCollection = new ConsumerCollection(clusterSettings); + const producerCollection = new ProducerCollection(clientAccessor); context.subscriptions.push(clientAccessor); context.subscriptions.push(consumerCollection); - + context.subscriptions.push(producerCollection); // Views (sidebar, status bar items etc.) const outputChannelProvider = new OutputChannelProvider(); @@ -64,7 +66,7 @@ export function activate(context: vscode.ExtensionContext): KafkaExtensionPartic // Commands const createTopicCommandHandler = new CreateTopicCommandHandler(clientAccessor, clusterSettings, explorer); const deleteTopicCommandHandler = new DeleteTopicCommandHandler(clientAccessor, explorer); - const produceRecordCommandHandler = new ProduceRecordCommandHandler(clientAccessor, outputChannelProvider, explorer, workspaceSettings); + const produceRecordCommandHandler = new ProduceRecordCommandHandler(clientAccessor, producerCollection, outputChannelProvider, explorer, workspaceSettings); const startConsumerCommandHandler = new StartConsumerCommandHandler(clientAccessor, consumerCollection, explorer); const stopConsumerCommandHandler = new StopConsumerCommandHandler(clientAccessor, consumerCollection, explorer); const listConsumersCommandHandler = new ListConsumersCommandHandler(consumerCollection); @@ -142,7 +144,7 @@ export function activate(context: vscode.ExtensionContext): KafkaExtensionPartic ]; context.subscriptions.push( - vscode.languages.registerCodeLensProvider(documentSelector, new KafkaFileCodeLensProvider(clusterSettings, consumerCollection))); + vscode.languages.registerCodeLensProvider(documentSelector, new KafkaFileCodeLensProvider(clusterSettings, producerCollection, consumerCollection))); context.subscriptions.push(vscode.workspace.registerTextDocumentContentProvider( ConsumerVirtualTextDocumentProvider.SCHEME, consumerVirtualTextDocumentProvider)); diff --git a/src/kafka-file/codeLensProvider.ts b/src/kafka-file/codeLensProvider.ts index 3daacb7..2721be2 100644 --- a/src/kafka-file/codeLensProvider.ts +++ b/src/kafka-file/codeLensProvider.ts @@ -1,5 +1,6 @@ import * as vscode from "vscode"; import { ConsumerCollection, ConsumerCollectionChangedEvent, ConsumerLaunchState } from "../client"; +import { createProducerUri, ProducerCollection, ProducerCollectionChangedEvent, ProducerLaunchState } from "../client/producer"; import { LaunchConsumerCommand, StartConsumerCommandHandler, StopConsumerCommandHandler, ProduceRecordCommand, ProduceRecordCommandHandler, SelectClusterCommandHandler } from "../commands"; import { ClusterSettings } from "../settings"; @@ -16,6 +17,7 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod constructor( private clusterSettings: ClusterSettings, + private producerCollection: ProducerCollection, private consumerCollection: ConsumerCollection ) { // Refresh the code lenses when: @@ -23,7 +25,11 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod this.disposables.push(this.consumerCollection.onDidChangeCollection((e: ConsumerCollectionChangedEvent) => { this._onDidChangeCodeLenses.fire(); })); - // 2. a cluster is selected + // 2. a producer is started / stopped to refresh the status of each declared PRODUCER + this.disposables.push(this.producerCollection.onDidChangeCollection((e: ProducerCollectionChangedEvent) => { + this._onDidChangeCodeLenses.fire(); + })); + // 3. a cluster is selected this.clusterSettings.onDidChangeSelected((e) => { this._onDidChangeCodeLenses.fire(); }); @@ -112,27 +118,41 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod } private createProducerLens(lineRange: vscode.Range, range: vscode.Range, document: vscode.TextDocument, clusterName: string | undefined): vscode.CodeLens[] { - const produceRecordCommand = this.createProduceRecordCommand(document, range); + const produceRecordCommand = this.createProduceRecordCommand(document, range, this.clusterSettings.selected?.id); const lenses: vscode.CodeLens[] = []; if (clusterName) { - // Add Produce lenses - lenses.push(new vscode.CodeLens(lineRange, { - title: "$(run) Produce record", - command: ProduceRecordCommandHandler.commandId, - arguments: [produceRecordCommand, 1] - })); - lenses.push(new vscode.CodeLens(lineRange, { - title: "$(run-all) Produce record x 10", - command: ProduceRecordCommandHandler.commandId, - arguments: [produceRecordCommand, 10] - })); + const producerUri = createProducerUri(produceRecordCommand); + const producer = this.producerCollection.get(producerUri); + const producerState = producer ? producer.state : ProducerLaunchState.none; + switch (producerState) { + case ProducerLaunchState.connecting: + const status = 'Producing...'; + lenses.push(new vscode.CodeLens(lineRange, { + title: `$(sync~spin) ${status}...`, + command: '' + })); + break; + default: + // Add Produce lenses + lenses.push(new vscode.CodeLens(lineRange, { + title: "$(run) Produce record", + command: ProduceRecordCommandHandler.commandId, + arguments: [produceRecordCommand, 1] + })); + lenses.push(new vscode.CodeLens(lineRange, { + title: "$(run-all) Produce record x 10", + command: ProduceRecordCommandHandler.commandId, + arguments: [produceRecordCommand, 10] + })); + break; + } } // Add cluster lens lenses.push(this.createClusterLens(lineRange, clusterName)); return lenses; } - private createProduceRecordCommand(document: vscode.TextDocument, range: vscode.Range): ProduceRecordCommand { + private createProduceRecordCommand(document: vscode.TextDocument, range: vscode.Range, selectedClusterId: string | undefined): ProduceRecordCommand { let topicId; let key; let value = ""; @@ -170,6 +190,7 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod } return { + clusterId: selectedClusterId, topicId, key, value,