
Implement Hive-Style Partitioned Write Support #7801

Merged: 9 commits into apache:main on Oct 20, 2023

Conversation

devinjdangelo
Contributor

@devinjdangelo devinjdangelo commented Oct 11, 2023

Which issue does this PR close?

Closes #7744
Closes #7860

Rationale for this change

We support reads from hive-style partitioned tables, so it makes sense to also support INSERT INTO for these tables.

What changes are included in this PR?

Implements a new runtime demux strategy that creates a stream for each unique tuple of partition values.

Also refactors write.rs into additional submodules for improved readability.

Are these changes tested?

Yes, new sqllogictests were added to verify that partitions are written as expected.

Are there any user-facing changes?

Inserts to partitioned tables now work.

@github-actions github-actions bot added the sql (SQL Planner), logical-expr (Logical plan and expressions), optimizer (Optimizer rules), core (Core DataFusion crate), and sqllogictest (SQL Logic Tests (.slt)) labels on Oct 11, 2023
/// Splits an input stream based on the distinct values of a set of columns
/// Assumes standard hive style partition paths such as
/// /col1=val1/col2=val2/outputfile.parquet
async fn hive_style_partitions_demuxer(
Contributor Author

This is the key new code in this PR.
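
For orientation, here is a much-simplified, hedged sketch of the idea behind hive_style_partitions_demuxer. It is not the PR's code (the real demuxer streams batches through per-partition channels); it just groups one RecordBatch's rows by their tuple of partition values and keys each group by a hive-style path. It assumes Utf8, non-null partition columns, uses arrow's take kernel, and every name below is illustrative.

use std::collections::HashMap;

use arrow::array::{cast::AsArray, UInt64Array};
use arrow::compute::take;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use object_store::path::Path;

/// Group `batch` by its tuple of partition values: one (hive-style path,
/// sub-batch) pair per distinct tuple.
fn demux_by_partition_values(
    batch: &RecordBatch,
    partition_by: &[String],
) -> Result<Vec<(Path, RecordBatch)>, ArrowError> {
    // Map each distinct partition-value tuple to the row indices that hold it.
    let mut groups: HashMap<Vec<String>, Vec<u64>> = HashMap::new();
    for row in 0..batch.num_rows() {
        let key: Vec<String> = partition_by
            .iter()
            .map(|col| {
                batch
                    .column_by_name(col)
                    .expect("partition column must exist")
                    .as_string::<i32>() // assumes Utf8, non-null partition columns
                    .value(row)
                    .to_string()
            })
            .collect();
        groups.entry(key).or_default().push(row as u64);
    }

    // `take` each group's rows into its own RecordBatch, keyed by a path such
    // as col1=val1/col2=val2.
    groups
        .into_iter()
        .map(|(values, rows)| {
            let indices = UInt64Array::from(rows);
            let columns = batch
                .columns()
                .iter()
                .map(|c| take(c.as_ref(), &indices, None))
                .collect::<Result<Vec<_>, ArrowError>>()?;
            let segments: Vec<String> = partition_by
                .iter()
                .zip(&values)
                .map(|(col, val)| format!("{col}={val}"))
                .collect();
            Ok((
                Path::from(segments.join("/")),
                RecordBatch::try_new(batch.schema(), columns)?,
            ))
        })
        .collect()
}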

@github-actions github-actions bot removed the sql (SQL Planner), logical-expr (Logical plan and expressions), and optimizer (Optimizer rules) labels on Oct 12, 2023
----
6

query error DataFusion error: Arrow error: Json error: Encountered unmasked nulls in non\-nullable StructArray child: Field \{ name: "a", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: \{\} \}
Contributor Author

The data written to the scratch directory looks correct to me, and the other file types are working. So, I suspect this is a bug related to support for reading partitioned JSONL tables.

Contributor Author

Issue filed #7816

Contributor

@alamb alamb left a comment

Thank you so much @devinjdangelo . This is amazing 🙏

I am sorry it has taken so long to get a review

I also tried this out and something is not working quite right. However, since it is not a regression (it is a bug in a new feature; this wasn't working before), I think we can still merge this PR and fix it as a follow-on

@metesynnada I wonder if you might have time to give this a look and offer any suggestions

❯ create external table test(partition varchar, trace_id varchar) stored as parquet partitioned by (partition_id) location '/tmp/test'  options (create_local_path 'true');
Arrow error: Schema error: Unable to get field named "partition_id". Valid fields: ["partition", "trace_id"]
❯ create external table test(partition varchar, trace_id varchar) stored as parquet partitioned by (partition) location '/tmp/test'  options (create_local_path 'true');
0 rows in set. Query took 0.001 seconds.

❯ insert into test select * from 'input.parquet';
Object Store error: Generic LocalFileSystem error: Unable to open file /private/tmp/test/partition=00000000000000003088e9e74cf166bd/QZSOEmePoAsQkvzU.parquet#1: Too many open files (os error 24)

It looks like it used the wrong column as the partition key:

alamb@MacBook-Pro-8:~/Software/arrow-datafusion$ ls /tmp/test | wc -l
   19992

alamb@MacBook-Pro-8:~/Software/arrow-datafusion$ ls /tmp/test | head
partition=0000000000000000000102576ce2faea/
partition=00000000000000000004d8eb49424f0d/
partition=000000000000000000051e89839e2bb0/
partition=00000000000000000005406b87ebcb41/
partition=000000000000000000065226d41eba99/
partition=000000000000000000066e556bea9c68/
partition=0000000000000000000688030531c6ff/
partition=000000000000000000068839bb143b45/
partition=00000000000000000006aaa220390696/
partition=00000000000000000006b0ebabd460a3/

Here is the input: input.parquet

Here is how I made it:

❯ copy (select 'a' as "partition", trace_id from traces UNION ALL select 'b' as "partition", trace_id from traces UNION ALL select 'c' as "partition", trace_id from traces) to 'input.parquet';
+----------+
| count    |
+----------+
| 15557151 |
+----------+
1 row in set. Query took 3.639 seconds.

❯ select * from 'input.parquet' limit 1;
+-----------+----------------------------------+
| partition | trace_id                         |
+-----------+----------------------------------+
| b         | 000000000000000028bf4438cad62275 |
+-----------+----------------------------------+
1 row in set. Query took 0.009 seconds.

❯ describe 'input.parquet';
+-------------+-------------------------+-------------+
| column_name | data_type               | is_nullable |
+-------------+-------------------------+-------------+
| partition   | Utf8                    | NO          |
| trace_id    | Dictionary(Int32, Utf8) | YES         |
+-------------+-------------------------+-------------+
2 rows in set. Query took 0.001 seconds.

type RecordBatchReceiver = Receiver<RecordBatch>;
type DemuxedStreamReceiver = UnboundedReceiver<(Path, RecordBatchReceiver)>;

/// Splits a single [SendableRecordBatchStream] into a dynamically determined
Contributor

thank you for this description @devinjdangelo -- it is very clear to read and extremely helpful for review (and for long-term understanding of how this code works)
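
For context, a hedged sketch of how a consumer might drain this channel-of-channels shape: it receives one message per output file (the file's Path plus a Receiver of RecordBatches) and then the batches destined for that file. It drains files sequentially for simplicity, which is not how the real write path is structured, and the println! calls stand in for a real serializer.

use arrow::record_batch::RecordBatch;
use object_store::path::Path;
use tokio::sync::mpsc::{Receiver, UnboundedReceiver};

async fn consume_demuxed(mut files: UnboundedReceiver<(Path, Receiver<RecordBatch>)>) {
    // Outer channel: one entry per output file.
    while let Some((path, mut batches)) = files.recv().await {
        println!("opening a writer for {path}");
        // Inner channel: the record batches destined for that file.
        while let Some(batch) = batches.recv().await {
            println!("{path}: writing {} rows", batch.num_rows());
        }
        println!("closing the writer for {path}");
    }
}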

CREATE EXTERNAL TABLE
partitioned_insert_test_verify(c bigint)
STORED AS csv
LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned/a=2/b=1/'
Contributor

that is very cool

insert_mode 'append_new_files',
);

#note that partitioned cols are moved to the end so value tuples are (c, a, b)
Contributor

Thank you for the comment. I recommend updating the test to use values like (1, 10, 100), (1, 20, 100), ... so it is clearer which values belong to which columns

parted_batch: &RecordBatch,
partition_by: &Vec<(String, DataType)>,
) -> Result<RecordBatch> {
let end_idx = parted_batch.num_columns() - partition_by.len();
Contributor

You could probably express this more concisely using [filter()](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.filter) to discard any partition columns

Contributor Author

I refactored this function and agreed it is much cleaner now 👍
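
For illustration, a minimal sketch of the filter()-based approach discussed here (not the exact code that landed in this PR): drop the partition columns from a RecordBatch by filtering the schema's fields and the batch's arrays by name. The function name and the use of datafusion_common::Result are assumptions.

use std::sync::Arc;

use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;

fn remove_partition_by_columns(
    parted_batch: &RecordBatch,
    partition_by: &[(String, DataType)],
) -> Result<RecordBatch> {
    // A column survives if its name is not one of the partition columns.
    let keep = |name: &str| !partition_by.iter().any(|(p, _)| p == name);
    let schema = parted_batch.schema();

    // Filter the fields...
    let kept_fields: Vec<_> = schema
        .fields()
        .iter()
        .filter(|f| keep(f.name()))
        .map(|f| f.as_ref().clone())
        .collect();

    // ...and the corresponding arrays, matched up by position.
    let kept_columns: Vec<_> = schema
        .fields()
        .iter()
        .zip(parted_batch.columns())
        .filter(|(f, _)| keep(f.name()))
        .map(|(_, c)| c.clone())
        .collect();

    Ok(RecordBatch::try_new(
        Arc::new(Schema::new(kept_fields)),
        kept_columns,
    )?)
}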

}
/// A trait that defines the methods required for a RecordBatch serializer.
#[async_trait]
pub trait BatchSerializer: Unpin + Send {
Contributor

once we get your outstanding PRs merged, it might be a good time to refactor the code into (even more) modules

}

file_path.child(format!("{}.{}", write_id, file_extension))
}
Contributor

What would you think about writing some unit tests of the partition extraction and schema pruning code? Perhaps those tests could be used to cover the issue I found while testing this end to end?
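
As an example of the kind of focused unit test being suggested, here is a hedged sketch that exercises hive-style partition extraction through a hypothetical parse_hive_partitions helper rather than DataFusion's actual internals:

/// Hypothetical helper: parse "a=2/b=1/file.parquet"-style paths into
/// (column, value) pairs, ignoring the trailing file name.
fn parse_hive_partitions(path: &str) -> Vec<(String, String)> {
    path.split('/')
        .filter_map(|segment| segment.split_once('='))
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn extracts_partition_key_values() {
        assert_eq!(
            parse_hive_partitions("a=2/b=1/file.parquet"),
            vec![
                ("a".to_string(), "2".to_string()),
                ("b".to_string(), "1".to_string())
            ]
        );
    }
}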

@devinjdangelo
Contributor Author

devinjdangelo commented Oct 20, 2023

Thank you @alamb for the review! No worries, the number of high-quality reviews you complete is impressive 🙇.

I spent some time digging into the failure case you found, and it appears to be specific to datafusion-cli interpreting the schema of the parquet file incorrectly in the SELECT query. You can see in the describe query that the second column is actually a dictionary:

  ❯ describe 'input.parquet';
+-------------+-------------------------+-------------+
| column_name | data_type               | is_nullable |
+-------------+-------------------------+-------------+
| partition   | Utf8                    | NO          |
| trace_id    | Dictionary(Int32, Utf8) | YES         |
+-------------+-------------------------+-------------+
2 rows in set. Query took 0.001 seconds.

But when you do a SELECT, the parquet schema is incorrectly reported as two Utf8 columns. I added some debug print statements and saw the following (note that trace_id has type Utf8):

  ❯ insert into test select partition, trace_id from 'input.parquet';
Table Schema: Field { name: "trace_id", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "partition", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }
Input Schema: Field { name: "trace_id", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "partition", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }
Object Store error: Generic LocalFileSystem error: Unable to open file /tmp/partition=00000000000000002d0d2b9bbb99d0e3/OdZUZxMhsRWtSdIi.parquet#1: Too many open files (os error 24)

My theory is that the incorrect schema results in the arrow function RecordBatch::column_by_name returning the values of a different column! Since the demux code looks up columns by name, it should never partition by the wrong column unless RecordBatch::column_by_name somehow returns the wrong values.

I tried to replicate the error in Rust rather than via the CLI, but I see different (and correct) behavior:

use arrow_schema::DataType;
use datafusion::{
    dataframe::DataFrameWriteOptions,
    prelude::*,
};
use datafusion_common::DataFusionError;
use datafusion_expr::ExprSchemable;
use object_store::local::LocalFileSystem;
use std::sync::Arc;
use url::Url;

const FILENAME: &str =
    "/home/dev/arrow-datafusion/input.parquet";

#[tokio::main]
async fn main() -> Result<(), DataFusionError> {
    let _ctx = SessionContext::new();
    let local = Arc::new(LocalFileSystem::new());
    let local_url = Url::parse("file://local").unwrap();
    _ctx.runtime_env().register_object_store(&local_url, local);

    let _read_options = ParquetReadOptions::default();

    let _df = _ctx
        .read_parquet(FILENAME, _read_options.clone())
        .await?;

    _df.clone().show_limit(10).await?;

    println!("{}", _df.clone().schema());

    _ctx.sql("create external table 
    test(partition varchar, trace_id varchar)
    stored as parquet 
    partitioned by (partition) 
    location './temptest/'  
    options (create_local_path 'true');").await?.collect().await?;

    // Expecting an error here since schemas do not match
    _df.clone()
        .select(vec![col("trace_id"), col("partition")])?
        .write_table("test", DataFrameWriteOptions::new()).await
        .expect_err("Inserting query must have the same schema with the table.");

    // Cast the column to the correct type and it works as expected!
    _df.clone()
        .select(vec![col("trace_id").cast_to(&DataType::Utf8, _df.schema())?.alias("trace_id"), col("partition")])?
        .write_table("test", DataFrameWriteOptions::new()).await?;

    _ctx.sql("select count(1) from test").await?.show().await?;

    Ok(())
}

Output from the above:

+-----------+----------------------------------+
| partition | trace_id                         |
+-----------+----------------------------------+
| b         | 000000000000000028bf4438cad62275 |
| b         | 00000000000000005b030b4e8e16d8a5 |
| b         | 00000000000000006ef85090c32c67ae |
| b         | 000000000000000070939fa0f70e77af |
| c         | 0000000000000000004bfbc558a3a6c3 |
| c         | 000000000000000000543a7e3fb497e4 |
| c         | 0000000000000000005ee83ee2dc3ec6 |
| c         | 0000000000000000008aa7bc36608924 |
| c         | 0000000000000000009fc66410b57ccb |
| c         | 000000000000000000aa04d9a37ff8f8 |
+-----------+----------------------------------+
fields:[?table?.partition, ?table?.trace_id], metadata:{}
removeme self: Field { name: "trace_id", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "partition", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }
removeme input: Field { name: "trace_id", data_type: Dictionary(Int32, Utf8), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "partition", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }
removeme self: Field { name: "trace_id", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "partition", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }
removeme input: Field { name: "trace_id", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "partition", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }
+-----------------+
| COUNT(Int64(1)) |
+-----------------+
| 15557151        |
+-----------------+

And on disk we see:

dev@dev:~/arrow-datafusion$ ls -lhR temptest/
temptest/:
total 12K
drwxrwxr-x 2 dev dev 4.0K Oct 19 20:04 'partition=a'
drwxrwxr-x 2 dev dev 4.0K Oct 19 20:04 'partition=b'
drwxrwxr-x 2 dev dev 4.0K Oct 19 20:04 'partition=c'

'temptest/partition=a':
total 35M
-rw-rw-r-- 1 dev dev 35M Oct 19 20:04 mbsEGriqmMG7vbt6.parquet

'temptest/partition=b':
total 35M
-rw-rw-r-- 1 dev dev 35M Oct 19 20:04 mbsEGriqmMG7vbt6.parquet

'temptest/partition=c':
total 35M
-rw-rw-r-- 1 dev dev 35M Oct 19 20:04 mbsEGriqmMG7vbt6.parquet

@suremarc
Contributor

@devinjdangelo I attempted to use this feature in datafusion-cli today, as it is useful for something I am doing. I got this error when writing to a partitioned table:

This feature is not implemented: it is not yet supported to write to hive partitions with datatype Dictionary(UInt16, Utf8)

Here is a repro using datafusion-cli:

CREATE EXTERNAL TABLE lz4_raw_compressed_larger
STORED AS PARQUET
PARTITIONED BY (partition)
LOCATION 'data/';

INSERT INTO lz4_raw_compressed_larger VALUES ('non-partition-value', 'partition');

Here's a zip file with a single file in it, data/partition=A/lz4_raw_compressed_larger.parquet.

I noticed the unit tests specify the schema explicitly, but I am guessing if you have DataFusion infer the schema, the partition columns are encoded as dictionaries. I think this will limit the usefulness of this feature if partitioned writes don't work with tables whose schemas are inferred.
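
As a hedged sketch of what handling this case could look like (illustrative only, not the fix the project adopted): flatten a Dictionary(_, Utf8) partition column to plain Utf8 with arrow's cast kernel before formatting its values into partition paths. The helper and the main below are assumptions.

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, DictionaryArray, StringArray, UInt16Array};
use arrow::compute::cast;
use arrow::datatypes::DataType;
use arrow::error::ArrowError;

/// Flatten a dictionary-encoded partition column to plain Utf8 so its values
/// can be formatted into hive-style paths.
fn partition_values_as_utf8(col: &ArrayRef) -> Result<ArrayRef, ArrowError> {
    match col.data_type() {
        DataType::Utf8 => Ok(col.clone()),
        // Lossless: expands the dictionary back into a StringArray.
        DataType::Dictionary(_, value) if **value == DataType::Utf8 => {
            cast(col.as_ref(), &DataType::Utf8)
        }
        other => Err(ArrowError::NotYetImplemented(format!(
            "partition column type {other} not handled in this sketch"
        ))),
    }
}

fn main() -> Result<(), ArrowError> {
    // A Dictionary(UInt16, Utf8) column like the one in the error above.
    let keys = UInt16Array::from(vec![0u16, 1, 0]);
    let values = StringArray::from(vec!["A", "B"]);
    let dict: ArrayRef = Arc::new(DictionaryArray::try_new(keys, Arc::new(values))?);

    let flattened = partition_values_as_utf8(&dict)?;
    assert_eq!(flattened.data_type(), &DataType::Utf8);
    Ok(())
}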

@alamb
Contributor

alamb commented Oct 20, 2023

Thanks for checking @suremarc -- my plan is to merge this PR and then file follow-on tickets for the known issues, which we can address in subsequent work

@alamb alamb merged commit e17ca27 into apache:main Oct 20, 2023
23 checks passed
@alamb
Contributor

alamb commented Oct 20, 2023

Thanks again @devinjdangelo -- truly epic work

Labels
core (Core DataFusion crate), enhancement (New feature or request), sqllogictest (SQL Logic Tests (.slt))

Successfully merging this pull request may close these issues:

Writing a Dataframe to a partitioned table ignores partitioning
Allow Inserts to Partitioned Listing Table
4 participants