Skip to content

Commit

Permalink
fix: release-tasks-according-to-job-status (MAPCO-3951) (#31)
Browse files Browse the repository at this point in the history
* fix: change task status according to job status

* test: change test of releaseInactive function

* refactor: execute the update query only twice

* test: add more tests
  • Loading branch information
netanelC authored Feb 12, 2024
1 parent fb0050d commit a7d42a4
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 24 deletions.
15 changes: 15 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
"@map-colonies/openapi-express-viewer": "^3.0.0",
"@map-colonies/read-pkg": "0.0.1",
"@map-colonies/telemetry": "4.1.0",
"@ngneat/falso": "^7.1.1",
"@opentelemetry/api": "1.4.0",
"@opentelemetry/api-metrics": "0.24.0",
"@opentelemetry/instrumentation-express": "0.32.1",
Expand Down
59 changes: 47 additions & 12 deletions src/DAL/repositories/taskRepository.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { EntityRepository, In, LessThan, Brackets, UpdateResult } from 'typeorm';
import { EntityRepository, LessThan, Brackets, UpdateResult, In } from 'typeorm';
import { container } from 'tsyringe';
import { Logger } from '@map-colonies/js-logger';
import { ConflictError, NotFoundError } from '@map-colonies/error-types';
Expand All @@ -21,6 +21,7 @@ import {
} from '../../common/dataModels/tasks';
import { OperationStatus } from '../../common/dataModels/enums';
import { JobEntity } from '../entity/job';
import { IJobAndTaskStatus } from '../../common/interfaces';
import { GeneralRepository } from './generalRepository';

declare type SqlRawResponse = [unknown[], number];
Expand Down Expand Up @@ -141,17 +142,41 @@ export class TaskRepository extends GeneralRepository<TaskEntity> {
}

public async releaseInactiveTask(taskIds: string[]): Promise<string[]> {
const res = await this.createQueryBuilder()
.update()
.set({ status: OperationStatus.PENDING, attempts: () => 'attempts + 1' })
.where({ id: In(taskIds), status: OperationStatus.IN_PROGRESS })
.returning('id')
.updateEntity(true)
.execute();
const raw = res.raw as { id: string }[];
const updatedIds = raw.map((value) => {
return (value as TaskEntity).id;
});
const getJobStatusQuery = `
SELECT task.id as "taskId", "jobId", task.status as "taskStatus", job.status as "jobStatus"
FROM "Job" job, "Task" task
where task.id IN ('${taskIds.join("','")}') and task."jobId" = job.id
`;
const jobAndTaskEntities = (await this.query(getJobStatusQuery)) as IJobAndTaskStatus[];
const updatedIds: string[] = [];

const pendingEntities = jobAndTaskEntities.filter(
(entity) =>
entity.taskStatus === OperationStatus.IN_PROGRESS &&
(entity.jobStatus === OperationStatus.PENDING || entity.jobStatus === OperationStatus.IN_PROGRESS)
);

const abortedEntities = jobAndTaskEntities.filter(
(entity) =>
entity.taskStatus === OperationStatus.IN_PROGRESS &&
entity.jobStatus !== OperationStatus.PENDING &&
entity.jobStatus !== OperationStatus.IN_PROGRESS
);

// Execute update query for Pending status
if (pendingEntities.length > 0) {
const pendingTaskIds = pendingEntities.map((entity) => entity.taskId);
await this.updateTaskStatus(pendingTaskIds, OperationStatus.PENDING);
updatedIds.push(...pendingTaskIds);
}

// Execute update query for Aborted status
if (abortedEntities.length > 0) {
const abortedTaskIds = abortedEntities.map((entity) => entity.taskId);
await this.updateTaskStatus(abortedTaskIds, OperationStatus.ABORTED);
updatedIds.push(...abortedTaskIds);
}

return updatedIds;
}

Expand Down Expand Up @@ -280,4 +305,14 @@ export class TaskRepository extends GeneralRepository<TaskEntity> {
throw err;
}
}

private async updateTaskStatus(taskIds: string[], newStatus: OperationStatus): Promise<void> {
await this.createQueryBuilder()
.update()
.set({ status: newStatus, attempts: () => 'attempts + 1' })
.where({ id: In(taskIds) })
.returning('id')
.updateEntity(true)
.execute();
}
}
8 changes: 8 additions & 0 deletions src/common/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { PostgresConnectionOptions } from 'typeorm/driver/postgres/PostgresConnectionOptions';
import { ResponseCodes } from './constants';
import { OperationStatus } from './dataModels/enums';

export interface IConfig {
get: <T>(setting: string) => T;
Expand All @@ -25,3 +26,10 @@ export interface IHttpResponse<T> {
export interface DefaultResponse {
code: ResponseCodes;
}

export interface IJobAndTaskStatus {
taskId: string;
jobId: string;
taskStatus: OperationStatus;
jobStatus: OperationStatus;
}
69 changes: 57 additions & 12 deletions tests/integration/taskManagement/taskManagement.spec.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import httpStatusCodes from 'http-status-codes';
import { getContainerConfig, resetContainer } from '../testContainerConfig';
import { getApp } from '../../../src/app';
import { createJobAndTaskStatus, createUuid } from '../../mocks/values';
import { TaskRepository } from '../../../src/DAL/repositories/taskRepository';
import { JobRepository } from '../../../src/DAL/repositories/jobRepository';
import { registerRepository, initTypeOrmMocks, RepositoryMocks } from '../../mocks/DBMock';
import { registerRepository, initTypeOrmMocks, RepositoryMocks, In } from '../../mocks/DBMock';
import { OperationStatus } from '../../../src/common/dataModels/enums';
import { TaskEntity } from '../../../src/DAL/entity/task';
import { IGetTaskResponse } from '../../../src/common/dataModels/tasks';
import { ResponseCodes } from '../../../src/common/constants';
import { IJobAndTaskStatus } from '../../../src/common/interfaces';
import { JobEntity } from '../../../src/DAL/entity/job';
import { TaskManagementRequestSender } from './helpers/taskManagementRequestSender';

Expand Down Expand Up @@ -277,21 +279,64 @@ describe('tasks', function () {

describe('release inactive tasks', () => {
describe('Happy Path', () => {
it('should release inactive tasks', async function () {
const req = ['6716ddc8-40fb-41b2-bf1d-5c433fe4728f', '2f2ca364-6ce7-4154-a07a-af9ba3970ddf'];
taskRepositoryMocks.queryBuilder.execute.mockResolvedValue({
raw: [
{
id: '6716ddc8-40fb-41b2-bf1d-5c433fe4728f',
},
],
});
it(`should release inactive tasks to 'Pending' when job is 'In-Progress' or 'Pending'`, async function () {
const reqIDs = [createUuid(), createUuid()];
const jobAndTaskEntities: IJobAndTaskStatus[] = [
createJobAndTaskStatus(OperationStatus.IN_PROGRESS, reqIDs[0]),
createJobAndTaskStatus(OperationStatus.PENDING, reqIDs[1]),
];
taskRepositoryMocks.queryMock.mockResolvedValue(jobAndTaskEntities);
taskRepositoryMocks.queryBuilder.execute.mockResolvedValueOnce({ raw: [{ id: In(reqIDs) as string[] }] });

const response = await requestSender.releaseInactive(req);
const response = await requestSender.releaseInactive(reqIDs);

expect(response.status).toBe(httpStatusCodes.OK);
expect(response.body).toEqual(reqIDs);
expect(taskRepositoryMocks.queryBuilder.execute).toHaveBeenCalledTimes(1);
expect(taskRepositoryMocks.queryBuilder.set).toHaveBeenCalledWith(expect.objectContaining({ status: OperationStatus.PENDING }));
expect(response).toSatisfyApiSpec();
});

it(`should release inactive tasks to 'Aborted' when job is neither 'In-Progress' nor 'Pending'`, async function () {
const reqIDs = [createUuid(), createUuid(), createUuid()];
const jobAndTaskEntities: IJobAndTaskStatus[] = [
createJobAndTaskStatus(OperationStatus.ABORTED, reqIDs[0]),
createJobAndTaskStatus(OperationStatus.FAILED, reqIDs[1]),
createJobAndTaskStatus(OperationStatus.EXPIRED, reqIDs[2]),
];
taskRepositoryMocks.queryMock.mockResolvedValue(jobAndTaskEntities);
taskRepositoryMocks.queryBuilder.execute.mockResolvedValueOnce({ raw: [{ id: In(reqIDs) as string[] }] });

const response = await requestSender.releaseInactive(reqIDs);

expect(response.status).toBe(httpStatusCodes.OK);
expect(response.body).toEqual(['6716ddc8-40fb-41b2-bf1d-5c433fe4728f']);
expect(response.body).toEqual(reqIDs);
expect(taskRepositoryMocks.queryBuilder.execute).toHaveBeenCalledTimes(1);
expect(taskRepositoryMocks.queryBuilder.set).toHaveBeenCalledWith(expect.objectContaining({ status: OperationStatus.ABORTED }));
expect(response).toSatisfyApiSpec();
});

it(`should execute the query twice when has both active (or preActive) and inactive jobs`, async function () {
const reqIDs = [createUuid(), createUuid(), createUuid(), createUuid(), createUuid(), createUuid()];
const jobAndTaskEntities: IJobAndTaskStatus[] = [
createJobAndTaskStatus(OperationStatus.PENDING, reqIDs[0]),
createJobAndTaskStatus(OperationStatus.IN_PROGRESS, reqIDs[1]),
createJobAndTaskStatus(OperationStatus.ABORTED, reqIDs[2]),
createJobAndTaskStatus(OperationStatus.FAILED, reqIDs[3]),
createJobAndTaskStatus(OperationStatus.COMPLETED, reqIDs[4]),
createJobAndTaskStatus(OperationStatus.EXPIRED, reqIDs[5]),
];
taskRepositoryMocks.queryMock.mockResolvedValue(jobAndTaskEntities);
taskRepositoryMocks.queryBuilder.execute.mockResolvedValueOnce({ raw: [{ id: In(reqIDs.slice(0, 3)) as string[] }] });
taskRepositoryMocks.queryBuilder.execute.mockResolvedValueOnce({ raw: [{ id: In(reqIDs.slice(3)) as string[] }] });

const response = await requestSender.releaseInactive(reqIDs);

expect(response.status).toBe(httpStatusCodes.OK);
expect(response.body).toEqual(reqIDs);
expect(taskRepositoryMocks.queryBuilder.execute).toHaveBeenCalledTimes(2);
expect(taskRepositoryMocks.queryBuilder.set).toHaveBeenNthCalledWith(1, expect.objectContaining({ status: OperationStatus.PENDING }));
expect(taskRepositoryMocks.queryBuilder.set).toHaveBeenNthCalledWith(2, expect.objectContaining({ status: OperationStatus.ABORTED }));
expect(response).toSatisfyApiSpec();
});
});
Expand Down
21 changes: 21 additions & 0 deletions tests/mocks/values.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { randUuid } from '@ngneat/falso';
import { IJobAndTaskStatus } from '../../src/common/interfaces';
import { OperationStatus } from '../../src/common/dataModels/enums';

export const createUuid = (): string => {
return randUuid();
};

export const createJobAndTaskStatus = (
jobStatus: OperationStatus,
taskId = createUuid(),
jobId = createUuid(),
taskStatus = OperationStatus.IN_PROGRESS
): IJobAndTaskStatus => {
return {
taskId,
taskStatus,
jobId,
jobStatus,
};
};

0 comments on commit a7d42a4

Please sign in to comment.