diff --git a/src/client/consumer.ts b/src/client/consumer.ts index 64392cb..ba613bb 100644 --- a/src/client/consumer.ts +++ b/src/client/consumer.ts @@ -26,12 +26,12 @@ export interface ConsumedRecord { key?: string | Buffer; } -interface ConsumerChangedStatusEvent { +export interface ConsumerChangedStatusEvent { uri: vscode.Uri; status: "created" | "rebalancing" | "rebalanced"; } -interface ConsumerCollectionChangedEvent { +export interface ConsumerCollectionChangedEvent { created: vscode.Uri[]; closed: vscode.Uri[]; } diff --git a/src/providers/consumerVirtualTextDocumentProvider.ts b/src/providers/consumerVirtualTextDocumentProvider.ts index 2f5f036..8dffd82 100644 --- a/src/providers/consumerVirtualTextDocumentProvider.ts +++ b/src/providers/consumerVirtualTextDocumentProvider.ts @@ -1,6 +1,6 @@ import * as vscode from "vscode"; -import { ConsumedRecord, ConsumerCollection } from "../client"; +import { ConsumedRecord, ConsumerChangedStatusEvent, ConsumerCollection, ConsumerCollectionChangedEvent, RecordReceivedEvent } from "../client"; import { CommonMessages } from "../constants"; export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentContentProvider, vscode.Disposable { @@ -12,12 +12,12 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC public onDidChange = this.onDidChangeEmitter.event; constructor(private consumerCollection: ConsumerCollection) { - this.disposables.push(vscode.workspace.onDidCloseTextDocument((e: any) => { + this.disposables.push(vscode.workspace.onDidCloseTextDocument((e: vscode.TextDocument) => { this.onDidCloseTextDocument(e); })); - this.disposables.push(this.consumerCollection.onDidChangeCollection((arg: any) => { - for (const startedUri of arg.created) { + this.disposables.push(this.consumerCollection.onDidChangeCollection((e: ConsumerCollectionChangedEvent) => { + for (const startedUri of e.created) { if (!this.buffer[startedUri.toString()]) { this.buffer[startedUri.toString()] = ''; } @@ -25,7 +25,7 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC this.attachToConsumer(startedUri); } - for (const closedUri of arg.closed) { + for (const closedUri of e.closed) { this.onDidCloseConsumer(closedUri); } })); @@ -39,23 +39,27 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC return this.buffer[uri.toString()]; } - public attachToConsumer(uri: vscode.Uri): void { + private attachToConsumer(uri: vscode.Uri): void { const consumer = this.consumerCollection.get(uri); if (consumer === null) { return; } - this.disposables.push(consumer.onDidReceiveRecord((arg: any) => { - this.onDidReceiveRecord(arg.uri, arg.record); + this.disposables.push(consumer.onDidReceiveRecord((e: RecordReceivedEvent) => { + this.onDidReceiveRecord(e.uri, e.record); })); - this.disposables.push(consumer.onDidChangeStatus((arg: any) => { - this.onDidChangeStatus(arg.uri, arg.status); + this.disposables.push(consumer.onDidChangeStatus((e: ConsumerChangedStatusEvent) => { + this.onDidChangeStatus(e.uri, e.status); + })); + + this.disposables.push(consumer.onDidReceiveError((e: any) => { + this.onDidReceiveError(e); })); } - public onDidChangeStatus(uri: vscode.Uri, status: string): void { + private onDidChangeStatus(uri: vscode.Uri, status: string): void { let uriBuffer = this.buffer[uri.toString()]; const line = `Consumer: ${status}\n\n`; uriBuffer = uriBuffer + line; @@ -64,7 +68,7 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC this.onDidChangeEmitter.fire(uri); } - public onDidReceiveRecord(uri: vscode.Uri, message: ConsumedRecord): void { + private onDidReceiveRecord(uri: vscode.Uri, message: ConsumedRecord): void { let uriBuffer = this.buffer[uri.toString()]; if (!uriBuffer) { @@ -79,15 +83,15 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC this.onDidChangeEmitter.fire(uri); } - public onDidCloseConsumer(uri: vscode.Uri): void { + private onDidCloseConsumer(uri: vscode.Uri): void { this.onDidChangeStatus(uri, 'closed'); } - public onDidReceiveError(error: any): void { + private onDidReceiveError(error: any): void { CommonMessages.showUnhandledError(error); } - public onDidCloseTextDocument(document: vscode.TextDocument): void { + private onDidCloseTextDocument(document: vscode.TextDocument): void { if (document.uri.scheme !== "kafka") { return; } @@ -107,5 +111,6 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC public dispose(): void { this.consumerCollection.dispose(); + this.disposables.forEach(d=>d.dispose()); } }