Skip to content

Commit

Permalink
Declare key/value format for PRODUCER in kafka file
Browse files Browse the repository at this point in the history
Fixes #113

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr authored and fbricon committed Mar 19, 2021
1 parent 446695b commit 91851f4
Show file tree
Hide file tree
Showing 9 changed files with 306 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ All notable changes to Kafka extension will be documented in this file.
### Added
- Extension API to contribute clusters. See [#123](https://github.com/jlandersen/vscode-kafka/issues/123).
- declare key/value formats for CONSUMER in kafka file. See [#112](https://github.com/jlandersen/vscode-kafka/issues/112).
- declare key/value formats for PRODUCER in kafka file. See [#113](https://github.com/jlandersen/vscode-kafka/issues/113).

### Changed
- Improved the "New topic" wizard: the replication factor is now read from the broker configuration. Input will be skipped if value can't be higher than 1. See [#64](https://github.com/jlandersen/vscode-kafka/issues/64).
Expand Down
2 changes: 1 addition & 1 deletion docs/Consuming.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ The `CONSUMER` block defines:

#### Deserializer

Deserializer can have the following value:
The deserializers can have the following value:

* `none`: no deserializer (ignores content).
* `string`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.StringDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java) which currently only supports `UTF-8` encoding.
Expand Down
20 changes: 20 additions & 0 deletions docs/Producing.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,26 @@ To produce a single record, click on the `Produce record` link above the `PRODUC

The log about produced messages is printed in the `Kafka Producer Log` Output view.

The `PRODUCER` block defines:

* `keyed message` which is declared after PRODUCER *[optional]*.
* `key`: the key *[optional]*.
* `key-format` : [serializer](#Serializer) to use for the key *[optional]*.
* `value-format` : [serializer](#Serializer) to use for the value *[optional]*.

* the rest of the content is the value until `###`.

### Serializer

The serializers can have the following value:

* `string`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.StringSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java) which currently only supports `UTF-8` encoding.
* `double`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.DoubleSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java).
* `float`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.FloatSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java).
* `integer`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.IntegerSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java).
* `long`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.LongSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java).
* `short`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java).

## Randomized content

Record content can be randomized by injecting mustache-like placeholders of [faker.js properties](https://github.com/Marak/faker.js#api-methods), like ``{{name.lastName}}`` or ``{{random.number}}``. Some randomized properties can be localized via the `kafka.producers.fakerjs.locale` setting.
Expand Down
85 changes: 59 additions & 26 deletions snippets/producers.json
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@

{
"producer": {
"prefix": [
"producer"
],
"body": [
"PRODUCER ${1:keyed-message}",
"producer": {
"prefix": [
"producer"
],
"body": [
"PRODUCER ${1:keyed-message}",
"topic: ${2:topic_name}",
"key: ${3:mykeyq}",
"${4:{{random.words}}}",
"",
"###",
""
],
"description": "A producer generating keyed records"
],
"description": "A producer generating keyed records"
},
"json-producer": {
"prefix": [
"json-producer"
],
"body": [
"PRODUCER ${1:json-output}",
"prefix": [
"json-producer"
],
"body": [
"PRODUCER ${1:json-output}",
"topic: ${2:topic_name}",
"key: ${3:mykeyq}",
"{",
Expand All @@ -28,26 +29,58 @@
"",
"###",
""
],
"description": "A producer generating keyed JSON records"
],
"description": "A producer generating keyed JSON records"
},
"key-format-producer": {
"prefix": [
"producer"
],
"body": [
"PRODUCER ${1:key-formatted-message}",
"topic: ${2:topic_name}",
"key: ${3:mykeyq}",
"key-format: ${3|none,double,float,integer,long,short|}",
"${4:{{random.words}}}",
"",
"###",
""
],
"description": "A producer generating formatted keyed records"
},
"value-format-producer": {
"prefix": [
"producer"
],
"body": [
"PRODUCER ${1:formatted-message}",
"topic: ${2:topic_name}",
"key: ${3:mykeyq}",
"value-format: ${3|none,double,float,integer,long,short|}",
"${4:{{random.words}}}",
"",
"###",
""
],
"description": "A producer generating formatted value records"
},
"comment": {
"prefix": [
"prefix": [
"comment"
],
"body": [
"--- ${0}"
],
"description": "Adds a comment"
],
"body": [
"--- ${0}"
],
"description": "Adds a comment"
},
"separator": {
"prefix": [
"prefix": [
"separator"
],
"body": [
],
"body": [
"###",
""
],
"description": "Adds a producer separator"
],
"description": "Adds a producer separator"
}
}
87 changes: 87 additions & 0 deletions src/client/serialization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,93 @@ export type SerializationdResult = any | Error;

export class SerializationException extends Error { }

// ---------------- Serializers ----------------

interface Serializer {
serialize(data: string): Buffer | string | null;
}

const serializerRegistry: Map<MessageFormat, Serializer> = new Map();

export function serialize(data?: string, format?: MessageFormat): Buffer | string | null {
if (!data || !format) {
return data || null;
}
const serializer = getSerializer(format);
if (!serializer) {
throw new SerializationException(`Cannot find a serializer for ${format} format.`);
}
return serializer.serialize(data);
}

function getSerializer(format: MessageFormat): Serializer | undefined {
return serializerRegistry.get(format);
}

class DoubleSerializer implements Serializer {

serialize(value: string): Buffer | string | null {
const data = parseFloat(value);
const result = Buffer.alloc(8);
result.writeDoubleBE(data, 0);
return result;
};
}

class FloatSerializer implements Serializer {

serialize(value: string): Buffer | string | null {
const data = parseFloat(value);
const result = Buffer.alloc(4);
result.writeFloatBE(data, 0);
return result;
};
}

class IntegerSerializer implements Serializer {

serialize(value: string): Buffer | string | null {
const data = parseInt(value);
const result = Buffer.alloc(4);
result.writeInt32BE(data, 0);
return result;
};
}

class LongSerializer implements Serializer {

serialize(value: string): Buffer | string | null {
const data = parseInt(value);
const result = Buffer.alloc(8);
result.writeBigInt64BE(BigInt(data), 0);
return result;
};
}

class ShortSerializer implements Serializer {

serialize(value: string): Buffer | string | null {
const data = parseInt(value);
const result = Buffer.alloc(2);
result.writeInt16BE(data, 0);
return result;
};
}

class StringSerializer implements Serializer {

serialize(value: string): Buffer | string | null {
return value;
};
}

serializerRegistry.set("double", new DoubleSerializer());
serializerRegistry.set("float", new FloatSerializer());
serializerRegistry.set("integer", new IntegerSerializer());
serializerRegistry.set("long", new LongSerializer());
serializerRegistry.set("short", new ShortSerializer());
serializerRegistry.set("string", new StringSerializer());

// ---------------- Deserializers ----------------

interface Deserializer {
Expand Down
23 changes: 13 additions & 10 deletions src/commands/producers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ import { OutputChannelProvider } from "../providers/outputChannelProvider";
import { KafkaExplorer } from "../explorer";
import { WorkspaceSettings } from "../settings";
import { pickClient } from "./common";
import { MessageFormat, serialize } from "../client/serialization";

export interface ProduceRecordCommand {
topicId?: string;
key?: string;
value: string
value: string;
messageKeyFormat?: MessageFormat;
messageValueFormat?: MessageFormat;
}

export class ProduceRecordCommandHandler {
Expand All @@ -26,6 +29,11 @@ export class ProduceRecordCommandHandler {
}

async execute(command: ProduceRecordCommand, times: number): Promise<void> {
const client = await pickClient(this.clientAccessor);
if (!client) {
return;
}

const { topicId, key, value } = command;
const channel = this.channelProvider.getChannel("Kafka Producer Log");
if (topicId === undefined) {
Expand All @@ -47,23 +55,18 @@ export class ProduceRecordCommandHandler {
faker.seed(seed);
const randomizedValue = faker.fake(value);
return {
key:randomizedKey,
value:randomizedValue
key: serialize(randomizedKey, command.messageKeyFormat),
value: serialize(randomizedValue, command.messageValueFormat)
};
}

// Return key/value message as-is
return {
key:key,
value:value
key: serialize(key, command.messageKeyFormat),
value: serialize(value, command.messageValueFormat)
};
});

const client = await pickClient(this.clientAccessor);
if (!client) {
return;
}

const producer = await client.producer();
await producer.connect();

Expand Down
16 changes: 15 additions & 1 deletion src/kafka-file/codeLensProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod
let topicId;
let key;
let value = "";
let keyFormat;
let valueFormat;
for (let currentLine = range.start.line + 1; currentLine <= range.end.line; currentLine++) {
const lineText = document.lineAt(currentLine).text;

Expand All @@ -149,6 +151,16 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod
continue;
}

if (lineText.startsWith("key-format:")) {
keyFormat = lineText.substr("key-format:".length).trim();
continue;
}

if (lineText.startsWith("value-format:")) {
valueFormat = lineText.substr("value-format:".length).trim();
continue;
}

if (lineText.startsWith("--")) {
continue;
}
Expand All @@ -161,7 +173,9 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod
topicId,
key,
value,
};
messageKeyFormat: keyFormat,
messageValueFormat: valueFormat
} as ProduceRecordCommand;
}

private createConsumerLens(lineRange: vscode.Range, range: vscode.Range, document: vscode.TextDocument, clusterName: string | undefined): vscode.CodeLens[] {
Expand Down
Loading

0 comments on commit 91851f4

Please sign in to comment.