From d4229684e36db66d320d8766f5c646ab547bba4c Mon Sep 17 00:00:00 2001 From: Michael Bromley Date: Fri, 2 Feb 2024 16:26:11 +0100 Subject: [PATCH] docs: Add docs on job cancellation handling Relates to #1127, relates to #2650. --- .../worker-job-queue/index.mdx | 100 ++++++++++++++++++ 1 file changed, 100 insertions(+) diff --git a/docs/docs/guides/developer-guide/worker-job-queue/index.mdx b/docs/docs/guides/developer-guide/worker-job-queue/index.mdx index 2e30471358..bba9612b3b 100644 --- a/docs/docs/guides/developer-guide/worker-job-queue/index.mdx +++ b/docs/docs/guides/developer-guide/worker-job-queue/index.mdx @@ -318,6 +318,106 @@ import { adminApiExtensions } from './api/api-extensions'; }) export class ProductVideoPlugin {} ``` + +### Passing the RequestContext + +It is common to need to pass the [RequestContext object](/reference/typescript-api/request/request-context) to the `process` function of a job, since `ctx` is required by many Vendure +service methods that you may be using inside your `process` function. However, the `RequestContext` object itself is not serializable, +so it cannot be passed directly to the `JobQueue.add()` method. Instead, you can serialize the `RequestContext` using the [`RequestContext.serialize()` +method](/reference/typescript-api/request/request-context/#serialize), and then deserialize it in the `process` function using the static `deserialize` method: + +```ts +import { Injectable, OnModuleInit } from '@nestjs/common'; +import { JobQueue, JobQueueService, Product, TransactionalConnection, + SerializedRequestContext, RequestContext } from '@vendure/core'; + +@Injectable() +class ProductExportService implements OnModuleInit { + + // highlight-next-line + private jobQueue: JobQueue<{ ctx: SerializedRequestContext; }>; + + constructor(private jobQueueService: JobQueueService, + private connection: TransactionalConnection) { + } + + async onModuleInit() { + this.jobQueue = await this.jobQueueService.createQueue({ + name: 'export-products', + process: async job => { + // highlight-next-line + const ctx = RequestContext.deserialize(job.data.ctx); + const allProducts = await this.connection.getRepository(ctx, Product).find(); + // ... logic to export the product omitted for brevity + }, + }); + } + + exportAllProducts(ctx: RequestContext) { + // highlight-next-line + return this.jobQueue.add({ ctx: RequestContext.serialize(ctx) }); + } +} +``` + +### Handling job cancellation + +It is possible for an administrator to cancel a running job. Doing so will cause the configured job queue strategy to mark the job as cancelled, but +on its own this will not stop the job from running. This is because the job queue itself has no direct control over the `process` function once +it has been started. + +It is up to the `process` function to check for cancellation and stop processing if the job has been cancelled. This can be done by checking the +`job.state` property, and if the job is cancelled, the `process` function can throw an error to indicate that the job was interrupted +by early cancellation: + +```ts +import { Injectable, OnModuleInit } from '@nestjs/common'; +import { JobQueue, JobQueueService, Product, TransactionalConnection, + SerializedRequestContext, RequestContext, Job, JobState } from '@vendure/core'; +import { IsNull } from 'typeorm'; + +@Injectable() +class ProductExportService implements OnModuleInit { + + private jobQueue: JobQueue<{ ctx: SerializedRequestContext; }>; + + constructor(private jobQueueService: JobQueueService, + private connection: TransactionalConnection) { + } + + async onModuleInit() { + this.jobQueue = await this.jobQueueService.createQueue({ + name: 'export-products', + process: async job => { + const ctx = RequestContext.deserialize(job.data.ctx); + const allProducts = await this.connection.getRepository(ctx, Product).find({ + where: { deletedAt: IsNull() } + }); + let successfulExportCount = 0; + for (const product of allProducts) { + // highlight-start + if (job.state === JobState.CANCELLED) { + // If the job has been cancelled, stop processing + // to prevent unnecessary work. + throw new Error('Job was cancelled'); + } + // highlight-end + + // ... logic to export the product omitted for brevity + successfulExportCount++; + } + return { successfulExportCount }; + }, + }); + } + + exportAllProducts(ctx: RequestContext) { + return this.jobQueue.add({ ctx: RequestContext.serialize(ctx) }); + } +} +``` + + ### Subscribing to job updates When creating a new job via `JobQueue.add()`, it is possible to subscribe to updates to that Job (progress and status changes). This allows you, for example, to create resolvers which are able to return the results of a given Job.