diff --git a/packages/core/index.ts b/packages/core/index.ts index 18f370d0..0d088b4c 100644 --- a/packages/core/index.ts +++ b/packages/core/index.ts @@ -48,6 +48,7 @@ export type { MessageSchemaContainerOptions } from './lib/queues/MessageSchemaCo export { objectToBuffer } from './lib/utils/queueUtils' export { waitAndRetry } from './lib/utils/waitUtils' export { parseMessage } from './lib/utils/parseUtils' +export { toDatePreprocessor } from './lib/utils/toDateProcessor' export { reloadConfig, isProduction } from './lib/utils/envUtils' @@ -55,4 +56,5 @@ export { DomainEventEmitter } from './lib/events/DomainEventEmitter' export { EventRegistry } from './lib/events/EventRegistry' export { FakeListener } from './lib/events/fakes/FakeListener' export * from './lib/events/eventTypes' +export * from './lib/events/baseEventSchemas' export * from './lib/messages/baseMessageSchemas' diff --git a/packages/core/lib/events/baseEventSchemas.ts b/packages/core/lib/events/baseEventSchemas.ts new file mode 100644 index 00000000..73dcedd1 --- /dev/null +++ b/packages/core/lib/events/baseEventSchemas.ts @@ -0,0 +1,11 @@ +import { z } from 'zod' + +// Core fields that describe either internal event or external message +export const BASE_EVENT_SCHEMA = z.object({ + id: z.string().describe('event unique identifier'), + timestamp: z.string().datetime().describe('iso 8601 datetime'), + type: z.literal('').describe('event type name'), + payload: z.optional(z.object({})).describe('event payload based on type'), +}) + +export type BaseEventType = z.infer diff --git a/packages/core/lib/events/eventTypes.ts b/packages/core/lib/events/eventTypes.ts index 6ec46494..c1476bf5 100644 --- a/packages/core/lib/events/eventTypes.ts +++ b/packages/core/lib/events/eventTypes.ts @@ -6,7 +6,7 @@ import type { BASE_MESSAGE_SCHEMA } from '../messages/baseMessageSchemas' export type EventTypeNames = CommonEventDefinitionSchemaType['type'] -// To be extended with transport-specific fields, e. g. "snsTopic" in specific libraries +// To be extended with transport-specific fields, e.g. "snsTopic" in specific libraries export type CommonEventDefinition = { schema: ZodObject< Omit<(typeof BASE_MESSAGE_SCHEMA)['shape'], 'payload'> & { payload: ZodTypeAny } diff --git a/packages/core/lib/messages/baseMessageSchemas.ts b/packages/core/lib/messages/baseMessageSchemas.ts index ce2a33f1..f892b30e 100644 --- a/packages/core/lib/messages/baseMessageSchemas.ts +++ b/packages/core/lib/messages/baseMessageSchemas.ts @@ -1,28 +1,28 @@ import z from 'zod' -// Core fields that describe event -export const BASE_MESSAGE_SCHEMA = z.object({ - id: z.string().describe('event unique identifier'), - timestamp: z.string().datetime().describe('iso 8601 datetime'), - type: z.literal('').describe('event type name'), - payload: z.optional(z.object({})).describe('event payload based on type'), -}) +import { BASE_EVENT_SCHEMA } from '../events/baseEventSchemas' + +// External message metadata that describe the context in which the message was created, primarily used for debugging purposes +export const MESSAGE_METADATA_SCHEMA = z + .object({ + schemaVersion: z.string().min(1).describe('message schema version'), + // this is always set to a service that created the message + producedBy: z.string().min(1).describe('app/service that produced the message'), + // this is always propagated within the message chain. For the first message in the chain it is equal to "producedBy" + originatedFrom: z + .string() + .min(1) + .describe('app/service that initiated entire workflow that led to creating this message'), + // this is always propagated within the message chain. + correlationId: z.string().describe('unique identifier passed to all events in workflow chain'), + }) + .describe('external message metadata') -// Extra fields that are optional for the event processing -export const EXTENDED_MESSAGE_SCHEMA = BASE_MESSAGE_SCHEMA.extend({ - metadata: z - .object({ - schemaVersion: z.string().min(1).describe('message schema version'), - producedBy: z.string().min(1).describe('app/service that produced the message'), - originatedFrom: z - .string() - .min(1) - .describe('app/service that initiated entire workflow that led to creating this message'), - correlationId: z - .string() - .describe('unique identifier passed to all events in workflow chain'), - }) - .describe('event metadata'), +export const BASE_MESSAGE_SCHEMA = BASE_EVENT_SCHEMA.extend({ + // For internal domain events that did not originate within a message chain metadata field can be omitted, producer should then assume it is initiating a new chain + metadata: MESSAGE_METADATA_SCHEMA.optional(), }) export type BaseMessageType = z.infer + +export type MessageMetadataType = z.infer diff --git a/packages/core/lib/utils/toDatePreprocessor.spec.ts b/packages/core/lib/utils/toDatePreprocessor.spec.ts new file mode 100644 index 00000000..5eb382bb --- /dev/null +++ b/packages/core/lib/utils/toDatePreprocessor.spec.ts @@ -0,0 +1,52 @@ +import { describe, expect, it } from 'vitest' +import { z } from 'zod' + +import { toDatePreprocessor } from './toDateProcessor' + +describe('toDatePreprocessor', () => { + it('converts valid strings to date', () => { + const SCHEMA = z.object({ + createdAt: z.preprocess(toDatePreprocessor, z.date()), + updatedAt: z.preprocess(toDatePreprocessor, z.date()), + }) + + const result = SCHEMA.parse({ + createdAt: '2016-01-01', + updatedAt: '2022-01-12T00:00:00.000Z', + }) + + expect(result).toEqual({ + createdAt: new Date('2016-01-01'), + updatedAt: new Date('2022-01-12T00:00:00.000Z'), + }) + }) + + it('converts valid numbers to date', () => { + const SCHEMA = z.object({ + createdAt: z.preprocess(toDatePreprocessor, z.date()), + updatedAt: z.preprocess(toDatePreprocessor, z.date()), + }) + + const result = SCHEMA.parse({ + createdAt: 166, + updatedAt: 99999, + }) + + expect(result).toEqual({ + createdAt: new Date(166), + updatedAt: new Date(99999), + }) + }) + + it('does not convert function input', () => { + const SCHEMA = z.object({ + createdAt: z.preprocess(toDatePreprocessor, z.date()), + }) + + expect(() => + SCHEMA.parse({ + createdAt: (x: string) => x, + }), + ).toThrow(/Expected date, received function/) + }) +}) diff --git a/packages/core/lib/utils/toDateProcessor.ts b/packages/core/lib/utils/toDateProcessor.ts new file mode 100644 index 00000000..de9778fa --- /dev/null +++ b/packages/core/lib/utils/toDateProcessor.ts @@ -0,0 +1,10 @@ +export const toDatePreprocessor = (value: unknown) => { + switch (typeof value) { + case 'string': + case 'number': + return new Date(value) + + default: + return value // could not coerce, return the original and face the consequences during validation + } +} diff --git a/packages/core/test/testContext.ts b/packages/core/test/testContext.ts index 0e05d2a3..76762d46 100644 --- a/packages/core/test/testContext.ts +++ b/packages/core/test/testContext.ts @@ -8,7 +8,7 @@ import { z } from 'zod' import { DomainEventEmitter } from '../lib/events/DomainEventEmitter' import { EventRegistry } from '../lib/events/EventRegistry' -import { EXTENDED_MESSAGE_SCHEMA } from '../lib/messages/baseMessageSchemas' +import { BASE_MESSAGE_SCHEMA } from '../lib/messages/baseMessageSchemas' import type { TransactionObservabilityManager } from '../lib/types/MessageQueueTypes' export const SINGLETON_CONFIG = { lifetime: Lifetime.SINGLETON } @@ -19,7 +19,7 @@ const TestLogger: Logger = pino() export const TestEvents = { created: { - schema: EXTENDED_MESSAGE_SCHEMA.extend({ + schema: BASE_MESSAGE_SCHEMA.extend({ type: z.literal('entity.created'), payload: z.object({ message: z.string(), @@ -28,7 +28,7 @@ export const TestEvents = { }, updated: { - schema: EXTENDED_MESSAGE_SCHEMA.extend({ + schema: BASE_MESSAGE_SCHEMA.extend({ type: z.literal('entity.updated'), payload: z.object({ message: z.string(),