Skip to content

Commit

Permalink
Remove setReplicationIterator. Make less database queries during repl…
Browse files Browse the repository at this point in the history
…ication
  • Loading branch information
FreeSlave committed Oct 20, 2021
1 parent ad8451f commit d7159a2
Show file tree
Hide file tree
Showing 14 changed files with 87 additions and 104 deletions.
10 changes: 5 additions & 5 deletions packages/core/core/src/types/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,12 @@ export type Eventstore = {
existingSecrets: OldSecretRecord[],
deletedSecrets: Array<OldSecretRecord['id']>
) => Promise<void>
setReplicationIterator: (iterator: SerializableMap) => Promise<void>
setReplicationStatus: (
status: ReplicationStatus,
info?: ReplicationState['statusData'],
setReplicationStatus: (state: {
status: ReplicationStatus
statusData?: ReplicationState['statusData']
lastEvent?: OldEvent
) => Promise<void>
iterator?: ReplicationState['iterator']
}) => Promise<void>
setReplicationPaused: (pause: boolean) => Promise<void>
getReplicationState: () => Promise<ReplicationState>
resetReplication: () => Promise<void>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ const makeTestRuntime = (
replicateEvents: jest.fn(),
replicateSecrets: jest.fn(),
resetReplication: jest.fn(),
setReplicationIterator: jest.fn(),
setReplicationPaused: jest.fn(),
setReplicationStatus: jest.fn(),
saveEvent: jest.fn(async (originalEvent) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ const makeTestRuntime = (events: Event[] = []): ViewModelRuntime => {
replicateEvents: jest.fn(),
replicateSecrets: jest.fn(),
resetReplication: jest.fn(),
setReplicationIterator: jest.fn(),
setReplicationPaused: jest.fn(),
setReplicationStatus: jest.fn(),
saveEvent: jest.fn(),
Expand Down
26 changes: 17 additions & 9 deletions packages/modules/module-replication/src/api-handlers/replicate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,19 @@ const handler = async (req: ResolveRequest, res: ResolveResponse) => {
}

try {
await req.resolve.eventstoreAdapter.setReplicationStatus('batchInProgress')
await req.resolve.eventstoreAdapter.setReplicationIterator(input.iterator)
await req.resolve.eventstoreAdapter.setReplicationStatus({
status: 'batchInProgress',
iterator: input.iterator,
})

res.status(202)
res.end('Replication has been started')
} catch (error) {
try {
await req.resolve.eventstoreAdapter.setReplicationStatus('error', error)
await req.resolve.eventstoreAdapter.setReplicationStatus({
status: 'error',
statusData: error,
})
} catch (e) {
error.message += e.message
}
Expand All @@ -50,16 +55,19 @@ const handler = async (req: ResolveRequest, res: ResolveResponse) => {
input.secretsToDelete
)
await req.resolve.eventstoreAdapter.replicateEvents(input.events)
await req.resolve.eventstoreAdapter.setReplicationStatus(
'batchDone',
{
await req.resolve.eventstoreAdapter.setReplicationStatus({
status: 'batchDone',
statusData: {
appliedEventsCount: input.events.length,
},
input.events[input.events.length - 1]
)
lastEvent: input.events[input.events.length - 1],
})
} catch (error) {
try {
await req.resolve.eventstoreAdapter.setReplicationStatus('error', error)
await req.resolve.eventstoreAdapter.setReplicationStatus({
status: 'error',
statusData: error,
})
} catch (e) {}
}
await req.resolve.broadcastEvent()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import type { ResolveRequest, ResolveResponse } from '@resolve-js/core'

const handler = async (req: ResolveRequest, res: ResolveResponse) => {
const result = await req.resolve.eventstoreAdapter.getReplicationState()
const description = await req.resolve.eventstoreAdapter.describe()
res.json({
...result,
totalEventCount: description.eventCount,
totalSecretCount: description.secretCount,
})
const result: any = await req.resolve.eventstoreAdapter.getReplicationState()
if (req.query['extra'] !== undefined) {
const description = await req.resolve.eventstoreAdapter.describe()
result.totalEventCount = description.eventCount
result.totalSecretCount = description.secretCount
}
res.json(result)
}

export default handler
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ const createAdapter = <
injectSecret,
replicateEvents,
replicateSecrets,
setReplicationIterator,
setReplicationPaused,
setReplicationStatus,
getReplicationState,
Expand Down Expand Up @@ -224,7 +223,6 @@ const createAdapter = <

replicateEvents: wrapMethod(adapterPool, replicateEvents),
replicateSecrets: wrapMethod(adapterPool, replicateSecrets),
setReplicationIterator: wrapMethod(adapterPool, setReplicationIterator),
setReplicationPaused: wrapMethod(adapterPool, setReplicationPaused),
setReplicationStatus: wrapMethod(adapterPool, setReplicationStatus),
getReplicationState: wrapMethod(adapterPool, getReplicationState),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,10 +476,6 @@ export interface AdapterFunctions<
ConnectedProps,
Adapter['setReplicationStatus']
>
setReplicationIterator?: PoolMethod<
ConnectedProps,
Adapter['setReplicationIterator']
>
setReplicationPaused?: PoolMethod<
ConnectedProps,
Adapter['setReplicationPaused']
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import dropFinal from './drop-final'
import replicateEvents from './replicate-events'
import replicateSecrets from './replicate-secrets'
import setReplicationStatus from './set-replication-status'
import setReplicationIterator from './set-replication-iterator'
import setReplicationPaused from './set-replication-paused'
import getReplicationState from './get-replication-state'
import resetReplication from './reset-replication'
Expand Down Expand Up @@ -87,7 +86,6 @@ const createSqliteAdapter = (options: SqliteAdapterConfig): Adapter => {
replicateEvents,
replicateSecrets,
setReplicationStatus,
setReplicationIterator,
setReplicationPaused,
getReplicationState,
resetReplication,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,17 @@ import initReplicationStateTable from './init-replication-state-table'

const setReplicationStatus = async (
pool: AdapterPool,
status: ReplicationStatus,
statusData?: ReplicationState['statusData'],
lastEvent?: OldEvent
{
status,
statusData,
lastEvent,
iterator,
}: {
status: ReplicationStatus
statusData?: ReplicationState['statusData']
lastEvent?: OldEvent
iterator?: ReplicationState['iterator']
}
): Promise<void> => {
const { executeQuery, escapeId, escape } = pool

Expand All @@ -19,15 +27,22 @@ const setReplicationStatus = async (
await executeQuery(
`UPDATE ${escapeId(replicationStateTableName)}
SET
"Status" = ${escape(status)},
"StatusData" = ${
statusData != null ? escape(JSON.stringify(statusData)) : 'NULL'
}
${
lastEvent != null
? `, "SuccessEvent" = ${escape(JSON.stringify(lastEvent))}`
: ``
}`
"Status" = ${escape(status)},
"StatusData" = ${
statusData != null ? escape(JSON.stringify(statusData)) : 'NULL'
}
${
lastEvent != null
? `, "SuccessEvent" = ${escape(JSON.stringify(lastEvent))}`
: ``
}
${
iterator !== undefined
? `, "Iterator" = ${
iterator != null ? escape(JSON.stringify(iterator)) : 'NULL'
}`
: ``
}`
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import loadSecrets from './load-secrets'
import replicateEvents from './replicate-events'
import replicateSecrets from './replicate-secrets'
import setReplicationStatus from './set-replication-status'
import setReplicationIterator from './set-replication-iterator'
import setReplicationPaused from './set-replication-paused'
import getReplicationState from './get-replication-state'
import resetReplication from './reset-replication'
Expand Down Expand Up @@ -97,7 +96,6 @@ const createPostgresqlAdapter = (options: PostgresqlAdapterConfig): Adapter => {
replicateEvents,
replicateSecrets,
setReplicationStatus,
setReplicationIterator,
setReplicationPaused,
getReplicationState,
resetReplication,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,17 @@ import initReplicationStateTable from './init-replication-state-table'

const setReplicationStatus = async (
pool: AdapterPool,
status: ReplicationStatus,
statusData?: ReplicationState['statusData'],
lastEvent?: OldEvent
{
status,
statusData,
lastEvent,
iterator,
}: {
status: ReplicationStatus
statusData?: ReplicationState['statusData']
lastEvent?: OldEvent
iterator?: ReplicationState['iterator']
}
): Promise<void> => {
const { executeStatement, escapeId, escape, databaseName } = pool

Expand All @@ -28,6 +36,13 @@ const setReplicationStatus = async (
lastEvent != null
? `, "SuccessEvent" = ${escape(JSON.stringify(lastEvent))}`
: ``
}
${
iterator !== undefined
? `, "Iterator" = ${
iterator != null ? escape(JSON.stringify(iterator)) : 'NULL'
}`
: ``
}`
)
}
Expand Down
24 changes: 13 additions & 11 deletions tests/eventstore-replication/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,41 +49,43 @@ describe(`${adapterFactory.name}. eventstore adapter replication state`, () => {
})

test('set-replication-status should change status, statusData properties of the state', async () => {
await adapter.setReplicationStatus('batchInProgress', {
info: 'in progress',
await adapter.setReplicationStatus({
status: 'batchInProgress',
statusData: { info: 'in progress' },
})
let state = await adapter.getReplicationState()
expect(state.status).toEqual('batchInProgress')
expect(state.statusData).toEqual({ info: 'in progress' })

await adapter.setReplicationStatus('batchDone')
await adapter.setReplicationStatus({ status: 'batchDone' })
state = await adapter.getReplicationState()
expect(state.status).toEqual('batchDone')
expect(state.statusData).toEqual(null)
})

test('set-replication-status should set successEvent and not rewrite it if it was not provided', async () => {
test('set-replication-status should set successEvent and iterator and not rewrite them if they were not provided', async () => {
const event: OldEvent = {
aggregateId: 'aggregateId',
aggregateVersion: 1,
timestamp: 1,
type: 'type',
}

await adapter.setReplicationStatus('batchDone', null, event)
await adapter.setReplicationStatus({
status: 'batchDone',
statusData: null,
lastEvent: event,
iterator: { cursor: 'DEAF' },
})
let state = await adapter.getReplicationState()
expect(state.status).toEqual('batchDone')
expect(state.successEvent).toEqual(event)
expect(state.iterator).toEqual({ cursor: 'DEAF' })

await adapter.setReplicationStatus('error')
await adapter.setReplicationStatus({ status: 'error' })
state = await adapter.getReplicationState()
expect(state.status).toEqual('error')
expect(state.successEvent).toEqual(event)
})

test('set-replication-iterator should change iterator property of the state', async () => {
await adapter.setReplicationIterator({ cursor: 'DEAF' })
const state = await adapter.getReplicationState()
expect(state.iterator).toEqual({ cursor: 'DEAF' })
})

Expand Down

0 comments on commit d7159a2

Please sign in to comment.