Skip to content

Commit

Permalink
feat(core): Add DB-based persistence for JobQueue
Browse files Browse the repository at this point in the history
Relates to #282

BREAKING CHANGE: A new JobRecord entity has been added, so a DB migration will be needed.
  • Loading branch information
michaelbromley committed Apr 3, 2020
1 parent 7acf532 commit a61df93
Show file tree
Hide file tree
Showing 31 changed files with 511 additions and 132 deletions.
13 changes: 8 additions & 5 deletions e2e-common/test-config.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import { mergeConfig } from '@vendure/core';
import { MysqlInitializer, PostgresInitializer, registerInitializer, SqljsInitializer, testConfig as defaultTestConfig } from '@vendure/testing';
import {
MysqlInitializer,
PostgresInitializer,
registerInitializer,
SqljsInitializer,
testConfig as defaultTestConfig,
} from '@vendure/testing';
import path from 'path';
import { ConnectionOptions } from 'typeorm';

import { TestingJobQueueStrategy } from '../packages/core/src/job-queue/testing-job-queue-strategy';

import { getPackageDir } from './get-package-dir';

/**
Expand Down Expand Up @@ -35,8 +39,7 @@ export const testConfig = mergeConfig(defaultTestConfig, {
importAssetsDir: path.join(packageDir, 'fixtures/assets'),
},
jobQueueOptions: {
jobQueueStrategy: new TestingJobQueueStrategy(),
pollInterval: 10,
pollInterval: 100,
},
dbConnectionOptions: getDbConfig(),
});
Expand Down
15 changes: 9 additions & 6 deletions packages/admin-ui/src/lib/core/src/common/generated-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1269,10 +1269,11 @@ export type IntCustomFieldConfig = CustomField & {
export type Job = Node & {
__typename?: 'Job';
id: Scalars['ID'];
name: Scalars['String'];
createdAt: Scalars['DateTime'];
queueName: Scalars['String'];
state: JobState;
progress: Scalars['Float'];
metadata?: Maybe<Scalars['JSON']>;
data?: Maybe<Scalars['JSON']>;
result?: Maybe<Scalars['JSON']>;
error?: Maybe<Scalars['JSON']>;
started: Scalars['DateTime'];
Expand All @@ -1282,7 +1283,8 @@ export type Job = Node & {
};

export type JobFilterParameter = {
name?: Maybe<StringOperators>;
createdAt?: Maybe<DateOperators>;
queueName?: Maybe<StringOperators>;
state?: Maybe<StringOperators>;
progress?: Maybe<NumberOperators>;
started?: Maybe<DateOperators>;
Expand All @@ -1306,7 +1308,8 @@ export type JobListOptions = {

export type JobSortParameter = {
id?: Maybe<SortOrder>;
name?: Maybe<SortOrder>;
createdAt?: Maybe<SortOrder>;
queueName?: Maybe<SortOrder>;
progress?: Maybe<SortOrder>;
started?: Maybe<SortOrder>;
settled?: Maybe<SortOrder>;
Expand Down Expand Up @@ -2952,7 +2955,7 @@ export type QueryJobArgs = {


export type QueryJobsArgs = {
input?: Maybe<JobListOptions>;
options?: Maybe<JobListOptions>;
};


Expand Down Expand Up @@ -6094,7 +6097,7 @@ export type GetServerConfigQuery = (

export type JobInfoFragment = (
{ __typename?: 'Job' }
& Pick<Job, 'id' | 'name' | 'state' | 'progress' | 'duration' | 'result'>
& Pick<Job, 'id' | 'queueName' | 'state' | 'progress' | 'duration' | 'result'>
);

export type GetJobInfoQueryVariables = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ export const GET_SERVER_CONFIG = gql`
export const JOB_INFO_FRAGMENT = gql`
fragment JobInfo on Job {
id
name
queueName
state
progress
duration
Expand All @@ -592,7 +592,7 @@ export const GET_JOB_INFO = gql`

export const GET_ALL_JOBS = gql`
query GetAllJobs($input: JobListOptions) {
jobs(input: $input) {
jobs(options: $input) {
items {
...JobInfo
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1265,10 +1265,11 @@ export type IntCustomFieldConfig = CustomField & {
export type Job = Node & {
__typename?: 'Job';
id: Scalars['ID'];
name: Scalars['String'];
createdAt: Scalars['DateTime'];
queueName: Scalars['String'];
state: JobState;
progress: Scalars['Float'];
metadata?: Maybe<Scalars['JSON']>;
data?: Maybe<Scalars['JSON']>;
result?: Maybe<Scalars['JSON']>;
error?: Maybe<Scalars['JSON']>;
started: Scalars['DateTime'];
Expand All @@ -1278,7 +1279,8 @@ export type Job = Node & {
};

export type JobFilterParameter = {
name?: Maybe<StringOperators>;
createdAt?: Maybe<DateOperators>;
queueName?: Maybe<StringOperators>;
state?: Maybe<StringOperators>;
progress?: Maybe<NumberOperators>;
started?: Maybe<DateOperators>;
Expand All @@ -1302,7 +1304,8 @@ export type JobListOptions = {

export type JobSortParameter = {
id?: Maybe<SortOrder>;
name?: Maybe<SortOrder>;
createdAt?: Maybe<SortOrder>;
queueName?: Maybe<SortOrder>;
progress?: Maybe<SortOrder>;
started?: Maybe<SortOrder>;
settled?: Maybe<SortOrder>;
Expand Down Expand Up @@ -2819,7 +2822,7 @@ export type QueryJobArgs = {
};

export type QueryJobsArgs = {
input?: Maybe<JobListOptions>;
options?: Maybe<JobListOptions>;
};

export type QueryJobsByIdArgs = {
Expand Down
13 changes: 8 additions & 5 deletions packages/common/src/generated-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1261,10 +1261,11 @@ export type IntCustomFieldConfig = CustomField & {
export type Job = Node & {
__typename?: 'Job';
id: Scalars['ID'];
name: Scalars['String'];
createdAt: Scalars['DateTime'];
queueName: Scalars['String'];
state: JobState;
progress: Scalars['Float'];
metadata?: Maybe<Scalars['JSON']>;
data?: Maybe<Scalars['JSON']>;
result?: Maybe<Scalars['JSON']>;
error?: Maybe<Scalars['JSON']>;
started: Scalars['DateTime'];
Expand All @@ -1274,7 +1275,8 @@ export type Job = Node & {
};

export type JobFilterParameter = {
name?: Maybe<StringOperators>;
createdAt?: Maybe<DateOperators>;
queueName?: Maybe<StringOperators>;
state?: Maybe<StringOperators>;
progress?: Maybe<NumberOperators>;
started?: Maybe<DateOperators>;
Expand All @@ -1298,7 +1300,8 @@ export type JobListOptions = {

export type JobSortParameter = {
id?: Maybe<SortOrder>;
name?: Maybe<SortOrder>;
createdAt?: Maybe<SortOrder>;
queueName?: Maybe<SortOrder>;
progress?: Maybe<SortOrder>;
started?: Maybe<SortOrder>;
settled?: Maybe<SortOrder>;
Expand Down Expand Up @@ -2909,7 +2912,7 @@ export type QueryJobArgs = {


export type QueryJobsArgs = {
input?: Maybe<JobListOptions>;
options?: Maybe<JobListOptions>;
};


Expand Down
2 changes: 2 additions & 0 deletions packages/core/e2e/collection.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ describe('Collection resolver', () => {
});

it('re-evaluates Collection contents on move', async () => {
await awaitRunningJobs(adminClient);

const result = await adminClient.query<
GetCollectionProducts.Query,
GetCollectionProducts.Variables
Expand Down
39 changes: 23 additions & 16 deletions packages/core/e2e/default-search-plugin.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ describe('Default search plugin', () => {
},
},
);
expect(result.search.items.map(i => i.productName)).toEqual([
expect(result.search.items.map((i) => i.productName)).toEqual([
'Camera Lens',
'Instant Camera',
'Slr Camera',
Expand All @@ -125,7 +125,7 @@ describe('Default search plugin', () => {
},
},
);
expect(result.search.items.map(i => i.productName)).toEqual([
expect(result.search.items.map((i) => i.productName)).toEqual([
'Laptop',
'Curvy Monitor',
'Gaming PC',
Expand All @@ -145,7 +145,7 @@ describe('Default search plugin', () => {
},
},
);
expect(result.search.items.map(i => i.productName)).toEqual([
expect(result.search.items.map((i) => i.productName)).toEqual([
'Spiky Cactus',
'Orchid',
'Bonsai Tree',
Expand Down Expand Up @@ -335,7 +335,7 @@ describe('Default search plugin', () => {
},
},
);
expect(result.search.items.map(i => i.productVariantId)).toEqual(['T_1', 'T_2', 'T_4']);
expect(result.search.items.map((i) => i.productVariantId)).toEqual(['T_1', 'T_2', 'T_4']);
});

it('encodes collectionIds', async () => {
Expand Down Expand Up @@ -373,7 +373,7 @@ describe('Default search plugin', () => {
it('updates index when ProductVariants are changed', async () => {
await awaitRunningJobs(adminClient);
const { search } = await doAdminSearchQuery({ term: 'drive', groupByProduct: false });
expect(search.items.map(i => i.sku)).toEqual([
expect(search.items.map((i) => i.sku)).toEqual([
'IHD455T1',
'IHD455T2',
'IHD455T3',
Expand All @@ -384,7 +384,7 @@ describe('Default search plugin', () => {
await adminClient.query<UpdateProductVariants.Mutation, UpdateProductVariants.Variables>(
UPDATE_PRODUCT_VARIANTS,
{
input: search.items.map(i => ({
input: search.items.map((i) => ({
id: i.productVariantId,
sku: i.sku + '_updated',
})),
Expand All @@ -397,7 +397,7 @@ describe('Default search plugin', () => {
groupByProduct: false,
});

expect(search2.items.map(i => i.sku)).toEqual([
expect(search2.items.map((i) => i.sku)).toEqual([
'IHD455T1_updated',
'IHD455T2_updated',
'IHD455T3_updated',
Expand All @@ -423,7 +423,7 @@ describe('Default search plugin', () => {
groupByProduct: false,
});

expect(search2.items.map(i => i.sku)).toEqual([
expect(search2.items.map((i) => i.sku)).toEqual([
'IHD455T2_updated',
'IHD455T3_updated',
'IHD455T4_updated',
Expand All @@ -440,7 +440,7 @@ describe('Default search plugin', () => {
});
await awaitRunningJobs(adminClient);
const result = await doAdminSearchQuery({ facetValueIds: ['T_2'], groupByProduct: true });
expect(result.search.items.map(i => i.productName)).toEqual([
expect(result.search.items.map((i) => i.productName)).toEqual([
'Curvy Monitor',
'Gaming PC',
'Hard Drive',
Expand All @@ -451,7 +451,7 @@ describe('Default search plugin', () => {

it('updates index when a Product is deleted', async () => {
const { search } = await doAdminSearchQuery({ facetValueIds: ['T_2'], groupByProduct: true });
expect(search.items.map(i => i.productId)).toEqual(['T_2', 'T_3', 'T_4', 'T_5', 'T_6']);
expect(search.items.map((i) => i.productId)).toEqual(['T_2', 'T_3', 'T_4', 'T_5', 'T_6']);
await adminClient.query<DeleteProduct.Mutation, DeleteProduct.Variables>(DELETE_PRODUCT, {
id: 'T_5',
});
Expand All @@ -460,7 +460,7 @@ describe('Default search plugin', () => {
facetValueIds: ['T_2'],
groupByProduct: true,
});
expect(search2.items.map(i => i.productId)).toEqual(['T_2', 'T_3', 'T_4', 'T_6']);
expect(search2.items.map((i) => i.productId)).toEqual(['T_2', 'T_3', 'T_4', 'T_6']);
});

it('updates index when a Collection is changed', async () => {
Expand Down Expand Up @@ -490,9 +490,11 @@ describe('Default search plugin', () => {
},
);
await awaitRunningJobs(adminClient);
// add an additional check for the collection filters to update
await awaitRunningJobs(adminClient);
const result = await doAdminSearchQuery({ collectionId: 'T_2', groupByProduct: true });

expect(result.search.items.map(i => i.productName)).toEqual([
expect(result.search.items.map((i) => i.productName)).toEqual([
'Road Bike',
'Skipping Rope',
'Boxing Gloves',
Expand Down Expand Up @@ -536,11 +538,13 @@ describe('Default search plugin', () => {
},
});
await awaitRunningJobs(adminClient);
// add an additional check for the collection filters to update
await awaitRunningJobs(adminClient);
const result = await doAdminSearchQuery({
collectionId: createCollection.id,
groupByProduct: true,
});
expect(result.search.items.map(i => i.productName)).toEqual([
expect(result.search.items.map((i) => i.productName)).toEqual([
'Instant Camera',
'Camera Lens',
'Tripod',
Expand Down Expand Up @@ -646,7 +650,10 @@ describe('Default search plugin', () => {
await adminClient.query<UpdateProductVariants.Mutation, UpdateProductVariants.Variables>(
UPDATE_PRODUCT_VARIANTS,
{
input: [{ id: 'T_1', enabled: false }, { id: 'T_2', enabled: false }],
input: [
{ id: 'T_1', enabled: false },
{ id: 'T_2', enabled: false },
],
},
);
await awaitRunningJobs(adminClient);
Expand Down Expand Up @@ -725,7 +732,7 @@ describe('Default search plugin', () => {

adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);
const { search } = await doAdminSearchQuery({ groupByProduct: true });
expect(search.items.map(i => i.productId)).toEqual(['T_1', 'T_2']);
expect(search.items.map((i) => i.productId)).toEqual(['T_1', 'T_2']);
}, 10000);

it('removing product from channel', async () => {
Expand All @@ -743,7 +750,7 @@ describe('Default search plugin', () => {

adminClient.setChannelToken(SECOND_CHANNEL_TOKEN);
const { search } = await doAdminSearchQuery({ groupByProduct: true });
expect(search.items.map(i => i.productId)).toEqual(['T_1']);
expect(search.items.map((i) => i.productId)).toEqual(['T_1']);
}, 10000);
});
});
Expand Down
Loading

0 comments on commit a61df93

Please sign in to comment.