Skip to content

Commit

Permalink
feat(core): Add cancelJob mutation
Browse files Browse the repository at this point in the history
Relates to #614
  • Loading branch information
michaelbromley committed Jan 7, 2021
1 parent 77a4e73 commit 2d099cf
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ export type Mutation = {
importProducts?: Maybe<ImportInfo>;
/** Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted. */
removeSettledJobs: Scalars['Int'];
cancelJob: Job;
settlePayment: SettlePaymentResult;
addFulfillmentToOrder: AddFulfillmentToOrderResult;
cancelOrder: CancelOrderResult;
Expand Down Expand Up @@ -611,6 +612,10 @@ export type MutationRemoveSettledJobsArgs = {
olderThan?: Maybe<Scalars['DateTime']>;
};

export type MutationCancelJobArgs = {
jobId: Scalars['ID'];
};

export type MutationSettlePaymentArgs = {
id: Scalars['ID'];
};
Expand Down Expand Up @@ -1208,6 +1213,7 @@ export enum JobState {
COMPLETED = 'COMPLETED',
RETRYING = 'RETRYING',
FAILED = 'FAILED',
CANCELLED = 'CANCELLED',
}

export type JobList = PaginatedList & {
Expand Down
9 changes: 8 additions & 1 deletion packages/common/src/generated-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ export type Mutation = {
importProducts?: Maybe<ImportInfo>;
/** Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted. */
removeSettledJobs: Scalars['Int'];
cancelJob: Job;
settlePayment: SettlePaymentResult;
addFulfillmentToOrder: AddFulfillmentToOrderResult;
cancelOrder: CancelOrderResult;
Expand Down Expand Up @@ -698,6 +699,11 @@ export type MutationRemoveSettledJobsArgs = {
};


export type MutationCancelJobArgs = {
jobId: Scalars['ID'];
};


export type MutationSettlePaymentArgs = {
id: Scalars['ID'];
};
Expand Down Expand Up @@ -1361,7 +1367,8 @@ export enum JobState {
RUNNING = 'RUNNING',
COMPLETED = 'COMPLETED',
RETRYING = 'RETRYING',
FAILED = 'FAILED'
FAILED = 'FAILED',
CANCELLED = 'CANCELLED'
}

export type JobList = PaginatedList & {
Expand Down
6 changes: 3 additions & 3 deletions packages/core/e2e/fixtures/test-plugins/with-job-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ class TestController implements OnModuleInit {
this.queue = this.jobQueueService.createQueue({
name: 'test',
concurrency: 1,
process: (job) => {
process: job => {
PluginWithJobQueue.jobSubject.subscribe({
complete: () => {
next: () => {
PluginWithJobQueue.jobHasDoneWork = true;
job.complete();
},
error: (err) => job.fail(err),
error: err => job.fail(err),
});
},
});
Expand Down
18 changes: 18 additions & 0 deletions packages/core/e2e/graphql/generated-e2e-admin-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ export type Mutation = {
importProducts?: Maybe<ImportInfo>;
/** Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted. */
removeSettledJobs: Scalars['Int'];
cancelJob: Job;
settlePayment: SettlePaymentResult;
addFulfillmentToOrder: AddFulfillmentToOrderResult;
cancelOrder: CancelOrderResult;
Expand Down Expand Up @@ -611,6 +612,10 @@ export type MutationRemoveSettledJobsArgs = {
olderThan?: Maybe<Scalars['DateTime']>;
};

export type MutationCancelJobArgs = {
jobId: Scalars['ID'];
};

export type MutationSettlePaymentArgs = {
id: Scalars['ID'];
};
Expand Down Expand Up @@ -1208,6 +1213,7 @@ export enum JobState {
COMPLETED = 'COMPLETED',
RETRYING = 'RETRYING',
FAILED = 'FAILED',
CANCELLED = 'CANCELLED',
}

export type JobList = PaginatedList & {
Expand Down Expand Up @@ -5683,6 +5689,12 @@ export type GetOrderHistoryQuery = {
>;
};

export type CancelJobMutationVariables = Exact<{
id: Scalars['ID'];
}>;

export type CancelJobMutation = { cancelJob: Pick<Job, 'id' | 'state' | 'isSettled' | 'settledAt'> };

export type UpdateOptionGroupMutationVariables = Exact<{
input: UpdateProductOptionGroupInput;
}>;
Expand Down Expand Up @@ -7660,6 +7672,12 @@ export namespace GetOrderHistory {
>;
}

export namespace CancelJob {
export type Variables = CancelJobMutationVariables;
export type Mutation = CancelJobMutation;
export type CancelJob = NonNullable<CancelJobMutation['cancelJob']>;
}

export namespace UpdateOptionGroup {
export type Variables = UpdateOptionGroupMutationVariables;
export type Mutation = UpdateOptionGroupMutation;
Expand Down
50 changes: 47 additions & 3 deletions packages/core/e2e/job-queue.e2e-spec.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { DefaultJobQueuePlugin, mergeConfig } from '@vendure/core';
import { createTestEnvironment } from '@vendure/testing';
import gql from 'graphql-tag';
import path from 'path';

import { initialData } from '../../../e2e-common/e2e-initial-data';
import { testConfig, TEST_SETUP_TIMEOUT_MS } from '../../../e2e-common/test-config';

import { PluginWithJobQueue } from './fixtures/test-plugins/with-job-queue';
import { GetRunningJobs, JobState } from './graphql/generated-e2e-admin-types';
import { CancelJob, GetRunningJobs, JobState } from './graphql/generated-e2e-admin-types';
import { GET_RUNNING_JOBS } from './graphql/shared-definitions';

describe('JobQueue', () => {
Expand Down Expand Up @@ -37,17 +38,23 @@ describe('JobQueue', () => {
}, TEST_SETUP_TIMEOUT_MS);

afterAll(async () => {
PluginWithJobQueue.jobSubject.complete();
await server.destroy();
});

function getJobsInTestQueue() {
function getJobsInTestQueue(state?: JobState) {
return adminClient
.query<GetRunningJobs.Query, GetRunningJobs.Variables>(GET_RUNNING_JOBS, {
options: {
filter: {
queueName: {
eq: 'test',
},
...(state
? {
state: { eq: state },
}
: {}),
},
},
})
Expand Down Expand Up @@ -88,7 +95,7 @@ describe('JobQueue', () => {
);

it('complete job after restart', async () => {
PluginWithJobQueue.jobSubject.complete();
PluginWithJobQueue.jobSubject.next();

await sleep(300);
const jobs = await getJobsInTestQueue();
Expand All @@ -98,8 +105,45 @@ describe('JobQueue', () => {
expect(jobs.items[0].id).toBe(testJobId);
expect(PluginWithJobQueue.jobHasDoneWork).toBe(true);
});

it('cancels a running job', async () => {
PluginWithJobQueue.jobHasDoneWork = false;
const restControllerUrl = `http://localhost:${testConfig.apiOptions.port}/run-job`;
await adminClient.fetch(restControllerUrl);

await sleep(300);
const jobs = await getJobsInTestQueue(JobState.RUNNING);

expect(jobs.items.length).toBe(1);
expect(jobs.items[0].state).toBe(JobState.RUNNING);
expect(PluginWithJobQueue.jobHasDoneWork).toBe(false);
const jobId = jobs.items[0].id;

const { cancelJob } = await adminClient.query<CancelJob.Mutation, CancelJob.Variables>(CANCEL_JOB, {
id: jobId,
});

expect(cancelJob.state).toBe(JobState.CANCELLED);
expect(cancelJob.isSettled).toBe(true);
expect(cancelJob.settledAt).not.toBeNull();

const jobs2 = await getJobsInTestQueue(JobState.CANCELLED);
expect(jobs.items.length).toBe(1);
expect(jobs.items[0].id).toBe(jobId);
});
});

function sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}

const CANCEL_JOB = gql`
mutation CancelJob($id: ID!) {
cancelJob(jobId: $id) {
id
state
isSettled
settledAt
}
}
`;
7 changes: 7 additions & 0 deletions packages/core/src/api/resolvers/admin/job.resolver.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Args, Mutation, Query, Resolver } from '@nestjs/graphql';
import {
MutationCancelJobArgs,
MutationRemoveSettledJobsArgs,
Permission,
QueryJobArgs,
Expand Down Expand Up @@ -43,4 +44,10 @@ export class JobResolver {
removeSettledJobs(@Args() args: MutationRemoveSettledJobsArgs) {
return this.jobService.removeSettledJobs(args.queueNames || [], args.olderThan);
}

@Mutation()
@Allow(Permission.DeleteSettings)
cancelJob(@Args() args: MutationCancelJobArgs) {
return this.jobService.cancelJob(args.jobId);
}
}
2 changes: 2 additions & 0 deletions packages/core/src/api/schema/admin-api/job.api.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type Query {
type Mutation {
"Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted."
removeSettledJobs(queueNames: [String!], olderThan: DateTime): Int!
cancelJob(jobId: ID!): Job!
}

"""
Expand All @@ -22,6 +23,7 @@ enum JobState {
COMPLETED
RETRYING
FAILED
CANCELLED
}

input JobListOptions
Expand Down
11 changes: 10 additions & 1 deletion packages/core/src/job-queue/job-queue.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export class JobQueueService implements OnApplicationBootstrap, OnModuleDestroy
const { pollInterval } = this.configService.jobQueueOptions;
if (pollInterval < 100) {
Logger.warn(
`jobQueueOptions.pollInterval is set to ${pollInterval}ms. It is not receommended to set this lower than 100ms`,
`jobQueueOptions.pollInterval is set to ${pollInterval}ms. It is not recommended to set this lower than 100ms`,
);
}
await new Promise(resolve => setTimeout(resolve, 1000));
Expand Down Expand Up @@ -149,4 +149,13 @@ export class JobQueueService implements OnApplicationBootstrap, OnModuleDestroy
removeSettledJobs(queueNames: string[], olderThan?: Date) {
return this.jobQueueStrategy.removeSettledJobs(queueNames, olderThan);
}

async cancelJob(jobId: ID) {
const job = await this.jobQueueStrategy.findOne(jobId);
if (job) {
job.cancel();
await this.jobQueueStrategy.update(job);
return job;
}
}
}
10 changes: 9 additions & 1 deletion packages/core/src/job-queue/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { JobConfig, JobData } from './types';
* @docsCategory JobQueue
* @docsPage Job
*/
export type JobEventType = 'start' | 'progress' | 'complete' | 'fail';
export type JobEventType = 'start' | 'progress' | 'complete' | 'fail' | 'cancel';

/**
* @description
Expand Down Expand Up @@ -51,6 +51,7 @@ export class Job<T extends JobData<T> = any> {
progress: [],
complete: [],
fail: [],
cancel: [],
};

get name(): string {
Expand Down Expand Up @@ -165,6 +166,13 @@ export class Job<T extends JobData<T> = any> {
this.fireEvent('fail');
}

cancel() {
this._progress = 0;
this._settledAt = new Date();
this._state = JobState.CANCELLED;
this.fireEvent('cancel');
}

/**
* @description
* Sets a RUNNING job back to PENDING. Should be used when the JobQueue is being
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ export type Mutation = {
importProducts?: Maybe<ImportInfo>;
/** Remove all settled jobs in the given queues olfer than the given date. Returns the number of jobs deleted. */
removeSettledJobs: Scalars['Int'];
cancelJob: Job;
settlePayment: SettlePaymentResult;
addFulfillmentToOrder: AddFulfillmentToOrderResult;
cancelOrder: CancelOrderResult;
Expand Down Expand Up @@ -611,6 +612,10 @@ export type MutationRemoveSettledJobsArgs = {
olderThan?: Maybe<Scalars['DateTime']>;
};

export type MutationCancelJobArgs = {
jobId: Scalars['ID'];
};

export type MutationSettlePaymentArgs = {
id: Scalars['ID'];
};
Expand Down Expand Up @@ -1208,6 +1213,7 @@ export enum JobState {
COMPLETED = 'COMPLETED',
RETRYING = 'RETRYING',
FAILED = 'FAILED',
CANCELLED = 'CANCELLED',
}

export type JobList = PaginatedList & {
Expand Down
2 changes: 1 addition & 1 deletion schema-admin.json

Large diffs are not rendered by default.

0 comments on commit 2d099cf

Please sign in to comment.