
String Encoding serialization support
Fixes #181

Signed-off-by: azerr <[email protected]>
angelozerr authored and fbricon committed May 11, 2021
1 parent abd60c0 commit c66fc7d
Showing 20 changed files with 652 additions and 113 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@ All notable changes to `Tools for Apache Kafka®` are documented in this file.
- Validation for available topics in `.kafka` files. See [#153](https://github.com/jlandersen/vscode-kafka/issues/153).
- Simplify snippets. See [#180](https://github.com/jlandersen/vscode-kafka/pull/180).
- Hover support in `.kafka` files. See [#149](https://github.com/jlandersen/vscode-kafka/issues/149).
- String encoding serialization support. See [#181](https://github.com/jlandersen/vscode-kafka/issues/181).

## [0.12.0] - 2021-04-26
### Added
11 changes: 5 additions & 6 deletions docs/Consuming.md
@@ -20,11 +20,6 @@ Once this command is launched, it creates a consumer group (with an auto-generat

In this case, the starting offset can only be configured via the [kafka.consumers.offset](#kafkaconsumersoffset) preference.

Known limitations:

* UTF-8 encoded keys and values only. If data is encoded differently, it will not be pretty.
* One consumer group is created per topic (may change in the future to just have one for the extension).

### Kafka file

Define simple consumers in a `.kafka` file, using the following format:
@@ -58,7 +53,7 @@ The `CONSUMER` block defines:
The deserializers can have the following values:

* `none`: no deserializer (ignores content).
* `string`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.StringDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java) which currently only supports `UTF-8` encoding.
* `string`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.StringDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java). By default it uses `UTF-8` encoding, but you can specify the encoding as a parameter, for example `string(base64)` (see the sketch below). The valid encoding values are those defined in [Node.js' buffers and character encodings](https://nodejs.org/api/buffer.html#buffer_buffers_and_character_encodings).
* `double`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.DoubleDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java).
* `float`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.FloatDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java).
* `integer`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.IntegerDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java).
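
As an illustrative sketch only (not part of this commit's diff), a `CONSUMER` block using the encoding parameter could look like the following; the topic and group id are made up, and the property names assume the `key-format`/`value-format` syntax documented above:

```
CONSUMER base64-values-group
topic: my-topic
from: earliest
key-format: string
value-format: string(base64)
```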
@@ -85,6 +80,10 @@ Completion is available for

![Property value completion](assets/kafka-file-consumer-property-value-completion.png)

* string encoding:

![String encoding completion](assets/kafka-file-consumer-string-encoding-completion.png)

* topic:

![Topic completion](assets/kafka-file-consumer-topic-completion.png)
6 changes: 5 additions & 1 deletion docs/Producing.md
@@ -36,7 +36,7 @@ The `PRODUCER` block defines:

The serializers can have the following values:

* `string`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.StringSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java) which currently only supports `UTF-8` encoding.
* `string`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.StringSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java). By default it uses `UTF-8` encoding, but you can specify the encoding as a parameter, for example `string(base64)` (see the sketch below). The valid encoding values are those defined in [Node.js' buffers and character encodings](https://nodejs.org/api/buffer.html#buffer_buffers_and_character_encodings).
* `double`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.DoubleSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java).
* `float`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.FloatSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java).
* `integer`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.IntegerSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java).
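
As an illustrative sketch only (not part of this commit's diff), a `PRODUCER` block using the encoding parameter could look like the following; the topic, key, and record content are made up:

```
PRODUCER base64-producer
topic: my-topic
key-format: string
value-format: string(base64)
key: order-1
aGVsbG8gd29ybGQ=
```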
@@ -59,6 +59,10 @@ Completion is available for

![Property value completion](assets/kafka-file-producer-property-value-completion.png)

* string encoding:

![String encoding completion](assets/kafka-file-producer-string-encoding-completion.png)

* randomized content (see following section):

![FakerJS completion](assets/kafka-file-producer-fakerjs-completion.png)
32 changes: 27 additions & 5 deletions src/client/consumer.ts
@@ -4,15 +4,17 @@ import * as vscode from "vscode";
import { ClientAccessor } from ".";
import { getWorkspaceSettings, InitialConsumerOffset, ClusterSettings } from "../settings";
import { addQueryParameter, Client, ConnectionOptions } from "./client";
import { deserialize, MessageFormat, SerializationdResult } from "./serialization";
import { deserialize, MessageFormat, SerializationdResult, SerializationSetting } from "./serialization";

interface ConsumerOptions extends ConnectionOptions {
consumerGroupId: string;
topicId: string;
fromOffset: InitialConsumerOffset | string;
partitions?: number[];
messageKeyFormat?: MessageFormat;
messageKeyFormatSettings?: SerializationSetting[];
messageValueFormat?: MessageFormat;
messageValueFormatSettings?: SerializationSetting[];
}

export interface RecordReceivedEvent {
@@ -62,7 +64,7 @@ export class Consumer implements vscode.Disposable {
public error: any;

constructor(public uri: vscode.Uri, clusterSettings: ClusterSettings, private clientAccessor: ClientAccessor) {
const { clusterId, consumerGroupId, topicId, fromOffset, partitions, messageKeyFormat, messageValueFormat } = extractConsumerInfoUri(uri);
const { clusterId, consumerGroupId, topicId, fromOffset, partitions, messageKeyFormat, messageKeyFormatSettings, messageValueFormat, messageValueFormatSettings } = extractConsumerInfoUri(uri);
this.clusterId = clusterId;
const cluster = clusterSettings.get(clusterId);

@@ -81,7 +83,9 @@
fromOffset: fromOffset || settings.consumerOffset,
partitions: parsePartitions(partitions),
messageKeyFormat,
messageValueFormat
messageKeyFormatSettings,
messageValueFormat,
messageValueFormatSettings
};
}
catch (e) {
@@ -114,8 +118,8 @@

this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
message.key = deserialize(message.key, this.options.messageKeyFormat);
message.value = deserialize(message.value, this.options.messageValueFormat);
message.key = deserialize(message.key, this.options.messageKeyFormat, this.options.messageKeyFormatSettings);
message.value = deserialize(message.value, this.options.messageValueFormat, this.options.messageValueFormatSettings);
this.onDidReceiveMessageEmitter.fire({
uri: this.uri,
record: { topic: topic, partition: partition, ...message },
@@ -360,14 +364,18 @@ export interface ConsumerInfoUri {
fromOffset?: string;
partitions?: string;
messageKeyFormat?: MessageFormat;
messageKeyFormatSettings?: SerializationSetting[];
messageValueFormat?: MessageFormat;
messageValueFormatSettings?: SerializationSetting[];
}

const TOPIC_QUERY_PARAMETER = 'topic';
const FROM_QUERY_PARAMETER = 'from';
const PARTITIONS_QUERY_PARAMETER = 'partitions';
const KEY_FORMAT_QUERY_PARAMETER = 'key';
const KEY_FORMAT_SETTINGS_QUERY_PARAMETER = 'key-settings';
const VALUE_FORMAT_QUERY_PARAMETER = 'value';
const VALUE_FORMAT_SETTINGS_QUERY_PARAMETER = 'value-settings';

export function createConsumerUri(info: ConsumerInfoUri): vscode.Uri {
const path = `kafka:${info.clusterId}/${info.consumerGroupId}`;
@@ -376,7 +384,9 @@ export function createConsumerUri(info: ConsumerInfoUri): vscode.Uri {
query = addQueryParameter(query, FROM_QUERY_PARAMETER, info.fromOffset);
query = addQueryParameter(query, PARTITIONS_QUERY_PARAMETER, info.partitions);
query = addQueryParameter(query, KEY_FORMAT_QUERY_PARAMETER, info.messageKeyFormat);
query = addQueryParameter(query, KEY_FORMAT_SETTINGS_QUERY_PARAMETER, info.messageKeyFormatSettings?.map(p => p.value).join(','));
query = addQueryParameter(query, VALUE_FORMAT_QUERY_PARAMETER, info.messageValueFormat);
query = addQueryParameter(query, VALUE_FORMAT_SETTINGS_QUERY_PARAMETER, info.messageValueFormatSettings?.map(p => p.value).join(','));
return vscode.Uri.parse(path + query);
}

@@ -387,7 +397,9 @@ export function extractConsumerInfoUri(uri: vscode.Uri): ConsumerInfoUri {
const from = urlParams.get(FROM_QUERY_PARAMETER);
const partitions = urlParams.get(PARTITIONS_QUERY_PARAMETER);
const messageKeyFormat = urlParams.get(KEY_FORMAT_QUERY_PARAMETER);
const messageKeyFormatSettings = urlParams.get(KEY_FORMAT_SETTINGS_QUERY_PARAMETER);
const messageValueFormat = urlParams.get(VALUE_FORMAT_QUERY_PARAMETER);
const messageValueFormatSettings = urlParams.get(VALUE_FORMAT_SETTINGS_QUERY_PARAMETER);
const result: ConsumerInfoUri = {
clusterId,
consumerGroupId,
@@ -402,9 +414,19 @@ export function extractConsumerInfoUri(uri: vscode.Uri): ConsumerInfoUri {
if (messageKeyFormat && messageKeyFormat.trim().length > 0) {
result.messageKeyFormat = messageKeyFormat as MessageFormat;
}
if (messageKeyFormatSettings) {
const settings = messageKeyFormatSettings.split(',').
map(value => <SerializationSetting>{ value });
result.messageKeyFormatSettings = settings;
}
if (messageValueFormat && messageValueFormat.trim().length > 0) {
result.messageValueFormat = messageValueFormat as MessageFormat;
}
if (messageValueFormatSettings) {
const settings = messageValueFormatSettings.split(',').
map(value => <SerializationSetting>{ value });
result.messageValueFormatSettings = settings;
}
return result;
}
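
Editor's note: a minimal sketch (not part of the commit) of how the new `key-settings`/`value-settings` query parameters round-trip through these helpers; the cluster, group, and topic ids are invented:

```typescript
// Hypothetical round trip: settings are joined with ',' into the query string,
// then split back into SerializationSetting objects when the URI is parsed.
const uri = createConsumerUri({
    clusterId: "local",
    consumerGroupId: "group-1",
    topicId: "orders",
    messageValueFormat: "string",
    messageValueFormatSettings: [{ value: "base64" }],
});
// uri.toString() ≈ "kafka:local/group-1?topic=orders&value=string&value-settings=base64"

const info = extractConsumerInfoUri(uri);
// info.messageValueFormatSettings ≈ [{ value: "base64" }]
```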

2 changes: 1 addition & 1 deletion src/client/producer.ts
@@ -184,7 +184,7 @@ export interface ProducerInfoUri {
clusterId: string;
topicId?: string;
key?: string;
value: string;
value?: string;
}

const TOPIC_QUERY_PARAMETER = 'topic';
30 changes: 20 additions & 10 deletions src/client/serialization.ts
@@ -1,26 +1,31 @@
export type MessageFormat = "none" | "string" | "double" | "float" | "integer" | "long" | "short" ;
export type MessageFormat = "none" | "string" | "double" | "float" | "integer" | "long" | "short";

export type SerializationdResult = any | Error;

export class SerializationException extends Error { }

export interface SerializationSetting {
name?: string;
value?: string;
}

// ---------------- Serializers ----------------

interface Serializer {
serialize(data: string): Buffer | string | null;
serialize(data: string, settings?: SerializationSetting[]): Buffer | string | null;
}

const serializerRegistry: Map<MessageFormat, Serializer> = new Map();

export function serialize(data?: string, format?: MessageFormat): Buffer | string | null {
export function serialize(data?: string, format?: MessageFormat, settings?: SerializationSetting[]): Buffer | string | null {
if (!data || !format) {
return data || null;
}
const serializer = getSerializer(format);
if (!serializer) {
throw new SerializationException(`Cannot find a serializer for ${format} format.`);
}
return serializer.serialize(data);
return serializer.serialize(data, settings);
}

function getSerializer(format: MessageFormat): Serializer | undefined {
@@ -79,7 +84,11 @@ class ShortSerializer implements Serializer {

class StringSerializer implements Serializer {

serialize(value: string): Buffer | string | null {
serialize(value: string, settings?: SerializationSetting[]): Buffer | string | null {
const encoding = settings?.[0].value;
if (encoding) {
return Buffer.from(value, <BufferEncoding>encoding);
}
return value;
};
}
@@ -94,12 +103,12 @@ serializerRegistry.set("string", new StringSerializer());
// ---------------- Deserializers ----------------

interface Deserializer {
deserialize(data: Buffer): any;
deserialize(data: Buffer, settings?: SerializationSetting[]): any;
}

const deserializerRegistry: Map<MessageFormat, Deserializer> = new Map();

export function deserialize(data: Buffer | null, format?: MessageFormat): SerializationdResult | null {
export function deserialize(data: Buffer | null, format?: MessageFormat, settings?: SerializationSetting[]): SerializationdResult | null {
if (data === null || !format) {
return data;
}
@@ -111,7 +120,7 @@ export function deserialize(data: Buffer | null, format?: MessageFormat): Serial
if (!deserializer) {
throw new SerializationException(`Cannot find a deserializer for ${format} format.`);
}
return deserializer.deserialize(data);
return deserializer.deserialize(data, settings);
}
catch (e) {
return e;
@@ -189,11 +198,12 @@ class ShortDeserializer implements Deserializer {

class StringDeserializer implements Deserializer {

deserialize(data: Buffer | null): any {
deserialize(data: Buffer | null, settings?: SerializationSetting[]): any {
if (data === null) {
return null;
}
return data.toString();
const encoding = settings?.[0].value;
return data.toString(encoding);
}
}
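
Editor's note: a minimal sketch (not part of the commit) showing how the new `settings` parameter drives the string serializer and deserializer; the sample data is invented:

```typescript
// "aGVsbG8=" is base64 for "hello".
const settings: SerializationSetting[] = [{ value: "base64" }];

// StringSerializer decodes the base64 text into raw bytes.
const bytes = serialize("aGVsbG8=", "string", settings) as Buffer;

// StringDeserializer re-encodes the bytes as base64 text.
const text = deserialize(bytes, "string", settings); // ≈ "aGVsbG8="
```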

16 changes: 11 additions & 5 deletions src/commands/producers.ts
@@ -7,15 +7,17 @@ import { OutputChannelProvider } from "../providers/outputChannelProvider";
import { KafkaExplorer } from "../explorer";
import { WorkspaceSettings } from "../settings";
import { pickClient } from "./common";
import { MessageFormat, serialize } from "../client/serialization";
import { MessageFormat, SerializationSetting, serialize } from "../client/serialization";
import { createProducerUri, ProducerCollection, ProducerInfoUri, ProducerLaunchState } from "../client/producer";
import { ProducerRecord } from "kafkajs";
import { ProducerValidator } from "../validators/producer";
import { getErrorMessage } from "../errors";

export interface ProduceRecordCommand extends ProducerInfoUri {
messageKeyFormat?: MessageFormat;
messageKeyFormatSettings?: SerializationSetting[];
messageValueFormat?: MessageFormat;
messageValueFormatSettings?: SerializationSetting[];
}

export class ProduceRecordCommandHandler {
@@ -46,6 +48,10 @@ export class ProduceRecordCommandHandler {
channel.appendLine("No topic");
return;
}
if (value === undefined) {
channel.appendLine("No value");
return;
}
if (this.settings.producerFakerJSEnabled) {
faker.setLocale(this.settings.producerFakerJSLocale);
}
@@ -61,15 +67,15 @@
faker.seed(seed);
const randomizedValue = faker.fake(value);
return {
key: serialize(randomizedKey, command.messageKeyFormat),
value: serialize(randomizedValue, command.messageValueFormat)
key: serialize(randomizedKey, command.messageKeyFormat, command.messageKeyFormatSettings),
value: serialize(randomizedValue, command.messageValueFormat, command.messageValueFormatSettings)
};
}

// Return key/value message as-is
return {
key: serialize(key, command.messageKeyFormat),
value: serialize(value, command.messageValueFormat)
key: serialize(key, command.messageKeyFormat, command.messageKeyFormatSettings),
value: serialize(value, command.messageValueFormat, command.messageValueFormatSettings)
};
});
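
Editor's note: a hypothetical `ProduceRecordCommand` (not taken from the commit) carrying the new serialization settings might look like this:

```typescript
const command: ProduceRecordCommand = {
    clusterId: "local",
    topicId: "orders",
    key: "order-1",
    value: "aGVsbG8gd29ybGQ=", // base64 for "hello world"
    messageKeyFormat: "string",
    messageValueFormat: "string",
    messageValueFormatSettings: [{ value: "base64" }],
};
```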

2 changes: 1 addition & 1 deletion src/kafka-file/kafkaFileClient.ts
@@ -157,7 +157,7 @@ export function startLanguageClient(
// Completion
const completion = new KafkaFileCompletionItemProvider(kafkaFileDocuments, languageService, workspaceSettings);
context.subscriptions.push(
vscode.languages.registerCompletionItemProvider(documentSelector, completion, ':', '{', '.')
vscode.languages.registerCompletionItemProvider(documentSelector, completion, ':', '{', '.', '(')
);

// Validation