diff --git a/CHANGELOG.md b/CHANGELOG.md
index 15ccdb0..bfae418 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@ All notable changes to Kafka extension will be documented in this file.
- Selected Cluster or Topic can now be deleted via the Delete shortcut (Cmd+Backspace on Mac). See [#79](https://github.com/jlandersen/vscode-kafka/issues/79)
- Added SASL/SCRAM-256 and SASL/SCRAM-512 authentication support. See [#3](https://github.com/jlandersen/vscode-kafka/issues/3).
- Added the option to enable basic SSL support for clusters without authentication. See [#84](https://github.com/jlandersen/vscode-kafka/issues/84).
+- The consumer view now provides a `Clear Consumer View` command. See [#84](https://github.com/jlandersen/vscode-kafka/issues/40).
### Changed
- Improved the "New cluster" and "New topic" wizards: now include validation and a back button. See [#21](https://github.com/jlandersen/vscode-kafka/issues/21).
diff --git a/docs/Consuming.md b/docs/Consuming.md
index d278bf8..01a4ea3 100644
--- a/docs/Consuming.md
+++ b/docs/Consuming.md
@@ -1,10 +1,44 @@
# Consuming messages
-Consuming topics can be done by right clicking a topic in the explorer or from the command palette. Some things to note about consuming:
+Consuming topics can be done:
+
+* from the [Kafka Explorer](#kafka-explorer), by right-clicking on a topic and selecting "Start Consumer".
+* from the [Start Consumer](#start-consumer-command), from the command palette.
+
+## Consume with ...
+
+### Kafka explorer
+
+You can start consuming messages from the [Kafka Explorer](Explorer.md#explorer), by right-clicking on a topic:
+
+![Start Consumer with Explorer](assets/start-consumer-from-explorer.png)
+
+Once this command is launched, it creates a consumer group (with an auto-generated id), and opens the [Consumer View](#consumer-view) where you can see the messages being consumed:
+
+![Consumer group / Consumer View](assets/consumer-group-after-starting-from-explorer.png)
+
+In this case, the starting offset can be only be configured via the [kafka.consumers.offset](#kafkaconsumersoffset) preference.
+
+Known limitations:
* UTF-8 encoded keys and values only. If data is encoded differently, it will not be pretty.
* One consumer group is created per topic (may change in the future to just have one for the extension).
+### Start Consumer command
+
+![Start Consumer from command palette](assets/start-consumer-from-command.png)
+
+## Consumer View
+
+The `Consumer View` is a read-only editor which shows consumed messages for a given topic:
+
+![Consumer view](assets/consumer-view.png)
+
+This editor provides 2 commands on the top right of the editor:
+
+ * `Clear Consumer View`: clears the view.
+ * `Start/Stop`: to stop or (re)start the consumer.
+
Consumers are based on virtual documents, available in the VS Code extension API. A consumer will keep running even if you close the document in the editor. You should make sure to close the consumer explicitly, either via the command palette, the status bar element or the start/stop action button as well. The VS Code API does not support detecting if a virtual document is closed immediately. Instead, the underlying virtual document is automatically closed after two minutes if the document is closed in the editor.
## Preferences
diff --git a/docs/assets/consumer-group-after-starting-from-explorer.png b/docs/assets/consumer-group-after-starting-from-explorer.png
new file mode 100644
index 0000000..f92cdb3
Binary files /dev/null and b/docs/assets/consumer-group-after-starting-from-explorer.png differ
diff --git a/docs/assets/consumer-view.png b/docs/assets/consumer-view.png
new file mode 100644
index 0000000..71ab5ef
Binary files /dev/null and b/docs/assets/consumer-view.png differ
diff --git a/docs/assets/start-consumer-from-command.png b/docs/assets/start-consumer-from-command.png
new file mode 100644
index 0000000..22bf4d9
Binary files /dev/null and b/docs/assets/start-consumer-from-command.png differ
diff --git a/docs/assets/start-consumer-from-explorer.png b/docs/assets/start-consumer-from-explorer.png
new file mode 100644
index 0000000..f037ed8
Binary files /dev/null and b/docs/assets/start-consumer-from-explorer.png differ
diff --git a/images/dark/clear-all.svg b/images/dark/clear-all.svg
new file mode 100644
index 0000000..316bbe2
--- /dev/null
+++ b/images/dark/clear-all.svg
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/images/light/clear-all.svg b/images/light/clear-all.svg
new file mode 100644
index 0000000..3bb24db
--- /dev/null
+++ b/images/light/clear-all.svg
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/package.json b/package.json
index ced50e7..ae2aa9d 100644
--- a/package.json
+++ b/package.json
@@ -37,6 +37,7 @@
"onCommand:vscode-kafka.consumer.consume",
"onCommand:vscode-kafka.consumer.list",
"onCommand:vscode-kafka.consumer.toggle",
+ "onCommand:vscode-kafka.consumer.clear",
"onCommand:vscode-kafka.explorer.copylabel",
"onCommand:vscode-kafka.explorer.deleteselected",
"onView:kafkaExplorer",
@@ -270,6 +271,15 @@
"light": "images/light/toggle.svg",
"dark": "images/dark/toggle.svg"
}
+ },
+ {
+ "command": "vscode-kafka.consumer.clear",
+ "title": "Clear Consumer View",
+ "category": "Kafka",
+ "icon": {
+ "light": "images/light/clear-all.svg",
+ "dark": "images/dark/clear-all.svg"
+ }
}
],
"menus": {
@@ -337,6 +347,11 @@
"command": "vscode-kafka.consumer.toggle",
"group": "navigation",
"when": "resourceScheme == kafka"
+ },
+ {
+ "command": "vscode-kafka.consumer.clear",
+ "group": "navigation",
+ "when": "resourceScheme == kafka"
}
]
},
diff --git a/src/commands/consumers.ts b/src/commands/consumers.ts
index 9204721..7da8b81 100644
--- a/src/commands/consumers.ts
+++ b/src/commands/consumers.ts
@@ -5,6 +5,7 @@ import { pickTopic } from "./common";
import { ClusterSettings } from "../settings";
import { CommonMessages } from "../constants";
import { KafkaExplorer } from "../explorer";
+import { ConsumerVirtualTextDocumentProvider } from "../providers";
export interface StartConsumerCommand {
clusterId: string;
@@ -17,7 +18,7 @@ export class StartConsumerCommandHandler {
private clusterSettings: ClusterSettings,
private consumerCollection: ConsumerCollection,
private explorer: KafkaExplorer
- ) {
+ ) {
}
async execute(startConsumerCommand?: StartConsumerCommand): Promise {
@@ -55,6 +56,9 @@ export class StartConsumerCommandHandler {
}
export class ToggleConsumerCommandHandler {
+
+ public static COMMAND_ID = 'vscode-kafka.consumer.toggle';
+
constructor(private consumerCollection: ConsumerCollection) {
}
@@ -75,6 +79,25 @@ export class ToggleConsumerCommandHandler {
}
}
}
+export class ClearConsumerViewCommandHandler {
+
+ public static COMMAND_ID = 'vscode-kafka.consumer.clear';
+
+ constructor(private provider: ConsumerVirtualTextDocumentProvider) {
+
+ }
+ async execute(): Promise {
+ if (!vscode.window.activeTextEditor) {
+ return;
+ }
+
+ const { document } = vscode.window.activeTextEditor;
+ if (document.uri.scheme !== "kafka") {
+ return;
+ }
+ this.provider.clear(document);
+ }
+}
enum ConsumerOption {
Open,
@@ -144,7 +167,7 @@ async function openDocument(uri: vscode.Uri): Promise {
// If there's no document we open it
if (!document) {
- document = await vscode.workspace.openTextDocument(uri);
+ document = await vscode.workspace.openTextDocument(uri);
}
// Check if there's an active editor, to later decide in which column the consumer
@@ -163,12 +186,12 @@ async function openDocument(uri: vscode.Uri): Promise {
// Instead, a new TextEditor instance is added to the active panel. This is the
// default vscode behavior
await vscode.window.showTextDocument(
- document,
- {
- preview: false,
- preserveFocus: true,
- viewColumn: hasActiveEditor?vscode.ViewColumn.Beside:vscode.ViewColumn.Active,
- }
- );
+ document,
+ {
+ preview: false,
+ preserveFocus: true,
+ viewColumn: hasActiveEditor ? vscode.ViewColumn.Beside : vscode.ViewColumn.Active,
+ }
+ );
await vscode.languages.setTextDocumentLanguage(document, "kafka-consumer");
}
diff --git a/src/extension.ts b/src/extension.ts
index e77511e..f18026b 100644
--- a/src/extension.ts
+++ b/src/extension.ts
@@ -15,6 +15,7 @@ import {
DeleteClusterCommandHandler,
SelectClusterCommandHandler,
handleErrors,
+ ClearConsumerViewCommandHandler,
} from "./commands";
import { Context } from "./context";
import { BrokerItem, KafkaExplorer, TopicItem } from "./explorer";
@@ -50,6 +51,7 @@ export function activate(context: vscode.ExtensionContext): void {
context.subscriptions.push(explorer);
context.subscriptions.push(new ConsumerStatusBarItem(consumerCollection));
context.subscriptions.push(new SelectedClusterStatusBarItem(clusterSettings));
+ const consumerVirtualTextDocumentProvider = new ConsumerVirtualTextDocumentProvider(consumerCollection)
// Commands
const createTopicCommandHandler = new CreateTopicCommandHandler(clientAccessor, clusterSettings, explorer);
@@ -58,6 +60,7 @@ export function activate(context: vscode.ExtensionContext): void {
const startConsumerCommandHandler = new StartConsumerCommandHandler(clientAccessor, clusterSettings, consumerCollection, explorer);
const listConsumersCommandHandler = new ListConsumersCommandHandler(consumerCollection);
const toggleConsumerCommandHandler = new ToggleConsumerCommandHandler(consumerCollection);
+ const clearConsumerViewCommandHandler = new ClearConsumerViewCommandHandler(consumerVirtualTextDocumentProvider);
const addClusterCommandHandler = new AddClusterCommandHandler(clusterSettings, explorer);
const deleteClusterCommandHandler = new DeleteClusterCommandHandler(clusterSettings, clientAccessor, explorer);
const selectClusterCommandHandler = new SelectClusterCommandHandler(clusterSettings);
@@ -108,8 +111,12 @@ export function activate(context: vscode.ExtensionContext): void {
"vscode-kafka.consumer.list",
handleErrors(() => listConsumersCommandHandler.execute())));
context.subscriptions.push(vscode.commands.registerCommand(
- "vscode-kafka.consumer.toggle",
+ ToggleConsumerCommandHandler.COMMAND_ID,
handleErrors(() => toggleConsumerCommandHandler.execute())));
+ context.subscriptions.push(vscode.commands.registerCommand(
+ ClearConsumerViewCommandHandler.COMMAND_ID,
+ handleErrors(() => clearConsumerViewCommandHandler.execute())));
+
registerVSCodeKafkaDocumentationCommands(context);
// .kafka file related
@@ -123,8 +130,7 @@ export function activate(context: vscode.ExtensionContext): void {
vscode.languages.registerCodeLensProvider(documentSelector, new ProducerCodeLensProvider()));
context.subscriptions.push(vscode.workspace.registerTextDocumentContentProvider(
- ConsumerVirtualTextDocumentProvider.SCHEME,
- new ConsumerVirtualTextDocumentProvider(consumerCollection)));
+ ConsumerVirtualTextDocumentProvider.SCHEME, consumerVirtualTextDocumentProvider));
}
export function deactivate(): void {
diff --git a/src/providers/consumerVirtualTextDocumentProvider.ts b/src/providers/consumerVirtualTextDocumentProvider.ts
index 9f8935b..f041fa4 100644
--- a/src/providers/consumerVirtualTextDocumentProvider.ts
+++ b/src/providers/consumerVirtualTextDocumentProvider.ts
@@ -4,6 +4,7 @@ import { ConsumedRecord, ConsumerChangedStatusEvent, ConsumerCollection, Consume
import { CommonMessages } from "../constants";
export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentContentProvider, vscode.Disposable {
+
public static SCHEME = "kafka";
private buffer: { [id: string]: string } = {};
private disposables: vscode.Disposable[] = [];
@@ -32,10 +33,9 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC
}
public provideTextDocumentContent(uri: vscode.Uri): string {
- if (!this.buffer.hasOwnProperty(uri.toString())) {
+ if (!this.isActive(uri)) {
return "";
}
-
return this.buffer[uri.toString()];
}
@@ -60,27 +60,20 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC
}
private onDidChangeStatus(uri: vscode.Uri, status: string): void {
- let uriBuffer = this.buffer[uri.toString()];
+ if (!this.isActive(uri)) {
+ return;
+ }
const line = `Consumer: ${status}\n\n`;
- uriBuffer = uriBuffer + line;
-
- this.buffer[uri.toString()] = uriBuffer;
- this.onDidChangeEmitter.fire(uri);
+ this.updateBuffer(uri, line);
}
private onDidReceiveRecord(uri: vscode.Uri, message: ConsumedRecord): void {
- let uriBuffer = this.buffer[uri.toString()];
-
- if (!uriBuffer) {
+ if (!this.isActive(uri)) {
return;
}
-
let line = `Key: ${message.key}\nPartition: ${message.partition}\nOffset: ${message.offset}\n`;
line = line + `Value:\n${message.value}\n\n`;
- uriBuffer = uriBuffer + line;
-
- this.buffer[uri.toString()] = uriBuffer;
- this.onDidChangeEmitter.fire(uri);
+ this.updateBuffer(uri, line);
}
private onDidCloseConsumer(uri: vscode.Uri): void {
@@ -93,25 +86,46 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC
private onDidCloseTextDocument(document: vscode.TextDocument): void {
// When language is plaintext we assume the event was triggered as a result of switching language mode
- if (document.uri.scheme !== "kafka" || document.languageId === "plaintext") {
+ const uri = document.uri;
+ if (uri.scheme !== "kafka" || document.languageId === "plaintext") {
return;
}
- const buffer = this.buffer[document.uri.toString()];
-
- if (!buffer) {
+ if (!this.isActive(uri)) {
return;
}
- if (this.consumerCollection.has(document.uri)) {
- this.consumerCollection.close(document.uri);
+ if (this.consumerCollection.has(uri)) {
+ this.consumerCollection.close(uri);
}
- delete this.buffer[document.uri.toString()];
+ delete this.buffer[uri.toString()];
+ }
+
+ public clear(document: vscode.TextDocument): void {
+ const uri = document.uri;
+ if (!this.isActive(uri)) {
+ return;
+ }
+ this.updateBuffer(uri, '', true);
}
public dispose(): void {
this.consumerCollection.dispose();
- this.disposables.forEach(d=>d.dispose());
+ this.disposables.forEach(d => d.dispose());
+ }
+
+ private isActive(uri: vscode.Uri): boolean {
+ return this.buffer.hasOwnProperty(uri.toString());
+ }
+
+ private updateBuffer(uri: vscode.Uri, content: string, replace = false) {
+ if (replace) {
+ this.buffer[uri.toString()] = content;
+ } else {
+ const uriBuffer = this.buffer[uri.toString()];
+ this.buffer[uri.toString()] = uriBuffer + content;
+ }
+ this.onDidChangeEmitter.fire(uri);
}
}