Skip to content

Commit

Permalink
Merge branch 'search-index-worker'
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelbromley committed Jun 5, 2019
2 parents 59d8312 + fe40641 commit 4bfc132
Show file tree
Hide file tree
Showing 14 changed files with 802 additions and 233 deletions.
24 changes: 0 additions & 24 deletions docs/content/docs/plugins/default-search-plugin.md

This file was deleted.

2 changes: 1 addition & 1 deletion packages/core/e2e/default-search-plugin.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ describe('Default search plugin', () => {
customerCount: 1,
},
{
plugins: [new DefaultSearchPlugin()],
plugins: [new DefaultSearchPlugin({ runInForkedProcess: false })],
},
);
await adminClient.init();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const SEARCH_PLUGIN_OPTIONS = Symbol('SEARCH_PLUGIN_OPTIONS');
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,93 @@ import { CollectionModificationEvent } from '../../event-bus/events/collection-m
import { TaxRateModificationEvent } from '../../event-bus/events/tax-rate-modification-event';
import { SearchService } from '../../service/services/search.service';

import { SEARCH_PLUGIN_OPTIONS } from './constants';
import { AdminFulltextSearchResolver, ShopFulltextSearchResolver } from './fulltext-search.resolver';
import { FulltextSearchService } from './fulltext-search.service';
import { SearchIndexService } from './indexer/search-index.service';
import { SearchIndexItem } from './search-index-item.entity';

export interface DefaultSearchReindexResponse extends SearchReindexResponse {
timeTaken: number;
indexedItemCount: number;
}

/**
* @description
* Options for configuring the DefaultSearchPlugin.
*
* @docsCategory DefaultSearchPlugin
*/
export interface DefaultSearchPluginOptions {
/**
* @description
* By default, the DefaultSearchPlugin will spawn a background process which is responsible
* for updating the search index. By setting this option to `false`, indexing will be
* performed on the main server process instead. Usually this is undesirable as performance will
* be degraded during indexing, but the option is useful for certain debugging and testing scenarios.
* @default true
*/
runInForkedProcess: boolean;
}

/**
* @description
* The DefaultSearchPlugin provides a full-text Product search based on the full-text searching capabilities of the
* underlying database.
*
* The DefaultSearchPlugin is bundled with the `@vendure/core` package. If you are not using an alternative search
* plugin, then make sure this one is used, otherwise you will not be able to search products via the [`search` query](/docs/graphql-api/shop/queries#search).
*
* @example
* ```ts
* import { DefaultSearchPlugin } from '@vendure/core';
*
* const config: VendureConfig = {
* // Add an instance of the plugin to the plugins array
* plugins: [
* new DefaultSearchPlugin(),
* ],
* };
* ```
*
* {{% alert "warning" %}}
* Note that the quality of the fulltext search capabilities varies depending on the underlying database being used. For example, the MySQL & Postgres implementations will typically yield better results than the SQLite implementation.
* {{% /alert %}}
*
* @docsCategory DefaultSearchPlugin
*/
export class DefaultSearchPlugin implements VendurePlugin {
onBootstrap(inject: <T>(type: Type<T>) => T): void | Promise<void> {
private readonly options: DefaultSearchPluginOptions;

constructor(options?: DefaultSearchPluginOptions) {
const defaultOptions: DefaultSearchPluginOptions = {
runInForkedProcess: true,
};
this.options = { ...defaultOptions, ...options };
}

/** @internal */
async onBootstrap(inject: <T>(type: Type<T>) => T): Promise<void> {
const eventBus = inject(EventBus);
const fulltextSearchService = inject(FulltextSearchService);
const searchIndexService = inject(SearchIndexService);
eventBus.subscribe(CatalogModificationEvent, event => {
if (event.entity instanceof Product || event.entity instanceof ProductVariant) {
return fulltextSearchService.updateProductOrVariant(event.ctx, event.entity);
return searchIndexService.updateProductOrVariant(event.ctx, event.entity);
}
});
eventBus.subscribe(CollectionModificationEvent, event => {
return fulltextSearchService.updateVariantsById(event.ctx, event.productVariantIds);
return searchIndexService.updateVariantsById(event.ctx, event.productVariantIds);
});
eventBus.subscribe(TaxRateModificationEvent, event => {
const defaultTaxZone = event.ctx.channel.defaultTaxZone;
if (defaultTaxZone && idsAreEqual(defaultTaxZone.id, event.taxRate.zone.id)) {
return fulltextSearchService.reindex(event.ctx);
return searchIndexService.reindex(event.ctx).start();
}
});
await searchIndexService.connect();
}

/** @internal */
extendAdminAPI(): APIExtensionDefinition {
return {
resolvers: [AdminFulltextSearchResolver],
Expand All @@ -54,6 +112,7 @@ export class DefaultSearchPlugin implements VendurePlugin {
};
}

/** @internal */
extendShopAPI(): APIExtensionDefinition {
return {
resolvers: [ShopFulltextSearchResolver],
Expand All @@ -66,11 +125,18 @@ export class DefaultSearchPlugin implements VendurePlugin {
};
}

/** @internal */
defineEntities(): Array<Type<any>> {
return [SearchIndexItem];
}

/** @internal */
defineProviders(): Provider[] {
return [FulltextSearchService, { provide: SearchService, useClass: FulltextSearchService }];
return [
FulltextSearchService,
SearchIndexService,
{ provide: SearchService, useClass: FulltextSearchService },
{ provide: SEARCH_PLUGIN_OPTIONS, useFactory: () => this.options },
];
}
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,19 @@
import { Injectable } from '@nestjs/common';
import { InjectConnection } from '@nestjs/typeorm';
import { JobInfo, LanguageCode, SearchInput, SearchResponse } from '@vendure/common/lib/generated-types';
import { JobInfo, SearchInput, SearchResponse } from '@vendure/common/lib/generated-types';
import { Omit } from '@vendure/common/lib/omit';
import { ID } from '@vendure/common/lib/shared-types';
import { unique } from '@vendure/common/lib/unique';
import { Connection } from 'typeorm';
import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';

import { RequestContext } from '../../api/common/request-context';
import { InternalServerError } from '../../common/error/errors';
import { Logger } from '../../config/logger/vendure-logger';
import { FacetValue, Product, ProductVariant } from '../../entity';
import { FacetValue } from '../../entity';
import { EventBus } from '../../event-bus/event-bus';
import { translateDeep } from '../../service/helpers/utils/translate-entity';
import { FacetValueService } from '../../service/services/facet-value.service';
import { JobService } from '../../service/services/job.service';
import { ProductVariantService } from '../../service/services/product-variant.service';
import { SearchService } from '../../service/services/search.service';

import { AsyncQueue } from './async-queue';
import { SearchIndexItem } from './search-index-item.entity';
import { SearchIndexService } from './indexer/search-index.service';
import { MysqlSearchStrategy } from './search-strategy/mysql-search-strategy';
import { PostgresSearchStrategy } from './search-strategy/postgres-search-strategy';
import { SearchStrategy } from './search-strategy/search-strategy';
Expand All @@ -31,27 +25,16 @@ import { SqliteSearchStrategy } from './search-strategy/sqlite-search-strategy';
*/
@Injectable()
export class FulltextSearchService implements SearchService {
private taskQueue = new AsyncQueue('search-service', 1);
private searchStrategy: SearchStrategy;
private readonly minTermLength = 2;
private readonly variantRelations = [
'product',
'product.featuredAsset',
'product.facetValues',
'product.facetValues.facet',
'featuredAsset',
'facetValues',
'facetValues.facet',
'collections',
'taxCategory',
];

constructor(
@InjectConnection() private connection: Connection,
private jobService: JobService,
private eventBus: EventBus,
private facetValueService: FacetValueService,
private productVariantService: ProductVariantService,
private searchIndexService: SearchIndexService,
) {
this.setSearchStrategy();
}
Expand Down Expand Up @@ -93,110 +76,11 @@ export class FulltextSearchService implements SearchService {
* Rebuilds the full search index.
*/
async reindex(ctx: RequestContext): Promise<JobInfo> {
const job = this.jobService.startJob('reindex', async reporter => {
const timeStart = Date.now();
const BATCH_SIZE = 100;
Logger.verbose('Reindexing search index...');
const qb = await this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
relations: this.variantRelations,
});
FindOptionsUtils.joinEagerRelations(qb, qb.alias, this.connection.getMetadata(ProductVariant));
const count = await qb.where('variants__product.deletedAt IS NULL').getCount();
Logger.verbose(`Getting ${count} variants`);
const batches = Math.ceil(count / BATCH_SIZE);

Logger.verbose('Deleting existing index items...');
await this.connection.getRepository(SearchIndexItem).delete({languageCode: ctx.languageCode});
Logger.verbose('Deleted!');

for (let i = 0; i < batches; i++) {
Logger.verbose(`Processing batch ${i + 1} of ${batches}, heap used: `
+ (process.memoryUsage().heapUsed / 1000 / 1000).toFixed(2) + 'MB');
const variants = await qb
.where('variants__product.deletedAt IS NULL')
.take(BATCH_SIZE)
.skip(i * BATCH_SIZE)
.getMany();
await this.taskQueue.push(async () => {
await this.saveSearchIndexItems(ctx, variants);
});
reporter.setProgress(Math.round((i / batches) * 100));
}

Logger.verbose(`Reindexing completed in ${Date.now() - timeStart}ms`);

return {
success: true,
indexedItemCount: count,
timeTaken: Date.now() - timeStart,
};
});
const job = this.searchIndexService.reindex(ctx);
job.start();
return job;
}

/**
* Updates the search index only for the affected entities.
*/
async updateProductOrVariant(ctx: RequestContext, updatedEntity: Product | ProductVariant) {
let updatedVariants: ProductVariant[] = [];
let removedVariantIds: ID[] = [];
if (updatedEntity instanceof Product) {
const product = await this.connection.getRepository(Product).findOne(updatedEntity.id, {
relations: ['variants'],
});
if (product) {
if (product.deletedAt) {
removedVariantIds = product.variants.map(v => v.id);
} else {
updatedVariants = await this.connection
.getRepository(ProductVariant)
.findByIds(product.variants.map(v => v.id), {
relations: this.variantRelations,
});
if (product.enabled === false) {
updatedVariants.forEach(v => v.enabled = false);
}
}
}
} else {
const variant = await this.connection.getRepository(ProductVariant).findOne(updatedEntity.id, {
relations: this.variantRelations,
});
if (variant) {
updatedVariants = [variant];
}
}
await this.taskQueue.push(async () => {
if (updatedVariants.length) {
await this.saveSearchIndexItems(ctx, updatedVariants);
}
if (removedVariantIds.length) {
await this.removeSearchIndexItems(ctx.languageCode, removedVariantIds);
}
});

}

async updateVariantsById(ctx: RequestContext, ids: ID[]) {
if (ids.length) {
const BATCH_SIZE = 100;
const batches = Math.ceil(ids.length / BATCH_SIZE);
for (let i = 0; i < batches; i++) {
const begin = i * BATCH_SIZE;
const end = begin + BATCH_SIZE;
Logger.verbose(`Updating ids from index ${begin} to ${end}`);
const batch = ids.slice(begin, end);
const updatedVariants = await this.connection.getRepository(ProductVariant).findByIds(batch, {
relations: this.variantRelations,
});
this.taskQueue.push(async () => {
await this.saveSearchIndexItems(ctx, updatedVariants);
});
}
}
}

/**
* Sets the SearchStrategy appropriate to th configured database type.
*/
Expand All @@ -217,60 +101,4 @@ export class FulltextSearchService implements SearchService {
throw new InternalServerError(`error.database-not-supported-by-default-search-plugin`);
}
}

/**
* Add or update items in the search index
*/
private async saveSearchIndexItems(ctx: RequestContext, variants: ProductVariant[]) {
const items = variants
.map(v => this.productVariantService.applyChannelPriceAndTax(v, ctx))
.map(v => translateDeep(v, ctx.languageCode, ['product']))
.map(
v =>
new SearchIndexItem({
sku: v.sku,
enabled: v.enabled,
slug: v.product.slug,
price: v.price,
priceWithTax: v.priceWithTax,
languageCode: ctx.languageCode,
productVariantId: v.id,
productId: v.product.id,
productName: v.product.name,
description: v.product.description,
productVariantName: v.name,
productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '',
productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '',
facetIds: this.getFacetIds(v),
facetValueIds: this.getFacetValueIds(v),
collectionIds: v.collections.map(c => c.id.toString()),
}),
);
return this.connection.getRepository(SearchIndexItem).save(items);
}

/**
* Remove items from the search index
*/
private async removeSearchIndexItems(languageCode: LanguageCode, variantIds: ID[]) {
const compositeKeys = variantIds.map(id => ({
productVariantId: id,
languageCode,
})) as any[];
await this.connection.getRepository(SearchIndexItem).delete(compositeKeys);
}

private getFacetIds(variant: ProductVariant): string[] {
const facetIds = (fv: FacetValue) => fv.facet.id.toString();
const variantFacetIds = variant.facetValues.map(facetIds);
const productFacetIds = variant.product.facetValues.map(facetIds);
return unique([...variantFacetIds, ...productFacetIds]);
}

private getFacetValueIds(variant: ProductVariant): string[] {
const facetValueIds = (fv: FacetValue) => fv.id.toString();
const variantFacetValueIds = variant.facetValues.map(facetValueIds);
const productFacetValueIds = variant.product.facetValues.map(facetValueIds);
return unique([...variantFacetValueIds, ...productFacetValueIds]);
}
}
Loading

0 comments on commit 4bfc132

Please sign in to comment.