Skip to content

Commit

Permalink
perf(elasticsearch-plugin): Optimize indexing using RequestContextCache
Browse files Browse the repository at this point in the history
This perf optimization uses the RequestContextCacheService to cache the results of DB calls which
get performed many times during indexing, i.e. at least once for every ProductVariant being indexed.

In testing this cut down the time to index ~10k variants by around 20% (3 mins to 2.5 mins).

This commit also adjusts some logging to `debug` from `verbose` as it was too noisy even for
verbose.
  • Loading branch information
michaelbromley committed Oct 15, 2021
1 parent 479505e commit 75da3b3
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 45 deletions.
107 changes: 62 additions & 45 deletions packages/elasticsearch-plugin/src/indexing/indexer.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import {
} from '../types';

import { createIndices, getClient, getIndexNameByAlias } from './indexing-utils';
import { MutableRequestContext } from './mutable-request-context';

export const defaultProductRelations: Array<EntityRelationPaths<Product>> = [
'variants',
Expand Down Expand Up @@ -110,15 +111,19 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
* Updates the search index only for the affected product.
*/
async updateProduct({ ctx: rawContext, productId }: UpdateProductMessageData): Promise<boolean> {
await this.updateProductsInternal([productId]);
const ctx = MutableRequestContext.deserialize(rawContext);
await this.updateProductsInternal(ctx, [productId]);
return true;
}

/**
* Updates the search index only for the affected product.
*/
async deleteProduct({ ctx: rawContext, productId }: UpdateProductMessageData): Promise<boolean> {
const operations = await this.deleteProductOperations(productId);
const operations = await this.deleteProductOperations(
RequestContext.deserialize(rawContext),
productId,
);
await this.executeBulkOperations(operations);
return true;
}
Expand All @@ -131,7 +136,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
productId,
channelId,
}: ProductChannelMessageData): Promise<boolean> {
await this.updateProductsInternal([productId]);
const ctx = MutableRequestContext.deserialize(rawContext);
await this.updateProductsInternal(ctx, [productId]);
return true;
}

Expand All @@ -143,7 +149,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
productId,
channelId,
}: ProductChannelMessageData): Promise<boolean> {
await this.updateProductsInternal([productId]);
const ctx = MutableRequestContext.deserialize(rawContext);
await this.updateProductsInternal(ctx, [productId]);
return true;
}

Expand All @@ -153,7 +160,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
channelId,
}: VariantChannelMessageData): Promise<boolean> {
const productIds = await this.getProductIdsByVariantIds([productVariantId]);
await this.updateProductsInternal(productIds);
const ctx = MutableRequestContext.deserialize(rawContext);
await this.updateProductsInternal(ctx, productIds);
return true;
}

Expand All @@ -163,25 +171,28 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
channelId,
}: VariantChannelMessageData): Promise<boolean> {
const productIds = await this.getProductIdsByVariantIds([productVariantId]);
await this.updateProductsInternal(productIds);
const ctx = MutableRequestContext.deserialize(rawContext);
await this.updateProductsInternal(ctx, productIds);
return true;
}

/**
* Updates the search index only for the affected entities.
*/
async updateVariants({ ctx: rawContext, variantIds }: UpdateVariantMessageData): Promise<boolean> {
const ctx = MutableRequestContext.deserialize(rawContext);
return this.asyncQueue.push(async () => {
const productIds = await this.getProductIdsByVariantIds(variantIds);
await this.updateProductsInternal(productIds);
await this.updateProductsInternal(ctx, productIds);
return true;
});
}

async deleteVariants({ ctx: rawContext, variantIds }: UpdateVariantMessageData): Promise<boolean> {
const ctx = MutableRequestContext.deserialize(rawContext);
const productIds = await this.getProductIdsByVariantIds(variantIds);
for (const productId of productIds) {
await this.updateProductsInternal([productId]);
await this.updateProductsInternal(ctx, [productId]);
}
return true;
}
Expand All @@ -190,14 +201,15 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
ctx: rawContext,
ids,
}: UpdateVariantsByIdMessageData): Observable<ReindexMessageResponse> {
const ctx = MutableRequestContext.deserialize(rawContext);
return asyncObservable(async observer => {
return this.asyncQueue.push(async () => {
const timeStart = Date.now();
const productIds = await this.getProductIdsByVariantIds(ids);
if (productIds.length) {
let finishedProductsCount = 0;
for (const productId of productIds) {
await this.updateProductsInternal([productId]);
await this.updateProductsInternal(ctx, [productId]);
finishedProductsCount++;
observer.next({
total: productIds.length,
Expand All @@ -221,6 +233,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
return this.asyncQueue.push(async () => {
const timeStart = Date.now();
const operations: BulkVariantOperation[] = [];
const ctx = MutableRequestContext.deserialize(rawContext);

const reindexTempName = new Date().getTime();
const variantIndexName = this.options.indexPrefix + VARIANT_INDEX_NAME;
Expand Down Expand Up @@ -330,7 +343,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
.getMany();

for (const { id: deletedProductId } of deletedProductIds) {
operations.push(...(await this.deleteProductOperations(deletedProductId)));
operations.push(...(await this.deleteProductOperations(ctx, deletedProductId)));
}

const productIds = await this.connection
Expand All @@ -344,7 +357,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes

let finishedProductsCount = 0;
for (const { id: productId } of productIds) {
operations.push(...(await this.updateProductsOperations([productId])));
operations.push(...(await this.updateProductsOperations(ctx, [productId])));
finishedProductsCount++;
observer.next({
total: productIds.length,
Expand Down Expand Up @@ -443,17 +456,20 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
return result1.body.failures.length === 0 && result2.body.failures === 0;
}

private async updateProductsInternal(productIds: ID[]) {
const operations = await this.updateProductsOperations(productIds);
private async updateProductsInternal(ctx: MutableRequestContext, productIds: ID[]) {
const operations = await this.updateProductsOperations(ctx, productIds);
await this.executeBulkOperations(operations);
}

private async updateProductsOperations(productIds: ID[]): Promise<BulkVariantOperation[]> {
Logger.verbose(`Updating ${productIds.length} Products`, loggerCtx);
private async updateProductsOperations(
ctx: MutableRequestContext,
productIds: ID[],
): Promise<BulkVariantOperation[]> {
Logger.debug(`Updating ${productIds.length} Products`, loggerCtx);
const operations: BulkVariantOperation[] = [];

for (const productId of productIds) {
operations.push(...(await this.deleteProductOperations(productId)));
operations.push(...(await this.deleteProductOperations(ctx, productId)));

let product: Product | undefined;
try {
Expand Down Expand Up @@ -485,7 +501,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
if (!product.enabled) {
updatedProductVariants.forEach(v => (v.enabled = false));
}
Logger.verbose(`Updating Product (${productId})`, loggerCtx);
Logger.debug(`Updating Product (${productId})`, loggerCtx);
const languageVariants: LanguageCode[] = [];
languageVariants.push(...product.translations.map(t => t.languageCode));
for (const variant of product.variants) {
Expand All @@ -494,19 +510,13 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
const uniqueLanguageVariants = unique(languageVariants);

for (const channel of product.channels) {
const channelCtx = new RequestContext({
channel,
apiType: 'admin',
authorizedAsOwnerOnly: false,
isAuthorized: true,
session: {} as any,
});
ctx.setChannel(channel);

const variantsInChannel = updatedProductVariants.filter(v =>
v.channels.map(c => c.id).includes(channelCtx.channelId),
v.channels.map(c => c.id).includes(ctx.channelId),
);
for (const variant of variantsInChannel) {
await this.productPriceApplicator.applyChannelPriceAndTax(variant, channelCtx);
await this.productPriceApplicator.applyChannelPriceAndTax(variant, ctx);
}
for (const languageCode of uniqueLanguageVariants) {
if (variantsInChannel.length) {
Expand All @@ -518,7 +528,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
update: {
_id: ElasticsearchIndexerController.getId(
variant.id,
channelCtx.channelId,
ctx.channelId,
languageCode,
),
},
Expand All @@ -530,7 +540,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
doc: await this.createVariantIndexItem(
variant,
variantsInChannel,
channelCtx,
ctx,
languageCode,
),
doc_as_upsert: true,
Expand All @@ -546,7 +556,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
update: {
_id: ElasticsearchIndexerController.getId(
-product.id,
channelCtx.channelId,
ctx.channelId,
languageCode,
),
},
Expand All @@ -555,11 +565,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
{
index: VARIANT_INDEX_NAME,
operation: {
doc: this.createSyntheticProductIndexItem(
product,
channelCtx,
languageCode,
),
doc: this.createSyntheticProductIndexItem(product, ctx, languageCode),
doc_as_upsert: true,
},
},
Expand Down Expand Up @@ -601,20 +607,25 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
return uniqueRelations;
}

private async deleteProductOperations(productId: ID): Promise<BulkVariantOperation[]> {
const channels = await this.connection
.getRepository(Channel)
.createQueryBuilder('channel')
.select('channel.id')
.getMany();
private async deleteProductOperations(
ctx: RequestContext,
productId: ID,
): Promise<BulkVariantOperation[]> {
const channels = await this.requestContextCache.get(ctx, `elastic-index-all-channels`, () =>
this.connection
.getRepository(Channel)
.createQueryBuilder('channel')
.select('channel.id')
.getMany(),
);
const product = await this.connection.getRepository(Product).findOne(productId, {
relations: ['variants'],
});
if (!product) {
return [];
}

Logger.verbose(`Deleting 1 Product (id: ${productId})`, loggerCtx);
Logger.debug(`Deleting 1 Product (id: ${productId})`, loggerCtx);
const operations: BulkVariantOperation[] = [];
const languageVariants: LanguageCode[] = [];
languageVariants.push(...product.translations.map(t => t.languageCode));
Expand Down Expand Up @@ -650,7 +661,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
channelIds: ID[],
languageVariants: LanguageCode[],
): Promise<BulkVariantOperation[]> {
Logger.verbose(`Deleting ${variants.length} ProductVariants`, loggerCtx);
Logger.debug(`Deleting ${variants.length} ProductVariants`, loggerCtx);
const operations: BulkVariantOperation[] = [];
for (const variant of variants) {
for (const channelId of channelIds) {
Expand Down Expand Up @@ -820,10 +831,16 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
}

private async getProductInStockValue(ctx: RequestContext, variants: ProductVariant[]): Promise<boolean> {
const stockLevels = await Promise.all(
variants.map(variant => this.productVariantService.getSaleableStockLevel(ctx, variant)),
return this.requestContextCache.get(
ctx,
`elastic-index-product-in-stock-${variants.map(v => v.id).join(',')}`,
async () => {
const stockLevels = await Promise.all(
variants.map(variant => this.productVariantService.getSaleableStockLevel(ctx, variant)),
);
return stockLevels.some(stockLevel => 0 < stockLevel);
},
);
return stockLevels.some(stockLevel => 0 < stockLevel);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { Channel, ID, RequestContext, SerializedRequestContext } from '@vendure/core';

/**
* @description
* This is used during search index creation to allow us to use a single
* RequestContext, but mutate the Channel. In this way, we can take
* full advantage of the RequestContextCacheService, and _massively_ cut
* down on the number of DB calls being made during indexing.
*/
export class MutableRequestContext extends RequestContext {
constructor(options: ConstructorParameters<typeof RequestContext>[0]) {
super(options);
}
private mutatedChannel: Channel | undefined;

setChannel(channel: Channel) {
this.mutatedChannel = channel;
}

get channel(): Channel {
return this.mutatedChannel ?? super.channel;
}

get channelId(): ID {
return this.mutatedChannel?.id ?? super.channelId;
}

static deserialize(ctxObject: SerializedRequestContext): MutableRequestContext {
return new MutableRequestContext({
req: ctxObject._req as any,
apiType: ctxObject._apiType,
channel: new Channel(ctxObject._channel),
session: {
...ctxObject._session,
expires: ctxObject._session?.expires && new Date(ctxObject._session.expires),
},
languageCode: ctxObject._languageCode,
isAuthorized: ctxObject._isAuthorized,
authorizedAsOwnerOnly: ctxObject._authorizedAsOwnerOnly,
});
}
}

0 comments on commit 75da3b3

Please sign in to comment.