diff --git a/src/client/consumer.ts b/src/client/consumer.ts index 59fb9294..f5a41bda 100644 --- a/src/client/consumer.ts +++ b/src/client/consumer.ts @@ -4,7 +4,7 @@ import * as vscode from "vscode"; import { ClientAccessor } from "."; import { getWorkspaceSettings, InitialConsumerOffset, ClusterSettings } from "../settings"; import { addQueryParameter, Client, ConnectionOptions } from "./client"; -import { deserialize, MessageFormat, SerializationdResult } from "./serialization"; +import { deserialize, MessageFormat, SerializationdResult, SerializationSetting } from "./serialization"; interface ConsumerOptions extends ConnectionOptions { consumerGroupId: string; @@ -12,7 +12,9 @@ interface ConsumerOptions extends ConnectionOptions { fromOffset: InitialConsumerOffset | string; partitions?: number[]; messageKeyFormat?: MessageFormat; + messageKeyFormatSettings?: SerializationSetting[]; messageValueFormat?: MessageFormat; + messageValueFormatSettings?: SerializationSetting[]; } export interface RecordReceivedEvent { @@ -62,7 +64,7 @@ export class Consumer implements vscode.Disposable { public error: any; constructor(public uri: vscode.Uri, clusterSettings: ClusterSettings, private clientAccessor: ClientAccessor) { - const { clusterId, consumerGroupId, topicId, fromOffset, partitions, messageKeyFormat, messageValueFormat } = extractConsumerInfoUri(uri); + const { clusterId, consumerGroupId, topicId, fromOffset, partitions, messageKeyFormat, messageKeyFormatSettings, messageValueFormat,messageValueFormatSettings } = extractConsumerInfoUri(uri); this.clusterId = clusterId; const cluster = clusterSettings.get(clusterId); @@ -81,7 +83,9 @@ export class Consumer implements vscode.Disposable { fromOffset: fromOffset || settings.consumerOffset, partitions: parsePartitions(partitions), messageKeyFormat, - messageValueFormat + messageKeyFormatSettings, + messageValueFormat, + messageValueFormatSettings }; } catch (e) { @@ -114,8 +118,8 @@ export class Consumer implements 
vscode.Disposable { this.consumer.run({ eachMessage: async ({ topic, partition, message }) => { - message.key = deserialize(message.key, this.options.messageKeyFormat); - message.value = deserialize(message.value, this.options.messageValueFormat); + message.key = deserialize(message.key, this.options.messageKeyFormat, this.options.messageKeyFormatSettings); + message.value = deserialize(message.value, this.options.messageValueFormat, this.options.messageValueFormatSettings); this.onDidReceiveMessageEmitter.fire({ uri: this.uri, record: { topic: topic, partition: partition, ...message }, @@ -360,14 +364,18 @@ export interface ConsumerInfoUri { fromOffset?: string; partitions?: string; messageKeyFormat?: MessageFormat; + messageKeyFormatSettings?: SerializationSetting[]; messageValueFormat?: MessageFormat; + messageValueFormatSettings?: SerializationSetting[]; } const TOPIC_QUERY_PARAMETER = 'topic'; const FROM_QUERY_PARAMETER = 'from'; const PARTITIONS_QUERY_PARAMETER = 'partitions'; const KEY_FORMAT_QUERY_PARAMETER = 'key'; +const KEY_FORMAT_SETTINGS_QUERY_PARAMETER = 'key-settings'; const VALUE_FORMAT_QUERY_PARAMETER = 'value'; +const VALUE_FORMAT_SETTINGS_QUERY_PARAMETER = 'value-settings'; export function createConsumerUri(info: ConsumerInfoUri): vscode.Uri { const path = `kafka:${info.clusterId}/${info.consumerGroupId}`; @@ -376,7 +384,9 @@ export function createConsumerUri(info: ConsumerInfoUri): vscode.Uri { query = addQueryParameter(query, FROM_QUERY_PARAMETER, info.fromOffset); query = addQueryParameter(query, PARTITIONS_QUERY_PARAMETER, info.partitions); query = addQueryParameter(query, KEY_FORMAT_QUERY_PARAMETER, info.messageKeyFormat); + query = addQueryParameter(query, KEY_FORMAT_SETTINGS_QUERY_PARAMETER, info.messageKeyFormatSettings?.map(p => p.value).join(',')); query = addQueryParameter(query, VALUE_FORMAT_QUERY_PARAMETER, info.messageValueFormat); + query = addQueryParameter(query, VALUE_FORMAT_SETTINGS_QUERY_PARAMETER, 
info.messageValueFormatSettings?.map(p => p.value).join(',')); return vscode.Uri.parse(path + query); } @@ -387,7 +397,9 @@ export function extractConsumerInfoUri(uri: vscode.Uri): ConsumerInfoUri { const from = urlParams.get(FROM_QUERY_PARAMETER); const partitions = urlParams.get(PARTITIONS_QUERY_PARAMETER); const messageKeyFormat = urlParams.get(KEY_FORMAT_QUERY_PARAMETER); + const messageKeyFormatSettings = urlParams.get(KEY_FORMAT_SETTINGS_QUERY_PARAMETER); const messageValueFormat = urlParams.get(VALUE_FORMAT_QUERY_PARAMETER); + const messageValueFormatSettings = urlParams.get(VALUE_FORMAT_SETTINGS_QUERY_PARAMETER); const result: ConsumerInfoUri = { clusterId, consumerGroupId, @@ -402,9 +414,19 @@ export function extractConsumerInfoUri(uri: vscode.Uri): ConsumerInfoUri { if (messageKeyFormat && messageKeyFormat.trim().length > 0) { result.messageKeyFormat = messageKeyFormat as MessageFormat; } + if (messageKeyFormatSettings) { + const settings = messageKeyFormatSettings.split(','). + map(value => ({ value })); + result.messageKeyFormatSettings = settings; + } if (messageValueFormat && messageValueFormat.trim().length > 0) { result.messageValueFormat = messageValueFormat as MessageFormat; } + if (messageValueFormatSettings) { + const settings = messageValueFormatSettings.split(','). 
+ map(value => ({ value })); + result.messageValueFormatSettings = settings; + } return result; } diff --git a/src/client/serialization.ts b/src/client/serialization.ts index d04b071b..45a5a544 100644 --- a/src/client/serialization.ts +++ b/src/client/serialization.ts @@ -1,18 +1,23 @@ -export type MessageFormat = "none" | "string" | "double" | "float" | "integer" | "long" | "short" ; +export type MessageFormat = "none" | "string" | "double" | "float" | "integer" | "long" | "short"; export type SerializationdResult = any | Error; export class SerializationException extends Error { } +export interface SerializationSetting { + name?: string; + value?: string; +} + // ---------------- Serializers ---------------- interface Serializer { - serialize(data: string): Buffer | string | null; + serialize(data: string, settings?: SerializationSetting[]): Buffer | string | null; } const serializerRegistry: Map = new Map(); -export function serialize(data?: string, format?: MessageFormat): Buffer | string | null { +export function serialize(data?: string, format?: MessageFormat, settings?: SerializationSetting[]): Buffer | string | null { if (!data || !format) { return data || null; } @@ -20,7 +25,7 @@ export function serialize(data?: string, format?: MessageFormat): Buffer | strin if (!serializer) { throw new SerializationException(`Cannot find a serializer for ${format} format.`); } - return serializer.serialize(data); + return serializer.serialize(data, settings); } function getSerializer(format: MessageFormat): Serializer | undefined { @@ -79,7 +84,11 @@ class ShortSerializer implements Serializer { class StringSerializer implements Serializer { - serialize(value: string): Buffer | string | null { + serialize(value: string, settings?: SerializationSetting[]): Buffer | string | null { + const encoding = settings?.[0]?.value; + if (encoding) { + return Buffer.from(value, encoding); + } return value; }; } @@ -94,12 +103,12 @@ serializerRegistry.set("string", new 
StringSerializer()); // ---------------- Deserializers ---------------- interface Deserializer { - deserialize(data: Buffer): any; + deserialize(data: Buffer, settings?: SerializationSetting[]): any; } const deserializerRegistry: Map = new Map(); -export function deserialize(data: Buffer | null, format?: MessageFormat): SerializationdResult | null { +export function deserialize(data: Buffer | null, format?: MessageFormat, settings?: SerializationSetting[]): SerializationdResult | null { if (data === null || !format) { return data; } @@ -111,7 +120,7 @@ export function deserialize(data: Buffer | null, format?: MessageFormat): Serial if (!deserializer) { throw new SerializationException(`Cannot find a deserializer for ${format} format.`); } - return deserializer.deserialize(data); + return deserializer.deserialize(data, settings); } catch (e) { return e; @@ -189,11 +198,12 @@ class ShortDeserializer implements Deserializer { class StringDeserializer implements Deserializer { - deserialize(data: Buffer | null): any { + deserialize(data: Buffer | null, settings?: SerializationSetting[]): any { if (data === null) { return null; } - return data.toString(); + const encoding = settings?.[0]?.value; + return data.toString(encoding); } } diff --git a/src/commands/producers.ts b/src/commands/producers.ts index 0afe0a90..c9310298 100644 --- a/src/commands/producers.ts +++ b/src/commands/producers.ts @@ -7,7 +7,7 @@ import { OutputChannelProvider } from "../providers/outputChannelProvider"; import { KafkaExplorer } from "../explorer"; import { WorkspaceSettings } from "../settings"; import { pickClient } from "./common"; -import { MessageFormat, serialize } from "../client/serialization"; +import { MessageFormat, SerializationSetting, serialize } from "../client/serialization"; import { createProducerUri, ProducerCollection, ProducerInfoUri, ProducerLaunchState } from "../client/producer"; import { ProducerRecord } from "kafkajs"; import { ProducerValidator } from 
"../validators/producer"; @@ -15,7 +15,9 @@ import { getErrorMessage } from "../errors"; export interface ProduceRecordCommand extends ProducerInfoUri { messageKeyFormat?: MessageFormat; + messageKeyFormatSettings?: SerializationSetting[]; messageValueFormat?: MessageFormat; + messageValueFormatSettings?: SerializationSetting[]; } export class ProduceRecordCommandHandler { @@ -61,15 +63,15 @@ export class ProduceRecordCommandHandler { faker.seed(seed); const randomizedValue = faker.fake(value); return { - key: serialize(randomizedKey, command.messageKeyFormat), - value: serialize(randomizedValue, command.messageValueFormat) + key: serialize(randomizedKey, command.messageKeyFormat, command.messageKeyFormatSettings), + value: serialize(randomizedValue, command.messageValueFormat, command.messageValueFormatSettings) }; } // Return key/value message as-is return { - key: serialize(key, command.messageKeyFormat), - value: serialize(value, command.messageValueFormat) + key: serialize(key, command.messageKeyFormat, command.messageKeyFormatSettings), + value: serialize(value, command.messageValueFormat, command.messageValueFormatSettings) }; }); diff --git a/src/kafka-file/languageservice/parser/kafkaFileParser.ts b/src/kafka-file/languageservice/parser/kafkaFileParser.ts index db3fc849..1c391171 100644 --- a/src/kafka-file/languageservice/parser/kafkaFileParser.ts +++ b/src/kafka-file/languageservice/parser/kafkaFileParser.ts @@ -12,7 +12,8 @@ export enum NodeKind { propertyKey, propertyAssigner, propertyValue, - mustacheExpression + mustacheExpression, + parameter } export interface Node { start: Position; @@ -178,7 +179,7 @@ export class Property extends BaseNode { return true; } - findNodeAt(position : Position) : Node { + findNodeAt(position: Position): Node { if (this.isBeforeAssigner(position)) { return this.key?.findNodeAt(position) || this; } @@ -242,6 +243,34 @@ export class DynamicChunk extends ChildrenNode { } +export class Parameter extends Chunk { + name?: 
string; + + public get value() : string { + return this.content?.trim(); + } + +} + +export class MethodChunk extends ChildrenNode { + startParametersCharacter?: number; + endParametersCharacter?: number; + + constructor(public readonly content: string, start: Position, end: Position, kind: NodeKind) { + super(start, end, kind); + parseParameters(this); + } + + public get methodName() : string { + return this.startParametersCharacter ? this.content.substring(0, this.startParametersCharacter - this.start.character).trim() : this.content.trim(); + } + + public get parameters(): Array { + return this.children; + } + +} + /** * Mustache expression AST (ex : {{random.words}}) */ @@ -459,11 +488,18 @@ function createProperty(lineText: string, lineNumber: number, parent: Block): Pr const content = lineText.substr(start, end); if (withinValue) { const propertyName = propertyKey?.content.trim(); - if (propertyName === 'key') { - propertyValue = new DynamicChunk(content, new Position(lineNumber, start), new Position(lineNumber, end), NodeKind.propertyValue); - } else { - propertyValue = new Chunk(content, new Position(lineNumber, start), new Position(lineNumber, end), NodeKind.propertyValue); - } + switch (propertyName) { + case "key": + propertyValue = new DynamicChunk(content, new Position(lineNumber, start), new Position(lineNumber, end), NodeKind.propertyValue); + break; + case "key-format": + case "value-format": + propertyValue = new MethodChunk(content, new Position(lineNumber, start), new Position(lineNumber, end), NodeKind.propertyValue); + break; + default: + propertyValue = new Chunk(content, new Position(lineNumber, start), new Position(lineNumber, end), NodeKind.propertyValue); + break; + } } else { propertyKey = new Chunk(content, new Position(lineNumber, start), new Position(lineNumber, end), NodeKind.propertyKey); } @@ -483,6 +519,66 @@ export class ExpressionEdge { } } +function parseParameters(parent: MethodChunk) { + const content = parent.content; + let 
startLine = parent.start.line; + let startColumn = parent.start.character; + let currentLine = startLine; + let currentColumn = startColumn; + let previousChar: string | undefined; + let startParameter: number | undefined; + + function addParameterIfNeeded() { + if (startParameter) { + const value = content.substring(startParameter - startColumn, currentColumn - startColumn); + const start = new Position(currentLine, startParameter); + const end = new Position(currentLine, currentColumn + 1); + parent.addChild(new Parameter(value, start, end, NodeKind.parameter)); + } + } + + for (let currentOffset = 0; currentOffset < content.length; currentOffset++) { + const currentChar = content[currentOffset]; + switch (currentChar) { + case '\r': + // compute line, column position + currentLine++; + currentColumn = 0; + break; + case '\n': { + if (previousChar !== '\r') { + // compute line, column position + currentLine++; + currentColumn = 0; + } + break; + } + case '(': { + if (!parent.startParametersCharacter) { + parent.startParametersCharacter = currentColumn; + startParameter = currentColumn + 1; + } + break; + } + case ')': { + parent.endParametersCharacter = currentColumn; + addParameterIfNeeded(); + startParameter = undefined; + break; + } + case ',': { + addParameterIfNeeded(); + startParameter = currentColumn + 1; + break; + } + } + if (currentChar !== '\r' && currentChar !== '\n') { + currentColumn++; + } + } + addParameterIfNeeded(); +} + function parseMustacheExpressions(parent: DynamicChunk) { const content = parent.content; let startLine = parent.start.line; @@ -532,7 +628,7 @@ function parseMustacheExpressions(parent: DynamicChunk) { } // 2. 
create mustache expression AST by visiting collected edges - + let previousEdge = new ExpressionEdge(new Position(startLine, startColumn), 0, true); const endOfValueEdge = new ExpressionEdge(new Position(currentLine, currentColumn), currentOffset, false); for (let i = 0; i < edges.length; i++) { @@ -548,7 +644,7 @@ function parseMustacheExpressions(parent: DynamicChunk) { const openedEdge = currentEdge; let closedEdge = endOfValueEdge; - let closed = false; + let closed = false; if (matchingClosedEdge) { // '}}' has been found closed = true; diff --git a/src/kafka-file/languageservice/services/codeLensProvider.ts b/src/kafka-file/languageservice/services/codeLensProvider.ts index 54633634..28926fa8 100644 --- a/src/kafka-file/languageservice/services/codeLensProvider.ts +++ b/src/kafka-file/languageservice/services/codeLensProvider.ts @@ -3,7 +3,7 @@ import { ClientState, ConsumerLaunchState } from "../../../client"; import { createProducerUri, ProducerLaunchState } from "../../../client/producer"; import { LaunchConsumerCommand, ProduceRecordCommand, ProduceRecordCommandHandler, SelectClusterCommandHandler, StartConsumerCommandHandler, StopConsumerCommandHandler } from "../../../commands"; import { ProducerLaunchStateProvider, ConsumerLaunchStateProvider, SelectedClusterProvider } from "../kafkaFileLanguageService"; -import { Block, BlockType, ConsumerBlock, KafkaFileDocument, ProducerBlock } from "../parser/kafkaFileParser"; +import { Block, BlockType, ConsumerBlock, KafkaFileDocument, MethodChunk, ProducerBlock } from "../parser/kafkaFileParser"; /** * Kafka file codeLens support. 
@@ -43,7 +43,7 @@ export class KafkaFileCodeLenses { getClusterStatus(state: ClientState | undefined) { switch (state) { case ClientState.disconnected: - return `$(eye-closed) `; + return `$(eye-closed) `; case ClientState.connecting: return `$(sync~spin) `; case ClientState.connected: @@ -110,7 +110,9 @@ export class KafkaFileCodeLenses { let key; let value = block.value?.content; let keyFormat; + let keyFormatSettings; let valueFormat; + let valueFormatSettings; block.properties.forEach(property => { switch (property.propertyName) { case 'topic': @@ -120,10 +122,12 @@ export class KafkaFileCodeLenses { key = property.propertyValue; break; case 'key-format': - keyFormat = property.propertyValue; + keyFormat = (property.value).methodName; + keyFormatSettings = (property.value).parameters; break; case 'value-format': - valueFormat = property.propertyValue; + valueFormat = (property.value).methodName; + valueFormatSettings = (property.value).parameters; break; } }); @@ -133,7 +137,9 @@ export class KafkaFileCodeLenses { key, value, messageKeyFormat: keyFormat, - messageValueFormat: valueFormat + messageKeyFormatSettings: keyFormatSettings, + messageValueFormat: valueFormat, + messageValueFormatSettings: valueFormatSettings } as ProduceRecordCommand; } @@ -193,7 +199,9 @@ export class KafkaFileCodeLenses { let partitions; let offset; let keyFormat; + let keyFormatSettings; let valueFormat; + let valueFormatSettings; block.properties.forEach(property => { switch (property.propertyName) { case 'topic': @@ -206,10 +214,12 @@ export class KafkaFileCodeLenses { partitions = property.propertyValue; break; case 'key-format': - keyFormat = property.propertyValue; + keyFormat = (property.value).methodName; + keyFormatSettings = (property.value).parameters; break; case 'value-format': - valueFormat = property.propertyValue; + valueFormat = (property.value).methodName; + valueFormatSettings = (property.value).parameters; break; } }); @@ -221,7 +231,9 @@ export class 
KafkaFileCodeLenses { fromOffset: offset, partitions, messageKeyFormat: keyFormat, - messageValueFormat: valueFormat + messageValueFormat: valueFormat, + messageKeyFormatSettings: keyFormatSettings, + messageValueFormatSettings: valueFormatSettings } as LaunchConsumerCommand; } } diff --git a/src/kafka-file/languageservice/services/completion.ts b/src/kafka-file/languageservice/services/completion.ts index 36aec62a..3ac358b1 100644 --- a/src/kafka-file/languageservice/services/completion.ts +++ b/src/kafka-file/languageservice/services/completion.ts @@ -1,7 +1,14 @@ import { TextDocument, Position, CompletionList, CompletionItem, SnippetString, MarkdownString, CompletionItemKind, Range } from "vscode"; import { createTopicDocumentation, SelectedClusterProvider, TopicProvider } from "../kafkaFileLanguageService"; import { consumerModel, fakerjsAPIModel, Model, ModelDefinition, producerModel } from "../model"; -import { Block, BlockType, Chunk, ConsumerBlock, KafkaFileDocument, MustacheExpression, NodeKind, ProducerBlock, Property } from "../parser/kafkaFileParser"; +import { Block, BlockType, Chunk, ConsumerBlock, KafkaFileDocument, MethodChunk, MustacheExpression, NodeKind, Parameter, ProducerBlock, Property } from "../parser/kafkaFileParser"; + +/** + * Supported encoding by nodejs Buffer. + * + * @see https://nodejs.org/api/buffer.html#buffer_buffers_and_character_encodings + */ +const bufferEncoding = ["ascii" , "utf8" , "utf-8" , "utf16le" , "ucs2" , "ucs-2" , "base64" , "latin1" , "binary" , "hex"]; /** * Kafka file completion support. 
@@ -18,7 +25,7 @@ export class KafkaFileCompletion { return; } - // Following comments with use the '|' character to show the position where the complation is trigerred + // Following comments will use the '|' character to show the position where the completion is triggered const items: Array = []; switch (node.kind) { case NodeKind.consumerBlock: { @@ -112,6 +119,12 @@ export class KafkaFileCompletion { this.collectFakerJSExpressions(expression, producerFakerJSEnabled, position, items); break; } + case NodeKind.parameter: { + // key-format: string(|) + const parameter = node; + this.collectMethodParameters(parameter, position, items); + break; + } } return new CompletionList(items, true); } @@ -202,6 +215,11 @@ export class KafkaFileCompletion { } const insertText = new SnippetString(' '); insertText.appendText(value); + if (value === 'string' && (propertyName === 'key-format' || propertyName === 'value-format')) { + insertText.appendText('('); + insertText.appendChoice(bufferEncoding); + insertText.appendText(')'); + } item.insertText = insertText; item.range = valueRange; items.push(item); @@ -275,6 +293,21 @@ export class KafkaFileCompletion { }*/ } } + + collectMethodParameters(parameter: Parameter, position: Position, items: CompletionItem[]) { + const method = parameter.parent; + switch(method.methodName) { + case 'string': { + const range = parameter.range(); + bufferEncoding.forEach(encoding => { + const item = new CompletionItem(encoding); + item.kind = CompletionItemKind.EnumMember; + item.range = range; + items.push(item); + }); + } + } + } } function createMarkdownString(contents : string) { diff --git a/src/kafka-file/languageservice/services/diagnostics.ts b/src/kafka-file/languageservice/services/diagnostics.ts index 94e07152..e61ba3c0 100644 --- a/src/kafka-file/languageservice/services/diagnostics.ts +++ b/src/kafka-file/languageservice/services/diagnostics.ts @@ -1,5 +1,5 @@ import { Diagnostic, DiagnosticSeverity, Position, Range, TextDocument 
} from "vscode"; -import { Block, BlockType, ConsumerBlock, DynamicChunk, KafkaFileDocument, MustacheExpression, ProducerBlock, Property } from "../parser/kafkaFileParser"; +import { Block, BlockType, Chunk, ConsumerBlock, DynamicChunk, KafkaFileDocument, MethodChunk, MustacheExpression, ProducerBlock, Property } from "../parser/kafkaFileParser"; import { ConsumerValidator } from "../../../validators/consumer"; import { ProducerValidator } from "../../../validators/producer"; import { CommonsValidator } from "../../../validators/commons"; @@ -236,7 +236,7 @@ export class KafkaFileDiagnostics { if (!range) { return; } - const errorMessage = await this.validateValue(propertyName, type, propertyValue); + const errorMessage = await this.validateValue(propertyName, type, property.value); if (errorMessage) { diagnostics.push(new Diagnostic(range, errorMessage, DiagnosticSeverity.Error)); } @@ -249,18 +249,20 @@ export class KafkaFileDiagnostics { } } - private async validateValue(propertyName: string, type: BlockType, propertyValue?: string): Promise { + private async validateValue(propertyName: string, type: BlockType, propertyValue?: Chunk): Promise { switch (propertyName) { case 'topic': - return CommonsValidator.validateTopic(propertyValue); + return CommonsValidator.validateTopic(propertyValue?.content.trim()); case 'key-format': - return type === BlockType.consumer ? ConsumerValidator.validateKeyFormat(propertyValue) : ProducerValidator.validateKeyFormat(propertyValue); + const keyFormat = (propertyValue).methodName; + return type === BlockType.consumer ? ConsumerValidator.validateKeyFormat(keyFormat) : ProducerValidator.validateKeyFormat(keyFormat); case 'value-format': - return type === BlockType.consumer ? ConsumerValidator.validateValueFormat(propertyValue) : ProducerValidator.validateValueFormat(propertyValue); + const valueFormat = (propertyValue).methodName; + return type === BlockType.consumer ? 
ConsumerValidator.validateValueFormat(valueFormat) : ProducerValidator.validateValueFormat(valueFormat); case 'from': - return ConsumerValidator.validateOffset(propertyValue); + return ConsumerValidator.validateOffset(propertyValue?.content.trim()); case 'partitions': { - return ConsumerValidator.validatePartitions(propertyValue); + return ConsumerValidator.validatePartitions(propertyValue?.content.trim()); } } }