Skip to content

Commit

Permalink
String Encoding serialization support
Browse files Browse the repository at this point in the history
Fixes jlandersen#181

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr committed May 7, 2021
1 parent abd60c0 commit 03d8283
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 47 deletions.
32 changes: 27 additions & 5 deletions src/client/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@ import * as vscode from "vscode";
import { ClientAccessor } from ".";
import { getWorkspaceSettings, InitialConsumerOffset, ClusterSettings } from "../settings";
import { addQueryParameter, Client, ConnectionOptions } from "./client";
import { deserialize, MessageFormat, SerializationdResult } from "./serialization";
import { deserialize, MessageFormat, SerializationdResult, SerializationSetting } from "./serialization";

interface ConsumerOptions extends ConnectionOptions {
consumerGroupId: string;
topicId: string;
fromOffset: InitialConsumerOffset | string;
partitions?: number[];
messageKeyFormat?: MessageFormat;
messageKeyFormatSettings?: SerializationSetting[];
messageValueFormat?: MessageFormat;
messageValueFormatSettings?: SerializationSetting[];
}

export interface RecordReceivedEvent {
Expand Down Expand Up @@ -62,7 +64,7 @@ export class Consumer implements vscode.Disposable {
public error: any;

constructor(public uri: vscode.Uri, clusterSettings: ClusterSettings, private clientAccessor: ClientAccessor) {
const { clusterId, consumerGroupId, topicId, fromOffset, partitions, messageKeyFormat, messageValueFormat } = extractConsumerInfoUri(uri);
const { clusterId, consumerGroupId, topicId, fromOffset, partitions, messageKeyFormat, messageKeyFormatSettings, messageValueFormat,messageValueFormatSettings } = extractConsumerInfoUri(uri);
this.clusterId = clusterId;
const cluster = clusterSettings.get(clusterId);

Expand All @@ -81,7 +83,9 @@ export class Consumer implements vscode.Disposable {
fromOffset: fromOffset || settings.consumerOffset,
partitions: parsePartitions(partitions),
messageKeyFormat,
messageValueFormat
messageKeyFormatSettings,
messageValueFormat,
messageValueFormatSettings
};
}
catch (e) {
Expand Down Expand Up @@ -114,8 +118,8 @@ export class Consumer implements vscode.Disposable {

this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
message.key = deserialize(message.key, this.options.messageKeyFormat);
message.value = deserialize(message.value, this.options.messageValueFormat);
message.key = deserialize(message.key, this.options.messageKeyFormat, this.options.messageKeyFormatSettings);
message.value = deserialize(message.value, this.options.messageValueFormat, this.options.messageValueFormatSettings);
this.onDidReceiveMessageEmitter.fire({
uri: this.uri,
record: { topic: topic, partition: partition, ...message },
Expand Down Expand Up @@ -360,14 +364,18 @@ export interface ConsumerInfoUri {
fromOffset?: string;
partitions?: string;
messageKeyFormat?: MessageFormat;
messageKeyFormatSettings?: SerializationSetting[];
messageValueFormat?: MessageFormat;
messageValueFormatSettings?: SerializationSetting[];
}

const TOPIC_QUERY_PARAMETER = 'topic';
const FROM_QUERY_PARAMETER = 'from';
const PARTITIONS_QUERY_PARAMETER = 'partitions';
const KEY_FORMAT_QUERY_PARAMETER = 'key';
const KEY_FORMAT_SETTINGS_QUERY_PARAMETER = 'key-settings';
const VALUE_FORMAT_QUERY_PARAMETER = 'value';
const VALUE_FORMAT_SETTINGS_QUERY_PARAMETER = 'value-settings';

export function createConsumerUri(info: ConsumerInfoUri): vscode.Uri {
const path = `kafka:${info.clusterId}/${info.consumerGroupId}`;
Expand All @@ -376,7 +384,9 @@ export function createConsumerUri(info: ConsumerInfoUri): vscode.Uri {
query = addQueryParameter(query, FROM_QUERY_PARAMETER, info.fromOffset);
query = addQueryParameter(query, PARTITIONS_QUERY_PARAMETER, info.partitions);
query = addQueryParameter(query, KEY_FORMAT_QUERY_PARAMETER, info.messageKeyFormat);
query = addQueryParameter(query, KEY_FORMAT_SETTINGS_QUERY_PARAMETER, info.messageKeyFormatSettings?.map(p => p.value).join(','));
query = addQueryParameter(query, VALUE_FORMAT_QUERY_PARAMETER, info.messageValueFormat);
query = addQueryParameter(query, VALUE_FORMAT_SETTINGS_QUERY_PARAMETER, info.messageValueFormatSettings?.map(p => p.value).join(','));
return vscode.Uri.parse(path + query);
}

Expand All @@ -387,7 +397,9 @@ export function extractConsumerInfoUri(uri: vscode.Uri): ConsumerInfoUri {
const from = urlParams.get(FROM_QUERY_PARAMETER);
const partitions = urlParams.get(PARTITIONS_QUERY_PARAMETER);
const messageKeyFormat = urlParams.get(KEY_FORMAT_QUERY_PARAMETER);
const messageKeyFormatSettings = urlParams.get(KEY_FORMAT_SETTINGS_QUERY_PARAMETER);
const messageValueFormat = urlParams.get(VALUE_FORMAT_QUERY_PARAMETER);
const messageValueFormatSettings = urlParams.get(VALUE_FORMAT_SETTINGS_QUERY_PARAMETER);
const result: ConsumerInfoUri = {
clusterId,
consumerGroupId,
Expand All @@ -402,9 +414,19 @@ export function extractConsumerInfoUri(uri: vscode.Uri): ConsumerInfoUri {
if (messageKeyFormat && messageKeyFormat.trim().length > 0) {
result.messageKeyFormat = messageKeyFormat as MessageFormat;
}
if (messageKeyFormatSettings) {
const settings = messageKeyFormatSettings.split(',').
map(value => <SerializationSetting>{ value });
result.messageKeyFormatSettings = settings;
}
if (messageValueFormat && messageValueFormat.trim().length > 0) {
result.messageValueFormat = messageValueFormat as MessageFormat;
}
if (messageValueFormatSettings) {
const settings = messageValueFormatSettings.split(',').
map(value => <SerializationSetting>{ value });
result.messageValueFormatSettings = settings;
}
return result;
}

Expand Down
30 changes: 20 additions & 10 deletions src/client/serialization.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
export type MessageFormat = "none" | "string" | "double" | "float" | "integer" | "long" | "short" ;
export type MessageFormat = "none" | "string" | "double" | "float" | "integer" | "long" | "short";

export type SerializationdResult = any | Error;

export class SerializationException extends Error { }

export interface SerializationSetting {
name?: string;
value?: string;
}

// ---------------- Serializers ----------------

interface Serializer {
serialize(data: string): Buffer | string | null;
serialize(data: string, settings?: SerializationSetting[]): Buffer | string | null;
}

const serializerRegistry: Map<MessageFormat, Serializer> = new Map();

export function serialize(data?: string, format?: MessageFormat): Buffer | string | null {
export function serialize(data?: string, format?: MessageFormat, settings?: SerializationSetting[]): Buffer | string | null {
if (!data || !format) {
return data || null;
}
const serializer = getSerializer(format);
if (!serializer) {
throw new SerializationException(`Cannot find a serializer for ${format} format.`);
}
return serializer.serialize(data);
return serializer.serialize(data, settings);
}

function getSerializer(format: MessageFormat): Serializer | undefined {
Expand Down Expand Up @@ -79,7 +84,11 @@ class ShortSerializer implements Serializer {

class StringSerializer implements Serializer {

serialize(value: string): Buffer | string | null {
serialize(value: string, settings?: SerializationSetting[]): Buffer | string | null {
const encoding = settings?.[0].value;
if (encoding) {
return Buffer.from(value, <BufferEncoding>encoding);
}
return value;
};
}
Expand All @@ -94,12 +103,12 @@ serializerRegistry.set("string", new StringSerializer());
// ---------------- Deserializers ----------------

interface Deserializer {
deserialize(data: Buffer): any;
deserialize(data: Buffer, settings?: SerializationSetting[]): any;
}

const deserializerRegistry: Map<MessageFormat, Deserializer> = new Map();

export function deserialize(data: Buffer | null, format?: MessageFormat): SerializationdResult | null {
export function deserialize(data: Buffer | null, format?: MessageFormat, settings?: SerializationSetting[]): SerializationdResult | null {
if (data === null || !format) {
return data;
}
Expand All @@ -111,7 +120,7 @@ export function deserialize(data: Buffer | null, format?: MessageFormat): Serial
if (!deserializer) {
throw new SerializationException(`Cannot find a deserializer for ${format} format.`);
}
return deserializer.deserialize(data);
return deserializer.deserialize(data, settings);
}
catch (e) {
return e;
Expand Down Expand Up @@ -189,11 +198,12 @@ class ShortDeserializer implements Deserializer {

class StringDeserializer implements Deserializer {

deserialize(data: Buffer | null): any {
deserialize(data: Buffer | null, settings?: SerializationSetting[]): any {
if (data === null) {
return null;
}
return data.toString();
const encoding = settings?.[0].value;
return data.toString(encoding);
}
}

Expand Down
12 changes: 7 additions & 5 deletions src/commands/producers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@ import { OutputChannelProvider } from "../providers/outputChannelProvider";
import { KafkaExplorer } from "../explorer";
import { WorkspaceSettings } from "../settings";
import { pickClient } from "./common";
import { MessageFormat, serialize } from "../client/serialization";
import { MessageFormat, SerializationSetting, serialize } from "../client/serialization";
import { createProducerUri, ProducerCollection, ProducerInfoUri, ProducerLaunchState } from "../client/producer";
import { ProducerRecord } from "kafkajs";
import { ProducerValidator } from "../validators/producer";
import { getErrorMessage } from "../errors";

export interface ProduceRecordCommand extends ProducerInfoUri {
messageKeyFormat?: MessageFormat;
messageKeyFormatSettings?: SerializationSetting[];
messageValueFormat?: MessageFormat;
messageValueFormatSettings?: SerializationSetting[];
}

export class ProduceRecordCommandHandler {
Expand Down Expand Up @@ -61,15 +63,15 @@ export class ProduceRecordCommandHandler {
faker.seed(seed);
const randomizedValue = faker.fake(value);
return {
key: serialize(randomizedKey, command.messageKeyFormat),
value: serialize(randomizedValue, command.messageValueFormat)
key: serialize(randomizedKey, command.messageKeyFormat, command.messageKeyFormatSettings),
value: serialize(randomizedValue, command.messageValueFormat, command.messageValueFormatSettings)
};
}

// Return key/value message as-is
return {
key: serialize(key, command.messageKeyFormat),
value: serialize(value, command.messageValueFormat)
key: serialize(key, command.messageKeyFormat, command.messageKeyFormatSettings),
value: serialize(value, command.messageValueFormat, command.messageValueFormatSettings)
};
});

Expand Down
114 changes: 105 additions & 9 deletions src/kafka-file/languageservice/parser/kafkaFileParser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ export enum NodeKind {
propertyKey,
propertyAssigner,
propertyValue,
mustacheExpression
mustacheExpression,
parameter
}
export interface Node {
start: Position;
Expand Down Expand Up @@ -178,7 +179,7 @@ export class Property extends BaseNode {
return true;
}

findNodeAt(position : Position) : Node {
findNodeAt(position: Position): Node {
if (this.isBeforeAssigner(position)) {
return this.key?.findNodeAt(position) || this;
}
Expand Down Expand Up @@ -242,6 +243,34 @@ export class DynamicChunk extends ChildrenNode<MustacheExpression> {

}

export class Parameter extends Chunk {
name?: string;

public get value() : string {
return this.content?.trim();
}

}

export class MethodChunk extends ChildrenNode<Parameter> {
startParametersCharacter?: number;
endParametersCharacter?: number;

constructor(public readonly content: string, start: Position, end: Position, kind: NodeKind) {
super(start, end, kind);
parseParameters(this);
}

public get methodName() : string {
return this.startParametersCharacter ? this.content.substring(0, this.startParametersCharacter - this.start.character).trim() : this.content.trim();
}

public get parameters(): Array<Parameter> {
return this.children;
}

}

/**
* Mustache expression AST (ex : {{random.words}})
*/
Expand Down Expand Up @@ -459,11 +488,18 @@ function createProperty(lineText: string, lineNumber: number, parent: Block): Pr
const content = lineText.substr(start, end);
if (withinValue) {
const propertyName = propertyKey?.content.trim();
if (propertyName === 'key') {
propertyValue = new DynamicChunk(content, new Position(lineNumber, start), new Position(lineNumber, end), NodeKind.propertyValue);
} else {
propertyValue = new Chunk(content, new Position(lineNumber, start), new Position(lineNumber, end), NodeKind.propertyValue);
}
switch (propertyName) {
case "key":
propertyValue = new DynamicChunk(content, new Position(lineNumber, start), new Position(lineNumber, end), NodeKind.propertyValue);
break;
case "key-format":
case "value-format":
propertyValue = new MethodChunk(content, new Position(lineNumber, start), new Position(lineNumber, end), NodeKind.propertyValue);
break;
default:
propertyValue = new Chunk(content, new Position(lineNumber, start), new Position(lineNumber, end), NodeKind.propertyValue);
break;
}
} else {
propertyKey = new Chunk(content, new Position(lineNumber, start), new Position(lineNumber, end), NodeKind.propertyKey);
}
Expand All @@ -483,6 +519,66 @@ export class ExpressionEdge {
}
}

function parseParameters(parent: MethodChunk) {
const content = parent.content;
let startLine = parent.start.line;
let startColumn = parent.start.character;
let currentLine = startLine;
let currentColumn = startColumn;
let previousChar: string | undefined;
let startParameter: number | undefined;

function addParameterIfNeeded() {
if (startParameter) {
const value = content.substring(startParameter - startColumn, currentColumn - startColumn);
const start = new Position(currentLine, startParameter);
const end = new Position(currentLine, currentColumn + 1);
parent.addChild(new Parameter(value, start, end, NodeKind.parameter));
}
}

for (let currentOffset = 0; currentOffset < content.length; currentOffset++) {
const currentChar = content[currentOffset];
switch (currentChar) {
case '\r':
// compute line, column position
currentLine++;
currentColumn = 0;
break;
case '\n': {
if (previousChar !== '\r') {
// compute line, column position
currentLine++;
currentColumn = 0;
}
break;
}
case '(': {
if (!parent.startParametersCharacter) {
parent.startParametersCharacter = currentColumn;
startParameter = currentColumn + 1;
}
break;
}
case ')': {
parent.endParametersCharacter = currentColumn;
addParameterIfNeeded();
startParameter = undefined;
break;
}
case ',': {
addParameterIfNeeded();
startParameter = currentColumn + 1;
break;
}
}
if (currentChar !== '\r' && currentChar !== '\n') {
currentColumn++;
}
}
addParameterIfNeeded();
}

function parseMustacheExpressions(parent: DynamicChunk) {
const content = parent.content;
let startLine = parent.start.line;
Expand Down Expand Up @@ -532,7 +628,7 @@ function parseMustacheExpressions(parent: DynamicChunk) {
}

// 2. create mustache expression AST by visiting collected edges

let previousEdge = new ExpressionEdge(new Position(startLine, startColumn), 0, true);
const endOfValueEdge = new ExpressionEdge(new Position(currentLine, currentColumn), currentOffset, false);
for (let i = 0; i < edges.length; i++) {
Expand All @@ -548,7 +644,7 @@ function parseMustacheExpressions(parent: DynamicChunk) {

const openedEdge = currentEdge;
let closedEdge = endOfValueEdge;
let closed = false;
let closed = false;
if (matchingClosedEdge) {
// '}}' has been found
closed = true;
Expand Down
Loading

0 comments on commit 03d8283

Please sign in to comment.