diff --git a/packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts b/packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts index 3c1aad3dfa..f2233a2053 100644 --- a/packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts +++ b/packages/elasticsearch-plugin/e2e/elasticsearch-plugin.e2e-spec.ts @@ -515,6 +515,8 @@ describe('Elasticsearch plugin', () => { }, ); await awaitRunningJobs(adminClient); + // add an additional check for the collection filters to update + await awaitRunningJobs(adminClient); const result = await doAdminSearchQuery({ collectionId: 'T_2', groupByProduct: true }); expect(result.search.items.map((i) => i.productName)).toEqual([ @@ -561,6 +563,8 @@ describe('Elasticsearch plugin', () => { }, }); await awaitRunningJobs(adminClient); + // add an additional check for the collection filters to update + await awaitRunningJobs(adminClient); const result = await doAdminSearchQuery({ collectionId: createCollection.id, groupByProduct: true, diff --git a/packages/elasticsearch-plugin/src/elasticsearch-index.service.ts b/packages/elasticsearch-plugin/src/elasticsearch-index.service.ts index b0280d2b9a..b15c2ace07 100644 --- a/packages/elasticsearch-plugin/src/elasticsearch-index.service.ts +++ b/packages/elasticsearch-plugin/src/elasticsearch-index.service.ts @@ -3,8 +3,8 @@ import { Asset, ID, Job, - JobReporter, - JobService, + JobQueue, + JobQueueService, Logger, Product, ProductVariant, @@ -21,125 +21,127 @@ import { ReindexMessage, RemoveProductFromChannelMessage, UpdateAssetMessage, + UpdateIndexQueueJobData, UpdateProductMessage, UpdateVariantMessage, UpdateVariantsByIdMessage, } from './types'; +let updateIndexQueue: JobQueue | undefined; + @Injectable() export class ElasticsearchIndexService { - constructor(private workerService: WorkerService, private jobService: JobService) {} - - reindex(ctx: RequestContext, dropIndices: boolean): Job { - return this.jobService.createJob({ - name: 'reindex', - singleInstance: true, - work: async (reporter) => { - Logger.verbose(`sending reindex message`); - this.workerService - .send(new ReindexMessage({ ctx: ctx.serialize(), dropIndices })) - .subscribe(this.createObserver(reporter)); + constructor(private workerService: WorkerService, private jobService: JobQueueService) {} + + initJobQueue() { + updateIndexQueue = this.jobService.createQueue({ + name: 'update-search-index', + concurrency: 1, + process: (job) => { + const data = job.data; + switch (data.type) { + case 'reindex': + Logger.verbose(`sending ReindexMessage`); + this.sendMessageWithProgress(job, new ReindexMessage(data)); + break; + case 'update-product': + this.sendMessage(job, new UpdateProductMessage(data)); + break; + case 'update-variants': + this.sendMessage(job, new UpdateVariantMessage(data)); + break; + case 'delete-product': + this.sendMessage(job, new DeleteProductMessage(data)); + break; + case 'delete-variant': + this.sendMessage(job, new DeleteVariantMessage(data)); + break; + case 'update-variants-by-id': + this.sendMessageWithProgress(job, new UpdateVariantsByIdMessage(data)); + break; + case 'update-asset': + this.sendMessage(job, new UpdateAssetMessage(data)); + break; + case 'assign-product-to-channel': + this.sendMessage(job, new AssignProductToChannelMessage(data)); + break; + case 'remove-product-from-channel': + this.sendMessage(job, new RemoveProductFromChannelMessage(data)); + break; + } }, }); } + reindex(ctx: RequestContext, dropIndices: boolean) { + return this.addJobToQueue({ type: 'reindex', ctx: ctx.serialize(), dropIndices }); + } + updateProduct(ctx: RequestContext, product: Product) { - const data = { ctx: ctx.serialize(), productId: product.id }; - return this.createShortWorkerJob(new UpdateProductMessage(data), { - entity: 'Product', - id: product.id, - }); + this.addJobToQueue({ type: 'update-product', ctx: ctx.serialize(), productId: product.id }); } updateVariants(ctx: RequestContext, variants: ProductVariant[]) { const variantIds = variants.map((v) => v.id); - const data = { ctx: ctx.serialize(), variantIds }; - return this.createShortWorkerJob(new UpdateVariantMessage(data), { - entity: 'ProductVariant', - ids: variantIds, - }); + this.addJobToQueue({ type: 'update-variants', ctx: ctx.serialize(), variantIds }); } deleteProduct(ctx: RequestContext, product: Product) { - const data = { ctx: ctx.serialize(), productId: product.id }; - return this.createShortWorkerJob(new DeleteProductMessage(data), { - entity: 'Product', - id: product.id, - }); + this.addJobToQueue({ type: 'delete-product', ctx: ctx.serialize(), productId: product.id }); } deleteVariant(ctx: RequestContext, variants: ProductVariant[]) { const variantIds = variants.map((v) => v.id); - const data = { ctx: ctx.serialize(), variantIds }; - return this.createShortWorkerJob(new DeleteVariantMessage(data), { - entity: 'ProductVariant', - id: variantIds, - }); + this.addJobToQueue({ type: 'delete-variant', ctx: ctx.serialize(), variantIds }); } assignProductToChannel(ctx: RequestContext, product: Product, channelId: ID) { - const data = { ctx: ctx.serialize(), productId: product.id, channelId }; - return this.createShortWorkerJob(new AssignProductToChannelMessage(data), { - entity: 'Product', - id: product.id, + this.addJobToQueue({ + type: 'assign-product-to-channel', + ctx: ctx.serialize(), + productId: product.id, + channelId, }); } removeProductFromChannel(ctx: RequestContext, product: Product, channelId: ID) { - const data = { ctx: ctx.serialize(), productId: product.id, channelId }; - return this.createShortWorkerJob(new RemoveProductFromChannelMessage(data), { - entity: 'Product', - id: product.id, + this.addJobToQueue({ + type: 'remove-product-from-channel', + ctx: ctx.serialize(), + productId: product.id, + channelId, }); } updateVariantsById(ctx: RequestContext, ids: ID[]) { - return this.jobService.createJob({ - name: 'update-variants', - metadata: { - variantIds: ids, - }, - work: (reporter) => { - Logger.verbose(`sending UpdateVariantsByIdMessage`); - this.workerService - .send(new UpdateVariantsByIdMessage({ ctx: ctx.serialize(), ids })) - .subscribe(this.createObserver(reporter)); - }, - }); + this.addJobToQueue({ type: 'update-variants-by-id', ctx: ctx.serialize(), ids }); } updateAsset(ctx: RequestContext, asset: Asset) { - const data = { ctx: ctx.serialize(), asset }; - return this.createShortWorkerJob(new UpdateAssetMessage(data), { - entity: 'Asset', - id: asset.id, - }); + this.addJobToQueue({ type: 'update-asset', ctx: ctx.serialize(), asset: asset as any }); } - /** - * Creates a short-running job that does not expect progress updates. - */ - private createShortWorkerJob>(message: T, metadata: any) { - return this.jobService.createJob({ - name: 'update-index', - metadata, - work: (reporter) => { - this.workerService.send(message).subscribe({ - complete: () => reporter.complete(true), - error: (err) => { - Logger.error(err); - reporter.complete(false); - }, - }); + private addJobToQueue(data: UpdateIndexQueueJobData) { + if (updateIndexQueue) { + return updateIndexQueue.add(data); + } + } + + private sendMessage(job: Job, message: WorkerMessage) { + this.workerService.send(message).subscribe({ + complete: () => job.complete(true), + error: (err) => { + Logger.error(err); + job.fail(err); }, }); } - private createObserver(reporter: JobReporter) { + private sendMessageWithProgress(job: Job, message: WorkerMessage) { let total: number | undefined; let duration = 0; let completed = 0; - return { + this.workerService.send(message).subscribe({ next: (response: ReindexMessageResponse) => { if (!total) { total = response.total; @@ -147,10 +149,10 @@ export class ElasticsearchIndexService { duration = response.duration; completed = response.completed; const progress = Math.ceil((completed / total) * 100); - reporter.setProgress(progress); + job.setProgress(progress); }, complete: () => { - reporter.complete({ + job.complete({ success: true, indexedItemCount: total, timeTaken: duration, @@ -158,12 +160,8 @@ export class ElasticsearchIndexService { }, error: (err: any) => { Logger.error(JSON.stringify(err)); - reporter.complete({ - success: false, - indexedItemCount: 0, - timeTaken: 0, - }); + job.fail(); }, - }; + }); } } diff --git a/packages/elasticsearch-plugin/src/elasticsearch-resolver.ts b/packages/elasticsearch-plugin/src/elasticsearch-resolver.ts index 92540e1c7f..ce72937afa 100644 --- a/packages/elasticsearch-plugin/src/elasticsearch-resolver.ts +++ b/packages/elasticsearch-plugin/src/elasticsearch-resolver.ts @@ -1,6 +1,6 @@ import { Args, Mutation, Parent, Query, ResolveField, Resolver } from '@nestjs/graphql'; import { - JobInfo, + Job as GraphQLJob, Permission, QuerySearchArgs, SearchInput, @@ -34,7 +34,7 @@ export class ShopElasticSearchResolver implements Omit> { const facetValues = await this.elasticsearchService.facetValues(ctx, parent.input, true); - return facetValues.filter(i => !i.facetValue.facet.isPrivate); + return facetValues.filter((i) => !i.facetValue.facet.isPrivate); } @ResolveField() @@ -72,7 +72,7 @@ export class AdminElasticSearchResolver implements SearchResolver { @Mutation() @Allow(Permission.UpdateCatalog) - async reindex(@Ctx() ctx: RequestContext): Promise { - return this.elasticsearchService.reindex(ctx, false); + async reindex(@Ctx() ctx: RequestContext): Promise { + return (this.elasticsearchService.reindex(ctx, false) as unknown) as GraphQLJob; } } diff --git a/packages/elasticsearch-plugin/src/elasticsearch.service.ts b/packages/elasticsearch-plugin/src/elasticsearch.service.ts index 68425e4568..08b63aee72 100644 --- a/packages/elasticsearch-plugin/src/elasticsearch.service.ts +++ b/packages/elasticsearch-plugin/src/elasticsearch.service.ts @@ -1,11 +1,12 @@ import { Client } from '@elastic/elasticsearch'; import { Inject, Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common'; -import { JobInfo, SearchResult, SearchResultAsset } from '@vendure/common/lib/generated-types'; +import { SearchResult, SearchResultAsset } from '@vendure/common/lib/generated-types'; import { DeepRequired, FacetValue, FacetValueService, InternalServerError, + Job, Logger, RequestContext, SearchService, @@ -104,7 +105,7 @@ export class ElasticsearchService implements OnModuleInit, OnModuleDestroy { body: elasticSearchBody, }); return { - items: body.hits.hits.map(hit => this.mapProductToSearchResult(hit)), + items: body.hits.hits.map((hit) => this.mapProductToSearchResult(hit)), totalItems: body.hits.total.value, }; } else { @@ -114,7 +115,7 @@ export class ElasticsearchService implements OnModuleInit, OnModuleDestroy { body: elasticSearchBody, }); return { - items: body.hits.hits.map(hit => this.mapVariantToSearchResult(hit)), + items: body.hits.hits.map((hit) => this.mapVariantToSearchResult(hit)), totalItems: body.hits.total.value, }; } @@ -153,9 +154,12 @@ export class ElasticsearchService implements OnModuleInit, OnModuleDestroy { const buckets = body.aggregations ? body.aggregations.facetValue.buckets : []; - const facetValues = await this.facetValueService.findByIds(buckets.map(b => b.key), ctx.languageCode); + const facetValues = await this.facetValueService.findByIds( + buckets.map((b) => b.key), + ctx.languageCode, + ); return facetValues.map((facetValue, index) => { - const bucket = buckets.find(b => b.key.toString() === facetValue.id.toString()); + const bucket = buckets.find((b) => b.key.toString() === facetValue.id.toString()); return { facetValue, count: bucket ? bucket.doc_count : 0, @@ -229,28 +233,28 @@ export class ElasticsearchService implements OnModuleInit, OnModuleDestroy { min: aggregations.minPriceWithTax.value || 0, max: aggregations.maxPriceWithTax.value || 0, }, - buckets: aggregations.prices.buckets.map(mapPriceBuckets).filter(x => 0 < x.count), - bucketsWithTax: aggregations.prices.buckets.map(mapPriceBuckets).filter(x => 0 < x.count), + buckets: aggregations.prices.buckets.map(mapPriceBuckets).filter((x) => 0 < x.count), + bucketsWithTax: aggregations.prices.buckets.map(mapPriceBuckets).filter((x) => 0 < x.count), }; } /** * Rebuilds the full search index. */ - async reindex(ctx: RequestContext, dropIndices = true): Promise { + async reindex(ctx: RequestContext, dropIndices = true): Promise { const { indexPrefix } = this.options; - const job = this.elasticsearchIndexService.reindex(ctx, dropIndices); - job.start(); - return job; + const job = await this.elasticsearchIndexService.reindex(ctx, dropIndices); + // tslint:disable-next-line:no-non-null-assertion + return job!; } /** * Reindexes all in current Channel without dropping indices. */ - async updateAll(ctx: RequestContext): Promise { - const job = this.elasticsearchIndexService.reindex(ctx, false); - job.start(); - return job; + async updateAll(ctx: RequestContext): Promise { + const job = await this.elasticsearchIndexService.reindex(ctx, false); + // tslint:disable-next-line:no-non-null-assertion + return job!; } private mapVariantToSearchResult(hit: SearchHit): SearchResult { diff --git a/packages/elasticsearch-plugin/src/indexer.controller.ts b/packages/elasticsearch-plugin/src/indexer.controller.ts index 906b9ca1a3..05a76365bc 100644 --- a/packages/elasticsearch-plugin/src/indexer.controller.ts +++ b/packages/elasticsearch-plugin/src/indexer.controller.ts @@ -9,7 +9,6 @@ import { AsyncQueue, FacetValue, ID, - JobService, Logger, Product, ProductVariant, @@ -76,7 +75,6 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes @InjectConnection() private connection: Connection, @Inject(ELASTIC_SEARCH_OPTIONS) private options: Required, private productVariantService: ProductVariantService, - private jobService: JobService, ) {} onModuleInit(): any { diff --git a/packages/elasticsearch-plugin/src/plugin.ts b/packages/elasticsearch-plugin/src/plugin.ts index 42847e4310..4c866ca674 100644 --- a/packages/elasticsearch-plugin/src/plugin.ts +++ b/packages/elasticsearch-plugin/src/plugin.ts @@ -240,36 +240,41 @@ export class ElasticsearchPlugin implements OnVendureBootstrap { Logger.info(`Sucessfully connected to Elasticsearch instance at "${host}:${port}"`, loggerCtx); await this.elasticsearchService.createIndicesIfNotExists(); + this.elasticsearchIndexService.initJobQueue(); - this.eventBus.ofType(ProductEvent).subscribe(event => { + this.eventBus.ofType(ProductEvent).subscribe((event) => { if (event.type === 'deleted') { - return this.elasticsearchIndexService.deleteProduct(event.ctx, event.product).start(); + return this.elasticsearchIndexService.deleteProduct(event.ctx, event.product); } else { - return this.elasticsearchIndexService.updateProduct(event.ctx, event.product).start(); + return this.elasticsearchIndexService.updateProduct(event.ctx, event.product); } }); - this.eventBus.ofType(ProductVariantEvent).subscribe(event => { + this.eventBus.ofType(ProductVariantEvent).subscribe((event) => { if (event.type === 'deleted') { - return this.elasticsearchIndexService.deleteVariant(event.ctx, event.variants).start(); + return this.elasticsearchIndexService.deleteVariant(event.ctx, event.variants); } else { - return this.elasticsearchIndexService.updateVariants(event.ctx, event.variants).start(); + return this.elasticsearchIndexService.updateVariants(event.ctx, event.variants); } }); - this.eventBus.ofType(AssetEvent).subscribe(event => { + this.eventBus.ofType(AssetEvent).subscribe((event) => { if (event.type === 'updated') { - return this.elasticsearchIndexService.updateAsset(event.ctx, event.asset).start(); + return this.elasticsearchIndexService.updateAsset(event.ctx, event.asset); } }); - this.eventBus.ofType(ProductChannelEvent).subscribe(event => { + this.eventBus.ofType(ProductChannelEvent).subscribe((event) => { if (event.type === 'assigned') { - return this.elasticsearchIndexService - .assignProductToChannel(event.ctx, event.product, event.channelId) - .start(); + return this.elasticsearchIndexService.assignProductToChannel( + event.ctx, + event.product, + event.channelId, + ); } else { - return this.elasticsearchIndexService - .removeProductFromChannel(event.ctx, event.product, event.channelId) - .start(); + return this.elasticsearchIndexService.removeProductFromChannel( + event.ctx, + event.product, + event.channelId, + ); } }); @@ -278,18 +283,18 @@ export class ElasticsearchPlugin implements OnVendureBootstrap { collectionModification$ .pipe( buffer(closingNotifier$), - filter(events => 0 < events.length), - map(events => ({ + filter((events) => 0 < events.length), + map((events) => ({ ctx: events[0].ctx, ids: events.reduce((ids, e) => [...ids, ...e.productVariantIds], [] as ID[]), })), - filter(e => 0 < e.ids.length), + filter((e) => 0 < e.ids.length), ) - .subscribe(events => { - return this.elasticsearchIndexService.updateVariantsById(events.ctx, events.ids).start(); + .subscribe((events) => { + return this.elasticsearchIndexService.updateVariantsById(events.ctx, events.ids); }); - this.eventBus.ofType(TaxRateModificationEvent).subscribe(event => { + this.eventBus.ofType(TaxRateModificationEvent).subscribe((event) => { const defaultTaxZone = event.ctx.channel.defaultTaxZone; if (defaultTaxZone && idsAreEqual(defaultTaxZone.id, event.taxRate.zone.id)) { return this.elasticsearchService.updateAll(event.ctx); diff --git a/packages/elasticsearch-plugin/src/types.ts b/packages/elasticsearch-plugin/src/types.ts index 760585a106..b39f4d10fc 100644 --- a/packages/elasticsearch-plugin/src/types.ts +++ b/packages/elasticsearch-plugin/src/types.ts @@ -9,6 +9,8 @@ import { import { ID } from '@vendure/common/lib/shared-types'; import { Asset, SerializedRequestContext, WorkerMessage } from '@vendure/core'; +import { JsonCompatible } from '../../common/src/shared-types'; + export type ElasticSearchInput = SearchInput & { priceRange?: PriceRange; priceRangeWithTax?: PriceRange; @@ -175,7 +177,7 @@ export interface ProductChannelMessageData { } export interface UpdateAssetMessageData { ctx: SerializedRequestContext; - asset: Asset; + asset: JsonCompatible>; } export class ReindexMessage extends WorkerMessage { @@ -215,6 +217,28 @@ type CustomMappingDefinition = { valueFn: (...args: Args) => R; }; +type NamedJobData = { type: Type } & MessageData; + +export type ReindexJobData = NamedJobData<'reindex', ReindexMessageData>; +type UpdateProductJobData = NamedJobData<'update-product', UpdateProductMessageData>; +type UpdateVariantsJobData = NamedJobData<'update-variants', UpdateVariantMessageData>; +type DeleteProductJobData = NamedJobData<'delete-product', UpdateProductMessageData>; +type DeleteVariantJobData = NamedJobData<'delete-variant', UpdateVariantMessageData>; +type UpdateVariantsByIdJobData = NamedJobData<'update-variants-by-id', UpdateVariantsByIdMessageData>; +type UpdateAssetJobData = NamedJobData<'update-asset', UpdateAssetMessageData>; +type AssignProductToChannelJobData = NamedJobData<'assign-product-to-channel', ProductChannelMessageData>; +type RemoveProductFromChannelJobData = NamedJobData<'remove-product-from-channel', ProductChannelMessageData>; +export type UpdateIndexQueueJobData = + | ReindexJobData + | UpdateProductJobData + | UpdateVariantsJobData + | DeleteProductJobData + | DeleteVariantJobData + | UpdateVariantsByIdJobData + | UpdateAssetJobData + | AssignProductToChannelJobData + | RemoveProductFromChannelJobData; + type CustomStringMapping = CustomMappingDefinition; type CustomStringMappingNullable = CustomMappingDefinition>; type CustomIntMapping = CustomMappingDefinition;