Skip to content

Commit

Permalink
use pinned query instead of sorting by _id
Browse files Browse the repository at this point in the history
  • Loading branch information
gmmorris committed Jan 14, 2020
1 parent af07e54 commit 0c33928
Show file tree
Hide file tree
Showing 6 changed files with 284 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { defaultsDeep } from 'lodash';
import {
BoolClause,
IDsClause,
SortClause,
ScriptClause,
ExistsBoolClause,
TermBoolClause,
RangeBoolClause,
mustBeAllOf,
} from './query_clauses';

export const TaskWithSchedule: ExistsBoolClause = {
Expand All @@ -37,30 +36,35 @@ export function taskWithLessThanMaxAttempts(
};
}

export function claimedTasks(taskManagerId: string) {
return mustBeAllOf(
{
term: {
'task.ownerId': taskManagerId,
},
},
{ term: { 'task.status': 'claiming' } }
);
}

export const IdleTaskWithExpiredRunAt: BoolClause<TermBoolClause | RangeBoolClause> = {
bool: {
must: [{ term: { 'task.status': 'idle' } }, { range: { 'task.runAt': { lte: 'now' } } }],
},
};

export const taskWithIDsAndRunnableStatus = (
claimTasksById: string[]
): BoolClause<TermBoolClause | IDsClause> => ({
export const InactiveTasks: BoolClause<TermBoolClause | RangeBoolClause> = {
bool: {
must: [
must_not: [
{
bool: {
should: [{ term: { 'task.status': 'idle' } }, { term: { 'task.status': 'failed' } }],
},
},
{
ids: {
values: claimTasksById,
should: [{ term: { 'task.status': 'running' } }, { term: { 'task.status': 'claiming' } }],
},
},
{ range: { 'task.retryAt': { gt: 'now' } } },
],
},
});
};

export const RunningOrClaimingTaskWithExpiredRetryAt: BoolClause<
TermBoolClause | RangeBoolClause
Expand Down Expand Up @@ -95,31 +99,6 @@ if (doc['task.runAt'].size()!=0) {
},
};

const SORT_VALUE_TO_BE_FIRST = 0;
export const sortByIdsThenByScheduling = (claimTasksById: string[]): SortClause => {
const {
_script: {
script: { source },
},
} = SortByRunAtAndRetryAt;
return defaultsDeep(
{
_script: {
script: {
source: `
if(params.ids.contains(doc['_id'].value)){
return ${SORT_VALUE_TO_BE_FIRST};
}
${source}
`,
params: { ids: claimTasksById },
},
},
},
SortByRunAtAndRetryAt
);
};

export const updateFields = (fieldUpdates: {
[field: string]: string | number | Date;
}): ScriptClause => ({
Expand Down
51 changes: 51 additions & 0 deletions x-pack/plugins/task_manager/server/queries/query_clauses.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import _ from 'lodash';
import {
BoolClause,
shouldBeOneOf,
mustBeAllOf,
ExistsBoolClause,
TermBoolClause,
RangeBoolClause,
mergeBoolClauses,
} from './query_clauses';

describe('mergeBoolClauses', () => {
test('merges multiple types of Bool Clauses into one', () => {
const TaskWithSchedule: ExistsBoolClause = {
exists: { field: 'task.schedule' },
};

const IdleTaskWithExpiredRunAt: BoolClause<TermBoolClause | RangeBoolClause> = {
bool: {
must: [{ term: { 'task.status': 'idle' } }, { range: { 'task.runAt': { lte: 'now' } } }],
},
};

const RunningTask: BoolClause<TermBoolClause> = {
bool: {
must: [{ term: { 'task.status': 'running' } }],
},
};

expect(
mergeBoolClauses(
mustBeAllOf(TaskWithSchedule),
shouldBeOneOf<ExistsBoolClause | TermBoolClause | RangeBoolClause>(
RunningTask,
IdleTaskWithExpiredRunAt
)
)
).toMatchObject({
bool: {
must: [TaskWithSchedule],
should: [RunningTask, IdleTaskWithExpiredRunAt],
},
});
});
});
93 changes: 77 additions & 16 deletions x-pack/plugins/task_manager/server/queries/query_clauses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,41 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { merge, isArray } from 'lodash';

export interface TermBoolClause {
term: { [field: string]: string | string[] };
}
export interface RangeBoolClause {
range: { [field: string]: { lte: string | number } | { lt: string | number } };
range: {
[field: string]: { lte: string | number } | { lt: string | number } | { gt: string | number };
};
}
export interface ExistsBoolClause {
exists: { field: string };
}

export interface IDsClause {
ids: {
values: string[];
};
}
type BoolClauseFilters<T> = BoolClause<T> | PinnedQuery<T> | T;
export interface ShouldClause<T> {
should: Array<BoolClause<T> | IDsClause | T>;
should: Array<BoolClauseFilters<T>>;
}
export interface MustClause<T> {
must: Array<BoolClause<T> | IDsClause | T>;
must: Array<BoolClauseFilters<T>>;
}
export interface MustNotClause<T> {
must_not: Array<BoolClauseFilters<T>>;
}
export interface FilterClause<T> {
filter: Array<BoolClauseFilters<T>>;
}
export interface BoolClause<T> {
bool: MustClause<T> | ShouldClause<T>;
bool: MustClause<T> | ShouldClause<T> | MustNotClause<T> | FilterClause<T>;
}

export interface BoolClauses<T> {
bool: Partial<MustClause<T> & ShouldClause<T> & MustNotClause<T> & FilterClause<T>>;
}

export interface SortClause {
_script: {
type: string;
Expand All @@ -39,22 +49,49 @@ export interface SortClause {
};
};
}
export type SortOptions = string | SortClause | Array<string | SortClause>;

export interface ScriptClause {
source: string;
lang: string;
params: {
[field: string]: string | number | Date;
};
}

export interface UpdateByQuery<T> {
query: BoolClause<T>;
sort: SortClause;
query: PinnedQuery<T> | BoolClause<T> | BoolClauses<T>;
sort: SortOptions;
seq_no_primary_term: true;
script: ScriptClause;
}

export interface PinnedQuery<T> {
pinned: PinnedClause<T>;
}

export interface PinnedClause<T> {
ids: string[];
organic: BoolClause<T>;
}

export function mergeBoolClauses<T>(...clauses: Array<BoolClause<T>>): BoolClauses<T> {
return merge({}, ...clauses, function(
existingBoolClause: Array<BoolClauseFilters<T>>,
boolClauseOfSameType: Array<BoolClauseFilters<T>>
) {
// If we have two bool clauses of same type (FOR EXAMPLE
// two `must` clauses, we merge them, into one)
if (isArray(existingBoolClause)) {
return existingBoolClause.concat(boolClauseOfSameType);
}
// otherwise dont return anything and the default behaviour
// merges this clause into the final object
});
}

export function shouldBeOneOf<T>(
...should: Array<BoolClause<T> | IDsClause | T>
...should: Array<BoolClauseFilters<T>>
): {
bool: ShouldClause<T>;
} {
Expand All @@ -66,7 +103,7 @@ export function shouldBeOneOf<T>(
}

export function mustBeAllOf<T>(
...must: Array<BoolClause<T> | IDsClause | T>
...must: Array<BoolClauseFilters<T>>
): {
bool: MustClause<T>;
} {
Expand All @@ -77,14 +114,38 @@ export function mustBeAllOf<T>(
};
}

export function filterDownBy<T>(
...filter: Array<BoolClauseFilters<T>>
): {
bool: FilterClause<T>;
} {
return {
bool: {
filter,
},
};
}

export function asPinnedQuery<T>(
ids: PinnedClause<T>['ids'],
organic: PinnedClause<T>['organic']
): PinnedQuery<T> {
return {
pinned: {
ids,
organic,
},
};
}

export function asUpdateByQuery<T>({
query,
update,
sort,
}: {
query: BoolClause<T>;
update: ScriptClause;
sort: SortClause;
query: UpdateByQuery<T>['query'];
update: UpdateByQuery<T>['script'];
sort: UpdateByQuery<T>['sort'];
}): UpdateByQuery<T> {
return {
query,
Expand Down
Loading

0 comments on commit 0c33928

Please sign in to comment.