diff --git a/CHANGELOG.md b/CHANGELOG.md
index c33282a..9b9da50 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,7 +12,9 @@ All notable changes to Kafka extension will be documented in this file.
 - The consumer view now provides a `Clear Consumer View` command. See [#84](https://github.com/jlandersen/vscode-kafka/issues/40).
 - Added support for consumer group deletion. See [#26](https://github.com/jlandersen/vscode-kafka/issues/26).
 - .kafka files can define a `CONSUMER` block, to start a consumer group with a given offset, partitions. See [#96](https://github.com/jlandersen/vscode-kafka/issues/96).
-- .kafka files show the selected cluster as a codelens on the first line. See [#102](https://github.com/jlandersen/vscode-kafka/issues/102).
+- .kafka files show the selected cluster as a codelens on each CONSUMER, PRODUCER block. See [#102](https://github.com/jlandersen/vscode-kafka/issues/102).
+  - declare key/value format for CONSUMER in kafka file. See [#112](https://github.com/jlandersen/vscode-kafka/issues/112).
+  - declare key/value format for PRODUCER in kafka file. See [#113](https://github.com/jlandersen/vscode-kafka/issues/113).
 
 ### Changed
 - Improved the "New cluster" and "New topic" wizards: now include validation and a back button. See [#21](https://github.com/jlandersen/vscode-kafka/issues/21).
diff --git a/package-lock.json b/package-lock.json
index af9a413..9a6014e 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -546,6 +546,11 @@
       "integrity": "sha512-Z7tMw1ytTXt5jqMcOP+OQteU1VuNK9Y02uuJtKQ1Sv69jXQKKg5cibLwGJow8yzZP+eAc18EmLGPal0bp36rvQ==",
       "dev": true
     },
+    "avsc": {
+      "version": "5.5.3",
+      "resolved": "https://registry.npmjs.org/avsc/-/avsc-5.5.3.tgz",
+      "integrity": "sha512-LaBbmBsus1mpK+6i99AF57/tUP8/wAj9+ZZm/5HGNIWN0dUZWUEp4/o79pAiRz49Mdb3PskltbCFD3w115TdZQ=="
+    },
     "azure-devops-node-api": {
       "version": "7.2.0",
       "resolved": "https://registry.npmjs.org/azure-devops-node-api/-/azure-devops-node-api-7.2.0.tgz",
diff --git a/package.json b/package.json
index 6133d94..f8d7cbf 100644
--- a/package.json
+++ b/package.json
@@ -421,6 +421,7 @@
     "test": "node ./out/test/runTest.js"
   },
   "dependencies": {
+    "avsc": "^5.5.3",
     "faker": "^5.1.0",
     "fs-extra": "^8.1.0",
     "glob": "^7.1.6",
diff --git a/snippets/consumers.json b/snippets/consumers.json
index 0e3412a..bcd5155 100644
--- a/snippets/consumers.json
+++ b/snippets/consumers.json
@@ -21,5 +21,27 @@
       "partitions: ${4|0|}"
     ],
     "description": "A consumer with a partitions filter"
+  },
+  "key-format-consumer": {
+    "prefix": [
+      "key-format-consumer"
+    ],
+    "body": [
+      "CONSUMER ${1:consumer-group-id}",
+      "topic: ${2:topic_name}",
+      "key-format: ${3|none,boolean,bytes,double,float,int,long,null,string|}"
+    ],
+    "description": "A consumer with a key format"
+  },
+  "value-format-consumer": {
+    "prefix": [
+      "value-format-consumer"
+    ],
+    "body": [
+      "CONSUMER ${1:consumer-group-id}",
+      "topic: ${2:topic_name}",
+      "value-format: ${3|none,boolean,bytes,double,float,int,long,null,string|}"
+    ],
+    "description": "A consumer with a value format"
   }
 }
diff --git a/snippets/producers.json b/snippets/producers.json
index 1380718..43ea713 100644
--- a/snippets/producers.json
+++ b/snippets/producers.json
@@ -31,6 +31,38 @@
     ],
     "description": "A producer generating keyed JSON records"
   },
+  "key-format-producer": {
+    "prefix": [
+      "key-format-producer"
+    ],
+    "body": [
+      "PRODUCER ${1:keyed-message}",
+      "topic: ${2:topic_name}",
+      "key: ${3:mykeyq}",
+      "key-format: ${4|none,boolean,bytes,double,float,int,long,null,string|}",
+      "${5:{{random.words}}}",
+      "",
+      "###",
+      ""
+    ],
+    "description": "A producer generating keyed records with key format"
+  },
+  "value-format-producer": {
+    "prefix": [
+      "value-format-producer"
+    ],
+    "body": [
+      "PRODUCER ${1:keyed-message}",
+      "topic: ${2:topic_name}",
+      "key: ${3:mykeyq}",
+      "value-format: ${4|none,boolean,bytes,double,float,int,long,null,string|}",
+      "${5:{{random.words}}}",
+      "",
+      "###",
+      ""
+    ],
+    "description": "A producer generating keyed records with value format"
+  },
   "comment": {
     "prefix": [
       "comment"
diff --git a/src/client/client.ts b/src/client/client.ts
index 33f0a73..bd4d19c 100644
--- a/src/client/client.ts
+++ b/src/client/client.ts
@@ -1,7 +1,7 @@
 import { Admin, ConfigResourceTypes, Kafka, Producer } from "kafkajs";
-
 import { Disposable } from "vscode";
 import { WorkspaceSettings } from "../settings";
+import { Type } from "avsc";
 
 export interface ConnectionOptions {
     bootstrap: string;
@@ -92,6 +92,7 @@ export interface Client extends Disposable {
     createTopic(createTopicRequest: CreateTopicRequest): Promise<any[]>;
     deleteTopic(deleteTopicRequest: DeleteTopicRequest): Promise<void>;
 }
+export type MessageFormat = "none" | "boolean" | "bytes" | "double" | "float" | "int" | "long" | "null" | "string";
 
 class EnsureConnectedDecorator implements Client {
 
@@ -353,3 +354,50 @@ export const createKafka = (connectionOptions: ConnectionOptions): Kafka => {
     }
     return kafkaJsClient;
 };
+
+export function decodeMessage(content: Buffer | null, format?: MessageFormat): Buffer | null {
+    if (content !== null && format && format !== "none") {
+        const avroType = getAvroType(format);
+        if (avroType) {
+            return avroType.decode(content).value;
+        }
+    }
+    return content;
+}
+
+export function encodeMessage(content?: string, format?: MessageFormat): Buffer | string | null {
+    if (content !== undefined && format && format !== "none") {
+        const avroType = getAvroType(format);
+        if (avroType) {
+            const val = convert(content, format);
+            return avroType.toBuffer(val);
+        }
+    }
+    return content || null;
+}
+
+
+function convert(content: string, format: MessageFormat): any {
+    switch (format) {
+        case "boolean":
+            return Boolean(content);
+        case "bytes":
+            return Buffer.from(content, 'hex');
+        case "double":
+            return Number.parseFloat(content);
+        case "float":
+            return Number.parseFloat(content);
+        case "int":
+            return Number.parseInt(content);
+        case "long":
+            return Number.parseInt(content);
+        case "null":
+            return null;
+        default:
+            return content;
+    }
+}
+
+function getAvroType(format: MessageFormat): Type | null {
+    return Type.forSchema(format);
+}
diff --git a/src/client/consumer.ts b/src/client/consumer.ts
index 4eb76a7..387e74b 100644
--- a/src/client/consumer.ts
+++ b/src/client/consumer.ts
@@ -2,13 +2,15 @@ import { Kafka, Consumer as KafkaJsConsumer, PartitionAssigner, Assignment, Part
 import { URLSearchParams } from "url";
 import * as vscode from "vscode";
 import { getWorkspaceSettings, InitialConsumerOffset, ClusterSettings } from "../settings";
-import { ConnectionOptions, createKafka } from "./client";
+import { ConnectionOptions, createKafka, decodeMessage, MessageFormat } from "./client";
 
 interface ConsumerOptions extends ConnectionOptions {
     consumerGroupId: string;
     topicId: string;
     fromOffset: InitialConsumerOffset | string;
     partitions?: number[];
+    messageKeyFormat?: MessageFormat;
+    messageValueFormat?: MessageFormat;
 }
 
 export interface RecordReceivedEvent {
@@ -58,7 +60,7 @@ export class Consumer implements vscode.Disposable {
     public error: any;
 
     constructor(public uri: vscode.Uri, clusterSettings: ClusterSettings) {
-        const { clusterId, consumerGroupId, topicId, fromOffset, partitions } = extractConsumerInfoUri(uri);
+        const { clusterId, consumerGroupId, topicId, fromOffset, partitions, messageKeyFormat, messageValueFormat } = extractConsumerInfoUri(uri);
         this.clusterId = clusterId;
 
         const cluster = clusterSettings.get(clusterId);
@@ -74,7 +76,9 @@ export class Consumer implements vscode.Disposable {
                 consumerGroupId: consumerGroupId,
                 topicId,
                 fromOffset: fromOffset || settings.consumerOffset,
-                partitions: parsePartitions(partitions)
+                partitions: parsePartitions(partitions),
+                messageKeyFormat,
+                messageValueFormat
             };
         }
         catch (e) {
@@ -107,6 +111,8 @@ export class Consumer implements vscode.Disposable {
 
         this.consumer.run({
             eachMessage: async ({ topic, partition, message }) => {
+                message.key = decodeMessage(message.key, this.options.messageKeyFormat) || message.key;
+                message.value = decodeMessage(message.value, this.options.messageValueFormat);
                 this.onDidReceiveMessageEmitter.fire({
                     uri: this.uri,
                     record: { topic: topic, partition: partition, ...message },
@@ -349,11 +355,15 @@ export interface ConsumerInfoUri {
     topicId: InitialConsumerOffset | string;
     fromOffset?: string;
     partitions?: string;
+    messageKeyFormat?: MessageFormat;
+    messageValueFormat?: MessageFormat;
 }
 
 const TOPIC_QUERY_PARAMETER = 'topic';
 const FROM_QUERY_PARAMETER = 'from';
 const PARTITIONS_QUERY_PARAMETER = 'partitions';
+const KEY_FORMAT_QUERY_PARAMETER = 'key';
+const VALUE_FORMAT_QUERY_PARAMETER = 'value';
 
 export function createConsumerUri(info: ConsumerInfoUri): vscode.Uri {
     const path = `kafka:${info.clusterId}/${info.consumerGroupId}`;
@@ -361,6 +371,8 @@
     query = addQueryParameter(query, TOPIC_QUERY_PARAMETER, info.topicId);
     query = addQueryParameter(query, FROM_QUERY_PARAMETER, info.fromOffset);
     query = addQueryParameter(query, PARTITIONS_QUERY_PARAMETER, info.partitions);
+    query = addQueryParameter(query, KEY_FORMAT_QUERY_PARAMETER, info.messageKeyFormat);
+    query = addQueryParameter(query, VALUE_FORMAT_QUERY_PARAMETER, info.messageValueFormat);
     return vscode.Uri.parse(path + query);
 }
 
@@ -377,13 +389,26 @@ export function extractConsumerInfoUri(uri: vscode.Uri): ConsumerInfoUri {
     const topicId = urlParams.get(TOPIC_QUERY_PARAMETER) || '';
     const from = urlParams.get(FROM_QUERY_PARAMETER);
     const partitions = urlParams.get(PARTITIONS_QUERY_PARAMETER);
-    return {
+    const messageKeyFormat = urlParams.get(KEY_FORMAT_QUERY_PARAMETER);
+    const messageValueFormat = urlParams.get(VALUE_FORMAT_QUERY_PARAMETER);
+    const result: ConsumerInfoUri = {
         clusterId,
         consumerGroupId,
-        topicId,
-        fromOffset: from && from.trim().length > 0 ? from : undefined,
-        partitions: partitions && partitions.trim().length > 0 ? partitions : undefined
+        topicId
     };
+    if (from && from.trim().length > 0) {
+        result.fromOffset = from;
+    }
+    if (partitions && partitions.trim().length > 0) {
+        result.partitions = partitions;
+    }
+    if (messageKeyFormat && messageKeyFormat.trim().length > 0) {
+        result.messageKeyFormat = messageKeyFormat as MessageFormat;
+    }
+    if (messageValueFormat && messageValueFormat.trim().length > 0) {
+        result.messageValueFormat = messageValueFormat as MessageFormat;
+    }
+    return result;
 }
 
 export function parsePartitions(partitions?: string): number[] | undefined {
diff --git a/src/commands/consumers.ts b/src/commands/consumers.ts
index bb52a49..b4f646c 100644
--- a/src/commands/consumers.ts
+++ b/src/commands/consumers.ts
@@ -56,7 +56,9 @@ abstract class LaunchConsumerCommandHandler {
 
         // Try to start consumer
         if (consumer) {
-            vscode.window.showInformationMessage(`Consumer already started on '${command.topicId}'`);
+            // The consumer is started, open the document which tracks consumer messages.
+            const consumeUri = createConsumerUri(command);
+            openDocument(consumeUri);
             return;
         }
 
diff --git a/src/commands/producers.ts b/src/commands/producers.ts
index c8dcaa7..cc689bb 100644
--- a/src/commands/producers.ts
+++ b/src/commands/producers.ts
@@ -1,16 +1,19 @@
 import * as faker from "faker";
 import { performance } from "perf_hooks";
 
-import { ClientAccessor } from "../client";
+import { ClientAccessor, encodeMessage, MessageFormat } from "../client";
 import { OutputChannelProvider } from "../providers/outputChannelProvider";
 import { KafkaExplorer } from "../explorer";
 import { WorkspaceSettings } from "../settings";
 import { pickClient } from "./common";
+import { getErrorMessage } from "../errors";
 
 export interface ProduceRecordCommand {
     topicId?: string;
     key?: string;
-    value: string
+    value: string;
+    messageKeyFormat?: MessageFormat;
+    messageValueFormat?: MessageFormat;
 }
 
 export class ProduceRecordCommandHandler {
@@ -26,6 +29,11 @@ export class ProduceRecordCommandHandler {
     }
 
     async execute(command: ProduceRecordCommand, times: number): Promise<void> {
+        const client = await pickClient(this.clientAccessor);
+        if (!client) {
+            return;
+        }
+
         const { topicId, key, value } = command;
         const channel = this.channelProvider.getChannel("Kafka Producer Log");
         if (topicId === undefined) {
@@ -47,23 +55,18 @@ export class ProduceRecordCommandHandler {
                 faker.seed(seed);
                 const randomizedValue = faker.fake(value);
                 return {
-                    key:randomizedKey,
-                    value:randomizedValue
+                    key: encodeMessage(randomizedKey, command.messageKeyFormat),
+                    value: encodeMessage(randomizedValue, command.messageValueFormat)
                 };
             }
 
             // Return key/value message as-is
             return {
-                key:key,
-                value:value
+                key: encodeMessage(key, command.messageKeyFormat),
+                value: encodeMessage(value, command.messageValueFormat)
             };
         });
 
-        const client = await pickClient(this.clientAccessor);
-        if (!client) {
-            return;
-        }
-
         const producer = client.producer;
         await producer.connect();
 
@@ -88,12 +91,7 @@ export class ProduceRecordCommandHandler {
             const finishedOperation = performance.now();
             const elapsed = (finishedOperation - startOperation).toFixed(2);
             channel.appendLine(`Failed to produce record(s) (${elapsed}ms)`);
-
-            if (error.message) {
-                channel.appendLine(`Error: ${error.message}`);
-            } else {
-                channel.appendLine(`Error: ${error}`);
-            }
+            channel.appendLine(`Error: ${getErrorMessage(error)}`);
         }
     }
 }
diff --git a/src/kafka-file/codeLensProvider.ts b/src/kafka-file/codeLensProvider.ts
index 537086c..e67d245 100644
--- a/src/kafka-file/codeLensProvider.ts
+++ b/src/kafka-file/codeLensProvider.ts
@@ -136,6 +136,8 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod
         let topicId;
         let key;
         let value = "";
+        let keyFormat;
+        let valueFormat;
         for (let currentLine = range.start.line + 1; currentLine <= range.end.line; currentLine++) {
             const lineText = document.lineAt(currentLine).text;
 
@@ -153,6 +155,16 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod
                 continue;
             }
 
+            if (lineText.startsWith("key-format:")) {
+                keyFormat = lineText.substr("key-format:".length).trim();
+                continue;
+            }
+
+            if (lineText.startsWith("value-format:")) {
+                valueFormat = lineText.substr("value-format:".length).trim();
+                continue;
+            }
+
             value = document.getText(new vscode.Range(currentLine, 0, range.end.line + 1, 0)).trim();
             break;
         }
@@ -161,7 +173,9 @@
             topicId,
             key,
             value,
-        };
+            messageKeyFormat: keyFormat,
+            messageValueFormat: valueFormat
+        } as ProduceRecordCommand;
     }
 
     private createConsumerLens(lineRange: vscode.Range, range: vscode.Range, document: vscode.TextDocument, clusterName: string | undefined): vscode.CodeLens[] {
@@ -223,6 +237,8 @@
         let topicId;
         let partitions;
         let offset = "";
+        let keyFormat;
+        let valueFormat;
         for (let currentLine = range.start.line; currentLine <= range.end.line; currentLine++) {
             const lineText = document.lineAt(currentLine).text;
 
@@ -245,6 +261,17 @@
                 partitions = lineText.substr("partitions:".length).trim();
                 continue;
             }
+
+            if (lineText.startsWith("key-format:")) {
+                keyFormat = lineText.substr("key-format:".length).trim();
+                continue;
+            }
+
+            if (lineText.startsWith("value-format:")) {
+                valueFormat = lineText.substr("value-format:".length).trim();
+                continue;
+            }
+
             break;
         }
         return {
@@ -252,7 +279,9 @@
             consumerGroupId,
             topicId,
             fromOffset: offset,
-            partitions
+            partitions,
+            messageKeyFormat: keyFormat,
+            messageValueFormat: valueFormat
         } as LaunchConsumerCommand;
     }
 }
diff --git a/src/test/suite/client/consumer.test.ts b/src/test/suite/client/consumer.test.ts
index edeb0a1..eecce98 100644
--- a/src/test/suite/client/consumer.test.ts
+++ b/src/test/suite/client/consumer.test.ts
@@ -39,21 +39,21 @@ suite("Extract consumer URI Test Suite", () => {
     test("Consumer URI simple", () => {
         assert.deepStrictEqual(
             extractConsumerInfoUri(vscode.Uri.parse(`kafka:cluster-id/group-id?topic=topic-id`)),
-            { clusterId: 'cluster-id', consumerGroupId: 'group-id', topicId: 'topic-id', fromOffset: undefined, partitions: undefined }
+            { clusterId: 'cluster-id', consumerGroupId: 'group-id', topicId: 'topic-id' }
         );
     });
 
     test("Consumer URI with offset", () => {
         assert.deepStrictEqual(
             extractConsumerInfoUri(vscode.Uri.parse(`kafka:cluster-id/group-id?topic=topic-id&from=1`)),
-            { clusterId: 'cluster-id', consumerGroupId: 'group-id', topicId: 'topic-id', fromOffset: '1', partitions: undefined }
+            { clusterId: 'cluster-id', consumerGroupId: 'group-id', topicId: 'topic-id', fromOffset: '1' }
         );
     });
 
     test("Consumer URI with partitions", () => {
         assert.deepStrictEqual(
             extractConsumerInfoUri(vscode.Uri.parse(`kafka:cluster-id/group-id?topic=topic-id&partitions=0-5`)),
-            { clusterId: 'cluster-id', consumerGroupId: 'group-id', topicId: 'topic-id', fromOffset: undefined, partitions: '0-5' }
+            { clusterId: 'cluster-id', consumerGroupId: 'group-id', topicId: 'topic-id', partitions: '0-5' }
         );
     });
 
diff --git a/src/test/suite/extension.test.ts b/src/test/suite/extension.test.ts
index ab4b55c..5f6eccd 100644
--- a/src/test/suite/extension.test.ts
+++ b/src/test/suite/extension.test.ts
@@ -3,12 +3,12 @@ import { before } from "mocha";
 
 // You can import and use all API from the 'vscode' module
 // as well as import your extension to test it
-import * as vscode from "vscode";
+//import * as vscode from "vscode";
 // import * as myExtension from '../extension';
 
 suite("Extension Test Suite", () => {
     before(() => {
-        vscode.window.showInformationMessage("Start all tests.");
+        // vscode.window.showInformationMessage("Start all tests.");
     });
 
     test("Sample test", () => {
diff --git a/syntaxes/kafka.tmLanguage.json b/syntaxes/kafka.tmLanguage.json
index 7c7df67..5341850 100644
--- a/syntaxes/kafka.tmLanguage.json
+++ b/syntaxes/kafka.tmLanguage.json
@@ -50,7 +50,7 @@
             ]
         },
         "kafka.consumer.header": {
-            "begin": "(?i)^(?:(topic|from|partitions))",
+            "begin": "(?i)^(?:(topic|key-format|value-format|from|partitions))",
             "end": "$",
             "name": "meta.consumer.header.kafka",
             "beginCaptures": {
@@ -98,7 +98,7 @@
             ]
         },
         "kafka.producer.header": {
-            "begin": "(?i)^(?:(topic|key))",
+            "begin": "(?i)^(?:(topic|key-format|value-format|key))",
            "end": "$",
             "name": "meta.producer.header.kafka",
             "beginCaptures": {
diff --git a/tsconfig.json b/tsconfig.json
index 593386d..76f6769 100644
--- a/tsconfig.json
+++ b/tsconfig.json
@@ -4,7 +4,8 @@
         "target": "es6",
         "outDir": "out",
         "lib": [
-            "es6"
+            "es6",
+            "DOM"
         ],
         "sourceMap": true,
         "rootDir": "src",
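
For reference, a .kafka file exercising the new properties could look like the sketch below. The topic, group id, key and value are placeholders; the accepted formats are the ones offered by the snippets above (none, boolean, bytes, double, float, int, long, null, string):

PRODUCER keyed-message
topic: topic_name
key: mykey
key-format: string
value-format: double
123.4

###

CONSUMER consumer-group-id
topic: topic_name
key-format: string
value-format: double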
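
Under the hood, the new encodeMessage/decodeMessage helpers in src/client/client.ts map each declared format onto the matching Avro primitive type from avsc. A minimal TypeScript sketch of that round trip, assuming avsc ^5.5.3 and Node typings are installed (values and variable names are illustrative, not part of the patch):

import { Type } from "avsc";

// "double" is one of the Avro primitive names covered by MessageFormat.
const doubleType = Type.forSchema("double");

// Roughly what encodeMessage() does for a PRODUCER value of "123.4" with value-format: double.
const encoded: Buffer = doubleType.toBuffer(Number.parseFloat("123.4"));

// Roughly what decodeMessage() does with the consumed record before it is displayed.
const { value } = doubleType.decode(encoded);
console.log(value); // 123.4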
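
On the wire, the declared formats travel in the consumer URI as the new "key" and "value" query parameters (KEY_FORMAT_QUERY_PARAMETER / VALUE_FORMAT_QUERY_PARAMETER), which the updated consumer.test.ts expectations reflect. A self-contained sketch of reading such a query back with Node's URLSearchParams, much as extractConsumerInfoUri does (the URI string itself is illustrative):

import { URLSearchParams } from "url";

// Shape of a URI produced by createConsumerUri() when both formats are declared.
const uri = "kafka:cluster-id/group-id?topic=topic-id&key=string&value=double";

const params = new URLSearchParams(uri.substring(uri.indexOf("?") + 1));
console.log(params.get("topic")); // "topic-id"
console.log(params.get("key"));   // "string" -> messageKeyFormat
console.log(params.get("value")); // "double" -> messageValueFormat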