diff --git a/docs/Consuming.md b/docs/Consuming.md index 69871686..3f7ce43f 100644 --- a/docs/Consuming.md +++ b/docs/Consuming.md @@ -50,15 +50,7 @@ The `CONSUMER` block defines: #### Deserializer -The deserializers can have the following value: - - * `none`: no deserializer (ignores content). - * `string`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.StringDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java). By default it supports `UTF-8` encoding, but you can specify the encoding as parameter like this `string(base64)`. The valid encoding values are defined in [Node.js' buffers and character encodings](https://nodejs.org/api/buffer.html#buffer_buffers_and_character_encodings). - * `double`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.DoubleDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java). - * `float`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.FloatDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java). - * `integer`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.IntegerDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java). - * `long`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.LongDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java). - * `short`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java). +A CONSUMER can deserialize `key/value` by declaring the proper deserializer with `key-format/value-format` property. See [Basic deserializer](Serialization.md#basic-deserializer), [Avro deserializer](Serialization.md#avro-deserializer) for more informations. #### Code Lens diff --git a/docs/Producing.md b/docs/Producing.md index b52d0f0e..cd901748 100644 --- a/docs/Producing.md +++ b/docs/Producing.md @@ -34,14 +34,7 @@ The `PRODUCER` block defines: ### Serializer -The serializers can have the following value: - - * `string`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.StringSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java). By default it supports `UTF-8` encoding, but you can specify the encoding as parameter like this `string(base64)`. The valid encoding values are defined in [Node.js' buffers and character encodings](https://nodejs.org/api/buffer.html#buffer_buffers_and_character_encodings). - * `double`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.DoubleSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java). - * `float`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.FloatSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java). - * `integer`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.IntegerSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java). - * `long`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.LongSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java). - * `short`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java). +A PRODUCER can serialize `key/value` by declaring the proper serializer with `key-format/value-format` property. See [Basic serializer](Serialization.md#basic-serializer), [Avro serializer](Serialization.md#avro-serializer) for more informations. ### Completion diff --git a/docs/README.md b/docs/README.md index a07ec726..1eb97b72 100644 --- a/docs/README.md +++ b/docs/README.md @@ -6,3 +6,4 @@ Welcome to the [Tools for Apache Kafka®](https://github.com/jlandersen/vscode-k * [Kafka file](KafkaFile.md#kafkafile) * [Producing messages](Producing.md#producing-messages) * [Consuming messages](Consuming.md#consuming-messages) +* [Serialization](Serialization.md#serialization) diff --git a/docs/Serialization.md b/docs/Serialization.md new file mode 100644 index 00000000..009c62e1 --- /dev/null +++ b/docs/Serialization.md @@ -0,0 +1,89 @@ +# Serialization + + * A PRODUCER can serialize `key/value` by declaring the proper serializer with `key-format/value-format` property. + * A CONSUMER can deserialize `key/value` by declaring the proper deserializer for `key-format/value-format` property. + +## Basic serialization + +### Basic serializer + +The serializers can have the following value: + + * `string`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.StringSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java). By default it supports `UTF-8` encoding, but you can specify the encoding as parameter like this `string(base64)`. The valid encoding values are defined in [Node.js' buffers and character encodings](https://nodejs.org/api/buffer.html#buffer_buffers_and_character_encodings). + * `double`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.DoubleSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java). + * `float`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.FloatSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java). + * `integer`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.IntegerSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java). + * `long`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.LongSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java). + * `short`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java). + +### Basic deserializer + +The deserializers can have the following value: + + * `none`: no deserializer (ignores content). + * `string`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.StringDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java). By default it supports `UTF-8` encoding, but you can specify the encoding as parameter like this `string(base64)`. The valid encoding values are defined in [Node.js' buffers and character encodings](https://nodejs.org/api/buffer.html#buffer_buffers_and_character_encodings). + * `double`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.DoubleDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java). + * `float`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.FloatDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java). + * `integer`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.IntegerDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java). + * `long`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.LongDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java). + * `short`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java). + +## Avro serialization + +Serialization can be done too with [Apache Avro Schema](http://avro.apache.org/docs/current/spec.html) with a local Avro Schema `*.avsc` + +For instance you can create an [Apache Avro](http://avro.apache.org/docs/current/spec.html) Schema `animals.avsc`: + + +```json +{ + "type": "record", + "fields": [ + { + "name": "kind", + "type": { + "name": "animals_type", + "type": "enum", + "symbols": [ + "CAT", + "DOG" + ] + } + }, + { + "name": "name", + "type": "string" + } + ] +} +``` + +and bind it with `avro(path/of/animals.avsc)` in `key-format` / `value-format` + +### Avro Schema support + +`*.avsc` files benefit with completion, validation for Avro specification. + +### Avro serializer + +You can serialize value of produced message by using the Avro schema `animals.avsc` like this: + +``` +PRODUCER json-output +topic: topic_name +value-format: avro(animals.avsc) +{"kind": "CAT", "name": "Albert"} + +### +``` + +### Avro deserializer + +You can deserialize value of consummed message by using the Avro schema `animals.avsc` like this: + +``` +CONSUMER consumer-group-id +topic: topic_name +from: earliest +value-format: avro(animals.avsc) +``` \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index fcc17a57..7e2bad8b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -546,6 +546,11 @@ "integrity": "sha512-Z7tMw1ytTXt5jqMcOP+OQteU1VuNK9Y02uuJtKQ1Sv69jXQKKg5cibLwGJow8yzZP+eAc18EmLGPal0bp36rvQ==", "dev": true }, + "avsc": { + "version": "5.7.0", + "resolved": "https://registry.npmjs.org/avsc/-/avsc-5.7.0.tgz", + "integrity": "sha512-oP3jgI9SaZnwLwkRx7sHDPctXCUGGp+X4FsCgzHpMTNhYhGhXAFinptdGpWid2GTXAkhNp8ksAjqyqQBhoQ7BQ==" + }, "azure-devops-node-api": { "version": "10.2.2", "resolved": "https://registry.npmjs.org/azure-devops-node-api/-/azure-devops-node-api-10.2.2.tgz", diff --git a/package.json b/package.json index ee291dc8..e11ae7c0 100644 --- a/package.json +++ b/package.json @@ -162,14 +162,23 @@ ".kafka" ], "aliases": [ - "kafka" + "Kafka DSL" ], "configuration": "./language-configuration.json" }, { "id": "kafka-consumer", "aliases": [ - "Kafka Consumer" + "Kafka Consumer View" + ] + }, + { + "id": "jsonc", + "filenamePatterns": [ + "*.avsc" + ], + "aliases": [ + "Avro Schema Definition" ] } ], @@ -199,6 +208,10 @@ { "fileMatch": "package.json", "url": "./schemas/package.schema.json" + }, + { + "fileMatch": "*.avsc", + "url": "./schemas/avro-avsc.json" } ], "commands": [ @@ -451,6 +464,7 @@ "test": "node ./out/test/runTest.js" }, "dependencies": { + "avsc": "^5.7.0", "faker": "^5.5.2", "fs-extra": "^8.1.0", "glob": "^7.1.6", diff --git a/schemas/avro-avsc.json b/schemas/avro-avsc.json new file mode 100644 index 00000000..3ecd7c8e --- /dev/null +++ b/schemas/avro-avsc.json @@ -0,0 +1,163 @@ + +{ + "$schema": "http://json-schema.org/draft-06/schema#", + "title": "Avro Schema Definition", + "description": "Json-Schema definition for Avro AVSC files.", + "definitions": { + "avroSchema": { + "title": "Avro Schema", + "description": "Root Schema", + "oneOf": [ + { "$ref": "#/definitions/types" } + ] + }, + "types": { + "title": "Avro Types", + "description": "Allowed Avro types", + "oneOf": [ + { "$ref": "#/definitions/primitiveType" }, + { "$ref": "#/definitions/primitiveTypeWithMetadata" }, + { "$ref": "#/definitions/customTypeReference" }, + { "$ref": "#/definitions/avroRecord" }, + { "$ref": "#/definitions/avroEnum" }, + { "$ref": "#/definitions/avroArray" }, + { "$ref": "#/definitions/avroMap" }, + { "$ref": "#/definitions/avroFixed" }, + { "$ref": "#/definitions/avroUnion" } + ] + }, + "primitiveType": { + "title": "Primitive Type", + "description": "Basic type primitives.", + "type":"string", + "enum": [ + "null", + "boolean", + "int", + "long", + "float", + "double", + "bytes", + "string" + ] + }, + "primitiveTypeWithMetadata": { + "title": "Primitive Type With Metadata", + "description": "A primitive type with metadata attached.", + "type": "object", + "properties": { + "type": { "$ref": "#/definitions/primitiveType" } + }, + "required": ["type"] + }, + "customTypeReference": { + "title": "Custom Type", + "description": "Reference to a ComplexType", + "not": { "$ref": "#/definitions/primitiveType" }, + "type": "string", + "pattern": "^[A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)*$" + }, + "avroUnion": { + "title": "Union", + "description": "A Union of types", + "type": "array", + "items": { "$ref": "#/definitions/avroSchema" }, + "minItems": 1 + }, + "avroField": { + "title": "Field", + "description": "A field within a Record", + "type": "object", + "properties": { + "name": { "$ref": "#/definitions/name" }, + "type": { "$ref": "#/definitions/types" }, + "doc": { "type": "string" }, + "default": { }, + "order": { "enum": ["ascending", "descending", "ignore"] }, + "aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } } + }, + "required": ["name", "type"] + }, + "avroRecord": { + "title": "Record", + "description": "A Record", + "type": "object", + "properties": { + "type": {"type":"string", "enum": ["record"]}, + "name": { "$ref": "#/definitions/name" }, + "namespace": { "$ref": "#/definitions/namespace" }, + "doc": { "type": "string" }, + "aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } }, + "fields": { "type": "array", "items": { "$ref": "#/definitions/avroField" } } + }, + "required": ["type", "name", "fields"] + }, + "avroEnum": { + "title": "Enum", + "description": "An enumeration", + "type": "object", + "properties": { + "type": {"type":"string", "enum": ["enum"]}, + "name": { "$ref": "#/definitions/name" }, + "namespace": { "$ref": "#/definitions/namespace" }, + "doc": { "type": "string" }, + "aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } }, + "symbols": { "type": "array", "items": { "$ref": "#/definitions/name" } } + }, + "required": ["type", "name", "symbols"] + }, + "avroArray": { + "title": "Array", + "description": "An array", + "type": "object", + "properties": { + "type": {"type":"string", "enum": ["array"]}, + "name": { "$ref": "#/definitions/name" }, + "namespace": { "$ref": "#/definitions/namespace" }, + "doc": { "type": "string" }, + "aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } }, + "items": { "$ref": "#/definitions/types" } + }, + "required": ["type", "items"] + }, + "avroMap": { + "title": "Map", + "description": "A map of values", + "type": "object", + "properties": { + "type": {"type":"string", "enum": ["map"]}, + "name": { "$ref": "#/definitions/name" }, + "namespace": { "$ref": "#/definitions/namespace" }, + "doc": { "type": "string" }, + "aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } }, + "values": { "$ref": "#/definitions/types" } + }, + "required": ["type", "values"] + }, + "avroFixed": { + "title": "Fixed", + "description": "A fixed sized array of bytes", + "type": "object", + "properties": { + "type": {"type":"string", "enum": ["fixed"]}, + "name": { "$ref": "#/definitions/name" }, + "namespace": { "$ref": "#/definitions/namespace" }, + "doc": { "type": "string" }, + "aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } }, + "size": {"type":"number"} + }, + "required": ["type", "name", "size"] + }, + "name": { + "type": "string", + "pattern": "^[A-Za-z_][A-Za-z0-9_]*$" + }, + "namespace": { + "type": "string", + "pattern": "^([A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)*)*$" + } + }, + "oneOf": [ + { "$ref": "#/definitions/avroSchema" } + ] +} diff --git a/src/avro/avroFileSupport.ts b/src/avro/avroFileSupport.ts new file mode 100644 index 00000000..24d9f27d --- /dev/null +++ b/src/avro/avroFileSupport.ts @@ -0,0 +1,39 @@ +import * as fs from 'fs'; +import * as avro from 'avsc'; +import { Disposable, ExtensionContext, TextDocument, window, workspace, WorkspaceFolder } from "vscode"; +import { registerAvroSerialization } from "./serialization"; + +const ENCODING = 'utf-8'; + +export function registerAvroFileSupport(context: ExtensionContext): Disposable { + // register avro serializer/deserializer from a local *.avro file + registerAvroSerialization(); + + return { + dispose() { + } + }; +} + +export function resolvePath(path: string): string { + const currentFile: TextDocument | undefined = (window.activeTextEditor && window.activeTextEditor.document && window.activeTextEditor.document.languageId === 'kafka') ? window.activeTextEditor.document : undefined; + // const currentFileUri: string |undefined = (currentFile && currentFile.uri) ? currentFile.uri.fsPath : undefined; + const currentWorkspace: WorkspaceFolder | undefined = (currentFile && currentFile.uri) ? workspace.getWorkspaceFolder(currentFile.uri) : undefined; + const currentWorkspaceUri: string | undefined = (currentWorkspace && currentWorkspace.uri.fsPath) + || (workspace.workspaceFolders && workspace.workspaceFolders[0].uri.fsPath); + const resolvedPath = currentWorkspaceUri + '/' + path; + return resolvedPath; +} + +export function readAVSC(path: string): avro.Type { + const resolvedPath = resolvePath(path); + const rawSchema = JSON.parse(fs.readFileSync(resolvedPath, ENCODING)); + return avro.Type.forSchema(rawSchema); +} + +export function checkAVSC(path: string) : string | undefined{ + const resolvedPath = resolvePath(path); + if (!fs.existsSync(resolvedPath)) { + return `The '${path}' resolved with the file '${resolvedPath}' cannot be found.`; + } +} \ No newline at end of file diff --git a/src/avro/serialization.ts b/src/avro/serialization.ts new file mode 100644 index 00000000..e726ef5c --- /dev/null +++ b/src/avro/serialization.ts @@ -0,0 +1,40 @@ +import { Deserializer, registerDeserializer, registerSerializer, SerializationException, SerializationSetting, Serializer } from "../client/serialization"; +import { readAVSC } from "./avroFileSupport"; + +export function registerAvroSerialization() { + registerSerializer("avro", new AvroSerializer()); + registerDeserializer("avro", new AvroDeserializer()); +} +class AvroSerializer implements Serializer { + + serialize(value: string, settings: SerializationSetting[]): Buffer | string | null { + if (!settings) { + throw new SerializationException("The avro file path is required"); + } + const path = settings[0].value; + if (!path) { + throw new SerializationException("The avro file path is required"); + } + const data = JSON.parse(value); + const type = readAVSC(path); + return type.toBuffer(data); + } +} + +class AvroDeserializer implements Deserializer { + + deserialize(data: Buffer | null, settings?: SerializationSetting[]): any { + if (data === null) { + return null; + } + if (!settings) { + throw new SerializationException("The avro file path is required"); + } + const path = settings[0].value; + if (!path) { + throw new SerializationException("The avro file path is required"); + } + const type = readAVSC(path); + return type.fromBuffer(data); + } +} \ No newline at end of file diff --git a/src/client/serialization.ts b/src/client/serialization.ts index 45a5a544..d8d10605 100644 --- a/src/client/serialization.ts +++ b/src/client/serialization.ts @@ -1,4 +1,6 @@ -export type MessageFormat = "none" | "string" | "double" | "float" | "integer" | "long" | "short"; +const serializerRegistry: Map = new Map(); + +export type MessageFormat = "none" | "string" | "double" | "float" | "integer" | "long" | "short" | string; export type SerializationdResult = any | Error; @@ -11,11 +13,13 @@ export interface SerializationSetting { // ---------------- Serializers ---------------- -interface Serializer { +export interface Serializer { serialize(data: string, settings?: SerializationSetting[]): Buffer | string | null; } -const serializerRegistry: Map = new Map(); +export function registerSerializer(serializerId: string, serializer: Serializer) { + serializerRegistry.set(serializerId, serializer); +} export function serialize(data?: string, format?: MessageFormat, settings?: SerializationSetting[]): Buffer | string | null { if (!data || !format) { @@ -93,20 +97,24 @@ class StringSerializer implements Serializer { }; } -serializerRegistry.set("double", new DoubleSerializer()); -serializerRegistry.set("float", new FloatSerializer()); -serializerRegistry.set("integer", new IntegerSerializer()); -serializerRegistry.set("long", new LongSerializer()); -serializerRegistry.set("short", new ShortSerializer()); -serializerRegistry.set("string", new StringSerializer()); +// Register default Kafka serializers +registerSerializer("double", new DoubleSerializer()); +registerSerializer("float", new FloatSerializer()); +registerSerializer("integer", new IntegerSerializer()); +registerSerializer("long", new LongSerializer()); +registerSerializer("short", new ShortSerializer()); +registerSerializer("string", new StringSerializer()); // ---------------- Deserializers ---------------- +const deserializerRegistry: Map = new Map(); -interface Deserializer { +export interface Deserializer { deserialize(data: Buffer, settings?: SerializationSetting[]): any; } -const deserializerRegistry: Map = new Map(); +export function registerDeserializer(deserializerId: string, deserializer: Deserializer) { + deserializerRegistry.set(deserializerId, deserializer); +} export function deserialize(data: Buffer | null, format?: MessageFormat, settings?: SerializationSetting[]): SerializationdResult | null { if (data === null || !format) { @@ -207,9 +215,10 @@ class StringDeserializer implements Deserializer { } } -deserializerRegistry.set("double", new DoubleDeserializer()); -deserializerRegistry.set("float", new FloatDeserializer()); -deserializerRegistry.set("integer", new IntegerDeserializer()); -deserializerRegistry.set("long", new LongDeserializer()); -deserializerRegistry.set("short", new ShortDeserializer()); -deserializerRegistry.set("string", new StringDeserializer()); +// Register default Kafka deserializers +registerDeserializer("double", new DoubleDeserializer()); +registerDeserializer("float", new FloatDeserializer()); +registerDeserializer("integer", new IntegerDeserializer()); +registerDeserializer("long", new LongDeserializer()); +registerDeserializer("short", new ShortDeserializer()); +registerDeserializer("string", new StringDeserializer()); diff --git a/src/extension.ts b/src/extension.ts index b9c3a717..33f31178 100644 --- a/src/extension.ts +++ b/src/extension.ts @@ -36,7 +36,8 @@ import { markdownPreviewProvider } from "./docs/markdownPreviewProvider"; import { getDefaultKafkaExtensionParticipant, refreshClusterProviderDefinitions } from "./kafka-extensions/registry"; import { KafkaExtensionParticipant } from "./kafka-extensions/api"; import { ProducerCollection } from "./client/producer"; -import { startLanguageClient } from "./kafka-file/kafkaFileClient"; +import { registerKafkaFileSupport } from "./kafka-file/kafkaFileSupport"; +import { registerAvroFileSupport } from "./avro/avroFileSupport"; export function activate(context: vscode.ExtensionContext): KafkaExtensionParticipant { Context.register(context); @@ -143,7 +144,12 @@ export function activate(context: vscode.ExtensionContext): KafkaExtensionPartic // .kafka file related context.subscriptions.push( - startLanguageClient(clusterSettings, clientAccessor, workspaceSettings, producerCollection, consumerCollection, explorer, context) + registerKafkaFileSupport(clusterSettings, clientAccessor, workspaceSettings, producerCollection, consumerCollection, explorer, context) + ); + + // .avro file related + context.subscriptions.push( + registerAvroFileSupport(context) ); context.subscriptions.push( diff --git a/src/kafka-file/kafkaFileClient.ts b/src/kafka-file/kafkaFileSupport.ts similarity index 90% rename from src/kafka-file/kafkaFileClient.ts rename to src/kafka-file/kafkaFileSupport.ts index 346d483e..4a898457 100644 --- a/src/kafka-file/kafkaFileClient.ts +++ b/src/kafka-file/kafkaFileSupport.ts @@ -112,9 +112,9 @@ class DataModelTopicProvider implements TopicProvider { } } -export function startLanguageClient( +export function registerKafkaFileSupport( clusterSettings: ClusterSettings, - clientAccessor: ClientAccessor, + clientAccessor : ClientAccessor, workspaceSettings: WorkspaceSettings, producerCollection: ProducerCollection, consumerCollection: ConsumerCollection, @@ -171,6 +171,11 @@ export function startLanguageClient( context.subscriptions.push( vscode.languages.registerHoverProvider(documentSelector, hover) ); + // Document Link + const documentLink = new KafkaFileDocumentLinkProvider(kafkaFileDocuments, languageService); + context.subscriptions.push( + vscode.languages.registerDocumentLinkProvider(documentSelector, documentLink) + ); // Open / Close document context.subscriptions.push(vscode.workspace.onDidOpenTextDocument(e => { @@ -201,7 +206,7 @@ export function startLanguageClient( }; } -function createLanguageService(clusterSettings: ClusterSettings, clientAccessor: ClientAccessor, producerCollection: ProducerCollection, consumerCollection: ConsumerCollection, modelProvider: KafkaModelProvider): LanguageService { +function createLanguageService(clusterSettings: ClusterSettings, clientAccessor : ClientAccessor, producerCollection: ProducerCollection, consumerCollection: ConsumerCollection, modelProvider: KafkaModelProvider): LanguageService { const producerLaunchStateProvider = { getProducerLaunchState(uri: vscode.Uri): ProducerLaunchState { const producer = producerCollection.get(uri); @@ -219,7 +224,7 @@ function createLanguageService(clusterSettings: ClusterSettings, clientAccessor: const selectedClusterProvider = { getSelectedCluster() { const selected = clusterSettings.selected; - const clusterId = selected?.id; + const clusterId = selected?.id; const clusterState = clusterId ? clientAccessor.getState(clusterId) : undefined; return { clusterId, @@ -300,7 +305,7 @@ class KafkaFileDiagnostics extends AbstractKafkaFileFeature implements vscode.Di kafkaFileDocuments: LanguageModelCache, languageService: LanguageService, clusterSettings: ClusterSettings, - clientAccessor: ClientAccessor, + clientAccessor : ClientAccessor, modelProvider: KafkaModelProvider, settings: WorkspaceSettings ) { @@ -373,7 +378,16 @@ class KafkaFileHoverProvider extends AbstractKafkaFileFeature implements vscode. return runSafeAsync(async () => { const kafkaFileDocument = this.getKafkaFileDocument(document); return this.languageService.doHover(document, kafkaFileDocument, position); - }, null, `Error while computing hover for ${document.uri}`, token); + }, null, `Error while computing hover for ${document.uri}`, token); } +} + +class KafkaFileDocumentLinkProvider extends AbstractKafkaFileFeature implements vscode.DocumentLinkProvider { + provideDocumentLinks(document: vscode.TextDocument, token: vscode.CancellationToken): vscode.ProviderResult { + return runSafeAsync(async () => { + const kafkaFileDocument = this.getKafkaFileDocument(document); + return this.languageService.provideDcumentLinks(document, kafkaFileDocument); + }, null, `Error while computing document link for ${document.uri}`, token); + } } \ No newline at end of file diff --git a/src/kafka-file/languageservice/kafkaFileLanguageService.ts b/src/kafka-file/languageservice/kafkaFileLanguageService.ts index cf27cfb4..c8931e6c 100644 --- a/src/kafka-file/languageservice/kafkaFileLanguageService.ts +++ b/src/kafka-file/languageservice/kafkaFileLanguageService.ts @@ -1,11 +1,12 @@ -import { CodeLens, CompletionList, Diagnostic, Hover, Position, TextDocument, Uri } from "vscode"; +import { CodeLens, CompletionList, Diagnostic, DocumentLink, Hover, Position, TextDocument, Uri } from "vscode"; import { ClientState, ConsumerLaunchState } from "../../client"; import { BrokerConfigs } from "../../client/config"; import { ProducerLaunchState } from "../../client/producer"; -import { KafkaFileDocument, parseKafkaFile } from "./parser/kafkaFileParser"; +import { CalleeFunction, KafkaFileDocument, parseKafkaFile, Property } from "./parser/kafkaFileParser"; import { KafkaFileCodeLenses } from "./services/codeLensProvider"; import { KafkaFileCompletion } from "./services/completion"; import { KafkaFileDiagnostics } from "./services/diagnostics"; +import { KafkaFileDocumentLinks } from "./services/documentLinks"; import { KafkaFileHover } from "./services/hover"; /** @@ -96,6 +97,8 @@ export interface LanguageService { * @param position the position where the hover was triggered. */ doHover(document: TextDocument, kafkaFileDocument: KafkaFileDocument, position: Position): Promise; + + provideDcumentLinks(document: TextDocument, kafkaFileDocument: KafkaFileDocument): Promise; } /** @@ -112,15 +115,25 @@ export function getLanguageService(producerLaunchStateProvider: ProducerLaunchSt const completion = new KafkaFileCompletion(selectedClusterProvider, topicProvider); const diagnostics = new KafkaFileDiagnostics(selectedClusterProvider, topicProvider); const hover = new KafkaFileHover(selectedClusterProvider, topicProvider); + const links = new KafkaFileDocumentLinks(); return { parseKafkaFileDocument: (document: TextDocument) => parseKafkaFile(document), getCodeLenses: codeLenses.getCodeLenses.bind(codeLenses), doComplete: completion.doComplete.bind(completion), doDiagnostics: diagnostics.doDiagnostics.bind(diagnostics), - doHover: hover.doHover.bind(hover) + doHover: hover.doHover.bind(hover), + provideDcumentLinks: links.provideDocumentLinks.bind(links) }; } +export function getAvroCalleeFunction(property: Property): CalleeFunction | undefined { + if (property.propertyName === 'key-format' || property.propertyName === 'value-format') { + const callee = property.value; + if (callee && callee.functionName === 'avro') { + return callee; + } + } +} export function createTopicDocumentation(topic: TopicDetail): string { return `Topic \`${topic.id}\`\n` + ` * partition count: \`${topic.partitionCount}\`\n` + diff --git a/src/kafka-file/languageservice/model.ts b/src/kafka-file/languageservice/model.ts index 25f08278..ebab27d4 100644 --- a/src/kafka-file/languageservice/model.ts +++ b/src/kafka-file/languageservice/model.ts @@ -97,13 +97,17 @@ const consumerProperties = [ { name: "short", description: "Similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java)." + }, + { + name: "avro", + description: "Avro deserializer" } ] }, { name: "value-format", description: `[Deserializer](${getDocumentationPageUri('Consuming', 'deserializer')}) to use for the value *[optional]*.`, - enum: [ + enum: [ { name: "none", description: "No deserializer (ignores content)" @@ -131,6 +135,10 @@ const consumerProperties = [ { name: "short", description: "Similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java)." + }, + { + name: "avro", + description: "Avro deserializer" } ] }, @@ -182,6 +190,10 @@ const producerProperties = [ { name: "short", description: "Similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java)." + }, + { + name: "avro", + description: "Avro serializer" } ] }, @@ -212,6 +224,10 @@ const producerProperties = [ { name: "short", description: "Similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java)." + }, + { + name: "avro", + description: "Avro serializer" } ] } diff --git a/src/kafka-file/languageservice/services/diagnostics.ts b/src/kafka-file/languageservice/services/diagnostics.ts index 4c62ee1b..dc101cee 100644 --- a/src/kafka-file/languageservice/services/diagnostics.ts +++ b/src/kafka-file/languageservice/services/diagnostics.ts @@ -4,9 +4,10 @@ import { ConsumerValidator } from "../../../validators/consumer"; import { ProducerValidator } from "../../../validators/producer"; import { CommonsValidator } from "../../../validators/commons"; import { fakerjsAPIModel, PartModelProvider } from "../model"; -import { SelectedClusterProvider, TopicProvider } from "../kafkaFileLanguageService"; +import { getAvroCalleeFunction, SelectedClusterProvider, TopicProvider } from "../kafkaFileLanguageService"; import { ClientState } from "../../../client"; import { BrokerConfigs } from "../../../client/config"; +import { checkAVSC } from "../../../avro/avroFileSupport"; /** * Kafka file diagnostics support. @@ -178,6 +179,7 @@ export class KafkaFileDiagnostics { } }); + // Validate existing topic declaration and topic value if (!topicProperty) { const range = new Range(block.start, new Position(block.start.line, block.start.character + 8)); diagnostics.push(new Diagnostic(range, `The ${block.type === BlockType.consumer ? 'consumer' : 'producer'} must declare the 'topic:' property.`, DiagnosticSeverity.Error)); @@ -200,6 +202,28 @@ export class KafkaFileDiagnostics { // 3. Validate property value this.validatePropertyValue(property, block.type, producerFakerJSEnabled, diagnostics); } + + // validate avro parameter + const avro = getAvroCalleeFunction(property); + if (avro) { + const parameter = avro.parameters[0]; + const path = parameter?.value; + if (!path) { + // parameter path is required + const range = property.propertyValue ? property.propertyTrimmedValueRange : property.propertyKeyRange; + if (range) { + diagnostics.push(new Diagnostic(range, `The avro format must declare the 'path' parameter.`, DiagnosticSeverity.Error)); + } + } else { + const result = checkAVSC(path); + if (result) { + const range = parameter.range(); + if (range) { + diagnostics.push(new Diagnostic(range, result, DiagnosticSeverity.Error)); + } + } + } + } } } diff --git a/src/kafka-file/languageservice/services/documentLinks.ts b/src/kafka-file/languageservice/services/documentLinks.ts new file mode 100644 index 00000000..763e2a29 --- /dev/null +++ b/src/kafka-file/languageservice/services/documentLinks.ts @@ -0,0 +1,25 @@ +import { DocumentLink, TextDocument, Uri } from "vscode"; +import { resolvePath } from "../../../avro/avroFileSupport"; +import { getAvroCalleeFunction } from "../kafkaFileLanguageService"; +import { KafkaFileDocument } from "../parser/kafkaFileParser"; + +export class KafkaFileDocumentLinks { + + async provideDocumentLinks(document: TextDocument, kafkaFileDocument: KafkaFileDocument): Promise { + const links: Array = []; + for (const block of kafkaFileDocument.blocks) { + block.properties.forEach(property => { + const callee = getAvroCalleeFunction(property); + if (callee) { + const parameter = callee.parameters[0]; + if (parameter) { + const resolvedPath = resolvePath(parameter.value); + const targetUri = Uri.parse('file:' + resolvedPath, true); + links.push(new DocumentLink(parameter.range(), targetUri)); + } + } + }); + } + return links; + } +} \ No newline at end of file diff --git a/src/test/suite/kafka-file/languageservice/completionProperties.test.ts b/src/test/suite/kafka-file/languageservice/completionProperties.test.ts index 924f4914..1f46f409 100644 --- a/src/test/suite/kafka-file/languageservice/completionProperties.test.ts +++ b/src/test/suite/kafka-file/languageservice/completionProperties.test.ts @@ -43,12 +43,12 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { }, { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 0)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 0)) }, { @@ -79,12 +79,12 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { }, { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 1)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 1)) }, { @@ -114,12 +114,12 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { }, { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 5)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 5)) }, { @@ -149,12 +149,12 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { }, { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 6)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 6)) }, { @@ -184,12 +184,12 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { }, { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 6)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 6)) }, { @@ -219,12 +219,12 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { }, { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 11)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 11)) }, { @@ -256,12 +256,12 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { },*/ { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(2, 0), position(2, 5)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(2, 0), position(2, 5)) }, { @@ -421,6 +421,11 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { label: 'short', kind: CompletionItemKind.Value, insertText: ' short', range: range(position(1, 11), position(1, 11)) + }, + { + label: 'avro', kind: CompletionItemKind.Value, + insertText: ' avro', + range: range(position(1, 11), position(1, 11)) } ] }); @@ -471,6 +476,11 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { label: 'short', kind: CompletionItemKind.Value, insertText: ' short', range: range(position(1, 13), position(1, 13)) + }, + { + label: 'avro', kind: CompletionItemKind.Value, + insertText: ' avro', + range: range(position(1, 13), position(1, 13)) } ] }); @@ -557,12 +567,12 @@ suite("Kafka File PRODUCER Completion Test Suite", () => { }, { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 0)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 0)) } ] @@ -588,12 +598,12 @@ suite("Kafka File PRODUCER Completion Test Suite", () => { }, { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 1)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 1)) } ] @@ -618,12 +628,12 @@ suite("Kafka File PRODUCER Completion Test Suite", () => { }, { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 5)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 5)) } ] @@ -648,12 +658,12 @@ suite("Kafka File PRODUCER Completion Test Suite", () => { }, { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 6)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 6)) } ] @@ -678,12 +688,12 @@ suite("Kafka File PRODUCER Completion Test Suite", () => { }, { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 6)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 6)) } ] @@ -708,12 +718,12 @@ suite("Kafka File PRODUCER Completion Test Suite", () => { }, { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 11)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 11)) } ] @@ -740,12 +750,12 @@ suite("Kafka File PRODUCER Completion Test Suite", () => { },*/ { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(2, 0), position(2, 5)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(2, 0), position(2, 5)) } ] @@ -792,6 +802,11 @@ suite("Kafka File PRODUCER Completion Test Suite", () => { label: 'short', kind: CompletionItemKind.Value, insertText: ' short', range: range(position(1, 11), position(1, 11)) + }, + { + label: 'avro', kind: CompletionItemKind.Value, + insertText: ' avro', + range: range(position(1, 11), position(1, 11)) } ] }); @@ -837,6 +852,11 @@ suite("Kafka File PRODUCER Completion Test Suite", () => { label: 'short', kind: CompletionItemKind.Value, insertText: ' short', range: range(position(1, 11), position(1, 13)) + }, + { + label: 'avro', kind: CompletionItemKind.Value, + insertText: ' avro', + range: range(position(1, 11), position(1, 13)) } ] }); @@ -883,6 +903,11 @@ suite("Kafka File PRODUCER Completion Test Suite", () => { label: 'short', kind: CompletionItemKind.Value, insertText: ' short', range: range(position(2, 11), position(2, 13)) + }, + { + label: 'avro', kind: CompletionItemKind.Value, + insertText: ' avro', + range: range(position(2, 11), position(2, 13)) } ] }); @@ -930,6 +955,11 @@ suite("Kafka File PRODUCER Completion Test Suite", () => { label: 'short', kind: CompletionItemKind.Value, insertText: ' short', range: range(position(2, 11), position(2, 13)) + }, + { + label: 'avro', kind: CompletionItemKind.Value, + insertText: ' avro', + range: range(position(2, 11), position(2, 13)) } ] }); @@ -975,6 +1005,11 @@ suite("Kafka File PRODUCER Completion Test Suite", () => { label: 'short', kind: CompletionItemKind.Value, insertText: ' short', range: range(position(1, 13), position(1, 13)) + }, + { + label: 'avro', kind: CompletionItemKind.Value, + insertText: ' avro', + range: range(position(1, 13), position(1, 13)) } ] }); diff --git a/tsconfig.json b/tsconfig.json index 593386da..298679eb 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -4,7 +4,8 @@ "target": "es6", "outDir": "out", "lib": [ - "es6" + "es6", + "dom" ], "sourceMap": true, "rootDir": "src",