forked from jlandersen/vscode-kafka
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Execute Produce code lens in kafka file with progress
Fixes jlandersen#117 Signed-off-by: azerr <[email protected]>
- Loading branch information
1 parent
86f7bf3
commit b14f13e
Showing
6 changed files
with
311 additions
and
58 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,195 @@ | ||
import { ProducerRecord } from "kafkajs"; | ||
import * as vscode from "vscode"; | ||
import { addQueryParameter, ClientAccessor } from "."; | ||
import { Producer as KafkaJSProducer } from "kafkajs"; | ||
|
||
|
||
export enum ProducerLaunchState { | ||
none, | ||
connecting, | ||
connected, | ||
disconnecting, | ||
disconnected | ||
} | ||
|
||
export interface ProducerCollectionChangedEvent { | ||
producers: Producer[]; | ||
} | ||
|
||
export class Producer implements vscode.Disposable { | ||
|
||
public state: ProducerLaunchState = ProducerLaunchState.none; | ||
|
||
private producer: KafkaJSProducer | undefined; | ||
|
||
constructor(public uri: vscode.Uri, private clientAccessor: ClientAccessor) { | ||
|
||
} | ||
|
||
async start(): Promise<void> { | ||
const [clusterId] = this.uri.path.split("/"); | ||
const client = this.clientAccessor.get(clusterId); | ||
this.producer = await client.producer(); | ||
await this.producer.connect(); | ||
} | ||
|
||
async send(record: ProducerRecord) { | ||
if (this.producer) { | ||
await this.producer.send(record); | ||
} | ||
} | ||
|
||
async dispose(): Promise<void> { | ||
if (this.producer) { | ||
await this.producer.disconnect(); | ||
this.producer = undefined; | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* A collection of producers. | ||
*/ | ||
export class ProducerCollection implements vscode.Disposable { | ||
|
||
private producers: { [id: string]: Producer } = {}; | ||
|
||
private onDidChangeCollectionEmitter = new vscode.EventEmitter<ProducerCollectionChangedEvent>(); | ||
|
||
public onDidChangeCollection = this.onDidChangeCollectionEmitter.event; | ||
|
||
constructor(private clientAccessor: ClientAccessor) { | ||
|
||
} | ||
/** | ||
* Creates a new provider for a provided uri. | ||
*/ | ||
async create(uri: vscode.Uri): Promise<Producer> { | ||
|
||
// Create the consumer | ||
const producer = new Producer(uri, this.clientAccessor); | ||
this.producers[uri.toString()] = producer; | ||
|
||
// Fire an event to notify that Producer is producing | ||
producer.state = ProducerLaunchState.connecting; | ||
this.onDidChangeCollectionEmitter.fire({ | ||
producers: [producer] | ||
}); | ||
|
||
// Start the producer | ||
try { | ||
await producer.start(); | ||
} | ||
catch (e) { | ||
delete this.producers[uri.toString()]; | ||
producer.state = ProducerLaunchState.none; | ||
this.onDidChangeCollectionEmitter.fire({ | ||
producers: [producer] | ||
}); | ||
throw e; | ||
} | ||
|
||
// Fire an event to notify that producer is finished | ||
producer.state = ProducerLaunchState.connected; | ||
this.onDidChangeCollectionEmitter.fire({ | ||
producers: [producer] | ||
}); | ||
|
||
return producer; | ||
} | ||
|
||
dispose(): void { | ||
this.disposeProducers(); | ||
this.onDidChangeCollectionEmitter.dispose(); | ||
} | ||
|
||
disposeProducers(): void { | ||
Object.keys(this.producers).forEach((key) => { | ||
this.producers[key].dispose(); | ||
}); | ||
|
||
this.producers = {}; | ||
} | ||
|
||
/** | ||
* Retrieve the number of active producers | ||
*/ | ||
length(): number { | ||
return Object.keys(this.producers).length; | ||
} | ||
|
||
/** | ||
* Retrieve an existing producer if exists. | ||
*/ | ||
get(uri: vscode.Uri): Producer | null { | ||
if (!this.has(uri)) { | ||
return null; | ||
} | ||
|
||
return this.producers[uri.toString()]; | ||
} | ||
|
||
/** | ||
* Retrieve all producers | ||
*/ | ||
getAll(): Producer[] { | ||
return Object.keys(this.producers).map((c) => this.producers[c]); | ||
} | ||
|
||
/** | ||
* Check whether a producer exists. | ||
*/ | ||
has(uri: vscode.Uri): boolean { | ||
return this.producers.hasOwnProperty(uri.toString()); | ||
} | ||
|
||
/** | ||
* Disconnect an existing producer if exists. | ||
*/ | ||
async disconnect(uri: vscode.Uri): Promise<void> { | ||
const producer = this.get(uri); | ||
|
||
if (producer === null) { | ||
return; | ||
} | ||
|
||
// Fire an event to notify that producer is disconnecting | ||
producer.state = ProducerLaunchState.disconnecting; | ||
this.onDidChangeCollectionEmitter.fire({ | ||
producers: [producer] | ||
}); | ||
|
||
await producer.dispose(); | ||
|
||
// Fire an event to notify that producer is disconnected | ||
producer.state = ProducerLaunchState.disconnected; | ||
this.onDidChangeCollectionEmitter.fire({ | ||
producers: [producer] | ||
}); | ||
|
||
delete this.producers[uri.toString()]; | ||
|
||
} | ||
} | ||
|
||
// ---------- Producer URI utilities | ||
|
||
export interface ProducerInfoUri { | ||
clusterId: string; | ||
topicId?: string; | ||
key?: string; | ||
value: string; | ||
} | ||
|
||
const TOPIC_QUERY_PARAMETER = 'topic'; | ||
const KEY_QUERY_PARAMETER = 'key'; | ||
const VALUE_QUERY_PARAMETER = 'value'; | ||
|
||
export function createProducerUri(info: ProducerInfoUri): vscode.Uri { | ||
const path = `kafka:${info.clusterId}`; | ||
let query = ''; | ||
query = addQueryParameter(query, TOPIC_QUERY_PARAMETER, info.topicId); | ||
query = addQueryParameter(query, KEY_QUERY_PARAMETER, info.key); | ||
query = addQueryParameter(query, VALUE_QUERY_PARAMETER, info.value); | ||
return vscode.Uri.parse(path + query); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.