Skip to content

Commit

Permalink
refactor(elasticsearch-plugin): Use WorkerService
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelbromley committed Sep 2, 2019
1 parent 16ab03d commit 7df2b9c
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 112 deletions.
5 changes: 0 additions & 5 deletions packages/elasticsearch-plugin/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,3 @@ export const VARIANT_INDEX_TYPE = 'variant-index-item';
export const PRODUCT_INDEX_NAME = 'products';
export const PRODUCT_INDEX_TYPE = 'product-index-item';
export const loggerCtx = 'ElasticsearchPlugin';
export enum Message {
Reindex = 'Reindex',
UpdateVariantsById = 'UpdateVariantsById',
UpdateProductOrVariant = 'UpdateProductOrVariant',
}
160 changes: 69 additions & 91 deletions packages/elasticsearch-plugin/src/elasticsearch-index.service.ts
Original file line number Diff line number Diff line change
@@ -1,120 +1,98 @@
import { Inject, Injectable, OnModuleDestroy } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { ID, Job, JobService, Logger, Product, ProductVariant, RequestContext, VENDURE_WORKER_CLIENT } from '@vendure/core';
import {
ID,
Job,
JobReporter,
JobService,
Logger,
Product,
ProductVariant,
RequestContext,
WorkerService,
} from '@vendure/core';

import { Message } from './constants';
import { ReindexMessageResponse } from './indexer.controller';
import { ReindexMessage, UpdateProductOrVariantMessage, UpdateVariantsByIdMessage } from './types';

@Injectable()
export class ElasticsearchIndexService implements OnModuleDestroy {
export class ElasticsearchIndexService {
constructor(private workerService: WorkerService, private jobService: JobService) {}

constructor(@Inject(VENDURE_WORKER_CLIENT) private readonly client: ClientProxy,
private jobService: JobService) {}
reindex(ctx: RequestContext): Job {
return this.jobService.createJob({
name: 'reindex',
singleInstance: true,
work: async reporter => {
Logger.verbose(`sending reindex message`);
this.workerService.send(new ReindexMessage({ ctx })).subscribe(this.createObserver(reporter));
},
});
}

/**
* Updates the search index only for the affected entities.
*/
updateProductOrVariant(ctx: RequestContext, updatedEntity: Product | ProductVariant) {
return this.jobService.createJob({
name: 'update-index',
work: async () => {
if (updatedEntity instanceof Product) {
return this.client.send(Message.UpdateProductOrVariant, { ctx, productId: updatedEntity.id })
.toPromise()
.catch(err => Logger.error(err));
} else {
return this.client.send(Message.UpdateProductOrVariant, { ctx, variantId: updatedEntity.id })
.toPromise()
.catch(err => Logger.error(err));
}
work: reporter => {
const data =
updatedEntity instanceof Product
? { ctx, productId: updatedEntity.id }
: { ctx, variantId: updatedEntity.id };
this.workerService.send(new UpdateProductOrVariantMessage(data)).subscribe({
complete: () => reporter.complete(true),
error: err => {
Logger.error(err);
reporter.complete(false);
},
});
},
});
}

updateVariantsById(ctx: RequestContext, ids: ID[]) {
return this.jobService.createJob({
name: 'update-index',
work: async reporter => {
return new Promise((resolve, reject) => {
Logger.verbose(`sending reindex message`);
let total: number | undefined;
let duration = 0;
let completed = 0;
this.client.send<ReindexMessageResponse>(Message.UpdateVariantsById, { ctx, ids })
.subscribe({
next: response => {
if (!total) {
total = response.total;
}
duration = response.duration;
completed = response.completed;
const progress = Math.ceil((completed / total) * 100);
reporter.setProgress(progress);
},
complete: () => {
resolve({
success: true,
indexedItemCount: total,
timeTaken: duration,
});
},
error: (err) => {
Logger.error(JSON.stringify(err));
resolve({
success: false,
indexedItemCount: 0,
timeTaken: 0,
});
},
});
});
work: reporter => {
Logger.verbose(`sending reindex message`);
this.workerService
.send(new UpdateVariantsByIdMessage({ ctx, ids }))
.subscribe(this.createObserver(reporter));
},
});
}

reindex(ctx: RequestContext): Job {
return this.jobService.createJob({
name: 'reindex',
singleInstance: true,
work: async reporter => {
return new Promise((resolve, reject) => {
Logger.verbose(`sending reindex message`);
let total: number | undefined;
let duration = 0;
let completed = 0;
this.client.send<ReindexMessageResponse>(Message.Reindex, { ctx })
.subscribe({
next: response => {
if (!total) {
total = response.total;
}
duration = response.duration;
completed = response.completed;
const progress = Math.ceil((completed / total) * 100);
reporter.setProgress(progress);
},
complete: () => {
resolve({
success: true,
indexedItemCount: total,
timeTaken: duration,
});
},
error: (err) => {
Logger.error(JSON.stringify(err));
resolve({
success: false,
indexedItemCount: 0,
timeTaken: 0,
});
},
});
private createObserver(reporter: JobReporter) {
let total: number | undefined;
let duration = 0;
let completed = 0;
return {
next: (response: ReindexMessageResponse) => {
if (!total) {
total = response.total;
}
duration = response.duration;
completed = response.completed;
const progress = Math.ceil((completed / total) * 100);
reporter.setProgress(progress);
},
complete: () => {
reporter.complete({
success: true,
indexedItemCount: total,
timeTaken: duration,
});
},
});
}

onModuleDestroy(): any {
this.client.close();
error: (err: any) => {
Logger.error(JSON.stringify(err));
reporter.complete({
success: false,
indexedItemCount: 0,
timeTaken: 0,
});
},
};
}
}
2 changes: 1 addition & 1 deletion packages/elasticsearch-plugin/src/elasticsearch.service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Client } from '@elastic/elasticsearch';
import { Inject, Injectable } from '@nestjs/common';
import { JobInfo, SearchInput, SearchResponse, SearchResult } from '@vendure/common/lib/generated-types';
import { JobInfo, SearchResult } from '@vendure/common/lib/generated-types';
import {
DeepRequired,
FacetValue,
Expand Down
24 changes: 9 additions & 15 deletions packages/elasticsearch-plugin/src/indexer.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import {
ELASTIC_SEARCH_CLIENT,
ELASTIC_SEARCH_OPTIONS,
loggerCtx,
Message,
PRODUCT_INDEX_NAME,
PRODUCT_INDEX_TYPE,
VARIANT_INDEX_NAME,
Expand All @@ -34,6 +33,9 @@ import {
BulkOperationDoc,
BulkResponseBody,
ProductIndexItem,
ReindexMessage,
UpdateProductOrVariantMessage,
UpdateVariantsByIdMessage,
VariantIndexItem,
} from './types';

Expand Down Expand Up @@ -68,18 +70,13 @@ export class ElasticsearchIndexerController {
/**
* Updates the search index only for the affected entities.
*/
@MessagePattern(Message.UpdateProductOrVariant)
@MessagePattern(UpdateProductOrVariantMessage.pattern)
updateProductOrVariant({
ctx: rawContext,
productId,
variantId,
}: {
ctx: any;
productId?: ID;
variantId?: ID;
}): Observable<boolean> {
}: UpdateProductOrVariantMessage['data']): Observable<boolean> {
const ctx = RequestContext.fromObject(rawContext);

return defer(async () => {
if (productId) {
await this.updateProduct(ctx, productId);
Expand All @@ -90,14 +87,11 @@ export class ElasticsearchIndexerController {
});
}

@MessagePattern(Message.UpdateVariantsById)
@MessagePattern(UpdateVariantsByIdMessage.pattern)
updateVariantsById({
ctx: rawContext,
ids,
}: {
ctx: any;
ids: ID[];
}): Observable<ReindexMessageResponse> {
}: UpdateVariantsByIdMessage['data']): Observable<UpdateVariantsByIdMessage['response']> {
const ctx = RequestContext.fromObject(rawContext);
const { batchSize } = this.options;

Expand Down Expand Up @@ -164,8 +158,8 @@ export class ElasticsearchIndexerController {
});
}

@MessagePattern(Message.Reindex)
reindex({ ctx: rawContext }: { ctx: any }): Observable<ReindexMessageResponse> {
@MessagePattern(ReindexMessage.pattern)
reindex({ ctx: rawContext }: ReindexMessage['data']): Observable<ReindexMessage['response']> {
const ctx = RequestContext.fromObject(rawContext);
const { batchSize } = this.options;

Expand Down
31 changes: 31 additions & 0 deletions packages/elasticsearch-plugin/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
SearchResult,
} from '@vendure/common/lib/generated-types';
import { ID } from '@vendure/common/lib/shared-types';
import { RequestContext, WorkerMessage } from '@vendure/core';

export type ElasticSearchInput = SearchInput & {
priceRange?: PriceRange;
Expand Down Expand Up @@ -122,3 +123,33 @@ export type BulkResponseBody = {
errors: boolean;
items: BulkResponseResult[];
};

export interface ReindexMessageResponse {
total: number;
completed: number;
duration: number;
}

export type UpdateProductOrVariantMessageData = {
ctx: RequestContext;
productId?: ID;
variantId?: ID;
};

export interface UpdateVariantsByIdMessageData {
ctx: RequestContext;
ids: ID[];
}

export class ReindexMessage extends WorkerMessage<{ ctx: RequestContext }, ReindexMessageResponse> {
static readonly pattern = 'Reindex';
}
export class UpdateProductOrVariantMessage extends WorkerMessage<UpdateProductOrVariantMessageData, boolean> {
static readonly pattern = 'UpdateProductOrVariant';
}
export class UpdateVariantsByIdMessage extends WorkerMessage<
UpdateVariantsByIdMessageData,
ReindexMessageResponse
> {
static readonly pattern = 'UpdateVariantsById';
}

0 comments on commit 7df2b9c

Please sign in to comment.