Skip to content

Commit

Permalink
feat(core): Create buffering logic for DefaultSearchPlugin
Browse files Browse the repository at this point in the history
Relates to #1137
  • Loading branch information
michaelbromley committed Oct 7, 2021
1 parent d6aa20f commit 6a47dcf
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 22 deletions.
2 changes: 1 addition & 1 deletion packages/core/src/api/resolvers/admin/job.resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ export class JobResolver {
@Query()
@Allow(Permission.ReadSettings, Permission.ReadSystem)
async jobBufferSize(@Args() args: QueryJobBufferSizeArgs) {
const bufferSizes = this.jobBuffer.bufferSize(args.processorIds);
const bufferSizes = await this.jobBuffer.bufferSize(args.processorIds);
return Object.entries(bufferSizes).map(([processorId, size]) => ({ processorId, size }));
}

Expand Down
5 changes: 5 additions & 0 deletions packages/core/src/job-queue/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
export * from './injectable-job-queue-strategy';
export * from './job-buffer/in-memory-job-buffer-storage-strategy';
export * from './job-buffer/job-buffer';
export * from './job-buffer/job-buffer-processor';
export * from './job-buffer/job-buffer-storage-strategy';
export * from './job-buffer/sql-job-buffer-storage-strategy';
export * from './job';
export * from './job-queue';
export * from './job-queue.service';
Expand Down
18 changes: 14 additions & 4 deletions packages/core/src/job-queue/job-buffer/job-buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Injectable } from '@nestjs/common';

import { InternalServerError } from '../../common/error/errors';
import { ConfigService } from '../../config/config.service';
import { Logger } from '../../config/logger/vendure-logger';
import { Job } from '../job';

import { JobBufferProcessor } from './job-buffer-processor';
Expand All @@ -16,7 +17,7 @@ export class JobBuffer {
this.storageStrategy = configService.jobQueueOptions.jobBufferStorageStrategy;
}

addProcessor(processor: JobBufferProcessor) {
addProcessor(processor: JobBufferProcessor<any>) {
const idAlreadyExists = Array.from(this.processors).find(p => p.id === processor.id);
if (idAlreadyExists) {
throw new InternalServerError(
Expand All @@ -26,7 +27,7 @@ export class JobBuffer {
this.processors.add(processor);
}

removeProcessor(processor: JobBufferProcessor) {
removeProcessor(processor: JobBufferProcessor<any>) {
this.processors.delete(processor);
}

Expand Down Expand Up @@ -58,8 +59,17 @@ export class JobBuffer {
for (const processor of this.processors) {
const jobsForProcessor = flushResult[processor.id];
if (jobsForProcessor?.length) {
const reducedJobs = await processor.reduce(jobsForProcessor);
for (const job of reducedJobs) {
let jobsToAdd = jobsForProcessor;
try {
jobsToAdd = await processor.reduce(jobsForProcessor);
} catch (e) {
Logger.error(
`Error encountered processing jobs in "${processor.id}:\n${e.message}"`,
undefined,
e.stack,
);
}
for (const job of jobsToAdd) {
await jobQueueStrategy.add(job);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ export class SqlJobBufferStorageStrategy implements JobBufferStorageStrategy {
return Promise.resolve(job);
}

bufferSize(processorIds?: string[]): Promise<number> {
return Promise.resolve(0);
bufferSize(processorIds?: string[]) {
return Promise.resolve({});
}

flush(processorIds?: string[]): Promise<void> {
return Promise.resolve(undefined);
flush(processorIds?: string[]) {
return Promise.resolve({});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { ID } from '@vendure/common/lib/shared-types';
import { unique } from '@vendure/common/lib/unique';

import { Job, JobBufferProcessor } from '../../job-queue';
import { ApplyCollectionFiltersJobData } from '../../service/services/collection.service';

import { UpdateIndexQueueJobData, UpdateVariantsByIdJobData, UpdateVariantsJobData } from './types';

export class CollectionJobBufferProcessor implements JobBufferProcessor<ApplyCollectionFiltersJobData> {
readonly id = 'search-plugin-apply-collection-filters';

collect(job: Job): boolean {
return job.queueName === 'apply-collection-filters';
}

reduce(collectedJobs: Array<Job<ApplyCollectionFiltersJobData>>): Array<Job<any>> {
const collectionIdsToUpdate = collectedJobs.reduce((result, job) => {
return [...result, ...job.data.collectionIds];
}, [] as ID[]);

const referenceJob = collectedJobs[0];
const batchedCollectionJob = new Job<ApplyCollectionFiltersJobData>({
...referenceJob,
id: undefined,
data: {
collectionIds: unique(collectionIdsToUpdate),
ctx: referenceJob.data.ctx,
applyToChangedVariantsOnly: referenceJob.data.applyToChangedVariantsOnly,
},
});

return [batchedCollectionJob];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ import { ProductEvent } from '../../event-bus/events/product-event';
import { ProductVariantChannelEvent } from '../../event-bus/events/product-variant-channel-event';
import { ProductVariantEvent } from '../../event-bus/events/product-variant-event';
import { TaxRateModificationEvent } from '../../event-bus/events/tax-rate-modification-event';
import { JobBuffer } from '../../job-queue/job-buffer/job-buffer';
import { PluginCommonModule } from '../plugin-common.module';
import { VendurePlugin } from '../vendure-plugin';

import { CollectionJobBufferProcessor } from './collection-job-buffer-processor';
import { AdminFulltextSearchResolver, ShopFulltextSearchResolver } from './fulltext-search.resolver';
import { FulltextSearchService } from './fulltext-search.service';
import { IndexerController } from './indexer/indexer.controller';
import { SearchIndexService } from './indexer/search-index.service';
import { SearchIndexItem } from './search-index-item.entity';
import { SearchJobBufferProcessor } from './search-job-buffer-processor';

export interface DefaultSearchReindexResponse extends SearchReindexResponse {
timeTaken: number;
Expand Down Expand Up @@ -64,10 +67,17 @@ export interface DefaultSearchReindexResponse extends SearchReindexResponse {
})
export class DefaultSearchPlugin implements OnApplicationBootstrap {
/** @internal */
constructor(private eventBus: EventBus, private searchIndexService: SearchIndexService) {}
constructor(
private eventBus: EventBus,
private searchIndexService: SearchIndexService,
private jobBuffer: JobBuffer,
) {}

/** @internal */
async onApplicationBootstrap() {
this.jobBuffer.addProcessor(new SearchJobBufferProcessor());
this.jobBuffer.addProcessor(new CollectionJobBufferProcessor());

this.eventBus.ofType(ProductEvent).subscribe(event => {
if (event.type === 'deleted') {
return this.searchIndexService.deleteProduct(event.ctx, event.product);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { ID } from '@vendure/common/lib/shared-types';
import { unique } from '@vendure/common/lib/unique';

import { Job, JobBufferProcessor } from '../../job-queue';

import { UpdateIndexQueueJobData, UpdateVariantsByIdJobData, UpdateVariantsJobData } from './types';

export class SearchJobBufferProcessor implements JobBufferProcessor<UpdateIndexQueueJobData> {
readonly id = 'search-plugin-update-search-index';

collect(job: Job<UpdateIndexQueueJobData>): boolean | Promise<boolean> {
return job.queueName === 'update-search-index';
}

reduce(collectedJobs: Array<Job<UpdateIndexQueueJobData>>): Array<Job<any>> {
const variantsByIdJobs = this.removeBy<Job<UpdateVariantsByIdJobData | UpdateVariantsJobData>>(
collectedJobs,
item => item.data.type === 'update-variants-by-id' || item.data.type === 'update-variants',
);

const jobsToAdd = [...collectedJobs];

if (variantsByIdJobs.length) {
const variantIdsToUpdate = variantsByIdJobs.reduce((result, job) => {
const ids = job.data.type === 'update-variants-by-id' ? job.data.ids : job.data.variantIds;
return [...result, ...ids];
}, [] as ID[]);

const referenceJob = variantsByIdJobs[0];
const batchedVariantJob = new Job<UpdateVariantsByIdJobData>({
...referenceJob,
id: undefined,
data: {
type: 'update-variants-by-id',
ids: unique(variantIdsToUpdate),
ctx: referenceJob.data.ctx,
},
});

jobsToAdd.push(batchedVariantJob as any);
}

return jobsToAdd;
}

/**
* Removes items from the array based on the filterFn and returns a new array with only the removed
* items. The original input array is mutated.
*/
private removeBy<R extends T, T = any>(input: T[], filterFn: (item: T) => boolean): R[] {
const removed: R[] = [];
for (let i = input.length - 1; i >= 0; i--) {
const item = input[i];
if (filterFn(item)) {
removed.push(item as R);
input.splice(i, 1);
}
}
return removed;
}
}
34 changes: 23 additions & 11 deletions packages/core/src/plugin/default-search-plugin/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,29 @@ export type VariantChannelMessageData = {
type NamedJobData<Type extends string, MessageData> = { type: Type } & MessageData;

export type ReindexJobData = NamedJobData<'reindex', ReindexMessageData>;
type UpdateProductJobData = NamedJobData<'update-product', UpdateProductMessageData>;
type UpdateVariantsJobData = NamedJobData<'update-variants', UpdateVariantMessageData>;
type DeleteProductJobData = NamedJobData<'delete-product', UpdateProductMessageData>;
type DeleteVariantJobData = NamedJobData<'delete-variant', UpdateVariantMessageData>;
type UpdateVariantsByIdJobData = NamedJobData<'update-variants-by-id', UpdateVariantsByIdMessageData>;
type UpdateAssetJobData = NamedJobData<'update-asset', UpdateAssetMessageData>;
type DeleteAssetJobData = NamedJobData<'delete-asset', UpdateAssetMessageData>;
type AssignProductToChannelJobData = NamedJobData<'assign-product-to-channel', ProductChannelMessageData>;
type RemoveProductFromChannelJobData = NamedJobData<'remove-product-from-channel', ProductChannelMessageData>;
type AssignVariantToChannelJobData = NamedJobData<'assign-variant-to-channel', VariantChannelMessageData>;
type RemoveVariantFromChannelJobData = NamedJobData<'remove-variant-from-channel', VariantChannelMessageData>;
export type UpdateProductJobData = NamedJobData<'update-product', UpdateProductMessageData>;
export type UpdateVariantsJobData = NamedJobData<'update-variants', UpdateVariantMessageData>;
export type DeleteProductJobData = NamedJobData<'delete-product', UpdateProductMessageData>;
export type DeleteVariantJobData = NamedJobData<'delete-variant', UpdateVariantMessageData>;
export type UpdateVariantsByIdJobData = NamedJobData<'update-variants-by-id', UpdateVariantsByIdMessageData>;
export type UpdateAssetJobData = NamedJobData<'update-asset', UpdateAssetMessageData>;
export type DeleteAssetJobData = NamedJobData<'delete-asset', UpdateAssetMessageData>;
export type AssignProductToChannelJobData = NamedJobData<
'assign-product-to-channel',
ProductChannelMessageData
>;
export type RemoveProductFromChannelJobData = NamedJobData<
'remove-product-from-channel',
ProductChannelMessageData
>;
export type AssignVariantToChannelJobData = NamedJobData<
'assign-variant-to-channel',
VariantChannelMessageData
>;
export type RemoveVariantFromChannelJobData = NamedJobData<
'remove-variant-from-channel',
VariantChannelMessageData
>;
export type UpdateIndexQueueJobData =
| ReindexJobData
| UpdateProductJobData
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/service/services/collection.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import { AssetService } from './asset.service';
import { ChannelService } from './channel.service';
import { FacetValueService } from './facet-value.service';

type ApplyCollectionFiltersJobData = {
export type ApplyCollectionFiltersJobData = {
ctx: SerializedRequestContext;
collectionIds: ID[];
applyToChangedVariantsOnly?: boolean;
Expand Down

0 comments on commit 6a47dcf

Please sign in to comment.