Skip to content

Commit

Permalink
feat(core): Process all updates to the search index on worker thread
Browse files Browse the repository at this point in the history
The main server thread only reads from the search index. All updates are now performed via a worker running on another thread.
  • Loading branch information
michaelbromley committed Jun 5, 2019
1 parent b78354e commit fe40641
Show file tree
Hide file tree
Showing 17 changed files with 798 additions and 579 deletions.
24 changes: 0 additions & 24 deletions docs/content/docs/plugins/default-search-plugin.md

This file was deleted.

2 changes: 1 addition & 1 deletion packages/core/e2e/default-search-plugin.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ describe('Default search plugin', () => {
customerCount: 1,
},
{
plugins: [new DefaultSearchPlugin()],
plugins: [new DefaultSearchPlugin({ runInForkedProcess: false })],
},
);
await adminClient.init();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const SEARCH_PLUGIN_OPTIONS = Symbol('SEARCH_PLUGIN_OPTIONS');
Original file line number Diff line number Diff line change
Expand Up @@ -13,38 +13,93 @@ import { CollectionModificationEvent } from '../../event-bus/events/collection-m
import { TaxRateModificationEvent } from '../../event-bus/events/tax-rate-modification-event';
import { SearchService } from '../../service/services/search.service';

import { SEARCH_PLUGIN_OPTIONS } from './constants';
import { AdminFulltextSearchResolver, ShopFulltextSearchResolver } from './fulltext-search.resolver';
import { FulltextSearchService } from './fulltext-search.service';
import { SearchIndexService } from './search-index.service';
import { SearchIndexService } from './indexer/search-index.service';
import { SearchIndexItem } from './search-index-item.entity';

export interface DefaultSearchReindexResponse extends SearchReindexResponse {
timeTaken: number;
indexedItemCount: number;
}

/**
* @description
* Options for configuring the DefaultSearchPlugin.
*
* @docsCategory DefaultSearchPlugin
*/
export interface DefaultSearchPluginOptions {
/**
* @description
* By default, the DefaultSearchPlugin will spawn a background process which is responsible
* for updating the search index. By setting this option to `false`, indexing will be
* performed on the main server process instead. Usually this is undesirable as performance will
* be degraded during indexing, but the option is useful for certain debugging and testing scenarios.
* @default true
*/
runInForkedProcess: boolean;
}

/**
* @description
* The DefaultSearchPlugin provides a full-text Product search based on the full-text searching capabilities of the
* underlying database.
*
* The DefaultSearchPlugin is bundled with the `@vendure/core` package. If you are not using an alternative search
* plugin, then make sure this one is used, otherwise you will not be able to search products via the [`search` query](/docs/graphql-api/shop/queries#search).
*
* @example
* ```ts
* import { DefaultSearchPlugin } from '@vendure/core';
*
* const config: VendureConfig = {
* // Add an instance of the plugin to the plugins array
* plugins: [
* new DefaultSearchPlugin(),
* ],
* };
* ```
*
* {{% alert "warning" %}}
* Note that the quality of the fulltext search capabilities varies depending on the underlying database being used. For example, the MySQL & Postgres implementations will typically yield better results than the SQLite implementation.
* {{% /alert %}}
*
* @docsCategory DefaultSearchPlugin
*/
export class DefaultSearchPlugin implements VendurePlugin {
private readonly options: DefaultSearchPluginOptions;

constructor(options?: DefaultSearchPluginOptions) {
const defaultOptions: DefaultSearchPluginOptions = {
runInForkedProcess: true,
};
this.options = { ...defaultOptions, ...options };
}

/** @internal */
async onBootstrap(inject: <T>(type: Type<T>) => T): Promise<void> {
const eventBus = inject(EventBus);
const fulltextSearchService = inject(FulltextSearchService);
const searchIndexService = inject(SearchIndexService);
eventBus.subscribe(CatalogModificationEvent, event => {
if (event.entity instanceof Product || event.entity instanceof ProductVariant) {
return fulltextSearchService.updateProductOrVariant(event.ctx, event.entity);
return searchIndexService.updateProductOrVariant(event.ctx, event.entity);
}
});
eventBus.subscribe(CollectionModificationEvent, event => {
return fulltextSearchService.updateVariantsById(event.ctx, event.productVariantIds);
return searchIndexService.updateVariantsById(event.ctx, event.productVariantIds);
});
eventBus.subscribe(TaxRateModificationEvent, event => {
const defaultTaxZone = event.ctx.channel.defaultTaxZone;
if (defaultTaxZone && idsAreEqual(defaultTaxZone.id, event.taxRate.zone.id)) {
return fulltextSearchService.reindex(event.ctx);
return searchIndexService.reindex(event.ctx).start();
}
});
await searchIndexService.connect();
}

/** @internal */
extendAdminAPI(): APIExtensionDefinition {
return {
resolvers: [AdminFulltextSearchResolver],
Expand All @@ -57,6 +112,7 @@ export class DefaultSearchPlugin implements VendurePlugin {
};
}

/** @internal */
extendShopAPI(): APIExtensionDefinition {
return {
resolvers: [ShopFulltextSearchResolver],
Expand All @@ -69,11 +125,18 @@ export class DefaultSearchPlugin implements VendurePlugin {
};
}

/** @internal */
defineEntities(): Array<Type<any>> {
return [SearchIndexItem];
}

/** @internal */
defineProviders(): Provider[] {
return [FulltextSearchService, SearchIndexService, { provide: SearchService, useClass: FulltextSearchService }];
return [
FulltextSearchService,
SearchIndexService,
{ provide: SearchService, useClass: FulltextSearchService },
{ provide: SEARCH_PLUGIN_OPTIONS, useFactory: () => this.options },
];
}
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,19 @@
import { Injectable } from '@nestjs/common';
import { InjectConnection } from '@nestjs/typeorm';
import { JobInfo, LanguageCode, SearchInput, SearchResponse } from '@vendure/common/lib/generated-types';
import { JobInfo, SearchInput, SearchResponse } from '@vendure/common/lib/generated-types';
import { Omit } from '@vendure/common/lib/omit';
import { ID } from '@vendure/common/lib/shared-types';
import { unique } from '@vendure/common/lib/unique';
import { Connection } from 'typeorm';

import { RequestContext } from '../../api/common/request-context';
import { InternalServerError } from '../../common/error/errors';
import { Logger } from '../../config/logger/vendure-logger';
import { FacetValue, Product, ProductVariant } from '../../entity';
import { FacetValue } from '../../entity';
import { EventBus } from '../../event-bus/event-bus';
import { translateDeep } from '../../service/helpers/utils/translate-entity';
import { FacetValueService } from '../../service/services/facet-value.service';
import { JobService } from '../../service/services/job.service';
import { ProductVariantService } from '../../service/services/product-variant.service';
import { SearchService } from '../../service/services/search.service';

import { AsyncQueue } from './async-queue';
import { SearchIndexItem } from './search-index-item.entity';
import { SearchIndexService } from './search-index.service';
import { SearchIndexService } from './indexer/search-index.service';
import { MysqlSearchStrategy } from './search-strategy/mysql-search-strategy';
import { PostgresSearchStrategy } from './search-strategy/postgres-search-strategy';
import { SearchStrategy } from './search-strategy/search-strategy';
Expand All @@ -31,20 +25,8 @@ import { SqliteSearchStrategy } from './search-strategy/sqlite-search-strategy';
*/
@Injectable()
export class FulltextSearchService implements SearchService {
private taskQueue = new AsyncQueue('search-service', 1);
private searchStrategy: SearchStrategy;
private readonly minTermLength = 2;
private readonly variantRelations = [
'product',
'product.featuredAsset',
'product.facetValues',
'product.facetValues.facet',
'featuredAsset',
'facetValues',
'facetValues.facet',
'collections',
'taxCategory',
];

constructor(
@InjectConnection() private connection: Connection,
Expand Down Expand Up @@ -94,74 +76,11 @@ export class FulltextSearchService implements SearchService {
* Rebuilds the full search index.
*/
async reindex(ctx: RequestContext): Promise<JobInfo> {
const job = this.jobService.startJob('reindex', async reporter => {
return this.searchIndexService.reindex(ctx, reporter);
});
const job = this.searchIndexService.reindex(ctx);
job.start();
return job;
}

/**
* Updates the search index only for the affected entities.
*/
async updateProductOrVariant(ctx: RequestContext, updatedEntity: Product | ProductVariant) {
let updatedVariants: ProductVariant[] = [];
let removedVariantIds: ID[] = [];
if (updatedEntity instanceof Product) {
const product = await this.connection.getRepository(Product).findOne(updatedEntity.id, {
relations: ['variants'],
});
if (product) {
if (product.deletedAt) {
removedVariantIds = product.variants.map(v => v.id);
} else {
updatedVariants = await this.connection
.getRepository(ProductVariant)
.findByIds(product.variants.map(v => v.id), {
relations: this.variantRelations,
});
if (product.enabled === false) {
updatedVariants.forEach(v => v.enabled = false);
}
}
}
} else {
const variant = await this.connection.getRepository(ProductVariant).findOne(updatedEntity.id, {
relations: this.variantRelations,
});
if (variant) {
updatedVariants = [variant];
}
}
await this.taskQueue.push(async () => {
if (updatedVariants.length) {
await this.saveSearchIndexItems(ctx, updatedVariants);
}
if (removedVariantIds.length) {
await this.removeSearchIndexItems(ctx.languageCode, removedVariantIds);
}
});

}

async updateVariantsById(ctx: RequestContext, ids: ID[]) {
if (ids.length) {
const BATCH_SIZE = 100;
const batches = Math.ceil(ids.length / BATCH_SIZE);
for (let i = 0; i < batches; i++) {
const begin = i * BATCH_SIZE;
const end = begin + BATCH_SIZE;
Logger.verbose(`Updating ids from index ${begin} to ${end}`);
const batch = ids.slice(begin, end);
const updatedVariants = await this.connection.getRepository(ProductVariant).findByIds(batch, {
relations: this.variantRelations,
});
this.taskQueue.push(async () => {
await this.saveSearchIndexItems(ctx, updatedVariants);
});
}
}
}

/**
* Sets the SearchStrategy appropriate to th configured database type.
*/
Expand All @@ -182,60 +101,4 @@ export class FulltextSearchService implements SearchService {
throw new InternalServerError(`error.database-not-supported-by-default-search-plugin`);
}
}

/**
* Add or update items in the search index
*/
private async saveSearchIndexItems(ctx: RequestContext, variants: ProductVariant[]) {
const items = variants
.map(v => this.productVariantService.applyChannelPriceAndTax(v, ctx))
.map(v => translateDeep(v, ctx.languageCode, ['product']))
.map(
v =>
new SearchIndexItem({
sku: v.sku,
enabled: v.enabled,
slug: v.product.slug,
price: v.price,
priceWithTax: v.priceWithTax,
languageCode: ctx.languageCode,
productVariantId: v.id,
productId: v.product.id,
productName: v.product.name,
description: v.product.description,
productVariantName: v.name,
productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '',
productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '',
facetIds: this.getFacetIds(v),
facetValueIds: this.getFacetValueIds(v),
collectionIds: v.collections.map(c => c.id.toString()),
}),
);
return this.connection.getRepository(SearchIndexItem).save(items);
}

/**
* Remove items from the search index
*/
private async removeSearchIndexItems(languageCode: LanguageCode, variantIds: ID[]) {
const compositeKeys = variantIds.map(id => ({
productVariantId: id,
languageCode,
})) as any[];
await this.connection.getRepository(SearchIndexItem).delete(compositeKeys);
}

private getFacetIds(variant: ProductVariant): string[] {
const facetIds = (fv: FacetValue) => fv.facet.id.toString();
const variantFacetIds = variant.facetValues.map(facetIds);
const productFacetIds = variant.product.facetValues.map(facetIds);
return unique([...variantFacetIds, ...productFacetIds]);
}

private getFacetValueIds(variant: ProductVariant): string[] {
const facetValueIds = (fv: FacetValue) => fv.id.toString();
const variantFacetValueIds = variant.facetValues.map(facetValueIds);
const productFacetValueIds = variant.product.facetValues.map(facetValueIds);
return unique([...variantFacetValueIds, ...productFacetValueIds]);
}
}
Loading

0 comments on commit fe40641

Please sign in to comment.