From 3d1792fb9f73c6df0ca71e06c89a7bb6e273a740 Mon Sep 17 00:00:00 2001 From: Lordworms <48054792+Lordworms@users.noreply.github.com> Date: Tue, 9 Jul 2024 14:05:59 -0700 Subject: [PATCH 01/10] Implement TPCH substrait integration test, support tpch_6, tpch_10, tpch_11 (#11349) --- .../tests/cases/consumer_integration.rs | 131 ++ .../tpch_substrait_plans/query_10.json | 1257 +++++++++++++++++ .../tpch_substrait_plans/query_11.json | 1059 ++++++++++++++ .../tpch_substrait_plans/query_6.json | 585 ++++++++ 4 files changed, 3032 insertions(+) create mode 100644 datafusion/substrait/tests/testdata/tpch_substrait_plans/query_10.json create mode 100644 datafusion/substrait/tests/testdata/tpch_substrait_plans/query_11.json create mode 100644 datafusion/substrait/tests/testdata/tpch_substrait_plans/query_6.json diff --git a/datafusion/substrait/tests/cases/consumer_integration.rs b/datafusion/substrait/tests/cases/consumer_integration.rs index 5d565c0378528..6133c239873b2 100644 --- a/datafusion/substrait/tests/cases/consumer_integration.rs +++ b/datafusion/substrait/tests/cases/consumer_integration.rs @@ -124,6 +124,56 @@ mod tests { Ok(ctx) } + async fn create_context_tpch6() -> Result { + let ctx = SessionContext::new(); + + let registrations = + vec![("FILENAME_PLACEHOLDER_0", "tests/testdata/tpch/lineitem.csv")]; + + for (table_name, file_path) in registrations { + register_csv(&ctx, table_name, file_path).await?; + } + + Ok(ctx) + } + // missing context for query 7,8,9 + + async fn create_context_tpch10() -> Result { + let ctx = SessionContext::new(); + + let registrations = vec![ + ("FILENAME_PLACEHOLDER_0", "tests/testdata/tpch/customer.csv"), + ("FILENAME_PLACEHOLDER_1", "tests/testdata/tpch/orders.csv"), + ("FILENAME_PLACEHOLDER_2", "tests/testdata/tpch/lineitem.csv"), + ("FILENAME_PLACEHOLDER_3", "tests/testdata/tpch/nation.csv"), + ]; + + for (table_name, file_path) in registrations { + register_csv(&ctx, table_name, file_path).await?; + } + + Ok(ctx) + } + + async fn create_context_tpch11() -> Result { + let ctx = SessionContext::new(); + + let registrations = vec![ + ("FILENAME_PLACEHOLDER_0", "tests/testdata/tpch/partsupp.csv"), + ("FILENAME_PLACEHOLDER_1", "tests/testdata/tpch/supplier.csv"), + ("FILENAME_PLACEHOLDER_2", "tests/testdata/tpch/nation.csv"), + ("FILENAME_PLACEHOLDER_3", "tests/testdata/tpch/partsupp.csv"), + ("FILENAME_PLACEHOLDER_4", "tests/testdata/tpch/supplier.csv"), + ("FILENAME_PLACEHOLDER_5", "tests/testdata/tpch/nation.csv"), + ]; + + for (table_name, file_path) in registrations { + register_csv(&ctx, table_name, file_path).await?; + } + + Ok(ctx) + } + #[tokio::test] async fn tpch_test_1() -> Result<()> { let ctx = create_context_tpch1().await?; @@ -266,4 +316,85 @@ mod tests { \n TableScan: REGION projection=[r_regionkey, r_name, r_comment]"); Ok(()) } + + #[tokio::test] + async fn tpch_test_6() -> Result<()> { + let ctx = create_context_tpch6().await?; + let path = "tests/testdata/tpch_substrait_plans/query_6.json"; + let proto = serde_json::from_reader::<_, Plan>(BufReader::new( + File::open(path).expect("file not found"), + )) + .expect("failed to parse json"); + + let plan = from_substrait_plan(&ctx, &proto).await?; + let plan_str = format!("{:?}", plan); + assert_eq!(plan_str, "Aggregate: groupBy=[[]], aggr=[[sum(FILENAME_PLACEHOLDER_0.l_extendedprice * FILENAME_PLACEHOLDER_0.l_discount) AS REVENUE]]\ + \n Projection: FILENAME_PLACEHOLDER_0.l_extendedprice * FILENAME_PLACEHOLDER_0.l_discount\ + \n Filter: FILENAME_PLACEHOLDER_0.l_shipdate >= CAST(Utf8(\"1994-01-01\") AS Date32) AND FILENAME_PLACEHOLDER_0.l_shipdate < CAST(Utf8(\"1995-01-01\") AS Date32) AND FILENAME_PLACEHOLDER_0.l_discount >= Decimal128(Some(5),3,2) AND FILENAME_PLACEHOLDER_0.l_discount <= Decimal128(Some(7),3,2) AND FILENAME_PLACEHOLDER_0.l_quantity < CAST(Int32(24) AS Decimal128(19, 0))\ + \n TableScan: FILENAME_PLACEHOLDER_0 projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment]"); + Ok(()) + } + + // TODO: missing plan 7, 8, 9 + #[tokio::test] + async fn tpch_test_10() -> Result<()> { + let ctx = create_context_tpch10().await?; + let path = "tests/testdata/tpch_substrait_plans/query_10.json"; + let proto = serde_json::from_reader::<_, Plan>(BufReader::new( + File::open(path).expect("file not found"), + )) + .expect("failed to parse json"); + + let plan = from_substrait_plan(&ctx, &proto).await?; + let plan_str = format!("{:?}", plan); + assert_eq!(plan_str, "Projection: FILENAME_PLACEHOLDER_0.c_custkey AS C_CUSTKEY, FILENAME_PLACEHOLDER_0.c_name AS C_NAME, sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) - FILENAME_PLACEHOLDER_2.l_discount) AS REVENUE, FILENAME_PLACEHOLDER_0.c_acctbal AS C_ACCTBAL, FILENAME_PLACEHOLDER_3.n_name AS N_NAME, FILENAME_PLACEHOLDER_0.c_address AS C_ADDRESS, FILENAME_PLACEHOLDER_0.c_phone AS C_PHONE, FILENAME_PLACEHOLDER_0.c_comment AS C_COMMENT\ + \n Limit: skip=0, fetch=20\ + \n Sort: sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) - FILENAME_PLACEHOLDER_2.l_discount) DESC NULLS FIRST\ + \n Projection: FILENAME_PLACEHOLDER_0.c_custkey, FILENAME_PLACEHOLDER_0.c_name, sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) - FILENAME_PLACEHOLDER_2.l_discount), FILENAME_PLACEHOLDER_0.c_acctbal, FILENAME_PLACEHOLDER_3.n_name, FILENAME_PLACEHOLDER_0.c_address, FILENAME_PLACEHOLDER_0.c_phone, FILENAME_PLACEHOLDER_0.c_comment\n Aggregate: groupBy=[[FILENAME_PLACEHOLDER_0.c_custkey, FILENAME_PLACEHOLDER_0.c_name, FILENAME_PLACEHOLDER_0.c_acctbal, FILENAME_PLACEHOLDER_0.c_phone, FILENAME_PLACEHOLDER_3.n_name, FILENAME_PLACEHOLDER_0.c_address, FILENAME_PLACEHOLDER_0.c_comment]], aggr=[[sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) - FILENAME_PLACEHOLDER_2.l_discount)]]\ + \n Projection: FILENAME_PLACEHOLDER_0.c_custkey, FILENAME_PLACEHOLDER_0.c_name, FILENAME_PLACEHOLDER_0.c_acctbal, FILENAME_PLACEHOLDER_0.c_phone, FILENAME_PLACEHOLDER_3.n_name, FILENAME_PLACEHOLDER_0.c_address, FILENAME_PLACEHOLDER_0.c_comment, FILENAME_PLACEHOLDER_2.l_extendedprice * (CAST(Int32(1) AS Decimal128(19, 0)) - FILENAME_PLACEHOLDER_2.l_discount)\ + \n Filter: FILENAME_PLACEHOLDER_0.c_custkey = FILENAME_PLACEHOLDER_1.o_custkey AND FILENAME_PLACEHOLDER_2.l_orderkey = FILENAME_PLACEHOLDER_1.o_orderkey AND FILENAME_PLACEHOLDER_1.o_orderdate >= CAST(Utf8(\"1993-10-01\") AS Date32) AND FILENAME_PLACEHOLDER_1.o_orderdate < CAST(Utf8(\"1994-01-01\") AS Date32) AND FILENAME_PLACEHOLDER_2.l_returnflag = Utf8(\"R\") AND FILENAME_PLACEHOLDER_0.c_nationkey = FILENAME_PLACEHOLDER_3.n_nationkey\ + \n Inner Join: Filter: Boolean(true)\ + \n Inner Join: Filter: Boolean(true)\ + \n Inner Join: Filter: Boolean(true)\ + \n TableScan: FILENAME_PLACEHOLDER_0 projection=[c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment]\ + \n TableScan: FILENAME_PLACEHOLDER_1 projection=[o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment]\ + \n TableScan: FILENAME_PLACEHOLDER_2 projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment]\ + \n TableScan: FILENAME_PLACEHOLDER_3 projection=[n_nationkey, n_name, n_regionkey, n_comment]"); + Ok(()) + } + + #[tokio::test] + async fn tpch_test_11() -> Result<()> { + let ctx = create_context_tpch11().await?; + let path = "tests/testdata/tpch_substrait_plans/query_11.json"; + let proto = serde_json::from_reader::<_, Plan>(BufReader::new( + File::open(path).expect("file not found"), + )) + .expect("failed to parse json"); + + let plan = from_substrait_plan(&ctx, &proto).await?; + let plan_str = format!("{:?}", plan); + assert_eq!(plan_str, "Projection: FILENAME_PLACEHOLDER_0.ps_partkey AS PS_PARTKEY, sum(FILENAME_PLACEHOLDER_0.ps_supplycost * FILENAME_PLACEHOLDER_0.ps_availqty) AS value\ + \n Sort: sum(FILENAME_PLACEHOLDER_0.ps_supplycost * FILENAME_PLACEHOLDER_0.ps_availqty) DESC NULLS FIRST\ + \n Filter: sum(FILENAME_PLACEHOLDER_0.ps_supplycost * FILENAME_PLACEHOLDER_0.ps_availqty) > ()\ + \n Subquery:\ + \n Projection: sum(FILENAME_PLACEHOLDER_3.ps_supplycost * FILENAME_PLACEHOLDER_3.ps_availqty) * Decimal128(Some(1000000),11,10)\ + \n Aggregate: groupBy=[[]], aggr=[[sum(FILENAME_PLACEHOLDER_3.ps_supplycost * FILENAME_PLACEHOLDER_3.ps_availqty)]]\ + \n Projection: FILENAME_PLACEHOLDER_3.ps_supplycost * CAST(FILENAME_PLACEHOLDER_3.ps_availqty AS Decimal128(19, 0))\ + \n Filter: FILENAME_PLACEHOLDER_3.ps_suppkey = FILENAME_PLACEHOLDER_4.s_suppkey AND FILENAME_PLACEHOLDER_4.s_nationkey = FILENAME_PLACEHOLDER_5.n_nationkey AND FILENAME_PLACEHOLDER_5.n_name = CAST(Utf8(\"JAPAN\") AS Utf8)\ + \n Inner Join: Filter: Boolean(true)\ + \n Inner Join: Filter: Boolean(true)\ + \n TableScan: FILENAME_PLACEHOLDER_3 projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment]\ + \n TableScan: FILENAME_PLACEHOLDER_4 projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]\ + \n TableScan: FILENAME_PLACEHOLDER_5 projection=[n_nationkey, n_name, n_regionkey, n_comment]\ + \n Aggregate: groupBy=[[FILENAME_PLACEHOLDER_0.ps_partkey]], aggr=[[sum(FILENAME_PLACEHOLDER_0.ps_supplycost * FILENAME_PLACEHOLDER_0.ps_availqty)]]\ + \n Projection: FILENAME_PLACEHOLDER_0.ps_partkey, FILENAME_PLACEHOLDER_0.ps_supplycost * CAST(FILENAME_PLACEHOLDER_0.ps_availqty AS Decimal128(19, 0))\ + \n Filter: FILENAME_PLACEHOLDER_0.ps_suppkey = FILENAME_PLACEHOLDER_1.s_suppkey AND FILENAME_PLACEHOLDER_1.s_nationkey = FILENAME_PLACEHOLDER_2.n_nationkey AND FILENAME_PLACEHOLDER_2.n_name = CAST(Utf8(\"JAPAN\") AS Utf8)\ + \n Inner Join: Filter: Boolean(true)\ + \n Inner Join: Filter: Boolean(true)\ + \n TableScan: FILENAME_PLACEHOLDER_0 projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment]\ + \n TableScan: FILENAME_PLACEHOLDER_1 projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]\ + \n TableScan: FILENAME_PLACEHOLDER_2 projection=[n_nationkey, n_name, n_regionkey, n_comment]"); + Ok(()) + } } diff --git a/datafusion/substrait/tests/testdata/tpch_substrait_plans/query_10.json b/datafusion/substrait/tests/testdata/tpch_substrait_plans/query_10.json new file mode 100644 index 0000000000000..04e13b1edc279 --- /dev/null +++ b/datafusion/substrait/tests/testdata/tpch_substrait_plans/query_10.json @@ -0,0 +1,1257 @@ +{ + "extensionUris": [ + { + "extensionUriAnchor": 1, + "uri": "/functions_boolean.yaml" + }, + { + "extensionUriAnchor": 4, + "uri": "/functions_arithmetic_decimal.yaml" + }, + { + "extensionUriAnchor": 3, + "uri": "/functions_datetime.yaml" + }, + { + "extensionUriAnchor": 2, + "uri": "/functions_comparison.yaml" + } + ], + "extensions": [ + { + "extensionFunction": { + "extensionUriReference": 1, + "functionAnchor": 0, + "name": "and:bool" + } + }, + { + "extensionFunction": { + "extensionUriReference": 2, + "functionAnchor": 1, + "name": "equal:any1_any1" + } + }, + { + "extensionFunction": { + "extensionUriReference": 3, + "functionAnchor": 2, + "name": "gte:date_date" + } + }, + { + "extensionFunction": { + "extensionUriReference": 3, + "functionAnchor": 3, + "name": "lt:date_date" + } + }, + { + "extensionFunction": { + "extensionUriReference": 4, + "functionAnchor": 4, + "name": "multiply:opt_decimal_decimal" + } + }, + { + "extensionFunction": { + "extensionUriReference": 4, + "functionAnchor": 5, + "name": "subtract:opt_decimal_decimal" + } + }, + { + "extensionFunction": { + "extensionUriReference": 4, + "functionAnchor": 6, + "name": "sum:opt_decimal" + } + } + ], + "relations": [ + { + "root": { + "input": { + "fetch": { + "common": { + "direct": { + } + }, + "input": { + "sort": { + "common": { + "direct": { + } + }, + "input": { + "project": { + "common": { + "emit": { + "outputMapping": [ + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15 + ] + } + }, + "input": { + "aggregate": { + "common": { + "direct": { + } + }, + "input": { + "project": { + "common": { + "emit": { + "outputMapping": [ + 37, + 38, + 39, + 40, + 41, + 42, + 43, + 44 + ] + } + }, + "input": { + "filter": { + "common": { + "direct": { + } + }, + "input": { + "join": { + "common": { + "direct": { + } + }, + "left": { + "join": { + "common": { + "direct": { + } + }, + "left": { + "join": { + "common": { + "direct": { + } + }, + "left": { + "read": { + "common": { + "direct": { + } + }, + "baseSchema": { + "names": [ + "C_CUSTKEY", + "C_NAME", + "C_ADDRESS", + "C_NATIONKEY", + "C_PHONE", + "C_ACCTBAL", + "C_MKTSEGMENT", + "C_COMMENT" + ], + "struct": { + "types": [ + { + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + { + "varchar": { + "length": 25, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "varchar": { + "length": 40, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + { + "fixedChar": { + "length": 15, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fixedChar": { + "length": 10, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "varchar": { + "length": 117, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + ], + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "local_files": { + "items": [ + { + "uri_file": "file://FILENAME_PLACEHOLDER_0", + "parquet": {} + } + ] + } + } + }, + "right": { + "read": { + "common": { + "direct": { + } + }, + "baseSchema": { + "names": [ + "O_ORDERKEY", + "O_CUSTKEY", + "O_ORDERSTATUS", + "O_TOTALPRICE", + "O_ORDERDATE", + "O_ORDERPRIORITY", + "O_CLERK", + "O_SHIPPRIORITY", + "O_COMMENT" + ], + "struct": { + "types": [ + { + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + { + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + { + "fixedChar": { + "length": 1, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fixedChar": { + "length": 15, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fixedChar": { + "length": 15, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i32": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "varchar": { + "length": 79, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + ], + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "local_files": { + "items": [ + { + "uri_file": "file://FILENAME_PLACEHOLDER_1", + "parquet": {} + } + ] + } + } + }, + "expression": { + "literal": { + "boolean": true, + "nullable": false, + "typeVariationReference": 0 + } + }, + "type": "JOIN_TYPE_INNER" + } + }, + "right": { + "read": { + "common": { + "direct": { + } + }, + "baseSchema": { + "names": [ + "L_ORDERKEY", + "L_PARTKEY", + "L_SUPPKEY", + "L_LINENUMBER", + "L_QUANTITY", + "L_EXTENDEDPRICE", + "L_DISCOUNT", + "L_TAX", + "L_RETURNFLAG", + "L_LINESTATUS", + "L_SHIPDATE", + "L_COMMITDATE", + "L_RECEIPTDATE", + "L_SHIPINSTRUCT", + "L_SHIPMODE", + "L_COMMENT" + ], + "struct": { + "types": [ + { + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + { + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + { + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + { + "i32": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fixedChar": { + "length": 1, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fixedChar": { + "length": 1, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fixedChar": { + "length": 25, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fixedChar": { + "length": 10, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "varchar": { + "length": 44, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + ], + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "local_files": { + "items": [ + { + "uri_file": "file://FILENAME_PLACEHOLDER_2", + "parquet": {} + } + ] + } + } + }, + "expression": { + "literal": { + "boolean": true, + "nullable": false, + "typeVariationReference": 0 + } + }, + "type": "JOIN_TYPE_INNER" + } + }, + "right": { + "read": { + "common": { + "direct": { + } + }, + "baseSchema": { + "names": [ + "N_NATIONKEY", + "N_NAME", + "N_REGIONKEY", + "N_COMMENT" + ], + "struct": { + "types": [ + { + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + { + "fixedChar": { + "length": 25, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + { + "varchar": { + "length": 152, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + ], + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "local_files": { + "items": [ + { + "uri_file": "file://FILENAME_PLACEHOLDER_3", + "parquet": {} + } + ] + } + } + }, + "expression": { + "literal": { + "boolean": true, + "nullable": false, + "typeVariationReference": 0 + } + }, + "type": "JOIN_TYPE_INNER" + } + }, + "condition": { + "scalarFunction": { + "functionReference": 0, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "scalarFunction": { + "functionReference": 1, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": { + } + } + } + }, + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 9 + } + }, + "rootReference": { + } + } + } + } + ] + } + } + }, + { + "value": { + "scalarFunction": { + "functionReference": 1, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 17 + } + }, + "rootReference": { + } + } + } + }, + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 8 + } + }, + "rootReference": { + } + } + } + } + ] + } + } + }, + { + "value": { + "scalarFunction": { + "functionReference": 2, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 12 + } + }, + "rootReference": { + } + } + } + }, + { + "value": { + "cast": { + "type": { + "date": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "input": { + "literal": { + "fixedChar": "1993-10-01", + "nullable": false, + "typeVariationReference": 0 + } + }, + "failureBehavior": "FAILURE_BEHAVIOR_UNSPECIFIED" + } + } + } + ] + } + } + }, + { + "value": { + "scalarFunction": { + "functionReference": 3, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 12 + } + }, + "rootReference": { + } + } + } + }, + { + "value": { + "cast": { + "type": { + "date": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "input": { + "literal": { + "fixedChar": "1994-01-01", + "nullable": false, + "typeVariationReference": 0 + } + }, + "failureBehavior": "FAILURE_BEHAVIOR_UNSPECIFIED" + } + } + } + ] + } + } + }, + { + "value": { + "scalarFunction": { + "functionReference": 1, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 25 + } + }, + "rootReference": { + } + } + } + }, + { + "value": { + "literal": { + "fixedChar": "R", + "nullable": false, + "typeVariationReference": 0 + } + } + } + ] + } + } + }, + { + "value": { + "scalarFunction": { + "functionReference": 1, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 3 + } + }, + "rootReference": { + } + } + } + }, + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 33 + } + }, + "rootReference": { + } + } + } + } + ] + } + } + } + ] + } + } + } + }, + "expressions": [ + { + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": { + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 1 + } + }, + "rootReference": { + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 5 + } + }, + "rootReference": { + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 4 + } + }, + "rootReference": { + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 34 + } + }, + "rootReference": { + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 2 + } + }, + "rootReference": { + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 7 + } + }, + "rootReference": { + } + } + }, + { + "scalarFunction": { + "functionReference": 4, + "args": [], + "outputType": { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 22 + } + }, + "rootReference": { + } + } + } + }, + { + "value": { + "scalarFunction": { + "functionReference": 5, + "args": [], + "outputType": { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "cast": { + "type": { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "input": { + "literal": { + "i32": 1, + "nullable": false, + "typeVariationReference": 0 + } + }, + "failureBehavior": "FAILURE_BEHAVIOR_UNSPECIFIED" + } + } + }, + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 23 + } + }, + "rootReference": { + } + } + } + } + ] + } + } + } + ] + } + } + ] + } + }, + "groupings": [ + { + "groupingExpressions": [ + { + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": { + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 1 + } + }, + "rootReference": { + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 2 + } + }, + "rootReference": { + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 3 + } + }, + "rootReference": { + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 4 + } + }, + "rootReference": { + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 5 + } + }, + "rootReference": { + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 6 + } + }, + "rootReference": { + } + } + } + ] + } + ], + "measures": [ + { + "measure": { + "functionReference": 6, + "args": [], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT", + "outputType": { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "invocation": "AGGREGATION_INVOCATION_ALL", + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 7 + } + }, + "rootReference": { + } + } + } + } + ] + } + } + ] + } + }, + "expressions": [ + { + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": { + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 1 + } + }, + "rootReference": { + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 7 + } + }, + "rootReference": { + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 2 + } + }, + "rootReference": { + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 4 + } + }, + "rootReference": { + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 5 + } + }, + "rootReference": { + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 3 + } + }, + "rootReference": { + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 6 + } + }, + "rootReference": { + } + } + } + ] + } + }, + "sorts": [ + { + "expr": { + "selection": { + "directReference": { + "structField": { + "field": 2 + } + }, + "rootReference": { + } + } + }, + "direction": "SORT_DIRECTION_DESC_NULLS_FIRST" + } + ] + } + }, + "offset": "0", + "count": "20" + } + }, + "names": [ + "C_CUSTKEY", + "C_NAME", + "REVENUE", + "C_ACCTBAL", + "N_NAME", + "C_ADDRESS", + "C_PHONE", + "C_COMMENT" + ] + } + } + ], + "expectedTypeUrls": [] +} diff --git a/datafusion/substrait/tests/testdata/tpch_substrait_plans/query_11.json b/datafusion/substrait/tests/testdata/tpch_substrait_plans/query_11.json new file mode 100644 index 0000000000000..916bc6f71c2cb --- /dev/null +++ b/datafusion/substrait/tests/testdata/tpch_substrait_plans/query_11.json @@ -0,0 +1,1059 @@ +{ + "extensionUris": [{ + "extensionUriAnchor": 1, + "uri": "/functions_boolean.yaml" + }, { + "extensionUriAnchor": 3, + "uri": "/functions_arithmetic_decimal.yaml" + }, { + "extensionUriAnchor": 2, + "uri": "/functions_comparison.yaml" + }], + "extensions": [{ + "extensionFunction": { + "extensionUriReference": 1, + "functionAnchor": 0, + "name": "and:bool" + } + }, { + "extensionFunction": { + "extensionUriReference": 2, + "functionAnchor": 1, + "name": "equal:any1_any1" + } + }, { + "extensionFunction": { + "extensionUriReference": 3, + "functionAnchor": 2, + "name": "multiply:opt_decimal_decimal" + } + }, { + "extensionFunction": { + "extensionUriReference": 3, + "functionAnchor": 3, + "name": "sum:opt_decimal" + } + }, { + "extensionFunction": { + "extensionUriReference": 2, + "functionAnchor": 4, + "name": "gt:any1_any1" + } + }], + "relations": [{ + "root": { + "input": { + "sort": { + "common": { + "direct": { + } + }, + "input": { + "filter": { + "common": { + "direct": { + } + }, + "input": { + "aggregate": { + "common": { + "direct": { + } + }, + "input": { + "project": { + "common": { + "emit": { + "outputMapping": [16, 17] + } + }, + "input": { + "filter": { + "common": { + "direct": { + } + }, + "input": { + "join": { + "common": { + "direct": { + } + }, + "left": { + "join": { + "common": { + "direct": { + } + }, + "left": { + "read": { + "common": { + "direct": { + } + }, + "baseSchema": { + "names": ["PS_PARTKEY", "PS_SUPPKEY", "PS_AVAILQTY", "PS_SUPPLYCOST", "PS_COMMENT"], + "struct": { + "types": [{ + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, { + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, { + "i32": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, { + "varchar": { + "length": 199, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }], + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "local_files": { + "items": [ + { + "uri_file": "file://FILENAME_PLACEHOLDER_0", + "parquet": {} + } + ] + } + } + }, + "right": { + "read": { + "common": { + "direct": { + } + }, + "baseSchema": { + "names": ["S_SUPPKEY", "S_NAME", "S_ADDRESS", "S_NATIONKEY", "S_PHONE", "S_ACCTBAL", "S_COMMENT"], + "struct": { + "types": [{ + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, { + "fixedChar": { + "length": 25, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, { + "varchar": { + "length": 40, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, { + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, { + "fixedChar": { + "length": 15, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, { + "varchar": { + "length": 101, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }], + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "local_files": { + "items": [ + { + "uri_file": "file://FILENAME_PLACEHOLDER_1", + "parquet": {} + } + ] + } + } + }, + "expression": { + "literal": { + "boolean": true, + "nullable": false, + "typeVariationReference": 0 + } + }, + "type": "JOIN_TYPE_INNER" + } + }, + "right": { + "read": { + "common": { + "direct": { + } + }, + "baseSchema": { + "names": ["N_NATIONKEY", "N_NAME", "N_REGIONKEY", "N_COMMENT"], + "struct": { + "types": [{ + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, { + "fixedChar": { + "length": 25, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, { + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, { + "varchar": { + "length": 152, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }], + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "local_files": { + "items": [ + { + "uri_file": "file://FILENAME_PLACEHOLDER_2", + "parquet": {} + } + ] + } + } + }, + "expression": { + "literal": { + "boolean": true, + "nullable": false, + "typeVariationReference": 0 + } + }, + "type": "JOIN_TYPE_INNER" + } + }, + "condition": { + "scalarFunction": { + "functionReference": 0, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [{ + "value": { + "scalarFunction": { + "functionReference": 1, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "arguments": [{ + "value": { + "selection": { + "directReference": { + "structField": { + "field": 1 + } + }, + "rootReference": { + } + } + } + }, { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 5 + } + }, + "rootReference": { + } + } + } + }] + } + } + }, { + "value": { + "scalarFunction": { + "functionReference": 1, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "arguments": [{ + "value": { + "selection": { + "directReference": { + "structField": { + "field": 8 + } + }, + "rootReference": { + } + } + } + }, { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 12 + } + }, + "rootReference": { + } + } + } + }] + } + } + }, { + "value": { + "scalarFunction": { + "functionReference": 1, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [{ + "value": { + "selection": { + "directReference": { + "structField": { + "field": 13 + } + }, + "rootReference": { + } + } + } + }, { + "value": { + "cast": { + "type": { + "fixedChar": { + "length": 25, + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "input": { + "literal": { + "fixedChar": "JAPAN", + "nullable": false, + "typeVariationReference": 0 + } + }, + "failureBehavior": "FAILURE_BEHAVIOR_UNSPECIFIED" + } + } + }] + } + } + }] + } + } + } + }, + "expressions": [{ + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": { + } + } + }, { + "scalarFunction": { + "functionReference": 2, + "args": [], + "outputType": { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [{ + "value": { + "selection": { + "directReference": { + "structField": { + "field": 3 + } + }, + "rootReference": { + } + } + } + }, { + "value": { + "cast": { + "type": { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "input": { + "selection": { + "directReference": { + "structField": { + "field": 2 + } + }, + "rootReference": { + } + } + }, + "failureBehavior": "FAILURE_BEHAVIOR_UNSPECIFIED" + } + } + }] + } + }] + } + }, + "groupings": [{ + "groupingExpressions": [{ + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": { + } + } + }] + }], + "measures": [{ + "measure": { + "functionReference": 3, + "args": [], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT", + "outputType": { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "invocation": "AGGREGATION_INVOCATION_ALL", + "arguments": [{ + "value": { + "selection": { + "directReference": { + "structField": { + "field": 1 + } + }, + "rootReference": { + } + } + } + }] + } + }] + } + }, + "condition": { + "scalarFunction": { + "functionReference": 4, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [{ + "value": { + "selection": { + "directReference": { + "structField": { + "field": 1 + } + }, + "rootReference": { + } + } + } + }, { + "value": { + "subquery": { + "scalar": { + "input": { + "project": { + "common": { + "emit": { + "outputMapping": [1] + } + }, + "input": { + "aggregate": { + "common": { + "direct": { + } + }, + "input": { + "project": { + "common": { + "emit": { + "outputMapping": [16] + } + }, + "input": { + "filter": { + "common": { + "direct": { + } + }, + "input": { + "join": { + "common": { + "direct": { + } + }, + "left": { + "join": { + "common": { + "direct": { + } + }, + "left": { + "read": { + "common": { + "direct": { + } + }, + "baseSchema": { + "names": ["PS_PARTKEY", "PS_SUPPKEY", "PS_AVAILQTY", "PS_SUPPLYCOST", "PS_COMMENT"], + "struct": { + "types": [{ + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, { + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, { + "i32": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, { + "varchar": { + "length": 199, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }], + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "local_files": { + "items": [ + { + "uri_file": "file://FILENAME_PLACEHOLDER_3", + "parquet": {} + } + ] + } + } + }, + "right": { + "read": { + "common": { + "direct": { + } + }, + "baseSchema": { + "names": ["S_SUPPKEY", "S_NAME", "S_ADDRESS", "S_NATIONKEY", "S_PHONE", "S_ACCTBAL", "S_COMMENT"], + "struct": { + "types": [{ + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, { + "fixedChar": { + "length": 25, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, { + "varchar": { + "length": 40, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, { + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, { + "fixedChar": { + "length": 15, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, { + "varchar": { + "length": 101, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }], + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "local_files": { + "items": [ + { + "uri_file": "file://FILENAME_PLACEHOLDER_4", + "parquet": {} + } + ] + } + } + }, + "expression": { + "literal": { + "boolean": true, + "nullable": false, + "typeVariationReference": 0 + } + }, + "type": "JOIN_TYPE_INNER" + } + }, + "right": { + "read": { + "common": { + "direct": { + } + }, + "baseSchema": { + "names": ["N_NATIONKEY", "N_NAME", "N_REGIONKEY", "N_COMMENT"], + "struct": { + "types": [{ + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, { + "fixedChar": { + "length": 25, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, { + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, { + "varchar": { + "length": 152, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }], + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "local_files": { + "items": [ + { + "uri_file": "file://FILENAME_PLACEHOLDER_5", + "parquet": {} + } + ] + } + } + }, + "expression": { + "literal": { + "boolean": true, + "nullable": false, + "typeVariationReference": 0 + } + }, + "type": "JOIN_TYPE_INNER" + } + }, + "condition": { + "scalarFunction": { + "functionReference": 0, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [{ + "value": { + "scalarFunction": { + "functionReference": 1, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "arguments": [{ + "value": { + "selection": { + "directReference": { + "structField": { + "field": 1 + } + }, + "rootReference": { + } + } + } + }, { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 5 + } + }, + "rootReference": { + } + } + } + }] + } + } + }, { + "value": { + "scalarFunction": { + "functionReference": 1, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "arguments": [{ + "value": { + "selection": { + "directReference": { + "structField": { + "field": 8 + } + }, + "rootReference": { + } + } + } + }, { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 12 + } + }, + "rootReference": { + } + } + } + }] + } + } + }, { + "value": { + "scalarFunction": { + "functionReference": 1, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [{ + "value": { + "selection": { + "directReference": { + "structField": { + "field": 13 + } + }, + "rootReference": { + } + } + } + }, { + "value": { + "cast": { + "type": { + "fixedChar": { + "length": 25, + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "input": { + "literal": { + "fixedChar": "JAPAN", + "nullable": false, + "typeVariationReference": 0 + } + }, + "failureBehavior": "FAILURE_BEHAVIOR_UNSPECIFIED" + } + } + }] + } + } + }] + } + } + } + }, + "expressions": [{ + "scalarFunction": { + "functionReference": 2, + "args": [], + "outputType": { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [{ + "value": { + "selection": { + "directReference": { + "structField": { + "field": 3 + } + }, + "rootReference": { + } + } + } + }, { + "value": { + "cast": { + "type": { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "input": { + "selection": { + "directReference": { + "structField": { + "field": 2 + } + }, + "rootReference": { + } + } + }, + "failureBehavior": "FAILURE_BEHAVIOR_UNSPECIFIED" + } + } + }] + } + }] + } + }, + "groupings": [{ + "groupingExpressions": [] + }], + "measures": [{ + "measure": { + "functionReference": 3, + "args": [], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT", + "outputType": { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "invocation": "AGGREGATION_INVOCATION_ALL", + "arguments": [{ + "value": { + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": { + } + } + } + }] + } + }] + } + }, + "expressions": [{ + "scalarFunction": { + "functionReference": 2, + "args": [], + "outputType": { + "decimal": { + "scale": 10, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [{ + "value": { + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": { + } + } + } + }, { + "value": { + "literal": { + "decimal": { + "value": "QEIPAAAAAAAAAAAAAAAAAA==", + "precision": 11, + "scale": 10 + }, + "nullable": false, + "typeVariationReference": 0 + } + } + }] + } + }] + } + } + } + } + } + }] + } + } + } + }, + "sorts": [{ + "expr": { + "selection": { + "directReference": { + "structField": { + "field": 1 + } + }, + "rootReference": { + } + } + }, + "direction": "SORT_DIRECTION_DESC_NULLS_FIRST" + }] + } + }, + "names": ["PS_PARTKEY", "value"] + } + }], + "expectedTypeUrls": [] +} diff --git a/datafusion/substrait/tests/testdata/tpch_substrait_plans/query_6.json b/datafusion/substrait/tests/testdata/tpch_substrait_plans/query_6.json new file mode 100644 index 0000000000000..18fb9781da551 --- /dev/null +++ b/datafusion/substrait/tests/testdata/tpch_substrait_plans/query_6.json @@ -0,0 +1,585 @@ +{ + "extensionUris": [ + { + "extensionUriAnchor": 1, + "uri": "/functions_boolean.yaml" + }, + { + "extensionUriAnchor": 4, + "uri": "/functions_arithmetic_decimal.yaml" + }, + { + "extensionUriAnchor": 2, + "uri": "/functions_datetime.yaml" + }, + { + "extensionUriAnchor": 3, + "uri": "/functions_comparison.yaml" + } + ], + "extensions": [ + { + "extensionFunction": { + "extensionUriReference": 1, + "functionAnchor": 0, + "name": "and:bool" + } + }, + { + "extensionFunction": { + "extensionUriReference": 2, + "functionAnchor": 1, + "name": "gte:date_date" + } + }, + { + "extensionFunction": { + "extensionUriReference": 2, + "functionAnchor": 2, + "name": "lt:date_date" + } + }, + { + "extensionFunction": { + "extensionUriReference": 3, + "functionAnchor": 3, + "name": "gte:any1_any1" + } + }, + { + "extensionFunction": { + "extensionUriReference": 3, + "functionAnchor": 4, + "name": "lte:any1_any1" + } + }, + { + "extensionFunction": { + "extensionUriReference": 3, + "functionAnchor": 5, + "name": "lt:any1_any1" + } + }, + { + "extensionFunction": { + "extensionUriReference": 4, + "functionAnchor": 6, + "name": "multiply:opt_decimal_decimal" + } + }, + { + "extensionFunction": { + "extensionUriReference": 4, + "functionAnchor": 7, + "name": "sum:opt_decimal" + } + } + ], + "relations": [ + { + "root": { + "input": { + "aggregate": { + "common": { + "direct": {} + }, + "input": { + "project": { + "common": { + "emit": { + "outputMapping": [ + 16 + ] + } + }, + "input": { + "filter": { + "common": { + "direct": {} + }, + "input": { + "read": { + "common": { + "direct": {} + }, + "baseSchema": { + "names": [ + "L_ORDERKEY", + "L_PARTKEY", + "L_SUPPKEY", + "L_LINENUMBER", + "L_QUANTITY", + "L_EXTENDEDPRICE", + "L_DISCOUNT", + "L_TAX", + "L_RETURNFLAG", + "L_LINESTATUS", + "L_SHIPDATE", + "L_COMMITDATE", + "L_RECEIPTDATE", + "L_SHIPINSTRUCT", + "L_SHIPMODE", + "L_COMMENT" + ], + "struct": { + "types": [ + { + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + { + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + { + "i64": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + { + "i32": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fixedChar": { + "length": 1, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fixedChar": { + "length": 1, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fixedChar": { + "length": 25, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fixedChar": { + "length": 10, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "varchar": { + "length": 44, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + ], + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "local_files": { + "items": [ + { + "uri_file": "file://FILENAME_PLACEHOLDER_0", + "parquet": {} + } + ] + } + } + }, + "condition": { + "scalarFunction": { + "functionReference": 0, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "scalarFunction": { + "functionReference": 1, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 10 + } + }, + "rootReference": {} + } + } + }, + { + "value": { + "cast": { + "type": { + "date": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "input": { + "literal": { + "fixedChar": "1994-01-01", + "nullable": false, + "typeVariationReference": 0 + } + }, + "failureBehavior": "FAILURE_BEHAVIOR_UNSPECIFIED" + } + } + } + ] + } + } + }, + { + "value": { + "scalarFunction": { + "functionReference": 2, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 10 + } + }, + "rootReference": {} + } + } + }, + { + "value": { + "cast": { + "type": { + "date": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_REQUIRED" + } + }, + "input": { + "literal": { + "fixedChar": "1995-01-01", + "nullable": false, + "typeVariationReference": 0 + } + }, + "failureBehavior": "FAILURE_BEHAVIOR_UNSPECIFIED" + } + } + } + ] + } + } + }, + { + "value": { + "scalarFunction": { + "functionReference": 3, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 6 + } + }, + "rootReference": {} + } + } + }, + { + "value": { + "literal": { + "decimal": { + "value": "BQAAAAAAAAAAAAAAAAAAAA==", + "precision": 3, + "scale": 2 + }, + "nullable": false, + "typeVariationReference": 0 + } + } + } + ] + } + } + }, + { + "value": { + "scalarFunction": { + "functionReference": 4, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 6 + } + }, + "rootReference": {} + } + } + }, + { + "value": { + "literal": { + "decimal": { + "value": "BwAAAAAAAAAAAAAAAAAAAA==", + "precision": 3, + "scale": 2 + }, + "nullable": false, + "typeVariationReference": 0 + } + } + } + ] + } + } + }, + { + "value": { + "scalarFunction": { + "functionReference": 5, + "args": [], + "outputType": { + "bool": { + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 4 + } + }, + "rootReference": {} + } + } + }, + { + "value": { + "cast": { + "type": { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "input": { + "literal": { + "i32": 24, + "nullable": false, + "typeVariationReference": 0 + } + }, + "failureBehavior": "FAILURE_BEHAVIOR_UNSPECIFIED" + } + } + } + ] + } + } + } + ] + } + } + } + }, + "expressions": [ + { + "scalarFunction": { + "functionReference": 6, + "args": [], + "outputType": { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 5 + } + }, + "rootReference": {} + } + } + }, + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 6 + } + }, + "rootReference": {} + } + } + } + ] + } + } + ] + } + }, + "groupings": [ + { + "groupingExpressions": [] + } + ], + "measures": [ + { + "measure": { + "functionReference": 7, + "args": [], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT", + "outputType": { + "decimal": { + "scale": 0, + "precision": 19, + "typeVariationReference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + "invocation": "AGGREGATION_INVOCATION_ALL", + "arguments": [ + { + "value": { + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": {} + } + } + } + ] + } + } + ] + } + }, + "names": [ + "REVENUE" + ] + } + } + ], + "expectedTypeUrls": [] +} \ No newline at end of file From 6cca0f7f3725406aef4deb1ff1bbe299867ce82c Mon Sep 17 00:00:00 2001 From: Jonah Gao Date: Wed, 10 Jul 2024 05:08:22 +0800 Subject: [PATCH 02/10] Fix bug when pushing projection under joins (#11333) * Fix bug in `ProjectionPushdown` * add order by * Fix join on --- .../physical_optimizer/projection_pushdown.rs | 50 ++++++++++------ datafusion/sqllogictest/test_files/join.slt | 58 +++++++++++++++++++ 2 files changed, 89 insertions(+), 19 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index 70524dfcea7d2..3c2be59f75040 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -46,7 +46,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, }; -use datafusion_common::{DataFusionError, JoinSide}; +use datafusion_common::{internal_err, JoinSide}; use datafusion_physical_expr::expressions::{Column, Literal}; use datafusion_physical_expr::{ utils::collect_columns, Partitioning, PhysicalExpr, PhysicalExprRef, @@ -640,6 +640,7 @@ fn try_pushdown_through_hash_join( &projection_as_columns[0..=far_right_left_col_ind as _], &projection_as_columns[far_left_right_col_ind as _..], hash_join.on(), + hash_join.left().schema().fields().len(), ) else { return Ok(None); }; @@ -649,8 +650,7 @@ fn try_pushdown_through_hash_join( &projection_as_columns[0..=far_right_left_col_ind as _], &projection_as_columns[far_left_right_col_ind as _..], filter, - hash_join.left(), - hash_join.right(), + hash_join.left().schema().fields().len(), ) { Some(updated_filter) => Some(updated_filter), None => return Ok(None), @@ -750,8 +750,7 @@ fn try_swapping_with_nested_loop_join( &projection_as_columns[0..=far_right_left_col_ind as _], &projection_as_columns[far_left_right_col_ind as _..], filter, - nl_join.left(), - nl_join.right(), + nl_join.left().schema().fields().len(), ) { Some(updated_filter) => Some(updated_filter), None => return Ok(None), @@ -806,6 +805,7 @@ fn try_swapping_with_sort_merge_join( &projection_as_columns[0..=far_right_left_col_ind as _], &projection_as_columns[far_left_right_col_ind as _..], sm_join.on(), + sm_join.left().schema().fields().len(), ) else { return Ok(None); }; @@ -859,6 +859,7 @@ fn try_swapping_with_sym_hash_join( &projection_as_columns[0..=far_right_left_col_ind as _], &projection_as_columns[far_left_right_col_ind as _..], sym_join.on(), + sym_join.left().schema().fields().len(), ) else { return Ok(None); }; @@ -868,8 +869,7 @@ fn try_swapping_with_sym_hash_join( &projection_as_columns[0..=far_right_left_col_ind as _], &projection_as_columns[far_left_right_col_ind as _..], filter, - sym_join.left(), - sym_join.right(), + sym_join.left().schema().fields().len(), ) { Some(updated_filter) => Some(updated_filter), None => return Ok(None), @@ -1090,6 +1090,7 @@ fn update_join_on( proj_left_exprs: &[(Column, String)], proj_right_exprs: &[(Column, String)], hash_join_on: &[(PhysicalExprRef, PhysicalExprRef)], + left_field_size: usize, ) -> Option> { // TODO: Clippy wants the "map" call removed, but doing so generates // a compilation error. Remove the clippy directive once this @@ -1100,8 +1101,9 @@ fn update_join_on( .map(|(left, right)| (left, right)) .unzip(); - let new_left_columns = new_columns_for_join_on(&left_idx, proj_left_exprs); - let new_right_columns = new_columns_for_join_on(&right_idx, proj_right_exprs); + let new_left_columns = new_columns_for_join_on(&left_idx, proj_left_exprs, 0); + let new_right_columns = + new_columns_for_join_on(&right_idx, proj_right_exprs, left_field_size); match (new_left_columns, new_right_columns) { (Some(left), Some(right)) => Some(left.into_iter().zip(right).collect()), @@ -1112,9 +1114,14 @@ fn update_join_on( /// This function generates a new set of columns to be used in a hash join /// operation based on a set of equi-join conditions (`hash_join_on`) and a /// list of projection expressions (`projection_exprs`). +/// +/// Notes: Column indices in the projection expressions are based on the join schema, +/// whereas the join on expressions are based on the join child schema. `column_index_offset` +/// represents the offset between them. fn new_columns_for_join_on( hash_join_on: &[&PhysicalExprRef], projection_exprs: &[(Column, String)], + column_index_offset: usize, ) -> Option> { let new_columns = hash_join_on .iter() @@ -1130,6 +1137,8 @@ fn new_columns_for_join_on( .enumerate() .find(|(_, (proj_column, _))| { column.name() == proj_column.name() + && column.index() + column_index_offset + == proj_column.index() }) .map(|(index, (_, alias))| Column::new(alias, index)); if let Some(new_column) = new_column { @@ -1138,10 +1147,10 @@ fn new_columns_for_join_on( // If the column is not found in the projection expressions, // it means that the column is not projected. In this case, // we cannot push the projection down. - Err(DataFusionError::Internal(format!( + internal_err!( "Column {:?} not found in projection expressions", column - ))) + ) } } else { Ok(Transformed::no(expr)) @@ -1160,21 +1169,20 @@ fn update_join_filter( projection_left_exprs: &[(Column, String)], projection_right_exprs: &[(Column, String)], join_filter: &JoinFilter, - join_left: &Arc, - join_right: &Arc, + left_field_size: usize, ) -> Option { let mut new_left_indices = new_indices_for_join_filter( join_filter, JoinSide::Left, projection_left_exprs, - join_left.schema(), + 0, ) .into_iter(); let mut new_right_indices = new_indices_for_join_filter( join_filter, JoinSide::Right, projection_right_exprs, - join_right.schema(), + left_field_size, ) .into_iter(); @@ -1204,20 +1212,24 @@ fn update_join_filter( /// This function determines and returns a vector of indices representing the /// positions of columns in `projection_exprs` that are involved in `join_filter`, /// and correspond to a particular side (`join_side`) of the join operation. +/// +/// Notes: Column indices in the projection expressions are based on the join schema, +/// whereas the join filter is based on the join child schema. `column_index_offset` +/// represents the offset between them. fn new_indices_for_join_filter( join_filter: &JoinFilter, join_side: JoinSide, projection_exprs: &[(Column, String)], - join_child_schema: SchemaRef, + column_index_offset: usize, ) -> Vec { join_filter .column_indices() .iter() .filter(|col_idx| col_idx.side == join_side) .filter_map(|col_idx| { - projection_exprs.iter().position(|(col, _)| { - col.name() == join_child_schema.fields()[col_idx.index].name() - }) + projection_exprs + .iter() + .position(|(col, _)| col_idx.index + column_index_offset == col.index()) }) .collect() } diff --git a/datafusion/sqllogictest/test_files/join.slt b/datafusion/sqllogictest/test_files/join.slt index 3c89109145d70..12cb8b3985c76 100644 --- a/datafusion/sqllogictest/test_files/join.slt +++ b/datafusion/sqllogictest/test_files/join.slt @@ -986,3 +986,61 @@ DROP TABLE employees statement ok DROP TABLE department + + +# Test issue: https://github.com/apache/datafusion/issues/11269 +statement ok +CREATE TABLE t1 (v0 BIGINT) AS VALUES (-503661263); + +statement ok +CREATE TABLE t2 (v0 DOUBLE) AS VALUES (-1.663563947387); + +statement ok +CREATE TABLE t3 (v0 DOUBLE) AS VALUES (0.05112015193508901); + +query RR +SELECT t3.v0, t2.v0 FROM t1,t2,t3 WHERE t3.v0 >= t1.v0; +---- +0.051120151935 -1.663563947387 + +statement ok +DROP TABLE t1; + +statement ok +DROP TABLE t2; + +statement ok +DROP TABLE t3; + + +# Test issue: https://github.com/apache/datafusion/issues/11275 +statement ok +CREATE TABLE t0 (v1 BOOLEAN) AS VALUES (false), (null); + +statement ok +CREATE TABLE t1 (v1 BOOLEAN) AS VALUES (false), (null), (false); + +statement ok +CREATE TABLE t2 (v1 BOOLEAN) AS VALUES (false), (true); + +query BB +SELECT t2.v1, t1.v1 FROM t0, t1, t2 WHERE t2.v1 IS DISTINCT FROM t0.v1 ORDER BY 1,2; +---- +false false +false false +false NULL +true false +true false +true false +true false +true NULL +true NULL + +statement ok +DROP TABLE t0; + +statement ok +DROP TABLE t1; + +statement ok +DROP TABLE t2; From 7f25d9dd63918ecfeecaa5810d2a5c4fc9155c5d Mon Sep 17 00:00:00 2001 From: Oleks V Date: Tue, 9 Jul 2024 14:09:57 -0700 Subject: [PATCH 03/10] Minor: some cosmetics in `filter.rs`, fix clippy due to logical conflict (#11368) * Minor: some cosmetics in `filter.rs` * Minor: some cosmetics in `filter.rs` --- datafusion/physical-plan/src/filter.rs | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 84afc227578f8..c5ba3992d3b41 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -15,9 +15,6 @@ // specific language governing permissions and limitations // under the License. -//! FilterExec evaluates a boolean predicate against all input batches to determine which rows to -//! include in its output batches. - use std::any::Any; use std::pin::Pin; use std::sync::Arc; @@ -60,7 +57,7 @@ pub struct FilterExec { input: Arc, /// Execution metrics metrics: ExecutionPlanMetricsSet, - /// Selectivity for statistics. 0 = no rows, 100 all rows + /// Selectivity for statistics. 0 = no rows, 100 = all rows default_selectivity: u8, cache: PlanProperties, } @@ -91,14 +88,14 @@ impl FilterExec { Ok(Self { predicate, - input: input.clone(), + input: Arc::clone(&input), metrics: ExecutionPlanMetricsSet::new(), default_selectivity, cache, }) } other => { - plan_err!("Filter predicate must return boolean values, not {other:?}") + plan_err!("Filter predicate must return BOOLEAN values, got {other:?}") } } } @@ -108,7 +105,9 @@ impl FilterExec { default_selectivity: u8, ) -> Result { if default_selectivity > 100 { - return plan_err!("Default filter selectivity needs to be less than 100"); + return plan_err!( + "Default filter selectivity value needs to be less than or equal to 100" + ); } self.default_selectivity = default_selectivity; Ok(self) @@ -369,12 +368,12 @@ pub(crate) fn batch_filter( .and_then(|v| v.into_array(batch.num_rows())) .and_then(|array| { let filter_array = match as_boolean_array(&array) { - Ok(boolean_array) => { - Ok(boolean_array.to_owned()) - }, + Ok(boolean_array) => Ok(boolean_array.to_owned()), Err(_) => { let Ok(null_array) = as_null_array(&array) else { - return internal_err!("Cannot create filter_array from non-boolean predicates, unable to continute"); + return internal_err!( + "Cannot create filter_array from non-boolean predicates" + ); }; // if the predicate is null, then the result is also null From 9df393ea5539d5c83f8c16b028f44468727e3bee Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 9 Jul 2024 17:11:48 -0400 Subject: [PATCH 04/10] Update prost-derive requirement from 0.12 to 0.13 (#11355) Updates the requirements on [prost-derive](https://github.com/tokio-rs/prost) to permit the latest version. - [Release notes](https://github.com/tokio-rs/prost/releases) - [Changelog](https://github.com/tokio-rs/prost/blob/master/CHANGELOG.md) - [Commits](https://github.com/tokio-rs/prost/compare/v0.12.0...v0.13.0) --- updated-dependencies: - dependency-name: prost-derive dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- datafusion-examples/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 52e3a5525717d..626c365af21cb 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -73,7 +73,7 @@ mimalloc = { version = "0.1", default-features = false } num_cpus = { workspace = true } object_store = { workspace = true, features = ["aws", "http"] } prost = { version = "0.12", default-features = false } -prost-derive = { version = "0.12", default-features = false } +prost-derive = { version = "0.13", default-features = false } serde = { version = "1.0.136", features = ["derive"] } serde_json = { workspace = true } tempfile = { workspace = true } From c018c74ae1ca2339bd530e5e6724e45a16e3900b Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 9 Jul 2024 17:44:39 -0400 Subject: [PATCH 05/10] Minor: update dashmap (#11335) --- Cargo.toml | 2 +- datafusion-cli/Cargo.lock | 43 +++++++++++++++++++++++---------------- 2 files changed, 26 insertions(+), 19 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f87205f0d0671..6dd434abc87c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,7 +85,7 @@ bigdecimal = "=0.4.1" bytes = "1.4" chrono = { version = "0.4.34", default-features = false } ctor = "0.2.0" -dashmap = "5.5.0" +dashmap = "6.0.1" datafusion = { path = "datafusion/core", version = "40.0.0", default-features = false } datafusion-common = { path = "datafusion/common", version = "40.0.0", default-features = false } datafusion-common-runtime = { path = "datafusion/common-runtime", version = "40.0.0" } diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 42ec5922a73fe..8af42cb43932e 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -443,7 +443,7 @@ dependencies = [ "fastrand 1.9.0", "hex", "http 0.2.12", - "hyper 0.14.29", + "hyper 0.14.30", "ring 0.16.20", "time", "tokio", @@ -609,7 +609,7 @@ dependencies = [ "fastrand 1.9.0", "http 0.2.12", "http-body 0.4.6", - "hyper 0.14.29", + "hyper 0.14.30", "hyper-rustls 0.23.2", "lazy_static", "pin-project-lite", @@ -631,7 +631,7 @@ dependencies = [ "futures-core", "http 0.2.12", "http-body 0.4.6", - "hyper 0.14.29", + "hyper 0.14.30", "once_cell", "percent-encoding", "pin-project-lite", @@ -875,9 +875,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.106" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "066fce287b1d4eafef758e89e09d724a24808a9196fe9756b8ca90e86d0719a2" +checksum = "eaff6f8ce506b9773fa786672d63fc7a191ffea1be33f72bbd4aeacefca9ffc8" dependencies = [ "jobserver", "libc", @@ -1055,6 +1055,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + [[package]] name = "crunchy" version = "0.2.2" @@ -1110,11 +1116,12 @@ checksum = "7762d17f1241643615821a8455a0b2c3e803784b058693d990b11f2dce25a0ca" [[package]] name = "dashmap" -version = "5.5.3" +version = "6.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28" dependencies = [ "cfg-if", + "crossbeam-utils", "hashbrown 0.14.5", "lock_api", "once_cell", @@ -1941,9 +1948,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.29" +version = "0.14.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f361cde2f109281a220d4307746cdfd5ee3f410da58a70377762396775634b33" +checksum = "a152ddd61dfaec7273fe8419ab357f33aee0d914c5f4efbf0d96fa749eea5ec9" dependencies = [ "bytes", "futures-channel", @@ -1965,9 +1972,9 @@ dependencies = [ [[package]] name = "hyper" -version = "1.4.0" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4fe55fb7a772d59a5ff1dfbff4fe0258d19b89fec4b233e75d35d5d2316badc" +checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" dependencies = [ "bytes", "futures-channel", @@ -1990,7 +1997,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c" dependencies = [ "http 0.2.12", - "hyper 0.14.29", + "hyper 0.14.30", "log", "rustls 0.20.9", "rustls-native-certs 0.6.3", @@ -2006,7 +2013,7 @@ checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155" dependencies = [ "futures-util", "http 1.1.0", - "hyper 1.4.0", + "hyper 1.4.1", "hyper-util", "rustls 0.23.11", "rustls-native-certs 0.7.1", @@ -2027,7 +2034,7 @@ dependencies = [ "futures-util", "http 1.1.0", "http-body 1.0.0", - "hyper 1.4.0", + "hyper 1.4.1", "pin-project-lite", "socket2", "tokio", @@ -2502,7 +2509,7 @@ dependencies = [ "chrono", "futures", "humantime", - "hyper 1.4.0", + "hyper 1.4.1", "itertools", "md-5", "parking_lot", @@ -2976,7 +2983,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.4.0", + "hyper 1.4.1", "hyper-rustls 0.27.2", "hyper-util", "ipnet", @@ -3908,9 +3915,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.9.1" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5de17fd2f7da591098415cff336e12965a28061ddace43b59cb3c430179c9439" +checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" dependencies = [ "getrandom", "serde", From 1e0c06e14ae821ac6aa344f8acb638431a898ae8 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 9 Jul 2024 17:45:34 -0400 Subject: [PATCH 06/10] Improve and test dataframe API examples in docs (#11290) * Improve and test dataframe API examples in docs * Update introduction with pointer to user guide * Make example consistent * Make read_csv comment consistent * clarifications * prettier + tweaks * Update docs/source/library-user-guide/using-the-dataframe-api.md Co-authored-by: Eric Fredine * Update docs/source/library-user-guide/using-the-dataframe-api.md Co-authored-by: Eric Fredine --------- Co-authored-by: Eric Fredine --- datafusion-examples/README.md | 1 + datafusion/core/src/lib.rs | 8 +- .../using-the-dataframe-api.md | 302 +++++++++++++----- .../library-user-guide/using-the-sql-api.md | 17 +- 4 files changed, 239 insertions(+), 89 deletions(-) diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index 90469e6715a6a..2696f74775cf3 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -55,6 +55,7 @@ cargo run --example dataframe - [`composed_extension_codec`](examples/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization - [`csv_sql_streaming.rs`](examples/csv_sql_streaming.rs): Build and run a streaming query plan from a SQL statement against a local CSV file - [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider) +- [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format - [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3 - [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame against a local parquet file - [`dataframe_in_memory.rs`](examples/dataframe_in_memory.rs): Run a query using a DataFrame against data in memory diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 956e9f7246a36..f5805bc069825 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -641,5 +641,11 @@ doc_comment::doctest!( #[cfg(doctest)] doc_comment::doctest!( "../../../docs/source/library-user-guide/using-the-sql-api.md", - library_user_guide_example_usage + library_user_guide_sql_api +); + +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/library-user-guide/using-the-dataframe-api.md", + library_user_guide_dataframe_api ); diff --git a/docs/source/library-user-guide/using-the-dataframe-api.md b/docs/source/library-user-guide/using-the-dataframe-api.md index c4f4ecd4f1370..9e7774cbb944c 100644 --- a/docs/source/library-user-guide/using-the-dataframe-api.md +++ b/docs/source/library-user-guide/using-the-dataframe-api.md @@ -19,129 +19,267 @@ # Using the DataFrame API -## What is a DataFrame +The [Users Guide] introduces the [`DataFrame`] API and this section describes +that API in more depth. -`DataFrame` in `DataFrame` is modeled after the Pandas DataFrame interface, and is a thin wrapper over LogicalPlan that adds functionality for building and executing those plans. +## What is a DataFrame? -```rust -pub struct DataFrame { - session_state: SessionState, - plan: LogicalPlan, -} -``` - -You can build up `DataFrame`s using its methods, similarly to building `LogicalPlan`s using `LogicalPlanBuilder`: - -```rust -let df = ctx.table("users").await?; +As described in the [Users Guide], DataFusion [`DataFrame`]s are modeled after +the [Pandas DataFrame] interface, and are implemented as thin wrapper over a +[`LogicalPlan`] that adds functionality for building and executing those plans. -// Create a new DataFrame sorted by `id`, `bank_account` -let new_df = df.select(vec![col("id"), col("bank_account")])? - .sort(vec![col("id")])?; - -// Build the same plan using the LogicalPlanBuilder -let plan = LogicalPlanBuilder::from(&df.to_logical_plan()) - .project(vec![col("id"), col("bank_account")])? - .sort(vec![col("id")])? - .build()?; -``` - -You can use `collect` or `execute_stream` to execute the query. +The simplest possible dataframe is one that scans a table and that table can be +in a file or in memory. ## How to generate a DataFrame -You can directly use the `DataFrame` API or generate a `DataFrame` from a SQL query. - -For example, to use `sql` to construct `DataFrame`: +You can construct [`DataFrame`]s programmatically using the API, similarly to +other DataFrame APIs. For example, you can read an in memory `RecordBatch` into +a `DataFrame`: ```rust -let ctx = SessionContext::new(); -// Register the in-memory table containing the data -ctx.register_table("users", Arc::new(create_memtable()?))?; -let dataframe = ctx.sql("SELECT * FROM users;").await?; +use std::sync::Arc; +use datafusion::prelude::*; +use datafusion::arrow::array::{ArrayRef, Int32Array}; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::error::Result; + +#[tokio::main] +async fn main() -> Result<()> { + let ctx = SessionContext::new(); + // Register an in-memory table containing the following data + // id | bank_account + // ---|------------- + // 1 | 9000 + // 2 | 8000 + // 3 | 7000 + let data = RecordBatch::try_from_iter(vec![ + ("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef), + ("bank_account", Arc::new(Int32Array::from(vec![9000, 8000, 7000]))), + ])?; + // Create a DataFrame that scans the user table, and finds + // all users with a bank account at least 8000 + // and sorts the results by bank account in descending order + let dataframe = ctx + .read_batch(data)? + .filter(col("bank_account").gt_eq(lit(8000)))? // bank_account >= 8000 + .sort(vec![col("bank_account").sort(false, true)])?; // ORDER BY bank_account DESC + + Ok(()) +} ``` -To construct `DataFrame` using the API: +You can _also_ generate a `DataFrame` from a SQL query and use the DataFrame's APIs +to manipulate the output of the query. ```rust -let ctx = SessionContext::new(); -// Register the in-memory table containing the data -ctx.register_table("users", Arc::new(create_memtable()?))?; -let dataframe = ctx - .table("users") - .filter(col("a").lt_eq(col("b")))? - .sort(vec![col("a").sort(true, true), col("b").sort(false, false)])?; +use std::sync::Arc; +use datafusion::prelude::*; +use datafusion::assert_batches_eq; +use datafusion::arrow::array::{ArrayRef, Int32Array}; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::error::Result; + +#[tokio::main] +async fn main() -> Result<()> { + let ctx = SessionContext::new(); + // Register the same in-memory table as the previous example + let data = RecordBatch::try_from_iter(vec![ + ("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef), + ("bank_account", Arc::new(Int32Array::from(vec![9000, 8000, 7000]))), + ])?; + ctx.register_batch("users", data)?; + // Create a DataFrame using SQL + let dataframe = ctx.sql("SELECT * FROM users;") + .await? + // Note we can filter the output of the query using the DataFrame API + .filter(col("bank_account").gt_eq(lit(8000)))?; // bank_account >= 8000 + + let results = &dataframe.collect().await?; + + // use the `assert_batches_eq` macro to show the output + assert_batches_eq!( + vec![ + "+----+--------------+", + "| id | bank_account |", + "+----+--------------+", + "| 1 | 9000 |", + "| 2 | 8000 |", + "+----+--------------+", + ], + &results + ); + Ok(()) +} ``` ## Collect / Streaming Exec -DataFusion `DataFrame`s are "lazy", meaning they do not do any processing until they are executed, which allows for additional optimizations. +DataFusion [`DataFrame`]s are "lazy", meaning they do no processing until +they are executed, which allows for additional optimizations. -When you have a `DataFrame`, you can run it in one of three ways: +You can run a `DataFrame` in one of three ways: -1. `collect` which executes the query and buffers all the output into a `Vec` -2. `streaming_exec`, which begins executions and returns a `SendableRecordBatchStream` which incrementally computes output on each call to `next()` -3. `cache` which executes the query and buffers the output into a new in memory DataFrame. +1. `collect`: executes the query and buffers all the output into a `Vec` +2. `execute_stream`: begins executions and returns a `SendableRecordBatchStream` which incrementally computes output on each call to `next()` +3. `cache`: executes the query and buffers the output into a new in memory `DataFrame.` -You can just collect all outputs once like: +To collect all outputs into a memory buffer, use the `collect` method: ```rust -let ctx = SessionContext::new(); -let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; -let batches = df.collect().await?; +use datafusion::prelude::*; +use datafusion::error::Result; + +#[tokio::main] +async fn main() -> Result<()> { + let ctx = SessionContext::new(); + // read the contents of a CSV file into a DataFrame + let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; + // execute the query and collect the results as a Vec + let batches = df.collect().await?; + for record_batch in batches { + println!("{record_batch:?}"); + } + Ok(()) +} ``` -You can also use stream output to incrementally generate output one `RecordBatch` at a time +Use `execute_stream` to incrementally generate output one `RecordBatch` at a time: ```rust -let ctx = SessionContext::new(); -let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; -let mut stream = df.execute_stream().await?; -while let Some(rb) = stream.next().await { - println!("{rb:?}"); +use datafusion::prelude::*; +use datafusion::error::Result; +use futures::stream::StreamExt; + +#[tokio::main] +async fn main() -> Result<()> { + let ctx = SessionContext::new(); + // read example.csv file into a DataFrame + let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; + // begin execution (returns quickly, does not compute results) + let mut stream = df.execute_stream().await?; + // results are returned incrementally as they are computed + while let Some(record_batch) = stream.next().await { + println!("{record_batch:?}"); + } + Ok(()) } ``` # Write DataFrame to Files -You can also serialize `DataFrame` to a file. For now, `Datafusion` supports write `DataFrame` to `csv`, `json` and `parquet`. - -When writing a file, DataFusion will execute the DataFrame and stream the results to a file. +You can also write the contents of a `DataFrame` to a file. When writing a file, +DataFusion executes the `DataFrame` and streams the results to the output. +DataFusion comes with support for writing `csv`, `json` `arrow` `avro`, and +`parquet` files, and supports writing custom file formats via API (see +[`custom_file_format.rs`] for an example) -For example, to write a csv_file +For example, to read a CSV file and write it to a parquet file, use the +[`DataFrame::write_parquet`] method ```rust -let ctx = SessionContext::new(); -// Register the in-memory table containing the data -ctx.register_table("users", Arc::new(mem_table))?; -let dataframe = ctx.sql("SELECT * FROM users;").await?; - -dataframe - .write_csv("user_dataframe.csv", DataFrameWriteOptions::default(), None) - .await; +use datafusion::prelude::*; +use datafusion::error::Result; +use datafusion::dataframe::DataFrameWriteOptions; + +#[tokio::main] +async fn main() -> Result<()> { + let ctx = SessionContext::new(); + // read example.csv file into a DataFrame + let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; + // stream the contents of the DataFrame to the `example.parquet` file + df.write_parquet( + "example.parquet", + DataFrameWriteOptions::new(), + None, // writer_options + ).await; + Ok(()) +} ``` -and the file will look like (Example Output): +[`custom_file_format.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/custom_file_format.rs -``` -id,bank_account -1,9000 +The output file will look like (Example Output): + +```sql +> select * from '../datafusion/core/example.parquet'; ++---+---+---+ +| a | b | c | ++---+---+---+ +| 1 | 2 | 3 | ++---+---+---+ ``` -## Transform between LogicalPlan and DataFrame +## Relationship between `LogicalPlan`s and `DataFrame`s -As shown above, `DataFrame` is just a very thin wrapper of `LogicalPlan`, so you can easily go back and forth between them. +The `DataFrame` struct is defined like this: ```rust -// Just combine LogicalPlan with SessionContext and you get a DataFrame -let ctx = SessionContext::new(); -// Register the in-memory table containing the data -ctx.register_table("users", Arc::new(mem_table))?; -let dataframe = ctx.sql("SELECT * FROM users;").await?; +use datafusion::execution::session_state::SessionState; +use datafusion::logical_expr::LogicalPlan; +pub struct DataFrame { + // state required to execute a LogicalPlan + session_state: Box, + // LogicalPlan that describes the computation to perform + plan: LogicalPlan, +} +``` -// get LogicalPlan in dataframe -let plan = dataframe.logical_plan().clone(); +As shown above, `DataFrame` is a thin wrapper of `LogicalPlan`, so you can +easily go back and forth between them. -// construct a DataFrame with LogicalPlan -let new_df = DataFrame::new(ctx.state(), plan); +```rust +use datafusion::prelude::*; +use datafusion::error::Result; +use datafusion::logical_expr::LogicalPlanBuilder; + +#[tokio::main] +async fn main() -> Result<()>{ + let ctx = SessionContext::new(); + // read example.csv file into a DataFrame + let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; + // You can easily get the LogicalPlan from the DataFrame + let (_state, plan) = df.into_parts(); + // Just combine LogicalPlan with SessionContext and you get a DataFrame + // get LogicalPlan in dataframe + let new_df = DataFrame::new(ctx.state(), plan); + Ok(()) +} ``` + +In fact, using the [`DataFrame`]s methods you can create the same +[`LogicalPlan`]s as when using [`LogicalPlanBuilder`]: + +```rust +use datafusion::prelude::*; +use datafusion::error::Result; +use datafusion::logical_expr::LogicalPlanBuilder; + +#[tokio::main] +async fn main() -> Result<()>{ + let ctx = SessionContext::new(); + // read example.csv file into a DataFrame + let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; + // Create a new DataFrame sorted by `id`, `bank_account` + let new_df = df.select(vec![col("a"), col("b")])? + .sort(vec![col("a")])?; + // Build the same plan using the LogicalPlanBuilder + // Similar to `SELECT a, b FROM example.csv ORDER BY a` + let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; + let (_state, plan) = df.into_parts(); // get the DataFrame's LogicalPlan + let plan = LogicalPlanBuilder::from(plan) + .project(vec![col("a"), col("b")])? + .sort(vec![col("a")])? + .build()?; + // prove they are the same + assert_eq!(new_df.logical_plan(), &plan); + Ok(()) +} +``` + +[users guide]: ../user-guide/dataframe.md +[pandas dataframe]: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html +[`dataframe`]: https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html +[`logicalplan`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html +[`logicalplanbuilder`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.LogicalPlanBuilder.html +[`dataframe::write_parquet`]: https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html#method.write_parquet diff --git a/docs/source/library-user-guide/using-the-sql-api.md b/docs/source/library-user-guide/using-the-sql-api.md index 1a25f078cc2e2..9c32004db4359 100644 --- a/docs/source/library-user-guide/using-the-sql-api.md +++ b/docs/source/library-user-guide/using-the-sql-api.md @@ -29,16 +29,15 @@ using the [`SessionContext::sql`] method. For lower level control such as preventing DDL, you can use [`SessionContext::sql_with_options`] or the [`SessionState`] APIs -[`sessioncontext`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html -[`sessioncontext::sql`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql -[`sessioncontext::sql_with_options`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql_with_options -[`sessionstate`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html - ## Registering Data Sources using `SessionContext::register*` The `SessionContext::register*` methods tell DataFusion the name of the source and how to read data. Once registered, you can execute SQL queries -using the `SessionContext::sql` method referring to your data source as a table. +using the [`SessionContext::sql`] method referring to your data source as a table. + +The [`SessionContext::sql`] method returns a `DataFrame` for ease of +use. See the ["Using the DataFrame API"] section for more information on how to +work with DataFrames. ### Read a CSV File @@ -215,3 +214,9 @@ async fn main() -> Result<()> { Ok(()) } ``` + +[`sessioncontext`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html +[`sessioncontext::sql`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql +[`sessioncontext::sql_with_options`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql_with_options +[`sessionstate`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html +["using the dataframe api"]: ../library-user-guide/using-the-dataframe-api.md From 16a3148354e81e1ae4e2aebdd83c07799164ac14 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 9 Jul 2024 21:30:28 -0400 Subject: [PATCH 07/10] Remove redundant `unalias_nested` calls for creating Filter's (#11340) * Remove uncessary unalias_nested calls when creating Filter * simplify --- datafusion/expr/src/logical_plan/plan.rs | 54 ++++--------------- .../optimizer/src/common_subexpr_eliminate.rs | 11 +--- datafusion/optimizer/src/push_down_filter.rs | 4 +- 3 files changed, 13 insertions(+), 56 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 998b5bdcb60c8..bde9655b8a390 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -41,9 +41,7 @@ use crate::{ }; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion_common::tree_node::{ - Transformed, TransformedResult, TreeNode, TreeNodeRecursion, -}; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; use datafusion_common::{ aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence, @@ -645,39 +643,6 @@ impl LogicalPlan { Ok(LogicalPlan::Values(Values { schema, values })) } LogicalPlan::Filter(Filter { predicate, input }) => { - // todo: should this logic be moved to Filter::try_new? - - // filter predicates should not contain aliased expressions so we remove any aliases - // before this logic was added we would have aliases within filters such as for - // benchmark q6: - // - // lineitem.l_shipdate >= Date32(\"8766\") - // AND lineitem.l_shipdate < Date32(\"9131\") - // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount >= - // Decimal128(Some(49999999999999),30,15) - // AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount <= - // Decimal128(Some(69999999999999),30,15) - // AND lineitem.l_quantity < Decimal128(Some(2400),15,2) - - let predicate = predicate - .transform_down(|expr| { - match expr { - Expr::Exists { .. } - | Expr::ScalarSubquery(_) - | Expr::InSubquery(_) => { - // subqueries could contain aliases so we don't recurse into those - Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump)) - } - Expr::Alias(_) => Ok(Transformed::new( - expr.unalias(), - true, - TreeNodeRecursion::Jump, - )), - _ => Ok(Transformed::no(expr)), - } - }) - .data()?; - Filter::try_new(predicate, input).map(LogicalPlan::Filter) } LogicalPlan::Repartition(_) => Ok(self), @@ -878,7 +843,7 @@ impl LogicalPlan { } LogicalPlan::Filter { .. } => { assert_eq!(1, expr.len()); - let predicate = expr.pop().unwrap().unalias_nested().data; + let predicate = expr.pop().unwrap(); Filter::try_new(predicate, Arc::new(inputs.swap_remove(0))) .map(LogicalPlan::Filter) @@ -2117,6 +2082,9 @@ pub struct Filter { impl Filter { /// Create a new filter operator. + /// + /// Notes: as Aliases have no effect on the output of a filter operator, + /// they are removed from the predicate expression. pub fn try_new(predicate: Expr, input: Arc) -> Result { // Filter predicates must return a boolean value so we try and validate that here. // Note that it is not always possible to resolve the predicate expression during plan @@ -2940,7 +2908,7 @@ mod tests { use crate::logical_plan::table_scan; use crate::{col, exists, in_subquery, lit, placeholder, GroupingSet}; - use datafusion_common::tree_node::TreeNodeVisitor; + use datafusion_common::tree_node::{TransformedResult, TreeNodeVisitor}; use datafusion_common::{not_impl_err, Constraint, ScalarValue}; use crate::test::function_stub::count; @@ -3500,11 +3468,8 @@ digraph { })); let col = schema.field_names()[0].clone(); - let filter = Filter::try_new( - Expr::Column(col.into()).eq(Expr::Literal(ScalarValue::Int32(Some(1)))), - scan, - ) - .unwrap(); + let filter = + Filter::try_new(Expr::Column(col.into()).eq(lit(1i32)), scan).unwrap(); assert!(filter.is_scalar()); } @@ -3522,8 +3487,7 @@ digraph { .build() .unwrap(); - let external_filter = - col("foo").eq(Expr::Literal(ScalarValue::Boolean(Some(true)))); + let external_filter = col("foo").eq(lit(true)); // after transformation, because plan is not the same anymore, // the parent plan is built again with call to LogicalPlan::with_new_inputs -> with_new_exprs diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 4a4933fe9cfdb..e18d8bc91bf60 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -342,16 +342,9 @@ impl CommonSubexprEliminate { let input = unwrap_arc(input); let expr = vec![predicate]; self.try_unary_plan(expr, input, config)? - .transform_data(|(mut new_expr, new_input)| { + .map_data(|(mut new_expr, new_input)| { assert_eq!(new_expr.len(), 1); // passed in vec![predicate] - let new_predicate = new_expr - .pop() - .unwrap() - .unalias_nested() - .update_data(|new_predicate| (new_predicate, new_input)); - Ok(new_predicate) - })? - .map_data(|(new_predicate, new_input)| { + let new_predicate = new_expr.pop().unwrap(); Filter::try_new(new_predicate, Arc::new(new_input)) .map(LogicalPlan::Filter) }) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 1c3186b762b71..0a3bae154bd64 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -761,11 +761,11 @@ impl OptimizerRule for PushDownFilter { // Push down non-unnest filter predicate // Unnest - // Unenst Input (Projection) + // Unnest Input (Projection) // -> rewritten to // Unnest // Filter - // Unenst Input (Projection) + // Unnest Input (Projection) let unnest_input = std::mem::take(&mut unnest.input); From 146b679aa19c7749cc73d0c27440419d6498142b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Wed, 10 Jul 2024 16:01:53 +0800 Subject: [PATCH 08/10] Enable `clone_on_ref_ptr` clippy lint on optimizer (#11346) * Enable clone_on_ref_ptr clippy lint on optimizer * Fix lints * fmt --- datafusion/optimizer/src/analyzer/subquery.rs | 5 ++-- .../optimizer/src/analyzer/type_coercion.rs | 8 +++--- .../optimizer/src/common_subexpr_eliminate.rs | 2 +- datafusion/optimizer/src/decorrelate.rs | 15 ++++++----- .../src/decorrelate_predicate_subquery.rs | 4 +-- .../optimizer/src/eliminate_cross_join.rs | 6 ++--- datafusion/optimizer/src/eliminate_filter.rs | 3 ++- datafusion/optimizer/src/eliminate_limit.rs | 3 ++- .../optimizer/src/eliminate_nested_union.rs | 2 +- .../optimizer/src/eliminate_one_union.rs | 2 +- .../optimizer/src/eliminate_outer_join.rs | 2 +- .../src/extract_equijoin_predicate.rs | 4 +-- datafusion/optimizer/src/lib.rs | 2 ++ .../optimizer/src/optimize_projections/mod.rs | 12 ++++----- datafusion/optimizer/src/optimizer.rs | 8 +++--- datafusion/optimizer/src/plan_signature.rs | 4 +-- .../optimizer/src/propagate_empty_relation.rs | 26 +++++++++---------- datafusion/optimizer/src/push_down_filter.rs | 10 +++---- .../optimizer/src/scalar_subquery_to_join.rs | 2 +- .../simplify_expressions/expr_simplifier.rs | 11 ++++---- .../src/single_distinct_to_groupby.rs | 2 +- datafusion/optimizer/src/test/mod.rs | 2 +- .../src/unwrap_cast_in_comparison.rs | 2 +- 23 files changed, 73 insertions(+), 64 deletions(-) diff --git a/datafusion/optimizer/src/analyzer/subquery.rs b/datafusion/optimizer/src/analyzer/subquery.rs index 5725a725e64a4..db39f8f7737d4 100644 --- a/datafusion/optimizer/src/analyzer/subquery.rs +++ b/datafusion/optimizer/src/analyzer/subquery.rs @@ -16,6 +16,7 @@ // under the License. use std::ops::Deref; +use std::sync::Arc; use crate::analyzer::check_plan; use crate::utils::collect_subquery_cols; @@ -245,7 +246,7 @@ fn check_aggregation_in_scalar_subquery( if !agg.group_expr.is_empty() { let correlated_exprs = get_correlated_expressions(inner_plan)?; let inner_subquery_cols = - collect_subquery_cols(&correlated_exprs, agg.input.schema().clone())?; + collect_subquery_cols(&correlated_exprs, Arc::clone(agg.input.schema()))?; let mut group_columns = agg .group_expr .iter() @@ -375,7 +376,7 @@ mod test { _inputs: Vec, ) -> Result { Ok(Self { - empty_schema: self.empty_schema.clone(), + empty_schema: Arc::clone(&self.empty_schema), }) } } diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 6c08b3e998b3d..3cab474df84e0 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -656,7 +656,7 @@ fn coerce_arguments_for_fun( .map(|expr| { let data_type = expr.get_type(schema).unwrap(); if let DataType::FixedSizeList(field, _) = data_type { - let to_type = DataType::List(field.clone()); + let to_type = DataType::List(Arc::clone(&field)); expr.cast_to(&to_type, schema) } else { Ok(expr) @@ -1265,8 +1265,10 @@ mod test { signature: Signature::variadic(vec![Utf8], Volatility::Immutable), }) .call(args.to_vec()); - let plan = - LogicalPlan::Projection(Projection::try_new(vec![expr], empty.clone())?); + let plan = LogicalPlan::Projection(Projection::try_new( + vec![expr], + Arc::clone(&empty), + )?); let expected = "Projection: TestScalarUDF(a, Utf8(\"b\"), CAST(Boolean(true) AS Utf8), CAST(Boolean(false) AS Utf8), CAST(Int32(13) AS Utf8))\n EmptyRelation"; assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), plan, expected)?; diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index e18d8bc91bf60..721987b917d4c 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -1385,7 +1385,7 @@ mod test { "my_agg", Signature::exact(vec![DataType::UInt32], Volatility::Stable), return_type.clone(), - accumulator.clone(), + Arc::clone(&accumulator), vec![Field::new("value", DataType::UInt32, true)], ))), vec![inner], diff --git a/datafusion/optimizer/src/decorrelate.rs b/datafusion/optimizer/src/decorrelate.rs index 5f8e0a85215aa..c998e8442548c 100644 --- a/datafusion/optimizer/src/decorrelate.rs +++ b/datafusion/optimizer/src/decorrelate.rs @@ -19,6 +19,7 @@ use std::collections::{BTreeSet, HashMap}; use std::ops::Deref; +use std::sync::Arc; use crate::simplify_expressions::ExprSimplifier; use crate::utils::collect_subquery_cols; @@ -147,7 +148,7 @@ impl TreeNodeRewriter for PullUpCorrelatedExpr { } fn f_up(&mut self, plan: LogicalPlan) -> Result> { - let subquery_schema = plan.schema().clone(); + let subquery_schema = Arc::clone(plan.schema()); match &plan { LogicalPlan::Filter(plan_filter) => { let subquery_filter_exprs = split_conjunction(&plan_filter.predicate); @@ -172,7 +173,7 @@ impl TreeNodeRewriter for PullUpCorrelatedExpr { if let Some(expr) = conjunction(subquery_filters.clone()) { filter_exprs_evaluation_result_on_empty_batch( &expr, - plan_filter.input.schema().clone(), + Arc::clone(plan_filter.input.schema()), expr_result_map, &mut expr_result_map_for_count_bug, )? @@ -230,7 +231,7 @@ impl TreeNodeRewriter for PullUpCorrelatedExpr { { proj_exprs_evaluation_result_on_empty_batch( &projection.expr, - projection.input.schema().clone(), + Arc::clone(projection.input.schema()), expr_result_map, &mut expr_result_map_for_count_bug, )?; @@ -276,7 +277,7 @@ impl TreeNodeRewriter for PullUpCorrelatedExpr { { agg_exprs_evaluation_result_on_empty_batch( &aggregate.aggr_expr, - aggregate.input.schema().clone(), + Arc::clone(aggregate.input.schema()), &mut expr_result_map_for_count_bug, )?; if !expr_result_map_for_count_bug.is_empty() { @@ -332,7 +333,7 @@ impl TreeNodeRewriter for PullUpCorrelatedExpr { if limit.fetch.filter(|limit_row| *limit_row == 0).is_some() { LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, - schema: limit.input.schema().clone(), + schema: Arc::clone(limit.input.schema()), }) } else { LogicalPlanBuilder::from((*limit.input).clone()).build()? @@ -456,7 +457,7 @@ fn agg_exprs_evaluation_result_on_empty_batch( let result_expr = result_expr.unalias(); let props = ExecutionProps::new(); - let info = SimplifyContext::new(&props).with_schema(schema.clone()); + let info = SimplifyContext::new(&props).with_schema(Arc::clone(&schema)); let simplifier = ExprSimplifier::new(info); let result_expr = simplifier.simplify(result_expr)?; if matches!(result_expr, Expr::Literal(ScalarValue::Int64(_))) { @@ -492,7 +493,7 @@ fn proj_exprs_evaluation_result_on_empty_batch( if result_expr.ne(expr) { let props = ExecutionProps::new(); - let info = SimplifyContext::new(&props).with_schema(schema.clone()); + let info = SimplifyContext::new(&props).with_schema(Arc::clone(&schema)); let simplifier = ExprSimplifier::new(info); let result_expr = simplifier.simplify(result_expr)?; let expr_name = match expr { diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs index 81d6dc863af6b..4e3ca7e33a2eb 100644 --- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs +++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs @@ -571,7 +571,7 @@ mod tests { ); let plan = LogicalPlanBuilder::from(scan_tpch_table("customer")) .filter( - in_subquery(col("customer.c_custkey"), orders.clone()) + in_subquery(col("customer.c_custkey"), Arc::clone(&orders)) .and(in_subquery(col("customer.c_custkey"), orders)), )? .project(vec![col("customer.c_custkey")])? @@ -1358,7 +1358,7 @@ mod tests { ); let plan = LogicalPlanBuilder::from(scan_tpch_table("customer")) - .filter(exists(orders.clone()).and(exists(orders)))? + .filter(exists(Arc::clone(&orders)).and(exists(orders)))? .project(vec![col("customer.c_custkey")])? .build()?; diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs index 6d6f84373a36b..729c45426ff29 100644 --- a/datafusion/optimizer/src/eliminate_cross_join.rs +++ b/datafusion/optimizer/src/eliminate_cross_join.rs @@ -86,7 +86,7 @@ impl OptimizerRule for EliminateCrossJoin { plan: LogicalPlan, config: &dyn OptimizerConfig, ) -> Result> { - let plan_schema = plan.schema().clone(); + let plan_schema = Arc::clone(plan.schema()); let mut possible_join_keys = JoinKeySet::new(); let mut all_inputs: Vec = vec![]; @@ -155,7 +155,7 @@ impl OptimizerRule for EliminateCrossJoin { if &plan_schema != left.schema() { left = LogicalPlan::Projection(Projection::new_from_schema( Arc::new(left), - plan_schema.clone(), + Arc::clone(&plan_schema), )); } @@ -420,7 +420,7 @@ mod tests { }; fn assert_optimized_plan_eq(plan: LogicalPlan, expected: Vec<&str>) { - let starting_schema = plan.schema().clone(); + let starting_schema = Arc::clone(plan.schema()); let rule = EliminateCrossJoin::new(); let transformed_plan = rule.rewrite(plan, &OptimizerContext::new()).unwrap(); assert!(transformed_plan.transformed, "failed to optimize plan"); diff --git a/datafusion/optimizer/src/eliminate_filter.rs b/datafusion/optimizer/src/eliminate_filter.rs index 7c873b411d592..2d8d77b89ddc8 100644 --- a/datafusion/optimizer/src/eliminate_filter.rs +++ b/datafusion/optimizer/src/eliminate_filter.rs @@ -21,6 +21,7 @@ use datafusion_common::tree_node::Transformed; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::logical_plan::tree_node::unwrap_arc; use datafusion_expr::{EmptyRelation, Expr, Filter, LogicalPlan}; +use std::sync::Arc; use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; @@ -68,7 +69,7 @@ impl OptimizerRule for EliminateFilter { Some(false) | None => Ok(Transformed::yes(LogicalPlan::EmptyRelation( EmptyRelation { produce_one_row: false, - schema: input.schema().clone(), + schema: Arc::clone(input.schema()), }, ))), }, diff --git a/datafusion/optimizer/src/eliminate_limit.rs b/datafusion/optimizer/src/eliminate_limit.rs index b0a75fa47c277..165834e759752 100644 --- a/datafusion/optimizer/src/eliminate_limit.rs +++ b/datafusion/optimizer/src/eliminate_limit.rs @@ -21,6 +21,7 @@ use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::tree_node::Transformed; use datafusion_common::Result; use datafusion_expr::logical_plan::{tree_node::unwrap_arc, EmptyRelation, LogicalPlan}; +use std::sync::Arc; /// Optimizer rule to replace `LIMIT 0` or `LIMIT` whose ancestor LIMIT's skip is /// greater than or equal to current's fetch @@ -67,7 +68,7 @@ impl OptimizerRule for EliminateLimit { return Ok(Transformed::yes(LogicalPlan::EmptyRelation( EmptyRelation { produce_one_row: false, - schema: limit.input.schema().clone(), + schema: Arc::clone(limit.input.schema()), }, ))); } diff --git a/datafusion/optimizer/src/eliminate_nested_union.rs b/datafusion/optimizer/src/eliminate_nested_union.rs index 3732f7ed90c8a..c8ae937e128a6 100644 --- a/datafusion/optimizer/src/eliminate_nested_union.rs +++ b/datafusion/optimizer/src/eliminate_nested_union.rs @@ -79,7 +79,7 @@ impl OptimizerRule for EliminateNestedUnion { Ok(Transformed::yes(LogicalPlan::Distinct(Distinct::All( Arc::new(LogicalPlan::Union(Union { inputs: inputs.into_iter().map(Arc::new).collect_vec(), - schema: schema.clone(), + schema: Arc::clone(&schema), })), )))) } diff --git a/datafusion/optimizer/src/eliminate_one_union.rs b/datafusion/optimizer/src/eliminate_one_union.rs index edf6b72d7e178..5e37b8cf7c1fa 100644 --- a/datafusion/optimizer/src/eliminate_one_union.rs +++ b/datafusion/optimizer/src/eliminate_one_union.rs @@ -110,7 +110,7 @@ mod tests { &table_scan(Some("table"), &schema(), None)?.build()?, &schema().to_dfschema()?, )?; - let schema = table_plan.schema().clone(); + let schema = Arc::clone(table_plan.schema()); let single_union_plan = LogicalPlan::Union(Union { inputs: vec![Arc::new(table_plan)], schema, diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs index 13c483c6dfcca..12534e058152e 100644 --- a/datafusion/optimizer/src/eliminate_outer_join.rs +++ b/datafusion/optimizer/src/eliminate_outer_join.rs @@ -118,7 +118,7 @@ impl OptimizerRule for EliminateOuterJoin { join_constraint: join.join_constraint, on: join.on.clone(), filter: join.filter.clone(), - schema: join.schema.clone(), + schema: Arc::clone(&join.schema), null_equals_null: join.null_equals_null, })); Filter::try_new(filter.predicate, new_join) diff --git a/datafusion/optimizer/src/extract_equijoin_predicate.rs b/datafusion/optimizer/src/extract_equijoin_predicate.rs index 87d205139e8e9..0dae777ab5bdf 100644 --- a/datafusion/optimizer/src/extract_equijoin_predicate.rs +++ b/datafusion/optimizer/src/extract_equijoin_predicate.rs @@ -357,8 +357,8 @@ mod tests { let t1 = test_table_scan_with_name("t1")?; let t2 = test_table_scan_with_name("t2")?; - let t1_schema = t1.schema().clone(); - let t2_schema = t2.schema().clone(); + let t1_schema = Arc::clone(t1.schema()); + let t2_schema = Arc::clone(t2.schema()); // filter: t1.a + CAST(Int64(1), UInt32) = t2.a + CAST(Int64(2), UInt32) as t1.a + 1 = t2.a + 2 let filter = Expr::eq( diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index a6a9e5cf26eaf..332d3e9fe54e9 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -14,6 +14,8 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +// Make cheap clones clear: https://github.com/apache/datafusion/issues/11143 +#![deny(clippy::clone_on_ref_ptr)] //! # DataFusion Optimizer //! diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 4684dbd3b043a..cae2a7b2cad2f 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -205,7 +205,7 @@ fn optimize_projections( }); } LogicalPlan::Window(window) => { - let input_schema = window.input.schema().clone(); + let input_schema = Arc::clone(window.input.schema()); // Split parent requirements to child and window expression sections: let n_input_fields = input_schema.fields().len(); // Offset window expression indices so that they point to valid @@ -881,7 +881,7 @@ mod tests { Ok(Self { exprs, input: Arc::new(inputs.swap_remove(0)), - schema: self.schema.clone(), + schema: Arc::clone(&self.schema), }) } @@ -949,7 +949,7 @@ mod tests { exprs, left_child: Arc::new(inputs.remove(0)), right_child: Arc::new(inputs.remove(0)), - schema: self.schema.clone(), + schema: Arc::clone(&self.schema), }) } @@ -1256,7 +1256,7 @@ mod tests { let table_scan = test_table_scan()?; let custom_plan = LogicalPlan::Extension(Extension { node: Arc::new(NoOpUserDefined::new( - table_scan.schema().clone(), + Arc::clone(table_scan.schema()), Arc::new(table_scan.clone()), )), }); @@ -1281,7 +1281,7 @@ mod tests { let custom_plan = LogicalPlan::Extension(Extension { node: Arc::new( NoOpUserDefined::new( - table_scan.schema().clone(), + Arc::clone(table_scan.schema()), Arc::new(table_scan.clone()), ) .with_exprs(exprs), @@ -1316,7 +1316,7 @@ mod tests { let custom_plan = LogicalPlan::Extension(Extension { node: Arc::new( NoOpUserDefined::new( - table_scan.schema().clone(), + Arc::clone(table_scan.schema()), Arc::new(table_scan.clone()), ) .with_exprs(exprs), diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 14e5ac141eeb6..93923a4e1e74a 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -205,7 +205,7 @@ impl OptimizerConfig for OptimizerContext { } fn alias_generator(&self) -> Arc { - self.alias_generator.clone() + Arc::clone(&self.alias_generator) } fn options(&self) -> &ConfigOptions { @@ -381,7 +381,7 @@ impl Optimizer { .skip_failed_rules .then(|| new_plan.clone()); - let starting_schema = new_plan.schema().clone(); + let starting_schema = Arc::clone(new_plan.schema()); let result = match rule.apply_order() { // optimizer handles recursion @@ -579,7 +579,7 @@ mod tests { let config = OptimizerContext::new().with_skip_failing_rules(false); let input = Arc::new(test_table_scan()?); - let input_schema = input.schema().clone(); + let input_schema = Arc::clone(input.schema()); let plan = LogicalPlan::Projection(Projection::try_new_with_schema( vec![col("a"), col("b"), col("c")], @@ -760,7 +760,7 @@ mod tests { } Ok(Transformed::yes(LogicalPlan::Projection( - Projection::try_new(exprs, projection.input.clone())?, + Projection::try_new(exprs, Arc::clone(&projection.input))?, ))) } } diff --git a/datafusion/optimizer/src/plan_signature.rs b/datafusion/optimizer/src/plan_signature.rs index d22795797478c..73e6b418272a9 100644 --- a/datafusion/optimizer/src/plan_signature.rs +++ b/datafusion/optimizer/src/plan_signature.rs @@ -100,7 +100,7 @@ mod tests { let one_node_plan = Arc::new(LogicalPlan::EmptyRelation(datafusion_expr::EmptyRelation { produce_one_row: false, - schema: schema.clone(), + schema: Arc::clone(&schema), })); assert_eq!(1, get_node_number(&one_node_plan).get()); @@ -112,7 +112,7 @@ mod tests { assert_eq!(2, get_node_number(&two_node_plan).get()); let five_node_plan = Arc::new(LogicalPlan::Union(datafusion_expr::Union { - inputs: vec![two_node_plan.clone(), two_node_plan], + inputs: vec![Arc::clone(&two_node_plan), two_node_plan], schema, })); diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs b/datafusion/optimizer/src/propagate_empty_relation.rs index 88bd1b17883b1..91044207c4e11 100644 --- a/datafusion/optimizer/src/propagate_empty_relation.rs +++ b/datafusion/optimizer/src/propagate_empty_relation.rs @@ -79,7 +79,7 @@ impl OptimizerRule for PropagateEmptyRelation { return Ok(Transformed::yes(LogicalPlan::EmptyRelation( EmptyRelation { produce_one_row: false, - schema: plan.schema().clone(), + schema: Arc::clone(plan.schema()), }, ))); } @@ -99,43 +99,43 @@ impl OptimizerRule for PropagateEmptyRelation { JoinType::Full if left_empty && right_empty => Ok(Transformed::yes( LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, - schema: join.schema.clone(), + schema: Arc::clone(&join.schema), }), )), JoinType::Inner if left_empty || right_empty => Ok(Transformed::yes( LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, - schema: join.schema.clone(), + schema: Arc::clone(&join.schema), }), )), JoinType::Left if left_empty => Ok(Transformed::yes( LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, - schema: join.schema.clone(), + schema: Arc::clone(&join.schema), }), )), JoinType::Right if right_empty => Ok(Transformed::yes( LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, - schema: join.schema.clone(), + schema: Arc::clone(&join.schema), }), )), JoinType::LeftSemi if left_empty || right_empty => Ok( Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, - schema: join.schema.clone(), + schema: Arc::clone(&join.schema), })), ), JoinType::RightSemi if left_empty || right_empty => Ok( Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, - schema: join.schema.clone(), + schema: Arc::clone(&join.schema), })), ), JoinType::LeftAnti if left_empty => Ok(Transformed::yes( LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, - schema: join.schema.clone(), + schema: Arc::clone(&join.schema), }), )), JoinType::LeftAnti if right_empty => { @@ -147,7 +147,7 @@ impl OptimizerRule for PropagateEmptyRelation { JoinType::RightAnti if right_empty => Ok(Transformed::yes( LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, - schema: join.schema.clone(), + schema: Arc::clone(&join.schema), }), )), _ => Ok(Transformed::no(plan)), @@ -178,7 +178,7 @@ impl OptimizerRule for PropagateEmptyRelation { Ok(Transformed::yes(LogicalPlan::EmptyRelation( EmptyRelation { produce_one_row: false, - schema: plan.schema().clone(), + schema: Arc::clone(plan.schema()), }, ))) } else if new_inputs.len() == 1 { @@ -191,14 +191,14 @@ impl OptimizerRule for PropagateEmptyRelation { Ok(Transformed::yes(LogicalPlan::Projection( Projection::new_from_schema( Arc::new(child), - plan.schema().clone(), + Arc::clone(plan.schema()), ), ))) } } else { Ok(Transformed::yes(LogicalPlan::Union(Union { inputs: new_inputs, - schema: union.schema.clone(), + schema: Arc::clone(&union.schema), }))) } } @@ -232,7 +232,7 @@ fn empty_child(plan: &LogicalPlan) -> Result> { if !empty.produce_one_row { Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, - schema: plan.schema().clone(), + schema: Arc::clone(plan.schema()), }))) } else { Ok(None) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 0a3bae154bd64..20e2ac07dffd8 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -652,7 +652,7 @@ impl OptimizerRule for PushDownFilter { return push_down_join(join, None); }; - let plan_schema = plan.schema().clone(); + let plan_schema = Arc::clone(plan.schema()); let LogicalPlan::Filter(mut filter) = plan else { return Ok(Transformed::no(plan)); @@ -1498,7 +1498,7 @@ mod tests { let custom_plan = LogicalPlan::Extension(Extension { node: Arc::new(NoopPlan { input: vec![table_scan.clone()], - schema: table_scan.schema().clone(), + schema: Arc::clone(table_scan.schema()), }), }); let plan = LogicalPlanBuilder::from(custom_plan) @@ -1514,7 +1514,7 @@ mod tests { let custom_plan = LogicalPlan::Extension(Extension { node: Arc::new(NoopPlan { input: vec![table_scan.clone()], - schema: table_scan.schema().clone(), + schema: Arc::clone(table_scan.schema()), }), }); let plan = LogicalPlanBuilder::from(custom_plan) @@ -1531,7 +1531,7 @@ mod tests { let custom_plan = LogicalPlan::Extension(Extension { node: Arc::new(NoopPlan { input: vec![table_scan.clone(), table_scan.clone()], - schema: table_scan.schema().clone(), + schema: Arc::clone(table_scan.schema()), }), }); let plan = LogicalPlanBuilder::from(custom_plan) @@ -1548,7 +1548,7 @@ mod tests { let custom_plan = LogicalPlan::Extension(Extension { node: Arc::new(NoopPlan { input: vec![table_scan.clone(), table_scan.clone()], - schema: table_scan.schema().clone(), + schema: Arc::clone(table_scan.schema()), }), }); let plan = LogicalPlanBuilder::from(custom_plan) diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 0333cc8dde368..35691847fb8e9 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -413,7 +413,7 @@ mod tests { let plan = LogicalPlanBuilder::from(scan_tpch_table("customer")) .filter( lit(1) - .lt(scalar_subquery(orders.clone())) + .lt(scalar_subquery(Arc::clone(&orders))) .and(lit(1).lt(scalar_subquery(orders))), )? .project(vec![col("customer.c_custkey")])? diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index 36dd85ac96e1f..17855e17bef8b 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -1807,8 +1807,9 @@ mod tests { fn basic_coercion() { let schema = test_schema(); let props = ExecutionProps::new(); - let simplifier = - ExprSimplifier::new(SimplifyContext::new(&props).with_schema(schema.clone())); + let simplifier = ExprSimplifier::new( + SimplifyContext::new(&props).with_schema(Arc::clone(&schema)), + ); // Note expr type is int32 (not int64) // (1i64 + 2i32) < i @@ -3340,15 +3341,15 @@ mod tests { assert_eq!( simplify(in_list( col("c1"), - vec![scalar_subquery(subquery.clone())], + vec![scalar_subquery(Arc::clone(&subquery))], false )), - in_subquery(col("c1"), subquery.clone()) + in_subquery(col("c1"), Arc::clone(&subquery)) ); assert_eq!( simplify(in_list( col("c1"), - vec![scalar_subquery(subquery.clone())], + vec![scalar_subquery(Arc::clone(&subquery))], true )), not_in_subquery(col("c1"), subquery) diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 7c66d659cbaf5..f2b4abdd6cbd5 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -279,7 +279,7 @@ impl OptimizerRule for SingleDistinctToGroupBy { let alias_str = format!("alias{}", index); inner_aggr_exprs.push( Expr::AggregateFunction(AggregateFunction::new_udf( - udf.clone(), + Arc::clone(&udf), args, false, None, diff --git a/datafusion/optimizer/src/test/mod.rs b/datafusion/optimizer/src/test/mod.rs index 2c7e8644026ed..4dccb42941dd0 100644 --- a/datafusion/optimizer/src/test/mod.rs +++ b/datafusion/optimizer/src/test/mod.rs @@ -176,7 +176,7 @@ pub fn assert_optimized_plan_eq( // Apply the rule once let opt_context = OptimizerContext::new().with_max_passes(1); - let optimizer = Optimizer::with_rules(vec![rule.clone()]); + let optimizer = Optimizer::with_rules(vec![Arc::clone(&rule)]); let optimized_plan = optimizer.optimize(plan, &opt_context, observe)?; let formatted_plan = format!("{optimized_plan:?}"); assert_eq!(formatted_plan, expected); diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs index de471d59c4660..3447082525597 100644 --- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs +++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs @@ -832,7 +832,7 @@ mod tests { fn optimize_test(expr: Expr, schema: &DFSchemaRef) -> Expr { let mut expr_rewriter = UnwrapCastExprRewriter { - schema: schema.clone(), + schema: Arc::clone(schema), }; expr.rewrite(&mut expr_rewriter).data().unwrap() } From 9a8f8b7188fba8dcba52028443b90556aeff7f5f Mon Sep 17 00:00:00 2001 From: Matthew Cramerus <8771538+suremarc@users.noreply.github.com> Date: Wed, 10 Jul 2024 08:14:28 -0500 Subject: [PATCH 09/10] fix: Fix eq properties regression from #10434 (#11363) * discover new orderings when constants are added * more comments * reduce nesting + describe argument * lint? --- .../src/equivalence/properties.rs | 183 +++++++++++------- 1 file changed, 113 insertions(+), 70 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index d9d19c0bcf47e..8c327fbaf4098 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -223,56 +223,11 @@ impl EquivalenceProperties { } } - // Discover new valid orderings in light of the new equality. For a discussion, see: - // https://github.com/apache/datafusion/issues/9812 - let mut new_orderings = vec![]; - for ordering in self.normalized_oeq_class().iter() { - let expressions = if left.eq(&ordering[0].expr) { - // Left expression is leading ordering - Some((ordering[0].options, right)) - } else if right.eq(&ordering[0].expr) { - // Right expression is leading ordering - Some((ordering[0].options, left)) - } else { - None - }; - if let Some((leading_ordering, other_expr)) = expressions { - // Currently, we only handle expressions with a single child. - // TODO: It should be possible to handle expressions orderings like - // f(a, b, c), a, b, c if f is monotonic in all arguments. - // First expression after leading ordering - if let Some(next_expr) = ordering.get(1) { - let children = other_expr.children(); - if children.len() == 1 - && children[0].eq(&next_expr.expr) - && SortProperties::Ordered(leading_ordering) - == other_expr - .get_properties(&[ExprProperties { - sort_properties: SortProperties::Ordered( - leading_ordering, - ), - range: Interval::make_unbounded( - &other_expr.data_type(&self.schema)?, - )?, - }])? - .sort_properties - { - // Assume existing ordering is [a ASC, b ASC] - // When equality a = f(b) is given, If we know that given ordering `[b ASC]`, ordering `[f(b) ASC]` is valid, - // then we can deduce that ordering `[b ASC]` is also valid. - // Hence, ordering `[b ASC]` can be added to the state as valid ordering. - // (e.g. existing ordering where leading ordering is removed) - new_orderings.push(ordering[1..].to_vec()); - } - } - } - } - if !new_orderings.is_empty() { - self.oeq_class.add_new_orderings(new_orderings); - } - // Add equal expressions to the state self.eq_group.add_equal_conditions(left, right); + + // Discover any new orderings + self.discover_new_orderings(left)?; Ok(()) } @@ -304,9 +259,78 @@ impl EquivalenceProperties { self.constants.push(const_expr); } } + + for ordering in self.normalized_oeq_class().iter() { + if let Err(e) = self.discover_new_orderings(&ordering[0].expr) { + log::debug!("error discovering new orderings: {e}"); + } + } + self } + // Discover new valid orderings in light of a new equality. + // Accepts a single argument (`expr`) which is used to determine + // which orderings should be updated. + // When constants or equivalence classes are changed, there may be new orderings + // that can be discovered with the new equivalence properties. + // For a discussion, see: https://github.com/apache/datafusion/issues/9812 + fn discover_new_orderings(&mut self, expr: &Arc) -> Result<()> { + let normalized_expr = self.eq_group().normalize_expr(Arc::clone(expr)); + let eq_class = self + .eq_group + .classes + .iter() + .find_map(|class| { + class + .contains(&normalized_expr) + .then(|| class.clone().into_vec()) + }) + .unwrap_or_else(|| vec![Arc::clone(&normalized_expr)]); + + let mut new_orderings: Vec = vec![]; + for (ordering, next_expr) in self + .normalized_oeq_class() + .iter() + .filter(|ordering| ordering[0].expr.eq(&normalized_expr)) + // First expression after leading ordering + .filter_map(|ordering| Some(ordering).zip(ordering.get(1))) + { + let leading_ordering = ordering[0].options; + // Currently, we only handle expressions with a single child. + // TODO: It should be possible to handle expressions orderings like + // f(a, b, c), a, b, c if f is monotonic in all arguments. + for equivalent_expr in &eq_class { + let children = equivalent_expr.children(); + if children.len() == 1 + && children[0].eq(&next_expr.expr) + && SortProperties::Ordered(leading_ordering) + == equivalent_expr + .get_properties(&[ExprProperties { + sort_properties: SortProperties::Ordered( + leading_ordering, + ), + range: Interval::make_unbounded( + &equivalent_expr.data_type(&self.schema)?, + )?, + }])? + .sort_properties + { + // Assume existing ordering is [a ASC, b ASC] + // When equality a = f(b) is given, If we know that given ordering `[b ASC]`, ordering `[f(b) ASC]` is valid, + // then we can deduce that ordering `[b ASC]` is also valid. + // Hence, ordering `[b ASC]` can be added to the state as valid ordering. + // (e.g. existing ordering where leading ordering is removed) + new_orderings.push(ordering[1..].to_vec()); + break; + } + } + } + + self.oeq_class.add_new_orderings(new_orderings); + Ok(()) + } + /// Updates the ordering equivalence group within assuming that the table /// is re-sorted according to the argument `sort_exprs`. Note that constants /// and equivalence classes are unchanged as they are unaffected by a re-sort. @@ -2454,30 +2478,49 @@ mod tests { ]; for case in cases { - let mut properties = base_properties - .clone() - .add_constants(case.constants.into_iter().map(ConstExpr::from)); - for [left, right] in &case.equal_conditions { - properties.add_equal_conditions(left, right)? - } - - let sort = case - .sort_columns - .iter() - .map(|&name| { - col(name, &schema).map(|col| PhysicalSortExpr { - expr: col, - options: SortOptions::default(), + // Construct the equivalence properties in different orders + // to exercise different code paths + // (The resulting properties _should_ be the same) + for properties in [ + // Equal conditions before constants + { + let mut properties = base_properties.clone(); + for [left, right] in &case.equal_conditions { + properties.add_equal_conditions(left, right)? + } + properties.add_constants( + case.constants.iter().cloned().map(ConstExpr::from), + ) + }, + // Constants before equal conditions + { + let mut properties = base_properties.clone().add_constants( + case.constants.iter().cloned().map(ConstExpr::from), + ); + for [left, right] in &case.equal_conditions { + properties.add_equal_conditions(left, right)? + } + properties + }, + ] { + let sort = case + .sort_columns + .iter() + .map(|&name| { + col(name, &schema).map(|col| PhysicalSortExpr { + expr: col, + options: SortOptions::default(), + }) }) - }) - .collect::>>()?; + .collect::>>()?; - assert_eq!( - properties.ordering_satisfy(&sort), - case.should_satisfy_ordering, - "failed test '{}'", - case.name - ); + assert_eq!( + properties.ordering_satisfy(&sort), + case.should_satisfy_ordering, + "failed test '{}'", + case.name + ); + } } Ok(()) From d99002cf0b39843afb9c224bbb880d2266acefc6 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 10 Jul 2024 09:24:55 -0400 Subject: [PATCH 10/10] Update termtree requirement from 0.4.1 to 0.5.0 (#11383) Updates the requirements on [termtree](https://github.com/rust-cli/termtree) to permit the latest version. - [Changelog](https://github.com/rust-cli/termtree/blob/main/CHANGELOG.md) - [Commits](https://github.com/rust-cli/termtree/compare/v0.4.1...v0.5.0) --- updated-dependencies: - dependency-name: termtree dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- datafusion/physical-plan/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 4292f95fe4061..f5f756417ebf8 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -66,7 +66,7 @@ tokio = { workspace = true } [dev-dependencies] rstest = { workspace = true } rstest_reuse = "0.7.0" -termtree = "0.4.1" +termtree = "0.5.0" tokio = { workspace = true, features = [ "rt-multi-thread", "fs",