Skip to content

Commit

Permalink
Generate uuid in task Manager as Kibana uuid may not yet have been in…
Browse files Browse the repository at this point in the history
…itialised

Generate uuid for TaskManager at startup
  • Loading branch information
gmmorris authored Oct 5, 2019
1 parent c505fcb commit 02012f0
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 23 deletions.
10 changes: 9 additions & 1 deletion x-pack/legacy/plugins/task_manager/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/

import uuid from 'uuid';
import { SavedObjectsClientContract, SavedObjectsSerializer } from 'src/core/server';
import { Logger } from './types';
import { fillPool } from './lib/fill_pool';
Expand All @@ -30,6 +31,12 @@ export interface TaskManagerOpts {
serializer: SavedObjectsSerializer;
}

function generateTaskManagerUUID(logger: Logger): string {
const taskManagerUUID = uuid.v4();
logger.info(`Initialising Task Manager with UUID: ${taskManagerUUID}`);
return taskManagerUUID;
}

/*
* The TaskManager is the public interface into the task manager system. This glues together
* all of the disparate modules in one integration point. The task manager operates in two different ways:
Expand Down Expand Up @@ -78,8 +85,9 @@ export class TaskManager {
index: opts.config.get('xpack.task_manager.index'),
maxAttempts: opts.config.get('xpack.task_manager.max_attempts'),
definitions: this.definitions,
kibanaId: opts.config.get('server.uuid'),
taskManagerId: generateTaskManagerUUID(this.logger),
});

const pool = new TaskPool({
logger: this.logger,
maxWorkers: this.maxWorkers,
Expand Down
34 changes: 17 additions & 17 deletions x-pack/legacy/plugins/task_manager/task_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ describe('TaskStore', () => {
);
const store = new TaskStore({
index: 'tasky',
kibanaId: '',
taskManagerId: '',
serializer,
callCluster,
maxAttempts: 2,
Expand Down Expand Up @@ -161,7 +161,7 @@ describe('TaskStore', () => {
const callCluster = sinon.spy(async (name: string, params?: any) => ({ hits: { hits } }));
const store = new TaskStore({
index: 'tasky',
kibanaId: '',
taskManagerId: '',
serializer,
callCluster,
maxAttempts: 2,
Expand Down Expand Up @@ -353,7 +353,7 @@ describe('TaskStore', () => {
const callCluster = sinon.spy(async (name: string, params?: any) => ({ hits: { hits: [] } }));
const store = new TaskStore({
index: 'tasky',
kibanaId: '',
taskManagerId: '',
serializer,
callCluster,
definitions: taskDefinitions,
Expand Down Expand Up @@ -579,7 +579,7 @@ describe('TaskStore', () => {
definitions: taskDefinitions,
serializer,
savedObjectsRepository: savedObjectsClient,
kibanaId: '',
taskManagerId: '',
index: '',
...opts,
});
Expand All @@ -603,7 +603,7 @@ describe('TaskStore', () => {
}));
const store = new TaskStore({
index: 'tasky',
kibanaId: '',
taskManagerId: '',
serializer,
callCluster,
definitions: taskDefinitions,
Expand Down Expand Up @@ -730,7 +730,7 @@ describe('TaskStore', () => {
});

test('it claims tasks by setting their ownerId, status and retryAt', async () => {
const kibanaId = uuid.v1();
const taskManagerId = uuid.v1();
const claimOwnershipUntil = new Date(Date.now());
const {
args: {
Expand All @@ -740,7 +740,7 @@ describe('TaskStore', () => {
},
} = await testClaimAvailableTasks({
opts: {
kibanaId,
taskManagerId,
},
claimingOpts: {
claimOwnershipUntil,
Expand All @@ -751,15 +751,15 @@ describe('TaskStore', () => {
source: `ctx._source.task.ownerId=params.ownerId; ctx._source.task.status=params.status; ctx._source.task.retryAt=params.retryAt;`,
lang: 'painless',
params: {
ownerId: kibanaId,
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
status: 'claiming',
},
});
});

test('it returns task objects', async () => {
const kibanaId = uuid.v1();
const taskManagerId = uuid.v1();
const claimOwnershipUntil = new Date(Date.now());
const runAt = new Date();
const tasks = [
Expand All @@ -777,7 +777,7 @@ describe('TaskStore', () => {
state: '{ "baby": "Henhen" }',
user: 'jimbo',
scope: ['reporting'],
ownerId: kibanaId,
ownerId: taskManagerId,
},
},
_seq_no: 1,
Expand All @@ -798,7 +798,7 @@ describe('TaskStore', () => {
state: '{ "henry": "The 8th" }',
user: 'dabo',
scope: ['reporting', 'ceo'],
ownerId: kibanaId,
ownerId: taskManagerId,
},
},
_seq_no: 3,
Expand All @@ -815,7 +815,7 @@ describe('TaskStore', () => {
},
} = await testClaimAvailableTasks({
opts: {
kibanaId,
taskManagerId,
},
claimingOpts: {
claimOwnershipUntil,
Expand All @@ -829,7 +829,7 @@ describe('TaskStore', () => {
must: [
{
term: {
'task.ownerId': kibanaId,
'task.ownerId': taskManagerId,
},
},
{ term: { 'task.status': 'claiming' } },
Expand All @@ -849,7 +849,7 @@ describe('TaskStore', () => {
status: 'idle',
taskType: 'foo',
user: 'jimbo',
ownerId: kibanaId,
ownerId: taskManagerId,
},
{
attempts: 2,
Expand All @@ -862,7 +862,7 @@ describe('TaskStore', () => {
status: 'running',
taskType: 'bar',
user: 'dabo',
ownerId: kibanaId,
ownerId: taskManagerId,
},
]);
});
Expand Down Expand Up @@ -899,7 +899,7 @@ describe('TaskStore', () => {

const store = new TaskStore({
index: 'tasky',
kibanaId: '',
taskManagerId: '',
serializer,
callCluster: jest.fn(),
maxAttempts: 2,
Expand Down Expand Up @@ -948,7 +948,7 @@ describe('TaskStore', () => {
const callCluster = jest.fn();
const store = new TaskStore({
index: 'tasky',
kibanaId: '',
taskManagerId: '',
serializer,
callCluster,
maxAttempts: 2,
Expand Down
10 changes: 5 additions & 5 deletions x-pack/legacy/plugins/task_manager/task_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import {
export interface StoreOpts {
callCluster: ElasticJs;
index: string;
kibanaId: string;
taskManagerId: string;
maxAttempts: number;
definitions: TaskDictionary<TaskDefinition>;
savedObjectsRepository: SavedObjectsClientContract;
Expand Down Expand Up @@ -83,7 +83,7 @@ export interface UpdateByQueryResult {
export class TaskStore {
public readonly maxAttempts: number;
public readonly index: string;
public readonly kibanaId: string;
public readonly taskManagerId: string;
private callCluster: ElasticJs;
private definitions: TaskDictionary<TaskDefinition>;
private savedObjectsRepository: SavedObjectsClientContract;
Expand All @@ -102,7 +102,7 @@ export class TaskStore {
constructor(opts: StoreOpts) {
this.callCluster = opts.callCluster;
this.index = opts.index;
this.kibanaId = opts.kibanaId;
this.taskManagerId = opts.taskManagerId;
this.maxAttempts = opts.maxAttempts;
this.definitions = opts.definitions;
this.serializer = opts.serializer;
Expand Down Expand Up @@ -328,7 +328,7 @@ export class TaskStore {
source: `ctx._source.task.ownerId=params.ownerId; ctx._source.task.status=params.status; ctx._source.task.retryAt=params.retryAt;`,
lang: 'painless',
params: {
ownerId: this.kibanaId,
ownerId: this.taskManagerId,
retryAt: claimOwnershipUntil,
status: 'claiming',
},
Expand All @@ -353,7 +353,7 @@ export class TaskStore {
must: [
{
term: {
'task.ownerId': this.kibanaId,
'task.ownerId': this.taskManagerId,
},
},
{ term: { 'task.status': 'claiming' } },
Expand Down

0 comments on commit 02012f0

Please sign in to comment.