diff --git a/packages/core/src/plugin/default-search-plugin/default-search-plugin.ts b/packages/core/src/plugin/default-search-plugin/default-search-plugin.ts index e6b5336bda..d8656ccb92 100644 --- a/packages/core/src/plugin/default-search-plugin/default-search-plugin.ts +++ b/packages/core/src/plugin/default-search-plugin/default-search-plugin.ts @@ -15,6 +15,7 @@ import { SearchService } from '../../service/services/search.service'; import { AdminFulltextSearchResolver, ShopFulltextSearchResolver } from './fulltext-search.resolver'; import { FulltextSearchService } from './fulltext-search.service'; +import { SearchIndexService } from './search-index.service'; import { SearchIndexItem } from './search-index-item.entity'; export interface DefaultSearchReindexResponse extends SearchReindexResponse { @@ -23,9 +24,10 @@ export interface DefaultSearchReindexResponse extends SearchReindexResponse { } export class DefaultSearchPlugin implements VendurePlugin { - onBootstrap(inject: (type: Type) => T): void | Promise { + async onBootstrap(inject: (type: Type) => T): Promise { const eventBus = inject(EventBus); const fulltextSearchService = inject(FulltextSearchService); + const searchIndexService = inject(SearchIndexService); eventBus.subscribe(CatalogModificationEvent, event => { if (event.entity instanceof Product || event.entity instanceof ProductVariant) { return fulltextSearchService.updateProductOrVariant(event.ctx, event.entity); @@ -40,6 +42,7 @@ export class DefaultSearchPlugin implements VendurePlugin { return fulltextSearchService.reindex(event.ctx); } }); + await searchIndexService.connect(); } extendAdminAPI(): APIExtensionDefinition { @@ -71,6 +74,6 @@ export class DefaultSearchPlugin implements VendurePlugin { } defineProviders(): Provider[] { - return [FulltextSearchService, { provide: SearchService, useClass: FulltextSearchService }]; + return [FulltextSearchService, SearchIndexService, { provide: SearchService, useClass: FulltextSearchService }]; } } diff --git a/packages/core/src/plugin/default-search-plugin/fulltext-search.service.ts b/packages/core/src/plugin/default-search-plugin/fulltext-search.service.ts index a3cdedea99..965e0e6d9b 100644 --- a/packages/core/src/plugin/default-search-plugin/fulltext-search.service.ts +++ b/packages/core/src/plugin/default-search-plugin/fulltext-search.service.ts @@ -5,7 +5,6 @@ import { Omit } from '@vendure/common/lib/omit'; import { ID } from '@vendure/common/lib/shared-types'; import { unique } from '@vendure/common/lib/unique'; import { Connection } from 'typeorm'; -import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils'; import { RequestContext } from '../../api/common/request-context'; import { InternalServerError } from '../../common/error/errors'; @@ -20,6 +19,7 @@ import { SearchService } from '../../service/services/search.service'; import { AsyncQueue } from './async-queue'; import { SearchIndexItem } from './search-index-item.entity'; +import { SearchIndexService } from './search-index.service'; import { MysqlSearchStrategy } from './search-strategy/mysql-search-strategy'; import { PostgresSearchStrategy } from './search-strategy/postgres-search-strategy'; import { SearchStrategy } from './search-strategy/search-strategy'; @@ -52,6 +52,7 @@ export class FulltextSearchService implements SearchService { private eventBus: EventBus, private facetValueService: FacetValueService, private productVariantService: ProductVariantService, + private searchIndexService: SearchIndexService, ) { this.setSearchStrategy(); } @@ -94,43 +95,7 @@ export class FulltextSearchService implements SearchService { */ async reindex(ctx: RequestContext): Promise { const job = this.jobService.startJob('reindex', async reporter => { - const timeStart = Date.now(); - const BATCH_SIZE = 100; - Logger.verbose('Reindexing search index...'); - const qb = await this.connection.getRepository(ProductVariant).createQueryBuilder('variants'); - FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, { - relations: this.variantRelations, - }); - FindOptionsUtils.joinEagerRelations(qb, qb.alias, this.connection.getMetadata(ProductVariant)); - const count = await qb.where('variants__product.deletedAt IS NULL').getCount(); - Logger.verbose(`Getting ${count} variants`); - const batches = Math.ceil(count / BATCH_SIZE); - - Logger.verbose('Deleting existing index items...'); - await this.connection.getRepository(SearchIndexItem).delete({languageCode: ctx.languageCode}); - Logger.verbose('Deleted!'); - - for (let i = 0; i < batches; i++) { - Logger.verbose(`Processing batch ${i + 1} of ${batches}, heap used: ` - + (process.memoryUsage().heapUsed / 1000 / 1000).toFixed(2) + 'MB'); - const variants = await qb - .where('variants__product.deletedAt IS NULL') - .take(BATCH_SIZE) - .skip(i * BATCH_SIZE) - .getMany(); - await this.taskQueue.push(async () => { - await this.saveSearchIndexItems(ctx, variants); - }); - reporter.setProgress(Math.round((i / batches) * 100)); - } - - Logger.verbose(`Reindexing completed in ${Date.now() - timeStart}ms`); - - return { - success: true, - indexedItemCount: count, - timeTaken: Date.now() - timeStart, - }; + return this.searchIndexService.reindex(ctx, reporter); }); return job; } diff --git a/packages/core/src/plugin/default-search-plugin/ipc.ts b/packages/core/src/plugin/default-search-plugin/ipc.ts new file mode 100644 index 0000000000..afb9c1270e --- /dev/null +++ b/packages/core/src/plugin/default-search-plugin/ipc.ts @@ -0,0 +1,76 @@ +import { ChildProcess } from 'child_process'; +import { ConnectionOptions } from 'typeorm'; + +import { RequestContext } from '../../api/common/request-context'; +import { ProductVariant } from '../../entity/product-variant/product-variant.entity'; + +export enum MessageType { + CONNECTION_OPTIONS, + CONNECTED, + GET_RAW_BATCH, + RETURN_RAW_BATCH, + SAVE_VARIANTS, + VARIANTS_SAVED, + COMPLETED, +} + +export interface SaveVariantsPayload { + variants: ProductVariant[]; + ctx: RequestContext; + batch: number; + total: number; +} + +export interface IPCMessage { + type: MessageType; +} + +export class ConnectionOptionsMessage implements IPCMessage { + readonly type = MessageType.CONNECTION_OPTIONS; + constructor(public value: ConnectionOptions) {} +} + +export class ConnectedMessage implements IPCMessage { + readonly type = MessageType.CONNECTED; + constructor(public value: boolean) {} +} + +export class GetRawBatchMessage implements IPCMessage { + readonly type = MessageType.GET_RAW_BATCH; + constructor(public value: { batchNumber: number; }) {} +} + +export class ReturnRawBatchMessage implements IPCMessage { + readonly type = MessageType.RETURN_RAW_BATCH; + constructor(public value: { variants: ProductVariant[]; }) {} +} + +export class SaveVariantsMessage implements IPCMessage { + readonly type = MessageType.SAVE_VARIANTS; + constructor(public value: SaveVariantsPayload) {} +} + +export class VariantsSavedMessage implements IPCMessage { + readonly type = MessageType.VARIANTS_SAVED; + constructor(public value: { batchNumber: number; }) {} +} + +export class CompletedMessage implements IPCMessage { + readonly type = MessageType.COMPLETED; + constructor(public value: boolean) {} +} + +export type Message = ConnectionOptionsMessage | + ConnectedMessage | + GetRawBatchMessage | + ReturnRawBatchMessage | + SaveVariantsMessage | + VariantsSavedMessage | + CompletedMessage; + +export type MessageOfType = Extract; + +export function sendIPCMessage(target: NodeJS.Process | ChildProcess, message: Message) { + // tslint:disable-next-line:no-non-null-assertion + target.send!(JSON.stringify({ type: message.type, value: message.value })); +} diff --git a/packages/core/src/plugin/default-search-plugin/search-index-worker.ts b/packages/core/src/plugin/default-search-plugin/search-index-worker.ts new file mode 100644 index 0000000000..efcb6e989f --- /dev/null +++ b/packages/core/src/plugin/default-search-plugin/search-index-worker.ts @@ -0,0 +1,133 @@ +/* tslint:disable:no-non-null-assertion no-console */ +import { Type } from '@vendure/common/lib/shared-types'; +import { unique } from '@vendure/common/lib/unique'; +import { Connection, ConnectionOptions, createConnection, SelectQueryBuilder } from 'typeorm'; +import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils'; + +import { RequestContext } from '../../api/common/request-context'; +import { FacetValue } from '../../entity/facet-value/facet-value.entity'; +import { ProductVariant } from '../../entity/product-variant/product-variant.entity'; + +import { + CompletedMessage, + ConnectedMessage, + ConnectionOptionsMessage, + GetRawBatchMessage, + MessageType, + ReturnRawBatchMessage, + SaveVariantsMessage, + SaveVariantsPayload, + sendIPCMessage, + VariantsSavedMessage, +} from './ipc'; +import { SearchIndexItem } from './search-index-item.entity'; + +export const BATCH_SIZE = 100; +export const variantRelations = [ + 'product', + 'product.featuredAsset', + 'product.facetValues', + 'product.facetValues.facet', + 'featuredAsset', + 'facetValues', + 'facetValues.facet', + 'collections', + 'taxCategory', +]; + +export type IncomingMessage = ConnectionOptionsMessage | GetRawBatchMessage | SaveVariantsMessage; + +export class SearchIndexWorker { + + private connection: Connection; + private indexQueryBuilder: SelectQueryBuilder; + + async connect(dbConnectionOptions: ConnectionOptions) { + const { coreEntitiesMap } = await import('../../entity/entities'); + const coreEntities = Object.values(coreEntitiesMap) as Array>; + this.connection = await createConnection({ ...dbConnectionOptions, entities: [SearchIndexItem, ...coreEntities] }); + + this.indexQueryBuilder = await this.connection.getRepository(ProductVariant).createQueryBuilder('variants'); + FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(this.indexQueryBuilder, { + relations: variantRelations, + }); + FindOptionsUtils.joinEagerRelations(this.indexQueryBuilder, this.indexQueryBuilder.alias, this.connection.getMetadata(ProductVariant)); + + sendIPCMessage(process, new ConnectedMessage(this.connection.isConnected)); + } + + async getRawBatch(batchNumber: string | number) { + const i = Number.parseInt(batchNumber.toString(), 10); + const variants = await this.indexQueryBuilder + .where('variants__product.deletedAt IS NULL') + .take(BATCH_SIZE) + .skip(i * BATCH_SIZE) + .getMany(); + + sendIPCMessage(process, new ReturnRawBatchMessage({variants})); + } + + async saveVariants(payload: SaveVariantsPayload) { + const { variants, ctx, batch, total } = payload; + const requestContext = new RequestContext(ctx); + + const items = variants.map((v: ProductVariant) => + new SearchIndexItem({ + sku: v.sku, + enabled: v.enabled, + slug: v.product.slug, + price: v.price, + priceWithTax: v.priceWithTax, + languageCode: requestContext.languageCode, + productVariantId: v.id, + productId: v.product.id, + productName: v.product.name, + description: v.product.description, + productVariantName: v.name, + productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '', + productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '', + facetIds: this.getFacetIds(v), + facetValueIds: this.getFacetValueIds(v), + collectionIds: v.collections.map(c => c.id.toString()), + }), + ); + await this.connection.getRepository(SearchIndexItem).save(items); + sendIPCMessage(process, new VariantsSavedMessage({batchNumber: batch})); + if (batch === total - 1) { + sendIPCMessage(process, new CompletedMessage(true)); + } + } + + private getFacetIds(variant: ProductVariant): string[] { + const facetIds = (fv: FacetValue) => fv.facet.id.toString(); + const variantFacetIds = variant.facetValues.map(facetIds); + const productFacetIds = variant.product.facetValues.map(facetIds); + return unique([...variantFacetIds, ...productFacetIds]); + } + + private getFacetValueIds(variant: ProductVariant): string[] { + const facetValueIds = (fv: FacetValue) => fv.id.toString(); + const variantFacetValueIds = variant.facetValues.map(facetValueIds); + const productFacetValueIds = variant.product.facetValues.map(facetValueIds); + return unique([...variantFacetValueIds, ...productFacetValueIds]); + } +} + +const worker = new SearchIndexWorker(); + +process.on('message', (messageString) => { + const message: IncomingMessage = JSON.parse(messageString); + switch (message.type) { + case MessageType.CONNECTION_OPTIONS: + worker.connect(message.value); + break; + case MessageType.GET_RAW_BATCH: + worker.getRawBatch(message.value.batchNumber); + break; + case MessageType.SAVE_VARIANTS: + worker.saveVariants(message.value); + break; + default: + // ignore + } +}); diff --git a/packages/core/src/plugin/default-search-plugin/search-index.service.ts b/packages/core/src/plugin/default-search-plugin/search-index.service.ts new file mode 100644 index 0000000000..57346cabeb --- /dev/null +++ b/packages/core/src/plugin/default-search-plugin/search-index.service.ts @@ -0,0 +1,173 @@ +import { Injectable } from '@nestjs/common'; +import { InjectConnection } from '@nestjs/typeorm'; +import { pick } from '@vendure/common/lib/pick'; +import { ChildProcess, fork } from 'child_process'; +import fs from 'fs-extra'; +import path from 'path'; +import { Connection } from 'typeorm'; +import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils'; + +import { RequestContext } from '../../api/common/request-context'; +import { ConfigService } from '../../config/config.service'; +import { Logger } from '../../config/logger/vendure-logger'; +import { ProductVariant } from '../../entity/product-variant/product-variant.entity'; +import { JobReporter } from '../../service/helpers/job-manager/job-manager'; +import { translateDeep } from '../../service/helpers/utils/translate-entity'; +import { ProductVariantService } from '../../service/services/product-variant.service'; + +import { + CompletedMessage, + ConnectedMessage, + ConnectionOptionsMessage, + GetRawBatchMessage, + MessageOfType, + MessageType, + ReturnRawBatchMessage, + SaveVariantsMessage, + sendIPCMessage, + VariantsSavedMessage, +} from './ipc'; +import { SearchIndexItem } from './search-index-item.entity'; +import { BATCH_SIZE, variantRelations } from './search-index-worker'; + +export type IncomingMessage = ConnectedMessage | ReturnRawBatchMessage | VariantsSavedMessage | CompletedMessage; + +@Injectable() +export class SearchIndexService { + private workerProcess: ChildProcess; + + constructor(@InjectConnection() private connection: Connection, + private productVariantService: ProductVariantService, + private configService: ConfigService) {} + + /** + * Creates the search index worker process and has it connect to the database. + */ + async connect() { + try { + this.workerProcess = this.getChildProcess(path.join(__dirname, 'search-index-worker.ts')); + } catch (e) { + Logger.error(e); + } + Logger.verbose(`Created search index worker process`, 'DefaultSearchPlugin'); + this.workerProcess.on('error', err => `Search index worker error: ` + err.message); + await this.establishConnection(this.workerProcess); + } + + async reindex(ctx: RequestContext, reporter: JobReporter) { + const timeStart = Date.now(); + Logger.verbose('Reindexing search index...', 'DefaultSearchPlugin'); + const qb = await this.connection.getRepository(ProductVariant).createQueryBuilder('variants'); + FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, { + relations: variantRelations, + }); + FindOptionsUtils.joinEagerRelations(qb, qb.alias, this.connection.getMetadata(ProductVariant)); + const count = await qb.where('variants__product.deletedAt IS NULL').getCount(); + Logger.verbose(`Getting ${count} variants`, 'DefaultSearchPlugin'); + const batches = Math.ceil(count / BATCH_SIZE); + + Logger.verbose('Deleting existing index items...', 'DefaultSearchPlugin'); + await this.connection.getRepository(SearchIndexItem).delete({ languageCode: ctx.languageCode }); + Logger.verbose('Deleted!', 'DefaultSearchPlugin'); + + return new Promise(async (resolve, reject) => { + this.subscribe(MessageType.COMPLETED, (message, unsubscribe) => { + Logger.verbose(`Reindexing completed in ${Date.now() - timeStart}ms`, 'DefaultSearchPlugin'); + unsubscribe(); + unsubscribeProgress(); + resolve({ + success: true, + indexedItemCount: count, + timeTaken: Date.now() - timeStart, + }); + }); + const unsubscribeProgress = this.subscribe(MessageType.VARIANTS_SAVED, (message, unsubscribe) => { + reporter.setProgress(Math.ceil(((message.value.batchNumber + 1) / batches) * 100)); + Logger.verbose(`Completed batch ${message.value.batchNumber + 1} of ${batches}`, 'DefaultSearchPlugin'); + }); + + for (let i = 0; i < batches; i++) { + Logger.verbose(`Processing batch ${i + 1} of ${batches}`, 'DefaultSearchPlugin'); + + const variants = await this.getBatch(this.workerProcess, i); + const items = variants + .map((v: any) => this.productVariantService.applyChannelPriceAndTax(v, ctx)) + .map((v: any) => translateDeep(v, ctx.languageCode, ['product'])); + + sendIPCMessage(this.workerProcess, new SaveVariantsMessage({ variants: items, ctx, batch: i, total: batches })); + } + }); + } + + /** + * Forks a child process based on the given filename. The filename can be a JS or TS file, as this method will attempt to + * use either (attempts JS first). + */ + private getChildProcess(filename: string): ChildProcess { + const ext = path.extname(filename); + const fileWithoutExt = filename.replace(new RegExp(`${ext}$`), ''); + let error: any; + try { + const jsFile = fileWithoutExt + '.js'; + if (fs.existsSync(jsFile)) { + return fork(jsFile, [], { execArgv: [] }); + } + } catch (e) { + // ignore and try ts + error = e; + } + try { + const tsFile = fileWithoutExt + '.ts'; + if (fs.existsSync(tsFile)) { + // Fork the TS file using ts-node. This is useful when running in dev mode or + // for e2e tests. + return fork(tsFile, [], { execArgv: ['-r', 'ts-node/register'] }); + } + } catch (e) { + // ignore and thow at the end. + error = e; + } + throw error; + } + + establishConnection(child: ChildProcess): Promise { + const connectionOptions = pick(this.configService.dbConnectionOptions as any, + ['type', 'name', 'database', 'host', 'port', 'username', 'password']); + return new Promise(resolve => { + sendIPCMessage(child, new ConnectionOptionsMessage(connectionOptions)); + this.subscribe(MessageType.CONNECTED, (message, unsubscribe) => { + Logger.verbose(`Connection result: ${message.value}`, 'DefaultSearchPlugin'); + unsubscribe(); + resolve(message.value); + }); + }); + } + + getBatch(child: ChildProcess, batch: number): Promise { + return new Promise(resolve => { + sendIPCMessage(child, new GetRawBatchMessage({ batchNumber: batch })); + this.subscribe(MessageType.RETURN_RAW_BATCH, (message, unsubscribe) => { + unsubscribe(); + resolve(message.value.variants); + }); + }); + } + + /** + * Subscribes to the given IPC message and executes the callback when the message is received. Returns an unsubscribe + * function which should be called to clean up the event listener. Alternatively, if only the first such event is + * important, call the `unsubscribe` function which is passed to the handler as the second argument. + */ + private subscribe(messageType: T, callback: (message: MessageOfType, unsubscribe: () => void) => any): () => void { + const handler = (messageString: string) => { + const message = JSON.parse(messageString) as IncomingMessage; + if (message.type === messageType) { + callback(message as MessageOfType, unsubscribe); + } + }; + const unsubscribe = () => this.workerProcess.off('message', handler); + this.workerProcess.on('message', handler); + return unsubscribe; + } + +}