
Re-ordering projections in scan no longer work (since DF15) #5146

Closed
bseifert-natzka opened this issue Feb 1, 2023 · 2 comments · Fixed by #5261
Labels
bug Something isn't working

Comments

@bseifert-natzka
Describe the bug
Projections that re-order the columns of a schema no longer work in LogicalPlanBuilder::scan (since DF15): the schema changes unexpectedly during the optimizer rule 'push_down_projection'.
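For context, the projection argument to scan is a list of column indices into the source schema, so it can both select and re-order columns. A minimal sketch (plain Rust, schemas simplified to column names only, not DataFusion's actual types) of what Some(vec![1, 0, 2]) is expected to do to the CSV schema used below:

```rust
// Simplified model: applying a scan projection (a list of indices into
// the source schema) selects and re-orders columns.
fn project(schema: &[&str], projection: &[usize]) -> Vec<String> {
    projection.iter().map(|&i| schema[i].to_string()).collect()
}

fn main() {
    // Source schema of the CSV in the reproducer below.
    let source = ["Category", "Year", "Metric"];
    // Some(vec![1, 0, 2]) should yield the re-ordered schema.
    let projected = project(&source, &[1, 0, 2]);
    assert_eq!(projected, ["Year", "Category", "Metric"]);
    println!("{:?}", projected);
}
```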

To Reproduce
Steps to reproduce the behavior:
Minimal example:

use std::io::Cursor;
use std::sync::Arc;

use futures::TryStreamExt;
use datafusion::{
    arrow::record_batch::RecordBatch,
    datasource::{DefaultTableSource, MemTable},
    logical_expr::LogicalPlanBuilder,
    physical_plan::{memory::MemoryStream, SendableRecordBatchStream},
    prelude::SessionContext,
};

use itertools::Itertools;

// Empty main; the reproduction lives in the #[tokio::test] below.
fn main() {}

#[tokio::test]
async fn with_projection() -> Result<(), anyhow::Error> {
    let input = create_record_batches_from_csv(
        r###"
Category,Year,Metric
Electronics,2010,10
Electronics,2011,10
Electronics,2012,10
Pharmacy,2010,20
Pharmacy,2011,30
Pharmacy,2012,40
"###,
    )
    .await?;
    let ctx = SessionContext::new();
    let mem_table = Arc::new(MemTable::try_new(
        input.schema(),
        vec![input.try_collect().await?],
    )?);
    let plan = LogicalPlanBuilder::scan(
        "some_memtable",
        Arc::new(DefaultTableSource::new(mem_table)),
        Some(vec![1, 0, 2]),
    )?
    .build()?;
    let physical_plan = ctx.state().create_physical_plan(&plan).await?;
    let _input = datafusion::physical_plan::execute_stream(physical_plan, ctx.task_ctx())?;
    
    Ok(())
}


pub async fn create_record_batches_from_csv(
    csv: &str,
) -> Result<SendableRecordBatchStream, anyhow::Error> {
    let csv_reader: Vec<RecordBatch> = datafusion::arrow::csv::ReaderBuilder::new()
        .has_header(true)
        .build(Cursor::new(csv))?
        .try_collect()?;
    let schema = csv_reader
        .get(0)
        .expect("There should be at least 1 record batch")
        .schema();

    Ok(Box::pin(MemoryStream::try_new(csv_reader, schema, None)?))
}

with Cargo.toml

[package]
name = "df-bug"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
datafusion = "17"
arrow = "31"      # IMPORTANT: keep in sync with the version that `datafusion` uses

anyhow = "1.0"
futures = "0.3"
itertools = "0.10"
tokio = "1"
tokio-retry = "0.3"
tokio-stream = "0.1"
tokio-util = "0.7"

yields the error

Error: Internal error: Optimizer rule 'push_down_projection' failed, due to generate a different schema, original schema: 
DFSchema { fields: 
[
DFField { qualifier: Some("some_memtable"), field: Field { name: "Year", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, 
DFField { qualifier: Some("some_memtable"), field: Field { name: "Category", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, 
DFField { qualifier: Some("some_memtable"), field: Field { name: "Metric", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }
], metadata: {} }, 

new schema: DFSchema { fields:
 [
DFField { qualifier: Some("some_memtable"), field: Field { name: "Category", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, 
DFField { qualifier: Some("some_memtable"), field: Field { name: "Year", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } },
 DFField { qualifier: Some("some_memtable"), field: Field { name: "Metric", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }
], metadata: {} }. 

This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
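The check that fires here compares the schema before and after optimization for exact equality, including field order; the two schemas in the message contain the same three fields, only in a different order. A simplified sketch of why such a comparison rejects the plan (plain Rust on field names only, not DataFusion's actual schema check):

```rust
fn main() {
    // The two schemas from the error message, reduced to field names.
    let original = ["Year", "Category", "Metric"];
    let new_order = ["Category", "Year", "Metric"];

    // A schema-equality check that includes field order treats these
    // as different...
    assert_ne!(original, new_order);

    // ...even though they contain exactly the same set of fields.
    let mut a = original.to_vec();
    let mut b = new_order.to_vec();
    a.sort();
    b.sort();
    assert_eq!(a, b);
    println!("schemas differ only in field order");
}
```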

Expected behavior
In DF14 this code did not yield an error (and indeed it should not). Instead, the output schema was re-ordered as expected.

@bseifert-natzka added the bug label Feb 1, 2023
@alamb (Contributor) commented Feb 2, 2023

I agree this is a bug. Thank you for the reproducer @bseifert-natzka -- I could reproduce it with just a projection directly on the scan (this also happens on master at the time of this writing).

I think you can work around it by using a LogicalPlanBuilder::project like this

    // Note: `col` here is `datafusion::prelude::col`.
    let plan = LogicalPlanBuilder::scan(
        "some_memtable",
        Arc::new(DefaultTableSource::new(mem_table)),
        // Some(vec![1, 0, 2]), // <-- don't project here,
        None,
    )?
    .project(vec![col("Year"), col("Category"), col("Metric")])? // <-- project here instead
    .build()?;
    let physical_plan = ctx.state().create_physical_plan(&plan).await?;
    let _input = datafusion::physical_plan::execute_stream(physical_plan, ctx.task_ctx())?;

For reference, here is a standalone reproducer (imports added for completeness):

use std::io::Cursor;
use std::sync::Arc;

use datafusion::arrow::{error::ArrowError, record_batch::RecordBatch};
use datafusion::datasource::{DefaultTableSource, MemTable};
use datafusion::error::Result;
use datafusion::logical_expr::LogicalPlanBuilder;
use datafusion::prelude::SessionContext;

#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> Result<()> {
    let csv =
        r###"
Category,Year,Metric
Electronics,2010,10
Electronics,2011,10
Electronics,2012,10
Pharmacy,2010,20
Pharmacy,2011,30
Pharmacy,2012,40
"###;

    let ctx = SessionContext::new();

    let batches: Vec<RecordBatch> = datafusion::arrow::csv::ReaderBuilder::new()
        .has_header(true)
        .build(Cursor::new(csv))
        .unwrap()
        .collect::<Result<Vec<_>, ArrowError>>()
        .unwrap();

    let schema = batches
        .get(0)
        .expect("There should be at least 1 record batch")
        .schema();

    let mem_table = Arc::new(MemTable::try_new(
        schema,
        vec![batches],
    )?);

    let plan = LogicalPlanBuilder::scan(
        "some_memtable",
        Arc::new(DefaultTableSource::new(mem_table)),
        Some(vec![1, 0, 2]),
    )?
    .build()?;
    let physical_plan = ctx.state().create_physical_plan(&plan).await?;
    let _input = datafusion::physical_plan::execute_stream(physical_plan, ctx.task_ctx())?;

    Ok(())
}

@alamb (Contributor) commented Feb 7, 2023

@jackwener I wonder if you have any thoughts about this one?
