Skip to content

Commit

Permalink
refactor: integrate custom schemas with parser and spectral (#579)
Browse files Browse the repository at this point in the history
  • Loading branch information
magicmatatjahu authored and derberg committed Oct 4, 2022
1 parent 4558bb2 commit dbd5e0d
Show file tree
Hide file tree
Showing 21 changed files with 1,746 additions and 371 deletions.
1,284 changes: 1,061 additions & 223 deletions package-lock.json

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@
"dependencies": {
"@asyncapi/specs": "^3.1.0",
"@openapi-contrib/openapi-schema-to-json-schema": "^3.2.0",
"@stoplight/spectral-core": "^1.10.1",
"@stoplight/spectral-functions": "^1.5.1",
"@stoplight/spectral-parsers": "^1.0.1",
"@stoplight/spectral-rulesets": "^1.4.3",
"@stoplight/spectral-core": "^1.13.1",
"@stoplight/spectral-functions": "^1.7.1",
"@stoplight/spectral-parsers": "^1.0.2",
"@stoplight/spectral-rulesets": "^1.12.0",
"ajv": "^8.11.0",
"avsc": "^5.7.4",
"js-yaml": "^3.14.1",
Expand Down
1 change: 1 addition & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export const xParserMessageName = 'x-parser-message-name';
export const xParserSchemaId = 'x-parser-schema-id';

export const xParserOriginalSchemaFormat = 'x-parser-original-schema-format';
export const xParserOriginalPayload = 'x-parser-original-payload';
export const xParserOriginalTraits = 'x-parser-original-traits';

export const xParserCircular = 'x-parser-circular';
Expand Down
17 changes: 11 additions & 6 deletions src/custom-operations/parse-schema.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { JSONPath } from 'jsonpath-plus';
import { toPath } from 'lodash';

import { parseSchema, getDefaultSchemaFormat } from '../schema-parser';
import { xParserOriginalSchemaFormat } from '../constants';
import { parseSchema, getSchemaFormat, getDefaultSchemaFormat } from '../schema-parser';
import { xParserOriginalPayload } from '../constants';

import type { Parser } from '../parser';
import type { ParseSchemaInput } from "../schema-parser";
Expand All @@ -14,15 +14,17 @@ interface ToParseItem {
}

const customSchemasPathsV2 = [
// operations
'$.channels.*.[publish,subscribe].message',
'$.channels.*.[publish,subscribe].message.oneOf.*',
'$.components.channels.*.[publish,subscribe].message',
'$.components.channels.*.[publish,subscribe].message.oneOf.*',
// messages
'$.components.messages.*',
];

export async function parseSchemasV2(parser: Parser, detailed: DetailedAsyncAPI) {
const defaultSchemaFormat = getDefaultSchemaFormat(detailed.parsed.asyncapi as string);
const defaultSchemaFormat = getDefaultSchemaFormat(detailed.semver.version);
const parseItems: Array<ToParseItem> = [];

const visited: Set<unknown> = new Set();
Expand All @@ -43,13 +45,16 @@ export async function parseSchemasV2(parser: Parser, detailed: DetailedAsyncAPI)
return;
}

const schemaFormat = getSchemaFormat(value.schemaFormat, detailed.semver.version);
parseItems.push({
input: {
asyncapi: detailed,
data: payload,
meta: undefined,
meta: {
message: value,
},
path: [...toPath(result.path.slice(1)), 'payload'],
schemaFormat: value.schemaFormat || defaultSchemaFormat,
schemaFormat,
defaultSchemaFormat,
},
value,
Expand All @@ -62,6 +67,6 @@ export async function parseSchemasV2(parser: Parser, detailed: DetailedAsyncAPI)
}

async function parseSchemaV2(parser: Parser, item: ToParseItem) {
item.value[xParserOriginalSchemaFormat] = item.input.schemaFormat;
item.value[xParserOriginalPayload] = item.input.data;
item.value.payload = await parseSchema(parser, item.input);
}
5 changes: 3 additions & 2 deletions src/parse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { AsyncAPIDocumentInterface, newAsyncAPIDocument } from "./models";

import { customOperations } from './custom-operations';
import { validate } from "./lint";
import { createDetailedAsyncAPI, normalizeInput, toAsyncAPIDocument } from "./utils";
import { createDetailedAsyncAPI, normalizeInput, toAsyncAPIDocument, unfreezeObject } from "./utils";

import { xParserSpecParsed } from './constants';

Expand Down Expand Up @@ -47,7 +47,7 @@ export async function parse(parser: Parser, asyncapi: ParseInput, options?: Pars
}

// unfreeze the object - Spectral makes resolved document "freezed"
const validatedDoc = JSON.parse(JSON.stringify(validated));
const validatedDoc = unfreezeObject(validated);
validatedDoc[String(xParserSpecParsed)] = true;

const detailed = createDetailedAsyncAPI(asyncapi as string | Record<string, unknown>, validatedDoc);
Expand All @@ -67,6 +67,7 @@ export async function parse(parser: Parser, asyncapi: ParseInput, options?: Pars

const defaultOptions: ParseOptions = {
applyTraits: true,
parseSchemas: true,
};
function normalizeOptions(options?: ParseOptions): ParseOptions {
if (!options || typeof options !== 'object') {
Expand Down
13 changes: 8 additions & 5 deletions src/parser.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { Spectral } from "@stoplight/spectral-core";
import { asyncapi as aasRuleset } from "@stoplight/spectral-rulesets";

import { parse } from "./parse";
import { lint, validate } from "./lint";
import { registerSchemaParser } from './schema-parser';
import { AsyncAPISchemaParser } from "./schema-parser/asyncapi-schema-parser";
import { configureSpectral } from "./spectral";

import type { IConstructorOpts } from "@stoplight/spectral-core";
import type { ParseInput, ParseOptions } from "./parse";
Expand All @@ -18,16 +19,18 @@ export class Parser {
public readonly parserRegistry = new Map<string, SchemaParser>();
public readonly spectral: Spectral;

constructor(options?: ParserOptions) {
const { spectral } = options || {};
constructor(
private readonly options?: ParserOptions
) {
const { spectral } = this.options || {};
if (spectral instanceof Spectral) {
this.spectral = spectral;
} else {
this.spectral = new Spectral(spectral);
}

// TODO: fix type
this.spectral.setRuleset(aasRuleset as any);
this.registerSchemaParser(AsyncAPISchemaParser());
configureSpectral(this);
}

parse(asyncapi: ParseInput, options?: ParseOptions) {
Expand Down
9 changes: 5 additions & 4 deletions src/schema-parser/asyncapi-schema-parser.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import { SchemaParser, ParseSchemaInput, ValidateSchemaInput } from "../schema-parser";
import Ajv, { ErrorObject, ValidateFunction } from "ajv";
import type { AsyncAPISchema, SchemaValidateResult } from '../types';
import Ajv from "ajv";
// @ts-ignore
import specs from '@asyncapi/specs';

import type { ErrorObject, ValidateFunction } from "ajv";
import type { AsyncAPISchema, SchemaValidateResult } from '../types';
import type { SchemaParser, ParseSchemaInput, ValidateSchemaInput } from "../schema-parser";

const ajv = new Ajv({
allErrors: true,
strict: false,
logger: false,
});

// Only versions compatible with JSON Schema Draf-07 are supported.
const specVersions = Object.keys(specs).filter((version: string) => !['1.0.0', '1.1.0', '1.2.0', '2.0.0-rc1', '2.0.0-rc2'].includes(version));

Expand Down
62 changes: 25 additions & 37 deletions src/schema-parser/avro-schema-parser.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import avsc, { type Schema} from "avsc";
import type { AsyncAPISchema, SchemaValidateResult } from "types";
import { SchemaParser, type ParseSchemaInput, type ValidateSchemaInput } from "../schema-parser";
import { type JSONSchema7TypeName } from "json-schema";
import avsc from "avsc";

import type { JSONSchema7TypeName } from "json-schema";
import type { Schema } from "avsc";
import type { SchemaParser, ParseSchemaInput, ValidateSchemaInput } from "../schema-parser";
import type { AsyncAPISchema, SchemaValidateResult } from '../types';

type AvroSchema = Schema & { [key: string]: any } & any;

Expand Down Expand Up @@ -32,29 +34,15 @@ async function validate(input: ValidateSchemaInput<unknown, unknown>): Promise<S

async function parse(input: ParseSchemaInput<unknown, unknown>): Promise<AsyncAPISchema> {
const asyncAPISchema = await avroToJsonSchema(input.data as AvroSchema);
const message = (input.meta as any).message

// TODO: Should the following modifications to the message object be done in the caller and for all parsers rather than here?
// remove that function when https://github.com/asyncapi/spec/issues/622 will be introduced in AsyncAPI spec
async function handleKafkaProtocolKey() {
if (message.bindings && message.bindings.kafka) {
const key = message.bindings.kafka.key;
if (key) {
const bindingsTransformed = await avroToJsonSchema(key);
message['x-parser-original-bindings-kafka-key'] = key;
message.bindings.kafka.key = bindingsTransformed;
}
}
}

if (message !== undefined) {
// TODO: Should the following modifications to the message object be done in the caller and for all parsers rather than here?
message['x-parser-original-schema-format'] = input.schemaFormat || input.defaultSchemaFormat;
message['x-parser-original-payload'] = input.data;
message.payload = asyncAPISchema;
delete message.schemaFormat;

await handleKafkaProtocolKey();
const message = (input.meta as any).message
const key = message?.bindings?.kafka?.key;
if (key) {
const bindingsTransformed = await avroToJsonSchema(key);
message['x-parser-original-bindings-kafka-key'] = key;
message.bindings.kafka.key = bindingsTransformed;
}

return asyncAPISchema;
Expand Down Expand Up @@ -94,7 +82,7 @@ const typeMappings: { [key: string]: JSONSchema7TypeName } = {
uuid: 'string',
};

const commonAttributesMapping = (avroDefinition: AvroSchema, jsonSchema: AsyncAPISchema, isTopLevel: boolean) => {
function commonAttributesMapping(avroDefinition: AvroSchema, jsonSchema: AsyncAPISchema, isTopLevel: boolean): void {
if (avroDefinition.doc) jsonSchema.description = avroDefinition.doc;
if (avroDefinition.default !== undefined) jsonSchema.default = avroDefinition.default;

Expand Down Expand Up @@ -124,7 +112,7 @@ function getFullyQualifiedName(avroDefinition: AvroSchema) {
* @param parentJsonSchema the parent json schema which contains the required property to enrich
* @param haveDefaultValue we assure that a required field does not have a default value
*/
const requiredAttributesMapping = (fieldDefinition: any, parentJsonSchema: AsyncAPISchema, haveDefaultValue: boolean) => {
function requiredAttributesMapping(fieldDefinition: any, parentJsonSchema: AsyncAPISchema, haveDefaultValue: boolean): void {
const isUnionWithNull = Array.isArray(fieldDefinition.type) && fieldDefinition.type.includes('null');

// we assume that a union type without null and a field without default value is required
Expand All @@ -134,7 +122,7 @@ const requiredAttributesMapping = (fieldDefinition: any, parentJsonSchema: Async
}
};

function extractNonNullableTypeIfNeeded(typeInput: any, jsonSchemaInput: AsyncAPISchema) {
function extractNonNullableTypeIfNeeded(typeInput: any, jsonSchemaInput: AsyncAPISchema): { type: string, jsonSchema: AsyncAPISchema } {
let type = typeInput;
let jsonSchema = jsonSchemaInput;
// Map example to first non-null type
Expand All @@ -145,10 +133,10 @@ function extractNonNullableTypeIfNeeded(typeInput: any, jsonSchemaInput: AsyncAP
jsonSchema = jsonSchema.oneOf[0] as AsyncAPISchema;
}
}
return {type, jsonSchema};
return { type, jsonSchema };
}

const exampleAttributeMapping = (type: any, example: any, jsonSchema: AsyncAPISchema) => {
function exampleAttributeMapping(type: any, example: any, jsonSchema: AsyncAPISchema): void {
if (example === undefined || jsonSchema.examples || Array.isArray(type)) return;

switch (type) {
Expand All @@ -163,7 +151,7 @@ const exampleAttributeMapping = (type: any, example: any, jsonSchema: AsyncAPISc
}
};

const additionalAttributesMapping = (typeInput: any, avroDefinition: AvroSchema, jsonSchemaInput: AsyncAPISchema) => {
function additionalAttributesMapping(typeInput: any, avroDefinition: AvroSchema, jsonSchemaInput: AsyncAPISchema): void {
const __ret = extractNonNullableTypeIfNeeded(typeInput, jsonSchemaInput);
const type = __ret.type;
const jsonSchema = __ret.jsonSchema;
Expand Down Expand Up @@ -201,7 +189,7 @@ const additionalAttributesMapping = (typeInput: any, avroDefinition: AvroSchema,
}
};

function validateAvroSchema(avroDefinition: AvroSchema) {
function validateAvroSchema(avroDefinition: AvroSchema): void | never {
// don't need to use the output from parsing the
// avro definition - we're just using this as a
// validator as this will throw an exception if
Expand All @@ -217,13 +205,13 @@ function validateAvroSchema(avroDefinition: AvroSchema) {
* @param key String | Undefined - the fully qualified name of an avro record
* @param value JsonSchema - The json schema from the avro record
*/
function cacheAvroRecordDef(cache: {[key:string]: AsyncAPISchema}, key: string, value: AsyncAPISchema) {
function cacheAvroRecordDef(cache: {[key:string]: AsyncAPISchema}, key: string, value: AsyncAPISchema): void {
if (key !== undefined) {
cache[key] = value;
}
}

async function convertAvroToJsonSchema(avroDefinition: AvroSchema , isTopLevel: boolean, recordCache: Map<string, AsyncAPISchema> | any = {}) {
async function convertAvroToJsonSchema(avroDefinition: AvroSchema , isTopLevel: boolean, recordCache: Map<string, AsyncAPISchema> | any = {}): Promise<AsyncAPISchema> {
const jsonSchema: AsyncAPISchema = {};
const isUnion = Array.isArray(avroDefinition);

Expand Down Expand Up @@ -287,8 +275,8 @@ async function convertAvroToJsonSchema(avroDefinition: AvroSchema , isTopLevel:
* @param jsonSchema the schema for the record.
* @returns {Promise<Map<string, any>>}
*/
async function processRecordSchema(avroDefinition: AvroSchema, recordCache: {[key:string]: AsyncAPISchema}, jsonSchema: AsyncAPISchema) {
const propsMap = new Map();
async function processRecordSchema(avroDefinition: AvroSchema, recordCache: {[key:string]: AsyncAPISchema}, jsonSchema: AsyncAPISchema): Promise<Map<string, any>> {
const propsMap = new Map<string, any>();
for (const field of avroDefinition.fields) {
// If the type is a sub schema it will have been stored in the cache.
if (recordCache[field.type]) {
Expand Down Expand Up @@ -317,9 +305,9 @@ async function processRecordSchema(avroDefinition: AvroSchema, recordCache: {[ke
* @param avroDefinition the avro schema to be processed
* @param isTopLevel is this the top level of the schema or is this a sub schema
* @param recordCache the cache of previously processed record types
* @returns {Promise<JsonSchema>} the mutated jsonSchema that was provided to the function
* @returns {Promise<AsyncAPISchema>} the mutated jsonSchema that was provided to the function
*/
async function processUnionSchema(jsonSchema: AsyncAPISchema, avroDefinition: AvroSchema, isTopLevel: boolean, recordCache: {[key:string]: AsyncAPISchema}) {
async function processUnionSchema(jsonSchema: AsyncAPISchema, avroDefinition: AvroSchema, isTopLevel: boolean, recordCache: {[key:string]: AsyncAPISchema}): Promise<AsyncAPISchema> {
jsonSchema.oneOf = [];
let nullDef = null;
for (const avroDef of avroDefinition) {
Expand Down
14 changes: 10 additions & 4 deletions src/schema-parser/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,15 @@ export interface SchemaParser<D = unknown, M = unknown> {
export async function validateSchema(parser: Parser, input: ValidateSchemaInput) {
const schemaParser = parser.parserRegistry.get(input.schemaFormat);
if (schemaParser === undefined) {
// throw appropriate error
throw new Error();
throw new Error('Unknown schema format');
}
return schemaParser.validate(input);
}

export async function parseSchema(parser: Parser, input: ParseSchemaInput) {
const schemaParser = parser.parserRegistry.get(input.schemaFormat);
if (schemaParser === undefined) {
return;
throw new Error('Unknown schema format');
}
return schemaParser.parse(input);
}
Expand All @@ -49,14 +48,21 @@ export function registerSchemaParser(parser: Parser, schemaParser: SchemaParser)
|| typeof schemaParser.parse !== 'function'
|| typeof schemaParser.getMimeTypes !== 'function'
) {
throw new Error('custom parser must have "parse()", "validate()" and "getMimeTypes()" functions.');
throw new Error('Custom parser must have "parse()", "validate()" and "getMimeTypes()" functions.');
}

schemaParser.getMimeTypes().forEach(schemaFormat => {
parser.parserRegistry.set(schemaFormat, schemaParser);
});
}

export function getSchemaFormat(schematFormat: string | undefined, asyncapiVersion: string) {
if (typeof schematFormat === 'string') {
return schematFormat;
}
return getDefaultSchemaFormat(asyncapiVersion);
}

export function getDefaultSchemaFormat(asyncapiVersion: string) {
return `application/vnd.aai.asyncapi;version=${asyncapiVersion}`;
}
Loading

0 comments on commit dbd5e0d

Please sign in to comment.