From 2a6265879acbbe7383956a9c669dadcefbe8e42d Mon Sep 17 00:00:00 2001 From: angelozerr Date: Wed, 5 May 2021 05:07:55 +0200 Subject: [PATCH] Support for local Avro schema in CONSUMER/PRODUCER of kafka-file Fixes #114 Signed-off-by: azerr --- docs/Consuming.md | 10 +- docs/Producing.md | 9 +- docs/README.md | 1 + docs/Serialization.md | 93 ++++++++++ package-lock.json | 5 + package.json | 18 +- schemas/avro-avsc.json | 163 ++++++++++++++++++ src/avro/avroFileSupport.ts | 53 ++++++ src/avro/serialization.ts | 41 +++++ src/client/consumer.ts | 12 +- src/client/serialization.ts | 61 ++++--- src/commands/consumers.ts | 9 +- src/commands/producers.ts | 9 +- src/extension.ts | 10 +- ...kafkaFileClient.ts => kafkaFileSupport.ts} | 26 ++- .../kafkaFileLanguageService.ts | 19 +- src/kafka-file/languageservice/model.ts | 18 +- .../languageservice/services/diagnostics.ts | 62 +++++-- .../languageservice/services/documentLinks.ts | 24 +++ .../completionProperties.test.ts | 91 +++++++--- tsconfig.json | 3 +- 21 files changed, 625 insertions(+), 112 deletions(-) create mode 100644 docs/Serialization.md create mode 100644 schemas/avro-avsc.json create mode 100644 src/avro/avroFileSupport.ts create mode 100644 src/avro/serialization.ts rename src/kafka-file/{kafkaFileClient.ts => kafkaFileSupport.ts} (90%) create mode 100644 src/kafka-file/languageservice/services/documentLinks.ts diff --git a/docs/Consuming.md b/docs/Consuming.md index 69871686..3f7ce43f 100644 --- a/docs/Consuming.md +++ b/docs/Consuming.md @@ -50,15 +50,7 @@ The `CONSUMER` block defines: #### Deserializer -The deserializers can have the following value: - - * `none`: no deserializer (ignores content). - * `string`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.StringDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java). By default it supports `UTF-8` encoding, but you can specify the encoding as parameter like this `string(base64)`. The valid encoding values are defined in [Node.js' buffers and character encodings](https://nodejs.org/api/buffer.html#buffer_buffers_and_character_encodings). - * `double`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.DoubleDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java). - * `float`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.FloatDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java). - * `integer`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.IntegerDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java). - * `long`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.LongDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java). - * `short`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java). +A CONSUMER can deserialize `key/value` by declaring the proper deserializer with `key-format/value-format` property. See [Basic deserializer](Serialization.md#basic-deserializer), [Avro deserializer](Serialization.md#avro-deserializer) for more informations. #### Code Lens diff --git a/docs/Producing.md b/docs/Producing.md index b52d0f0e..cd901748 100644 --- a/docs/Producing.md +++ b/docs/Producing.md @@ -34,14 +34,7 @@ The `PRODUCER` block defines: ### Serializer -The serializers can have the following value: - - * `string`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.StringSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java). By default it supports `UTF-8` encoding, but you can specify the encoding as parameter like this `string(base64)`. The valid encoding values are defined in [Node.js' buffers and character encodings](https://nodejs.org/api/buffer.html#buffer_buffers_and_character_encodings). - * `double`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.DoubleSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java). - * `float`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.FloatSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java). - * `integer`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.IntegerSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java). - * `long`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.LongSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java). - * `short`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java). +A PRODUCER can serialize `key/value` by declaring the proper serializer with `key-format/value-format` property. See [Basic serializer](Serialization.md#basic-serializer), [Avro serializer](Serialization.md#avro-serializer) for more informations. ### Completion diff --git a/docs/README.md b/docs/README.md index a07ec726..1eb97b72 100644 --- a/docs/README.md +++ b/docs/README.md @@ -6,3 +6,4 @@ Welcome to the [Tools for Apache Kafka®](https://github.com/jlandersen/vscode-k * [Kafka file](KafkaFile.md#kafkafile) * [Producing messages](Producing.md#producing-messages) * [Consuming messages](Consuming.md#consuming-messages) +* [Serialization](Serialization.md#serialization) diff --git a/docs/Serialization.md b/docs/Serialization.md new file mode 100644 index 00000000..46b78fd3 --- /dev/null +++ b/docs/Serialization.md @@ -0,0 +1,93 @@ +# Serialization + + * A PRODUCER can serialize `key/value` by declaring the proper serializer with `key-format/value-format` property. + * A CONSUMER can deserialize `key/value` by declaring the proper deserializer for `key-format/value-format` property. + +## Basic serialization + +### Basic serializer + +The serializers can have the following value: + + * `string`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.StringSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java). By default it supports `UTF-8` encoding, but you can specify the encoding as parameter like this `string(base64)`. The valid encoding values are defined in [Node.js' buffers and character encodings](https://nodejs.org/api/buffer.html#buffer_buffers_and_character_encodings). + * `double`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.DoubleSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java). + * `float`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.FloatSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java). + * `integer`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.IntegerSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java). + * `long`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.LongSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java). + * `short`: similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java). + +### Basic deserializer + +The deserializers can have the following value: + + * `none`: no deserializer (ignores content). + * `string`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.StringDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java). By default it supports `UTF-8` encoding, but you can specify the encoding as parameter like this `string(base64)`. The valid encoding values are defined in [Node.js' buffers and character encodings](https://nodejs.org/api/buffer.html#buffer_buffers_and_character_encodings). + * `double`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.DoubleDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java). + * `float`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.FloatDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java). + * `integer`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.IntegerDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java). + * `long`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.LongDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java). + * `short`: similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java). + +## Avro serialization + +Serialization can be done too with [Apache Avro Schema](http://avro.apache.org/docs/current/spec.html) with a local Avro Schema `*.avsc` + +For instance you can create an [Apache Avro](http://avro.apache.org/docs/current/spec.html) Schema `animals.avsc`: + + +```json +{ + "type": "record", + "fields": [ + { + "name": "kind", + "type": { + "name": "animals_type", + "type": "enum", + "symbols": [ + "CAT", + "DOG" + ] + } + }, + { + "name": "name", + "type": "string" + } + ] +} +``` + +and bind it with `avro(path/of/animals.avsc)` in `key-format` / `value-format`. Path is resolved following those strategies: + + * `file:///` a given file path (ex : `avro(file:///C:/path/of/animals.avsc)`. + * `/` relative path to the kafka file (ex : `avro(/path/of/animals.avsc)`. + * otherwise relative path to the workspace folder of kafka file (ex : `avro(path/of/animals.avsc)`. + +### Avro Schema support + +`*.avsc` files benefit with completion, validation for Avro specification. + +### Avro serializer + +You can serialize value of produced message by using the Avro schema `animals.avsc` like this: + +``` +PRODUCER json-output +topic: topic_name +value-format: avro(animals.avsc) +{"kind": "CAT", "name": "Albert"} + +### +``` + +### Avro deserializer + +You can deserialize value of consummed message by using the Avro schema `animals.avsc` like this: + +``` +CONSUMER consumer-group-id +topic: topic_name +from: earliest +value-format: avro(animals.avsc) +``` \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index fcc17a57..7e2bad8b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -546,6 +546,11 @@ "integrity": "sha512-Z7tMw1ytTXt5jqMcOP+OQteU1VuNK9Y02uuJtKQ1Sv69jXQKKg5cibLwGJow8yzZP+eAc18EmLGPal0bp36rvQ==", "dev": true }, + "avsc": { + "version": "5.7.0", + "resolved": "https://registry.npmjs.org/avsc/-/avsc-5.7.0.tgz", + "integrity": "sha512-oP3jgI9SaZnwLwkRx7sHDPctXCUGGp+X4FsCgzHpMTNhYhGhXAFinptdGpWid2GTXAkhNp8ksAjqyqQBhoQ7BQ==" + }, "azure-devops-node-api": { "version": "10.2.2", "resolved": "https://registry.npmjs.org/azure-devops-node-api/-/azure-devops-node-api-10.2.2.tgz", diff --git a/package.json b/package.json index ee291dc8..e11ae7c0 100644 --- a/package.json +++ b/package.json @@ -162,14 +162,23 @@ ".kafka" ], "aliases": [ - "kafka" + "Kafka DSL" ], "configuration": "./language-configuration.json" }, { "id": "kafka-consumer", "aliases": [ - "Kafka Consumer" + "Kafka Consumer View" + ] + }, + { + "id": "jsonc", + "filenamePatterns": [ + "*.avsc" + ], + "aliases": [ + "Avro Schema Definition" ] } ], @@ -199,6 +208,10 @@ { "fileMatch": "package.json", "url": "./schemas/package.schema.json" + }, + { + "fileMatch": "*.avsc", + "url": "./schemas/avro-avsc.json" } ], "commands": [ @@ -451,6 +464,7 @@ "test": "node ./out/test/runTest.js" }, "dependencies": { + "avsc": "^5.7.0", "faker": "^5.5.2", "fs-extra": "^8.1.0", "glob": "^7.1.6", diff --git a/schemas/avro-avsc.json b/schemas/avro-avsc.json new file mode 100644 index 00000000..3ecd7c8e --- /dev/null +++ b/schemas/avro-avsc.json @@ -0,0 +1,163 @@ + +{ + "$schema": "http://json-schema.org/draft-06/schema#", + "title": "Avro Schema Definition", + "description": "Json-Schema definition for Avro AVSC files.", + "definitions": { + "avroSchema": { + "title": "Avro Schema", + "description": "Root Schema", + "oneOf": [ + { "$ref": "#/definitions/types" } + ] + }, + "types": { + "title": "Avro Types", + "description": "Allowed Avro types", + "oneOf": [ + { "$ref": "#/definitions/primitiveType" }, + { "$ref": "#/definitions/primitiveTypeWithMetadata" }, + { "$ref": "#/definitions/customTypeReference" }, + { "$ref": "#/definitions/avroRecord" }, + { "$ref": "#/definitions/avroEnum" }, + { "$ref": "#/definitions/avroArray" }, + { "$ref": "#/definitions/avroMap" }, + { "$ref": "#/definitions/avroFixed" }, + { "$ref": "#/definitions/avroUnion" } + ] + }, + "primitiveType": { + "title": "Primitive Type", + "description": "Basic type primitives.", + "type":"string", + "enum": [ + "null", + "boolean", + "int", + "long", + "float", + "double", + "bytes", + "string" + ] + }, + "primitiveTypeWithMetadata": { + "title": "Primitive Type With Metadata", + "description": "A primitive type with metadata attached.", + "type": "object", + "properties": { + "type": { "$ref": "#/definitions/primitiveType" } + }, + "required": ["type"] + }, + "customTypeReference": { + "title": "Custom Type", + "description": "Reference to a ComplexType", + "not": { "$ref": "#/definitions/primitiveType" }, + "type": "string", + "pattern": "^[A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)*$" + }, + "avroUnion": { + "title": "Union", + "description": "A Union of types", + "type": "array", + "items": { "$ref": "#/definitions/avroSchema" }, + "minItems": 1 + }, + "avroField": { + "title": "Field", + "description": "A field within a Record", + "type": "object", + "properties": { + "name": { "$ref": "#/definitions/name" }, + "type": { "$ref": "#/definitions/types" }, + "doc": { "type": "string" }, + "default": { }, + "order": { "enum": ["ascending", "descending", "ignore"] }, + "aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } } + }, + "required": ["name", "type"] + }, + "avroRecord": { + "title": "Record", + "description": "A Record", + "type": "object", + "properties": { + "type": {"type":"string", "enum": ["record"]}, + "name": { "$ref": "#/definitions/name" }, + "namespace": { "$ref": "#/definitions/namespace" }, + "doc": { "type": "string" }, + "aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } }, + "fields": { "type": "array", "items": { "$ref": "#/definitions/avroField" } } + }, + "required": ["type", "name", "fields"] + }, + "avroEnum": { + "title": "Enum", + "description": "An enumeration", + "type": "object", + "properties": { + "type": {"type":"string", "enum": ["enum"]}, + "name": { "$ref": "#/definitions/name" }, + "namespace": { "$ref": "#/definitions/namespace" }, + "doc": { "type": "string" }, + "aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } }, + "symbols": { "type": "array", "items": { "$ref": "#/definitions/name" } } + }, + "required": ["type", "name", "symbols"] + }, + "avroArray": { + "title": "Array", + "description": "An array", + "type": "object", + "properties": { + "type": {"type":"string", "enum": ["array"]}, + "name": { "$ref": "#/definitions/name" }, + "namespace": { "$ref": "#/definitions/namespace" }, + "doc": { "type": "string" }, + "aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } }, + "items": { "$ref": "#/definitions/types" } + }, + "required": ["type", "items"] + }, + "avroMap": { + "title": "Map", + "description": "A map of values", + "type": "object", + "properties": { + "type": {"type":"string", "enum": ["map"]}, + "name": { "$ref": "#/definitions/name" }, + "namespace": { "$ref": "#/definitions/namespace" }, + "doc": { "type": "string" }, + "aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } }, + "values": { "$ref": "#/definitions/types" } + }, + "required": ["type", "values"] + }, + "avroFixed": { + "title": "Fixed", + "description": "A fixed sized array of bytes", + "type": "object", + "properties": { + "type": {"type":"string", "enum": ["fixed"]}, + "name": { "$ref": "#/definitions/name" }, + "namespace": { "$ref": "#/definitions/namespace" }, + "doc": { "type": "string" }, + "aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } }, + "size": {"type":"number"} + }, + "required": ["type", "name", "size"] + }, + "name": { + "type": "string", + "pattern": "^[A-Za-z_][A-Za-z0-9_]*$" + }, + "namespace": { + "type": "string", + "pattern": "^([A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)*)*$" + } + }, + "oneOf": [ + { "$ref": "#/definitions/avroSchema" } + ] +} diff --git a/src/avro/avroFileSupport.ts b/src/avro/avroFileSupport.ts new file mode 100644 index 00000000..b9ee6059 --- /dev/null +++ b/src/avro/avroFileSupport.ts @@ -0,0 +1,53 @@ +import * as fs from 'fs'; +import { posix } from 'path'; +import * as avro from 'avsc'; +import { Disposable, ExtensionContext, Uri, workspace, WorkspaceFolder } from "vscode"; +import { registerAvroSerialization } from "./serialization"; + +const ENCODING = 'utf-8'; + +export function registerAvroFileSupport(context: ExtensionContext): Disposable { + // register avro serializer/deserializer from a local *.avro file + registerAvroSerialization(); + + return { + dispose() { + } + }; +} + +export function resolvePath(path: string, kafkaFileUri: Uri| undefined): Uri { + const uri = Uri.parse(path); + if(!kafkaFileUri) { + return uri; + } + if (path.startsWith('file://')) { + return uri; + } + if (path.startsWith('/') || path.startsWith('.')) { + const file = posix.join(kafkaFileUri.path, '..', path); + return Uri.parse(file); + } + const currentWorkspace: WorkspaceFolder | undefined = workspace.getWorkspaceFolder(kafkaFileUri); + const currentWorkspaceUri: Uri | undefined = (currentWorkspace && currentWorkspace.uri) + || (workspace.workspaceFolders && workspace.workspaceFolders[0].uri); + if (currentWorkspaceUri) { + const resolvedPath = Uri.joinPath(currentWorkspaceUri, path); + return resolvedPath; + } + return uri; + +} + +export function readAVSC(path: string, kafkaFileUri: Uri | undefined): avro.Type { + const resolvedPath = resolvePath(path, kafkaFileUri); + const rawSchema = JSON.parse(fs.readFileSync(resolvedPath.fsPath, ENCODING)); + return avro.Type.forSchema(rawSchema); +} + +export function checkAVSC(path: string, kafkaFileUri: Uri| undefined): string | undefined { + const resolvedPath = resolvePath(path, kafkaFileUri); + if (!fs.existsSync(resolvedPath.fsPath)) { + return `The '${path}' resolved with the file '${resolvedPath}' cannot be found.`; + } +} \ No newline at end of file diff --git a/src/avro/serialization.ts b/src/avro/serialization.ts new file mode 100644 index 00000000..e7413f8f --- /dev/null +++ b/src/avro/serialization.ts @@ -0,0 +1,41 @@ +import { Uri } from "vscode"; +import { Deserializer, registerDeserializer, registerSerializer, SerializationException, SerializationSetting, Serializer } from "../client/serialization"; +import { readAVSC } from "./avroFileSupport"; + +export function registerAvroSerialization() { + registerSerializer("avro", new AvroSerializer()); + registerDeserializer("avro", new AvroDeserializer()); +} +class AvroSerializer implements Serializer { + + serialize(value: string, kafkaFileUri: Uri | undefined, settings: SerializationSetting[]): Buffer | string | null { + if (!settings) { + throw new SerializationException("The avro file path is required"); + } + const path = settings[0].value; + if (!path) { + throw new SerializationException("The avro file path is required"); + } + const data = JSON.parse(value); + const type = readAVSC(path, kafkaFileUri); + return type.toBuffer(data); + } +} + +class AvroDeserializer implements Deserializer { + + deserialize(data: Buffer | null, kafkaFileUri: Uri | undefined, settings?: SerializationSetting[]): any { + if (data === null) { + return null; + } + if (!settings) { + throw new SerializationException("The avro file path is required"); + } + const path = settings[0].value; + if (!path) { + throw new SerializationException("The avro file path is required"); + } + const type = readAVSC(path, kafkaFileUri); + return type.fromBuffer(data); + } +} \ No newline at end of file diff --git a/src/client/consumer.ts b/src/client/consumer.ts index f5a41bda..6dd7897a 100644 --- a/src/client/consumer.ts +++ b/src/client/consumer.ts @@ -7,6 +7,7 @@ import { addQueryParameter, Client, ConnectionOptions } from "./client"; import { deserialize, MessageFormat, SerializationdResult, SerializationSetting } from "./serialization"; interface ConsumerOptions extends ConnectionOptions { + kafkaFileUri?: vscode.Uri; consumerGroupId: string; topicId: string; fromOffset: InitialConsumerOffset | string; @@ -63,7 +64,7 @@ export class Consumer implements vscode.Disposable { public state: ConsumerLaunchState = ConsumerLaunchState.idle; public error: any; - constructor(public uri: vscode.Uri, clusterSettings: ClusterSettings, private clientAccessor: ClientAccessor) { + constructor(public uri: vscode.Uri, kafkaFileUri: vscode.Uri | undefined, clusterSettings: ClusterSettings, private clientAccessor: ClientAccessor) { const { clusterId, consumerGroupId, topicId, fromOffset, partitions, messageKeyFormat, messageKeyFormatSettings, messageValueFormat,messageValueFormatSettings } = extractConsumerInfoUri(uri); this.clusterId = clusterId; const cluster = clusterSettings.get(clusterId); @@ -75,6 +76,7 @@ export class Consumer implements vscode.Disposable { const settings = getWorkspaceSettings(); this.options = { + kafkaFileUri, clusterProviderId: cluster.clusterProviderId, bootstrap: cluster.bootstrap, saslOption: cluster.saslOption, @@ -118,8 +120,8 @@ export class Consumer implements vscode.Disposable { this.consumer.run({ eachMessage: async ({ topic, partition, message }) => { - message.key = deserialize(message.key, this.options.messageKeyFormat, this.options.messageKeyFormatSettings); - message.value = deserialize(message.value, this.options.messageValueFormat, this.options.messageValueFormatSettings); + message.key = deserialize(message.key, this.options.messageKeyFormat, this.options.kafkaFileUri, this.options.messageKeyFormatSettings); + message.value = deserialize(message.value, this.options.messageValueFormat, this.options.kafkaFileUri, this.options.messageValueFormatSettings); this.onDidReceiveMessageEmitter.fire({ uri: this.uri, record: { topic: topic, partition: partition, ...message }, @@ -235,9 +237,9 @@ export class ConsumerCollection implements vscode.Disposable { /** * Creates a new consumer for a provided uri. */ - async create(uri: vscode.Uri): Promise { + async create(uri: vscode.Uri, kafkaFileUri? : vscode.Uri): Promise { // Create the consumer - const consumer = new Consumer(uri, this.clusterSettings, this.clientAccessor); + const consumer = new Consumer(uri, kafkaFileUri, this.clusterSettings, this.clientAccessor); this.consumers[uri.toString()] = consumer; // Fire an event to notify that Consumer is starting diff --git a/src/client/serialization.ts b/src/client/serialization.ts index 45a5a544..ce395d5b 100644 --- a/src/client/serialization.ts +++ b/src/client/serialization.ts @@ -1,4 +1,8 @@ -export type MessageFormat = "none" | "string" | "double" | "float" | "integer" | "long" | "short"; +import { Uri } from "vscode"; + +const serializerRegistry: Map = new Map(); + +export type MessageFormat = "none" | "string" | "double" | "float" | "integer" | "long" | "short" | string; export type SerializationdResult = any | Error; @@ -11,13 +15,15 @@ export interface SerializationSetting { // ---------------- Serializers ---------------- -interface Serializer { - serialize(data: string, settings?: SerializationSetting[]): Buffer | string | null; +export interface Serializer { + serialize(data: string, kafkaFileUri?: Uri, settings?: SerializationSetting[]): Buffer | string | null; } -const serializerRegistry: Map = new Map(); +export function registerSerializer(serializerId: string, serializer: Serializer) { + serializerRegistry.set(serializerId, serializer); +} -export function serialize(data?: string, format?: MessageFormat, settings?: SerializationSetting[]): Buffer | string | null { +export function serialize(data?: string, format?: MessageFormat, kafkaFileUri?: Uri, settings?: SerializationSetting[]): Buffer | string | null { if (!data || !format) { return data || null; } @@ -25,7 +31,7 @@ export function serialize(data?: string, format?: MessageFormat, settings?: Seri if (!serializer) { throw new SerializationException(`Cannot find a serializer for ${format} format.`); } - return serializer.serialize(data, settings); + return serializer.serialize(data, kafkaFileUri, settings); } function getSerializer(format: MessageFormat): Serializer | undefined { @@ -84,7 +90,7 @@ class ShortSerializer implements Serializer { class StringSerializer implements Serializer { - serialize(value: string, settings?: SerializationSetting[]): Buffer | string | null { + serialize(value: string, kafkaFileUri: Uri | undefined, settings?: SerializationSetting[]): Buffer | string | null { const encoding = settings?.[0].value; if (encoding) { return Buffer.from(value, encoding); @@ -93,22 +99,26 @@ class StringSerializer implements Serializer { }; } -serializerRegistry.set("double", new DoubleSerializer()); -serializerRegistry.set("float", new FloatSerializer()); -serializerRegistry.set("integer", new IntegerSerializer()); -serializerRegistry.set("long", new LongSerializer()); -serializerRegistry.set("short", new ShortSerializer()); -serializerRegistry.set("string", new StringSerializer()); +// Register default Kafka serializers +registerSerializer("double", new DoubleSerializer()); +registerSerializer("float", new FloatSerializer()); +registerSerializer("integer", new IntegerSerializer()); +registerSerializer("long", new LongSerializer()); +registerSerializer("short", new ShortSerializer()); +registerSerializer("string", new StringSerializer()); // ---------------- Deserializers ---------------- +const deserializerRegistry: Map = new Map(); -interface Deserializer { - deserialize(data: Buffer, settings?: SerializationSetting[]): any; +export interface Deserializer { + deserialize(data: Buffer, kafkaFileUri?: Uri, settings?: SerializationSetting[]): any; } -const deserializerRegistry: Map = new Map(); +export function registerDeserializer(deserializerId: string, deserializer: Deserializer) { + deserializerRegistry.set(deserializerId, deserializer); +} -export function deserialize(data: Buffer | null, format?: MessageFormat, settings?: SerializationSetting[]): SerializationdResult | null { +export function deserialize(data: Buffer | null, format?: MessageFormat, kafkaFileUri?: Uri, settings?: SerializationSetting[]): SerializationdResult | null { if (data === null || !format) { return data; } @@ -120,7 +130,7 @@ export function deserialize(data: Buffer | null, format?: MessageFormat, setting if (!deserializer) { throw new SerializationException(`Cannot find a deserializer for ${format} format.`); } - return deserializer.deserialize(data, settings); + return deserializer.deserialize(data, kafkaFileUri, settings); } catch (e) { return e; @@ -198,7 +208,7 @@ class ShortDeserializer implements Deserializer { class StringDeserializer implements Deserializer { - deserialize(data: Buffer | null, settings?: SerializationSetting[]): any { + deserialize(data: Buffer | null, kafkaFileUri: Uri | undefined, settings?: SerializationSetting[]): any { if (data === null) { return null; } @@ -207,9 +217,10 @@ class StringDeserializer implements Deserializer { } } -deserializerRegistry.set("double", new DoubleDeserializer()); -deserializerRegistry.set("float", new FloatDeserializer()); -deserializerRegistry.set("integer", new IntegerDeserializer()); -deserializerRegistry.set("long", new LongDeserializer()); -deserializerRegistry.set("short", new ShortDeserializer()); -deserializerRegistry.set("string", new StringDeserializer()); +// Register default Kafka deserializers +registerDeserializer("double", new DoubleDeserializer()); +registerDeserializer("float", new FloatDeserializer()); +registerDeserializer("integer", new IntegerDeserializer()); +registerDeserializer("long", new LongDeserializer()); +registerDeserializer("short", new ShortDeserializer()); +registerDeserializer("string", new StringDeserializer()); diff --git a/src/commands/consumers.ts b/src/commands/consumers.ts index ca3b88a9..a1ba8ccf 100644 --- a/src/commands/consumers.ts +++ b/src/commands/consumers.ts @@ -71,7 +71,8 @@ abstract class LaunchConsumerCommandHandler { openDocument(consumeUri); // Start the consumer - await startConsumerWithProgress(consumeUri, this.consumerCollection, this.explorer); + const kafkaFileUri = vscode.window.activeTextEditor?.document.uri; + await startConsumerWithProgress(consumeUri, kafkaFileUri, this.consumerCollection, this.explorer); } else { // Stop the consumer if (consumer) { @@ -134,7 +135,7 @@ export class ToggleConsumerCommandHandler { if (started) { await stopConsumerWithProgress(uri, this.consumerCollection); } else { - await startConsumerWithProgress(uri, this.consumerCollection); + await startConsumerWithProgress(uri, uri, this.consumerCollection); } } catch (e) { @@ -263,7 +264,7 @@ export class DeleteConsumerGroupCommandHandler { } } -async function startConsumerWithProgress(consumeUri: vscode.Uri, consumerCollection: ConsumerCollection, explorer?: KafkaExplorer) { +async function startConsumerWithProgress(consumeUri: vscode.Uri, kafkaFileUri: vscode.Uri | undefined, consumerCollection: ConsumerCollection, explorer?: KafkaExplorer) { const consumer = consumerCollection.get(consumeUri); if (consumer && consumer.state === ConsumerLaunchState.closing) { vscode.window.showErrorMessage(`The consumer cannot be started because it is stopping.`); @@ -275,7 +276,7 @@ async function startConsumerWithProgress(consumeUri: vscode.Uri, consumerCollect cancellable: false }, (progress, token) => { return new Promise((resolve, reject) => { - consumerCollection.create(consumeUri) + consumerCollection.create(consumeUri, kafkaFileUri) .then(consumer => { if (explorer) { explorer.refresh(); diff --git a/src/commands/producers.ts b/src/commands/producers.ts index c56834e3..690c0708 100644 --- a/src/commands/producers.ts +++ b/src/commands/producers.ts @@ -56,6 +56,7 @@ export class ProduceRecordCommandHandler { faker.setLocale(this.settings.producerFakerJSLocale); } + const kafkaFileUri = vscode.window.activeTextEditor?.document.uri; const messages = [...Array(times).keys()].map(() => { if (this.settings.producerFakerJSEnabled) { //Use same seed for key and value so we can generate content like @@ -67,15 +68,15 @@ export class ProduceRecordCommandHandler { faker.seed(seed); const randomizedValue = faker.fake(value); return { - key: serialize(randomizedKey, command.messageKeyFormat, command.messageKeyFormatSettings), - value: serialize(randomizedValue, command.messageValueFormat, command.messageValueFormatSettings) + key: serialize(randomizedKey, command.messageKeyFormat, kafkaFileUri, command.messageKeyFormatSettings), + value: serialize(randomizedValue, command.messageValueFormat, kafkaFileUri, command.messageValueFormatSettings) }; } // Return key/value message as-is return { - key: serialize(key, command.messageKeyFormat, command.messageKeyFormatSettings), - value: serialize(value, command.messageValueFormat, command.messageValueFormatSettings) + key: serialize(key, command.messageKeyFormat, kafkaFileUri, command.messageKeyFormatSettings), + value: serialize(value, command.messageValueFormat, kafkaFileUri, command.messageValueFormatSettings) }; }); diff --git a/src/extension.ts b/src/extension.ts index b9c3a717..33f31178 100644 --- a/src/extension.ts +++ b/src/extension.ts @@ -36,7 +36,8 @@ import { markdownPreviewProvider } from "./docs/markdownPreviewProvider"; import { getDefaultKafkaExtensionParticipant, refreshClusterProviderDefinitions } from "./kafka-extensions/registry"; import { KafkaExtensionParticipant } from "./kafka-extensions/api"; import { ProducerCollection } from "./client/producer"; -import { startLanguageClient } from "./kafka-file/kafkaFileClient"; +import { registerKafkaFileSupport } from "./kafka-file/kafkaFileSupport"; +import { registerAvroFileSupport } from "./avro/avroFileSupport"; export function activate(context: vscode.ExtensionContext): KafkaExtensionParticipant { Context.register(context); @@ -143,7 +144,12 @@ export function activate(context: vscode.ExtensionContext): KafkaExtensionPartic // .kafka file related context.subscriptions.push( - startLanguageClient(clusterSettings, clientAccessor, workspaceSettings, producerCollection, consumerCollection, explorer, context) + registerKafkaFileSupport(clusterSettings, clientAccessor, workspaceSettings, producerCollection, consumerCollection, explorer, context) + ); + + // .avro file related + context.subscriptions.push( + registerAvroFileSupport(context) ); context.subscriptions.push( diff --git a/src/kafka-file/kafkaFileClient.ts b/src/kafka-file/kafkaFileSupport.ts similarity index 90% rename from src/kafka-file/kafkaFileClient.ts rename to src/kafka-file/kafkaFileSupport.ts index 346d483e..4a898457 100644 --- a/src/kafka-file/kafkaFileClient.ts +++ b/src/kafka-file/kafkaFileSupport.ts @@ -112,9 +112,9 @@ class DataModelTopicProvider implements TopicProvider { } } -export function startLanguageClient( +export function registerKafkaFileSupport( clusterSettings: ClusterSettings, - clientAccessor: ClientAccessor, + clientAccessor : ClientAccessor, workspaceSettings: WorkspaceSettings, producerCollection: ProducerCollection, consumerCollection: ConsumerCollection, @@ -171,6 +171,11 @@ export function startLanguageClient( context.subscriptions.push( vscode.languages.registerHoverProvider(documentSelector, hover) ); + // Document Link + const documentLink = new KafkaFileDocumentLinkProvider(kafkaFileDocuments, languageService); + context.subscriptions.push( + vscode.languages.registerDocumentLinkProvider(documentSelector, documentLink) + ); // Open / Close document context.subscriptions.push(vscode.workspace.onDidOpenTextDocument(e => { @@ -201,7 +206,7 @@ export function startLanguageClient( }; } -function createLanguageService(clusterSettings: ClusterSettings, clientAccessor: ClientAccessor, producerCollection: ProducerCollection, consumerCollection: ConsumerCollection, modelProvider: KafkaModelProvider): LanguageService { +function createLanguageService(clusterSettings: ClusterSettings, clientAccessor : ClientAccessor, producerCollection: ProducerCollection, consumerCollection: ConsumerCollection, modelProvider: KafkaModelProvider): LanguageService { const producerLaunchStateProvider = { getProducerLaunchState(uri: vscode.Uri): ProducerLaunchState { const producer = producerCollection.get(uri); @@ -219,7 +224,7 @@ function createLanguageService(clusterSettings: ClusterSettings, clientAccessor: const selectedClusterProvider = { getSelectedCluster() { const selected = clusterSettings.selected; - const clusterId = selected?.id; + const clusterId = selected?.id; const clusterState = clusterId ? clientAccessor.getState(clusterId) : undefined; return { clusterId, @@ -300,7 +305,7 @@ class KafkaFileDiagnostics extends AbstractKafkaFileFeature implements vscode.Di kafkaFileDocuments: LanguageModelCache, languageService: LanguageService, clusterSettings: ClusterSettings, - clientAccessor: ClientAccessor, + clientAccessor : ClientAccessor, modelProvider: KafkaModelProvider, settings: WorkspaceSettings ) { @@ -373,7 +378,16 @@ class KafkaFileHoverProvider extends AbstractKafkaFileFeature implements vscode. return runSafeAsync(async () => { const kafkaFileDocument = this.getKafkaFileDocument(document); return this.languageService.doHover(document, kafkaFileDocument, position); - }, null, `Error while computing hover for ${document.uri}`, token); + }, null, `Error while computing hover for ${document.uri}`, token); } +} + +class KafkaFileDocumentLinkProvider extends AbstractKafkaFileFeature implements vscode.DocumentLinkProvider { + provideDocumentLinks(document: vscode.TextDocument, token: vscode.CancellationToken): vscode.ProviderResult { + return runSafeAsync(async () => { + const kafkaFileDocument = this.getKafkaFileDocument(document); + return this.languageService.provideDcumentLinks(document, kafkaFileDocument); + }, null, `Error while computing document link for ${document.uri}`, token); + } } \ No newline at end of file diff --git a/src/kafka-file/languageservice/kafkaFileLanguageService.ts b/src/kafka-file/languageservice/kafkaFileLanguageService.ts index cf27cfb4..c8931e6c 100644 --- a/src/kafka-file/languageservice/kafkaFileLanguageService.ts +++ b/src/kafka-file/languageservice/kafkaFileLanguageService.ts @@ -1,11 +1,12 @@ -import { CodeLens, CompletionList, Diagnostic, Hover, Position, TextDocument, Uri } from "vscode"; +import { CodeLens, CompletionList, Diagnostic, DocumentLink, Hover, Position, TextDocument, Uri } from "vscode"; import { ClientState, ConsumerLaunchState } from "../../client"; import { BrokerConfigs } from "../../client/config"; import { ProducerLaunchState } from "../../client/producer"; -import { KafkaFileDocument, parseKafkaFile } from "./parser/kafkaFileParser"; +import { CalleeFunction, KafkaFileDocument, parseKafkaFile, Property } from "./parser/kafkaFileParser"; import { KafkaFileCodeLenses } from "./services/codeLensProvider"; import { KafkaFileCompletion } from "./services/completion"; import { KafkaFileDiagnostics } from "./services/diagnostics"; +import { KafkaFileDocumentLinks } from "./services/documentLinks"; import { KafkaFileHover } from "./services/hover"; /** @@ -96,6 +97,8 @@ export interface LanguageService { * @param position the position where the hover was triggered. */ doHover(document: TextDocument, kafkaFileDocument: KafkaFileDocument, position: Position): Promise; + + provideDcumentLinks(document: TextDocument, kafkaFileDocument: KafkaFileDocument): Promise; } /** @@ -112,15 +115,25 @@ export function getLanguageService(producerLaunchStateProvider: ProducerLaunchSt const completion = new KafkaFileCompletion(selectedClusterProvider, topicProvider); const diagnostics = new KafkaFileDiagnostics(selectedClusterProvider, topicProvider); const hover = new KafkaFileHover(selectedClusterProvider, topicProvider); + const links = new KafkaFileDocumentLinks(); return { parseKafkaFileDocument: (document: TextDocument) => parseKafkaFile(document), getCodeLenses: codeLenses.getCodeLenses.bind(codeLenses), doComplete: completion.doComplete.bind(completion), doDiagnostics: diagnostics.doDiagnostics.bind(diagnostics), - doHover: hover.doHover.bind(hover) + doHover: hover.doHover.bind(hover), + provideDcumentLinks: links.provideDocumentLinks.bind(links) }; } +export function getAvroCalleeFunction(property: Property): CalleeFunction | undefined { + if (property.propertyName === 'key-format' || property.propertyName === 'value-format') { + const callee = property.value; + if (callee && callee.functionName === 'avro') { + return callee; + } + } +} export function createTopicDocumentation(topic: TopicDetail): string { return `Topic \`${topic.id}\`\n` + ` * partition count: \`${topic.partitionCount}\`\n` + diff --git a/src/kafka-file/languageservice/model.ts b/src/kafka-file/languageservice/model.ts index 25f08278..ebab27d4 100644 --- a/src/kafka-file/languageservice/model.ts +++ b/src/kafka-file/languageservice/model.ts @@ -97,13 +97,17 @@ const consumerProperties = [ { name: "short", description: "Similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java)." + }, + { + name: "avro", + description: "Avro deserializer" } ] }, { name: "value-format", description: `[Deserializer](${getDocumentationPageUri('Consuming', 'deserializer')}) to use for the value *[optional]*.`, - enum: [ + enum: [ { name: "none", description: "No deserializer (ignores content)" @@ -131,6 +135,10 @@ const consumerProperties = [ { name: "short", description: "Similar deserializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortDeserializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java)." + }, + { + name: "avro", + description: "Avro deserializer" } ] }, @@ -182,6 +190,10 @@ const producerProperties = [ { name: "short", description: "Similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java)." + }, + { + name: "avro", + description: "Avro serializer" } ] }, @@ -212,6 +224,10 @@ const producerProperties = [ { name: "short", description: "Similar serializer to the Kafka Java client [org.apache.kafka.common.serialization.ShortSerializer](https://github.com/apache/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java)." + }, + { + name: "avro", + description: "Avro serializer" } ] } diff --git a/src/kafka-file/languageservice/services/diagnostics.ts b/src/kafka-file/languageservice/services/diagnostics.ts index 4c62ee1b..64ee8330 100644 --- a/src/kafka-file/languageservice/services/diagnostics.ts +++ b/src/kafka-file/languageservice/services/diagnostics.ts @@ -4,10 +4,18 @@ import { ConsumerValidator } from "../../../validators/consumer"; import { ProducerValidator } from "../../../validators/producer"; import { CommonsValidator } from "../../../validators/commons"; import { fakerjsAPIModel, PartModelProvider } from "../model"; -import { SelectedClusterProvider, TopicProvider } from "../kafkaFileLanguageService"; +import { getAvroCalleeFunction, SelectedClusterProvider, TopicProvider } from "../kafkaFileLanguageService"; import { ClientState } from "../../../client"; import { BrokerConfigs } from "../../../client/config"; +import { checkAVSC } from "../../../avro/avroFileSupport"; +class ValidationContext { + public readonly diagnostics: Diagnostic[] = []; + + constructor(public readonly document: TextDocument, public producerFakerJSEnabled: boolean) { + + } +} /** * Kafka file diagnostics support. */ @@ -18,24 +26,24 @@ export class KafkaFileDiagnostics { } async doDiagnostics(document: TextDocument, kafkaFileDocument: KafkaFileDocument, producerFakerJSEnabled: boolean): Promise { - const diagnostics: Diagnostic[] = []; + const validationContext = new ValidationContext(document, producerFakerJSEnabled); for (const block of kafkaFileDocument.blocks) { if (block.type === BlockType.consumer) { - await this.validateConsumerBlock(block, diagnostics); + await this.validateConsumerBlock(block, validationContext); } else { - await this.validateProducerBlock(block, producerFakerJSEnabled, diagnostics); + await this.validateProducerBlock(block, validationContext); } } - return diagnostics; + return validationContext.diagnostics; } - async validateConsumerBlock(block: ConsumerBlock, diagnostics: Diagnostic[]) { - await this.validateProperties(block, false, diagnostics); + async validateConsumerBlock(block: ConsumerBlock, validationContext: ValidationContext) { + await this.validateProperties(block, validationContext); } - async validateProducerBlock(block: ProducerBlock, producerFakerJSEnabled: boolean, diagnostics: Diagnostic[]) { - await this.validateProperties(block, producerFakerJSEnabled, diagnostics); - this.validateProducerValue(block, producerFakerJSEnabled, diagnostics); + async validateProducerBlock(block: ProducerBlock, validationContext: ValidationContext) { + await this.validateProperties(block, validationContext); + this.validateProducerValue(block, validationContext.producerFakerJSEnabled, validationContext.diagnostics); } validateProducerValue(block: ProducerBlock, producerFakerJSEnabled: boolean, diagnostics: Diagnostic[]) { @@ -150,12 +158,12 @@ export class KafkaFileDiagnostics { return new Range(start, end); } - async validateProperties(block: Block, producerFakerJSEnabled: boolean, diagnostics: Diagnostic[]) { + async validateProperties(block: Block, validationContext: ValidationContext) { const existingProperties = new Map(); let topicProperty: Property | undefined; for (const property of block.properties) { const propertyName = property.propertyName; - this.validateProperty(property, block, producerFakerJSEnabled, diagnostics); + this.validateProperty(property, block, validationContext); if (propertyName === 'topic') { topicProperty = property; } @@ -173,11 +181,14 @@ export class KafkaFileDiagnostics { if (properties.length > 1) { properties.forEach(property => { const range = property.propertyKeyRange; + const diagnostics = validationContext.diagnostics; diagnostics.push(new Diagnostic(range, `Duplicate property '${propertyName}'`, DiagnosticSeverity.Warning)); }); } }); + // Validate existing topic declaration and topic value + const diagnostics = validationContext.diagnostics; if (!topicProperty) { const range = new Range(block.start, new Position(block.start.line, block.start.character + 8)); diagnostics.push(new Diagnostic(range, `The ${block.type === BlockType.consumer ? 'consumer' : 'producer'} must declare the 'topic:' property.`, DiagnosticSeverity.Error)); @@ -186,7 +197,8 @@ export class KafkaFileDiagnostics { } } - validateProperty(property: Property, block: Block, producerFakerJSEnabled: boolean, diagnostics: Diagnostic[]) { + validateProperty(property: Property, block: Block, validationContext: ValidationContext) { + const diagnostics = validationContext.diagnostics; const propertyName = property.propertyName; // 1. Validate property syntax this.validateSyntaxProperty(propertyName, property, diagnostics); @@ -198,7 +210,29 @@ export class KafkaFileDiagnostics { this.validateUnknownProperty(propertyName, property, diagnostics); } else { // 3. Validate property value - this.validatePropertyValue(property, block.type, producerFakerJSEnabled, diagnostics); + this.validatePropertyValue(property, block.type, validationContext.producerFakerJSEnabled, diagnostics); + } + + // validate avro parameter + const avro = getAvroCalleeFunction(property); + if (avro) { + const parameter = avro.parameters[0]; + const path = parameter?.value; + if (!path) { + // parameter path is required + const range = property.propertyValue ? property.propertyTrimmedValueRange : property.propertyKeyRange; + if (range) { + diagnostics.push(new Diagnostic(range, `The avro format must declare the 'path' parameter.`, DiagnosticSeverity.Error)); + } + } else { + const result = checkAVSC(path, validationContext.document.uri); + if (result) { + const range = parameter.range(); + if (range) { + diagnostics.push(new Diagnostic(range, result, DiagnosticSeverity.Error)); + } + } + } } } } diff --git a/src/kafka-file/languageservice/services/documentLinks.ts b/src/kafka-file/languageservice/services/documentLinks.ts new file mode 100644 index 00000000..325fa330 --- /dev/null +++ b/src/kafka-file/languageservice/services/documentLinks.ts @@ -0,0 +1,24 @@ +import { DocumentLink, TextDocument } from "vscode"; +import { resolvePath } from "../../../avro/avroFileSupport"; +import { getAvroCalleeFunction } from "../kafkaFileLanguageService"; +import { KafkaFileDocument } from "../parser/kafkaFileParser"; + +export class KafkaFileDocumentLinks { + + async provideDocumentLinks(document: TextDocument, kafkaFileDocument: KafkaFileDocument): Promise { + const links: Array = []; + for (const block of kafkaFileDocument.blocks) { + block.properties.forEach(property => { + const callee = getAvroCalleeFunction(property); + if (callee) { + const parameter = callee.parameters[0]; + if (parameter) { + const targetUri = resolvePath(parameter.value, document.uri); + links.push(new DocumentLink(parameter.range(), targetUri)); + } + } + }); + } + return links; + } +} \ No newline at end of file diff --git a/src/test/suite/kafka-file/languageservice/completionProperties.test.ts b/src/test/suite/kafka-file/languageservice/completionProperties.test.ts index 924f4914..1f46f409 100644 --- a/src/test/suite/kafka-file/languageservice/completionProperties.test.ts +++ b/src/test/suite/kafka-file/languageservice/completionProperties.test.ts @@ -43,12 +43,12 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { }, { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 0)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 0)) }, { @@ -79,12 +79,12 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { }, { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 1)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 1)) }, { @@ -114,12 +114,12 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { }, { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 5)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 5)) }, { @@ -149,12 +149,12 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { }, { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 6)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 6)) }, { @@ -184,12 +184,12 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { }, { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 6)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 6)) }, { @@ -219,12 +219,12 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { }, { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 11)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 11)) }, { @@ -256,12 +256,12 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { },*/ { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(2, 0), position(2, 5)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|none,string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|none,string,double,float,integer,long,short,avro|}', range: range(position(2, 0), position(2, 5)) }, { @@ -421,6 +421,11 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { label: 'short', kind: CompletionItemKind.Value, insertText: ' short', range: range(position(1, 11), position(1, 11)) + }, + { + label: 'avro', kind: CompletionItemKind.Value, + insertText: ' avro', + range: range(position(1, 11), position(1, 11)) } ] }); @@ -471,6 +476,11 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { label: 'short', kind: CompletionItemKind.Value, insertText: ' short', range: range(position(1, 13), position(1, 13)) + }, + { + label: 'avro', kind: CompletionItemKind.Value, + insertText: ' avro', + range: range(position(1, 13), position(1, 13)) } ] }); @@ -557,12 +567,12 @@ suite("Kafka File PRODUCER Completion Test Suite", () => { }, { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 0)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 0)) } ] @@ -588,12 +598,12 @@ suite("Kafka File PRODUCER Completion Test Suite", () => { }, { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 1)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 1)) } ] @@ -618,12 +628,12 @@ suite("Kafka File PRODUCER Completion Test Suite", () => { }, { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 5)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 5)) } ] @@ -648,12 +658,12 @@ suite("Kafka File PRODUCER Completion Test Suite", () => { }, { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 6)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 6)) } ] @@ -678,12 +688,12 @@ suite("Kafka File PRODUCER Completion Test Suite", () => { }, { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 6)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 6)) } ] @@ -708,12 +718,12 @@ suite("Kafka File PRODUCER Completion Test Suite", () => { }, { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 11)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(1, 0), position(1, 11)) } ] @@ -740,12 +750,12 @@ suite("Kafka File PRODUCER Completion Test Suite", () => { },*/ { label: 'key-format', kind: CompletionItemKind.Property, - insertText: 'key-format: ${1|string,double,float,integer,long,short|}', + insertText: 'key-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(2, 0), position(2, 5)) }, { label: 'value-format', kind: CompletionItemKind.Property, - insertText: 'value-format: ${1|string,double,float,integer,long,short|}', + insertText: 'value-format: ${1|string,double,float,integer,long,short,avro|}', range: range(position(2, 0), position(2, 5)) } ] @@ -792,6 +802,11 @@ suite("Kafka File PRODUCER Completion Test Suite", () => { label: 'short', kind: CompletionItemKind.Value, insertText: ' short', range: range(position(1, 11), position(1, 11)) + }, + { + label: 'avro', kind: CompletionItemKind.Value, + insertText: ' avro', + range: range(position(1, 11), position(1, 11)) } ] }); @@ -837,6 +852,11 @@ suite("Kafka File PRODUCER Completion Test Suite", () => { label: 'short', kind: CompletionItemKind.Value, insertText: ' short', range: range(position(1, 11), position(1, 13)) + }, + { + label: 'avro', kind: CompletionItemKind.Value, + insertText: ' avro', + range: range(position(1, 11), position(1, 13)) } ] }); @@ -883,6 +903,11 @@ suite("Kafka File PRODUCER Completion Test Suite", () => { label: 'short', kind: CompletionItemKind.Value, insertText: ' short', range: range(position(2, 11), position(2, 13)) + }, + { + label: 'avro', kind: CompletionItemKind.Value, + insertText: ' avro', + range: range(position(2, 11), position(2, 13)) } ] }); @@ -930,6 +955,11 @@ suite("Kafka File PRODUCER Completion Test Suite", () => { label: 'short', kind: CompletionItemKind.Value, insertText: ' short', range: range(position(2, 11), position(2, 13)) + }, + { + label: 'avro', kind: CompletionItemKind.Value, + insertText: ' avro', + range: range(position(2, 11), position(2, 13)) } ] }); @@ -975,6 +1005,11 @@ suite("Kafka File PRODUCER Completion Test Suite", () => { label: 'short', kind: CompletionItemKind.Value, insertText: ' short', range: range(position(1, 13), position(1, 13)) + }, + { + label: 'avro', kind: CompletionItemKind.Value, + insertText: ' avro', + range: range(position(1, 13), position(1, 13)) } ] }); diff --git a/tsconfig.json b/tsconfig.json index 593386da..298679eb 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -4,7 +4,8 @@ "target": "es6", "outDir": "out", "lib": [ - "es6" + "es6", + "dom" ], "sourceMap": true, "rootDir": "src",