Skip to content

Commit

Permalink
feat(core): Add channel handling to DefaultSearchPlugin
Browse files Browse the repository at this point in the history
BREAKING CHANGE: The `SearchIndexItem` entity used by the `DefaultSearchPlugin` has a couple of new fields related to Channel handling. Once the schema is updated (either by synchronizing or running a migration), the search index should be rebuilt.
  • Loading branch information
michaelbromley committed Nov 8, 2019
1 parent 4f9a186 commit 280a38b
Show file tree
Hide file tree
Showing 9 changed files with 222 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ import { ID } from '@vendure/common/lib/shared-types';
import { buffer, debounceTime, filter, map } from 'rxjs/operators';

import { idsAreEqual } from '../../common/utils';
import { ProductVariant } from '../../entity/product-variant/product-variant.entity';
import { Product } from '../../entity/product/product.entity';
import { EventBus } from '../../event-bus/event-bus';
import { CatalogModificationEvent } from '../../event-bus/events/catalog-modification-event';
import { CollectionModificationEvent } from '../../event-bus/events/collection-modification-event';
import { ProductEvent } from '../../event-bus/events/product-event';
import { ProductVariantEvent } from '../../event-bus/events/product-variant-event';
import { TaxRateModificationEvent } from '../../event-bus/events/tax-rate-modification-event';
import { PluginCommonModule } from '../plugin-common.module';
import { OnVendureBootstrap, VendurePlugin } from '../vendure-plugin';
Expand Down Expand Up @@ -66,9 +65,18 @@ export class DefaultSearchPlugin implements OnVendureBootstrap {

/** @internal */
async onVendureBootstrap() {
this.eventBus.ofType(CatalogModificationEvent).subscribe(event => {
if (event.entity instanceof Product || event.entity instanceof ProductVariant) {
return this.searchIndexService.updateProductOrVariant(event.ctx, event.entity).start();
this.eventBus.ofType(ProductEvent).subscribe(event => {
if (event.type === 'deleted') {
return this.searchIndexService.deleteProduct(event.ctx, event.product).start();
} else {
return this.searchIndexService.updateProduct(event.ctx, event.product).start();
}
});
this.eventBus.ofType(ProductVariantEvent).subscribe(event => {
if (event.type === 'deleted') {
return this.searchIndexService.deleteVariant(event.ctx, event.variants).start();
} else {
return this.searchIndexService.updateVariants(event.ctx, event.variants).start();
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,22 @@ import { ProductVariantService } from '../../../service/services/product-variant
import { TaxRateService } from '../../../service/services/tax-rate.service';
import { AsyncQueue } from '../async-queue';
import { SearchIndexItem } from '../search-index-item.entity';
import { ReindexMessage, UpdateProductOrVariantMessage, UpdateVariantsByIdMessage } from '../types';
import {
DeleteProductMessage,
DeleteVariantMessage,
ReindexMessage,
UpdateProductMessage,
UpdateVariantMessage,
UpdateVariantsByIdMessage,
} from '../types';

export const BATCH_SIZE = 1000;
export const variantRelations = [
'product',
'product.featuredAsset',
'product.facetValues',
'product.facetValues.facet',
'product.channels',
'featuredAsset',
'facetValues',
'facetValues.facet',
Expand All @@ -51,24 +59,27 @@ export class IndexerController {
return new Observable(observer => {
(async () => {
const timeStart = Date.now();
const qb = this.getSearchIndexQueryBuilder();
const count = await qb.where('variants__product.deletedAt IS NULL').getCount();
Logger.verbose(`Reindexing ${count} variants`, workerLoggerCtx);
const qb = this.getSearchIndexQueryBuilder(ctx.channelId);
const count = await qb.getCount();
Logger.verbose(
`Reindexing ${count} variants for channel ${ctx.channel.code}`,
workerLoggerCtx,
);
const batches = Math.ceil(count / BATCH_SIZE);

// Ensure tax rates are up-to-date.
await this.taxRateService.updateActiveTaxRates();

await this.connection
.getRepository(SearchIndexItem)
.delete({ languageCode: ctx.languageCode });
.delete({ languageCode: ctx.languageCode, channelId: ctx.channelId });
Logger.verbose('Deleted existing index items', workerLoggerCtx);

for (let i = 0; i < batches; i++) {
Logger.verbose(`Processing batch ${i + 1} of ${batches}`, workerLoggerCtx);

const variants = await qb
.where('variants__product.deletedAt IS NULL')
.andWhere('variants__product.deletedAt IS NULL')
.take(BATCH_SIZE)
.skip(i * BATCH_SIZE)
.getMany();
Expand Down Expand Up @@ -135,60 +146,87 @@ export class IndexerController {
});
}

/**
* Updates the search index only for the affected entities.
*/
@MessagePattern(UpdateProductOrVariantMessage.pattern)
updateProductOrVariant(data: UpdateProductOrVariantMessage['data']): Observable<boolean> {
@MessagePattern(UpdateProductMessage.pattern)
updateProduct(data: UpdateProductMessage['data']): Observable<UpdateProductMessage['response']> {
const ctx = RequestContext.fromObject(data.ctx);
const { productId, variantId } = data;
let updatedVariants: ProductVariant[] = [];
let removedVariantIds: ID[] = [];
return defer(async () => {
if (data.productId) {
const product = await this.connection.getRepository(Product).findOne(productId, {
relations: ['variants'],
});
if (product) {
if (product.deletedAt) {
removedVariantIds = product.variants.map(v => v.id);
} else {
updatedVariants = await this.connection
.getRepository(ProductVariant)
.findByIds(product.variants.map(v => v.id), {
relations: variantRelations,
});
if (product.enabled === false) {
updatedVariants.forEach(v => (v.enabled = false));
}
}
const product = await this.connection.getRepository(Product).findOne(data.productId, {
relations: ['variants'],
});
if (product) {
let updatedVariants = await this.connection
.getRepository(ProductVariant)
.findByIds(product.variants.map(v => v.id), {
relations: variantRelations,
});
if (product.enabled === false) {
updatedVariants.forEach(v => (v.enabled = false));
}
} else {
const variant = await this.connection.getRepository(ProductVariant).findOne(variantId, {
relations: variantRelations,
});
if (variant) {
updatedVariants = [variant];
Logger.verbose(`Updating ${updatedVariants.length} variants`, workerLoggerCtx);
updatedVariants = this.hydrateVariants(ctx, updatedVariants);
if (updatedVariants.length) {
await this.saveVariants(ctx, updatedVariants);
}
}
Logger.verbose(`Updating ${updatedVariants.length} variants`, workerLoggerCtx);
updatedVariants = this.hydrateVariants(ctx, updatedVariants);
if (updatedVariants.length) {
return true;
});
}

@MessagePattern(UpdateVariantMessage.pattern)
updateVariants(data: UpdateVariantMessage['data']): Observable<UpdateVariantMessage['response']> {
const ctx = RequestContext.fromObject(data.ctx);
return defer(async () => {
const variants = await this.connection.getRepository(ProductVariant).findByIds(data.variantIds, {
relations: variantRelations,
});
if (variants) {
const updatedVariants = this.hydrateVariants(ctx, variants);
Logger.verbose(`Updating ${updatedVariants.length} variants`, workerLoggerCtx);
await this.saveVariants(ctx, updatedVariants);
}
if (removedVariantIds.length) {
await this.removeSearchIndexItems(ctx.languageCode, removedVariantIds);
return true;
});
}

@MessagePattern(DeleteProductMessage.pattern)
deleteProduct(data: DeleteProductMessage['data']): Observable<DeleteProductMessage['response']> {
const ctx = RequestContext.fromObject(data.ctx);
return defer(async () => {
const product = await this.connection.getRepository(Product).findOne(data.productId, {
relations: ['variants'],
});
if (product && product.deletedAt) {
const removedVariantIds = product.variants.map(v => v.id);
if (removedVariantIds.length) {
await this.removeSearchIndexItems(ctx, removedVariantIds);
}
}
return true;
});
}

@MessagePattern(DeleteVariantMessage.pattern)
deleteVariant(data: DeleteVariantMessage['data']): Observable<DeleteVariantMessage['response']> {
const ctx = RequestContext.fromObject(data.ctx);
return defer(async () => {
const variants = await this.connection.getRepository(ProductVariant).findByIds(data.variantIds);
if (variants.length) {
await this.removeSearchIndexItems(ctx, variants.map(v => v.id));
}
return true;
});
}

private getSearchIndexQueryBuilder() {
private getSearchIndexQueryBuilder(channelId: ID) {
const qb = this.connection.getRepository(ProductVariant).createQueryBuilder('variants');
FindOptionsUtils.applyFindManyOptionsOrConditionsToQueryBuilder(qb, {
relations: variantRelations,
});
FindOptionsUtils.joinEagerRelations(qb, qb.alias, this.connection.getMetadata(ProductVariant));
qb.leftJoin('variants.product', 'product')
.leftJoin('product.channels', 'channel')
.where('channel.id = :channelId', { channelId })
.andWhere('variants__product.deletedAt IS NULL');
return qb;
}

Expand All @@ -205,19 +243,21 @@ export class IndexerController {
const items = variants.map(
(v: ProductVariant) =>
new SearchIndexItem({
productVariantId: v.id,
channelId: ctx.channelId,
languageCode: ctx.languageCode,
sku: v.sku,
enabled: v.enabled,
slug: v.product.slug,
price: v.price,
priceWithTax: v.priceWithTax,
languageCode: ctx.languageCode,
productVariantId: v.id,
productId: v.product.id,
productName: v.product.name,
description: v.product.description,
productVariantName: v.name,
productPreview: v.product.featuredAsset ? v.product.featuredAsset.preview : '',
productVariantPreview: v.featuredAsset ? v.featuredAsset.preview : '',
channelIds: v.product.channels.map(c => c.id as string),
facetIds: this.getFacetIds(v),
facetValueIds: this.getFacetValueIds(v),
collectionIds: v.collections.map(c => c.id.toString()),
Expand All @@ -243,10 +283,11 @@ export class IndexerController {
/**
* Remove items from the search index
*/
private async removeSearchIndexItems(languageCode: LanguageCode, variantIds: ID[]) {
private async removeSearchIndexItems(ctx: RequestContext, variantIds: ID[]) {
const compositeKeys = variantIds.map(id => ({
productVariantId: id,
languageCode,
channelId: ctx.channelId,
languageCode: ctx.languageCode,
})) as any[];
await this.queue.push(() => this.connection.getRepository(SearchIndexItem).delete(compositeKeys));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ import { ProductVariant } from '../../../entity/product-variant/product-variant.
import { Product } from '../../../entity/product/product.entity';
import { Job } from '../../../service/helpers/job-manager/job';
import { JobReporter, JobService } from '../../../service/services/job.service';
import { WorkerMessage } from '../../../worker/types';
import { WorkerService } from '../../../worker/worker.service';
import {
DeleteProductMessage,
DeleteVariantMessage,
ReindexMessage,
ReindexMessageResponse,
UpdateProductOrVariantMessage,
UpdateProductMessage,
UpdateVariantMessage,
UpdateVariantsByIdMessage,
} from '../types';

Expand All @@ -33,29 +37,37 @@ export class SearchIndexService {
});
}

/**
* Updates the search index only for the affected entities.
*/
updateProductOrVariant(ctx: RequestContext, updatedEntity: Product | ProductVariant) {
return this.jobService.createJob({
name: 'update-index',
metadata: {
entity: updatedEntity.constructor.name,
id: updatedEntity.id,
},
work: reporter => {
const data =
updatedEntity instanceof Product
? { ctx, productId: updatedEntity.id }
: { ctx, variantId: updatedEntity.id };
this.workerService.send(new UpdateProductOrVariantMessage(data)).subscribe({
complete: () => reporter.complete(true),
error: err => {
Logger.error(err);
reporter.complete(false);
},
});
},
updateProduct(ctx: RequestContext, product: Product) {
const data = { ctx, productId: product.id };
return this.createShortWorkerJob(new UpdateProductMessage(data), {
entity: 'Product',
id: product.id,
});
}

updateVariants(ctx: RequestContext, variants: ProductVariant[]) {
const variantIds = variants.map(v => v.id);
const data = { ctx, variantIds };
return this.createShortWorkerJob(new UpdateVariantMessage(data), {
entity: 'ProductVariant',
ids: variantIds,
});
}

deleteProduct(ctx: RequestContext, product: Product) {
const data = { ctx, productId: product.id };
return this.createShortWorkerJob(new DeleteProductMessage(data), {
entity: 'Product',
id: product.id,
});
}

deleteVariant(ctx: RequestContext, variants: ProductVariant[]) {
const variantIds = variants.map(v => v.id);
const data = { ctx, variantIds };
return this.createShortWorkerJob(new DeleteVariantMessage(data), {
entity: 'ProductVariant',
id: variantIds,
});
}

Expand All @@ -74,6 +86,25 @@ export class SearchIndexService {
});
}

/**
* Creates a short-running job that does not expect progress updates.
*/
private createShortWorkerJob<T extends WorkerMessage<any, any>>(message: T, metadata: any) {
return this.jobService.createJob({
name: 'update-index',
metadata,
work: reporter => {
this.workerService.send(message).subscribe({
complete: () => reporter.complete(true),
error: err => {
Logger.error(err);
reporter.complete(false);
},
});
},
});
}

private createObserver(reporter: JobReporter) {
let total: number | undefined;
let duration = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ export class SearchIndexItem {
@PrimaryColumn('varchar')
languageCode: LanguageCode;

@EntityId({ primary: true })
channelId: ID;

@EntityId()
productId: ID;

Expand Down Expand Up @@ -61,6 +64,9 @@ export class SearchIndexItem {
@Column('simple-array')
collectionIds: string[];

@Column('simple-array')
channelIds: string[];

@Column()
productPreview: string;

Expand Down
Loading

0 comments on commit 280a38b

Please sign in to comment.