
CREATE TABLE DDL does not save correct schema, resulting in mismatched plan vs execution (record batch) schema #7636

Open
matthewgapp opened this issue Sep 24, 2023 · 8 comments
Labels
bug Something isn't working

Comments

@matthewgapp
Contributor

matthewgapp commented Sep 24, 2023

Describe the bug

The ROW_NUMBER() function places a non-nullable constraint on its generated field, so the resulting schema should label that column as nullable: false. Instead, the logical plan for a table created with CREATE TABLE ... shows that field as nullable: true. This results in a runtime panic for queries that involve joins (although I'm not quite sure why it doesn't complain on queries without joins).

Error message produced by the minimal repro below

Thread 'main' panicked at 'query failed to execute: External(ArrowError(InvalidArgumentError("batches[0] schema is different with argument schema.\n            batches[0] schema: Schema { fields: [Field { name: \"id\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"name\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"row_num\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} },\n            argument schema: Schema { fields: [Field { name: \"id\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"name\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"row_num\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }\n            ")))', src/main.rs:42:10

To Reproduce

Minimal repro

Run this script, which panics with the error shown above:

use std::sync::Arc;

use datafusion::{
    arrow::{
        array::{Int32Array, StringArray},
        datatypes::{DataType, Field, Schema},
        record_batch::RecordBatch,
        util::pretty::print_batches,
    },
    datasource::MemTable,
    prelude::{SessionConfig, SessionContext},
};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let config = SessionConfig::new()
        .with_create_default_catalog_and_schema(true)
        .with_information_schema(true);

    let ctx = SessionContext::with_config(config);

    ctx.register_table("source_table", Arc::new(create_mem_table()))
        .unwrap();

    let create_table_query = r#"create table customers as SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS row_num FROM source_table"#;

    let _ = ctx
        .sql(create_table_query)
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();

    let batches = ctx
        // performing a (cross) join query because joins seem to complain about execution schema vs plan schema mismatch
        .sql("select a.*, b.* from customers a, customers b")
        .await
        .unwrap()
        .collect()
        .await
        .expect("query failed to execute");

    print_batches(&batches).unwrap();
}

// just some random table that we'll append a row_num column to
fn create_mem_table() -> MemTable {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));

    let ids = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let names = Arc::new(StringArray::from(vec![
        "Alice", "Bob", "Charlie", "David", "Eve",
    ]));

    let batch = RecordBatch::try_new(schema.clone(), vec![ids as _, names as _]).unwrap();

    MemTable::try_new(schema, vec![vec![batch]]).unwrap()
}

Expected behavior

The schema saved when using CREATE TABLE should be correct (i.e., it should capture nullable: false constraints on fields). The logical plan shouldn't conflict with the record batches observed during execution. No panic should occur.

Additional context

A bit more context:

This is where nullable: false is set; it's not being picked up by the CREATE TABLE statement.
https://github.com/apache/arrow-datafusion/blob/78d9613e81557ca5e5db8b75e5c7dec47ccee0a1/datafusion/physical-expr/src/window/row_number.rs#L54

fn field(&self) -> Result<Field> {
    let nullable = false;
    let data_type = DataType::UInt64;
    Ok(Field::new(self.name(), data_type, nullable))
}

I haven't investigated how this field property on the WindowExpr is actually used (or omitted) when constructing the logical plan.
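
A quick way to observe the symptom is to compare the schema the plan reports with the schema of the record batches actually produced. A minimal sketch, assuming the ctx and customers table from the repro above (drop it into main after the CREATE TABLE statement):

// Sketch: with this bug, row_num's nullability differs between the two.
let df = ctx.sql("SELECT * FROM customers").await.unwrap();
println!("plan schema:  {:?}", df.schema().fields());
let batches = df.collect().await.unwrap();
println!("batch schema: {:?}", batches[0].schema().fields());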

@alamb
Contributor

alamb commented Sep 25, 2023

I wonder if the nullable information isn't being updated correctly for join output 🤔

The code that handles setting nullability is here:

https://github.com/apache/arrow-datafusion/blob/b1d134e9ff37724459cb5090a6346a85152a1db7/datafusion/expr/src/logical_plan/builder.rs#L1068

Maybe the plans in question aren't using that function, or there is a bug in that function.
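
For context, the nullability handling there boils down to something like the following sketch (a simplification, not the actual code): fields from the side of an outer join that can produce unmatched rows are forced to nullable.

use datafusion::arrow::datatypes::Field;

// Simplified sketch of the idea: outer joins fill unmatched rows with NULLs,
// so fields from the non-preserved side must be marked nullable.
fn nullify_fields(fields: &[Field]) -> Vec<Field> {
    fields.iter().map(|f| f.clone().with_nullable(true)).collect()
}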

@matthewgapp
Contributor Author

I wonder if the nullable information isn't being updated correctly for join output 🤔

The code that handles setting nullability is here:

https://github.com/apache/arrow-datafusion/blob/b1d134e9ff37724459cb5090a6346a85152a1db7/datafusion/expr/src/logical_plan/builder.rs#L1068

Maybe the plans in question aren't using that function, or there is a bug in that function.

I don't think this is the case because the schema is incorrect at create table time (before joins). Joins just seem to be the place where DF complains.

If you inspect the schema of the created table, the row_num column (created by the ROW_NUMBER() window function) is nullable: true when it should be nullable: false, per the field implementation on the row number execution plan.
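
A minimal sketch of that inspection, assuming the ctx and customers table from the repro above:

// Sketch: the created table's schema currently reports row_num as nullable,
// even though the window expression declares it non-nullable.
let table = ctx.table("customers").await.unwrap();
let row_num = table
    .schema()
    .field_with_unqualified_name("row_num")
    .unwrap();
println!("row_num nullable: {}", row_num.is_nullable()); // prints true with this bug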

@alamb
Contributor

alamb commented Sep 27, 2023

I think row_number() is marked as non-nullable because the implementation always produces a value (i.e., no nulls), and thus changing it to be nullable doesn't seem correct.

In the example above, running

select * from customers

The output is this (there are no nulls):

+----+---------+---------+
| id | name    | row_num |
+----+---------+---------+
| 1  | Alice   | 1       |
| 2  | Bob     | 2       |
| 3  | Charlie | 3       |
| 4  | David   | 4       |
| 5  | Eve     | 5       |
+----+---------+---------+

I caught the error in the debugger, and it seems to be thrown in concat_batches, called from here:

https://github.com/apache/arrow-datafusion/blob/32dfbb0bb5c51b55ad84eed4fc140840f0a76612/datafusion/physical-plan/src/joins/cross_join.rs#L138

It turns out that this exact check was removed in arrow 47.0.0 (apache/arrow-rs#4815, merged into DataFusion two days ago in #7587), so when I run this query against the latest version of DataFusion (not yet released to crates.io) I get:

+----+---------+---------+----+---------+---------+
| id | name    | row_num | id | name    | row_num |
+----+---------+---------+----+---------+---------+
| 1  | Alice   | 1       | 1  | Alice   | 1       |
| 1  | Alice   | 1       | 2  | Bob     | 2       |
| 1  | Alice   | 1       | 3  | Charlie | 3       |
| 1  | Alice   | 1       | 4  | David   | 4       |
| 2  | Bob     | 2       | 1  | Alice   | 1       |
| 2  | Bob     | 2       | 2  | Bob     | 2       |
| 2  | Bob     | 2       | 3  | Charlie | 3       |
| 2  | Bob     | 2       | 4  | David   | 4       |
| 3  | Charlie | 3       | 1  | Alice   | 1       |
| 3  | Charlie | 3       | 2  | Bob     | 2       |
| 3  | Charlie | 3       | 3  | Charlie | 3       |
| 3  | Charlie | 3       | 4  | David   | 4       |
| 4  | David   | 4       | 1  | Alice   | 1       |
| 4  | David   | 4       | 2  | Bob     | 2       |
| 4  | David   | 4       | 3  | Charlie | 3       |
| 4  | David   | 4       | 4  | David   | 4       |
| 5  | Eve     | 5       | 1  | Alice   | 1       |
| 5  | Eve     | 5       | 2  | Bob     | 2       |
| 5  | Eve     | 5       | 3  | Charlie | 3       |
| 5  | Eve     | 5       | 4  | David   | 4       |
| 1  | Alice   | 1       | 5  | Eve     | 5       |
| 2  | Bob     | 2       | 5  | Eve     | 5       |
| 3  | Charlie | 3       | 5  | Eve     | 5       |
| 4  | David   | 4       | 5  | Eve     | 5       |
| 5  | Eve     | 5       | 5  | Eve     | 5       |
+----+---------+---------+----+---------+---------+
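
For context, the check that arrow removed amounted to requiring exact schema equality (including nullability) across all of the batches being concatenated, roughly like this sketch (not arrow's actual code):

use datafusion::arrow::{datatypes::Schema, record_batch::RecordBatch};

// Sketch of the pre-47.0.0 invariant: every batch's schema had to match the
// argument schema exactly, including nullability, or concat_batches errored.
fn schemas_match(expected: &Schema, batches: &[RecordBatch]) -> bool {
    batches.iter().all(|b| b.schema().as_ref() == expected)
}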

@matthewgapp
Contributor Author

@alamb thanks for the continued review and for the heads up on the removal of that invariant from arrow. We should still ensure that the nullability information is correct at all times, even if arrow doesn't complain.

I completely agree that the column generated by ROW_NUMBER() should not be marked as nullable: true. To that end, my PR (#7638) specifically sets nullable: false where it should be false for window functions in the logical plan. The code in main assumes that the column generated by every window function is nullable: true, which is wrong. In other words, my PR brings the logical plan in line with the actual schema produced by the execution plan.

Let me know if I'm missing something.
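
For illustration, the direction of the fix can be sketched as deriving the output field's nullability from the window function instead of hard-coding it to true (a conceptual sketch, not the PR's actual code):

use datafusion::logical_expr::{BuiltInWindowFunction, WindowFunction};

// Conceptual sketch: ranking functions such as ROW_NUMBER always produce a
// value for every input row, so their output field can be non-nullable;
// other window functions may still produce NULLs.
fn window_output_nullable(fun: &WindowFunction) -> bool {
    match fun {
        WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::RowNumber)
        | WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::Rank)
        | WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::DenseRank) => false,
        _ => true,
    }
}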

@xhwhis
Contributor

xhwhis commented Sep 28, 2023

@matthewgapp I also encountered the same mismatch issue as described above. However, after applying your PR (#7638), the problem persists for me. To illustrate, I've created a minimal reproducible example that builds upon yours, adding just a single INSERT operation and using PARTITION BY. Below are the code and the corresponding error message. I'm uncertain whether the issue lies with the INSERT operation or with the PARTITION BY clause.

Minimal repro

called `Result::unwrap()` on an `Err` value: ArrowError(InvalidArgumentError("batches[0] schema is different with argument schema.\n            batches[0] schema: Schema { fields: [Field { name: \"id\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"name\", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} },\n            argument schema: Schema { fields: [Field { name: \"id\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"name\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }\n            "))
use std::sync::Arc;

use datafusion::{
    arrow::{
        datatypes::{DataType, Field, Schema},
        util::pretty::print_batches,
    },
    datasource::MemTable,
    prelude::{SessionConfig, SessionContext},
};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let config = SessionConfig::new()
        .with_create_default_catalog_and_schema(true)
        .with_information_schema(true);

    let ctx = SessionContext::with_config(config);

    ctx.register_table("source_table", Arc::new(create_mem_table()))
        .unwrap();

    let insert_table_query = r#"INSERT INTO source_table VALUES (1, 'Alice'),(2, 'Bob'),(3, 'Charlie'),(4, 'David'), (5, 'Eve')"#;
    let _ = ctx
        .sql(insert_table_query)
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();

    let select_query =
        r#"SELECT *, RANK() OVER (PARTITION BY id) AS row_num FROM source_table"#;

    let batches = ctx
        .sql(select_query)
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();

    print_batches(&batches).unwrap();
}

fn create_mem_table() -> MemTable {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));

    MemTable::try_new(schema, vec![vec![]]).unwrap()
}

@matthewgapp
Contributor Author

matthewgapp commented Sep 28, 2023

Thanks @xhwhis, this seems like a separate bug (one whose root cause is that the values exec sets the schema for all of its columns as nullable here). This causes the record batch to inherit that nullable schema. The record batch is then inserted into the memory table without complaint (we do check and complain for the inverse case, i.e., nullable columns being inserted into a non-nullable table, here).

The fix might be to pass down the expected table schema to the values logical constructor, if there is one. I'll open a separate ticket.
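
A rough sketch of that idea (a hypothetical helper, not an existing DataFusion API): align the nullability of the VALUES-derived schema with the target table's schema before the batches are built.

use datafusion::arrow::datatypes::{Field, Schema};

// Hypothetical helper: take nullability from the target table's schema so the
// VALUES-derived batches don't end up looser (all-nullable) than the table.
fn align_nullability(values_schema: &Schema, table_schema: &Schema) -> Schema {
    let fields: Vec<Field> = values_schema
        .fields()
        .iter()
        .zip(table_schema.fields().iter())
        .map(|(v, t)| v.as_ref().clone().with_nullable(t.is_nullable()))
        .collect();
    Schema::new(fields)
}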

EDIT: I opened up a separate issue and also put up a draft PR that should fix the issue: #7693

@matthewgapp
Contributor Author

matthewgapp commented Sep 29, 2023

@alamb, more of a meta thought, but with apache/arrow-rs#4815, I'm concerned that all of these "bugs" may go unnoticed over time (unless they're caught in the DF application logic like here), potentially creating incorrect behavior.

I think it could be helpful to have something like your strict mode (potentially behind a compiler flag). But I'm still ramping up on this ecosystem, so I'm not sure who should determine the correctness of a schema and/or when that correctness should be asserted.

However, it does feel like any time record batches are stitched together and assumed to form a congruent, continuous thing (like a table or stream), the nullability of those batches should be consistent with (or at least no looser than) the nullability of the containing table or stream. For example, for the purposes of DF, it seems appropriate that non-nullable batches would always be acceptable in a table whose columns are nullable. The inverse is not true.
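
To make that rule concrete, it might look something like this sketch (a hypothetical helper, not an existing check):

use datafusion::arrow::datatypes::Schema;

// Hypothetical check: a batch is acceptable for a table if every field matches
// by name and type and is no more nullable than the table allows.
fn batch_fits_table(batch: &Schema, table: &Schema) -> bool {
    batch.fields().len() == table.fields().len()
        && batch.fields().iter().zip(table.fields().iter()).all(|(b, t)| {
            b.name() == t.name()
                && b.data_type() == t.data_type()
                && (t.is_nullable() || !b.is_nullable())
        })
}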

@alamb
Contributor

alamb commented Sep 29, 2023

However, it does feel like anytime that record batches are stuck together and assumed to be a congruent, continuous thing (like a table or stream), that nullability between these batches should be consistent (or at least a superset of the nullability of containing table or stream). For example, for the purposes of DF, it seems appropriate that non-nullable batches would always be acceptable in a table that is nullable. The inverse is not true.

Yes, I agree with this as the intent. I also agree with your assessment that this invariant is not always upheld. I think the reason is that RecordBatch-level nullability has limited value (the biggest value is in knowing null/non-null for each individual Array, and indeed things like the low-level kernels do look at the validity (null) mask), and thus no one has ensured consistent enforcement across the codebase.
