diff --git a/Cargo.lock b/Cargo.lock index 5f23033061..f43a9036a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -90,7 +90,7 @@ version = "1.2.6-alpha" dependencies = [ "arc-swap 1.6.0", "arena", - "arrow 43.0.0", + "arrow 49.0.0", "async-stream", "async-trait", "atomic_enum", @@ -120,7 +120,7 @@ dependencies = [ "parquet_ext", "pin-project-lite", "prometheus 0.12.0", - "prost", + "prost 0.11.8", "rand 0.7.3", "remote_engine_client", "router", @@ -245,24 +245,24 @@ dependencies = [ [[package]] name = "arrow" -version = "43.0.0" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2feeebd77b34b0bc88f224e06d01c27da4733997cc4789a4e056196656cdc59a" +checksum = "5bc25126d18a012146a888a0298f2c22e1150327bd2765fc76d710a556b2d614" dependencies = [ "ahash 0.8.3", - "arrow-arith 43.0.0", - "arrow-array 43.0.0", - "arrow-buffer 43.0.0", - "arrow-cast 43.0.0", - "arrow-csv 43.0.0", - "arrow-data 43.0.0", - "arrow-ipc 43.0.0", - "arrow-json 43.0.0", - "arrow-ord 43.0.0", - "arrow-row 43.0.0", - "arrow-schema 43.0.0", - "arrow-select 43.0.0", - "arrow-string 43.0.0", + "arrow-arith 49.0.0", + "arrow-array 49.0.0", + "arrow-buffer 49.0.0", + "arrow-cast 49.0.0", + "arrow-csv 49.0.0", + "arrow-data 49.0.0", + "arrow-ipc 49.0.0", + "arrow-json 49.0.0", + "arrow-ord 49.0.0", + "arrow-row 49.0.0", + "arrow-schema 49.0.0", + "arrow-select 49.0.0", + "arrow-string 49.0.0", ] [[package]] @@ -282,14 +282,14 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "43.0.0" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7173f5dc49c0ecb5135f52565af33afd3fdc9a12d13bd6f9973e8b96305e4b2e" +checksum = "34ccd45e217ffa6e53bbb0080990e77113bdd4e91ddb84e97b77649810bcf1a7" dependencies = [ - "arrow-array 43.0.0", - "arrow-buffer 43.0.0", - "arrow-data 43.0.0", - "arrow-schema 43.0.0", + "arrow-array 49.0.0", + "arrow-buffer 49.0.0", + "arrow-data 49.0.0", + "arrow-schema 49.0.0", "chrono", "half 2.2.1", "num", @@ -313,14 +313,14 @@ dependencies = [ [[package]] name = "arrow-array" -version = "43.0.0" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63d7ea725f7d1f8bb2cffc53ef538557e95fc802e217d5be25122d402e22f3d0" +checksum = "6bda9acea48b25123c08340f3a8ac361aa0f74469bb36f5ee9acf923fce23e9d" dependencies = [ "ahash 0.8.3", - "arrow-buffer 43.0.0", - "arrow-data 43.0.0", - "arrow-schema 43.0.0", + "arrow-buffer 49.0.0", + "arrow-data 49.0.0", + "arrow-schema 49.0.0", "chrono", "chrono-tz", "half 2.2.1", @@ -340,10 +340,11 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "43.0.0" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdbe439e077f484e5000b9e1d47b5e4c0d15f2b311a8f5bcc682553d5d67a722" +checksum = "01a0fc21915b00fc6c2667b069c1b64bdd920982f426079bc4a7cab86822886c" dependencies = [ + "bytes", "half 2.2.1", "num", ] @@ -366,15 +367,16 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "43.0.0" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93913cc14875770aa1eef5e310765e855effa352c094cb1c7c00607d0f37b4e1" +checksum = "5dc0368ed618d509636c1e3cc20db1281148190a78f43519487b2daf07b63b4a" dependencies = [ - "arrow-array 43.0.0", - "arrow-buffer 43.0.0", - "arrow-data 43.0.0", - "arrow-schema 43.0.0", - "arrow-select 43.0.0", + "arrow-array 49.0.0", + "arrow-buffer 49.0.0", + "arrow-data 49.0.0", + "arrow-schema 49.0.0", + "arrow-select 49.0.0", + "base64 
0.21.0", "chrono", "comfy-table 7.0.1", "half 2.2.1", @@ -403,15 +405,15 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "43.0.0" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef55b67c55ed877e6fe7b923121c19dae5e31ca70249ea2779a17b58fb0fbd9a" +checksum = "2e09aa6246a1d6459b3f14baeaa49606cfdbca34435c46320e14054d244987ca" dependencies = [ - "arrow-array 43.0.0", - "arrow-buffer 43.0.0", - "arrow-cast 43.0.0", - "arrow-data 43.0.0", - "arrow-schema 43.0.0", + "arrow-array 49.0.0", + "arrow-buffer 49.0.0", + "arrow-cast 49.0.0", + "arrow-data 49.0.0", + "arrow-schema 49.0.0", "chrono", "csv", "csv-core", @@ -434,12 +436,12 @@ dependencies = [ [[package]] name = "arrow-data" -version = "43.0.0" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4f4f4a3c54614126a71ab91f6631c9743eb4643d6e9318b74191da9dc6e028b" +checksum = "907fafe280a3874474678c1858b9ca4cb7fd83fb8034ff5b6d6376205a08c634" dependencies = [ - "arrow-buffer 43.0.0", - "arrow-schema 43.0.0", + "arrow-buffer 49.0.0", + "arrow-schema 49.0.0", "half 2.2.1", "num", ] @@ -460,15 +462,15 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "43.0.0" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d41a3659f984a524ef1c2981d43747b24d8eec78e2425267fcd0ef34ce71cd18" +checksum = "79a43d6808411886b8c7d4f6f7dd477029c1e77ffffffb7923555cc6579639cd" dependencies = [ - "arrow-array 43.0.0", - "arrow-buffer 43.0.0", - "arrow-cast 43.0.0", - "arrow-data 43.0.0", - "arrow-schema 43.0.0", + "arrow-array 49.0.0", + "arrow-buffer 49.0.0", + "arrow-cast 49.0.0", + "arrow-data 49.0.0", + "arrow-schema 49.0.0", "flatbuffers", ] @@ -494,15 +496,15 @@ dependencies = [ [[package]] name = "arrow-json" -version = "43.0.0" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10b95faa95a378f56ef32d84cc0104ea998c39ef7cd1faaa6b4cebf8ea92846d" +checksum = "d82565c91fd627922ebfe2810ee4e8346841b6f9361b87505a9acea38b614fee" dependencies = [ - "arrow-array 43.0.0", - "arrow-buffer 43.0.0", - "arrow-cast 43.0.0", - "arrow-data 43.0.0", - "arrow-schema 43.0.0", + "arrow-array 49.0.0", + "arrow-buffer 49.0.0", + "arrow-cast 49.0.0", + "arrow-data 49.0.0", + "arrow-schema 49.0.0", "chrono", "half 2.2.1", "indexmap 2.0.0", @@ -529,15 +531,15 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "43.0.0" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c68549a4284d9f8b39586afb8d5ff8158b8f0286353a4844deb1d11cf1ba1f26" +checksum = "9b23b0e53c0db57c6749997fd343d4c0354c994be7eca67152dd2bdb9a3e1bb4" dependencies = [ - "arrow-array 43.0.0", - "arrow-buffer 43.0.0", - "arrow-data 43.0.0", - "arrow-schema 43.0.0", - "arrow-select 43.0.0", + "arrow-array 49.0.0", + "arrow-buffer 49.0.0", + "arrow-data 49.0.0", + "arrow-schema 49.0.0", + "arrow-select 49.0.0", "half 2.2.1", "num", ] @@ -559,15 +561,15 @@ dependencies = [ [[package]] name = "arrow-row" -version = "43.0.0" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a75a4a757afc301ce010adadff54d79d66140c4282ed3de565f6ccb716a5cf3" +checksum = "361249898d2d6d4a6eeb7484be6ac74977e48da12a4dd81a708d620cc558117a" dependencies = [ "ahash 0.8.3", - "arrow-array 43.0.0", - "arrow-buffer 43.0.0", - "arrow-data 43.0.0", - "arrow-schema 43.0.0", + "arrow-array 49.0.0", + "arrow-buffer 49.0.0", + "arrow-data 49.0.0", + "arrow-schema 
49.0.0", "half 2.2.1", "hashbrown 0.14.0", ] @@ -580,9 +582,9 @@ checksum = "bc85923d8d6662cc66ac6602c7d1876872e671002d60993dfdf492a6badeae92" [[package]] name = "arrow-schema" -version = "43.0.0" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bebcb57eef570b15afbcf2d07d813eb476fde9f6dd69c81004d6476c197e87e" +checksum = "09e28a5e781bf1b0f981333684ad13f5901f4cd2f20589eab7cf1797da8fc167" [[package]] name = "arrow-select" @@ -599,14 +601,15 @@ dependencies = [ [[package]] name = "arrow-select" -version = "43.0.0" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6e2943fa433a48921e914417173816af64eef61c0a3d448280e6c40a62df221" +checksum = "4f6208466590960efc1d2a7172bc4ff18a67d6e25c529381d7f96ddaf0dc4036" dependencies = [ - "arrow-array 43.0.0", - "arrow-buffer 43.0.0", - "arrow-data 43.0.0", - "arrow-schema 43.0.0", + "ahash 0.8.3", + "arrow-array 49.0.0", + "arrow-buffer 49.0.0", + "arrow-data 49.0.0", + "arrow-schema 49.0.0", "num", ] @@ -627,37 +630,37 @@ dependencies = [ [[package]] name = "arrow-string" -version = "43.0.0" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbc92ed638851774f6d7af1ad900b92bc1486746497511868b4298fcbcfa35af" +checksum = "a4a48149c63c11c9ff571e50ab8f017d2a7cb71037a882b42f6354ed2da9acc7" dependencies = [ - "arrow-array 43.0.0", - "arrow-buffer 43.0.0", - "arrow-data 43.0.0", - "arrow-schema 43.0.0", - "arrow-select 43.0.0", + "arrow-array 49.0.0", + "arrow-buffer 49.0.0", + "arrow-data 49.0.0", + "arrow-schema 49.0.0", + "arrow-select 49.0.0", "num", "regex", - "regex-syntax 0.7.1", + "regex-syntax 0.8.2", ] [[package]] name = "arrow_ext" version = "1.2.6-alpha" dependencies = [ - "arrow 43.0.0", + "arrow 49.0.0", "serde", "snafu 0.6.10", - "zstd", + "zstd 0.12.3+zstd.1.5.2", ] [[package]] name = "arrow_util" version = "0.1.0" -source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c" +source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08" dependencies = [ "ahash 0.8.3", - "arrow 43.0.0", + "arrow 49.0.0", "chrono", "comfy-table 6.1.4", "hashbrown 0.13.2", @@ -682,8 +685,8 @@ dependencies = [ "pin-project-lite", "tokio", "xz2", - "zstd", - "zstd-safe", + "zstd 0.12.3+zstd.1.5.2", + "zstd-safe 6.0.4+zstd.1.5.4", ] [[package]] @@ -750,9 +753,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.72" +version = "0.1.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc6dde6e4ed435a4c1ee4e73592f5ba9da2151af10076cc04858746af9352d09" +checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" dependencies = [ "proc-macro2", "quote", @@ -881,7 +884,7 @@ version = "1.2.6-alpha" dependencies = [ "analytic_engine", "arena", - "arrow 43.0.0", + "arrow 49.0.0", "base64 0.13.1", "bytes_ext", "clap 3.2.23", @@ -908,7 +911,7 @@ dependencies = [ "toml_ext", "trace_metric", "wal", - "zstd", + "zstd 0.12.3+zstd.1.5.2", ] [[package]] @@ -1452,7 +1455,7 @@ dependencies = [ "logger", "macros", "meta_client", - "prost", + "prost 0.11.8", "runtime", "serde", "serde_json", @@ -1519,7 +1522,7 @@ dependencies = [ name = "common_types" version = "1.2.6-alpha" dependencies = [ - "arrow 43.0.0", + "arrow 49.0.0", "arrow_ext", "bytes_ext", "chrono", @@ -1528,7 +1531,7 @@ dependencies = [ "horaedbproto 2.0.0", "macros", "paste 1.0.12", - "prost", + "prost 0.11.8", "rand 0.7.3", 
"seahash", "serde", @@ -1565,7 +1568,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2895653b4d9f1538a83970077cb01dfc77a4810524e51a110944688e916b18e" dependencies = [ - "prost", + "prost 0.11.8", "prost-types", "tonic 0.9.2", "tracing-core", @@ -2003,13 +2006,13 @@ dependencies = [ [[package]] name = "datafusion" -version = "27.0.0" -source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d" +version = "33.0.0" +source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1" dependencies = [ "ahash 0.8.3", - "arrow 43.0.0", - "arrow-array 43.0.0", - "arrow-schema 43.0.0", + "arrow 49.0.0", + "arrow-array 49.0.0", + "arrow-schema 49.0.0", "async-compression", "async-trait", "bytes", @@ -2021,24 +2024,22 @@ dependencies = [ "datafusion-expr", "datafusion-optimizer", "datafusion-physical-expr", - "datafusion-row", + "datafusion-physical-plan", "datafusion-sql", "flate2", "futures 0.3.28", "glob", + "half 2.2.1", "hashbrown 0.14.0", "indexmap 2.0.0", - "itertools 0.11.0", - "lazy_static", + "itertools 0.12.0", "log", "num_cpus", - "object_store 0.6.1", + "object_store 0.8.0", "parking_lot 0.12.1", "parquet", - "percent-encoding", "pin-project-lite", "rand 0.8.5", - "smallvec", "sqlparser", "tempfile", "tokio", @@ -2046,34 +2047,42 @@ dependencies = [ "url", "uuid", "xz2", - "zstd", + "zstd 0.13.0", ] [[package]] name = "datafusion-common" -version = "27.0.0" -source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d" +version = "33.0.0" +source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1" dependencies = [ - "arrow 43.0.0", - "arrow-array 43.0.0", + "ahash 0.8.3", + "arrow 49.0.0", + "arrow-array 49.0.0", + "arrow-buffer 49.0.0", + "arrow-schema 49.0.0", "chrono", + "half 2.2.1", + "libc", "num_cpus", - "object_store 0.6.1", + "object_store 0.8.0", "parquet", "sqlparser", ] [[package]] name = "datafusion-execution" -version = "27.0.0" -source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d" +version = "33.0.0" +source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1" dependencies = [ + "arrow 49.0.0", + "chrono", "dashmap 5.4.0", "datafusion-common", "datafusion-expr", + "futures 0.3.28", "hashbrown 0.14.0", "log", - "object_store 0.6.1", + "object_store 0.8.0", "parking_lot 0.12.1", "rand 0.8.5", "tempfile", @@ -2082,13 +2091,14 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "27.0.0" -source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d" +version = "33.0.0" +source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1" dependencies = [ "ahash 0.8.3", - "arrow 43.0.0", + "arrow 49.0.0", + "arrow-array 49.0.0", "datafusion-common", - "lazy_static", + "paste 1.0.12", "sqlparser", "strum 0.25.0", "strum_macros 0.25.1", @@ -2096,45 +2106,43 @@ dependencies = [ [[package]] name = "datafusion-optimizer" -version = "27.0.0" -source = 
"git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d" +version = "33.0.0" +source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1" dependencies = [ - "arrow 43.0.0", + "arrow 49.0.0", "async-trait", "chrono", "datafusion-common", "datafusion-expr", "datafusion-physical-expr", "hashbrown 0.14.0", - "itertools 0.11.0", + "itertools 0.12.0", "log", - "regex-syntax 0.7.1", + "regex-syntax 0.8.2", ] [[package]] name = "datafusion-physical-expr" -version = "27.0.0" -source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d" +version = "33.0.0" +source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1" dependencies = [ "ahash 0.8.3", - "arrow 43.0.0", - "arrow-array 43.0.0", - "arrow-buffer 43.0.0", - "arrow-schema 43.0.0", + "arrow 49.0.0", + "arrow-array 49.0.0", + "arrow-buffer 49.0.0", + "arrow-ord 49.0.0", + "arrow-schema 49.0.0", "base64 0.21.0", "blake2", "blake3", "chrono", "datafusion-common", "datafusion-expr", - "datafusion-row", "half 2.2.1", "hashbrown 0.14.0", "hex", "indexmap 2.0.0", - "itertools 0.11.0", - "lazy_static", - "libc", + "itertools 0.12.0", "log", "md-5", "paste 1.0.12", @@ -2147,37 +2155,56 @@ dependencies = [ ] [[package]] -name = "datafusion-proto" -version = "27.0.0" -source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d" +name = "datafusion-physical-plan" +version = "33.0.0" +source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1" dependencies = [ - "arrow 43.0.0", + "ahash 0.8.3", + "arrow 49.0.0", + "arrow-array 49.0.0", + "arrow-buffer 49.0.0", + "arrow-schema 49.0.0", + "async-trait", "chrono", - "datafusion", "datafusion-common", + "datafusion-execution", "datafusion-expr", - "object_store 0.6.1", - "prost", + "datafusion-physical-expr", + "futures 0.3.28", + "half 2.2.1", + "hashbrown 0.14.0", + "indexmap 2.0.0", + "itertools 0.12.0", + "log", + "once_cell", + "parking_lot 0.12.1", + "pin-project-lite", + "rand 0.8.5", + "tokio", + "uuid", ] [[package]] -name = "datafusion-row" -version = "27.0.0" -source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d" +name = "datafusion-proto" +version = "33.0.0" +source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1" dependencies = [ - "arrow 43.0.0", + "arrow 49.0.0", + "chrono", + "datafusion", "datafusion-common", - "paste 1.0.12", - "rand 0.8.5", + "datafusion-expr", + "object_store 0.8.0", + "prost 0.12.3", ] [[package]] name = "datafusion-sql" -version = "27.0.0" -source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=9c3a537e25e5ab3299922864034f67fb2f79805d#9c3a537e25e5ab3299922864034f67fb2f79805d" +version = "33.0.0" +source = "git+https://github.com/CeresDB/arrow-datafusion.git?rev=e21b03154#e21b03154511cd61e03e299a595db6be6b1852c1" dependencies = [ - "arrow 43.0.0", - "arrow-schema 43.0.0", + "arrow 49.0.0", + "arrow-schema 49.0.0", "datafusion-common", "datafusion-expr", "log", @@ -2187,7 +2214,7 @@ dependencies = [ [[package]] name = "datafusion_util" version = "0.1.0" -source = 
"git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c" +source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08" dependencies = [ "async-trait", "datafusion", @@ -2305,7 +2332,7 @@ dependencies = [ name = "df_engine_extensions" version = "1.2.6-alpha" dependencies = [ - "arrow 43.0.0", + "arrow 49.0.0", "async-recursion", "async-trait", "catalog", @@ -2318,7 +2345,7 @@ dependencies = [ "insta", "lazy_static", "prometheus 0.12.0", - "prost", + "prost 0.11.8", "runtime", "snafu 0.6.10", "table_engine", @@ -2330,7 +2357,7 @@ dependencies = [ name = "df_operator" version = "1.2.6-alpha" dependencies = [ - "arrow 43.0.0", + "arrow 49.0.0", "base64 0.13.1", "bincode", "chrono", @@ -2470,7 +2497,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4319dc0fb739a6e84cb8678b8cf50c9bcfa4712ae826b33ecf00cc0850550a58" dependencies = [ "http", - "prost", + "prost 0.11.8", "tokio", "tokio-stream", "tonic 0.8.3", @@ -2808,12 +2835,12 @@ checksum = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2" [[package]] name = "generated_types" version = "0.1.0" -source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c" +source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08" dependencies = [ "pbjson", "pbjson-build", "pbjson-types", - "prost", + "prost 0.11.8", "prost-build", "serde", "tonic-build", @@ -3071,7 +3098,7 @@ dependencies = [ "thiserror", "tokio", "tonic 0.8.3", - "zstd", + "zstd 0.12.3+zstd.1.5.2", ] [[package]] @@ -3095,7 +3122,7 @@ version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5907c770ee20818978cf2050341ca2c4c7fb7888423ccb090cbb2fda250dfad7" dependencies = [ - "prost", + "prost 0.11.8", "protoc-bin-vendored", "tonic 0.8.3", "tonic-build", @@ -3107,7 +3134,7 @@ name = "horaedbproto" version = "2.0.0" source = "git+https://github.com/apache/incubator-horaedb-proto.git?rev=19ece8f771fc0b3e8e734072cc3d8040de6c74cb#19ece8f771fc0b3e8e734072cc3d8040de6c74cb" dependencies = [ - "prost", + "prost 0.11.8", "protoc-bin-vendored", "tonic 0.8.3", "tonic-build", @@ -3325,7 +3352,7 @@ dependencies = [ [[package]] name = "influxdb_influxql_parser" version = "0.1.0" -source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c" +source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08" dependencies = [ "chrono", "chrono-tz", @@ -3367,7 +3394,7 @@ name = "interpreters" version = "1.2.6-alpha" dependencies = [ "analytic_engine", - "arrow 43.0.0", + "arrow 49.0.0", "async-trait", "catalog", "catalog_impls", @@ -3418,9 +3445,9 @@ dependencies = [ [[package]] name = "iox_query" version = "0.1.0" -source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c" +source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08" dependencies = [ - "arrow 43.0.0", + "arrow 49.0.0", "arrow_util", "async-trait", "chrono", @@ -3442,9 +3469,9 @@ dependencies = [ [[package]] name = "iox_query_influxql" version = "0.1.0" -source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c" +source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08" dependencies = [ - "arrow 
43.0.0", + "arrow 49.0.0", "chrono", "chrono-tz", "datafusion", @@ -3497,6 +3524,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25db6b064527c5d482d0423354fcd07a89a2dfe07b67892e62411946db7f07b0" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.6" @@ -3953,7 +3989,7 @@ dependencies = [ "horaedbproto 2.0.0", "logger", "macros", - "prost", + "prost 0.11.8", "reqwest", "serde", "serde_json", @@ -4314,9 +4350,9 @@ dependencies = [ [[package]] name = "num" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43db66d1170d347f9a065114077f7dccb00c1b9478c89384490a3425279a4606" +checksum = "b05180d69e3da0e530ba2a1dae5110317e49e3b7f3d41be227dc5f92e49ee7af" dependencies = [ "num-bigint", "num-complex", @@ -4456,16 +4492,16 @@ dependencies = [ [[package]] name = "object_store" -version = "0.6.1" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27c776db4f332b571958444982ff641d2531417a326ca368995073b639205d58" +checksum = "2524735495ea1268be33d200e1ee97455096a0846295a21548cd2f3541de7050" dependencies = [ "async-trait", "bytes", "chrono", "futures 0.3.28", "humantime 2.1.0", - "itertools 0.10.5", + "itertools 0.11.0", "parking_lot 0.12.1", "percent-encoding", "snafu 0.7.4", @@ -4497,7 +4533,7 @@ dependencies = [ "partitioned_lock", "prometheus 0.12.0", "prometheus-static-metric", - "prost", + "prost 0.11.8", "rand 0.7.3", "runtime", "serde", @@ -4545,13 +4581,13 @@ dependencies = [ "tokio", "tokio-util", "uuid", - "zstd", + "zstd 0.12.3+zstd.1.5.2", ] [[package]] name = "observability_deps" version = "0.1.0" -source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c" +source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08" dependencies = [ "tracing", ] @@ -4675,18 +4711,18 @@ dependencies = [ [[package]] name = "parquet" -version = "43.0.0" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec7267a9607c3f955d4d0ac41b88a67cecc0d8d009173ad3da390699a6cb3750" +checksum = "af88740a842787da39b3d69ce5fbf6fce97d20211d3b299fee0a0da6430c74d4" dependencies = [ "ahash 0.8.3", - "arrow-array 43.0.0", - "arrow-buffer 43.0.0", - "arrow-cast 43.0.0", - "arrow-data 43.0.0", - "arrow-ipc 43.0.0", - "arrow-schema 43.0.0", - "arrow-select 43.0.0", + "arrow-array 49.0.0", + "arrow-buffer 49.0.0", + "arrow-cast 49.0.0", + "arrow-data 49.0.0", + "arrow-ipc 49.0.0", + "arrow-schema 49.0.0", + "arrow-select 49.0.0", "base64 0.21.0", "brotli", "bytes", @@ -4694,24 +4730,24 @@ dependencies = [ "flate2", "futures 0.3.28", "hashbrown 0.14.0", - "lz4", + "lz4_flex", "num", "num-bigint", - "object_store 0.6.1", + "object_store 0.8.0", "paste 1.0.12", "seq-macro", "snap", "thrift", "tokio", "twox-hash", - "zstd", + "zstd 0.13.0", ] [[package]] name = "parquet_ext" version = "1.2.6-alpha" dependencies = [ - "arrow 43.0.0", + "arrow 49.0.0", "arrow_ext", "async-trait", "bytes", @@ -4738,7 +4774,7 @@ name = "partition_table_engine" version = "1.2.6-alpha" dependencies = [ "analytic_engine", - "arrow 43.0.0", + "arrow 49.0.0", "async-trait", "common_types", "datafusion", @@ -4805,7 +4841,7 @@ checksum = "bdbb7b706f2afc610f3853550cdbbf6372fd324824a087806bd4480ea4996e24" dependencies = [ "heck", "itertools 0.10.5", - "prost", + "prost 0.11.8", 
"prost-types", ] @@ -4819,7 +4855,7 @@ dependencies = [ "chrono", "pbjson", "pbjson-build", - "prost", + "prost 0.11.8", "prost-build", "serde", ] @@ -5179,7 +5215,7 @@ dependencies = [ "async-trait", "bytes", "futures 0.3.28", - "prost", + "prost 0.11.8", "prost-build", "snap", "warp", @@ -5256,7 +5292,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e48e50df39172a3e7eb17e14642445da64996989bc212b583015435d39a58537" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.11.8", +] + +[[package]] +name = "prost" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a" +dependencies = [ + "bytes", + "prost-derive 0.12.3", ] [[package]] @@ -5273,7 +5319,7 @@ dependencies = [ "multimap", "petgraph", "prettyplease 0.1.25", - "prost", + "prost 0.11.8", "prost-types", "regex", "syn 1.0.109", @@ -5294,13 +5340,26 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "prost-derive" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e" +dependencies = [ + "anyhow", + "itertools 0.11.0", + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "prost-types" version = "0.11.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "379119666929a1afd7a043aa6cf96fa67a6dce9af60c88095a4686dbce4c9c88" dependencies = [ - "prost", + "prost 0.11.8", ] [[package]] @@ -5363,7 +5422,7 @@ checksum = "9653c3ed92974e34c5a6e0a510864dab979760481714c172e0a34e437cb98804" name = "proxy" version = "1.2.6-alpha" dependencies = [ - "arrow 43.0.0", + "arrow 49.0.0", "arrow_ext", "async-trait", "bytes", @@ -5391,7 +5450,7 @@ dependencies = [ "prom-remote-api", "prometheus 0.12.0", "prometheus-static-metric", - "prost", + "prost 0.11.8", "query_engine", "query_frontend", "router", @@ -5409,7 +5468,7 @@ dependencies = [ "tokio-stream", "tonic 0.8.3", "warp", - "zstd", + "zstd 0.12.3+zstd.1.5.2", ] [[package]] @@ -5463,7 +5522,7 @@ dependencies = [ name = "query_engine" version = "1.2.6-alpha" dependencies = [ - "arrow 43.0.0", + "arrow 49.0.0", "async-trait", "bytes_ext", "catalog", @@ -5478,7 +5537,7 @@ dependencies = [ "iox_query", "logger", "macros", - "prost", + "prost 0.11.8", "query_frontend", "runtime", "serde", @@ -5493,7 +5552,7 @@ dependencies = [ name = "query_frontend" version = "1.2.6-alpha" dependencies = [ - "arrow 43.0.0", + "arrow 49.0.0", "async-trait", "catalog", "chrono", @@ -5529,9 +5588,9 @@ dependencies = [ [[package]] name = "query_functions" version = "0.1.0" -source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c" +source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08" dependencies = [ - "arrow 43.0.0", + "arrow 49.0.0", "chrono", "datafusion", "itertools 0.10.5", @@ -5802,6 +5861,12 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a5996294f19bd3aae0453a862ad728f60e6600695733dd5df01da90c54363a3c" +[[package]] +name = "regex-syntax" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" + [[package]] name = "remote_engine_client" version = "1.2.6-alpha" @@ -6227,9 +6292,9 @@ dependencies = [ [[package]] name = "schema" version = "0.1.0" -source = 
"git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c" +source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08" dependencies = [ - "arrow 43.0.0", + "arrow 49.0.0", "hashbrown 0.13.2", "indexmap 1.9.3", "itertools 0.10.5", @@ -6353,7 +6418,7 @@ version = "1.2.6-alpha" dependencies = [ "analytic_engine", "arc-swap 1.6.0", - "arrow 43.0.0", + "arrow 49.0.0", "arrow_ext", "async-trait", "bytes_ext", @@ -6386,7 +6451,7 @@ dependencies = [ "prom-remote-api", "prometheus 0.12.0", "prometheus-static-metric", - "prost", + "prost 0.11.8", "proxy", "query_engine", "query_frontend", @@ -6407,7 +6472,7 @@ dependencies = [ "tonic 0.8.3", "wal", "warp", - "zstd", + "zstd 0.12.3+zstd.1.5.2", ] [[package]] @@ -6717,9 +6782,9 @@ dependencies = [ [[package]] name = "sqlparser" -version = "0.35.0" +version = "0.39.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca597d77c98894be1f965f2e4e2d2a61575d4998088e655476c73715c54b2b43" +checksum = "743b4dc2cbde11890ccb254a8fc9d537fa41b36da00de2a1c5e9848c9bc42bd7" dependencies = [ "log", "serde", @@ -6897,7 +6962,7 @@ dependencies = [ name = "system_catalog" version = "1.2.6-alpha" dependencies = [ - "arrow 43.0.0", + "arrow 49.0.0", "async-trait", "bytes_ext", "catalog", @@ -6908,7 +6973,7 @@ dependencies = [ "horaedbproto 2.0.0", "logger", "macros", - "prost", + "prost 0.11.8", "snafu 0.6.10", "table_engine", "tokio", @@ -6927,7 +6992,7 @@ dependencies = [ name = "table_engine" version = "1.2.6-alpha" dependencies = [ - "arrow 43.0.0", + "arrow 49.0.0", "arrow_ext", "async-trait", "bytes_ext", @@ -6943,7 +7008,7 @@ dependencies = [ "lazy_static", "logger", "macros", - "prost", + "prost 0.11.8", "rand 0.7.3", "regex", "runtime", @@ -7024,7 +7089,7 @@ dependencies = [ [[package]] name = "test_helpers" version = "0.1.0" -source = "git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c" +source = "git+https://github.com/CeresDB/influxql.git?rev=b9fb3ca#b9fb3ca59fda99997a51cab7a56d34fb2126dd08" dependencies = [ "dotenvy", "observability_deps", @@ -7038,7 +7103,7 @@ dependencies = [ name = "test_util" version = "1.2.6-alpha" dependencies = [ - "arrow 43.0.0", + "arrow 49.0.0", "chrono", "common_types", "env_logger", @@ -7375,8 +7440,8 @@ dependencies = [ "hyper-timeout", "percent-encoding", "pin-project", - "prost", - "prost-derive", + "prost 0.11.8", + "prost-derive 0.11.8", "rustls-pemfile 1.0.2", "tokio", "tokio-rustls 0.23.4", @@ -7408,7 +7473,7 @@ dependencies = [ "hyper-timeout", "percent-encoding", "pin-project", - "prost", + "prost 0.11.8", "tokio", "tokio-stream", "tower", @@ -7647,7 +7712,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "rand 0.8.5", "static_assertions", ] @@ -7804,7 +7869,7 @@ dependencies = [ "macros", "message_queue", "prometheus 0.12.0", - "prost", + "prost 0.11.8", "rand 0.8.5", "rocksdb", "runtime", @@ -8433,7 +8498,16 @@ version = "0.12.3+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76eea132fb024e0e13fd9c2f5d5d595d8a967aa72382ac2f9d39fcc95afd0806" dependencies = [ - "zstd-safe", + "zstd-safe 6.0.4+zstd.1.5.4", +] + +[[package]] +name = "zstd" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"bffb3309596d527cfcba7dfc6ed6052f1d39dfbd7c867aa2e865e4a449c10110" +dependencies = [ + "zstd-safe 7.0.0", ] [[package]] @@ -8446,6 +8520,15 @@ dependencies = [ "zstd-sys", ] +[[package]] +name = "zstd-safe" +version = "7.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43747c7422e2924c11144d5229878b98180ef8b06cca4ab5af37afc8a8d8ea3e" +dependencies = [ + "zstd-sys", +] + [[package]] name = "zstd-sys" version = "2.0.7+zstd.1.5.4" diff --git a/Cargo.toml b/Cargo.toml index d195a121fd..b41694b31a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,8 +85,8 @@ members = [ [workspace.dependencies] alloc_tracker = { path = "src/components/alloc_tracker" } -arrow = { version = "43.0.0", features = ["prettyprint"] } -arrow_ipc = { version = "43.0.0" } +arrow = { version = "49.0.0", features = ["prettyprint"] } +arrow_ipc = { version = "49.0.0" } arrow_ext = { path = "src/components/arrow_ext" } analytic_engine = { path = "src/analytic_engine" } arena = { path = "src/components/arena" } @@ -107,8 +107,8 @@ cluster = { path = "src/cluster" } criterion = "0.5" horaedb-client = "1.0.2" common_types = { path = "src/common_types" } -datafusion = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "9c3a537e25e5ab3299922864034f67fb2f79805d" } -datafusion-proto = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "9c3a537e25e5ab3299922864034f67fb2f79805d" } +datafusion = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "e21b03154" } +datafusion-proto = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "e21b03154" } derive_builder = "0.12" df_operator = { path = "src/df_operator" } df_engine_extensions = { path = "src/df_engine_extensions" } @@ -121,10 +121,10 @@ hash_ext = { path = "src/components/hash_ext" } hex = "0.4.3" hyperloglog = { git = "https://github.com/jedisct1/rust-hyperloglog.git", rev = "425487ce910f26636fbde8c4d640b538431aad50" } id_allocator = { path = "src/components/id_allocator" } -influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git", rev = "a905863", package = "iox_query_influxql" } -influxql-parser = { git = "https://github.com/CeresDB/influxql.git", rev = "a905863", package = "influxdb_influxql_parser" } -influxql-query = { git = "https://github.com/CeresDB/influxql.git", rev = "a905863", package = "iox_query" } -influxql-schema = { git = "https://github.com/CeresDB/influxql.git", rev = "a905863", package = "schema" } +influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git", rev = "b9fb3ca", package = "iox_query_influxql" } +influxql-parser = { git = "https://github.com/CeresDB/influxql.git", rev = "b9fb3ca", package = "influxdb_influxql_parser" } +influxql-query = { git = "https://github.com/CeresDB/influxql.git", rev = "b9fb3ca", package = "iox_query" } +influxql-schema = { git = "https://github.com/CeresDB/influxql.git", rev = "b9fb3ca", package = "schema" } interpreters = { path = "src/interpreters" } itertools = "0.10.5" lz4_flex = { version = "0.11", default-features = false, features = ["frame"] } @@ -142,7 +142,7 @@ panic_ext = { path = "src/components/panic_ext" } partitioned_lock = { path = "src/components/partitioned_lock" } partition_table_engine = { path = "src/partition_table_engine" } parquet_ext = { path = "src/components/parquet_ext" } -parquet = { version = "43.0.0" } +parquet = { version = "49.0.0" } paste = "1.0" pin-project-lite = "0.2.8" pprof = "0.12.1" @@ -172,9 +172,9 @@ size_ext = { path = "src/components/size_ext" } smallvec = 
"1.6" slog = "2.7" spin = "0.9.6" -sqlparser = { version = "0.35", features = ["serde"] } -system_catalog = { path = "src/system_catalog" } system_statis = { path = "src/components/system_stats" } +sqlparser = { version = "0.39.0", features = ["serde"] } +system_catalog = { path = "src/system_catalog" } table_engine = { path = "src/table_engine" } table_kv = { path = "src/components/table_kv" } tempfile = "3.1.0" diff --git a/integration_tests/cases/common/dml/issue-1087.result b/integration_tests/cases/common/dml/issue-1087.result index d264f4d212..fc1e0d8d5e 100644 --- a/integration_tests/cases/common/dml/issue-1087.result +++ b/integration_tests/cases/common/dml/issue-1087.result @@ -17,6 +17,7 @@ String("logical_plan after inline_table_scan"),String("SAME TEXT AS ABOVE"), String("logical_plan after type_coercion"),String("SAME TEXT AS ABOVE"), String("logical_plan after count_wildcard_rule"),String("SAME TEXT AS ABOVE"), String("analyzed_logical_plan"),String("SAME TEXT AS ABOVE"), +String("logical_plan after eliminate_nested_union"),String("SAME TEXT AS ABOVE"), String("logical_plan after simplify_expressions"),String("SAME TEXT AS ABOVE"), String("logical_plan after unwrap_cast_in_comparison"),String("SAME TEXT AS ABOVE"), String("logical_plan after replace_distinct_aggregate"),String("SAME TEXT AS ABOVE"), @@ -33,6 +34,7 @@ String("logical_plan after eliminate_cross_join"),String("SAME TEXT AS ABOVE"), String("logical_plan after common_sub_expression_eliminate"),String("SAME TEXT AS ABOVE"), String("logical_plan after eliminate_limit"),String("SAME TEXT AS ABOVE"), String("logical_plan after propagate_empty_relation"),String("SAME TEXT AS ABOVE"), +String("logical_plan after eliminate_one_union"),String("SAME TEXT AS ABOVE"), String("logical_plan after filter_null_join_keys"),String("SAME TEXT AS ABOVE"), String("logical_plan after eliminate_outer_join"),String("SAME TEXT AS ABOVE"), String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"), @@ -46,6 +48,7 @@ String("logical_plan after eliminate_projection"),String("TableScan: issue_1087 String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"), String("logical_plan after influx_regex_to_datafusion_regex"),String("SAME TEXT AS ABOVE"), String("logical_plan after handle_gap_fill"),String("SAME TEXT AS ABOVE"), +String("logical_plan after eliminate_nested_union"),String("SAME TEXT AS ABOVE"), String("logical_plan after simplify_expressions"),String("SAME TEXT AS ABOVE"), String("logical_plan after unwrap_cast_in_comparison"),String("SAME TEXT AS ABOVE"), String("logical_plan after replace_distinct_aggregate"),String("SAME TEXT AS ABOVE"), @@ -62,6 +65,7 @@ String("logical_plan after eliminate_cross_join"),String("SAME TEXT AS ABOVE"), String("logical_plan after common_sub_expression_eliminate"),String("SAME TEXT AS ABOVE"), String("logical_plan after eliminate_limit"),String("SAME TEXT AS ABOVE"), String("logical_plan after propagate_empty_relation"),String("SAME TEXT AS ABOVE"), +String("logical_plan after eliminate_one_union"),String("SAME TEXT AS ABOVE"), String("logical_plan after filter_null_join_keys"),String("SAME TEXT AS ABOVE"), String("logical_plan after eliminate_outer_join"),String("SAME TEXT AS ABOVE"), String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"), @@ -76,17 +80,22 @@ String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"), String("logical_plan after influx_regex_to_datafusion_regex"),String("SAME TEXT AS ABOVE"), String("logical_plan after 
handle_gap_fill"),String("SAME TEXT AS ABOVE"), String("logical_plan"),String("TableScan: issue_1087 projection=[tsid, t, name, value]"), -String("initial_physical_plan"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low\n"), +String("initial_physical_plan"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"), +String("initial_physical_plan_with_stats"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:)]]\n"), +String("physical_plan after OutputRequirements"),String("OutputRequirementExec\n ScanTable: table=issue_1087, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"), String("physical_plan after aggregate_statistics"),String("SAME TEXT AS ABOVE"), String("physical_plan after join_selection"),String("SAME TEXT AS ABOVE"), -String("physical_plan after PipelineFixer"),String("SAME TEXT AS ABOVE"), -String("physical_plan after repartition"),String("SAME TEXT AS ABOVE"), +String("physical_plan after LimitedDistinctAggregation"),String("SAME TEXT AS ABOVE"), String("physical_plan after EnforceDistribution"),String("SAME TEXT AS ABOVE"), String("physical_plan after CombinePartialFinalAggregate"),String("SAME TEXT AS ABOVE"), String("physical_plan after EnforceSorting"),String("SAME TEXT AS ABOVE"), String("physical_plan after coalesce_batches"),String("SAME TEXT AS ABOVE"), +String("physical_plan after OutputRequirements"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"), String("physical_plan after PipelineChecker"),String("SAME TEXT AS ABOVE"), -String("physical_plan"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low\n"), +String("physical_plan after LimitAggregation"),String("SAME TEXT AS ABOVE"), +String("physical_plan after ProjectionPushdown"),String("SAME TEXT AS ABOVE"), +String("physical_plan"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"), +String("physical_plan_with_stats"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:)]]\n"), DROP TABLE `issue_1087`; diff --git a/integration_tests/cases/common/dml/issue-302.result b/integration_tests/cases/common/dml/issue-302.result index b57d881fd2..cd7afc3a36 100644 --- a/integration_tests/cases/common/dml/issue-302.result +++ b/integration_tests/cases/common/dml/issue-302.result @@ -12,7 +12,7 @@ affected_rows: 1 select `t`, count(distinct name) from issue302 group by `t`; -issue302.t,COUNT(DISTINCT issue302.name), +t,COUNT(DISTINCT issue302.name), Timestamp(1651737067000),Int64(0), diff --git a/integration_tests/cases/common/dml/issue-341.result b/integration_tests/cases/common/dml/issue-341.result index 902222590b..4d7da95cab 100644 --- a/integration_tests/cases/common/dml/issue-341.result +++ b/integration_tests/cases/common/dml/issue-341.result @@ -58,7 +58,7 @@ WHERE plan_type,plan, String("logical_plan"),String("TableScan: issue341_t1 projection=[timestamp, value], full_filters=[issue341_t1.value = Int32(3)]"), -String("physical_plan"),String("ScanTable: table=issue341_t1, parallelism=8, priority=Low\n"), +String("physical_plan"),String("ScanTable: table=issue341_t1, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"), -- 
FilterExec node should not be in plan. @@ -71,8 +71,8 @@ WHERE tag1 = "t3"; plan_type,plan, -String("logical_plan"),String("Projection: issue341_t1.timestamp, issue341_t1.value\n TableScan: issue341_t1 projection=[timestamp, value, tag1], full_filters=[issue341_t1.tag1 = Utf8(\"t3\")]"), -String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n ScanTable: table=issue341_t1, parallelism=8, priority=Low\n"), +String("logical_plan"),String("TableScan: issue341_t1 projection=[timestamp, value], full_filters=[issue341_t1.tag1 = Utf8(\"t3\")]"), +String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n ScanTable: table=issue341_t1, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"), -- Repeat operations above, but with overwrite table @@ -116,7 +116,7 @@ WHERE plan_type,plan, String("logical_plan"),String("Filter: issue341_t2.value = Float64(3)\n TableScan: issue341_t2 projection=[timestamp, value], partial_filters=[issue341_t2.value = Float64(3)]"), -String("physical_plan"),String("CoalesceBatchesExec: target_batch_size=8192\n FilterExec: value@1 = 3\n ScanTable: table=issue341_t2, parallelism=8, priority=Low\n"), +String("physical_plan"),String("CoalesceBatchesExec: target_batch_size=8192\n FilterExec: value@1 = 3\n ScanTable: table=issue341_t2, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"), -- When using tag as filter, FilterExec node should not be in plan. @@ -129,8 +129,8 @@ WHERE tag1 = "t3"; plan_type,plan, -String("logical_plan"),String("Projection: issue341_t2.timestamp, issue341_t2.value\n TableScan: issue341_t2 projection=[timestamp, value, tag1], full_filters=[issue341_t2.tag1 = Utf8(\"t3\")]"), -String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n ScanTable: table=issue341_t2, parallelism=8, priority=Low\n"), +String("logical_plan"),String("TableScan: issue341_t2 projection=[timestamp, value], full_filters=[issue341_t2.tag1 = Utf8(\"t3\")]"), +String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n ScanTable: table=issue341_t2, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"), DROP TABLE IF EXISTS `issue341_t1`; diff --git a/integration_tests/cases/common/dml/issue-59.result b/integration_tests/cases/common/dml/issue-59.result index 549c7019cd..4f7544c87f 100644 --- a/integration_tests/cases/common/dml/issue-59.result +++ b/integration_tests/cases/common/dml/issue-59.result @@ -24,8 +24,8 @@ FROM issue59 GROUP BY id+1; plan_type,plan, -String("logical_plan"),String("Projection: group_alias_0 AS issue59.id + Int64(1), COUNT(alias1) AS COUNT(DISTINCT issue59.account)\n Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]]\n Projection: group_alias_0, alias1\n Aggregate: groupBy=[[CAST(issue59.id AS Int64) + Int64(1) AS group_alias_0, issue59.account AS alias1]], aggr=[[]]\n TableScan: issue59 projection=[id, account]"), -String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n ProjectionExec: expr=[group_alias_0@0 as group_alias_0, 
alias1@1 as alias1]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0, alias1@1], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[CAST(id@0 AS Int64) + 1 as group_alias_0, account@1 as alias1], aggr=[]\n ScanTable: table=issue59, parallelism=8, priority=Low\n"), +String("logical_plan"),String("Projection: group_alias_0 AS issue59.id + Int64(1), COUNT(alias1) AS COUNT(DISTINCT issue59.account)\n Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]]\n Aggregate: groupBy=[[CAST(issue59.id AS Int64) + Int64(1) AS group_alias_0, issue59.account AS alias1]], aggr=[[]]\n TableScan: issue59 projection=[id, account]"), +String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0, alias1@1], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[CAST(id@0 AS Int64) + 1 as group_alias_0, account@1 as alias1], aggr=[]\n ScanTable: table=issue59, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"), DROP TABLE IF EXISTS issue59; diff --git a/integration_tests/cases/common/explain/explain.result b/integration_tests/cases/common/explain/explain.result index 0cd06380d5..6cf09c078e 100644 --- a/integration_tests/cases/common/explain/explain.result +++ b/integration_tests/cases/common/explain/explain.result @@ -10,7 +10,7 @@ EXPLAIN SELECT t FROM `04_explain_t`; plan_type,plan, String("logical_plan"),String("TableScan: 04_explain_t projection=[t]"), -String("physical_plan"),String("ScanTable: table=04_explain_t, parallelism=8, priority=Low\n"), +String("physical_plan"),String("ScanTable: table=04_explain_t, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"), DROP TABLE `04_explain_t`; diff --git a/integration_tests/cases/common/function/aggregate.result b/integration_tests/cases/common/function/aggregate.result index 037e503a9f..f45a6841a8 100644 --- a/integration_tests/cases/common/function/aggregate.result +++ b/integration_tests/cases/common/function/aggregate.result @@ -105,7 +105,50 @@ COUNT(DISTINCT 02_function_aggregate_table1.arch), Int64(2), +CREATE TABLE `02_function_aggregate_table2` ( + `timestamp` timestamp NOT NULL, + `arch` string TAG, + `datacenter` string TAG, + `value` int, + `uvalue` uint64, + timestamp KEY (timestamp)) ENGINE=Analytic +WITH( + enable_ttl='false', + update_mode = 'append' +); + +affected_rows: 0 + +INSERT INTO `02_function_aggregate_table2` + (`timestamp`, `arch`, `datacenter`, `value`, `uvalue`) +VALUES + (1658304762, 'x86-64', 'china', 100, 10), + (1658304763, 'x86-64', 'china', 200, 10), + (1658304762, 'arm64', 'china', 110, 0), + (1658304763, 'arm64', 'china', 210, 0); + +affected_rows: 4 + +-- The should select empty column +SELECT count(*) FROM `02_function_aggregate_table1`; + +COUNT(*), +Int64(4), + + +-- Same with before, but query from sst +-- 
SQLNESS ARG pre_cmd=flush +SELECT count(*) FROM `02_function_aggregate_table1`; + +COUNT(*), +Int64(4), + + DROP TABLE `02_function_aggregate_table1`; affected_rows: 0 +DROP TABLE `02_function_aggregate_table2`; + +affected_rows: 0 + diff --git a/integration_tests/cases/common/function/aggregate.sql b/integration_tests/cases/common/function/aggregate.sql index c4f8dd50ea..8543245ae8 100644 --- a/integration_tests/cases/common/function/aggregate.sql +++ b/integration_tests/cases/common/function/aggregate.sql @@ -57,4 +57,32 @@ SELECT distinct(`arch`) FROM `02_function_aggregate_table1` ORDER BY `arch` DESC SELECT count(distinct(`arch`)) FROM `02_function_aggregate_table1`; +CREATE TABLE `02_function_aggregate_table2` ( + `timestamp` timestamp NOT NULL, + `arch` string TAG, + `datacenter` string TAG, + `value` int, + `uvalue` uint64, + timestamp KEY (timestamp)) ENGINE=Analytic +WITH( + enable_ttl='false', + update_mode = 'append' +); + +INSERT INTO `02_function_aggregate_table2` + (`timestamp`, `arch`, `datacenter`, `value`, `uvalue`) +VALUES + (1658304762, 'x86-64', 'china', 100, 10), + (1658304763, 'x86-64', 'china', 200, 10), + (1658304762, 'arm64', 'china', 110, 0), + (1658304763, 'arm64', 'china', 210, 0); + +-- The should select empty column +SELECT count(*) FROM `02_function_aggregate_table1`; + +-- Same with before, but query from sst +-- SQLNESS ARG pre_cmd=flush +SELECT count(*) FROM `02_function_aggregate_table1`; + DROP TABLE `02_function_aggregate_table1`; +DROP TABLE `02_function_aggregate_table2`; diff --git a/integration_tests/cases/common/optimizer/optimizer.result b/integration_tests/cases/common/optimizer/optimizer.result index f9cfac2de9..5df9f47e68 100644 --- a/integration_tests/cases/common/optimizer/optimizer.result +++ b/integration_tests/cases/common/optimizer/optimizer.result @@ -10,7 +10,7 @@ EXPLAIN SELECT max(value) AS c1, avg(value) AS c2 FROM `07_optimizer_t` GROUP BY plan_type,plan, String("logical_plan"),String("Projection: MAX(07_optimizer_t.value) AS c1, AVG(07_optimizer_t.value) AS c2\n Aggregate: groupBy=[[07_optimizer_t.name]], aggr=[[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]]\n TableScan: 07_optimizer_t projection=[name, value]"), -String("physical_plan"),String("ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([name@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n ScanTable: table=07_optimizer_t, parallelism=8, priority=Low\n"), +String("physical_plan"),String("ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([name@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n ScanTable: table=07_optimizer_t, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"), DROP TABLE `07_optimizer_t`; diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.result b/integration_tests/cases/env/cluster/ddl/partition_table.result index d376718cc7..233c348318 100644 
--- a/integration_tests/cases/env/cluster/ddl/partition_table.result +++ b/integration_tests/cases/env/cluster/ddl/partition_table.result @@ -80,19 +80,23 @@ UInt64(16367588166920223437),Timestamp(1651737067000),String("horaedb9"),Int32(0 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx -- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx +-- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx +-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0"; plan_type,plan, -String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:1, metrics=[\npartition_table_t:\n __partition_table_t_1:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n\n__partition_table_t_1:\nCoalescePartitionsExec, metrics=[output_rows=0, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_1, parallelism=8, priority=Low, metrics=[\nPredicate { exprs:[name = Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_1, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"), +String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:1, metrics=xx\n ScanTable: table=__partition_table_t_1, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name = Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_1, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"), -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx -- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx -- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x +-- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx +-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0", "ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4"); plan_type,plan, -String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:3, metrics=[\npartition_table_t:\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=0, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb1\"), Utf8(\"ceresdb2\"), Utf8(\"ceresdb3\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n 
times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_1, fetched_columns:[tsid,t,name,id,value]:\n=0]\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=0, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb1\"), Utf8(\"ceresdb2\"), Utf8(\"ceresdb3\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_1, fetched_columns:[tsid,t,name,id,value]:\n=0]\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=0, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb1\"), Utf8(\"ceresdb2\"), Utf8(\"ceresdb3\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_1, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"), +String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:3, metrics=xx\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=xx\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=xx\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb1\"), Utf8(\"ceresdb2\"), Utf8(\"ceresdb3\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_1, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"), ALTER TABLE partition_table_t ADD COLUMN (b string); diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.sql b/integration_tests/cases/env/cluster/ddl/partition_table.sql index a36b59ac2d..a87dfbb2cd 100644 --- a/integration_tests/cases/env/cluster/ddl/partition_table.sql +++ b/integration_tests/cases/env/cluster/ddl/partition_table.sql @@ -37,11 +37,15 @@ SELECT * from partition_table_t where name in ("horaedb5", "horaedb6", "horaedb7 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx -- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx +-- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx +-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0"; -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx -- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx -- 
SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x +-- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx +-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0", "ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4"); ALTER TABLE partition_table_t ADD COLUMN (b string); diff --git a/integration_tests/cases/env/local/ddl/query-plan.result b/integration_tests/cases/env/local/ddl/query-plan.result index a421856b4c..1f63218401 100644 --- a/integration_tests/cases/env/local/ddl/query-plan.result +++ b/integration_tests/cases/env/local/ddl/query-plan.result @@ -27,48 +27,53 @@ affected_rows: 3 -- This query should include memtable -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx +-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx explain analyze select t from `03_dml_select_real_time_range` where t > 1695348001000; plan_type,plan, -String("Plan with Metrics"),String("ScanTable: table=03_dml_select_real_time_range, parallelism=8, priority=Low, metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348001000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348001001), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=1\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=1\n scan_memtable_1, fetched_columns:[tsid,t]:\n=0]\n"), +String("Plan with Metrics"),String("ScanTable: table=03_dml_select_real_time_range, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348001000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348001001), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=1\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=1\n scan_memtable_1, fetched_columns:[tsid,t]:\n=0]\n"), -- This query should have higher priority -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx +-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx explain analyze select t from `03_dml_select_real_time_range` where t >= 1695348001000 and t < 1695348002000; plan_type,plan, -String("Plan with Metrics"),String("ScanTable: table=03_dml_select_real_time_range, parallelism=8, priority=High, metrics=[\nPredicate { exprs:[t >= TimestampMillisecond(1695348001000, None), t < TimestampMillisecond(1695348002000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348001000), exclusive_end: Timestamp(1695348002000) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=1\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=1\n scan_memtable_1, fetched_columns:[tsid,t]:\n=0]\n"), +String("Plan with Metrics"),String("ScanTable: table=03_dml_select_real_time_range, parallelism=8, priority=High, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t >= TimestampMillisecond(1695348001000, None), t < TimestampMillisecond(1695348002000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348001000), exclusive_end: Timestamp(1695348002000) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n 
num_memtables=1\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=1\n scan_memtable_1, fetched_columns:[tsid,t]:\n=0]\n"), -- This query should not include memtable -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx +-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx explain analyze select t from `03_dml_select_real_time_range` where t > 1695348002000; plan_type,plan, -String("Plan with Metrics"),String("ScanTable: table=03_dml_select_real_time_range, parallelism=8, priority=Low, metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348002000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348002001), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=0\n=0]\n"), +String("Plan with Metrics"),String("ScanTable: table=03_dml_select_real_time_range, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348002000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348002001), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=0\n=0]\n"), -- SQLNESS ARG pre_cmd=flush -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx -- SQLNESS REPLACE project_record_batch=\d+.?\d*(µ|m|n) project_record_batch=xx +-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx -- This query should include SST explain analyze select t from `03_dml_select_real_time_range` where t > 1695348001000; plan_type,plan, -String("Plan with Metrics"),String("ScanTable: table=03_dml_select_real_time_range, parallelism=8, priority=Low, metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348001000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348001001), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=1\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=1\n scan_sst_1, fetched_columns:[tsid,t]:\n meta_data_cache_hit=false\n parallelism=1\n project_record_batch=xxs\n read_meta_data_duration=xxs\n row_mem=320\n row_num=3\n prune_row_groups:\n pruned_by_custom_filter=0\n pruned_by_min_max=0\n row_groups_after_prune=1\n total_row_groups=1\n use_custom_filter=false\n=0]\n"), +String("Plan with Metrics"),String("ScanTable: table=03_dml_select_real_time_range, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348001000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348001001), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=1\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=1\n scan_sst_1, fetched_columns:[tsid,t]:\n meta_data_cache_hit=false\n parallelism=1\n project_record_batch=xxs\n read_meta_data_duration=xxs\n row_mem=320\n row_num=3\n prune_row_groups:\n pruned_by_custom_filter=0\n pruned_by_min_max=0\n row_groups_after_prune=1\n total_row_groups=1\n use_custom_filter=false\n=0]\n"), -- This query should not include SST +-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx explain analyze select t from `03_dml_select_real_time_range` where t > 1695348002000; plan_type,plan, 
-String("Plan with Metrics"),String("ScanTable: table=03_dml_select_real_time_range, parallelism=8, priority=Low, metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348002000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348002001), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=0\n=0]\n"), +String("Plan with Metrics"),String("ScanTable: table=03_dml_select_real_time_range, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348002000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348002001), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=0\n=0]\n"), -- Table with an 'append' update mode @@ -97,11 +102,12 @@ affected_rows: 3 -- SQLNESS REPLACE since_create=\d+.?\d*(µ|m|n) since_create=xx -- SQLNESS REPLACE since_init=\d+.?\d*(µ|m|n) since_init=xx -- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx +-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx explain analyze select t from `03_append_mode_table` where t >= 1695348001000 and name = 'ceresdb'; plan_type,plan, -String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t], metrics=[output_rows=2, elapsed_compute=xxs]\n ScanTable: table=03_append_mode_table, parallelism=8, priority=Low, metrics=[\nPredicate { exprs:[t >= TimestampMillisecond(1695348001000, None), name = Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start: Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=false\n chain_iter_0:\n num_memtables=1\n num_ssts=0\n scan_duration=xxs\n since_create=xxs\n since_init=xxs\n total_batch_fetched=1\n total_rows_fetched=2\n scan_memtable_1, fetched_columns:[t,name]:\n=0]\n"), +String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t], metrics=xx\n ScanTable: table=03_append_mode_table, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t >= TimestampMillisecond(1695348001000, None), name = Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start: Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=false\n chain_iter_0:\n num_memtables=1\n num_ssts=0\n scan_duration=xxs\n since_create=xxs\n since_init=xxs\n total_batch_fetched=1\n total_rows_fetched=2\n scan_memtable_1, fetched_columns:[t,name]:\n=0]\n"), -- Should just fetch projected columns from SST @@ -111,11 +117,12 @@ String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t], metrics=[ou -- SQLNESS REPLACE since_init=\d+.?\d*(µ|m|n) since_init=xx -- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx -- SQLNESS REPLACE project_record_batch=\d+.?\d*(µ|m|n) project_record_batch=xx +-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx explain analyze select t from `03_append_mode_table` where t >= 1695348001000 and name = 'ceresdb'; plan_type,plan, -String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t], metrics=[output_rows=2, elapsed_compute=xxs]\n ScanTable: table=03_append_mode_table, parallelism=8, priority=Low, metrics=[\nPredicate { exprs:[t >= TimestampMillisecond(1695348001000, None), name = Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start: Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=false\n chain_iter_0:\n num_memtables=0\n num_ssts=1\n scan_duration=xxs\n 
since_create=xxs\n since_init=xxs\n total_batch_fetched=1\n total_rows_fetched=2\n scan_sst_1, fetched_columns:[t,name]:\n meta_data_cache_hit=false\n parallelism=1\n project_record_batch=xxs\n read_meta_data_duration=xxs\n row_mem=408\n row_num=3\n prune_row_groups:\n pruned_by_custom_filter=0\n pruned_by_min_max=0\n row_groups_after_prune=1\n total_row_groups=1\n use_custom_filter=false\n=0]\n"), +String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t], metrics=xx\n ScanTable: table=03_append_mode_table, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t >= TimestampMillisecond(1695348001000, None), name = Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start: Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=false\n chain_iter_0:\n num_memtables=0\n num_ssts=1\n scan_duration=xxs\n since_create=xxs\n since_init=xxs\n total_batch_fetched=1\n total_rows_fetched=2\n scan_sst_1, fetched_columns:[t,name]:\n meta_data_cache_hit=false\n parallelism=1\n project_record_batch=xxs\n read_meta_data_duration=xxs\n row_mem=408\n row_num=3\n prune_row_groups:\n pruned_by_custom_filter=0\n pruned_by_min_max=0\n row_groups_after_prune=1\n total_row_groups=1\n use_custom_filter=false\n=0]\n"), CREATE TABLE `TEST_QUERY_PRIORITY` ( @@ -132,20 +139,22 @@ affected_rows: 0 -- This query should have higher priority -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx +-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx explain analyze select TS from `TEST_QUERY_PRIORITY` where TS >= 1695348001000 and TS < 1695348002000; plan_type,plan, -String("Plan with Metrics"),String("ScanTable: table=TEST_QUERY_PRIORITY, parallelism=8, priority=High, metrics=[\nPredicate { exprs:[TS >= TimestampMillisecond(1695348001000, None), TS < TimestampMillisecond(1695348002000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348001000), exclusive_end: Timestamp(1695348002000) } }\nscan_table:\n do_merge_sort=false\n=0]\n"), +String("Plan with Metrics"),String("ScanTable: table=TEST_QUERY_PRIORITY, parallelism=8, priority=High, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[TS >= TimestampMillisecond(1695348001000, None), TS < TimestampMillisecond(1695348002000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348001000), exclusive_end: Timestamp(1695348002000) } }\nscan_table:\n do_merge_sort=false\n=0]\n"), -- This query should have higher priority -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx +-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx explain analyze select TS from `TEST_QUERY_PRIORITY` where TS >= 1695348001000; plan_type,plan, -String("Plan with Metrics"),String("ScanTable: table=TEST_QUERY_PRIORITY, parallelism=8, priority=Low, metrics=[\nPredicate { exprs:[TS >= TimestampMillisecond(1695348001000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=false\n=0]\n"), +String("Plan with Metrics"),String("ScanTable: table=TEST_QUERY_PRIORITY, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[TS >= TimestampMillisecond(1695348001000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=false\n=0]\n"), DROP TABLE `03_dml_select_real_time_range`; diff --git 
a/integration_tests/cases/env/local/ddl/query-plan.sql b/integration_tests/cases/env/local/ddl/query-plan.sql index 218e0f7ba1..5217b1a076 100644 --- a/integration_tests/cases/env/local/ddl/query-plan.sql +++ b/integration_tests/cases/env/local/ddl/query-plan.sql @@ -18,27 +18,32 @@ INSERT INTO `03_dml_select_real_time_range` (t, name, value) -- This query should include memtable -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx +-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx explain analyze select t from `03_dml_select_real_time_range` where t > 1695348001000; -- This query should have higher priority -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx +-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx explain analyze select t from `03_dml_select_real_time_range` where t >= 1695348001000 and t < 1695348002000; -- This query should not include memtable -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx +-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx explain analyze select t from `03_dml_select_real_time_range` where t > 1695348002000; -- SQLNESS ARG pre_cmd=flush -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx -- SQLNESS REPLACE project_record_batch=\d+.?\d*(µ|m|n) project_record_batch=xx +-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx -- This query should include SST explain analyze select t from `03_dml_select_real_time_range` where t > 1695348001000; -- This query should not include SST +-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx explain analyze select t from `03_dml_select_real_time_range` where t > 1695348002000; @@ -64,6 +69,7 @@ INSERT INTO `03_append_mode_table` (t, name, value) -- SQLNESS REPLACE since_create=\d+.?\d*(µ|m|n) since_create=xx -- SQLNESS REPLACE since_init=\d+.?\d*(µ|m|n) since_init=xx -- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx +-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx explain analyze select t from `03_append_mode_table` where t >= 1695348001000 and name = 'ceresdb'; @@ -74,6 +80,7 @@ where t >= 1695348001000 and name = 'ceresdb'; -- SQLNESS REPLACE since_init=\d+.?\d*(µ|m|n) since_init=xx -- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx -- SQLNESS REPLACE project_record_batch=\d+.?\d*(µ|m|n) project_record_batch=xx +-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx explain analyze select t from `03_append_mode_table` where t >= 1695348001000 and name = 'ceresdb'; @@ -89,11 +96,13 @@ CREATE TABLE `TEST_QUERY_PRIORITY` ( -- This query should have higher priority -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx +-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx explain analyze select TS from `TEST_QUERY_PRIORITY` where TS >= 1695348001000 and TS < 1695348002000; -- This query should have higher priority -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx +-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx explain analyze select TS from `TEST_QUERY_PRIORITY` where TS >= 1695348001000; diff --git a/src/analytic_engine/src/instance/reorder_memtable.rs b/src/analytic_engine/src/instance/reorder_memtable.rs index e6eab4d135..c37417bf64 100644 --- a/src/analytic_engine/src/instance/reorder_memtable.rs +++ b/src/analytic_engine/src/instance/reorder_memtable.rs @@ -147,8 +147,11 @@ impl ExecutionPlan for ScanMemIter { })) } - fn statistics(&self) -> Statistics { - Statistics::default() + fn statistics( + &self, + ) -> std::result::Result + { + Ok(Statistics::new_unknown(&self.schema())) } } @@ -259,8 +262,8 @@ impl Reorder { pub async fn into_stream(self) -> Result { // 1. 
Init datafusion context let runtime = Arc::new(RuntimeEnv::default()); - let state = SessionState::with_config_rt(SessionConfig::new(), runtime); - let ctx = SessionContext::with_state(state); + let state = SessionState::new_with_config_rt(SessionConfig::new(), runtime); + let ctx = SessionContext::new_with_state(state); let table_provider = Arc::new(MemIterProvider { arrow_schema: self.schema.to_arrow_schema_ref(), iter: Mutex::new(Some(self.iter)), diff --git a/src/analytic_engine/src/memtable/skiplist/iter.rs b/src/analytic_engine/src/memtable/skiplist/iter.rs index 4787b754bd..cce3913dea 100644 --- a/src/analytic_engine/src/memtable/skiplist/iter.rs +++ b/src/analytic_engine/src/memtable/skiplist/iter.rs @@ -154,6 +154,7 @@ impl + Clone + Sync + Send> ColumnarIterImpl { assert!(self.batch_size > 0); let record_schema = self.row_projector.fetched_schema().clone(); + let is_empty_projection = record_schema.columns().is_empty(); let primary_key_indexes = self .row_projector .primary_key_indexes() @@ -183,6 +184,10 @@ impl + Clone + Sync + Send> ColumnarIterImpl { } } + if is_empty_projection { + builder.inc_row_num(num_rows); + } + if num_rows > 0 { if let Some(deadline) = self.deadline { let now = Instant::now(); diff --git a/src/analytic_engine/src/row_iter/record_batch_stream.rs b/src/analytic_engine/src/row_iter/record_batch_stream.rs index 2a39c648c0..49c41f2432 100644 --- a/src/analytic_engine/src/row_iter/record_batch_stream.rs +++ b/src/analytic_engine/src/row_iter/record_batch_stream.rs @@ -161,6 +161,7 @@ fn filter_record_batch( let filter_array = predicate .evaluate(record_batch) .map(|v| v.into_array(record_batch.num_rows())) + .context(FilterExec)? .context(FilterExec)?; let selected_rows = filter_array .as_any() diff --git a/src/analytic_engine/src/table/mod.rs b/src/analytic_engine/src/table/mod.rs index af381b5baa..674f6b3bd5 100644 --- a/src/analytic_engine/src/table/mod.rs +++ b/src/analytic_engine/src/table/mod.rs @@ -430,6 +430,7 @@ pub fn support_pushdown(schema: &Schema, need_dedup: bool, col_names: &[String]) } // When table need dedup, only unique keys columns support pushdown + // See https://github.com/apache/incubator-horaedb/issues/605 col_names .iter() .all(|col_name| schema.is_unique_column(col_name.as_str())) diff --git a/src/common_types/src/datum.rs b/src/common_types/src/datum.rs index d152e9600a..9b22439a22 100644 --- a/src/common_types/src/datum.rs +++ b/src/common_types/src/datum.rs @@ -292,9 +292,11 @@ impl TryFrom<&SqlDataType> for DatumKind { SqlDataType::Double => Ok(Self::Double), SqlDataType::Boolean => Ok(Self::Boolean), SqlDataType::BigInt(_) => Ok(Self::Int64), + SqlDataType::Int64 => Ok(Self::Int64), SqlDataType::Int(_) => Ok(Self::Int32), + SqlDataType::Int8(_) => Ok(Self::Int8), SqlDataType::SmallInt(_) => Ok(Self::Int16), - SqlDataType::String => Ok(Self::String), + SqlDataType::String(_) => Ok(Self::String), SqlDataType::Varbinary(_) => Ok(Self::Varbinary), SqlDataType::Date => Ok(Self::Date), SqlDataType::Time(_, _) => Ok(Self::Time), @@ -1453,7 +1455,7 @@ impl Datum { ScalarValue::Date32(v) => v.map(Datum::Date), ScalarValue::Time64Nanosecond(v) => v.map(Datum::Time), ScalarValue::Dictionary(_, literal) => Datum::from_scalar_value(literal), - ScalarValue::List(_, _) + ScalarValue::List(_) | ScalarValue::Date64(_) | ScalarValue::Time32Second(_) | ScalarValue::Time32Millisecond(_) @@ -1467,10 +1469,12 @@ impl Datum { | ScalarValue::Decimal128(_, _, _) | ScalarValue::Null | ScalarValue::IntervalMonthDayNano(_) - | 
ScalarValue::Fixedsizelist(_, _, _) + | ScalarValue::FixedSizeList(_) | ScalarValue::DurationSecond(_) | ScalarValue::DurationMillisecond(_) | ScalarValue::DurationMicrosecond(_) + | ScalarValue::Decimal256(_, _, _) + | ScalarValue::LargeList(_) | ScalarValue::DurationNanosecond(_) => None, } } @@ -1502,7 +1506,7 @@ impl<'a> DatumView<'a> { v.map(|v| DatumView::Timestamp(Timestamp::new(v))) } ScalarValue::Dictionary(_, literal) => DatumView::from_scalar_value(literal), - ScalarValue::List(_, _) + ScalarValue::List(_) | ScalarValue::Date64(_) | ScalarValue::Time32Second(_) | ScalarValue::Time32Millisecond(_) @@ -1516,10 +1520,12 @@ impl<'a> DatumView<'a> { | ScalarValue::Decimal128(_, _, _) | ScalarValue::Null | ScalarValue::IntervalMonthDayNano(_) - | ScalarValue::Fixedsizelist(_, _, _) + | ScalarValue::FixedSizeList(_) | ScalarValue::DurationSecond(_) | ScalarValue::DurationMillisecond(_) | ScalarValue::DurationMicrosecond(_) + | ScalarValue::Decimal256(_, _, _) + | ScalarValue::LargeList(_) | ScalarValue::DurationNanosecond(_) => None, } } diff --git a/src/common_types/src/projected_schema.rs b/src/common_types/src/projected_schema.rs index 30e9eb01e3..1eff7dc424 100644 --- a/src/common_types/src/projected_schema.rs +++ b/src/common_types/src/projected_schema.rs @@ -105,7 +105,7 @@ pub struct RowProjector { /// For example: /// source columns in sst: 0,1,2,3,4 /// target projection columns: 2,1,3 - /// + /// /// the actual columns in fetched record: 1,2,3 /// relative columns indexes in fetched record: 0,1,2 /// @@ -347,6 +347,10 @@ impl ProjectedSchema { pub fn table_schema(&self) -> &Schema { &self.0.table_schema } + + pub fn target_column_schema(&self, i: usize) -> &ColumnSchema { + self.0.target_record_schema.column(i) + } } impl From for horaedbproto::schema::ProjectedSchema { diff --git a/src/common_types/src/record_batch.rs b/src/common_types/src/record_batch.rs index 2a543ca552..0278aa7095 100644 --- a/src/common_types/src/record_batch.rs +++ b/src/common_types/src/record_batch.rs @@ -24,7 +24,7 @@ use arrow::{ compute, datatypes::{DataType, Field, Schema, SchemaRef as ArrowSchemaRef, TimeUnit}, error::ArrowError, - record_batch::RecordBatch as ArrowRecordBatch, + record_batch::{RecordBatch as ArrowRecordBatch, RecordBatchOptions}, }; use arrow_ext::operation; use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; @@ -124,14 +124,18 @@ pub struct RecordBatchData { } impl RecordBatchData { - fn new(arrow_schema: ArrowSchemaRef, column_blocks: Vec) -> Result { + fn new( + arrow_schema: ArrowSchemaRef, + column_blocks: Vec, + options: RecordBatchOptions, + ) -> Result { let arrays = column_blocks .iter() .map(|column| column.to_arrow_array_ref()) - .collect(); - + .collect::>(); let arrow_record_batch = - ArrowRecordBatch::try_new(arrow_schema, arrays).context(CreateArrow)?; + ArrowRecordBatch::try_new_with_options(arrow_schema, arrays, &options) + .context(CreateArrow)?; Ok(RecordBatchData { arrow_record_batch, @@ -140,10 +144,7 @@ impl RecordBatchData { } fn num_rows(&self) -> usize { - self.column_blocks - .first() - .map(|column| column.num_rows()) - .unwrap_or(0) + self.arrow_record_batch.num_rows() } fn take_column_block(&mut self, index: usize) -> ColumnBlock { @@ -227,9 +228,13 @@ impl RecordBatch { } } - pub fn new(schema: RecordSchema, column_blocks: Vec) -> Result { + pub fn new( + schema: RecordSchema, + column_blocks: Vec, + num_rows: usize, + ) -> Result { ensure!(schema.num_columns() == column_blocks.len(), SchemaLen); - + let options = 
RecordBatchOptions::new().with_row_count(Some(num_rows)); // Validate schema and column_blocks. for (column_schema, column_block) in schema.columns().iter().zip(column_blocks.iter()) { ensure!( @@ -243,7 +248,7 @@ impl RecordBatch { } let arrow_schema = schema.to_arrow_schema_ref(); - let data = RecordBatchData::new(arrow_schema, column_blocks)?; + let data = RecordBatchData::new(arrow_schema, column_blocks, options)?; Ok(Self { schema, data }) } @@ -388,6 +393,7 @@ impl FetchedRecordBatch { let mut column_blocks = Vec::with_capacity(fetched_schema.num_columns()); let num_rows = arrow_record_batch.num_rows(); let num_columns = arrow_record_batch.num_columns(); + let options = RecordBatchOptions::new().with_row_count(Some(num_rows)); for (col_idx_opt, col_schema) in column_indexes.iter().zip(fetched_schema.columns()) { match col_idx_opt { Some(col_idx) => { @@ -419,7 +425,8 @@ impl FetchedRecordBatch { } } - let data = RecordBatchData::new(fetched_schema.to_arrow_schema_ref(), column_blocks)?; + let data = + RecordBatchData::new(fetched_schema.to_arrow_schema_ref(), column_blocks, options)?; Ok(FetchedRecordBatch { schema: fetched_schema, @@ -471,6 +478,8 @@ impl FetchedRecordBatch { // Get the schema after projection. let record_schema = projected_schema.to_record_schema(); let mut column_blocks = Vec::with_capacity(record_schema.num_columns()); + let num_rows = self.data.num_rows(); + let options = RecordBatchOptions::new().with_row_count(Some(num_rows)); for column_schema in record_schema.columns() { let column_index = @@ -485,8 +494,8 @@ impl FetchedRecordBatch { column_blocks.push(column_block); } - let data = RecordBatchData::new(record_schema.to_arrow_schema_ref(), column_blocks)?; - + let data = + RecordBatchData::new(record_schema.to_arrow_schema_ref(), column_blocks, options)?; Ok(RecordBatch { schema: record_schema, data, @@ -582,6 +591,7 @@ pub struct FetchedRecordBatchBuilder { fetched_schema: RecordSchema, primary_key_indexes: Option>, builders: Vec, + num_rows: usize, } impl FetchedRecordBatchBuilder { @@ -601,6 +611,7 @@ impl FetchedRecordBatchBuilder { fetched_schema, primary_key_indexes, builders, + num_rows: 0, } } @@ -624,6 +635,7 @@ impl FetchedRecordBatchBuilder { fetched_schema: record_schema, primary_key_indexes, builders, + num_rows: 0, } } @@ -671,6 +683,13 @@ impl FetchedRecordBatchBuilder { Ok(()) } + /// When the record batch contains no column, its row num may not be 0, so + /// we need to inc row num explicitly in this case. + /// See: https://github.com/apache/arrow-datafusion/pull/7920 + pub fn inc_row_num(&mut self, n: usize) { + self.num_rows += n; + } + /// Append `len` from `start` (inclusive) to this builder. /// /// REQUIRE: @@ -702,7 +721,7 @@ impl FetchedRecordBatchBuilder { self.builders .first() .map(|builder| builder.len()) - .unwrap_or(0) + .unwrap_or(self.num_rows) } /// Returns true if the builder is empty. 
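The `RecordBatchData`/`FetchedRecordBatchBuilder` changes above thread an explicit row count through `RecordBatchOptions` because, after the arrow upgrade, a batch carrying zero columns (for example the empty projection behind `count(*)`) would otherwise report zero rows. Below is a minimal sketch of that arrow 49 behavior, independent of HoraeDB's own wrappers; the row count of 3 is an arbitrary illustration, not a value from this diff.

```rust
use std::sync::Arc;

use arrow::{
    datatypes::Schema,
    error::ArrowError,
    record_batch::{RecordBatch, RecordBatchOptions},
};

fn main() -> Result<(), ArrowError> {
    // An empty projection yields a batch with no columns; with no column to
    // measure, the row count must be supplied explicitly via the options,
    // otherwise construction fails (or the count collapses to zero).
    let options = RecordBatchOptions::new().with_row_count(Some(3));
    let batch =
        RecordBatch::try_new_with_options(Arc::new(Schema::empty()), vec![], &options)?;

    assert_eq!(batch.num_columns(), 0);
    assert_eq!(batch.num_rows(), 3);
    Ok(())
}
```

This is why `num_rows()` now delegates to the underlying arrow batch and why the skiplist iterator calls `inc_row_num` when the fetched schema has no columns.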
@@ -725,11 +744,16 @@ impl FetchedRecordBatchBuilder { .map(|builder| builder.build()) .collect(); let arrow_schema = self.fetched_schema.to_arrow_schema_ref(); + let num_rows = column_blocks + .first() + .map(|block| block.num_rows()) + .unwrap_or(self.num_rows); + let options = RecordBatchOptions::new().with_row_count(Some(num_rows)); Ok(FetchedRecordBatch { schema: self.fetched_schema.clone(), primary_key_indexes: self.primary_key_indexes.clone(), - data: RecordBatchData::new(arrow_schema, column_blocks)?, + data: RecordBatchData::new(arrow_schema, column_blocks, options)?, }) } } diff --git a/src/components/parquet_ext/src/meta_data.rs b/src/components/parquet_ext/src/meta_data.rs index 00a0bb3a17..ad18a36cb7 100644 --- a/src/components/parquet_ext/src/meta_data.rs +++ b/src/components/parquet_ext/src/meta_data.rs @@ -19,9 +19,10 @@ use std::{ops::Range, sync::Arc}; use async_trait::async_trait; use bytes::Bytes; +use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder; use generic_error::GenericResult; use parquet::{ - arrow::{arrow_reader::ArrowReaderOptions, ParquetRecordBatchStreamBuilder}, + arrow::arrow_reader::ArrowReaderOptions, errors::{ParquetError, Result}, file::{footer, metadata::ParquetMetaData}, }; diff --git a/src/components/parquet_ext/src/prune/min_max.rs b/src/components/parquet_ext/src/prune/min_max.rs index 8ea39299ef..0a717021a1 100644 --- a/src/components/parquet_ext/src/prune/min_max.rs +++ b/src/components/parquet_ext/src/prune/min_max.rs @@ -230,7 +230,7 @@ mod test { } fn prepare_parquet_schema_descr(schema: &ArrowSchema) -> SchemaDescPtr { - let mut fields = schema + let fields = schema .fields() .iter() .map(|field| { @@ -245,7 +245,7 @@ mod test { }) .collect(); let schema = SchemaType::group_type_builder("schema") - .with_fields(&mut fields) + .with_fields(fields) .build() .unwrap(); diff --git a/src/df_engine_extensions/src/dist_sql_query/physical_plan.rs b/src/df_engine_extensions/src/dist_sql_query/physical_plan.rs index feba491f50..dd430f520d 100644 --- a/src/df_engine_extensions/src/dist_sql_query/physical_plan.rs +++ b/src/df_engine_extensions/src/dist_sql_query/physical_plan.rs @@ -129,8 +129,10 @@ impl ExecutionPlan for UnresolvedPartitionedScan { )) } - fn statistics(&self) -> Statistics { - Statistics::default() + fn statistics( + &self, + ) -> Result { + Ok(Statistics::new_unknown(&self.schema())) } } @@ -367,8 +369,10 @@ impl ExecutionPlan for ResolvedPartitionedScan { Ok(Box::pin(record_stream)) } - fn statistics(&self) -> Statistics { - Statistics::default() + fn statistics( + &self, + ) -> Result { + Ok(Statistics::new_unknown(&self.schema())) } fn metrics(&self) -> Option { @@ -578,8 +582,10 @@ impl ExecutionPlan for UnresolvedSubTableScan { )) } - fn statistics(&self) -> Statistics { - Statistics::default() + fn statistics( + &self, + ) -> Result { + Ok(Statistics::new_unknown(&self.schema())) } } diff --git a/src/df_engine_extensions/src/dist_sql_query/test_util.rs b/src/df_engine_extensions/src/dist_sql_query/test_util.rs index 1f4e788fef..c42f9e3862 100644 --- a/src/df_engine_extensions/src/dist_sql_query/test_util.rs +++ b/src/df_engine_extensions/src/dist_sql_query/test_util.rs @@ -490,8 +490,10 @@ impl ExecutionPlan for MockScan { unimplemented!() } - fn statistics(&self) -> datafusion::physical_plan::Statistics { - unimplemented!() + fn statistics(&self) -> DfResult { + Ok(datafusion::physical_plan::Statistics::new_unknown( + &self.schema(), + )) } } diff --git a/src/df_operator/src/scalar.rs b/src/df_operator/src/scalar.rs 
index 1535ebdbd4..58e8214c1a 100644 --- a/src/df_operator/src/scalar.rs +++ b/src/df_operator/src/scalar.rs @@ -31,6 +31,7 @@ pub struct ScalarUdf { } impl ScalarUdf { + #[allow(deprecated)] pub fn create(name: &str, func: ScalarFunction) -> Self { let signature = func.signature().to_datafusion_signature(); let return_type = func.return_type().to_datafusion_return_type(); @@ -43,7 +44,7 @@ impl ScalarUdf { #[inline] pub fn name(&self) -> &str { - &self.df_udf.name + self.df_udf.name() } /// Convert into datafusion's udf diff --git a/src/df_operator/src/udaf.rs b/src/df_operator/src/udaf.rs index 448a26c626..44f3913673 100644 --- a/src/df_operator/src/udaf.rs +++ b/src/df_operator/src/udaf.rs @@ -31,6 +31,7 @@ pub struct AggregateUdf { } impl AggregateUdf { + #[allow(deprecated)] pub fn create(name: &str, func: AggregateFunction) -> Self { let signature = func.signature().to_datafusion_signature(); let return_type = func.return_type().to_datafusion_return_type(); @@ -50,7 +51,7 @@ impl AggregateUdf { #[inline] pub fn name(&self) -> &str { - &self.df_udaf.name + self.df_udaf.name() } #[inline] diff --git a/src/interpreters/src/insert.rs b/src/interpreters/src/insert.rs index cac5af0cec..cc455b3fb6 100644 --- a/src/interpreters/src/insert.rs +++ b/src/interpreters/src/insert.rs @@ -374,5 +374,5 @@ fn get_or_extract_column_from_row_groups( Ok(columnar_value) })?; - Ok(column.into_array(num_rows)) + column.into_array(num_rows).context(DatafusionExecutor) } diff --git a/src/interpreters/src/tests.rs b/src/interpreters/src/tests.rs index 6d521738f7..f9c8c75bd9 100644 --- a/src/interpreters/src/tests.rs +++ b/src/interpreters/src/tests.rs @@ -117,7 +117,7 @@ where .enable_partition_table_access(enable_partition_table_access) .build(); let sql= format!("CREATE TABLE IF NOT EXISTS {table_name}(c1 string tag not null,ts timestamp not null, c3 string, timestamp key(ts),primary key(c1, ts)) \ - ENGINE=Analytic WITH (ttl='70d',update_mode='overwrite',arena_block_size='1KB')"); + ENGINE=Analytic WITH (enable_ttl='false',update_mode='overwrite',arena_block_size='1KB')"); let output = self.sql_to_output_with_context(&sql, ctx).await?; assert!( @@ -157,7 +157,7 @@ where .enable_partition_table_access(enable_partition_table_access) .build(); let sql = format!("select * from {table_name}"); - let output = self.sql_to_output_with_context(&sql, ctx).await?; + let output = self.sql_to_output_with_context(&sql, ctx.clone()).await?; let records = output.try_into().unwrap(); let expected = vec![ "+------------+---------------------+--------+--------+------------+--------------+", @@ -169,15 +169,15 @@ where ]; test_util::assert_record_batches_eq(&expected, records); - let sql = "select count(*) from test_table"; - let output = self.sql_to_output(sql).await?; + let sql = format!("select count(*) from {table_name}"); + let output = self.sql_to_output_with_context(&sql, ctx).await?; let records = output.try_into().unwrap(); let expected = vec![ - "+-----------------+", - "| COUNT(UInt8(1)) |", - "+-----------------+", - "| 2 |", - "+-----------------+", + "+----------+", + "| COUNT(*) |", + "+----------+", + "| 2 |", + "+----------+", ]; test_util::assert_record_batches_eq(&expected, records); diff --git a/src/proxy/src/grpc/prom_query.rs b/src/proxy/src/grpc/prom_query.rs index 1c999ad0c0..673b6131a5 100644 --- a/src/proxy/src/grpc/prom_query.rs +++ b/src/proxy/src/grpc/prom_query.rs @@ -471,7 +471,7 @@ mod tests { let schema = build_schema(); let record_schema = schema.to_record_schema(); let column_blocks = 
build_column_block(); - let record_batch = RecordBatch::new(record_schema, column_blocks).unwrap(); + let record_batch = RecordBatch::new(record_schema, column_blocks, 4).unwrap(); let column_name = ColumnNames { timestamp: "timestamp".to_string(), diff --git a/src/proxy/src/influxdb/types.rs b/src/proxy/src/influxdb/types.rs index 117b5cf31c..488f5dedfe 100644 --- a/src/proxy/src/influxdb/types.rs +++ b/src/proxy/src/influxdb/types.rs @@ -744,7 +744,7 @@ mod tests { fn test_influxql_result() { let record_schema = build_test_record_schema(); let column_blocks = build_test_column_blocks(); - let record_batch = RecordBatch::new(record_schema, column_blocks).unwrap(); + let record_batch = RecordBatch::new(record_schema, column_blocks, 7).unwrap(); let mut builder = InfluxqlResultBuilder::new(record_batch.schema(), 0).unwrap(); builder.add_record_batch(record_batch).unwrap(); diff --git a/src/query_engine/src/datafusion_impl/mod.rs b/src/query_engine/src/datafusion_impl/mod.rs index 48e42c211b..482628f836 100644 --- a/src/query_engine/src/datafusion_impl/mod.rs +++ b/src/query_engine/src/datafusion_impl/mod.rs @@ -137,7 +137,7 @@ impl DfContextBuilder { // Using default logcial optimizer, if want to add more custom rule, using // `add_optimizer_rule` to add. - let state = SessionState::with_config_rt(df_session_config, self.runtime_env.clone()); - SessionContext::with_state(state) + let state = SessionState::new_with_config_rt(df_session_config, self.runtime_env.clone()); + SessionContext::new_with_state(state) } } diff --git a/src/query_engine/src/datafusion_impl/physical_optimizer/repartition.rs b/src/query_engine/src/datafusion_impl/physical_optimizer/repartition.rs index c963c75fad..d1406a75b9 100644 --- a/src/query_engine/src/datafusion_impl/physical_optimizer/repartition.rs +++ b/src/query_engine/src/datafusion_impl/physical_optimizer/repartition.rs @@ -21,7 +21,9 @@ use std::sync::Arc; use datafusion::{ config::ConfigOptions, - physical_optimizer::{optimizer::PhysicalOptimizerRule, repartition::Repartition}, + physical_optimizer::{ + enforce_distribution::EnforceDistribution, optimizer::PhysicalOptimizerRule, + }, physical_plan::ExecutionPlan, }; use logger::debug; @@ -34,7 +36,7 @@ pub struct RepartitionAdapter { impl Adapter for RepartitionAdapter { fn may_adapt(original_rule: OptimizeRuleRef) -> OptimizeRuleRef { - if original_rule.name() == Repartition::new().name() { + if original_rule.name() == EnforceDistribution::new().name() { Arc::new(Self { original_rule }) } else { original_rule diff --git a/src/query_engine/src/datafusion_impl/physical_plan_extension/prom_align.rs b/src/query_engine/src/datafusion_impl/physical_plan_extension/prom_align.rs index a5a6161c9b..3b1a0cd9a7 100644 --- a/src/query_engine/src/datafusion_impl/physical_plan_extension/prom_align.rs +++ b/src/query_engine/src/datafusion_impl/physical_plan_extension/prom_align.rs @@ -37,7 +37,7 @@ use common_types::{ time::{TimeRange, Timestamp}, }; use datafusion::{ - error::{DataFusionError, Result as ArrowResult}, + error::{DataFusionError, Result as DataFusionResult}, execution::context::TaskContext, physical_expr::PhysicalSortExpr, physical_plan::{ @@ -93,15 +93,15 @@ impl PhysicalExpr for ExtractTsidExpr { self } - fn data_type(&self, _input_schema: &ArrowSchema) -> ArrowResult { + fn data_type(&self, _input_schema: &ArrowSchema) -> DataFusionResult { Ok(DataType::UInt64) } - fn nullable(&self, _input_schema: &ArrowSchema) -> ArrowResult { + fn nullable(&self, _input_schema: &ArrowSchema) -> DataFusionResult { 
Ok(false) } - fn evaluate(&self, batch: &RecordBatch) -> ArrowResult { + fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult { let tsid_idx = batch .schema() .index_of(TSID_COLUMN) @@ -116,7 +116,7 @@ impl PhysicalExpr for ExtractTsidExpr { fn with_new_children( self: Arc, _children: Vec>, - ) -> ArrowResult> { + ) -> DataFusionResult> { Ok(self) } @@ -204,7 +204,7 @@ impl ExecutionPlan for PromAlignExec { fn with_new_children( self: Arc, children: Vec>, - ) -> ArrowResult> { + ) -> DataFusionResult> { match children.len() { 1 => Ok(Arc::new(PromAlignExec { input: children[0].clone(), @@ -222,7 +222,7 @@ impl ExecutionPlan for PromAlignExec { &self, partition: usize, context: Arc, - ) -> ArrowResult { + ) -> DataFusionResult { debug!("PromAlignExec: partition:{}", partition); Ok(Box::pin(PromAlignReader { input: self.input.execute(partition, context)?, @@ -236,9 +236,9 @@ impl ExecutionPlan for PromAlignExec { })) } - fn statistics(&self) -> Statistics { + fn statistics(&self) -> DataFusionResult { // TODO(chenxiang) - Statistics::default() + Ok(Statistics::new_unknown(&self.schema())) } } diff --git a/src/query_engine/src/datafusion_impl/task_context.rs b/src/query_engine/src/datafusion_impl/task_context.rs index aee9812871..d1ea667de9 100644 --- a/src/query_engine/src/datafusion_impl/task_context.rs +++ b/src/query_engine/src/datafusion_impl/task_context.rs @@ -40,7 +40,6 @@ use df_engine_extensions::dist_sql_query::{ }; use futures::future::BoxFuture; use generic_error::BoxError; -use prost::Message; use runtime::Priority; use snafu::ResultExt; use table_engine::{ @@ -116,7 +115,7 @@ impl Preprocessor { ctx: &Context, ) -> Result> { // Decode to datafusion physical plan. - let protobuf = protobuf::PhysicalPlanNode::decode(encoded_plan) + let protobuf = protobuf::PhysicalPlanNode::try_decode(encoded_plan) .box_err() .with_context(|| ExecutorWithCause { msg: Some("failed to decode plan".to_string()), diff --git a/src/query_frontend/src/influxql/planner.rs b/src/query_frontend/src/influxql/planner.rs index 3b21228ad3..ed8d9c1460 100644 --- a/src/query_frontend/src/influxql/planner.rs +++ b/src/query_frontend/src/influxql/planner.rs @@ -57,7 +57,7 @@ struct InfluxQLSchemaProvider<'a, P: MetaProvider> { impl<'a, P: MetaProvider> SchemaProvider for InfluxQLSchemaProvider<'a, P> { fn get_table_provider(&self, name: &str) -> datafusion::error::Result> { self.context_provider - .get_table_provider(name.into()) + .get_table_source(name.into()) .map_err(|e| { DataFusionError::Plan(format!( "measurement does not exist, measurement:{name}, source:{e}" diff --git a/src/query_frontend/src/logical_optimizer/mod.rs b/src/query_frontend/src/logical_optimizer/mod.rs index 4d62e87750..8f2bf42a2c 100644 --- a/src/query_frontend/src/logical_optimizer/mod.rs +++ b/src/query_frontend/src/logical_optimizer/mod.rs @@ -30,7 +30,8 @@ use datafusion::{ use type_conversion::TypeConversion; pub fn optimize_plan(plan: &LogicalPlan) -> Result { - let state = SessionState::with_config_rt(SessionConfig::new(), Arc::new(RuntimeEnv::default())); + let state = + SessionState::new_with_config_rt(SessionConfig::new(), Arc::new(RuntimeEnv::default())); let state = register_analyzer_rules(state); // Register iox optimizers, used by influxql. 
let state = influxql_query::logical_optimizer::register_iox_logical_optimizers(state); diff --git a/src/query_frontend/src/logical_optimizer/type_conversion.rs b/src/query_frontend/src/logical_optimizer/type_conversion.rs index 89f0a14ec0..0aeaaba207 100644 --- a/src/query_frontend/src/logical_optimizer/type_conversion.rs +++ b/src/query_frontend/src/logical_optimizer/type_conversion.rs @@ -30,7 +30,7 @@ use datafusion::{ logical_expr::{ expr::{Expr, InList}, logical_plan::{Filter, LogicalPlan, TableScan}, - utils, Between, BinaryExpr, ExprSchemable, Operator, + Between, BinaryExpr, ExprSchemable, Operator, }, optimizer::analyzer::AnalyzerRule, scalar::ScalarValue, @@ -113,17 +113,18 @@ impl AnalyzerRule for TypeConversion { .map(|plan| self.analyze(plan.clone(), config)) .collect::>>()?; - let expr = plan + let exprs = plan .expressions() .into_iter() .map(|e| e.rewrite(&mut rewriter)) .collect::>>()?; - Ok(utils::from_plan(&plan, &expr, &new_inputs)?) + Ok(LogicalPlan::with_new_exprs(&plan, exprs, &new_inputs)?) } LogicalPlan::Subquery(_) | LogicalPlan::Statement { .. } | LogicalPlan::SubqueryAlias(_) + | LogicalPlan::Copy(_) | LogicalPlan::Unnest(_) | LogicalPlan::EmptyRelation { .. } => Ok(plan.clone()), } @@ -209,7 +210,7 @@ impl<'a> TypeRewriter<'a> { } } - let array = value.to_array(); + let array = value.to_array()?; ScalarValue::try_from_array( &compute::cast(&array, data_type).map_err(DataFusionError::ArrowError)?, // index: Converts a value in `array` at `index` into a ScalarValue diff --git a/src/query_frontend/src/parser.rs b/src/query_frontend/src/parser.rs index e01c4d03bc..23efa0ade0 100644 --- a/src/query_frontend/src/parser.rs +++ b/src/query_frontend/src/parser.rs @@ -352,7 +352,7 @@ impl<'a> Parser<'a> { is_dictionary = true; } } - if c.data_type != DataType::String && is_dictionary { + if !matches!(c.data_type, DataType::String(_)) && is_dictionary { return parser_err!(format!( "Only string column can be dictionary encoded: {:?}", c.to_string() @@ -1001,7 +1001,7 @@ mod tests { let columns = vec![ make_column_def("c1", DataType::Timestamp(None, TimezoneInfo::None)), make_column_def("c2", DataType::Double), - make_column_def("c3", DataType::String), + make_column_def("c3", DataType::String(None)), ]; let sql = "CREATE TABLE mytbl(c1 timestamp, c2 double, c3 string,) ENGINE = XX"; @@ -1027,7 +1027,7 @@ mod tests { let columns = vec![ make_column_def("c1", DataType::Timestamp(None, TimezoneInfo::None)), make_comment_column_def("c2", DataType::Double, "id".to_string()), - make_comment_column_def("c3", DataType::String, "name".to_string()), + make_comment_column_def("c3", DataType::String(None), "name".to_string()), ]; let sql = "CREATE TABLE mytbl(c1 timestamp, c2 double comment 'id', c3 string comment 'name',) ENGINE = XX"; @@ -1053,7 +1053,7 @@ mod tests { let columns = vec![ make_column_def("c1", DataType::Timestamp(None, TimezoneInfo::None)), make_column_def("c2", DataType::Timestamp(None, TimezoneInfo::None)), - make_column_def("c3", DataType::String), + make_column_def("c3", DataType::String(None)), make_column_def("c4", DataType::Double), ]; @@ -1253,7 +1253,7 @@ mod tests { table_name: make_table_name("t"), columns: vec![ make_column_def("c1", DataType::Double), - make_column_def("c2", DataType::String), + make_column_def("c2", DataType::String(None)), ], }); expect_parse_ok(sql, expected).unwrap(); @@ -1277,7 +1277,7 @@ mod tests { table_name: make_table_name("t"), columns: vec![ make_column_def("c1", DataType::Double), - make_tag_column_def("c2", 
DataType::String), + make_tag_column_def("c2", DataType::String(None)), ], }); expect_parse_ok(sql, expected).unwrap(); @@ -1287,7 +1287,7 @@ mod tests { let sql = "ALTER TABLE t ADD COLUMN c1 string tag"; let expected = Statement::AlterAddColumn(AlterAddColumn { table_name: make_table_name("t"), - columns: vec![make_tag_column_def("c1", DataType::String)], + columns: vec![make_tag_column_def("c1", DataType::String(None))], }); expect_parse_ok(sql, expected).unwrap(); } diff --git a/src/query_frontend/src/promql/convert.rs b/src/query_frontend/src/promql/convert.rs index 297e71612c..f364a0b101 100644 --- a/src/query_frontend/src/promql/convert.rs +++ b/src/query_frontend/src/promql/convert.rs @@ -24,7 +24,7 @@ use common_types::{ use datafusion::{ logical_expr::{ avg, count, - expr::{Alias, ScalarUDF}, + expr::{Alias, ScalarFunction}, lit, logical_plan::{Extension, LogicalPlan, LogicalPlanBuilder}, max, min, sum, Expr as DataFusionExpr, @@ -316,11 +316,10 @@ impl Expr { // TSID is lost after aggregate, but PromAlignNode need a unique id, so // mock UUID as tsid based on groupby keys DataFusionExpr::Alias(Alias { - expr: Box::new(DataFusionExpr::ScalarUDF(ScalarUDF { - fun: Arc::new(create_unique_id(tag_exprs.len())), - args: tag_exprs.clone(), - })), + expr: Box::new(DataFusionExpr::ScalarFunction( + ScalarFunction::new_udf(Arc::new(create_unique_id(tag_exprs.len())), tag_exprs.clone()))), name: TSID_COLUMN.to_string(), + relation: None, }); let mut projection = tag_exprs.clone(); projection.extend(vec![ @@ -371,6 +370,7 @@ impl Expr { Ok(DataFusionExpr::Alias(Alias { expr: Box::new(expr), name: alias, + relation: None, })) } } @@ -578,7 +578,7 @@ impl Selector { .context(TableNotFound { name: &table })?; let table_provider = meta_provider - .get_table_provider(table_ref.table.name().into()) + .get_table_source(table_ref.table.name().into()) .context(TableProviderNotFound { name: &table })?; let schema = Schema::try_from(table_provider.schema()).context(BuildTableSchema)?; let timestamp_column_name = schema.timestamp_name().to_string(); diff --git a/src/query_frontend/src/promql/remote.rs b/src/query_frontend/src/promql/remote.rs index c687b51d0f..c3c1439ec7 100644 --- a/src/query_frontend/src/promql/remote.rs +++ b/src/query_frontend/src/promql/remote.rs @@ -64,7 +64,7 @@ pub fn remote_query_to_plan( let (metric, field, mut filters) = normalize_matchers(query.matchers)?; let table_provider = meta_provider - .get_table_provider(TableReference::bare(&metric)) + .get_table_source(TableReference::bare(&metric)) .context(TableProviderNotFound { name: &metric })?; let schema = Schema::try_from(table_provider.schema()).context(BuildTableSchema)?; let timestamp_col_name = schema.timestamp_name(); diff --git a/src/query_frontend/src/provider.rs b/src/query_frontend/src/provider.rs index 4380829fef..6464725405 100644 --- a/src/query_frontend/src/provider.rs +++ b/src/query_frontend/src/provider.rs @@ -320,7 +320,7 @@ impl<'a, P: MetaProvider> MetaProvider for ContextProviderAdapter<'a, P> { } impl<'a, P: MetaProvider> ContextProvider for ContextProviderAdapter<'a, P> { - fn get_table_provider( + fn get_table_source( &self, name: TableReference, ) -> std::result::Result, DataFusionError> { diff --git a/src/table_engine/src/memory.rs b/src/table_engine/src/memory.rs index 689677052a..20cfe583e4 100644 --- a/src/table_engine/src/memory.rs +++ b/src/table_engine/src/memory.rs @@ -260,7 +260,7 @@ fn row_group_to_record_batch( column_blocks.push(column_block); } - RecordBatch::new(record_schema.clone(), 
column_blocks) + RecordBatch::new(record_schema.clone(), column_blocks, rows.num_rows()) .box_err() .context(ErrWithSource { msg: "failed to create RecordBatch", diff --git a/src/table_engine/src/predicate.rs b/src/table_engine/src/predicate.rs index 723724f35e..b316b99e24 100644 --- a/src/table_engine/src/predicate.rs +++ b/src/table_engine/src/predicate.rs @@ -329,6 +329,8 @@ impl<'a> TimeRangeExtractor<'a> { | Operator::BitwiseAnd | Operator::BitwiseOr | Operator::BitwiseXor + | Operator::AtArrow + | Operator::ArrowAt | Operator::BitwiseShiftRight | Operator::BitwiseShiftLeft | Operator::StringConcat => TimeRange::min_to_max(), @@ -427,20 +429,18 @@ impl<'a> TimeRangeExtractor<'a> { | Expr::IsUnknown(_) | Expr::IsNotUnknown(_) | Expr::Negative(_) + | Expr::AggregateUDF(_) | Expr::Case { .. } | Expr::Cast { .. } | Expr::TryCast { .. } | Expr::Sort { .. } | Expr::ScalarFunction { .. } - | Expr::ScalarUDF { .. } | Expr::AggregateFunction { .. } | Expr::WindowFunction { .. } - | Expr::AggregateUDF { .. } | Expr::Wildcard { .. } | Expr::Exists { .. } | Expr::InSubquery { .. } | Expr::ScalarSubquery(_) - | Expr::QualifiedWildcard { .. } | Expr::GroupingSet(_) | Expr::GetIndexedField { .. } | Expr::OuterReferenceColumn { .. } diff --git a/src/table_engine/src/provider.rs b/src/table_engine/src/provider.rs index d5e4c69f18..bcca5ba897 100644 --- a/src/table_engine/src/provider.rs +++ b/src/table_engine/src/provider.rs @@ -19,6 +19,7 @@ use std::{ any::Any, + collections::HashSet, fmt, sync::{Arc, Mutex}, time::{Duration, Instant}, @@ -35,8 +36,10 @@ use datafusion::{ logical_expr::{Expr, TableProviderFilterPushDown, TableSource, TableType}, physical_expr::PhysicalSortExpr, physical_plan::{ + expressions, metrics::{Count, MetricValue, MetricsSet}, - DisplayAs, DisplayFormatType, ExecutionPlan, Metric, Partitioning, + projection::ProjectionExec, + DisplayAs, DisplayFormatType, ExecutionPlan, Metric, Partitioning, PhysicalExpr, SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics, }, }; @@ -230,9 +233,34 @@ impl TableProviderAdapter { priority, ); + let mut need_reprojection = false; + let all_projections = if let Some(proj) = projection { + let mut original_projections = proj.clone(); + let projections_from_filter = + collect_projection_from_expr(filters, &self.current_table_schema); + for proj in projections_from_filter { + if !original_projections.contains(&proj) { + original_projections.push(proj); + // If the projection from filters have columns not in the original projection, + // we need to add it to projection, and add a ProjectionExec plan to project the + // orignal columns. Eg: + // ```text + // select a from table where b > 1 + // ``` + // The original projection only contains a, but the filter has column b, so we + // need to query both a and b column from table but only + // output a column. 
More details can be found in: + // https://github.com/apache/arrow-datafusion/pull/9131#pullrequestreview-1865020767 + need_reprojection = true; + } + } + Some(original_projections) + } else { + None + }; let predicate = self.check_and_build_predicate_from_filters(filters); let projected_schema = - ProjectedSchema::new(self.current_table_schema.clone(), projection.cloned()).map_err( + ProjectedSchema::new(self.current_table_schema.clone(), all_projections).map_err( |e| { DataFusionError::Internal(format!( "Invalid projection, plan:{self:?}, projection:{projection:?}, err:{e:?}" @@ -240,6 +268,22 @@ impl TableProviderAdapter { }, )?; + let projection_exprs = if need_reprojection { + let original_projection = projection.unwrap(); + let exprs = (0..original_projection.len()) + .map(|i| { + let column = projected_schema.target_column_schema(i); + ( + Arc::new(expressions::Column::new(&column.name, i)) + as Arc, + column.name.clone(), + ) + }) + .collect::>(); + Some(exprs) + } else { + None + }; let opts = ReadOptions { deadline, read_parallelism, @@ -256,7 +300,13 @@ impl TableProviderAdapter { priority, }; - self.builder.build(request).await + let scan = self.builder.build(request).await?; + if let Some(expr) = projection_exprs { + let plan = ProjectionExec::try_new(expr, scan)?; + Ok(Arc::new(plan)) + } else { + Ok(scan) + } } fn check_and_build_predicate_from_filters(&self, filters: &[Expr]) -> PredicateRef { @@ -410,7 +460,7 @@ impl ExecutionPlan for ScanTable { // However, we have no inputs here, so `UnknownPartitioning` is suitable. // In datafusion, always set it to `UnknownPartitioning` in the scan plan, for // example: https://github.com/apache/arrow-datafusion/blob/cf152af6515f0808d840e1fe9c63b02802595826/datafusion/core/src/datasource/physical_plan/csv.rs#L175 - Partitioning::UnknownPartitioning(self.parallelism) + Partitioning::UnknownPartitioning(self.parallelism.max(1)) } fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { @@ -467,9 +517,12 @@ impl ExecutionPlan for ScanTable { Some(metric_set) } - fn statistics(&self) -> Statistics { + fn statistics( + &self, + ) -> std::result::Result + { // TODO(yingwen): Implement this - Statistics::default() + Ok(Statistics::new_unknown(&self.schema())) } } @@ -477,10 +530,11 @@ impl DisplayAs for ScanTable { fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { write!( f, - "ScanTable: table={}, parallelism={}, priority={:?}", + "ScanTable: table={}, parallelism={}, priority={:?}, partition_count={:?}", self.table.name(), self.request.opts.read_parallelism, - self.request.priority + self.request.priority, + self.output_partitioning() ) } } @@ -495,3 +549,16 @@ impl fmt::Debug for ScanTable { .finish() } } + +fn collect_projection_from_expr(exprs: &[Expr], schema: &Schema) -> HashSet { + let mut projections = HashSet::new(); + exprs.iter().for_each(|expr| { + for col_name in visitor::find_columns_by_expr(expr) { + if let Some(idx) = schema.index_of(&col_name) { + projections.insert(idx); + } + } + }); + + projections +} diff --git a/src/table_engine/src/table.rs b/src/table_engine/src/table.rs index 7365ca66a4..3c611b4395 100644 --- a/src/table_engine/src/table.rs +++ b/src/table_engine/src/table.rs @@ -421,6 +421,7 @@ impl fmt::Debug for ReadRequest { .field("projected", &projected) .field("predicate", &predicate) .field("priority", &self.priority) + .field("projected_schema", &self.projected_schema) .finish() } }
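The `TableProviderAdapter` change at the end of this diff widens the scanned projection so that filter-only columns can be evaluated, then wraps the scan in a `ProjectionExec` so callers still see exactly the columns they asked for. The following is a minimal, self-contained sketch of that wrap-and-trim pattern, assuming the DataFusion version paired with arrow 49; `MemoryExec` stands in for the real `ScanTable` plan, and the columns `a`/`b` with their sample values are hypothetical, not taken from the diff.

```rust
use std::sync::Arc;

use datafusion::{
    arrow::{
        array::Int64Array,
        datatypes::{DataType, Field, Schema},
        record_batch::RecordBatch,
    },
    error::Result,
    physical_plan::{
        expressions::Column, memory::MemoryExec, projection::ProjectionExec, ExecutionPlan,
        PhysicalExpr,
    },
};

/// Trim a scan that was widened to `[a, b]` (so a filter on `b` could be
/// evaluated) back to the originally requested column `a`.
fn project_back_to_original(scan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    let exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
        vec![(Arc::new(Column::new("a", 0)), "a".to_string())];
    Ok(Arc::new(ProjectionExec::try_new(exprs, scan)?))
}

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
    ]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int64Array::from(vec![1_i64, 2])),
            Arc::new(Int64Array::from(vec![10_i64, 20])),
        ],
    )?;
    // MemoryExec stands in for the widened ScanTable plan.
    let widened_scan = Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None)?);
    let plan = project_back_to_original(widened_scan)?;

    // Only the originally projected column remains visible to the parent plan.
    assert_eq!(plan.schema().fields().len(), 1);
    assert_eq!(plan.schema().field(0).name(), "a");
    Ok(())
}
```

In the actual change, `collect_projection_from_expr` decides which filter columns to add to the scan and the extra `ProjectionExec` is only built when the projection was really widened, mirroring the upstream discussion referenced in the code comment.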