Skip to content

Commit

Permalink
upmerge
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Jun 4, 2022
2 parents a4b2078 + c65feeb commit 29a3e2f
Show file tree
Hide file tree
Showing 27 changed files with 486 additions and 257 deletions.
8 changes: 0 additions & 8 deletions .github/workflows/dev_pr/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,6 @@
# specific language governing permissions and limitations
# under the License.

datafusion:
- datafusion/**/*
- datafusion-cli/**/*
- datafusion-examples/**/*

python:
- python/**/*

development-process:
- dev/**.*
- .github/**.*
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ jobs:
export ARROW_TEST_DATA=$(pwd)/testing/data
export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data
cargo test --features avro
# test datafusion-sql examples
cargo run --example sql
# test datafusion examples
cd datafusion-examples
cargo run --example csv_sql
Expand Down
2 changes: 0 additions & 2 deletions datafusion-cli/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ COPY ./datafusion /usr/src/datafusion

COPY ./datafusion-cli /usr/src/datafusion-cli

COPY ./ballista /usr/src/ballista

WORKDIR /usr/src/datafusion-cli

RUN rustup component add rustfmt
Expand Down
58 changes: 29 additions & 29 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,15 +262,15 @@ mod tests {
.await?;

let expected = vec![
"+---------------+--------------------------------------------------------+",
"| plan_type | plan |",
"+---------------+--------------------------------------------------------+",
"| logical_plan | CreateView: \"xyz\" |",
"| | Projection: #abc.column1, #abc.column2, #abc.column3 |",
"| | TableScan: abc projection=Some([0, 1, 2]) |",
"| physical_plan | EmptyExec: produce_one_row=false |",
"| | |",
"+---------------+--------------------------------------------------------+",
"+---------------+-----------------------------------------------------------------+",
"| plan_type | plan |",
"+---------------+-----------------------------------------------------------------+",
"| logical_plan | CreateView: \"xyz\" |",
"| | Projection: #abc.column1, #abc.column2, #abc.column3 |",
"| | TableScan: abc projection=Some([column1, column2, column3]) |",
"| physical_plan | EmptyExec: produce_one_row=false |",
"| | |",
"+---------------+-----------------------------------------------------------------+",
];

assert_batches_eq!(expected, &results);
Expand All @@ -282,16 +282,16 @@ mod tests {
.await?;

let expected = vec![
"+---------------+--------------------------------------------------------+",
"| plan_type | plan |",
"+---------------+--------------------------------------------------------+",
"| logical_plan | CreateView: \"xyz\" |",
"| | Projection: #abc.column1, #abc.column2, #abc.column3 |",
"| | Filter: #abc.column2 = Int64(5) |",
"| | TableScan: abc projection=Some([0, 1, 2]) |",
"| physical_plan | EmptyExec: produce_one_row=false |",
"| | |",
"+---------------+--------------------------------------------------------+",
"+---------------+-------------------------------------------------------------------+",
"| plan_type | plan |",
"+---------------+-------------------------------------------------------------------+",
"| logical_plan | CreateView: \"xyz\" |",
"| | Projection: #abc.column1, #abc.column2, #abc.column3 |",
"| | Filter: #abc.column2 = Int64(5) |",
"| | TableScan: abc projection=Some([column1, column2, column3]) |",
"| physical_plan | EmptyExec: produce_one_row=false |",
"| | |",
"+---------------+-------------------------------------------------------------------+",
];

assert_batches_eq!(expected, &results);
Expand All @@ -303,16 +303,16 @@ mod tests {
.await?;

let expected = vec![
"+---------------+----------------------------------------------+",
"| plan_type | plan |",
"+---------------+----------------------------------------------+",
"| logical_plan | CreateView: \"xyz\" |",
"| | Projection: #abc.column1, #abc.column2 |",
"| | Filter: #abc.column2 = Int64(5) |",
"| | TableScan: abc projection=Some([0, 1]) |",
"| physical_plan | EmptyExec: produce_one_row=false |",
"| | |",
"+---------------+----------------------------------------------+",
"+---------------+----------------------------------------------------------+",
"| plan_type | plan |",
"+---------------+----------------------------------------------------------+",
"| logical_plan | CreateView: \"xyz\" |",
"| | Projection: #abc.column1, #abc.column2 |",
"| | Filter: #abc.column2 = Int64(5) |",
"| | TableScan: abc projection=Some([column1, column2]) |",
"| physical_plan | EmptyExec: produce_one_row=false |",
"| | |",
"+---------------+----------------------------------------------------------+",
];

assert_batches_eq!(expected, &results);
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ mod tests {

let expected = "Projection: #employee_csv.id\
\n Filter: #employee_csv.state = Utf8(\"CO\")\
\n TableScan: employee_csv projection=Some([0, 3])";
\n TableScan: employee_csv projection=Some([id, state])";

assert_eq!(expected, format!("{}", plan.display_indent()));
}
Expand All @@ -143,7 +143,7 @@ mod tests {

let expected = "Projection: #employee_csv.id [id:Int32]\
\n Filter: #employee_csv.state = Utf8(\"CO\") [id:Int32, state:Utf8]\
\n TableScan: employee_csv projection=Some([0, 3]) [id:Int32, state:Utf8]";
\n TableScan: employee_csv projection=Some([id, state]) [id:Int32, state:Utf8]";

assert_eq!(expected, format!("{}", plan.display_indent_schema()));
}
Expand All @@ -165,12 +165,12 @@ mod tests {
);
assert!(
graphviz.contains(
r#"[shape=box label="TableScan: employee_csv projection=Some([0, 3])"]"#
r#"[shape=box label="TableScan: employee_csv projection=Some([id, state])"]"#
),
"\n{}",
plan.display_graphviz()
);
assert!(graphviz.contains(r#"[shape=box label="TableScan: employee_csv projection=Some([0, 3])\nSchema: [id:Int32, state:Utf8]"]"#),
assert!(graphviz.contains(r#"[shape=box label="TableScan: employee_csv projection=Some([id, state])\nSchema: [id:Int32, state:Utf8]"]"#),
"\n{}", plan.display_graphviz());
assert!(
graphviz.contains(r#"// End DataFusion GraphViz Plan"#),
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/custom_sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ async fn custom_source_dataframe() -> Result<()> {

let expected = format!(
"Projection: #{}.c2\
\n TableScan: {} projection=Some([1])",
\n TableScan: {} projection=Some([c2])",
UNNAMED_TABLE, UNNAMED_TABLE
);
assert_eq!(format!("{:?}", optimized_plan), expected);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ async fn avro_explain() {
"logical_plan",
"Projection: #COUNT(UInt8(1))\
\n Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\
\n TableScan: alltypes_plain projection=Some([0])",
\n TableScan: alltypes_plain projection=Some([id])",
],
vec![
"physical_plan",
Expand Down
26 changes: 13 additions & 13 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ async fn csv_explain_plans() {
"Explain [plan_type:Utf8, plan:Utf8]",
" Projection: #aggregate_test_100.c1 [c1:Utf8]",
" Filter: #aggregate_test_100.c2 > Int64(10) [c1:Utf8, c2:Int32]",
" TableScan: aggregate_test_100 projection=Some([0, 1]), partial_filters=[#aggregate_test_100.c2 > Int64(10)] [c1:Utf8, c2:Int32]",
" TableScan: aggregate_test_100 projection=Some([c1, c2]), partial_filters=[#aggregate_test_100.c2 > Int64(10)] [c1:Utf8, c2:Int32]",
];
let formatted = plan.display_indent_schema().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
Expand All @@ -284,7 +284,7 @@ async fn csv_explain_plans() {
"Explain",
" Projection: #aggregate_test_100.c1",
" Filter: #aggregate_test_100.c2 > Int64(10)",
" TableScan: aggregate_test_100 projection=Some([0, 1]), partial_filters=[#aggregate_test_100.c2 > Int64(10)]",
" TableScan: aggregate_test_100 projection=Some([c1, c2]), partial_filters=[#aggregate_test_100.c2 > Int64(10)]",
];
let formatted = plan.display_indent().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
Expand All @@ -306,7 +306,7 @@ async fn csv_explain_plans() {
" 2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]",
" 4[shape=box label=\"Filter: #aggregate_test_100.c2 > Int64(10)\"]",
" 3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]",
" 5[shape=box label=\"TableScan: aggregate_test_100 projection=Some([0, 1]), partial_filters=[#aggregate_test_100.c2 > Int64(10)]\"]",
" 5[shape=box label=\"TableScan: aggregate_test_100 projection=Some([c1, c2]), partial_filters=[#aggregate_test_100.c2 > Int64(10)]\"]",
" 4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]",
" }",
" subgraph cluster_6",
Expand All @@ -317,7 +317,7 @@ async fn csv_explain_plans() {
" 7 -> 8 [arrowhead=none, arrowtail=normal, dir=back]",
" 9[shape=box label=\"Filter: #aggregate_test_100.c2 > Int64(10)\\nSchema: [c1:Utf8, c2:Int32]\"]",
" 8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]",
" 10[shape=box label=\"TableScan: aggregate_test_100 projection=Some([0, 1]), partial_filters=[#aggregate_test_100.c2 > Int64(10)]\\nSchema: [c1:Utf8, c2:Int32]\"]",
" 10[shape=box label=\"TableScan: aggregate_test_100 projection=Some([c1, c2]), partial_filters=[#aggregate_test_100.c2 > Int64(10)]\\nSchema: [c1:Utf8, c2:Int32]\"]",
" 9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]",
" }",
"}",
Expand Down Expand Up @@ -467,7 +467,7 @@ async fn csv_explain_verbose_plans() {
"Explain [plan_type:Utf8, plan:Utf8]",
" Projection: #aggregate_test_100.c1 [c1:Utf8]",
" Filter: #aggregate_test_100.c2 > Int64(10) [c1:Utf8, c2:Int32]",
" TableScan: aggregate_test_100 projection=Some([0, 1]), partial_filters=[#aggregate_test_100.c2 > Int64(10)] [c1:Utf8, c2:Int32]",
" TableScan: aggregate_test_100 projection=Some([c1, c2]), partial_filters=[#aggregate_test_100.c2 > Int64(10)] [c1:Utf8, c2:Int32]",
];
let formatted = plan.display_indent_schema().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
Expand All @@ -482,7 +482,7 @@ async fn csv_explain_verbose_plans() {
"Explain",
" Projection: #aggregate_test_100.c1",
" Filter: #aggregate_test_100.c2 > Int64(10)",
" TableScan: aggregate_test_100 projection=Some([0, 1]), partial_filters=[#aggregate_test_100.c2 > Int64(10)]",
" TableScan: aggregate_test_100 projection=Some([c1, c2]), partial_filters=[#aggregate_test_100.c2 > Int64(10)]",
];
let formatted = plan.display_indent().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
Expand All @@ -504,7 +504,7 @@ async fn csv_explain_verbose_plans() {
" 2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]",
" 4[shape=box label=\"Filter: #aggregate_test_100.c2 > Int64(10)\"]",
" 3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]",
" 5[shape=box label=\"TableScan: aggregate_test_100 projection=Some([0, 1]), partial_filters=[#aggregate_test_100.c2 > Int64(10)]\"]",
" 5[shape=box label=\"TableScan: aggregate_test_100 projection=Some([c1, c2]), partial_filters=[#aggregate_test_100.c2 > Int64(10)]\"]",
" 4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]",
" }",
" subgraph cluster_6",
Expand All @@ -515,7 +515,7 @@ async fn csv_explain_verbose_plans() {
" 7 -> 8 [arrowhead=none, arrowtail=normal, dir=back]",
" 9[shape=box label=\"Filter: #aggregate_test_100.c2 > Int64(10)\\nSchema: [c1:Utf8, c2:Int32]\"]",
" 8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]",
" 10[shape=box label=\"TableScan: aggregate_test_100 projection=Some([0, 1]), partial_filters=[#aggregate_test_100.c2 > Int64(10)]\\nSchema: [c1:Utf8, c2:Int32]\"]",
" 10[shape=box label=\"TableScan: aggregate_test_100 projection=Some([c1, c2]), partial_filters=[#aggregate_test_100.c2 > Int64(10)]\\nSchema: [c1:Utf8, c2:Int32]\"]",
" 9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]",
" }",
"}",
Expand Down Expand Up @@ -628,12 +628,12 @@ order by
\n Inner Join: #customer.c_nationkey = #nation.n_nationkey\
\n Inner Join: #orders.o_orderkey = #lineitem.l_orderkey\
\n Inner Join: #customer.c_custkey = #orders.o_custkey\
\n TableScan: customer projection=Some([0, 1, 2, 3, 4, 5, 7])\
\n TableScan: customer projection=Some([c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_comment])\
\n Filter: #orders.o_orderdate >= Date32(\"8674\") AND #orders.o_orderdate < Date32(\"8766\")\
\n TableScan: orders projection=Some([0, 1, 4]), partial_filters=[#orders.o_orderdate >= Date32(\"8674\"), #orders.o_orderdate < Date32(\"8766\")]\
\n TableScan: orders projection=Some([o_orderkey, o_custkey, o_orderdate]), partial_filters=[#orders.o_orderdate >= Date32(\"8674\"), #orders.o_orderdate < Date32(\"8766\")]\
\n Filter: #lineitem.l_returnflag = Utf8(\"R\")\
\n TableScan: lineitem projection=Some([0, 5, 6, 8]), partial_filters=[#lineitem.l_returnflag = Utf8(\"R\")]\
\n TableScan: nation projection=Some([0, 1])";
\n TableScan: lineitem projection=Some([l_orderkey, l_extendedprice, l_discount, l_returnflag]), partial_filters=[#lineitem.l_returnflag = Utf8(\"R\")]\
\n TableScan: nation projection=Some([n_nationkey, n_name])";
assert_eq!(format!("{:?}", plan.unwrap()), expected);

Ok(())
Expand Down Expand Up @@ -753,7 +753,7 @@ async fn csv_explain() {
"logical_plan",
"Projection: #aggregate_test_100.c1\
\n Filter: #aggregate_test_100.c2 > Int64(10)\
\n TableScan: aggregate_test_100 projection=Some([0, 1]), partial_filters=[#aggregate_test_100.c2 > Int64(10)]"
\n TableScan: aggregate_test_100 projection=Some([c1, c2]), partial_filters=[#aggregate_test_100.c2 > Int64(10)]"
],
vec!["physical_plan",
"ProjectionExec: expr=[c1@0 as c1]\
Expand Down
20 changes: 20 additions & 0 deletions datafusion/core/tests/sql/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,26 @@ async fn test_crypto_expressions() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn test_array_index() -> Result<()> {
// By default PostgreSQL uses a one-based numbering convention for arrays, that is, an array of n elements starts with array[1] and ends with array[n]
test_expression!("([5,4,3,2,1])[1]", "5");
test_expression!("([5,4,3,2,1])[2]", "4");
test_expression!("([5,4,3,2,1])[5]", "1");
test_expression!("([[1, 2], [2, 3], [3,4]])[1]", "[1, 2]");
test_expression!("([[1, 2], [2, 3], [3,4]])[3]", "[3, 4]");
test_expression!("([[1, 2], [2, 3], [3,4]])[1][1]", "1");
test_expression!("([[1, 2], [2, 3], [3,4]])[2][2]", "3");
test_expression!("([[1, 2], [2, 3], [3,4]])[3][2]", "4");
// out of bounds
test_expression!("([5,4,3,2,1])[0]", "NULL");
test_expression!("([5,4,3,2,1])[6]", "NULL");
// test_expression!("([5,4,3,2,1])[-1]", "NULL");
test_expression!("([5,4,3,2,1])[100]", "NULL");

Ok(())
}

#[tokio::test]
async fn test_array_literals() -> Result<()> {
// Named, just another syntax
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async fn json_explain() {
"logical_plan",
"Projection: #COUNT(UInt8(1))\
\n Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\
\n TableScan: t1 projection=Some([0])",
\n TableScan: t1 projection=Some([a])",
],
vec![
"physical_plan",
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/sql/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ async fn projection_on_table_scan() -> Result<()> {
}

let expected = "Projection: #test.c2\
\n TableScan: test projection=Some([1])";
\n TableScan: test projection=Some([c2])";
assert_eq!(format!("{:?}", optimized_plan), expected);

let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
Expand Down Expand Up @@ -264,7 +264,7 @@ async fn projection_on_memory_scan() -> Result<()> {

let expected = format!(
"Projection: #{}.b\
\n TableScan: {} projection=Some([1])",
\n TableScan: {} projection=Some([b])",
UNNAMED_TABLE, UNNAMED_TABLE
);
assert_eq!(format!("{:?}", optimized_plan), expected);
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/tests/sql/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ async fn query_get_indexed_field() -> Result<()> {
ctx.register_table("ints", table_a)?;

// Original column is micros, convert to millis and check timestamp
let sql = "SELECT some_list[0] as i0 FROM ints LIMIT 3";
let sql = "SELECT some_list[1] as i0 FROM ints LIMIT 3";
let actual = execute_to_batches(&ctx, sql).await;
#[rustfmt::skip]
let expected = vec![
Expand Down Expand Up @@ -582,7 +582,7 @@ async fn query_nested_get_indexed_field() -> Result<()> {
ctx.register_table("ints", table_a)?;

// Original column is micros, convert to millis and check timestamp
let sql = "SELECT some_list[0] as i0 FROM ints LIMIT 3";
let sql = "SELECT some_list[1] as i0 FROM ints LIMIT 3";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+----------+",
Expand All @@ -594,7 +594,7 @@ async fn query_nested_get_indexed_field() -> Result<()> {
"+----------+",
];
assert_batches_eq!(expected, &actual);
let sql = "SELECT some_list[0][0] as i0 FROM ints LIMIT 3";
let sql = "SELECT some_list[1][1] as i0 FROM ints LIMIT 3";
let actual = execute_to_batches(&ctx, sql).await;
#[rustfmt::skip]
let expected = vec![
Expand Down Expand Up @@ -666,7 +666,7 @@ async fn query_nested_get_indexed_field_on_struct() -> Result<()> {
];
assert_batches_eq!(expected, &actual);

let sql = "SELECT some_struct['bar'][0] as i0 FROM structs LIMIT 3";
let sql = "SELECT some_struct['bar'][1] as i0 FROM structs LIMIT 3";
let actual = execute_to_batches(&ctx, sql).await;
#[rustfmt::skip]
let expected = vec![
Expand Down
Loading

0 comments on commit 29a3e2f

Please sign in to comment.