diff --git a/packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts b/packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts index 55dab2b53c..45888dc66a 100644 --- a/packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts +++ b/packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts @@ -403,6 +403,7 @@ describe('Elasticsearch plugin', () => { }, ); + await awaitRunningJobs(adminClient); await awaitRunningJobs(adminClient); const { search: search2 } = await doAdminSearchQuery(adminClient, { term: 'drive', diff --git a/packages/elasticsearch-plugin/src/indexer.controller.ts b/packages/elasticsearch-plugin/src/indexer.controller.ts index 3e80498f96..f726778e24 100644 --- a/packages/elasticsearch-plugin/src/indexer.controller.ts +++ b/packages/elasticsearch-plugin/src/indexer.controller.ts @@ -65,6 +65,15 @@ export interface ReindexMessageResponse { duration: number; } +type BulkProductOperation = { + index: typeof PRODUCT_INDEX_NAME; + operation: BulkOperation | BulkOperationDoc; +}; +type BulkVariantOperation = { + index: typeof VARIANT_INDEX_NAME; + operation: BulkOperation | BulkOperationDoc; +}; + @Injectable() export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDestroy { private client: Client; @@ -100,7 +109,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes * Updates the search index only for the affected product. */ async deleteProduct({ ctx: rawContext, productId }: UpdateProductMessageData): Promise { - await this.deleteProductInternal(productId); + const operations = await this.deleteProductOperations(productId); + await this.executeBulkOperations(operations); return true; } @@ -201,6 +211,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes return asyncObservable(async observer => { return this.asyncQueue.push(async () => { const timeStart = Date.now(); + const operations: Array = []; if (dropIndices) { await deleteIndices(this.client, this.options.indexPrefix); @@ -219,7 +230,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes .getMany(); for (const { id: deletedProductId } of deletedProductIds) { - await this.deleteProductInternal(deletedProductId); + operations.push(...(await this.deleteProductOperations(deletedProductId))); } const productIds = await this.connection @@ -233,7 +244,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes let finishedProductsCount = 0; for (const { id: productId } of productIds) { - await this.updateProductsInternal([productId]); + operations.push(...(await this.updateProductsOperations([productId]))); finishedProductsCount++; observer.next({ total: productIds.length, @@ -241,6 +252,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes duration: +new Date() - timeStart, }); } + Logger.verbose(`Will execute ${operations.length} bulk update operations`, loggerCtx); + await this.executeBulkOperations(operations); Logger.verbose(`Completed reindexing!`, loggerCtx); return { total: productIds.length, @@ -338,42 +351,19 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes return result1.body.failures.length === 0 && result2.body.failures === 0; } - private async updateVariantsInternal(productVariants: ProductVariant[]) { - if (productVariants.length) { - const operations: Array> = []; - for (const variant of productVariants) { - const languageVariants = variant.translations.map(t => t.languageCode); - for (const channel of variant.channels) { - const channelCtx = new RequestContext({ - channel, - apiType: 'admin', - authorizedAsOwnerOnly: false, - isAuthorized: true, - session: {} as any, - }); - await this.productVariantService.applyChannelPriceAndTax(variant, channelCtx); - for (const languageCode of languageVariants) { - operations.push( - { update: { _id: this.getId(variant.id, channelCtx.channelId, languageCode) } }, - { - doc: this.createVariantIndexItem(variant, channelCtx.channelId, languageCode), - doc_as_upsert: true, - }, - ); - } - } - } - Logger.verbose(`Updating ${productVariants.length} ProductVariants`, loggerCtx); - await this.executeBulkOperations(VARIANT_INDEX_NAME, operations); - } + private async updateProductsInternal(productIds: ID[]) { + const operations = await this.updateProductsOperations(productIds); + await this.executeBulkOperations(operations); } - private async updateProductsInternal(productIds: ID[]) { + private async updateProductsOperations( + productIds: ID[], + ): Promise> { Logger.verbose(`Updating ${productIds.length} Products`, loggerCtx); - const operations: Array> = []; + const operations: Array = []; for (const productId of productIds) { - await this.deleteProductInternal(productId); + operations.push(...(await this.deleteProductOperations(productId))); const product = await this.connection.getRepository(Product).findOne(productId, { relations: productRelations, where: { @@ -399,7 +389,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes } Logger.verbose(`Updating Product (${productId})`, loggerCtx); if (updatedProductVariants.length) { - await this.updateVariantsInternal(updatedProductVariants); + operations.push(...(await this.updateVariantsOperations(updatedProductVariants))); } const languageVariants = product.translations.map(t => t.languageCode); @@ -422,29 +412,82 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes for (const languageCode of languageVariants) { operations.push( { - update: { - _id: this.getId(product.id, channelCtx.channelId, languageCode), + index: PRODUCT_INDEX_NAME, + operation: { + update: { + _id: this.getId(product.id, channelCtx.channelId, languageCode), + }, }, }, { - doc: variantsInChannel.length - ? this.createProductIndexItem( - variantsInChannel, - channelCtx.channelId, - languageCode, - ) - : this.createSyntheticProductIndexItem(channelCtx, product, languageCode), - doc_as_upsert: true, + index: PRODUCT_INDEX_NAME, + operation: { + doc: variantsInChannel.length + ? this.createProductIndexItem( + variantsInChannel, + channelCtx.channelId, + languageCode, + ) + : this.createSyntheticProductIndexItem( + channelCtx, + product, + languageCode, + ), + doc_as_upsert: true, + }, }, ); } } } } - await this.executeBulkOperations(PRODUCT_INDEX_NAME, operations); + return operations; } - private async deleteProductInternal(productId: ID) { + private async updateVariantsOperations( + productVariants: ProductVariant[], + ): Promise { + if (productVariants.length === 0) { + return []; + } + const operations: BulkVariantOperation[] = []; + for (const variant of productVariants) { + const languageVariants = variant.translations.map(t => t.languageCode); + for (const channel of variant.channels) { + const channelCtx = new RequestContext({ + channel, + apiType: 'admin', + authorizedAsOwnerOnly: false, + isAuthorized: true, + session: {} as any, + }); + await this.productVariantService.applyChannelPriceAndTax(variant, channelCtx); + for (const languageCode of languageVariants) { + operations.push( + { + index: VARIANT_INDEX_NAME, + operation: { + update: { _id: this.getId(variant.id, channelCtx.channelId, languageCode) }, + }, + }, + { + index: VARIANT_INDEX_NAME, + operation: { + doc: this.createVariantIndexItem(variant, channelCtx.channelId, languageCode), + doc_as_upsert: true, + }, + }, + ); + } + } + } + Logger.verbose(`Updating ${productVariants.length} ProductVariants`, loggerCtx); + return operations; + } + + private async deleteProductOperations( + productId: ID, + ): Promise> { const channels = await this.connection .getRepository(Channel) .createQueryBuilder('channel') @@ -453,37 +496,50 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes const product = await this.connection.getRepository(Product).findOne(productId, { relations: ['variants'], }); - if (product) { - Logger.verbose(`Deleting 1 Product (id: ${productId})`, loggerCtx); - const operations: BulkOperation[] = []; - for (const { id: channelId } of channels) { - const languageVariants = product.translations.map(t => t.languageCode); - for (const languageCode of languageVariants) { - operations.push({ delete: { _id: this.getId(product.id, channelId, languageCode) } }); - } + if (!product) { + return []; + } + + Logger.verbose(`Deleting 1 Product (id: ${productId})`, loggerCtx); + const operations: Array = []; + for (const { id: channelId } of channels) { + const languageVariants = product.translations.map(t => t.languageCode); + for (const languageCode of languageVariants) { + operations.push({ + index: PRODUCT_INDEX_NAME, + operation: { delete: { _id: this.getId(product.id, channelId, languageCode) } }, + }); } - await this.deleteVariantsInternal( + } + operations.push( + ...(await this.deleteVariantsInternalOperations( product.variants, channels.map(c => c.id), - ); - await this.executeBulkOperations(PRODUCT_INDEX_NAME, operations); - } + )), + ); + return operations; } - private async deleteVariantsInternal(variants: ProductVariant[], channelIds: ID[]) { + private async deleteVariantsInternalOperations( + variants: ProductVariant[], + channelIds: ID[], + ): Promise { Logger.verbose(`Deleting ${variants.length} ProductVariants`, loggerCtx); - const operations: BulkOperation[] = []; + const operations: BulkVariantOperation[] = []; for (const variant of variants) { for (const channelId of channelIds) { const languageVariants = variant.translations.map(t => t.languageCode); for (const languageCode of languageVariants) { operations.push({ - delete: { _id: this.getId(variant.id, channelId, languageCode) }, + index: VARIANT_INDEX_NAME, + operation: { + delete: { _id: this.getId(variant.id, channelId, languageCode) }, + }, }); } } } - await this.executeBulkOperations(VARIANT_INDEX_NAME, operations); + return operations; } private async getProductIdsByVariantIds(variantIds: ID[]): Promise { @@ -494,7 +550,25 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes return unique(variants.map(v => v.product.id)); } - private async executeBulkOperations( + private async executeBulkOperations(operations: Array) { + const productOperations: Array> = []; + const variantOperations: Array> = []; + + for (const operation of operations) { + if (operation.index === PRODUCT_INDEX_NAME) { + productOperations.push(operation.operation); + } else { + variantOperations.push(operation.operation); + } + } + + return Promise.all([ + this.runBulkOperationsOnIndex(PRODUCT_INDEX_NAME, productOperations), + this.runBulkOperationsOnIndex(VARIANT_INDEX_NAME, variantOperations), + ]); + } + + private async runBulkOperationsOnIndex( indexName: string, operations: Array>, ) {