diff --git a/x-pack/plugins/ml/server/models/job_service/datafeeds.js b/x-pack/plugins/ml/server/models/job_service/datafeeds.js index 7caa8ff1a3175..48aadfb688e76 100644 --- a/x-pack/plugins/ml/server/models/job_service/datafeeds.js +++ b/x-pack/plugins/ml/server/models/job_service/datafeeds.js @@ -8,8 +8,12 @@ export function datafeedsProvider(callWithRequest) { async function forceStartDatafeeds(datafeedIds, start, end) { - const jobIds = {}; - const doStartsCalled = {}; + const jobIds = await getJobIdsByDatafeedId(); + const doStartsCalled = datafeedIds.reduce((p, c) => { + p[c] = false; + return p; + }, {}); + const results = {}; const START_TIMEOUT = 10000; // 10s @@ -26,29 +30,26 @@ export function datafeedsProvider(callWithRequest) { } } - datafeedIds.forEach((dId) => { - const jId = dId.replace('datafeed-', ''); // change this. this should be from a look up from the datafeeds endpoint - jobIds[dId] = jId; - doStartsCalled[dId] = false; - }); - for (const datafeedId of datafeedIds) { const jobId = jobIds[datafeedId]; - - setTimeout(async () => { - // in 10 seconds start the datafeed. - // this should give the openJob enough time. - // if not, the start request will be queued - // behind the open request on the server. - results[datafeedId] = await doStart(datafeedId); - }, START_TIMEOUT); - - try { - if (await openJob(jobId)) { + if (jobId !== undefined) { + setTimeout(async () => { + // in 10 seconds start the datafeed. + // this should give the openJob enough time. + // if not, the start request will be queued + // behind the open request on the server. results[datafeedId] = await doStart(datafeedId); + }, START_TIMEOUT); + + try { + if (await openJob(jobId)) { + results[datafeedId] = await doStart(datafeedId); + } + } catch (error) { + results[datafeedId] = { started: false, error }; } - } catch (error) { - results[datafeedId] = { started: false, error }; + } else { + results[datafeedId] = { started: false, error: 'Job has no datafeed' }; } } @@ -88,9 +89,27 @@ export function datafeedsProvider(callWithRequest) { return callWithRequest('ml.deleteDatafeed', { datafeedId, force: true }); } + async function getDatafeedIdsByJobId() { + const datafeeds = await callWithRequest('ml.datafeeds'); + return datafeeds.datafeeds.reduce((p, c) => { + p[c.job_id] = c.datafeed_id; + return p; + }, {}); + } + + async function getJobIdsByDatafeedId() { + const datafeeds = await callWithRequest('ml.datafeeds'); + return datafeeds.datafeeds.reduce((p, c) => { + p[c.datafeed_id] = c.job_id; + return p; + }, {}); + } + return { forceStartDatafeeds, stopDatafeeds, forceDeleteDatafeed, + getDatafeedIdsByJobId, + getJobIdsByDatafeedId, }; } diff --git a/x-pack/plugins/ml/server/models/job_service/jobs.js b/x-pack/plugins/ml/server/models/job_service/jobs.js index 313b685ac10ad..f9761b4047539 100644 --- a/x-pack/plugins/ml/server/models/job_service/jobs.js +++ b/x-pack/plugins/ml/server/models/job_service/jobs.js @@ -14,7 +14,7 @@ const TIME_FORMAT = 'YYYY-MM-DD HH:mm:ss'; export function jobsProvider(callWithRequest) { - const { forceDeleteDatafeed } = datafeedsProvider(callWithRequest); + const { forceDeleteDatafeed, getDatafeedIdsByJobId } = datafeedsProvider(callWithRequest); const { getAuditMessagesSummary } = jobAuditMessagesProvider(callWithRequest); const calMngr = new CalendarManager(callWithRequest); @@ -24,14 +24,14 @@ export function jobsProvider(callWithRequest) { async function deleteJobs(jobIds) { const results = {}; - const datafeedIds = jobIds.reduce((p, c) => { - p[c] = `datafeed-${c}`; - return p; - }, {}); + const datafeedIds = await getDatafeedIdsByJobId(); for (const jobId of jobIds) { try { - const datafeedResp = await forceDeleteDatafeed(datafeedIds[jobId]); + const datafeedResp = (datafeedIds[jobId] === undefined) ? + { acknowledged: true } : + await forceDeleteDatafeed(datafeedIds[jobId]); + if (datafeedResp.acknowledged) { try { await forceDeleteJob(jobId); @@ -56,7 +56,7 @@ export function jobsProvider(callWithRequest) { }, {}); const jobs = fullJobsList.map((job) => { - const hasDatafeed = (job.datafeed_config !== undefined); + const hasDatafeed = (typeof job.datafeed_config === 'object' && Object.keys(job.datafeed_config).length); const { earliest: earliestTimeStamp, latest: latestTimeStamp } = earliestAndLatestTimeStamps(job.data_counts); @@ -64,7 +64,7 @@ export function jobsProvider(callWithRequest) { const tempJob = { id: job.job_id, description: (job.description || ''), - groups: (job.groups || []), + groups: (Array.isArray(job.groups) ? job.groups.sort() : []), processed_record_count: job.data_counts.processed_record_count, memory_status: (job.model_size_stats) ? job.model_size_stats.memory_status : '', jobState: job.state,