Skip to content

Commit

Permalink
[ML] Removing hardcoded datafeed ID in jobs list (elastic#20815) (ela…
Browse files Browse the repository at this point in the history
  • Loading branch information
jgowdyelastic authored Jul 16, 2018
1 parent a5f2725 commit d8959f1
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 29 deletions.
61 changes: 40 additions & 21 deletions x-pack/plugins/ml/server/models/job_service/datafeeds.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@
export function datafeedsProvider(callWithRequest) {

async function forceStartDatafeeds(datafeedIds, start, end) {
const jobIds = {};
const doStartsCalled = {};
const jobIds = await getJobIdsByDatafeedId();
const doStartsCalled = datafeedIds.reduce((p, c) => {
p[c] = false;
return p;
}, {});

const results = {};
const START_TIMEOUT = 10000; // 10s

Expand All @@ -26,29 +30,26 @@ export function datafeedsProvider(callWithRequest) {
}
}

datafeedIds.forEach((dId) => {
const jId = dId.replace('datafeed-', ''); // change this. this should be from a look up from the datafeeds endpoint
jobIds[dId] = jId;
doStartsCalled[dId] = false;
});

for (const datafeedId of datafeedIds) {
const jobId = jobIds[datafeedId];

setTimeout(async () => {
// in 10 seconds start the datafeed.
// this should give the openJob enough time.
// if not, the start request will be queued
// behind the open request on the server.
results[datafeedId] = await doStart(datafeedId);
}, START_TIMEOUT);

try {
if (await openJob(jobId)) {
if (jobId !== undefined) {
setTimeout(async () => {
// in 10 seconds start the datafeed.
// this should give the openJob enough time.
// if not, the start request will be queued
// behind the open request on the server.
results[datafeedId] = await doStart(datafeedId);
}, START_TIMEOUT);

try {
if (await openJob(jobId)) {
results[datafeedId] = await doStart(datafeedId);
}
} catch (error) {
results[datafeedId] = { started: false, error };
}
} catch (error) {
results[datafeedId] = { started: false, error };
} else {
results[datafeedId] = { started: false, error: 'Job has no datafeed' };
}
}

Expand Down Expand Up @@ -88,9 +89,27 @@ export function datafeedsProvider(callWithRequest) {
return callWithRequest('ml.deleteDatafeed', { datafeedId, force: true });
}

async function getDatafeedIdsByJobId() {
const datafeeds = await callWithRequest('ml.datafeeds');
return datafeeds.datafeeds.reduce((p, c) => {
p[c.job_id] = c.datafeed_id;
return p;
}, {});
}

async function getJobIdsByDatafeedId() {
const datafeeds = await callWithRequest('ml.datafeeds');
return datafeeds.datafeeds.reduce((p, c) => {
p[c.datafeed_id] = c.job_id;
return p;
}, {});
}

return {
forceStartDatafeeds,
stopDatafeeds,
forceDeleteDatafeed,
getDatafeedIdsByJobId,
getJobIdsByDatafeedId,
};
}
16 changes: 8 additions & 8 deletions x-pack/plugins/ml/server/models/job_service/jobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const TIME_FORMAT = 'YYYY-MM-DD HH:mm:ss';

export function jobsProvider(callWithRequest) {

const { forceDeleteDatafeed } = datafeedsProvider(callWithRequest);
const { forceDeleteDatafeed, getDatafeedIdsByJobId } = datafeedsProvider(callWithRequest);
const { getAuditMessagesSummary } = jobAuditMessagesProvider(callWithRequest);
const calMngr = new CalendarManager(callWithRequest);

Expand All @@ -24,14 +24,14 @@ export function jobsProvider(callWithRequest) {

async function deleteJobs(jobIds) {
const results = {};
const datafeedIds = jobIds.reduce((p, c) => {
p[c] = `datafeed-${c}`;
return p;
}, {});
const datafeedIds = await getDatafeedIdsByJobId();

for (const jobId of jobIds) {
try {
const datafeedResp = await forceDeleteDatafeed(datafeedIds[jobId]);
const datafeedResp = (datafeedIds[jobId] === undefined) ?
{ acknowledged: true } :
await forceDeleteDatafeed(datafeedIds[jobId]);

if (datafeedResp.acknowledged) {
try {
await forceDeleteJob(jobId);
Expand All @@ -56,15 +56,15 @@ export function jobsProvider(callWithRequest) {
}, {});

const jobs = fullJobsList.map((job) => {
const hasDatafeed = (job.datafeed_config !== undefined);
const hasDatafeed = (typeof job.datafeed_config === 'object' && Object.keys(job.datafeed_config).length);
const {
earliest: earliestTimeStamp,
latest: latestTimeStamp } = earliestAndLatestTimeStamps(job.data_counts);

const tempJob = {
id: job.job_id,
description: (job.description || ''),
groups: (job.groups || []),
groups: (Array.isArray(job.groups) ? job.groups.sort() : []),
processed_record_count: job.data_counts.processed_record_count,
memory_status: (job.model_size_stats) ? job.model_size_stats.memory_status : '',
jobState: job.state,
Expand Down

0 comments on commit d8959f1

Please sign in to comment.