deps: bump datafusion (#1445)
## Rationale
Close #1461 

## Detailed Changes
Bump datafusion to
https://github.com/CeresDB/arrow-datafusion/commits/e21b03154, which
corresponds to upstream version 33.

Some important breaking changes:
- apache/datafusion#7920
- apache/datafusion#9109

## Test Plan
CI

---------

Co-authored-by: jiacai2050 <[email protected]>
tanruixiang and jiacai2050 authored Feb 23, 2024
1 parent 8156b32 commit 62ffffc
Showing 46 changed files with 685 additions and 370 deletions.
549 changes: 316 additions & 233 deletions Cargo.lock

Large diffs are not rendered by default.

22 changes: 11 additions & 11 deletions Cargo.toml
@@ -85,8 +85,8 @@ members = [

[workspace.dependencies]
alloc_tracker = { path = "src/components/alloc_tracker" }
arrow = { version = "43.0.0", features = ["prettyprint"] }
arrow_ipc = { version = "43.0.0" }
arrow = { version = "49.0.0", features = ["prettyprint"] }
arrow_ipc = { version = "49.0.0" }
arrow_ext = { path = "src/components/arrow_ext" }
analytic_engine = { path = "src/analytic_engine" }
arena = { path = "src/components/arena" }
@@ -107,8 +107,8 @@ cluster = { path = "src/cluster" }
criterion = "0.5"
horaedb-client = "1.0.2"
common_types = { path = "src/common_types" }
- datafusion = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "9c3a537e25e5ab3299922864034f67fb2f79805d" }
- datafusion-proto = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "9c3a537e25e5ab3299922864034f67fb2f79805d" }
+ datafusion = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "e21b03154" }
+ datafusion-proto = { git = "https://github.com/CeresDB/arrow-datafusion.git", rev = "e21b03154" }
derive_builder = "0.12"
df_operator = { path = "src/df_operator" }
df_engine_extensions = { path = "src/df_engine_extensions" }
@@ -121,10 +121,10 @@ hash_ext = { path = "src/components/hash_ext" }
hex = "0.4.3"
hyperloglog = { git = "https://github.com/jedisct1/rust-hyperloglog.git", rev = "425487ce910f26636fbde8c4d640b538431aad50" }
id_allocator = { path = "src/components/id_allocator" }
- influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git", rev = "a905863", package = "iox_query_influxql" }
- influxql-parser = { git = "https://github.com/CeresDB/influxql.git", rev = "a905863", package = "influxdb_influxql_parser" }
- influxql-query = { git = "https://github.com/CeresDB/influxql.git", rev = "a905863", package = "iox_query" }
- influxql-schema = { git = "https://github.com/CeresDB/influxql.git", rev = "a905863", package = "schema" }
+ influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git", rev = "b9fb3ca", package = "iox_query_influxql" }
+ influxql-parser = { git = "https://github.com/CeresDB/influxql.git", rev = "b9fb3ca", package = "influxdb_influxql_parser" }
+ influxql-query = { git = "https://github.com/CeresDB/influxql.git", rev = "b9fb3ca", package = "iox_query" }
+ influxql-schema = { git = "https://github.com/CeresDB/influxql.git", rev = "b9fb3ca", package = "schema" }
interpreters = { path = "src/interpreters" }
itertools = "0.10.5"
lz4_flex = { version = "0.11", default-features = false, features = ["frame"] }
@@ -142,7 +142,7 @@ panic_ext = { path = "src/components/panic_ext" }
partitioned_lock = { path = "src/components/partitioned_lock" }
partition_table_engine = { path = "src/partition_table_engine" }
parquet_ext = { path = "src/components/parquet_ext" }
- parquet = { version = "43.0.0" }
+ parquet = { version = "49.0.0" }
paste = "1.0"
pin-project-lite = "0.2.8"
pprof = "0.12.1"
@@ -172,9 +172,9 @@ size_ext = { path = "src/components/size_ext" }
smallvec = "1.6"
slog = "2.7"
spin = "0.9.6"
sqlparser = { version = "0.35", features = ["serde"] }
system_catalog = { path = "src/system_catalog" }
system_statis = { path = "src/components/system_stats" }
sqlparser = { version = "0.39.0", features = ["serde"] }
system_catalog = { path = "src/system_catalog" }
table_engine = { path = "src/table_engine" }
table_kv = { path = "src/components/table_kv" }
tempfile = "3.1.0"
17 changes: 13 additions & 4 deletions integration_tests/cases/common/dml/issue-1087.result
@@ -17,6 +17,7 @@ String("logical_plan after inline_table_scan"),String("SAME TEXT AS ABOVE"),
String("logical_plan after type_coercion"),String("SAME TEXT AS ABOVE"),
String("logical_plan after count_wildcard_rule"),String("SAME TEXT AS ABOVE"),
String("analyzed_logical_plan"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_nested_union"),String("SAME TEXT AS ABOVE"),
String("logical_plan after simplify_expressions"),String("SAME TEXT AS ABOVE"),
String("logical_plan after unwrap_cast_in_comparison"),String("SAME TEXT AS ABOVE"),
String("logical_plan after replace_distinct_aggregate"),String("SAME TEXT AS ABOVE"),
@@ -33,6 +34,7 @@ String("logical_plan after eliminate_cross_join"),String("SAME TEXT AS ABOVE"),
String("logical_plan after common_sub_expression_eliminate"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_limit"),String("SAME TEXT AS ABOVE"),
String("logical_plan after propagate_empty_relation"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_one_union"),String("SAME TEXT AS ABOVE"),
String("logical_plan after filter_null_join_keys"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_outer_join"),String("SAME TEXT AS ABOVE"),
String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"),
@@ -46,6 +48,7 @@ String("logical_plan after eliminate_projection"),String("TableScan: issue_1087
String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"),
String("logical_plan after influx_regex_to_datafusion_regex"),String("SAME TEXT AS ABOVE"),
String("logical_plan after handle_gap_fill"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_nested_union"),String("SAME TEXT AS ABOVE"),
String("logical_plan after simplify_expressions"),String("SAME TEXT AS ABOVE"),
String("logical_plan after unwrap_cast_in_comparison"),String("SAME TEXT AS ABOVE"),
String("logical_plan after replace_distinct_aggregate"),String("SAME TEXT AS ABOVE"),
@@ -62,6 +65,7 @@ String("logical_plan after eliminate_cross_join"),String("SAME TEXT AS ABOVE"),
String("logical_plan after common_sub_expression_eliminate"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_limit"),String("SAME TEXT AS ABOVE"),
String("logical_plan after propagate_empty_relation"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_one_union"),String("SAME TEXT AS ABOVE"),
String("logical_plan after filter_null_join_keys"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_outer_join"),String("SAME TEXT AS ABOVE"),
String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"),
@@ -76,17 +80,22 @@ String("logical_plan after push_down_limit"),
String("logical_plan after influx_regex_to_datafusion_regex"),String("SAME TEXT AS ABOVE"),
String("logical_plan after handle_gap_fill"),String("SAME TEXT AS ABOVE"),
String("logical_plan"),String("TableScan: issue_1087 projection=[tsid, t, name, value]"),
String("initial_physical_plan"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low\n"),
String("initial_physical_plan"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),
String("initial_physical_plan_with_stats"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:)]]\n"),
String("physical_plan after OutputRequirements"),String("OutputRequirementExec\n ScanTable: table=issue_1087, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),
String("physical_plan after aggregate_statistics"),String("SAME TEXT AS ABOVE"),
String("physical_plan after join_selection"),String("SAME TEXT AS ABOVE"),
String("physical_plan after PipelineFixer"),String("SAME TEXT AS ABOVE"),
String("physical_plan after repartition"),String("SAME TEXT AS ABOVE"),
String("physical_plan after LimitedDistinctAggregation"),String("SAME TEXT AS ABOVE"),
String("physical_plan after EnforceDistribution"),String("SAME TEXT AS ABOVE"),
String("physical_plan after CombinePartialFinalAggregate"),String("SAME TEXT AS ABOVE"),
String("physical_plan after EnforceSorting"),String("SAME TEXT AS ABOVE"),
String("physical_plan after coalesce_batches"),String("SAME TEXT AS ABOVE"),
String("physical_plan after OutputRequirements"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),
String("physical_plan after PipelineChecker"),String("SAME TEXT AS ABOVE"),
String("physical_plan"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low\n"),
String("physical_plan after LimitAggregation"),String("SAME TEXT AS ABOVE"),
String("physical_plan after ProjectionPushdown"),String("SAME TEXT AS ABOVE"),
String("physical_plan"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),
String("physical_plan_with_stats"),String("ScanTable: table=issue_1087, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:)]]\n"),


DROP TABLE `issue_1087`;
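Note: the plan dumps above show the main cosmetic change from DataFusion 33 — `ScanTable` now prints a `partition_count` field derived from the node's output partitioning, and `*_with_stats` plan variants appear. Below is a minimal sketch of how a custom node could render such a line via DataFusion 33's `DisplayAs` trait; the `ScanTable` struct and its fields here are illustrative assumptions, not HoraeDB's actual implementation.

```rust
use std::fmt;

use datafusion::physical_plan::{DisplayAs, DisplayFormatType, Partitioning};

// Hypothetical stand-in for the engine's scan node; field names are assumed.
struct ScanTable {
    table_name: String,
    parallelism: usize,
}

impl DisplayAs for ScanTable {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Formatting the Partitioning with Debug yields strings like
        // "UnknownPartitioning(8)", matching the plan output above.
        write!(
            f,
            "ScanTable: table={}, parallelism={}, priority=Low, partition_count={:?}",
            self.table_name,
            self.parallelism,
            Partitioning::UnknownPartitioning(self.parallelism),
        )
    }
}
```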
2 changes: 1 addition & 1 deletion integration_tests/cases/common/dml/issue-302.result
@@ -12,7 +12,7 @@ affected_rows: 1

select `t`, count(distinct name) from issue302 group by `t`;

- issue302.t,COUNT(DISTINCT issue302.name),
+ t,COUNT(DISTINCT issue302.name),
Timestamp(1651737067000),Int64(0),


12 changes: 6 additions & 6 deletions integration_tests/cases/common/dml/issue-341.result
@@ -58,7 +58,7 @@ WHERE

plan_type,plan,
String("logical_plan"),String("TableScan: issue341_t1 projection=[timestamp, value], full_filters=[issue341_t1.value = Int32(3)]"),
String("physical_plan"),String("ScanTable: table=issue341_t1, parallelism=8, priority=Low\n"),
String("physical_plan"),String("ScanTable: table=issue341_t1, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),


-- FilterExec node should not be in plan.
@@ -71,8 +71,8 @@ WHERE
tag1 = "t3";

plan_type,plan,
String("logical_plan"),String("Projection: issue341_t1.timestamp, issue341_t1.value\n TableScan: issue341_t1 projection=[timestamp, value, tag1], full_filters=[issue341_t1.tag1 = Utf8(\"t3\")]"),
String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n ScanTable: table=issue341_t1, parallelism=8, priority=Low\n"),
String("logical_plan"),String("TableScan: issue341_t1 projection=[timestamp, value], full_filters=[issue341_t1.tag1 = Utf8(\"t3\")]"),
String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n ScanTable: table=issue341_t1, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),


-- Repeat operations above, but with overwrite table
@@ -116,7 +116,7 @@ WHERE

plan_type,plan,
String("logical_plan"),String("Filter: issue341_t2.value = Float64(3)\n TableScan: issue341_t2 projection=[timestamp, value], partial_filters=[issue341_t2.value = Float64(3)]"),
String("physical_plan"),String("CoalesceBatchesExec: target_batch_size=8192\n FilterExec: value@1 = 3\n ScanTable: table=issue341_t2, parallelism=8, priority=Low\n"),
String("physical_plan"),String("CoalesceBatchesExec: target_batch_size=8192\n FilterExec: value@1 = 3\n ScanTable: table=issue341_t2, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),


-- When using tag as filter, FilterExec node should not be in plan.
@@ -129,8 +129,8 @@
tag1 = "t3";

plan_type,plan,
String("logical_plan"),String("Projection: issue341_t2.timestamp, issue341_t2.value\n TableScan: issue341_t2 projection=[timestamp, value, tag1], full_filters=[issue341_t2.tag1 = Utf8(\"t3\")]"),
String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n ScanTable: table=issue341_t2, parallelism=8, priority=Low\n"),
String("logical_plan"),String("TableScan: issue341_t2 projection=[timestamp, value], full_filters=[issue341_t2.tag1 = Utf8(\"t3\")]"),
String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n ScanTable: table=issue341_t2, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),


DROP TABLE IF EXISTS `issue341_t1`;
4 changes: 2 additions & 2 deletions integration_tests/cases/common/dml/issue-59.result
@@ -24,8 +24,8 @@ FROM issue59
GROUP BY id+1;

plan_type,plan,
String("logical_plan"),String("Projection: group_alias_0 AS issue59.id + Int64(1), COUNT(alias1) AS COUNT(DISTINCT issue59.account)\n Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]]\n Projection: group_alias_0, alias1\n Aggregate: groupBy=[[CAST(issue59.id AS Int64) + Int64(1) AS group_alias_0, issue59.account AS alias1]], aggr=[[]]\n TableScan: issue59 projection=[id, account]"),
String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n ProjectionExec: expr=[group_alias_0@0 as group_alias_0, alias1@1 as alias1]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0, alias1@1], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[CAST(id@0 AS Int64) + 1 as group_alias_0, account@1 as alias1], aggr=[]\n ScanTable: table=issue59, parallelism=8, priority=Low\n"),
String("logical_plan"),String("Projection: group_alias_0 AS issue59.id + Int64(1), COUNT(alias1) AS COUNT(DISTINCT issue59.account)\n Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]]\n Aggregate: groupBy=[[CAST(issue59.id AS Int64) + Int64(1) AS group_alias_0, issue59.account AS alias1]], aggr=[[]]\n TableScan: issue59 projection=[id, account]"),
String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0, alias1@1], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[CAST(id@0 AS Int64) + 1 as group_alias_0, account@1 as alias1], aggr=[]\n ScanTable: table=issue59, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),


DROP TABLE IF EXISTS issue59;
2 changes: 1 addition & 1 deletion integration_tests/cases/common/explain/explain.result
@@ -10,7 +10,7 @@ EXPLAIN SELECT t FROM `04_explain_t`;

plan_type,plan,
String("logical_plan"),String("TableScan: 04_explain_t projection=[t]"),
String("physical_plan"),String("ScanTable: table=04_explain_t, parallelism=8, priority=Low\n"),
String("physical_plan"),String("ScanTable: table=04_explain_t, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),


DROP TABLE `04_explain_t`;
43 changes: 43 additions & 0 deletions integration_tests/cases/common/function/aggregate.result
@@ -105,7 +105,50 @@ COUNT(DISTINCT 02_function_aggregate_table1.arch),
Int64(2),


CREATE TABLE `02_function_aggregate_table2` (
`timestamp` timestamp NOT NULL,
`arch` string TAG,
`datacenter` string TAG,
`value` int,
`uvalue` uint64,
timestamp KEY (timestamp)) ENGINE=Analytic
WITH(
enable_ttl='false',
update_mode = 'append'
);

affected_rows: 0

INSERT INTO `02_function_aggregate_table2`
(`timestamp`, `arch`, `datacenter`, `value`, `uvalue`)
VALUES
(1658304762, 'x86-64', 'china', 100, 10),
(1658304763, 'x86-64', 'china', 200, 10),
(1658304762, 'arm64', 'china', 110, 0),
(1658304763, 'arm64', 'china', 210, 0);

affected_rows: 4

-- This should select an empty column set
SELECT count(*) FROM `02_function_aggregate_table1`;

COUNT(*),
Int64(4),


-- Same as before, but query from sst
-- SQLNESS ARG pre_cmd=flush
SELECT count(*) FROM `02_function_aggregate_table1`;

COUNT(*),
Int64(4),


DROP TABLE `02_function_aggregate_table1`;

affected_rows: 0

DROP TABLE `02_function_aggregate_table2`;

affected_rows: 0

28 changes: 28 additions & 0 deletions integration_tests/cases/common/function/aggregate.sql
@@ -57,4 +57,32 @@ SELECT distinct(`arch`) FROM `02_function_aggregate_table1` ORDER BY `arch` DESC

SELECT count(distinct(`arch`)) FROM `02_function_aggregate_table1`;

CREATE TABLE `02_function_aggregate_table2` (
`timestamp` timestamp NOT NULL,
`arch` string TAG,
`datacenter` string TAG,
`value` int,
`uvalue` uint64,
timestamp KEY (timestamp)) ENGINE=Analytic
WITH(
enable_ttl='false',
update_mode = 'append'
);

INSERT INTO `02_function_aggregate_table2`
(`timestamp`, `arch`, `datacenter`, `value`, `uvalue`)
VALUES
(1658304762, 'x86-64', 'china', 100, 10),
(1658304763, 'x86-64', 'china', 200, 10),
(1658304762, 'arm64', 'china', 110, 0),
(1658304763, 'arm64', 'china', 210, 0);

-- This should select an empty column set
SELECT count(*) FROM `02_function_aggregate_table1`;

-- Same as before, but query from sst
-- SQLNESS ARG pre_cmd=flush
SELECT count(*) FROM `02_function_aggregate_table1`;

DROP TABLE `02_function_aggregate_table1`;
DROP TABLE `02_function_aggregate_table2`;
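Note: the new `count(*)` cases exercise a scan whose pushed-down projection is empty — no columns are needed, only row counts (the "empty column set" mentioned in the test comment). A minimal sketch of how a table provider can honor an empty projection with arrow 49, where a zero-column `RecordBatch` must carry an explicit row count; the function and names are illustrative assumptions, not HoraeDB's actual code:

```rust
use std::sync::Arc;

use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::ArrowError;
use arrow::record_batch::{RecordBatch, RecordBatchOptions};

/// Build a batch for a `SELECT count(*)`-style scan: the projection may be
/// empty, so the batch holds zero columns but still reports its row count.
fn empty_projection_batch(
    full_schema: &Schema,
    num_rows: usize,
) -> Result<RecordBatch, ArrowError> {
    // Projecting onto no indices yields a zero-column schema.
    let projected: SchemaRef = Arc::new(full_schema.project(&[])?);
    // With no columns, the row count must be supplied explicitly.
    let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
    RecordBatch::try_new_with_options(projected, vec![], &options)
}
```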
2 changes: 1 addition & 1 deletion integration_tests/cases/common/optimizer/optimizer.result
@@ -10,7 +10,7 @@ EXPLAIN SELECT max(value) AS c1, avg(value) AS c2 FROM `07_optimizer_t` GROUP BY

plan_type,plan,
String("logical_plan"),String("Projection: MAX(07_optimizer_t.value) AS c1, AVG(07_optimizer_t.value) AS c2\n Aggregate: groupBy=[[07_optimizer_t.name]], aggr=[[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]]\n TableScan: 07_optimizer_t projection=[name, value]"),
String("physical_plan"),String("ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([name@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n ScanTable: table=07_optimizer_t, parallelism=8, priority=Low\n"),
String("physical_plan"),String("ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([name@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n ScanTable: table=07_optimizer_t, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8)\n"),


DROP TABLE `07_optimizer_t`;
