From 86740bfd3d9831d6b7c1d0e1bf4a21d91598a0ac Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Tue, 3 Dec 2024 02:43:53 +0800 Subject: [PATCH] Add generate_series() udtf (and introduce 'lazy' `MemoryExec`) (#13540) * Add generate_series() udtf * liscence * fix examples * clippy * comments * singleton udtf init * StreamingMemoryExec -> LazyMemoryExec * use RwLock * test udf+udtf generate_series() in the same sql * CI * CI * small fixes --- Cargo.toml | 2 + datafusion-cli/Cargo.lock | 213 +++++++------ datafusion-cli/Cargo.toml | 1 + datafusion-cli/src/functions.rs | 2 +- datafusion-examples/Cargo.toml | 1 + datafusion-examples/examples/simple_udtf.rs | 2 +- datafusion/catalog/src/table.rs | 41 ++- datafusion/core/Cargo.toml | 1 + datafusion/core/src/datasource/function.rs | 63 ---- datafusion/core/src/datasource/mod.rs | 1 - datafusion/core/src/execution/context/mod.rs | 9 +- .../core/src/execution/session_state.rs | 17 +- .../src/execution/session_state_defaults.rs | 8 +- datafusion/core/src/lib.rs | 5 + .../user_defined_table_functions.rs | 2 +- datafusion/functions-table/Cargo.toml | 62 ++++ .../functions-table/src/generate_series.rs | 180 +++++++++++ datafusion/functions-table/src/lib.rs | 51 ++++ datafusion/physical-plan/src/memory.rs | 280 +++++++++++++++++- .../test_files/table_functions.slt | 142 +++++++++ 20 files changed, 921 insertions(+), 162 deletions(-) delete mode 100644 datafusion/core/src/datasource/function.rs create mode 100644 datafusion/functions-table/Cargo.toml create mode 100644 datafusion/functions-table/src/generate_series.rs create mode 100644 datafusion/functions-table/src/lib.rs create mode 100644 datafusion/sqllogictest/test_files/table_functions.slt diff --git a/Cargo.toml b/Cargo.toml index 76bc50d59a08..1ca6cdfdb12d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ members = [ "datafusion/functions", "datafusion/functions-aggregate", "datafusion/functions-aggregate-common", + "datafusion/functions-table", "datafusion/functions-nested", "datafusion/functions-window", "datafusion/functions-window-common", @@ -110,6 +111,7 @@ datafusion-functions = { path = "datafusion/functions", version = "43.0.0" } datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "43.0.0" } datafusion-functions-aggregate-common = { path = "datafusion/functions-aggregate-common", version = "43.0.0" } datafusion-functions-nested = { path = "datafusion/functions-nested", version = "43.0.0" } +datafusion-functions-table = { path = "datafusion/functions-table", version = "43.0.0" } datafusion-functions-window = { path = "datafusion/functions-window", version = "43.0.0" } datafusion-functions-window-common = { path = "datafusion/functions-window-common", version = "43.0.0" } datafusion-macros = { path = "datafusion/macros", version = "43.0.0" } diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 04b0b0d22cfd..f06a8b210bd3 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -220,7 +220,7 @@ dependencies = [ "chrono", "chrono-tz", "half", - "hashbrown 0.15.1", + "hashbrown 0.15.2", "num", ] @@ -406,9 +406,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.17" +version = "0.4.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0cb8f1d480b0ea3783ab015936d2a55c87e219676f0c0b7dec61494043f21857" +checksum = "df895a515f70646414f4b45c0b79082783b80552b373a68283012928df56f522" dependencies = [ "bzip2", "flate2", @@ -814,9 +814,9 @@ dependencies = [ [[package]] name = "blake3" -version = "1.5.4" +version = "1.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d82033247fd8e890df8f740e407ad4d038debb9eb1f40533fffb32e7d17dc6f7" +checksum = "b8ee0c1824c4dea5b5f81736aff91bae041d2c07ee1192bec91054e10e3e601e" dependencies = [ "arrayref", "arrayvec", @@ -880,9 +880,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" +checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" [[package]] name = "bytes-utils" @@ -917,9 +917,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.1" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd9de9f2205d5ef3fd67e685b0df337994ddd4495e2a28d185500d0e1edfea47" +checksum = "f34d93e62b03caf570cccc334cbc6c2fceca82f39211051345108adcba3eebdc" dependencies = [ "jobserver", "libc", @@ -1080,6 +1080,16 @@ dependencies = [ "libc", ] +[[package]] +name = "core-foundation" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b55271e5c8c478ad3f38ad24ef34923091e0548492a266d19b3c0b4d82574c63" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -1097,9 +1107,9 @@ dependencies = [ [[package]] name = "cpufeatures" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ca741a962e1b0bff6d724a1a0958b686406e853bb14061f218562e1896f95e6" +checksum = "16b80225097f2e5ae4e7179dd2266824648f3e2f49d9134d584b76389d31c4c3" dependencies = [ "libc", ] @@ -1209,6 +1219,7 @@ dependencies = [ "datafusion-functions", "datafusion-functions-aggregate", "datafusion-functions-nested", + "datafusion-functions-table", "datafusion-functions-window", "datafusion-optimizer", "datafusion-physical-expr", @@ -1265,6 +1276,7 @@ dependencies = [ "clap", "ctor", "datafusion", + "datafusion-catalog", "dirs", "env_logger", "futures", @@ -1446,6 +1458,29 @@ dependencies = [ "rand", ] +[[package]] +name = "datafusion-functions-table" +version = "43.0.0" +dependencies = [ + "ahash", + "arrow", + "arrow-schema", + "async-trait", + "datafusion-catalog", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-functions-aggregate-common", + "datafusion-physical-expr", + "datafusion-physical-expr-common", + "datafusion-physical-plan", + "half", + "indexmap", + "log", + "parking_lot", + "paste", +] + [[package]] name = "datafusion-functions-window" version = "43.0.0" @@ -1700,12 +1735,12 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.9" +version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba" +checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1972,9 +2007,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.15.1" +version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a9bfc1af68b1726ea47d3d5109de126281def866b33970e10fbab11b5dafab3" +checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" [[package]] name = "heck" @@ -1988,12 +2023,6 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" -[[package]] -name = "hermit-abi" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" - [[package]] name = "hex" version = "0.4.3" @@ -2162,8 +2191,8 @@ dependencies = [ "http 1.1.0", "hyper 1.5.1", "hyper-util", - "rustls 0.23.17", - "rustls-native-certs 0.8.0", + "rustls 0.23.19", + "rustls-native-certs 0.8.1", "rustls-pki-types", "tokio", "tokio-rustls 0.26.0", @@ -2358,7 +2387,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" dependencies = [ "equivalent", - "hashbrown 0.15.1", + "hashbrown 0.15.2", ] [[package]] @@ -2390,9 +2419,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.13" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "540654e97a3f4470a492cd30ff187bc95d89557a903a2bbf112e2fae98104ef2" +checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" [[package]] name = "jobserver" @@ -2405,9 +2434,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.72" +version = "0.3.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9" +checksum = "fb15147158e79fd8b8afd0252522769c4f48725460b37338544d8379d94fc8f9" dependencies = [ "wasm-bindgen", ] @@ -2484,9 +2513,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.164" +version = "0.2.167" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f" +checksum = "09d6582e104315a817dff97f75133544b2e094ee22447d2acf4a74e189ba06fc" [[package]] name = "libflate" @@ -2546,9 +2575,9 @@ checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" [[package]] name = "litemap" -version = "0.7.3" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "643cb0b8d4fcc284004d5fd0d67ccf61dfffadb7f75e1e71bc420f4688a3a704" +checksum = "4ee93343901ab17bd981295f2cf0026d4ad018c7c31ba84549a4ddbb47a45104" [[package]] name = "lock_api" @@ -2628,11 +2657,10 @@ dependencies = [ [[package]] name = "mio" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" +checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" dependencies = [ - "hermit-abi", "libc", "wasi", "windows-sys 0.52.0", @@ -2862,7 +2890,7 @@ dependencies = [ "flate2", "futures", "half", - "hashbrown 0.15.1", + "hashbrown 0.15.2", "lz4_flex", "num", "num-bigint", @@ -3020,9 +3048,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.89" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e" +checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0" dependencies = [ "unicode-ident", ] @@ -3063,7 +3091,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls 0.23.17", + "rustls 0.23.19", "socket2", "thiserror 2.0.3", "tokio", @@ -3081,7 +3109,7 @@ dependencies = [ "rand", "ring", "rustc-hash", - "rustls 0.23.17", + "rustls 0.23.19", "rustls-pki-types", "slab", "thiserror 2.0.3", @@ -3259,8 +3287,8 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls 0.23.17", - "rustls-native-certs 0.8.0", + "rustls 0.23.19", + "rustls-native-certs 0.8.1", "rustls-pemfile 2.2.0", "rustls-pki-types", "serde", @@ -3378,9 +3406,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.17" +version = "0.23.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f1a745511c54ba6d4465e8d5dfbd81b45791756de28d4981af70d6dca128f1e" +checksum = "934b404430bb06b3fae2cba809eb45a1ab1aecd64491213d7c3301b88393f8d1" dependencies = [ "once_cell", "ring", @@ -3399,20 +3427,19 @@ dependencies = [ "openssl-probe", "rustls-pemfile 1.0.4", "schannel", - "security-framework", + "security-framework 2.11.1", ] [[package]] name = "rustls-native-certs" -version = "0.8.0" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a" +checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3" dependencies = [ "openssl-probe", - "rustls-pemfile 2.2.0", "rustls-pki-types", "schannel", - "security-framework", + "security-framework 3.0.1", ] [[package]] @@ -3538,7 +3565,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ "bitflags 2.6.0", - "core-foundation", + "core-foundation 0.9.4", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1415a607e92bec364ea2cf9264646dcce0f91e6d65281bd6f2819cca3bf39c8" +dependencies = [ + "bitflags 2.6.0", + "core-foundation 0.10.0", "core-foundation-sys", "libc", "security-framework-sys", @@ -3686,9 +3726,9 @@ checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" [[package]] name = "socket2" -version = "0.5.7" +version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" +checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8" dependencies = [ "libc", "windows-sys 0.52.0", @@ -3801,9 +3841,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.87" +version = "2.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d" +checksum = "919d3b74a5dd0ccd15aeb8f93e7006bd9e14c295087c9896a110f490752bcf31" dependencies = [ "proc-macro2", "quote", @@ -4009,7 +4049,7 @@ version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.23.17", + "rustls 0.23.19", "rustls-pki-types", "tokio", ] @@ -4052,9 +4092,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" -version = "0.1.40" +version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "pin-project-lite", "tracing-attributes", @@ -4063,9 +4103,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.27" +version = "0.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" dependencies = [ "proc-macro2", "quote", @@ -4074,9 +4114,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.32" +version = "0.1.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" dependencies = [ "once_cell", ] @@ -4155,9 +4195,9 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "url" -version = "2.5.3" +version = "2.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d157f1b96d14500ffdc1f10ba712e780825526c03d9a49b4d0324b0d9113ada" +checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60" dependencies = [ "form_urlencoded", "idna", @@ -4246,9 +4286,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.95" +version = "0.2.96" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e" +checksum = "21d3b25c3ea1126a2ad5f4f9068483c2af1e64168f847abe863a526b8dbfe00b" dependencies = [ "cfg-if", "once_cell", @@ -4257,9 +4297,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.95" +version = "0.2.96" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358" +checksum = "52857d4c32e496dc6537646b5b117081e71fd2ff06de792e3577a150627db283" dependencies = [ "bumpalo", "log", @@ -4272,21 +4312,22 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.45" +version = "0.4.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc7ec4f8827a71586374db3e87abdb5a2bb3a15afed140221307c3ec06b1f63b" +checksum = "951fe82312ed48443ac78b66fa43eded9999f738f6022e67aead7b708659e49a" dependencies = [ "cfg-if", "js-sys", + "once_cell", "wasm-bindgen", "web-sys", ] [[package]] name = "wasm-bindgen-macro" -version = "0.2.95" +version = "0.2.96" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56" +checksum = "920b0ffe069571ebbfc9ddc0b36ba305ef65577c94b06262ed793716a1afd981" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -4294,9 +4335,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.95" +version = "0.2.96" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" +checksum = "bf59002391099644be3524e23b781fa43d2be0c5aa0719a18c0731b9d195cab6" dependencies = [ "proc-macro2", "quote", @@ -4307,9 +4348,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.95" +version = "0.2.96" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d" +checksum = "e5047c5392700766601942795a436d7d2599af60dcc3cc1248c9120bfb0827b0" [[package]] name = "wasm-streams" @@ -4326,9 +4367,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.72" +version = "0.3.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6488b90108c040df0fe62fa815cbdee25124641df01814dd7282749234c6112" +checksum = "476364ff87d0ae6bfb661053a9104ab312542658c3d8f963b7ace80b6f9b26b9" dependencies = [ "js-sys", "wasm-bindgen", @@ -4578,9 +4619,9 @@ dependencies = [ [[package]] name = "yoke" -version = "0.7.4" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c5b1314b079b0930c31e3af543d8ee1757b1951ae1e1565ec704403a7240ca5" +checksum = "120e6aef9aa629e3d4f52dc8cc43a015c7724194c97dfaf45180d2daf2b77f40" dependencies = [ "serde", "stable_deref_trait", @@ -4590,9 +4631,9 @@ dependencies = [ [[package]] name = "yoke-derive" -version = "0.7.4" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95" +checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" dependencies = [ "proc-macro2", "quote", @@ -4623,18 +4664,18 @@ dependencies = [ [[package]] name = "zerofrom" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91ec111ce797d0e0784a1116d0ddcdbea84322cd79e5d5ad173daeba4f93ab55" +checksum = "cff3ee08c995dee1859d998dea82f7374f2826091dd9cd47def953cae446cd2e" dependencies = [ "zerofrom-derive", ] [[package]] name = "zerofrom-derive" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ea7b4a3637ea8669cedf0f1fd5c286a17f3de97b8dd5a70a6c167a1730e63a5" +checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808" dependencies = [ "proc-macro2", "quote", diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 784d47220c7c..743ec1b4a749 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -49,6 +49,7 @@ datafusion = { path = "../datafusion/core", version = "43.0.0", features = [ "unicode_expressions", "compression", ] } +datafusion-catalog = { path = "../datafusion/catalog", version = "43.0.0" } dirs = "5.0.1" env_logger = "0.11" futures = "0.3" diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs index c622463de033..d7ca48d638b7 100644 --- a/datafusion-cli/src/functions.rs +++ b/datafusion-cli/src/functions.rs @@ -24,13 +24,13 @@ use async_trait::async_trait; use datafusion::catalog::Session; use datafusion::common::{plan_err, Column}; -use datafusion::datasource::function::TableFunctionImpl; use datafusion::datasource::TableProvider; use datafusion::error::Result; use datafusion::logical_expr::Expr; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::ExecutionPlan; use datafusion::scalar::ScalarValue; +use datafusion_catalog::TableFunctionImpl; use parquet::basic::ConvertedType; use parquet::data_type::{ByteArray, FixedLenByteArray}; use parquet::file::reader::FileReader; diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 05850e7b3a1c..d8aaad801e5c 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -60,6 +60,7 @@ async-trait = { workspace = true } bytes = { workspace = true } dashmap = { workspace = true } datafusion = { workspace = true, default-features = true, features = ["avro"] } +datafusion-catalog = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } datafusion-functions-window-common = { workspace = true } diff --git a/datafusion-examples/examples/simple_udtf.rs b/datafusion-examples/examples/simple_udtf.rs index 6faa397ef60f..f32560ede69d 100644 --- a/datafusion-examples/examples/simple_udtf.rs +++ b/datafusion-examples/examples/simple_udtf.rs @@ -21,13 +21,13 @@ use async_trait::async_trait; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::record_batch::RecordBatch; use datafusion::catalog::Session; -use datafusion::datasource::function::TableFunctionImpl; use datafusion::datasource::TableProvider; use datafusion::error::Result; use datafusion::execution::context::ExecutionProps; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; +use datafusion_catalog::TableFunctionImpl; use datafusion_common::{plan_err, ScalarValue}; use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::{Expr, TableType}; diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index d771930de25d..b6752191d9a7 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -25,9 +25,11 @@ use arrow_schema::SchemaRef; use async_trait::async_trait; use datafusion_common::Result; use datafusion_common::{not_impl_err, Constraints, Statistics}; +use datafusion_expr::Expr; + use datafusion_expr::dml::InsertOp; use datafusion_expr::{ - CreateExternalTable, Expr, LogicalPlan, TableProviderFilterPushDown, TableType, + CreateExternalTable, LogicalPlan, TableProviderFilterPushDown, TableType, }; use datafusion_physical_plan::ExecutionPlan; @@ -297,3 +299,40 @@ pub trait TableProviderFactory: Debug + Sync + Send { cmd: &CreateExternalTable, ) -> Result>; } + +/// A trait for table function implementations +pub trait TableFunctionImpl: Debug + Sync + Send { + /// Create a table provider + fn call(&self, args: &[Expr]) -> Result>; +} + +/// A table that uses a function to generate data +#[derive(Debug)] +pub struct TableFunction { + /// Name of the table function + name: String, + /// Function implementation + fun: Arc, +} + +impl TableFunction { + /// Create a new table function + pub fn new(name: String, fun: Arc) -> Self { + Self { name, fun } + } + + /// Get the name of the table function + pub fn name(&self) -> &str { + &self.name + } + + /// Get the implementation of the table function + pub fn function(&self) -> &Arc { + &self.fun + } + + /// Get the function implementation and generate a table + pub fn create_table_provider(&self, args: &[Expr]) -> Result> { + self.fun.call(args) + } +} diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 268e0fb17f7b..6c5a31e3624a 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -103,6 +103,7 @@ datafusion-expr = { workspace = true } datafusion-functions = { workspace = true } datafusion-functions-aggregate = { workspace = true } datafusion-functions-nested = { workspace = true, optional = true } +datafusion-functions-table = { workspace = true } datafusion-functions-window = { workspace = true } datafusion-optimizer = { workspace = true } datafusion-physical-expr = { workspace = true } diff --git a/datafusion/core/src/datasource/function.rs b/datafusion/core/src/datasource/function.rs deleted file mode 100644 index 37ce59f8207b..000000000000 --- a/datafusion/core/src/datasource/function.rs +++ /dev/null @@ -1,63 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! A table that uses a function to generate data - -use super::TableProvider; - -use datafusion_common::Result; -use datafusion_expr::Expr; - -use std::fmt::Debug; -use std::sync::Arc; - -/// A trait for table function implementations -pub trait TableFunctionImpl: Debug + Sync + Send { - /// Create a table provider - fn call(&self, args: &[Expr]) -> Result>; -} - -/// A table that uses a function to generate data -#[derive(Debug)] -pub struct TableFunction { - /// Name of the table function - name: String, - /// Function implementation - fun: Arc, -} - -impl TableFunction { - /// Create a new table function - pub fn new(name: String, fun: Arc) -> Self { - Self { name, fun } - } - - /// Get the name of the table function - pub fn name(&self) -> &str { - &self.name - } - - /// Get the implementation of the table function - pub fn function(&self) -> &Arc { - &self.fun - } - - /// Get the function implementation and generate a table - pub fn create_table_provider(&self, args: &[Expr]) -> Result> { - self.fun.call(args) - } -} diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index ad369b75e130..7d3fe9ddd751 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -25,7 +25,6 @@ pub mod default_table_source; pub mod dynamic_file; pub mod empty; pub mod file_format; -pub mod function; pub mod listing; pub mod listing_table_factory; pub mod memory; diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 8ec834916424..4cc3200df17d 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -30,9 +30,8 @@ use crate::{ catalog_common::memory::MemorySchemaProvider, catalog_common::MemoryCatalogProvider, dataframe::DataFrame, - datasource::{ - function::{TableFunction, TableFunctionImpl}, - listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, + datasource::listing::{ + ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, }, datasource::{provider_as_source, MemTable, ViewTable}, error::{DataFusionError, Result}, @@ -74,7 +73,9 @@ use crate::datasource::dynamic_file::DynamicListTableFactory; use crate::execution::session_state::SessionStateBuilder; use async_trait::async_trait; use chrono::{DateTime, Utc}; -use datafusion_catalog::{DynamicFileCatalog, SessionStore, UrlTableFactory}; +use datafusion_catalog::{ + DynamicFileCatalog, SessionStore, TableFunction, TableFunctionImpl, UrlTableFactory, +}; pub use datafusion_execution::config::SessionConfig; pub use datafusion_execution::TaskContext; pub use datafusion_expr::execution_props::ExecutionProps; diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index e2be3cf3749e..4ccad5ffd323 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -24,7 +24,6 @@ use crate::catalog_common::information_schema::{ use crate::catalog_common::MemoryCatalogProviderList; use crate::datasource::cte_worktable::CteWorkTable; use crate::datasource::file_format::{format_as_file_type, FileFormatFactory}; -use crate::datasource::function::{TableFunction, TableFunctionImpl}; use crate::datasource::provider_as_source; use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner}; use crate::execution::SessionStateDefaults; @@ -33,7 +32,7 @@ use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; use arrow_schema::{DataType, SchemaRef}; use async_trait::async_trait; use chrono::{DateTime, Utc}; -use datafusion_catalog::Session; +use datafusion_catalog::{Session, TableFunction, TableFunctionImpl}; use datafusion_common::alias::AliasGenerator; use datafusion_common::config::{ConfigExtension, ConfigOptions, TableOptions}; use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan}; @@ -1074,6 +1073,7 @@ impl SessionStateBuilder { .with_scalar_functions(SessionStateDefaults::default_scalar_functions()) .with_aggregate_functions(SessionStateDefaults::default_aggregate_functions()) .with_window_functions(SessionStateDefaults::default_window_functions()) + .with_table_function_list(SessionStateDefaults::default_table_functions()) } /// Set the session id. @@ -1188,6 +1188,19 @@ impl SessionStateBuilder { self } + /// Set the list of [`TableFunction`]s + pub fn with_table_function_list( + mut self, + table_functions: Vec>, + ) -> Self { + let functions = table_functions + .into_iter() + .map(|f| (f.name().to_string(), f)) + .collect(); + self.table_functions = Some(functions); + self + } + /// Set the map of [`ScalarUDF`]s pub fn with_scalar_functions( mut self, diff --git a/datafusion/core/src/execution/session_state_defaults.rs b/datafusion/core/src/execution/session_state_defaults.rs index 7ba332c520c1..106082bc7b3b 100644 --- a/datafusion/core/src/execution/session_state_defaults.rs +++ b/datafusion/core/src/execution/session_state_defaults.rs @@ -29,7 +29,8 @@ use crate::datasource::provider::DefaultTableFactory; use crate::execution::context::SessionState; #[cfg(feature = "nested_expressions")] use crate::functions_nested; -use crate::{functions, functions_aggregate, functions_window}; +use crate::{functions, functions_aggregate, functions_table, functions_window}; +use datafusion_catalog::TableFunction; use datafusion_execution::config::SessionConfig; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_execution::runtime_env::RuntimeEnv; @@ -119,6 +120,11 @@ impl SessionStateDefaults { functions_window::all_default_window_functions() } + /// returns the list of default [`TableFunction`]s + pub fn default_table_functions() -> Vec> { + functions_table::all_default_table_functions() + } + /// returns the list of default [`FileFormatFactory']'s pub fn default_file_formats() -> Vec> { let file_formats: Vec> = vec![ diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index d049e774d7c6..a1b18b8bfe8c 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -773,6 +773,11 @@ pub mod functions_window { pub use datafusion_functions_window::*; } +/// re-export of [`datafusion_functions_table`] crate +pub mod functions_table { + pub use datafusion_functions_table::*; +} + /// re-export of variable provider for `@name` and `@@name` style runtime values. pub mod variable { pub use datafusion_expr::var_provider::{VarProvider, VarType}; diff --git a/datafusion/core/tests/user_defined/user_defined_table_functions.rs b/datafusion/core/tests/user_defined/user_defined_table_functions.rs index 0cc156866d4d..39f10ef11ab0 100644 --- a/datafusion/core/tests/user_defined/user_defined_table_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_table_functions.rs @@ -21,7 +21,6 @@ use arrow::csv::ReaderBuilder; use async_trait::async_trait; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::record_batch::RecordBatch; -use datafusion::datasource::function::TableFunctionImpl; use datafusion::datasource::TableProvider; use datafusion::error::Result; use datafusion::execution::TaskContext; @@ -29,6 +28,7 @@ use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::{collect, ExecutionPlan}; use datafusion::prelude::SessionContext; use datafusion_catalog::Session; +use datafusion_catalog::TableFunctionImpl; use datafusion_common::{assert_batches_eq, DFSchema, ScalarValue}; use datafusion_expr::{EmptyRelation, Expr, LogicalPlan, Projection, TableType}; use std::fs::File; diff --git a/datafusion/functions-table/Cargo.toml b/datafusion/functions-table/Cargo.toml new file mode 100644 index 000000000000..f667bdde5835 --- /dev/null +++ b/datafusion/functions-table/Cargo.toml @@ -0,0 +1,62 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "datafusion-functions-table" +description = "Traits and types for logical plans and expressions for DataFusion query engine" +keywords = ["datafusion", "logical", "plan", "expressions"] +readme = "README.md" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +license = { workspace = true } +authors = { workspace = true } +rust-version = { workspace = true } + +[lints] +workspace = true + +[lib] +name = "datafusion_functions_table" +path = "src/lib.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +ahash = { workspace = true } +arrow = { workspace = true } +arrow-schema = { workspace = true } +async-trait = { workspace = true } +datafusion-catalog = { workspace = true } +datafusion-common = { workspace = true } +datafusion-execution = { workspace = true } +datafusion-expr = { workspace = true } +datafusion-functions-aggregate-common = { workspace = true } +datafusion-physical-expr = { workspace = true } +datafusion-physical-expr-common = { workspace = true } +datafusion-physical-plan = { workspace = true } +half = { workspace = true } +indexmap = { workspace = true } +log = { workspace = true } +parking_lot = { workspace = true } +paste = "1.0.14" + +[dev-dependencies] +arrow = { workspace = true, features = ["test_utils"] } +criterion = "0.5" +rand = { workspace = true } diff --git a/datafusion/functions-table/src/generate_series.rs b/datafusion/functions-table/src/generate_series.rs new file mode 100644 index 000000000000..ced43ea8f00c --- /dev/null +++ b/datafusion/functions-table/src/generate_series.rs @@ -0,0 +1,180 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::Int64Array; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; +use async_trait::async_trait; +use datafusion_catalog::Session; +use datafusion_catalog::TableFunctionImpl; +use datafusion_catalog::TableProvider; +use datafusion_common::{not_impl_err, plan_err, Result, ScalarValue}; +use datafusion_expr::{Expr, TableType}; +use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec}; +use datafusion_physical_plan::ExecutionPlan; +use parking_lot::RwLock; +use std::fmt; +use std::sync::Arc; + +/// Table that generates a series of integers from `start`(inclusive) to `end`(inclusive) +#[derive(Debug, Clone)] +struct GenerateSeriesTable { + schema: SchemaRef, + // None if input is Null + start: Option, + // None if input is Null + end: Option, +} + +/// Table state that generates a series of integers from `start`(inclusive) to `end`(inclusive) +#[derive(Debug, Clone)] +struct GenerateSeriesState { + schema: SchemaRef, + start: i64, // Kept for display + end: i64, + batch_size: usize, + + /// Tracks current position when generating table + current: i64, +} + +/// Detail to display for 'Explain' plan +impl fmt::Display for GenerateSeriesState { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "generate_series: start={}, end={}, batch_size={}", + self.start, self.end, self.batch_size + ) + } +} + +impl LazyBatchGenerator for GenerateSeriesState { + fn generate_next_batch(&mut self) -> Result> { + // Check if we've reached the end + if self.current > self.end { + return Ok(None); + } + + // Construct batch + let batch_end = (self.current + self.batch_size as i64 - 1).min(self.end); + let array = Int64Array::from_iter_values(self.current..=batch_end); + let batch = RecordBatch::try_new(self.schema.clone(), vec![Arc::new(array)])?; + + // Update current position for next batch + self.current = batch_end + 1; + + Ok(Some(batch)) + } +} + +#[async_trait] +impl TableProvider for GenerateSeriesTable { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + state: &dyn Session, + _projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + let batch_size = state.config_options().execution.batch_size; + match (self.start, self.end) { + (Some(start), Some(end)) => { + if start > end { + return plan_err!( + "End value must be greater than or equal to start value" + ); + } + + Ok(Arc::new(LazyMemoryExec::try_new( + self.schema.clone(), + vec![Arc::new(RwLock::new(GenerateSeriesState { + schema: self.schema.clone(), + start, + end, + current: start, + batch_size, + }))], + )?)) + } + _ => { + // Either start or end is None, return a generator that outputs 0 rows + Ok(Arc::new(LazyMemoryExec::try_new( + self.schema.clone(), + vec![Arc::new(RwLock::new(GenerateSeriesState { + schema: self.schema.clone(), + start: 0, + end: 0, + current: 1, + batch_size, + }))], + )?)) + } + } + } +} + +#[derive(Debug)] +pub struct GenerateSeriesFunc {} + +impl TableFunctionImpl for GenerateSeriesFunc { + // Check input `exprs` type and number. Input validity check (e.g. start <= end) + // will be performed in `TableProvider::scan` + fn call(&self, exprs: &[Expr]) -> Result> { + // TODO: support 1 or 3 arguments following DuckDB: + // + if exprs.len() == 3 || exprs.len() == 1 { + return not_impl_err!("generate_series does not support 1 or 3 arguments"); + } + + if exprs.len() != 2 { + return plan_err!("generate_series expects 2 arguments"); + } + + let start = match &exprs[0] { + Expr::Literal(ScalarValue::Null) => None, + Expr::Literal(ScalarValue::Int64(Some(n))) => Some(*n), + _ => return plan_err!("First argument must be an integer literal"), + }; + + let end = match &exprs[1] { + Expr::Literal(ScalarValue::Null) => None, + Expr::Literal(ScalarValue::Int64(Some(n))) => Some(*n), + _ => return plan_err!("Second argument must be an integer literal"), + }; + + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Int64, + false, + )])); + + Ok(Arc::new(GenerateSeriesTable { schema, start, end })) + } +} diff --git a/datafusion/functions-table/src/lib.rs b/datafusion/functions-table/src/lib.rs new file mode 100644 index 000000000000..9ea4c0c8992a --- /dev/null +++ b/datafusion/functions-table/src/lib.rs @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod generate_series; + +use datafusion_catalog::TableFunction; +use std::sync::Arc; + +/// Returns all default table functions +pub fn all_default_table_functions() -> Vec> { + vec![generate_series()] +} + +/// Creates a singleton instance of a table function +/// - `$module`: A struct implementing `TableFunctionImpl` to create the function from +/// - `$name`: The name to give to the created function +/// +/// This is used to ensure creating the list of `TableFunction` only happens once. +#[macro_export] +macro_rules! create_udtf_function { + ($module:path, $name:expr) => { + paste::paste! { + static INSTANCE: std::sync::OnceLock> = std::sync::OnceLock::new(); + + pub fn [<$name:lower>]() -> Arc { + INSTANCE.get_or_init(|| { + Arc::new(TableFunction::new( + $name.to_string(), + Arc::new($module {}), + )) + }).clone() + } + } + }; +} + +create_udtf_function!(generate_series::GenerateSeriesFunc, "generate_series"); diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 272dcdc95bc0..bf6294f5a55b 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -17,6 +17,7 @@ //! Execution plan for reading in-memory batches of data +use parking_lot::RwLock; use std::any::Any; use std::fmt; use std::sync::Arc; @@ -365,8 +366,165 @@ impl RecordBatchStream for MemoryStream { } } +pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + fmt::Display { + /// Generate the next batch, return `None` when no more batches are available + fn generate_next_batch(&mut self) -> Result>; +} + +/// Execution plan for lazy in-memory batches of data +/// +/// This plan generates output batches lazily, it doesn't have to buffer all batches +/// in memory up front (compared to `MemoryExec`), thus consuming constant memory. +pub struct LazyMemoryExec { + /// Schema representing the data + schema: SchemaRef, + /// Functions to generate batches for each partition + batch_generators: Vec>>, + /// Plan properties cache storing equivalence properties, partitioning, and execution mode + cache: PlanProperties, +} + +impl LazyMemoryExec { + /// Create a new lazy memory execution plan + pub fn try_new( + schema: SchemaRef, + generators: Vec>>, + ) -> Result { + let cache = PlanProperties::new( + EquivalenceProperties::new(Arc::clone(&schema)), + Partitioning::RoundRobinBatch(generators.len()), + ExecutionMode::Bounded, + ); + Ok(Self { + schema, + batch_generators: generators, + cache, + }) + } +} + +impl fmt::Debug for LazyMemoryExec { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("LazyMemoryExec") + .field("schema", &self.schema) + .field("batch_generators", &self.batch_generators) + .finish() + } +} + +impl DisplayAs for LazyMemoryExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "LazyMemoryExec: partitions={}, batch_generators=[{}]", + self.batch_generators.len(), + self.batch_generators + .iter() + .map(|g| g.read().to_string()) + .collect::>() + .join(", ") + ) + } + } + } +} + +impl ExecutionPlan for LazyMemoryExec { + fn name(&self) -> &'static str { + "LazyMemoryExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + if children.is_empty() { + Ok(self) + } else { + internal_err!("Children cannot be replaced in LazyMemoryExec") + } + } + + fn execute( + &self, + partition: usize, + _context: Arc, + ) -> Result { + if partition >= self.batch_generators.len() { + return internal_err!( + "Invalid partition {} for LazyMemoryExec with {} partitions", + partition, + self.batch_generators.len() + ); + } + + Ok(Box::pin(LazyMemoryStream { + schema: Arc::clone(&self.schema), + generator: Arc::clone(&self.batch_generators[partition]), + })) + } + + fn statistics(&self) -> Result { + Ok(Statistics::new_unknown(&self.schema)) + } +} + +/// Stream that generates record batches on demand +pub struct LazyMemoryStream { + schema: SchemaRef, + /// Generator to produce batches + /// + /// Note: Idiomatically, DataFusion uses plan-time parallelism - each stream + /// should have a unique `LazyBatchGenerator`. Use RepartitionExec or + /// construct multiple `LazyMemoryStream`s during planning to enable + /// parallel execution. + /// Sharing generators between streams should be used with caution. + generator: Arc>, +} + +impl Stream for LazyMemoryStream { + type Item = Result; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + let batch = self.generator.write().generate_next_batch(); + + match batch { + Ok(Some(batch)) => Poll::Ready(Some(Ok(batch))), + Ok(None) => Poll::Ready(None), + Err(e) => Poll::Ready(Some(Err(e))), + } + } +} + +impl RecordBatchStream for LazyMemoryStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} + #[cfg(test)] -mod tests { +mod memory_exec_tests { use std::sync::Arc; use crate::memory::MemoryExec; @@ -416,3 +574,123 @@ mod tests { Ok(()) } } + +#[cfg(test)] +mod lazy_memory_tests { + use super::*; + use arrow::array::Int64Array; + use arrow::datatypes::{DataType, Field, Schema}; + use futures::StreamExt; + + #[derive(Debug, Clone)] + struct TestGenerator { + counter: i64, + max_batches: i64, + batch_size: usize, + schema: SchemaRef, + } + + impl fmt::Display for TestGenerator { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "TestGenerator: counter={}, max_batches={}, batch_size={}", + self.counter, self.max_batches, self.batch_size + ) + } + } + + impl LazyBatchGenerator for TestGenerator { + fn generate_next_batch(&mut self) -> Result> { + if self.counter >= self.max_batches { + return Ok(None); + } + + let array = Int64Array::from_iter_values( + (self.counter * self.batch_size as i64) + ..(self.counter * self.batch_size as i64 + self.batch_size as i64), + ); + self.counter += 1; + Ok(Some(RecordBatch::try_new( + Arc::clone(&self.schema), + vec![Arc::new(array)], + )?)) + } + } + + #[tokio::test] + async fn test_lazy_memory_exec() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let generator = TestGenerator { + counter: 0, + max_batches: 3, + batch_size: 2, + schema: Arc::clone(&schema), + }; + + let exec = + LazyMemoryExec::try_new(schema, vec![Arc::new(RwLock::new(generator))])?; + + // Test schema + assert_eq!(exec.schema().fields().len(), 1); + assert_eq!(exec.schema().field(0).name(), "a"); + + // Test execution + let stream = exec.execute(0, Arc::new(TaskContext::default()))?; + let batches: Vec<_> = stream.collect::>().await; + + assert_eq!(batches.len(), 3); + + // Verify batch contents + let batch0 = batches[0].as_ref().unwrap(); + let array0 = batch0 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(array0.values(), &[0, 1]); + + let batch1 = batches[1].as_ref().unwrap(); + let array1 = batch1 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(array1.values(), &[2, 3]); + + let batch2 = batches[2].as_ref().unwrap(); + let array2 = batch2 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(array2.values(), &[4, 5]); + + Ok(()) + } + + #[tokio::test] + async fn test_lazy_memory_exec_invalid_partition() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let generator = TestGenerator { + counter: 0, + max_batches: 1, + batch_size: 1, + schema: Arc::clone(&schema), + }; + + let exec = + LazyMemoryExec::try_new(schema, vec![Arc::new(RwLock::new(generator))])?; + + // Test invalid partition + let result = exec.execute(1, Arc::new(TaskContext::default())); + + // partition is 0-indexed, so there only should be partition 0 + assert!(matches!( + result, + Err(e) if e.to_string().contains("Invalid partition 1 for LazyMemoryExec with 1 partitions") + )); + + Ok(()) + } +} diff --git a/datafusion/sqllogictest/test_files/table_functions.slt b/datafusion/sqllogictest/test_files/table_functions.slt new file mode 100644 index 000000000000..12402e0d70c5 --- /dev/null +++ b/datafusion/sqllogictest/test_files/table_functions.slt @@ -0,0 +1,142 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Test generate_series table function + +query I rowsort +SELECT * FROM generate_series(1, 5) +---- +1 +2 +3 +4 +5 + +query I rowsort +SELECT * FROM generate_series(1, 1) +---- +1 + +query I rowsort +SELECT * FROM generate_series(3, 6) +---- +3 +4 +5 +6 + +query I rowsort +SELECT SUM(v1) FROM generate_series(1, 5) t1(v1) +---- +15 + +# Test generate_series with WHERE clause +query I rowsort +SELECT * FROM generate_series(1, 10) t1(v1) WHERE v1 % 2 = 0 +---- +10 +2 +4 +6 +8 + +# Test generate_series with ORDER BY +query I +SELECT * FROM generate_series(1, 5) t1(v1) ORDER BY v1 DESC +---- +5 +4 +3 +2 +1 + +# Test generate_series with LIMIT +query I rowsort +SELECT * FROM generate_series(1, 100) t1(v1) LIMIT 5 +---- +1 +2 +3 +4 +5 + +# Test generate_series in subquery +query I rowsort +SELECT v1 + 10 FROM (SELECT * FROM generate_series(1, 3) t1(v1)) +---- +11 +12 +13 + +# Test generate_series with JOIN +query II rowsort +SELECT a.v1, b.v1 +FROM generate_series(1, 3) a(v1) +JOIN generate_series(2, 4) b(v1) +ON a.v1 = b.v1 - 1 +---- +1 2 +2 3 +3 4 + +query I +SELECT * FROM generate_series(NULL, 5) +---- + +query I +SELECT * FROM generate_series(1, NULL) +---- + +query I +SELECT * FROM generate_series(NULL, NULL) +---- + +query TT +EXPLAIN SELECT * FROM generate_series(1, 5) +---- +logical_plan TableScan: tmp_table projection=[value] +physical_plan LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=5, batch_size=8192] + +# +# Test generate_series with invalid arguments +# + +query error DataFusion error: Error during planning: End value must be greater than or equal to start value +SELECT * FROM generate_series(5, 1) + +statement error DataFusion error: This feature is not implemented: generate_series does not support 1 or 3 arguments +SELECT * FROM generate_series(1, 5, NULL) + +statement error DataFusion error: This feature is not implemented: generate_series does not support 1 or 3 arguments +SELECT * FROM generate_series(1) + +statement error DataFusion error: Error during planning: generate_series expects 2 arguments +SELECT * FROM generate_series(1, 2, 3, 4) + +statement error DataFusion error: Error during planning: Second argument must be an integer literal +SELECT * FROM generate_series(1, '2') + +statement error DataFusion error: Error during planning: First argument must be an integer literal +SELECT * FROM generate_series('foo', 'bar') + +# UDF and UDTF `generate_series` can be used simultaneously +query ? rowsort +SELECT generate_series(1, t1.end) FROM generate_series(3, 5) as t1(end) +---- +[1, 2, 3, 4, 5] +[1, 2, 3, 4] +[1, 2, 3] \ No newline at end of file