Declare key/value format for CONSUMER in kafka file
Fixes jlandersen#112

Signed-off-by: azerr <[email protected]>
angelozerr committed Mar 1, 2021
1 parent b7d41e4 commit e4326bc
Showing 9 changed files with 173 additions and 16 deletions.
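For context, the key-format / value-format properties introduced by this change are declared directly in a .kafka file. A rough example (group, topic, key and value below are made up; the supported formats are none, boolean, bytes, double, float, int, long, null and string, matching the MessageFormat type added in src/client/client.ts):

    CONSUMER my-consumer-group
    topic: orders
    key-format: string
    value-format: long

    ###

    PRODUCER keyed-message
    topic: orders
    key: order-1
    key-format: string
    value-format: long
    1612345678

    ###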
5 changes: 5 additions & 0 deletions package-lock.json

Generated file; diff not rendered by default.

3 changes: 2 additions & 1 deletion package.json
@@ -425,7 +425,8 @@
"fs-extra": "^8.1.0",
"glob": "^7.1.6",
"js-yaml": "^3.14.0",
"kafkajs": "^1.15.0"
"kafkajs": "^1.15.0",
"avsc": "^5.5.3"
},
"devDependencies": {
"@types/faker": "^5.1.5",
22 changes: 22 additions & 0 deletions snippets/consumers.json
@@ -21,5 +21,27 @@
"partitions: ${4|0|}"
],
"description": "A consumer with a partitions filter"
},
"key-format-consumer": {
"prefix": [
"key-format-consumer"
],
"body": [
"CONSUMER ${1:consumer-group-id}",
"topic: ${2:topic_name}",
"key-format: ${3|none,boolean,bytes,double,float,int,long,null,string|}"
],
"description": "A consumer with a key format"
},
"value-format-consumer": {
"prefix": [
"value-format-consumer"
],
"body": [
"CONSUMER ${1:consumer-group-id}",
"topic: ${2:topic_name}",
"value-format: ${3|none,boolean,bytes,double,float,int,long,null,string|}"
],
"description": "A consumer with a value format"
}
}
32 changes: 32 additions & 0 deletions snippets/producers.json
@@ -31,6 +31,38 @@
],
"description": "A producer generating keyed JSON records"
},
"key-format-producer": {
"prefix": [
"key-format-producer"
],
"body": [
"PRODUCER ${1:keyed-message}",
"topic: ${2:topic_name}",
"key: ${3:mykeyq}",
"key-format: ${4|none,boolean,bytes,double,float,int,long,null,string|}",
"${5:{{random.words}}}",
"",
"###",
""
],
"description": "A producer generating keyed records with key format"
},
"value-format-producer": {
"prefix": [
"value-format-producer"
],
"body": [
"PRODUCER ${1:keyed-message}",
"topic: ${2:topic_name}",
"key: ${3:mykeyq}",
"value-format: ${4|none,boolean,bytes,double,float,int,long,null,string|}",
"${5:{{random.words}}}",
"",
"###",
""
],
"description": "A producer generating keyed records with value format"
},
"comment": {
"prefix": [
"comment"
52 changes: 51 additions & 1 deletion src/client/client.ts
@@ -1,7 +1,7 @@
import { Admin, ConfigResourceTypes, Kafka, Producer } from "kafkajs";

import { Disposable } from "vscode";
import { WorkspaceSettings } from "../settings";
import { Type } from "avsc";

export interface ConnectionOptions {
bootstrap: string;
@@ -92,6 +92,7 @@ export interface Client extends Disposable {
createTopic(createTopicRequest: CreateTopicRequest): Promise<any[]>;
deleteTopic(deleteTopicRequest: DeleteTopicRequest): Promise<void>;
}
export type MessageFormat = "none" | "boolean" | "bytes" | "double" | "float" | "int" | "long" | "null" | "string";

class EnsureConnectedDecorator implements Client {

@@ -353,3 +354,52 @@ export const createKafka = (connectionOptions: ConnectionOptions): Kafka => {
}
return kafkaJsClient;
};

export function decodeMessage(content: Buffer | null, format?: MessageFormat): Buffer | null {
    if (content !== null && format) {
        const avroType = getAvroType(format);
        if (avroType) {
            // Replace the raw bytes with the Avro-decoded value.
            return avroType.decode(content).value;
        }
    }
    return content;
}

export function encodeMessage(content?: string, format?: MessageFormat): Buffer | string | null {
    if (content !== undefined && format) {
        const avroType = getAvroType(format);
        if (avroType) {
            const val = convert(content, format);
            // toBuffer allocates a buffer of the exact encoded size; sizing a buffer from
            // content.length would be too small for fixed-width types such as double.
            return avroType.toBuffer(val);
        }
    }
    return content || null;
}

// Converts the raw string typed in the .kafka file into the JavaScript value expected by the Avro type.
function convert(content: string, format: MessageFormat): any {
    switch (format) {
        case "boolean":
            // Boolean("false") is true, so compare the text explicitly.
            return content.trim().toLowerCase() === "true";
        case "bytes":
            return Buffer.from(content, 'hex');
        case "double":
        case "float":
            return Number.parseFloat(content);
        case "int":
        case "long":
            return Number.parseInt(content, 10);
        case "null":
            return null;
        default:
            return content;
    }
}

function getAvroType(format: MessageFormat): Type | null {
    // "none" means no (de)serialization; every other format name is a valid Avro primitive schema.
    if (format === "none") {
        return null;
    }
    return Type.forSchema(format);
}
24 changes: 20 additions & 4 deletions src/client/consumer.ts
@@ -2,13 +2,15 @@ import { Kafka, Consumer as KafkaJsConsumer, PartitionAssigner, Assignment, Part
import { URLSearchParams } from "url";
import * as vscode from "vscode";
import { getWorkspaceSettings, InitialConsumerOffset, ClusterSettings } from "../settings";
import { ConnectionOptions, createKafka } from "./client";
import { ConnectionOptions, createKafka, decodeMessage, MessageFormat } from "./client";

interface ConsumerOptions extends ConnectionOptions {
consumerGroupId: string;
topicId: string;
fromOffset: InitialConsumerOffset | string;
partitions?: number[];
messageKeyFormat?: MessageFormat;
messageValueFormat?: MessageFormat;
}

export interface RecordReceivedEvent {
@@ -58,7 +60,7 @@ export class Consumer implements vscode.Disposable {
public error: any;

constructor(public uri: vscode.Uri, clusterSettings: ClusterSettings) {
const { clusterId, consumerGroupId, topicId, fromOffset, partitions } = extractConsumerInfoUri(uri);
const { clusterId, consumerGroupId, topicId, fromOffset, partitions, messageKeyFormat, messageValueFormat } = extractConsumerInfoUri(uri);
this.clusterId = clusterId;
const cluster = clusterSettings.get(clusterId);

@@ -74,7 +76,9 @@
consumerGroupId: consumerGroupId,
topicId,
fromOffset: fromOffset || settings.consumerOffset,
partitions: parsePartitions(partitions)
partitions: parsePartitions(partitions),
messageKeyFormat,
messageValueFormat
};
}
catch (e) {
@@ -107,6 +111,8 @@

this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
message.key = decodeMessage(message.key, this.options.messageKeyFormat) || message.key;
message.value = decodeMessage(message.value, this.options.messageValueFormat);
this.onDidReceiveMessageEmitter.fire({
uri: this.uri,
record: { topic: topic, partition: partition, ...message },
@@ -349,18 +355,24 @@ export interface ConsumerInfoUri {
topicId: InitialConsumerOffset | string;
fromOffset?: string;
partitions?: string;
messageKeyFormat?: MessageFormat;
messageValueFormat?: MessageFormat;
}

const TOPIC_QUERY_PARAMETER = 'topic';
const FROM_QUERY_PARAMETER = 'from';
const PARTITIONS_QUERY_PARAMETER = 'partitions';
const KEY_FORMAT_QUERY_PARAMETER = 'key';
const VALUE_FORMAT_QUERY_PARAMETER = 'value';

export function createConsumerUri(info: ConsumerInfoUri): vscode.Uri {
const path = `kafka:${info.clusterId}/${info.consumerGroupId}`;
let query = '';
query = addQueryParameter(query, TOPIC_QUERY_PARAMETER, info.topicId);
query = addQueryParameter(query, FROM_QUERY_PARAMETER, info.fromOffset);
query = addQueryParameter(query, PARTITIONS_QUERY_PARAMETER, info.partitions);
query = addQueryParameter(query, KEY_FORMAT_QUERY_PARAMETER, info.messageKeyFormat);
query = addQueryParameter(query, VALUE_FORMAT_QUERY_PARAMETER, info.messageValueFormat);
return vscode.Uri.parse(path + query);
}

@@ -377,12 +389,16 @@ export function extractConsumerInfoUri(uri: vscode.Uri): ConsumerInfoUri {
const topicId = urlParams.get(TOPIC_QUERY_PARAMETER) || '';
const from = urlParams.get(FROM_QUERY_PARAMETER);
const partitions = urlParams.get(PARTITIONS_QUERY_PARAMETER);
const messageKeyFormat = urlParams.get(KEY_FORMAT_QUERY_PARAMETER);
const messageValueFormat = urlParams.get(VALUE_FORMAT_QUERY_PARAMETER);
return {
clusterId,
consumerGroupId,
topicId,
fromOffset: from && from.trim().length > 0 ? from : undefined,
partitions: partitions && partitions.trim().length > 0 ? partitions : undefined
partitions: partitions && partitions.trim().length > 0 ? partitions : undefined,
messageKeyFormat: messageKeyFormat && messageKeyFormat.trim().length > 0 ? messageKeyFormat as MessageFormat : undefined,
messageValueFormat: messageValueFormat && messageValueFormat.trim().length > 0 ? messageValueFormat as MessageFormat : undefined
};
}

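To illustrate how the new formats travel with the consumer URI (an editor sketch; cluster, group and topic names are made up):

    import { createConsumerUri, extractConsumerInfoUri } from "./consumer";

    // The key/value formats are carried as the 'key' and 'value' query parameters,
    // giving a URI along the lines of kafka:my-cluster/my-group?topic=orders&key=string&value=long.
    const uri = createConsumerUri({
        clusterId: "my-cluster",
        consumerGroupId: "my-group",
        topicId: "orders",
        messageKeyFormat: "string",
        messageValueFormat: "long"
    });

    // Parsing the URI back recovers the formats so the consumer can decode records.
    const info = extractConsumerInfoUri(uri);
    // info.messageKeyFormat === "string", info.messageValueFormat === "long"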
14 changes: 8 additions & 6 deletions src/commands/producers.ts
@@ -1,7 +1,7 @@
import * as faker from "faker";

import { performance } from "perf_hooks";
import { ClientAccessor } from "../client";
import { ClientAccessor, encodeMessage, MessageFormat } from "../client";
import { OutputChannelProvider } from "../providers/outputChannelProvider";
import { KafkaExplorer } from "../explorer";
import { WorkspaceSettings } from "../settings";
@@ -10,7 +10,9 @@ import { pickClient } from "./common";
export interface ProduceRecordCommand {
topicId?: string;
key?: string;
value: string
value: string;
messageKeyFormat?: MessageFormat;
messageValueFormat?: MessageFormat;
}

export class ProduceRecordCommandHandler {
@@ -47,15 +49,15 @@ export class ProduceRecordCommandHandler {
faker.seed(seed);
const randomizedValue = faker.fake(value);
return {
key:randomizedKey,
value:randomizedValue
key: encodeMessage(randomizedKey, command.messageKeyFormat),
value: encodeMessage(randomizedValue, command.messageValueFormat)
};
}

// Return key/value message as-is
return {
key:key,
value:value
key: encodeMessage(key, command.messageKeyFormat),
value: encodeMessage(value, command.messageValueFormat)
};
});

33 changes: 31 additions & 2 deletions src/kafka-file/codeLensProvider.ts
@@ -136,6 +136,8 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod
let topicId;
let key;
let value = "";
let keyFormat;
let valueFormat;
for (let currentLine = range.start.line + 1; currentLine <= range.end.line; currentLine++) {
const lineText = document.lineAt(currentLine).text;

@@ -153,6 +155,16 @@
continue;
}

if (lineText.startsWith("key-format:")) {
keyFormat = lineText.substr("key-format:".length).trim();
continue;
}

if (lineText.startsWith("value-format:")) {
valueFormat = lineText.substr("value-format:".length).trim();
continue;
}

value = document.getText(new vscode.Range(currentLine, 0, range.end.line + 1, 0)).trim();
break;
}
@@ -161,7 +173,9 @@
topicId,
key,
value,
};
messageKeyFormat: keyFormat,
messageValueFormat: valueFormat
} as ProduceRecordCommand;
}

private createConsumerLens(lineRange: vscode.Range, range: vscode.Range, document: vscode.TextDocument, clusterName: string | undefined): vscode.CodeLens[] {
@@ -223,6 +237,8 @@
let topicId;
let partitions;
let offset = "";
let keyFormat;
let valueFormat;
for (let currentLine = range.start.line; currentLine <= range.end.line; currentLine++) {
const lineText = document.lineAt(currentLine).text;

@@ -245,14 +261,27 @@
partitions = lineText.substr("partitions:".length).trim();
continue;
}

if (lineText.startsWith("key-format:")) {
keyFormat = lineText.substr("key-format:".length).trim();
continue;
}

if (lineText.startsWith("value-format:")) {
valueFormat = lineText.substr("value-format:".length).trim();
continue;
}

break;
}
return {
clusterId: selectedClusterId,
consumerGroupId,
topicId,
fromOffset: offset,
partitions
partitions,
messageKeyFormat: keyFormat,
messageValueFormat: valueFormat
} as LaunchConsumerCommand;
}
}
4 changes: 2 additions & 2 deletions syntaxes/kafka.tmLanguage.json
@@ -50,7 +50,7 @@
]
},
"kafka.consumer.header": {
"begin": "(?i)^(?:(topic|from|partitions))",
"begin": "(?i)^(?:(topic|key-format|value-format|from|partitions))",
"end": "$",
"name": "meta.consumer.header.kafka",
"beginCaptures": {
@@ -98,7 +98,7 @@
]
},
"kafka.producer.header": {
"begin": "(?i)^(?:(topic|key))",
"begin": "(?i)^(?:(topic|key-format|value-format|key))",
"end": "$",
"name": "meta.producer.header.kafka",
"beginCaptures": {