Skip to content

Commit

Permalink
Use arrays instead of serializing to csv
Browse files Browse the repository at this point in the history
  • Loading branch information
vchiapaikeo committed Dec 1, 2023
1 parent 03c80bd commit 69ac592
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 51 deletions.
1 change: 1 addition & 0 deletions airflow/www/static/js/api/useGridData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ const useGridData = () => {
};
const response = await axios.get<AxiosResponse, GridData>(gridDataUrl, {
params,
paramsSerializer: { indexes: null },
});
// turn off auto refresh if there are no active runs
if (!areActiveRuns(response.dagRuns)) stopRefresh();
Expand Down
16 changes: 9 additions & 7 deletions airflow/www/static/js/dag/nav/FilterBar.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const FilterBar = () => {
onRunTypeChange,
onRunStateChange,
clearFilters,
transformCsvToMultiSelectOptions,
transformArrayToMultiSelectOptions,
} = useFilters();

const { timezone } = useTimezone();
Expand Down Expand Up @@ -109,37 +109,39 @@ const FilterBar = () => {
<Box px={2} style={multiSelectBoxStyle}>
<MultiSelect
{...multiSelectStyles}
value={transformCsvToMultiSelectOptions(filters.runType)}
value={transformArrayToMultiSelectOptions(filters.runType)}
onChange={(typeOptions) => {
if (
Array.isArray(typeOptions) &&
typeOptions.every((typeOption) => "value" in typeOption)
) {
onRunTypeChange(
typeOptions.map((typeOption) => typeOption.value).join(",")
typeOptions.map((typeOption) => typeOption.value)
);
}
}}
options={transformCsvToMultiSelectOptions(filters.runTypeOptions)}
options={transformArrayToMultiSelectOptions(filters.runTypeOptions)}
placeholder="All Run Types"
/>
</Box>
<Box />
<Box px={2} style={multiSelectBoxStyle}>
<MultiSelect
{...multiSelectStyles}
value={transformCsvToMultiSelectOptions(filters.runState)}
value={transformArrayToMultiSelectOptions(filters.runState)}
onChange={(stateOptions) => {
if (
Array.isArray(stateOptions) &&
stateOptions.every((stateOption) => "value" in stateOption)
) {
onRunStateChange(
stateOptions.map((stateOption) => stateOption.value).join(",")
stateOptions.map((stateOption) => stateOption.value)
);
}
}}
options={transformCsvToMultiSelectOptions(filters.runStateOptions)}
options={transformArrayToMultiSelectOptions(
filters.runStateOptions
)}
placeholder="All Run States"
/>
</Box>
Expand Down
12 changes: 7 additions & 5 deletions airflow/www/static/js/dag/useFilters.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -89,22 +89,22 @@ describe("Test useFilters hook", () => {
{
fnName: "onRunTypeChange" as keyof UtilFunctions,
paramName: "runType" as keyof Filters,
paramValue: "manual",
paramValue: ["manual"],
},
{
fnName: "onRunTypeChange" as keyof UtilFunctions,
paramName: "runType" as keyof Filters,
paramValue: "manual,backfill",
paramValue: ["manual", "backfill"],
},
{
fnName: "onRunStateChange" as keyof UtilFunctions,
paramName: "runState" as keyof Filters,
paramValue: "success",
paramValue: ["success"],
},
{
fnName: "onRunStateChange" as keyof UtilFunctions,
paramName: "runState" as keyof Filters,
paramValue: "success,failed,queued",
paramValue: ["success", "failed", "queued"],
},
])("Test $fnName functions", async ({ fnName, paramName, paramValue }) => {
const { result } = renderHook<FilterHookReturn, undefined>(
Expand All @@ -113,7 +113,9 @@ describe("Test useFilters hook", () => {
);

await act(async () => {
result.current[fnName](paramValue as "string" & FilterTasksProps);
result.current[fnName](
paramValue as "string" & string[] & FilterTasksProps
);
});

expect(result.current.filters[paramName]).toBe(paramValue);
Expand Down
59 changes: 32 additions & 27 deletions airflow/www/static/js/dag/useFilters.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ export interface Filters {
filterDownstream: boolean | undefined;
baseDate: string | null;
numRuns: string | null;
runType: string | null;
runTypeOptions: string | null;
runState: string | null;
runStateOptions: string | null;
runType: string[] | null;
runTypeOptions: string[] | null;
runState: string[] | null;
runStateOptions: string[] | null;
}

export interface FilterTasksProps {
Expand All @@ -53,11 +53,11 @@ export interface FilterTasksProps {
export interface UtilFunctions {
onBaseDateChange: (value: string) => void;
onNumRunsChange: (value: string) => void;
onRunTypeChange: (value: string) => void;
onRunStateChange: (value: string) => void;
onRunTypeChange: (values: string[]) => void;
onRunStateChange: (values: string[]) => void;
onFilterTasksChange: (args: FilterTasksProps) => void;
transformCsvToMultiSelectOptions: (
options: string | null
transformArrayToMultiSelectOptions: (
options: string[] | null
) => { label: string; value: string }[];
clearFilters: () => void;
resetRoot: () => void;
Expand Down Expand Up @@ -97,14 +97,14 @@ const useFilters = (): FilterHookReturn => {
const numRuns =
searchParams.get(NUM_RUNS_PARAM) || defaultDagRunDisplayNumber.toString();

const runTypeOptions = filtersOptions.runTypes.join(",");
const runType = searchParams.get(RUN_TYPE_PARAM);
const runTypeOptions = filtersOptions.runTypes;
const runType = searchParams.getAll(RUN_TYPE_PARAM);

const runStateOptions = filtersOptions.dagStates.join(",");
const runState = searchParams.get(RUN_STATE_PARAM);
const runStateOptions = filtersOptions.dagStates;
const runState = searchParams.getAll(RUN_STATE_PARAM);

const makeOnChangeFn =
(paramName: string, formatFn?: (arg: string) => string | null) =>
(paramName: string, formatFn?: (arg: string) => string) =>
(value: string) => {
const formattedValue = formatFn ? formatFn(value) : value;
const params = new URLSearchParamsWrapper(searchParams);
Expand All @@ -115,34 +115,39 @@ const useFilters = (): FilterHookReturn => {
setSearchParams(params);
};

const getMultiSelectFormatFn = (options: string[]) => {
const formatFn = (arg: string) => {
const argLength = arg.split(",").length;
return argLength === options.length || argLength === 0 ? null : arg;
const makeMultiSelectOnChangeFn =
(paramName: string, options: string[]) => (values: string[]) => {
const params = new URLSearchParamsWrapper(searchParams);
if (values.length === options.length || values.length === 0) {
params.delete(paramName);
} else {
// Delete and reinsert anew each time; otherwise, there will be duplicates
params.delete(paramName);
values.forEach((value) => params.append(paramName, value));
}
setSearchParams(params);
};
return formatFn;
};

const transformCsvToMultiSelectOptions = (
options: string | null
const transformArrayToMultiSelectOptions = (
options: string[] | null
): { label: string; value: string }[] =>
options === null
? []
: options.split(",").map((option) => ({ label: option, value: option }));
: options.map((option) => ({ label: option, value: option }));

const onBaseDateChange = makeOnChangeFn(
BASE_DATE_PARAM,
// @ts-ignore
(localDate: string) => moment(localDate).utc().format()
);
const onNumRunsChange = makeOnChangeFn(NUM_RUNS_PARAM);
const onRunTypeChange = makeOnChangeFn(
const onRunTypeChange = makeMultiSelectOnChangeFn(
RUN_TYPE_PARAM,
getMultiSelectFormatFn(filtersOptions.runTypes)
filtersOptions.runTypes
);
const onRunStateChange = makeOnChangeFn(
const onRunStateChange = makeMultiSelectOnChangeFn(
RUN_STATE_PARAM,
getMultiSelectFormatFn(filtersOptions.dagStates)
filtersOptions.dagStates
);

const onFilterTasksChange = ({
Expand Down Expand Up @@ -203,7 +208,7 @@ const useFilters = (): FilterHookReturn => {
onFilterTasksChange,
clearFilters,
resetRoot,
transformCsvToMultiSelectOptions,
transformArrayToMultiSelectOptions,
};
};

Expand Down
10 changes: 4 additions & 6 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3527,14 +3527,12 @@ def grid_data(self):
with create_session() as session:
query = select(DagRun).where(DagRun.dag_id == dag.dag_id, DagRun.execution_date <= base_date)

run_type_raw = request.args.get("run_type")
if run_type_raw:
run_types = {run_type.strip() for run_type in run_type_raw.split(",")}
run_types = request.args.getlist("run_type")
if run_types:
query = query.where(DagRun.run_type.in_(run_types))

run_state_raw = request.args.get("run_state")
if run_state_raw:
run_states = {run_state.strip() for run_state in run_state_raw.split(",")}
run_states = request.args.getlist("run_state")
if run_states:
query = query.where(DagRun.state.in_(run_states))

dag_runs = wwwutils.sorted_dag_runs(
Expand Down
16 changes: 10 additions & 6 deletions tests/www/views/test_views_grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,16 @@ def test_no_runs(admin_client, dag_without_runs):

def test_grid_data_filtered_on_run_type_and_run_state(admin_client, dag_with_runs):
for uri_params, expected_run_types, expected_run_states in [
("run_state=success%2Cqueued", ["scheduled"], ["success"]),
("run_state=running%2Cfailed", ["scheduled"], ["running"]),
("run_type=scheduled%2Cmanual", ["scheduled", "scheduled"], ["success", "running"]),
("run_type=backfill%2Cmanual", [], []),
("run_state=running%2Cfailed&run_type=backfill%2Cmanual", [], []),
("run_state=running%2Cfailed&run_type=scheduled%2Cbackfill%2Cmanual", ["scheduled"], ["running"]),
("run_state=success&run_state=queued", ["scheduled"], ["success"]),
("run_state=running&run_state=failed", ["scheduled"], ["running"]),
("run_type=scheduled&run_type=manual", ["scheduled", "scheduled"], ["success", "running"]),
("run_type=backfill&run_type=manual", [], []),
("run_state=running&run_type=failed&run_type=backfill&run_type=manual", [], []),
(
"run_state=running&run_type=failed&run_type=scheduled&run_type=backfill&run_type=manual",
["scheduled"],
["running"],
),
]:
resp = admin_client.get(f"/object/grid_data?dag_id={DAG_ID}&{uri_params}", follow_redirects=True)
assert resp.status_code == 200, resp.json
Expand Down

0 comments on commit 69ac592

Please sign in to comment.