Skip to content

Commit

Permalink
Use real'er consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
kibertoad committed Sep 27, 2023
1 parent 61ee5ce commit 04c4fc5
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 56 deletions.
2 changes: 1 addition & 1 deletion packages/amqp/lib/AmqpConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ export class AmqpConnectionManager {
}
this.isReconnecting = false

this.logger.info('AmqpConnectionManager: Finish reconnecting')
this.logger.info('AmqpConnectionManager: Reconnect complete')
}

async init() {
Expand Down
89 changes: 47 additions & 42 deletions packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { PERMISSIONS_MESSAGE_SCHEMA } from '../consumers/userConsumerSchemas'
import { FakeConsumer } from '../fakes/FakeConsumer'
import { FakeConsumerErrorResolver } from '../fakes/FakeConsumerErrorResolver'
import { FakeLogger } from '../fakes/FakeLogger'
import { createSilentChannel } from '../utils/channelUtils'
import { userPermissionMap } from '../repositories/PermissionRepository'
import { TEST_AMQP_CONFIG } from '../utils/testAmqpConfig'
import type { Dependencies } from '../utils/testContext'
import { registerDependencies, SINGLETON_CONFIG } from '../utils/testContext'
Expand All @@ -22,6 +22,30 @@ import type { AmqpPermissionPublisherMultiSchema } from './AmqpPermissionPublish
const perms: [string, ...string[]] = ['perm1', 'perm2']
const userIds = [100, 200, 300]

function checkPermissions(userIds: number[]) {
const usersPerms = userIds.reduce((acc, userId) => {
if (userPermissionMap[userId]) {
acc.push(userPermissionMap[userId])
}
return acc
}, [] as string[][])

if (usersPerms.length > userIds.length) {
return usersPerms.slice(0, userIds.length - 1)
}

if (usersPerms && usersPerms.length !== userIds.length) {
return null
}

for (const userPerms of usersPerms)
if (userPerms.length !== perms.length) {
return null
}

return usersPerms
}

describe('PermissionPublisher', () => {
describe('logging', () => {
let logger: FakeLogger
Expand Down Expand Up @@ -112,12 +136,6 @@ describe('PermissionPublisher', () => {
beforeAll(async () => {
diContainer = await registerDependencies(TEST_AMQP_CONFIG, {
consumerErrorResolver: asClass(FakeConsumerErrorResolver, SINGLETON_CONFIG),
permissionConsumer: asClass(FakeConsumer, {
lifetime: Lifetime.SINGLETON,
asyncInit: 'start',
asyncDispose: 'close',
asyncDisposePriority: 10,
}),
})
})

Expand Down Expand Up @@ -175,58 +193,45 @@ describe('PermissionPublisher', () => {
})

it('reconnects on lost connection', async () => {
const { permissionPublisher } = diContainer.cradle
const users = Object.values(userPermissionMap)
expect(users).toHaveLength(0)

userPermissionMap[100] = []
userPermissionMap[200] = []
userPermissionMap[300] = []

const { permissionPublisher, permissionConsumer } = diContainer.cradle
await permissionConsumer.start()

const message = {
userIds,
messageType: 'add',
permissions: perms,
} satisfies PERMISSIONS_MESSAGE_TYPE

permissionPublisher.publish(message)

await diContainer.cradle.amqpConnectionManager.close()
await diContainer.cradle.amqpConnectionManager.init()

// wait till we are done reconnecting
await waitAndRetry(() => {
return diContainer.cradle.amqpConnectionManager.getConnectionSync()
})

let receivedMessage: PERMISSIONS_MESSAGE_TYPE | null = null
const consumerChannel = await createSilentChannel(
diContainer.cradle.amqpConnectionManager.getConnectionSync()!,
)
await consumerChannel.consume(AmqpPermissionPublisher.QUEUE_NAME, (message) => {
if (message === null) {
return
}
const decodedMessage = deserializeAmqpMessage(
message,
PERMISSIONS_MESSAGE_SCHEMA,
new FakeConsumerErrorResolver(),
)
receivedMessage = decodedMessage.result!
})

permissionPublisher.publish(message)
const updatedUsersPermissions = await waitAndRetry(
() => {
permissionPublisher.publish(message)

await waitAndRetry(() => {
return receivedMessage !== null
})

expect(receivedMessage).toEqual({
messageType: 'add',
permissions: ['perm1', 'perm2'],
userIds: [100, 200, 300],
})
return checkPermissions(userIds)
},
100,
20,
)

await permissionPublisher.close()
try {
await consumerChannel.close()
} catch {
// it's ok
if (null === updatedUsersPermissions) {
throw new Error('Users permissions unexpectedly null')
}

expect(updatedUsersPermissions).toBeDefined()
expect(updatedUsersPermissions[0]).toHaveLength(2)
})
})
})
13 changes: 0 additions & 13 deletions packages/amqp/test/utils/channelUtils.ts

This file was deleted.

0 comments on commit 04c4fc5

Please sign in to comment.