Skip to content

Commit

Permalink
feat(core): Background thread search indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelbromley committed Jun 5, 2019
1 parent 59d8312 commit b78354e
Show file tree
Hide file tree
Showing 5 changed files with 390 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { SearchService } from '../../service/services/search.service';

import { AdminFulltextSearchResolver, ShopFulltextSearchResolver } from './fulltext-search.resolver';
import { FulltextSearchService } from './fulltext-search.service';
import { SearchIndexService } from './search-index.service';
import { SearchIndexItem } from './search-index-item.entity';

export interface DefaultSearchReindexResponse extends SearchReindexResponse {
Expand All @@ -23,9 +24,10 @@ export interface DefaultSearchReindexResponse extends SearchReindexResponse {
}

export class DefaultSearchPlugin implements VendurePlugin {
onBootstrap(inject: <T>(type: Type<T>) => T): void | Promise<void> {
async onBootstrap(inject: <T>(type: Type<T>) => T): Promise<void> {
const eventBus = inject(EventBus);
const fulltextSearchService = inject(FulltextSearchService);
const searchIndexService = inject(SearchIndexService);
eventBus.subscribe(CatalogModificationEvent, event => {
if (event.entity instanceof Product || event.entity instanceof ProductVariant) {
return fulltextSearchService.updateProductOrVariant(event.ctx, event.entity);
Expand All @@ -40,6 +42,7 @@ export class DefaultSearchPlugin implements VendurePlugin {
return fulltextSearchService.reindex(event.ctx);
}
});
await searchIndexService.connect();
}

extendAdminAPI(): APIExtensionDefinition {
Expand Down Expand Up @@ -71,6 +74,6 @@ export class DefaultSearchPlugin implements VendurePlugin {
}

defineProviders(): Provider[] {
return [FulltextSearchService, { provide: SearchService, useClass: FulltextSearchService }];
return [FulltextSearchService, SearchIndexService, { provide: SearchService, useClass: FulltextSearchService }];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { Omit } from '@vendure/common/lib/omit';
import { ID } from '@vendure/common/lib/shared-types';
import { unique } from '@vendure/common/lib/unique';
import { Connection } from 'typeorm';
import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';

import { RequestContext } from '../../api/common/request-context';
import { InternalServerError } from '../../common/error/errors';
Expand All @@ -20,6 +19,7 @@ import { SearchService } from '../../service/services/search.service';

import { AsyncQueue } from './async-queue';
import { SearchIndexItem } from './search-index-item.entity';
import { SearchIndexService } from './search-index.service';
import { MysqlSearchStrategy } from './search-strategy/mysql-search-strategy';
import { PostgresSearchStrategy } from './search-strategy/postgres-search-strategy';
import { SearchStrategy } from './search-strategy/search-strategy';
Expand Down Expand Up @@ -52,6 +52,7 @@ export class FulltextSearchService implements SearchService {
private eventBus: EventBus,
private facetValueService: FacetValueService,
private productVariantService: ProductVariantService,
private searchIndexService: SearchIndexService,
) {
this.setSearchStrategy();
}
Expand Down Expand Up @@ -94,43 +95,7 @@ export class FulltextSearchService implements SearchService {
*/
async reindex(ctx: RequestContext): Promise<JobInfo> {
const job = this.jobService.startJob('reindex', async reporter => {
const timeStart = Date.now();
const BATCH_SIZE = 100;
Logger.verbose('Reindexing search index...');
const qb = await this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
relations: this.variantRelations,
});
FindOptionsUtils.joinEagerRelations(qb, qb.alias, this.connection.getMetadata(ProductVariant));
const count = await qb.where('variants__product.deletedAt IS NULL').getCount();
Logger.verbose(`Getting ${count} variants`);
const batches = Math.ceil(count / BATCH_SIZE);

Logger.verbose('Deleting existing index items...');
await this.connection.getRepository(SearchIndexItem).delete({languageCode: ctx.languageCode});
Logger.verbose('Deleted!');

for (let i = 0; i < batches; i++) {
Logger.verbose(`Processing batch ${i + 1} of ${batches}, heap used: `
+ (process.memoryUsage().heapUsed / 1000 / 1000).toFixed(2) + 'MB');
const variants = await qb
.where('variants__product.deletedAt IS NULL')
.take(BATCH_SIZE)
.skip(i * BATCH_SIZE)
.getMany();
await this.taskQueue.push(async () => {
await this.saveSearchIndexItems(ctx, variants);
});
reporter.setProgress(Math.round((i / batches) * 100));
}

Logger.verbose(`Reindexing completed in ${Date.now() - timeStart}ms`);

return {
success: true,
indexedItemCount: count,
timeTaken: Date.now() - timeStart,
};
return this.searchIndexService.reindex(ctx, reporter);
});
return job;
}
Expand Down
76 changes: 76 additions & 0 deletions packages/core/src/plugin/default-search-plugin/ipc.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { ChildProcess } from 'child_process';
import { ConnectionOptions } from 'typeorm';

import { RequestContext } from '../../api/common/request-context';
import { ProductVariant } from '../../entity/product-variant/product-variant.entity';

export enum MessageType {
CONNECTION_OPTIONS,
CONNECTED,
GET_RAW_BATCH,
RETURN_RAW_BATCH,
SAVE_VARIANTS,
VARIANTS_SAVED,
COMPLETED,
}

export interface SaveVariantsPayload {
variants: ProductVariant[];
ctx: RequestContext;
batch: number;
total: number;
}

export interface IPCMessage {
type: MessageType;
}

export class ConnectionOptionsMessage implements IPCMessage {
readonly type = MessageType.CONNECTION_OPTIONS;
constructor(public value: ConnectionOptions) {}
}

export class ConnectedMessage implements IPCMessage {
readonly type = MessageType.CONNECTED;
constructor(public value: boolean) {}
}

export class GetRawBatchMessage implements IPCMessage {
readonly type = MessageType.GET_RAW_BATCH;
constructor(public value: { batchNumber: number; }) {}
}

export class ReturnRawBatchMessage implements IPCMessage {
readonly type = MessageType.RETURN_RAW_BATCH;
constructor(public value: { variants: ProductVariant[]; }) {}
}

export class SaveVariantsMessage implements IPCMessage {
readonly type = MessageType.SAVE_VARIANTS;
constructor(public value: SaveVariantsPayload) {}
}

export class VariantsSavedMessage implements IPCMessage {
readonly type = MessageType.VARIANTS_SAVED;
constructor(public value: { batchNumber: number; }) {}
}

export class CompletedMessage implements IPCMessage {
readonly type = MessageType.COMPLETED;
constructor(public value: boolean) {}
}

export type Message = ConnectionOptionsMessage |
ConnectedMessage |
GetRawBatchMessage |
ReturnRawBatchMessage |
SaveVariantsMessage |
VariantsSavedMessage |
CompletedMessage;

export type MessageOfType<T extends MessageType> = Extract<Message, { type: T }>;

export function sendIPCMessage(target: NodeJS.Process | ChildProcess, message: Message) {
// tslint:disable-next-line:no-non-null-assertion
target.send!(JSON.stringify({ type: message.type, value: message.value }));
}
133 changes: 133 additions & 0 deletions packages/core/src/plugin/default-search-plugin/search-index-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/* tslint:disable:no-non-null-assertion no-console */
import { Type } from '@vendure/common/lib/shared-types';
import { unique } from '@vendure/common/lib/unique';
import { Connection, ConnectionOptions, createConnection, SelectQueryBuilder } from 'typeorm';
import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';

import { RequestContext } from '../../api/common/request-context';
import { FacetValue } from '../../entity/facet-value/facet-value.entity';
import { ProductVariant } from '../../entity/product-variant/product-variant.entity';

import {
CompletedMessage,
ConnectedMessage,
ConnectionOptionsMessage,
GetRawBatchMessage,
MessageType,
ReturnRawBatchMessage,
SaveVariantsMessage,
SaveVariantsPayload,
sendIPCMessage,
VariantsSavedMessage,
} from './ipc';
import { SearchIndexItem } from './search-index-item.entity';

export const BATCH_SIZE = 100;
export const variantRelations = [
'product',
'product.featuredAsset',
'product.facetValues',
'product.facetValues.facet',
'featuredAsset',
'facetValues',
'facetValues.facet',
'collections',
'taxCategory',
];

export type IncomingMessage = ConnectionOptionsMessage | GetRawBatchMessage | SaveVariantsMessage;

export class SearchIndexWorker {

private connection: Connection;
private indexQueryBuilder: SelectQueryBuilder<ProductVariant>;

async connect(dbConnectionOptions: ConnectionOptions) {
const { coreEntitiesMap } = await import('../../entity/entities');
const coreEntities = Object.values(coreEntitiesMap) as Array<Type<any>>;
this.connection = await createConnection({ ...dbConnectionOptions, entities: [SearchIndexItem, ...coreEntities] });

this.indexQueryBuilder = await this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(this.indexQueryBuilder, {
relations: variantRelations,
});
FindOptionsUtils.joinEagerRelations(this.indexQueryBuilder, this.indexQueryBuilder.alias, this.connection.getMetadata(ProductVariant));

sendIPCMessage(process, new ConnectedMessage(this.connection.isConnected));
}

async getRawBatch(batchNumber: string | number) {
const i = Number.parseInt(batchNumber.toString(), 10);
const variants = await this.indexQueryBuilder
.where('variants__product.deletedAt IS NULL')
.take(BATCH_SIZE)
.skip(i * BATCH_SIZE)
.getMany();

sendIPCMessage(process, new ReturnRawBatchMessage({variants}));
}

async saveVariants(payload: SaveVariantsPayload) {
const { variants, ctx, batch, total } = payload;
const requestContext = new RequestContext(ctx);

const items = variants.map((v: ProductVariant) =>
new SearchIndexItem({
sku: v.sku,
enabled: v.enabled,
slug: v.product.slug,
price: v.price,
priceWithTax: v.priceWithTax,
languageCode: requestContext.languageCode,
productVariantId: v.id,
productId: v.product.id,
productName: v.product.name,
description: v.product.description,
productVariantName: v.name,
productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '',
productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '',
facetIds: this.getFacetIds(v),
facetValueIds: this.getFacetValueIds(v),
collectionIds: v.collections.map(c => c.id.toString()),
}),
);
await this.connection.getRepository(SearchIndexItem).save(items);
sendIPCMessage(process, new VariantsSavedMessage({batchNumber: batch}));
if (batch === total - 1) {
sendIPCMessage(process, new CompletedMessage(true));
}
}

private getFacetIds(variant: ProductVariant): string[] {
const facetIds = (fv: FacetValue) => fv.facet.id.toString();
const variantFacetIds = variant.facetValues.map(facetIds);
const productFacetIds = variant.product.facetValues.map(facetIds);
return unique([...variantFacetIds, ...productFacetIds]);
}

private getFacetValueIds(variant: ProductVariant): string[] {
const facetValueIds = (fv: FacetValue) => fv.id.toString();
const variantFacetValueIds = variant.facetValues.map(facetValueIds);
const productFacetValueIds = variant.product.facetValues.map(facetValueIds);
return unique([...variantFacetValueIds, ...productFacetValueIds]);
}
}

const worker = new SearchIndexWorker();

process.on('message', (messageString) => {
const message: IncomingMessage = JSON.parse(messageString);
switch (message.type) {
case MessageType.CONNECTION_OPTIONS:
worker.connect(message.value);
break;
case MessageType.GET_RAW_BATCH:
worker.getRawBatch(message.value.batchNumber);
break;
case MessageType.SAVE_VARIANTS:
worker.saveVariants(message.value);
break;
default:
// ignore
}
});
Loading

0 comments on commit b78354e

Please sign in to comment.