Skip to content

Commit

Permalink
Show cluster state in kafka file
Browse files Browse the repository at this point in the history
Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr authored and fbricon committed Apr 28, 2021
1 parent 33eb1a2 commit e265c0a
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 28 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# Change Log
All notable changes to `Tools for Apache Kafka®` are documented in this file.

## [0.13.0]
### Added
- Show cluster state in kafka file. See [#175](https://github.com/jlandersen/vscode-kafka/pull/175).

## [0.12.0] - 2021-04-26
### Added
- Extension API to contribute clusters. See [#123](https://github.com/jlandersen/vscode-kafka/issues/123) and [#160](https://github.com/jlandersen/vscode-kafka/pull/160).
Expand Down
28 changes: 21 additions & 7 deletions src/client/client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Admin, ConfigResourceTypes, Consumer, ConsumerConfig, Kafka, KafkaConfig, Producer, SeekEntry } from "kafkajs";

import { Disposable } from "vscode";
import { ClientAccessor, ClientState } from ".";
import { getClusterProvider } from "../kafka-extensions/registry";
import { WorkspaceSettings } from "../settings";

Expand Down Expand Up @@ -81,6 +82,7 @@ export interface ConsumerGroupMember {
}

export interface Client extends Disposable {
state: ClientState;
cluster: Cluster;
producer(): Promise<Producer>;
consumer(config?: ConsumerConfig): Promise<Consumer>;
Expand All @@ -95,10 +97,11 @@ export interface Client extends Disposable {
createTopic(createTopicRequest: CreateTopicRequest): Promise<any[]>;
deleteTopic(deleteTopicRequest: DeleteTopicRequest): Promise<void>;
fetchTopicPartitions(topic: string): Promise<number[]>;
fetchTopicOffsets(topic: string): Promise<Array<SeekEntry & { high: string; low: string }>>;
fetchTopicOffsets(topic: string): Promise<Array<SeekEntry & { high: string; low: string }>>;
}

class EnsureConnectedDecorator implements Client {
public state = ClientState.disconnected;

constructor(private client: Client) {
}
Expand All @@ -107,12 +110,14 @@ class EnsureConnectedDecorator implements Client {
return this.client.cluster;
}

public producer(): Promise<Producer> {
return this.client.producer();
public async producer(): Promise<Producer> {
await this.waitUntilConnected();
return await this.client.producer();
}

public consumer(config?: ConsumerConfig): Promise<Consumer> {
return this.client.consumer(config);
public async consumer(config?: ConsumerConfig): Promise<Consumer> {
await this.waitUntilConnected();
return await this.client.consumer(config);
}

public connect(): Promise<void> {
Expand Down Expand Up @@ -169,7 +174,7 @@ class EnsureConnectedDecorator implements Client {
return await this.client.fetchTopicPartitions(topic);
}

public async fetchTopicOffsets(topic: string): Promise<Array<SeekEntry & { high: string; low: string }>> {
public async fetchTopicOffsets(topic: string): Promise<Array<SeekEntry & { high: string; low: string }>> {
await this.waitUntilConnected();
return await this.client.fetchTopicOffsets(topic);
}
Expand All @@ -179,9 +184,13 @@ class EnsureConnectedDecorator implements Client {
}

private async waitUntilConnected(): Promise<void> {
const clientAccessor = ClientAccessor.getInstance();
try {
clientAccessor.changeState(this, ClientState.connecting);
await this.client.connect();
clientAccessor.changeState(this, ClientState.connected);
} catch (error) {
clientAccessor.changeState(this, ClientState.invalid);
if (error.message) {
throw new Error(`Failed operation - ${error.message}`);
} else {
Expand Down Expand Up @@ -220,6 +229,11 @@ class KafkaJsClient implements Client {
});
}


public get state(): ClientState {
return ClientState.disconnected;
}

private async getkafkaClient(): Promise<Kafka> {
const client = (await this.kafkaPromise).kafkaJsClient;
if (!client) {
Expand Down Expand Up @@ -387,7 +401,7 @@ class KafkaJsClient implements Client {
return partitionMetadata?.topics[0].partitions.map(m => m.partitionId) || [0];
}

async fetchTopicOffsets(topic: string): Promise<Array<SeekEntry & { high: string; low: string }>> {
async fetchTopicOffsets(topic: string): Promise<Array<SeekEntry & { high: string; low: string }>> {
// returns the topics partitions
return (await this.getkafkaAdminClient()).fetchTopicOffsets(topic);
}
Expand Down
33 changes: 32 additions & 1 deletion src/client/index.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,30 @@
import { createClient, Client } from "./client";
import { getClusterSettings, ClusterSettings, getWorkspaceSettings } from "../settings";
import { Disposable } from "vscode";
import { Disposable, EventEmitter } from "vscode";

export enum ClientState {
connecting,
connected,
invalid,
disconnecting,
disconnected
}

interface ClientStateEvent {
client: Client;
}

/**
* Represents an accessor for retrieving the kafka client used for a cluster.
*/
export class ClientAccessor implements Disposable {

private static instance: ClientAccessor;
private clientsById: { [id: string]: Client } = {};
private clusterSettings: ClusterSettings;
private onDidChangeClientStateEmitter = new EventEmitter<ClientStateEvent>();

public onDidChangeClientState = this.onDidChangeClientStateEmitter.event;

constructor(clusterSettings: ClusterSettings) {
this.clusterSettings = clusterSettings;
Expand All @@ -32,6 +48,21 @@ export class ClientAccessor implements Disposable {
return this.clientsById.hasOwnProperty(clusterId);
}

public getState(clusterId: string): ClientState {
if (!this.has(clusterId)) {
return ClientState.disconnected;
}
const client = this.get(clusterId);
return client.state;
}

changeState(client: Client, state: ClientState) {
client.state = state;
this.onDidChangeClientStateEmitter.fire({
client
});
}

public getSelectedClusterClient(): Client | undefined {
const selectedCluster = this.clusterSettings.selected;

Expand Down
2 changes: 1 addition & 1 deletion src/explorer/models/cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ export class ClusterItem extends NodeBase implements Disposable {
}

public dispose(): void {
this.client.dispose();
this.clientAccessor.remove(this.cluster.id);
}

async findTopictemByName(topicName: string): Promise<NodeBase | TopicItem | undefined> {
Expand Down
2 changes: 1 addition & 1 deletion src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ export function activate(context: vscode.ExtensionContext): KafkaExtensionPartic

// .kafka file related
context.subscriptions.push(
startLanguageClient(clusterSettings, workspaceSettings, producerCollection, consumerCollection, explorer, context)
startLanguageClient(clusterSettings, clientAccessor, workspaceSettings, producerCollection, consumerCollection, explorer, context)
);

context.subscriptions.push(
Expand Down
13 changes: 10 additions & 3 deletions src/kafka-file/kafkaFileClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import { TopicItem } from "../explorer";
import { KafkaModelProvider } from "../explorer/models/kafka";
import { ThrottledDelayer } from "./utils/async";
import { WorkspaceSettings } from "../settings";
import { ClientAccessor } from "../client";

export function startLanguageClient(
clusterSettings: ClusterSettings,
clientAccessor : ClientAccessor,
workspaceSettings: WorkspaceSettings,
producerCollection: ProducerCollection,
consumerCollection: ConsumerCollection,
Expand All @@ -26,7 +28,7 @@ export function startLanguageClient(
const kafkaFileDocuments = getLanguageModelCache<KafkaFileDocument>(10, 60, document => languageService.parseKafkaFileDocument(document));

// Create the Kafka file language service.
const languageService = createLanguageService(clusterSettings, producerCollection, consumerCollection, modelProvider);
const languageService = createLanguageService(clusterSettings, clientAccessor, producerCollection, consumerCollection, modelProvider);

const documentSelector = [
{ language: "kafka", scheme: "file" },
Expand All @@ -52,6 +54,8 @@ export function startLanguageClient(
clusterSettings.onDidChangeSelected((e) => {
codeLensProvider.refresh();
});
// 4.
clientAccessor.onDidChangeClientState(() => codeLensProvider.refresh());

// Completion
const completion = new KafkaFileCompletionItemProvider(kafkaFileDocuments, languageService, workspaceSettings);
Expand Down Expand Up @@ -92,7 +96,7 @@ export function startLanguageClient(
};
}

function createLanguageService(clusterSettings: ClusterSettings, producerCollection: ProducerCollection, consumerCollection: ConsumerCollection, modelProvider: KafkaModelProvider): LanguageService {
function createLanguageService(clusterSettings: ClusterSettings, clientAccessor : ClientAccessor, producerCollection: ProducerCollection, consumerCollection: ConsumerCollection, modelProvider: KafkaModelProvider): LanguageService {
const producerLaunchStateProvider = {
getProducerLaunchState(uri: vscode.Uri): ProducerLaunchState {
const producer = producerCollection.get(uri);
Expand All @@ -110,9 +114,12 @@ function createLanguageService(clusterSettings: ClusterSettings, producerCollect
const selectedClusterProvider = {
getSelectedCluster() {
const selected = clusterSettings.selected;
const clusterId = selected?.id;
const clusterState = clusterId ? clientAccessor.getState(clusterId) : undefined;
return {
clusterId: selected?.id,
clusterId,
clusterName: selected?.name,
clusterState
};
}
} as SelectedClusterProvider;
Expand Down
4 changes: 2 additions & 2 deletions src/kafka-file/languageservice/kafkaFileLanguageService.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { CodeLens, CompletionList, Diagnostic, Position, TextDocument, Uri } from "vscode";
import { ConsumerLaunchState } from "../../client";
import { ClientState, ConsumerLaunchState } from "../../client";
import { ProducerLaunchState } from "../../client/producer";
import { KafkaFileDocument, parseKafkaFile } from "./parser/kafkaFileParser";
import { KafkaFileCodeLenses } from "./services/codeLensProvider";
Expand All @@ -24,7 +24,7 @@ export interface ConsumerLaunchStateProvider {
* Provider API which gets the selected cluster id and name.
*/
export interface SelectedClusterProvider {
getSelectedCluster(): { clusterId?: string, clusterName?: string };
getSelectedCluster(): { clusterId?: string, clusterName?: string, clusterState?: ClientState };
}

export interface TopicDetail {
Expand Down
42 changes: 29 additions & 13 deletions src/kafka-file/languageservice/services/codeLensProvider.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { TextDocument, CodeLens, Range } from "vscode";
import { ConsumerLaunchState } from "../../../client";
import { ClientState, ConsumerLaunchState } from "../../../client";
import { createProducerUri, ProducerLaunchState } from "../../../client/producer";
import { LaunchConsumerCommand, ProduceRecordCommand, ProduceRecordCommandHandler, SelectClusterCommandHandler, StartConsumerCommandHandler, StopConsumerCommandHandler } from "../../../commands";
import { ProducerLaunchStateProvider, ConsumerLaunchStateProvider, SelectedClusterProvider } from "../kafkaFileLanguageService";
Expand All @@ -15,31 +15,47 @@ export class KafkaFileCodeLenses {
}
getCodeLenses(document: TextDocument, kafkaFileDocument: KafkaFileDocument): CodeLens[] {
const lenses: CodeLens[] = [];
const { clusterName, clusterId } = this.selectedClusterProvider.getSelectedCluster();
const { clusterName, clusterId, clusterState } = this.selectedClusterProvider.getSelectedCluster();
kafkaFileDocument.blocks.forEach(block => {
lenses.push(...this.createBlockLens(block, clusterName, clusterId));
lenses.push(...this.createBlockLens(block, clusterName, clusterId, clusterState));
});
return lenses;
}


private createBlockLens(block: Block, clusterName: string | undefined, clusterId: string | undefined): CodeLens[] {
private createBlockLens(block: Block, clusterName: string | undefined, clusterId: string | undefined, clusterState: ClientState | undefined): CodeLens[] {
const range = new Range(block.start, block.end);
const lineRange = new Range(block.start, block.start);
if (block.type === BlockType.consumer) {
return this.createConsumerLens(<ConsumerBlock>block, lineRange, range, clusterName, clusterId);
return this.createConsumerLens(<ConsumerBlock>block, lineRange, range, clusterName, clusterId, clusterState);
}
return this.createProducerLens(<ProducerBlock> block, lineRange, range, clusterName, clusterId);
return this.createProducerLens(<ProducerBlock>block, lineRange, range, clusterName, clusterId, clusterState);
}

createClusterLens(lineRange: Range, clusterName: string | undefined): CodeLens {
createClusterLens(lineRange: Range, clusterName: string | undefined, clusterState: ClientState | undefined): CodeLens {
const status = this.getClusterStatus(clusterState);
return new CodeLens(lineRange, {
title: clusterName ? `${clusterName}` : 'Select a cluster',
title: clusterName ? `${status}${clusterName}` : 'Select a cluster',
command: SelectClusterCommandHandler.commandId
});
}

private createProducerLens(block: ProducerBlock, lineRange: Range, range: Range, clusterName: string | undefined, clusterId: string | undefined): CodeLens[] {
getClusterStatus(state: ClientState | undefined) {
switch (state) {
case ClientState.disconnected:
return `$(eye-closed) `;
case ClientState.connecting:
return `$(sync~spin) `;
case ClientState.connected:
return `$(eye) `;
case ClientState.invalid:
return `$(error) `;
default:
return '';
}
}

private createProducerLens(block: ProducerBlock, lineRange: Range, range: Range, clusterName: string | undefined, clusterId: string | undefined, clusterState: ClientState | undefined): CodeLens[] {
const lenses: CodeLens[] = [];
if (clusterId) {
const produceRecordCommand = this.createProduceRecordCommand(block, range, clusterId);
Expand Down Expand Up @@ -70,7 +86,7 @@ export class KafkaFileCodeLenses {
}
}
// Add cluster lens
lenses.push(this.createClusterLens(lineRange, clusterName));
lenses.push(this.createClusterLens(lineRange, clusterName, clusterState));
return lenses;
}

Expand Down Expand Up @@ -121,7 +137,7 @@ export class KafkaFileCodeLenses {
} as ProduceRecordCommand;
}

private createConsumerLens(block: ConsumerBlock, lineRange: Range, range: Range, clusterName: string | undefined, clusterId: string | undefined): CodeLens[] {
private createConsumerLens(block: ConsumerBlock, lineRange: Range, range: Range, clusterName: string | undefined, clusterId: string | undefined, clusterState: ClientState | undefined): CodeLens[] {
const launchCommand = this.createLaunchConsumerCommand(block, range, clusterId);
const lenses: CodeLens[] = [];
if (clusterName) {
Expand Down Expand Up @@ -154,7 +170,7 @@ export class KafkaFileCodeLenses {
}
}
// Add cluster lens
lenses.push(this.createClusterLens(lineRange, clusterName));
lenses.push(this.createClusterLens(lineRange, clusterName, clusterState));
return lenses;
}

Expand All @@ -179,7 +195,7 @@ export class KafkaFileCodeLenses {
let keyFormat;
let valueFormat;
block.properties.forEach(property => {
switch (property. propertyName) {
switch (property.propertyName) {
case 'topic':
topicId = property.propertyValue;
break;
Expand Down

0 comments on commit e265c0a

Please sign in to comment.