Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide an option to clear the consumer view #105

Merged
merged 1 commit into from
Feb 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ All notable changes to Kafka extension will be documented in this file.
- Selected Cluster or Topic can now be deleted via the Delete shortcut (Cmd+Backspace on Mac). See [#79](https://github.com/jlandersen/vscode-kafka/issues/79)
- Added SASL/SCRAM-256 and SASL/SCRAM-512 authentication support. See [#3](https://github.com/jlandersen/vscode-kafka/issues/3).
- Added the option to enable basic SSL support for clusters without authentication. See [#84](https://github.com/jlandersen/vscode-kafka/issues/84).
- The consumer view now provides a `Clear Consumer View` command. See [#84](https://github.com/jlandersen/vscode-kafka/issues/40).

### Changed
- Improved the "New cluster" and "New topic" wizards: now include validation and a back button. See [#21](https://github.com/jlandersen/vscode-kafka/issues/21).
Expand Down
36 changes: 35 additions & 1 deletion docs/Consuming.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,44 @@
# Consuming messages

Consuming topics can be done by right clicking a topic in the explorer or from the command palette. Some things to note about consuming:
Consuming topics can be done:

* from the [Kafka Explorer](#kafka-explorer), by right-clicking on a topic and selecting "Start Consumer".
* from the [Start Consumer](#start-consumer-command), from the command palette.

## Consume with ...

### Kafka explorer

You can start consuming messages from the [Kafka Explorer](Explorer.md#explorer), by right-clicking on a topic:

![Start Consumer with Explorer](assets/start-consumer-from-explorer.png)

Once this command is launched, it creates a consumer group (with an auto-generated id), and opens the [Consumer View](#consumer-view) where you can see the messages being consumed:

![Consumer group / Consumer View](assets/consumer-group-after-starting-from-explorer.png)

In this case, the starting offset can be only be configured via the [kafka.consumers.offset](#kafkaconsumersoffset) preference.

Known limitations:

* UTF-8 encoded keys and values only. If data is encoded differently, it will not be pretty.
* One consumer group is created per topic (may change in the future to just have one for the extension).

### Start Consumer command

![Start Consumer from command palette](assets/start-consumer-from-command.png)

## Consumer View

The `Consumer View` is a read-only editor which shows consumed messages for a given topic:

![Consumer view](assets/consumer-view.png)

This editor provides 2 commands on the top right of the editor:

* `Clear Consumer View`: clears the view.
* `Start/Stop`: to stop or (re)start the consumer.

Consumers are based on virtual documents, available in the VS Code extension API. A consumer will keep running even if you close the document in the editor. You should make sure to close the consumer explicitly, either via the command palette, the status bar element or the start/stop action button as well. The VS Code API does not support detecting if a virtual document is closed immediately. Instead, the underlying virtual document is automatically closed after two minutes if the document is closed in the editor.

## Preferences
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/assets/consumer-view.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/assets/start-consumer-from-command.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/assets/start-consumer-from-explorer.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions images/dark/clear-all.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions images/light/clear-all.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
15 changes: 15 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"onCommand:vscode-kafka.consumer.consume",
"onCommand:vscode-kafka.consumer.list",
"onCommand:vscode-kafka.consumer.toggle",
"onCommand:vscode-kafka.consumer.clear",
"onCommand:vscode-kafka.explorer.copylabel",
"onCommand:vscode-kafka.explorer.deleteselected",
"onView:kafkaExplorer",
Expand Down Expand Up @@ -270,6 +271,15 @@
"light": "images/light/toggle.svg",
"dark": "images/dark/toggle.svg"
}
},
{
"command": "vscode-kafka.consumer.clear",
"title": "Clear Consumer View",
"category": "Kafka",
"icon": {
"light": "images/light/clear-all.svg",
"dark": "images/dark/clear-all.svg"
}
}
],
"menus": {
Expand Down Expand Up @@ -337,6 +347,11 @@
"command": "vscode-kafka.consumer.toggle",
"group": "navigation",
"when": "resourceScheme == kafka"
},
{
"command": "vscode-kafka.consumer.clear",
"group": "navigation",
"when": "resourceScheme == kafka"
}
]
},
Expand Down
41 changes: 32 additions & 9 deletions src/commands/consumers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { pickTopic } from "./common";
import { ClusterSettings } from "../settings";
import { CommonMessages } from "../constants";
import { KafkaExplorer } from "../explorer";
import { ConsumerVirtualTextDocumentProvider } from "../providers";

export interface StartConsumerCommand {
clusterId: string;
Expand All @@ -17,7 +18,7 @@ export class StartConsumerCommandHandler {
private clusterSettings: ClusterSettings,
private consumerCollection: ConsumerCollection,
private explorer: KafkaExplorer
) {
) {
}

async execute(startConsumerCommand?: StartConsumerCommand): Promise<void> {
Expand Down Expand Up @@ -55,6 +56,9 @@ export class StartConsumerCommandHandler {
}

export class ToggleConsumerCommandHandler {

public static COMMAND_ID = 'vscode-kafka.consumer.toggle';

constructor(private consumerCollection: ConsumerCollection) {
}

Expand All @@ -75,6 +79,25 @@ export class ToggleConsumerCommandHandler {
}
}
}
export class ClearConsumerViewCommandHandler {

public static COMMAND_ID = 'vscode-kafka.consumer.clear';

constructor(private provider: ConsumerVirtualTextDocumentProvider) {

}
async execute(): Promise<void> {
if (!vscode.window.activeTextEditor) {
return;
}

const { document } = vscode.window.activeTextEditor;
if (document.uri.scheme !== "kafka") {
return;
}
this.provider.clear(document);
}
}

enum ConsumerOption {
Open,
Expand Down Expand Up @@ -144,7 +167,7 @@ async function openDocument(uri: vscode.Uri): Promise<void> {

// If there's no document we open it
if (!document) {
document = await vscode.workspace.openTextDocument(uri);
document = await vscode.workspace.openTextDocument(uri);
}

// Check if there's an active editor, to later decide in which column the consumer
Expand All @@ -163,12 +186,12 @@ async function openDocument(uri: vscode.Uri): Promise<void> {
// Instead, a new TextEditor instance is added to the active panel. This is the
// default vscode behavior
await vscode.window.showTextDocument(
document,
{
preview: false,
preserveFocus: true,
viewColumn: hasActiveEditor?vscode.ViewColumn.Beside:vscode.ViewColumn.Active,
}
);
document,
{
preview: false,
preserveFocus: true,
viewColumn: hasActiveEditor ? vscode.ViewColumn.Beside : vscode.ViewColumn.Active,
}
);
await vscode.languages.setTextDocumentLanguage(document, "kafka-consumer");
}
12 changes: 9 additions & 3 deletions src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
DeleteClusterCommandHandler,
SelectClusterCommandHandler,
handleErrors,
ClearConsumerViewCommandHandler,
} from "./commands";
import { Context } from "./context";
import { BrokerItem, KafkaExplorer, TopicItem } from "./explorer";
Expand Down Expand Up @@ -50,6 +51,7 @@ export function activate(context: vscode.ExtensionContext): void {
context.subscriptions.push(explorer);
context.subscriptions.push(new ConsumerStatusBarItem(consumerCollection));
context.subscriptions.push(new SelectedClusterStatusBarItem(clusterSettings));
const consumerVirtualTextDocumentProvider = new ConsumerVirtualTextDocumentProvider(consumerCollection)

// Commands
const createTopicCommandHandler = new CreateTopicCommandHandler(clientAccessor, clusterSettings, explorer);
Expand All @@ -58,6 +60,7 @@ export function activate(context: vscode.ExtensionContext): void {
const startConsumerCommandHandler = new StartConsumerCommandHandler(clientAccessor, clusterSettings, consumerCollection, explorer);
const listConsumersCommandHandler = new ListConsumersCommandHandler(consumerCollection);
const toggleConsumerCommandHandler = new ToggleConsumerCommandHandler(consumerCollection);
const clearConsumerViewCommandHandler = new ClearConsumerViewCommandHandler(consumerVirtualTextDocumentProvider);
const addClusterCommandHandler = new AddClusterCommandHandler(clusterSettings, explorer);
const deleteClusterCommandHandler = new DeleteClusterCommandHandler(clusterSettings, clientAccessor, explorer);
const selectClusterCommandHandler = new SelectClusterCommandHandler(clusterSettings);
Expand Down Expand Up @@ -108,8 +111,12 @@ export function activate(context: vscode.ExtensionContext): void {
"vscode-kafka.consumer.list",
handleErrors(() => listConsumersCommandHandler.execute())));
context.subscriptions.push(vscode.commands.registerCommand(
"vscode-kafka.consumer.toggle",
ToggleConsumerCommandHandler.COMMAND_ID,
handleErrors(() => toggleConsumerCommandHandler.execute())));
context.subscriptions.push(vscode.commands.registerCommand(
ClearConsumerViewCommandHandler.COMMAND_ID,
handleErrors(() => clearConsumerViewCommandHandler.execute())));

registerVSCodeKafkaDocumentationCommands(context);

// .kafka file related
Expand All @@ -123,8 +130,7 @@ export function activate(context: vscode.ExtensionContext): void {
vscode.languages.registerCodeLensProvider(documentSelector, new ProducerCodeLensProvider()));

context.subscriptions.push(vscode.workspace.registerTextDocumentContentProvider(
ConsumerVirtualTextDocumentProvider.SCHEME,
new ConsumerVirtualTextDocumentProvider(consumerCollection)));
ConsumerVirtualTextDocumentProvider.SCHEME, consumerVirtualTextDocumentProvider));
}

export function deactivate(): void {
Expand Down
60 changes: 37 additions & 23 deletions src/providers/consumerVirtualTextDocumentProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { ConsumedRecord, ConsumerChangedStatusEvent, ConsumerCollection, Consume
import { CommonMessages } from "../constants";

export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentContentProvider, vscode.Disposable {

public static SCHEME = "kafka";
private buffer: { [id: string]: string } = {};
private disposables: vscode.Disposable[] = [];
Expand Down Expand Up @@ -32,10 +33,9 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC
}

public provideTextDocumentContent(uri: vscode.Uri): string {
if (!this.buffer.hasOwnProperty(uri.toString())) {
if (!this.isActive(uri)) {
return "";
}

return this.buffer[uri.toString()];
}

Expand All @@ -60,27 +60,20 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC
}

private onDidChangeStatus(uri: vscode.Uri, status: string): void {
let uriBuffer = this.buffer[uri.toString()];
if (!this.isActive(uri)) {
return;
}
const line = `Consumer: ${status}\n\n`;
uriBuffer = uriBuffer + line;

this.buffer[uri.toString()] = uriBuffer;
this.onDidChangeEmitter.fire(uri);
this.updateBuffer(uri, line);
}

private onDidReceiveRecord(uri: vscode.Uri, message: ConsumedRecord): void {
let uriBuffer = this.buffer[uri.toString()];

if (!uriBuffer) {
if (!this.isActive(uri)) {
return;
}

let line = `Key: ${message.key}\nPartition: ${message.partition}\nOffset: ${message.offset}\n`;
line = line + `Value:\n${message.value}\n\n`;
uriBuffer = uriBuffer + line;

this.buffer[uri.toString()] = uriBuffer;
this.onDidChangeEmitter.fire(uri);
this.updateBuffer(uri, line);
}

private onDidCloseConsumer(uri: vscode.Uri): void {
Expand All @@ -93,25 +86,46 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC

private onDidCloseTextDocument(document: vscode.TextDocument): void {
// When language is plaintext we assume the event was triggered as a result of switching language mode
if (document.uri.scheme !== "kafka" || document.languageId === "plaintext") {
const uri = document.uri;
if (uri.scheme !== "kafka" || document.languageId === "plaintext") {
return;
}

const buffer = this.buffer[document.uri.toString()];

if (!buffer) {
if (!this.isActive(uri)) {
return;
}

if (this.consumerCollection.has(document.uri)) {
this.consumerCollection.close(document.uri);
if (this.consumerCollection.has(uri)) {
this.consumerCollection.close(uri);
}

delete this.buffer[document.uri.toString()];
delete this.buffer[uri.toString()];
}

public clear(document: vscode.TextDocument): void {
const uri = document.uri;
if (!this.isActive(uri)) {
return;
}
this.updateBuffer(uri, '', true);
}

public dispose(): void {
this.consumerCollection.dispose();
this.disposables.forEach(d=>d.dispose());
this.disposables.forEach(d => d.dispose());
}

private isActive(uri: vscode.Uri): boolean {
return this.buffer.hasOwnProperty(uri.toString());
}

private updateBuffer(uri: vscode.Uri, content: string, replace = false) {
if (replace) {
this.buffer[uri.toString()] = content;
} else {
const uriBuffer = this.buffer[uri.toString()];
this.buffer[uri.toString()] = uriBuffer + content;
}
this.onDidChangeEmitter.fire(uri);
}
}