diff --git a/packages/elasticsearch-plugin/src/indexing/indexer.controller.ts b/packages/elasticsearch-plugin/src/indexing/indexer.controller.ts index c0b501919d..09b2ec219d 100644 --- a/packages/elasticsearch-plugin/src/indexing/indexer.controller.ts +++ b/packages/elasticsearch-plugin/src/indexing/indexer.controller.ts @@ -43,6 +43,7 @@ import { } from '../types'; import { createIndices, getClient, getIndexNameByAlias } from './indexing-utils'; +import { MutableRequestContext } from './mutable-request-context'; export const defaultProductRelations: Array> = [ 'variants', @@ -110,7 +111,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes * Updates the search index only for the affected product. */ async updateProduct({ ctx: rawContext, productId }: UpdateProductMessageData): Promise { - await this.updateProductsInternal([productId]); + const ctx = MutableRequestContext.deserialize(rawContext); + await this.updateProductsInternal(ctx, [productId]); return true; } @@ -118,7 +120,10 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes * Updates the search index only for the affected product. */ async deleteProduct({ ctx: rawContext, productId }: UpdateProductMessageData): Promise { - const operations = await this.deleteProductOperations(productId); + const operations = await this.deleteProductOperations( + RequestContext.deserialize(rawContext), + productId, + ); await this.executeBulkOperations(operations); return true; } @@ -131,7 +136,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes productId, channelId, }: ProductChannelMessageData): Promise { - await this.updateProductsInternal([productId]); + const ctx = MutableRequestContext.deserialize(rawContext); + await this.updateProductsInternal(ctx, [productId]); return true; } @@ -143,7 +149,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes productId, channelId, }: ProductChannelMessageData): Promise { - await this.updateProductsInternal([productId]); + const ctx = MutableRequestContext.deserialize(rawContext); + await this.updateProductsInternal(ctx, [productId]); return true; } @@ -153,7 +160,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes channelId, }: VariantChannelMessageData): Promise { const productIds = await this.getProductIdsByVariantIds([productVariantId]); - await this.updateProductsInternal(productIds); + const ctx = MutableRequestContext.deserialize(rawContext); + await this.updateProductsInternal(ctx, productIds); return true; } @@ -163,7 +171,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes channelId, }: VariantChannelMessageData): Promise { const productIds = await this.getProductIdsByVariantIds([productVariantId]); - await this.updateProductsInternal(productIds); + const ctx = MutableRequestContext.deserialize(rawContext); + await this.updateProductsInternal(ctx, productIds); return true; } @@ -171,17 +180,19 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes * Updates the search index only for the affected entities. */ async updateVariants({ ctx: rawContext, variantIds }: UpdateVariantMessageData): Promise { + const ctx = MutableRequestContext.deserialize(rawContext); return this.asyncQueue.push(async () => { const productIds = await this.getProductIdsByVariantIds(variantIds); - await this.updateProductsInternal(productIds); + await this.updateProductsInternal(ctx, productIds); return true; }); } async deleteVariants({ ctx: rawContext, variantIds }: UpdateVariantMessageData): Promise { + const ctx = MutableRequestContext.deserialize(rawContext); const productIds = await this.getProductIdsByVariantIds(variantIds); for (const productId of productIds) { - await this.updateProductsInternal([productId]); + await this.updateProductsInternal(ctx, [productId]); } return true; } @@ -190,6 +201,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes ctx: rawContext, ids, }: UpdateVariantsByIdMessageData): Observable { + const ctx = MutableRequestContext.deserialize(rawContext); return asyncObservable(async observer => { return this.asyncQueue.push(async () => { const timeStart = Date.now(); @@ -197,7 +209,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes if (productIds.length) { let finishedProductsCount = 0; for (const productId of productIds) { - await this.updateProductsInternal([productId]); + await this.updateProductsInternal(ctx, [productId]); finishedProductsCount++; observer.next({ total: productIds.length, @@ -221,6 +233,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes return this.asyncQueue.push(async () => { const timeStart = Date.now(); const operations: BulkVariantOperation[] = []; + const ctx = MutableRequestContext.deserialize(rawContext); const reindexTempName = new Date().getTime(); const variantIndexName = this.options.indexPrefix + VARIANT_INDEX_NAME; @@ -330,7 +343,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes .getMany(); for (const { id: deletedProductId } of deletedProductIds) { - operations.push(...(await this.deleteProductOperations(deletedProductId))); + operations.push(...(await this.deleteProductOperations(ctx, deletedProductId))); } const productIds = await this.connection @@ -344,7 +357,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes let finishedProductsCount = 0; for (const { id: productId } of productIds) { - operations.push(...(await this.updateProductsOperations([productId]))); + operations.push(...(await this.updateProductsOperations(ctx, [productId]))); finishedProductsCount++; observer.next({ total: productIds.length, @@ -443,17 +456,20 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes return result1.body.failures.length === 0 && result2.body.failures === 0; } - private async updateProductsInternal(productIds: ID[]) { - const operations = await this.updateProductsOperations(productIds); + private async updateProductsInternal(ctx: MutableRequestContext, productIds: ID[]) { + const operations = await this.updateProductsOperations(ctx, productIds); await this.executeBulkOperations(operations); } - private async updateProductsOperations(productIds: ID[]): Promise { - Logger.verbose(`Updating ${productIds.length} Products`, loggerCtx); + private async updateProductsOperations( + ctx: MutableRequestContext, + productIds: ID[], + ): Promise { + Logger.debug(`Updating ${productIds.length} Products`, loggerCtx); const operations: BulkVariantOperation[] = []; for (const productId of productIds) { - operations.push(...(await this.deleteProductOperations(productId))); + operations.push(...(await this.deleteProductOperations(ctx, productId))); let product: Product | undefined; try { @@ -485,7 +501,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes if (!product.enabled) { updatedProductVariants.forEach(v => (v.enabled = false)); } - Logger.verbose(`Updating Product (${productId})`, loggerCtx); + Logger.debug(`Updating Product (${productId})`, loggerCtx); const languageVariants: LanguageCode[] = []; languageVariants.push(...product.translations.map(t => t.languageCode)); for (const variant of product.variants) { @@ -494,19 +510,13 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes const uniqueLanguageVariants = unique(languageVariants); for (const channel of product.channels) { - const channelCtx = new RequestContext({ - channel, - apiType: 'admin', - authorizedAsOwnerOnly: false, - isAuthorized: true, - session: {} as any, - }); + ctx.setChannel(channel); const variantsInChannel = updatedProductVariants.filter(v => - v.channels.map(c => c.id).includes(channelCtx.channelId), + v.channels.map(c => c.id).includes(ctx.channelId), ); for (const variant of variantsInChannel) { - await this.productPriceApplicator.applyChannelPriceAndTax(variant, channelCtx); + await this.productPriceApplicator.applyChannelPriceAndTax(variant, ctx); } for (const languageCode of uniqueLanguageVariants) { if (variantsInChannel.length) { @@ -518,7 +528,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes update: { _id: ElasticsearchIndexerController.getId( variant.id, - channelCtx.channelId, + ctx.channelId, languageCode, ), }, @@ -530,7 +540,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes doc: await this.createVariantIndexItem( variant, variantsInChannel, - channelCtx, + ctx, languageCode, ), doc_as_upsert: true, @@ -546,7 +556,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes update: { _id: ElasticsearchIndexerController.getId( -product.id, - channelCtx.channelId, + ctx.channelId, languageCode, ), }, @@ -555,11 +565,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes { index: VARIANT_INDEX_NAME, operation: { - doc: this.createSyntheticProductIndexItem( - product, - channelCtx, - languageCode, - ), + doc: this.createSyntheticProductIndexItem(product, ctx, languageCode), doc_as_upsert: true, }, }, @@ -601,12 +607,17 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes return uniqueRelations; } - private async deleteProductOperations(productId: ID): Promise { - const channels = await this.connection - .getRepository(Channel) - .createQueryBuilder('channel') - .select('channel.id') - .getMany(); + private async deleteProductOperations( + ctx: RequestContext, + productId: ID, + ): Promise { + const channels = await this.requestContextCache.get(ctx, `elastic-index-all-channels`, () => + this.connection + .getRepository(Channel) + .createQueryBuilder('channel') + .select('channel.id') + .getMany(), + ); const product = await this.connection.getRepository(Product).findOne(productId, { relations: ['variants'], }); @@ -614,7 +625,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes return []; } - Logger.verbose(`Deleting 1 Product (id: ${productId})`, loggerCtx); + Logger.debug(`Deleting 1 Product (id: ${productId})`, loggerCtx); const operations: BulkVariantOperation[] = []; const languageVariants: LanguageCode[] = []; languageVariants.push(...product.translations.map(t => t.languageCode)); @@ -650,7 +661,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes channelIds: ID[], languageVariants: LanguageCode[], ): Promise { - Logger.verbose(`Deleting ${variants.length} ProductVariants`, loggerCtx); + Logger.debug(`Deleting ${variants.length} ProductVariants`, loggerCtx); const operations: BulkVariantOperation[] = []; for (const variant of variants) { for (const channelId of channelIds) { @@ -820,10 +831,16 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes } private async getProductInStockValue(ctx: RequestContext, variants: ProductVariant[]): Promise { - const stockLevels = await Promise.all( - variants.map(variant => this.productVariantService.getSaleableStockLevel(ctx, variant)), + return this.requestContextCache.get( + ctx, + `elastic-index-product-in-stock-${variants.map(v => v.id).join(',')}`, + async () => { + const stockLevels = await Promise.all( + variants.map(variant => this.productVariantService.getSaleableStockLevel(ctx, variant)), + ); + return stockLevels.some(stockLevel => 0 < stockLevel); + }, ); - return stockLevels.some(stockLevel => 0 < stockLevel); } /** diff --git a/packages/elasticsearch-plugin/src/indexing/mutable-request-context.ts b/packages/elasticsearch-plugin/src/indexing/mutable-request-context.ts new file mode 100644 index 0000000000..b6f360ae93 --- /dev/null +++ b/packages/elasticsearch-plugin/src/indexing/mutable-request-context.ts @@ -0,0 +1,42 @@ +import { Channel, ID, RequestContext, SerializedRequestContext } from '@vendure/core'; + +/** + * @description + * This is used during search index creation to allow us to use a single + * RequestContext, but mutate the Channel. In this way, we can take + * full advantage of the RequestContextCacheService, and _massively_ cut + * down on the number of DB calls being made during indexing. + */ +export class MutableRequestContext extends RequestContext { + constructor(options: ConstructorParameters[0]) { + super(options); + } + private mutatedChannel: Channel | undefined; + + setChannel(channel: Channel) { + this.mutatedChannel = channel; + } + + get channel(): Channel { + return this.mutatedChannel ?? super.channel; + } + + get channelId(): ID { + return this.mutatedChannel?.id ?? super.channelId; + } + + static deserialize(ctxObject: SerializedRequestContext): MutableRequestContext { + return new MutableRequestContext({ + req: ctxObject._req as any, + apiType: ctxObject._apiType, + channel: new Channel(ctxObject._channel), + session: { + ...ctxObject._session, + expires: ctxObject._session?.expires && new Date(ctxObject._session.expires), + }, + languageCode: ctxObject._languageCode, + isAuthorized: ctxObject._isAuthorized, + authorizedAsOwnerOnly: ctxObject._authorizedAsOwnerOnly, + }); + } +}