Skip to content

Commit

Permalink
fix: graph has realtime updates as execution progresses (flyteorg#543)
Browse files Browse the repository at this point in the history
Signed-off-by: Olga Nad <[email protected]>
  • Loading branch information
olga-union authored Jul 15, 2022
1 parent f6f8283 commit 64440c0
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,65 +13,36 @@ import * as React from 'react';
import { useEffect, useMemo, useState } from 'react';
import { useQuery, useQueryClient } from 'react-query';
import { NodeExecutionsContext } from '../contexts';
import { fetchTaskExecutionList } from '../taskExecutionQueries';
import { getGroupedLogs } from '../TaskExecutionsList/utils';
import { useTaskExecutions, useTaskExecutionsRefresher } from '../useTaskExecutions';
import { NodeExecutionDetailsPanelContent } from './NodeExecutionDetailsPanelContent';

export interface ExecutionWorkflowGraphProps {
nodeExecutions: NodeExecution[];
workflowId: WorkflowId;
}

interface WorkflowNodeExecution extends NodeExecution {
logsByPhase?: LogsByPhase;
}

/** Wraps a WorkflowGraph, customizing it to also show execution statuses */
export const ExecutionWorkflowGraph: React.FC<ExecutionWorkflowGraphProps> = ({
nodeExecutions,
workflowId,
}) => {
const workflowQuery = useQuery<Workflow, Error>(makeWorkflowQuery(useQueryClient(), workflowId));

const nodeExecutionsWithResources = nodeExecutions.map((nodeExecution) => {
const taskExecutions = useTaskExecutions(nodeExecution.id);
useTaskExecutionsRefresher(nodeExecution, taskExecutions);

const useNewMapTaskView = taskExecutions.value.every((taskExecution) => {
const {
closure: { taskType, metadata, eventVersion = 0 },
} = taskExecution;
return isMapTaskV1(
eventVersion,
metadata?.externalResources?.length ?? 0,
taskType ?? undefined,
);
});
const externalResources: ExternalResource[] = taskExecutions.value
.map((taskExecution) => taskExecution.closure.metadata?.externalResources)
.flat()
.filter((resource): resource is ExternalResource => !!resource);
const queryClient = useQueryClient();
const workflowQuery = useQuery<Workflow, Error>(makeWorkflowQuery(queryClient, workflowId));

const logsByPhase: LogsByPhase = getGroupedLogs(externalResources);

return {
...nodeExecution,
...(useNewMapTaskView && logsByPhase.size > 0 && { logsByPhase }),
};
});
const [nodeExecutionsWithResources, setNodeExecutionsWithResources] = useState<
WorkflowNodeExecution[]
>([]);
const [selectedNodes, setSelectedNodes] = useState<string[]>([]);

const nodeExecutionsById = useMemo(
() => keyBy(nodeExecutionsWithResources, 'scopedId'),
[nodeExecutionsWithResources],
);

const [selectedNodes, setSelectedNodes] = useState<string[]>([]);
const onNodeSelectionChanged = (newSelection: string[]) => {
const validSelection = newSelection.filter((nodeId) => {
if (nodeId === startNodeId || nodeId === endNodeId) {
return false;
}
return true;
});
setSelectedNodes(validSelection);
};

// Note: flytegraph allows multiple selection, but we only support showing
// a single item in the details panel
const selectedExecution = selectedNodes.length
Expand All @@ -83,19 +54,72 @@ export const ExecutionWorkflowGraph: React.FC<ExecutionWorkflowGraphProps> = ({
}
: null;

const onCloseDetailsPanel = () => {
setSelectedPhase(undefined);
setIsDetailsTabClosed(true);
setSelectedNodes([]);
};

const [selectedPhase, setSelectedPhase] = useState<TaskExecutionPhase | undefined>(undefined);
const [isDetailsTabClosed, setIsDetailsTabClosed] = useState<boolean>(!selectedExecution);

useEffect(() => {
let isCurrent = true;
async function fetchData(nodeExecutions, queryClient) {
const newValue = await Promise.all(
nodeExecutions.map(async (nodeExecution) => {
const taskExecutions = await fetchTaskExecutionList(queryClient, nodeExecution.id);

const useNewMapTaskView = taskExecutions.every((taskExecution) => {
const {
closure: { taskType, metadata, eventVersion = 0 },
} = taskExecution;
return isMapTaskV1(
eventVersion,
metadata?.externalResources?.length ?? 0,
taskType ?? undefined,
);
});
const externalResources: ExternalResource[] = taskExecutions
.map((taskExecution) => taskExecution.closure.metadata?.externalResources)
.flat()
.filter((resource): resource is ExternalResource => !!resource);

const logsByPhase: LogsByPhase = getGroupedLogs(externalResources);

return {
...nodeExecution,
...(useNewMapTaskView && logsByPhase.size > 0 && { logsByPhase }),
};
}),
);

if (isCurrent) {
setNodeExecutionsWithResources(newValue);
}
}

fetchData(nodeExecutions, queryClient);

return () => {
isCurrent = false;
};
}, [nodeExecutions]);

useEffect(() => {
setIsDetailsTabClosed(!selectedExecution);
}, [selectedExecution]);

const onNodeSelectionChanged = (newSelection: string[]) => {
const validSelection = newSelection.filter((nodeId) => {
if (nodeId === startNodeId || nodeId === endNodeId) {
return false;
}
return true;
});
setSelectedNodes(validSelection);
};

const onCloseDetailsPanel = () => {
setSelectedPhase(undefined);
setIsDetailsTabClosed(true);
setSelectedNodes([]);
};

const renderGraph = (workflow: Workflow) => (
<WorkflowGraph
onNodeSelectionChanged={onNodeSelectionChanged}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,11 @@ export function useAllChildNodeExecutionGroupsQuery(
const shouldEnableFn = (groups) => {
if (groups.length > 0) {
return groups.some((group) => {
if (group.nodeExecutions?.length > 0) {
// non-empty groups are wrapped in array
const unwrappedGroup = Array.isArray(group) ? group[0] : group;
if (unwrappedGroup.nodeExecutions?.length > 0) {
/* Return true is any executions are not yet terminal (ie, they can change) */
return group.nodeExecutions.some((ne) => {
return unwrappedGroup.nodeExecutions.some((ne) => {
return !nodeExecutionIsTerminal(ne);
});
} else {
Expand All @@ -347,9 +349,11 @@ export function useAllChildNodeExecutionGroupsQuery(
}
};

const key = `${nodeExecutions?.[0]?.scopedId}-${nodeExecutions?.[0]?.closure?.phase}`;

return useConditionalQuery<Array<NodeExecutionGroup[]>>(
{
queryKey: [QueryType.NodeExecutionChildList, nodeExecutions[0]?.id, config],
queryKey: [QueryType.NodeExecutionChildList, key, config],
queryFn: () => fetchAllChildNodeExecutions(queryClient, nodeExecutions, config),
},
shouldEnableFn,
Expand Down

0 comments on commit 64440c0

Please sign in to comment.