From 3909251338942d70dc45cad6cd2e8d72349489cb Mon Sep 17 00:00:00 2001 From: Pieter Doms Date: Thu, 4 Apr 2024 09:33:40 +0200 Subject: [PATCH] feat(core): Pass ctx to job queue strategy add (#2759) Closes #2758 --- .../typescript-api/job-queue/index.md | 2 +- .../config/job-queue/job-queue-strategy.ts | 5 +- packages/core/src/job-queue/job-queue.ts | 8 +-- packages/core/src/job-queue/types.ts | 8 +++ .../sql-job-queue-strategy.ts | 50 ++++++++++--------- .../indexer/search-index.service.ts | 30 ++++++----- .../service/services/collection.service.ts | 15 ++++-- .../indexing/elasticsearch-index.service.ts | 30 ++++++----- 8 files changed, 89 insertions(+), 59 deletions(-) diff --git a/docs/docs/reference/typescript-api/job-queue/index.md b/docs/docs/reference/typescript-api/job-queue/index.md index a16b8551b6..a0c58707a0 100644 --- a/docs/docs/reference/typescript-api/job-queue/index.md +++ b/docs/docs/reference/typescript-api/job-queue/index.md @@ -26,7 +26,7 @@ class JobQueue = object> { name: string started: boolean constructor(options: CreateQueueOptions, jobQueueStrategy: JobQueueStrategy, jobBufferService: JobBufferService) - add(data: Data, options?: Pick, 'retries'>) => Promise>; + add(data: Data, options?: JobOptions) => Promise>; } ``` diff --git a/packages/core/src/config/job-queue/job-queue-strategy.ts b/packages/core/src/config/job-queue/job-queue-strategy.ts index 88f843f8eb..700430c9d7 100644 --- a/packages/core/src/config/job-queue/job-queue-strategy.ts +++ b/packages/core/src/config/job-queue/job-queue-strategy.ts @@ -2,8 +2,9 @@ import { JobListOptions } from '@vendure/common/lib/generated-types'; import { ID, PaginatedList } from '@vendure/common/lib/shared-types'; import { InjectableStrategy } from '../../common'; -import { JobData } from '../../job-queue'; +import { JobData, JobQueueStrategyJobOptions } from '../../job-queue'; import { Job } from '../../job-queue'; +import { JobOptions } from '../../job-queue'; /** * @description @@ -25,7 +26,7 @@ export interface JobQueueStrategy extends InjectableStrategy { * @description * Add a new job to the queue. */ - add = object>(job: Job): Promise>; + add = object>(job: Job, jobOptions?: JobQueueStrategyJobOptions): Promise>; /** * @description diff --git a/packages/core/src/job-queue/job-queue.ts b/packages/core/src/job-queue/job-queue.ts index 07a72edd5d..44fefaf548 100644 --- a/packages/core/src/job-queue/job-queue.ts +++ b/packages/core/src/job-queue/job-queue.ts @@ -8,7 +8,7 @@ import { Logger } from '../config/logger/vendure-logger'; import { Job } from './job'; import { JobBufferService } from './job-buffer/job-buffer.service'; import { SubscribableJob } from './subscribable-job'; -import { CreateQueueOptions, JobConfig, JobData } from './types'; +import { CreateQueueOptions, JobConfig, JobData, JobOptions } from './types'; /** * @description @@ -90,7 +90,7 @@ export class JobQueue = object> { * .catch(err => err.message); * ``` */ - async add(data: Data, options?: Pick, 'retries'>): Promise> { + async add(data: Data, options?: JobOptions): Promise> { const job = new Job({ data, queueName: this.options.name, @@ -99,8 +99,8 @@ export class JobQueue = object> { const isBuffered = await this.jobBufferService.add(job); if (!isBuffered) { - const addedJob = await this.jobQueueStrategy.add(job); - return new SubscribableJob(addedJob, this.jobQueueStrategy); + const addedJob = await this.jobQueueStrategy.add(job); + return new SubscribableJob(addedJob, this.jobQueueStrategy); } else { const bufferedJob = new Job({ ...job, diff --git a/packages/core/src/job-queue/types.ts b/packages/core/src/job-queue/types.ts index af97e5e5ca..990907f98a 100644 --- a/packages/core/src/job-queue/types.ts +++ b/packages/core/src/job-queue/types.ts @@ -1,6 +1,8 @@ import { JobState } from '@vendure/common/lib/generated-types'; import { ID, JsonCompatible } from '@vendure/common/lib/shared-types'; +import { RequestContext } from '../api/common/request-context'; + import { Job } from './job'; /** @@ -55,3 +57,9 @@ export interface JobConfig> { startedAt?: Date; settledAt?: Date; } + +export type JobOptions> = Pick, 'retries'> & { + ctx?: RequestContext +}; + +export type JobQueueStrategyJobOptions> = Omit, "retries"> \ No newline at end of file diff --git a/packages/core/src/plugin/default-job-queue-plugin/sql-job-queue-strategy.ts b/packages/core/src/plugin/default-job-queue-plugin/sql-job-queue-strategy.ts index 8388e66a9b..650284339a 100644 --- a/packages/core/src/plugin/default-job-queue-plugin/sql-job-queue-strategy.ts +++ b/packages/core/src/plugin/default-job-queue-plugin/sql-job-queue-strategy.ts @@ -6,7 +6,7 @@ import { Injector } from '../../common/injector'; import { InspectableJobQueueStrategy, JobQueueStrategy } from '../../config'; import { Logger } from '../../config/logger/vendure-logger'; import { TransactionalConnection } from '../../connection/transactional-connection'; -import { Job, JobData } from '../../job-queue'; +import { Job, JobData, JobQueueStrategyJobOptions } from '../../job-queue'; import { PollingJobQueueStrategy } from '../../job-queue/polling-job-queue-strategy'; import { ListQueryBuilder } from '../../service/helpers/list-query-builder/list-query-builder'; @@ -20,27 +20,31 @@ import { JobRecord } from './job-record.entity'; * @docsCategory JobQueue */ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements InspectableJobQueueStrategy { - private connection: Connection | undefined; + private rawConnection: Connection | undefined; + private connection: TransactionalConnection | undefined; private listQueryBuilder: ListQueryBuilder; init(injector: Injector) { - this.connection = injector.get(TransactionalConnection).rawConnection; + this.rawConnection = injector.get(TransactionalConnection).rawConnection; + this.connection = injector.get(TransactionalConnection); this.listQueryBuilder = injector.get(ListQueryBuilder); super.init(injector); } destroy() { - this.connection = undefined; + this.rawConnection = undefined; super.destroy(); } - async add = object>(job: Job): Promise> { - if (!this.connectionAvailable(this.connection)) { + async add = object>(job: Job, jobOptions?: JobQueueStrategyJobOptions): Promise> { + if (!this.connectionAvailable(this.rawConnection)) { throw new Error('Connection not available'); } + const jobRecordRepository = jobOptions?.ctx && this.connection ? this.connection.getRepository(jobOptions.ctx, JobRecord) : + this.rawConnection.getRepository(JobRecord); const constrainedData = this.constrainDataSize(job); const newRecord = this.toRecord(job, constrainedData, this.setRetries(job.queueName, job)); - const record = await this.connection.getRepository(JobRecord).save(newRecord); + const record = await jobRecordRepository.save(newRecord); return this.fromRecord(record); } @@ -49,7 +53,7 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp * In order to try to prevent that, this method will truncate any strings in the `data` object over 2kb in size. */ private constrainDataSize = object>(job: Job): Data | undefined { - const type = this.connection?.options.type; + const type = this.rawConnection?.options.type; if (type === 'mysql' || type === 'mariadb') { const stringified = JSON.stringify(job.data); if (64 * 1024 <= stringified.length) { @@ -76,11 +80,11 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp } async next(queueName: string): Promise { - if (!this.connectionAvailable(this.connection)) { + if (!this.connectionAvailable(this.rawConnection)) { throw new Error('Connection not available'); } - const connection = this.connection; - const connectionType = this.connection.options.type; + const connection = this.rawConnection; + const connectionType = this.rawConnection.options.type; const isSQLite = connectionType === 'sqlite' || connectionType === 'sqljs' || connectionType === 'better-sqlite3'; @@ -157,10 +161,10 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp } async update(job: Job): Promise { - if (!this.connectionAvailable(this.connection)) { + if (!this.connectionAvailable(this.rawConnection)) { throw new Error('Connection not available'); } - await this.connection + await this.rawConnection .getRepository(JobRecord) .createQueryBuilder('job') .update() @@ -171,7 +175,7 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp } async findMany(options?: JobListOptions): Promise> { - if (!this.connectionAvailable(this.connection)) { + if (!this.connectionAvailable(this.rawConnection)) { throw new Error('Connection not available'); } return this.listQueryBuilder @@ -184,27 +188,27 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp } async findOne(id: ID): Promise { - if (!this.connectionAvailable(this.connection)) { + if (!this.connectionAvailable(this.rawConnection)) { throw new Error('Connection not available'); } - const record = await this.connection.getRepository(JobRecord).findOne({ where: { id } }); + const record = await this.rawConnection.getRepository(JobRecord).findOne({ where: { id } }); if (record) { return this.fromRecord(record); } } async findManyById(ids: ID[]): Promise { - if (!this.connectionAvailable(this.connection)) { + if (!this.connectionAvailable(this.rawConnection)) { throw new Error('Connection not available'); } - return this.connection + return this.rawConnection .getRepository(JobRecord) .find({ where: { id: In(ids) } }) .then(records => records.map(this.fromRecord)); } async removeSettledJobs(queueNames: string[] = [], olderThan?: Date) { - if (!this.connectionAvailable(this.connection)) { + if (!this.connectionAvailable(this.rawConnection)) { throw new Error('Connection not available'); } const findOptions: FindOptionsWhere = { @@ -212,14 +216,14 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp isSettled: true, settledAt: LessThan(olderThan || new Date()), }; - const toDelete = await this.connection.getRepository(JobRecord).find({ where: findOptions }); - const deleteCount = await this.connection.getRepository(JobRecord).count({ where: findOptions }); - await this.connection.getRepository(JobRecord).delete(findOptions); + const toDelete = await this.rawConnection.getRepository(JobRecord).find({ where: findOptions }); + const deleteCount = await this.rawConnection.getRepository(JobRecord).count({ where: findOptions }); + await this.rawConnection.getRepository(JobRecord).delete(findOptions); return deleteCount; } private connectionAvailable(connection: Connection | undefined): connection is Connection { - return !!this.connection && this.connection.isConnected; + return !!this.rawConnection && this.rawConnection.isConnected; } private toRecord(job: Job, data?: any, retries?: number): JobRecord { diff --git a/packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts b/packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts index 06f80cde20..91a4a0ba30 100644 --- a/packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts +++ b/packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts @@ -67,7 +67,7 @@ export class SearchIndexService implements OnApplicationBootstrap { } reindex(ctx: RequestContext) { - return this.updateIndexQueue.add({ type: 'reindex', ctx: ctx.serialize() }); + return this.updateIndexQueue.add({ type: 'reindex', ctx: ctx.serialize() }, { ctx }); } updateProduct(ctx: RequestContext, product: Product) { @@ -75,12 +75,13 @@ export class SearchIndexService implements OnApplicationBootstrap { type: 'update-product', ctx: ctx.serialize(), productId: product.id, - }); + }, + { ctx }); } updateVariants(ctx: RequestContext, variants: ProductVariant[]) { const variantIds = variants.map(v => v.id); - return this.updateIndexQueue.add({ type: 'update-variants', ctx: ctx.serialize(), variantIds }); + return this.updateIndexQueue.add({ type: 'update-variants', ctx: ctx.serialize(), variantIds }, { ctx }); } deleteProduct(ctx: RequestContext, product: Product) { @@ -88,24 +89,25 @@ export class SearchIndexService implements OnApplicationBootstrap { type: 'delete-product', ctx: ctx.serialize(), productId: product.id, - }); + }, + { ctx }); } deleteVariant(ctx: RequestContext, variants: ProductVariant[]) { const variantIds = variants.map(v => v.id); - return this.updateIndexQueue.add({ type: 'delete-variant', ctx: ctx.serialize(), variantIds }); + return this.updateIndexQueue.add({ type: 'delete-variant', ctx: ctx.serialize(), variantIds }, { ctx }); } updateVariantsById(ctx: RequestContext, ids: ID[]) { - return this.updateIndexQueue.add({ type: 'update-variants-by-id', ctx: ctx.serialize(), ids }); + return this.updateIndexQueue.add({ type: 'update-variants-by-id', ctx: ctx.serialize(), ids }, { ctx }); } updateAsset(ctx: RequestContext, asset: Asset) { - return this.updateIndexQueue.add({ type: 'update-asset', ctx: ctx.serialize(), asset: asset as any }); + return this.updateIndexQueue.add({ type: 'update-asset', ctx: ctx.serialize(), asset: asset as any }, { ctx }); } deleteAsset(ctx: RequestContext, asset: Asset) { - return this.updateIndexQueue.add({ type: 'delete-asset', ctx: ctx.serialize(), asset: asset as any }); + return this.updateIndexQueue.add({ type: 'delete-asset', ctx: ctx.serialize(), asset: asset as any }, { ctx }); } assignProductToChannel(ctx: RequestContext, productId: ID, channelId: ID) { @@ -114,7 +116,8 @@ export class SearchIndexService implements OnApplicationBootstrap { ctx: ctx.serialize(), productId, channelId, - }); + }, + { ctx }); } removeProductFromChannel(ctx: RequestContext, productId: ID, channelId: ID) { @@ -123,7 +126,8 @@ export class SearchIndexService implements OnApplicationBootstrap { ctx: ctx.serialize(), productId, channelId, - }); + }, + { ctx }); } assignVariantToChannel(ctx: RequestContext, productVariantId: ID, channelId: ID) { @@ -132,7 +136,8 @@ export class SearchIndexService implements OnApplicationBootstrap { ctx: ctx.serialize(), productVariantId, channelId, - }); + }, + { ctx }); } removeVariantFromChannel(ctx: RequestContext, productVariantId: ID, channelId: ID) { @@ -141,7 +146,8 @@ export class SearchIndexService implements OnApplicationBootstrap { ctx: ctx.serialize(), productVariantId, channelId, - }); + }, + { ctx }); } private jobWithProgress( diff --git a/packages/core/src/service/services/collection.service.ts b/packages/core/src/service/services/collection.service.ts index cd1ce08ff9..636dc78fc8 100644 --- a/packages/core/src/service/services/collection.service.ts +++ b/packages/core/src/service/services/collection.service.ts @@ -104,7 +104,8 @@ export class CollectionService implements OnModuleInit { await this.applyFiltersQueue.add({ ctx: event.ctx.serialize(), collectionIds: collections.map(c => c.id), - }); + }, + { ctx: event.ctx }); }); this.applyFiltersQueue = await this.jobQueueService.createQueue({ @@ -471,7 +472,8 @@ export class CollectionService implements OnModuleInit { await this.applyFiltersQueue.add({ ctx: ctx.serialize(), collectionIds: [collection.id], - }); + }, + { ctx }); await this.eventBus.publish(new CollectionEvent(ctx, collectionWithRelations, 'created', input)); return assertFound(this.findOne(ctx, collection.id)); } @@ -497,7 +499,8 @@ export class CollectionService implements OnModuleInit { ctx: ctx.serialize(), collectionIds: [collection.id], applyToChangedVariantsOnly: false, - }); + }, + { ctx }); } else { const affectedVariantIds = await this.getCollectionProductVariantIds(collection); await this.eventBus.publish(new CollectionModificationEvent(ctx, collection, affectedVariantIds)); @@ -571,7 +574,8 @@ export class CollectionService implements OnModuleInit { await this.applyFiltersQueue.add({ ctx: ctx.serialize(), collectionIds: [target.id], - }); + }, + { ctx }); return assertFound(this.findOne(ctx, input.collectionId)); } @@ -829,7 +833,8 @@ export class CollectionService implements OnModuleInit { await this.applyFiltersQueue.add({ ctx: ctx.serialize(), collectionIds: collectionsToAssign.map(collection => collection.id), - }); + }, + { ctx }); return this.connection .findByIdsInChannel( diff --git a/packages/elasticsearch-plugin/src/indexing/elasticsearch-index.service.ts b/packages/elasticsearch-plugin/src/indexing/elasticsearch-index.service.ts index 9037bf73e5..cb22daa998 100644 --- a/packages/elasticsearch-plugin/src/indexing/elasticsearch-index.service.ts +++ b/packages/elasticsearch-plugin/src/indexing/elasticsearch-index.service.ts @@ -67,7 +67,7 @@ export class ElasticsearchIndexService implements OnApplicationBootstrap { } reindex(ctx: RequestContext) { - return this.updateIndexQueue.add({ type: 'reindex', ctx: ctx.serialize() }); + return this.updateIndexQueue.add({ type: 'reindex', ctx: ctx.serialize() }, { ctx }); } updateProduct(ctx: RequestContext, product: Product) { @@ -75,12 +75,13 @@ export class ElasticsearchIndexService implements OnApplicationBootstrap { type: 'update-product', ctx: ctx.serialize(), productId: product.id, - }); + }, + { ctx }); } updateVariants(ctx: RequestContext, variants: ProductVariant[]) { const variantIds = variants.map(v => v.id); - return this.updateIndexQueue.add({ type: 'update-variants', ctx: ctx.serialize(), variantIds }); + return this.updateIndexQueue.add({ type: 'update-variants', ctx: ctx.serialize(), variantIds }, { ctx }); } deleteProduct(ctx: RequestContext, product: Product) { @@ -88,12 +89,13 @@ export class ElasticsearchIndexService implements OnApplicationBootstrap { type: 'delete-product', ctx: ctx.serialize(), productId: product.id, - }); + }, + { ctx }); } deleteVariant(ctx: RequestContext, variants: ProductVariant[]) { const variantIds = variants.map(v => v.id); - return this.updateIndexQueue.add({ type: 'delete-variant', ctx: ctx.serialize(), variantIds }); + return this.updateIndexQueue.add({ type: 'delete-variant', ctx: ctx.serialize(), variantIds }, { ctx }); } assignProductToChannel(ctx: RequestContext, product: Product, channelId: ID) { @@ -102,7 +104,8 @@ export class ElasticsearchIndexService implements OnApplicationBootstrap { ctx: ctx.serialize(), productId: product.id, channelId, - }); + }, + { ctx }); } removeProductFromChannel(ctx: RequestContext, product: Product, channelId: ID) { @@ -111,7 +114,8 @@ export class ElasticsearchIndexService implements OnApplicationBootstrap { ctx: ctx.serialize(), productId: product.id, channelId, - }); + }, + { ctx }); } assignVariantToChannel(ctx: RequestContext, productVariantId: ID, channelId: ID) { @@ -120,7 +124,8 @@ export class ElasticsearchIndexService implements OnApplicationBootstrap { ctx: ctx.serialize(), productVariantId, channelId, - }); + }, + { ctx }); } removeVariantFromChannel(ctx: RequestContext, productVariantId: ID, channelId: ID) { @@ -129,19 +134,20 @@ export class ElasticsearchIndexService implements OnApplicationBootstrap { ctx: ctx.serialize(), productVariantId, channelId, - }); + }, + { ctx }); } updateVariantsById(ctx: RequestContext, ids: ID[]) { - return this.updateIndexQueue.add({ type: 'update-variants-by-id', ctx: ctx.serialize(), ids }); + return this.updateIndexQueue.add({ type: 'update-variants-by-id', ctx: ctx.serialize(), ids }, { ctx }); } updateAsset(ctx: RequestContext, asset: Asset) { - return this.updateIndexQueue.add({ type: 'update-asset', ctx: ctx.serialize(), asset: asset as any }); + return this.updateIndexQueue.add({ type: 'update-asset', ctx: ctx.serialize(), asset: asset as any }, { ctx }); } deleteAsset(ctx: RequestContext, asset: Asset) { - return this.updateIndexQueue.add({ type: 'delete-asset', ctx: ctx.serialize(), asset: asset as any }); + return this.updateIndexQueue.add({ type: 'delete-asset', ctx: ctx.serialize(), asset: asset as any }, { ctx }); } private jobWithProgress(