diff --git a/airflow/www/static/js/api/useDatasetEvents.ts b/airflow/www/static/js/api/useDatasetEvents.ts index 8caccd202cae8..30e4670a87d3e 100644 --- a/airflow/www/static/js/api/useDatasetEvents.ts +++ b/airflow/www/static/js/api/useDatasetEvents.ts @@ -17,14 +17,21 @@ * under the License. */ -import axios, { AxiosResponse } from "axios"; -import { useQuery } from "react-query"; +import axios from "axios"; +import { useQuery, UseQueryOptions } from "react-query"; import { getMetaValue } from "src/utils"; -import type { API } from "src/types"; import URLSearchParamsWrapper from "src/utils/URLSearchParamWrapper"; +import type { + DatasetEventCollection, + GetDatasetEventsVariables, +} from "src/types/api-generated"; -export default function useDatasetEvents({ +interface Props extends GetDatasetEventsVariables { + options?: UseQueryOptions; +} + +const useDatasetEvents = ({ datasetId, sourceDagId, sourceRunId, @@ -33,8 +40,9 @@ export default function useDatasetEvents({ limit, offset, orderBy, -}: API.GetDatasetEventsVariables) { - const query = useQuery( + options, +}: Props) => { + const query = useQuery( [ "datasets-events", datasetId, @@ -61,16 +69,19 @@ export default function useDatasetEvents({ if (sourceMapIndex) params.set("source_map_index", sourceMapIndex.toString()); - return axios.get(datasetsUrl, { + return axios.get(datasetsUrl, { params, }); }, { keepPreviousData: true, + ...options, } ); return { ...query, data: query.data ?? { datasetEvents: [], totalEntries: 0 }, }; -} +}; + +export default useDatasetEvents; diff --git a/airflow/www/static/js/api/useTaskInstance.ts b/airflow/www/static/js/api/useTaskInstance.ts index 8e7c4faf32425..c5a96f3e44047 100644 --- a/airflow/www/static/js/api/useTaskInstance.ts +++ b/airflow/www/static/js/api/useTaskInstance.ts @@ -17,7 +17,7 @@ * under the License. */ -import axios, { AxiosResponse } from "axios"; +import axios from "axios"; import type { API } from "src/types"; import { useQuery, UseQueryOptions } from "react-query"; import { useAutoRefresh } from "src/context/autorefresh"; @@ -55,7 +55,7 @@ const useTaskInstance = ({ return useQuery( ["taskInstance", dagId, dagRunId, taskId, mapIndex], - () => axios.get(url), + () => axios.get(url), { refetchInterval: isRefreshOn && (autoRefreshInterval || 1) * 1000, ...options, diff --git a/airflow/www/static/js/api/useUpstreamDatasetEvents.ts b/airflow/www/static/js/api/useUpstreamDatasetEvents.ts index 5ba492183ce70..32d1c7aeff2d8 100644 --- a/airflow/www/static/js/api/useUpstreamDatasetEvents.ts +++ b/airflow/www/static/js/api/useUpstreamDatasetEvents.ts @@ -17,29 +17,35 @@ * under the License. */ -import axios, { AxiosResponse } from "axios"; -import { useQuery } from "react-query"; +import axios from "axios"; +import { useQuery, UseQueryOptions } from "react-query"; import { getMetaValue } from "src/utils"; -import type { API } from "src/types"; +import type { + DatasetEventCollection, + GetUpstreamDatasetEventsVariables, +} from "src/types/api-generated"; -interface Props { - runId: string; +interface Props extends GetUpstreamDatasetEventsVariables { + options?: UseQueryOptions; } -export default function useUpstreamDatasetEvents({ runId }: Props) { - const query = useQuery(["upstreamDatasetEvents", runId], () => { - const dagId = getMetaValue("dag_id"); - const upstreamEventsUrl = ( - getMetaValue("upstream_dataset_events_api") || - `api/v1/dags/${dagId}/dagRuns/_DAG_RUN_ID_/upstreamDatasetEvents` - ).replace("_DAG_RUN_ID_", encodeURIComponent(runId)); - return axios.get( - upstreamEventsUrl - ); - }); +const useUpstreamDatasetEvents = ({ dagId, dagRunId, options }: Props) => { + const upstreamEventsUrl = ( + getMetaValue("upstream_dataset_events_api") || + `api/v1/dags/${dagId}/dagRuns/_DAG_RUN_ID_/upstreamDatasetEvents` + ).replace("_DAG_RUN_ID_", encodeURIComponent(dagRunId)); + + const query = useQuery( + ["upstreamDatasetEvents", dagRunId], + () => axios.get(upstreamEventsUrl), + options + ); + return { ...query, - data: query.data || { datasetEvents: [], totalEntries: 0 }, + data: query.data ?? { datasetEvents: [], totalEntries: 0 }, }; -} +}; + +export default useUpstreamDatasetEvents; diff --git a/airflow/www/static/js/components/DatasetEventCard.tsx b/airflow/www/static/js/components/DatasetEventCard.tsx index e6c5b6bb9cbad..ee00def48d6b9 100644 --- a/airflow/www/static/js/components/DatasetEventCard.tsx +++ b/airflow/www/static/js/components/DatasetEventCard.tsx @@ -32,23 +32,20 @@ import { Link, } from "@chakra-ui/react"; import { HiDatabase } from "react-icons/hi"; -import { FiLink } from "react-icons/fi"; import { useSearchParams } from "react-router-dom"; import { getMetaValue } from "src/utils"; import Time from "src/components/Time"; import { useContainerRef } from "src/context/containerRef"; -import { SimpleStatus } from "src/dag/StatusBox"; -import { formatDuration, getDuration } from "src/datetime_utils"; import RenderedJsonField from "src/components/RenderedJsonField"; import SourceTaskInstance from "./SourceTaskInstance"; +import TriggeredDagRuns from "./TriggeredDagRuns"; type CardProps = { datasetEvent: DatasetEvent; }; -const gridUrl = getMetaValue("grid_url"); const datasetsUrl = getMetaValue("datasets_url"); const DatasetEventCard = ({ datasetEvent }: CardProps) => { @@ -116,51 +113,7 @@ const DatasetEventCard = ({ datasetEvent }: CardProps) => { {!!datasetEvent?.createdDagruns?.length && ( <> Triggered Dag Runs: - - {datasetEvent?.createdDagruns.map((run) => { - const runId = (run as any).dagRunId; // For some reason the type is wrong here - const url = `${gridUrl?.replace( - "__DAG_ID__", - run.dagId || "" - )}?dag_run_id=${encodeURIComponent(runId)}`; - - return ( - - DAG Id: {run.dagId} - Status: {run.state || "no status"} - - Duration:{" "} - {formatDuration( - getDuration(run.startDate, run.endDate) - )} - - - Start Date: - {run.endDate && ( - - End Date: - )} - - } - portalProps={{ containerRef }} - hasArrow - placement="top" - > - - - - - - - - ); - })} - + )} diff --git a/airflow/www/static/js/components/SourceTaskInstance.tsx b/airflow/www/static/js/components/SourceTaskInstance.tsx index 3dcae30d32501..e2db366c290ab 100644 --- a/airflow/www/static/js/components/SourceTaskInstance.tsx +++ b/airflow/www/static/js/components/SourceTaskInstance.tsx @@ -31,11 +31,15 @@ import { getMetaValue } from "src/utils"; type SourceTIProps = { datasetEvent: DatasetEvent; + showLink?: boolean; }; const gridUrl = getMetaValue("grid_url"); -const SourceTaskInstance = ({ datasetEvent }: SourceTIProps) => { +const SourceTaskInstance = ({ + datasetEvent, + showLink = true, +}: SourceTIProps) => { const containerRef = useContainerRef(); const { sourceDagId, sourceRunId, sourceTaskId, sourceMapIndex } = datasetEvent; @@ -80,11 +84,13 @@ const SourceTaskInstance = ({ datasetEvent }: SourceTIProps) => { hasArrow placement="top" > - + - - - + {showLink && ( + + + + )} )} diff --git a/airflow/www/static/js/components/TriggeredDagRuns.tsx b/airflow/www/static/js/components/TriggeredDagRuns.tsx new file mode 100644 index 0000000000000..f017fbe488756 --- /dev/null +++ b/airflow/www/static/js/components/TriggeredDagRuns.tsx @@ -0,0 +1,91 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import React from "react"; + +import type { DAGRun } from "src/types/api-generated"; +import { Box, Flex, Tooltip, Text, Link } from "@chakra-ui/react"; +import { FiLink } from "react-icons/fi"; + +import { getMetaValue } from "src/utils"; +import Time from "src/components/Time"; +import { useContainerRef } from "src/context/containerRef"; +import { SimpleStatus } from "src/dag/StatusBox"; +import { formatDuration, getDuration } from "src/datetime_utils"; + +type CardProps = { + createdDagRuns: DAGRun[]; + showLink?: boolean; +}; + +const gridUrl = getMetaValue("grid_url"); + +const TriggeredDagRuns = ({ createdDagRuns, showLink = true }: CardProps) => { + const containerRef = useContainerRef(); + + return ( + + {createdDagRuns.map((run) => { + const runId = (run as any).dagRunId; // For some reason the type is wrong here + const url = `${gridUrl?.replace( + "__DAG_ID__", + run.dagId || "" + )}?dag_run_id=${encodeURIComponent(runId)}`; + + return ( + + DAG Id: {run.dagId} + Status: {run.state || "no status"} + + Duration:{" "} + {formatDuration(getDuration(run.startDate, run.endDate))} + + + Start Date: + {run.endDate && ( + + End Date: + )} + + } + portalProps={{ containerRef }} + hasArrow + placement="top" + > + + + {showLink && ( + + + + )} + + + ); + })} + + ); +}; + +export default TriggeredDagRuns; diff --git a/airflow/www/static/js/dag/details/dagRun/DatasetTriggerEvents.tsx b/airflow/www/static/js/dag/details/dagRun/DatasetTriggerEvents.tsx index 3a3ec3eb69e1c..b9d0c1c169161 100644 --- a/airflow/www/static/js/dag/details/dagRun/DatasetTriggerEvents.tsx +++ b/airflow/www/static/js/dag/details/dagRun/DatasetTriggerEvents.tsx @@ -24,11 +24,14 @@ import type { DagRun as DagRunType } from "src/types"; import { CardDef, CardList } from "src/components/Table"; import type { DatasetEvent } from "src/types/api-generated"; import DatasetEventCard from "src/components/DatasetEventCard"; +import { getMetaValue } from "src/utils"; interface Props { runId: DagRunType["runId"]; } +const dagId = getMetaValue("dag_id"); + const cardDef: CardDef = { card: ({ row }) => , }; @@ -37,7 +40,7 @@ const DatasetTriggerEvents = ({ runId }: Props) => { const { data: { datasetEvents = [] }, isLoading, - } = useUpstreamDatasetEvents({ runId }); + } = useUpstreamDatasetEvents({ dagRunId: runId, dagId }); const columns = useMemo( () => [ diff --git a/airflow/www/static/js/dag/details/graph/DatasetNode.tsx b/airflow/www/static/js/dag/details/graph/DatasetNode.tsx index 921341643bf5e..8c1692513cf6f 100644 --- a/airflow/www/static/js/dag/details/graph/DatasetNode.tsx +++ b/airflow/www/static/js/dag/details/graph/DatasetNode.tsx @@ -26,6 +26,7 @@ import { PopoverBody, PopoverCloseButton, PopoverContent, + PopoverFooter, PopoverHeader, PopoverTrigger, Portal, @@ -33,18 +34,25 @@ import { } from "@chakra-ui/react"; import { HiDatabase } from "react-icons/hi"; import type { NodeProps } from "reactflow"; +import { TbApi } from "react-icons/tb"; import { getMetaValue } from "src/utils"; import { useContainerRef } from "src/context/containerRef"; +import Time from "src/components/Time"; +import SourceTaskInstance from "src/components/SourceTaskInstance"; +import TriggeredDagRuns from "src/components/TriggeredDagRuns"; + import type { CustomNodeProps } from "./Node"; const datasetsUrl = getMetaValue("datasets_url"); const DatasetNode = ({ - data: { label, height, width, latestDagRunId, isZoomedOut }, + data: { label, height, width, latestDagRunId, isZoomedOut, datasetEvent }, }: NodeProps) => { const containerRef = useContainerRef(); + const { fromRestApi } = (datasetEvent?.extra || {}) as Record; + return ( @@ -70,23 +78,37 @@ const DatasetNode = ({ {label} {!isZoomedOut && ( - - - Dataset - + <> + + + Dataset + + {!!datasetEvent && ( + + {/* @ts-ignore */} + {moment(datasetEvent.timestamp).fromNow()} + + )} + )} @@ -95,14 +117,38 @@ const DatasetNode = ({ {label} - + {!!datasetEvent && ( + + + )} + View Dataset - + diff --git a/airflow/www/static/js/dag/details/graph/Node.tsx b/airflow/www/static/js/dag/details/graph/Node.tsx index 04f1d56fa0710..896f59f7861ad 100644 --- a/airflow/www/static/js/dag/details/graph/Node.tsx +++ b/airflow/www/static/js/dag/details/graph/Node.tsx @@ -22,6 +22,7 @@ import { Box } from "@chakra-ui/react"; import { Handle, NodeProps, Position } from "reactflow"; import type { DepNode, DagRun, Task, TaskInstance } from "src/types"; +import type { DatasetEvent } from "src/types/api-generated"; import DagNode from "./DagNode"; import DatasetNode from "./DatasetNode"; @@ -44,6 +45,7 @@ export interface CustomNodeProps { style?: string; isZoomedOut: boolean; class: DepNode["value"]["class"]; + datasetEvent?: DatasetEvent; } const Node = (props: NodeProps) => { diff --git a/airflow/www/static/js/dag/details/graph/index.tsx b/airflow/www/static/js/dag/details/graph/index.tsx index f558f1734e250..ea312f4e83833 100644 --- a/airflow/www/static/js/dag/details/graph/index.tsx +++ b/airflow/www/static/js/dag/details/graph/index.tsx @@ -30,7 +30,13 @@ import ReactFlow, { Viewport, } from "reactflow"; -import { useDatasets, useGraphData, useGridData } from "src/api"; +import { + useDatasetEvents, + useDatasets, + useGraphData, + useGridData, + useUpstreamDatasetEvents, +} from "src/api"; import useSelection from "src/dag/useSelection"; import { getMetaValue, getTask, useOffsetTop } from "src/utils"; import { useGraphLayout } from "src/utils/graph"; @@ -57,6 +63,7 @@ const Graph = ({ openGroupIds, onToggleGroups, hoveredTaskState }: Props) => { const [arrange, setArrange] = useState(data?.arrange || "LR"); const [hasRendered, setHasRendered] = useState(false); const [isZoomedOut, setIsZoomedOut] = useState(false); + const { selected } = useSelection(); const { data: { dagRuns, groups }, @@ -70,6 +77,27 @@ const Graph = ({ openGroupIds, onToggleGroups, hoveredTaskState }: Props) => { dagIds: [dagId], }); + const enabledDatasets = !!( + selected.runId && datasetsCollection?.datasets?.length + ); + + const { + data: { datasetEvents: upstreamDatasetEvents = [] }, + } = useUpstreamDatasetEvents({ + dagId, + dagRunId: selected.runId || "", + options: { enabled: enabledDatasets }, + }); + + const { + data: { datasetEvents: downstreamDatasetEvents = [] }, + } = useDatasetEvents({ + sourceDagId: dagId, + sourceRunId: selected.runId || undefined, + options: { enabled: enabledDatasets }, + }); + + const datasetEdges: WebserverEdge[] = []; const rawNodes = data?.nodes && datasetsCollection?.datasets?.length ? { @@ -90,8 +118,6 @@ const Graph = ({ openGroupIds, onToggleGroups, hoveredTaskState }: Props) => { } : data?.nodes; - const datasetEdges: WebserverEdge[] = []; - datasetsCollection?.datasets?.forEach((dataset) => { const producingTask = dataset?.producingTasks?.find( (t) => t.dagId === dagId @@ -126,8 +152,6 @@ const Graph = ({ openGroupIds, onToggleGroups, hoveredTaskState }: Props) => { arrange, }); - const { selected } = useSelection(); - const { colors } = useTheme(); const { getZoom, fitView } = useReactFlow(); const latestDagRunId = dagRuns[dagRuns.length - 1]?.runId; @@ -151,6 +175,9 @@ const Graph = ({ openGroupIds, onToggleGroups, hoveredTaskState }: Props) => { groups, hoveredTaskState, isZoomedOut, + datasetEvents: selected.runId + ? [...upstreamDatasetEvents, ...downstreamDatasetEvents] + : [], }), [ graphData?.children, @@ -161,6 +188,8 @@ const Graph = ({ openGroupIds, onToggleGroups, hoveredTaskState }: Props) => { groups, hoveredTaskState, isZoomedOut, + upstreamDatasetEvents, + downstreamDatasetEvents, ] ); diff --git a/airflow/www/static/js/dag/details/graph/utils.ts b/airflow/www/static/js/dag/details/graph/utils.ts index bc9733aa62a33..93c8b9c253016 100644 --- a/airflow/www/static/js/dag/details/graph/utils.ts +++ b/airflow/www/static/js/dag/details/graph/utils.ts @@ -24,6 +24,7 @@ import type { ElkExtendedEdge } from "elkjs"; import type { SelectionProps } from "src/dag/useSelection"; import { getTask } from "src/utils"; import type { Task, TaskInstance, NodeType } from "src/types"; +import type { DatasetEvent } from "src/types/api-generated"; import type { CustomNodeProps } from "./Node"; @@ -37,6 +38,7 @@ interface FlattenNodesProps { onToggleGroups: (groupIds: string[]) => void; hoveredTaskState?: string | null; isZoomedOut: boolean; + datasetEvents?: DatasetEvent[]; } // Generate a flattened list of nodes for react-flow to render @@ -50,6 +52,7 @@ export const flattenNodes = ({ parent, hoveredTaskState, isZoomedOut, + datasetEvents, }: FlattenNodesProps) => { let nodes: ReactFlowNode[] = []; let edges: ElkExtendedEdge[] = []; @@ -88,6 +91,10 @@ export const flattenNodes = ({ } onToggleGroups(newGroupIds); }, + datasetEvent: + node.value.class === "dataset" + ? datasetEvents?.find((de) => de.datasetUri === node.value.label) + : undefined, ...node.value, }, type: "custom",