Skip to content

Commit

Permalink
Declare key/value format for CONSUMER in kafka file
Browse files Browse the repository at this point in the history
Fixes jlandersen#112

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr committed Mar 15, 2021
1 parent bf47c3d commit 9a49691
Show file tree
Hide file tree
Showing 10 changed files with 293 additions and 14 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Change Log
All notable changes to Kafka extension will be documented in this file.

## [0.12.0]
### Added
- declare key/value formats for CONSUMER in kafka file. See [#112](https://github.com/jlandersen/vscode-kafka/issues/112).

## [0.11.0] - 2021-03-08
### Added
- Newly created topic or cluster is automatically selected in the Kafka Explorer. See [#61](https://github.com/jlandersen/vscode-kafka/issues/61).
Expand Down
17 changes: 17 additions & 0 deletions docs/Consuming.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,26 @@ The `CONSUMER` block defines:
* 0,1,2
* 0-2
* 0,2-3
* `key-format` : [deserializer](#Deserializer) to use for the key.
* `value-format` : [deserializer](#Deserializer) to use for the value.

#### Deserializer

The deserializer can have one of the following values:

* `none`: no deserializer (ignores content).
* `double`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.DoubleDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java).
* `float`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.FloatDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java).
* `integer`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.IntegerDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java).
* `long`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.LongDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java).
* `short`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java).

#### Code Lens

A codelens is displayed above each `CONSUMER` line, and provides `Start consumer` / `Stop consumer` commands depending on the consumer group status.

#### Completion

Completion snippets can help you quickly bootstrap new `CONSUMER` blocks:

![Consumer snippets](assets/kafka-file-consumer-snippet.png)
Expand Down
22 changes: 22 additions & 0 deletions snippets/consumers.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,27 @@
"partitions: ${4|0|}"
],
"description": "A consumer with a partitions filter"
},
"key-format-consumer": {
"prefix": [
"key-format-consumer"
],
"body": [
"CONSUMER ${1:consumer-group-id}",
"topic: ${2:topic_name}",
"key-format: ${3|none,double,float,integer,long,short|}"
],
"description": "A consumer with a key format"
},
"value-format-consumer": {
"prefix": [
"value-format-consumer"
],
"body": [
"CONSUMER ${1:consumer-group-id}",
"topic: ${2:topic_name}",
"value-format: ${3|none,double,float,integer,long,short|}"
],
"description": "A consumer with a value format"
}
}
46 changes: 39 additions & 7 deletions src/client/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ import { URLSearchParams } from "url";
import * as vscode from "vscode";
import { getWorkspaceSettings, InitialConsumerOffset, ClusterSettings } from "../settings";
import { ConnectionOptions, createKafka } from "./client";
import { deserialize, MessageFormat, SerializationdResult } from "./serialization";

interface ConsumerOptions extends ConnectionOptions {
consumerGroupId: string;
topicId: string;
fromOffset: InitialConsumerOffset | string;
partitions?: number[];
messageKeyFormat?: MessageFormat;
messageValueFormat?: MessageFormat;
}

export interface RecordReceivedEvent {
Expand All @@ -19,9 +22,11 @@ export interface RecordReceivedEvent {
export interface ConsumedRecord {
topic: string;
value: string | Buffer | null;
deserializedValue?: SerializationdResult | null;
offset?: string;
partition?: number;
key?: string | Buffer;
deserializedKey?: SerializationdResult | null;
}

export interface ConsumerChangedStatusEvent {
Expand Down Expand Up @@ -58,7 +63,7 @@ export class Consumer implements vscode.Disposable {
public error: any;

constructor(public uri: vscode.Uri, clusterSettings: ClusterSettings) {
const { clusterId, consumerGroupId, topicId, fromOffset, partitions } = extractConsumerInfoUri(uri);
const { clusterId, consumerGroupId, topicId, fromOffset, partitions, messageKeyFormat, messageValueFormat } = extractConsumerInfoUri(uri);
this.clusterId = clusterId;
const cluster = clusterSettings.get(clusterId);

Expand All @@ -74,7 +79,9 @@ export class Consumer implements vscode.Disposable {
consumerGroupId: consumerGroupId,
topicId,
fromOffset: fromOffset || settings.consumerOffset,
partitions: parsePartitions(partitions)
partitions: parsePartitions(partitions),
messageKeyFormat,
messageValueFormat
};
}
catch (e) {
Expand Down Expand Up @@ -107,9 +114,15 @@ export class Consumer implements vscode.Disposable {

this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const deserializedKey = deserialize(message.key, this.options.messageKeyFormat);
const deserializedValue = deserialize(message.value, this.options.messageValueFormat);
this.onDidReceiveMessageEmitter.fire({
uri: this.uri,
record: { topic: topic, partition: partition, ...message },
record: {
topic: topic, partition: partition,
deserializedKey: deserializedKey, deserializedValue: deserializedValue,
...message
},
});
},
});
Expand Down Expand Up @@ -349,18 +362,24 @@ export interface ConsumerInfoUri {
topicId: InitialConsumerOffset | string;
fromOffset?: string;
partitions?: string;
messageKeyFormat?: MessageFormat;
messageValueFormat?: MessageFormat;
}

// Query-parameter names used to encode consumer options into a kafka: URI
// (written by createConsumerUri, read back by extractConsumerInfoUri).
const TOPIC_QUERY_PARAMETER = 'topic';
const FROM_QUERY_PARAMETER = 'from';
const PARTITIONS_QUERY_PARAMETER = 'partitions';
// NOTE(review): serialized as 'key'/'value' rather than 'key-format'/'value-format';
// changing these strings would break any previously generated consumer URIs.
const KEY_FORMAT_QUERY_PARAMETER = 'key';
const VALUE_FORMAT_QUERY_PARAMETER = 'value';

/**
 * Builds the kafka: URI identifying a consumer, encoding all optional
 * consumer settings (topic, offset, partitions, key/value formats) as
 * query parameters. Inverse of extractConsumerInfoUri.
 *
 * @param info the consumer description to serialize.
 * @returns a `kafka:{clusterId}/{consumerGroupId}?...` URI.
 */
export function createConsumerUri(info: ConsumerInfoUri): vscode.Uri {
    const basePath = `kafka:${info.clusterId}/${info.consumerGroupId}`;
    // Append parameters in a fixed order so equivalent consumers produce
    // identical URIs.
    const parameters: Array<[string, string | undefined]> = [
        [TOPIC_QUERY_PARAMETER, info.topicId],
        [FROM_QUERY_PARAMETER, info.fromOffset],
        [PARTITIONS_QUERY_PARAMETER, info.partitions],
        [KEY_FORMAT_QUERY_PARAMETER, info.messageKeyFormat],
        [VALUE_FORMAT_QUERY_PARAMETER, info.messageValueFormat],
    ];
    let query = '';
    for (const [name, value] of parameters) {
        query = addQueryParameter(query, name, value);
    }
    return vscode.Uri.parse(basePath + query);
}

Expand All @@ -377,13 +396,26 @@ export function extractConsumerInfoUri(uri: vscode.Uri): ConsumerInfoUri {
const topicId = urlParams.get(TOPIC_QUERY_PARAMETER) || '';
const from = urlParams.get(FROM_QUERY_PARAMETER);
const partitions = urlParams.get(PARTITIONS_QUERY_PARAMETER);
return {
const messageKeyFormat = urlParams.get(KEY_FORMAT_QUERY_PARAMETER);
const messageValueFormat = urlParams.get(VALUE_FORMAT_QUERY_PARAMETER);
const result: ConsumerInfoUri = {
clusterId,
consumerGroupId,
topicId,
fromOffset: from && from.trim().length > 0 ? from : undefined,
partitions: partitions && partitions.trim().length > 0 ? partitions : undefined
topicId
};
if (from && from.trim().length > 0) {
result.fromOffset = from;
}
if (partitions && partitions.trim().length > 0) {
result.partitions = partitions;
}
if (messageKeyFormat && messageKeyFormat.trim().length > 0) {
result.messageKeyFormat = messageKeyFormat as MessageFormat;
}
if (messageValueFormat && messageValueFormat.trim().length > 0) {
result.messageValueFormat = messageValueFormat as MessageFormat;
}
return result;
}

export function parsePartitions(partitions?: string): number[] | undefined {
Expand Down
104 changes: 104 additions & 0 deletions src/client/serialization.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Formats accepted by the `key-format:` / `value-format:` properties of a
// CONSUMER block in a kafka file.
export type MessageFormat = "none" | "double" | "float" | "integer" | "long" | "short";

// Result of a deserialization: either the decoded value or the Error raised
// while decoding (deserialize() returns errors instead of throwing them).
// NOTE(review): `any | Error` collapses to `any`, so this alias is documentation only.
// NOTE(review): name contains a typo ("Serializationd"); renaming would break importers.
export type SerializationdResult = any | Error;

class SerializationException extends Error { }

// ---------------- Deserializers ----------------

// Contract implemented by every format-specific deserializer below.
interface Deserializer {
    deserialize(data: Buffer): any;
}

// Maps each MessageFormat to its deserializer; populated at module load
// (see the registry .set calls at the bottom of this file).
const deserializerRegistry: Map<MessageFormat, Deserializer> = new Map();

/**
 * Deserializes raw Kafka record bytes according to the requested format.
 *
 * @param data raw key or value bytes of the consumed record, or null when absent.
 * @param format target format; omitted or `"none"` disables decoding.
 * @returns the decoded value, null when nothing was decoded, or the Error
 *          raised while decoding — errors are returned rather than thrown so
 *          callers can surface them as a per-record result.
 */
export function deserialize(data: Buffer | null, format?: MessageFormat): SerializationdResult | null {
    // No payload, no requested format, or decoding explicitly disabled.
    if (data === null || !format || format === "none") {
        return null;
    }
    try {
        const deserializer = getDeserializer(format);
        if (deserializer) {
            return deserializer.deserialize(data);
        }
        throw new SerializationException(`Cannot find a deserializer for ${format} format.`);
    } catch (error) {
        return error;
    }
}

/** Looks up the deserializer registered for the given format, if any. */
function getDeserializer(format: MessageFormat): Deserializer | undefined {
    const registered = deserializerRegistry.get(format);
    return registered;
}

class DoubleDeserializer implements Deserializer {

    /**
     * Decodes an 8-byte big-endian IEEE-754 double, mirroring the Kafka Java
     * client DoubleDeserializer.
     * @param data raw record bytes, or null for a null payload.
     * @returns the decoded number, or null for a null payload.
     */
    deserialize(data: Buffer | null): any {
        if (data === null) {
            return null;
        }
        if (data.length === 8) {
            return data.readDoubleBE(0);
        }
        throw new SerializationException("Size of data received by DoubleDeserializer is not 8");
    }
}

class FloatDeserializer implements Deserializer {

    /**
     * Decodes a 4-byte big-endian IEEE-754 float, mirroring the Kafka Java
     * client FloatDeserializer.
     * @param data raw record bytes, or null for a null payload.
     * @returns the decoded number, or null for a null payload.
     */
    deserialize(data: Buffer | null): any {
        if (data === null) {
            return null;
        }
        if (data.length === 4) {
            return data.readFloatBE(0);
        }
        throw new SerializationException("Size of data received by FloatDeserializer is not 4");
    }
}

class IntegerDeserializer implements Deserializer {

deserialize(data: Buffer | null): any {
if (data === null) {
return null;
}
if (data.length !== 4) {
throw new Error("Size of data received by IntegerDeserializer is not 4");
}
return data.readInt32BE(0);
}
}

class LongDeserializer implements Deserializer {

    /**
     * Decodes an 8-byte big-endian signed 64-bit integer (as a bigint),
     * mirroring the Kafka Java client LongDeserializer.
     * @param data raw record bytes, or null for a null payload.
     * @returns the decoded bigint, or null for a null payload.
     */
    deserialize(data: Buffer | null): any {
        if (data === null) {
            return null;
        }
        const expectedSize = 8;
        if (data.length !== expectedSize) {
            throw new SerializationException("Size of data received by LongDeserializer is not 8");
        }
        return data.readBigInt64BE(0);
    }
}

class ShortDeserializer implements Deserializer {

    /**
     * Decodes a 2-byte big-endian signed 16-bit integer, mirroring the Kafka
     * Java client ShortDeserializer.
     * @param data raw record bytes, or null for a null payload.
     * @returns the decoded number, or null for a null payload.
     */
    deserialize(data: Buffer | null): any {
        if (data === null) {
            return null;
        }
        const expectedSize = 2;
        if (data.length !== expectedSize) {
            throw new SerializationException("Size of data received by ShortDeserializer is not 2");
        }
        return data.readInt16BE(0);
    }
}

// Register the built-in deserializers at module load. "none" is intentionally
// absent: deserialize() short-circuits it before consulting the registry.
deserializerRegistry.set("double", new DoubleDeserializer());
deserializerRegistry.set("float", new FloatDeserializer());
deserializerRegistry.set("integer", new IntegerDeserializer());
deserializerRegistry.set("long", new LongDeserializer());
deserializerRegistry.set("short", new ShortDeserializer());
17 changes: 16 additions & 1 deletion src/kafka-file/codeLensProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod
let topicId;
let partitions;
let offset = "";
let keyFormat;
let valueFormat;
for (let currentLine = range.start.line; currentLine <= range.end.line; currentLine++) {
const lineText = document.lineAt(currentLine).text;

Expand All @@ -242,14 +244,27 @@ export class KafkaFileCodeLensProvider implements vscode.CodeLensProvider, vscod
partitions = lineText.substr("partitions:".length).trim();
continue;
}

if (lineText.startsWith("key-format:")) {
keyFormat = lineText.substr("key-format:".length).trim();
continue;
}

if (lineText.startsWith("value-format:")) {
valueFormat = lineText.substr("value-format:".length).trim();
continue;
}

break;
}
return {
clusterId: selectedClusterId,
consumerGroupId,
topicId,
fromOffset: offset,
partitions
partitions,
messageKeyFormat: keyFormat,
messageValueFormat: valueFormat
} as LaunchConsumerCommand;
}
}
6 changes: 4 additions & 2 deletions src/providers/consumerVirtualTextDocumentProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,10 @@ export class ConsumerVirtualTextDocumentProvider implements vscode.TextDocumentC
if (!this.isActive(uri)) {
return;
}
let line = `Key: ${message.key}\nPartition: ${message.partition}\nOffset: ${message.offset}\n`;
line = line + `Value:\n${message.value}\n\n`;
const key = message.deserializedKey !== null ? message.deserializedKey : message.key;
const value = message.deserializedValue !== null ? message.deserializedValue : message.value;
let line = `Key: ${key}\nPartition: ${message.partition}\nOffset: ${message.offset}\n`;
line = line + `Value:\n${value}\n\n`;
this.updateBuffer(uri, line);
}

Expand Down
6 changes: 3 additions & 3 deletions src/test/suite/client/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,21 @@ suite("Extract consumer URI Test Suite", () => {
test("Consumer URI simple", () => {
assert.deepStrictEqual(
extractConsumerInfoUri(vscode.Uri.parse(`kafka:cluster-id/group-id?topic=topic-id`)),
{ clusterId: 'cluster-id', consumerGroupId: 'group-id', topicId: 'topic-id', fromOffset: undefined, partitions: undefined }
{ clusterId: 'cluster-id', consumerGroupId: 'group-id', topicId: 'topic-id' }
);
});

test("Consumer URI with offset", () => {
assert.deepStrictEqual(
extractConsumerInfoUri(vscode.Uri.parse(`kafka:cluster-id/group-id?topic=topic-id&from=1`)),
{ clusterId: 'cluster-id', consumerGroupId: 'group-id', topicId: 'topic-id', fromOffset: '1', partitions: undefined }
{ clusterId: 'cluster-id', consumerGroupId: 'group-id', topicId: 'topic-id', fromOffset: '1' }
);
});

test("Consumer URI with partitions", () => {
assert.deepStrictEqual(
extractConsumerInfoUri(vscode.Uri.parse(`kafka:cluster-id/group-id?topic=topic-id&partitions=0-5`)),
{ clusterId: 'cluster-id', consumerGroupId: 'group-id', topicId: 'topic-id', fromOffset: undefined, partitions: '0-5' }
{ clusterId: 'cluster-id', consumerGroupId: 'group-id', topicId: 'topic-id', partitions: '0-5' }
);
});

Expand Down
Loading

0 comments on commit 9a49691

Please sign in to comment.