Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avro validation and serialization #173

Merged
merged 1 commit into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ await memphisConnection.produce({
stationName: '<station-name>',
producerName: '<producer-name>',
genUniqueSuffix: false, // defaults to false
message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema)
message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema) or Uint8Arrays/object (schema validated station - avro schema)
ackWaitSec: 15, // defaults to 15
asyncProduce: true // defaults to false
headers: headers, // defults to empty
Expand All @@ -328,7 +328,7 @@ Creating a producer first

```js
await producer.produce({
message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema)
message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema) or Uint8Arrays/object (schema validated station - avro schema)
ackWaitSec: 15 // defaults to 15
});
```
Expand All @@ -349,7 +349,7 @@ or
```js
const headers = { key: 'value' };
await producer.produce({
message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema)
message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema) or Uint8Arrays/object (schema validated station - avro schema)
headers: headers
});
```
Expand All @@ -360,7 +360,7 @@ Meaning your application won't wait for broker acknowledgement - use only in cas

```js
await producer.produce({
message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema)
message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema) or Uint8Arrays/object (schema validated station - avro schema)
ackWaitSec: 15, // defaults to 15
asyncProduce: true // defaults to false
});
Expand All @@ -372,7 +372,7 @@ Stations are idempotent by default for 2 minutes (can be configured), Idempotenc

```js
await producer.produce({
message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema)
message: 'Uint8Arrays/object/string/DocumentNode graphql', // Uint8Arrays/object (schema validated station - protobuf) or Uint8Arrays/object (schema validated station - json schema) or Uint8Arrays/string/DocumentNode graphql (schema validated station - graphql schema) or Uint8Arrays/object (schema validated station - avro schema)
ackWaitSec: 15, // defaults to 15
msgId: 'id' // defaults to null
});
Expand Down
2 changes: 2 additions & 0 deletions lib/memphis.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ declare class Memphis {
producersPerStation: Map<string, number>;
meassageDescriptors: Map<string, protobuf.Type>;
jsonSchemas: Map<string, Function>;
avroSchemas: Map<string, Function>;
graphqlSchemas: Map<string, GraphQLSchema>;
clusterConfigurations: Map<string, boolean>;
stationSchemaverseToDlsMap: Map<string, boolean>;
Expand Down Expand Up @@ -68,6 +69,7 @@ declare class Memphis {
private _compileProtobufSchema;
private _scemaUpdatesListener;
private _compileJsonSchema;
private _compileAvroSchema;
private _compileGraphQl;
private _listenForSchemaUpdates;
private _sdkClientUpdatesListener;
Expand Down
24 changes: 23 additions & 1 deletion lib/memphis.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const storageTypes = {
MEMORY: 'memory'
};
const maxBatchSize = 5000;
const avro = require('avro-js')

class Memphis {
constructor() {
this.isConnectionActive = false;
Expand All @@ -64,6 +66,7 @@ class Memphis {
this.producersPerStation = new Map();
this.meassageDescriptors = new Map();
this.jsonSchemas = new Map();
this.avroSchemas = new Map();
this.graphqlSchemas = new Map();
this.clusterConfigurations = new Map();
this.stationSchemaverseToDlsMap = new Map();
Expand Down Expand Up @@ -238,6 +241,10 @@ class Memphis {
const graphQlSchema = this._compileGraphQl(internalStationName);
this.graphqlSchemas.set(internalStationName, graphQlSchema);
break;
case 'avro':
const avroSchema = this._compileAvroSchema(internalStationName);
this.avroSchemas.set(internalStationName, avroSchema);
break;
}
}
const sub = this.brokerManager.subscribe(`$memphis_schema_updates_${internalStationName}`);
Expand Down Expand Up @@ -290,6 +297,17 @@ class Memphis {
}
}
}
_compileAvroSchema(stationName) {
let stationSchemaData = this.stationSchemaDataMap.get(stationName);
const schema = stationSchemaData['active_version']['schema_content'];
let validate;
try {
validate = avro.parse(schema);
return validate;
} catch (ex) {
throw (0, utils_1.MemphisError)(new Error('invalid avro schema'));
}
}
_compileGraphQl(stationName) {
const stationSchemaData = this.stationSchemaDataMap.get(stationName);
const schemaContent = stationSchemaData['active_version']['schema_content'];
Expand Down Expand Up @@ -322,6 +340,10 @@ class Memphis {
const graphQlSchema = this._compileGraphQl(stationName);
this.graphqlSchemas.set(stationName, graphQlSchema);
break;
case 'avro':
const avroSchema = this._compileAvroSchema(stationName);
this.avroSchemas.set(stationName, avroSchema);
break;
}
}
catch (ex) {
Expand Down Expand Up @@ -679,7 +701,7 @@ class Memphis {
}
async createSchema({ schemaName, schemaType, schemaFilePath }) {
try {
if (schemaType !== "json" && schemaType !== "graphql" && schemaType !== "protobuf")
if (schemaType !== "json" && schemaType !== "graphql" && schemaType !== "protobuf" && schemaType !== "avro")
throw (0, utils_1.MemphisError)(new Error("Schema type not supported"));
var nameConvention = RegExp('^[a-z0-9_.-]*$');
if (!nameConvention.test(schemaName))
Expand Down
1 change: 1 addition & 0 deletions lib/producer.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export declare class Producer {
}): Promise<void>;
private _parseJsonValidationErrors;
private _validateJsonMessage;
private _validateAvroMessage;
private _validateProtobufMessage;
private _validateGraphqlMessage;
private _validateMessage;
Expand Down
43 changes: 43 additions & 0 deletions lib/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ exports.Producer = void 0;
const graphql_1 = require("graphql");
const _1 = require(".");
const utils_1 = require("./utils");
const avro = require('avro-js')
const schemaVFailAlertType = 'schema_validation_fail_alert';
class Producer {
constructor(connection, producerName, stationName, realName) {
Expand Down Expand Up @@ -106,6 +107,46 @@ class Producer {
throw (0, utils_1.MemphisError)(new Error(`Schema validation has failed: ${ex.message}`));
}
}
_validateAvroMessage(msg) {
try {
let schema = this.connection.avroSchemas.get(this.internal_station);
let msgObj;
let msgToSend = new Uint8Array();
const isBuffer = Buffer.isBuffer(msg);
if (isBuffer) {
try {
msgObj = JSON.parse(msg.toString());
} catch (ex) {
throw (0, utils_1.MemphisError)(new Error('Expecting Avro format: ' + ex));
}
msgToSend = msg;
const type = avro.parse(schema);
var buf = type.toBuffer(msgObj);
const valid = type.isValid(msgObj);
if (!valid) {
throw (0, utils_1.MemphisError)(new Error(`Schema validation has failed: ${type}`));
}
return msgToSend;
} else if (Object.prototype.toString.call(msg) == '[object Object]') {
msgObj = msg;
let enc = new TextEncoder();
const msgString = JSON.stringify(msg);
msgToSend = enc.encode(msgString);
const type = avro.parse(schema);
var buf = type.toBuffer(msgObj);
const valid = type.isValid(msgObj);
if (!valid) {
throw (0, utils_1.MemphisError)(new Error(`Schema validation has failed: ${ex.message}`));
}

return msgToSend;
} else {
throw (0, utils_1.MemphisError)(new Error('Unsupported message type'));
}
} catch (ex) {
throw (0, utils_1.MemphisError)(new Error(`Schema validation has failed: ${ex.message}`));
}
}
_validateProtobufMessage(msg) {
let meassageDescriptor = this.connection.meassageDescriptors.get(this.internal_station);
if (meassageDescriptor) {
Expand Down Expand Up @@ -182,6 +223,8 @@ class Producer {
return this._validateJsonMessage(msg);
case 'graphql':
return this._validateGraphqlMessage(msg);
case 'avro':
return this._validateAvroMessage(msg);
default:
return msg;
}
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"@nestjs/platform-express": "^9.0.0",
"ajv": "^8.11.2",
"ajv-draft-04": "^1.0.0",
"avro-js": "^1.11.2",
"axios": "^0.26.1",
"dotenv": "^16.0.1",
"graphql": "^16.6.0",
Expand Down
25 changes: 24 additions & 1 deletion src/memphis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import { Station } from './station';
import { generateNameSuffix, MemphisError, sleep } from './utils';
import { v4 as uuidv4 } from 'uuid';
import { NatsConnection } from 'nats';
const avro = require('avro-js')

interface IRetentionTypes {
MAX_MESSAGE_AGE_SECONDS: string;
Expand Down Expand Up @@ -81,6 +82,7 @@ class Memphis {
public producersPerStation: Map<string, number>;
public meassageDescriptors: Map<string, protobuf.Type>;
public jsonSchemas: Map<string, Function>;
public avroSchemas: Map<string, Function>;
public graphqlSchemas: Map<string, GraphQLSchema>;
public clusterConfigurations: Map<string, boolean>;
public stationSchemaverseToDlsMap: Map<string, boolean>;
Expand Down Expand Up @@ -116,6 +118,7 @@ class Memphis {
this.producersPerStation = new Map();
this.meassageDescriptors = new Map();
this.jsonSchemas = new Map();
this.avroSchemas = new Map();
this.graphqlSchemas = new Map();
this.clusterConfigurations = new Map();
this.stationSchemaverseToDlsMap = new Map();
Expand Down Expand Up @@ -350,6 +353,10 @@ class Memphis {
const graphQlSchema = this._compileGraphQl(internalStationName);
this.graphqlSchemas.set(internalStationName, graphQlSchema);
break;
case 'avro':
const avroSchema = this._compileAvroSchema(internalStationName);
this.avroSchemas.set(internalStationName, avroSchema);
break;
}
}
const sub = this.brokerManager.subscribe(
Expand Down Expand Up @@ -400,6 +407,18 @@ class Memphis {
}
}

private _compileAvroSchema(stationName: string): any {
let stationSchemaData = this.stationSchemaDataMap.get(stationName);
const schema = stationSchemaData['active_version']['schema_content'];
let validate: any;
try {
validate = avro.parse(schema);
return validate;
} catch (ex) {
throw MemphisError(new Error('invalid avro schema'));
}
}

private _compileGraphQl(stationName: string): GraphQLSchema {
const stationSchemaData = this.stationSchemaDataMap.get(stationName);
const schemaContent = stationSchemaData['active_version']['schema_content'];
Expand Down Expand Up @@ -434,6 +453,10 @@ class Memphis {
const graphQlSchema = this._compileGraphQl(stationName);
this.graphqlSchemas.set(stationName, graphQlSchema);
break;
case 'avro':
const avroSchema = this._compileAvroSchema(stationName);
this.avroSchemas.set(stationName, avroSchema);
break;
}
} catch (ex) {
throw MemphisError(ex);
Expand Down Expand Up @@ -1064,7 +1087,7 @@ class Memphis {
}): Promise<void> {
try {

if (schemaType !== "json" && schemaType !== "graphql" && schemaType !== "protobuf")
if (schemaType !== "json" && schemaType !== "graphql" && schemaType !== "protobuf" && schemaType !== "avro")
throw MemphisError(new Error("Schema type not supported"));

var nameConvention = RegExp('^[a-z0-9_.-]*$');
Expand Down
44 changes: 44 additions & 0 deletions src/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as broker from 'nats';

import { Memphis, MsgHeaders } from '.';
import { MemphisError, stringToHex } from './utils';
const avro = require('avro-js')

const schemaVFailAlertType = 'schema_validation_fail_alert';

Expand Down Expand Up @@ -129,6 +130,47 @@ export class Producer {
}
}

private _validateAvroMessage(msg: any): any {
try {
let schema = this.connection.avroSchemas.get(this.internal_station);
let msgObj;
let msgToSend = new Uint8Array();
const isBuffer = Buffer.isBuffer(msg);
if (isBuffer) {
try {
msgObj = JSON.parse(msg.toString());
} catch (ex) {
throw MemphisError(new Error('Expecting Avro format: ' + ex));
}
msgToSend = msg;
const type = avro.parse(schema);
var buf = type.toBuffer(msgObj);
const valid = type.isValid(msgObj);
if (!valid) {
throw MemphisError(new Error(`Schema validation has failed: ${type}`));
}
return msgToSend;
} else if (Object.prototype.toString.call(msg) == '[object Object]') {
msgObj = msg;
let enc = new TextEncoder();
const msgString = JSON.stringify(msg);
msgToSend = enc.encode(msgString);
const type = avro.parse(schema);
var buf = type.toBuffer(msgObj);
const valid = type.isValid(msgObj);
if (!valid) {
throw MemphisError(new Error(`Schema validation has failed: ${type}`));
}

return msgToSend;
} else {
throw MemphisError(new Error('Unsupported message type'));
}
} catch (ex) {
throw MemphisError(new Error(`Schema validation has failed: ${ex.message}`));
}
}

private _validateProtobufMessage(msg: any): any {
let meassageDescriptor = this.connection.meassageDescriptors.get(this.internal_station);
if (meassageDescriptor) {
Expand Down Expand Up @@ -200,6 +242,8 @@ export class Producer {
return this._validateJsonMessage(msg);
case 'graphql':
return this._validateGraphqlMessage(msg);
case 'avro':
return this._validateAvroMessage(msg);
default:
return msg;
}
Expand Down