Skip to content

Commit

Permalink
Declare key/value format for CONSUMER in kafka file
Browse files Browse the repository at this point in the history
Fixes jlandersen#112

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr committed Feb 28, 2021
1 parent 9e6a02e commit a7e8879
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 6 deletions.
5 changes: 5 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,8 @@
"fs-extra": "^8.1.0",
"glob": "^7.1.6",
"js-yaml": "^3.14.0",
"kafkajs": "^1.15.0"
"kafkajs": "^1.15.0",
"avsc": "^5.5.3"
},
"devDependencies": {
"@types/faker": "^5.1.5",
Expand Down
22 changes: 22 additions & 0 deletions snippets/consumers.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,27 @@
"partitions: ${4|0|}"
],
"description": "A consumer with a partitions filter"
},
"key-format-consumer": {
"prefix": [
"key-format-consumer"
],
"body": [
"CONSUMER ${1:consumer-group-id}",
"topic: ${2:topic_name}",
"key-format: ${3|long|}"
],
"description": "A consumer with a key format"
},
"value-format-consumer": {
"prefix": [
"value-format-consumer"
],
"body": [
"CONSUMER ${1:consumer-group-id}",
"topic: ${2:topic_name}",
"value-format: ${3|long|}"
],
"description": "A consumer with a value format"
}
}
56 changes: 53 additions & 3 deletions src/client/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@ import { URLSearchParams } from "url";
import * as vscode from "vscode";
import { getWorkspaceSettings, InitialConsumerOffset, ClusterSettings } from "../settings";
import { ConnectionOptions, createKafka } from "./client";
import { types as avrotypes, Type as AvroType } from "avsc";

type MessageFormat = "boolean" | "bytes" | "double" | "float" | "int" | "long" | "null" | "string";

interface ConsumerOptions extends ConnectionOptions {
consumerGroupId: string;
topicId: string;
fromOffset: InitialConsumerOffset | string;
partitions?: number[];
messageKeyFormat?: MessageFormat;
messageValueFormat?: MessageFormat;
}

export interface RecordReceivedEvent {
Expand Down Expand Up @@ -58,7 +63,7 @@ export class Consumer implements vscode.Disposable {
public error: any;

constructor(public uri: vscode.Uri, clusterSettings: ClusterSettings) {
const { clusterId, consumerGroupId, topicId, fromOffset, partitions } = extractConsumerInfoUri(uri);
const { clusterId, consumerGroupId, topicId, fromOffset, partitions, messageKeyFormat, messageValueFormat } = extractConsumerInfoUri(uri);
this.clusterId = clusterId;
const cluster = clusterSettings.get(clusterId);

Expand All @@ -74,7 +79,9 @@ export class Consumer implements vscode.Disposable {
consumerGroupId: consumerGroupId,
topicId,
fromOffset: fromOffset || settings.consumerOffset,
partitions: parsePartitions(partitions)
partitions: parsePartitions(partitions),
messageKeyFormat,
messageValueFormat
};
}
catch (e) {
Expand Down Expand Up @@ -107,6 +114,8 @@ export class Consumer implements vscode.Disposable {

this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
message.key = this.format(message.key, this.options.messageKeyFormat) || message.key;
message.value = this.format(message.value, this.options.messageValueFormat);
this.onDidReceiveMessageEmitter.fire({
uri: this.uri,
record: { topic: topic, partition: partition, ...message },
Expand All @@ -126,6 +135,37 @@ export class Consumer implements vscode.Disposable {
}
}
}
private format(content: Buffer | null, format?: MessageFormat): Buffer | null {
if (content !== null && format) {
const avroType = this.getAvroType(format);
if (avroType) {
return avroType.decode(content).value;
}
}
return content;
}

getAvroType(format: MessageFormat): AvroType | null {
switch (format) {
case "boolean":
return new avrotypes.BooleanType();
case "bytes":
return new avrotypes.BytesType();
case "double":
return new avrotypes.DoubleType();
case "float":
return new avrotypes.FloatType();
case "int":
return new avrotypes.IntType();
case "long":
return new avrotypes.LongType();
case "null":
return new avrotypes.NullType();
case "string":
return new avrotypes.StringType();
}
return null;
}

private async getPartitions(topic: string, partitions?: number[]): Promise<number[]> {
if (partitions) {
Expand Down Expand Up @@ -349,18 +389,24 @@ export interface ConsumerInfoUri {
topicId: InitialConsumerOffset | string;
fromOffset?: string;
partitions?: string;
messageKeyFormat?: MessageFormat;
messageValueFormat?: MessageFormat;
}

const TOPIC_QUERY_PARAMETER = 'topic';
const FROM_QUERY_PARAMETER = 'from';
const PARTITIONS_QUERY_PARAMETER = 'partitions';
const KEY_FORMAT_QUERY_PARAMETER = 'key';
const VALUE_FORMAT_QUERY_PARAMETER = 'value';

export function createConsumerUri(info: ConsumerInfoUri): vscode.Uri {
const path = `kafka:${info.clusterId}/${info.consumerGroupId}`;
let query = '';
query = addQueryParameter(query, TOPIC_QUERY_PARAMETER, info.topicId);
query = addQueryParameter(query, FROM_QUERY_PARAMETER, info.fromOffset);
query = addQueryParameter(query, PARTITIONS_QUERY_PARAMETER, info.partitions);
query = addQueryParameter(query, KEY_FORMAT_QUERY_PARAMETER, info.messageKeyFormat);
query = addQueryParameter(query, VALUE_FORMAT_QUERY_PARAMETER, info.messageValueFormat);
return vscode.Uri.parse(path + query);
}

Expand All @@ -377,12 +423,16 @@ export function extractConsumerInfoUri(uri: vscode.Uri): ConsumerInfoUri {
const topicId = urlParams.get(TOPIC_QUERY_PARAMETER) || '';
const from = urlParams.get(FROM_QUERY_PARAMETER);
const partitions = urlParams.get(PARTITIONS_QUERY_PARAMETER);
const messageKeyFormat = urlParams.get(KEY_FORMAT_QUERY_PARAMETER);
const messageValueFormat = urlParams.get(VALUE_FORMAT_QUERY_PARAMETER);
return {
clusterId,
consumerGroupId,
topicId,
fromOffset: from && from.trim().length > 0 ? from : undefined,
partitions: partitions && partitions.trim().length > 0 ? partitions : undefined
partitions: partitions && partitions.trim().length > 0 ? partitions : undefined,
messageKeyFormat: messageKeyFormat && messageKeyFormat.trim().length > 0 ? messageKeyFormat as MessageFormat : undefined,
messageValueFormat: messageValueFormat && messageValueFormat.trim().length > 0 ? messageValueFormat as MessageFormat : undefined
};
}

Expand Down
17 changes: 16 additions & 1 deletion src/kafka-file/codeLensProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod
let topicId;
let partitions;
let offset = "";
let keyFormat;
let valueFormat;
for (let currentLine = range.start.line; currentLine <= range.end.line; currentLine++) {
const lineText = document.lineAt(currentLine).text;

Expand All @@ -245,14 +247,27 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod
partitions = lineText.substr("partitions:".length).trim();
continue;
}

if (lineText.startsWith("key-format:")) {
keyFormat = lineText.substr("key-format:".length).trim();
continue;
}

if (lineText.startsWith("value-format:")) {
valueFormat = lineText.substr("value-format:".length).trim();
continue;
}

break;
}
return {
clusterId: selectedClusterId,
consumerGroupId,
topicId,
fromOffset: offset,
partitions
partitions,
messageKeyFormat: keyFormat,
messageValueFormat: valueFormat
} as LaunchConsumerCommand;
}
}
2 changes: 1 addition & 1 deletion syntaxes/kafka.tmLanguage.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
]
},
"kafka.consumer.header": {
"begin": "(?i)^(?:(topic|from|partitions))",
"begin": "(?i)^(?:(topic|key-format|value-format|from|partitions))",
"end": "$",
"name": "meta.consumer.header.kafka",
"beginCaptures": {
Expand Down

0 comments on commit a7e8879

Please sign in to comment.