Skip to content

Commit

Permalink
fix(elasticsearch-plugin): Correctly remove deleted items from index
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelbromley committed Aug 26, 2019
1 parent b6ae235 commit f0a56fa
Showing 1 changed file with 95 additions and 71 deletions.
166 changes: 95 additions & 71 deletions packages/elasticsearch-plugin/src/indexer.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,82 +79,17 @@ export class ElasticsearchIndexerController {
variantId?: ID;
}): Observable<boolean> {
const ctx = RequestContext.fromObject(rawContext);
let updatedVariants: ProductVariant[] = [];

return defer(async () => {
if (productId) {
await this.updateProduct(ctx, productId);
} else {
const variant = await this.connection.getRepository(ProductVariant).findOne(variantId, {
relations: variantRelations,
});
if (variant) {
updatedVariants = [variant];
}
}
if (updatedVariants.length) {
// When ProductVariants change, we need to update the corresponding Product index
// since e.g. price changes must be reflected on the Product level too.
const productIdsOfVariants = unique(updatedVariants.map(v => v.productId));
for (const variantProductId of productIdsOfVariants) {
await this.updateProduct(ctx, variantProductId);
}
const operations = updatedVariants.reduce(
(ops, variant) => {
return [
...ops,
{ update: { _id: variant.id.toString() } },
{ doc: this.createVariantIndexItem(variant), doc_as_upsert: true },
];
},
[] as Array<BulkOperation | BulkOperationDoc<VariantIndexItem>>,
);
await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
} else if (variantId) {
await this.updateProductVariant(ctx, variantId);
}
return true;
});
}

private async updateProduct(ctx: RequestContext, productId: ID) {
let updatedProductVariants: ProductVariant[] = [];
let removedProducts: Product[] = [];
let removedVariantIds: ID[] = [];
const product = await this.connection.getRepository(Product).findOne(productId, {
relations: ['variants'],
});
if (product) {
if (product.deletedAt) {
removedProducts = [product];
removedVariantIds = product.variants.map(v => v.id);
} else {
updatedProductVariants = await this.connection
.getRepository(ProductVariant)
.findByIds(product.variants.map(v => v.id), {
relations: variantRelations,
});
}
}
if (updatedProductVariants.length) {
Logger.verbose(`Updating 1 product`, loggerCtx);
updatedProductVariants = this.hydrateVariants(ctx, updatedProductVariants);
const updatedProductIndexItem = this.createProductIndexItem(updatedProductVariants);
const operations: [BulkOperation, BulkOperationDoc<ProductIndexItem>] = [
{ update: { _id: updatedProductIndexItem.productId.toString() } },
{ doc: updatedProductIndexItem, doc_as_upsert: true },
];
await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, operations);
}
if (removedVariantIds.length) {
const operations = removedVariantIds.reduce(
(ops, id) => {
return [...ops, { delete: { _id: id.toString() } }];
},
[] as BulkOperation[],
);
await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
}
}

@MessagePattern(Message.UpdateVariantsById)
updateVariantsById({
ctx: rawContext,
Expand Down Expand Up @@ -239,7 +174,7 @@ export class ElasticsearchIndexerController {
const timeStart = Date.now();
const qb = this.getSearchIndexQueryBuilder();
const count = await qb.where('variants__product.deletedAt IS NULL').getCount();
Logger.verbose(`Reindexing ${count} variants`, loggerCtx);
Logger.verbose(`Reindexing ${count} ProductVariants`, loggerCtx);

const batches = Math.ceil(count / batchSize);
let variantsInProduct: ProductVariant[] = [];
Expand All @@ -248,7 +183,7 @@ export class ElasticsearchIndexerController {
Logger.verbose(`Processing batch ${i + 1} of ${batches}`, loggerCtx);

const variants = await this.getBatch(ctx, qb, i);
Logger.verbose(`variants count: ${variants.length}`);
Logger.verbose(`ProductVariants count: ${variants.length}`);

const variantsToIndex: Array<BulkOperation | VariantIndexItem> = [];
const productsToIndex: Array<BulkOperation | ProductIndexItem> = [];
Expand Down Expand Up @@ -286,15 +221,104 @@ export class ElasticsearchIndexerController {
});
}

private async updateProductVariant(ctx: RequestContext, variantId: ID) {
let updatedVariants: ProductVariant[] = [];
let removedVariantId: ID | undefined;

const productVariant = await this.connection.getRepository(ProductVariant).findOne(variantId, {
relations: variantRelations,
});
if (productVariant) {
if (productVariant.deletedAt) {
removedVariantId = variantId;
} else {
updatedVariants = this.hydrateVariants(ctx, [productVariant]);
}
}

if (updatedVariants.length) {
// When ProductVariants change, we need to update the corresponding Product index
// since e.g. price changes must be reflected on the Product level too.
const productIdsOfVariants = unique(updatedVariants.map(v => v.productId));
for (const variantProductId of productIdsOfVariants) {
await this.updateProduct(ctx, variantProductId);
}
const operations = updatedVariants.reduce(
(ops, variant) => {
return [
...ops,
{ update: { _id: variant.id.toString() } },
{ doc: this.createVariantIndexItem(variant), doc_as_upsert: true },
];
},
[] as Array<BulkOperation | BulkOperationDoc<VariantIndexItem>>,
);
Logger.verbose(`Updating ${updatedVariants.length} ProductVariants`, loggerCtx);
await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
}
if (removedVariantId) {
Logger.verbose(`Deleting 1 ProductVariant (${removedVariantId})`, loggerCtx);
const operations: BulkOperation[] = [{ delete: { _id: removedVariantId.toString() } }];
await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
}
}

private async updateProduct(ctx: RequestContext, productId: ID) {
let updatedProductVariants: ProductVariant[] = [];
let removedProductId: ID | undefined;
let removedVariantIds: ID[] = [];
const product = await this.connection.getRepository(Product).findOne(productId, {
relations: ['variants'],
});
if (product) {
if (product.deletedAt) {
removedProductId = productId;
removedVariantIds = product.variants.map(v => v.id);
} else {
updatedProductVariants = await this.connection
.getRepository(ProductVariant)
.findByIds(product.variants.map(v => v.id), {
relations: variantRelations,
});
}
}
if (updatedProductVariants.length) {
Logger.verbose(`Updating 1 Product (${productId})`, loggerCtx);
updatedProductVariants = this.hydrateVariants(ctx, updatedProductVariants);
const updatedProductIndexItem = this.createProductIndexItem(updatedProductVariants);
const operations: [BulkOperation, BulkOperationDoc<ProductIndexItem>] = [
{ update: { _id: updatedProductIndexItem.productId.toString() } },
{ doc: updatedProductIndexItem, doc_as_upsert: true },
];
await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, operations);
}
if (removedVariantIds.length) {
const operations = removedVariantIds.reduce(
(ops, id) => {
Logger.verbose(`Deleting 1 ProductVariant (${id})`, loggerCtx);
return [...ops, { delete: { _id: id.toString() } }];
},
[] as BulkOperation[],
);
await this.executeBulkOperations(VARIANT_INDEX_NAME, VARIANT_INDEX_TYPE, operations);
}
if (removedProductId) {
Logger.verbose(`Deleting 1 Product (${removedProductId})`, loggerCtx);
const operations: BulkOperation[] = [{ delete: { _id: removedProductId.toString() } }];
await this.executeBulkOperations(PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, operations);
}
}

private async executeBulkOperations(
indexName: string,
indexType: string,
operations: Array<BulkOperation | BulkOperationDoc<VariantIndexItem | ProductIndexItem>>,
) {
try {
const fullIndexName = this.options.indexPrefix + indexName;
const { body }: { body: BulkResponseBody } = await this.client.bulk({
refresh: 'true',
index: this.options.indexPrefix + indexName,
index: fullIndexName,
type: indexType,
body: operations,
});
Expand All @@ -316,7 +340,7 @@ export class ElasticsearchIndexerController {
}
});
} else {
Logger.verbose(`Executed ${body.items.length} bulk operations on ${indexType}`);
Logger.verbose(`Executed ${body.items.length} bulk operations on index [${fullIndexName}]`);
}
return body;
} catch (e) {
Expand Down

0 comments on commit f0a56fa

Please sign in to comment.