From 9278fe1f1f02f0260ace81f2db91d4aaac72bb40 Mon Sep 17 00:00:00 2001 From: big-vi Date: Fri, 28 Jul 2023 11:06:14 +0530 Subject: [PATCH] avro validation and serialization --- README.md | 10 +++++----- lib/memphis.d.ts | 2 ++ lib/memphis.js | 24 +++++++++++++++++++++++- lib/producer.d.ts | 1 + lib/producer.js | 43 +++++++++++++++++++++++++++++++++++++++++++ package.json | 1 + src/memphis.ts | 25 ++++++++++++++++++++++++- src/producer.ts | 44 ++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 143 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 120a1a8..5db8844 100644 --- a/README.md +++ b/README.md @@ -316,7 +316,7 @@ await memphisConnection.produce({ stationName: '', producerName: '', genUniqueSuffix: false, // defaults to false - message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema) + message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema) or Uint8Arrays/object (schema validated station - avro schema) ackWaitSec: 15, // defaults to 15 asyncProduce: true // defaults to false headers: headers, // defults to empty @@ -328,7 +328,7 @@ Creating a producer first ```js await producer.produce({ - message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema) + message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema) or Uint8Arrays/object (schema validated station - avro schema) ackWaitSec: 15 // defaults to 15 }); ``` @@ -349,7 +349,7 @@ or ```js const headers = { key: 'value' }; await producer.produce({ - message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema) + message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema) or Uint8Arrays/object (schema validated station - avro schema) headers: headers }); ``` @@ -360,7 +360,7 @@ Meaning your application won't wait for broker acknowledgement - use only in cas ```js await producer.produce({ - message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema) + message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema) or Uint8Arrays/object (schema validated station - avro schema) ackWaitSec: 15, // defaults to 15 asyncProduce: true // defaults to false }); @@ -372,7 +372,7 @@ Stations are idempotent by default for 2 minutes (can be configured), Idempotenc ```js await producer.produce({ - message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema) + message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema) or Uint8Arrays/object (schema validated station - avro schema) ackWaitSec: 15, // defaults to 15 msgId: 'id' // defaults to null }); diff --git a/lib/memphis.d.ts b/lib/memphis.d.ts index be11233..b393a52 100644 --- a/lib/memphis.d.ts +++ b/lib/memphis.d.ts @@ -40,6 +40,7 @@ declare class Memphis { producersPerStation: Map; meassageDescriptors: Map; jsonSchemas: Map; + avroSchemas: Map; graphqlSchemas: Map; clusterConfigurations: Map; stationSchemaverseToDlsMap: Map; @@ -68,6 +69,7 @@ declare class Memphis { private _compileProtobufSchema; private _scemaUpdatesListener; private _compileJsonSchema; + private _compileAvroSchema; private _compileGraphQl; private _listenForSchemaUpdates; private _sdkClientUpdatesListener; diff --git a/lib/memphis.js b/lib/memphis.js index 5e9e653..641c645 100644 --- a/lib/memphis.js +++ b/lib/memphis.js @@ -40,6 +40,8 @@ const storageTypes = { MEMORY: 'memory' }; const maxBatchSize = 5000; +const avro = require('avro-js') + class Memphis { constructor() { this.isConnectionActive = false; @@ -64,6 +66,7 @@ class Memphis { this.producersPerStation = new Map(); this.meassageDescriptors = new Map(); this.jsonSchemas = new Map(); + this.avroSchemas = new Map(); this.graphqlSchemas = new Map(); this.clusterConfigurations = new Map(); this.stationSchemaverseToDlsMap = new Map(); @@ -238,6 +241,10 @@ class Memphis { const graphQlSchema = this._compileGraphQl(internalStationName); this.graphqlSchemas.set(internalStationName, graphQlSchema); break; + case 'avro': + const avroSchema = this._compileAvroSchema(internalStationName); + this.avroSchemas.set(internalStationName, avroSchema); + break; } } const sub = this.brokerManager.subscribe(`$memphis_schema_updates_${internalStationName}`); @@ -290,6 +297,17 @@ class Memphis { } } } + _compileAvroSchema(stationName) { + let stationSchemaData = this.stationSchemaDataMap.get(stationName); + const schema = stationSchemaData['active_version']['schema_content']; + let validate; + try { + validate = avro.parse(schema); + return validate; + } catch (ex) { + throw (0, utils_1.MemphisError)(new Error('invalid avro schema')); + } + } _compileGraphQl(stationName) { const stationSchemaData = this.stationSchemaDataMap.get(stationName); const schemaContent = stationSchemaData['active_version']['schema_content']; @@ -322,6 +340,10 @@ class Memphis { const graphQlSchema = this._compileGraphQl(stationName); this.graphqlSchemas.set(stationName, graphQlSchema); break; + case 'avro': + const avroSchema = this._compileAvroSchema(stationName); + this.avroSchemas.set(stationName, avroSchema); + break; } } catch (ex) { @@ -679,7 +701,7 @@ class Memphis { } async createSchema({ schemaName, schemaType, schemaFilePath }) { try { - if (schemaType !== "json" && schemaType !== "graphql" && schemaType !== "protobuf") + if (schemaType !== "json" && schemaType !== "graphql" && schemaType !== "protobuf" && schemaType !== "avro") throw (0, utils_1.MemphisError)(new Error("Schema type not supported")); var nameConvention = RegExp('^[a-z0-9_.-]*$'); if (!nameConvention.test(schemaName)) diff --git a/lib/producer.d.ts b/lib/producer.d.ts index dba814d..a679770 100644 --- a/lib/producer.d.ts +++ b/lib/producer.d.ts @@ -17,6 +17,7 @@ export declare class Producer { }): Promise; private _parseJsonValidationErrors; private _validateJsonMessage; + private _validateAvroMessage; private _validateProtobufMessage; private _validateGraphqlMessage; private _validateMessage; diff --git a/lib/producer.js b/lib/producer.js index b2deb6c..dc57f9b 100644 --- a/lib/producer.js +++ b/lib/producer.js @@ -4,6 +4,7 @@ exports.Producer = void 0; const graphql_1 = require("graphql"); const _1 = require("."); const utils_1 = require("./utils"); +const avro = require('avro-js') const schemaVFailAlertType = 'schema_validation_fail_alert'; class Producer { constructor(connection, producerName, stationName, realName) { @@ -106,6 +107,46 @@ class Producer { throw (0, utils_1.MemphisError)(new Error(`Schema validation has failed: ${ex.message}`)); } } + _validateAvroMessage(msg) { + try { + let schema = this.connection.avroSchemas.get(this.internal_station); + let msgObj; + let msgToSend = new Uint8Array(); + const isBuffer = Buffer.isBuffer(msg); + if (isBuffer) { + try { + msgObj = JSON.parse(msg.toString()); + } catch (ex) { + throw (0, utils_1.MemphisError)(new Error('Expecting Avro format: ' + ex)); + } + msgToSend = msg; + const type = avro.parse(schema); + var buf = type.toBuffer(msgObj); + const valid = type.isValid(msgObj); + if (!valid) { + throw (0, utils_1.MemphisError)(new Error(`Schema validation has failed: ${type}`)); + } + return msgToSend; + } else if (Object.prototype.toString.call(msg) == '[object Object]') { + msgObj = msg; + let enc = new TextEncoder(); + const msgString = JSON.stringify(msg); + msgToSend = enc.encode(msgString); + const type = avro.parse(schema); + var buf = type.toBuffer(msgObj); + const valid = type.isValid(msgObj); + if (!valid) { + throw (0, utils_1.MemphisError)(new Error(`Schema validation has failed: ${ex.message}`)); + } + + return msgToSend; + } else { + throw (0, utils_1.MemphisError)(new Error('Unsupported message type')); + } + } catch (ex) { + throw (0, utils_1.MemphisError)(new Error(`Schema validation has failed: ${ex.message}`)); + } + } _validateProtobufMessage(msg) { let meassageDescriptor = this.connection.meassageDescriptors.get(this.internal_station); if (meassageDescriptor) { @@ -182,6 +223,8 @@ class Producer { return this._validateJsonMessage(msg); case 'graphql': return this._validateGraphqlMessage(msg); + case 'avro': + return this._validateAvroMessage(msg); default: return msg; } diff --git a/package.json b/package.json index d2c263d..5f2f0e4 100644 --- a/package.json +++ b/package.json @@ -30,6 +30,7 @@ "@nestjs/platform-express": "^9.0.0", "ajv": "^8.11.2", "ajv-draft-04": "^1.0.0", + "avro-js": "^1.11.2", "axios": "^0.26.1", "dotenv": "^16.0.1", "graphql": "^16.6.0", diff --git a/src/memphis.ts b/src/memphis.ts index cdfc534..e0dd847 100644 --- a/src/memphis.ts +++ b/src/memphis.ts @@ -32,6 +32,7 @@ import { Station } from './station'; import { generateNameSuffix, MemphisError, sleep } from './utils'; import { v4 as uuidv4 } from 'uuid'; import { NatsConnection } from 'nats'; +const avro = require('avro-js') interface IRetentionTypes { MAX_MESSAGE_AGE_SECONDS: string; @@ -81,6 +82,7 @@ class Memphis { public producersPerStation: Map; public meassageDescriptors: Map; public jsonSchemas: Map; + public avroSchemas: Map; public graphqlSchemas: Map; public clusterConfigurations: Map; public stationSchemaverseToDlsMap: Map; @@ -116,6 +118,7 @@ class Memphis { this.producersPerStation = new Map(); this.meassageDescriptors = new Map(); this.jsonSchemas = new Map(); + this.avroSchemas = new Map(); this.graphqlSchemas = new Map(); this.clusterConfigurations = new Map(); this.stationSchemaverseToDlsMap = new Map(); @@ -350,6 +353,10 @@ class Memphis { const graphQlSchema = this._compileGraphQl(internalStationName); this.graphqlSchemas.set(internalStationName, graphQlSchema); break; + case 'avro': + const avroSchema = this._compileAvroSchema(internalStationName); + this.avroSchemas.set(internalStationName, avroSchema); + break; } } const sub = this.brokerManager.subscribe( @@ -400,6 +407,18 @@ class Memphis { } } + private _compileAvroSchema(stationName: string): any { + let stationSchemaData = this.stationSchemaDataMap.get(stationName); + const schema = stationSchemaData['active_version']['schema_content']; + let validate: any; + try { + validate = avro.parse(schema); + return validate; + } catch (ex) { + throw MemphisError(new Error('invalid avro schema')); + } + } + private _compileGraphQl(stationName: string): GraphQLSchema { const stationSchemaData = this.stationSchemaDataMap.get(stationName); const schemaContent = stationSchemaData['active_version']['schema_content']; @@ -434,6 +453,10 @@ class Memphis { const graphQlSchema = this._compileGraphQl(stationName); this.graphqlSchemas.set(stationName, graphQlSchema); break; + case 'avro': + const avroSchema = this._compileAvroSchema(stationName); + this.avroSchemas.set(stationName, avroSchema); + break; } } catch (ex) { throw MemphisError(ex); @@ -1064,7 +1087,7 @@ class Memphis { }): Promise { try { - if (schemaType !== "json" && schemaType !== "graphql" && schemaType !== "protobuf") + if (schemaType !== "json" && schemaType !== "graphql" && schemaType !== "protobuf" && schemaType !== "avro") throw MemphisError(new Error("Schema type not supported")); var nameConvention = RegExp('^[a-z0-9_.-]*$'); diff --git a/src/producer.ts b/src/producer.ts index 5284b7d..17048de 100644 --- a/src/producer.ts +++ b/src/producer.ts @@ -3,6 +3,7 @@ import * as broker from 'nats'; import { Memphis, MsgHeaders } from '.'; import { MemphisError, stringToHex } from './utils'; +const avro = require('avro-js') const schemaVFailAlertType = 'schema_validation_fail_alert'; @@ -129,6 +130,47 @@ export class Producer { } } + private _validateAvroMessage(msg: any): any { + try { + let schema = this.connection.avroSchemas.get(this.internal_station); + let msgObj; + let msgToSend = new Uint8Array(); + const isBuffer = Buffer.isBuffer(msg); + if (isBuffer) { + try { + msgObj = JSON.parse(msg.toString()); + } catch (ex) { + throw MemphisError(new Error('Expecting Avro format: ' + ex)); + } + msgToSend = msg; + const type = avro.parse(schema); + var buf = type.toBuffer(msgObj); + const valid = type.isValid(msgObj); + if (!valid) { + throw MemphisError(new Error(`Schema validation has failed: ${type}`)); + } + return msgToSend; + } else if (Object.prototype.toString.call(msg) == '[object Object]') { + msgObj = msg; + let enc = new TextEncoder(); + const msgString = JSON.stringify(msg); + msgToSend = enc.encode(msgString); + const type = avro.parse(schema); + var buf = type.toBuffer(msgObj); + const valid = type.isValid(msgObj); + if (!valid) { + throw MemphisError(new Error(`Schema validation has failed: ${type}`)); + } + + return msgToSend; + } else { + throw MemphisError(new Error('Unsupported message type')); + } + } catch (ex) { + throw MemphisError(new Error(`Schema validation has failed: ${ex.message}`)); + } + } + private _validateProtobufMessage(msg: any): any { let meassageDescriptor = this.connection.meassageDescriptors.get(this.internal_station); if (meassageDescriptor) { @@ -200,6 +242,8 @@ export class Producer { return this._validateJsonMessage(msg); case 'graphql': return this._validateGraphqlMessage(msg); + case 'avro': + return this._validateAvroMessage(msg); default: return msg; }