Skip to content

Commit

Permalink
Provide an option to clear the consumer view
Browse files Browse the repository at this point in the history
Fixes #40

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr committed Feb 4, 2021
1 parent 1809450 commit 7724101
Show file tree
Hide file tree
Showing 12 changed files with 131 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ All notable changes to Kafka extension will be documented in this file.
- Selected Cluster or Topic can now be deleted via the Delete shortcut (Cmd+Backspace on Mac). See [#79](https://github.com/jlandersen/vscode-kafka/issues/79)
- Added SASL/SCRAM-256 and SASL/SCRAM-512 authentication support. See [#3](https://github.com/jlandersen/vscode-kafka/issues/3).
- Added the option to enable basic SSL support for clusters without authentication. See [#84](https://github.com/jlandersen/vscode-kafka/issues/84).
- The consumer view now provides a `Clear Consumer View` command. See [#84](https://github.com/jlandersen/vscode-kafka/issues/40).

### Changed
- Improved the "New cluster" and "New topic" wizards: now include validation and a back button. See [#21](https://github.com/jlandersen/vscode-kafka/issues/21).
Expand Down
36 changes: 35 additions & 1 deletion docs/Consuming.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,44 @@
# Consuming messages

Consuming topics can be done by right clicking a topic in the explorer or from the command palette. Some things to note about consuming:
Consuming topics can be done:

* with [Kafka Explorer](#kafka-explorer) by right clicking a topic in the explorer.
* with the [Start command](#start-command) from the command palette.

## Consume with ...

### Kafka explorer

You can start consumer with [Kafka Explorer](Explorer.md#explorer) by right clicking a topic in the explorer:

![Start Consumer with Explorer](assets/start-consumer-from-explorer.png)

Once this command is executed, it creates a consumer group (with generated id) and opens the [Consumer View](#consumer-view) where you can see the producing messages:

![Consumer group / Consumer View](assets/consumer-group-after-starting-from-explorer.png)

In this case, you cannot configure offset, partition and consumer group id. The offset can be just configured with the [kafka.consumers.offset](#kafkaconsumersoffset) preference.

Some things to note about consuming:

* UTF-8 encoded keys and values only. If data is encoded differently, it will not be pretty.
* One consumer group is created per topic (may change in the future to just have one for the extension).

### Start command

![Start Consumer with palette](assets/start-consumer-from-command.png)

## Consumer View

`Consumer View` is a read-only editor which shows producing messages for a given topic:

![Consumer view](assets/consumer-view.png)

This editor provides 2 commands on the right of the editor:

* `Clear Consumer View` to clear the content of the view.
* `Start/Stop` to stop or restart the consumer.

Consumers are based on virtual documents, available in the VS Code extension API. A consumer will keep running even if you close the document in the editor. You should make sure to close the consumer explicitly, either via the command palette, the status bar element or the start/stop action button as well. The VS Code API does not support detecting if a virtual document is closed immediately. Instead, the underlying virtual document is automatically closed after two minutes if the document is closed in the editor.

## Preferences
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/assets/consumer-view.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/assets/start-consumer-from-command.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/assets/start-consumer-from-explorer.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions images/dark/clear-all.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions images/light/clear-all.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
15 changes: 15 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"onCommand:vscode-kafka.consumer.consume",
"onCommand:vscode-kafka.consumer.list",
"onCommand:vscode-kafka.consumer.toggle",
"onCommand:vscode-kafka.consumer.clear",
"onCommand:vscode-kafka.explorer.copylabel",
"onCommand:vscode-kafka.explorer.deleteselected",
"onView:kafkaExplorer",
Expand Down Expand Up @@ -270,6 +271,15 @@
"light": "images/light/toggle.svg",
"dark": "images/dark/toggle.svg"
}
},
{
"command": "vscode-kafka.consumer.clear",
"title": "Clear Consumer View",
"category": "Kafka",
"icon": {
"light": "images/light/clear-all.svg",
"dark": "images/dark/clear-all.svg"
}
}
],
"menus": {
Expand Down Expand Up @@ -337,6 +347,11 @@
"command": "vscode-kafka.consumer.toggle",
"group": "navigation",
"when": "resourceScheme == kafka"
},
{
"command": "vscode-kafka.consumer.clear",
"group": "navigation",
"when": "resourceScheme == kafka"
}
]
},
Expand Down
41 changes: 32 additions & 9 deletions src/commands/consumers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { pickTopic } from "./common";
import { ClusterSettings } from "../settings";
import { CommonMessages } from "../constants";
import { KafkaExplorer } from "../explorer";
import { ConsumerVirtualTextDocumentProvider } from "../providers";

export interface StartConsumerCommand {
clusterId: string;
Expand All @@ -17,7 +18,7 @@ export class StartConsumerCommandHandler {
private clusterSettings: ClusterSettings,
private consumerCollection: ConsumerCollection,
private explorer: KafkaExplorer
) {
) {
}

async execute(startConsumerCommand?: StartConsumerCommand): Promise<void> {
Expand Down Expand Up @@ -55,6 +56,9 @@ export class StartConsumerCommandHandler {
}

export class ToggleConsumerCommandHandler {

public static COMMAND_ID = 'vscode-kafka.consumer.toggle';

constructor(private consumerCollection: ConsumerCollection) {
}

Expand All @@ -75,6 +79,25 @@ export class ToggleConsumerCommandHandler {
}
}
}
export class ClearConsumerViewCommandHandler {

public static COMMAND_ID = 'vscode-kafka.consumer.clear';

constructor(private provider: ConsumerVirtualTextDocumentProvider) {

}
async execute(): Promise<void> {
if (!vscode.window.activeTextEditor) {
return;
}

const { document } = vscode.window.activeTextEditor;
if (document.uri.scheme !== "kafka") {
return;
}
this.provider.clear(document);
}
}

enum ConsumerOption {
Open,
Expand Down Expand Up @@ -144,7 +167,7 @@ async function openDocument(uri: vscode.Uri): Promise<void> {

// If there's no document we open it
if (!document) {
document = await vscode.workspace.openTextDocument(uri);
document = await vscode.workspace.openTextDocument(uri);
}

// Check if there's an active editor, to later decide in which column the consumer
Expand All @@ -163,12 +186,12 @@ async function openDocument(uri: vscode.Uri): Promise<void> {
// Instead, a new TextEditor instance is added to the active panel. This is the
// default vscode behavior
await vscode.window.showTextDocument(
document,
{
preview: false,
preserveFocus: true,
viewColumn: hasActiveEditor?vscode.ViewColumn.Beside:vscode.ViewColumn.Active,
}
);
document,
{
preview: false,
preserveFocus: true,
viewColumn: hasActiveEditor ? vscode.ViewColumn.Beside : vscode.ViewColumn.Active,
}
);
await vscode.languages.setTextDocumentLanguage(document, "kafka-consumer");
}
12 changes: 9 additions & 3 deletions src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
DeleteClusterCommandHandler,
SelectClusterCommandHandler,
handleErrors,
ClearConsumerViewCommandHandler,
} from "./commands";
import { Context } from "./context";
import { BrokerItem, KafkaExplorer, TopicItem } from "./explorer";
Expand Down Expand Up @@ -50,6 +51,7 @@ export function activate(context: vscode.ExtensionContext): void {
context.subscriptions.push(explorer);
context.subscriptions.push(new ConsumerStatusBarItem(consumerCollection));
context.subscriptions.push(new SelectedClusterStatusBarItem(clusterSettings));
const consumerVirtualTextDocumentProvider = new ConsumerVirtualTextDocumentProvider(consumerCollection)

// Commands
const createTopicCommandHandler = new CreateTopicCommandHandler(clientAccessor, clusterSettings, explorer);
Expand All @@ -58,6 +60,7 @@ export function activate(context: vscode.ExtensionContext): void {
const startConsumerCommandHandler = new StartConsumerCommandHandler(clientAccessor, clusterSettings, consumerCollection, explorer);
const listConsumersCommandHandler = new ListConsumersCommandHandler(consumerCollection);
const toggleConsumerCommandHandler = new ToggleConsumerCommandHandler(consumerCollection);
const clearConsumerViewCommandHandler = new ClearConsumerViewCommandHandler(consumerVirtualTextDocumentProvider);
const addClusterCommandHandler = new AddClusterCommandHandler(clusterSettings, explorer);
const deleteClusterCommandHandler = new DeleteClusterCommandHandler(clusterSettings, clientAccessor, explorer);
const selectClusterCommandHandler = new SelectClusterCommandHandler(clusterSettings);
Expand Down Expand Up @@ -108,8 +111,12 @@ export function activate(context: vscode.ExtensionContext): void {
"vscode-kafka.consumer.list",
handleErrors(() => listConsumersCommandHandler.execute())));
context.subscriptions.push(vscode.commands.registerCommand(
"vscode-kafka.consumer.toggle",
ToggleConsumerCommandHandler.COMMAND_ID,
handleErrors(() => toggleConsumerCommandHandler.execute())));
context.subscriptions.push(vscode.commands.registerCommand(
ClearConsumerViewCommandHandler.COMMAND_ID,
handleErrors(() => clearConsumerViewCommandHandler.execute())));

registerVSCodeKafkaDocumentationCommands(context);

// .kafka file related
Expand All @@ -123,8 +130,7 @@ export function activate(context: vscode.ExtensionContext): void {
vscode.languages.registerCodeLensProvider(documentSelector, new ProducerCodeLensProvider()));

context.subscriptions.push(vscode.workspace.registerTextDocumentContentProvider(
ConsumerVirtualTextDocumentProvider.SCHEME,
new ConsumerVirtualTextDocumentProvider(consumerCollection)));
ConsumerVirtualTextDocumentProvider.SCHEME, consumerVirtualTextDocumentProvider));
}

export function deactivate(): void {
Expand Down
60 changes: 37 additions & 23 deletions src/providers/consumerVirtualTextDocumentProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { ConsumedRecord, ConsumerChangedStatusEvent, ConsumerCollection, Consume
import { CommonMessages } from "../constants";

export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentContentProvider, vscode.Disposable {

public static SCHEME = "kafka";
private buffer: { [id: string]: string } = {};
private disposables: vscode.Disposable[] = [];
Expand Down Expand Up @@ -32,10 +33,9 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC
}

public provideTextDocumentContent(uri: vscode.Uri): string {
if (!this.buffer.hasOwnProperty(uri.toString())) {
if (!this.isActive(uri)) {
return "";
}

return this.buffer[uri.toString()];
}

Expand All @@ -60,27 +60,20 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC
}

private onDidChangeStatus(uri: vscode.Uri, status: string): void {
let uriBuffer = this.buffer[uri.toString()];
if (!this.isActive(uri)) {
return;
}
const line = `Consumer: ${status}\n\n`;
uriBuffer = uriBuffer + line;

this.buffer[uri.toString()] = uriBuffer;
this.onDidChangeEmitter.fire(uri);
this.updateBuffer(uri, line);
}

private onDidReceiveRecord(uri: vscode.Uri, message: ConsumedRecord): void {
let uriBuffer = this.buffer[uri.toString()];

if (!uriBuffer) {
if (!this.isActive(uri)) {
return;
}

let line = `Key: ${message.key}\nPartition: ${message.partition}\nOffset: ${message.offset}\n`;
line = line + `Value:\n${message.value}\n\n`;
uriBuffer = uriBuffer + line;

this.buffer[uri.toString()] = uriBuffer;
this.onDidChangeEmitter.fire(uri);
this.updateBuffer(uri, line);
}

private onDidCloseConsumer(uri: vscode.Uri): void {
Expand All @@ -93,25 +86,46 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC

private onDidCloseTextDocument(document: vscode.TextDocument): void {
// When language is plaintext we assume the event was triggered as a result of switching language mode
if (document.uri.scheme !== "kafka" || document.languageId === "plaintext") {
const uri = document.uri;
if (uri.scheme !== "kafka" || document.languageId === "plaintext") {
return;
}

const buffer = this.buffer[document.uri.toString()];

if (!buffer) {
if (!this.isActive(uri)) {
return;
}

if (this.consumerCollection.has(document.uri)) {
this.consumerCollection.close(document.uri);
if (this.consumerCollection.has(uri)) {
this.consumerCollection.close(uri);
}

delete this.buffer[document.uri.toString()];
delete this.buffer[uri.toString()];
}

public clear(document: vscode.TextDocument): void {
const uri = document.uri;
if (!this.isActive(uri)) {
return;
}
this.updateBuffer(uri, '', true);
}

public dispose(): void {
this.consumerCollection.dispose();
this.disposables.forEach(d=>d.dispose());
this.disposables.forEach(d => d.dispose());
}

private isActive(uri: vscode.Uri): boolean {
return this.buffer.hasOwnProperty(uri.toString());
}

private updateBuffer(uri: vscode.Uri, content: string, replace = false) {
if (replace) {
this.buffer[uri.toString()] = content;
} else {
const uriBuffer = this.buffer[uri.toString()];
this.buffer[uri.toString()] = uriBuffer + content;
}
this.onDidChangeEmitter.fire(uri);
}
}

0 comments on commit 7724101

Please sign in to comment.