diff --git a/CHANGELOG.md b/CHANGELOG.md index db1f939..7c76a2f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ All notable changes to Kafka extension will be documented in this file. ### Changed - Improved the "New topic" wizard: the replication factor is now read from the broker configuration. Input will be skipped if value can't be higher than 1. See [#64](https://github.com/jlandersen/vscode-kafka/issues/64). - The "Kafka Producer Log" output view is no longer shown automatically when producing messages. See [#134](https://github.com/jlandersen/vscode-kafka/issues/134). +- a progress notification is displayed when producing messages. See [#117](https://github.com/jlandersen/vscode-kafka/issues/117). ## [0.11.0] - 2021-03-08 ### Added diff --git a/src/client/client.ts b/src/client/client.ts index 03fe0e4..2971573 100644 --- a/src/client/client.ts +++ b/src/client/client.ts @@ -208,6 +208,9 @@ class KafkaJsClient implements Client { } public async getkafkaAdminClient(): Promise { + if (this.kafkaAdminClient) { + return this.kafkaAdminClient; + } const admin = (await this.kafkaPromise).kafkaAdminClient; if (!admin) { throw new Error('Kafka Admin cannot be null.'); @@ -216,6 +219,9 @@ class KafkaJsClient implements Client { } public async producer(): Promise { + if (this.kafkaProducer) { + return this.kafkaProducer; + } const producer = (await this.kafkaPromise).kafkaProducer; if (!producer) { throw new Error('Producer cannot be null.'); @@ -387,3 +393,10 @@ export const createDefaultKafkaConfig = (connectionOptions: ConnectionOptions): ssl: connectionOptions.ssl }; }; + +export function addQueryParameter(query: string, name: string, value?: string): string { + if (value === undefined) { + return query; + } + return `${query}${query.length > 0 ? '&' : '?'}${name}=${value}`; +} diff --git a/src/client/consumer.ts b/src/client/consumer.ts index 44159d1..79d1205 100644 --- a/src/client/consumer.ts +++ b/src/client/consumer.ts @@ -2,7 +2,7 @@ import { Kafka, Consumer as KafkaJsConsumer, PartitionAssigner, Assignment, Part import { URLSearchParams } from "url"; import * as vscode from "vscode"; import { getWorkspaceSettings, InitialConsumerOffset, ClusterSettings } from "../settings"; -import { ConnectionOptions, createKafka } from "./client"; +import { addQueryParameter, ConnectionOptions, createKafka } from "./client"; import { deserialize, MessageFormat, SerializationdResult } from "./serialization"; interface ConsumerOptions extends ConnectionOptions { @@ -378,13 +378,6 @@ export function createConsumerUri(info: ConsumerInfoUri): vscode.Uri { return vscode.Uri.parse(path + query); } -function addQueryParameter(query: string, name: string, value?: string): string { - if (value === undefined) { - return query; - } - return `${query}${query.length > 0 ? '&' : '?'}${name}=${value}`; -} - export function extractConsumerInfoUri(uri: vscode.Uri): ConsumerInfoUri { const [clusterId, consumerGroupId] = uri.path.split("/"); const urlParams = new URLSearchParams(uri.query); diff --git a/src/client/producer.ts b/src/client/producer.ts new file mode 100644 index 0000000..91d8e6a --- /dev/null +++ b/src/client/producer.ts @@ -0,0 +1,203 @@ +import { ProducerRecord, RecordMetadata } from "kafkajs"; +import * as vscode from "vscode"; +import { addQueryParameter, ClientAccessor } from "."; +import { Producer as KafkaJSProducer } from "kafkajs"; + + +export enum ProducerLaunchState { + none, + connecting, + connected, + sending, + sent +} + +export interface ProducerCollectionChangedEvent { + producers: Producer[]; +} + +export class Producer implements vscode.Disposable { + + public state: ProducerLaunchState = ProducerLaunchState.none; + + private producer: KafkaJSProducer | undefined; + + constructor(public uri: vscode.Uri, private clientAccessor: ClientAccessor) { + + } + + async start(): Promise { + const [clusterId] = this.uri.path.split("/"); + const client = this.clientAccessor.get(clusterId); + this.producer = await client.producer(); + await this.producer.connect(); + } + + async send(record: ProducerRecord): Promise { + if (this.producer) { + return this.producer.send(record); + } + } + + async dispose(): Promise { + if (this.producer) { + this.producer = undefined; + } + } +} + +/** + * A collection of producers. + */ +export class ProducerCollection implements vscode.Disposable { + + private producers: { [id: string]: Producer } = {}; + + private onDidChangeCollectionEmitter = new vscode.EventEmitter(); + + public onDidChangeCollection = this.onDidChangeCollectionEmitter.event; + + constructor(private clientAccessor: ClientAccessor) { + + } + /** + * Creates a new producer for a provided uri. + */ + async create(uri: vscode.Uri): Promise { + + // Create the producer + const producer = new Producer(uri, this.clientAccessor); + this.producers[uri.toString()] = producer; + + // Fire an event to notify that producer is connecting + this.changeState(producer, ProducerLaunchState.connecting); + + // Start the producer + try { + await producer.start(); + // Fire an event to notify that producer is connected + this.changeState(producer, ProducerLaunchState.connected); + } + catch (e) { + this.handleProducerError(producer, e); + } + + return producer; + } + + dispose(): void { + this.disposeProducers(); + this.onDidChangeCollectionEmitter.dispose(); + } + + disposeProducers(): void { + Object.keys(this.producers).forEach((key) => { + this.producers[key].dispose(); + }); + + this.producers = {}; + } + + /** + * Retrieve the number of active producers + */ + length(): number { + return Object.keys(this.producers).length; + } + + /** + * Retrieve an existing producer if exists. + */ + get(uri: vscode.Uri): Producer | null { + if (!this.has(uri)) { + return null; + } + + return this.producers[uri.toString()]; + } + + /** + * Retrieve all producers + */ + getAll(): Producer[] { + return Object.keys(this.producers).map((c) => this.producers[c]); + } + + /** + * Check whether a producer exists. + */ + has(uri: vscode.Uri): boolean { + return this.producers.hasOwnProperty(uri.toString()); + } + + async send(uri: vscode.Uri, record: ProducerRecord): Promise { + const producer = this.get(uri); + + if (producer === null) { + return; + } + + // Fire an event to notify that producer message is sending + this.changeState(producer, ProducerLaunchState.sending); + + // Send messages + try { + await producer.send(record); + // Fire an event to notify that producer message is sent + this.changeState(producer, ProducerLaunchState.sent); + } + catch (e) { + this.handleProducerError(producer, e); + } + } + + /** + * Close an existing producer if exists. + */ + async close(uri: vscode.Uri): Promise { + const producer = this.get(uri); + + if (producer === null) { + return; + } + + // Fire an event to notify that producer is none + this.changeState(producer, ProducerLaunchState.none); + delete this.producers[uri.toString()]; + } + + private handleProducerError(producer: Producer, e: Error) { + this.changeState(producer, ProducerLaunchState.none); + delete this.producers[producer.uri.toString()]; + throw e; + } + + private changeState(producer: Producer, state: ProducerLaunchState) { + producer.state = state; + this.onDidChangeCollectionEmitter.fire({ + producers: [producer] + }); + } +} + +// ---------- Producer URI utilities + +export interface ProducerInfoUri { + clusterId: string; + topicId?: string; + key?: string; + value: string; +} + +const TOPIC_QUERY_PARAMETER = 'topic'; +const KEY_QUERY_PARAMETER = 'key'; +const VALUE_QUERY_PARAMETER = 'value'; + +export function createProducerUri(info: ProducerInfoUri): vscode.Uri { + const path = `kafka:${info.clusterId}`; + let query = ''; + query = addQueryParameter(query, TOPIC_QUERY_PARAMETER, info.topicId); + query = addQueryParameter(query, KEY_QUERY_PARAMETER, info.key); + query = addQueryParameter(query, VALUE_QUERY_PARAMETER, info.value); + return vscode.Uri.parse(path + query); +} diff --git a/src/commands/producers.ts b/src/commands/producers.ts index 8062743..5e26bd8 100644 --- a/src/commands/producers.ts +++ b/src/commands/producers.ts @@ -1,3 +1,4 @@ +import * as vscode from "vscode"; import * as faker from "faker"; import { performance } from "perf_hooks"; @@ -7,11 +8,10 @@ import { KafkaExplorer } from "../explorer"; import { WorkspaceSettings } from "../settings"; import { pickClient } from "./common"; import { MessageFormat, serialize } from "../client/serialization"; +import { createProducerUri, ProducerCollection, ProducerInfoUri, ProducerLaunchState } from "../client/producer"; +import { ProducerRecord } from "kafkajs"; -export interface ProduceRecordCommand { - topicId?: string; - key?: string; - value: string; +export interface ProduceRecordCommand extends ProducerInfoUri { messageKeyFormat?: MessageFormat; messageValueFormat?: MessageFormat; } @@ -22,6 +22,7 @@ export class ProduceRecordCommandHandler { constructor( private clientAccessor: ClientAccessor, + private producerCollection: ProducerCollection, private channelProvider: OutputChannelProvider, private explorer: KafkaExplorer, private settings: WorkspaceSettings @@ -67,25 +68,48 @@ export class ProduceRecordCommandHandler { }; }); - const producer = await client.producer(); - await producer.connect(); + command.clusterId = client.cluster.id; + const producerUri = createProducerUri(command); + const record = { + topic: topicId, + messages: messages, + }; + // Start the producer + await startProducerWithProgress(producerUri, record, this.producerCollection, channel, times, this.explorer); + } +} +async function startProducerWithProgress(producerUri: vscode.Uri, record: ProducerRecord, producerCollection: ProducerCollection, channel: vscode.OutputChannel, times: number, explorer?: KafkaExplorer) { + const producer = producerCollection.get(producerUri); + if (producer && producer.state === ProducerLaunchState.connecting) { + vscode.window.showErrorMessage(`The producer cannot be started because it is producing.`); + return; + } + await vscode.window.withProgress({ + location: vscode.ProgressLocation.Window, + title: `Starting producer '${producerUri}'.`, + cancellable: false + }, async (progress, token) => { + + // 1. Connect the producer + progress.report({ message: `Connecting producer '${producerUri}'.`, increment: 30 }); + await producerCollection.create(producerUri); + + // 2. Send the producer record. + progress.report({ message: `Producing record(s) '${producerUri}'.`, increment: 30 }); channel.appendLine(`Producing record(s)`); const startOperation = performance.now(); try { - await producer.send({ - topic: topicId, - messages: messages, - }); - + await producerCollection.send(producerUri, record); const finishedOperation = performance.now(); const elapsed = (finishedOperation - startOperation).toFixed(2); channel.appendLine(`Produced ${times} record(s) (${elapsed}ms)`); - - this.explorer.refresh(); + if (explorer) { + explorer.refresh(); + } } catch (error) { const finishedOperation = performance.now(); const elapsed = (finishedOperation - startOperation).toFixed(2); @@ -96,6 +120,12 @@ export class ProduceRecordCommandHandler { } else { channel.appendLine(`Error: ${error}`); } + throw error; } - } + finally { + // 3. Close the producer + progress.report({ message: `Closing producer '${producerUri}'.`, increment: 40 }); + await producerCollection.close(producerUri); + } + }); } diff --git a/src/extension.ts b/src/extension.ts index 5b4f1ef..e2d63ef 100644 --- a/src/extension.ts +++ b/src/extension.ts @@ -36,6 +36,7 @@ import { markdownPreviewProvider } from "./docs/markdownPreviewProvider"; import { KafkaFileCodeLensProvider } from "./kafka-file"; import { getDefaultKafkaExtensionParticipant } from "./kafka-extensions/registry"; import { KafkaExtensionParticipant } from "./kafka-extensions/api"; +import { ProducerCollection } from "./client/producer"; export function activate(context: vscode.ExtensionContext): KafkaExtensionParticipant { Context.register(context); @@ -48,9 +49,10 @@ export function activate(context: vscode.ExtensionContext): KafkaExtensionPartic const clusterSettings = getClusterSettings(); const clientAccessor = getClientAccessor(); const consumerCollection = new ConsumerCollection(clusterSettings); + const producerCollection = new ProducerCollection(clientAccessor); context.subscriptions.push(clientAccessor); context.subscriptions.push(consumerCollection); - + context.subscriptions.push(producerCollection); // Views (sidebar, status bar items etc.) const outputChannelProvider = new OutputChannelProvider(); @@ -64,7 +66,7 @@ export function activate(context: vscode.ExtensionContext): KafkaExtensionPartic // Commands const createTopicCommandHandler = new CreateTopicCommandHandler(clientAccessor, clusterSettings, explorer); const deleteTopicCommandHandler = new DeleteTopicCommandHandler(clientAccessor, explorer); - const produceRecordCommandHandler = new ProduceRecordCommandHandler(clientAccessor, outputChannelProvider, explorer, workspaceSettings); + const produceRecordCommandHandler = new ProduceRecordCommandHandler(clientAccessor, producerCollection, outputChannelProvider, explorer, workspaceSettings); const startConsumerCommandHandler = new StartConsumerCommandHandler(clientAccessor, consumerCollection, explorer); const stopConsumerCommandHandler = new StopConsumerCommandHandler(clientAccessor, consumerCollection, explorer); const listConsumersCommandHandler = new ListConsumersCommandHandler(consumerCollection); @@ -142,7 +144,7 @@ export function activate(context: vscode.ExtensionContext): KafkaExtensionPartic ]; context.subscriptions.push( - vscode.languages.registerCodeLensProvider(documentSelector, new KafkaFileCodeLensProvider(clusterSettings, consumerCollection))); + vscode.languages.registerCodeLensProvider(documentSelector, new KafkaFileCodeLensProvider(clusterSettings, producerCollection, consumerCollection))); context.subscriptions.push(vscode.workspace.registerTextDocumentContentProvider( ConsumerVirtualTextDocumentProvider.SCHEME, consumerVirtualTextDocumentProvider)); diff --git a/src/kafka-file/codeLensProvider.ts b/src/kafka-file/codeLensProvider.ts index 3daacb7..cd91837 100644 --- a/src/kafka-file/codeLensProvider.ts +++ b/src/kafka-file/codeLensProvider.ts @@ -1,5 +1,6 @@ import * as vscode from "vscode"; import { ConsumerCollection, ConsumerCollectionChangedEvent, ConsumerLaunchState } from "../client"; +import { createProducerUri, ProducerCollection, ProducerCollectionChangedEvent, ProducerLaunchState } from "../client/producer"; import { LaunchConsumerCommand, StartConsumerCommandHandler, StopConsumerCommandHandler, ProduceRecordCommand, ProduceRecordCommandHandler, SelectClusterCommandHandler } from "../commands"; import { ClusterSettings } from "../settings"; @@ -16,6 +17,7 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod constructor( private clusterSettings: ClusterSettings, + private producerCollection: ProducerCollection, private consumerCollection: ConsumerCollection ) { // Refresh the code lenses when: @@ -23,7 +25,11 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod this.disposables.push(this.consumerCollection.onDidChangeCollection((e: ConsumerCollectionChangedEvent) => { this._onDidChangeCodeLenses.fire(); })); - // 2. a cluster is selected + // 2. a producer is started / stopped to refresh the status of each declared PRODUCER + this.disposables.push(this.producerCollection.onDidChangeCollection((e: ProducerCollectionChangedEvent) => { + this._onDidChangeCodeLenses.fire(); + })); + // 3. a cluster is selected this.clusterSettings.onDidChangeSelected((e) => { this._onDidChangeCodeLenses.fire(); }); @@ -112,27 +118,57 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod } private createProducerLens(lineRange: vscode.Range, range: vscode.Range, document: vscode.TextDocument, clusterName: string | undefined): vscode.CodeLens[] { - const produceRecordCommand = this.createProduceRecordCommand(document, range); + const produceRecordCommand = this.createProduceRecordCommand(document, range, this.clusterSettings.selected?.id); const lenses: vscode.CodeLens[] = []; if (clusterName) { - // Add Produce lenses - lenses.push(new vscode.CodeLens(lineRange, { - title: "$(run) Produce record", - command: ProduceRecordCommandHandler.commandId, - arguments: [produceRecordCommand, 1] - })); - lenses.push(new vscode.CodeLens(lineRange, { - title: "$(run-all) Produce record x 10", - command: ProduceRecordCommandHandler.commandId, - arguments: [produceRecordCommand, 10] - })); + const producerUri = createProducerUri(produceRecordCommand); + const producer = this.producerCollection.get(producerUri); + const producerState = producer ? producer.state : ProducerLaunchState.none; + switch (producerState) { + case ProducerLaunchState.connecting: + case ProducerLaunchState.sending: + const status = this.getProducerStatus(producerState); + lenses.push(new vscode.CodeLens(lineRange, { + title: `$(sync~spin) ${status}...`, + command: '' + })); + break; + default: + // Add Produce lenses + lenses.push(new vscode.CodeLens(lineRange, { + title: "$(run) Produce record", + command: ProduceRecordCommandHandler.commandId, + arguments: [produceRecordCommand, 1] + })); + lenses.push(new vscode.CodeLens(lineRange, { + title: "$(run-all) Produce record x 10", + command: ProduceRecordCommandHandler.commandId, + arguments: [produceRecordCommand, 10] + })); + break; + } } // Add cluster lens lenses.push(this.createClusterLens(lineRange, clusterName)); return lenses; } - private createProduceRecordCommand(document: vscode.TextDocument, range: vscode.Range): ProduceRecordCommand { + private getProducerStatus(state: ProducerLaunchState): string { + switch (state) { + case ProducerLaunchState.connecting: + return 'Connecting'; + case ProducerLaunchState.connected: + return 'Connected'; + case ProducerLaunchState.sending: + return 'Sending'; + case ProducerLaunchState.sent: + return 'Sent'; + default: + return ''; + } + } + + private createProduceRecordCommand(document: vscode.TextDocument, range: vscode.Range, selectedClusterId: string | undefined): ProduceRecordCommand { let topicId; let key; let value = ""; @@ -170,6 +206,7 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod } return { + clusterId: selectedClusterId, topicId, key, value,