diff --git a/packages/core/src/api/resolvers/admin/job.resolver.ts b/packages/core/src/api/resolvers/admin/job.resolver.ts index 22bddeee75..099cb64289 100644 --- a/packages/core/src/api/resolvers/admin/job.resolver.ts +++ b/packages/core/src/api/resolvers/admin/job.resolver.ts @@ -86,7 +86,7 @@ export class JobResolver { @Query() @Allow(Permission.ReadSettings, Permission.ReadSystem) async jobBufferSize(@Args() args: QueryJobBufferSizeArgs) { - const bufferSizes = this.jobBuffer.bufferSize(args.processorIds); + const bufferSizes = await this.jobBuffer.bufferSize(args.processorIds); return Object.entries(bufferSizes).map(([processorId, size]) => ({ processorId, size })); } diff --git a/packages/core/src/job-queue/index.ts b/packages/core/src/job-queue/index.ts index aecc4e94fe..1657a0c796 100644 --- a/packages/core/src/job-queue/index.ts +++ b/packages/core/src/job-queue/index.ts @@ -1,4 +1,9 @@ export * from './injectable-job-queue-strategy'; +export * from './job-buffer/in-memory-job-buffer-storage-strategy'; +export * from './job-buffer/job-buffer'; +export * from './job-buffer/job-buffer-processor'; +export * from './job-buffer/job-buffer-storage-strategy'; +export * from './job-buffer/sql-job-buffer-storage-strategy'; export * from './job'; export * from './job-queue'; export * from './job-queue.service'; diff --git a/packages/core/src/job-queue/job-buffer/job-buffer.ts b/packages/core/src/job-queue/job-buffer/job-buffer.ts index a1781f41bf..7571e45aea 100644 --- a/packages/core/src/job-queue/job-buffer/job-buffer.ts +++ b/packages/core/src/job-queue/job-buffer/job-buffer.ts @@ -2,6 +2,7 @@ import { Injectable } from '@nestjs/common'; import { InternalServerError } from '../../common/error/errors'; import { ConfigService } from '../../config/config.service'; +import { Logger } from '../../config/logger/vendure-logger'; import { Job } from '../job'; import { JobBufferProcessor } from './job-buffer-processor'; @@ -16,7 +17,7 @@ export class JobBuffer { this.storageStrategy = configService.jobQueueOptions.jobBufferStorageStrategy; } - addProcessor(processor: JobBufferProcessor) { + addProcessor(processor: JobBufferProcessor) { const idAlreadyExists = Array.from(this.processors).find(p => p.id === processor.id); if (idAlreadyExists) { throw new InternalServerError( @@ -26,7 +27,7 @@ export class JobBuffer { this.processors.add(processor); } - removeProcessor(processor: JobBufferProcessor) { + removeProcessor(processor: JobBufferProcessor) { this.processors.delete(processor); } @@ -58,8 +59,17 @@ export class JobBuffer { for (const processor of this.processors) { const jobsForProcessor = flushResult[processor.id]; if (jobsForProcessor?.length) { - const reducedJobs = await processor.reduce(jobsForProcessor); - for (const job of reducedJobs) { + let jobsToAdd = jobsForProcessor; + try { + jobsToAdd = await processor.reduce(jobsForProcessor); + } catch (e) { + Logger.error( + `Error encountered processing jobs in "${processor.id}:\n${e.message}"`, + undefined, + e.stack, + ); + } + for (const job of jobsToAdd) { await jobQueueStrategy.add(job); } } diff --git a/packages/core/src/job-queue/job-buffer/sql-job-buffer-storage-strategy.ts b/packages/core/src/job-queue/job-buffer/sql-job-buffer-storage-strategy.ts index 2bbebe0c97..0868409d13 100644 --- a/packages/core/src/job-queue/job-buffer/sql-job-buffer-storage-strategy.ts +++ b/packages/core/src/job-queue/job-buffer/sql-job-buffer-storage-strategy.ts @@ -15,11 +15,11 @@ export class SqlJobBufferStorageStrategy implements JobBufferStorageStrategy { return Promise.resolve(job); } - bufferSize(processorIds?: string[]): Promise { - return Promise.resolve(0); + bufferSize(processorIds?: string[]) { + return Promise.resolve({}); } - flush(processorIds?: string[]): Promise { - return Promise.resolve(undefined); + flush(processorIds?: string[]) { + return Promise.resolve({}); } } diff --git a/packages/core/src/plugin/default-search-plugin/collection-job-buffer-processor.ts b/packages/core/src/plugin/default-search-plugin/collection-job-buffer-processor.ts new file mode 100644 index 0000000000..227718637a --- /dev/null +++ b/packages/core/src/plugin/default-search-plugin/collection-job-buffer-processor.ts @@ -0,0 +1,34 @@ +import { ID } from '@vendure/common/lib/shared-types'; +import { unique } from '@vendure/common/lib/unique'; + +import { Job, JobBufferProcessor } from '../../job-queue'; +import { ApplyCollectionFiltersJobData } from '../../service/services/collection.service'; + +import { UpdateIndexQueueJobData, UpdateVariantsByIdJobData, UpdateVariantsJobData } from './types'; + +export class CollectionJobBufferProcessor implements JobBufferProcessor { + readonly id = 'search-plugin-apply-collection-filters'; + + collect(job: Job): boolean { + return job.queueName === 'apply-collection-filters'; + } + + reduce(collectedJobs: Array>): Array> { + const collectionIdsToUpdate = collectedJobs.reduce((result, job) => { + return [...result, ...job.data.collectionIds]; + }, [] as ID[]); + + const referenceJob = collectedJobs[0]; + const batchedCollectionJob = new Job({ + ...referenceJob, + id: undefined, + data: { + collectionIds: unique(collectionIdsToUpdate), + ctx: referenceJob.data.ctx, + applyToChangedVariantsOnly: referenceJob.data.applyToChangedVariantsOnly, + }, + }); + + return [batchedCollectionJob]; + } +} diff --git a/packages/core/src/plugin/default-search-plugin/default-search-plugin.ts b/packages/core/src/plugin/default-search-plugin/default-search-plugin.ts index 842641d7d1..b85985a06c 100644 --- a/packages/core/src/plugin/default-search-plugin/default-search-plugin.ts +++ b/packages/core/src/plugin/default-search-plugin/default-search-plugin.ts @@ -12,14 +12,17 @@ import { ProductEvent } from '../../event-bus/events/product-event'; import { ProductVariantChannelEvent } from '../../event-bus/events/product-variant-channel-event'; import { ProductVariantEvent } from '../../event-bus/events/product-variant-event'; import { TaxRateModificationEvent } from '../../event-bus/events/tax-rate-modification-event'; +import { JobBuffer } from '../../job-queue/job-buffer/job-buffer'; import { PluginCommonModule } from '../plugin-common.module'; import { VendurePlugin } from '../vendure-plugin'; +import { CollectionJobBufferProcessor } from './collection-job-buffer-processor'; import { AdminFulltextSearchResolver, ShopFulltextSearchResolver } from './fulltext-search.resolver'; import { FulltextSearchService } from './fulltext-search.service'; import { IndexerController } from './indexer/indexer.controller'; import { SearchIndexService } from './indexer/search-index.service'; import { SearchIndexItem } from './search-index-item.entity'; +import { SearchJobBufferProcessor } from './search-job-buffer-processor'; export interface DefaultSearchReindexResponse extends SearchReindexResponse { timeTaken: number; @@ -64,10 +67,17 @@ export interface DefaultSearchReindexResponse extends SearchReindexResponse { }) export class DefaultSearchPlugin implements OnApplicationBootstrap { /** @internal */ - constructor(private eventBus: EventBus, private searchIndexService: SearchIndexService) {} + constructor( + private eventBus: EventBus, + private searchIndexService: SearchIndexService, + private jobBuffer: JobBuffer, + ) {} /** @internal */ async onApplicationBootstrap() { + this.jobBuffer.addProcessor(new SearchJobBufferProcessor()); + this.jobBuffer.addProcessor(new CollectionJobBufferProcessor()); + this.eventBus.ofType(ProductEvent).subscribe(event => { if (event.type === 'deleted') { return this.searchIndexService.deleteProduct(event.ctx, event.product); diff --git a/packages/core/src/plugin/default-search-plugin/search-job-buffer-processor.ts b/packages/core/src/plugin/default-search-plugin/search-job-buffer-processor.ts new file mode 100644 index 0000000000..e7532f4af9 --- /dev/null +++ b/packages/core/src/plugin/default-search-plugin/search-job-buffer-processor.ts @@ -0,0 +1,61 @@ +import { ID } from '@vendure/common/lib/shared-types'; +import { unique } from '@vendure/common/lib/unique'; + +import { Job, JobBufferProcessor } from '../../job-queue'; + +import { UpdateIndexQueueJobData, UpdateVariantsByIdJobData, UpdateVariantsJobData } from './types'; + +export class SearchJobBufferProcessor implements JobBufferProcessor { + readonly id = 'search-plugin-update-search-index'; + + collect(job: Job): boolean | Promise { + return job.queueName === 'update-search-index'; + } + + reduce(collectedJobs: Array>): Array> { + const variantsByIdJobs = this.removeBy>( + collectedJobs, + item => item.data.type === 'update-variants-by-id' || item.data.type === 'update-variants', + ); + + const jobsToAdd = [...collectedJobs]; + + if (variantsByIdJobs.length) { + const variantIdsToUpdate = variantsByIdJobs.reduce((result, job) => { + const ids = job.data.type === 'update-variants-by-id' ? job.data.ids : job.data.variantIds; + return [...result, ...ids]; + }, [] as ID[]); + + const referenceJob = variantsByIdJobs[0]; + const batchedVariantJob = new Job({ + ...referenceJob, + id: undefined, + data: { + type: 'update-variants-by-id', + ids: unique(variantIdsToUpdate), + ctx: referenceJob.data.ctx, + }, + }); + + jobsToAdd.push(batchedVariantJob as any); + } + + return jobsToAdd; + } + + /** + * Removes items from the array based on the filterFn and returns a new array with only the removed + * items. The original input array is mutated. + */ + private removeBy(input: T[], filterFn: (item: T) => boolean): R[] { + const removed: R[] = []; + for (let i = input.length - 1; i >= 0; i--) { + const item = input[i]; + if (filterFn(item)) { + removed.push(item as R); + input.splice(i, 1); + } + } + return removed; + } +} diff --git a/packages/core/src/plugin/default-search-plugin/types.ts b/packages/core/src/plugin/default-search-plugin/types.ts index 9933b17483..28cc18b169 100644 --- a/packages/core/src/plugin/default-search-plugin/types.ts +++ b/packages/core/src/plugin/default-search-plugin/types.ts @@ -48,17 +48,29 @@ export type VariantChannelMessageData = { type NamedJobData = { type: Type } & MessageData; export type ReindexJobData = NamedJobData<'reindex', ReindexMessageData>; -type UpdateProductJobData = NamedJobData<'update-product', UpdateProductMessageData>; -type UpdateVariantsJobData = NamedJobData<'update-variants', UpdateVariantMessageData>; -type DeleteProductJobData = NamedJobData<'delete-product', UpdateProductMessageData>; -type DeleteVariantJobData = NamedJobData<'delete-variant', UpdateVariantMessageData>; -type UpdateVariantsByIdJobData = NamedJobData<'update-variants-by-id', UpdateVariantsByIdMessageData>; -type UpdateAssetJobData = NamedJobData<'update-asset', UpdateAssetMessageData>; -type DeleteAssetJobData = NamedJobData<'delete-asset', UpdateAssetMessageData>; -type AssignProductToChannelJobData = NamedJobData<'assign-product-to-channel', ProductChannelMessageData>; -type RemoveProductFromChannelJobData = NamedJobData<'remove-product-from-channel', ProductChannelMessageData>; -type AssignVariantToChannelJobData = NamedJobData<'assign-variant-to-channel', VariantChannelMessageData>; -type RemoveVariantFromChannelJobData = NamedJobData<'remove-variant-from-channel', VariantChannelMessageData>; +export type UpdateProductJobData = NamedJobData<'update-product', UpdateProductMessageData>; +export type UpdateVariantsJobData = NamedJobData<'update-variants', UpdateVariantMessageData>; +export type DeleteProductJobData = NamedJobData<'delete-product', UpdateProductMessageData>; +export type DeleteVariantJobData = NamedJobData<'delete-variant', UpdateVariantMessageData>; +export type UpdateVariantsByIdJobData = NamedJobData<'update-variants-by-id', UpdateVariantsByIdMessageData>; +export type UpdateAssetJobData = NamedJobData<'update-asset', UpdateAssetMessageData>; +export type DeleteAssetJobData = NamedJobData<'delete-asset', UpdateAssetMessageData>; +export type AssignProductToChannelJobData = NamedJobData< + 'assign-product-to-channel', + ProductChannelMessageData +>; +export type RemoveProductFromChannelJobData = NamedJobData< + 'remove-product-from-channel', + ProductChannelMessageData +>; +export type AssignVariantToChannelJobData = NamedJobData< + 'assign-variant-to-channel', + VariantChannelMessageData +>; +export type RemoveVariantFromChannelJobData = NamedJobData< + 'remove-variant-from-channel', + VariantChannelMessageData +>; export type UpdateIndexQueueJobData = | ReindexJobData | UpdateProductJobData diff --git a/packages/core/src/service/services/collection.service.ts b/packages/core/src/service/services/collection.service.ts index 7ae5925936..862d228874 100644 --- a/packages/core/src/service/services/collection.service.ts +++ b/packages/core/src/service/services/collection.service.ts @@ -44,7 +44,7 @@ import { AssetService } from './asset.service'; import { ChannelService } from './channel.service'; import { FacetValueService } from './facet-value.service'; -type ApplyCollectionFiltersJobData = { +export type ApplyCollectionFiltersJobData = { ctx: SerializedRequestContext; collectionIds: ID[]; applyToChangedVariantsOnly?: boolean;