Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Middleware support for aggregate commands, read model resolvers and projections #1925

Merged
merged 27 commits into from
Jul 2, 2021
Merged
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4644b67
command middleware spike
const314 Jun 16, 2021
085649b
query middleware spike
const314 Jun 18, 2021
9c29e08
add projection middleware spike; extract applyMiddlewares; change config
const314 Jun 23, 2021
a9d7312
add types
const314 Jun 23, 2021
46c830b
fix params
const314 Jun 23, 2021
f7aa8ea
small fix
const314 Jun 23, 2021
acfb2e0
add more runtime info for middlewares
const314 Jun 24, 2021
d608465
command middleware tests
const314 Jun 25, 2021
8972e35
update snapshots
const314 Jun 25, 2021
35dfd01
Merge branch 'dev' into feature/middleware
const314 Jun 25, 2021
0286b58
pass request to command middleware
const314 Jun 28, 2021
72c85cb
simplify middlewares; prepare CommandMiddleware chain on interop crea…
const314 Jun 29, 2021
18f00f6
pass request to ResolverMiddleware
const314 Jun 29, 2021
4c7ca6f
prepare resolvers and projection middleware chains on interop creation
const314 Jun 29, 2021
d6b1e3d
reorder middleware arguments
const314 Jun 30, 2021
e02b458
read model middleware tests
const314 Jun 30, 2021
7c75d7f
rename read model middleware types
const314 Jun 30, 2021
5f648bf
Merge branch 'dev' into feature/middleware
const314 Jun 30, 2021
d8a91bb
make context types extendable
const314 Jul 1, 2021
2daee89
user middleware approach for personal-data example
const314 Jul 1, 2021
0f89b18
clean up
const314 Jul 1, 2021
7a885d5
rename folder with middlewares
const314 Jul 1, 2021
aae43b9
Merge branch 'dev' into feature/middleware
const314 Jul 1, 2021
0c66aa4
Merge branch 'dev' into feature/middleware
const314 Jul 1, 2021
2fa2078
fix ts-node version in functional tests
const314 Jul 2, 2021
84304fd
simplify middleware handlers types
const314 Jul 2, 2021
7bc75fc
Merge branch 'dev' into feature/middleware
max-vasin Jul 2, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
reorder middleware arguments
  • Loading branch information
const314 committed Jun 30, 2021
commit d6b1e3dd3a0b46eaeba249d2597cbccb5f494571
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ import {
AggregateRuntime,
} from './types'
import { CommandError } from '../errors'
import { AggregateMeta, ExecutionContext } from '../types/runtime'
import { AggregateMeta, MiddlewareContext } from '../types/runtime'
import { getLog } from '../get-log'
import { getPerformanceTracerSubsegment } from '../utils'
import {
@@ -399,11 +399,14 @@ const makeCommandExecutor = (
}
}

const chainedHandlers = applyMiddlewares(commandHandler)
const chainedHandlers = applyMiddlewares(
(middlewareContext, state, command, context) =>
commandHandler(state, command, context)
)

return async (
command: Command,
executionContext?: ExecutionContext
middlewareContext: MiddlewareContext = {}
): Promise<CommandResult> => {
const monitoringGroup =
runtime.monitoring != null
@@ -476,11 +479,12 @@ const makeCommandExecutor = (
decrypt,
}

const event = await chainedHandlers(aggregateState, command, context, {
...executionContext,
interop: aggregate,
runtime,
})
const event = await chainedHandlers(
middlewareContext,
aggregateState,
command,
context
)
// const event = await commandHandler(aggregateState, command, context)

if (!checkOptionShape(event.type, [String])) {
4 changes: 2 additions & 2 deletions packages/core/core/src/aggregate/types.ts
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ import {
CommandMiddleware,
Eventstore,
Monitoring,
ExecutionContext,
MiddlewareContext,
} from '../types/runtime'
import {
Event,
@@ -34,7 +34,7 @@ export type AggregatesInterop = {
aggregateMap: AggregateInteropMap
executeCommand: (
command: Command,
executionContext?: ExecutionContext
middlewareContext?: MiddlewareContext
) => Promise<CommandResult>
}

Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ import { Event } from '../types/core'
import { createHttpError, HttpStatusCodes } from '../errors'
import { getPerformanceTracerSubsegment } from '../utils'
import {
ExecutionContext,
MiddlewareContext,
ReadModelMeta,
ResolverMiddlewareHandler,
} from '../types/runtime'
@@ -48,28 +48,29 @@ const makeReadModelInteropCreator = (runtime: ReadModelRuntime) => {

return (readModel: ReadModelMeta): ReadModelInterop => {
const { connectorName, name, resolvers, projection } = readModel
const middlewareContext = { meta: readModel, runtime }

const resolverInvokerMap = Object.keys(resolvers).reduce<{
[key: string]: ResolverMiddlewareHandler
}>((map, resolverName) => {
map[
resolverName
] = applyResolverMiddlewares((connection, args, context) =>
resolvers[resolverName](connection, args, context)
] = applyResolverMiddlewares(
(middlewareContext, connection, args, context) =>
resolvers[resolverName](connection, args, context)
)
return map
}, {})

const projectionHandlersChain = applyProjectionMiddlewares(
(store, event, context) => projection[event.type](store, event, context)
(middlewareContext, store, event, context) =>
projection[event.type](store, event, context)
)

const acquireResolver = async (
resolver: string,
args: any,
context: { jwt?: string },
executionContext?: ExecutionContext
middlewareContext: MiddlewareContext = {}
) => {
const log = getLog(
`read-model-interop:${name}:acquireResolver:${resolver}`
@@ -118,13 +119,17 @@ const makeReadModelInteropCreator = (runtime: ReadModelRuntime) => {
log.debug(`invoking the resolver`)

const data = await invoker(
{
...middlewareContext,
readModelName: name,
resolverName: resolver,
},
connection,
args,
{
secretsManager,
jwt: context.jwt,
},
{ ...middlewareContext, resolver, ...executionContext }
}
)
// const data = await invoker(connection, args, {
// secretsManager,
@@ -200,10 +205,10 @@ const makeReadModelInteropCreator = (runtime: ReadModelRuntime) => {
if (typeof projection[event.type] === 'function') {
return monitoredHandler(event.type, async () =>
projectionHandlersChain(
{ readModelName: name },
store,
event,
await buildEncryption(event),
middlewareContext
await buildEncryption(event)
)
)
}
4 changes: 2 additions & 2 deletions packages/core/core/src/read-model/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { SecretsManager, Event } from '../types/core'
import {
ExecutionContext,
MiddlewareContext,
Monitoring,
ProjectionMiddleware,
ResolverMiddleware,
@@ -29,7 +29,7 @@ export type ReadModelInterop = {
context: {
jwt?: string
},
executionContext?: ExecutionContext
middlewareContext?: MiddlewareContext
) => Promise<ReadModelRuntimeResolver>
acquireInitHandler: (
store: any
32 changes: 14 additions & 18 deletions packages/core/core/src/types/runtime.ts
Original file line number Diff line number Diff line change
@@ -100,16 +100,16 @@ export type ViewModelMeta = {
//Middleware

type CommandMiddlewareParameters = [
...args: Parameters<CommandHandler>,
middlewareContext?: AggregateInteropContext
middlewareContext: AggregateMiddlewareContext,
...args: Parameters<CommandHandler>
]
type ResolverMiddlewareParameters = [
...args: Parameters<ReadModelResolver<any>>,
middlewareContext?: ReadModelResolverInteropContext
middlewareContext: ReadModelResolverMiddlewareContext,
...args: Parameters<ReadModelResolver<any>>
]
type ProjectionMiddlewareParameters = [
...args: Parameters<ReadModelEventHandler<any>>,
middlewareContext?: ReadModelInteropContext
middlewareContext: ReadModelMiddlewareContext,
...args: Parameters<ReadModelEventHandler<any>>
]

export type CommandMiddlewareHandler = (
@@ -131,24 +131,20 @@ type MiddlewareChainableFunction =

type MiddlewareHandler<THandler> = (next: THandler) => THandler

export type ExecutionContext = {
export type MiddlewareContext = {
req?: any
res?: any
}

type ReadModelInteropContext = {
meta: ReadModelMeta
runtime: ReadModelRuntime
} & ExecutionContext
type ReadModelMiddlewareContext = {
readModelName: string
} & MiddlewareContext

type ReadModelResolverInteropContext = {
resolver: string
} & ReadModelInteropContext
type ReadModelResolverMiddlewareContext = {
resolverName: string
} & ReadModelMiddlewareContext

type AggregateInteropContext = {
interop: AggregateInterop
runtime: AggregateRuntime
} & ExecutionContext
type AggregateMiddlewareContext = MiddlewareContext

type Middleware<
THandler extends MiddlewareChainableFunction
Original file line number Diff line number Diff line change
@@ -1181,6 +1181,7 @@ describe('Command middleware', () => {
const dummyMiddlewareHandler = jest.fn()

const dummyMiddleware: CommandMiddleware = (next) => async (
middlewareContext,
state,
command,
context
@@ -1189,7 +1190,7 @@ describe('Command middleware', () => {
state,
command,
})
return next(state, command, context)
return next(middlewareContext, state, command, context)
}

const runtime = makeTestRuntimeWithMiddleware(dummyMiddleware)
@@ -1251,11 +1252,13 @@ describe('Command middleware', () => {
])

const dummyMiddleware: CommandMiddleware = (next) => async (
middlewareContext,
state,
command,
context
) => {
return next(
middlewareContext,
state,
{
...command,
@@ -1289,11 +1292,12 @@ describe('Command middleware', () => {
])

const dummyMiddleware: CommandMiddleware = (next) => async (
middlewareContext,
state,
command,
context
) => {
const event = await next(state, command, context)
const event = await next(middlewareContext, state, command, context)
event.payload.additionalContent = 'Content from middleware'
return event
}
@@ -1318,26 +1322,38 @@ describe('Command middleware', () => {
])

const middleware1: CommandMiddleware = (next) => async (
middlewareContext,
state,
command,
context
) => {
const modifiedCommand = { ...command }
modifiedCommand.payload.contents = 'Command modified by first middleware'
const event = await next(state, modifiedCommand, context)
const event = await next(
middlewareContext,
state,
modifiedCommand,
context
)
event.payload.extra += '; Event modified by first middleware'
return event
}

const middleware2: CommandMiddleware = (next) => async (
middlewareContext,
state,
command,
context
) => {
const modifiedCommand = { ...command }
modifiedCommand.payload.contents +=
'; Command modified by second middleware'
const event = await next(state, modifiedCommand, context)
const event = await next(
middlewareContext,
state,
modifiedCommand,
context
)
event.payload.extra = 'Event modified by second middleware'
return event
}
6 changes: 3 additions & 3 deletions packages/runtime/runtime/src/common/command/index.ts
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ import {
CommandResult,
AggregatesInterop,
CommandError,
ExecutionContext,
MiddlewareContext,
} from '@resolve-js/core'

type CommandPool = {
@@ -56,12 +56,12 @@ const createCommand: CommandExecutorBuilder = ({

const disposableExecutor = async (
command: Command,
executionContext: ExecutionContext
middlewareContext: MiddlewareContext
): Promise<CommandResult> => {
if (pool.isDisposed) {
throw new CommandError('Command handler is disposed')
}
return await aggregatesInterop.executeCommand(command, executionContext)
return await aggregatesInterop.executeCommand(command, middlewareContext)
}

const api = {
Original file line number Diff line number Diff line change
@@ -16,15 +16,15 @@ function isCommandError(error) {

export const executeCommandWithRetryConflicts = async (
{ executeCommand, commandArgs, jwt },
executionContext
middlewareContext
) => {
const retryCount = commandArgs.immediateConflict != null ? 0 : 10
let lastError = null
let event = null

for (let retry = 0; retry <= retryCount; retry++) {
try {
event = await executeCommand({ ...commandArgs, jwt }, executionContext)
event = await executeCommand({ ...commandArgs, jwt }, middlewareContext)
lastError = null
break
} catch (error) {
4 changes: 2 additions & 2 deletions packages/runtime/runtime/src/common/query/index.ts
Original file line number Diff line number Diff line change
@@ -41,9 +41,9 @@ const interopApi = async (models: any, key: string, ...args: Array<any>) => {
`Model "${modelName}" does not implement method "${key}"`
)
}
const executionContext = args.length > 1 ? args[1] : undefined
const middlewareContext = args.length > 1 ? args[1] : {}

return await method(parameters, executionContext)
return await method(parameters, middlewareContext)
}

const createQuery = (params: CreateQueryOptions): any => {
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {
ExecutionContext,
MiddlewareContext,
makeMonitoringSafe,
ReadModelInterop,
SagaInterop,
@@ -33,7 +33,7 @@ const read = async (
pool: ReadModelPool,
interop: ReadModelInterop | SagaInterop,
{ jwt, ...params }: any,
executionContext?: ExecutionContext
middlewareContext?: MiddlewareContext
): Promise<any> => {
const { isDisposed, performanceTracer, monitoring } = pool

@@ -80,7 +80,7 @@ const read = async (
{
jwt,
},
executionContext
middlewareContext
)
log.debug(`invoking resolver`)
const result = await wrapConnection(pool, interop, resolver)
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import { CommandMiddleware } from '@resolve-js/core'
const middleware: CommandMiddleware = (next) => async (
middlewareContext,
state,
command,
context,
middlewareContext
context
) => {
const { req, res } = middlewareContext
console.log({ req, res, command })

const { addedByMiddleware, isOdd } = context as any
const event = await next(state, command, context, middlewareContext)
const event = await next(middlewareContext, state, command, context)
if (isOdd && event.type === 'MY_AGGREGATE_ITEM_ADDED') {
event.payload.itemName += addedByMiddleware
}
Loading