From e1817d228c786272c66b16ee450eeb91967d8a0e Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Wed, 20 Dec 2023 18:05:40 -0500 Subject: [PATCH 1/8] write arrow files --- Cargo.toml | 1 + datafusion/core/Cargo.toml | 1 + .../core/src/datasource/file_format/arrow.rs | 199 +++++++++++++++++- .../src/datasource/file_format/parquet.rs | 34 +-- .../src/datasource/file_format/write/mod.rs | 33 ++- datafusion/sqllogictest/test_files/copy.slt | 35 +++ .../test_files/insert_to_external.slt | 38 ++++ 7 files changed, 304 insertions(+), 37 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 023dc6c6fc4f..4b8c6ae651d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,7 @@ arrow-buffer = { version = "49.0.0", default-features = false } arrow-flight = { version = "49.0.0", features = ["flight-sql-experimental"] } arrow-ord = { version = "49.0.0", default-features = false } arrow-schema = { version = "49.0.0", default-features = false } +arrow-ipc = { version = "49.0.0", default-features = false } async-trait = "0.1.73" bigdecimal = "0.4.1" bytes = "1.4" diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 0ee83e756745..4d15a3d02563 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -56,6 +56,7 @@ apache-avro = { version = "0.16", optional = true } arrow = { workspace = true } arrow-array = { workspace = true } arrow-schema = { workspace = true } +arrow-ipc = { workspace = true } async-compression = { version = "0.4.0", features = ["bzip2", "gzip", "xz", "zstd", "futures-io", "tokio"], optional = true } async-trait = { workspace = true } bytes = { workspace = true } diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs index 07c96bdae1b4..bb0eda9c15db 100644 --- a/datafusion/core/src/datasource/file_format/arrow.rs +++ b/datafusion/core/src/datasource/file_format/arrow.rs @@ -21,10 +21,13 @@ use std::any::Any; use std::borrow::Cow; +use std::fmt::{self, Debug}; use std::sync::Arc; use crate::datasource::file_format::FileFormat; -use crate::datasource::physical_plan::{ArrowExec, FileScanConfig}; +use crate::datasource::physical_plan::{ + ArrowExec, FileGroupDisplay, FileScanConfig, FileSinkConfig, +}; use crate::error::Result; use crate::execution::context::SessionState; use crate::physical_plan::ExecutionPlan; @@ -35,13 +38,23 @@ use arrow::ipc::root_as_message; use arrow_schema::{ArrowError, Schema, SchemaRef}; use bytes::Bytes; -use datafusion_common::{FileType, Statistics}; -use datafusion_physical_expr::PhysicalExpr; +use datafusion_common::{not_impl_err, DataFusionError, FileType, Statistics}; +use datafusion_execution::{SendableRecordBatchStream, TaskContext}; +use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; +use crate::physical_plan::{DisplayAs, DisplayFormatType}; use async_trait::async_trait; +use datafusion_physical_plan::insert::{DataSink, FileSinkExec}; +use datafusion_physical_plan::metrics::MetricsSet; use futures::stream::BoxStream; use futures::StreamExt; use object_store::{GetResultPayload, ObjectMeta, ObjectStore}; +use tokio::io::AsyncWriteExt; +use tokio::task::JoinSet; + +use super::file_compression_type::FileCompressionType; +use super::write::demux::start_demuxer_task; +use super::write::{create_writer, SharedBuffer}; /// Arrow `FileFormat` implementation. #[derive(Default, Debug)] @@ -97,11 +110,191 @@ impl FileFormat for ArrowFormat { Ok(Arc::new(exec)) } + async fn create_writer_physical_plan( + &self, + input: Arc, + _state: &SessionState, + conf: FileSinkConfig, + order_requirements: Option>, + ) -> Result> { + if conf.overwrite { + return not_impl_err!("Overwrites are not implemented yet for Arrow format"); + } + + let sink_schema = conf.output_schema().clone(); + let sink = Arc::new(ArrowFileSink::new(conf)); + + Ok(Arc::new(FileSinkExec::new( + input, + sink, + sink_schema, + order_requirements, + )) as _) + } + fn file_type(&self) -> FileType { FileType::ARROW } } +/// Implements [`DataSink`] for writing to arrow_ipc files +struct ArrowFileSink { + config: FileSinkConfig, +} + +impl ArrowFileSink { + fn new(config: FileSinkConfig) -> Self { + Self { config } + } + + /// Converts table schema to writer schema, which may differ in the case + /// of hive style partitioning where some columns are removed from the + /// underlying files. + fn get_writer_schema(&self) -> Arc { + if !self.config.table_partition_cols.is_empty() { + let schema = self.config.output_schema(); + let partition_names: Vec<_> = self + .config + .table_partition_cols + .iter() + .map(|(s, _)| s) + .collect(); + Arc::new(Schema::new( + schema + .fields() + .iter() + .filter(|f| !partition_names.contains(&f.name())) + .map(|f| (**f).clone()) + .collect::>(), + )) + } else { + self.config.output_schema().clone() + } + } +} + +impl Debug for ArrowFileSink { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ArrowFileSink").finish() + } +} + +impl DisplayAs for ArrowFileSink { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "ArrowFileSink(file_groups=",)?; + FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?; + write!(f, ")") + } + } + } +} + +#[async_trait] +impl DataSink for ArrowFileSink { + fn as_any(&self) -> &dyn Any { + self + } + + fn metrics(&self) -> Option { + None + } + + async fn write_all( + &self, + data: SendableRecordBatchStream, + context: &Arc, + ) -> Result { + // No props are supported yet, but can be by updating FileTypeWriterOptions + // to populate this struct and use those options to initialize the arrow_ipc::writer::FileWriter + let _arrow_props = self.config.file_type_writer_options.try_into_arrow()?; + + let object_store = context + .runtime_env() + .object_store(&self.config.object_store_url)?; + + let part_col = if !self.config.table_partition_cols.is_empty() { + Some(self.config.table_partition_cols.clone()) + } else { + None + }; + + let (demux_task, mut file_stream_rx) = start_demuxer_task( + data, + context, + part_col, + self.config.table_paths[0].clone(), + "arrow".into(), + self.config.single_file_output, + ); + + let mut file_write_tasks: JoinSet> = + JoinSet::new(); + while let Some((path, mut rx)) = file_stream_rx.recv().await { + let shared_buffer = SharedBuffer::new(1048576); + let mut arrow_writer = arrow_ipc::writer::FileWriter::try_new( + shared_buffer.clone(), + &self.get_writer_schema(), + )?; + let mut object_store_writer = create_writer( + FileCompressionType::UNCOMPRESSED, + &path, + object_store.clone(), + ) + .await?; + file_write_tasks.spawn(async move { + let mut row_count = 0; + while let Some(batch) = rx.recv().await { + row_count += batch.num_rows(); + arrow_writer.write(&batch)?; + let mut buff_to_flush = shared_buffer.buffer.try_lock().unwrap(); + if buff_to_flush.len() > 1024000 { + object_store_writer + .write_all(buff_to_flush.as_slice()) + .await?; + buff_to_flush.clear(); + } + } + arrow_writer.finish()?; + let final_buff = shared_buffer.buffer.try_lock().unwrap(); + + object_store_writer.write_all(final_buff.as_slice()).await?; + object_store_writer.shutdown().await?; + Ok(row_count) + }); + } + + let mut row_count = 0; + while let Some(result) = file_write_tasks.join_next().await { + match result { + Ok(r) => { + row_count += r?; + } + Err(e) => { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } else { + unreachable!(); + } + } + } + } + + match demux_task.await { + Ok(r) => r?, + Err(e) => { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } else { + unreachable!(); + } + } + } + Ok(row_count as u64) + } +} + const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1']; const CONTINUATION_MARKER: [u8; 4] = [0xff; 4]; diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 9db320fb9da4..0c813b6ccbf0 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -29,7 +29,6 @@ use parquet::file::writer::SerializedFileWriter; use std::any::Any; use std::fmt; use std::fmt::Debug; -use std::io::Write; use std::sync::Arc; use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::sync::mpsc::{self, Receiver, Sender}; @@ -56,7 +55,7 @@ use parquet::file::properties::WriterProperties; use parquet::file::statistics::Statistics as ParquetStatistics; use super::write::demux::start_demuxer_task; -use super::write::{create_writer, AbortableWrite}; +use super::write::{create_writer, AbortableWrite, SharedBuffer}; use super::{FileFormat, FileScanConfig}; use crate::arrow::array::{ BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, @@ -1101,37 +1100,6 @@ async fn output_single_parquet_file_parallelized( Ok(row_count) } -/// A buffer with interior mutability shared by the SerializedFileWriter and -/// ObjectStore writer -#[derive(Clone)] -struct SharedBuffer { - /// The inner buffer for reading and writing - /// - /// The lock is used to obtain internal mutability, so no worry about the - /// lock contention. - buffer: Arc>>, -} - -impl SharedBuffer { - pub fn new(capacity: usize) -> Self { - Self { - buffer: Arc::new(futures::lock::Mutex::new(Vec::with_capacity(capacity))), - } - } -} - -impl Write for SharedBuffer { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - let mut buffer = self.buffer.try_lock().unwrap(); - Write::write(&mut *buffer, buf) - } - - fn flush(&mut self) -> std::io::Result<()> { - let mut buffer = self.buffer.try_lock().unwrap(); - Write::flush(&mut *buffer) - } -} - #[cfg(test)] pub(crate) mod test_util { use super::*; diff --git a/datafusion/core/src/datasource/file_format/write/mod.rs b/datafusion/core/src/datasource/file_format/write/mod.rs index cfcdbd8c464e..68fe81ce91fa 100644 --- a/datafusion/core/src/datasource/file_format/write/mod.rs +++ b/datafusion/core/src/datasource/file_format/write/mod.rs @@ -18,7 +18,7 @@ //! Module containing helper methods/traits related to enabling //! write support for the various file formats -use std::io::Error; +use std::io::{Error, Write}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -43,6 +43,37 @@ use tokio::io::AsyncWrite; pub(crate) mod demux; pub(crate) mod orchestration; +/// A buffer with interior mutability shared by the SerializedFileWriter and +/// ObjectStore writer +#[derive(Clone)] +pub(crate) struct SharedBuffer { + /// The inner buffer for reading and writing + /// + /// The lock is used to obtain internal mutability, so no worry about the + /// lock contention. + pub(crate) buffer: Arc>>, +} + +impl SharedBuffer { + pub fn new(capacity: usize) -> Self { + Self { + buffer: Arc::new(futures::lock::Mutex::new(Vec::with_capacity(capacity))), + } + } +} + +impl Write for SharedBuffer { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let mut buffer = self.buffer.try_lock().unwrap(); + Write::write(&mut *buffer, buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + let mut buffer = self.buffer.try_lock().unwrap(); + Write::flush(&mut *buffer) + } +} + /// Stores data needed during abortion of MultiPart writers #[derive(Clone)] pub(crate) struct MultiPart { diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index 02ab33083315..faf7acf89914 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -230,6 +230,41 @@ select * from validate_csv_with_options; 1;Foo 2;Bar +# Copy from table to single arrow file +query IT +COPY source_table to 'test_files/scratch/copy/table.arrow'; +---- +2 + +# Validate single csv output +statement ok +CREATE EXTERNAL TABLE validate_arrow_file +STORED AS arrow +LOCATION 'test_files/scratch/copy/table.arrow'; + +query IT +select * from validate_arrow_file; +---- +1 Foo +2 Bar + +# Copy from table to folder of json +query IT +COPY source_table to 'test_files/scratch/copy/table_arrow' (format arrow, single_file_output false); +---- +2 + +# Validate json output +statement ok +CREATE EXTERNAL TABLE validate_arrow STORED AS arrow LOCATION 'test_files/scratch/copy/table_arrow'; + +query IT +select * from validate_arrow; +---- +1 Foo +2 Bar + + # Error cases: # Copy from table with options diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index cdaf0bb64339..a1fa1e4844df 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -76,6 +76,44 @@ select * from dictionary_encoded_parquet_partitioned order by (a); a foo b bar +statement ok +CREATE EXTERNAL TABLE dictionary_encoded_arrow_partitioned( + a varchar, + b varchar, +) +STORED AS arrow +LOCATION 'test_files/scratch/insert_to_external/arrow_dict_partitioned/' +PARTITIONED BY (b) +OPTIONS( +create_local_path 'true', +insert_mode 'append_new_files', +); + +query TT +insert into dictionary_encoded_arrow_partitioned +select * from dictionary_encoded_values +---- +2 + +statement ok +CREATE EXTERNAL TABLE dictionary_encoded_arrow_test_readback( + a varchar, +) +STORED AS arrow +LOCATION 'test_files/scratch/insert_to_external/arrow_dict_partitioned/b=bar/' +OPTIONS( +create_local_path 'true', +insert_mode 'append_new_files', +); + +query T +select * from dictionary_encoded_arrow_test_readback; +---- +b + +query error DataFusion error: Arrow error: Schema error: project index 1 out of bounds, max field 1 +select * from dictionary_encoded_arrow_partitioned order by (a); + # test_insert_into statement ok From 524edfee21d8879d6d7ec8f0646641f11fe3a8c6 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Wed, 20 Dec 2023 18:13:20 -0500 Subject: [PATCH 2/8] update datafusion-cli lock --- datafusion-cli/Cargo.lock | 89 ++++++++++++++++++--------------------- 1 file changed, 40 insertions(+), 49 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 19ad6709362d..7087d8331a85 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -384,7 +384,7 @@ checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.40", + "syn 2.0.41", ] [[package]] @@ -1069,12 +1069,12 @@ dependencies = [ [[package]] name = "ctor" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37e366bff8cd32dd8754b0991fb66b279dc48f598c3a18914852a6673deef583" +checksum = "30d2b3721e861707777e3195b0158f950ae6dc4a27e4d02ff9f67e3eb3de199e" dependencies = [ "quote", - "syn 2.0.40", + "syn 2.0.41", ] [[package]] @@ -1104,6 +1104,7 @@ dependencies = [ "apache-avro", "arrow", "arrow-array", + "arrow-ipc", "arrow-schema", "async-compression", "async-trait", @@ -1576,7 +1577,7 @@ checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.40", + "syn 2.0.41", ] [[package]] @@ -1781,9 +1782,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.27" +version = "0.14.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468" +checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" dependencies = [ "bytes", "futures-channel", @@ -1796,7 +1797,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2", "tokio", "tower-service", "tracing", @@ -2496,7 +2497,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.40", + "syn 2.0.41", ] [[package]] @@ -2513,9 +2514,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pkg-config" -version = "0.3.27" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a" [[package]] name = "powerfmt" @@ -2715,9 +2716,9 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "reqwest" -version = "0.11.22" +version = "0.11.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" +checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41" dependencies = [ "base64", "bytes", @@ -3020,7 +3021,7 @@ checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.40", + "syn 2.0.41", ] [[package]] @@ -3106,16 +3107,6 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" -[[package]] -name = "socket2" -version = "0.4.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "socket2" version = "0.5.5" @@ -3196,7 +3187,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.40", + "syn 2.0.41", ] [[package]] @@ -3218,9 +3209,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.40" +version = "2.0.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13fa70a4ee923979ffb522cacce59d34421ebdea5625e1073c4326ef9d2dd42e" +checksum = "44c8b28c477cc3bf0e7966561e3460130e1255f7a1cf71931075f1c5e7a7e269" dependencies = [ "proc-macro2", "quote", @@ -3284,22 +3275,22 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" [[package]] name = "thiserror" -version = "1.0.50" +version = "1.0.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9a7210f5c9a7156bb50aa36aed4c95afb51df0df00713949448cf9e97d382d2" +checksum = "f11c217e1416d6f036b870f14e0413d480dbf28edbee1f877abaf0206af43bb7" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.50" +version = "1.0.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" +checksum = "01742297787513b79cf8e29d1056ede1313e2420b7b3b15d0a768b4921f549df" dependencies = [ "proc-macro2", "quote", - "syn 2.0.40", + "syn 2.0.41", ] [[package]] @@ -3315,9 +3306,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5" +checksum = "f657ba42c3f86e7680e53c8cd3af8abbe56b5491790b46e22e19c0d57463583e" dependencies = [ "deranged", "powerfmt", @@ -3334,9 +3325,9 @@ checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20" +checksum = "26197e33420244aeb70c3e8c78376ca46571bc4e701e4791c2cd9f57dcb3a43f" dependencies = [ "time-core", ] @@ -3367,9 +3358,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.35.0" +version = "1.35.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "841d45b238a16291a4e1584e61820b8ae57d696cc5015c459c229ccc6990cc1c" +checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" dependencies = [ "backtrace", "bytes", @@ -3378,7 +3369,7 @@ dependencies = [ "num_cpus", "parking_lot", "pin-project-lite", - "socket2 0.5.5", + "socket2", "tokio-macros", "windows-sys 0.48.0", ] @@ -3391,7 +3382,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.40", + "syn 2.0.41", ] [[package]] @@ -3488,7 +3479,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.40", + "syn 2.0.41", ] [[package]] @@ -3533,7 +3524,7 @@ checksum = "f03ca4cb38206e2bef0700092660bb74d696f808514dae47fa1467cbfe26e96e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.40", + "syn 2.0.41", ] [[package]] @@ -3687,7 +3678,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.40", + "syn 2.0.41", "wasm-bindgen-shared", ] @@ -3721,7 +3712,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", - "syn 2.0.40", + "syn 2.0.41", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3970,22 +3961,22 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.7.30" +version = "0.7.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "306dca4455518f1f31635ec308b6b3e4eb1b11758cefafc782827d0aa7acb5c7" +checksum = "1c4061bedbb353041c12f413700357bec76df2c7e2ca8e4df8bac24c6bf68e3d" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.7.30" +version = "0.7.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be912bf68235a88fbefd1b73415cb218405958d1655b2ece9035a19920bdf6ba" +checksum = "b3c129550b3e6de3fd0ba67ba5c81818f9805e58b8d7fee80a3a59d2c9fc601a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.40", + "syn 2.0.41", ] [[package]] From 232084e5ccf0fd286c2405eed5cd6e65d1952373 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Wed, 20 Dec 2023 18:27:24 -0500 Subject: [PATCH 3/8] fix toml formatting --- Cargo.toml | 29 +++++---------------- datafusion/core/Cargo.toml | 2 +- datafusion/sqllogictest/test_files/copy.slt | 23 +++++++++++++++- 3 files changed, 29 insertions(+), 25 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4b8c6ae651d1..99ce6fcb40ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,24 +17,7 @@ [workspace] exclude = ["datafusion-cli"] -members = [ - "datafusion/common", - "datafusion/core", - "datafusion/expr", - "datafusion/execution", - "datafusion/optimizer", - "datafusion/physical-expr", - "datafusion/physical-plan", - "datafusion/proto", - "datafusion/proto/gen", - "datafusion/sql", - "datafusion/sqllogictest", - "datafusion/substrait", - "datafusion/wasmtest", - "datafusion-examples", - "docs", - "test-utils", - "benchmarks", +members = ["datafusion/common", "datafusion/core", "datafusion/expr", "datafusion/execution", "datafusion/optimizer", "datafusion/physical-expr", "datafusion/physical-plan", "datafusion/proto", "datafusion/proto/gen", "datafusion/sql", "datafusion/sqllogictest", "datafusion/substrait", "datafusion/wasmtest", "datafusion-examples", "docs", "test-utils", "benchmarks", ] resolver = "2" @@ -53,25 +36,26 @@ arrow = { version = "49.0.0", features = ["prettyprint"] } arrow-array = { version = "49.0.0", default-features = false, features = ["chrono-tz"] } arrow-buffer = { version = "49.0.0", default-features = false } arrow-flight = { version = "49.0.0", features = ["flight-sql-experimental"] } +arrow-ipc = { version = "49.0.0", default-features = false } arrow-ord = { version = "49.0.0", default-features = false } arrow-schema = { version = "49.0.0", default-features = false } -arrow-ipc = { version = "49.0.0", default-features = false } async-trait = "0.1.73" bigdecimal = "0.4.1" bytes = "1.4" +chrono = { version = "0.4.31", default-features = false } ctor = "0.2.0" +dashmap = "5.4.0" datafusion = { path = "datafusion/core", version = "34.0.0" } datafusion-common = { path = "datafusion/common", version = "34.0.0" } +datafusion-execution = { path = "datafusion/execution", version = "34.0.0" } datafusion-expr = { path = "datafusion/expr", version = "34.0.0" } -datafusion-sql = { path = "datafusion/sql", version = "34.0.0" } datafusion-optimizer = { path = "datafusion/optimizer", version = "34.0.0" } datafusion-physical-expr = { path = "datafusion/physical-expr", version = "34.0.0" } datafusion-physical-plan = { path = "datafusion/physical-plan", version = "34.0.0" } -datafusion-execution = { path = "datafusion/execution", version = "34.0.0" } datafusion-proto = { path = "datafusion/proto", version = "34.0.0" } +datafusion-sql = { path = "datafusion/sql", version = "34.0.0" } datafusion-sqllogictest = { path = "datafusion/sqllogictest", version = "34.0.0" } datafusion-substrait = { path = "datafusion/substrait", version = "34.0.0" } -dashmap = "5.4.0" doc-comment = "0.3" env_logger = "0.10" futures = "0.3" @@ -89,7 +73,6 @@ serde_json = "1" sqlparser = { version = "0.40.0", features = ["visitor"] } tempfile = "3" thiserror = "1.0.44" -chrono = { version = "0.4.31", default-features = false } url = "2.2" [profile.release] diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 4d15a3d02563..9de6a7f7d6a0 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -55,8 +55,8 @@ ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] apache-avro = { version = "0.16", optional = true } arrow = { workspace = true } arrow-array = { workspace = true } -arrow-schema = { workspace = true } arrow-ipc = { workspace = true } +arrow-schema = { workspace = true } async-compression = { version = "0.4.0", features = ["bzip2", "gzip", "xz", "zstd", "futures-io", "tokio"], optional = true } async-trait = { workspace = true } bytes = { workspace = true } diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index faf7acf89914..89b23917884c 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -232,7 +232,7 @@ select * from validate_csv_with_options; # Copy from table to single arrow file query IT -COPY source_table to 'test_files/scratch/copy/table.arrow'; +COPY source_table to 'test_files/scratch/copy/table.arrow'; ---- 2 @@ -248,6 +248,27 @@ select * from validate_arrow_file; 1 Foo 2 Bar +# Copy from dict encoded values to single arrow file +query T? +COPY (values +('c', arrow_cast('foo', 'Dictionary(Int32, Utf8)')), ('d', arrow_cast('bar', 'Dictionary(Int32, Utf8)'))) +to 'test_files/scratch/copy/table_dict.arrow'; +---- +2 + +# Validate single csv output +statement ok +CREATE EXTERNAL TABLE validate_arrow_file_dict +STORED AS arrow +LOCATION 'test_files/scratch/copy/table_dict.arrow'; + +query T? +select * from validate_arrow_file_dict; +---- +c foo +d bar + + # Copy from table to folder of json query IT COPY source_table to 'test_files/scratch/copy/table_arrow' (format arrow, single_file_output false); From 099247a368a54f63af45a8760569af7d1e9cf058 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Fri, 22 Dec 2023 16:33:30 -0500 Subject: [PATCH 4/8] Update insert_to_external.slt Co-authored-by: Andrew Lamb --- datafusion/sqllogictest/test_files/insert_to_external.slt | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index a1fa1e4844df..e73778ad44e5 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -111,6 +111,7 @@ select * from dictionary_encoded_arrow_test_readback; ---- b +# https://github.com/apache/arrow-datafusion/issues/7816 query error DataFusion error: Arrow error: Schema error: project index 1 out of bounds, max field 1 select * from dictionary_encoded_arrow_partitioned order by (a); From d1054235479d59ce3fb1b7658a7c478931e7d4ea Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Fri, 22 Dec 2023 17:19:51 -0500 Subject: [PATCH 5/8] add ticket tracking arrow options --- datafusion/core/src/datasource/file_format/arrow.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs index bb0eda9c15db..d679babec526 100644 --- a/datafusion/core/src/datasource/file_format/arrow.rs +++ b/datafusion/core/src/datasource/file_format/arrow.rs @@ -208,6 +208,7 @@ impl DataSink for ArrowFileSink { ) -> Result { // No props are supported yet, but can be by updating FileTypeWriterOptions // to populate this struct and use those options to initialize the arrow_ipc::writer::FileWriter + // https://github.com/apache/arrow-datafusion/issues/8635 let _arrow_props = self.config.file_type_writer_options.try_into_arrow()?; let object_store = context From 0fa4a59578595ebf00a4ad0645a2bad6d47f84e0 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Fri, 22 Dec 2023 17:38:56 -0500 Subject: [PATCH 6/8] default to lz4 compression --- Cargo.toml | 2 +- datafusion/core/src/datasource/file_format/arrow.rs | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 99ce6fcb40ab..a698fbf471f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,7 @@ arrow = { version = "49.0.0", features = ["prettyprint"] } arrow-array = { version = "49.0.0", default-features = false, features = ["chrono-tz"] } arrow-buffer = { version = "49.0.0", default-features = false } arrow-flight = { version = "49.0.0", features = ["flight-sql-experimental"] } -arrow-ipc = { version = "49.0.0", default-features = false } +arrow-ipc = { version = "49.0.0", default-features = false, features=["lz4"] } arrow-ord = { version = "49.0.0", default-features = false } arrow-schema = { version = "49.0.0", default-features = false } async-trait = "0.1.73" diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs index d679babec526..7d393d9129dd 100644 --- a/datafusion/core/src/datasource/file_format/arrow.rs +++ b/datafusion/core/src/datasource/file_format/arrow.rs @@ -35,6 +35,8 @@ use crate::physical_plan::ExecutionPlan; use arrow::ipc::convert::fb_to_schema; use arrow::ipc::reader::FileReader; use arrow::ipc::root_as_message; +use arrow_ipc::writer::IpcWriteOptions; +use arrow_ipc::CompressionType; use arrow_schema::{ArrowError, Schema, SchemaRef}; use bytes::Bytes; @@ -232,11 +234,16 @@ impl DataSink for ArrowFileSink { let mut file_write_tasks: JoinSet> = JoinSet::new(); + + let ipc_options = + IpcWriteOptions::try_new(64, false, arrow_ipc::MetadataVersion::V5)? + .try_with_compression(Some(CompressionType::LZ4_FRAME))?; while let Some((path, mut rx)) = file_stream_rx.recv().await { let shared_buffer = SharedBuffer::new(1048576); - let mut arrow_writer = arrow_ipc::writer::FileWriter::try_new( + let mut arrow_writer = arrow_ipc::writer::FileWriter::try_new_with_options( shared_buffer.clone(), &self.get_writer_schema(), + ipc_options.clone(), )?; let mut object_store_writer = create_writer( FileCompressionType::UNCOMPRESSED, From c3e22558f52cef34379d509501300736a58fa6e8 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Fri, 22 Dec 2023 17:42:23 -0500 Subject: [PATCH 7/8] update datafusion-cli lock --- datafusion-cli/Cargo.lock | 47 ++++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 7087d8331a85..9f75013c86dc 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -255,6 +255,7 @@ dependencies = [ "arrow-data", "arrow-schema", "flatbuffers", + "lz4_flex", ] [[package]] @@ -378,13 +379,13 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.74" +version = "0.1.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" +checksum = "fdf6721fb0140e4f897002dd086c06f6c27775df19cfe1fccb21181a48fd2c98" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.42", ] [[package]] @@ -1074,7 +1075,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30d2b3721e861707777e3195b0158f950ae6dc4a27e4d02ff9f67e3eb3de199e" dependencies = [ "quote", - "syn 2.0.41", + "syn 2.0.42", ] [[package]] @@ -1577,7 +1578,7 @@ checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.42", ] [[package]] @@ -2497,7 +2498,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.42", ] [[package]] @@ -2587,9 +2588,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.70" +version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39278fbbf5fb4f646ce651690877f89d1c5811a3d4acb27700c1cb3cdb78fd3b" +checksum = "75cb1540fadbd5b8fbccc4dddad2734eba435053f725621c070711a14bb5f4b8" dependencies = [ "unicode-ident", ] @@ -3021,7 +3022,7 @@ checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.42", ] [[package]] @@ -3187,7 +3188,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.41", + "syn 2.0.42", ] [[package]] @@ -3209,9 +3210,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.41" +version = "2.0.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44c8b28c477cc3bf0e7966561e3460130e1255f7a1cf71931075f1c5e7a7e269" +checksum = "5b7d0a2c048d661a1a59fcd7355baa232f7ed34e0ee4df2eef3c1c1c0d3852d8" dependencies = [ "proc-macro2", "quote", @@ -3290,7 +3291,7 @@ checksum = "01742297787513b79cf8e29d1056ede1313e2420b7b3b15d0a768b4921f549df" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.42", ] [[package]] @@ -3382,7 +3383,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.42", ] [[package]] @@ -3479,7 +3480,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.42", ] [[package]] @@ -3524,7 +3525,7 @@ checksum = "f03ca4cb38206e2bef0700092660bb74d696f808514dae47fa1467cbfe26e96e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.42", ] [[package]] @@ -3678,7 +3679,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.42", "wasm-bindgen-shared", ] @@ -3712,7 +3713,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.42", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3961,22 +3962,22 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.7.31" +version = "0.7.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c4061bedbb353041c12f413700357bec76df2c7e2ca8e4df8bac24c6bf68e3d" +checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.7.31" +version = "0.7.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3c129550b3e6de3fd0ba67ba5c81818f9805e58b8d7fee80a3a59d2c9fc601a" +checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.42", ] [[package]] From 171d0a38536340d568c3db1af3c73016d4abaf0f Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Fri, 22 Dec 2023 17:51:06 -0500 Subject: [PATCH 8/8] cargo update --- datafusion-cli/Cargo.lock | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index caa12f4609bd..9f75013c86dc 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -385,7 +385,7 @@ checksum = "fdf6721fb0140e4f897002dd086c06f6c27775df19cfe1fccb21181a48fd2c98" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.42", ] [[package]] @@ -1075,7 +1075,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30d2b3721e861707777e3195b0158f950ae6dc4a27e4d02ff9f67e3eb3de199e" dependencies = [ "quote", - "syn 2.0.41", + "syn 2.0.42", ] [[package]] @@ -1578,7 +1578,7 @@ checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.42", ] [[package]] @@ -2498,7 +2498,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.42", ] [[package]] @@ -3022,7 +3022,7 @@ checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.42", ] [[package]] @@ -3188,7 +3188,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.41", + "syn 2.0.42", ] [[package]] @@ -3210,9 +3210,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.41" +version = "2.0.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44c8b28c477cc3bf0e7966561e3460130e1255f7a1cf71931075f1c5e7a7e269" +checksum = "5b7d0a2c048d661a1a59fcd7355baa232f7ed34e0ee4df2eef3c1c1c0d3852d8" dependencies = [ "proc-macro2", "quote", @@ -3291,7 +3291,7 @@ checksum = "01742297787513b79cf8e29d1056ede1313e2420b7b3b15d0a768b4921f549df" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.42", ] [[package]] @@ -3383,7 +3383,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.42", ] [[package]] @@ -3480,7 +3480,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.42", ] [[package]] @@ -3525,7 +3525,7 @@ checksum = "f03ca4cb38206e2bef0700092660bb74d696f808514dae47fa1467cbfe26e96e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.42", ] [[package]] @@ -3679,7 +3679,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.42", "wasm-bindgen-shared", ] @@ -3713,7 +3713,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", - "syn 2.0.41", + "syn 2.0.42", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3962,7 +3962,7 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.7.31" +version = "0.7.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be" dependencies = [