From e4fdf041277a77abae7f3c5d3724ab103bf73f27 Mon Sep 17 00:00:00 2001 From: angelozerr Date: Sun, 28 Feb 2021 16:05:30 +0100 Subject: [PATCH] Declare key/value format for CONSUMER in kafka file Fixes #112 Signed-off-by: azerr --- CHANGELOG.md | 4 +- docs/Consuming.md | 14 ++ docs/Producing.md | 20 ++ snippets/consumers.json | 22 ++ snippets/producers.json | 32 +++ src/client/client.ts | 1 - src/client/consumer.ts | 46 +++- src/client/serialization.ts | 221 ++++++++++++++++++ src/commands/consumers.ts | 4 +- src/commands/producers.ts | 31 ++- src/kafka-file/codeLensProvider.ts | 33 ++- .../consumerVirtualTextDocumentProvider.ts | 4 +- src/test/suite/client/consumer.test.ts | 6 +- src/test/suite/client/serialization.test.ts | 43 ++++ syntaxes/kafka.tmLanguage.json | 4 +- 15 files changed, 450 insertions(+), 35 deletions(-) create mode 100644 src/client/serialization.ts create mode 100644 src/test/suite/client/serialization.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index c33282a..9b9da50 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,9 @@ All notable changes to Kafka extension will be documented in this file. - The consumer view now provides a `Clear Consumer View` command. See [#84](https://github.com/jlandersen/vscode-kafka/issues/40). - Added support for consumer group deletion. See [#26](https://github.com/jlandersen/vscode-kafka/issues/26). - .kafka files can define a `CONSUMER` block, to start a consumer group with a given offset, partitions. See [#96](https://github.com/jlandersen/vscode-kafka/issues/96). -- .kafka files show the selected cluster as a codelens on the first line. See [#102](https://github.com/jlandersen/vscode-kafka/issues/102). +- .kafka files show the selected cluster as a codelens on each CONSUMER, PRODUCER block. See [#102](https://github.com/jlandersen/vscode-kafka/issues/102). + - declare key/value format for CONSUMER in kafka file. See [#112](https://github.com/jlandersen/vscode-kafka/issues/112). + - declare key/value format for PRODUCER in kafka file. See [#113](https://github.com/jlandersen/vscode-kafka/issues/113). ### Changed - Improved the "New cluster" and "New topic" wizards: now include validation and a back button. See [#21](https://github.com/jlandersen/vscode-kafka/issues/21). diff --git a/docs/Consuming.md b/docs/Consuming.md index 6c0a63e..92dd6df 100644 --- a/docs/Consuming.md +++ b/docs/Consuming.md @@ -50,6 +50,20 @@ The `CONSUMER` block defines: * 0,1,2 * 0-2 * 0,2-3 + * `key-format`: the key format *[optional]* : + * none + * double + * float + * integer + * long + * short + * `value-format`: the value format *[optional]* : + * none + * double + * float + * integer + * long + * short A codelens is displayed above each `CONSUMER` line, showing the `status` of the consumer group (started / stopped), and provides `Start consumer` / `Stop consumer` commands according to that status. diff --git a/docs/Producing.md b/docs/Producing.md index 9315729..4244623 100644 --- a/docs/Producing.md +++ b/docs/Producing.md @@ -23,6 +23,26 @@ To produce a single record, click on the `Produce record` link above the `PRODUC The log about produced messages is printed in the `Kafka Producer Log` Output view. +The `PRODUCER` block defines: + + * `keyed message` which is declared after PRODUCER *[optional]*. + * `key`: the key *[optional]*. + * `key-format`: the key format *[optional]* : + * none + * double + * float + * integer + * long + * short + * `value-format`: the value format *[optional]* : + * none + * double + * float + * integer + * long + * short + * the rest of the content is the value upon `###` line is found. + ## Randomized content Record content can be randomized by injecting mustache-like placeholders of [faker.js properties](https://github.com/Marak/faker.js#api-methods), like ``{{name.lastName}}`` or ``{{random.number}}``. Some randomized properties can be localized via the `kafka.producers.fakerjs.locale` setting. diff --git a/snippets/consumers.json b/snippets/consumers.json index 0e3412a..bb47b51 100644 --- a/snippets/consumers.json +++ b/snippets/consumers.json @@ -21,5 +21,27 @@ "partitions: ${4|0|}" ], "description": "A consumer with a partitions filter" + }, + "key-format-consumer": { + "prefix": [ + "key-format-consumer" + ], + "body": [ + "CONSUMER ${1:consumer-group-id}", + "topic: ${2:topic_name}", + "key-format: ${3|none,double,float,integer,long,short|}" + ], + "description": "A consumer with a key format" + }, + "value-format-consumer": { + "prefix": [ + "value-format-consumer" + ], + "body": [ + "CONSUMER ${1:consumer-group-id}", + "topic: ${2:topic_name}", + "value-format: ${3|none,double,float,integer,long,short|}" + ], + "description": "A consumer with a value format" } } diff --git a/snippets/producers.json b/snippets/producers.json index 1380718..969860e 100644 --- a/snippets/producers.json +++ b/snippets/producers.json @@ -31,6 +31,38 @@ ], "description": "A producer generating keyed JSON records" }, + "key-format-producer": { + "prefix": [ + "key-format-producer" + ], + "body": [ + "PRODUCER ${1:keyed-message}", + "topic: ${2:topic_name}", + "key: ${3:mykeyq}", + "key-format: ${4|none,double,float,integer,long,short|}", + "${5:{{random.words}}}", + "", + "###", + "" + ], + "description": "A producer generating keyed records with key format" + }, + "value-format-producer": { + "prefix": [ + "value-format-producer" + ], + "body": [ + "PRODUCER ${1:keyed-message}", + "topic: ${2:topic_name}", + "key: ${3:mykeyq}", + "value-format: ${4|none,double,float,integer,long,short|}", + "${5:{{random.words}}}", + "", + "###", + "" + ], + "description": "A producer generating keyed records with value format" + }, "comment": { "prefix": [ "comment" diff --git a/src/client/client.ts b/src/client/client.ts index 33f0a73..fdb156e 100644 --- a/src/client/client.ts +++ b/src/client/client.ts @@ -1,5 +1,4 @@ import { Admin, ConfigResourceTypes, Kafka, Producer } from "kafkajs"; - import { Disposable } from "vscode"; import { WorkspaceSettings } from "../settings"; diff --git a/src/client/consumer.ts b/src/client/consumer.ts index 4eb76a7..0ce73be 100644 --- a/src/client/consumer.ts +++ b/src/client/consumer.ts @@ -3,12 +3,15 @@ import { URLSearchParams } from "url"; import * as vscode from "vscode"; import { getWorkspaceSettings, InitialConsumerOffset, ClusterSettings } from "../settings"; import { ConnectionOptions, createKafka } from "./client"; +import { deserialize, MessageFormat, SerializationdResult } from "./serialization"; interface ConsumerOptions extends ConnectionOptions { consumerGroupId: string; topicId: string; fromOffset: InitialConsumerOffset | string; partitions?: number[]; + messageKeyFormat?: MessageFormat; + messageValueFormat?: MessageFormat; } export interface RecordReceivedEvent { @@ -19,9 +22,11 @@ export interface RecordReceivedEvent { export interface ConsumedRecord { topic: string; value: string | Buffer | null; + deserializedValue?: SerializationdResult | null; offset?: string; partition?: number; key?: string | Buffer; + deserializedKey?: SerializationdResult | null; } export interface ConsumerChangedStatusEvent { @@ -58,7 +63,7 @@ export class Consumer implements vscode.Disposable { public error: any; constructor(public uri: vscode.Uri, clusterSettings: ClusterSettings) { - const { clusterId, consumerGroupId, topicId, fromOffset, partitions } = extractConsumerInfoUri(uri); + const { clusterId, consumerGroupId, topicId, fromOffset, partitions, messageKeyFormat, messageValueFormat } = extractConsumerInfoUri(uri); this.clusterId = clusterId; const cluster = clusterSettings.get(clusterId); @@ -74,7 +79,9 @@ export class Consumer implements vscode.Disposable { consumerGroupId: consumerGroupId, topicId, fromOffset: fromOffset || settings.consumerOffset, - partitions: parsePartitions(partitions) + partitions: parsePartitions(partitions), + messageKeyFormat, + messageValueFormat }; } catch (e) { @@ -107,9 +114,15 @@ export class Consumer implements vscode.Disposable { this.consumer.run({ eachMessage: async ({ topic, partition, message }) => { + const deserializedKey = deserialize(message.key, this.options.messageKeyFormat); + const deserializedValue = deserialize(message.value, this.options.messageValueFormat); this.onDidReceiveMessageEmitter.fire({ uri: this.uri, - record: { topic: topic, partition: partition, ...message }, + record: { + topic: topic, partition: partition, + deserializedKey: deserializedKey, deserializedValue: deserializedValue, + ...message + }, }); }, }); @@ -349,11 +362,15 @@ export interface ConsumerInfoUri { topicId: InitialConsumerOffset | string; fromOffset?: string; partitions?: string; + messageKeyFormat?: MessageFormat; + messageValueFormat?: MessageFormat; } const TOPIC_QUERY_PARAMETER = 'topic'; const FROM_QUERY_PARAMETER = 'from'; const PARTITIONS_QUERY_PARAMETER = 'partitions'; +const KEY_FORMAT_QUERY_PARAMETER = 'key'; +const VALUE_FORMAT_QUERY_PARAMETER = 'value'; export function createConsumerUri(info: ConsumerInfoUri): vscode.Uri { const path = `kafka:${info.clusterId}/${info.consumerGroupId}`; @@ -361,6 +378,8 @@ export function createConsumerUri(info: ConsumerInfoUri): vscode.Uri { query = addQueryParameter(query, TOPIC_QUERY_PARAMETER, info.topicId); query = addQueryParameter(query, FROM_QUERY_PARAMETER, info.fromOffset); query = addQueryParameter(query, PARTITIONS_QUERY_PARAMETER, info.partitions); + query = addQueryParameter(query, KEY_FORMAT_QUERY_PARAMETER, info.messageKeyFormat); + query = addQueryParameter(query, VALUE_FORMAT_QUERY_PARAMETER, info.messageValueFormat); return vscode.Uri.parse(path + query); } @@ -377,13 +396,26 @@ export function extractConsumerInfoUri(uri: vscode.Uri): ConsumerInfoUri { const topicId = urlParams.get(TOPIC_QUERY_PARAMETER) || ''; const from = urlParams.get(FROM_QUERY_PARAMETER); const partitions = urlParams.get(PARTITIONS_QUERY_PARAMETER); - return { + const messageKeyFormat = urlParams.get(KEY_FORMAT_QUERY_PARAMETER); + const messageValueFormat = urlParams.get(VALUE_FORMAT_QUERY_PARAMETER); + const result: ConsumerInfoUri = { clusterId, consumerGroupId, - topicId, - fromOffset: from && from.trim().length > 0 ? from : undefined, - partitions: partitions && partitions.trim().length > 0 ? partitions : undefined + topicId }; + if (from && from.trim().length > 0) { + result.fromOffset = from; + } + if (partitions && partitions.trim().length > 0) { + result.partitions = partitions; + } + if (messageKeyFormat && messageKeyFormat.trim().length > 0) { + result.messageKeyFormat = messageKeyFormat as MessageFormat; + } + if (messageValueFormat && messageValueFormat.trim().length > 0) { + result.messageValueFormat = messageValueFormat as MessageFormat; + } + return result; } export function parsePartitions(partitions?: string): number[] | undefined { diff --git a/src/client/serialization.ts b/src/client/serialization.ts new file mode 100644 index 0000000..a37f3b5 --- /dev/null +++ b/src/client/serialization.ts @@ -0,0 +1,221 @@ + +export type MessageFormat = "none" | "double" | "float" | "integer" | "long" | "short"; + +export type SerializationdResult = any | Error; + +class SerializationException extends Error { } + +// ---------------- Serializers ---------------- + +interface Serializer { + serialize(data: string): Buffer | string | null; +} + +const serializerRegistry: Map = new Map(); + +export function serialize(data?: string, format?: MessageFormat): Buffer | string | null { + if (!data || !format || format === "none") { + return null; + } + const serializer = getSerializer(format); + if (!serializer) { + throw new SerializationException(`Cannot find a serializer for ${format} format.`); + } + return serializer.serialize(data); +} + +function getSerializer(format: MessageFormat): Serializer | undefined { + return serializerRegistry.get(format); +} + +class DoubleSerializer implements Serializer { + + serialize(value: string): Buffer | string | null { + const data = parseFloat(value); + const result = [ + (data >>> 56), + (data >>> 48), + (data >>> 40), + (data >>> 32), + (data >>> 24), + (data >>> 16), + (data >>> 8), + data + ]; + return Buffer.from(result); + }; +} + +class FloatSerializer implements Serializer { + + serialize(value: string): Buffer | string | null { + const data = parseFloat(value); + const result = [ + (data >>> 24), + (data >>> 16), + (data >>> 8), + data + ]; + return Buffer.from(result); + }; +} + +/** + * Integer serializer. + * + * @see https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java + */ +class IntegerSerializer implements Serializer { + + serialize(value: string): Buffer | string | null { + const data = parseInt(value); + const result = [ + (data >>> 24), + (data >>> 16), + (data >>> 8), + data + ]; + return Buffer.from(result); + }; +} + +class LongSerializer implements Serializer { + + serialize(value: string): Buffer | string | null { + const data = parseInt(value); + const result = [ + (data >>> 56), + (data >>> 48), + (data >>> 40), + (data >>> 32), + (data >>> 24), + (data >>> 16), + (data >>> 8), + data + ]; + return Buffer.from(result); + }; +} + +class ShortSerializer implements Serializer { + + serialize(value: string): Buffer | string | null { + const data = parseInt(value); + const result = [ + (data >>> 8), + data + ]; + return Buffer.from(result); + }; +} + +serializerRegistry.set("double", new DoubleSerializer()); +serializerRegistry.set("float", new FloatSerializer()); +serializerRegistry.set("integer", new IntegerSerializer()); +serializerRegistry.set("long", new LongSerializer()); +serializerRegistry.set("short", new ShortSerializer()); + +// ---------------- Deserializers ---------------- + +interface Deserializer { + deserialize(data: Buffer): any; +} + +const deserializerRegistry: Map = new Map(); + +export function deserialize(data: Buffer | null, format?: MessageFormat): SerializationdResult | null { + if (data === null || !format || format === "none") { + return null; + } + try { + const deserializer = getDeserializer(format); + if (!deserializer) { + throw new SerializationException(`Cannot find a deserializer for ${format} format.`); + } + return deserializer.deserialize(data); + } + catch (e) { + return e; + } +} + +function getDeserializer(format: MessageFormat): Deserializer | undefined { + return deserializerRegistry.get(format); +} + +class DoubleDeserializer implements Deserializer { + + deserialize(data: Buffer | null): any { + if (data === null) { + return null; + } + if (data.length !== 8) { + throw new SerializationException("Size of data received by Deserializer is not 8"); + } + return data.readDoubleBE(0); + } +} + +class FloatDeserializer implements Deserializer { + + deserialize(data: Buffer | null): any { + if (data === null) { + return null; + } + if (data.length !== 4) { + throw new SerializationException("Size of data received by Deserializer is not 4"); + } + return data.readFloatBE(0); + } +} + +class IntegerDeserializer implements Deserializer { + + deserialize(data: Buffer | null): any { + if (data === null) { + return null; + } + if (data.length !== 4) { + throw new Error("Size of data received by IntegerDeserializer is not 4"); + } + return data.readUInt32BE(0); + } +} + +class LongDeserializer implements Deserializer { + + deserialize(data: Buffer | null): any { + if (data === null) { + return null; + } + if (data.length !== 8) { + throw new SerializationException("Size of data received by LongDeserializer is not 8"); + } + /*let value = 0; + for (const b of data) { + value <<= 8; + value |= b & 0xFF; + } + return value;*/ + return data.readBigUInt64BE(0); + } +} + +class ShortDeserializer implements Deserializer { + + deserialize(data: Buffer | null): any { + if (data === null) { + return null; + } + if (data.length !== 2) { + throw new SerializationException("Size of data received by ShortDeserializer is not 2"); + } + return data.readUInt16BE(0); + } +} + +deserializerRegistry.set("double", new DoubleDeserializer()); +deserializerRegistry.set("float", new FloatDeserializer()); +deserializerRegistry.set("integer", new IntegerDeserializer()); +deserializerRegistry.set("long", new LongDeserializer()); +deserializerRegistry.set("short", new ShortDeserializer()); diff --git a/src/commands/consumers.ts b/src/commands/consumers.ts index bb52a49..b4f646c 100644 --- a/src/commands/consumers.ts +++ b/src/commands/consumers.ts @@ -56,7 +56,9 @@ abstract class LaunchConsumerCommandHandler { // Try to start consumer if (consumer) { - vscode.window.showInformationMessage(`Consumer already started on '${command.topicId}'`); + // The consumer is started, open the document which tracks consumer messages. + const consumeUri = createConsumerUri(command); + openDocument(consumeUri); return; } diff --git a/src/commands/producers.ts b/src/commands/producers.ts index c8dcaa7..8e5da95 100644 --- a/src/commands/producers.ts +++ b/src/commands/producers.ts @@ -6,11 +6,15 @@ import { OutputChannelProvider } from "../providers/outputChannelProvider"; import { KafkaExplorer } from "../explorer"; import { WorkspaceSettings } from "../settings"; import { pickClient } from "./common"; +import { getErrorMessage } from "../errors"; +import { MessageFormat, serialize } from "../client/serialization"; export interface ProduceRecordCommand { topicId?: string; key?: string; - value: string + value: string; + messageKeyFormat?: MessageFormat; + messageValueFormat?: MessageFormat; } export class ProduceRecordCommandHandler { @@ -26,6 +30,11 @@ export class ProduceRecordCommandHandler { } async execute(command: ProduceRecordCommand, times: number): Promise { + const client = await pickClient(this.clientAccessor); + if (!client) { + return; + } + const { topicId, key, value } = command; const channel = this.channelProvider.getChannel("Kafka Producer Log"); if (topicId === undefined) { @@ -47,23 +56,18 @@ export class ProduceRecordCommandHandler { faker.seed(seed); const randomizedValue = faker.fake(value); return { - key:randomizedKey, - value:randomizedValue + key: serialize(randomizedKey, command.messageKeyFormat), + value: serialize(randomizedValue, command.messageValueFormat) }; } // Return key/value message as-is return { - key:key, - value:value + key: serialize(key, command.messageKeyFormat), + value: serialize(value, command.messageValueFormat) }; }); - const client = await pickClient(this.clientAccessor); - if (!client) { - return; - } - const producer = client.producer; await producer.connect(); @@ -88,12 +92,7 @@ export class ProduceRecordCommandHandler { const finishedOperation = performance.now(); const elapsed = (finishedOperation - startOperation).toFixed(2); channel.appendLine(`Failed to produce record(s) (${elapsed}ms)`); - - if (error.message) { - channel.appendLine(`Error: ${error.message}`); - } else { - channel.appendLine(`Error: ${error}`); - } + channel.appendLine(`Error: ${getErrorMessage(error)}`); } } } diff --git a/src/kafka-file/codeLensProvider.ts b/src/kafka-file/codeLensProvider.ts index 537086c..e67d245 100644 --- a/src/kafka-file/codeLensProvider.ts +++ b/src/kafka-file/codeLensProvider.ts @@ -136,6 +136,8 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod let topicId; let key; let value = ""; + let keyFormat; + let valueFormat; for (let currentLine = range.start.line + 1; currentLine <= range.end.line; currentLine++) { const lineText = document.lineAt(currentLine).text; @@ -153,6 +155,16 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod continue; } + if (lineText.startsWith("key-format:")) { + keyFormat = lineText.substr("key-format:".length).trim(); + continue; + } + + if (lineText.startsWith("value-format:")) { + valueFormat = lineText.substr("value-format:".length).trim(); + continue; + } + value = document.getText(new vscode.Range(currentLine, 0, range.end.line + 1, 0)).trim(); break; } @@ -161,7 +173,9 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod topicId, key, value, - }; + messageKeyFormat: keyFormat, + messageValueFormat: valueFormat + } as ProduceRecordCommand; } private createConsumerLens(lineRange: vscode.Range, range: vscode.Range, document: vscode.TextDocument, clusterName: string | undefined): vscode.CodeLens[] { @@ -223,6 +237,8 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod let topicId; let partitions; let offset = ""; + let keyFormat; + let valueFormat; for (let currentLine = range.start.line; currentLine <= range.end.line; currentLine++) { const lineText = document.lineAt(currentLine).text; @@ -245,6 +261,17 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod partitions = lineText.substr("partitions:".length).trim(); continue; } + + if (lineText.startsWith("key-format:")) { + keyFormat = lineText.substr("key-format:".length).trim(); + continue; + } + + if (lineText.startsWith("value-format:")) { + valueFormat = lineText.substr("value-format:".length).trim(); + continue; + } + break; } return { @@ -252,7 +279,9 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod consumerGroupId, topicId, fromOffset: offset, - partitions + partitions, + messageKeyFormat: keyFormat, + messageValueFormat: valueFormat } as LaunchConsumerCommand; } } diff --git a/src/providers/consumerVirtualTextDocumentProvider.ts b/src/providers/consumerVirtualTextDocumentProvider.ts index a6e1d9c..1220697 100644 --- a/src/providers/consumerVirtualTextDocumentProvider.ts +++ b/src/providers/consumerVirtualTextDocumentProvider.ts @@ -109,8 +109,8 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC if (!this.isActive(uri)) { return; } - let line = `Key: ${message.key}\nPartition: ${message.partition}\nOffset: ${message.offset}\n`; - line = line + `Value:\n${message.value}\n\n`; + let line = `Key: ${message.deserializedKey || message.key}\nPartition: ${message.partition}\nOffset: ${message.offset}\n`; + line = line + `Value:\n${message.deserializedValue || message.value}\n\n`; this.updateBuffer(uri, line); } diff --git a/src/test/suite/client/consumer.test.ts b/src/test/suite/client/consumer.test.ts index edeb0a1..eecce98 100644 --- a/src/test/suite/client/consumer.test.ts +++ b/src/test/suite/client/consumer.test.ts @@ -39,21 +39,21 @@ suite("Extract consumer URI Test Suite", () => { test("Consumer URI simple", () => { assert.deepStrictEqual( extractConsumerInfoUri(vscode.Uri.parse(`kafka:cluster-id/group-id?topic=topic-id`)), - { clusterId: 'cluster-id', consumerGroupId: 'group-id', topicId: 'topic-id', fromOffset: undefined, partitions: undefined } + { clusterId: 'cluster-id', consumerGroupId: 'group-id', topicId: 'topic-id' } ); }); test("Consumer URI with offset", () => { assert.deepStrictEqual( extractConsumerInfoUri(vscode.Uri.parse(`kafka:cluster-id/group-id?topic=topic-id&from=1`)), - { clusterId: 'cluster-id', consumerGroupId: 'group-id', topicId: 'topic-id', fromOffset: '1', partitions: undefined } + { clusterId: 'cluster-id', consumerGroupId: 'group-id', topicId: 'topic-id', fromOffset: '1' } ); }); test("Consumer URI with partitions", () => { assert.deepStrictEqual( extractConsumerInfoUri(vscode.Uri.parse(`kafka:cluster-id/group-id?topic=topic-id&partitions=0-5`)), - { clusterId: 'cluster-id', consumerGroupId: 'group-id', topicId: 'topic-id', fromOffset: undefined, partitions: '0-5' } + { clusterId: 'cluster-id', consumerGroupId: 'group-id', topicId: 'topic-id', partitions: '0-5' } ); }); diff --git a/src/test/suite/client/serialization.test.ts b/src/test/suite/client/serialization.test.ts new file mode 100644 index 0000000..1da22ef --- /dev/null +++ b/src/test/suite/client/serialization.test.ts @@ -0,0 +1,43 @@ +import * as assert from "assert"; +import { deserialize, serialize } from "../../../client/serialization"; + +suite("Serializer Test Suite", () => { + + test("Integer serializer", () => { + + assert.deepStrictEqual( + serialize('123', "integer"), + Buffer.from([0, 0, 0, 123]) + ); + }); + + test("Long serializer", () => { + + assert.deepStrictEqual( + serialize('123', "long"), + Buffer.from([0, 0, 0, 123, 0, 0, 0, 123]) + ); + }); + +}); + +suite("Deserializer Test Suite", () => { + + test("Integer deserializer", () => { + + assert.deepStrictEqual( + deserialize(Buffer.from([0, 0, 0, 123]), "integer"), + 123 + ); + }); + + /*test("Long deserializer", () => { + + assert.deepStrictEqual( + deserialize(Buffer.from([0, 0, 0, 123, 0, 0, 0, 123]), "long"), + 123 + ); + });*/ + +}); + diff --git a/syntaxes/kafka.tmLanguage.json b/syntaxes/kafka.tmLanguage.json index 7c7df67..5341850 100644 --- a/syntaxes/kafka.tmLanguage.json +++ b/syntaxes/kafka.tmLanguage.json @@ -50,7 +50,7 @@ ] }, "kafka.consumer.header": { - "begin": "(?i)^(?:(topic|from|partitions))", + "begin": "(?i)^(?:(topic|key-format|value-format|from|partitions))", "end": "$", "name": "meta.consumer.header.kafka", "beginCaptures": { @@ -98,7 +98,7 @@ ] }, "kafka.producer.header": { - "begin": "(?i)^(?:(topic|key))", + "begin": "(?i)^(?:(topic|key-format|value-format|key))", "end": "$", "name": "meta.producer.header.kafka", "beginCaptures": {