Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tests): add tests for worker and queue services #3936

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import { Test } from '@nestjs/testing';
import { expect } from 'chai';

import { TriggerHandlerQueueService } from './trigger-handler-queue.service';

let triggerHandlerQueueService: TriggerHandlerQueueService;

describe('TriggerHandlerQueue service', () => {
beforeEach(async () => {
triggerHandlerQueueService = new TriggerHandlerQueueService();
await triggerHandlerQueueService.bullMqService.queue.obliterate();
});

afterEach(async () => {
await triggerHandlerQueueService.gracefulShutdown();
});

it('should be initialised properly', async () => {
expect(triggerHandlerQueueService).to.exist;
expect(Object.keys(triggerHandlerQueueService)).to.include.members(['name', 'bullMqService']);
expect(triggerHandlerQueueService.name).to.equal('trigger-handler');
expect(await triggerHandlerQueueService.bullMqService.getRunningStatus()).to.deep.equal({
queueIsPaused: false,
queueName: 'trigger-handler',
workerName: undefined,
workerIsRunning: undefined,
});
expect(triggerHandlerQueueService.bullMqService.queue).to.deep.include({
_events: {},
_eventsCount: 0,
_maxListeners: undefined,
name: 'trigger-handler',
jobsOpts: {
removeOnComplete: true,
},
});
expect(triggerHandlerQueueService.bullMqService.queue.opts).to.deep.include({
blockingConnection: false,
connection: {
connectTimeout: 50000,
db: 1,
family: 4,
host: 'localhost',
keepAlive: 30000,
keyPrefix: '',
password: undefined,
port: 6379,
tls: undefined,
},
defaultJobOptions: {
removeOnComplete: true,
},
prefix: 'bull',
sharedConnection: false,
});
expect(triggerHandlerQueueService.bullMqService.queue.opts.connection).to.deep.include({
host: 'localhost',
port: 6379,
});
});

it('should add a job in the queue', async () => {
const jobId = 'trigger-queue-job-id';
const organizationId = 'trigger-queue-organization-id';
const jobData = {
test: 'trigger-queue-job-data',
};
await triggerHandlerQueueService.add(jobId, jobData, organizationId);

expect(await triggerHandlerQueueService.bullMqService.queue.getActiveCount()).to.equal(0);
expect(await triggerHandlerQueueService.bullMqService.queue.getWaitingCount()).to.equal(1);

const triggerHandlerQueueJobs = await triggerHandlerQueueService.bullMqService.queue.getJobs();
expect(triggerHandlerQueueJobs.length).to.equal(1);
const [triggerHandlerQueueJob] = triggerHandlerQueueJobs;
expect(triggerHandlerQueueJob).to.deep.include({
id: '1',
name: jobId,
data: jobData,
attemptsMade: 0,
});
});
});
39 changes: 39 additions & 0 deletions apps/worker/src/app/workflow/services/metric-queue.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { Test } from '@nestjs/testing';
import { expect } from 'chai';

import { MetricQueueService } from './metric-queue.service';

import { WorkflowModule } from '../workflow.module';

let metricQueueService: MetricQueueService;

describe('Metric Queue service', () => {
beforeEach(async () => {
const moduleRef = await Test.createTestingModule({
imports: [WorkflowModule],
}).compile();

metricQueueService = moduleRef.get<MetricQueueService>(MetricQueueService);
});

afterEach(async () => {
await metricQueueService.gracefulShutdown();
});

it('should be initialised properly', async () => {
expect(metricQueueService).to.be.ok;
expect(metricQueueService).to.have.all.keys('DEFAULT_ATTEMPTS', 'bullMqService', 'name', 'token_list');
expect(await metricQueueService.bullMqService.getRunningStatus()).to.deep.include({
queueName: 'metric',
workerName: 'metric',
});
expect(metricQueueService.bullMqService.worker.opts).to.deep.include({
concurrency: 1,
lockDuration: 500,
});
expect(metricQueueService.bullMqService.worker.opts.connection).to.deep.include({
host: 'localhost',
port: 6379,
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { Test } from '@nestjs/testing';
import { expect } from 'chai';
import { setTimeout } from 'timers/promises';

import { TriggerProcessorQueueService } from './trigger-processor-queue.service';

import { WorkflowModule } from '../workflow.module';

let triggerProcessorQueueService: TriggerProcessorQueueService;

describe('Trigger Processor Queue service', () => {
beforeEach(async () => {
const moduleRef = await Test.createTestingModule({
imports: [WorkflowModule],
}).compile();

triggerProcessorQueueService = moduleRef.get<TriggerProcessorQueueService>(TriggerProcessorQueueService);
await triggerProcessorQueueService.bullMqService.queue.obliterate();
});

afterEach(async () => {
await triggerProcessorQueueService.gracefulShutdown();
});

it('should be initialised properly', async () => {
expect(triggerProcessorQueueService).to.be.ok;
expect(triggerProcessorQueueService).to.have.all.keys('bullMqService', 'name', 'triggerEventUsecase');
expect(await triggerProcessorQueueService.bullMqService.getRunningStatus()).to.deep.equal({
queueIsPaused: false,
queueName: 'trigger-handler',
workerName: 'trigger-handler',
workerIsRunning: true,
});
expect(triggerProcessorQueueService.bullMqService.worker.opts).to.deep.include({
concurrency: 200,
lockDuration: 90000,
});
expect(triggerProcessorQueueService.bullMqService.worker.opts.connection).to.deep.include({
host: 'localhost',
port: 6379,
});
});

it('should be able to automatically pull a job from the queue that will error', async () => {
const existingJobs = await triggerProcessorQueueService.bullMqService.queue.getJobs();
expect(existingJobs.length).to.equal(0);

const jobId = 'trigger-processor-queue-job-id';
const _environmentId = 'trigger-processor-queue-environment-id';
const _organizationId = 'trigger-processor-queue-organization-id';
const _userId = 'trigger-processor-queue-user-id';
const jobData = {
_id: jobId,
test: 'trigger-processor-queue-job-data',
_environmentId,
_organizationId,
_userId,
};

await triggerProcessorQueueService.bullMqService.add(jobId, jobData, _organizationId);

expect(await triggerProcessorQueueService.bullMqService.queue.getActiveCount()).to.equal(1);
expect(await triggerProcessorQueueService.bullMqService.queue.getWaitingCount()).to.equal(0);

const timestamp = Date.now();

// When we arrive to pull the job it has been already pulled by the worker
const nextJob = await triggerProcessorQueueService.bullMqService.worker.getNextJob(jobId);
expect(nextJob).to.equal(undefined);

await setTimeout(100);

const queueJobs = await triggerProcessorQueueService.bullMqService.queue.getJobs();
expect(queueJobs.length).to.equal(1);
const [queueJob] = queueJobs;

expect(queueJob).to.deep.include({
attemptsMade: 1,
data: jobData,
delay: 0,
failedReason: 'Notification template could not be found',
id: '1',
name: jobId,
progress: 0,
});
const [stackTrace] = queueJob.stacktrace;
expect(stackTrace)
.to.be.a('string')
.and.satisfy((str) => str.startsWith('ApiException: Notification template could not be found'));
expect(timestamp).to.be.greaterThanOrEqual(Number(queueJob.processedOn));
});
});
160 changes: 160 additions & 0 deletions packages/application-generic/src/services/bull-mq.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
import {
BullMqService,
QueueBaseOptions,
WorkerOptions,
} from './bull-mq.service';

let bullMqService: BullMqService;

describe('BullMQ Service', () => {
describe('Non cluster mode', () => {
beforeEach(() => {
bullMqService = new BullMqService();
});

afterEach(async () => {
await bullMqService.gracefulShutdown();
});

describe('Set up', () => {
it('should be able to instantiate it correctly', async () => {
expect(bullMqService.queue).toBeUndefined();
expect(bullMqService.worker).toBeUndefined();
expect(BullMqService.haveProInstalled()).toBeFalsy();
expect(await bullMqService.getRunningStatus()).toEqual({
queueIsPaused: undefined,
queueName: undefined,
workerIsRunning: undefined,
workerName: undefined,
});
});

it('should create a queue properly with the default configuration', async () => {
const queueName = 'test-queue';
const queueOptions: QueueBaseOptions = {};
await bullMqService.createQueue(queueName, queueOptions);

expect(bullMqService.queue.name).toEqual(queueName);
expect(bullMqService.queue.opts.connection).toEqual({
connectTimeout: 50000,
db: 1,
family: 4,
host: 'localhost',
keepAlive: 30000,
keyPrefix: '',
password: undefined,
port: 6379,
tls: undefined,
});

expect(await bullMqService.getRunningStatus()).toEqual({
queueIsPaused: false,
queueName,
workerIsRunning: undefined,
workerName: undefined,
});
});

it('should create a queue properly with a chosen configuration', async () => {
const queueName = 'test-queue';
const queueOptions: QueueBaseOptions = {
connection: {
connectTimeout: 10000,
db: 10,
family: 6,
keepAlive: 1000,
keyPrefix: 'test',
},
};
await bullMqService.createQueue(queueName, queueOptions);

expect(bullMqService.queue.name).toEqual(queueName);
expect(bullMqService.queue.opts.connection).toEqual({
connectTimeout: 10000,
db: 10,
family: 6,
host: 'localhost',
keepAlive: 1000,
keyPrefix: 'test',
password: undefined,
port: 6379,
tls: undefined,
});

expect(await bullMqService.getRunningStatus()).toEqual({
queueIsPaused: false,
queueName,
workerIsRunning: undefined,
workerName: undefined,
});
});

it('should create a worker properly with the default configuration', async () => {
const workerName = 'test-worker';
await bullMqService.createWorker(workerName, undefined, {});

expect(bullMqService.worker.name).toEqual(workerName);
expect(bullMqService.worker.opts.connection).toEqual({
connectTimeout: 50000,
db: 1,
family: 4,
host: 'localhost',
keepAlive: 30000,
keyPrefix: '',
password: undefined,
port: 6379,
tls: undefined,
});

expect(await bullMqService.getRunningStatus()).toEqual({
queueIsPaused: undefined,
queueName: undefined,
workerIsRunning: false,
workerName,
});
});

it('should create a worker properly with a chosen configuration', async () => {
const workerName = 'test-worker';
const workerOptions: WorkerOptions = {
connection: {
connectTimeout: 10000,
db: 10,
family: 6,
keepAlive: 1000,
keyPrefix: 'test',
},
lockDuration: 90000,
concurrency: 200,
};
await bullMqService.createWorker(workerName, undefined, workerOptions);

expect(bullMqService.worker.name).toEqual(workerName);
/*
* TODO: This test if executed shows we have a bug. As we are not running this
* in the CI is not going to block. I am fixing it in MemoryDB feature.
*/
expect(bullMqService.worker.opts.connection).toEqual({
connectTimeout: 10000,
db: 10,
family: 6,
host: 'localhost',
keepAlive: 1000,
keyPrefix: 'test',
password: undefined,
port: 6379,
tls: undefined,
});
expect(bullMqService.worker.opts.concurrency).toEqual(200);
expect(bullMqService.worker.opts.lockDuration).toEqual(90000);

expect(await bullMqService.getRunningStatus()).toEqual({
queueIsPaused: undefined,
queueName: undefined,
workerIsRunning: false,
workerName,
});
});
});
});
});
6 changes: 5 additions & 1 deletion packages/application-generic/src/services/bull-mq.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ interface IEventJobData {

type BullMqJobData = undefined | IJobData | IEventJobData;

export { QueueBaseOptions, RedisConnectionOptions as BullMqConnectionOptions };
export {
QueueBaseOptions,
RedisConnectionOptions as BullMqConnectionOptions,
WorkerOptions,
};

export const bullMqBaseOptions = {
connection: {
Expand Down
Loading
Loading