Skip to content

Commit

Permalink
Execute Produce code lens in kafka file with progress
Browse files Browse the repository at this point in the history
Fixes #117

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr committed Mar 25, 2021
1 parent 86f7bf3 commit 2fa0a02
Show file tree
Hide file tree
Showing 6 changed files with 335 additions and 58 deletions.
7 changes: 7 additions & 0 deletions src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -387,3 +387,10 @@ export const createDefaultKafkaConfig = (connectionOptions: ConnectionOptions):
ssl: connectionOptions.ssl
};
};

export function addQueryParameter(query: string, name: string, value?: string): string {
if (value === undefined) {
return query;
}
return `${query}${query.length > 0 ? '&' : '?'}${name}=${value}`;
}
9 changes: 1 addition & 8 deletions src/client/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Kafka, Consumer as KafkaJsConsumer, PartitionAssigner, Assignment, Part
import { URLSearchParams } from "url";
import * as vscode from "vscode";
import { getWorkspaceSettings, InitialConsumerOffset, ClusterSettings } from "../settings";
import { ConnectionOptions, createKafka } from "./client";
import { addQueryParameter, ConnectionOptions, createKafka } from "./client";
import { deserialize, MessageFormat, SerializationdResult } from "./serialization";

interface ConsumerOptions extends ConnectionOptions {
Expand Down Expand Up @@ -378,13 +378,6 @@ export function createConsumerUri(info: ConsumerInfoUri): vscode.Uri {
return vscode.Uri.parse(path + query);
}

function addQueryParameter(query: string, name: string, value?: string): string {
if (value === undefined) {
return query;
}
return `${query}${query.length > 0 ? '&' : '?'}${name}=${value}`;
}

export function extractConsumerInfoUri(uri: vscode.Uri): ConsumerInfoUri {
const [clusterId, consumerGroupId] = uri.path.split("/");
const urlParams = new URLSearchParams(uri.query);
Expand Down
219 changes: 219 additions & 0 deletions src/client/producer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
import { ProducerRecord } from "kafkajs";
import * as vscode from "vscode";
import { addQueryParameter, ClientAccessor } from ".";
import { Producer as KafkaJSProducer } from "kafkajs";


export enum ProducerLaunchState {
none,
connecting,
connected,
sending,
sent,
disconnecting,
disconnected
}

export interface ProducerCollectionChangedEvent {
producers: Producer[];
}

export class Producer implements vscode.Disposable {

public state: ProducerLaunchState = ProducerLaunchState.none;

private producer: KafkaJSProducer | undefined;

constructor(public uri: vscode.Uri, private clientAccessor: ClientAccessor) {

}

async start(): Promise<void> {
const [clusterId] = this.uri.path.split("/");
const client = this.clientAccessor.get(clusterId);
this.producer = await client.producer();
await this.producer.connect();
}

async send(record: ProducerRecord) {
if (this.producer) {
await this.producer.send(record);
}
}

async dispose(): Promise<void> {
if (this.producer) {
await this.producer.disconnect();
this.producer = undefined;
}
}
}

/**
* A collection of producers.
*/
export class ProducerCollection implements vscode.Disposable {

private producers: { [id: string]: Producer } = {};

private onDidChangeCollectionEmitter = new vscode.EventEmitter<ProducerCollectionChangedEvent>();

public onDidChangeCollection = this.onDidChangeCollectionEmitter.event;

constructor(private clientAccessor: ClientAccessor) {

}
/**
* Creates a new producer for a provided uri.
*/
async create(uri: vscode.Uri): Promise<Producer> {

// Create the producer
const producer = new Producer(uri, this.clientAccessor);
this.producers[uri.toString()] = producer;

// Fire an event to notify that producer is connecting
producer.state = ProducerLaunchState.connecting;
this.onDidChangeCollectionEmitter.fire({
producers: [producer]
});

// Start the producer
try {
await producer.start();
}
catch (e) {
delete this.producers[uri.toString()];
producer.state = ProducerLaunchState.none;
this.onDidChangeCollectionEmitter.fire({
producers: [producer]
});
throw e;
}

// Fire an event to notify that producer is connected
producer.state = ProducerLaunchState.connected;
this.onDidChangeCollectionEmitter.fire({
producers: [producer]
});

return producer;
}

dispose(): void {
this.disposeProducers();
this.onDidChangeCollectionEmitter.dispose();
}

disposeProducers(): void {
Object.keys(this.producers).forEach((key) => {
this.producers[key].dispose();
});

this.producers = {};
}

/**
* Retrieve the number of active producers
*/
length(): number {
return Object.keys(this.producers).length;
}

/**
* Retrieve an existing producer if exists.
*/
get(uri: vscode.Uri): Producer | null {
if (!this.has(uri)) {
return null;
}

return this.producers[uri.toString()];
}

/**
* Retrieve all producers
*/
getAll(): Producer[] {
return Object.keys(this.producers).map((c) => this.producers[c]);
}

/**
* Check whether a producer exists.
*/
has(uri: vscode.Uri): boolean {
return this.producers.hasOwnProperty(uri.toString());
}

async send(uri: vscode.Uri, record: ProducerRecord): Promise<void> {
const producer = this.get(uri);

if (producer === null) {
return;
}

// Fire an event to notify that producer message is sending
producer.state = ProducerLaunchState.sending;
this.onDidChangeCollectionEmitter.fire({
producers: [producer]
});

await producer.dispose();

// Fire an event to notify that producer message is sent
producer.state = ProducerLaunchState.sent;
this.onDidChangeCollectionEmitter.fire({
producers: [producer]
});
}

/**
* Disconnect an existing producer if exists.
*/
async disconnect(uri: vscode.Uri): Promise<void> {
const producer = this.get(uri);

if (producer === null) {
return;
}

// Fire an event to notify that producer is disconnecting
producer.state = ProducerLaunchState.disconnecting;
this.onDidChangeCollectionEmitter.fire({
producers: [producer]
});

await producer.dispose();

// Fire an event to notify that producer is disconnected
producer.state = ProducerLaunchState.disconnected;
this.onDidChangeCollectionEmitter.fire({
producers: [producer]
});

delete this.producers[uri.toString()];

}
}

// ---------- Producer URI utilities

export interface ProducerInfoUri {
clusterId: string;
topicId?: string;
key?: string;
value: string;
}

const TOPIC_QUERY_PARAMETER = 'topic';
const KEY_QUERY_PARAMETER = 'key';
const VALUE_QUERY_PARAMETER = 'value';

export function createProducerUri(info: ProducerInfoUri): vscode.Uri {
const path = `kafka:${info.clusterId}`;
let query = '';
query = addQueryParameter(query, TOPIC_QUERY_PARAMETER, info.topicId);
query = addQueryParameter(query, KEY_QUERY_PARAMETER, info.key);
query = addQueryParameter(query, VALUE_QUERY_PARAMETER, info.value);
return vscode.Uri.parse(path + query);
}
101 changes: 68 additions & 33 deletions src/commands/producers.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as vscode from "vscode";
import * as faker from "faker";

import { performance } from "perf_hooks";
Expand All @@ -7,11 +8,10 @@ import { KafkaExplorer } from "../explorer";
import { WorkspaceSettings } from "../settings";
import { pickClient } from "./common";
import { MessageFormat, serialize } from "../client/serialization";
import { createProducerUri, ProducerCollection, ProducerInfoUri, ProducerLaunchState } from "../client/producer";
import { ProducerRecord } from "kafkajs";

export interface ProduceRecordCommand {
topicId?: string;
key?: string;
value: string;
export interface ProduceRecordCommand extends ProducerInfoUri {
messageKeyFormat?: MessageFormat;
messageValueFormat?: MessageFormat;
}
Expand All @@ -22,6 +22,7 @@ export class ProduceRecordCommandHandler {

constructor(
private clientAccessor: ClientAccessor,
private producerCollection: ProducerCollection,
private channelProvider: OutputChannelProvider,
private explorer: KafkaExplorer,
private settings: WorkspaceSettings
Expand Down Expand Up @@ -67,35 +68,69 @@ export class ProduceRecordCommandHandler {
};
});

const producer = await client.producer();
await producer.connect();

channel.appendLine(`Producing record(s)`);
const startOperation = performance.now();

try {
await producer.send({
topic: topicId,
messages: messages,
});


const finishedOperation = performance.now();
const elapsed = (finishedOperation - startOperation).toFixed(2);

channel.appendLine(`Produced ${times} record(s) (${elapsed}ms)`);

this.explorer.refresh();
} catch (error) {
const finishedOperation = performance.now();
const elapsed = (finishedOperation - startOperation).toFixed(2);
channel.appendLine(`Failed to produce record(s) (${elapsed}ms)`);
command.clusterId = client.cluster.id;
const producerUri = createProducerUri(command);
const record = {
topic: topicId,
messages: messages,
};
// Start the producer
await startProducerWithProgress(producerUri, record, this.producerCollection, channel, times, this.explorer);
}
}

if (error.message) {
channel.appendLine(`Error: ${error.message}`);
} else {
channel.appendLine(`Error: ${error}`);
}
}
async function startProducerWithProgress(producerUri: vscode.Uri, record: ProducerRecord, producerCollection: ProducerCollection, channel: vscode.OutputChannel, times: number, explorer?: KafkaExplorer) {
const producer = producerCollection.get(producerUri);
if (producer && producer.state === ProducerLaunchState.connecting) {
vscode.window.showErrorMessage(`The producer cannot be started because it is producing.`);
return;
}
await vscode.window.withProgress({
location: vscode.ProgressLocation.Window,
title: `Starting producer '${producerUri}'.`,
cancellable: false
}, (progress, token) => {
return new Promise((resolve, reject) => {
// 1. Connect the producer
producerCollection
.create(producerUri)
.then(producer => {

channel.appendLine(`Producing record(s)`);
const startOperation = performance.now();

// 2. Send the producer record.
producer
.send(record)
.then(() => {

const finishedOperation = performance.now();
const elapsed = (finishedOperation - startOperation).toFixed(2);

channel.appendLine(`Produced ${times} record(s) (${elapsed}ms)`);

if (explorer) {
explorer.refresh();
}

// 3. Disconnect the producer
producerCollection
.disconnect(producerUri)
.then(() => resolve(producer));
})
.catch(error => {
const finishedOperation = performance.now();
const elapsed = (finishedOperation - startOperation).toFixed(2);
channel.appendLine(`Failed to produce record(s) (${elapsed}ms)`);
if (error.message) {
channel.appendLine(`Error: ${error.message}`);
} else {
channel.appendLine(`Error: ${error}`);
}
reject(error);
});
})
.catch(error => reject(error));
});
});
}
Loading

0 comments on commit 2fa0a02

Please sign in to comment.