Skip to content

Commit

Permalink
feat(core): Pass ctx to job queue strategy add (#2759)
Browse files Browse the repository at this point in the history
Closes #2758
  • Loading branch information
seminarian authored Apr 4, 2024
1 parent 37b06cf commit 3909251
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 59 deletions.
2 changes: 1 addition & 1 deletion docs/docs/reference/typescript-api/job-queue/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class JobQueue<Data extends JobData<Data> = object> {
name: string
started: boolean
constructor(options: CreateQueueOptions<Data>, jobQueueStrategy: JobQueueStrategy, jobBufferService: JobBufferService)
add(data: Data, options?: Pick<JobConfig<Data>, 'retries'>) => Promise<SubscribableJob<Data>>;
add(data: Data, options?: JobOptions) => Promise<SubscribableJob<Data>>;
}
```

Expand Down
5 changes: 3 additions & 2 deletions packages/core/src/config/job-queue/job-queue-strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ import { JobListOptions } from '@vendure/common/lib/generated-types';
import { ID, PaginatedList } from '@vendure/common/lib/shared-types';

import { InjectableStrategy } from '../../common';
import { JobData } from '../../job-queue';
import { JobData, JobQueueStrategyJobOptions } from '../../job-queue';
import { Job } from '../../job-queue';
import { JobOptions } from '../../job-queue';

/**
* @description
Expand All @@ -25,7 +26,7 @@ export interface JobQueueStrategy extends InjectableStrategy {
* @description
* Add a new job to the queue.
*/
add<Data extends JobData<Data> = object>(job: Job<Data>): Promise<Job<Data>>;
add<Data extends JobData<Data> = object>(job: Job<Data>, jobOptions?: JobQueueStrategyJobOptions<Data>): Promise<Job<Data>>;

/**
* @description
Expand Down
8 changes: 4 additions & 4 deletions packages/core/src/job-queue/job-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { Logger } from '../config/logger/vendure-logger';
import { Job } from './job';
import { JobBufferService } from './job-buffer/job-buffer.service';
import { SubscribableJob } from './subscribable-job';
import { CreateQueueOptions, JobConfig, JobData } from './types';
import { CreateQueueOptions, JobConfig, JobData, JobOptions } from './types';

/**
* @description
Expand Down Expand Up @@ -90,7 +90,7 @@ export class JobQueue<Data extends JobData<Data> = object> {
* .catch(err => err.message);
* ```
*/
async add(data: Data, options?: Pick<JobConfig<Data>, 'retries'>): Promise<SubscribableJob<Data>> {
async add(data: Data, options?: JobOptions<Data>): Promise<SubscribableJob<Data>> {
const job = new Job<any>({
data,
queueName: this.options.name,
Expand All @@ -99,8 +99,8 @@ export class JobQueue<Data extends JobData<Data> = object> {

const isBuffered = await this.jobBufferService.add(job);
if (!isBuffered) {
const addedJob = await this.jobQueueStrategy.add(job);
return new SubscribableJob(addedJob, this.jobQueueStrategy);
const addedJob = await this.jobQueueStrategy.add(job);
return new SubscribableJob(addedJob, this.jobQueueStrategy);
} else {
const bufferedJob = new Job({
...job,
Expand Down
8 changes: 8 additions & 0 deletions packages/core/src/job-queue/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { JobState } from '@vendure/common/lib/generated-types';
import { ID, JsonCompatible } from '@vendure/common/lib/shared-types';

import { RequestContext } from '../api/common/request-context';

import { Job } from './job';

/**
Expand Down Expand Up @@ -55,3 +57,9 @@ export interface JobConfig<T extends JobData<T>> {
startedAt?: Date;
settledAt?: Date;
}

export type JobOptions<Data extends JsonCompatible<Data>> = Pick<JobConfig<Data>, 'retries'> & {
ctx?: RequestContext
};

export type JobQueueStrategyJobOptions<Data extends JsonCompatible<Data>> = Omit<JobOptions<Data>, "retries">
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { Injector } from '../../common/injector';
import { InspectableJobQueueStrategy, JobQueueStrategy } from '../../config';
import { Logger } from '../../config/logger/vendure-logger';
import { TransactionalConnection } from '../../connection/transactional-connection';
import { Job, JobData } from '../../job-queue';
import { Job, JobData, JobQueueStrategyJobOptions } from '../../job-queue';
import { PollingJobQueueStrategy } from '../../job-queue/polling-job-queue-strategy';
import { ListQueryBuilder } from '../../service/helpers/list-query-builder/list-query-builder';

Expand All @@ -20,27 +20,31 @@ import { JobRecord } from './job-record.entity';
* @docsCategory JobQueue
*/
export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements InspectableJobQueueStrategy {
private connection: Connection | undefined;
private rawConnection: Connection | undefined;
private connection: TransactionalConnection | undefined;
private listQueryBuilder: ListQueryBuilder;

init(injector: Injector) {
this.connection = injector.get(TransactionalConnection).rawConnection;
this.rawConnection = injector.get(TransactionalConnection).rawConnection;
this.connection = injector.get(TransactionalConnection);
this.listQueryBuilder = injector.get(ListQueryBuilder);
super.init(injector);
}

destroy() {
this.connection = undefined;
this.rawConnection = undefined;
super.destroy();
}

async add<Data extends JobData<Data> = object>(job: Job<Data>): Promise<Job<Data>> {
if (!this.connectionAvailable(this.connection)) {
async add<Data extends JobData<Data> = object>(job: Job<Data>, jobOptions?: JobQueueStrategyJobOptions<Data>): Promise<Job<Data>> {
if (!this.connectionAvailable(this.rawConnection)) {
throw new Error('Connection not available');
}
const jobRecordRepository = jobOptions?.ctx && this.connection ? this.connection.getRepository(jobOptions.ctx, JobRecord) :
this.rawConnection.getRepository(JobRecord);
const constrainedData = this.constrainDataSize(job);
const newRecord = this.toRecord(job, constrainedData, this.setRetries(job.queueName, job));
const record = await this.connection.getRepository(JobRecord).save(newRecord);
const record = await jobRecordRepository.save(newRecord);
return this.fromRecord(record);
}

Expand All @@ -49,7 +53,7 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
* In order to try to prevent that, this method will truncate any strings in the `data` object over 2kb in size.
*/
private constrainDataSize<Data extends JobData<Data> = object>(job: Job<Data>): Data | undefined {
const type = this.connection?.options.type;
const type = this.rawConnection?.options.type;
if (type === 'mysql' || type === 'mariadb') {
const stringified = JSON.stringify(job.data);
if (64 * 1024 <= stringified.length) {
Expand All @@ -76,11 +80,11 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
}

async next(queueName: string): Promise<Job | undefined> {
if (!this.connectionAvailable(this.connection)) {
if (!this.connectionAvailable(this.rawConnection)) {
throw new Error('Connection not available');
}
const connection = this.connection;
const connectionType = this.connection.options.type;
const connection = this.rawConnection;
const connectionType = this.rawConnection.options.type;
const isSQLite =
connectionType === 'sqlite' || connectionType === 'sqljs' || connectionType === 'better-sqlite3';

Expand Down Expand Up @@ -157,10 +161,10 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
}

async update(job: Job<any>): Promise<void> {
if (!this.connectionAvailable(this.connection)) {
if (!this.connectionAvailable(this.rawConnection)) {
throw new Error('Connection not available');
}
await this.connection
await this.rawConnection
.getRepository(JobRecord)
.createQueryBuilder('job')
.update()
Expand All @@ -171,7 +175,7 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
}

async findMany(options?: JobListOptions): Promise<PaginatedList<Job>> {
if (!this.connectionAvailable(this.connection)) {
if (!this.connectionAvailable(this.rawConnection)) {
throw new Error('Connection not available');
}
return this.listQueryBuilder
Expand All @@ -184,42 +188,42 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
}

async findOne(id: ID): Promise<Job | undefined> {
if (!this.connectionAvailable(this.connection)) {
if (!this.connectionAvailable(this.rawConnection)) {
throw new Error('Connection not available');
}
const record = await this.connection.getRepository(JobRecord).findOne({ where: { id } });
const record = await this.rawConnection.getRepository(JobRecord).findOne({ where: { id } });
if (record) {
return this.fromRecord(record);
}
}

async findManyById(ids: ID[]): Promise<Job[]> {
if (!this.connectionAvailable(this.connection)) {
if (!this.connectionAvailable(this.rawConnection)) {
throw new Error('Connection not available');
}
return this.connection
return this.rawConnection
.getRepository(JobRecord)
.find({ where: { id: In(ids) } })
.then(records => records.map(this.fromRecord));
}

async removeSettledJobs(queueNames: string[] = [], olderThan?: Date) {
if (!this.connectionAvailable(this.connection)) {
if (!this.connectionAvailable(this.rawConnection)) {
throw new Error('Connection not available');
}
const findOptions: FindOptionsWhere<JobRecord> = {
...(0 < queueNames.length ? { queueName: In(queueNames) } : {}),
isSettled: true,
settledAt: LessThan(olderThan || new Date()),
};
const toDelete = await this.connection.getRepository(JobRecord).find({ where: findOptions });
const deleteCount = await this.connection.getRepository(JobRecord).count({ where: findOptions });
await this.connection.getRepository(JobRecord).delete(findOptions);
const toDelete = await this.rawConnection.getRepository(JobRecord).find({ where: findOptions });
const deleteCount = await this.rawConnection.getRepository(JobRecord).count({ where: findOptions });
await this.rawConnection.getRepository(JobRecord).delete(findOptions);
return deleteCount;
}

private connectionAvailable(connection: Connection | undefined): connection is Connection {
return !!this.connection && this.connection.isConnected;
return !!this.rawConnection && this.rawConnection.isConnected;
}

private toRecord(job: Job<any>, data?: any, retries?: number): JobRecord {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,45 +67,47 @@ export class SearchIndexService implements OnApplicationBootstrap {
}

reindex(ctx: RequestContext) {
return this.updateIndexQueue.add({ type: 'reindex', ctx: ctx.serialize() });
return this.updateIndexQueue.add({ type: 'reindex', ctx: ctx.serialize() }, { ctx });
}

updateProduct(ctx: RequestContext, product: Product) {
return this.updateIndexQueue.add({
type: 'update-product',
ctx: ctx.serialize(),
productId: product.id,
});
},
{ ctx });
}

updateVariants(ctx: RequestContext, variants: ProductVariant[]) {
const variantIds = variants.map(v => v.id);
return this.updateIndexQueue.add({ type: 'update-variants', ctx: ctx.serialize(), variantIds });
return this.updateIndexQueue.add({ type: 'update-variants', ctx: ctx.serialize(), variantIds }, { ctx });
}

deleteProduct(ctx: RequestContext, product: Product) {
return this.updateIndexQueue.add({
type: 'delete-product',
ctx: ctx.serialize(),
productId: product.id,
});
},
{ ctx });
}

deleteVariant(ctx: RequestContext, variants: ProductVariant[]) {
const variantIds = variants.map(v => v.id);
return this.updateIndexQueue.add({ type: 'delete-variant', ctx: ctx.serialize(), variantIds });
return this.updateIndexQueue.add({ type: 'delete-variant', ctx: ctx.serialize(), variantIds }, { ctx });
}

updateVariantsById(ctx: RequestContext, ids: ID[]) {
return this.updateIndexQueue.add({ type: 'update-variants-by-id', ctx: ctx.serialize(), ids });
return this.updateIndexQueue.add({ type: 'update-variants-by-id', ctx: ctx.serialize(), ids }, { ctx });
}

updateAsset(ctx: RequestContext, asset: Asset) {
return this.updateIndexQueue.add({ type: 'update-asset', ctx: ctx.serialize(), asset: asset as any });
return this.updateIndexQueue.add({ type: 'update-asset', ctx: ctx.serialize(), asset: asset as any }, { ctx });
}

deleteAsset(ctx: RequestContext, asset: Asset) {
return this.updateIndexQueue.add({ type: 'delete-asset', ctx: ctx.serialize(), asset: asset as any });
return this.updateIndexQueue.add({ type: 'delete-asset', ctx: ctx.serialize(), asset: asset as any }, { ctx });
}

assignProductToChannel(ctx: RequestContext, productId: ID, channelId: ID) {
Expand All @@ -114,7 +116,8 @@ export class SearchIndexService implements OnApplicationBootstrap {
ctx: ctx.serialize(),
productId,
channelId,
});
},
{ ctx });
}

removeProductFromChannel(ctx: RequestContext, productId: ID, channelId: ID) {
Expand All @@ -123,7 +126,8 @@ export class SearchIndexService implements OnApplicationBootstrap {
ctx: ctx.serialize(),
productId,
channelId,
});
},
{ ctx });
}

assignVariantToChannel(ctx: RequestContext, productVariantId: ID, channelId: ID) {
Expand All @@ -132,7 +136,8 @@ export class SearchIndexService implements OnApplicationBootstrap {
ctx: ctx.serialize(),
productVariantId,
channelId,
});
},
{ ctx });
}

removeVariantFromChannel(ctx: RequestContext, productVariantId: ID, channelId: ID) {
Expand All @@ -141,7 +146,8 @@ export class SearchIndexService implements OnApplicationBootstrap {
ctx: ctx.serialize(),
productVariantId,
channelId,
});
},
{ ctx });
}

private jobWithProgress(
Expand Down
15 changes: 10 additions & 5 deletions packages/core/src/service/services/collection.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ export class CollectionService implements OnModuleInit {
await this.applyFiltersQueue.add({
ctx: event.ctx.serialize(),
collectionIds: collections.map(c => c.id),
});
},
{ ctx: event.ctx });
});

this.applyFiltersQueue = await this.jobQueueService.createQueue({
Expand Down Expand Up @@ -471,7 +472,8 @@ export class CollectionService implements OnModuleInit {
await this.applyFiltersQueue.add({
ctx: ctx.serialize(),
collectionIds: [collection.id],
});
},
{ ctx });
await this.eventBus.publish(new CollectionEvent(ctx, collectionWithRelations, 'created', input));
return assertFound(this.findOne(ctx, collection.id));
}
Expand All @@ -497,7 +499,8 @@ export class CollectionService implements OnModuleInit {
ctx: ctx.serialize(),
collectionIds: [collection.id],
applyToChangedVariantsOnly: false,
});
},
{ ctx });
} else {
const affectedVariantIds = await this.getCollectionProductVariantIds(collection);
await this.eventBus.publish(new CollectionModificationEvent(ctx, collection, affectedVariantIds));
Expand Down Expand Up @@ -571,7 +574,8 @@ export class CollectionService implements OnModuleInit {
await this.applyFiltersQueue.add({
ctx: ctx.serialize(),
collectionIds: [target.id],
});
},
{ ctx });
return assertFound(this.findOne(ctx, input.collectionId));
}

Expand Down Expand Up @@ -829,7 +833,8 @@ export class CollectionService implements OnModuleInit {
await this.applyFiltersQueue.add({
ctx: ctx.serialize(),
collectionIds: collectionsToAssign.map(collection => collection.id),
});
},
{ ctx });

return this.connection
.findByIdsInChannel(
Expand Down
Loading

0 comments on commit 3909251

Please sign in to comment.