diff --git a/packages/core/src/plugin/default-search-plugin/constants.ts b/packages/core/src/plugin/default-search-plugin/constants.ts index ef29567710..7065664526 100644 --- a/packages/core/src/plugin/default-search-plugin/constants.ts +++ b/packages/core/src/plugin/default-search-plugin/constants.ts @@ -1 +1,2 @@ export const PLUGIN_INIT_OPTIONS = Symbol('PLUGIN_INIT_OPTIONS'); +export const BUFFER_SEARCH_INDEX_UPDATES = Symbol('BUFFER_SEARCH_INDEX_UPDATES'); diff --git a/packages/core/src/plugin/default-search-plugin/default-search-plugin.ts b/packages/core/src/plugin/default-search-plugin/default-search-plugin.ts index d4bc2f68fb..58cea0c674 100644 --- a/packages/core/src/plugin/default-search-plugin/default-search-plugin.ts +++ b/packages/core/src/plugin/default-search-plugin/default-search-plugin.ts @@ -17,7 +17,7 @@ import { JobQueueService } from '../../job-queue/job-queue.service'; import { PluginCommonModule } from '../plugin-common.module'; import { VendurePlugin } from '../vendure-plugin'; -import { PLUGIN_INIT_OPTIONS } from './constants'; +import { BUFFER_SEARCH_INDEX_UPDATES, PLUGIN_INIT_OPTIONS } from './constants'; import { AdminFulltextSearchResolver, ShopFulltextSearchResolver } from './fulltext-search.resolver'; import { FulltextSearchService } from './fulltext-search.service'; import { IndexerController } from './indexer/indexer.controller'; @@ -70,6 +70,10 @@ export interface DefaultSearchReindexResponse extends SearchReindexResponse { IndexerController, SearchJobBufferService, { provide: PLUGIN_INIT_OPTIONS, useFactory: () => DefaultSearchPlugin.options }, + { + provide: BUFFER_SEARCH_INDEX_UPDATES, + useFactory: () => DefaultSearchPlugin.options.bufferUpdates === true, + }, ], adminApiExtensions: { resolvers: [AdminFulltextSearchResolver] }, shopApiExtensions: { resolvers: [ShopFulltextSearchResolver] }, @@ -145,6 +149,7 @@ export class DefaultSearchPlugin implements OnApplicationBootstrap { } }); + // TODO: Remove this buffering logic because because we have dedicated buffering based on #1137 const collectionModification$ = this.eventBus.ofType(CollectionModificationEvent); const closingNotifier$ = collectionModification$.pipe(debounceTime(50)); collectionModification$ @@ -166,6 +171,7 @@ export class DefaultSearchPlugin implements OnApplicationBootstrap { // The delay prevents a "TransactionNotStartedError" (in SQLite/sqljs) by allowing any existing // transactions to complete before a new job is added to the queue (assuming the SQL-based // JobQueueStrategy). + // TODO: should be able to remove owing to f0fd6625 .pipe(delay(1)) .subscribe(event => { const defaultTaxZone = event.ctx.channel.defaultTaxZone; diff --git a/packages/core/src/plugin/default-search-plugin/index.ts b/packages/core/src/plugin/default-search-plugin/index.ts new file mode 100644 index 0000000000..525e4297d1 --- /dev/null +++ b/packages/core/src/plugin/default-search-plugin/index.ts @@ -0,0 +1,5 @@ +export * from './constants'; +export * from './default-search-plugin'; +export * from './search-job-buffer/collection-job-buffer'; +export * from './search-job-buffer/search-index-job-buffer'; +export * from './search-job-buffer/search-job-buffer.service'; diff --git a/packages/core/src/plugin/default-search-plugin/search-job-buffer/search-index-job-buffer.ts b/packages/core/src/plugin/default-search-plugin/search-job-buffer/search-index-job-buffer.ts index fd8c48f55a..2d916aa917 100644 --- a/packages/core/src/plugin/default-search-plugin/search-job-buffer/search-index-job-buffer.ts +++ b/packages/core/src/plugin/default-search-plugin/search-job-buffer/search-index-job-buffer.ts @@ -2,30 +2,42 @@ import { ID } from '@vendure/common/lib/shared-types'; import { unique } from '@vendure/common/lib/unique'; import { Job, JobBuffer } from '../../../job-queue/index'; -import { UpdateIndexQueueJobData, UpdateVariantsByIdJobData, UpdateVariantsJobData } from '../types'; +import { + UpdateIndexQueueJobData, + UpdateProductJobData, + UpdateVariantsByIdJobData, + UpdateVariantsJobData, +} from '../types'; export class SearchIndexJobBuffer implements JobBuffer { readonly id = 'search-plugin-update-search-index'; collect(job: Job): boolean | Promise { - return job.queueName === 'update-search-index'; + return ( + job.queueName === 'update-search-index' && + ['update-product', 'update-variants', 'update-variants-by-id'].includes(job.data.type) + ); } reduce(collectedJobs: Array>): Array> { - const variantsByIdJobs = this.removeBy>( + const variantsJobs = this.removeBy>( collectedJobs, item => item.data.type === 'update-variants-by-id' || item.data.type === 'update-variants', ); + const productsJobs = this.removeBy>( + collectedJobs, + item => item.data.type === 'update-product', + ); const jobsToAdd = [...collectedJobs]; - if (variantsByIdJobs.length) { - const variantIdsToUpdate = variantsByIdJobs.reduce((result, job) => { + if (variantsJobs.length) { + const variantIdsToUpdate = variantsJobs.reduce((result, job) => { const ids = job.data.type === 'update-variants-by-id' ? job.data.ids : job.data.variantIds; return [...result, ...ids]; }, [] as ID[]); - const referenceJob = variantsByIdJobs[0]; + const referenceJob = variantsJobs[0]; const batchedVariantJob = new Job({ ...referenceJob, id: undefined, @@ -36,7 +48,19 @@ export class SearchIndexJobBuffer implements JobBuffer }, }); - jobsToAdd.push(batchedVariantJob as any); + jobsToAdd.push(batchedVariantJob as Job); + } + if (productsJobs.length) { + const seenIds = new Set(); + const uniqueProductJobs: Array> = []; + for (const job of productsJobs) { + if (seenIds.has(job.data.productId)) { + continue; + } + uniqueProductJobs.push(job); + seenIds.add(job.data.productId); + } + jobsToAdd.push(...(uniqueProductJobs as Job[])); } return jobsToAdd; diff --git a/packages/core/src/plugin/default-search-plugin/search-job-buffer/search-job-buffer.service.ts b/packages/core/src/plugin/default-search-plugin/search-job-buffer/search-job-buffer.service.ts index cd9c309825..c4f10a0823 100644 --- a/packages/core/src/plugin/default-search-plugin/search-job-buffer/search-job-buffer.service.ts +++ b/packages/core/src/plugin/default-search-plugin/search-job-buffer/search-job-buffer.service.ts @@ -5,8 +5,7 @@ import { ConfigService } from '../../../config/config.service'; import { isInspectableJobQueueStrategy } from '../../../config/job-queue/inspectable-job-queue-strategy'; import { JobQueueService } from '../../../job-queue/job-queue.service'; import { SubscribableJob } from '../../../job-queue/subscribable-job'; -import { PLUGIN_INIT_OPTIONS } from '../constants'; -import { DefaultSearchPluginInitOptions } from '../types'; +import { BUFFER_SEARCH_INDEX_UPDATES } from '../constants'; import { CollectionJobBuffer } from './collection-job-buffer'; import { SearchIndexJobBuffer } from './search-index-job-buffer'; @@ -19,18 +18,18 @@ export class SearchJobBufferService implements OnApplicationBootstrap { constructor( private jobQueueService: JobQueueService, private configService: ConfigService, - @Inject(PLUGIN_INIT_OPTIONS) private options: DefaultSearchPluginInitOptions, + @Inject(BUFFER_SEARCH_INDEX_UPDATES) private bufferUpdates: boolean, ) {} onApplicationBootstrap(): any { - if (this.options.bufferUpdates === true) { + if (this.bufferUpdates === true) { this.jobQueueService.addBuffer(this.searchIndexJobBuffer); this.jobQueueService.addBuffer(this.collectionJobBuffer); } } async getPendingSearchUpdates(): Promise { - if (!this.options.bufferUpdates) { + if (!this.bufferUpdates) { return 0; } const bufferSizes = await this.jobQueueService.bufferSize( @@ -43,7 +42,7 @@ export class SearchJobBufferService implements OnApplicationBootstrap { } async runPendingSearchUpdates(): Promise { - if (!this.options.bufferUpdates) { + if (!this.bufferUpdates) { return; } const { jobQueueStrategy } = this.configService.jobQueueOptions; diff --git a/packages/core/src/plugin/default-search-plugin/types.ts b/packages/core/src/plugin/default-search-plugin/types.ts index 1b9f2921d5..cca7ab4126 100644 --- a/packages/core/src/plugin/default-search-plugin/types.ts +++ b/packages/core/src/plugin/default-search-plugin/types.ts @@ -4,6 +4,7 @@ import { SerializedRequestContext } from '../../api/common/request-context'; import { Asset } from '../../entity/asset/asset.entity'; export interface DefaultSearchPluginInitOptions { + // TODO: docs bufferUpdates?: boolean; } diff --git a/packages/core/src/plugin/index.ts b/packages/core/src/plugin/index.ts index 507ccf9321..9cce0d8684 100644 --- a/packages/core/src/plugin/index.ts +++ b/packages/core/src/plugin/index.ts @@ -1,4 +1,4 @@ -export * from './default-search-plugin/default-search-plugin'; +export * from './default-search-plugin/index'; export * from './default-job-queue-plugin/default-job-queue-plugin'; export * from './default-job-queue-plugin/job-record-buffer.entity'; export * from './default-job-queue-plugin/sql-job-buffer-storage-strategy';