Skip to content

Commit

Permalink
Execute Produce code lens in kafka file with progress
Browse files Browse the repository at this point in the history
Fixes jlandersen#117

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr committed Mar 25, 2021
1 parent 86f7bf3 commit 9472586
Show file tree
Hide file tree
Showing 4 changed files with 264 additions and 50 deletions.
161 changes: 161 additions & 0 deletions src/client/producer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import { ProducerRecord } from "kafkajs";
import * as vscode from "vscode";
import { ClientAccessor } from ".";
import { Producer as KafkaJSProducer } from "kafkajs";

export enum ProducerLaunchState {
none,
producing,
produced
}

export interface ProducerCollectionChangedEvent {
producers: Producer[];
}

export class Producer implements vscode.Disposable {

public state: ProducerLaunchState = ProducerLaunchState.none;

private producer: KafkaJSProducer | undefined;

constructor(public uri: vscode.Uri, private clientAccessor: ClientAccessor) {

}

async start(): Promise<void> {
const [clusterId] = this.uri.path.split("/");
const client = this.clientAccessor.get(clusterId);
this.producer = await client.producer();
await this.producer.connect();
}

async send(record: ProducerRecord) {
if (this.producer) {
await this.producer.send(record);
}
}

async dispose(): Promise<void> {
if (this.producer) {
await this.producer.disconnect();
this.producer = undefined;
}
}
}

/**
* A collection of producers.
*/
export class ProducerCollection implements vscode.Disposable {

private producers: { [id: string]: Producer } = {};
// private disposables: vscode.Disposable[] = [];

private onDidChangeCollectionEmitter = new vscode.EventEmitter<ProducerCollectionChangedEvent>();

public onDidChangeCollection = this.onDidChangeCollectionEmitter.event;

constructor(private clientAccessor: ClientAccessor) {

}
/**
* Creates a new provider for a provided uri.
*/
async create(uri: vscode.Uri): Promise<Producer> {

// Create the consumer
const producer = new Producer(uri, this.clientAccessor);
this.producers[uri.toString()] = producer;

// Fire an event to notify that Producer is producing
producer.state = ProducerLaunchState.producing;
this.onDidChangeCollectionEmitter.fire({
producers: [producer]
});

// Start the producer
try {
await producer.start();
}
catch (e) {
delete this.producers[uri.toString()];
producer.state = ProducerLaunchState.none;
this.onDidChangeCollectionEmitter.fire({
producers: [producer]
});
throw e;
}

// Fire an event to notify that producer is finished
producer.state = ProducerLaunchState.produced;
this.onDidChangeCollectionEmitter.fire({
producers: [producer]
});

return producer;
}
dispose(): void {
}

/**
* Retrieve the number of active producers
*/
length(): number {
return Object.keys(this.producers).length;
}

/**
* Retrieve an existing producer if exists.
*/
get(uri: vscode.Uri): Producer | null {
if (!this.has(uri)) {
return null;
}

return this.producers[uri.toString()];
}

/**
* Retrieve all producers
*/
getAll(): Producer[] {
return Object.keys(this.producers).map((c) => this.producers[c]);
}

/**
* Check whether a producer exists.
*/
has(uri: vscode.Uri): boolean {
return this.producers.hasOwnProperty(uri.toString());
}
}

// ---------- Producer URI utilities

export interface ProducerInfoUri {
clusterId: string;
topicId?: string;
key?: string;
value: string;
}

const TOPIC_QUERY_PARAMETER = 'topic';
const KEY_QUERY_PARAMETER = 'key';
const VALUE_QUERY_PARAMETER = 'value';

export function createProducerUri(info: ProducerInfoUri): vscode.Uri {
const path = `kafka:${info.clusterId}`;
let query = '';
query = addQueryParameter(query, TOPIC_QUERY_PARAMETER, info.topicId);
query = addQueryParameter(query, KEY_QUERY_PARAMETER, info.key);
query = addQueryParameter(query, VALUE_QUERY_PARAMETER, info.value);
return vscode.Uri.parse(path + query);
}

function addQueryParameter(query: string, name: string, value?: string): string {
if (value === undefined) {
return query;
}
return `${query}${query.length > 0 ? '&' : '?'}${name}=${value}`;
}
96 changes: 63 additions & 33 deletions src/commands/producers.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as vscode from "vscode";
import * as faker from "faker";

import { performance } from "perf_hooks";
Expand All @@ -7,11 +8,10 @@ import { KafkaExplorer } from "../explorer";
import { WorkspaceSettings } from "../settings";
import { pickClient } from "./common";
import { MessageFormat, serialize } from "../client/serialization";
import { createProducerUri, ProducerCollection, ProducerInfoUri, ProducerLaunchState } from "../client/producer";
import { ProducerRecord } from "kafkajs";

export interface ProduceRecordCommand {
topicId?: string;
key?: string;
value: string;
export interface ProduceRecordCommand extends ProducerInfoUri {
messageKeyFormat?: MessageFormat;
messageValueFormat?: MessageFormat;
}
Expand All @@ -22,6 +22,7 @@ export class ProduceRecordCommandHandler {

constructor(
private clientAccessor: ClientAccessor,
private producerCollection: ProducerCollection,
private channelProvider: OutputChannelProvider,
private explorer: KafkaExplorer,
private settings: WorkspaceSettings
Expand Down Expand Up @@ -67,35 +68,64 @@ export class ProduceRecordCommandHandler {
};
});

const producer = await client.producer();
await producer.connect();

channel.appendLine(`Producing record(s)`);
const startOperation = performance.now();

try {
await producer.send({
topic: topicId,
messages: messages,
});


const finishedOperation = performance.now();
const elapsed = (finishedOperation - startOperation).toFixed(2);

channel.appendLine(`Produced ${times} record(s) (${elapsed}ms)`);

this.explorer.refresh();
} catch (error) {
const finishedOperation = performance.now();
const elapsed = (finishedOperation - startOperation).toFixed(2);
channel.appendLine(`Failed to produce record(s) (${elapsed}ms)`);
command.clusterId = client.cluster.id;
const producerUri = createProducerUri(command);
const record = {
topic: topicId,
messages: messages,
};
// Start the producer
await startProducerWithProgress(producerUri, record, this.producerCollection, channel, times, this.explorer);
}
}

if (error.message) {
channel.appendLine(`Error: ${error.message}`);
} else {
channel.appendLine(`Error: ${error}`);
}
}
async function startProducerWithProgress(producerUri: vscode.Uri, record: ProducerRecord, producerCollection: ProducerCollection, channel: vscode.OutputChannel, times: number, explorer?: KafkaExplorer) {
const producer = producerCollection.get(producerUri);
if (producer && producer.state === ProducerLaunchState.producing) {
vscode.window.showErrorMessage(`The producer cannot be started because it is producing.`);
return;
}
await vscode.window.withProgress({
location: vscode.ProgressLocation.Window,
title: `Starting producer '${producerUri}'.`,
cancellable: false
}, (progress, token) => {
return new Promise((resolve, reject) => {
// Create the producer
producerCollection.create(producerUri)
.then(producer => {

channel.appendLine(`Producing record(s)`);
const startOperation = performance.now();

// Send the record.
producer.send(record)
.then(() => {

const finishedOperation = performance.now();
const elapsed = (finishedOperation - startOperation).toFixed(2);

channel.appendLine(`Produced ${times} record(s) (${elapsed}ms)`);

if (explorer) {
explorer.refresh();
}
producer.dispose();
resolve(producer);
})
.catch(error => {
const finishedOperation = performance.now();
const elapsed = (finishedOperation - startOperation).toFixed(2);
channel.appendLine(`Failed to produce record(s) (${elapsed}ms)`);
if (error.message) {
channel.appendLine(`Error: ${error.message}`);
} else {
channel.appendLine(`Error: ${error}`);
}
reject(error);
});
})
.catch(error => reject(error));
});
});
}
8 changes: 5 additions & 3 deletions src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import { markdownPreviewProvider } from "./docs/markdownPreviewProvider";
import { KafkaFileCodeLensProvider } from "./kafka-file";
import { getDefaultKafkaExtensionParticipant } from "./kafka-extensions/registry";
import { KafkaExtensionParticipant } from "./kafka-extensions/api";
import { ProducerCollection } from "./client/producer";

export function activate(context: vscode.ExtensionContext): KafkaExtensionParticipant {
Context.register(context);
Expand All @@ -48,9 +49,10 @@ export function activate(context: vscode.ExtensionContext): KafkaExtensionPartic
const clusterSettings = getClusterSettings();
const clientAccessor = getClientAccessor();
const consumerCollection = new ConsumerCollection(clusterSettings);
const producerCollection = new ProducerCollection(clientAccessor);
context.subscriptions.push(clientAccessor);
context.subscriptions.push(consumerCollection);

context.subscriptions.push(producerCollection);

// Views (sidebar, status bar items etc.)
const outputChannelProvider = new OutputChannelProvider();
Expand All @@ -64,7 +66,7 @@ export function activate(context: vscode.ExtensionContext): KafkaExtensionPartic
// Commands
const createTopicCommandHandler = new CreateTopicCommandHandler(clientAccessor, clusterSettings, explorer);
const deleteTopicCommandHandler = new DeleteTopicCommandHandler(clientAccessor, explorer);
const produceRecordCommandHandler = new ProduceRecordCommandHandler(clientAccessor, outputChannelProvider, explorer, workspaceSettings);
const produceRecordCommandHandler = new ProduceRecordCommandHandler(clientAccessor, producerCollection, outputChannelProvider, explorer, workspaceSettings);
const startConsumerCommandHandler = new StartConsumerCommandHandler(clientAccessor, consumerCollection, explorer);
const stopConsumerCommandHandler = new StopConsumerCommandHandler(clientAccessor, consumerCollection, explorer);
const listConsumersCommandHandler = new ListConsumersCommandHandler(consumerCollection);
Expand Down Expand Up @@ -142,7 +144,7 @@ export function activate(context: vscode.ExtensionContext): KafkaExtensionPartic
];

context.subscriptions.push(
vscode.languages.registerCodeLensProvider(documentSelector, new KafkaFileCodeLensProvider(clusterSettings, consumerCollection)));
vscode.languages.registerCodeLensProvider(documentSelector, new KafkaFileCodeLensProvider(clusterSettings, producerCollection, consumerCollection)));

context.subscriptions.push(vscode.workspace.registerTextDocumentContentProvider(
ConsumerVirtualTextDocumentProvider.SCHEME, consumerVirtualTextDocumentProvider));
Expand Down
Loading

0 comments on commit 9472586

Please sign in to comment.