Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: polling loop using generator, manging job flow with jobHandler (MAPCO-4439) #2

Merged
merged 5 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@
"dequeueIntervalMs": 3000
},
"ingestion": {
"init": {
"taskType": "init"
"pollingTasks": {
"init": "init",
"finalize": "finalize"
},
"jobs": {
"new": {
Expand Down
5 changes: 0 additions & 5 deletions src/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,4 @@ export const SERVICES = {
QUEUE_CLIENT: Symbol('QueueClient'),
} satisfies Record<string, symbol>;

export const JOB_TYPES = {
Ingestion_New: 'Ingestion_New',
Ingestion_Update: 'Ingestion_Update',
Ingestion_Swap_Update: 'Ingestion_Swap_Update',
} as const satisfies Record<string, string>;
/* eslint-enable @typescript-eslint/naming-convention */
24 changes: 10 additions & 14 deletions src/common/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { NewRasterLayer, UpdateRasterLayer } from '@map-colonies/mc-model-types';

Check warning on line 1 in src/common/interfaces.ts

View workflow job for this annotation

GitHub Actions / ESLint

src/common/interfaces.ts#L1

'NewRasterLayer' is defined but never used (@typescript-eslint/no-unused-vars)

Check warning on line 1 in src/common/interfaces.ts

View workflow job for this annotation

GitHub Actions / ESLint

src/common/interfaces.ts#L1

'UpdateRasterLayer' is defined but never used (@typescript-eslint/no-unused-vars)
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
import { IJobResponse } from '@map-colonies/mc-priority-queue';

//#region config interfaces
Expand Down Expand Up @@ -34,28 +34,24 @@
swapUpdate: IJobConfig | undefined;
}

export interface IPollingTasks {
init: string;
finalize: string;
}

export interface IngestionConfig {
init: {
taskType: string;
};
pollingTasks: IPollingTasks;
jobs: IngestionJobsConfig;
}
//#endregion config interfaces
//#endregion config
export interface LogContext {
fileName: string;
class?: string;
function?: string;
}

/* eslint-disable @typescript-eslint/naming-convention */
export interface JobTypeMap {
Ingestion_New: NewRasterLayer;
Ingestion_Update: UpdateRasterLayer;
Ingestion_Swap_Update: UpdateRasterLayer;
}
/* eslint-disable @typescript-eslint/naming-convention */

export interface IJobHandler {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
handleJob: (job: IJobResponse<any, any>) => Promise<void>;
handleJobInit: (job: IJobResponse<any, any>) => Promise<void>;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
handleJobFinalize: (job: IJobResponse<any, any>) => Promise<void>;
}
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ server.listen(port, () => {
logger.info(`app started on port ${port}`);
startPolling().catch((error) => {
if (error instanceof Error) {
logger.error({ msg: 'error in main loop', error: error.message });
logger.fatal({ msg: 'error in main loop', error: error.message });
}
jobProcessor.stop();
process.exit(1);
Expand Down
56 changes: 36 additions & 20 deletions src/models/jobProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import { JOB_HANDLER_FACTORY_SYMBOL, JobHandlerFactory } from './jobHandlerFacto
export class JobProcessor {
private readonly logContext: LogContext;
private readonly jobTypes: string[];
private readonly initTaskType: string;
private readonly pollingTaskTypes: string[];
private readonly dequeueIntervalMs: number;
private readonly ingestionConfig: IngestionConfig;
private isRunning = true;
public constructor(
@inject(SERVICES.LOGGER) private readonly logger: Logger,
Expand All @@ -22,10 +23,10 @@ export class JobProcessor {
@inject(SERVICES.QUEUE_CLIENT) private readonly queueClient: QueueClient
) {
this.dequeueIntervalMs = this.config.get<number>('jobManagement.config.dequeueIntervalMs');
const { jobs, init } = this.config.get<IngestionConfig>('jobManagement.ingestion');
this.ingestionConfig = this.config.get<IngestionConfig>('jobManagement.ingestion');
const { jobs, pollingTasks } = this.ingestionConfig;
this.jobTypes = getAvailableJobTypes(jobs);
this.initTaskType = init.taskType;

this.pollingTaskTypes = [pollingTasks.init, pollingTasks.finalize];
this.logContext = {
fileName: __filename,
class: JobProcessor.name,
Expand All @@ -49,35 +50,50 @@ export class JobProcessor {
private async consumeAndProcess(): Promise<void> {
const logCtx: LogContext = { ...this.logContext, function: this.consumeAndProcess.name };
try {
const job = await this.getJob();
const jobAndTaskType = await this.getJobWithTaskType();

if (!job) {
if (!jobAndTaskType) {
await setTimeoutPromise(this.dequeueIntervalMs);
return;
}

const jobHandler = this.jobHandlerFactory(job.type);
await jobHandler.handleJob(job);
await this.processJob(jobAndTaskType);
} catch (error) {
this.logger.error({ msg: 'Failed processing the job', error, logContext: logCtx });
await setTimeoutPromise(this.dequeueIntervalMs);
}
}

private async getJob(): Promise<IJobResponse<unknown, unknown> | undefined> {
const logCtx: LogContext = { ...this.logContext, function: this.getJob.name };
for (const jobType of this.jobTypes) {
this.logger.debug({ msg: `try to dequeue task of type "${this.initTaskType}" and job of type "${jobType}"`, logContext: logCtx });
const task = await this.queueClient.dequeue(jobType, this.initTaskType);
private async processJob(jobAndTaskType: { job: IJobResponse<unknown, unknown>; taskType: string }): Promise<void> {
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
const { job, taskType } = jobAndTaskType;
const taskTypes = this.ingestionConfig.pollingTasks;
const jobHandler = this.jobHandlerFactory(job.type);

if (!task) {
continue;
}
switch (taskType) {
case taskTypes.init:
await jobHandler.handleJobInit(job);
break;
case taskTypes.finalize:
await jobHandler.handleJobFinalize(job);
break;
}
}

private async getJobWithTaskType(): Promise<{ job: IJobResponse<unknown, unknown>; taskType: string } | undefined> {
const logCtx: LogContext = { ...this.logContext, function: this.getJobWithTaskType.name };
for (const taskType of this.pollingTaskTypes) {
for (const jobType of this.jobTypes) {
this.logger.debug({ msg: `try to dequeue task of type "${taskType}" and job of type "${jobType}"`, logContext: logCtx });
const task = await this.queueClient.dequeue(jobType, taskType);

this.logger.info({ msg: `dequeued task ${task.id}`, metadata: task, logContext: this.logContext });
const job = await this.queueClient.jobManagerClient.getJob(task.jobId);
this.logger.info({ msg: `got job ${job.id}`, metadata: job, logContext: this.logContext });
return job;
if (!task) {
continue;
}
this.logger.info({ msg: `dequeued task ${task.id}`, metadata: task, logContext: this.logContext });
const job = await this.queueClient.jobManagerClient.getJob(task.jobId);
this.logger.info({ msg: `got job ${job.id}`, metadata: job, logContext: this.logContext });
return { job, taskType: task.type };
}
}
}
}
12 changes: 9 additions & 3 deletions src/models/newJobHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,15 @@ export class NewJobHandler implements IJobHandler {
};
}

public async handleJob(job: IJobResponse<NewRasterLayer, unknown>): Promise<void> {
const logCtx: LogContext = { ...this.logContext, function: this.handleJob.name };
this.logger.info({ msg: 'handling new job', metadata: { job }, logContext: logCtx });
public async handleJobInit(job: IJobResponse<NewRasterLayer, unknown>): Promise<void> {
const logCtx: LogContext = { ...this.logContext, function: this.handleJobInit.name };
this.logger.info({ msg: 'handling "new" job "init"', metadata: { job }, logContext: logCtx });
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
await Promise.reject('not implemented');
}

public async handleJobFinalize(job: IJobResponse<NewRasterLayer, unknown>): Promise<void> {
const logCtx: LogContext = { ...this.logContext, function: this.handleJobFinalize.name };
this.logger.info({ msg: 'handling "new" job "finalize"', metadata: { job }, logContext: logCtx });
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
await Promise.reject('not implemented');
}
}
12 changes: 9 additions & 3 deletions src/models/swapJobHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,15 @@ export class SwapJobHandler implements IJobHandler {
this.logContext = { fileName: __filename, class: SwapJobHandler.name };
}

public async handleJob(job: IJobResponse<UpdateRasterLayer, unknown>): Promise<void> {
const logCtx: LogContext = { ...this.logContext, function: this.handleJob.name };
this.logger.info({ msg: 'handling swap update job', metadata: { job }, logContext: logCtx });
public async handleJobInit(job: IJobResponse<UpdateRasterLayer, unknown>): Promise<void> {
const logCtx: LogContext = { ...this.logContext, function: this.handleJobInit.name };
this.logger.info({ msg: 'handling "swap update" job "init"', metadata: { job }, logContext: logCtx });
await Promise.reject('not implemented');
}

public async handleJobFinalize(job: IJobResponse<UpdateRasterLayer, unknown>): Promise<void> {
const logCtx: LogContext = { ...this.logContext, function: this.handleJobFinalize.name };
this.logger.info({ msg: 'handling "swap update" job "finalize"', metadata: { job }, logContext: logCtx });
await Promise.reject('not implemented');
}
}
12 changes: 9 additions & 3 deletions src/models/updateJobHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,15 @@ export class UpdateJobHandler implements IJobHandler {
this.logContext = { fileName: __filename, class: UpdateJobHandler.name };
}

public async handleJob(job: IJobResponse<UpdateRasterLayer, unknown>): Promise<void> {
const logCtx: LogContext = { ...this.logContext, function: this.handleJob.name };
this.logger.info({ msg: 'handling update job', metadata: { job }, logContext: logCtx });
public async handleJobInit(job: IJobResponse<UpdateRasterLayer, unknown>): Promise<void> {
const logCtx: LogContext = { ...this.logContext, function: this.handleJobInit.name };
this.logger.info({ msg: 'handling "update" job "init"', metadata: { job }, logContext: logCtx });
await Promise.reject('not implemented');
}

public async handleJobFinalize(job: IJobResponse<UpdateRasterLayer, unknown>): Promise<void> {
const logCtx: LogContext = { ...this.logContext, function: this.handleJobFinalize.name };
this.logger.info({ msg: 'handling "update" job "finalize"', metadata: { job }, logContext: logCtx });
await Promise.reject('not implemented');
}
}
96 changes: 73 additions & 23 deletions tests/unit/jobProcessor/JobProcessor.spec.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import { ITaskResponse } from '@map-colonies/mc-priority-queue';
import { registerDefaultConfig } from '../mocks/configMock';
import { ingestionNewJob, ingestionUpdateJob } from '../mocks/jobsMockData';
import { initTaskForIngestionNew, initTaskForIngestionUpdate } from '../mocks/tasksMockData';
import { NewJobHandler } from '../../../src/models/newJobHandler';
import { UpdateJobHandler } from '../../../src/models/updateJobHandler';
import { SwapJobHandler } from '../../../src/models/swapJobHandler';
import { IJobHandler, IngestionJobsConfig } from '../../../src/common/interfaces';
import {
finalizeTaskForIngestionNew,
finalizeTaskForIngestionSwapUpdate,
finalizeTaskForIngestionUpdate,
initTaskForIngestionNew,
initTaskForIngestionUpdate,
} from '../mocks/tasksMockData';
import { IJobHandler, IngestionConfig } from '../../../src/common/interfaces';
import { JobProcessorTestContext, setupJobProcessorTest } from './jobProcessorSetup';

jest.mock('timers/promises', () => ({
Expand Down Expand Up @@ -36,49 +39,96 @@ describe('JobProcessor', () => {
describe('start', () => {
it('should start polling and stop when stop is called', async () => {
const { jobProcessor, mockDequeue, configMock } = testContext;
const jobTypes = configMock.get<IngestionJobsConfig>('jobManagement.ingestion.jobs');
const ingestionConfig = configMock.get<IngestionConfig>('jobManagement.ingestion');
const dequeueIntervalMs = configMock.get<number>('jobManagement.config.dequeueIntervalMs');
const jobTypesArray = Object.keys(jobTypes);
const { jobs, pollingTasks } = ingestionConfig;
const jobTypesAmount = Object.keys(jobs).length;
const pollingTasksTypesAmount = Object.keys(pollingTasks).length;
const totalDequeueCalls = jobTypesAmount * pollingTasksTypesAmount;

const processPromise = jobProcessor.start();
jest.advanceTimersByTime(dequeueIntervalMs);
jobProcessor.stop();
await processPromise;

expect(mockDequeue).toHaveBeenCalledTimes(jobTypesArray.length); // as number of job types
expect(mockDequeue).toHaveBeenCalledTimes(totalDequeueCalls);
});
});

describe('consumeAndProcess', () => {
const testCases = [
const initTestCases = [
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
{
jobType: 'Ingestion_New',
jobType: ingestionNewJob.type,
taskType: initTaskForIngestionNew.type,
job: ingestionNewJob,
initTask: initTaskForIngestionNew,
expectedHandlerType: NewJobHandler,
task: initTaskForIngestionNew,
},
{
jobType: 'Ingestion_Update',
jobType: ingestionUpdateJob.type,
taskType: initTaskForIngestionNew.type,
job: ingestionUpdateJob,
initTask: initTaskForIngestionUpdate,
expectedHandlerType: UpdateJobHandler,
task: initTaskForIngestionUpdate,
},
{
jobType: 'Ingestion_Swap_Update',
jobType: ingestionUpdateJob.type,
taskType: initTaskForIngestionNew.type,
job: ingestionUpdateJob,
initTask: initTaskForIngestionUpdate,
expectedHandlerType: SwapJobHandler,
task: initTaskForIngestionUpdate,
},
];
const finalizeTestCases = [
{
jobType: ingestionNewJob.type,
taskType: finalizeTaskForIngestionNew.type,
job: ingestionNewJob,
task: finalizeTaskForIngestionNew,
},
{
jobType: ingestionUpdateJob.type,
taskType: finalizeTaskForIngestionUpdate.type,
job: ingestionUpdateJob,
task: finalizeTaskForIngestionUpdate,
},
{
jobType: ingestionUpdateJob.type,
taskType: finalizeTaskForIngestionSwapUpdate.type,
job: ingestionUpdateJob,
task: finalizeTaskForIngestionSwapUpdate,
},
];

test.each(initTestCases)('should process job of type $jobType and init task successfully', async ({ job, task }) => {
const { jobProcessor, mockDequeue, mockGetJob, mockJobHandlerFactory, configMock } = testContext;
const dequeueIntervalMs = configMock.get<number>('jobManagement.config.dequeueIntervalMs');

const mockHandler: jest.Mocked<IJobHandler> = {
handleJobInit: jest.fn().mockResolvedValue(undefined),
handleJobFinalize: jest.fn().mockResolvedValue(undefined),
};
mockDequeue.mockResolvedValueOnce(task as ITaskResponse<unknown>);
mockGetJob.mockResolvedValueOnce(job);
mockJobHandlerFactory.mockReturnValueOnce(mockHandler);

const processPromise = jobProcessor.start();
jest.advanceTimersByTime(dequeueIntervalMs);
jobProcessor.stop();
await processPromise;

expect(mockGetJob).toHaveBeenCalledTimes(1);
expect(mockGetJob).toHaveBeenCalledWith(task.jobId);
expect(mockJobHandlerFactory).toHaveBeenCalledWith(job.type);
expect(mockHandler.handleJobInit).toHaveBeenCalledWith(job);
});

test.each(testCases)('should process job of type $jobType successfully', async ({ job, initTask }) => {
test.each(finalizeTestCases)('should process job of type $jobType and finalize task successfully', async ({ job, task }) => {
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
const { jobProcessor, mockDequeue, mockGetJob, mockJobHandlerFactory, configMock } = testContext;
const dequeueIntervalMs = configMock.get<number>('jobManagement.config.dequeueIntervalMs');

const mockHandler: jest.Mocked<IJobHandler> = {
handleJob: jest.fn().mockResolvedValue(undefined),
handleJobInit: jest.fn().mockResolvedValue(undefined),
handleJobFinalize: jest.fn().mockResolvedValue(undefined),
};
mockDequeue.mockResolvedValueOnce(initTask as ITaskResponse<unknown>);
mockDequeue.mockResolvedValueOnce(task as ITaskResponse<unknown>);
mockGetJob.mockResolvedValueOnce(job);
mockJobHandlerFactory.mockReturnValueOnce(mockHandler);

Expand All @@ -88,9 +138,9 @@ describe('JobProcessor', () => {
await processPromise;

expect(mockGetJob).toHaveBeenCalledTimes(1);
expect(mockGetJob).toHaveBeenCalledWith(initTask.jobId);
expect(mockGetJob).toHaveBeenCalledWith(task.jobId);
expect(mockJobHandlerFactory).toHaveBeenCalledWith(job.type);
expect(mockHandler.handleJob).toHaveBeenCalledWith(job);
expect(mockHandler.handleJobFinalize).toHaveBeenCalledWith(job);
});
});
});
8 changes: 2 additions & 6 deletions tests/unit/jobProcessor/jobProcessorSetup.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { IJobResponse, ITaskResponse } from '@map-colonies/mc-priority-queue';
import { Logger } from '@map-colonies/js-logger';
import jsLogger from '@map-colonies/js-logger';
import { TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { JobProcessor } from '../../../src/models/jobProcessor';
import { JobHandlerFactory } from '../../../src/models/jobHandlerFactory';
Expand All @@ -17,11 +17,7 @@ export interface JobProcessorTestContext {
}

export function setupJobProcessorTest(): JobProcessorTestContext {
const mockLogger = {
info: jest.fn(),
fatal: jest.fn(),
debug: jest.fn(),
} as unknown as jest.Mocked<Logger>;
const mockLogger = jsLogger({ enabled: false });

const mockJobHandlerFactory = jest.fn();

Expand Down
5 changes: 3 additions & 2 deletions tests/unit/mocks/configMock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ const registerDefaultConfig = (): void => {
dequeueIntervalMs: 3000,
},
ingestion: {
init: {
taskType: 'init',
pollingTasks: {
init: 'init',
finalize: 'finalize',
},
jobs: {
new: {
Expand Down
Loading
Loading