From 7c6899c74cfd8a0d4afe303cd69511a4eee9f349 Mon Sep 17 00:00:00 2001 From: angelozerr Date: Fri, 12 Mar 2021 11:12:50 +0100 Subject: [PATCH] Declare key/value format for CONSUMER in kafka file Fixes #112 Signed-off-by: azerr --- CHANGELOG.md | 1 + docs/Consuming.md | 18 +++ snippets/consumers.json | 22 ++++ src/client/consumer.ts | 52 ++++++-- src/client/serialization.ts | 118 +++++++++++++++++ src/kafka-file/codeLensProvider.ts | 17 ++- .../consumerVirtualTextDocumentProvider.ts | 6 +- src/test/suite/client/consumer.test.ts | 6 +- src/test/suite/client/serialization.test.ts | 119 ++++++++++++++++++ syntaxes/kafka.tmLanguage.json | 2 +- 10 files changed, 347 insertions(+), 14 deletions(-) create mode 100644 src/client/serialization.ts create mode 100644 src/test/suite/client/serialization.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index ddff6a5..c09900d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ All notable changes to Kafka extension will be documented in this file. ## [0.12.0] ### Added - Extension API to contribute clusters. See [#123](https://github.com/jlandersen/vscode-kafka/issues/123). +- declare key/value formats for CONSUMER in kafka file. See [#112](https://github.com/jlandersen/vscode-kafka/issues/112). ## [0.11.0] - 2021-03-08 ### Added diff --git a/docs/Consuming.md b/docs/Consuming.md index c7c2561..7604fd7 100644 --- a/docs/Consuming.md +++ b/docs/Consuming.md @@ -50,9 +50,27 @@ The `CONSUMER` block defines: * 0,1,2 * 0-2 * 0,2-3 + * `key-format` : [deserializer](#Deserializer) to use for the key *[optional]*. + * `value-format` : [deserializer](#Deserializer) to use for the value *[optional]*. + +#### Deserializer + +Deserializer can have the following value: + + * `none`: no deserializer (ignores content). 
+ * `string`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.StringDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java) which supports only `UTF-8` encoding. + * `double`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.DoubleDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java). + * `float`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.FloatDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java). + * `integer`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.IntegerDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java). + * `long`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.LongDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java). + * `short`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java). + +#### Code Lens A codelens is displayed above each `CONSUMER` line, and provides `Start consumer` / `Stop consumer` commands depending on the consumer group status. 
+#### Completion + Completion snippets can help you quickly bootstrap new `CONSUMER` blocks: ![Consumer snippets](assets/kafka-file-consumer-snippet.png) diff --git a/snippets/consumers.json b/snippets/consumers.json index 0e3412a..2f4dd49 100644 --- a/snippets/consumers.json +++ b/snippets/consumers.json @@ -21,5 +21,27 @@ "partitions: ${4|0|}" ], "description": "A consumer with a partitions filter" + }, + "key-format-consumer": { + "prefix": [ + "key-format-consumer" + ], + "body": [ + "CONSUMER ${1:consumer-group-id}", + "topic: ${2:topic_name}", + "key-format: ${3|none,string,double,float,integer,long,short|}" + ], + "description": "A consumer with a key format" + }, + "value-format-consumer": { + "prefix": [ + "value-format-consumer" + ], + "body": [ + "CONSUMER ${1:consumer-group-id}", + "topic: ${2:topic_name}", + "value-format: ${3|none,string,double,float,integer,long,short|}" + ], + "description": "A consumer with a value format" } } diff --git a/src/client/consumer.ts b/src/client/consumer.ts index d281f3e..f5279d7 100644 --- a/src/client/consumer.ts +++ b/src/client/consumer.ts @@ -3,12 +3,15 @@ import { URLSearchParams } from "url"; import * as vscode from "vscode"; import { getWorkspaceSettings, InitialConsumerOffset, ClusterSettings } from "../settings"; import { ConnectionOptions, createKafka } from "./client"; +import { deserialize, MessageFormat, SerializationdResult } from "./serialization"; interface ConsumerOptions extends ConnectionOptions { consumerGroupId: string; topicId: string; fromOffset: InitialConsumerOffset | string; partitions?: number[]; + messageKeyFormat?: MessageFormat; + messageValueFormat?: MessageFormat; } export interface RecordReceivedEvent { @@ -19,9 +22,11 @@ export interface RecordReceivedEvent { export interface ConsumedRecord { topic: string; value: string | Buffer | null; + deserializedValue?: SerializationdResult; offset?: string; partition?: number; key?: string | Buffer; + deserializedKey?: SerializationdResult; } 
export interface ConsumerChangedStatusEvent { @@ -58,7 +63,7 @@ export class Consumer implements vscode.Disposable { public error: any; constructor(public uri: vscode.Uri, clusterSettings: ClusterSettings) { - const { clusterId, consumerGroupId, topicId, fromOffset, partitions } = extractConsumerInfoUri(uri); + const { clusterId, consumerGroupId, topicId, fromOffset, partitions, messageKeyFormat, messageValueFormat } = extractConsumerInfoUri(uri); this.clusterId = clusterId; const cluster = clusterSettings.get(clusterId); @@ -75,7 +80,9 @@ export class Consumer implements vscode.Disposable { consumerGroupId: consumerGroupId, topicId, fromOffset: fromOffset || settings.consumerOffset, - partitions: parsePartitions(partitions) + partitions: parsePartitions(partitions), + messageKeyFormat, + messageValueFormat }; } catch (e) { @@ -108,9 +115,21 @@ export class Consumer implements vscode.Disposable { this.consumer.run({ eachMessage: async ({ topic, partition, message }) => { + const record = { + topic: topic, partition: partition, + ...message + } as ConsumedRecord; + const deserializedKey = deserialize(message.key, this.options.messageKeyFormat); + if (deserializedKey !== null) { + record.deserializedKey = deserializedKey; + } + const deserializedValue = deserialize(message.value, this.options.messageValueFormat); + if (deserializedValue !== null) { + record.deserializedValue = deserializedValue; + } this.onDidReceiveMessageEmitter.fire({ uri: this.uri, - record: { topic: topic, partition: partition, ...message }, + record, }); }, }); @@ -350,11 +369,15 @@ export interface ConsumerInfoUri { topicId: InitialConsumerOffset | string; fromOffset?: string; partitions?: string; + messageKeyFormat?: MessageFormat; + messageValueFormat?: MessageFormat; } const TOPIC_QUERY_PARAMETER = 'topic'; const FROM_QUERY_PARAMETER = 'from'; const PARTITIONS_QUERY_PARAMETER = 'partitions'; +const KEY_FORMAT_QUERY_PARAMETER = 'key'; +const VALUE_FORMAT_QUERY_PARAMETER = 'value'; export 
function createConsumerUri(info: ConsumerInfoUri): vscode.Uri { const path = `kafka:${info.clusterId}/${info.consumerGroupId}`; @@ -362,6 +385,8 @@ export function createConsumerUri(info: ConsumerInfoUri): vscode.Uri { query = addQueryParameter(query, TOPIC_QUERY_PARAMETER, info.topicId); query = addQueryParameter(query, FROM_QUERY_PARAMETER, info.fromOffset); query = addQueryParameter(query, PARTITIONS_QUERY_PARAMETER, info.partitions); + query = addQueryParameter(query, KEY_FORMAT_QUERY_PARAMETER, info.messageKeyFormat); + query = addQueryParameter(query, VALUE_FORMAT_QUERY_PARAMETER, info.messageValueFormat); return vscode.Uri.parse(path + query); } @@ -378,13 +403,26 @@ export function extractConsumerInfoUri(uri: vscode.Uri): ConsumerInfoUri { const topicId = urlParams.get(TOPIC_QUERY_PARAMETER) || ''; const from = urlParams.get(FROM_QUERY_PARAMETER); const partitions = urlParams.get(PARTITIONS_QUERY_PARAMETER); - return { + const messageKeyFormat = urlParams.get(KEY_FORMAT_QUERY_PARAMETER); + const messageValueFormat = urlParams.get(VALUE_FORMAT_QUERY_PARAMETER); + const result: ConsumerInfoUri = { clusterId, consumerGroupId, - topicId, - fromOffset: from && from.trim().length > 0 ? from : undefined, - partitions: partitions && partitions.trim().length > 0 ? 
partitions : undefined + topicId }; + if (from && from.trim().length > 0) { + result.fromOffset = from; + } + if (partitions && partitions.trim().length > 0) { + result.partitions = partitions; + } + if (messageKeyFormat && messageKeyFormat.trim().length > 0) { + result.messageKeyFormat = messageKeyFormat as MessageFormat; + } + if (messageValueFormat && messageValueFormat.trim().length > 0) { + result.messageValueFormat = messageValueFormat as MessageFormat; + } + return result; } export function parsePartitions(partitions?: string): number[] | undefined { diff --git a/src/client/serialization.ts b/src/client/serialization.ts new file mode 100644 index 0000000..b3206b3 --- /dev/null +++ b/src/client/serialization.ts @@ -0,0 +1,118 @@ +export type MessageFormat = "none" | "string" | "double" | "float" | "integer" | "long" | "short" ; + +export type SerializationdResult = any | Error; + +export class SerializationException extends Error { } + +// ---------------- Deserializers ---------------- + +interface Deserializer { + deserialize(data: Buffer): any; +} + +const deserializerRegistry: Map = new Map(); + +export function deserialize(data: Buffer | null, format?: MessageFormat): SerializationdResult | null { + if (data === null || !format) { + return null; + } + if (format === "none") { + return ''; + } + try { + const deserializer = getDeserializer(format); + if (!deserializer) { + throw new SerializationException(`Cannot find a deserializer for ${format} format.`); + } + return deserializer.deserialize(data); + } + catch (e) { + return e; + } +} + +function getDeserializer(format: MessageFormat): Deserializer | undefined { + return deserializerRegistry.get(format); +} + +class DoubleDeserializer implements Deserializer { + + deserialize(data: Buffer | null): any { + if (data === null) { + return null; + } + if (data.length !== 8) { + throw new SerializationException("Size of data received by DoubleDeserializer is not 8"); + } + return data.readDoubleBE(0); + } +} 
+ +class FloatDeserializer implements Deserializer { + + deserialize(data: Buffer | null): any { + if (data === null) { + return null; + } + if (data.length !== 4) { + throw new SerializationException("Size of data received by FloatDeserializer is not 4"); + } + return data.readFloatBE(0); + } +} + +class IntegerDeserializer implements Deserializer { + + deserialize(data: Buffer | null): any { + if (data === null) { + return null; + } + if (data.length !== 4) { + throw new Error("Size of data received by IntegerDeserializer is not 4"); + } + return data.readInt32BE(0); + } +} + +class LongDeserializer implements Deserializer { + + deserialize(data: Buffer | null): any { + if (data === null) { + return null; + } + if (data.length !== 8) { + throw new SerializationException("Size of data received by LongDeserializer is not 8"); + } + return data.readBigInt64BE(0); + } +} + +class ShortDeserializer implements Deserializer { + + deserialize(data: Buffer | null): any { + if (data === null) { + return null; + } + if (data.length !== 2) { + throw new SerializationException("Size of data received by ShortDeserializer is not 2"); + } + return data.readInt16BE(0); + } +} + +class StringDeserializer implements Deserializer { + + deserialize(data: Buffer | null): any { + if (data === null) { + return null; + } + return data.toString(); + } +} + +deserializerRegistry.set("double", new DoubleDeserializer()); +deserializerRegistry.set("float", new FloatDeserializer()); +deserializerRegistry.set("integer", new IntegerDeserializer()); +deserializerRegistry.set("long", new LongDeserializer()); +deserializerRegistry.set("short", new ShortDeserializer()); +deserializerRegistry.set("string", new StringDeserializer()); diff --git a/src/kafka-file/codeLensProvider.ts b/src/kafka-file/codeLensProvider.ts index 74e6f01..805d0ff 100644 --- a/src/kafka-file/codeLensProvider.ts +++ b/src/kafka-file/codeLensProvider.ts @@ -220,6 +220,8 @@ export class KafkaFileCodeLensProvider implements 
vscode.CodeLensProvider, vscod let topicId; let partitions; let offset = ""; + let keyFormat; + let valueFormat; for (let currentLine = range.start.line; currentLine <= range.end.line; currentLine++) { const lineText = document.lineAt(currentLine).text; @@ -242,6 +244,17 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod partitions = lineText.substr("partitions:".length).trim(); continue; } + + if (lineText.startsWith("key-format:")) { + keyFormat = lineText.substr("key-format:".length).trim(); + continue; + } + + if (lineText.startsWith("value-format:")) { + valueFormat = lineText.substr("value-format:".length).trim(); + continue; + } + break; } return { @@ -249,7 +262,9 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod consumerGroupId, topicId, fromOffset: offset, - partitions + partitions, + messageKeyFormat: keyFormat, + messageValueFormat: valueFormat } as LaunchConsumerCommand; } } diff --git a/src/providers/consumerVirtualTextDocumentProvider.ts b/src/providers/consumerVirtualTextDocumentProvider.ts index a6e1d9c..370ebf2 100644 --- a/src/providers/consumerVirtualTextDocumentProvider.ts +++ b/src/providers/consumerVirtualTextDocumentProvider.ts @@ -109,8 +109,10 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC if (!this.isActive(uri)) { return; } - let line = `Key: ${message.key}\nPartition: ${message.partition}\nOffset: ${message.offset}\n`; - line = line + `Value:\n${message.value}\n\n`; + const key = message.deserializedKey !== undefined ? message.deserializedKey : message.key; + const value = message.deserializedValue !== undefined ? 
message.deserializedValue : message.value; + let line = `Key: ${key}\nPartition: ${message.partition}\nOffset: ${message.offset}\n`; + line = line + `Value:\n${value}\n\n`; this.updateBuffer(uri, line); } diff --git a/src/test/suite/client/consumer.test.ts b/src/test/suite/client/consumer.test.ts index edeb0a1..eecce98 100644 --- a/src/test/suite/client/consumer.test.ts +++ b/src/test/suite/client/consumer.test.ts @@ -39,21 +39,21 @@ suite("Extract consumer URI Test Suite", () => { test("Consumer URI simple", () => { assert.deepStrictEqual( extractConsumerInfoUri(vscode.Uri.parse(`kafka:cluster-id/group-id?topic=topic-id`)), - { clusterId: 'cluster-id', consumerGroupId: 'group-id', topicId: 'topic-id', fromOffset: undefined, partitions: undefined } + { clusterId: 'cluster-id', consumerGroupId: 'group-id', topicId: 'topic-id' } ); }); test("Consumer URI with offset", () => { assert.deepStrictEqual( extractConsumerInfoUri(vscode.Uri.parse(`kafka:cluster-id/group-id?topic=topic-id&from=1`)), - { clusterId: 'cluster-id', consumerGroupId: 'group-id', topicId: 'topic-id', fromOffset: '1', partitions: undefined } + { clusterId: 'cluster-id', consumerGroupId: 'group-id', topicId: 'topic-id', fromOffset: '1' } ); }); test("Consumer URI with partitions", () => { assert.deepStrictEqual( extractConsumerInfoUri(vscode.Uri.parse(`kafka:cluster-id/group-id?topic=topic-id&partitions=0-5`)), - { clusterId: 'cluster-id', consumerGroupId: 'group-id', topicId: 'topic-id', fromOffset: undefined, partitions: '0-5' } + { clusterId: 'cluster-id', consumerGroupId: 'group-id', topicId: 'topic-id', partitions: '0-5' } ); }); diff --git a/src/test/suite/client/serialization.test.ts b/src/test/suite/client/serialization.test.ts new file mode 100644 index 0000000..d82b2c1 --- /dev/null +++ b/src/test/suite/client/serialization.test.ts @@ -0,0 +1,119 @@ +import * as assert from "assert"; +import { deserialize } from "../../../client/serialization"; + +suite("Deserializer Test Suite", () => { + + 
test("String deserializer", () => { + + assert.deepStrictEqual( + deserialize(Buffer.from([97, 98, 99, 100]), "string"), + 'abcd' + ); + }); + + test("Double deserializer", () => { + + assert.deepStrictEqual( + deserialize(Buffer.from([64, 94, 192, 0, 0, 0, 0, 0]), "double"), + 123 + ); + + assert.deepStrictEqual( + deserialize(Buffer.from([64, 94, 221, 47, 26, 159, 190, 119]), "double"), + 123.456 + ); + + assert.deepStrictEqual( + deserialize(Buffer.from([192, 94, 192, 0, 0, 0, 0, 0]), "double"), + -123 + ); + + assert.deepStrictEqual( + deserialize(Buffer.from([0]), "double")?.message, + 'Size of data received by DoubleDeserializer is not 8' + ); + + }); + + test("Float deserializer", () => { + + assert.deepStrictEqual( + deserialize(Buffer.from([66, 246, 0, 0]), "float"), + 123 + ); + + assert.deepStrictEqual( + deserialize(Buffer.from([66, 246, 233, 121]), "float"), + 123.45600128173828 + ); + + assert.deepStrictEqual( + deserialize(Buffer.from([194, 246, 0, 0]), "float"), + -123 + ); + + assert.deepStrictEqual( + deserialize(Buffer.from([0]), "float")?.message, + 'Size of data received by FloatDeserializer is not 4' + ); + + }); + + test("Integer deserializer", () => { + + assert.deepStrictEqual( + deserialize(Buffer.from([0, 0, 0, 123]), "integer"), + 123 + ); + + assert.deepStrictEqual( + deserialize(Buffer.from([255, 255, 255, 133]), "integer"), + -123 + ); + + assert.deepStrictEqual( + deserialize(Buffer.from([0]), "integer")?.message, + 'Size of data received by IntegerDeserializer is not 4' + ); + + }); + + test("Long deserializer", () => { + + assert.deepStrictEqual( + deserialize(Buffer.from([0, 0, 0, 0, 0, 0, 0, 123]), "long"), + BigInt(123) + ); + + assert.deepStrictEqual( + deserialize(Buffer.from([255, 255, 255, 255, 255, 255, 255, 133]), "long"), + BigInt(-123) + ); + + assert.deepStrictEqual( + deserialize(Buffer.from([0]), "long")?.message, + 'Size of data received by LongDeserializer is not 8' + ); + + }); + + test("Short deserializer", () => 
{ + + assert.deepStrictEqual( + deserialize(Buffer.from([0, 123]), "short"), + 123 + ); + + assert.deepStrictEqual( + deserialize(Buffer.from([255, 133]), "short"), + -123 + ); + + assert.deepStrictEqual( + deserialize(Buffer.from([0]), "short")?.message, + 'Size of data received by ShortDeserializer is not 2' + ); + + }); + +}); diff --git a/syntaxes/kafka.tmLanguage.json b/syntaxes/kafka.tmLanguage.json index 7c7df67..a421ded 100644 --- a/syntaxes/kafka.tmLanguage.json +++ b/syntaxes/kafka.tmLanguage.json @@ -50,7 +50,7 @@ ] }, "kafka.consumer.header": { - "begin": "(?i)^(?:(topic|from|partitions))", + "begin": "(?i)^(?:(topic|key-format|value-format|from|partitions))", "end": "$", "name": "meta.consumer.header.kafka", "beginCaptures": {