Skip to content

Commit

Permalink
Errors don't surface when starting a consumer fails to authenticate
Browse files Browse the repository at this point in the history
Fixes jlandersen#104

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr committed Feb 26, 2021
1 parent 0178376 commit 23919d7
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 66 deletions.
90 changes: 67 additions & 23 deletions src/client/consumer.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import { Kafka, Consumer as KafkaJsConsumer, PartitionAssigner, Assignment, PartitionAssigners, AssignerProtocol, SeekEntry } from "kafkajs";
import { URLSearchParams } from "url";

import * as vscode from "vscode";

import { getWorkspaceSettings, InitialConsumerOffset, ClusterSettings } from "../settings";
import { ConnectionOptions, createKafka } from "./client";

Expand Down Expand Up @@ -31,12 +29,19 @@ export interface ConsumerChangedStatusEvent {
status: "created" | "rebalancing" | "rebalanced";
}

export enum ConsumerLaunchState {
none,
starting,
started,
closing,
closed
}

export interface ConsumerCollectionChangedEvent {
created: vscode.Uri[];
closed: vscode.Uri[];
consumers: Consumer[];
}

class Consumer implements vscode.Disposable {
export class Consumer implements vscode.Disposable {
private kafkaClient?: Kafka;
private consumer?: KafkaJsConsumer;
private onDidReceiveMessageEmitter = new vscode.EventEmitter<RecordReceivedEvent>();
Expand All @@ -49,25 +54,33 @@ class Consumer implements vscode.Disposable {

public readonly clusterId: string;
public readonly options: ConsumerOptions;
public state: ConsumerLaunchState = ConsumerLaunchState.none;
public error: any;

constructor(public uri: vscode.Uri, clusterSettings: ClusterSettings) {
const { clusterId, consumerGroupId, topicId, fromOffset, partitions } = extractConsumerInfoUri(uri);
this.clusterId = clusterId;
const cluster = clusterSettings.get(clusterId);

if (!cluster) {
throw new Error(`Cannot create consumer, unknown cluster ${clusterId}`);
}
try {
if (!cluster) {
throw new Error(`Cannot create consumer, unknown cluster ${clusterId}`);
}

const settings = getWorkspaceSettings();
this.options = {
bootstrap: cluster.bootstrap,
saslOption: cluster.saslOption,
consumerGroupId: consumerGroupId,
topicId,
fromOffset: fromOffset || settings.consumerOffset,
partitions: parsePartitions(partitions)
};
const settings = getWorkspaceSettings();
this.options = {
bootstrap: cluster.bootstrap,
saslOption: cluster.saslOption,
consumerGroupId: consumerGroupId,
topicId,
fromOffset: fromOffset || settings.consumerOffset,
partitions: parsePartitions(partitions)
};
}
catch (e) {
this.error = e;
throw e;
}
}

/***
Expand Down Expand Up @@ -210,16 +223,37 @@ export class ConsumerCollection implements vscode.Disposable {
/**
* Creates a new consumer for a provided uri.
*/
create(uri: vscode.Uri): Consumer {
async create(uri: vscode.Uri): Promise<Consumer> {
// Create the consumer
const consumer = new Consumer(uri, this.clusterSettings);
this.consumers[uri.toString()] = consumer;
consumer.start();

// Fire an event to notify that Consumer is starting
consumer.state = ConsumerLaunchState.starting;
this.onDidChangeCollectionEmitter.fire({
created: [uri],
closed: [],
consumers: [consumer]
});

// Start the consumer
await consumer.start()
.then(() => consumer.state = ConsumerLaunchState.started)
.catch(e => {
delete this.consumers[uri.toString()];
consumer.state = ConsumerLaunchState.none;
consumer.error = e;
throw e;
})
.finally(() => {
// Fire an event to notify that consumer state changed
// with a delay because when start is done quickly
// the trace 'Consumer: started' is not displayed.
setTimeout(() => {
this.onDidChangeCollectionEmitter.fire({
consumers: [consumer]
});
}, 200);
});

return consumer;
}

Expand Down Expand Up @@ -262,17 +296,27 @@ export class ConsumerCollection implements vscode.Disposable {
/**
* Closes an existing consumer if exists.
*/
close(uri: vscode.Uri): void {
async close(uri: vscode.Uri): Promise<void> {
const consumer = this.get(uri);

if (consumer === null) {
return;
}

// Fire an event to notify that consumer is closing
consumer.state = ConsumerLaunchState.closing;
this.onDidChangeCollectionEmitter.fire({
consumers: [consumer]
});

consumer.dispose();
delete this.consumers[uri.toString()];

this.onDidChangeCollectionEmitter.fire({ created: [], closed: [uri] });
// Fire an event to notify that consumer is closed
consumer.state = ConsumerLaunchState.closed;
this.onDidChangeCollectionEmitter.fire({
consumers: [consumer]
});
}

/**
Expand Down
76 changes: 62 additions & 14 deletions src/commands/consumers.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import * as vscode from "vscode";

import { pickClient, pickConsumerGroupId, pickTopic } from "./common";
import { ConsumerCollection, ClientAccessor, createConsumerUri, ConsumerInfoUri, parsePartitions } from "../client";
import { ConsumerCollection, ClientAccessor, createConsumerUri, ConsumerInfoUri, parsePartitions, ConsumerLaunchState } from "../client";
import { KafkaExplorer } from "../explorer";
import { ConsumerVirtualTextDocumentProvider } from "../providers";
import { ProgressLocation, window } from "vscode";

export interface LaunchConsumerCommand extends ConsumerInfoUri {

Expand Down Expand Up @@ -62,18 +63,17 @@ abstract class LaunchConsumerCommandHandler {
validatePartitions(command.partitions);
validateOffset(command.fromOffset);

// Start the consumer and open the document which tracks consumer messages.
// Open the document which tracks consumer messages.
const consumeUri = createConsumerUri(command);
this.consumerCollection.create(consumeUri);
openDocument(consumeUri);
this.explorer.refresh();

// Start the consumer
await startConsumerWithProgress(consumeUri, this.consumerCollection, this.explorer);
} else {
// Stop consumer
// Stop the consumer
if (consumer) {
const consumeUri = consumer.uri;
this.consumerCollection.close(consumeUri);
openDocument(consumeUri);
this.explorer.refresh();
await stopConsumerWithProgress(consumeUri, this.consumerCollection, this.explorer);
}
}
}
Expand All @@ -85,7 +85,7 @@ abstract class LaunchConsumerCommandHandler {

export class StartConsumerCommandHandler extends LaunchConsumerCommandHandler {

public static commandID = 'vscode-kafka.consumer.start';
public static commandId = 'vscode-kafka.consumer.start';

constructor(
clientAccessor: ClientAccessor,
Expand Down Expand Up @@ -138,15 +138,15 @@ export class ToggleConsumerCommandHandler {
return;
}

const { document } = vscode.window.activeTextEditor;
if (document.uri.scheme !== "kafka") {
const { uri } = vscode.window.activeTextEditor.document;
if (uri.scheme !== "kafka") {
return;
}

if (this.consumerCollection.has(document.uri)) {
this.consumerCollection.close(document.uri);
if (this.consumerCollection.has(uri)) {
await stopConsumerWithProgress(uri, this.consumerCollection);
} else {
this.consumerCollection.create(document.uri);
await startConsumerWithProgress(uri, this.consumerCollection);
}
}
}
Expand Down Expand Up @@ -271,6 +271,54 @@ export class DeleteConsumerGroupCommandHandler {
}
}

async function startConsumerWithProgress(consumeUri: vscode.Uri, consumerCollection: ConsumerCollection, explorer?: KafkaExplorer) {
const consumer = consumerCollection.get(consumeUri);
if (consumer && consumer.state === ConsumerLaunchState.closing) {
vscode.window.showErrorMessage(`The consumer cannot be started because it is stopping.`);
return;
}
await window.withProgress({
location: ProgressLocation.Window,
title: `Starting consumer '${consumeUri}'.`,
cancellable: false
}, (progress, token) => {
return new Promise((resolve, reject) => {
consumerCollection.create(consumeUri)
.then(consumer => {
if (explorer) {
explorer.refresh();
}
resolve(consumer);
})
.catch(error => reject(error));
});
});
}

async function stopConsumerWithProgress(consumeUri: vscode.Uri, consumerCollection: ConsumerCollection, explorer?: KafkaExplorer) {
const consumer = consumerCollection.get(consumeUri);
if (consumer && consumer.state === ConsumerLaunchState.starting) {
vscode.window.showErrorMessage(`The consumer cannot be stopped because it is starting.`);
return;
}
await window.withProgress({
location: ProgressLocation.Window,
title: `Stopping consumer '${consumeUri}'.`,
cancellable: false
}, (progress, token) => {
return new Promise((resolve, reject) => {
consumerCollection.close(consumeUri)
.then(() => {
if (explorer) {
explorer.refresh();
}
resolve(true);
})
.catch(error => reject(error));
});
});
}

async function openDocument(uri: vscode.Uri): Promise<void> {

const visibleConsumerEditor = vscode.window.visibleTextEditors.find(te => te.document.uri.toString() === uri.toString());
Expand Down
4 changes: 2 additions & 2 deletions src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export function activate(context: vscode.ExtensionContext): void {
context.subscriptions.push(explorer);
context.subscriptions.push(new ConsumerStatusBarItem(consumerCollection));
context.subscriptions.push(new SelectedClusterStatusBarItem(clusterSettings));
const consumerVirtualTextDocumentProvider = new ConsumerVirtualTextDocumentProvider(consumerCollection);
const consumerVirtualTextDocumentProvider = new ConsumerVirtualTextDocumentProvider(consumerCollection, clusterSettings);

// Commands
const createTopicCommandHandler = new CreateTopicCommandHandler(clientAccessor, clusterSettings, explorer);
Expand Down Expand Up @@ -110,7 +110,7 @@ export function activate(context: vscode.ExtensionContext): void {
"vscode-kafka.explorer.deleteselected",
handleErrors((_item?: any, selection?: NodeBase[]) => explorer.deleteSelectedItem(_item, selection))));
context.subscriptions.push(vscode.commands.registerCommand(
StartConsumerCommandHandler.commandID,
StartConsumerCommandHandler.commandId,
handleErrors((command?: LaunchConsumerCommand) => startConsumerCommandHandler.execute(command))));
context.subscriptions.push(vscode.commands.registerCommand(
StopConsumerCommandHandler.commandId,
Expand Down
52 changes: 37 additions & 15 deletions src/kafka-file/codeLensProvider.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as vscode from "vscode";
import { ConsumerCollection, ConsumerCollectionChangedEvent } from "../client";
import { ConsumerCollection, ConsumerCollectionChangedEvent, ConsumerLaunchState } from "../client";
import { LaunchConsumerCommand, StartConsumerCommandHandler, StopConsumerCommandHandler, ProduceRecordCommand, ProduceRecordCommandHandler, SelectClusterCommandHandler } from "../commands";
import { ClusterSettings } from "../settings";

Expand Down Expand Up @@ -169,33 +169,55 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod
const lenses: vscode.CodeLens[] = [];
if (clusterName) {
const consumer = this.consumerCollection.getByConsumerGroupId(launchCommand.clusterId, launchCommand.consumerGroupId);
const started = consumer ? true : false;
const status = started ? '$(check)' : '$(x)';
const consumerState = consumer ? consumer.state : ConsumerLaunchState.none;
const status = this.getConsumerStatus(consumerState);

// Add status lens
lenses.push(new vscode.CodeLens(lineRange, {
title: `${status}`,
command: ''
}));

// Add Start/Stop consumer lens
if (!started) {
lenses.push(new vscode.CodeLens(lineRange, {
title: `Start consumer`,
command: StartConsumerCommandHandler.commandID,
arguments: [launchCommand]
}));
} else {
lenses.push(new vscode.CodeLens(lineRange, {
title: `Stop consumer`,
command: StopConsumerCommandHandler.commandId,
arguments: [launchCommand]
}));
switch (consumerState) {
case ConsumerLaunchState.starting:
case ConsumerLaunchState.closing:
// No lens
break;
case ConsumerLaunchState.started:
lenses.push(new vscode.CodeLens(lineRange, {
title: `Stop consumer`,
command: StopConsumerCommandHandler.commandId,
arguments: [launchCommand]
}));
break;
default:
lenses.push(new vscode.CodeLens(lineRange, {
title: `Start consumer`,
command: StartConsumerCommandHandler.commandId,
arguments: [launchCommand]
}));
break;
}
}
// Add cluster lens
lenses.push(this.createClusterLens(lineRange, clusterName));
return lenses;
}

private getConsumerStatus(state: ConsumerLaunchState): string {
switch (state) {
case ConsumerLaunchState.starting:
return 'Starting...';
case ConsumerLaunchState.closing:
return 'Stopping...';
case ConsumerLaunchState.started:
return '$(check)';
default:
return '$(x)';
}
}

private createLaunchConsumerCommand(document: vscode.TextDocument, range: vscode.Range, selectedClusterId: string | undefined): LaunchConsumerCommand {
let consumerGroupId;
let topicId;
Expand Down
Loading

0 comments on commit 23919d7

Please sign in to comment.