Skip to content

Commit

Permalink
feat(core): Extract SQL-based JobQueueStrategy in a bundled plugin
Browse files Browse the repository at this point in the history
By default, Vendure will use an in-memory JobQueueStrategy. For production the DefaultJobQueuePlugin should be used which will persist the job queue to the SQL database.
  • Loading branch information
michaelbromley committed Apr 3, 2020
1 parent 42b1d28 commit a2069f6
Show file tree
Hide file tree
Showing 12 changed files with 341 additions and 65 deletions.
4 changes: 2 additions & 2 deletions packages/core/src/config/default-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { LanguageCode } from '@vendure/common/lib/generated-types';
import { DEFAULT_AUTH_TOKEN_HEADER_KEY } from '@vendure/common/lib/shared-constants';

import { generatePublicId } from '../common/generate-public-id';
import { SqlJobQueueStrategy } from '../job-queue/sql-job-queue-strategy';
import { InMemoryJobQueueStrategy } from '../job-queue/in-memory-job-queue-strategy';

import { DefaultAssetNamingStrategy } from './asset-naming-strategy/default-asset-naming-strategy';
import { NoAssetPreviewStrategy } from './asset-preview-strategy/no-asset-preview-strategy';
Expand Down Expand Up @@ -94,7 +94,7 @@ export const defaultConfig: RuntimeVendureConfig = {
},
},
jobQueueOptions: {
jobQueueStrategy: new SqlJobQueueStrategy(),
jobQueueStrategy: new InMemoryJobQueueStrategy(),
pollInterval: 200,
},
customFields: {
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/config/vendure-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ export interface JobQueueOptions {
/**
* @description
* Defines how the jobs in the queue are persisted and accessed.
*
* @default InMemoryJobQueueStrategy
*/
jobQueueStrategy?: JobQueueStrategy;
/**
Expand Down
2 changes: 0 additions & 2 deletions packages/core/src/entity/entities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import { Fulfillment } from './fulfillment/fulfillment.entity';
import { GlobalSettings } from './global-settings/global-settings.entity';
import { HistoryEntry } from './history-entry/history-entry.entity';
import { OrderHistoryEntry } from './history-entry/order-history-entry.entity';
import { JobRecord } from './job-record/job-record.entity';
import { OrderItem } from './order-item/order-item.entity';
import { OrderLine } from './order-line/order-line.entity';
import { Order } from './order/order.entity';
Expand Down Expand Up @@ -75,7 +74,6 @@ export const coreEntitiesMap = {
Fulfillment,
GlobalSettings,
HistoryEntry,
JobRecord,
Order,
OrderHistoryEntry,
OrderItem,
Expand Down
124 changes: 124 additions & 0 deletions packages/core/src/job-queue/in-memory-job-queue-strategy.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/* tslint:disable:no-non-null-assertion */
import { JobListOptions, SortOrder } from '@vendure/common/lib/generated-types';

import { InMemoryJobQueueStrategy } from './in-memory-job-queue-strategy';
import { Job } from './job';

describe('InMemoryJobQueueStrategy', () => {
let strategy: InMemoryJobQueueStrategy;
beforeEach(() => {
strategy = new InMemoryJobQueueStrategy();
});

describe('findMany options', () => {
beforeEach(() => {
strategy.add(
new Job({
id: 'video-1',
queueName: 'video',
data: {},
createdAt: new Date('2020-04-03T10:00:00.000Z'),
}),
);
strategy.add(
new Job({
id: 'video-2',
queueName: 'video',
data: {},
createdAt: new Date('2020-04-03T10:01:00.000Z'),
}),
);
strategy.add(
new Job({
id: 'email-1',
queueName: 'email',
data: {},
createdAt: new Date('2020-04-03T10:02:00.000Z'),
}),
);
strategy.add(
new Job({
id: 'video-3',
queueName: 'video',
data: {},
createdAt: new Date('2020-04-03T10:03:00.000Z'),
}),
);
strategy.add(
new Job({
id: 'email-2',
queueName: 'email',
data: {},
createdAt: new Date('2020-04-03T10:04:00.000Z'),
}),
);
});

async function getIdResultsFor(options: JobListOptions): Promise<string[]> {
const result = await strategy.findMany(options);
return result.items.map((j) => j.id as string);
}

it('take & skip', async () => {
expect(await getIdResultsFor({ take: 1 })).toEqual(['video-1']);
expect(await getIdResultsFor({ take: 1, skip: 1 })).toEqual(['video-2']);
expect(await getIdResultsFor({ take: 10, skip: 2 })).toEqual(['email-1', 'video-3', 'email-2']);
});

it('sort createdAt', async () => {
expect(await getIdResultsFor({ sort: { createdAt: SortOrder.DESC } })).toEqual([
'email-2',
'video-3',
'email-1',
'video-2',
'video-1',
]);
expect(await getIdResultsFor({ sort: { createdAt: SortOrder.ASC } })).toEqual([
'video-1',
'video-2',
'email-1',
'video-3',
'email-2',
]);
});

it('sort id', async () => {
expect(await getIdResultsFor({ sort: { id: SortOrder.DESC } })).toEqual([
'video-3',
'video-2',
'video-1',
'email-2',
'email-1',
]);
expect(await getIdResultsFor({ sort: { id: SortOrder.ASC } })).toEqual([
'email-1',
'email-2',
'video-1',
'video-2',
'video-3',
]);
});

it('filter queueName', async () => {
expect(await getIdResultsFor({ filter: { queueName: { eq: 'video' } } })).toEqual([
'video-1',
'video-2',
'video-3',
]);

expect(await getIdResultsFor({ filter: { queueName: { contains: 'vid' } } })).toEqual([
'video-1',
'video-2',
'video-3',
]);
});

it('filter isSettled', async () => {
const video1 = await strategy.findOne('video-1');
video1?.complete();
await strategy.update(video1!);

expect(await getIdResultsFor({ filter: { isSettled: { eq: true } } })).toEqual(['video-1']);
});
});
});
177 changes: 177 additions & 0 deletions packages/core/src/job-queue/in-memory-job-queue-strategy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
import {
DateOperators,
JobFilterParameter,
JobListOptions,
JobSortParameter,
JobState,
NumberOperators,
StringOperators,
} from '@vendure/common/lib/generated-types';
import { ID, PaginatedList } from '@vendure/common/lib/shared-types';
import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';

import { generatePublicId } from '../common/generate-public-id';
import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy';
import { Logger } from '../config/logger/vendure-logger';

import { Job } from './job';

/**
* @description
* An in-memory {@link JobQueueStrategy}. This is the default strategy if not using a dedicated
* JobQueue plugin (e.g. {@link DefaultJobQueuePlugin}). Not recommended for production, since
* the queue will be cleared when the server stops.
* Completed jobs will be evicted from the store every 2 hours to prevent a memory leak.
*
* @docsCategory JobQueue
*/
export class InMemoryJobQueueStrategy implements JobQueueStrategy {
protected jobs = new Map<ID, Job>();
protected unsettledJobs: { [queueName: string]: Job[] } = {};
private timer: any;
private evictJobsAfterMs = 1000 * 60 * 60 * 2; // 2 hours

init() {
this.timer = setTimeout(this.evictSettledJobs, this.evictJobsAfterMs);
}

destroy() {
clearTimeout(this.timer);
}

async add(job: Job): Promise<Job> {
if (!job.id) {
(job as any).id = generatePublicId();
}
// tslint:disable-next-line:no-non-null-assertion
this.jobs.set(job.id!, job);
if (!this.unsettledJobs[job.queueName]) {
this.unsettledJobs[job.queueName] = [];
}
this.unsettledJobs[job.queueName].push(job);
return job;
}

async findOne(id: ID): Promise<Job | undefined> {
return this.jobs.get(id);
}

async findMany(options?: JobListOptions): Promise<PaginatedList<Job>> {
let items = [...this.jobs.values()];
if (options) {
if (options.sort) {
items = this.applySort(items, options.sort);
}
if (options.filter) {
items = this.applyFilters(items, options.filter);
}
if (options.skip || options.take) {
items = this.applyPagination(items, options.skip, options.take);
}
}
return {
items,
totalItems: items.length,
};
}

async findManyById(ids: ID[]): Promise<Job[]> {
return ids.map((id) => this.jobs.get(id)).filter(notNullOrUndefined);
}

async next(queueName: string): Promise<Job | undefined> {
const next = this.unsettledJobs[queueName]?.shift();
if (next) {
next.start();
return next;
}
}

async update(job: Job): Promise<void> {
if (job.state === JobState.RETRYING || job.state === JobState.PENDING) {
this.unsettledJobs[job.queueName].unshift(job);
}
// tslint:disable-next-line:no-non-null-assertion
this.jobs.set(job.id!, job);
}

private applySort(items: Job[], sort: JobSortParameter): Job[] {
for (const [prop, direction] of Object.entries(sort)) {
const key = prop as keyof Required<JobSortParameter>;
const dir = direction === 'ASC' ? -1 : 1;
items = items.sort((a, b) => ((a[key] || 0) < (b[key] || 0) ? 1 * dir : -1 * dir));
}
return items;
}

private applyFilters(items: Job[], filters: JobFilterParameter): Job[] {
for (const [prop, operator] of Object.entries(filters)) {
const key = prop as keyof Required<JobFilterParameter>;
if (operator?.eq !== undefined) {
items = items.filter((i) => i[key] === operator.eq);
}

const contains = (operator as StringOperators)?.contains;
if (contains) {
items = items.filter((i) => (i[key] as string).includes(contains));
}
const gt = (operator as NumberOperators)?.gt;
if (gt) {
items = items.filter((i) => (i[key] as number) > gt);
}
const gte = (operator as NumberOperators)?.gte;
if (gte) {
items = items.filter((i) => (i[key] as number) >= gte);
}
const lt = (operator as NumberOperators)?.lt;
if (lt) {
items = items.filter((i) => (i[key] as number) < lt);
}
const lte = (operator as NumberOperators)?.lte;
if (lte) {
items = items.filter((i) => (i[key] as number) <= lte);
}
const before = (operator as DateOperators)?.before;
if (before) {
items = items.filter((i) => (i[key] as Date) <= before);
}
const after = (operator as DateOperators)?.after;
if (after) {
items = items.filter((i) => (i[key] as Date) >= after);
}
const between = (operator as NumberOperators)?.between;
if (between) {
items = items.filter((i) => {
const num = i[key] as number;
return num > between.start && num < between.end;
});
}
}
return items;
}

private applyPagination(items: Job[], skip?: number | null, take?: number | null): Job[] {
const start = skip || 0;
const end = take != null ? start + take : undefined;
return items.slice(start, end);
}

/**
* Delete old jobs from the `jobs` Map if they are settled and older than the value
* defined in `this.pruneJobsAfterMs`. This prevents a memory leak as the job queue
* grows indefinitely.
*/
private evictSettledJobs = () => {
for (const job of this.jobs.values()) {
if (job.isSettled) {
const settledAtMs = job.settledAt ? +job.settledAt : 0;
const nowMs = +new Date();
if (nowMs - settledAtMs > this.evictJobsAfterMs) {
// tslint:disable-next-line:no-non-null-assertion
this.jobs.delete(job.id!);
}
}
}
this.timer = setTimeout(this.evictSettledJobs, this.evictJobsAfterMs);
};
}
1 change: 1 addition & 0 deletions packages/core/src/job-queue/job-queue.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { ConfigService } from '../config/config.service';
import { ProcessContext, ServerProcessContext } from '../process-context/process-context';

import { Job } from './job';
import { JobQueue } from './job-queue';
import { JobQueueService } from './job-queue.service';
import { TestingJobQueueStrategy } from './testing-job-queue-strategy';

Expand Down
Loading

0 comments on commit a2069f6

Please sign in to comment.