Skip to content

Commit

Permalink
fix(auth-admin): Move sleep logic from messgeHandler to worker service (
Browse files Browse the repository at this point in the history
#16712)

* move sleep logic from messgeHandler to worker service

* chore: nx format:write update dirty files

* small refactor

* chore: nx format:write update dirty files

* Update libs/message-queue/src/lib/nest/utils.spec.ts

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* fix message-queue tests

* chore: nx format:write update dirty files

* move sleep check

* change placement of sleep check back

* chore: nx format:write update dirty files

---------

Co-authored-by: andes-it <[email protected]>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
  • Loading branch information
4 people authored Nov 14, 2024
1 parent c788e6a commit 85b4042
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import { MessageProcessorService } from './messageProcessor.service'
queue: {
name: 'notifications',
queueName: environment.mainQueueName,
shouldSleepOutsideWorkingHours: true,
deadLetterQueue: {
queueName: environment.deadLetterQueueName,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,21 +368,23 @@ describe('NotificationsWorkerService', () => {
})

it('should not send email or push notification if we are outside working hours (8 AM - 11 PM) ', async () => {
// set time to be outside of working hours
jest.setSystemTime(outsideWorkingHours)

// First message will be handled since the receiveMessages call is waiting (wait time is max 20s and returns when a message is ready)
// This ensures that the next message is added after time is set outside working hours
await addToQueue(userWithNoDelegations.nationalId)
await addToQueue(userWithNoDelegations.nationalId)

expect(emailService.sendEmail).not.toHaveBeenCalled()
expect(notificationDispatch.sendPushNotification).not.toHaveBeenCalled()
expect(emailService.sendEmail).toHaveBeenCalledTimes(1)
expect(notificationDispatch.sendPushNotification).toHaveBeenCalledTimes(1)

// reset time to inside working hour
jest.advanceTimersByTime(workingHoursDelta)
// give worker some time to process message
await wait(2)

expect(emailService.sendEmail).toHaveBeenCalledTimes(1)
expect(notificationDispatch.sendPushNotification).toHaveBeenCalledTimes(1)
expect(emailService.sendEmail).toHaveBeenCalledTimes(2)
expect(notificationDispatch.sendPushNotification).toHaveBeenCalledTimes(2)
}, 10_000)

it('should not send email or push notification if no profile is found for recipient', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ import {
CompanyRegistryClientService,
} from '@island.is/clients/rsk/company-registry'

const WORK_STARTING_HOUR = 8 // 8 AM
const WORK_ENDING_HOUR = 23 // 11 PM

type HandleNotification = {
profile: {
nationalId: string
Expand Down Expand Up @@ -314,37 +311,12 @@ export class NotificationsWorkerService implements OnApplicationBootstrap {
}
}

async sleepOutsideWorkingHours(messageId: string): Promise<void> {
const now = new Date()
const currentHour = now.getHours()
const currentMinutes = now.getMinutes()
const currentSeconds = now.getSeconds()
// Is it outside working hours?
if (currentHour >= WORK_ENDING_HOUR || currentHour < WORK_STARTING_HOUR) {
// If it's past the end hour or before the start hour, sleep until the start hour.
const sleepHours = (24 - currentHour + WORK_STARTING_HOUR) % 24
const sleepDurationMilliSeconds =
(sleepHours * 3600 - currentMinutes * 60 - currentSeconds) * 1000
this.logger.info(
`Worker will sleep until 8 AM. Sleep duration: ${sleepDurationMilliSeconds} ms`,
{ messageId },
)
await new Promise((resolve) =>
setTimeout(resolve, sleepDurationMilliSeconds),
)
this.logger.info('Worker waking up after sleep.', { messageId })
}
}

async run() {
await this.worker.run<CreateHnippNotificationDto>(
async (message, job): Promise<void> => {
const messageId = job.id
this.logger.info('Message received by worker', { messageId })

// check if we are within operational hours or wait until we are
await this.sleepOutsideWorkingHours(messageId)

const notification = { messageId, ...message }
let dbNotification = await this.notificationModel.findOne({
where: { messageId },
Expand Down
2 changes: 2 additions & 0 deletions libs/message-queue/src/lib/nest/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ export interface Queue {
maxReceiveCount?: number
// define a dead-letter queue for messages that fail processing repeatedly
deadLetterQueue?: DeadLetterQueue
// if true, the worker will sleep during the night
shouldSleepOutsideWorkingHours?: boolean
}

export interface DeadLetterQueue {
Expand Down
27 changes: 26 additions & 1 deletion libs/message-queue/src/lib/nest/utils.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
import { clamp } from './utils'
import {
calculateSleepDurationUntilMorning,
clamp,
isOutsideWorkingHours,
} from './utils'

const insideWorkingHours = new Date(2021, 1, 1, 9, 0, 0)
const outsideWorkingHours = new Date(2021, 1, 1, 7, 0, 0)
const HOUR_IN_MS = 3600000

describe('utils', () => {
describe('clamp', () => {
Expand All @@ -12,4 +20,21 @@ describe('utils', () => {
expect(clamp(100, min, max)).toBe(10)
})
})
describe('isOutsideWorkingHours', () => {
it('returns true if the current hour is outside working hours', () => {
jest.useFakeTimers({ now: outsideWorkingHours })
expect(isOutsideWorkingHours()).toBe(true)
})
it('returns false if the current hour is inside working hours', () => {
jest.useFakeTimers({ now: insideWorkingHours })
expect(isOutsideWorkingHours()).toBe(false)
})
})

describe('calculateSleepDurationUntilMorning', () => {
it('calculates the time until the next morning', () => {
jest.useFakeTimers({ now: new Date(2021, 1, 1, 23, 0, 0) }) // 11 PM
expect(calculateSleepDurationUntilMorning()).toBe(9 * HOUR_IN_MS) // 9 hours
})
})
})
24 changes: 24 additions & 0 deletions libs/message-queue/src/lib/nest/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
const TOKEN_PREFIX = 'IslandIsMessageQueue'
const WORK_STARTING_HOUR = 8 // 8 AM
const WORK_ENDING_HOUR = 23 // 11 PM

export const getQueueServiceToken = (name: string): string =>
`${TOKEN_PREFIX}/QueueService/${name}`
Expand All @@ -11,3 +13,25 @@ export const getClientServiceToken = (name: string): string =>

export const clamp = (v: number, min: number, max: number): number =>
Math.min(max, Math.max(min, v))

export const isOutsideWorkingHours = (): boolean => {
const now = new Date()
const currentHour = now.getHours()

return currentHour < WORK_STARTING_HOUR || currentHour >= WORK_ENDING_HOUR
}

export const calculateSleepDurationUntilMorning = (): number => {
const now = new Date()
const currentHour = now.getHours()
const currentMinutes = now.getMinutes()
const currentSeconds = now.getSeconds()
const sleepHours = (24 - currentHour + WORK_STARTING_HOUR) % 24
return (sleepHours * 3600 - currentMinutes * 60 - currentSeconds) * 1000
}

export const sleepUntilMorning = (): Promise<void> => {
const ms = calculateSleepDurationUntilMorning()

return new Promise((resolve) => setTimeout(resolve, ms))
}
18 changes: 16 additions & 2 deletions libs/message-queue/src/lib/nest/worker.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Message } from '@aws-sdk/client-sqs'
import type { Logger } from '@island.is/logging'
import { QueueService } from './queue.service'
import type { Queue, Job } from './types'
import { clamp } from './utils'
import { clamp, isOutsideWorkingHours, sleepUntilMorning } from './utils'
import { ClientService } from './client.service'

type MessageHandler<T> = (handler: T, job: Job) => Promise<void>
Expand All @@ -20,6 +20,7 @@ enum Status {
Running,
Stopping,
Stopped,
Sleeping,
}

@Injectable()
Expand All @@ -46,7 +47,10 @@ export class WorkerService implements OnModuleDestroy {
)

this.status = Status.Running

while (this.status === Status.Running) {
await this.maybeSleepOutsideWorkingHours()

const messages = await this.client.receiveMessages(this.queue.url, {
maxNumMessages: this.config.maxConcurrentJobs,
})
Expand All @@ -57,6 +61,16 @@ export class WorkerService implements OnModuleDestroy {
}
}

private async maybeSleepOutsideWorkingHours() {
if (this.config.shouldSleepOutsideWorkingHours && isOutsideWorkingHours()) {
this.logger.info('Outside of working hours - worker sleeping')
this.status = Status.Sleeping
await sleepUntilMorning()
this.logger.info('Worker waking up')
this.status = Status.Running
}
}

private getConcurrency(): number {
const concurrency = this.config.maxConcurrentJobs ?? DEFAULT_BATCH_SIZE

Expand Down Expand Up @@ -107,7 +121,7 @@ export class WorkerService implements OnModuleDestroy {
}

async onModuleDestroy() {
if (this.status === Status.Running) {
if (this.status === Status.Running || this.status === Status.Sleeping) {
this.logger.info(`Stopping worker "${this.config.name}"`)

if (this.activeJobs !== null) {
Expand Down

0 comments on commit 85b4042

Please sign in to comment.