Skip to content

Commit

Permalink
Provide an option to clear the consumer view
Browse files Browse the repository at this point in the history
Fixes jlandersen#40

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr committed Feb 2, 2021
1 parent 1809450 commit 4ff07b9
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ All notable changes to Kafka extension will be documented in this file.
- Selected Cluster or Topic can now be deleted via the Delete shortcut (Cmd+Backspace on Mac). See [#79](https://github.com/jlandersen/vscode-kafka/issues/79)
- Added SASL/SCRAM-256 and SASL/SCRAM-512 authentication support. See [#3](https://github.com/jlandersen/vscode-kafka/issues/3).
- Added the option to enable basic SSL support for clusters without authentication. See [#84](https://github.com/jlandersen/vscode-kafka/issues/84).
- The consumer view provides now a clear consumer command. See [#84](https://github.com/jlandersen/vscode-kafka/issues/40).

### Changed
- Improved the "New cluster" and "New topic" wizards: now include validation and a back button. See [#21](https://github.com/jlandersen/vscode-kafka/issues/21).
Expand Down
1 change: 1 addition & 0 deletions images/dark/clear-all.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions images/light/clear-all.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
15 changes: 15 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"onCommand:vscode-kafka.consumer.consume",
"onCommand:vscode-kafka.consumer.list",
"onCommand:vscode-kafka.consumer.toggle",
"onCommand:vscode-kafka.consumer.clear",
"onCommand:vscode-kafka.explorer.copylabel",
"onCommand:vscode-kafka.explorer.deleteselected",
"onView:kafkaExplorer",
Expand Down Expand Up @@ -270,6 +271,15 @@
"light": "images/light/toggle.svg",
"dark": "images/dark/toggle.svg"
}
},
{
"command": "vscode-kafka.consumer.clear",
"title": "Clear Consumer",
"category": "Kafka",
"icon": {
"light": "images/light/clear-all.svg",
"dark": "images/dark/clear-all.svg"
}
}
],
"menus": {
Expand Down Expand Up @@ -337,6 +347,11 @@
"command": "vscode-kafka.consumer.toggle",
"group": "navigation",
"when": "resourceScheme == kafka"
},
{
"command": "vscode-kafka.consumer.clear",
"group": "navigation",
"when": "resourceScheme == kafka"
}
]
},
Expand Down
39 changes: 30 additions & 9 deletions src/commands/consumers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { pickTopic } from "./common";
import { ClusterSettings } from "../settings";
import { CommonMessages } from "../constants";
import { KafkaExplorer } from "../explorer";
import { ConsumerVirtualTextDocumentProvider } from "../providers";

export interface StartConsumerCommand {
clusterId: string;
Expand All @@ -17,7 +18,7 @@ export class StartConsumerCommandHandler {
private clusterSettings: ClusterSettings,
private consumerCollection: ConsumerCollection,
private explorer: KafkaExplorer
) {
) {
}

async execute(startConsumerCommand?: StartConsumerCommand): Promise<void> {
Expand Down Expand Up @@ -55,6 +56,8 @@ export class StartConsumerCommandHandler {
}

export class ToggleConsumerCommandHandler {
public static COMMAND_ID = 'vscode-kafka.consumer.toggle';

constructor(private consumerCollection: ConsumerCollection) {
}

Expand All @@ -75,6 +78,24 @@ export class ToggleConsumerCommandHandler {
}
}
}
export class ClearConsumerCommandHandler {
public static COMMAND_ID = 'vscode-kafka.consumer.clear';

constructor(private provider: ConsumerVirtualTextDocumentProvider) {

}
async execute(): Promise<void> {
if (!vscode.window.activeTextEditor) {
return;
}

const { document } = vscode.window.activeTextEditor;
if (document.uri.scheme !== "kafka") {
return;
}
this.provider.clear(document);
}
}

enum ConsumerOption {
Open,
Expand Down Expand Up @@ -144,7 +165,7 @@ async function openDocument(uri: vscode.Uri): Promise<void> {

// If there's no document we open it
if (!document) {
document = await vscode.workspace.openTextDocument(uri);
document = await vscode.workspace.openTextDocument(uri);
}

// Check if there's an active editor, to later decide in which column the consumer
Expand All @@ -163,12 +184,12 @@ async function openDocument(uri: vscode.Uri): Promise<void> {
// Instead, a new TextEditor instance is added to the active panel. This is the
// default vscode behavior
await vscode.window.showTextDocument(
document,
{
preview: false,
preserveFocus: true,
viewColumn: hasActiveEditor?vscode.ViewColumn.Beside:vscode.ViewColumn.Active,
}
);
document,
{
preview: false,
preserveFocus: true,
viewColumn: hasActiveEditor ? vscode.ViewColumn.Beside : vscode.ViewColumn.Active,
}
);
await vscode.languages.setTextDocumentLanguage(document, "kafka-consumer");
}
12 changes: 9 additions & 3 deletions src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
DeleteClusterCommandHandler,
SelectClusterCommandHandler,
handleErrors,
ClearConsumerCommandHandler,
} from "./commands";
import { Context } from "./context";
import { BrokerItem, KafkaExplorer, TopicItem } from "./explorer";
Expand Down Expand Up @@ -50,6 +51,7 @@ export function activate(context: vscode.ExtensionContext): void {
context.subscriptions.push(explorer);
context.subscriptions.push(new ConsumerStatusBarItem(consumerCollection));
context.subscriptions.push(new SelectedClusterStatusBarItem(clusterSettings));
const consumerVirtualTextDocumentProvider = new ConsumerVirtualTextDocumentProvider(consumerCollection)

// Commands
const createTopicCommandHandler = new CreateTopicCommandHandler(clientAccessor, clusterSettings, explorer);
Expand All @@ -58,6 +60,7 @@ export function activate(context: vscode.ExtensionContext): void {
const startConsumerCommandHandler = new StartConsumerCommandHandler(clientAccessor, clusterSettings, consumerCollection, explorer);
const listConsumersCommandHandler = new ListConsumersCommandHandler(consumerCollection);
const toggleConsumerCommandHandler = new ToggleConsumerCommandHandler(consumerCollection);
const clearConsumerCommandHandler = new ClearConsumerCommandHandler(consumerVirtualTextDocumentProvider);
const addClusterCommandHandler = new AddClusterCommandHandler(clusterSettings, explorer);
const deleteClusterCommandHandler = new DeleteClusterCommandHandler(clusterSettings, clientAccessor, explorer);
const selectClusterCommandHandler = new SelectClusterCommandHandler(clusterSettings);
Expand Down Expand Up @@ -108,8 +111,12 @@ export function activate(context: vscode.ExtensionContext): void {
"vscode-kafka.consumer.list",
handleErrors(() => listConsumersCommandHandler.execute())));
context.subscriptions.push(vscode.commands.registerCommand(
"vscode-kafka.consumer.toggle",
ToggleConsumerCommandHandler.COMMAND_ID,
handleErrors(() => toggleConsumerCommandHandler.execute())));
context.subscriptions.push(vscode.commands.registerCommand(
ClearConsumerCommandHandler.COMMAND_ID,
handleErrors(() => clearConsumerCommandHandler.execute())));

registerVSCodeKafkaDocumentationCommands(context);

// .kafka file related
Expand All @@ -123,8 +130,7 @@ export function activate(context: vscode.ExtensionContext): void {
vscode.languages.registerCodeLensProvider(documentSelector, new ProducerCodeLensProvider()));

context.subscriptions.push(vscode.workspace.registerTextDocumentContentProvider(
ConsumerVirtualTextDocumentProvider.SCHEME,
new ConsumerVirtualTextDocumentProvider(consumerCollection)));
ConsumerVirtualTextDocumentProvider.SCHEME, consumerVirtualTextDocumentProvider));
}

export function deactivate(): void {
Expand Down
39 changes: 26 additions & 13 deletions src/providers/consumerVirtualTextDocumentProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { ConsumedRecord, ConsumerChangedStatusEvent, ConsumerCollection, Consume
import { CommonMessages } from "../constants";

export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentContentProvider, vscode.Disposable {

public static SCHEME = "kafka";
private buffer: { [id: string]: string } = {};
private disposables: vscode.Disposable[] = [];
Expand Down Expand Up @@ -32,10 +33,9 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC
}

public provideTextDocumentContent(uri: vscode.Uri): string {
if (!this.buffer.hasOwnProperty(uri.toString())) {
if (!this.isActive(uri)) {
return "";
}

return this.buffer[uri.toString()];
}

Expand All @@ -60,6 +60,9 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC
}

private onDidChangeStatus(uri: vscode.Uri, status: string): void {
if (!this.isActive(uri)) {
return;
}
let uriBuffer = this.buffer[uri.toString()];
const line = `Consumer: ${status}\n\n`;
uriBuffer = uriBuffer + line;
Expand All @@ -69,11 +72,10 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC
}

private onDidReceiveRecord(uri: vscode.Uri, message: ConsumedRecord): void {
let uriBuffer = this.buffer[uri.toString()];

if (!uriBuffer) {
if (!this.isActive(uri)) {
return;
}
let uriBuffer = this.buffer[uri.toString()];

let line = `Key: ${message.key}\nPartition: ${message.partition}\nOffset: ${message.offset}\n`;
line = line + `Value:\n${message.value}\n\n`;
Expand All @@ -82,6 +84,9 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC
this.buffer[uri.toString()] = uriBuffer;
this.onDidChangeEmitter.fire(uri);
}
private isActive(uri: vscode.Uri): boolean {
return this.buffer.hasOwnProperty(uri.toString());
}

private onDidCloseConsumer(uri: vscode.Uri): void {
this.onDidChangeStatus(uri, 'closed');
Expand All @@ -93,25 +98,33 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC

private onDidCloseTextDocument(document: vscode.TextDocument): void {
// When language is plaintext we assume the event was triggered as a result of switching language mode
if (document.uri.scheme !== "kafka" || document.languageId === "plaintext") {
const uri = document.uri;
if (uri.scheme !== "kafka" || document.languageId === "plaintext") {
return;
}

const buffer = this.buffer[document.uri.toString()];

if (!buffer) {
if (!this.isActive(uri)) {
return;
}

if (this.consumerCollection.has(document.uri)) {
this.consumerCollection.close(document.uri);
if (this.consumerCollection.has(uri)) {
this.consumerCollection.close(uri);
}

delete this.buffer[document.uri.toString()];
delete this.buffer[uri.toString()];
}

public clear(document: vscode.TextDocument): void {
const uri = document.uri;
if (!this.isActive(uri)) {
return;
}
this.buffer[uri.toString()] = '';
this.onDidChangeEmitter.fire(uri);
}

public dispose(): void {
this.consumerCollection.dispose();
this.disposables.forEach(d=>d.dispose());
this.disposables.forEach(d => d.dispose());
}
}

0 comments on commit 4ff07b9

Please sign in to comment.