Skip to content

Commit

Permalink
refactor(elasticsearch-plugin): Execute all bulk index updates at once
Browse files Browse the repository at this point in the history
Previously, Elasticsearch bulk update operations were scattered throughout the execution of a method,
e.g. `updateProductsInternal` would bulk delete products, bulk update variants, and bulk insert
products. Between each of these bulk operations there were async DB queries. This intervening time
allowed race conditions to creep in, which manifested as non-deterministically failing e2e tests.

In real-world usage it may not have been noticeable, but in any case this commit groups _all_ batch
operations together and passes them to ES at once, only at the very end of the method.
  • Loading branch information
michaelbromley committed Apr 23, 2021
1 parent 9a20028 commit 06da6d5
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ describe('Elasticsearch plugin', () => {
},
);

await awaitRunningJobs(adminClient);
await awaitRunningJobs(adminClient);
const { search: search2 } = await doAdminSearchQuery(adminClient, {
term: 'drive',
Expand Down
202 changes: 138 additions & 64 deletions packages/elasticsearch-plugin/src/indexer.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ export interface ReindexMessageResponse {
duration: number;
}

/**
 * A single bulk-request entry tagged with the product index as its target.
 * `operation` is either an action line (`BulkOperation`) or a document payload
 * (`BulkOperationDoc<ProductIndexItem>`); `index` is the tag that
 * `executeBulkOperations` inspects to route the entry to the right index.
 */
type BulkProductOperation = {
index: typeof PRODUCT_INDEX_NAME;
operation: BulkOperation | BulkOperationDoc<ProductIndexItem>;
};
/**
 * A single bulk-request entry tagged with the variant index as its target.
 * `operation` is either an action line (`BulkOperation`) or a document payload
 * (`BulkOperationDoc<VariantIndexItem>`); `index` is the tag that
 * `executeBulkOperations` inspects to route the entry to the right index.
 */
type BulkVariantOperation = {
index: typeof VARIANT_INDEX_NAME;
operation: BulkOperation | BulkOperationDoc<VariantIndexItem>;
};

@Injectable()
export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDestroy {
private client: Client;
Expand Down Expand Up @@ -100,7 +109,8 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
* Updates the search index only for the affected product.
*/
async deleteProduct({ ctx: rawContext, productId }: UpdateProductMessageData): Promise<boolean> {
await this.deleteProductInternal(productId);
const operations = await this.deleteProductOperations(productId);
await this.executeBulkOperations(operations);
return true;
}

Expand Down Expand Up @@ -201,6 +211,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
return asyncObservable(async observer => {
return this.asyncQueue.push(async () => {
const timeStart = Date.now();
const operations: Array<BulkProductOperation | BulkVariantOperation> = [];

if (dropIndices) {
await deleteIndices(this.client, this.options.indexPrefix);
Expand All @@ -219,7 +230,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
.getMany();

for (const { id: deletedProductId } of deletedProductIds) {
await this.deleteProductInternal(deletedProductId);
operations.push(...(await this.deleteProductOperations(deletedProductId)));
}

const productIds = await this.connection
Expand All @@ -233,14 +244,16 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes

let finishedProductsCount = 0;
for (const { id: productId } of productIds) {
await this.updateProductsInternal([productId]);
operations.push(...(await this.updateProductsOperations([productId])));
finishedProductsCount++;
observer.next({
total: productIds.length,
completed: Math.min(finishedProductsCount, productIds.length),
duration: +new Date() - timeStart,
});
}
Logger.verbose(`Will execute ${operations.length} bulk update operations`, loggerCtx);
await this.executeBulkOperations(operations);
Logger.verbose(`Completed reindexing!`, loggerCtx);
return {
total: productIds.length,
Expand Down Expand Up @@ -338,42 +351,19 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
return result1.body.failures.length === 0 && result2.body.failures === 0;
}

private async updateVariantsInternal(productVariants: ProductVariant[]) {
if (productVariants.length) {
const operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem>> = [];
for (const variant of productVariants) {
const languageVariants = variant.translations.map(t => t.languageCode);
for (const channel of variant.channels) {
const channelCtx = new RequestContext({
channel,
apiType: 'admin',
authorizedAsOwnerOnly: false,
isAuthorized: true,
session: {} as any,
});
await this.productVariantService.applyChannelPriceAndTax(variant, channelCtx);
for (const languageCode of languageVariants) {
operations.push(
{ update: { _id: this.getId(variant.id, channelCtx.channelId, languageCode) } },
{
doc: this.createVariantIndexItem(variant, channelCtx.channelId, languageCode),
doc_as_upsert: true,
},
);
}
}
}
Logger.verbose(`Updating ${productVariants.length} ProductVariants`, loggerCtx);
await this.executeBulkOperations(VARIANT_INDEX_NAME, operations);
}
/**
 * Collects the bulk operations needed to update the given products and
 * executes them in a single step once they have all been gathered.
 */
private async updateProductsInternal(productIds: ID[]) {
    await this.executeBulkOperations(await this.updateProductsOperations(productIds));
}

private async updateProductsInternal(productIds: ID[]) {
private async updateProductsOperations(
productIds: ID[],
): Promise<Array<BulkProductOperation | BulkVariantOperation>> {
Logger.verbose(`Updating ${productIds.length} Products`, loggerCtx);
const operations: Array<BulkOperation | BulkOperationDoc<ProductIndexItem>> = [];
const operations: Array<BulkProductOperation | BulkVariantOperation> = [];

for (const productId of productIds) {
await this.deleteProductInternal(productId);
operations.push(...(await this.deleteProductOperations(productId)));
const product = await this.connection.getRepository(Product).findOne(productId, {
relations: productRelations,
where: {
Expand All @@ -399,7 +389,7 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
}
Logger.verbose(`Updating Product (${productId})`, loggerCtx);
if (updatedProductVariants.length) {
await this.updateVariantsInternal(updatedProductVariants);
operations.push(...(await this.updateVariantsOperations(updatedProductVariants)));
}

const languageVariants = product.translations.map(t => t.languageCode);
Expand All @@ -422,29 +412,82 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
for (const languageCode of languageVariants) {
operations.push(
{
update: {
_id: this.getId(product.id, channelCtx.channelId, languageCode),
index: PRODUCT_INDEX_NAME,
operation: {
update: {
_id: this.getId(product.id, channelCtx.channelId, languageCode),
},
},
},
{
doc: variantsInChannel.length
? this.createProductIndexItem(
variantsInChannel,
channelCtx.channelId,
languageCode,
)
: this.createSyntheticProductIndexItem(channelCtx, product, languageCode),
doc_as_upsert: true,
index: PRODUCT_INDEX_NAME,
operation: {
doc: variantsInChannel.length
? this.createProductIndexItem(
variantsInChannel,
channelCtx.channelId,
languageCode,
)
: this.createSyntheticProductIndexItem(
channelCtx,
product,
languageCode,
),
doc_as_upsert: true,
},
},
);
}
}
}
}
await this.executeBulkOperations(PRODUCT_INDEX_NAME, operations);
return operations;
}

private async deleteProductInternal(productId: ID) {
/**
 * Builds (but does not execute) the bulk upsert operations for the given
 * ProductVariants against the variant index.
 *
 * For every variant, one update/doc pair is produced per channel the variant
 * belongs to and per language code found in the variant's translations.
 *
 * @param productVariants The variants to create upsert operations for.
 * @returns The operations in the order they were generated; an empty array
 * (and no log output) when no variants are passed.
 */
private async updateVariantsOperations(
    productVariants: ProductVariant[],
): Promise<BulkVariantOperation[]> {
    const bulkEntries: BulkVariantOperation[] = [];
    if (!productVariants.length) {
        return bulkEntries;
    }

    for (const productVariant of productVariants) {
        const languageCodes = productVariant.translations.map(translation => translation.languageCode);

        for (const channel of productVariant.channels) {
            const ctx = new RequestContext({
                channel,
                apiType: 'admin',
                authorizedAsOwnerOnly: false,
                isAuthorized: true,
                session: {} as any,
            });
            // Must complete before the index items for this channel are created.
            // NOTE(review): presumably this sets channel-specific pricing on the variant — confirm.
            await this.productVariantService.applyChannelPriceAndTax(productVariant, ctx);

            for (const languageCode of languageCodes) {
                const documentId = this.getId(productVariant.id, ctx.channelId, languageCode);
                const indexItem = this.createVariantIndexItem(productVariant, ctx.channelId, languageCode);
                // Each upsert is an action entry followed by its document entry.
                bulkEntries.push(
                    {
                        index: VARIANT_INDEX_NAME,
                        operation: { update: { _id: documentId } },
                    },
                    {
                        index: VARIANT_INDEX_NAME,
                        operation: { doc: indexItem, doc_as_upsert: true },
                    },
                );
            }
        }
    }

    Logger.verbose(`Updating ${productVariants.length} ProductVariants`, loggerCtx);
    return bulkEntries;
}

private async deleteProductOperations(
productId: ID,
): Promise<Array<BulkProductOperation | BulkVariantOperation>> {
const channels = await this.connection
.getRepository(Channel)
.createQueryBuilder('channel')
Expand All @@ -453,37 +496,50 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
const product = await this.connection.getRepository(Product).findOne(productId, {
relations: ['variants'],
});
if (product) {
Logger.verbose(`Deleting 1 Product (id: ${productId})`, loggerCtx);
const operations: BulkOperation[] = [];
for (const { id: channelId } of channels) {
const languageVariants = product.translations.map(t => t.languageCode);
for (const languageCode of languageVariants) {
operations.push({ delete: { _id: this.getId(product.id, channelId, languageCode) } });
}
if (!product) {
return [];
}

Logger.verbose(`Deleting 1 Product (id: ${productId})`, loggerCtx);
const operations: Array<BulkProductOperation | BulkVariantOperation> = [];
for (const { id: channelId } of channels) {
const languageVariants = product.translations.map(t => t.languageCode);
for (const languageCode of languageVariants) {
operations.push({
index: PRODUCT_INDEX_NAME,
operation: { delete: { _id: this.getId(product.id, channelId, languageCode) } },
});
}
await this.deleteVariantsInternal(
}
operations.push(
...(await this.deleteVariantsInternalOperations(
product.variants,
channels.map(c => c.id),
);
await this.executeBulkOperations(PRODUCT_INDEX_NAME, operations);
}
)),
);
return operations;
}

private async deleteVariantsInternal(variants: ProductVariant[], channelIds: ID[]) {
private async deleteVariantsInternalOperations(
variants: ProductVariant[],
channelIds: ID[],
): Promise<BulkVariantOperation[]> {
Logger.verbose(`Deleting ${variants.length} ProductVariants`, loggerCtx);
const operations: BulkOperation[] = [];
const operations: BulkVariantOperation[] = [];
for (const variant of variants) {
for (const channelId of channelIds) {
const languageVariants = variant.translations.map(t => t.languageCode);
for (const languageCode of languageVariants) {
operations.push({
delete: { _id: this.getId(variant.id, channelId, languageCode) },
index: VARIANT_INDEX_NAME,
operation: {
delete: { _id: this.getId(variant.id, channelId, languageCode) },
},
});
}
}
}
await this.executeBulkOperations(VARIANT_INDEX_NAME, operations);
return operations;
}

private async getProductIdsByVariantIds(variantIds: ID[]): Promise<ID[]> {
Expand All @@ -494,7 +550,25 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
return unique(variants.map(v => v.product.id));
}

private async executeBulkOperations(
/**
 * Splits a mixed list of tagged operations by target index and hands each
 * group to `runBulkOperationsOnIndex`. Both runs are started before either is
 * awaited, and the returned promise resolves once both have finished.
 *
 * @param operations Product- and variant-index operations in execution order;
 * the relative order within each index is preserved.
 */
private async executeBulkOperations(operations: Array<BulkProductOperation | BulkVariantOperation>) {
    const forProductIndex: Array<BulkOperation | BulkOperationDoc<ProductIndexItem>> = [];
    const forVariantIndex: Array<BulkOperation | BulkOperationDoc<VariantIndexItem>> = [];

    for (const entry of operations) {
        // Anything not tagged for the product index goes to the variant index.
        if (entry.index !== PRODUCT_INDEX_NAME) {
            forVariantIndex.push(entry.operation);
            continue;
        }
        forProductIndex.push(entry.operation);
    }

    const productRun = this.runBulkOperationsOnIndex(PRODUCT_INDEX_NAME, forProductIndex);
    const variantRun = this.runBulkOperationsOnIndex(VARIANT_INDEX_NAME, forVariantIndex);
    return Promise.all([productRun, variantRun]);
}

private async runBulkOperationsOnIndex(
indexName: string,
operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem | ProductIndexItem>>,
) {
Expand Down

0 comments on commit 06da6d5

Please sign in to comment.