From 06da6d5b935ea1379e82119b86726aaaeb838235 Mon Sep 17 00:00:00 2001 From: Michael Bromley Date: Fri, 23 Apr 2021 10:46:56 +0200 Subject: [PATCH] refactor(elasticsearch-plugin): Execute all bulk index updates at once Previously, Elasticsearch bulk update operations were scattered throughout the execution of a method e.g. `updateProductsInternal` would bulk delete products, bulk update variants, bulk insert products. Between each of these bulk operations there were async DB queries. This intervening time allowed for race conditions to creep in which manifested as non-deterministically failing e2e tests. In real-world usage it may not have been noticeable, but in any case this commit groups _all_ batch operations together, and only at the very end of the method, all batch operations are passed to ES at once. --- .../e2e/elasticsearch-plugin.e2e-spec.ts | 1 + .../src/indexer.controller.ts | 202 ++++++++++++------ 2 files changed, 139 insertions(+), 64 deletions(-) diff --git a/packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts b/packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts index 55dab2b53c..45888dc66a 100644 --- a/packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts +++ b/packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts @@ -403,6 +403,7 @@ describe('Elasticsearch plugin', () => { }, ); + await awaitRunningJobs(adminClient); await awaitRunningJobs(adminClient); const { search: search2 } = await doAdminSearchQuery(adminClient, { term: 'drive', diff --git a/packages/elasticsearch-plugin/src/indexer.controller.ts b/packages/elasticsearch-plugin/src/indexer.controller.ts index 3e80498f96..f726778e24 100644 --- a/packages/elasticsearch-plugin/src/indexer.controller.ts +++ b/packages/elasticsearch-plugin/src/indexer.controller.ts @@ -65,6 +65,15 @@ export interface ReindexMessageResponse { duration: number; } +type BulkProductOperation = { + index: typeof PRODUCT_INDEX_NAME; + operation: BulkOperation | 
BulkOperationDoc; +}; +type BulkVariantOperation = { + index: typeof VARIANT_INDEX_NAME; + operation: BulkOperation | BulkOperationDoc; +}; + @Injectable() export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDestroy { private client: Client; @@ -100,7 +109,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes * Updates the search index only for the affected product. */ async deleteProduct({ ctx: rawContext, productId }: UpdateProductMessageData): Promise { - await this.deleteProductInternal(productId); + const operations = await this.deleteProductOperations(productId); + await this.executeBulkOperations(operations); return true; } @@ -201,6 +211,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes return asyncObservable(async observer => { return this.asyncQueue.push(async () => { const timeStart = Date.now(); + const operations: Array = []; if (dropIndices) { await deleteIndices(this.client, this.options.indexPrefix); @@ -219,7 +230,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes .getMany(); for (const { id: deletedProductId } of deletedProductIds) { - await this.deleteProductInternal(deletedProductId); + operations.push(...(await this.deleteProductOperations(deletedProductId))); } const productIds = await this.connection @@ -233,7 +244,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes let finishedProductsCount = 0; for (const { id: productId } of productIds) { - await this.updateProductsInternal([productId]); + operations.push(...(await this.updateProductsOperations([productId]))); finishedProductsCount++; observer.next({ total: productIds.length, @@ -241,6 +252,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes duration: +new Date() - timeStart, }); } + Logger.verbose(`Will execute ${operations.length} bulk update operations`, loggerCtx); + await 
this.executeBulkOperations(operations); Logger.verbose(`Completed reindexing!`, loggerCtx); return { total: productIds.length, @@ -338,42 +351,19 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes return result1.body.failures.length === 0 && result2.body.failures === 0; } - private async updateVariantsInternal(productVariants: ProductVariant[]) { - if (productVariants.length) { - const operations: Array> = []; - for (const variant of productVariants) { - const languageVariants = variant.translations.map(t => t.languageCode); - for (const channel of variant.channels) { - const channelCtx = new RequestContext({ - channel, - apiType: 'admin', - authorizedAsOwnerOnly: false, - isAuthorized: true, - session: {} as any, - }); - await this.productVariantService.applyChannelPriceAndTax(variant, channelCtx); - for (const languageCode of languageVariants) { - operations.push( - { update: { _id: this.getId(variant.id, channelCtx.channelId, languageCode) } }, - { - doc: this.createVariantIndexItem(variant, channelCtx.channelId, languageCode), - doc_as_upsert: true, - }, - ); - } - } - } - Logger.verbose(`Updating ${productVariants.length} ProductVariants`, loggerCtx); - await this.executeBulkOperations(VARIANT_INDEX_NAME, operations); - } + private async updateProductsInternal(productIds: ID[]) { + const operations = await this.updateProductsOperations(productIds); + await this.executeBulkOperations(operations); } - private async updateProductsInternal(productIds: ID[]) { + private async updateProductsOperations( + productIds: ID[], + ): Promise> { Logger.verbose(`Updating ${productIds.length} Products`, loggerCtx); - const operations: Array> = []; + const operations: Array = []; for (const productId of productIds) { - await this.deleteProductInternal(productId); + operations.push(...(await this.deleteProductOperations(productId))); const product = await this.connection.getRepository(Product).findOne(productId, { relations: productRelations, 
where: { @@ -399,7 +389,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes } Logger.verbose(`Updating Product (${productId})`, loggerCtx); if (updatedProductVariants.length) { - await this.updateVariantsInternal(updatedProductVariants); + operations.push(...(await this.updateVariantsOperations(updatedProductVariants))); } const languageVariants = product.translations.map(t => t.languageCode); @@ -422,29 +412,82 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes for (const languageCode of languageVariants) { operations.push( { - update: { - _id: this.getId(product.id, channelCtx.channelId, languageCode), + index: PRODUCT_INDEX_NAME, + operation: { + update: { + _id: this.getId(product.id, channelCtx.channelId, languageCode), + }, }, }, { - doc: variantsInChannel.length - ? this.createProductIndexItem( - variantsInChannel, - channelCtx.channelId, - languageCode, - ) - : this.createSyntheticProductIndexItem(channelCtx, product, languageCode), - doc_as_upsert: true, + index: PRODUCT_INDEX_NAME, + operation: { + doc: variantsInChannel.length + ? 
this.createProductIndexItem( + variantsInChannel, + channelCtx.channelId, + languageCode, + ) + : this.createSyntheticProductIndexItem( + channelCtx, + product, + languageCode, + ), + doc_as_upsert: true, + }, }, ); } } } } - await this.executeBulkOperations(PRODUCT_INDEX_NAME, operations); + return operations; } - private async deleteProductInternal(productId: ID) { + private async updateVariantsOperations( + productVariants: ProductVariant[], + ): Promise { + if (productVariants.length === 0) { + return []; + } + const operations: BulkVariantOperation[] = []; + for (const variant of productVariants) { + const languageVariants = variant.translations.map(t => t.languageCode); + for (const channel of variant.channels) { + const channelCtx = new RequestContext({ + channel, + apiType: 'admin', + authorizedAsOwnerOnly: false, + isAuthorized: true, + session: {} as any, + }); + await this.productVariantService.applyChannelPriceAndTax(variant, channelCtx); + for (const languageCode of languageVariants) { + operations.push( + { + index: VARIANT_INDEX_NAME, + operation: { + update: { _id: this.getId(variant.id, channelCtx.channelId, languageCode) }, + }, + }, + { + index: VARIANT_INDEX_NAME, + operation: { + doc: this.createVariantIndexItem(variant, channelCtx.channelId, languageCode), + doc_as_upsert: true, + }, + }, + ); + } + } + } + Logger.verbose(`Updating ${productVariants.length} ProductVariants`, loggerCtx); + return operations; + } + + private async deleteProductOperations( + productId: ID, + ): Promise> { const channels = await this.connection .getRepository(Channel) .createQueryBuilder('channel') @@ -453,37 +496,50 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes const product = await this.connection.getRepository(Product).findOne(productId, { relations: ['variants'], }); - if (product) { - Logger.verbose(`Deleting 1 Product (id: ${productId})`, loggerCtx); - const operations: BulkOperation[] = []; - for (const { id: channelId 
} of channels) { - const languageVariants = product.translations.map(t => t.languageCode); - for (const languageCode of languageVariants) { - operations.push({ delete: { _id: this.getId(product.id, channelId, languageCode) } }); - } + if (!product) { + return []; + } + + Logger.verbose(`Deleting 1 Product (id: ${productId})`, loggerCtx); + const operations: Array = []; + for (const { id: channelId } of channels) { + const languageVariants = product.translations.map(t => t.languageCode); + for (const languageCode of languageVariants) { + operations.push({ + index: PRODUCT_INDEX_NAME, + operation: { delete: { _id: this.getId(product.id, channelId, languageCode) } }, + }); } - await this.deleteVariantsInternal( + } + operations.push( + ...(await this.deleteVariantsInternalOperations( product.variants, channels.map(c => c.id), - ); - await this.executeBulkOperations(PRODUCT_INDEX_NAME, operations); - } + )), + ); + return operations; } - private async deleteVariantsInternal(variants: ProductVariant[], channelIds: ID[]) { + private async deleteVariantsInternalOperations( + variants: ProductVariant[], + channelIds: ID[], + ): Promise { Logger.verbose(`Deleting ${variants.length} ProductVariants`, loggerCtx); - const operations: BulkOperation[] = []; + const operations: BulkVariantOperation[] = []; for (const variant of variants) { for (const channelId of channelIds) { const languageVariants = variant.translations.map(t => t.languageCode); for (const languageCode of languageVariants) { operations.push({ - delete: { _id: this.getId(variant.id, channelId, languageCode) }, + index: VARIANT_INDEX_NAME, + operation: { + delete: { _id: this.getId(variant.id, channelId, languageCode) }, + }, }); } } } - await this.executeBulkOperations(VARIANT_INDEX_NAME, operations); + return operations; } private async getProductIdsByVariantIds(variantIds: ID[]): Promise { @@ -494,7 +550,25 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes return 
unique(variants.map(v => v.product.id)); } - private async executeBulkOperations( + private async executeBulkOperations(operations: Array) { + const productOperations: Array> = []; + const variantOperations: Array> = []; + + for (const operation of operations) { + if (operation.index === PRODUCT_INDEX_NAME) { + productOperations.push(operation.operation); + } else { + variantOperations.push(operation.operation); + } + } + + return Promise.all([ + this.runBulkOperationsOnIndex(PRODUCT_INDEX_NAME, productOperations), + this.runBulkOperationsOnIndex(VARIANT_INDEX_NAME, variantOperations), + ]); + } + + private async runBulkOperationsOnIndex( indexName: string, operations: Array>, ) {