diff --git a/src/job/models/jobHandlerFactory.ts b/src/job/models/jobHandlerFactory.ts index 844581b..dddf643 100644 --- a/src/job/models/jobHandlerFactory.ts +++ b/src/job/models/jobHandlerFactory.ts @@ -9,6 +9,7 @@ export const jobHandlerFactory = (container: DependencyContainer) => { const logger = container.resolve(SERVICES.LOGGER); return (jobType: string): IJobHandler => { try { + logger.info(`Getting job handler for job type ${jobType}`); const jobHandler = container.resolve(jobType); if (!jobHandler) { const errorMsg = `Job handler for job type ${jobType} not found`; diff --git a/src/job/models/jobProcessor.ts b/src/job/models/jobProcessor.ts index 1b6bf76..800950e 100644 --- a/src/job/models/jobProcessor.ts +++ b/src/job/models/jobProcessor.ts @@ -52,59 +52,74 @@ export class JobProcessor { try { jobAndTaskType = await this.getJobWithPhaseTask(); if (!jobAndTaskType) { + logger.debug({ msg: 'waiting for next dequeue', metadata: { dequeueIntervalMs: this.dequeueIntervalMs } }); await setTimeoutPromise(this.dequeueIntervalMs); return; } await this.processJob(jobAndTaskType); } catch (error) { - if (error instanceof Error) { - logger.error({ msg: `Failed processing the job: ${error.message}`, error }); - if (jobAndTaskType) { - const { job, task } = jobAndTaskType; - await this.queueClient.reject(job.id, task.id, true, error.message); - } + if (error instanceof Error && jobAndTaskType) { + const { job, task } = jobAndTaskType; + logger.error({ msg: 'rejecting task', error, metadata: { job, task } }); + await this.queueClient.reject(job.id, task.id, true, error.message); } + logger.debug({ msg: 'waiting for next dequeue', metadata: { dequeueIntervalMs: this.dequeueIntervalMs } }); await setTimeoutPromise(this.dequeueIntervalMs); } } private async processJob(jobAndTaskType: JobAndPhaseTask): Promise { + const logger = this.logger.child({ logContext: { ...this.logContext, function: this.processJob.name } }); const { job, task } = jobAndTaskType; - const taskTypes = this.ingestionConfig.pollingTasks; - const jobHandler = this.jobHandlerFactory(job.type); + try { + const taskTypes = this.ingestionConfig.pollingTasks; + const jobHandler = this.jobHandlerFactory(job.type); - switch (task.type) { - case taskTypes.init: - await jobHandler.handleJobInit(job, task.id); - break; - case taskTypes.finalize: - await jobHandler.handleJobFinalize(job, task.id); - break; + switch (task.type) { + case taskTypes.init: + await jobHandler.handleJobInit(job, task.id); + break; + case taskTypes.finalize: + await jobHandler.handleJobFinalize(job, task.id); + break; + } + } catch (error) { + if (error instanceof Error) { + logger.error({ msg: `Failed processing the job: ${error.message}`, error }); + } + throw error; } } private async getJobWithPhaseTask(): Promise { const logger = this.logger.child({ logContext: { ...this.logContext, function: this.getJobWithPhaseTask.name } }); - for (const taskType of this.pollingTaskTypes) { - for (const jobType of this.jobTypes) { - logger.debug({ msg: `trying to dequeue task of type "${taskType}" and job of type "${jobType}"` }); - const task = await this.queueClient.dequeue(jobType, taskType); - if (!task) { - logger.debug({ msg: `no task of type "${taskType}" and job of type "${jobType}" found` }); - continue; - } - if (task.attempts === this.ingestionConfig.taskMaxTaskAttempts) { - logger.warn({ msg: `task ${task.id} reached max attempts, skipping`, metadata: task }); - continue; - } - logger.info({ msg: `dequeued task ${task.id}`, metadata: task }); + try { + for (const taskType of this.pollingTaskTypes) { + for (const jobType of this.jobTypes) { + logger.debug({ msg: `trying to dequeue task of type "${taskType}" and job of type "${jobType}"` }); + const task = await this.queueClient.dequeue(jobType, taskType); + if (!task) { + logger.debug({ msg: `no task of type "${taskType}" and job of type "${jobType}" found` }); + continue; + } + if (task.attempts === this.ingestionConfig.taskMaxTaskAttempts) { + logger.warn({ msg: `task ${task.id} reached max attempts, skipping`, metadata: task }); + continue; + } + logger.info({ msg: `dequeued task ${task.id}`, metadata: task }); - await this.queueClient.jobManagerClient.updateJob(task.jobId, { status: OperationStatus.IN_PROGRESS }); - const job = await this.queueClient.jobManagerClient.getJob(task.jobId); - logger.info({ msg: `got job ${job.id}`, metadata: job }); - return { job, task }; + await this.queueClient.jobManagerClient.updateJob(task.jobId, { status: OperationStatus.IN_PROGRESS }); + const job = await this.queueClient.jobManagerClient.getJob(task.jobId); + logger.info({ msg: `got job ${job.id}`, metadata: job }); + return { job, task }; + } + } + } catch (error) { + if (error instanceof Error) { + logger.error({ msg: `Failed to get job with phase task: ${error.message}`, error }); } + throw error; } } } diff --git a/tests/unit/job/jobProcessor/JobProcessor.spec.ts b/tests/unit/job/jobProcessor/JobProcessor.spec.ts index 77c763f..5396c8d 100644 --- a/tests/unit/job/jobProcessor/JobProcessor.spec.ts +++ b/tests/unit/job/jobProcessor/JobProcessor.spec.ts @@ -210,5 +210,29 @@ describe('JobProcessor', () => { await queueClient.heartbeatClient.stop(dequeuedTask.id); }); + + it('should throw an error if an error occurred during dequeue task and get job', async () => { + jest.useRealTimers(); + + testContext = setupJobProcessorTest({ useMockQueueClient: false }); + + const { jobProcessor, configMock, queueClient } = testContext; + const jobManagerBaseUrl = configMock.get('jobManagement.config.jobManagerBaseUrl'); + const jobType = initTestCases[0].jobType; + const taskType = initTestCases[0].taskType; + const consumeTaskUrl = `/tasks/${jobType}/${taskType}/startPending`; + const misMatchRegex = /^\/tasks\/[^/]+\/[^/]+\/startPending$/; + + nock.emitter.on('no match', () => { + nock(jobManagerBaseUrl).post(misMatchRegex).reply(404, undefined).persist(); + }); + + nock(jobManagerBaseUrl).post(consumeTaskUrl).reply(500, 'Request failed with status code 500').persist(); + + const dequeueSpy = jest.spyOn(queueClient, 'dequeue'); + + await expect(jobProcessor['getJobWithPhaseTask']()).rejects.toThrow('Request failed with status code 500'); + expect(dequeueSpy).toHaveBeenCalledWith(jobType, taskType); + }); }); });