Skip to content

Commit

Permalink
refactor: catching errors in jobProcessor for more accurate logging
Browse files Browse the repository at this point in the history
  • Loading branch information
almog8k committed Aug 27, 2024
1 parent 0459b77 commit b35be57
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 32 deletions.
1 change: 1 addition & 0 deletions src/job/models/jobHandlerFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export const jobHandlerFactory = (container: DependencyContainer) => {
const logger = container.resolve<Logger>(SERVICES.LOGGER);
return (jobType: string): IJobHandler => {
try {
logger.info(`Getting job handler for job type ${jobType}`);
const jobHandler = container.resolve<IJobHandler | null>(jobType);
if (!jobHandler) {
const errorMsg = `Job handler for job type ${jobType} not found`;
Expand Down
79 changes: 47 additions & 32 deletions src/job/models/jobProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,59 +52,74 @@ export class JobProcessor {
try {
jobAndTaskType = await this.getJobWithPhaseTask();
if (!jobAndTaskType) {
logger.debug({ msg: 'waiting for next dequeue', metadata: { dequeueIntervalMs: this.dequeueIntervalMs } });
await setTimeoutPromise(this.dequeueIntervalMs);
return;
}

await this.processJob(jobAndTaskType);
} catch (error) {
if (error instanceof Error) {
logger.error({ msg: `Failed processing the job: ${error.message}`, error });
if (jobAndTaskType) {
const { job, task } = jobAndTaskType;
await this.queueClient.reject(job.id, task.id, true, error.message);
}
if (error instanceof Error && jobAndTaskType) {
const { job, task } = jobAndTaskType;
logger.error({ msg: 'rejecting task', error, metadata: { job, task } });
await this.queueClient.reject(job.id, task.id, true, error.message);
}
logger.debug({ msg: 'waiting for next dequeue', metadata: { dequeueIntervalMs: this.dequeueIntervalMs } });
await setTimeoutPromise(this.dequeueIntervalMs);
}
}

private async processJob(jobAndTaskType: JobAndPhaseTask): Promise<void> {
const logger = this.logger.child({ logContext: { ...this.logContext, function: this.processJob.name } });
const { job, task } = jobAndTaskType;
const taskTypes = this.ingestionConfig.pollingTasks;
const jobHandler = this.jobHandlerFactory(job.type);
try {
const taskTypes = this.ingestionConfig.pollingTasks;
const jobHandler = this.jobHandlerFactory(job.type);

switch (task.type) {
case taskTypes.init:
await jobHandler.handleJobInit(job, task.id);
break;
case taskTypes.finalize:
await jobHandler.handleJobFinalize(job, task.id);
break;
switch (task.type) {
case taskTypes.init:
await jobHandler.handleJobInit(job, task.id);
break;
case taskTypes.finalize:
await jobHandler.handleJobFinalize(job, task.id);
break;
}
} catch (error) {
if (error instanceof Error) {
logger.error({ msg: `Failed processing the job: ${error.message}`, error });
}
throw error;
}
}

private async getJobWithPhaseTask(): Promise<JobAndPhaseTask | undefined> {
const logger = this.logger.child({ logContext: { ...this.logContext, function: this.getJobWithPhaseTask.name } });
for (const taskType of this.pollingTaskTypes) {
for (const jobType of this.jobTypes) {
logger.debug({ msg: `trying to dequeue task of type "${taskType}" and job of type "${jobType}"` });
const task = await this.queueClient.dequeue(jobType, taskType);
if (!task) {
logger.debug({ msg: `no task of type "${taskType}" and job of type "${jobType}" found` });
continue;
}
if (task.attempts === this.ingestionConfig.taskMaxTaskAttempts) {
logger.warn({ msg: `task ${task.id} reached max attempts, skipping`, metadata: task });
continue;
}
logger.info({ msg: `dequeued task ${task.id}`, metadata: task });
try {
for (const taskType of this.pollingTaskTypes) {
for (const jobType of this.jobTypes) {
logger.debug({ msg: `trying to dequeue task of type "${taskType}" and job of type "${jobType}"` });
const task = await this.queueClient.dequeue(jobType, taskType);
if (!task) {
logger.debug({ msg: `no task of type "${taskType}" and job of type "${jobType}" found` });
continue;
}
if (task.attempts === this.ingestionConfig.taskMaxTaskAttempts) {
logger.warn({ msg: `task ${task.id} reached max attempts, skipping`, metadata: task });
continue;
}
logger.info({ msg: `dequeued task ${task.id}`, metadata: task });

await this.queueClient.jobManagerClient.updateJob(task.jobId, { status: OperationStatus.IN_PROGRESS });
const job = await this.queueClient.jobManagerClient.getJob(task.jobId);
logger.info({ msg: `got job ${job.id}`, metadata: job });
return { job, task };
await this.queueClient.jobManagerClient.updateJob(task.jobId, { status: OperationStatus.IN_PROGRESS });
const job = await this.queueClient.jobManagerClient.getJob(task.jobId);
logger.info({ msg: `got job ${job.id}`, metadata: job });
return { job, task };
}
}
} catch (error) {
if (error instanceof Error) {
logger.error({ msg: `Failed to get job with phase task: ${error.message}`, error });
}
throw error;
}
}
}
24 changes: 24 additions & 0 deletions tests/unit/job/jobProcessor/JobProcessor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,5 +210,29 @@ describe('JobProcessor', () => {

await queueClient.heartbeatClient.stop(dequeuedTask.id);
});

it('should throw an error if an error occurred during dequeue task and get job', async () => {
jest.useRealTimers();

testContext = setupJobProcessorTest({ useMockQueueClient: false });

const { jobProcessor, configMock, queueClient } = testContext;
const jobManagerBaseUrl = configMock.get<string>('jobManagement.config.jobManagerBaseUrl');
const jobType = initTestCases[0].jobType;
const taskType = initTestCases[0].taskType;
const consumeTaskUrl = `/tasks/${jobType}/${taskType}/startPending`;
const misMatchRegex = /^\/tasks\/[^/]+\/[^/]+\/startPending$/;

nock.emitter.on('no match', () => {
nock(jobManagerBaseUrl).post(misMatchRegex).reply(404, undefined).persist();
});

nock(jobManagerBaseUrl).post(consumeTaskUrl).reply(500, 'Request failed with status code 500').persist();

const dequeueSpy = jest.spyOn(queueClient, 'dequeue');

await expect(jobProcessor['getJobWithPhaseTask']()).rejects.toThrow('Request failed with status code 500');
expect(dequeueSpy).toHaveBeenCalledWith(jobType, taskType);
});
});
});

0 comments on commit b35be57

Please sign in to comment.