Skip to content

Commit

Permalink
Declare key/value format for CONSUMER in kafka file
Browse files Browse the repository at this point in the history
Fixes jlandersen#112

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr committed Mar 1, 2021
1 parent b7d41e4 commit 16e41f8
Show file tree
Hide file tree
Showing 11 changed files with 184 additions and 29 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ All notable changes to Kafka extension will be documented in this file.
- The consumer view now provides a `Clear Consumer View` command. See [#84](https://github.com/jlandersen/vscode-kafka/issues/40).
- Added support for consumer group deletion. See [#26](https://github.com/jlandersen/vscode-kafka/issues/26).
- .kafka files can define a `CONSUMER` block, to start a consumer group with a given offset, partitions. See [#96](https://github.com/jlandersen/vscode-kafka/issues/96).
- .kafka files show the selected cluster as a codelens on the first line. See [#102](https://github.com/jlandersen/vscode-kafka/issues/102).
- .kafka files show the selected cluster as a codelens on each CONSUMER, PRODUCER block. See [#102](https://github.com/jlandersen/vscode-kafka/issues/102).
- declare key/value format for CONSUMER in kafka file. See [#112](https://github.com/jlandersen/vscode-kafka/issues/112).
- declare key/value format for PRODUCER in kafka file. See [#113](https://github.com/jlandersen/vscode-kafka/issues/113).

### Changed
- Improved the "New cluster" and "New topic" wizards: now include validation and a back button. See [#21](https://github.com/jlandersen/vscode-kafka/issues/21).
Expand Down
5 changes: 5 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,8 @@
"fs-extra": "^8.1.0",
"glob": "^7.1.6",
"js-yaml": "^3.14.0",
"kafkajs": "^1.15.0"
"kafkajs": "^1.15.0",
"avsc": "^5.5.3"
},
"devDependencies": {
"@types/faker": "^5.1.5",
Expand Down
22 changes: 22 additions & 0 deletions snippets/consumers.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,27 @@
"partitions: ${4|0|}"
],
"description": "A consumer with a partitions filter"
},
"key-format-consumer": {
"prefix": [
"key-format-consumer"
],
"body": [
"CONSUMER ${1:consumer-group-id}",
"topic: ${2:topic_name}",
"key-format: ${3|none,boolean,bytes,double,float,int,long,null,string|}"
],
"description": "A consumer with a key format"
},
"value-format-consumer": {
"prefix": [
"value-format-consumer"
],
"body": [
"CONSUMER ${1:consumer-group-id}",
"topic: ${2:topic_name}",
"value-format: ${3|none,boolean,bytes,double,float,int,long,null,string|}"
],
"description": "A consumer with a value format"
}
}
32 changes: 32 additions & 0 deletions snippets/producers.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,38 @@
],
"description": "A producer generating keyed JSON records"
},
"key-format-producer": {
"prefix": [
"key-format-producer"
],
"body": [
"PRODUCER ${1:keyed-message}",
"topic: ${2:topic_name}",
"key: ${3:mykeyq}",
"key-format: ${4|none,boolean,bytes,double,float,int,long,null,string|}",
"${5:{{random.words}}}",
"",
"###",
""
],
"description": "A producer generating keyed records with key format"
},
"value-format-producer": {
"prefix": [
"value-format-producer"
],
"body": [
"PRODUCER ${1:keyed-message}",
"topic: ${2:topic_name}",
"key: ${3:mykeyq}",
"value-format: ${4|none,boolean,bytes,double,float,int,long,null,string|}",
"${5:{{random.words}}}",
"",
"###",
""
],
"description": "A producer generating keyed records with value format"
},
"comment": {
"prefix": [
"comment"
Expand Down
50 changes: 49 additions & 1 deletion src/client/client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Admin, ConfigResourceTypes, Kafka, Producer } from "kafkajs";

import { Disposable } from "vscode";
import { WorkspaceSettings } from "../settings";
import { Type } from "avsc";

export interface ConnectionOptions {
bootstrap: string;
Expand Down Expand Up @@ -92,6 +92,7 @@ export interface Client extends Disposable {
createTopic(createTopicRequest: CreateTopicRequest): Promise<any[]>;
deleteTopic(deleteTopicRequest: DeleteTopicRequest): Promise<void>;
}
export type MessageFormat = "none" | "boolean" | "bytes" | "double" | "float" | "int" | "long" | "null" | "string";

class EnsureConnectedDecorator implements Client {

Expand Down Expand Up @@ -353,3 +354,50 @@ export const createKafka = (connectionOptions: ConnectionOptions): Kafka => {
}
return kafkaJsClient;
};

export function decodeMessage(content: Buffer | null, format?: MessageFormat): Buffer | null {
if (content !== null && format && format !== "none") {
const avroType = getAvroType(format);
if (avroType) {
return avroType.decode(content).value;
}
}
return content;
}

export function encodeMessage(content?: string, format?: MessageFormat): Buffer | string | null {
if (content !== undefined && format && format !== "none") {
const avroType = getAvroType(format);
if (avroType) {
const val = convert(content, format);
return avroType.toBuffer(val);
}
}
return content || null;
}


function convert(content: string, format: MessageFormat): any {
switch (format) {
case "boolean":
return Boolean(content);
case "bytes":
return Buffer.from(content, 'hex');
case "double":
return Number.parseFloat(content);
case "float":
return Number.parseFloat(content);
case "int":
return Number.parseInt(content);
case "long":
return Number.parseInt(content);
case "null":
return null;
default:
return content;
}
}

function getAvroType(format: MessageFormat): Type | null {
return Type.forSchema(format);
}
24 changes: 20 additions & 4 deletions src/client/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ import { Kafka, Consumer as KafkaJsConsumer, PartitionAssigner, Assignment, Part
import { URLSearchParams } from "url";
import * as vscode from "vscode";
import { getWorkspaceSettings, InitialConsumerOffset, ClusterSettings } from "../settings";
import { ConnectionOptions, createKafka } from "./client";
import { ConnectionOptions, createKafka, decodeMessage, MessageFormat } from "./client";

interface ConsumerOptions extends ConnectionOptions {
consumerGroupId: string;
topicId: string;
fromOffset: InitialConsumerOffset | string;
partitions?: number[];
messageKeyFormat?: MessageFormat;
messageValueFormat?: MessageFormat;
}

export interface RecordReceivedEvent {
Expand Down Expand Up @@ -58,7 +60,7 @@ export class Consumer implements vscode.Disposable {
public error: any;

constructor(public uri: vscode.Uri, clusterSettings: ClusterSettings) {
const { clusterId, consumerGroupId, topicId, fromOffset, partitions } = extractConsumerInfoUri(uri);
const { clusterId, consumerGroupId, topicId, fromOffset, partitions, messageKeyFormat, messageValueFormat } = extractConsumerInfoUri(uri);
this.clusterId = clusterId;
const cluster = clusterSettings.get(clusterId);

Expand All @@ -74,7 +76,9 @@ export class Consumer implements vscode.Disposable {
consumerGroupId: consumerGroupId,
topicId,
fromOffset: fromOffset || settings.consumerOffset,
partitions: parsePartitions(partitions)
partitions: parsePartitions(partitions),
messageKeyFormat,
messageValueFormat
};
}
catch (e) {
Expand Down Expand Up @@ -107,6 +111,8 @@ export class Consumer implements vscode.Disposable {

this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
message.key = decodeMessage(message.key, this.options.messageKeyFormat) || message.key;
message.value = decodeMessage(message.value, this.options.messageValueFormat);
this.onDidReceiveMessageEmitter.fire({
uri: this.uri,
record: { topic: topic, partition: partition, ...message },
Expand Down Expand Up @@ -349,18 +355,24 @@ export interface ConsumerInfoUri {
topicId: InitialConsumerOffset | string;
fromOffset?: string;
partitions?: string;
messageKeyFormat?: MessageFormat;
messageValueFormat?: MessageFormat;
}

const TOPIC_QUERY_PARAMETER = 'topic';
const FROM_QUERY_PARAMETER = 'from';
const PARTITIONS_QUERY_PARAMETER = 'partitions';
const KEY_FORMAT_QUERY_PARAMETER = 'key';
const VALUE_FORMAT_QUERY_PARAMETER = 'value';

export function createConsumerUri(info: ConsumerInfoUri): vscode.Uri {
const path = `kafka:${info.clusterId}/${info.consumerGroupId}`;
let query = '';
query = addQueryParameter(query, TOPIC_QUERY_PARAMETER, info.topicId);
query = addQueryParameter(query, FROM_QUERY_PARAMETER, info.fromOffset);
query = addQueryParameter(query, PARTITIONS_QUERY_PARAMETER, info.partitions);
query = addQueryParameter(query, KEY_FORMAT_QUERY_PARAMETER, info.messageKeyFormat);
query = addQueryParameter(query, VALUE_FORMAT_QUERY_PARAMETER, info.messageValueFormat);
return vscode.Uri.parse(path + query);
}

Expand All @@ -377,12 +389,16 @@ export function extractConsumerInfoUri(uri: vscode.Uri): ConsumerInfoUri {
const topicId = urlParams.get(TOPIC_QUERY_PARAMETER) || '';
const from = urlParams.get(FROM_QUERY_PARAMETER);
const partitions = urlParams.get(PARTITIONS_QUERY_PARAMETER);
const messageKeyFormat = urlParams.get(KEY_FORMAT_QUERY_PARAMETER);
const messageValueFormat = urlParams.get(VALUE_FORMAT_QUERY_PARAMETER);
return {
clusterId,
consumerGroupId,
topicId,
fromOffset: from && from.trim().length > 0 ? from : undefined,
partitions: partitions && partitions.trim().length > 0 ? partitions : undefined
partitions: partitions && partitions.trim().length > 0 ? partitions : undefined,
messageKeyFormat: messageKeyFormat && messageKeyFormat.trim().length > 0 ? messageKeyFormat as MessageFormat : undefined,
messageValueFormat: messageValueFormat && messageValueFormat.trim().length > 0 ? messageValueFormat as MessageFormat : undefined
};
}

Expand Down
4 changes: 3 additions & 1 deletion src/commands/consumers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ abstract class LaunchConsumerCommandHandler {
// Try to start consumer

if (consumer) {
vscode.window.showInformationMessage(`Consumer already started on '${command.topicId}'`);
// The consumer is started, open the document which tracks consumer messages.
const consumeUri = createConsumerUri(command);
openDocument(consumeUri);
return;
}

Expand Down
32 changes: 15 additions & 17 deletions src/commands/producers.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import * as faker from "faker";

import { performance } from "perf_hooks";
import { ClientAccessor } from "../client";
import { ClientAccessor, encodeMessage, MessageFormat } from "../client";
import { OutputChannelProvider } from "../providers/outputChannelProvider";
import { KafkaExplorer } from "../explorer";
import { WorkspaceSettings } from "../settings";
import { pickClient } from "./common";
import { getErrorMessage } from "../errors";

export interface ProduceRecordCommand {
topicId?: string;
key?: string;
value: string
value: string;
messageKeyFormat?: MessageFormat;
messageValueFormat?: MessageFormat;
}

export class ProduceRecordCommandHandler {
Expand All @@ -26,6 +29,11 @@ export class ProduceRecordCommandHandler {
}

async execute(command: ProduceRecordCommand, times: number): Promise<void> {
const client = await pickClient(this.clientAccessor);
if (!client) {
return;
}

const { topicId, key, value } = command;
const channel = this.channelProvider.getChannel("Kafka Producer Log");
if (topicId === undefined) {
Expand All @@ -47,23 +55,18 @@ export class ProduceRecordCommandHandler {
faker.seed(seed);
const randomizedValue = faker.fake(value);
return {
key:randomizedKey,
value:randomizedValue
key: encodeMessage(randomizedKey, command.messageKeyFormat),
value: encodeMessage(randomizedValue, command.messageValueFormat)
};
}

// Return key/value message as-is
return {
key:key,
value:value
key: encodeMessage(key, command.messageKeyFormat),
value: encodeMessage(value, command.messageValueFormat)
};
});

const client = await pickClient(this.clientAccessor);
if (!client) {
return;
}

const producer = client.producer;
await producer.connect();

Expand All @@ -88,12 +91,7 @@ export class ProduceRecordCommandHandler {
const finishedOperation = performance.now();
const elapsed = (finishedOperation - startOperation).toFixed(2);
channel.appendLine(`Failed to produce record(s) (${elapsed}ms)`);

if (error.message) {
channel.appendLine(`Error: ${error.message}`);
} else {
channel.appendLine(`Error: ${error}`);
}
channel.appendLine(`Error: ${getErrorMessage(error)}`);
}
}
}
Loading

0 comments on commit 16e41f8

Please sign in to comment.