diff --git a/packages/core/src/config/default-config.ts b/packages/core/src/config/default-config.ts index 09d028c87b..5675352bf2 100644 --- a/packages/core/src/config/default-config.ts +++ b/packages/core/src/config/default-config.ts @@ -3,7 +3,7 @@ import { LanguageCode } from '@vendure/common/lib/generated-types'; import { DEFAULT_AUTH_TOKEN_HEADER_KEY } from '@vendure/common/lib/shared-constants'; import { generatePublicId } from '../common/generate-public-id'; -import { SqlJobQueueStrategy } from '../job-queue/sql-job-queue-strategy'; +import { InMemoryJobQueueStrategy } from '../job-queue/in-memory-job-queue-strategy'; import { DefaultAssetNamingStrategy } from './asset-naming-strategy/default-asset-naming-strategy'; import { NoAssetPreviewStrategy } from './asset-preview-strategy/no-asset-preview-strategy'; @@ -94,7 +94,7 @@ export const defaultConfig: RuntimeVendureConfig = { }, }, jobQueueOptions: { - jobQueueStrategy: new SqlJobQueueStrategy(), + jobQueueStrategy: new InMemoryJobQueueStrategy(), pollInterval: 200, }, customFields: { diff --git a/packages/core/src/config/vendure-config.ts b/packages/core/src/config/vendure-config.ts index 8909bfd5e2..64a8288cdb 100644 --- a/packages/core/src/config/vendure-config.ts +++ b/packages/core/src/config/vendure-config.ts @@ -389,6 +389,8 @@ export interface JobQueueOptions { /** * @description * Defines how the jobs in the queue are persisted and accessed. + * + * @default InMemoryJobQueueStrategy */ jobQueueStrategy?: JobQueueStrategy; /** diff --git a/packages/core/src/entity/entities.ts b/packages/core/src/entity/entities.ts index 000d4041fb..37b55f9560 100644 --- a/packages/core/src/entity/entities.ts +++ b/packages/core/src/entity/entities.ts @@ -17,7 +17,6 @@ import { Fulfillment } from './fulfillment/fulfillment.entity'; import { GlobalSettings } from './global-settings/global-settings.entity'; import { HistoryEntry } from './history-entry/history-entry.entity'; import { OrderHistoryEntry } from './history-entry/order-history-entry.entity'; -import { JobRecord } from './job-record/job-record.entity'; import { OrderItem } from './order-item/order-item.entity'; import { OrderLine } from './order-line/order-line.entity'; import { Order } from './order/order.entity'; @@ -75,7 +74,6 @@ export const coreEntitiesMap = { Fulfillment, GlobalSettings, HistoryEntry, - JobRecord, Order, OrderHistoryEntry, OrderItem, diff --git a/packages/core/src/job-queue/in-memory-job-queue-strategy.spec.ts b/packages/core/src/job-queue/in-memory-job-queue-strategy.spec.ts new file mode 100644 index 0000000000..f4dea4c405 --- /dev/null +++ b/packages/core/src/job-queue/in-memory-job-queue-strategy.spec.ts @@ -0,0 +1,124 @@ +/* tslint:disable:no-non-null-assertion */ +import { JobListOptions, SortOrder } from '@vendure/common/lib/generated-types'; + +import { InMemoryJobQueueStrategy } from './in-memory-job-queue-strategy'; +import { Job } from './job'; + +describe('InMemoryJobQueueStrategy', () => { + let strategy: InMemoryJobQueueStrategy; + beforeEach(() => { + strategy = new InMemoryJobQueueStrategy(); + }); + + describe('findMany options', () => { + beforeEach(() => { + strategy.add( + new Job({ + id: 'video-1', + queueName: 'video', + data: {}, + createdAt: new Date('2020-04-03T10:00:00.000Z'), + }), + ); + strategy.add( + new Job({ + id: 'video-2', + queueName: 'video', + data: {}, + createdAt: new Date('2020-04-03T10:01:00.000Z'), + }), + ); + strategy.add( + new Job({ + id: 'email-1', + queueName: 'email', + data: {}, + createdAt: new Date('2020-04-03T10:02:00.000Z'), + }), + ); + strategy.add( + new Job({ + id: 'video-3', + queueName: 'video', + data: {}, + createdAt: new Date('2020-04-03T10:03:00.000Z'), + }), + ); + strategy.add( + new Job({ + id: 'email-2', + queueName: 'email', + data: {}, + createdAt: new Date('2020-04-03T10:04:00.000Z'), + }), + ); + }); + + async function getIdResultsFor(options: JobListOptions): Promise { + const result = await strategy.findMany(options); + return result.items.map((j) => j.id as string); + } + + it('take & skip', async () => { + expect(await getIdResultsFor({ take: 1 })).toEqual(['video-1']); + expect(await getIdResultsFor({ take: 1, skip: 1 })).toEqual(['video-2']); + expect(await getIdResultsFor({ take: 10, skip: 2 })).toEqual(['email-1', 'video-3', 'email-2']); + }); + + it('sort createdAt', async () => { + expect(await getIdResultsFor({ sort: { createdAt: SortOrder.DESC } })).toEqual([ + 'email-2', + 'video-3', + 'email-1', + 'video-2', + 'video-1', + ]); + expect(await getIdResultsFor({ sort: { createdAt: SortOrder.ASC } })).toEqual([ + 'video-1', + 'video-2', + 'email-1', + 'video-3', + 'email-2', + ]); + }); + + it('sort id', async () => { + expect(await getIdResultsFor({ sort: { id: SortOrder.DESC } })).toEqual([ + 'video-3', + 'video-2', + 'video-1', + 'email-2', + 'email-1', + ]); + expect(await getIdResultsFor({ sort: { id: SortOrder.ASC } })).toEqual([ + 'email-1', + 'email-2', + 'video-1', + 'video-2', + 'video-3', + ]); + }); + + it('filter queueName', async () => { + expect(await getIdResultsFor({ filter: { queueName: { eq: 'video' } } })).toEqual([ + 'video-1', + 'video-2', + 'video-3', + ]); + + expect(await getIdResultsFor({ filter: { queueName: { contains: 'vid' } } })).toEqual([ + 'video-1', + 'video-2', + 'video-3', + ]); + }); + + it('filter isSettled', async () => { + const video1 = await strategy.findOne('video-1'); + video1?.complete(); + await strategy.update(video1!); + + expect(await getIdResultsFor({ filter: { isSettled: { eq: true } } })).toEqual(['video-1']); + }); + }); +}); diff --git a/packages/core/src/job-queue/in-memory-job-queue-strategy.ts b/packages/core/src/job-queue/in-memory-job-queue-strategy.ts new file mode 100644 index 0000000000..d04e454d17 --- /dev/null +++ b/packages/core/src/job-queue/in-memory-job-queue-strategy.ts @@ -0,0 +1,177 @@ +import { + DateOperators, + JobFilterParameter, + JobListOptions, + JobSortParameter, + JobState, + NumberOperators, + StringOperators, +} from '@vendure/common/lib/generated-types'; +import { ID, PaginatedList } from '@vendure/common/lib/shared-types'; +import { notNullOrUndefined } from '@vendure/common/lib/shared-utils'; + +import { generatePublicId } from '../common/generate-public-id'; +import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy'; +import { Logger } from '../config/logger/vendure-logger'; + +import { Job } from './job'; + +/** + * @description + * An in-memory {@link JobQueueStrategy}. This is the default strategy if not using a dedicated + * JobQueue plugin (e.g. {@link DefaultJobQueuePlugin}). Not recommended for production, since + * the queue will be cleared when the server stops. + * Completed jobs will be evicted from the store every 2 hours to prevent a memory leak. + * + * @docsCategory JobQueue + */ +export class InMemoryJobQueueStrategy implements JobQueueStrategy { + protected jobs = new Map(); + protected unsettledJobs: { [queueName: string]: Job[] } = {}; + private timer: any; + private evictJobsAfterMs = 1000 * 60 * 60 * 2; // 2 hours + + init() { + this.timer = setTimeout(this.evictSettledJobs, this.evictJobsAfterMs); + } + + destroy() { + clearTimeout(this.timer); + } + + async add(job: Job): Promise { + if (!job.id) { + (job as any).id = generatePublicId(); + } + // tslint:disable-next-line:no-non-null-assertion + this.jobs.set(job.id!, job); + if (!this.unsettledJobs[job.queueName]) { + this.unsettledJobs[job.queueName] = []; + } + this.unsettledJobs[job.queueName].push(job); + return job; + } + + async findOne(id: ID): Promise { + return this.jobs.get(id); + } + + async findMany(options?: JobListOptions): Promise> { + let items = [...this.jobs.values()]; + if (options) { + if (options.sort) { + items = this.applySort(items, options.sort); + } + if (options.filter) { + items = this.applyFilters(items, options.filter); + } + if (options.skip || options.take) { + items = this.applyPagination(items, options.skip, options.take); + } + } + return { + items, + totalItems: items.length, + }; + } + + async findManyById(ids: ID[]): Promise { + return ids.map((id) => this.jobs.get(id)).filter(notNullOrUndefined); + } + + async next(queueName: string): Promise { + const next = this.unsettledJobs[queueName]?.shift(); + if (next) { + next.start(); + return next; + } + } + + async update(job: Job): Promise { + if (job.state === JobState.RETRYING || job.state === JobState.PENDING) { + this.unsettledJobs[job.queueName].unshift(job); + } + // tslint:disable-next-line:no-non-null-assertion + this.jobs.set(job.id!, job); + } + + private applySort(items: Job[], sort: JobSortParameter): Job[] { + for (const [prop, direction] of Object.entries(sort)) { + const key = prop as keyof Required; + const dir = direction === 'ASC' ? -1 : 1; + items = items.sort((a, b) => ((a[key] || 0) < (b[key] || 0) ? 1 * dir : -1 * dir)); + } + return items; + } + + private applyFilters(items: Job[], filters: JobFilterParameter): Job[] { + for (const [prop, operator] of Object.entries(filters)) { + const key = prop as keyof Required; + if (operator?.eq !== undefined) { + items = items.filter((i) => i[key] === operator.eq); + } + + const contains = (operator as StringOperators)?.contains; + if (contains) { + items = items.filter((i) => (i[key] as string).includes(contains)); + } + const gt = (operator as NumberOperators)?.gt; + if (gt) { + items = items.filter((i) => (i[key] as number) > gt); + } + const gte = (operator as NumberOperators)?.gte; + if (gte) { + items = items.filter((i) => (i[key] as number) >= gte); + } + const lt = (operator as NumberOperators)?.lt; + if (lt) { + items = items.filter((i) => (i[key] as number) < lt); + } + const lte = (operator as NumberOperators)?.lte; + if (lte) { + items = items.filter((i) => (i[key] as number) <= lte); + } + const before = (operator as DateOperators)?.before; + if (before) { + items = items.filter((i) => (i[key] as Date) <= before); + } + const after = (operator as DateOperators)?.after; + if (after) { + items = items.filter((i) => (i[key] as Date) >= after); + } + const between = (operator as NumberOperators)?.between; + if (between) { + items = items.filter((i) => { + const num = i[key] as number; + return num > between.start && num < between.end; + }); + } + } + return items; + } + + private applyPagination(items: Job[], skip?: number | null, take?: number | null): Job[] { + const start = skip || 0; + const end = take != null ? start + take : undefined; + return items.slice(start, end); + } + + /** + * Delete old jobs from the `jobs` Map if they are settled and older than the value + * defined in `this.pruneJobsAfterMs`. This prevents a memory leak as the job queue + * grows indefinitely. + */ + private evictSettledJobs = () => { + for (const job of this.jobs.values()) { + if (job.isSettled) { + const settledAtMs = job.settledAt ? +job.settledAt : 0; + const nowMs = +new Date(); + if (nowMs - settledAtMs > this.evictJobsAfterMs) { + // tslint:disable-next-line:no-non-null-assertion + this.jobs.delete(job.id!); + } + } + } + this.timer = setTimeout(this.evictSettledJobs, this.evictJobsAfterMs); + }; +} diff --git a/packages/core/src/job-queue/job-queue.service.spec.ts b/packages/core/src/job-queue/job-queue.service.spec.ts index 4b9eef4d40..76105f2a37 100644 --- a/packages/core/src/job-queue/job-queue.service.spec.ts +++ b/packages/core/src/job-queue/job-queue.service.spec.ts @@ -6,6 +6,7 @@ import { ConfigService } from '../config/config.service'; import { ProcessContext, ServerProcessContext } from '../process-context/process-context'; import { Job } from './job'; +import { JobQueue } from './job-queue'; import { JobQueueService } from './job-queue.service'; import { TestingJobQueueStrategy } from './testing-job-queue-strategy'; diff --git a/packages/core/src/job-queue/testing-job-queue-strategy.ts b/packages/core/src/job-queue/testing-job-queue-strategy.ts index aee62eaadc..85f201c406 100644 --- a/packages/core/src/job-queue/testing-job-queue-strategy.ts +++ b/packages/core/src/job-queue/testing-job-queue-strategy.ts @@ -1,64 +1,14 @@ -import { JobListOptions, JobState } from '@vendure/common/lib/generated-types'; -import { ID, PaginatedList } from '@vendure/common/lib/shared-types'; - -import { generatePublicId } from '../common/generate-public-id'; -import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy'; - +import { InMemoryJobQueueStrategy } from './in-memory-job-queue-strategy'; import { Job } from './job'; /** * @description - * An in-memory {@link JobQueueStrategy} design for testing purposes. Not to be used in production - * since all jobs are lost when the server stops. + * An in-memory {@link JobQueueStrategy} design for testing purposes. */ -export class TestingJobQueueStrategy implements JobQueueStrategy { - private jobs: Job[] = []; - +export class TestingJobQueueStrategy extends InMemoryJobQueueStrategy { prePopulate(jobs: Job[]) { - this.jobs.push(...jobs); - } - - async add(job: Job): Promise { - (job as any).id = generatePublicId(); - this.jobs.push(job); - return job; - } - - async findOne(id: ID): Promise { - return this.jobs.find((j) => j.id === id); - } - - async findMany(options?: JobListOptions): Promise> { - // The sort, filter, paginate logic is not implemented because - // it is not needed for testing purposes. - const items = this.jobs; - return { - items, - totalItems: items.length, - }; - } - - async findManyById(ids: ID[]): Promise { - return this.jobs.filter((job) => job.id && ids.includes(job.id)); - } - - async next(queueName: string): Promise { - const next = this.jobs.find((job) => { - return ( - (job.state === JobState.PENDING || job.state === JobState.RETRYING) && - job.queueName === queueName - ); - }); - if (next) { - next.start(); - return next; - } - } - - async update(job: Job): Promise { - const index = this.jobs.findIndex((j) => j.id === job.id); - if (-1 < index) { - this.jobs.splice(index, 1, job); + for (const job of jobs) { + this.add(job); } } } diff --git a/packages/core/src/plugin/default-job-queue-plugin/default-job-queue-plugin.ts b/packages/core/src/plugin/default-job-queue-plugin/default-job-queue-plugin.ts new file mode 100644 index 0000000000..1c69105731 --- /dev/null +++ b/packages/core/src/plugin/default-job-queue-plugin/default-job-queue-plugin.ts @@ -0,0 +1,21 @@ +import { PluginCommonModule } from '../plugin-common.module'; +import { VendurePlugin } from '../vendure-plugin'; + +import { JobRecord } from './job-record.entity'; +import { SqlJobQueueStrategy } from './sql-job-queue-strategy'; + +/** + * @description + * A plugin which configures Vendure to use the SQL database to persist the JobQueue jobs. + * + * @docsCategory JobQueue + */ +@VendurePlugin({ + imports: [PluginCommonModule], + entities: [JobRecord], + configuration: (config) => { + config.jobQueueOptions.jobQueueStrategy = new SqlJobQueueStrategy(); + return config; + }, +}) +export class DefaultJobQueuePlugin {} diff --git a/packages/core/src/entity/job-record/job-record.entity.ts b/packages/core/src/plugin/default-job-queue-plugin/job-record.entity.ts similarity index 93% rename from packages/core/src/entity/job-record/job-record.entity.ts rename to packages/core/src/plugin/default-job-queue-plugin/job-record.entity.ts index 41fa4ee142..f1e693d7fc 100644 --- a/packages/core/src/entity/job-record/job-record.entity.ts +++ b/packages/core/src/plugin/default-job-queue-plugin/job-record.entity.ts @@ -2,7 +2,7 @@ import { JobState } from '@vendure/common/lib/generated-types'; import { DeepPartial } from '@vendure/common/lib/shared-types'; import { Column, Entity } from 'typeorm'; -import { VendureEntity } from '../base/base.entity'; +import { VendureEntity } from '../../entity/base/base.entity'; @Entity() export class JobRecord extends VendureEntity { diff --git a/packages/core/src/job-queue/sql-job-queue-strategy.ts b/packages/core/src/plugin/default-job-queue-plugin/sql-job-queue-strategy.ts similarity index 92% rename from packages/core/src/job-queue/sql-job-queue-strategy.ts rename to packages/core/src/plugin/default-job-queue-plugin/sql-job-queue-strategy.ts index 614ff012ac..b53b9c87a6 100644 --- a/packages/core/src/job-queue/sql-job-queue-strategy.ts +++ b/packages/core/src/plugin/default-job-queue-plugin/sql-job-queue-strategy.ts @@ -4,12 +4,12 @@ import { JobListOptions, JobState } from '@vendure/common/lib/generated-types'; import { ID, PaginatedList } from '@vendure/common/lib/shared-types'; import { Brackets, Connection } from 'typeorm'; -import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy'; -import { JobRecord } from '../entity/job-record/job-record.entity'; -import { ProcessContext } from '../process-context/process-context'; -import { ListQueryBuilder } from '../service/helpers/list-query-builder/list-query-builder'; +import { JobQueueStrategy } from '../../config/job-queue/job-queue-strategy'; +import { Job } from '../../job-queue/job'; +import { ProcessContext } from '../../process-context/process-context'; +import { ListQueryBuilder } from '../../service/helpers/list-query-builder/list-query-builder'; -import { Job } from './job'; +import { JobRecord } from './job-record.entity'; export class SqlJobQueueStrategy implements JobQueueStrategy { private connection: Connection | undefined; diff --git a/packages/core/src/plugin/index.ts b/packages/core/src/plugin/index.ts index 3fbfc5f7d7..10666bf236 100644 --- a/packages/core/src/plugin/index.ts +++ b/packages/core/src/plugin/index.ts @@ -1,4 +1,5 @@ export * from './default-search-plugin/default-search-plugin'; +export * from './default-job-queue-plugin/default-job-queue-plugin'; export * from './vendure-plugin'; export * from './plugin-common.module'; export { createProxyHandler, ProxyOptions } from './plugin-utils'; diff --git a/packages/dev-server/dev-config.ts b/packages/dev-server/dev-config.ts index bc1aff654d..e3a0e92e5b 100644 --- a/packages/dev-server/dev-config.ts +++ b/packages/dev-server/dev-config.ts @@ -3,6 +3,7 @@ import { AdminUiPlugin } from '@vendure/admin-ui-plugin'; import { AssetServerPlugin } from '@vendure/asset-server-plugin'; import { ADMIN_API_PATH, API_PORT, SHOP_API_PATH } from '@vendure/common/lib/shared-constants'; import { + DefaultJobQueuePlugin, DefaultLogger, DefaultSearchPlugin, examplePaymentHandler, @@ -51,6 +52,7 @@ export const devConfig: VendureConfig = { port: 5002, }), DefaultSearchPlugin, + DefaultJobQueuePlugin, // ElasticsearchPlugin.init({ // host: 'http://192.168.99.100', // port: 9200,