Skip to content

Commit

Permalink
Execute Produce code lens in kafka file with progress
Browse files Browse the repository at this point in the history
Fixes #117

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr committed Mar 26, 2021
1 parent 86f7bf3 commit 4f3b013
Show file tree
Hide file tree
Showing 7 changed files with 372 additions and 58 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ All notable changes to Kafka extension will be documented in this file.
### Changed
- Improved the "New topic" wizard: the replication factor is now read from the broker configuration. Input will be skipped if value can't be higher than 1. See [#64](https://github.com/jlandersen/vscode-kafka/issues/64).
- The "Kafka Producer Log" output view is no longer shown automatically when producing messages. See [#134](https://github.com/jlandersen/vscode-kafka/issues/134).
- a progress notification is displayed when producing messages. See [#117](https://github.com/jlandersen/vscode-kafka/issues/117).

## [0.11.0] - 2021-03-08
### Added
Expand Down
7 changes: 7 additions & 0 deletions src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -387,3 +387,10 @@ export const createDefaultKafkaConfig = (connectionOptions: ConnectionOptions):
ssl: connectionOptions.ssl
};
};

export function addQueryParameter(query: string, name: string, value?: string): string {
if (value === undefined) {
return query;
}
return `${query}${query.length > 0 ? '&' : '?'}${name}=${value}`;
}
9 changes: 1 addition & 8 deletions src/client/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Kafka, Consumer as KafkaJsConsumer, PartitionAssigner, Assignment, Part
import { URLSearchParams } from "url";
import * as vscode from "vscode";
import { getWorkspaceSettings, InitialConsumerOffset, ClusterSettings } from "../settings";
import { ConnectionOptions, createKafka } from "./client";
import { addQueryParameter, ConnectionOptions, createKafka } from "./client";
import { deserialize, MessageFormat, SerializationdResult } from "./serialization";

interface ConsumerOptions extends ConnectionOptions {
Expand Down Expand Up @@ -378,13 +378,6 @@ export function createConsumerUri(info: ConsumerInfoUri): vscode.Uri {
return vscode.Uri.parse(path + query);
}

function addQueryParameter(query: string, name: string, value?: string): string {
if (value === undefined) {
return query;
}
return `${query}${query.length > 0 ? '&' : '?'}${name}=${value}`;
}

export function extractConsumerInfoUri(uri: vscode.Uri): ConsumerInfoUri {
const [clusterId, consumerGroupId] = uri.path.split("/");
const urlParams = new URLSearchParams(uri.query);
Expand Down
233 changes: 233 additions & 0 deletions src/client/producer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
import { ProducerRecord } from "kafkajs";
import * as vscode from "vscode";
import { addQueryParameter, ClientAccessor } from ".";
import { Producer as KafkaJSProducer } from "kafkajs";


export enum ProducerLaunchState {
none,
connecting,
connected,
sending,
sent,
disconnecting,
disconnected
}

export interface ProducerCollectionChangedEvent {
producers: Producer[];
}

export class Producer implements vscode.Disposable {

public state: ProducerLaunchState = ProducerLaunchState.none;

private producer: KafkaJSProducer | undefined;

constructor(public uri: vscode.Uri, private clientAccessor: ClientAccessor) {

}

async start(): Promise<void> {
const [clusterId] = this.uri.path.split("/");
const client = this.clientAccessor.get(clusterId);
this.producer = await client.producer();
await this.producer.connect();
}

async send(record: ProducerRecord) {
if (this.producer) {
await this.producer.send(record);
}
}

async dispose(): Promise<void> {
if (this.producer) {
await this.producer.disconnect();
this.producer = undefined;
}
}
}

/**
* A collection of producers.
*/
export class ProducerCollection implements vscode.Disposable {

private producers: { [id: string]: Producer } = {};

private onDidChangeCollectionEmitter = new vscode.EventEmitter<ProducerCollectionChangedEvent>();

public onDidChangeCollection = this.onDidChangeCollectionEmitter.event;

constructor(private clientAccessor: ClientAccessor) {

}
/**
* Creates a new producer for a provided uri.
*/
async create(uri: vscode.Uri): Promise<Producer> {

// Create the producer
const producer = new Producer(uri, this.clientAccessor);
this.producers[uri.toString()] = producer;

// Fire an event to notify that producer is connecting
producer.state = ProducerLaunchState.connecting;
this.onDidChangeCollectionEmitter.fire({
producers: [producer]
});

// Start the producer
try {
await producer.start();
}
catch (e) {
this.handleProducerError(producer, e);
}

// Fire an event to notify that producer is connected
producer.state = ProducerLaunchState.connected;
this.onDidChangeCollectionEmitter.fire({
producers: [producer]
});

return producer;
}

handleProducerError(producer: Producer, e: Error) {
producer.state = ProducerLaunchState.none;
this.onDidChangeCollectionEmitter.fire({
producers: [producer]
});
delete this.producers[producer.uri.toString()];
throw e;
}

dispose(): void {
this.disposeProducers();
this.onDidChangeCollectionEmitter.dispose();
}

disposeProducers(): void {
Object.keys(this.producers).forEach((key) => {
this.producers[key].dispose();
});

this.producers = {};
}

/**
* Retrieve the number of active producers
*/
length(): number {
return Object.keys(this.producers).length;
}

/**
* Retrieve an existing producer if exists.
*/
get(uri: vscode.Uri): Producer | null {
if (!this.has(uri)) {
return null;
}

return this.producers[uri.toString()];
}

/**
* Retrieve all producers
*/
getAll(): Producer[] {
return Object.keys(this.producers).map((c) => this.producers[c]);
}

/**
* Check whether a producer exists.
*/
has(uri: vscode.Uri): boolean {
return this.producers.hasOwnProperty(uri.toString());
}

async send(uri: vscode.Uri, record: ProducerRecord): Promise<void> {
const producer = this.get(uri);

if (producer === null) {
return;
}

// Fire an event to notify that producer message is sending
producer.state = ProducerLaunchState.sending;
this.onDidChangeCollectionEmitter.fire({
producers: [producer]
});

// Send messages
try {
await producer.send(record);
}
catch (e) {
this.handleProducerError(producer, e);
}

// Fire an event to notify that producer message is sent
producer.state = ProducerLaunchState.sent;
this.onDidChangeCollectionEmitter.fire({
producers: [producer]
});
}

/**
* Disconnect an existing producer if exists.
*/
async disconnect(uri: vscode.Uri): Promise<void> {
const producer = this.get(uri);

if (producer === null) {
return;
}

// Fire an event to notify that producer is disconnecting
producer.state = ProducerLaunchState.disconnecting;
this.onDidChangeCollectionEmitter.fire({
producers: [producer]
});

try {
await producer.dispose();
}
catch (e) {
this.handleProducerError(producer, e);
}

// Fire an event to notify that producer is disconnected
producer.state = ProducerLaunchState.disconnected;
this.onDidChangeCollectionEmitter.fire({
producers: [producer]
});
delete this.producers[uri.toString()];

}
}

// ---------- Producer URI utilities

export interface ProducerInfoUri {
clusterId: string;
topicId?: string;
key?: string;
value: string;
}

const TOPIC_QUERY_PARAMETER = 'topic';
const KEY_QUERY_PARAMETER = 'key';
const VALUE_QUERY_PARAMETER = 'value';

export function createProducerUri(info: ProducerInfoUri): vscode.Uri {
const path = `kafka:${info.clusterId}`;
let query = '';
query = addQueryParameter(query, TOPIC_QUERY_PARAMETER, info.topicId);
query = addQueryParameter(query, KEY_QUERY_PARAMETER, info.key);
query = addQueryParameter(query, VALUE_QUERY_PARAMETER, info.value);
return vscode.Uri.parse(path + query);
}
Loading

0 comments on commit 4f3b013

Please sign in to comment.