Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve jitter in readmodel-postgres adapter #2194

Merged
merged 4 commits into from
Jan 12, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -288,7 +288,7 @@ export type OmitObject<T extends object, U extends object> = {

export type BuildInfo = {
eventsWithCursors?: Array<EventWithCursor>
initiator: 'command' | 'read-model-next'
initiator: 'command' | 'command-foreign' | 'read-model-next'
notificationId: string
sendTime: number
coldStart?: boolean
Original file line number Diff line number Diff line change
@@ -14,17 +14,11 @@
"jsx": "react",
"outDir": "lib",
"declarationDir": "types",
"composite": true,
"typeRoots": [
"./typings",
"./node_modules/@types",
"../../../../../node_modules/@types"
]
},
"include": ["src", "typings", "test"],
"references": [
{
"path": "../readmodel-base"
}
]
"include": ["src", "typings", "test"]
}
Original file line number Diff line number Diff line change
@@ -758,7 +758,7 @@ const build: ExternalMethods['build'] = async (
const getVacantTimeInMillis = () =>
Math.max(inputGetVacantTimeInMillis() - immediatelyStopTimeout, 0)
eventstoreAdapter.establishTimeLimit(getVacantTimeInMillis)
const { eventsWithCursors, ...inputMetricData } = buildInfo
const { eventsWithCursors, retryAttempt, ...inputMetricData } = buildInfo
const metricData = {
...inputMetricData,
pureLedgerTime: 0,
@@ -950,11 +950,17 @@ const build: ExternalMethods['build'] = async (
}
}
} catch (error) {
const nextArgs: Parameters<typeof next> = [
Math.min(Math.pow(2, ~~retryAttempt) * 100, 10000),
{ retryAttempt: ~~retryAttempt + 1 },
]

if (error === immediatelyStopError) {
try {
await basePool.connection.end()
} catch (e) {}
await next()

await next(...nextArgs)
return
}

@@ -993,7 +999,7 @@ const build: ExternalMethods['build'] = async (
error.name === 'ServiceBusyError'
) {
log.debug(`PassthroughError is retryable. Going to the next step`)
await next()
await next(...nextArgs)
}
} finally {
log.debug(`Building is finished`)
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ import type {
OmitObject,
} from './types'

const MAX_DISCONNECT_TIME = 3000
const connect: CurrentConnectMethod = async (imports, pool, options) => {
let {
tablePrefix,
@@ -70,7 +71,6 @@ const connect: CurrentConnectMethod = async (imports, pool, options) => {
connectionErrorsMap.get(connection)?.push(error)
})
await connection.connect()
await connection.query('SELECT 0 AS "defunct"')
} catch (error) {
connectionErrorsMap.get(connection)?.push(error)
}
@@ -99,17 +99,27 @@ const connect: CurrentConnectMethod = async (imports, pool, options) => {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
pool.connection = null!
}

affectedReadModelOperationsSet.clear()
let timeout: NodeJS.Timeout | null = null
try {
affectedReadModelOperationsSet.clear()
await connection.end()
} catch (err) {}
await Promise.race([
new Promise((resolve) => {
timeout = setTimeout(resolve, MAX_DISCONNECT_TIME)
}),
connection.end(),
])
} catch (err) {
} finally {
if (timeout != null) {
clearTimeout(timeout)
}
}

throw summaryError
}
}

const initialConnection = await establishConnection()
await maybeThrowConnectionErrors(initialConnection)

const inlineLedgerRunQuery: InlineLedgerRunQueryMethod = async (
sql,
passthroughRuntimeErrors = false
@@ -184,7 +194,8 @@ const connect: CurrentConnectMethod = async (imports, pool, options) => {
schemaName: databaseName,
tablePrefix,
inlineLedgerRunQuery,
connection: initialConnection,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
connection: null!,
activePassthrough: false,
buildMode,
useSqs,
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { CurrentDisconnectMethod } from './types'

const MAX_DISCONNECT_TIME = 10000
const MAX_DISCONNECT_TIME = 3000

const disconnect: CurrentDisconnectMethod = async (pool) => {
if (pool.connection != null) {
Original file line number Diff line number Diff line change
@@ -10,22 +10,6 @@ Array [
Array [
"connect",
],
Array [
"query",
"SELECT 0 AS \\"defunct\\"",
],
Array [
"on",
"error",
[Function],
],
Array [
"connect",
],
Array [
"query",
"SELECT 0 AS \\"defunct\\"",
],
Array [
"query",
"
@@ -90,6 +74,14 @@ Array [
\\"AggregateIds\\" = 'null'
",
],
Array [
"on",
"error",
[Function],
],
Array [
"connect",
],
Array [
"query",
"
@@ -477,10 +469,6 @@ Array [
Array [
"connect",
],
Array [
"query",
"SELECT 0 AS \\"defunct\\"",
],
Array [
"query",
"
@@ -774,10 +762,6 @@ Array [
Array [
"connect",
],
Array [
"query",
"SELECT 0 AS \\"defunct\\"",
],
Array [
"query",
"
@@ -1057,10 +1041,6 @@ Array [
Array [
"connect",
],
Array [
"query",
"SELECT 0 AS \\"defunct\\"",
],
Array [
"query",
"
@@ -1462,10 +1442,6 @@ Array [
Array [
"connect",
],
Array [
"query",
"SELECT 0 AS \\"defunct\\"",
],
Array [
"query",
"
@@ -1735,10 +1711,6 @@ Array [
Array [
"connect",
],
Array [
"query",
"SELECT 0 AS \\"defunct\\"",
],
Array [
"query",
"
1 change: 1 addition & 0 deletions tests/package.json
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@
"test:read-model": "jest --config=./jest.config.js --testMatch=**/read-model-store-*/*.test.[jt]s --runInBand",
"test:read-model-mysql": "env TEST_MYSQL=true jest --config=./jest.config.js --testMatch=**/read-model-store-*/*.test.[jt]s",
"test:read-model-postgres": "env TEST_POSTGRES=true jest --config=./jest.config.js --testMatch=**/read-model-store-*/*.test.[jt]s --runInBand",
"test:read-model-builder-postgres": "env TEST_POSTGRES=true jest --config=./jest.config.js --testMatch=**/read-model-builder-postgres/*.test.[jt]s --runInBand",
"test:eventstore-order-events": "jest --config=./jest.config.js --testMatch=**/eventstore-order-events/*.test.[jt]s",
"test:eventstore": "jest --config=./jest.config.js --testMatch=**/eventstore-*/*.test.[jt]s",
"test:eventstore-postgres": "env TEST_POSTGRES=true jest --config=./jest.config.js --testMatch=**/eventstore-*/*.test.[jt]s --runInBand",
162 changes: 162 additions & 0 deletions tests/read-model-builder-postgres/index.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import {
default as factory,
isPostgres,
adapters,
} from '../readmodel-test-utils'
type UnPromise<T> = T extends Promise<T> ? T : never
const maybeDescribe = isPostgres() ? describe : describe.skip
jest.setTimeout(60000)

maybeDescribe('Postgres reconnections with delay', () => {
const uniqueName = 'postgres-reconnections-delay' as const
const readModelName = 'PostgresReconnectionsDelay' as const
const getDelay = (delay: number | null) => (delay != null ? +delay : 0)
const delayFunction = async (delay: number) =>
await new Promise((resolve) => setTimeout(resolve, getDelay(delay)))
const eventType = 'EVENT_TYPE'
const getNotificationObj = (
isNext: boolean,
notificationExtraPayload?: object
) => ({
eventSubscriber: readModelName,
initiator: isNext ? 'read-model-next' : 'command',
notificationId: `NT-${Date.now()}${Math.floor(Math.random() * 1000000)}`,
sendTime: Date.now(),
...(notificationExtraPayload != null ? notificationExtraPayload : {}),
})
const subscriptionOptions = { eventTypes: [eventType], aggregateIds: null }
const currentConnections = new Set()
const currentBuilders = new Set()
let adapter: typeof adapters[typeof uniqueName]
let baseConnection: UnPromise<ReturnType<typeof adapter['connect']>>
const connectionsDelays = new WeakMap<typeof baseConnection, Array<number>>()
let flushWorkers: Function
let buildOnConnection: Function
let performWorkers = true

beforeAll(async () => {
await factory.create(uniqueName)()
adapter = adapters[uniqueName]
baseConnection = await adapter.connect(readModelName)
flushWorkers = async () =>
await adapter.subscribe(
baseConnection,
readModelName,
subscriptionOptions.eventTypes,
subscriptionOptions.aggregateIds,
async () => null
)
const baseSubscribe = flushWorkers
await baseSubscribe()
buildOnConnection = async (
connection: typeof baseConnection,
parameters: any
) => {
if (!performWorkers) {
return
}
let buildPromise
try {
buildPromise = adapter.build(
connection,
readModelName,
connection,
{
acquireInitHandler: async () => delayFunction.bind(null, 100),
acquireEventHandler: async () => delayFunction.bind(null, 5000),
acquireResolver: async () => delayFunction.bind(null, 100),
connectorName: 'default',
name: readModelName,
},
(timeout: number, notificationExtraPayload: object) => {
void (async () => {
if (!connectionsDelays.has(connection)) {
connectionsDelays.set(connection, [])
}
connectionsDelays.get(connection).push(getDelay(timeout))
await buildPromise
await delayFunction(timeout)
await buildOnConnection(
connection,
getNotificationObj(true, notificationExtraPayload)
)
})()
return Promise.resolve(null)
},
{
loadEvents: async ({ cursor }) => ({
events: cursor === 'true' ? [{ type: eventType }] : [],
}),
getNextCursor: (cursor) => (cursor == null ? 'true' : 'false'),
establishTimeLimit: delayFunction,
},
getDelay.bind(null, 60000),
parameters
)
currentBuilders.add(buildPromise)
await buildPromise
} finally {
if (buildPromise != null) {
currentBuilders.delete(buildPromise)
}
}
}
})

afterAll(async () => {
await Promise.all(
Array.from(currentConnections).map(async (connection) => {
try {
await adapter.disconnect(connection, readModelName)
} catch (e) {}
})
)
currentConnections.clear()

await adapter.unsubscribe(baseConnection, readModelName, async () => null)
await adapter.disconnect(baseConnection, readModelName)

await factory.destroy(uniqueName)()
})

test('should perform with correct jitter', async () => {
while (currentConnections.size < 3) {
currentConnections.add(await adapter.connect(readModelName))
}
for (const connection of currentConnections) {
void buildOnConnection(connection, getNotificationObj(false))
}

const stopThrottlingTimestamp = Date.now() + 10000
while (Date.now() < stopThrottlingTimestamp) {
await flushWorkers()
await delayFunction(100)
}

while (currentBuilders.size > 0) {
await delayFunction(500)
}
performWorkers = false

for (const connection of currentConnections) {
const delays = connectionsDelays.get(connection) ?? []
const delaysBase2Degrees = delays
.filter((delay) => delay > 0)
.map((delay) => Math.log2(delay / 100))

for (let index = 0; index < delaysBase2Degrees.length; index++) {
expect(Math.floor(delaysBase2Degrees[index])).toEqual(
delaysBase2Degrees[index]
)
expect(
delaysBase2Degrees[index] === 0 ||
(index > 0
? delaysBase2Degrees[index - 1] + 1 === delaysBase2Degrees[index]
: false)
).toEqual(true)
}
await adapter.disconnect(connection, readModelName)
}
currentConnections.clear()
})
})
Loading