Skip to content

Commit

Permalink
Support for local Avro schema in CONSUMER/PRODUCER of kafka-file
Browse files Browse the repository at this point in the history
Fixes jlandersen#114

Signed-off-by: azerr <[email protected]>
  • Loading branch information
angelozerr committed May 11, 2021
1 parent abd60c0 commit 6c81fe0
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 9 deletions.
5 changes: 5 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 18 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,23 @@
".kafka"
],
"aliases": [
"kafka"
"Kafka DSL"
],
"configuration": "./language-configuration.json"
},
{
"id": "kafka-consumer",
"aliases": [
"Kafka Consumer"
"Kafka Consumer View"
]
},
{
"id": "jsonc",
"filenamePatterns": [
"*.avsc"
],
"aliases": [
"Avro Schema Definition"
]
}
],
Expand Down Expand Up @@ -199,6 +208,10 @@
{
"fileMatch": "package.json",
"url": "./schemas/package.schema.json"
},
{
"fileMatch": "*.avsc",
"url": "./schemas/avro-avsc.json"
}
],
"commands": [
Expand Down Expand Up @@ -320,7 +333,7 @@
"command": "vscode-kafka.discover.clusterproviders",
"title": "Discover Cluster Providers",
"category": "Kafka",
"icon":"$(extensions)"
"icon": "$(extensions)"
}
],
"menus": {
Expand Down Expand Up @@ -451,6 +464,7 @@
"test": "node ./out/test/runTest.js"
},
"dependencies": {
"avsc": "^5.6.3",
"faker": "^5.5.2",
"fs-extra": "^8.1.0",
"glob": "^7.1.6",
Expand All @@ -476,4 +490,4 @@
"webpack": "^5.10.0",
"webpack-cli": "^4.2.0"
}
}
}
163 changes: 163 additions & 0 deletions schemas/avro-avsc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@

{
"$schema": "http://json-schema.org/draft-06/schema#",
"title": "Avro Schema Definition",
"description": "Json-Schema definition for Avro AVSC files.",
"definitions": {
"avroSchema": {
"title": "Avro Schema",
"description": "Root Schema",
"oneOf": [
{ "$ref": "#/definitions/types" }
]
},
"types": {
"title": "Avro Types",
"description": "Allowed Avro types",
"oneOf": [
{ "$ref": "#/definitions/primitiveType" },
{ "$ref": "#/definitions/primitiveTypeWithMetadata" },
{ "$ref": "#/definitions/customTypeReference" },
{ "$ref": "#/definitions/avroRecord" },
{ "$ref": "#/definitions/avroEnum" },
{ "$ref": "#/definitions/avroArray" },
{ "$ref": "#/definitions/avroMap" },
{ "$ref": "#/definitions/avroFixed" },
{ "$ref": "#/definitions/avroUnion" }
]
},
"primitiveType": {
"title": "Primitive Type",
"description": "Basic type primitives.",
"type":"string",
"enum": [
"null",
"boolean",
"int",
"long",
"float",
"double",
"bytes",
"string"
]
},
"primitiveTypeWithMetadata": {
"title": "Primitive Type With Metadata",
"description": "A primitive type with metadata attached.",
"type": "object",
"properties": {
"type": { "$ref": "#/definitions/primitiveType" }
},
"required": ["type"]
},
"customTypeReference": {
"title": "Custom Type",
"description": "Reference to a ComplexType",
"not": { "$ref": "#/definitions/primitiveType" },
"type": "string",
"pattern": "^[A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)*$"
},
"avroUnion": {
"title": "Union",
"description": "A Union of types",
"type": "array",
"items": { "$ref": "#/definitions/avroSchema" },
"minItems": 1
},
"avroField": {
"title": "Field",
"description": "A field within a Record",
"type": "object",
"properties": {
"name": { "$ref": "#/definitions/name" },
"type": { "$ref": "#/definitions/types" },
"doc": { "type": "string" },
"default": { },
"order": { "enum": ["ascending", "descending", "ignore"] },
"aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } }
},
"required": ["name", "type"]
},
"avroRecord": {
"title": "Record",
"description": "A Record",
"type": "object",
"properties": {
"type": {"type":"string", "enum": ["record"]},
"name": { "$ref": "#/definitions/name" },
"namespace": { "$ref": "#/definitions/namespace" },
"doc": { "type": "string" },
"aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } },
"fields": { "type": "array", "items": { "$ref": "#/definitions/avroField" } }
},
"required": ["type", "name", "fields"]
},
"avroEnum": {
"title": "Enum",
"description": "An enumeration",
"type": "object",
"properties": {
"type": {"type":"string", "enum": ["enum"]},
"name": { "$ref": "#/definitions/name" },
"namespace": { "$ref": "#/definitions/namespace" },
"doc": { "type": "string" },
"aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } },
"symbols": { "type": "array", "items": { "$ref": "#/definitions/name" } }
},
"required": ["type", "name", "symbols"]
},
"avroArray": {
"title": "Array",
"description": "An array",
"type": "object",
"properties": {
"type": {"type":"string", "enum": ["array"]},
"name": { "$ref": "#/definitions/name" },
"namespace": { "$ref": "#/definitions/namespace" },
"doc": { "type": "string" },
"aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } },
"items": { "$ref": "#/definitions/types" }
},
"required": ["type", "items"]
},
"avroMap": {
"title": "Map",
"description": "A map of values",
"type": "object",
"properties": {
"type": {"type":"string", "enum": ["map"]},
"name": { "$ref": "#/definitions/name" },
"namespace": { "$ref": "#/definitions/namespace" },
"doc": { "type": "string" },
"aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } },
"values": { "$ref": "#/definitions/types" }
},
"required": ["type", "values"]
},
"avroFixed": {
"title": "Fixed",
"description": "A fixed sized array of bytes",
"type": "object",
"properties": {
"type": {"type":"string", "enum": ["fixed"]},
"name": { "$ref": "#/definitions/name" },
"namespace": { "$ref": "#/definitions/namespace" },
"doc": { "type": "string" },
"aliases": { "type": "array", "items": { "$ref": "#/definitions/name" } },
"size": {"type":"number"}
},
"required": ["type", "name", "size"]
},
"name": {
"type": "string",
"pattern": "^[A-Za-z_][A-Za-z0-9_]*$"
},
"namespace": {
"type": "string",
"pattern": "^([A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)*)*$"
}
},
"oneOf": [
{ "$ref": "#/definitions/avroSchema" }
]
}
38 changes: 33 additions & 5 deletions src/client/serialization.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
export type MessageFormat = "none" | "string" | "double" | "float" | "integer" | "long" | "short" ;
import * as avro from "avsc";

export type MessageFormat = "none" | "string" | "double" | "float" | "integer" | "long" | "short" | "avro";

export type SerializationdResult = any | Error;

export class SerializationException extends Error { }

export interface SerializationSetting {
name: string;
value?: string;
}

// ---------------- Serializers ----------------

interface Serializer {
Expand All @@ -12,7 +19,7 @@ interface Serializer {

const serializerRegistry: Map<MessageFormat, Serializer> = new Map();

export function serialize(data?: string, format?: MessageFormat): Buffer | string | null {
export function serialize(data?: string, format?: MessageFormat, settings?: SerializationSetting[]): Buffer | string | null {
if (!data || !format) {
return data || null;
}
Expand Down Expand Up @@ -94,12 +101,12 @@ serializerRegistry.set("string", new StringSerializer());
// ---------------- Deserializers ----------------

interface Deserializer {
deserialize(data: Buffer): any;
deserialize(data: Buffer, settings?: SerializationSetting[]): any;
}

const deserializerRegistry: Map<MessageFormat, Deserializer> = new Map();

export function deserialize(data: Buffer | null, format?: MessageFormat): SerializationdResult | null {
export function deserialize(data: Buffer | null, format?: MessageFormat, settings?: SerializationSetting[]): SerializationdResult | null {
if (data === null || !format) {
return data;
}
Expand All @@ -111,7 +118,7 @@ export function deserialize(data: Buffer | null, format?: MessageFormat): Serial
if (!deserializer) {
throw new SerializationException(`Cannot find a deserializer for ${format} format.`);
}
return deserializer.deserialize(data);
return deserializer.deserialize(data, settings);
}
catch (e) {
return e;
Expand Down Expand Up @@ -197,9 +204,30 @@ class StringDeserializer implements Deserializer {
}
}

class AvroDeserializer implements Deserializer {

deserialize(data: Buffer | null, settings?: SerializationSetting[]): any {
if (data === null) {
return null;
}
if (!settings) {
throw new SerializationException("The avro file path is required");
}
const type = avro.Type.forSchema({
type: 'record',
fields: [
{ name: 'kind', type: { type: 'enum', symbols: ['CAT', 'DOG'] } },
{ name: 'name', type: 'string' }
]
} as avro.Schema);
return type.fromBuffer(data);
}
}

deserializerRegistry.set("double", new DoubleDeserializer());
deserializerRegistry.set("float", new FloatDeserializer());
deserializerRegistry.set("integer", new IntegerDeserializer());
deserializerRegistry.set("long", new LongDeserializer());
deserializerRegistry.set("short", new ShortDeserializer());
deserializerRegistry.set("string", new StringDeserializer());
deserializerRegistry.set("avro", new AvroDeserializer());

0 comments on commit 6c81fe0

Please sign in to comment.