Skip to content

Commit

Permalink
Errors don't surface when starting a consumer fails to authenticate
Browse files Browse the repository at this point in the history
Fixes jlandersen#104

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr committed Feb 26, 2021
1 parent 0178376 commit aa74a93
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 55 deletions.
84 changes: 62 additions & 22 deletions src/client/consumer.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import { Kafka, Consumer as KafkaJsConsumer, PartitionAssigner, Assignment, PartitionAssigners, AssignerProtocol, SeekEntry } from "kafkajs";
import { URLSearchParams } from "url";

import * as vscode from "vscode";

import { getWorkspaceSettings, InitialConsumerOffset, ClusterSettings } from "../settings";
import { ConnectionOptions, createKafka } from "./client";

Expand Down Expand Up @@ -31,12 +29,19 @@ export interface ConsumerChangedStatusEvent {
status: "created" | "rebalancing" | "rebalanced";
}

export enum ConsumerLaunchState {
none,
starting,
started,
closing,
closed
}

export interface ConsumerCollectionChangedEvent {
created: vscode.Uri[];
closed: vscode.Uri[];
consumers: Consumer[];
}

class Consumer implements vscode.Disposable {
export class Consumer implements vscode.Disposable {
private kafkaClient?: Kafka;
private consumer?: KafkaJsConsumer;
private onDidReceiveMessageEmitter = new vscode.EventEmitter<RecordReceivedEvent>();
Expand All @@ -49,25 +54,33 @@ class Consumer implements vscode.Disposable {

public readonly clusterId: string;
public readonly options: ConsumerOptions;
public state: ConsumerLaunchState = ConsumerLaunchState.none;
public error: any;

constructor(public uri: vscode.Uri, clusterSettings: ClusterSettings) {
const { clusterId, consumerGroupId, topicId, fromOffset, partitions } = extractConsumerInfoUri(uri);
this.clusterId = clusterId;
const cluster = clusterSettings.get(clusterId);

if (!cluster) {
throw new Error(`Cannot create consumer, unknown cluster ${clusterId}`);
}
try {
if (!cluster) {
throw new Error(`Cannot create consumer, unknown cluster ${clusterId}`);
}

const settings = getWorkspaceSettings();
this.options = {
bootstrap: cluster.bootstrap,
saslOption: cluster.saslOption,
consumerGroupId: consumerGroupId,
topicId,
fromOffset: fromOffset || settings.consumerOffset,
partitions: parsePartitions(partitions)
};
const settings = getWorkspaceSettings();
this.options = {
bootstrap: cluster.bootstrap,
saslOption: cluster.saslOption,
consumerGroupId: consumerGroupId,
topicId,
fromOffset: fromOffset || settings.consumerOffset,
partitions: parsePartitions(partitions)
};
}
catch (e) {
this.error = e;
throw e;
}
}

/***
Expand Down Expand Up @@ -210,16 +223,33 @@ export class ConsumerCollection implements vscode.Disposable {
/**
* Creates a new consumer for a provided uri.
*/
create(uri: vscode.Uri): Consumer {
async create(uri: vscode.Uri): Promise<Consumer> {
// Create the consumer
const consumer = new Consumer(uri, this.clusterSettings);
this.consumers[uri.toString()] = consumer;
consumer.start();

// Fire an event to notify that Consumer is starting
consumer.state = ConsumerLaunchState.starting;
this.onDidChangeCollectionEmitter.fire({
created: [uri],
closed: [],
consumers: [consumer]
});

// Start the consumer
await consumer.start()
.then(() => consumer.state = ConsumerLaunchState.started)
.catch(e => {
delete this.consumers[uri.toString()];
consumer.state = ConsumerLaunchState.none;
consumer.error = e;
throw e;
})
.finally(() => {
// Fire an event to notify that consumer state changed
this.onDidChangeCollectionEmitter.fire({
consumers: [consumer]
});
});

return consumer;
}

Expand Down Expand Up @@ -269,10 +299,20 @@ export class ConsumerCollection implements vscode.Disposable {
return;
}

// Fire an event to notify that consumer is closing
consumer.state = ConsumerLaunchState.closing;
this.onDidChangeCollectionEmitter.fire({
consumers: [consumer]
});

consumer.dispose();
delete this.consumers[uri.toString()];

this.onDidChangeCollectionEmitter.fire({ created: [], closed: [uri] });
// Fire an event to notify that consumer is closed
consumer.state = ConsumerLaunchState.closed;
this.onDidChangeCollectionEmitter.fire({
consumers: [consumer]
});
}

/**
Expand Down
20 changes: 16 additions & 4 deletions src/commands/consumers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { pickClient, pickConsumerGroupId, pickTopic } from "./common";
import { ConsumerCollection, ClientAccessor, createConsumerUri, ConsumerInfoUri, parsePartitions } from "../client";
import { KafkaExplorer } from "../explorer";
import { ConsumerVirtualTextDocumentProvider } from "../providers";
import { ProgressLocation, window } from "vscode";

export interface LaunchConsumerCommand extends ConsumerInfoUri {

Expand Down Expand Up @@ -64,15 +65,26 @@ abstract class LaunchConsumerCommandHandler {

// Start the consumer and open the document which tracks consumer messages.
const consumeUri = createConsumerUri(command);
this.consumerCollection.create(consumeUri);
openDocument(consumeUri);
this.explorer.refresh();
await window.withProgress({
location: ProgressLocation.Window,
title: `Starting consumer for '${command.topicId}' topic.`,
cancellable: false
}, (progress, token) => {
return new Promise((resolve, reject) => {
this.consumerCollection.create(consumeUri)
.then(consumer => {
this.explorer.refresh();
resolve(consumer);
})
.catch(error => reject(error));
});
});
} else {
// Stop consumer
if (consumer) {
const consumeUri = consumer.uri;
this.consumerCollection.close(consumeUri);
openDocument(consumeUri);
this.explorer.refresh();
}
}
Expand All @@ -85,7 +97,7 @@ abstract class LaunchConsumerCommandHandler {

export class StartConsumerCommandHandler extends LaunchConsumerCommandHandler {

public static commandID = 'vscode-kafka.consumer.start';
public static commandId = 'vscode-kafka.consumer.start';

constructor(
clientAccessor: ClientAccessor,
Expand Down
4 changes: 2 additions & 2 deletions src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export function activate(context: vscode.ExtensionContext): void {
context.subscriptions.push(explorer);
context.subscriptions.push(new ConsumerStatusBarItem(consumerCollection));
context.subscriptions.push(new SelectedClusterStatusBarItem(clusterSettings));
const consumerVirtualTextDocumentProvider = new ConsumerVirtualTextDocumentProvider(consumerCollection);
const consumerVirtualTextDocumentProvider = new ConsumerVirtualTextDocumentProvider(consumerCollection, clusterSettings);

// Commands
const createTopicCommandHandler = new CreateTopicCommandHandler(clientAccessor, clusterSettings, explorer);
Expand Down Expand Up @@ -110,7 +110,7 @@ export function activate(context: vscode.ExtensionContext): void {
"vscode-kafka.explorer.deleteselected",
handleErrors((_item?: any, selection?: NodeBase[]) => explorer.deleteSelectedItem(_item, selection))));
context.subscriptions.push(vscode.commands.registerCommand(
StartConsumerCommandHandler.commandID,
StartConsumerCommandHandler.commandId,
handleErrors((command?: LaunchConsumerCommand) => startConsumerCommandHandler.execute(command))));
context.subscriptions.push(vscode.commands.registerCommand(
StopConsumerCommandHandler.commandId,
Expand Down
48 changes: 33 additions & 15 deletions src/kafka-file/codeLensProvider.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as vscode from "vscode";
import { ConsumerCollection, ConsumerCollectionChangedEvent } from "../client";
import { ConsumerCollection, ConsumerCollectionChangedEvent, ConsumerLaunchState } from "../client";
import { LaunchConsumerCommand, StartConsumerCommandHandler, StopConsumerCommandHandler, ProduceRecordCommand, ProduceRecordCommandHandler, SelectClusterCommandHandler } from "../commands";
import { ClusterSettings } from "../settings";

Expand Down Expand Up @@ -169,33 +169,51 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod
const lenses: vscode.CodeLens[] = [];
if (clusterName) {
const consumer = this.consumerCollection.getByConsumerGroupId(launchCommand.clusterId, launchCommand.consumerGroupId);
const started = consumer ? true : false;
const status = started ? '$(check)' : '$(x)';
const consumerState = consumer ? consumer.state : ConsumerLaunchState.none;
const status = this.getConsumerStatus(consumerState);

// Add status lens
lenses.push(new vscode.CodeLens(lineRange, {
title: `${status}`,
command: ''
}));

// Add Start/Stop consumer lens
if (!started) {
lenses.push(new vscode.CodeLens(lineRange, {
title: `Start consumer`,
command: StartConsumerCommandHandler.commandID,
arguments: [launchCommand]
}));
} else {
lenses.push(new vscode.CodeLens(lineRange, {
title: `Stop consumer`,
command: StopConsumerCommandHandler.commandId,
arguments: [launchCommand]
}));
switch (consumerState) {
case ConsumerLaunchState.starting:
// No lens
break;
case ConsumerLaunchState.started:
lenses.push(new vscode.CodeLens(lineRange, {
title: `Stop consumer`,
command: StopConsumerCommandHandler.commandId,
arguments: [launchCommand]
}));
default:
lenses.push(new vscode.CodeLens(lineRange, {
title: `Start consumer`,
command: StartConsumerCommandHandler.commandId,
arguments: [launchCommand]
}));
break;
}
}
// Add cluster lens
lenses.push(this.createClusterLens(lineRange, clusterName));
return lenses;
}

private getConsumerStatus(state: ConsumerLaunchState): string {
switch (state) {
case ConsumerLaunchState.starting:
return 'Starting...';
case ConsumerLaunchState.started:
return '$(check)';
default:
return '$(x)';
}
}

private createLaunchConsumerCommand(document: vscode.TextDocument, range: vscode.Range, selectedClusterId: string | undefined): LaunchConsumerCommand {
let consumerGroupId;
let topicId;
Expand Down
59 changes: 48 additions & 11 deletions src/providers/consumerVirtualTextDocumentProvider.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import * as vscode from "vscode";

import { ConsumedRecord, ConsumerChangedStatusEvent, ConsumerCollection, ConsumerCollectionChangedEvent, RecordReceivedEvent } from "../client";
import { ConsumedRecord, Consumer, ConsumerChangedStatusEvent, ConsumerCollection, ConsumerCollectionChangedEvent, ConsumerLaunchState, RecordReceivedEvent } from "../client";
import { CommonMessages } from "../constants";
import { ClusterSettings } from "../settings/clusters";

export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentContentProvider, vscode.Disposable {
// eslint-disable-next-line @typescript-eslint/naming-convention
Expand All @@ -12,22 +13,34 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC
private onDidChangeEmitter = new vscode.EventEmitter<vscode.Uri>();
public onDidChange = this.onDidChangeEmitter.event;

constructor(private consumerCollection: ConsumerCollection) {
constructor(private consumerCollection: ConsumerCollection, private clusterSettings: ClusterSettings) {
this.disposables.push(vscode.workspace.onDidCloseTextDocument((e: vscode.TextDocument) => {
this.onDidCloseTextDocument(e);
}));

this.disposables.push(this.consumerCollection.onDidChangeCollection((e: ConsumerCollectionChangedEvent) => {
for (const startedUri of e.created) {
if (!this.buffer[startedUri.toString()]) {
this.buffer[startedUri.toString()] = '';
for (const consumer of e.consumers) {
const uri = consumer.uri;
switch (consumer.state) {
case ConsumerLaunchState.starting:
if (!this.buffer[uri.toString()]) {
this.buffer[uri.toString()] = '';
this.displayConsumerOptions(consumer);
}
this.onDidChangeStatus(uri, 'starting...');
this.attachToConsumer(uri);
break;
case ConsumerLaunchState.started:
this.onDidChangeStatus(uri, 'started');
break;
case ConsumerLaunchState.closed:
this.onDidCloseConsumer(uri);
default:
if (consumer.error) {
this.onDidConsumerError(uri, consumer.error);
}
break;
}
this.onDidChangeStatus(startedUri, 'started');
this.attachToConsumer(startedUri);
}

for (const closedUri of e.closed) {
this.onDidCloseConsumer(closedUri);
}
}));
}
Expand Down Expand Up @@ -59,6 +72,30 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC
}));
}

private displayConsumerOptions(consumer: Consumer): void {
const clusterName = this.clusterSettings.get(consumer.clusterId)?.name;
let line = `Consumer options:\n`;
if (clusterName) {
line += ` - cluster: ${clusterName}\n`;
}
line += ` - bootstrap: ${consumer.options.bootstrap}\n`;
line += ` - topic: ${consumer.options.topicId}\n`;
line += ` - from: ${consumer.options.fromOffset}\n`;
if (consumer.options.partitions) {
line += ` - partitions: ${consumer.options.partitions}\n`;
}
line += `\n`;
this.updateBuffer(consumer.uri, line);
}

onDidConsumerError(uri: vscode.Uri, error: any): void {
if (!this.isActive(uri)) {
return;
}
const line = `Error: ${error}\n\n`;
this.updateBuffer(uri, line);
}

private onDidChangeStatus(uri: vscode.Uri, status: string): void {
if (!this.isActive(uri)) {
return;
Expand Down
2 changes: 1 addition & 1 deletion syntaxes/kafka-consumer.tmLanguage.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"patterns": [
{
"name": "meta.header.kafka.consumer",
"match": "(?i)^(?:(Consumer|Key|Partition|Offset|Value)(:)\\s*)(.*)?$",
"match": "(?i)^(?:(Consumer options|Consumer|Key|Partition|Offset|Value|Error)(:)\\s*)(.*)?$",
"captures": {
"1": {
"name": "keyword.other.kafka-consumer"
Expand Down

0 comments on commit aa74a93

Please sign in to comment.