Skip to content

Commit

Permalink
Declare key/value format for CONSUMER in kafka file
Browse files Browse the repository at this point in the history
Fixes jlandersen#112

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr committed Mar 4, 2021
1 parent b7d41e4 commit c05ebb3
Show file tree
Hide file tree
Showing 13 changed files with 416 additions and 51 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ All notable changes to Kafka extension will be documented in this file.
- The consumer view now provides a `Clear Consumer View` command. See [#84](https://github.com/jlandersen/vscode-kafka/issues/40).
- Added support for consumer group deletion. See [#26](https://github.com/jlandersen/vscode-kafka/issues/26).
- .kafka files can define a `CONSUMER` block, to start a consumer group with a given offset, partitions. See [#96](https://github.com/jlandersen/vscode-kafka/issues/96).
- .kafka files show the selected cluster as a codelens on the first line. See [#102](https://github.com/jlandersen/vscode-kafka/issues/102).
- .kafka files show the selected cluster as a codelens on each CONSUMER, PRODUCER block. See [#102](https://github.com/jlandersen/vscode-kafka/issues/102).
- declare key/value format for CONSUMER in kafka file. See [#112](https://github.com/jlandersen/vscode-kafka/issues/112).
- declare key/value format for PRODUCER in kafka file. See [#113](https://github.com/jlandersen/vscode-kafka/issues/113).

### Changed
- Improved the "New cluster" and "New topic" wizards: now include validation and a back button. See [#21](https://github.com/jlandersen/vscode-kafka/issues/21).
Expand Down
22 changes: 22 additions & 0 deletions snippets/consumers.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,27 @@
"partitions: ${4|0|}"
],
"description": "A consumer with a partitions filter"
},
"key-format-consumer": {
"prefix": [
"key-format-consumer"
],
"body": [
"CONSUMER ${1:consumer-group-id}",
"topic: ${2:topic_name}",
"key-format: ${3|none,double,float,integer,long,short|}"
],
"description": "A consumer with a key format"
},
"value-format-consumer": {
"prefix": [
"value-format-consumer"
],
"body": [
"CONSUMER ${1:consumer-group-id}",
"topic: ${2:topic_name}",
"value-format: ${3|none,double,float,integer,long,short|}"
],
"description": "A consumer with a value format"
}
}
32 changes: 32 additions & 0 deletions snippets/producers.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,38 @@
],
"description": "A producer generating keyed JSON records"
},
"key-format-producer": {
"prefix": [
"key-format-producer"
],
"body": [
"PRODUCER ${1:keyed-message}",
"topic: ${2:topic_name}",
"key: ${3:mykeyq}",
"key-format: ${4|none,double,float,integer,long,short|}",
"${5:{{random.words}}}",
"",
"###",
""
],
"description": "A producer generating keyed records with key format"
},
"value-format-producer": {
"prefix": [
"value-format-producer"
],
"body": [
"PRODUCER ${1:keyed-message}",
"topic: ${2:topic_name}",
"key: ${3:mykeyq}",
"value-format: ${4|none,double,float,integer,long,short|}",
"${5:{{random.words}}}",
"",
"###",
""
],
"description": "A producer generating keyed records with value format"
},
"comment": {
"prefix": [
"comment"
Expand Down
1 change: 0 additions & 1 deletion src/client/client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Admin, ConfigResourceTypes, Kafka, Producer } from "kafkajs";

import { Disposable } from "vscode";
import { WorkspaceSettings } from "../settings";

Expand Down
46 changes: 39 additions & 7 deletions src/client/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ import { URLSearchParams } from "url";
import * as vscode from "vscode";
import { getWorkspaceSettings, InitialConsumerOffset, ClusterSettings } from "../settings";
import { ConnectionOptions, createKafka } from "./client";
import { deserialize, MessageFormat, SerializationdResult } from "./serialization";

interface ConsumerOptions extends ConnectionOptions {
consumerGroupId: string;
topicId: string;
fromOffset: InitialConsumerOffset | string;
partitions?: number[];
messageKeyFormat?: MessageFormat;
messageValueFormat?: MessageFormat;
}

export interface RecordReceivedEvent {
Expand All @@ -19,9 +22,11 @@ export interface RecordReceivedEvent {
export interface ConsumedRecord {
topic: string;
value: string | Buffer | null;
deserializedValue?: SerializationdResult | null;
offset?: string;
partition?: number;
key?: string | Buffer;
deserializedKey?: SerializationdResult | null;
}

export interface ConsumerChangedStatusEvent {
Expand Down Expand Up @@ -58,7 +63,7 @@ export class Consumer implements vscode.Disposable {
public error: any;

constructor(public uri: vscode.Uri, clusterSettings: ClusterSettings) {
const { clusterId, consumerGroupId, topicId, fromOffset, partitions } = extractConsumerInfoUri(uri);
const { clusterId, consumerGroupId, topicId, fromOffset, partitions, messageKeyFormat, messageValueFormat } = extractConsumerInfoUri(uri);
this.clusterId = clusterId;
const cluster = clusterSettings.get(clusterId);

Expand All @@ -74,7 +79,9 @@ export class Consumer implements vscode.Disposable {
consumerGroupId: consumerGroupId,
topicId,
fromOffset: fromOffset || settings.consumerOffset,
partitions: parsePartitions(partitions)
partitions: parsePartitions(partitions),
messageKeyFormat,
messageValueFormat
};
}
catch (e) {
Expand Down Expand Up @@ -107,9 +114,15 @@ export class Consumer implements vscode.Disposable {

this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const deserializedKey = deserialize(message.key, this.options.messageKeyFormat);
const deserializedValue = deserialize(message.value, this.options.messageValueFormat);
this.onDidReceiveMessageEmitter.fire({
uri: this.uri,
record: { topic: topic, partition: partition, ...message },
record: {
topic: topic, partition: partition,
deserializedKey: deserializedKey, deserializedValue: deserializedValue,
...message
},
});
},
});
Expand Down Expand Up @@ -349,18 +362,24 @@ export interface ConsumerInfoUri {
topicId: InitialConsumerOffset | string;
fromOffset?: string;
partitions?: string;
messageKeyFormat?: MessageFormat;
messageValueFormat?: MessageFormat;
}

const TOPIC_QUERY_PARAMETER = 'topic';
const FROM_QUERY_PARAMETER = 'from';
const PARTITIONS_QUERY_PARAMETER = 'partitions';
const KEY_FORMAT_QUERY_PARAMETER = 'key';
const VALUE_FORMAT_QUERY_PARAMETER = 'value';

export function createConsumerUri(info: ConsumerInfoUri): vscode.Uri {
const path = `kafka:${info.clusterId}/${info.consumerGroupId}`;
let query = '';
query = addQueryParameter(query, TOPIC_QUERY_PARAMETER, info.topicId);
query = addQueryParameter(query, FROM_QUERY_PARAMETER, info.fromOffset);
query = addQueryParameter(query, PARTITIONS_QUERY_PARAMETER, info.partitions);
query = addQueryParameter(query, KEY_FORMAT_QUERY_PARAMETER, info.messageKeyFormat);
query = addQueryParameter(query, VALUE_FORMAT_QUERY_PARAMETER, info.messageValueFormat);
return vscode.Uri.parse(path + query);
}

Expand All @@ -377,13 +396,26 @@ export function extractConsumerInfoUri(uri: vscode.Uri): ConsumerInfoUri {
const topicId = urlParams.get(TOPIC_QUERY_PARAMETER) || '';
const from = urlParams.get(FROM_QUERY_PARAMETER);
const partitions = urlParams.get(PARTITIONS_QUERY_PARAMETER);
return {
const messageKeyFormat = urlParams.get(KEY_FORMAT_QUERY_PARAMETER);
const messageValueFormat = urlParams.get(VALUE_FORMAT_QUERY_PARAMETER);
const result: ConsumerInfoUri = {
clusterId,
consumerGroupId,
topicId,
fromOffset: from && from.trim().length > 0 ? from : undefined,
partitions: partitions && partitions.trim().length > 0 ? partitions : undefined
topicId
};
if (from && from.trim().length > 0) {
result.fromOffset = from;
}
if (partitions && partitions.trim().length > 0) {
result.partitions = partitions;
}
if (messageKeyFormat && messageKeyFormat.trim().length > 0) {
result.messageKeyFormat = messageKeyFormat as MessageFormat;
}
if (messageValueFormat && messageValueFormat.trim().length > 0) {
result.messageValueFormat = messageValueFormat as MessageFormat;
}
return result;
}

export function parsePartitions(partitions?: string): number[] | undefined {
Expand Down
Loading

0 comments on commit c05ebb3

Please sign in to comment.