Skip to content

Commit

Permalink
[7.x] [Logs UI] Interpret finished analysis jobs as healthy (#… (#45353)
Browse files Browse the repository at this point in the history
Backports the following commits to 7.x:
 - [Logs UI] Interpret finished analysis jobs as healthy (#45268)
  • Loading branch information
weltenwort authored Sep 11, 2019
1 parent 81989e5 commit a925b9b
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,28 +34,33 @@ export const fetchJobStatusRequestPayloadRT = rt.type({

export type FetchJobStatusRequestPayload = rt.TypeOf<typeof fetchJobStatusRequestPayloadRT>;

// TODO: Get this to align with the payload - something is tripping it up somewhere
// export const fetchJobStatusResponsePayloadRT = rt.array(rt.type({
// datafeedId: rt.string,
// datafeedIndices: rt.array(rt.string),
// datafeedState: rt.string,
// description: rt.string,
// earliestTimestampMs: rt.number,
// groups: rt.array(rt.string),
// hasDatafeed: rt.boolean,
// id: rt.string,
// isSingleMetricViewerJob: rt.boolean,
// jobState: rt.string,
// latestResultsTimestampMs: rt.number,
// latestTimestampMs: rt.number,
// memory_status: rt.string,
// nodeName: rt.union([rt.string, rt.undefined]),
// processed_record_count: rt.number,
// fullJob: rt.any,
// auditMessage: rt.any,
// deleting: rt.union([rt.boolean, rt.undefined]),
// }));

export const fetchJobStatusResponsePayloadRT = rt.any;
const datafeedStateRT = rt.keyof({
started: null,
stopped: null,
});

const jobStateRT = rt.keyof({
closed: null,
closing: null,
failed: null,
opened: null,
opening: null,
});

export const jobSummaryRT = rt.intersection([
rt.type({
id: rt.string,
jobState: jobStateRT,
}),
rt.partial({
datafeedIndices: rt.array(rt.string),
datafeedState: datafeedStateRT,
fullJob: rt.partial({
finished_time: rt.number,
}),
}),
]);

export const fetchJobStatusResponsePayloadRT = rt.array(jobSummaryRT);

export type FetchJobStatusResponsePayload = rt.TypeOf<typeof fetchJobStatusResponsePayloadRT>;
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,23 @@
*/

import createContainer from 'constate-latest';
import { useMemo, useEffect, useState } from 'react';
import { bucketSpan, getJobId } from '../../../../common/log_analysis';
import { useEffect, useMemo, useState } from 'react';

import { bucketSpan, getDatafeedId, getJobId, JobType } from '../../../../common/log_analysis';
import { useTrackedPromise } from '../../../utils/use_tracked_promise';
import { callJobsSummaryAPI, FetchJobStatusResponsePayload } from './api/ml_get_jobs_summary_api';
import { callSetupMlModuleAPI, SetupMlModuleResponsePayload } from './api/ml_setup_module_api';
import { callJobsSummaryAPI } from './api/ml_get_jobs_summary_api';

// combines and abstracts job and datafeed status
type JobStatus =
| 'unknown'
| 'missing'
| 'inconsistent'
| 'created'
| 'initializing'
| 'stopped'
| 'started'
| 'opening'
| 'opened'
| 'finished'
| 'failed';

interface AllJobStatuses {
[key: string]: JobStatus;
}

const getInitialJobStatuses = (): AllJobStatuses => {
return {
logEntryRate: 'unknown',
};
};

export const useLogAnalysisJobs = ({
indexPattern,
sourceId,
Expand All @@ -43,14 +33,19 @@ export const useLogAnalysisJobs = ({
spaceId: string;
timeField: string;
}) => {
const [jobStatus, setJobStatus] = useState<AllJobStatuses>(getInitialJobStatuses());
const [jobStatus, setJobStatus] = useState<Record<JobType, JobStatus>>({
'log-entry-rate': 'unknown',
});
const [hasCompletedSetup, setHasCompletedSetup] = useState<boolean>(false);

const [setupMlModuleRequest, setupMlModule] = useTrackedPromise(
{
cancelPreviousOn: 'resolution',
createPromise: async (start, end) => {
setJobStatus(getInitialJobStatuses());
setJobStatus(currentJobStatus => ({
...currentJobStatus,
'log-entry-rate': 'initializing',
}));
return await callSetupMlModuleAPI(
start,
end,
Expand All @@ -62,26 +57,25 @@ export const useLogAnalysisJobs = ({
);
},
onResolve: ({ datafeeds, jobs }: SetupMlModuleResponsePayload) => {
const hasSuccessfullyCreatedJobs = jobs.every(job => job.success);
const hasSuccessfullyStartedDatafeeds = datafeeds.every(
datafeed => datafeed.success && datafeed.started
);
const hasAnyErrors =
jobs.some(job => !!job.error) || datafeeds.some(datafeed => !!datafeed.error);

setJobStatus(currentJobStatus => ({
...currentJobStatus,
logEntryRate: hasAnyErrors
? 'failed'
: hasSuccessfullyCreatedJobs
? hasSuccessfullyStartedDatafeeds
'log-entry-rate':
hasSuccessfullyCreatedJob(getJobId(spaceId, sourceId, 'log-entry-rate'))(jobs) &&
hasSuccessfullyStartedDatafeed(getDatafeedId(spaceId, sourceId, 'log-entry-rate'))(
datafeeds
)
? 'started'
: 'failed'
: 'failed',
: 'failed',
}));

setHasCompletedSetup(true);
},
onReject: () => {
setJobStatus(currentJobStatus => ({
...currentJobStatus,
'log-entry-rate': 'failed',
}));
},
},
[indexPattern, spaceId, sourceId]
);
Expand All @@ -90,20 +84,19 @@ export const useLogAnalysisJobs = ({
{
cancelPreviousOn: 'resolution',
createPromise: async () => {
return callJobsSummaryAPI(spaceId, sourceId);
return await callJobsSummaryAPI(spaceId, sourceId);
},
onResolve: response => {
if (response && response.length) {
const logEntryRate = response.find(
(job: any) => job.id === getJobId(spaceId, sourceId, 'log-entry-rate')
);
setJobStatus({
logEntryRate: logEntryRate ? logEntryRate.jobState : 'unknown',
});
}
setJobStatus(currentJobStatus => ({
...currentJobStatus,
'log-entry-rate': getJobStatus(getJobId(spaceId, sourceId, 'log-entry-rate'))(response),
}));
},
onReject: error => {
// TODO: Handle errors
onReject: err => {
setJobStatus(currentJobStatus => ({
...currentJobStatus,
'log-entry-rate': 'unknown',
}));
},
},
[indexPattern, spaceId, sourceId]
Expand All @@ -114,11 +107,7 @@ export const useLogAnalysisJobs = ({
}, []);

const isSetupRequired = useMemo(() => {
const jobStates = Object.values(jobStatus);
return (
jobStates.filter(state => ['opened', 'opening', 'created', 'started'].includes(state))
.length < jobStates.length
);
return !Object.values(jobStatus).every(state => ['started', 'finished'].includes(state));
}, [jobStatus]);

const isLoadingSetupStatus = useMemo(() => fetchJobStatusRequest.state === 'pending', [
Expand Down Expand Up @@ -147,3 +136,52 @@ export const useLogAnalysisJobs = ({
};

export const LogAnalysisJobs = createContainer(useLogAnalysisJobs);

const hasSuccessfullyCreatedJob = (jobId: string) => (
jobSetupResponses: SetupMlModuleResponsePayload['jobs']
) =>
jobSetupResponses.filter(
jobSetupResponse =>
jobSetupResponse.id === jobId && jobSetupResponse.success && !jobSetupResponse.error
).length > 0;

const hasSuccessfullyStartedDatafeed = (datafeedId: string) => (
datafeedSetupResponses: SetupMlModuleResponsePayload['datafeeds']
) =>
datafeedSetupResponses.filter(
datafeedSetupResponse =>
datafeedSetupResponse.id === datafeedId &&
datafeedSetupResponse.success &&
datafeedSetupResponse.started &&
!datafeedSetupResponse.error
).length > 0;

const getJobStatus = (jobId: string) => (jobSummaries: FetchJobStatusResponsePayload): JobStatus =>
jobSummaries
.filter(jobSummary => jobSummary.id === jobId)
.map(
(jobSummary): JobStatus => {
if (jobSummary.jobState === 'failed') {
return 'failed';
} else if (
jobSummary.jobState === 'closed' &&
jobSummary.datafeedState === 'stopped' &&
jobSummary.fullJob &&
jobSummary.fullJob.finished_time != null
) {
return 'finished';
} else if (
jobSummary.jobState === 'closed' ||
jobSummary.jobState === 'closing' ||
jobSummary.datafeedState === 'stopped'
) {
return 'stopped';
} else if (jobSummary.jobState === 'opening') {
return 'initializing';
} else if (jobSummary.jobState === 'opened' && jobSummary.datafeedState === 'started') {
return 'started';
}

return 'unknown';
}
)[0] || 'missing';

0 comments on commit a925b9b

Please sign in to comment.