Skip to content

Commit

Permalink
refactor(core): Make search job buffer usable in external search plugins
Browse files Browse the repository at this point in the history
Relates to #1137
  • Loading branch information
michaelbromley committed Oct 7, 2021
1 parent e2e65b4 commit becf132
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export const PLUGIN_INIT_OPTIONS = Symbol('PLUGIN_INIT_OPTIONS');
export const BUFFER_SEARCH_INDEX_UPDATES = Symbol('BUFFER_SEARCH_INDEX_UPDATES');
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { JobQueueService } from '../../job-queue/job-queue.service';
import { PluginCommonModule } from '../plugin-common.module';
import { VendurePlugin } from '../vendure-plugin';

import { PLUGIN_INIT_OPTIONS } from './constants';
import { BUFFER_SEARCH_INDEX_UPDATES, PLUGIN_INIT_OPTIONS } from './constants';
import { AdminFulltextSearchResolver, ShopFulltextSearchResolver } from './fulltext-search.resolver';
import { FulltextSearchService } from './fulltext-search.service';
import { IndexerController } from './indexer/indexer.controller';
Expand Down Expand Up @@ -70,6 +70,10 @@ export interface DefaultSearchReindexResponse extends SearchReindexResponse {
IndexerController,
SearchJobBufferService,
{ provide: PLUGIN_INIT_OPTIONS, useFactory: () => DefaultSearchPlugin.options },
{
provide: BUFFER_SEARCH_INDEX_UPDATES,
useFactory: () => DefaultSearchPlugin.options.bufferUpdates === true,
},
],
adminApiExtensions: { resolvers: [AdminFulltextSearchResolver] },
shopApiExtensions: { resolvers: [ShopFulltextSearchResolver] },
Expand Down Expand Up @@ -145,6 +149,7 @@ export class DefaultSearchPlugin implements OnApplicationBootstrap {
}
});

// TODO: Remove this buffering logic because because we have dedicated buffering based on #1137
const collectionModification$ = this.eventBus.ofType(CollectionModificationEvent);
const closingNotifier$ = collectionModification$.pipe(debounceTime(50));
collectionModification$
Expand All @@ -166,6 +171,7 @@ export class DefaultSearchPlugin implements OnApplicationBootstrap {
// The delay prevents a "TransactionNotStartedError" (in SQLite/sqljs) by allowing any existing
// transactions to complete before a new job is added to the queue (assuming the SQL-based
// JobQueueStrategy).
// TODO: should be able to remove owing to f0fd6625
.pipe(delay(1))
.subscribe(event => {
const defaultTaxZone = event.ctx.channel.defaultTaxZone;
Expand Down
5 changes: 5 additions & 0 deletions packages/core/src/plugin/default-search-plugin/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export * from './constants';
export * from './default-search-plugin';
export * from './search-job-buffer/collection-job-buffer';
export * from './search-job-buffer/search-index-job-buffer';
export * from './search-job-buffer/search-job-buffer.service';
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,42 @@ import { ID } from '@vendure/common/lib/shared-types';
import { unique } from '@vendure/common/lib/unique';

import { Job, JobBuffer } from '../../../job-queue/index';
import { UpdateIndexQueueJobData, UpdateVariantsByIdJobData, UpdateVariantsJobData } from '../types';
import {
UpdateIndexQueueJobData,
UpdateProductJobData,
UpdateVariantsByIdJobData,
UpdateVariantsJobData,
} from '../types';

export class SearchIndexJobBuffer implements JobBuffer<UpdateIndexQueueJobData> {
readonly id = 'search-plugin-update-search-index';

collect(job: Job<UpdateIndexQueueJobData>): boolean | Promise<boolean> {
return job.queueName === 'update-search-index';
return (
job.queueName === 'update-search-index' &&
['update-product', 'update-variants', 'update-variants-by-id'].includes(job.data.type)
);
}

reduce(collectedJobs: Array<Job<UpdateIndexQueueJobData>>): Array<Job<any>> {
const variantsByIdJobs = this.removeBy<Job<UpdateVariantsByIdJobData | UpdateVariantsJobData>>(
const variantsJobs = this.removeBy<Job<UpdateVariantsByIdJobData | UpdateVariantsJobData>>(
collectedJobs,
item => item.data.type === 'update-variants-by-id' || item.data.type === 'update-variants',
);
const productsJobs = this.removeBy<Job<UpdateProductJobData>>(
collectedJobs,
item => item.data.type === 'update-product',
);

const jobsToAdd = [...collectedJobs];

if (variantsByIdJobs.length) {
const variantIdsToUpdate = variantsByIdJobs.reduce((result, job) => {
if (variantsJobs.length) {
const variantIdsToUpdate = variantsJobs.reduce((result, job) => {
const ids = job.data.type === 'update-variants-by-id' ? job.data.ids : job.data.variantIds;
return [...result, ...ids];
}, [] as ID[]);

const referenceJob = variantsByIdJobs[0];
const referenceJob = variantsJobs[0];
const batchedVariantJob = new Job<UpdateVariantsByIdJobData>({
...referenceJob,
id: undefined,
Expand All @@ -36,7 +48,19 @@ export class SearchIndexJobBuffer implements JobBuffer<UpdateIndexQueueJobData>
},
});

jobsToAdd.push(batchedVariantJob as any);
jobsToAdd.push(batchedVariantJob as Job);
}
if (productsJobs.length) {
const seenIds = new Set<ID>();
const uniqueProductJobs: Array<Job<UpdateProductJobData>> = [];
for (const job of productsJobs) {
if (seenIds.has(job.data.productId)) {
continue;
}
uniqueProductJobs.push(job);
seenIds.add(job.data.productId);
}
jobsToAdd.push(...(uniqueProductJobs as Job[]));
}

return jobsToAdd;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import { ConfigService } from '../../../config/config.service';
import { isInspectableJobQueueStrategy } from '../../../config/job-queue/inspectable-job-queue-strategy';
import { JobQueueService } from '../../../job-queue/job-queue.service';
import { SubscribableJob } from '../../../job-queue/subscribable-job';
import { PLUGIN_INIT_OPTIONS } from '../constants';
import { DefaultSearchPluginInitOptions } from '../types';
import { BUFFER_SEARCH_INDEX_UPDATES } from '../constants';

import { CollectionJobBuffer } from './collection-job-buffer';
import { SearchIndexJobBuffer } from './search-index-job-buffer';
Expand All @@ -19,18 +18,18 @@ export class SearchJobBufferService implements OnApplicationBootstrap {
constructor(
private jobQueueService: JobQueueService,
private configService: ConfigService,
@Inject(PLUGIN_INIT_OPTIONS) private options: DefaultSearchPluginInitOptions,
@Inject(BUFFER_SEARCH_INDEX_UPDATES) private bufferUpdates: boolean,
) {}

onApplicationBootstrap(): any {
if (this.options.bufferUpdates === true) {
if (this.bufferUpdates === true) {
this.jobQueueService.addBuffer(this.searchIndexJobBuffer);
this.jobQueueService.addBuffer(this.collectionJobBuffer);
}
}

async getPendingSearchUpdates(): Promise<number> {
if (!this.options.bufferUpdates) {
if (!this.bufferUpdates) {
return 0;
}
const bufferSizes = await this.jobQueueService.bufferSize(
Expand All @@ -43,7 +42,7 @@ export class SearchJobBufferService implements OnApplicationBootstrap {
}

async runPendingSearchUpdates(): Promise<void> {
if (!this.options.bufferUpdates) {
if (!this.bufferUpdates) {
return;
}
const { jobQueueStrategy } = this.configService.jobQueueOptions;
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/plugin/default-search-plugin/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { SerializedRequestContext } from '../../api/common/request-context';
import { Asset } from '../../entity/asset/asset.entity';

export interface DefaultSearchPluginInitOptions {
// TODO: docs
bufferUpdates?: boolean;
}

Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/plugin/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export * from './default-search-plugin/default-search-plugin';
export * from './default-search-plugin/index';
export * from './default-job-queue-plugin/default-job-queue-plugin';
export * from './default-job-queue-plugin/job-record-buffer.entity';
export * from './default-job-queue-plugin/sql-job-buffer-storage-strategy';
Expand Down

0 comments on commit becf132

Please sign in to comment.