Skip to content

Commit

Permalink
test(core): Add tests for scaling service (no-changelog) (#10320)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov authored Aug 8, 2024
1 parent a0b021b commit aa95059
Show file tree
Hide file tree
Showing 3 changed files with 262 additions and 3 deletions.
259 changes: 259 additions & 0 deletions packages/cli/src/scaling/__tests__/scaling.service.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
import { mock } from 'jest-mock-extended';
import { ScalingService } from '../scaling.service';
import { JOB_TYPE_NAME, QUEUE_NAME } from '../constants';
import config from '@/config';
import * as BullModule from 'bull';
import type { Job, JobData, JobOptions, JobQueue } from '../types';
import { ApplicationError } from 'n8n-workflow';

const queue = mock<JobQueue>({
client: { ping: jest.fn() },
});

jest.mock('bull', () => ({
__esModule: true,
default: jest.fn(() => queue),
}));

describe('ScalingService', () => {
beforeEach(() => {
jest.clearAllMocks();
config.set('generic.instanceType', 'main');
});

describe('setupQueue', () => {
it('should set up the queue', async () => {
/**
* Arrange
*/
const scalingService = new ScalingService(mock(), mock(), mock());
const { prefix, settings } = config.get('queue.bull');
const Bull = jest.mocked(BullModule.default);

/**
* Act
*/
await scalingService.setupQueue();

/**
* Assert
*/
expect(Bull).toHaveBeenCalledWith(QUEUE_NAME, {
prefix,
settings,
createClient: expect.any(Function),
});
expect(queue.on).toHaveBeenCalledWith('global:progress', expect.any(Function));
expect(queue.on).toHaveBeenCalledWith('error', expect.any(Function));
});
});

describe('setupWorker', () => {
it('should set up a worker with concurrency', async () => {
/**
* Arrange
*/
config.set('generic.instanceType', 'worker');
const scalingService = new ScalingService(mock(), mock(), mock());
await scalingService.setupQueue();
const concurrency = 5;

/**
* Act
*/
scalingService.setupWorker(concurrency);

/**
* Assert
*/
expect(queue.process).toHaveBeenCalledWith(JOB_TYPE_NAME, concurrency, expect.any(Function));
});

it('should throw if called on a non-worker instance', async () => {
/**
* Arrange
*/
const scalingService = new ScalingService(mock(), mock(), mock());
await scalingService.setupQueue();

/**
* Act and Assert
*/
expect(() => scalingService.setupWorker(5)).toThrow();
});
});

describe('pauseQueue', () => {
it('should pause the queue', async () => {
/**
* Arrange
*/
const scalingService = new ScalingService(mock(), mock(), mock());
await scalingService.setupQueue();

/**
* Act
*/
await scalingService.pauseQueue();

/**
* Assert
*/
expect(queue.pause).toHaveBeenCalledWith(true, true);
});
});

describe('pingQueue', () => {
it('should ping the queue', async () => {
/**
* Arrange
*/
const scalingService = new ScalingService(mock(), mock(), mock());
await scalingService.setupQueue();

/**
* Act
*/
await scalingService.pingQueue();

/**
* Assert
*/
expect(queue.client.ping).toHaveBeenCalled();
});
});

describe('addJob', () => {
it('should add a job', async () => {
/**
* Arrange
*/
const scalingService = new ScalingService(mock(), mock(), mock());
await scalingService.setupQueue();
queue.add.mockResolvedValue(mock<Job>({ id: '456' }));

/**
* Act
*/
const jobData = mock<JobData>({ executionId: '123' });
const jobOptions = mock<JobOptions>();
await scalingService.addJob(jobData, jobOptions);

/**
* Assert
*/
expect(queue.add).toHaveBeenCalledWith(JOB_TYPE_NAME, jobData, jobOptions);
});
});

describe('getJob', () => {
it('should get a job', async () => {
/**
* Arrange
*/
const scalingService = new ScalingService(mock(), mock(), mock());
await scalingService.setupQueue();
const jobId = '123';
queue.getJob.mockResolvedValue(mock<Job>({ id: jobId }));

/**
* Act
*/
const job = await scalingService.getJob(jobId);

/**
* Assert
*/
expect(queue.getJob).toHaveBeenCalledWith(jobId);
expect(job?.id).toBe(jobId);
});
});

describe('findJobsByStatus', () => {
it('should find jobs by status', async () => {
/**
* Arrange
*/
const scalingService = new ScalingService(mock(), mock(), mock());
await scalingService.setupQueue();
queue.getJobs.mockResolvedValue([mock<Job>({ id: '123' })]);

/**
* Act
*/
const jobs = await scalingService.findJobsByStatus(['active']);

/**
* Assert
*/
expect(queue.getJobs).toHaveBeenCalledWith(['active']);
expect(jobs).toHaveLength(1);
expect(jobs.at(0)?.id).toBe('123');
});
});

describe('stopJob', () => {
it('should stop an active job', async () => {
/**
* Arrange
*/
const scalingService = new ScalingService(mock(), mock(), mock());
await scalingService.setupQueue();
const job = mock<Job>({ isActive: jest.fn().mockResolvedValue(true) });

/**
* Act
*/
const result = await scalingService.stopJob(job);

/**
* Assert
*/
expect(job.progress).toHaveBeenCalledWith({ kind: 'abort-job' });
expect(result).toBe(true);
});

it('should stop an inactive job', async () => {
/**
* Arrange
*/
const scalingService = new ScalingService(mock(), mock(), mock());
await scalingService.setupQueue();
const job = mock<Job>({ isActive: jest.fn().mockResolvedValue(false) });

/**
* Act
*/
const result = await scalingService.stopJob(job);

/**
* Assert
*/
expect(job.remove).toHaveBeenCalled();
expect(result).toBe(true);
});

it('should report failure to stop a job', async () => {
/**
* Arrange
*/
const scalingService = new ScalingService(mock(), mock(), mock());
await scalingService.setupQueue();
const job = mock<Job>({
isActive: jest.fn().mockImplementation(() => {
throw new ApplicationError('Something went wrong');
}),
});

/**
* Act
*/
const result = await scalingService.stopJob(job);

/**
* Assert
*/
expect(result).toBe(false);
});
});
});
2 changes: 1 addition & 1 deletion packages/cli/src/scaling/job-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import type PCancelable from 'p-cancelable';
*/
@Service()
export class JobProcessor {
private readonly runningJobs: { [jobId: JobId]: RunningJob } = {};
private readonly runningJobs: Record<JobId, RunningJob> = {};

constructor(
private readonly logger: Logger,
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/scaling/scaling.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ export class ScalingService {

try {
if (await job.isActive()) {
await job.progress({ kind: 'abort-job' });
await job.progress({ kind: 'abort-job' }); // being processed by worker
this.logger.debug('[ScalingService] Stopped active job', props);
return true;
}

await job.remove();
await job.remove(); // not yet picked up, or waiting for next pickup (stalled)
this.logger.debug('[ScalingService] Stopped inactive job', props);
return true;
} catch (error: unknown) {
Expand Down

0 comments on commit aa95059

Please sign in to comment.