diff --git a/packages/elasticsearch-plugin/src/constants.ts b/packages/elasticsearch-plugin/src/constants.ts index 457660f4eb..71e2ad907f 100644 --- a/packages/elasticsearch-plugin/src/constants.ts +++ b/packages/elasticsearch-plugin/src/constants.ts @@ -5,8 +5,3 @@ export const VARIANT_INDEX_TYPE = 'variant-index-item'; export const PRODUCT_INDEX_NAME = 'products'; export const PRODUCT_INDEX_TYPE = 'product-index-item'; export const loggerCtx = 'ElasticsearchPlugin'; -export enum Message { - Reindex = 'Reindex', - UpdateVariantsById = 'UpdateVariantsById', - UpdateProductOrVariant = 'UpdateProductOrVariant', -} diff --git a/packages/elasticsearch-plugin/src/elasticsearch-index.service.ts b/packages/elasticsearch-plugin/src/elasticsearch-index.service.ts index dddebff76d..b6be2eb705 100644 --- a/packages/elasticsearch-plugin/src/elasticsearch-index.service.ts +++ b/packages/elasticsearch-plugin/src/elasticsearch-index.service.ts @@ -1,15 +1,34 @@ import { Inject, Injectable, OnModuleDestroy } from '@nestjs/common'; import { ClientProxy } from '@nestjs/microservices'; -import { ID, Job, JobService, Logger, Product, ProductVariant, RequestContext, VENDURE_WORKER_CLIENT } from '@vendure/core'; +import { + ID, + Job, + JobReporter, + JobService, + Logger, + Product, + ProductVariant, + RequestContext, + WorkerService, +} from '@vendure/core'; -import { Message } from './constants'; import { ReindexMessageResponse } from './indexer.controller'; +import { ReindexMessage, UpdateProductOrVariantMessage, UpdateVariantsByIdMessage } from './types'; @Injectable() -export class ElasticsearchIndexService implements OnModuleDestroy { +export class ElasticsearchIndexService { + constructor(private workerService: WorkerService, private jobService: JobService) {} - constructor(@Inject(VENDURE_WORKER_CLIENT) private readonly client: ClientProxy, - private jobService: JobService) {} + reindex(ctx: RequestContext): Job { + return this.jobService.createJob({ + name: 'reindex', + singleInstance: true, + work: async reporter => { + Logger.verbose(`sending reindex message`); + this.workerService.send(new ReindexMessage({ ctx })).subscribe(this.createObserver(reporter)); + }, + }); + } /** * Updates the search index only for the affected entities. @@ -17,16 +36,18 @@ export class ElasticsearchIndexService implements OnModuleDestroy { updateProductOrVariant(ctx: RequestContext, updatedEntity: Product | ProductVariant) { return this.jobService.createJob({ name: 'update-index', - work: async () => { - if (updatedEntity instanceof Product) { - return this.client.send(Message.UpdateProductOrVariant, { ctx, productId: updatedEntity.id }) - .toPromise() - .catch(err => Logger.error(err)); - } else { - return this.client.send(Message.UpdateProductOrVariant, { ctx, variantId: updatedEntity.id }) - .toPromise() - .catch(err => Logger.error(err)); - } + work: reporter => { + const data = + updatedEntity instanceof Product + ? { ctx, productId: updatedEntity.id } + : { ctx, variantId: updatedEntity.id }; + this.workerService.send(new UpdateProductOrVariantMessage(data)).subscribe({ + complete: () => reporter.complete(true), + error: err => { + Logger.error(err); + reporter.complete(false); + }, + }); }, }); } @@ -34,87 +55,44 @@ export class ElasticsearchIndexService implements OnModuleDestroy { updateVariantsById(ctx: RequestContext, ids: ID[]) { return this.jobService.createJob({ name: 'update-index', - work: async reporter => { - return new Promise((resolve, reject) => { - Logger.verbose(`sending reindex message`); - let total: number | undefined; - let duration = 0; - let completed = 0; - this.client.send(Message.UpdateVariantsById, { ctx, ids }) - .subscribe({ - next: response => { - if (!total) { - total = response.total; - } - duration = response.duration; - completed = response.completed; - const progress = Math.ceil((completed / total) * 100); - reporter.setProgress(progress); - }, - complete: () => { - resolve({ - success: true, - indexedItemCount: total, - timeTaken: duration, - }); - }, - error: (err) => { - Logger.error(JSON.stringify(err)); - resolve({ - success: false, - indexedItemCount: 0, - timeTaken: 0, - }); - }, - }); - }); + work: reporter => { + Logger.verbose(`sending reindex message`); + this.workerService + .send(new UpdateVariantsByIdMessage({ ctx, ids })) + .subscribe(this.createObserver(reporter)); }, }); } - reindex(ctx: RequestContext): Job { - return this.jobService.createJob({ - name: 'reindex', - singleInstance: true, - work: async reporter => { - return new Promise((resolve, reject) => { - Logger.verbose(`sending reindex message`); - let total: number | undefined; - let duration = 0; - let completed = 0; - this.client.send(Message.Reindex, { ctx }) - .subscribe({ - next: response => { - if (!total) { - total = response.total; - } - duration = response.duration; - completed = response.completed; - const progress = Math.ceil((completed / total) * 100); - reporter.setProgress(progress); - }, - complete: () => { - resolve({ - success: true, - indexedItemCount: total, - timeTaken: duration, - }); - }, - error: (err) => { - Logger.error(JSON.stringify(err)); - resolve({ - success: false, - indexedItemCount: 0, - timeTaken: 0, - }); - }, - }); + private createObserver(reporter: JobReporter) { + let total: number | undefined; + let duration = 0; + let completed = 0; + return { + next: (response: ReindexMessageResponse) => { + if (!total) { + total = response.total; + } + duration = response.duration; + completed = response.completed; + const progress = Math.ceil((completed / total) * 100); + reporter.setProgress(progress); + }, + complete: () => { + reporter.complete({ + success: true, + indexedItemCount: total, + timeTaken: duration, }); }, - }); - } - - onModuleDestroy(): any { - this.client.close(); + error: (err: any) => { + Logger.error(JSON.stringify(err)); + reporter.complete({ + success: false, + indexedItemCount: 0, + timeTaken: 0, + }); + }, + }; } } diff --git a/packages/elasticsearch-plugin/src/elasticsearch.service.ts b/packages/elasticsearch-plugin/src/elasticsearch.service.ts index c2c9196685..79d64e3cde 100644 --- a/packages/elasticsearch-plugin/src/elasticsearch.service.ts +++ b/packages/elasticsearch-plugin/src/elasticsearch.service.ts @@ -1,6 +1,6 @@ import { Client } from '@elastic/elasticsearch'; import { Inject, Injectable } from '@nestjs/common'; -import { JobInfo, SearchInput, SearchResponse, SearchResult } from '@vendure/common/lib/generated-types'; +import { JobInfo, SearchResult } from '@vendure/common/lib/generated-types'; import { DeepRequired, FacetValue, diff --git a/packages/elasticsearch-plugin/src/indexer.controller.ts b/packages/elasticsearch-plugin/src/indexer.controller.ts index da99dd8e70..9d762793a2 100644 --- a/packages/elasticsearch-plugin/src/indexer.controller.ts +++ b/packages/elasticsearch-plugin/src/indexer.controller.ts @@ -22,7 +22,6 @@ import { ELASTIC_SEARCH_CLIENT, ELASTIC_SEARCH_OPTIONS, loggerCtx, - Message, PRODUCT_INDEX_NAME, PRODUCT_INDEX_TYPE, VARIANT_INDEX_NAME, @@ -34,6 +33,9 @@ import { BulkOperationDoc, BulkResponseBody, ProductIndexItem, + ReindexMessage, + UpdateProductOrVariantMessage, + UpdateVariantsByIdMessage, VariantIndexItem, } from './types'; @@ -68,18 +70,13 @@ export class ElasticsearchIndexerController { /** * Updates the search index only for the affected entities. */ - @MessagePattern(Message.UpdateProductOrVariant) + @MessagePattern(UpdateProductOrVariantMessage.pattern) updateProductOrVariant({ ctx: rawContext, productId, variantId, - }: { - ctx: any; - productId?: ID; - variantId?: ID; - }): Observable { + }: UpdateProductOrVariantMessage['data']): Observable { const ctx = RequestContext.fromObject(rawContext); - return defer(async () => { if (productId) { await this.updateProduct(ctx, productId); @@ -90,14 +87,11 @@ export class ElasticsearchIndexerController { }); } - @MessagePattern(Message.UpdateVariantsById) + @MessagePattern(UpdateVariantsByIdMessage.pattern) updateVariantsById({ ctx: rawContext, ids, - }: { - ctx: any; - ids: ID[]; - }): Observable { + }: UpdateVariantsByIdMessage['data']): Observable { const ctx = RequestContext.fromObject(rawContext); const { batchSize } = this.options; @@ -164,8 +158,8 @@ export class ElasticsearchIndexerController { }); } - @MessagePattern(Message.Reindex) - reindex({ ctx: rawContext }: { ctx: any }): Observable { + @MessagePattern(ReindexMessage.pattern) + reindex({ ctx: rawContext }: ReindexMessage['data']): Observable { const ctx = RequestContext.fromObject(rawContext); const { batchSize } = this.options; diff --git a/packages/elasticsearch-plugin/src/types.ts b/packages/elasticsearch-plugin/src/types.ts index f0c7e8fc93..c968d3f5b4 100644 --- a/packages/elasticsearch-plugin/src/types.ts +++ b/packages/elasticsearch-plugin/src/types.ts @@ -6,6 +6,7 @@ import { SearchResult, } from '@vendure/common/lib/generated-types'; import { ID } from '@vendure/common/lib/shared-types'; +import { RequestContext, WorkerMessage } from '@vendure/core'; export type ElasticSearchInput = SearchInput & { priceRange?: PriceRange; @@ -122,3 +123,33 @@ export type BulkResponseBody = { errors: boolean; items: BulkResponseResult[]; }; + +export interface ReindexMessageResponse { + total: number; + completed: number; + duration: number; +} + +export type UpdateProductOrVariantMessageData = { + ctx: RequestContext; + productId?: ID; + variantId?: ID; +}; + +export interface UpdateVariantsByIdMessageData { + ctx: RequestContext; + ids: ID[]; +} + +export class ReindexMessage extends WorkerMessage<{ ctx: RequestContext }, ReindexMessageResponse> { + static readonly pattern = 'Reindex'; +} +export class UpdateProductOrVariantMessage extends WorkerMessage { + static readonly pattern = 'UpdateProductOrVariant'; +} +export class UpdateVariantsByIdMessage extends WorkerMessage< + UpdateVariantsByIdMessageData, + ReindexMessageResponse +> { + static readonly pattern = 'UpdateVariantsById'; +}