Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(auth-admin): Move sleep logic from messgeHandler to worker service #16712

Merged
merged 18 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import { MessageProcessorService } from './messageProcessor.service'
queue: {
name: 'notifications',
queueName: environment.mainQueueName,
shouldSleepOutsideWorkingHours: true,
deadLetterQueue: {
queueName: environment.deadLetterQueueName,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ import {
CompanyRegistryClientService,
} from '@island.is/clients/rsk/company-registry'

const WORK_STARTING_HOUR = 8 // 8 AM
const WORK_ENDING_HOUR = 23 // 11 PM

type HandleNotification = {
profile: {
nationalId: string
Expand Down Expand Up @@ -313,37 +310,12 @@ export class NotificationsWorkerService implements OnApplicationBootstrap {
}
}

async sleepOutsideWorkingHours(messageId: string): Promise<void> {
const now = new Date()
const currentHour = now.getHours()
const currentMinutes = now.getMinutes()
const currentSeconds = now.getSeconds()
// Is it outside working hours?
if (currentHour >= WORK_ENDING_HOUR || currentHour < WORK_STARTING_HOUR) {
// If it's past the end hour or before the start hour, sleep until the start hour.
const sleepHours = (24 - currentHour + WORK_STARTING_HOUR) % 24
const sleepDurationMilliSeconds =
(sleepHours * 3600 - currentMinutes * 60 - currentSeconds) * 1000
this.logger.info(
`Worker will sleep until 8 AM. Sleep duration: ${sleepDurationMilliSeconds} ms`,
{ messageId },
)
await new Promise((resolve) =>
setTimeout(resolve, sleepDurationMilliSeconds),
)
this.logger.info('Worker waking up after sleep.', { messageId })
}
}

async run() {
await this.worker.run<CreateHnippNotificationDto>(
async (message, job): Promise<void> => {
const messageId = job.id
this.logger.info('Message received by worker', { messageId })

// check if we are within operational hours or wait until we are
await this.sleepOutsideWorkingHours(messageId)

const notification = { messageId, ...message }
let dbNotification = await this.notificationModel.findOne({
where: { messageId },
Expand Down
2 changes: 2 additions & 0 deletions libs/message-queue/src/lib/nest/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ export interface Queue {
maxReceiveCount?: number
// define a dead-letter queue for messages that fail processing repeatedly
deadLetterQueue?: DeadLetterQueue
// if true, the worker will sleep during the night
shouldSleepOutsideWorkingHours?: boolean
}

export interface DeadLetterQueue {
Expand Down
37 changes: 36 additions & 1 deletion libs/message-queue/src/lib/nest/utils.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import { clamp } from './utils'
import {
calculateSleepDurationUntilMorning,
clamp,
isOutsideWorkingHours,
sleepUntilMorning,
} from './utils'

magnearun marked this conversation as resolved.
Show resolved Hide resolved
describe('utils', () => {
describe('clamp', () => {
Expand All @@ -12,4 +17,34 @@ describe('utils', () => {
expect(clamp(100, min, max)).toBe(10)
})
})
describe('isOutsideWorkingHours', () => {
it('returns true if the current hour is outside working hours', () => {
expect(isOutsideWorkingHours(new Date('2023-01-01T07:00:00'))).toBe(true) // 7 AM
expect(isOutsideWorkingHours(new Date('2023-01-01T23:00:00'))).toBe(true) // 11 PM
expect(isOutsideWorkingHours(new Date('2023-01-01T00:00:00'))).toBe(true) // Midnight
expect(isOutsideWorkingHours(new Date('2023-01-01T08:00:00'))).toBe(false) // 8 AM
expect(isOutsideWorkingHours(new Date('2023-01-01T22:00:00'))).toBe(false) // 10 PM
})
})

describe('calculateSleepDurationUntilMorning', () => {
it('calculates sleep duration correctly for a time at midnight', () => {
const now = new Date('2023-01-01T00:00:00') // Midnight
const duration = calculateSleepDurationUntilMorning(now)
expect(duration).toBe(28800000) // 8 hours in milliseconds
})

it('calculates sleep duration correctly for a time in the middle of the night', () => {
const now = new Date('2023-01-01T05:00:00') // 11 PM
const duration = calculateSleepDurationUntilMorning(now)
// 3 hour in milliseconds
expect(duration).toBe(10800000)
})

it('calculates sleep duration correctly for a time just before work starts', () => {
const now = new Date('2023-01-01T07:59:00') // 7:59 AM
const duration = calculateSleepDurationUntilMorning(now)
expect(duration).toBe(60000) // 1 minute in milliseconds
})
})
})
magnearun marked this conversation as resolved.
Show resolved Hide resolved
22 changes: 22 additions & 0 deletions libs/message-queue/src/lib/nest/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
const TOKEN_PREFIX = 'IslandIsMessageQueue'
const WORK_STARTING_HOUR = 8 // 8 AM
const WORK_ENDING_HOUR = 23 // 11 PM
magnearun marked this conversation as resolved.
Show resolved Hide resolved

export const getQueueServiceToken = (name: string): string =>
`${TOKEN_PREFIX}/QueueService/${name}`
Expand All @@ -11,3 +13,23 @@ export const getClientServiceToken = (name: string): string =>

export const clamp = (v: number, min: number, max: number): number =>
Math.min(max, Math.max(min, v))

export const isOutsideWorkingHours = (now: Date): boolean => {
const currentHour = now.getHours()

return currentHour < WORK_STARTING_HOUR || currentHour >= WORK_ENDING_HOUR
}

export const calculateSleepDurationUntilMorning = (now: Date): number => {
const currentHour = now.getHours()
const currentMinutes = now.getMinutes()
const currentSeconds = now.getSeconds()
const sleepHours = (24 - currentHour + WORK_STARTING_HOUR) % 24
return (sleepHours * 3600 - currentMinutes * 60 - currentSeconds) * 1000
}

export const sleepUntilMorning = (now: Date): Promise<void> => {
const ms = calculateSleepDurationUntilMorning(now)

return new Promise((resolve) => setTimeout(resolve, ms))
}
17 changes: 15 additions & 2 deletions libs/message-queue/src/lib/nest/worker.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Message } from '@aws-sdk/client-sqs'
import type { Logger } from '@island.is/logging'
import { QueueService } from './queue.service'
import type { Queue, Job } from './types'
import { clamp } from './utils'
import { clamp, isOutsideWorkingHours, sleepUntilMorning } from './utils'
import { ClientService } from './client.service'

type MessageHandler<T> = (handler: T, job: Job) => Promise<void>
Expand All @@ -20,6 +20,7 @@ enum Status {
Running,
Stopping,
Stopped,
Sleeping,
magnearun marked this conversation as resolved.
Show resolved Hide resolved
}

@Injectable()
Expand All @@ -46,7 +47,19 @@ export class WorkerService implements OnModuleDestroy {
)

this.status = Status.Running

while (this.status === Status.Running) {
const now = new Date()
magnearun marked this conversation as resolved.
Show resolved Hide resolved
if (
magnearun marked this conversation as resolved.
Show resolved Hide resolved
this.config.shouldSleepOutsideWorkingHours &&
isOutsideWorkingHours(now)
) {
this.logger.info('Outside of working hours - worker sleeping')
magnearun marked this conversation as resolved.
Show resolved Hide resolved
this.status = Status.Sleeping
await sleepUntilMorning(now)
this.status = Status.Running
}

const messages = await this.client.receiveMessages(this.queue.url, {
maxNumMessages: this.config.maxConcurrentJobs,
})
Expand Down Expand Up @@ -107,7 +120,7 @@ export class WorkerService implements OnModuleDestroy {
}

async onModuleDestroy() {
if (this.status === Status.Running) {
if (this.status === Status.Running || this.status === Status.Sleeping) {
this.logger.info(`Stopping worker "${this.config.name}"`)

if (this.activeJobs !== null) {
Expand Down
Loading