diff --git a/docs/content/docs/plugins/default-search-plugin.md b/docs/content/docs/plugins/default-search-plugin.md deleted file mode 100644 index 059868863a..0000000000 --- a/docs/content/docs/plugins/default-search-plugin.md +++ /dev/null @@ -1,24 +0,0 @@ ---- -title: "DefaultSearchPlugin" ---- - -# DefaultSearchPlugin - -The DefaultSearchPlugin provides a full-text Product search based on the full-text searching capabilities of the underlying database. - -The DefaultSearchPlugin is bundled with the `@vendure/core` package. If you are not using an alternative search plugin, then make sure this one is used, otherwise you will not be able to search products via the [`search` query](/docs/graphql-api/shop/queries#search). - -```ts -import { DefaultSearchPlugin } from '@vendure/core'; - -const config: VendureConfig = { - // Add an instance of the plugin to the plugins array - plugins: [ - new DefaultSearchPlugin(), - ], -}; -``` - -{{% alert "warning" %}} -Note that the quality of the fulltext search capabilities varies depending on the underlying database being used. For example, the MySQL & Postgres implementations will typically yield better results than the SQLite implementation. -{{% /alert %}} diff --git a/packages/core/e2e/default-search-plugin.e2e-spec.ts b/packages/core/e2e/default-search-plugin.e2e-spec.ts index 4c62fa6411..96913f86cb 100644 --- a/packages/core/e2e/default-search-plugin.e2e-spec.ts +++ b/packages/core/e2e/default-search-plugin.e2e-spec.ts @@ -45,7 +45,7 @@ describe('Default search plugin', () => { customerCount: 1, }, { - plugins: [new DefaultSearchPlugin()], + plugins: [new DefaultSearchPlugin({ runInForkedProcess: false })], }, ); await adminClient.init(); diff --git a/packages/core/src/plugin/default-search-plugin/constants.ts b/packages/core/src/plugin/default-search-plugin/constants.ts new file mode 100644 index 0000000000..d6563911cf --- /dev/null +++ b/packages/core/src/plugin/default-search-plugin/constants.ts @@ -0,0 +1 @@ +export const SEARCH_PLUGIN_OPTIONS = Symbol('SEARCH_PLUGIN_OPTIONS'); diff --git a/packages/core/src/plugin/default-search-plugin/default-search-plugin.ts b/packages/core/src/plugin/default-search-plugin/default-search-plugin.ts index d8656ccb92..8e9cb65283 100644 --- a/packages/core/src/plugin/default-search-plugin/default-search-plugin.ts +++ b/packages/core/src/plugin/default-search-plugin/default-search-plugin.ts @@ -13,9 +13,10 @@ import { CollectionModificationEvent } from '../../event-bus/events/collection-m import { TaxRateModificationEvent } from '../../event-bus/events/tax-rate-modification-event'; import { SearchService } from '../../service/services/search.service'; +import { SEARCH_PLUGIN_OPTIONS } from './constants'; import { AdminFulltextSearchResolver, ShopFulltextSearchResolver } from './fulltext-search.resolver'; import { FulltextSearchService } from './fulltext-search.service'; -import { SearchIndexService } from './search-index.service'; +import { SearchIndexService } from './indexer/search-index.service'; import { SearchIndexItem } from './search-index-item.entity'; export interface DefaultSearchReindexResponse extends SearchReindexResponse { @@ -23,28 +24,82 @@ export interface DefaultSearchReindexResponse extends SearchReindexResponse { indexedItemCount: number; } +/** + * @description + * Options for configuring the DefaultSearchPlugin. + * + * @docsCategory DefaultSearchPlugin + */ +export interface DefaultSearchPluginOptions { + /** + * @description + * By default, the DefaultSearchPlugin will spawn a background process which is responsible + * for updating the search index. By setting this option to `false`, indexing will be + * performed on the main server process instead. Usually this is undesirable as performance will + * be degraded during indexing, but the option is useful for certain debugging and testing scenarios. + * @default true + */ + runInForkedProcess: boolean; +} + +/** + * @description + * The DefaultSearchPlugin provides a full-text Product search based on the full-text searching capabilities of the + * underlying database. + * + * The DefaultSearchPlugin is bundled with the `@vendure/core` package. If you are not using an alternative search + * plugin, then make sure this one is used, otherwise you will not be able to search products via the [`search` query](/docs/graphql-api/shop/queries#search). + * + * @example + * ```ts + * import { DefaultSearchPlugin } from '@vendure/core'; + * + * const config: VendureConfig = { + * // Add an instance of the plugin to the plugins array + * plugins: [ + * new DefaultSearchPlugin(), + * ], + * }; + * ``` + * + * {{% alert "warning" %}} + * Note that the quality of the fulltext search capabilities varies depending on the underlying database being used. For example, the MySQL & Postgres implementations will typically yield better results than the SQLite implementation. + * {{% /alert %}} + * + * @docsCategory DefaultSearchPlugin + */ export class DefaultSearchPlugin implements VendurePlugin { + private readonly options: DefaultSearchPluginOptions; + + constructor(options?: DefaultSearchPluginOptions) { + const defaultOptions: DefaultSearchPluginOptions = { + runInForkedProcess: true, + }; + this.options = { ...defaultOptions, ...options }; + } + + /** @internal */ async onBootstrap(inject: (type: Type) => T): Promise { const eventBus = inject(EventBus); - const fulltextSearchService = inject(FulltextSearchService); const searchIndexService = inject(SearchIndexService); eventBus.subscribe(CatalogModificationEvent, event => { if (event.entity instanceof Product || event.entity instanceof ProductVariant) { - return fulltextSearchService.updateProductOrVariant(event.ctx, event.entity); + return searchIndexService.updateProductOrVariant(event.ctx, event.entity); } }); eventBus.subscribe(CollectionModificationEvent, event => { - return fulltextSearchService.updateVariantsById(event.ctx, event.productVariantIds); + return searchIndexService.updateVariantsById(event.ctx, event.productVariantIds); }); eventBus.subscribe(TaxRateModificationEvent, event => { const defaultTaxZone = event.ctx.channel.defaultTaxZone; if (defaultTaxZone && idsAreEqual(defaultTaxZone.id, event.taxRate.zone.id)) { - return fulltextSearchService.reindex(event.ctx); + return searchIndexService.reindex(event.ctx).start(); } }); await searchIndexService.connect(); } + /** @internal */ extendAdminAPI(): APIExtensionDefinition { return { resolvers: [AdminFulltextSearchResolver], @@ -57,6 +112,7 @@ export class DefaultSearchPlugin implements VendurePlugin { }; } + /** @internal */ extendShopAPI(): APIExtensionDefinition { return { resolvers: [ShopFulltextSearchResolver], @@ -69,11 +125,18 @@ export class DefaultSearchPlugin implements VendurePlugin { }; } + /** @internal */ defineEntities(): Array> { return [SearchIndexItem]; } + /** @internal */ defineProviders(): Provider[] { - return [FulltextSearchService, SearchIndexService, { provide: SearchService, useClass: FulltextSearchService }]; + return [ + FulltextSearchService, + SearchIndexService, + { provide: SearchService, useClass: FulltextSearchService }, + { provide: SEARCH_PLUGIN_OPTIONS, useFactory: () => this.options }, + ]; } } diff --git a/packages/core/src/plugin/default-search-plugin/fulltext-search.service.ts b/packages/core/src/plugin/default-search-plugin/fulltext-search.service.ts index 965e0e6d9b..fdbb04174e 100644 --- a/packages/core/src/plugin/default-search-plugin/fulltext-search.service.ts +++ b/packages/core/src/plugin/default-search-plugin/fulltext-search.service.ts @@ -1,25 +1,19 @@ import { Injectable } from '@nestjs/common'; import { InjectConnection } from '@nestjs/typeorm'; -import { JobInfo, LanguageCode, SearchInput, SearchResponse } from '@vendure/common/lib/generated-types'; +import { JobInfo, SearchInput, SearchResponse } from '@vendure/common/lib/generated-types'; import { Omit } from '@vendure/common/lib/omit'; -import { ID } from '@vendure/common/lib/shared-types'; -import { unique } from '@vendure/common/lib/unique'; import { Connection } from 'typeorm'; import { RequestContext } from '../../api/common/request-context'; import { InternalServerError } from '../../common/error/errors'; -import { Logger } from '../../config/logger/vendure-logger'; -import { FacetValue, Product, ProductVariant } from '../../entity'; +import { FacetValue } from '../../entity'; import { EventBus } from '../../event-bus/event-bus'; -import { translateDeep } from '../../service/helpers/utils/translate-entity'; import { FacetValueService } from '../../service/services/facet-value.service'; import { JobService } from '../../service/services/job.service'; import { ProductVariantService } from '../../service/services/product-variant.service'; import { SearchService } from '../../service/services/search.service'; -import { AsyncQueue } from './async-queue'; -import { SearchIndexItem } from './search-index-item.entity'; -import { SearchIndexService } from './search-index.service'; +import { SearchIndexService } from './indexer/search-index.service'; import { MysqlSearchStrategy } from './search-strategy/mysql-search-strategy'; import { PostgresSearchStrategy } from './search-strategy/postgres-search-strategy'; import { SearchStrategy } from './search-strategy/search-strategy'; @@ -31,20 +25,8 @@ import { SqliteSearchStrategy } from './search-strategy/sqlite-search-strategy'; */ @Injectable() export class FulltextSearchService implements SearchService { - private taskQueue = new AsyncQueue('search-service', 1); private searchStrategy: SearchStrategy; private readonly minTermLength = 2; - private readonly variantRelations = [ - 'product', - 'product.featuredAsset', - 'product.facetValues', - 'product.facetValues.facet', - 'featuredAsset', - 'facetValues', - 'facetValues.facet', - 'collections', - 'taxCategory', - ]; constructor( @InjectConnection() private connection: Connection, @@ -94,74 +76,11 @@ export class FulltextSearchService implements SearchService { * Rebuilds the full search index. */ async reindex(ctx: RequestContext): Promise { - const job = this.jobService.startJob('reindex', async reporter => { - return this.searchIndexService.reindex(ctx, reporter); - }); + const job = this.searchIndexService.reindex(ctx); + job.start(); return job; } - /** - * Updates the search index only for the affected entities. - */ - async updateProductOrVariant(ctx: RequestContext, updatedEntity: Product | ProductVariant) { - let updatedVariants: ProductVariant[] = []; - let removedVariantIds: ID[] = []; - if (updatedEntity instanceof Product) { - const product = await this.connection.getRepository(Product).findOne(updatedEntity.id, { - relations: ['variants'], - }); - if (product) { - if (product.deletedAt) { - removedVariantIds = product.variants.map(v => v.id); - } else { - updatedVariants = await this.connection - .getRepository(ProductVariant) - .findByIds(product.variants.map(v => v.id), { - relations: this.variantRelations, - }); - if (product.enabled === false) { - updatedVariants.forEach(v => v.enabled = false); - } - } - } - } else { - const variant = await this.connection.getRepository(ProductVariant).findOne(updatedEntity.id, { - relations: this.variantRelations, - }); - if (variant) { - updatedVariants = [variant]; - } - } - await this.taskQueue.push(async () => { - if (updatedVariants.length) { - await this.saveSearchIndexItems(ctx, updatedVariants); - } - if (removedVariantIds.length) { - await this.removeSearchIndexItems(ctx.languageCode, removedVariantIds); - } - }); - - } - - async updateVariantsById(ctx: RequestContext, ids: ID[]) { - if (ids.length) { - const BATCH_SIZE = 100; - const batches = Math.ceil(ids.length / BATCH_SIZE); - for (let i = 0; i < batches; i++) { - const begin = i * BATCH_SIZE; - const end = begin + BATCH_SIZE; - Logger.verbose(`Updating ids from index ${begin} to ${end}`); - const batch = ids.slice(begin, end); - const updatedVariants = await this.connection.getRepository(ProductVariant).findByIds(batch, { - relations: this.variantRelations, - }); - this.taskQueue.push(async () => { - await this.saveSearchIndexItems(ctx, updatedVariants); - }); - } - } - } - /** * Sets the SearchStrategy appropriate to th configured database type. */ @@ -182,60 +101,4 @@ export class FulltextSearchService implements SearchService { throw new InternalServerError(`error.database-not-supported-by-default-search-plugin`); } } - - /** - * Add or update items in the search index - */ - private async saveSearchIndexItems(ctx: RequestContext, variants: ProductVariant[]) { - const items = variants - .map(v => this.productVariantService.applyChannelPriceAndTax(v, ctx)) - .map(v => translateDeep(v, ctx.languageCode, ['product'])) - .map( - v => - new SearchIndexItem({ - sku: v.sku, - enabled: v.enabled, - slug: v.product.slug, - price: v.price, - priceWithTax: v.priceWithTax, - languageCode: ctx.languageCode, - productVariantId: v.id, - productId: v.product.id, - productName: v.product.name, - description: v.product.description, - productVariantName: v.name, - productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '', - productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '', - facetIds: this.getFacetIds(v), - facetValueIds: this.getFacetValueIds(v), - collectionIds: v.collections.map(c => c.id.toString()), - }), - ); - return this.connection.getRepository(SearchIndexItem).save(items); - } - - /** - * Remove items from the search index - */ - private async removeSearchIndexItems(languageCode: LanguageCode, variantIds: ID[]) { - const compositeKeys = variantIds.map(id => ({ - productVariantId: id, - languageCode, - })) as any[]; - await this.connection.getRepository(SearchIndexItem).delete(compositeKeys); - } - - private getFacetIds(variant: ProductVariant): string[] { - const facetIds = (fv: FacetValue) => fv.facet.id.toString(); - const variantFacetIds = variant.facetValues.map(facetIds); - const productFacetIds = variant.product.facetValues.map(facetIds); - return unique([...variantFacetIds, ...productFacetIds]); - } - - private getFacetValueIds(variant: ProductVariant): string[] { - const facetValueIds = (fv: FacetValue) => fv.id.toString(); - const variantFacetValueIds = variant.facetValues.map(facetValueIds); - const productFacetValueIds = variant.product.facetValues.map(facetValueIds); - return unique([...variantFacetValueIds, ...productFacetValueIds]); - } } diff --git a/packages/core/src/plugin/default-search-plugin/indexer/index-builder.ts b/packages/core/src/plugin/default-search-plugin/indexer/index-builder.ts new file mode 100644 index 0000000000..0af751c760 --- /dev/null +++ b/packages/core/src/plugin/default-search-plugin/indexer/index-builder.ts @@ -0,0 +1,163 @@ +import { Connection, ConnectionOptions, createConnection, SelectQueryBuilder } from 'typeorm'; +import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils'; + +import { ID, Type } from '../../../../../common/lib/shared-types'; +import { unique } from '../../../../../common/lib/unique'; +import { RequestContext } from '../../../api/common/request-context'; +import { FacetValue } from '../../../entity/facet-value/facet-value.entity'; +import { ProductVariant } from '../../../entity/product-variant/product-variant.entity'; +import { SearchIndexItem } from '../search-index-item.entity'; + +import { CompletedMessage, ConnectedMessage, Message, MessageType, ReturnRawBatchMessage, SaveVariantsPayload, VariantsSavedMessage } from './ipc'; + +export const BATCH_SIZE = 500; +export const variantRelations = [ + 'product', + 'product.featuredAsset', + 'product.facetValues', + 'product.facetValues.facet', + 'featuredAsset', + 'facetValues', + 'facetValues.facet', + 'collections', + 'taxCategory', +]; + +export function getSearchIndexQueryBuilder(connection: Connection) { + const qb = connection.getRepository(ProductVariant).createQueryBuilder('variants'); + FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, { + relations: variantRelations, + }); + FindOptionsUtils.joinEagerRelations(qb, qb.alias, connection.getMetadata(ProductVariant)); + return qb; +} + +/** + * This class is responsible for all updates to the search index. + */ +export class IndexBuilder { + private connection: Connection; + private indexQueryBuilder: SelectQueryBuilder; + private onMessageHandlers = new Set<(message: string) => void>(); + + /** + * When running in the main process, it should be constructed with the existing connection. + * Otherwise, the connection will be created in the .connect() method in response to an + * IPC message. + */ + constructor(connection?: Connection) { + if (connection) { + this.connection = connection; + this.indexQueryBuilder = getSearchIndexQueryBuilder(this.connection); + } + } + + processMessage(message: Message): Promise { + switch (message.type) { + case MessageType.CONNECTION_OPTIONS: { + return this.connect(message.value); + } + case MessageType.GET_RAW_BATCH: { + return this.getRawBatch(message.value.batchNumber); + } + case MessageType.GET_RAW_BATCH_BY_IDS: { + return this.getRawBatchByIds(message.value.ids); + } + case MessageType.SAVE_VARIANTS: { + return this.saveVariants(message.value); + } + default: + return Promise.resolve(undefined); + } + } + + async processMessageAndEmitResult(message: Message) { + const result = await this.processMessage(message); + if (result) { + result.channelId = message.channelId; + this.onMessageHandlers.forEach(handler => { + handler(JSON.stringify(result)); + }); + } + } + + addMessageListener(handler: (message: string) => void) { + this.onMessageHandlers.add(handler); + } + + removeMessageListener(handler: (message: string) => void) { + this.onMessageHandlers.delete(handler); + } + + private async connect(dbConnectionOptions: ConnectionOptions): Promise { + const {coreEntitiesMap} = await import('../../../entity/entities'); + const coreEntities = Object.values(coreEntitiesMap) as Array>; + this.connection = await createConnection({...dbConnectionOptions, entities: [SearchIndexItem, ...coreEntities]}); + this.indexQueryBuilder = getSearchIndexQueryBuilder(this.connection); + return new ConnectedMessage(this.connection.isConnected); + } + + private async getRawBatchByIds(ids: ID[]): Promise { + const variants = await this.connection.getRepository(ProductVariant).findByIds(ids, { + relations: variantRelations, + }); + return new ReturnRawBatchMessage({variants}); + } + + private async getRawBatch(batchNumber: string | number): Promise { + const i = Number.parseInt(batchNumber.toString(), 10); + const variants = await this.indexQueryBuilder + .where('variants__product.deletedAt IS NULL') + .take(BATCH_SIZE) + .skip(i * BATCH_SIZE) + .getMany(); + + return new ReturnRawBatchMessage({variants}); + } + + private async saveVariants(payload: SaveVariantsPayload): Promise { + const {variants, ctx, batch, total} = payload; + const requestContext = new RequestContext(ctx); + + const items = variants.map((v: ProductVariant) => + new SearchIndexItem({ + sku: v.sku, + enabled: v.enabled, + slug: v.product.slug, + price: v.price, + priceWithTax: v.priceWithTax, + languageCode: requestContext.languageCode, + productVariantId: v.id, + productId: v.product.id, + productName: v.product.name, + description: v.product.description, + productVariantName: v.name, + productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '', + productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '', + facetIds: this.getFacetIds(v), + facetValueIds: this.getFacetValueIds(v), + collectionIds: v.collections.map(c => c.id.toString()), + }), + ); + await this.connection.getRepository(SearchIndexItem).save(items); + if (batch === total - 1) { + return new CompletedMessage(true); + } else { + return new VariantsSavedMessage({batchNumber: batch}); + } + } + + private getFacetIds(variant: ProductVariant): string[] { + const facetIds = (fv: FacetValue) => fv.facet.id.toString(); + const variantFacetIds = variant.facetValues.map(facetIds); + const productFacetIds = variant.product.facetValues.map(facetIds); + return unique([...variantFacetIds, ...productFacetIds]); + } + + private getFacetValueIds(variant: ProductVariant): string[] { + const facetValueIds = (fv: FacetValue) => fv.id.toString(); + const variantFacetValueIds = variant.facetValues.map(facetValueIds); + const productFacetValueIds = variant.product.facetValues.map(facetValueIds); + return unique([...variantFacetValueIds, ...productFacetValueIds]); + } +} diff --git a/packages/core/src/plugin/default-search-plugin/indexer/ipc.ts b/packages/core/src/plugin/default-search-plugin/indexer/ipc.ts new file mode 100644 index 0000000000..3bd84eb24b --- /dev/null +++ b/packages/core/src/plugin/default-search-plugin/indexer/ipc.ts @@ -0,0 +1,154 @@ +import { ID } from '@vendure/common/lib/shared-types'; +import { ChildProcess } from 'child_process'; +import { ConnectionOptions } from 'typeorm'; + +import { RequestContext } from '../../../api/common/request-context'; +import { ProductVariant } from '../../../entity/product-variant/product-variant.entity'; + +import { IndexBuilder } from './index-builder'; + +export enum MessageType { + CONNECTION_OPTIONS, + CONNECTED, + GET_RAW_BATCH, + GET_RAW_BATCH_BY_IDS, + RETURN_RAW_BATCH, + SAVE_VARIANTS, + VARIANTS_SAVED, + COMPLETED, +} + +export interface SaveVariantsPayload { + variants: ProductVariant[]; + ctx: RequestContext; + batch: number; + total: number; +} + +export interface IPCMessage { + type: MessageType; + value: any; + channelId: string; +} + +export class ConnectionOptionsMessage implements IPCMessage { + readonly type = MessageType.CONNECTION_OPTIONS; + channelId: string; + constructor(public value: ConnectionOptions) {} +} + +export class ConnectedMessage implements IPCMessage { + readonly type = MessageType.CONNECTED; + channelId: string; + constructor(public value: boolean) {} +} + +export class GetRawBatchMessage implements IPCMessage { + readonly type = MessageType.GET_RAW_BATCH; + channelId: string; + constructor(public value: { batchNumber: number; }) {} +} + +export class GetRawBatchByIdsMessage implements IPCMessage { + readonly type = MessageType.GET_RAW_BATCH_BY_IDS; + channelId: string; + constructor(public value: { ids: ID[]; }) {} +} + +export class ReturnRawBatchMessage implements IPCMessage { + readonly type = MessageType.RETURN_RAW_BATCH; + channelId: string; + constructor(public value: { variants: ProductVariant[]; }) {} +} + +export class SaveVariantsMessage implements IPCMessage { + readonly type = MessageType.SAVE_VARIANTS; + channelId: string; + constructor(public value: SaveVariantsPayload) {} +} + +export class VariantsSavedMessage implements IPCMessage { + readonly type = MessageType.VARIANTS_SAVED; + channelId: string; + constructor(public value: { batchNumber: number; }) {} +} + +export class CompletedMessage implements IPCMessage { + readonly type = MessageType.COMPLETED; + channelId: string; + constructor(public value: boolean) {} +} + +export type Message = ConnectionOptionsMessage | + ConnectedMessage | + GetRawBatchMessage | + GetRawBatchByIdsMessage | + ReturnRawBatchMessage | + SaveVariantsMessage | + VariantsSavedMessage | + CompletedMessage; + +export type MessageOfType = Extract; + +export function sendIPCMessage(target: NodeJS.Process | ChildProcess, message: Message) { + // tslint:disable-next-line:no-non-null-assertion + target.send!(JSON.stringify(message)); +} + +/** + * An IpcChannel allows safe communication between main thread and worker. It achieves + * this by adding a unique ID to each outgoing message, which the worker then adds + * to any responses. + * + * If the `target` is an instance of IndexBuilder running on the main process (not in + * a worker thread), then the channel interacts directly with it, whilst keeping the + * differences abstracted away from the consuming code. + */ +export class IpcChannel { + private readonly channelId = Math.random().toString(32); + private handlers: Array<(m: string) => void> = []; + constructor(private readonly target: NodeJS.Process | ChildProcess | IndexBuilder) {} + + /** + * Send a message to the worker process. + */ + send(message: Message) { + message.channelId = this.channelId; + if (this.target instanceof IndexBuilder) { + this.target.processMessageAndEmitResult(message); + } else { + sendIPCMessage(this.target, message); + } + } + + /** + * Subscribes to the given IPC message which is sent from the worker in response to a message + * send with the `send()` method. + */ + subscribe(messageType: T, callback: (message: MessageOfType) => void): void { + const handler = (messageString: string) => { + const message = JSON.parse(messageString) as Message; + if (message.type === messageType && message.channelId === this.channelId) { + callback(message as MessageOfType); + } + }; + if (this.target instanceof IndexBuilder) { + this.target.addMessageListener(handler); + } else { + this.target.on('message', handler); + } + this.handlers.push(handler); + } + + /** + * Clean up all event listeners created by subscriptions. + */ + close() { + const target = this.target; + if (target instanceof IndexBuilder) { + this.handlers.forEach(handler => target.removeMessageListener(handler)); + } else { + this.handlers.forEach(handler => target.off('message', handler)); + } + } +} diff --git a/packages/core/src/plugin/default-search-plugin/indexer/search-index-worker.ts b/packages/core/src/plugin/default-search-plugin/indexer/search-index-worker.ts new file mode 100644 index 0000000000..08d5ec69cb --- /dev/null +++ b/packages/core/src/plugin/default-search-plugin/indexer/search-index-worker.ts @@ -0,0 +1,16 @@ +/* tslint:disable:no-non-null-assertion no-console */ +import { IndexBuilder } from './index-builder'; +import { ConnectionOptionsMessage, GetRawBatchByIdsMessage, GetRawBatchMessage, SaveVariantsMessage, sendIPCMessage } from './ipc'; + +export type IncomingMessage = ConnectionOptionsMessage | GetRawBatchMessage | GetRawBatchByIdsMessage | SaveVariantsMessage; + +const indexBuilder = new IndexBuilder(); + +process.on('message', async (messageString) => { + const message: IncomingMessage = JSON.parse(messageString); + const result = await indexBuilder.processMessage(message); + if (result) { + result.channelId = message.channelId; + sendIPCMessage(process, result); + } +}); diff --git a/packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts b/packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts new file mode 100644 index 0000000000..8d80a25984 --- /dev/null +++ b/packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts @@ -0,0 +1,312 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { InjectConnection } from '@nestjs/typeorm'; +import { LanguageCode } from '@vendure/common/lib/generated-types'; +import { pick } from '@vendure/common/lib/pick'; +import { ID } from '@vendure/common/lib/shared-types'; +import { ChildProcess, fork } from 'child_process'; +import fs from 'fs-extra'; +import path from 'path'; +import { Connection } from 'typeorm'; + +import { RequestContext } from '../../../api/common/request-context'; +import { ConfigService } from '../../../config/config.service'; +import { Logger } from '../../../config/logger/vendure-logger'; +import { ProductVariant } from '../../../entity/product-variant/product-variant.entity'; +import { Product } from '../../../entity/product/product.entity'; +import { Job } from '../../../service/helpers/job-manager/job'; +import { translateDeep } from '../../../service/helpers/utils/translate-entity'; +import { JobService } from '../../../service/services/job.service'; +import { ProductVariantService } from '../../../service/services/product-variant.service'; +import { SEARCH_PLUGIN_OPTIONS } from '../constants'; +import { DefaultSearchPluginOptions } from '../default-search-plugin'; +import { SearchIndexItem } from '../search-index-item.entity'; + +import { BATCH_SIZE, getSearchIndexQueryBuilder, IndexBuilder, variantRelations } from './index-builder'; +import { + CompletedMessage, + ConnectedMessage, + ConnectionOptionsMessage, + GetRawBatchByIdsMessage, + GetRawBatchMessage, + IpcChannel, + MessageType, + ReturnRawBatchMessage, + SaveVariantsMessage, + VariantsSavedMessage, +} from './ipc'; + +export type IncomingMessage = ConnectedMessage | ReturnRawBatchMessage | VariantsSavedMessage | CompletedMessage; +const loggerCtx = 'DefaultSearchPlugin'; + +/** + * This service is responsible for all writes to the search index. It works together with the SearchIndexWorker + * process to perform these often resource-intensive tasks in another thread, which keeps the main + * server thread responsive. + */ +@Injectable() +export class SearchIndexService { + private workerProcess: ChildProcess | IndexBuilder; + private restartAttempts = 0; + + constructor(@InjectConnection() private connection: Connection, + @Inject(SEARCH_PLUGIN_OPTIONS) private options: DefaultSearchPluginOptions, + private productVariantService: ProductVariantService, + private jobService: JobService, + private configService: ConfigService) {} + + /** + * Creates the search index worker process and has it connect to the database. + */ + async connect() { + if (this.options.runInForkedProcess) { + try { + const workerProcess = this.getChildProcess(path.join(__dirname, 'search-index-worker.ts')); + Logger.verbose(`IndexBuilder running as forked process`, loggerCtx); + workerProcess.on('error', err => { + Logger.error(`IndexBuilder worker error: ` + err.message, loggerCtx); + }); + workerProcess.on('close', () => { + this.restartAttempts++; + Logger.error(`IndexBuilder worker process died!`, loggerCtx); + if (this.restartAttempts <= 10) { + Logger.error(`Attempting to restart (${this.restartAttempts})...`, loggerCtx); + this.connect(); + } else { + Logger.error(`Too many failed restart attempts. Sorry!`); + } + }); + await this.establishConnection(workerProcess); + this.workerProcess = workerProcess; + } catch (e) { + Logger.error(e); + } + + } else { + this.workerProcess = new IndexBuilder(this.connection); + Logger.verbose(`IndexBuilder running in main process`, loggerCtx); + } + } + + reindex(ctx: RequestContext): Job { + return this.jobService.createJob({ + name: 'reindex', + singleInstance: true, + work: async reporter => { + const timeStart = Date.now(); + const qb = getSearchIndexQueryBuilder(this.connection); + const count = await qb.where('variants__product.deletedAt IS NULL').getCount(); + Logger.verbose(`Reindexing ${count} variants`, loggerCtx); + const batches = Math.ceil(count / BATCH_SIZE); + + await this.connection.getRepository(SearchIndexItem).delete({ languageCode: ctx.languageCode }); + Logger.verbose('Deleted existing index items', loggerCtx); + + return new Promise(async (resolve, reject) => { + const ipcChannel = new IpcChannel(this.workerProcess); + ipcChannel.subscribe(MessageType.COMPLETED, message => { + Logger.verbose(`Reindexing completed in ${Date.now() - timeStart}ms`, loggerCtx); + ipcChannel.close(); + resolve({ + success: true, + indexedItemCount: count, + timeTaken: Date.now() - timeStart, + }); + }); + ipcChannel.subscribe(MessageType.VARIANTS_SAVED, message => { + reporter.setProgress(Math.ceil(((message.value.batchNumber + 1) / batches) * 100)); + Logger.verbose(`Completed batch ${message.value.batchNumber + 1} of ${batches}`, loggerCtx); + }); + + for (let i = 0; i < batches; i++) { + Logger.verbose(`Processing batch ${i + 1} of ${batches}`, loggerCtx); + + const variants = await this.getBatch(this.workerProcess, i); + const hydratedVariants = this.hydrateVariants(ctx, variants); + + ipcChannel.send(new SaveVariantsMessage({ + variants: hydratedVariants, + ctx, + batch: i, + total: batches, + })); + } + }); + }, + }); + } + + /** + * Updates the search index only for the affected entities. + */ + async updateProductOrVariant(ctx: RequestContext, updatedEntity: Product | ProductVariant) { + let updatedVariants: ProductVariant[] = []; + let removedVariantIds: ID[] = []; + if (updatedEntity instanceof Product) { + const product = await this.connection.getRepository(Product).findOne(updatedEntity.id, { + relations: ['variants'], + }); + if (product) { + if (product.deletedAt) { + removedVariantIds = product.variants.map(v => v.id); + } else { + updatedVariants = await this.connection + .getRepository(ProductVariant) + .findByIds(product.variants.map(v => v.id), { + relations: variantRelations, + }); + if (product.enabled === false) { + updatedVariants.forEach(v => v.enabled = false); + } + } + } + } else { + const variant = await this.connection.getRepository(ProductVariant).findOne(updatedEntity.id, { + relations: variantRelations, + }); + if (variant) { + updatedVariants = [variant]; + } + } + + if (updatedVariants.length) { + await this.saveSearchIndexItems(ctx, updatedVariants); + } + if (removedVariantIds.length) { + await this.removeSearchIndexItems(ctx.languageCode, removedVariantIds); + } + } + + async updateVariantsById(ctx: RequestContext, ids: ID[]) { + return new Promise(async resolve => { + if (ids.length) { + const ipcChannel = new IpcChannel(this.workerProcess); + const batches = Math.ceil(ids.length / BATCH_SIZE); + Logger.verbose(`Updating ${ids.length} variants...`); + + ipcChannel.subscribe(MessageType.COMPLETED, message => { + Logger.verbose(`Completed updating variants`); + ipcChannel.close(); + resolve(); + }); + + for (let i = 0; i < batches; i++) { + const begin = i * BATCH_SIZE; + const end = begin + BATCH_SIZE; + Logger.verbose(`Updating ids from index ${begin} to ${end}`); + const batchIds = ids.slice(begin, end); + const batch = await this.getBatchByIds(this.workerProcess, batchIds); + const variants = this.hydrateVariants(ctx, batch); + + ipcChannel.send(new SaveVariantsMessage({ variants, ctx, batch: i, total: batches })); + } + } else { + resolve(); + } + }); + } + + /** + * Add or update items in the search index + */ + private async saveSearchIndexItems(ctx: RequestContext, variants: ProductVariant[]) { + const items = this.hydrateVariants(ctx, variants); + Logger.verbose(`Updating search index for ${variants.length} variants`, loggerCtx); + return new Promise(resolve => { + const ipcChannel = new IpcChannel(this.workerProcess); + ipcChannel.subscribe(MessageType.COMPLETED, message => { + Logger.verbose(`Done!`, loggerCtx); + ipcChannel.close(); + resolve(); + }); + ipcChannel.send(new SaveVariantsMessage({ variants: items, ctx, batch: 0, total: 1 })); + }); + } + + /** + * Remove items from the search index + */ + private async removeSearchIndexItems(languageCode: LanguageCode, variantIds: ID[]) { + const compositeKeys = variantIds.map(id => ({ + productVariantId: id, + languageCode, + })) as any[]; + await this.connection.getRepository(SearchIndexItem).delete(compositeKeys); + } + + /** + * Given an array of ProductVariants, this method applies the correct taxes and translations. + */ + private hydrateVariants(ctx: RequestContext, variants: ProductVariant[]): ProductVariant[] { + return variants + .map(v => this.productVariantService.applyChannelPriceAndTax(v, ctx)) + .map(v => translateDeep(v, ctx.languageCode, ['product'])); + } + + /** + * Forks a child process based on the given filename. The filename can be a JS or TS file, as this method will attempt to + * use either (attempts JS first). + */ + private getChildProcess(filename: string): ChildProcess { + const ext = path.extname(filename); + const fileWithoutExt = filename.replace(new RegExp(`${ext}$`), ''); + let error: any; + try { + const jsFile = fileWithoutExt + '.js'; + if (fs.existsSync(jsFile)) { + return fork(jsFile, [], { execArgv: [] }); + } + } catch (e) { + // ignore and try ts + error = e; + } + try { + const tsFile = fileWithoutExt + '.ts'; + if (fs.existsSync(tsFile)) { + // Fork the TS file using ts-node. This is useful when running in dev mode or + // for e2e tests. + return fork(tsFile, [], { execArgv: ['-r', 'ts-node/register'] }); + } + } catch (e) { + // ignore and thow at the end. + error = e; + } + throw error; + } + + private establishConnection(child: ChildProcess): Promise { + const connectionOptions = pick(this.configService.dbConnectionOptions as any, + ['type', 'name', 'database', 'host', 'port', 'username', 'password']); + return new Promise(resolve => { + const ipcChannel = new IpcChannel(child); + ipcChannel.subscribe(MessageType.CONNECTED, message => { + Logger.verbose(`IndexBuilder connection result: ${message.value}`, loggerCtx); + ipcChannel.close(); + resolve(message.value); + }); + ipcChannel.send(new ConnectionOptionsMessage(connectionOptions)); + }); + } + + private getBatch(child: ChildProcess | IndexBuilder, batch: number): Promise { + return new Promise(resolve => { + const ipcChannel = new IpcChannel(child); + ipcChannel.subscribe(MessageType.RETURN_RAW_BATCH, message => { + ipcChannel.close(); + resolve(message.value.variants); + }); + ipcChannel.send(new GetRawBatchMessage({ batchNumber: batch })); + }); + } + + private getBatchByIds(child: ChildProcess | IndexBuilder, ids: ID[]): Promise { + return new Promise(resolve => { + const ipcChannel = new IpcChannel(child); + ipcChannel.subscribe(MessageType.RETURN_RAW_BATCH, message => { + ipcChannel.close(); + resolve(message.value.variants); + }); + ipcChannel.send(new GetRawBatchByIdsMessage({ ids })); + }); + } + +} diff --git a/packages/core/src/plugin/default-search-plugin/ipc.ts b/packages/core/src/plugin/default-search-plugin/ipc.ts deleted file mode 100644 index afb9c1270e..0000000000 --- a/packages/core/src/plugin/default-search-plugin/ipc.ts +++ /dev/null @@ -1,76 +0,0 @@ -import { ChildProcess } from 'child_process'; -import { ConnectionOptions } from 'typeorm'; - -import { RequestContext } from '../../api/common/request-context'; -import { ProductVariant } from '../../entity/product-variant/product-variant.entity'; - -export enum MessageType { - CONNECTION_OPTIONS, - CONNECTED, - GET_RAW_BATCH, - RETURN_RAW_BATCH, - SAVE_VARIANTS, - VARIANTS_SAVED, - COMPLETED, -} - -export interface SaveVariantsPayload { - variants: ProductVariant[]; - ctx: RequestContext; - batch: number; - total: number; -} - -export interface IPCMessage { - type: MessageType; -} - -export class ConnectionOptionsMessage implements IPCMessage { - readonly type = MessageType.CONNECTION_OPTIONS; - constructor(public value: ConnectionOptions) {} -} - -export class ConnectedMessage implements IPCMessage { - readonly type = MessageType.CONNECTED; - constructor(public value: boolean) {} -} - -export class GetRawBatchMessage implements IPCMessage { - readonly type = MessageType.GET_RAW_BATCH; - constructor(public value: { batchNumber: number; }) {} -} - -export class ReturnRawBatchMessage implements IPCMessage { - readonly type = MessageType.RETURN_RAW_BATCH; - constructor(public value: { variants: ProductVariant[]; }) {} -} - -export class SaveVariantsMessage implements IPCMessage { - readonly type = MessageType.SAVE_VARIANTS; - constructor(public value: SaveVariantsPayload) {} -} - -export class VariantsSavedMessage implements IPCMessage { - readonly type = MessageType.VARIANTS_SAVED; - constructor(public value: { batchNumber: number; }) {} -} - -export class CompletedMessage implements IPCMessage { - readonly type = MessageType.COMPLETED; - constructor(public value: boolean) {} -} - -export type Message = ConnectionOptionsMessage | - ConnectedMessage | - GetRawBatchMessage | - ReturnRawBatchMessage | - SaveVariantsMessage | - VariantsSavedMessage | - CompletedMessage; - -export type MessageOfType = Extract; - -export function sendIPCMessage(target: NodeJS.Process | ChildProcess, message: Message) { - // tslint:disable-next-line:no-non-null-assertion - target.send!(JSON.stringify({ type: message.type, value: message.value })); -} diff --git a/packages/core/src/plugin/default-search-plugin/search-index-worker.ts b/packages/core/src/plugin/default-search-plugin/search-index-worker.ts deleted file mode 100644 index efcb6e989f..0000000000 --- a/packages/core/src/plugin/default-search-plugin/search-index-worker.ts +++ /dev/null @@ -1,133 +0,0 @@ -/* tslint:disable:no-non-null-assertion no-console */ -import { Type } from '@vendure/common/lib/shared-types'; -import { unique } from '@vendure/common/lib/unique'; -import { Connection, ConnectionOptions, createConnection, SelectQueryBuilder } from 'typeorm'; -import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils'; - -import { RequestContext } from '../../api/common/request-context'; -import { FacetValue } from '../../entity/facet-value/facet-value.entity'; -import { ProductVariant } from '../../entity/product-variant/product-variant.entity'; - -import { - CompletedMessage, - ConnectedMessage, - ConnectionOptionsMessage, - GetRawBatchMessage, - MessageType, - ReturnRawBatchMessage, - SaveVariantsMessage, - SaveVariantsPayload, - sendIPCMessage, - VariantsSavedMessage, -} from './ipc'; -import { SearchIndexItem } from './search-index-item.entity'; - -export const BATCH_SIZE = 100; -export const variantRelations = [ - 'product', - 'product.featuredAsset', - 'product.facetValues', - 'product.facetValues.facet', - 'featuredAsset', - 'facetValues', - 'facetValues.facet', - 'collections', - 'taxCategory', -]; - -export type IncomingMessage = ConnectionOptionsMessage | GetRawBatchMessage | SaveVariantsMessage; - -export class SearchIndexWorker { - - private connection: Connection; - private indexQueryBuilder: SelectQueryBuilder; - - async connect(dbConnectionOptions: ConnectionOptions) { - const { coreEntitiesMap } = await import('../../entity/entities'); - const coreEntities = Object.values(coreEntitiesMap) as Array>; - this.connection = await createConnection({ ...dbConnectionOptions, entities: [SearchIndexItem, ...coreEntities] }); - - this.indexQueryBuilder = await this.connection.getRepository(ProductVariant).createQueryBuilder('variants'); - FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(this.indexQueryBuilder, { - relations: variantRelations, - }); - FindOptionsUtils.joinEagerRelations(this.indexQueryBuilder, this.indexQueryBuilder.alias, this.connection.getMetadata(ProductVariant)); - - sendIPCMessage(process, new ConnectedMessage(this.connection.isConnected)); - } - - async getRawBatch(batchNumber: string | number) { - const i = Number.parseInt(batchNumber.toString(), 10); - const variants = await this.indexQueryBuilder - .where('variants__product.deletedAt IS NULL') - .take(BATCH_SIZE) - .skip(i * BATCH_SIZE) - .getMany(); - - sendIPCMessage(process, new ReturnRawBatchMessage({variants})); - } - - async saveVariants(payload: SaveVariantsPayload) { - const { variants, ctx, batch, total } = payload; - const requestContext = new RequestContext(ctx); - - const items = variants.map((v: ProductVariant) => - new SearchIndexItem({ - sku: v.sku, - enabled: v.enabled, - slug: v.product.slug, - price: v.price, - priceWithTax: v.priceWithTax, - languageCode: requestContext.languageCode, - productVariantId: v.id, - productId: v.product.id, - productName: v.product.name, - description: v.product.description, - productVariantName: v.name, - productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '', - productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '', - facetIds: this.getFacetIds(v), - facetValueIds: this.getFacetValueIds(v), - collectionIds: v.collections.map(c => c.id.toString()), - }), - ); - await this.connection.getRepository(SearchIndexItem).save(items); - sendIPCMessage(process, new VariantsSavedMessage({batchNumber: batch})); - if (batch === total - 1) { - sendIPCMessage(process, new CompletedMessage(true)); - } - } - - private getFacetIds(variant: ProductVariant): string[] { - const facetIds = (fv: FacetValue) => fv.facet.id.toString(); - const variantFacetIds = variant.facetValues.map(facetIds); - const productFacetIds = variant.product.facetValues.map(facetIds); - return unique([...variantFacetIds, ...productFacetIds]); - } - - private getFacetValueIds(variant: ProductVariant): string[] { - const facetValueIds = (fv: FacetValue) => fv.id.toString(); - const variantFacetValueIds = variant.facetValues.map(facetValueIds); - const productFacetValueIds = variant.product.facetValues.map(facetValueIds); - return unique([...variantFacetValueIds, ...productFacetValueIds]); - } -} - -const worker = new SearchIndexWorker(); - -process.on('message', (messageString) => { - const message: IncomingMessage = JSON.parse(messageString); - switch (message.type) { - case MessageType.CONNECTION_OPTIONS: - worker.connect(message.value); - break; - case MessageType.GET_RAW_BATCH: - worker.getRawBatch(message.value.batchNumber); - break; - case MessageType.SAVE_VARIANTS: - worker.saveVariants(message.value); - break; - default: - // ignore - } -}); diff --git a/packages/core/src/plugin/default-search-plugin/search-index.service.ts b/packages/core/src/plugin/default-search-plugin/search-index.service.ts deleted file mode 100644 index 57346cabeb..0000000000 --- a/packages/core/src/plugin/default-search-plugin/search-index.service.ts +++ /dev/null @@ -1,173 +0,0 @@ -import { Injectable } from '@nestjs/common'; -import { InjectConnection } from '@nestjs/typeorm'; -import { pick } from '@vendure/common/lib/pick'; -import { ChildProcess, fork } from 'child_process'; -import fs from 'fs-extra'; -import path from 'path'; -import { Connection } from 'typeorm'; -import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils'; - -import { RequestContext } from '../../api/common/request-context'; -import { ConfigService } from '../../config/config.service'; -import { Logger } from '../../config/logger/vendure-logger'; -import { ProductVariant } from '../../entity/product-variant/product-variant.entity'; -import { JobReporter } from '../../service/helpers/job-manager/job-manager'; -import { translateDeep } from '../../service/helpers/utils/translate-entity'; -import { ProductVariantService } from '../../service/services/product-variant.service'; - -import { - CompletedMessage, - ConnectedMessage, - ConnectionOptionsMessage, - GetRawBatchMessage, - MessageOfType, - MessageType, - ReturnRawBatchMessage, - SaveVariantsMessage, - sendIPCMessage, - VariantsSavedMessage, -} from './ipc'; -import { SearchIndexItem } from './search-index-item.entity'; -import { BATCH_SIZE, variantRelations } from './search-index-worker'; - -export type IncomingMessage = ConnectedMessage | ReturnRawBatchMessage | VariantsSavedMessage | CompletedMessage; - -@Injectable() -export class SearchIndexService { - private workerProcess: ChildProcess; - - constructor(@InjectConnection() private connection: Connection, - private productVariantService: ProductVariantService, - private configService: ConfigService) {} - - /** - * Creates the search index worker process and has it connect to the database. - */ - async connect() { - try { - this.workerProcess = this.getChildProcess(path.join(__dirname, 'search-index-worker.ts')); - } catch (e) { - Logger.error(e); - } - Logger.verbose(`Created search index worker process`, 'DefaultSearchPlugin'); - this.workerProcess.on('error', err => `Search index worker error: ` + err.message); - await this.establishConnection(this.workerProcess); - } - - async reindex(ctx: RequestContext, reporter: JobReporter) { - const timeStart = Date.now(); - Logger.verbose('Reindexing search index...', 'DefaultSearchPlugin'); - const qb = await this.connection.getRepository(ProductVariant).createQueryBuilder('variants'); - FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, { - relations: variantRelations, - }); - FindOptionsUtils.joinEagerRelations(qb, qb.alias, this.connection.getMetadata(ProductVariant)); - const count = await qb.where('variants__product.deletedAt IS NULL').getCount(); - Logger.verbose(`Getting ${count} variants`, 'DefaultSearchPlugin'); - const batches = Math.ceil(count / BATCH_SIZE); - - Logger.verbose('Deleting existing index items...', 'DefaultSearchPlugin'); - await this.connection.getRepository(SearchIndexItem).delete({ languageCode: ctx.languageCode }); - Logger.verbose('Deleted!', 'DefaultSearchPlugin'); - - return new Promise(async (resolve, reject) => { - this.subscribe(MessageType.COMPLETED, (message, unsubscribe) => { - Logger.verbose(`Reindexing completed in ${Date.now() - timeStart}ms`, 'DefaultSearchPlugin'); - unsubscribe(); - unsubscribeProgress(); - resolve({ - success: true, - indexedItemCount: count, - timeTaken: Date.now() - timeStart, - }); - }); - const unsubscribeProgress = this.subscribe(MessageType.VARIANTS_SAVED, (message, unsubscribe) => { - reporter.setProgress(Math.ceil(((message.value.batchNumber + 1) / batches) * 100)); - Logger.verbose(`Completed batch ${message.value.batchNumber + 1} of ${batches}`, 'DefaultSearchPlugin'); - }); - - for (let i = 0; i < batches; i++) { - Logger.verbose(`Processing batch ${i + 1} of ${batches}`, 'DefaultSearchPlugin'); - - const variants = await this.getBatch(this.workerProcess, i); - const items = variants - .map((v: any) => this.productVariantService.applyChannelPriceAndTax(v, ctx)) - .map((v: any) => translateDeep(v, ctx.languageCode, ['product'])); - - sendIPCMessage(this.workerProcess, new SaveVariantsMessage({ variants: items, ctx, batch: i, total: batches })); - } - }); - } - - /** - * Forks a child process based on the given filename. The filename can be a JS or TS file, as this method will attempt to - * use either (attempts JS first). - */ - private getChildProcess(filename: string): ChildProcess { - const ext = path.extname(filename); - const fileWithoutExt = filename.replace(new RegExp(`${ext}$`), ''); - let error: any; - try { - const jsFile = fileWithoutExt + '.js'; - if (fs.existsSync(jsFile)) { - return fork(jsFile, [], { execArgv: [] }); - } - } catch (e) { - // ignore and try ts - error = e; - } - try { - const tsFile = fileWithoutExt + '.ts'; - if (fs.existsSync(tsFile)) { - // Fork the TS file using ts-node. This is useful when running in dev mode or - // for e2e tests. - return fork(tsFile, [], { execArgv: ['-r', 'ts-node/register'] }); - } - } catch (e) { - // ignore and thow at the end. - error = e; - } - throw error; - } - - establishConnection(child: ChildProcess): Promise { - const connectionOptions = pick(this.configService.dbConnectionOptions as any, - ['type', 'name', 'database', 'host', 'port', 'username', 'password']); - return new Promise(resolve => { - sendIPCMessage(child, new ConnectionOptionsMessage(connectionOptions)); - this.subscribe(MessageType.CONNECTED, (message, unsubscribe) => { - Logger.verbose(`Connection result: ${message.value}`, 'DefaultSearchPlugin'); - unsubscribe(); - resolve(message.value); - }); - }); - } - - getBatch(child: ChildProcess, batch: number): Promise { - return new Promise(resolve => { - sendIPCMessage(child, new GetRawBatchMessage({ batchNumber: batch })); - this.subscribe(MessageType.RETURN_RAW_BATCH, (message, unsubscribe) => { - unsubscribe(); - resolve(message.value.variants); - }); - }); - } - - /** - * Subscribes to the given IPC message and executes the callback when the message is received. Returns an unsubscribe - * function which should be called to clean up the event listener. Alternatively, if only the first such event is - * important, call the `unsubscribe` function which is passed to the handler as the second argument. - */ - private subscribe(messageType: T, callback: (message: MessageOfType, unsubscribe: () => void) => any): () => void { - const handler = (messageString: string) => { - const message = JSON.parse(messageString) as IncomingMessage; - if (message.type === messageType) { - callback(message as MessageOfType, unsubscribe); - } - }; - const unsubscribe = () => this.workerProcess.off('message', handler); - this.workerProcess.on('message', handler); - return unsubscribe; - } - -} diff --git a/packages/core/src/service/helpers/job-manager/job-manager.spec.ts b/packages/core/src/service/helpers/job-manager/job-manager.spec.ts index f3be34a6d6..f42f57a4c1 100644 --- a/packages/core/src/service/helpers/job-manager/job-manager.spec.ts +++ b/packages/core/src/service/helpers/job-manager/job-manager.spec.ts @@ -13,15 +13,15 @@ describe('JobManager', () => { expect(jm.getOne('invalid')).toBeNull(); }); - it('startJob() returns a job', () => { + it('createJob() returns a job', () => { const jm = new JobManager(); - const job = jm.startJob('test', noop); + const job = jm.createJob('test', noop); expect(job.name).toBe('test'); }); it('getOne() returns job by id', () => { const jm = new JobManager(); - const job1 = jm.startJob('test', noop); + const job1 = jm.createJob('test', noop); const job2 = jm.getOne(job1.id); expect(job1.id).toBe(job2!.id); @@ -30,7 +30,8 @@ describe('JobManager', () => { it('job completes once work fn returns', async () => { const jm = new JobManager(); const subject = new Subject(); - const job = jm.startJob('test', () => subject.toPromise()); + const job = jm.createJob('test', () => subject.toPromise()); + job.start(); await tick(); expect(jm.getOne(job.id)!.state).toBe(JobState.RUNNING); @@ -47,7 +48,8 @@ describe('JobManager', () => { it('job fails if work fn throws', async () => { const jm = new JobManager(); const subject = new Subject(); - const job = jm.startJob('test', () => subject.toPromise()); + const job = jm.createJob('test', () => subject.toPromise()); + job.start(); await tick(); expect(jm.getOne(job.id)!.state).toBe(JobState.RUNNING); @@ -64,10 +66,11 @@ describe('JobManager', () => { const jm = new JobManager(); const subject = new Subject(); const progressSubject = new Subject(); - const job = jm.startJob('test', (reporter => { + const job = jm.createJob('test', (reporter => { progressSubject.subscribe(val => reporter.setProgress(val)); return subject.toPromise(); })); + job.start(); await tick(); expect(jm.getOne(job.id)!.progress).toBe(0); @@ -92,18 +95,18 @@ describe('JobManager', () => { it('getAll() returns all jobs', () => { const jm = new JobManager(); - const job1 = jm.startJob('job1', noop); - const job2 = jm.startJob('job2', noop); - const job3 = jm.startJob('job3', noop); + const job1 = jm.createJob('job1', noop); + const job2 = jm.createJob('job2', noop); + const job3 = jm.createJob('job3', noop); expect(jm.getAll().map(j => j.id)).toEqual([job1.id, job2.id, job3.id]); }); it('getAll() filters by id', () => { const jm = new JobManager(); - const job1 = jm.startJob('job1', noop); - const job2 = jm.startJob('job2', noop); - const job3 = jm.startJob('job3', noop); + const job1 = jm.createJob('job1', noop); + const job2 = jm.createJob('job2', noop); + const job3 = jm.createJob('job3', noop); expect(jm.getAll({ ids: [job1.id, job3.id]}).map(j => j.id)).toEqual([job1.id, job3.id]); }); @@ -111,9 +114,12 @@ describe('JobManager', () => { it('getAll() filters by state', async () => { const jm = new JobManager(); const subject = new Subject(); - const job1 = jm.startJob('job1', noop); - const job2 = jm.startJob('job2', noop); - const job3 = jm.startJob('job3', () => subject.toPromise()); + const job1 = jm.createJob('job1', noop); + const job2 = jm.createJob('job2', noop); + const job3 = jm.createJob('job3', () => subject.toPromise()); + job1.start(); + job2.start(); + job3.start(); await tick(); @@ -126,8 +132,10 @@ describe('JobManager', () => { const subject1 = new Subject(); const subject2 = new Subject(); - const job1 = jm.startJob('job1', () => subject1.toPromise()); - const job2 = jm.startJob('job2', () => subject2.toPromise()); + const job1 = jm.createJob('job1', () => subject1.toPromise()); + const job2 = jm.createJob('job2', () => subject2.toPromise()); + job1.start(); + job2.start(); subject1.complete(); await tick(); @@ -147,6 +155,16 @@ describe('JobManager', () => { { name: 'job2', state: JobState.RUNNING }, ]); }); + + it('findRunningJob() works', async () => { + const jm = new JobManager(); + const subject1 = new Subject(); + + const job1 = jm.createJob('job1', () => subject1.toPromise()); + job1.start(); + + expect(jm.findRunningJob('job1')).toBe(job1); + }); }); function tick(duration: number = 0) { diff --git a/packages/core/src/service/helpers/job-manager/job-manager.ts b/packages/core/src/service/helpers/job-manager/job-manager.ts index 00da75caee..42a700b019 100644 --- a/packages/core/src/service/helpers/job-manager/job-manager.ts +++ b/packages/core/src/service/helpers/job-manager/job-manager.ts @@ -1,4 +1,4 @@ -import { JobInfo, JobListInput } from '@vendure/common/lib/generated-types'; +import { JobInfo, JobListInput, JobState } from '@vendure/common/lib/generated-types'; import { pick } from '@vendure/common/lib/pick'; import ms = require('ms'); @@ -36,10 +36,9 @@ export class JobManager { * property of the job. If the function throws, the job will fail and the `result` property * will be the error thrown. */ - startJob(name: string, work: (reporter: JobReporter) => any | Promise): Job { + createJob(name: string, work: (reporter: JobReporter) => any | Promise): Job { const job = new Job(name, work); this.jobs.set(job.id, job); - job.start(); return job; } @@ -83,6 +82,10 @@ export class JobManager { }); } + findRunningJob(name: string): Job | undefined { + return Array.from(this.jobs.values()).find(job => job.name === name && job.state === JobState.RUNNING); + } + private toJobInfo(job: Job): JobInfo { const info = pick(job, ['id', 'name', 'state', 'progress', 'result', 'started', 'ended']); const duration = job.ended ? +job.ended - +info.started : Date.now() - +info.started; diff --git a/packages/core/src/service/helpers/job-manager/job.spec.ts b/packages/core/src/service/helpers/job-manager/job.spec.ts new file mode 100644 index 0000000000..6b0ecf3428 --- /dev/null +++ b/packages/core/src/service/helpers/job-manager/job.spec.ts @@ -0,0 +1,18 @@ +import { Job } from './job'; + +describe('Job', () => { + it('does not run work more than once', () => { + let counter = 0; + const job = new Job('test', () => { + counter++; + return new Promise(() => {}); + }); + job.start(); + + expect(counter).toBe(1); + + job.start(); + + expect(counter).toBe(1); + }); +}); diff --git a/packages/core/src/service/helpers/job-manager/job.ts b/packages/core/src/service/helpers/job-manager/job.ts index fb85446f28..55b644d5a6 100644 --- a/packages/core/src/service/helpers/job-manager/job.ts +++ b/packages/core/src/service/helpers/job-manager/job.ts @@ -22,6 +22,9 @@ export class Job { } async start() { + if (this.state !== JobState.PENDING) { + return; + } const reporter: JobReporter = { setProgress: (percentage: number) => { this.progress = Math.max(Math.min(percentage, 100), 0); diff --git a/packages/core/src/service/services/job.service.ts b/packages/core/src/service/services/job.service.ts index 7ba53aa2eb..59c9f5715e 100644 --- a/packages/core/src/service/services/job.service.ts +++ b/packages/core/src/service/services/job.service.ts @@ -1,9 +1,9 @@ import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common'; -import { JobInfo, JobListInput } from '@vendure/common/lib/generated-types'; -import ms = require('ms'); +import { JobInfo, JobListInput, JobState } from '@vendure/common/lib/generated-types'; import { Job } from '../helpers/job-manager/job'; import { JobManager, JobReporter } from '../helpers/job-manager/job-manager'; +import ms = require('ms'); @Injectable() export class JobService implements OnModuleInit, OnModuleDestroy { @@ -19,8 +19,19 @@ export class JobService implements OnModuleInit, OnModuleDestroy { global.clearInterval(this.cleanJobsTimer); } - startJob(name: string, work: (reporter: JobReporter) => any | Promise): Job { - return this.manager.startJob(name, work); + createJob(options: { + name: string; + work: (reporter: JobReporter) => any | Promise; + /** Limit this job to a single instance at a time */ + singleInstance?: boolean; + }): Job { + if (options.singleInstance === true) { + const runningInstance = this.manager.findRunningJob(options.name); + if (runningInstance) { + return runningInstance; + } + } + return this.manager.createJob(options.name, options.work); } getAll(input?: JobListInput): JobInfo[] {