Skip to content

Commit

Permalink
feat(elasticsearch-plugin): Update to use new job queue
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelbromley committed Apr 3, 2020
1 parent bbe5855 commit 42b1d28
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,8 @@ describe('Elasticsearch plugin', () => {
},
);
await awaitRunningJobs(adminClient);
// add an additional check for the collection filters to update
await awaitRunningJobs(adminClient);
const result = await doAdminSearchQuery({ collectionId: 'T_2', groupByProduct: true });

expect(result.search.items.map((i) => i.productName)).toEqual([
Expand Down Expand Up @@ -561,6 +563,8 @@ describe('Elasticsearch plugin', () => {
},
});
await awaitRunningJobs(adminClient);
// add an additional check for the collection filters to update
await awaitRunningJobs(adminClient);
const result = await doAdminSearchQuery({
collectionId: createCollection.id,
groupByProduct: true,
Expand Down
164 changes: 81 additions & 83 deletions packages/elasticsearch-plugin/src/elasticsearch-index.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import {
Asset,
ID,
Job,
JobReporter,
JobService,
JobQueue,
JobQueueService,
Logger,
Product,
ProductVariant,
Expand All @@ -21,149 +21,147 @@ import {
ReindexMessage,
RemoveProductFromChannelMessage,
UpdateAssetMessage,
UpdateIndexQueueJobData,
UpdateProductMessage,
UpdateVariantMessage,
UpdateVariantsByIdMessage,
} from './types';

let updateIndexQueue: JobQueue<UpdateIndexQueueJobData> | undefined;

@Injectable()
export class ElasticsearchIndexService {
constructor(private workerService: WorkerService, private jobService: JobService) {}

reindex(ctx: RequestContext, dropIndices: boolean): Job {
return this.jobService.createJob({
name: 'reindex',
singleInstance: true,
work: async (reporter) => {
Logger.verbose(`sending reindex message`);
this.workerService
.send(new ReindexMessage({ ctx: ctx.serialize(), dropIndices }))
.subscribe(this.createObserver(reporter));
constructor(private workerService: WorkerService, private jobService: JobQueueService) {}

initJobQueue() {
updateIndexQueue = this.jobService.createQueue({
name: 'update-search-index',
concurrency: 1,
process: (job) => {
const data = job.data;
switch (data.type) {
case 'reindex':
Logger.verbose(`sending ReindexMessage`);
this.sendMessageWithProgress(job, new ReindexMessage(data));
break;
case 'update-product':
this.sendMessage(job, new UpdateProductMessage(data));
break;
case 'update-variants':
this.sendMessage(job, new UpdateVariantMessage(data));
break;
case 'delete-product':
this.sendMessage(job, new DeleteProductMessage(data));
break;
case 'delete-variant':
this.sendMessage(job, new DeleteVariantMessage(data));
break;
case 'update-variants-by-id':
this.sendMessageWithProgress(job, new UpdateVariantsByIdMessage(data));
break;
case 'update-asset':
this.sendMessage(job, new UpdateAssetMessage(data));
break;
case 'assign-product-to-channel':
this.sendMessage(job, new AssignProductToChannelMessage(data));
break;
case 'remove-product-from-channel':
this.sendMessage(job, new RemoveProductFromChannelMessage(data));
break;
}
},
});
}

reindex(ctx: RequestContext, dropIndices: boolean) {
return this.addJobToQueue({ type: 'reindex', ctx: ctx.serialize(), dropIndices });
}

updateProduct(ctx: RequestContext, product: Product) {
const data = { ctx: ctx.serialize(), productId: product.id };
return this.createShortWorkerJob(new UpdateProductMessage(data), {
entity: 'Product',
id: product.id,
});
this.addJobToQueue({ type: 'update-product', ctx: ctx.serialize(), productId: product.id });
}

updateVariants(ctx: RequestContext, variants: ProductVariant[]) {
const variantIds = variants.map((v) => v.id);
const data = { ctx: ctx.serialize(), variantIds };
return this.createShortWorkerJob(new UpdateVariantMessage(data), {
entity: 'ProductVariant',
ids: variantIds,
});
this.addJobToQueue({ type: 'update-variants', ctx: ctx.serialize(), variantIds });
}

deleteProduct(ctx: RequestContext, product: Product) {
const data = { ctx: ctx.serialize(), productId: product.id };
return this.createShortWorkerJob(new DeleteProductMessage(data), {
entity: 'Product',
id: product.id,
});
this.addJobToQueue({ type: 'delete-product', ctx: ctx.serialize(), productId: product.id });
}

deleteVariant(ctx: RequestContext, variants: ProductVariant[]) {
const variantIds = variants.map((v) => v.id);
const data = { ctx: ctx.serialize(), variantIds };
return this.createShortWorkerJob(new DeleteVariantMessage(data), {
entity: 'ProductVariant',
id: variantIds,
});
this.addJobToQueue({ type: 'delete-variant', ctx: ctx.serialize(), variantIds });
}

assignProductToChannel(ctx: RequestContext, product: Product, channelId: ID) {
const data = { ctx: ctx.serialize(), productId: product.id, channelId };
return this.createShortWorkerJob(new AssignProductToChannelMessage(data), {
entity: 'Product',
id: product.id,
this.addJobToQueue({
type: 'assign-product-to-channel',
ctx: ctx.serialize(),
productId: product.id,
channelId,
});
}

removeProductFromChannel(ctx: RequestContext, product: Product, channelId: ID) {
const data = { ctx: ctx.serialize(), productId: product.id, channelId };
return this.createShortWorkerJob(new RemoveProductFromChannelMessage(data), {
entity: 'Product',
id: product.id,
this.addJobToQueue({
type: 'remove-product-from-channel',
ctx: ctx.serialize(),
productId: product.id,
channelId,
});
}

updateVariantsById(ctx: RequestContext, ids: ID[]) {
return this.jobService.createJob({
name: 'update-variants',
metadata: {
variantIds: ids,
},
work: (reporter) => {
Logger.verbose(`sending UpdateVariantsByIdMessage`);
this.workerService
.send(new UpdateVariantsByIdMessage({ ctx: ctx.serialize(), ids }))
.subscribe(this.createObserver(reporter));
},
});
this.addJobToQueue({ type: 'update-variants-by-id', ctx: ctx.serialize(), ids });
}

updateAsset(ctx: RequestContext, asset: Asset) {
const data = { ctx: ctx.serialize(), asset };
return this.createShortWorkerJob(new UpdateAssetMessage(data), {
entity: 'Asset',
id: asset.id,
});
this.addJobToQueue({ type: 'update-asset', ctx: ctx.serialize(), asset: asset as any });
}

/**
* Creates a short-running job that does not expect progress updates.
*/
private createShortWorkerJob<T extends WorkerMessage<any, any>>(message: T, metadata: any) {
return this.jobService.createJob({
name: 'update-index',
metadata,
work: (reporter) => {
this.workerService.send(message).subscribe({
complete: () => reporter.complete(true),
error: (err) => {
Logger.error(err);
reporter.complete(false);
},
});
private addJobToQueue(data: UpdateIndexQueueJobData) {
if (updateIndexQueue) {
return updateIndexQueue.add(data);
}
}

private sendMessage(job: Job<any>, message: WorkerMessage<any, any>) {
this.workerService.send(message).subscribe({
complete: () => job.complete(true),
error: (err) => {
Logger.error(err);
job.fail(err);
},
});
}

private createObserver(reporter: JobReporter) {
private sendMessageWithProgress(job: Job<any>, message: WorkerMessage<any, ReindexMessageResponse>) {
let total: number | undefined;
let duration = 0;
let completed = 0;
return {
this.workerService.send(message).subscribe({
next: (response: ReindexMessageResponse) => {
if (!total) {
total = response.total;
}
duration = response.duration;
completed = response.completed;
const progress = Math.ceil((completed / total) * 100);
reporter.setProgress(progress);
job.setProgress(progress);
},
complete: () => {
reporter.complete({
job.complete({
success: true,
indexedItemCount: total,
timeTaken: duration,
});
},
error: (err: any) => {
Logger.error(JSON.stringify(err));
reporter.complete({
success: false,
indexedItemCount: 0,
timeTaken: 0,
});
job.fail();
},
};
});
}
}
8 changes: 4 additions & 4 deletions packages/elasticsearch-plugin/src/elasticsearch-resolver.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Args, Mutation, Parent, Query, ResolveField, Resolver } from '@nestjs/graphql';
import {
JobInfo,
Job as GraphQLJob,
Permission,
QuerySearchArgs,
SearchInput,
Expand Down Expand Up @@ -34,7 +34,7 @@ export class ShopElasticSearchResolver implements Omit<SearchResolver, 'reindex'
@Parent() parent: { input: ElasticSearchInput },
): Promise<Array<{ facetValue: FacetValue; count: number }>> {
const facetValues = await this.elasticsearchService.facetValues(ctx, parent.input, true);
return facetValues.filter(i => !i.facetValue.facet.isPrivate);
return facetValues.filter((i) => !i.facetValue.facet.isPrivate);
}

@ResolveField()
Expand Down Expand Up @@ -72,7 +72,7 @@ export class AdminElasticSearchResolver implements SearchResolver {

@Mutation()
@Allow(Permission.UpdateCatalog)
async reindex(@Ctx() ctx: RequestContext): Promise<JobInfo> {
return this.elasticsearchService.reindex(ctx, false);
async reindex(@Ctx() ctx: RequestContext): Promise<GraphQLJob> {
return (this.elasticsearchService.reindex(ctx, false) as unknown) as GraphQLJob;
}
}
34 changes: 19 additions & 15 deletions packages/elasticsearch-plugin/src/elasticsearch.service.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { Client } from '@elastic/elasticsearch';
import { Inject, Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { JobInfo, SearchResult, SearchResultAsset } from '@vendure/common/lib/generated-types';
import { SearchResult, SearchResultAsset } from '@vendure/common/lib/generated-types';
import {
DeepRequired,
FacetValue,
FacetValueService,
InternalServerError,
Job,
Logger,
RequestContext,
SearchService,
Expand Down Expand Up @@ -104,7 +105,7 @@ export class ElasticsearchService implements OnModuleInit, OnModuleDestroy {
body: elasticSearchBody,
});
return {
items: body.hits.hits.map(hit => this.mapProductToSearchResult(hit)),
items: body.hits.hits.map((hit) => this.mapProductToSearchResult(hit)),
totalItems: body.hits.total.value,
};
} else {
Expand All @@ -114,7 +115,7 @@ export class ElasticsearchService implements OnModuleInit, OnModuleDestroy {
body: elasticSearchBody,
});
return {
items: body.hits.hits.map(hit => this.mapVariantToSearchResult(hit)),
items: body.hits.hits.map((hit) => this.mapVariantToSearchResult(hit)),
totalItems: body.hits.total.value,
};
}
Expand Down Expand Up @@ -153,9 +154,12 @@ export class ElasticsearchService implements OnModuleInit, OnModuleDestroy {

const buckets = body.aggregations ? body.aggregations.facetValue.buckets : [];

const facetValues = await this.facetValueService.findByIds(buckets.map(b => b.key), ctx.languageCode);
const facetValues = await this.facetValueService.findByIds(
buckets.map((b) => b.key),
ctx.languageCode,
);
return facetValues.map((facetValue, index) => {
const bucket = buckets.find(b => b.key.toString() === facetValue.id.toString());
const bucket = buckets.find((b) => b.key.toString() === facetValue.id.toString());
return {
facetValue,
count: bucket ? bucket.doc_count : 0,
Expand Down Expand Up @@ -229,28 +233,28 @@ export class ElasticsearchService implements OnModuleInit, OnModuleDestroy {
min: aggregations.minPriceWithTax.value || 0,
max: aggregations.maxPriceWithTax.value || 0,
},
buckets: aggregations.prices.buckets.map(mapPriceBuckets).filter(x => 0 < x.count),
bucketsWithTax: aggregations.prices.buckets.map(mapPriceBuckets).filter(x => 0 < x.count),
buckets: aggregations.prices.buckets.map(mapPriceBuckets).filter((x) => 0 < x.count),
bucketsWithTax: aggregations.prices.buckets.map(mapPriceBuckets).filter((x) => 0 < x.count),
};
}

/**
* Rebuilds the full search index.
*/
async reindex(ctx: RequestContext, dropIndices = true): Promise<JobInfo> {
async reindex(ctx: RequestContext, dropIndices = true): Promise<Job> {
const { indexPrefix } = this.options;
const job = this.elasticsearchIndexService.reindex(ctx, dropIndices);
job.start();
return job;
const job = await this.elasticsearchIndexService.reindex(ctx, dropIndices);
// tslint:disable-next-line:no-non-null-assertion
return job!;
}

/**
* Reindexes all in current Channel without dropping indices.
*/
async updateAll(ctx: RequestContext): Promise<JobInfo> {
const job = this.elasticsearchIndexService.reindex(ctx, false);
job.start();
return job;
async updateAll(ctx: RequestContext): Promise<Job> {
const job = await this.elasticsearchIndexService.reindex(ctx, false);
// tslint:disable-next-line:no-non-null-assertion
return job!;
}

private mapVariantToSearchResult(hit: SearchHit<VariantIndexItem>): SearchResult {
Expand Down
2 changes: 0 additions & 2 deletions packages/elasticsearch-plugin/src/indexer.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import {
AsyncQueue,
FacetValue,
ID,
JobService,
Logger,
Product,
ProductVariant,
Expand Down Expand Up @@ -76,7 +75,6 @@ export class ElasticsearchIndexerController implements OnModuleInit, OnModuleDes
@InjectConnection() private connection: Connection,
@Inject(ELASTIC_SEARCH_OPTIONS) private options: Required<ElasticsearchOptions>,
private productVariantService: ProductVariantService,
private jobService: JobService,
) {}

onModuleInit(): any {
Expand Down
Loading

0 comments on commit 42b1d28

Please sign in to comment.