Skip to content

Commit

Permalink
Execute Produce code lens in kafka file with progress
Browse files Browse the repository at this point in the history
Fixes jlandersen#117

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr committed Mar 26, 2021
1 parent 86f7bf3 commit 65414a2
Show file tree
Hide file tree
Showing 7 changed files with 333 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ All notable changes to Kafka extension will be documented in this file.
### Changed
- Improved the "New topic" wizard: the replication factor is now read from the broker configuration. Input will be skipped if value can't be higher than 1. See [#64](https://github.com/jlandersen/vscode-kafka/issues/64).
- The "Kafka Producer Log" output view is no longer shown automatically when producing messages. See [#134](https://github.com/jlandersen/vscode-kafka/issues/134).
- a progress notification is displayed when producing messages. See [#117](https://github.com/jlandersen/vscode-kafka/issues/117).

## [0.11.0] - 2021-03-08
### Added
Expand Down
7 changes: 7 additions & 0 deletions src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -387,3 +387,10 @@ export const createDefaultKafkaConfig = (connectionOptions: ConnectionOptions):
ssl: connectionOptions.ssl
};
};

export function addQueryParameter(query: string, name: string, value?: string): string {
if (value === undefined) {
return query;
}
return `${query}${query.length > 0 ? '&' : '?'}${name}=${value}`;
}
9 changes: 1 addition & 8 deletions src/client/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Kafka, Consumer as KafkaJsConsumer, PartitionAssigner, Assignment, Part
import { URLSearchParams } from "url";
import * as vscode from "vscode";
import { getWorkspaceSettings, InitialConsumerOffset, ClusterSettings } from "../settings";
import { ConnectionOptions, createKafka } from "./client";
import { addQueryParameter, ConnectionOptions, createKafka } from "./client";
import { deserialize, MessageFormat, SerializationdResult } from "./serialization";

interface ConsumerOptions extends ConnectionOptions {
Expand Down Expand Up @@ -378,13 +378,6 @@ export function createConsumerUri(info: ConsumerInfoUri): vscode.Uri {
return vscode.Uri.parse(path + query);
}

function addQueryParameter(query: string, name: string, value?: string): string {
if (value === undefined) {
return query;
}
return `${query}${query.length > 0 ? '&' : '?'}${name}=${value}`;
}

export function extractConsumerInfoUri(uri: vscode.Uri): ConsumerInfoUri {
const [clusterId, consumerGroupId] = uri.path.split("/");
const urlParams = new URLSearchParams(uri.query);
Expand Down
219 changes: 219 additions & 0 deletions src/client/producer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
import { ProducerRecord } from "kafkajs";
import * as vscode from "vscode";
import { addQueryParameter, ClientAccessor } from ".";
import { Producer as KafkaJSProducer } from "kafkajs";


export enum ProducerLaunchState {
none,
connecting,
connected,
sending,
sent,
disconnecting,
disconnected
}

export interface ProducerCollectionChangedEvent {
producers: Producer[];
}

export class Producer implements vscode.Disposable {

public state: ProducerLaunchState = ProducerLaunchState.none;

private producer: KafkaJSProducer | undefined;

constructor(public uri: vscode.Uri, private clientAccessor: ClientAccessor) {

}

async start(): Promise<void> {
const [clusterId] = this.uri.path.split("/");
const client = this.clientAccessor.get(clusterId);
this.producer = await client.producer();
await this.producer.connect();
}

async send(record: ProducerRecord) {
if (this.producer) {
await this.producer.send(record);
}
}

async dispose(): Promise<void> {
if (this.producer) {
await this.producer.disconnect();
this.producer = undefined;
}
}
}

/**
* A collection of producers.
*/
export class ProducerCollection implements vscode.Disposable {

private producers: { [id: string]: Producer } = {};

private onDidChangeCollectionEmitter = new vscode.EventEmitter<ProducerCollectionChangedEvent>();

public onDidChangeCollection = this.onDidChangeCollectionEmitter.event;

constructor(private clientAccessor: ClientAccessor) {

}
/**
* Creates a new producer for a provided uri.
*/
async create(uri: vscode.Uri): Promise<Producer> {

// Create the producer
const producer = new Producer(uri, this.clientAccessor);
this.producers[uri.toString()] = producer;

// Fire an event to notify that producer is connecting
this.changeState(producer, ProducerLaunchState.connecting);

// Start the producer
try {
await producer.start();
}
catch (e) {
this.handleProducerError(producer, e);
}

// Fire an event to notify that producer is connected
this.changeState(producer, ProducerLaunchState.connected);

return producer;
}

dispose(): void {
this.disposeProducers();
this.onDidChangeCollectionEmitter.dispose();
}

disposeProducers(): void {
Object.keys(this.producers).forEach((key) => {
this.producers[key].dispose();
});

this.producers = {};
}

/**
* Retrieve the number of active producers
*/
length(): number {
return Object.keys(this.producers).length;
}

/**
* Retrieve an existing producer if exists.
*/
get(uri: vscode.Uri): Producer | null {
if (!this.has(uri)) {
return null;
}

return this.producers[uri.toString()];
}

/**
* Retrieve all producers
*/
getAll(): Producer[] {
return Object.keys(this.producers).map((c) => this.producers[c]);
}

/**
* Check whether a producer exists.
*/
has(uri: vscode.Uri): boolean {
return this.producers.hasOwnProperty(uri.toString());
}

async send(uri: vscode.Uri, record: ProducerRecord): Promise<void> {
const producer = this.get(uri);

if (producer === null) {
return;
}

// Fire an event to notify that producer message is sending
this.changeState(producer, ProducerLaunchState.sending);

// Send messages
try {
await producer.send(record);
}
catch (e) {
this.handleProducerError(producer, e);
}

// Fire an event to notify that producer message is sent
this.changeState(producer, ProducerLaunchState.sent);
}

/**
* Disconnect an existing producer if exists.
*/
async disconnect(uri: vscode.Uri): Promise<void> {
const producer = this.get(uri);

if (producer === null) {
return;
}

// Fire an event to notify that producer is disconnecting
this.changeState(producer, ProducerLaunchState.disconnecting);

try {
await producer.dispose();
}
catch (e) {
this.handleProducerError(producer, e);
}

// Fire an event to notify that producer is disconnected
this.changeState(producer, ProducerLaunchState.disconnected);
delete this.producers[uri.toString()];

}

private handleProducerError(producer: Producer, e: Error) {
this.changeState(producer, ProducerLaunchState.none);
delete this.producers[producer.uri.toString()];
throw e;
}

private changeState(producer: Producer, state: ProducerLaunchState) {
producer.state = state;
this.onDidChangeCollectionEmitter.fire({
producers: [producer]
});
}
}

// ---------- Producer URI utilities

export interface ProducerInfoUri {
clusterId: string;
topicId?: string;
key?: string;
value: string;
}

const TOPIC_QUERY_PARAMETER = 'topic';
const KEY_QUERY_PARAMETER = 'key';
const VALUE_QUERY_PARAMETER = 'value';

export function createProducerUri(info: ProducerInfoUri): vscode.Uri {
const path = `kafka:${info.clusterId}`;
let query = '';
query = addQueryParameter(query, TOPIC_QUERY_PARAMETER, info.topicId);
query = addQueryParameter(query, KEY_QUERY_PARAMETER, info.key);
query = addQueryParameter(query, VALUE_QUERY_PARAMETER, info.value);
return vscode.Uri.parse(path + query);
}
58 changes: 44 additions & 14 deletions src/commands/producers.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as vscode from "vscode";
import * as faker from "faker";

import { performance } from "perf_hooks";
Expand All @@ -7,11 +8,10 @@ import { KafkaExplorer } from "../explorer";
import { WorkspaceSettings } from "../settings";
import { pickClient } from "./common";
import { MessageFormat, serialize } from "../client/serialization";
import { createProducerUri, ProducerCollection, ProducerInfoUri, ProducerLaunchState } from "../client/producer";
import { ProducerRecord } from "kafkajs";

export interface ProduceRecordCommand {
topicId?: string;
key?: string;
value: string;
export interface ProduceRecordCommand extends ProducerInfoUri {
messageKeyFormat?: MessageFormat;
messageValueFormat?: MessageFormat;
}
Expand All @@ -22,6 +22,7 @@ export class ProduceRecordCommandHandler {

constructor(
private clientAccessor: ClientAccessor,
private producerCollection: ProducerCollection,
private channelProvider: OutputChannelProvider,
private explorer: KafkaExplorer,
private settings: WorkspaceSettings
Expand Down Expand Up @@ -67,25 +68,48 @@ export class ProduceRecordCommandHandler {
};
});

const producer = await client.producer();
await producer.connect();
command.clusterId = client.cluster.id;
const producerUri = createProducerUri(command);
const record = {
topic: topicId,
messages: messages,
};
// Start the producer
await startProducerWithProgress(producerUri, record, this.producerCollection, channel, times, this.explorer);
}
}

async function startProducerWithProgress(producerUri: vscode.Uri, record: ProducerRecord, producerCollection: ProducerCollection, channel: vscode.OutputChannel, times: number, explorer?: KafkaExplorer) {
const producer = producerCollection.get(producerUri);
if (producer && producer.state === ProducerLaunchState.connecting) {
vscode.window.showErrorMessage(`The producer cannot be started because it is producing.`);
return;
}
await vscode.window.withProgress({
location: vscode.ProgressLocation.Window,
title: `Starting producer '${producerUri}'.`,
cancellable: false
}, async (progress, token) => {

// 1. Connect the producer
progress.report({message: `Connecting producer '${producerUri}'.`});
await producerCollection.create(producerUri);

// 2. Send the producer record.
progress.report({message: `Producing record(s) '${producerUri}'.`, increment: 30});
channel.appendLine(`Producing record(s)`);
const startOperation = performance.now();

try {
await producer.send({
topic: topicId,
messages: messages,
});

await producerCollection.send(producerUri, record);

const finishedOperation = performance.now();
const elapsed = (finishedOperation - startOperation).toFixed(2);

channel.appendLine(`Produced ${times} record(s) (${elapsed}ms)`);

this.explorer.refresh();
if (explorer) {
explorer.refresh();
}
} catch (error) {
const finishedOperation = performance.now();
const elapsed = (finishedOperation - startOperation).toFixed(2);
Expand All @@ -96,6 +120,12 @@ export class ProduceRecordCommandHandler {
} else {
channel.appendLine(`Error: ${error}`);
}
throw error;
}
}
finally {
// 3. Disconnect the producer
progress.report({message: `Disconnecting producer '${producerUri}'.`, increment: 60});
//await producerCollection.disconnect(producerUri);
}
});
}
8 changes: 5 additions & 3 deletions src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import { markdownPreviewProvider } from "./docs/markdownPreviewProvider";
import { KafkaFileCodeLensProvider } from "./kafka-file";
import { getDefaultKafkaExtensionParticipant } from "./kafka-extensions/registry";
import { KafkaExtensionParticipant } from "./kafka-extensions/api";
import { ProducerCollection } from "./client/producer";

export function activate(context: vscode.ExtensionContext): KafkaExtensionParticipant {
Context.register(context);
Expand All @@ -48,9 +49,10 @@ export function activate(context: vscode.ExtensionContext): KafkaExtensionPartic
const clusterSettings = getClusterSettings();
const clientAccessor = getClientAccessor();
const consumerCollection = new ConsumerCollection(clusterSettings);
const producerCollection = new ProducerCollection(clientAccessor);
context.subscriptions.push(clientAccessor);
context.subscriptions.push(consumerCollection);

context.subscriptions.push(producerCollection);

// Views (sidebar, status bar items etc.)
const outputChannelProvider = new OutputChannelProvider();
Expand All @@ -64,7 +66,7 @@ export function activate(context: vscode.ExtensionContext): KafkaExtensionPartic
// Commands
const createTopicCommandHandler = new CreateTopicCommandHandler(clientAccessor, clusterSettings, explorer);
const deleteTopicCommandHandler = new DeleteTopicCommandHandler(clientAccessor, explorer);
const produceRecordCommandHandler = new ProduceRecordCommandHandler(clientAccessor, outputChannelProvider, explorer, workspaceSettings);
const produceRecordCommandHandler = new ProduceRecordCommandHandler(clientAccessor, producerCollection, outputChannelProvider, explorer, workspaceSettings);
const startConsumerCommandHandler = new StartConsumerCommandHandler(clientAccessor, consumerCollection, explorer);
const stopConsumerCommandHandler = new StopConsumerCommandHandler(clientAccessor, consumerCollection, explorer);
const listConsumersCommandHandler = new ListConsumersCommandHandler(consumerCollection);
Expand Down Expand Up @@ -142,7 +144,7 @@ export function activate(context: vscode.ExtensionContext): KafkaExtensionPartic
];

context.subscriptions.push(
vscode.languages.registerCodeLensProvider(documentSelector, new KafkaFileCodeLensProvider(clusterSettings, consumerCollection)));
vscode.languages.registerCodeLensProvider(documentSelector, new KafkaFileCodeLensProvider(clusterSettings, producerCollection, consumerCollection)));

context.subscriptions.push(vscode.workspace.registerTextDocumentContentProvider(
ConsumerVirtualTextDocumentProvider.SCHEME, consumerVirtualTextDocumentProvider));
Expand Down
Loading

0 comments on commit 65414a2

Please sign in to comment.