Skip to content

Commit

Permalink
feat(core): Expose pending search index updates operations in Admin API
Browse files Browse the repository at this point in the history
Relates to #1137
  • Loading branch information
michaelbromley committed Oct 7, 2021
1 parent 95d4445 commit 53a1943
Show file tree
Hide file tree
Showing 19 changed files with 156 additions and 30 deletions.
2 changes: 2 additions & 0 deletions packages/admin-ui/src/lib/core/src/common/generated-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2308,6 +2308,7 @@ export type Mutation = {
removeSettledJobs: Scalars['Int'];
requestCompleted: Scalars['Int'];
requestStarted: Scalars['Int'];
runPendingSearchIndexUpdates: Success;
setActiveChannel: UserStatus;
setAsLoggedIn: UserStatus;
setAsLoggedOut: UserStatus;
Expand Down Expand Up @@ -3933,6 +3934,7 @@ export type Query = {
paymentMethodEligibilityCheckers: Array<ConfigurableOperationDefinition>;
paymentMethodHandlers: Array<ConfigurableOperationDefinition>;
paymentMethods: PaymentMethodList;
pendingSearchIndexUpdates: Scalars['Int'];
/** Get a Product either by id or slug. If neither id nor slug is speicified, an error will result. */
product?: Maybe<Product>;
productOptionGroup?: Maybe<ProductOptionGroup>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2235,6 +2235,7 @@ export type Mutation = {
/** Create a new ProductOption within a ProductOptionGroup */
updateProductOption: ProductOption;
reindex: Job;
runPendingSearchIndexUpdates: Success;
/** Create a new Product */
createProduct: Product;
/** Update an existing Product */
Expand Down Expand Up @@ -3688,6 +3689,7 @@ export type Query = {
productOptionGroups: Array<ProductOptionGroup>;
productOptionGroup?: Maybe<ProductOptionGroup>;
search: SearchResponse;
pendingSearchIndexUpdates: Scalars['Int'];
/** List Products */
products: ProductList;
/** Get a Product either by id or slug. If neither id nor slug is speicified, an error will result. */
Expand Down
2 changes: 2 additions & 0 deletions packages/common/src/generated-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2281,6 +2281,7 @@ export type Mutation = {
/** Create a new ProductOption within a ProductOptionGroup */
updateProductOption: ProductOption;
reindex: Job;
runPendingSearchIndexUpdates: Success;
/** Create a new Product */
createProduct: Product;
/** Update an existing Product */
Expand Down Expand Up @@ -3881,6 +3882,7 @@ export type Query = {
productOptionGroups: Array<ProductOptionGroup>;
productOptionGroup?: Maybe<ProductOptionGroup>;
search: SearchResponse;
pendingSearchIndexUpdates: Scalars['Int'];
/** List Products */
products: ProductList;
/** Get a Product either by id or slug. If neither id nor slug is speicified, an error will result. */
Expand Down
2 changes: 2 additions & 0 deletions packages/core/e2e/graphql/generated-e2e-admin-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2235,6 +2235,7 @@ export type Mutation = {
/** Create a new ProductOption within a ProductOptionGroup */
updateProductOption: ProductOption;
reindex: Job;
runPendingSearchIndexUpdates: Success;
/** Create a new Product */
createProduct: Product;
/** Update an existing Product */
Expand Down Expand Up @@ -3688,6 +3689,7 @@ export type Query = {
productOptionGroups: Array<ProductOptionGroup>;
productOptionGroup?: Maybe<ProductOptionGroup>;
search: SearchResponse;
pendingSearchIndexUpdates: Scalars['Int'];
/** List Products */
products: ProductList;
/** Get a Product either by id or slug. If neither id nor slug is speicified, an error will result. */
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/api/resolvers/admin/job.resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export class JobResolver {
@Allow(Permission.ReadSettings, Permission.ReadSystem)
async jobBufferSize(@Args() args: QueryJobBufferSizeArgs) {
const bufferSizes = await this.jobBufferService.bufferSize(args.bufferIds);
return Object.entries(bufferSizes).map(([processorId, size]) => ({ processorId, size }));
return Object.entries(bufferSizes).map(([bufferId, size]) => ({ bufferId, size }));
}

@Mutation()
Expand Down
12 changes: 12 additions & 0 deletions packages/core/src/api/resolvers/admin/search.resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,16 @@ export class SearchResolver {
async reindex(...args: any[]): Promise<any> {
throw new InternalServerError(`error.no-search-plugin-configured`);
}

@Query()
@Allow(Permission.UpdateCatalog, Permission.UpdateProduct)
async pendingSearchIndexUpdates(...args: any[]): Promise<any> {
throw new InternalServerError(`error.no-search-plugin-configured`);
}

@Mutation()
@Allow(Permission.UpdateCatalog, Permission.UpdateProduct)
async runPendingSearchIndexUpdates(...args: any[]): Promise<any> {
throw new InternalServerError(`error.no-search-plugin-configured`);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
type Query {
search(input: SearchInput!): SearchResponse!
pendingSearchIndexUpdates: Int!
}

type Mutation {
reindex: Job!
runPendingSearchIndexUpdates: Success!
}

6 changes: 4 additions & 2 deletions packages/core/src/job-queue/job-buffer/job-buffer.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ export class JobBufferService {
return this.storageStrategy.bufferSize(buffer.map(p => (typeof p === 'string' ? p : p.id)));
}

async flush(forBuffers?: Array<JobBuffer | string>): Promise<void> {
async flush(forBuffers?: Array<JobBuffer | string>): Promise<Job[]> {
const { jobQueueStrategy } = this.configService.jobQueueOptions;
const buffers = forBuffers ?? Array.from(this.buffers);
const flushResult = await this.storageStrategy.flush(
buffers.map(p => (typeof p === 'string' ? p : p.id)),
);
const result: Job[] = [];
for (const buffer of this.buffers) {
const jobsForBuffer = flushResult[buffer.id];
if (jobsForBuffer?.length) {
Expand All @@ -68,9 +69,10 @@ export class JobBufferService {
);
}
for (const job of jobsToAdd) {
await jobQueueStrategy.add(job);
result.push(await jobQueueStrategy.add(job));
}
}
}
return result;
}
}
5 changes: 3 additions & 2 deletions packages/core/src/job-queue/job-queue.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { JobQueue as GraphQlJobQueue } from '@vendure/common/lib/generated-types
import { ConfigService, JobQueueStrategy, Logger } from '../config';

import { loggerCtx } from './constants';
import { Job } from './job';
import { JobBuffer } from './job-buffer/job-buffer';
import { JobBufferService } from './job-buffer/job-buffer.service';
import { JobQueue } from './job-queue';
Expand Down Expand Up @@ -94,11 +95,11 @@ export class JobQueueService implements OnModuleDestroy {
this.jobBufferService.removeBuffer(buffer);
}

bufferSize(forBuffers?: Array<JobBuffer | string>): Promise<{ [bufferId: string]: number }> {
bufferSize(...forBuffers: Array<JobBuffer<any> | string>): Promise<{ [bufferId: string]: number }> {
return this.jobBufferService.bufferSize(forBuffers);
}

flush(forBuffers?: Array<JobBuffer | string>): Promise<void> {
flush(...forBuffers: Array<JobBuffer<any> | string>): Promise<Job[]> {
return this.jobBufferService.flush(forBuffers);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const PLUGIN_INIT_OPTIONS = Symbol('PLUGIN_INIT_OPTIONS');
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { OnApplicationBootstrap } from '@nestjs/common';
import { Inject, OnApplicationBootstrap } from '@nestjs/common';
import { SearchReindexResponse } from '@vendure/common/lib/generated-types';
import { ID } from '@vendure/common/lib/shared-types';
import { ID, Type } from '@vendure/common/lib/shared-types';
import { buffer, debounceTime, delay, filter, map } from 'rxjs/operators';

import { idsAreEqual } from '../../common/utils';
Expand All @@ -17,13 +17,16 @@ import { JobQueueService } from '../../job-queue/job-queue.service';
import { PluginCommonModule } from '../plugin-common.module';
import { VendurePlugin } from '../vendure-plugin';

import { CollectionJobBuffer } from './collection-job-buffer';
import { PLUGIN_INIT_OPTIONS } from './constants';
import { AdminFulltextSearchResolver, ShopFulltextSearchResolver } from './fulltext-search.resolver';
import { FulltextSearchService } from './fulltext-search.service';
import { IndexerController } from './indexer/indexer.controller';
import { SearchIndexService } from './indexer/search-index.service';
import { SearchIndexItem } from './search-index-item.entity';
import { SearchJobBuffer } from './search-job-buffer';
import { CollectionJobBuffer } from './search-job-buffer/collection-job-buffer';
import { SearchIndexJobBuffer } from './search-job-buffer/search-index-job-buffer';
import { SearchJobBufferService } from './search-job-buffer/search-job-buffer.service';
import { DefaultSearchPluginInitOptions } from './types';

export interface DefaultSearchReindexResponse extends SearchReindexResponse {
timeTaken: number;
Expand Down Expand Up @@ -61,24 +64,34 @@ export interface DefaultSearchReindexResponse extends SearchReindexResponse {
*/
@VendurePlugin({
imports: [PluginCommonModule],
providers: [FulltextSearchService, SearchIndexService, IndexerController],
providers: [
FulltextSearchService,
SearchIndexService,
IndexerController,
SearchJobBufferService,
{ provide: PLUGIN_INIT_OPTIONS, useFactory: () => DefaultSearchPlugin.options },
],
adminApiExtensions: { resolvers: [AdminFulltextSearchResolver] },
shopApiExtensions: { resolvers: [ShopFulltextSearchResolver] },
entities: [SearchIndexItem],
})
export class DefaultSearchPlugin implements OnApplicationBootstrap {
static options: DefaultSearchPluginInitOptions = {};

/** @internal */
constructor(
private eventBus: EventBus,
private searchIndexService: SearchIndexService,
private jobQueueService: JobQueueService,
) {}

static init(options: DefaultSearchPluginInitOptions): Type<DefaultSearchPlugin> {
this.options = options;
return DefaultSearchPlugin;
}

/** @internal */
async onApplicationBootstrap() {
this.jobQueueService.addBuffer(new SearchJobBuffer());
this.jobQueueService.addBuffer(new CollectionJobBuffer());

this.eventBus.ofType(ProductEvent).subscribe(event => {
if (event.type === 'deleted') {
return this.searchIndexService.deleteProduct(event.ctx, event.product);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@ import { RequestContext } from '../../api/common/request-context';
import { Allow } from '../../api/decorators/allow.decorator';
import { Ctx } from '../../api/decorators/request-context.decorator';
import { SearchResolver as BaseSearchResolver } from '../../api/resolvers/admin/search.resolver';
import { InternalServerError } from '../../common/error/errors';
import { Collection, FacetValue } from '../../entity';

import { FulltextSearchService } from './fulltext-search.service';
import { SearchJobBufferService } from './search-job-buffer/search-job-buffer.service';

@Resolver('SearchResponse')
export class ShopFulltextSearchResolver implements Omit<BaseSearchResolver, 'reindex'> {
export class ShopFulltextSearchResolver
implements Pick<BaseSearchResolver, 'search' | 'facetValues' | 'collections'>
{
constructor(private fulltextSearchService: FulltextSearchService) {}

@Query()
Expand Down Expand Up @@ -52,14 +56,17 @@ export class ShopFulltextSearchResolver implements Omit<BaseSearchResolver, 'rei

@Resolver('SearchResponse')
export class AdminFulltextSearchResolver implements BaseSearchResolver {
constructor(private fulltextSearchService: FulltextSearchService) {}
constructor(
private fulltextSearchService: FulltextSearchService,
private searchJobBufferService: SearchJobBufferService,
) {}

@Query()
@Allow(Permission.ReadCatalog, Permission.ReadProduct)
async search(
@Ctx() ctx: RequestContext,
@Args() args: QuerySearchArgs,
): Promise<Omit<SearchResponse, 'facetValues'| 'collections'>> {
): Promise<Omit<SearchResponse, 'facetValues' | 'collections'>> {
const result = await this.fulltextSearchService.search(ctx, args.input, false);
// ensure the facetValues property resolver has access to the input args
(result as any).input = args.input;
Expand Down Expand Up @@ -87,4 +94,17 @@ export class AdminFulltextSearchResolver implements BaseSearchResolver {
async reindex(@Ctx() ctx: RequestContext) {
return this.fulltextSearchService.reindex(ctx);
}

@Query()
@Allow(Permission.UpdateCatalog, Permission.UpdateProduct)
async pendingSearchIndexUpdates(...args: any[]): Promise<any> {
return this.searchJobBufferService.getPendingSearchUpdates();
}

@Mutation()
@Allow(Permission.UpdateCatalog, Permission.UpdateProduct)
async runPendingSearchIndexUpdates(...args: any[]): Promise<any> {
await this.searchJobBufferService.runPendingSearchUpdates();
return { success: true };
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { ID } from '@vendure/common/lib/shared-types';
import { unique } from '@vendure/common/lib/unique';

import { Job, JobBuffer } from '../../job-queue';
import { ApplyCollectionFiltersJobData } from '../../service/services/collection.service';

import { UpdateIndexQueueJobData, UpdateVariantsByIdJobData, UpdateVariantsJobData } from './types';
import { Job, JobBuffer } from '../../../job-queue/index';
import { ApplyCollectionFiltersJobData } from '../../../service/services/collection.service';
import { UpdateIndexQueueJobData, UpdateVariantsByIdJobData, UpdateVariantsJobData } from '../types';

export class CollectionJobBuffer implements JobBuffer<ApplyCollectionFiltersJobData> {
readonly id = 'search-plugin-apply-collection-filters';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { ID } from '@vendure/common/lib/shared-types';
import { unique } from '@vendure/common/lib/unique';

import { Job, JobBuffer } from '../../job-queue';
import { Job, JobBuffer } from '../../../job-queue/index';
import { UpdateIndexQueueJobData, UpdateVariantsByIdJobData, UpdateVariantsJobData } from '../types';

import { UpdateIndexQueueJobData, UpdateVariantsByIdJobData, UpdateVariantsJobData } from './types';

export class SearchJobBuffer implements JobBuffer<UpdateIndexQueueJobData> {
export class SearchIndexJobBuffer implements JobBuffer<UpdateIndexQueueJobData> {
readonly id = 'search-plugin-update-search-index';

collect(job: Job<UpdateIndexQueueJobData>): boolean | Promise<boolean> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { Inject, Injectable, OnApplicationBootstrap } from '@nestjs/common';
import { forkJoin } from 'rxjs';

import { ConfigService } from '../../../config/config.service';
import { isInspectableJobQueueStrategy } from '../../../config/job-queue/inspectable-job-queue-strategy';
import { JobQueueService } from '../../../job-queue/job-queue.service';
import { SubscribableJob } from '../../../job-queue/subscribable-job';
import { PLUGIN_INIT_OPTIONS } from '../constants';
import { DefaultSearchPluginInitOptions } from '../types';

import { CollectionJobBuffer } from './collection-job-buffer';
import { SearchIndexJobBuffer } from './search-index-job-buffer';

@Injectable()
export class SearchJobBufferService implements OnApplicationBootstrap {
readonly searchIndexJobBuffer = new SearchIndexJobBuffer();
readonly collectionJobBuffer = new CollectionJobBuffer();

constructor(
private jobQueueService: JobQueueService,
private configService: ConfigService,
@Inject(PLUGIN_INIT_OPTIONS) private options: DefaultSearchPluginInitOptions,
) {}

onApplicationBootstrap(): any {
if (this.options.bufferUpdates === true) {
this.jobQueueService.addBuffer(this.searchIndexJobBuffer);
this.jobQueueService.addBuffer(this.collectionJobBuffer);
}
}

async getPendingSearchUpdates(): Promise<number> {
if (!this.options.bufferUpdates) {
return 0;
}
const bufferSizes = await this.jobQueueService.bufferSize(
this.searchIndexJobBuffer,
this.collectionJobBuffer,
);
return (
(bufferSizes[this.searchIndexJobBuffer.id] ?? 0) + (bufferSizes[this.collectionJobBuffer.id] ?? 0)
);
}

async runPendingSearchUpdates(): Promise<void> {
if (!this.options.bufferUpdates) {
return;
}
const { jobQueueStrategy } = this.configService.jobQueueOptions;

const collectionFilterJobs = await this.jobQueueService.flush(this.collectionJobBuffer);
if (collectionFilterJobs.length && isInspectableJobQueueStrategy(jobQueueStrategy)) {
const subscribableCollectionJobs = collectionFilterJobs.map(
job => new SubscribableJob(job, jobQueueStrategy),
);
await forkJoin(
...subscribableCollectionJobs.map(sj =>
sj.updates({ pollInterval: 500, timeoutMs: 3 * 60 * 1000 }),
),
).toPromise();
}
await this.jobQueueService.flush(this.searchIndexJobBuffer);
}
}
4 changes: 4 additions & 0 deletions packages/core/src/plugin/default-search-plugin/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ import { ID, JsonCompatible } from '@vendure/common/lib/shared-types';
import { SerializedRequestContext } from '../../api/common/request-context';
import { Asset } from '../../entity/asset/asset.entity';

export interface DefaultSearchPluginInitOptions {
bufferUpdates?: boolean;
}

export type ReindexMessageResponse = {
total: number;
completed: number;
Expand Down
Loading

0 comments on commit 53a1943

Please sign in to comment.