Skip to content

Commit

Permalink
docs: Add docs on job cancellation handling
Browse files Browse the repository at this point in the history
Relates to #1127, relates to #2650.
  • Loading branch information
michaelbromley committed Feb 2, 2024
1 parent c8022be commit d422968
Showing 1 changed file with 100 additions and 0 deletions.
100 changes: 100 additions & 0 deletions docs/docs/guides/developer-guide/worker-job-queue/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,106 @@ import { adminApiExtensions } from './api/api-extensions';
})
export class ProductVideoPlugin {}
```

### Passing the RequestContext

It is common to need to pass the [RequestContext object](/reference/typescript-api/request/request-context) to the `process` function of a job, since `ctx` is required by many Vendure
service methods that you may be using inside your `process` function. However, the `RequestContext` object itself is not serializable,
so it cannot be passed directly to the `JobQueue.add()` method. Instead, you can serialize the `RequestContext` using the [`RequestContext.serialize()`
method](/reference/typescript-api/request/request-context/#serialize), and then deserialize it in the `process` function using the static `deserialize` method:

```ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { JobQueue, JobQueueService, Product, TransactionalConnection,
SerializedRequestContext, RequestContext } from '@vendure/core';

@Injectable()
class ProductExportService implements OnModuleInit {

// highlight-next-line
private jobQueue: JobQueue<{ ctx: SerializedRequestContext; }>;

constructor(private jobQueueService: JobQueueService,
private connection: TransactionalConnection) {
}

async onModuleInit() {
this.jobQueue = await this.jobQueueService.createQueue({
name: 'export-products',
process: async job => {
// highlight-next-line
const ctx = RequestContext.deserialize(job.data.ctx);
const allProducts = await this.connection.getRepository(ctx, Product).find();
// ... logic to export the product omitted for brevity
},
});
}

exportAllProducts(ctx: RequestContext) {
// highlight-next-line
return this.jobQueue.add({ ctx: RequestContext.serialize(ctx) });
}
}
```

### Handling job cancellation

It is possible for an administrator to cancel a running job. Doing so will cause the configured job queue strategy to mark the job as cancelled, but
on its own this will not stop the job from running. This is because the job queue itself has no direct control over the `process` function once
it has been started.

It is up to the `process` function to check for cancellation and stop processing if the job has been cancelled. This can be done by checking the
`job.state` property, and if the job is cancelled, the `process` function can throw an error to indicate that the job was interrupted
by early cancellation:

```ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { JobQueue, JobQueueService, Product, TransactionalConnection,
SerializedRequestContext, RequestContext, Job, JobState } from '@vendure/core';
import { IsNull } from 'typeorm';

@Injectable()
class ProductExportService implements OnModuleInit {

private jobQueue: JobQueue<{ ctx: SerializedRequestContext; }>;

constructor(private jobQueueService: JobQueueService,
private connection: TransactionalConnection) {
}

async onModuleInit() {
this.jobQueue = await this.jobQueueService.createQueue({
name: 'export-products',
process: async job => {
const ctx = RequestContext.deserialize(job.data.ctx);
const allProducts = await this.connection.getRepository(ctx, Product).find({
where: { deletedAt: IsNull() }
});
let successfulExportCount = 0;
for (const product of allProducts) {
// highlight-start
if (job.state === JobState.CANCELLED) {
// If the job has been cancelled, stop processing
// to prevent unnecessary work.
throw new Error('Job was cancelled');
}
// highlight-end

// ... logic to export the product omitted for brevity
successfulExportCount++;
}
return { successfulExportCount };
},
});
}

exportAllProducts(ctx: RequestContext) {
return this.jobQueue.add({ ctx: RequestContext.serialize(ctx) });
}
}
```


### Subscribing to job updates

When creating a new job via `JobQueue.add()`, it is possible to subscribe to updates to that Job (progress and status changes). This allows you, for example, to create resolvers which are able to return the results of a given Job.
Expand Down

0 comments on commit d422968

Please sign in to comment.