From 6c81fe048bd5fb041124340932efe5835deae829 Mon Sep 17 00:00:00 2001 From: angelozerr Date: Wed, 5 May 2021 05:07:55 +0200 Subject: [PATCH] Support for local Avro schema in CONSUMER/PRODUCER of kafka-file Fixes #114 Signed-off-by: azerr --- package-lock.json | 5 ++ package.json | 22 ++++- schemas/avro-avsc.json | 163 ++++++++++++++++++++++++++++++++++++ src/client/serialization.ts | 38 +++++++-- 4 files changed, 219 insertions(+), 9 deletions(-) create mode 100644 schemas/avro-avsc.json diff --git a/package-lock.json b/package-lock.json index be16292e..73d75051 100644 --- a/package-lock.json +++ b/package-lock.json @@ -546,6 +546,11 @@ "integrity": "sha512-Z7tMw1ytTXt5jqMcOP+OQteU1VuNK9Y02uuJtKQ1Sv69jXQKKg5cibLwGJow8yzZP+eAc18EmLGPal0bp36rvQ==", "dev": true }, + "avsc": { + "version": "5.6.3", + "resolved": "https://registry.npmjs.org/avsc/-/avsc-5.6.3.tgz", + "integrity": "sha512-awjOvjubyo2Ax+Xzvj97ClgAG9z6q6WDVpD0T+WHWrkWg96nMH1mZKjs51dt1L3+pz3CC0bfoGZ22KvqXapXtw==" + }, "azure-devops-node-api": { "version": "7.2.0", "resolved": "https://registry.npmjs.org/azure-devops-node-api/-/azure-devops-node-api-7.2.0.tgz", diff --git a/package.json b/package.json index 0c48be0b..48d8420a 100644 --- a/package.json +++ b/package.json @@ -162,14 +162,23 @@ ".kafka" ], "aliases": [ - "kafka" + "Kafka DSL" ], "configuration": "./language-configuration.json" }, { "id": "kafka-consumer", "aliases": [ - "Kafka Consumer" + "Kafka Consumer View" + ] + }, + { + "id": "jsonc", + "filenamePatterns": [ + "*.avsc" + ], + "aliases": [ + "Avro Schema Definition" ] } ], @@ -199,6 +208,10 @@ { "fileMatch": "package.json", "url": "./schemas/package.schema.json" + }, + { + "fileMatch": "*.avsc", + "url": "./schemas/avro-avsc.json" } ], "commands": [ @@ -320,7 +333,7 @@ "command": "vscode-kafka.discover.clusterproviders", "title": "Discover Cluster Providers", "category": "Kafka", - "icon":"$(extensions)" + "icon": "$(extensions)" } ], "menus": { @@ -451,6 +464,7 @@ "test": "node ./out/test/runTest.js" }, "dependencies": { + "avsc": "^5.6.3", "faker": "^5.5.2", "fs-extra": "^8.1.0", "glob": "^7.1.6", @@ -476,4 +490,4 @@ "webpack": "^5.10.0", "webpack-cli": "^4.2.0" } -} +} \ No newline at end of file diff --git a/schemas/avro-avsc.json b/schemas/avro-avsc.json new file mode 100644 index 00000000..3ecd7c8e --- /dev/null +++ b/schemas/avro-avsc.json @@ -0,0 +1,163 @@ + +{ + "$schema": "http://json-schema.org/draft-06/schema#", + "title": "Avro Schema Definition", + "description": "Json-Schema definition for Avro AVSC files.", + "definitions": { + "avroSchema": { + "title": "Avro Schema", + "description": "Root Schema", + "oneOf": [ + { "$ref": "#/definitions/types" } + ] + }, + "types": { + "title": "Avro Types", + "description": "Allowed Avro types", + "oneOf": [ + { "$ref": "#/definitions/primitiveType" }, + { "$ref": "#/definitions/primitiveTypeWithMetadata" }, + { "$ref": "#/definitions/customTypeReference" }, + { "$ref": "#/definitions/avroRecord" }, + { "$ref": "#/definitions/avroEnum" }, + { "$ref": "#/definitions/avroArray" }, + { "$ref": "#/definitions/avroMap" }, + { "$ref": "#/definitions/avroFixed" }, + { "$ref": "#/definitions/avroUnion" } + ] + }, + "primitiveType": { + "title": "Primitive Type", + "description": "Basic type primitives.", + "type":"string", + "enum": [ + "null", + "boolean", + "int", + "long", + "float", + "double", + "bytes", + "string" + ] + }, + "primitiveTypeWithMetadata": { + "title": "Primitive Type With Metadata", + "description": "A primitive type with metadata attached.", + "type": "object", + "properties": { + "type": { "$ref": "#/definitions/primitiveType" } + }, + "required": ["type"] + }, + "customTypeReference": { + "title": "Custom Type", + "description": "Reference to a ComplexType", + "not": { "$ref": "#/definitions/primitiveType" }, + "type": "string", + "pattern": "^[A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)*$" + }, + "avroUnion": { + "title": "Union", + "description": "A Union of types", + "type": "array", + "items": { "$ref": "#/definitions/avroSchema" }, + "minItems": 1 + }, + "avroField": { + "title": "Field", + "description": "A field within a Record", + "type": "object", + "properties": { + "name": { "$ref": "#/definitions/name" }, + "type": { "$ref": "#/definitions/types" }, + "doc": { "type": "string" }, + "default": { }, + "order": { "enum": ["ascending", "descending", "ignore"] }, + "aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } } + }, + "required": ["name", "type"] + }, + "avroRecord": { + "title": "Record", + "description": "A Record", + "type": "object", + "properties": { + "type": {"type":"string", "enum": ["record"]}, + "name": { "$ref": "#/definitions/name" }, + "namespace": { "$ref": "#/definitions/namespace" }, + "doc": { "type": "string" }, + "aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } }, + "fields": { "type": "array", "items": { "$ref": "#/definitions/avroField" } } + }, + "required": ["type", "name", "fields"] + }, + "avroEnum": { + "title": "Enum", + "description": "An enumeration", + "type": "object", + "properties": { + "type": {"type":"string", "enum": ["enum"]}, + "name": { "$ref": "#/definitions/name" }, + "namespace": { "$ref": "#/definitions/namespace" }, + "doc": { "type": "string" }, + "aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } }, + "symbols": { "type": "array", "items": { "$ref": "#/definitions/name" } } + }, + "required": ["type", "name", "symbols"] + }, + "avroArray": { + "title": "Array", + "description": "An array", + "type": "object", + "properties": { + "type": {"type":"string", "enum": ["array"]}, + "name": { "$ref": "#/definitions/name" }, + "namespace": { "$ref": "#/definitions/namespace" }, + "doc": { "type": "string" }, + "aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } }, + "items": { "$ref": "#/definitions/types" } + }, + "required": ["type", "items"] + }, + "avroMap": { + "title": "Map", + "description": "A map of values", + "type": "object", + "properties": { + "type": {"type":"string", "enum": ["map"]}, + "name": { "$ref": "#/definitions/name" }, + "namespace": { "$ref": "#/definitions/namespace" }, + "doc": { "type": "string" }, + "aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } }, + "values": { "$ref": "#/definitions/types" } + }, + "required": ["type", "values"] + }, + "avroFixed": { + "title": "Fixed", + "description": "A fixed sized array of bytes", + "type": "object", + "properties": { + "type": {"type":"string", "enum": ["fixed"]}, + "name": { "$ref": "#/definitions/name" }, + "namespace": { "$ref": "#/definitions/namespace" }, + "doc": { "type": "string" }, + "aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } }, + "size": {"type":"number"} + }, + "required": ["type", "name", "size"] + }, + "name": { + "type": "string", + "pattern": "^[A-Za-z_][A-Za-z0-9_]*$" + }, + "namespace": { + "type": "string", + "pattern": "^([A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)*)*$" + } + }, + "oneOf": [ + { "$ref": "#/definitions/avroSchema" } + ] +} diff --git a/src/client/serialization.ts b/src/client/serialization.ts index d04b071b..3b777735 100644 --- a/src/client/serialization.ts +++ b/src/client/serialization.ts @@ -1,9 +1,16 @@ -export type MessageFormat = "none" | "string" | "double" | "float" | "integer" | "long" | "short" ; +import * as avro from "avsc"; + +export type MessageFormat = "none" | "string" | "double" | "float" | "integer" | "long" | "short" | "avro"; export type SerializationdResult = any | Error; export class SerializationException extends Error { } +export interface SerializationSetting { + name: string; + value?: string; +} + // ---------------- Serializers ---------------- interface Serializer { @@ -12,7 +19,7 @@ interface Serializer { const serializerRegistry: Map = new Map(); -export function serialize(data?: string, format?: MessageFormat): Buffer | string | null { +export function serialize(data?: string, format?: MessageFormat, settings?: SerializationSetting[]): Buffer | string | null { if (!data || !format) { return data || null; } @@ -94,12 +101,12 @@ serializerRegistry.set("string", new StringSerializer()); // ---------------- Deserializers ---------------- interface Deserializer { - deserialize(data: Buffer): any; + deserialize(data: Buffer, settings?: SerializationSetting[]): any; } const deserializerRegistry: Map = new Map(); -export function deserialize(data: Buffer | null, format?: MessageFormat): SerializationdResult | null { +export function deserialize(data: Buffer | null, format?: MessageFormat, settings?: SerializationSetting[]): SerializationdResult | null { if (data === null || !format) { return data; } @@ -111,7 +118,7 @@ export function deserialize(data: Buffer | null, format?: MessageFormat): Serial if (!deserializer) { throw new SerializationException(`Cannot find a deserializer for ${format} format.`); } - return deserializer.deserialize(data); + return deserializer.deserialize(data, settings); } catch (e) { return e; @@ -197,9 +204,30 @@ class StringDeserializer implements Deserializer { } } +class AvroDeserializer implements Deserializer { + + deserialize(data: Buffer | null, settings?: SerializationSetting[]): any { + if (data === null) { + return null; + } + if (!settings) { + throw new SerializationException("The avro file path is required"); + } + const type = avro.Type.forSchema({ + type: 'record', + fields: [ + { name: 'kind', type: { type: 'enum', symbols: ['CAT', 'DOG'] } }, + { name: 'name', type: 'string' } + ] + } as avro.Schema); + return type.fromBuffer(data); + } +} + deserializerRegistry.set("double", new DoubleDeserializer()); deserializerRegistry.set("float", new FloatDeserializer()); deserializerRegistry.set("integer", new IntegerDeserializer()); deserializerRegistry.set("long", new LongDeserializer()); deserializerRegistry.set("short", new ShortDeserializer()); deserializerRegistry.set("string", new StringDeserializer()); +deserializerRegistry.set("avro", new AvroDeserializer());