Skip to content

Commit

Permalink
Clean consumer code.
Browse files Browse the repository at this point in the history
Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr authored and jlandersen committed Dec 17, 2020
1 parent 27698e9 commit e2f0b8f
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 17 deletions.
4 changes: 2 additions & 2 deletions src/client/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ export interface ConsumedRecord {
key?: string | Buffer;
}

interface ConsumerChangedStatusEvent {
export interface ConsumerChangedStatusEvent {
uri: vscode.Uri;
status: "created" | "rebalancing" | "rebalanced";
}

interface ConsumerCollectionChangedEvent {
export interface ConsumerCollectionChangedEvent {
created: vscode.Uri[];
closed: vscode.Uri[];
}
Expand Down
35 changes: 20 additions & 15 deletions src/providers/consumerVirtualTextDocumentProvider.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as vscode from "vscode";

import { ConsumedRecord, ConsumerCollection } from "../client";
import { ConsumedRecord, ConsumerChangedStatusEvent, ConsumerCollection, ConsumerCollectionChangedEvent, RecordReceivedEvent } from "../client";
import { CommonMessages } from "../constants";

export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentContentProvider, vscode.Disposable {
Expand All @@ -12,20 +12,20 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC
public onDidChange = this.onDidChangeEmitter.event;

constructor(private consumerCollection: ConsumerCollection) {
this.disposables.push(vscode.workspace.onDidCloseTextDocument((e: any) => {
this.disposables.push(vscode.workspace.onDidCloseTextDocument((e: vscode.TextDocument) => {
this.onDidCloseTextDocument(e);
}));

this.disposables.push(this.consumerCollection.onDidChangeCollection((arg: any) => {
for (const startedUri of arg.created) {
this.disposables.push(this.consumerCollection.onDidChangeCollection((e: ConsumerCollectionChangedEvent) => {
for (const startedUri of e.created) {
if (!this.buffer[startedUri.toString()]) {
this.buffer[startedUri.toString()] = '';
}
this.onDidChangeStatus(startedUri, 'started');
this.attachToConsumer(startedUri);
}

for (const closedUri of arg.closed) {
for (const closedUri of e.closed) {
this.onDidCloseConsumer(closedUri);
}
}));
Expand All @@ -39,23 +39,27 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC
return this.buffer[uri.toString()];
}

public attachToConsumer(uri: vscode.Uri): void {
private attachToConsumer(uri: vscode.Uri): void {
const consumer = this.consumerCollection.get(uri);

if (consumer === null) {
return;
}

this.disposables.push(consumer.onDidReceiveRecord((arg: any) => {
this.onDidReceiveRecord(arg.uri, arg.record);
this.disposables.push(consumer.onDidReceiveRecord((e: RecordReceivedEvent) => {
this.onDidReceiveRecord(e.uri, e.record);
}));

this.disposables.push(consumer.onDidChangeStatus((arg: any) => {
this.onDidChangeStatus(arg.uri, arg.status);
this.disposables.push(consumer.onDidChangeStatus((e: ConsumerChangedStatusEvent) => {
this.onDidChangeStatus(e.uri, e.status);
}));

this.disposables.push(consumer.onDidReceiveError((e: any) => {
this.onDidReceiveError(e);
}));
}

public onDidChangeStatus(uri: vscode.Uri, status: string): void {
private onDidChangeStatus(uri: vscode.Uri, status: string): void {
let uriBuffer = this.buffer[uri.toString()];
const line = `Consumer: ${status}\n\n`;
uriBuffer = uriBuffer + line;
Expand All @@ -64,7 +68,7 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC
this.onDidChangeEmitter.fire(uri);
}

public onDidReceiveRecord(uri: vscode.Uri, message: ConsumedRecord): void {
private onDidReceiveRecord(uri: vscode.Uri, message: ConsumedRecord): void {
let uriBuffer = this.buffer[uri.toString()];

if (!uriBuffer) {
Expand All @@ -79,15 +83,15 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC
this.onDidChangeEmitter.fire(uri);
}

public onDidCloseConsumer(uri: vscode.Uri): void {
private onDidCloseConsumer(uri: vscode.Uri): void {
this.onDidChangeStatus(uri, 'closed');
}

public onDidReceiveError(error: any): void {
private onDidReceiveError(error: any): void {
CommonMessages.showUnhandledError(error);
}

public onDidCloseTextDocument(document: vscode.TextDocument): void {
private onDidCloseTextDocument(document: vscode.TextDocument): void {
if (document.uri.scheme !== "kafka") {
return;
}
Expand All @@ -107,5 +111,6 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC

public dispose(): void {
this.consumerCollection.dispose();
this.disposables.forEach(d=>d.dispose());
}
}

0 comments on commit e2f0b8f

Please sign in to comment.