Skip to content

Commit

Permalink
Initial SASL/SCRAM support
Browse files Browse the repository at this point in the history
Signed-off-by: Fred Bricon <[email protected]>
  • Loading branch information
fbricon committed Jan 12, 2021
1 parent 697d130 commit 5a6473f
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 51 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ All notable changes to Kafka extension will be documented in this file.
- Click on empty Kafka explorer to add a new cluster. See [#76](https//github.com/jlandersen/vscode-kafka/pull/76).
- Added glob patterns to filter topics (`kafka.explorer.topics.filters`) and consumer groups (`kafka.explorer.consumers.filters`) out of the Kafka explorer. See [#74](https://github.com/jlandersen/vscode-kafka/pull/74).
- Kafka Explorer item labels can now be copied to the clipboard (supports multi selection). See [#68](https://github.com/jlandersen/vscode-kafka/issues/68).
- Added SASL/SCRAM-256 and SASL/SCRAM-512 authentication support.

### Changed
- Improved the "New cluster" and "New topic" wizards: now include validation and a back button. See [#21](https://github.com/jlandersen/vscode-kafka/issues/21).
Expand Down
60 changes: 31 additions & 29 deletions src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@ import { Admin, ConfigResourceTypes, Kafka, Producer } from "kafkajs";
import { Disposable } from "vscode";
import { WorkspaceSettings } from "../settings";

export interface Cluster {
id: string;
name: string;
export interface ConnectionOptions {
bootstrap: string;
saslOption?: SaslOption;
}

export interface Cluster extends ConnectionOptions {
id: string;
name: string;
}

/**
* The supported SASL mechanisms for authentication.
*/
export type SaslMechanism = "plain";
export type SaslMechanism = "plain" | "scram-sha-256" | "scram-sha-512";

export interface SaslOption {
mechanism: SaslMechanism;
Expand Down Expand Up @@ -60,11 +63,6 @@ export interface DeleteTopicRequest {
timeout?: number | undefined;
}

export interface Options {
host: string;
sasl: "none" | "SASL/PLAIN";
}

export interface ConsumerGroup {
groupId: string;
state: "Unknown" | "PreparingRebalance" | "CompletingRebalance" | "Stable" | "Dead" | "Empty";
Expand Down Expand Up @@ -175,29 +173,15 @@ class KafkaJsClient implements Client {
brokers: Broker[];
};

constructor(cluster: Cluster, workspaceSettings: WorkspaceSettings) {
constructor(connectionOptions : ConnectionOptions, workspaceSettings: WorkspaceSettings) {
this.metadata = {
brokers: [],
topics: [],
};

if (cluster.saslOption && cluster.saslOption.username && cluster.saslOption.password) {
this.kafkaJsClient = new Kafka({
clientId: "vscode-kafka",
brokers: cluster.bootstrap.split(","),
ssl: true,
sasl: { mechanism: "plain", username: cluster.saslOption.username, password: cluster.saslOption.password },
});
} else {
this.kafkaJsClient = new Kafka({
clientId: "vscode-kafka",
brokers: cluster.bootstrap.split(","),
});
}

this.kafkaClient = this.kafkaJsClient;
this.kafkaAdminClient = this.kafkaJsClient.admin();
this.producer = this.kafkaJsClient.producer();
this.kafkaJsClient = createKafka(connectionOptions);
this.kafkaClient = this.kafkaJsClient;
this.kafkaAdminClient = this.kafkaJsClient.admin();
this.producer = this.kafkaJsClient.producer();
}

canConnect(): boolean {
Expand Down Expand Up @@ -326,8 +310,26 @@ class KafkaJsClient implements Client {

dispose() {
this.kafkaAdminClient.disconnect();
}
}
}

export const createClient = (cluster: Cluster, workspaceSettings: WorkspaceSettings): Client => new EnsureConnectedDecorator(
new KafkaJsClient(cluster, workspaceSettings));

export const createKafka = (connectionOptions: ConnectionOptions): Kafka => {
let kafkaJsClient: Kafka;
if (connectionOptions.saslOption && connectionOptions.saslOption.username && connectionOptions.saslOption.password) {
kafkaJsClient = new Kafka({
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
ssl: true,
sasl: { mechanism: connectionOptions.saslOption.mechanism, username: connectionOptions.saslOption.username, password: connectionOptions.saslOption.password },
});
} else {
kafkaJsClient = new Kafka({
clientId: "vscode-kafka",
brokers: connectionOptions.bootstrap.split(","),
});
}
return kafkaJsClient;
}
21 changes: 3 additions & 18 deletions src/client/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@ import { Kafka, Consumer as KafkaJsConsumer } from "kafkajs";
import * as vscode from "vscode";

import { getWorkspaceSettings, InitialConsumerOffset, ClusterSettings } from "../settings";
import { SaslOption } from "./client";
import { ConnectionOptions, createKafka } from "./client";

interface ConsumerOptions {
interface ConsumerOptions extends ConnectionOptions {
clusterId: string;
bootstrap: string;
fromOffset: InitialConsumerOffset;
topic: string;
saslOption?: SaslOption;
}

export interface RecordReceivedEvent {
Expand Down Expand Up @@ -72,20 +70,7 @@ class Consumer implements vscode.Disposable {
* Received messages and/or errors are emitted via events.
*/
async start(): Promise<void> {
if (this.options.saslOption && this.options.saslOption.username && this.options.saslOption.password) {
this.kafkaClient = new Kafka({
clientId: "vscode-kafka",
brokers: this.options.bootstrap.split(","),
ssl: true,
sasl: { mechanism: "plain", username: this.options.saslOption.username, password: this.options.saslOption.password },
});
} else {
this.kafkaClient = new Kafka({
clientId: "vscode-kafka",
brokers: this.options.bootstrap.split(","),
});
}

this.kafkaClient = createKafka(this.options);
this.consumer = this.kafkaClient.consumer({ groupId: `vscode-kafka-${this.options.clusterId}-${this.options.topic}`, retry: { retries: 3 }});
await this.consumer.connect();
await this.consumer.subscribe({ topic: this.options.topic, fromBeginning: this.options.fromOffset === "earliest" })
Expand Down
17 changes: 13 additions & 4 deletions src/wizards/clusters.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { QuickPickItem, window } from "vscode";
import { SaslOption } from "../client";
import { SaslMechanism, SaslOption } from "../client";
import { INPUT_TITLE } from "../constants";
import { KafkaExplorer } from "../explorer/kafkaExplorer";
import { ClusterSettings } from "../settings/clusters";
Expand Down Expand Up @@ -52,7 +52,16 @@ export async function addClusterWizard(clusterSettings: ClusterSettings, explore
}

async function inputAuthentification(input: MultiStepInput, state: Partial<AddClusterState>) {
const authOptions: QuickPickItem[] = [{ "label": "None" }, { "label": "SASL/PLAIN" }];
const authMechanisms = new Map<string,string>([
["SASL/PLAIN", "plain"],
["SASL/SCRAM-256", "scram-sha-256"],
["SASL/SCRAM-512", "scram-sha-512"]
]);
const authOptions: QuickPickItem[] = [{ "label": "None" }]
for (const label of authMechanisms.keys()) {
authOptions.push({"label":label});
}

const authentification = (await input.showQuickPick({
title: INPUT_TITLE,
step: input.getStepNumber(),
Expand All @@ -61,8 +70,8 @@ export async function addClusterWizard(clusterSettings: ClusterSettings, explore
items: authOptions,
activeItem: authOptions[0]
})).label;
if (authentification && authentification == authOptions[1].label) {
state.saslOption = { mechanism: 'plain' };
if (authentification && authentification != authOptions[0].label) {
state.saslOption = { mechanism: authMechanisms.get(authentification) as SaslMechanism};
return (input: MultiStepInput) => inputAuthentificationUserName(input, state);
}
return undefined;
Expand Down

0 comments on commit 5a6473f

Please sign in to comment.