From f7b3d35f987fd3c6af50a2e56866bf1911462b53 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 26 Dec 2023 07:04:43 -0500 Subject: [PATCH 1/2] Fix group by aliased expression in LogicalPLanBuilder::aggregate (#8629) --- datafusion/core/src/dataframe/mod.rs | 36 ++++++++++++- datafusion/expr/src/logical_plan/builder.rs | 58 ++++++++++++++------- 2 files changed, 73 insertions(+), 21 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 4b8a9c5b7d79..65aa7eb09cb8 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1825,8 +1825,8 @@ mod tests { let df_results = collect(physical_plan, ctx.task_ctx()).await?; #[rustfmt::skip] - assert_batches_sorted_eq!( - [ "+----+", + assert_batches_sorted_eq!([ + "+----+", "| id |", "+----+", "| 1 |", @@ -1837,6 +1837,38 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_aggregate_alias() -> Result<()> { + let df = test_table().await?; + + let df = df + // GROUP BY `c2 + 1` + .aggregate(vec![col("c2") + lit(1)], vec![])? + // SELECT `c2 + 1` as c2 + .select(vec![(col("c2") + lit(1)).alias("c2")])? + // GROUP BY c2 as "c2" (alias in expr is not supported by SQL) + .aggregate(vec![col("c2").alias("c2")], vec![])?; + + let df_results = df.collect().await?; + + #[rustfmt::skip] + assert_batches_sorted_eq!([ + "+----+", + "| c2 |", + "+----+", + "| 2 |", + "| 3 |", + "| 4 |", + "| 5 |", + "| 6 |", + "+----+", + ], + &df_results + ); + + Ok(()) + } + #[tokio::test] async fn test_distinct() -> Result<()> { let t = test_table().await?; diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index be2c45b901fa..2264949cf42a 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -904,27 +904,11 @@ impl LogicalPlanBuilder { group_expr: impl IntoIterator>, aggr_expr: impl IntoIterator>, ) -> Result { - let mut group_expr = normalize_cols(group_expr, &self.plan)?; + let group_expr = normalize_cols(group_expr, &self.plan)?; let aggr_expr = normalize_cols(aggr_expr, &self.plan)?; - // Rewrite groupby exprs according to functional dependencies - let group_by_expr_names = group_expr - .iter() - .map(|group_by_expr| group_by_expr.display_name()) - .collect::>>()?; - let schema = self.plan.schema(); - if let Some(target_indices) = - get_target_functional_dependencies(schema, &group_by_expr_names) - { - for idx in target_indices { - let field = schema.field(idx); - let expr = - Expr::Column(Column::new(field.qualifier().cloned(), field.name())); - if !group_expr.contains(&expr) { - group_expr.push(expr); - } - } - } + let group_expr = + add_group_by_exprs_from_dependencies(group_expr, self.plan.schema())?; Aggregate::try_new(Arc::new(self.plan), group_expr, aggr_expr) .map(LogicalPlan::Aggregate) .map(Self::from) @@ -1189,6 +1173,42 @@ pub fn build_join_schema( schema.with_functional_dependencies(func_dependencies) } +/// Add additional "synthetic" group by expressions based on functional +/// dependencies. +/// +/// For example, if we are grouping on `[c1]`, and we know from +/// functional dependencies that column `c1` determines `c2`, this function +/// adds `c2` to the group by list. +/// +/// This allows MySQL style selects like +/// `SELECT col FROM t WHERE pk = 5` if col is unique +fn add_group_by_exprs_from_dependencies( + mut group_expr: Vec, + schema: &DFSchemaRef, +) -> Result> { + // Names of the fields produced by the GROUP BY exprs for example, `GROUP BY + // c1 + 1` produces an output field named `"c1 + 1"` + let mut group_by_field_names = group_expr + .iter() + .map(|e| e.display_name()) + .collect::>>()?; + + if let Some(target_indices) = + get_target_functional_dependencies(schema, &group_by_field_names) + { + for idx in target_indices { + let field = schema.field(idx); + let expr = + Expr::Column(Column::new(field.qualifier().cloned(), field.name())); + let expr_name = expr.display_name()?; + if !group_by_field_names.contains(&expr_name) { + group_by_field_names.push(expr_name); + group_expr.push(expr); + } + } + } + Ok(group_expr) +} /// Errors if one or more expressions have equal names. pub(crate) fn validate_unique_names<'a>( node_name: &str, From a0641a92c5675a5452625f3c9fa566d9a9afdca9 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 8 Jan 2024 09:29:33 -0500 Subject: [PATCH 2/2] chore: cherry-pick Jan 8 fix ff27d9073421d527439e6e338f31fb568227bbb2 --- datafusion/core/tests/path_partition.rs | 15 +- .../src/aggregates/group_values/row.rs | 27 +- .../physical-plan/src/aggregates/mod.rs | 31 +- .../physical-plan/src/aggregates/row_hash.rs | 4 +- .../sqllogictest/test_files/aggregate.slt | 10 +- .../sqllogictest/test_files/dictionary.slt | 282 ++++++++++++++++++ 6 files changed, 325 insertions(+), 44 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/dictionary.slt diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs index abe6ab283aff..dd8eb52f67c7 100644 --- a/datafusion/core/tests/path_partition.rs +++ b/datafusion/core/tests/path_partition.rs @@ -168,9 +168,9 @@ async fn parquet_distinct_partition_col() -> Result<()> { assert_eq!(min_limit, resulting_limit); let s = ScalarValue::try_from_array(results[0].column(1), 0)?; - let month = match s { - ScalarValue::Utf8(Some(month)) => month, - s => panic!("Expected month as Utf8 found {s:?}"), + let month = match extract_as_utf(&s) { + Some(month) => month, + s => panic!("Expected month as Dict(_, Utf8) found {s:?}"), }; let sql_on_partition_boundary = format!( @@ -191,6 +191,15 @@ async fn parquet_distinct_partition_col() -> Result<()> { Ok(()) } +fn extract_as_utf(v: &ScalarValue) -> Option { + if let ScalarValue::Dictionary(_, v) = v { + if let ScalarValue::Utf8(v) = v.as_ref() { + return v.clone(); + } + } + None +} + #[tokio::test] async fn csv_filter_with_file_col() -> Result<()> { let ctx = SessionContext::new(); diff --git a/datafusion/physical-plan/src/aggregates/group_values/row.rs b/datafusion/physical-plan/src/aggregates/group_values/row.rs index e7c7a42cf902..10ff9edb8912 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/row.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/row.rs @@ -17,18 +17,22 @@ use crate::aggregates::group_values::GroupValues; use ahash::RandomState; +use arrow::compute::cast; use arrow::record_batch::RecordBatch; use arrow::row::{RowConverter, Rows, SortField}; -use arrow_array::ArrayRef; -use arrow_schema::SchemaRef; +use arrow_array::{Array, ArrayRef}; +use arrow_schema::{DataType, SchemaRef}; use datafusion_common::hash_utils::create_hashes; -use datafusion_common::Result; +use datafusion_common::{DataFusionError, Result}; use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt}; use datafusion_physical_expr::EmitTo; use hashbrown::raw::RawTable; /// A [`GroupValues`] making use of [`Rows`] pub struct GroupValuesRows { + /// The output schema + schema: SchemaRef, + /// Converter for the group values row_converter: RowConverter, @@ -75,6 +79,7 @@ impl GroupValuesRows { let map = RawTable::with_capacity(0); Ok(Self { + schema, row_converter, map, map_size: 0, @@ -165,7 +170,7 @@ impl GroupValues for GroupValuesRows { .take() .expect("Can not emit from empty rows"); - let output = match emit_to { + let mut output = match emit_to { EmitTo::All => { let output = self.row_converter.convert_rows(&group_values)?; group_values.clear(); @@ -198,6 +203,20 @@ impl GroupValues for GroupValuesRows { } }; + // TODO: Materialize dictionaries in group keys (#7647) + for (field, array) in self.schema.fields.iter().zip(&mut output) { + let expected = field.data_type(); + if let DataType::Dictionary(_, v) = expected { + let actual = array.data_type(); + if v.as_ref() != actual { + return Err(DataFusionError::Internal(format!( + "Converted group rows expected dictionary of {v} got {actual}" + ))); + } + *array = cast(array.as_ref(), expected)?; + } + } + self.group_values = Some(group_values); Ok(output) } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index c74c4ac0f821..1affea9e1433 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -36,7 +36,6 @@ use crate::{ use arrow::array::ArrayRef; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; -use arrow_schema::DataType; use datafusion_common::stats::Precision; use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result}; use datafusion_execution::TaskContext; @@ -283,9 +282,6 @@ pub struct AggregateExec { limit: Option, /// Input plan, could be a partial aggregate or the input to the aggregate pub input: Arc, - /// Original aggregation schema, could be different from `schema` before dictionary group - /// keys get materialized - original_schema: SchemaRef, /// Schema after the aggregate is applied schema: SchemaRef, /// Input schema before any aggregation is applied. For partial aggregate this will be the @@ -469,7 +465,7 @@ impl AggregateExec { input: Arc, input_schema: SchemaRef, ) -> Result { - let original_schema = create_schema( + let schema = create_schema( &input.schema(), &group_by.expr, &aggr_expr, @@ -477,11 +473,7 @@ impl AggregateExec { mode, )?; - let schema = Arc::new(materialize_dict_group_keys( - &original_schema, - group_by.expr.len(), - )); - let original_schema = Arc::new(original_schema); + let schema = Arc::new(schema); // Reset ordering requirement to `None` if aggregator is not order-sensitive let mut order_by_expr = aggr_expr .iter() @@ -555,7 +547,6 @@ impl AggregateExec { aggr_expr, filter_expr, input, - original_schema, schema, input_schema, projection_mapping, @@ -971,24 +962,6 @@ fn create_schema( Ok(Schema::new(fields)) } -/// returns schema with dictionary group keys materialized as their value types -/// The actual convertion happens in `RowConverter` and we don't do unnecessary -/// conversion back into dictionaries -fn materialize_dict_group_keys(schema: &Schema, group_count: usize) -> Schema { - let fields = schema - .fields - .iter() - .enumerate() - .map(|(i, field)| match field.data_type() { - DataType::Dictionary(_, value_data_type) if i < group_count => { - Field::new(field.name(), *value_data_type.clone(), field.is_nullable()) - } - _ => Field::clone(field), - }) - .collect::>(); - Schema::new(fields) -} - fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef { let group_fields = schema.fields()[0..group_count].to_vec(); Arc::new(Schema::new(group_fields)) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 89614fd3020c..6a0c02f5caf3 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -324,9 +324,7 @@ impl GroupedHashAggregateStream { .map(create_group_accumulator) .collect::>()?; - // we need to use original schema so RowConverter in group_values below - // will do the proper coversion of dictionaries into value types - let group_schema = group_schema(&agg.original_schema, agg_group_by.expr.len()); + let group_schema = group_schema(&agg_schema, agg_group_by.expr.len()); let spill_expr = group_schema .fields .into_iter() diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 78575c9dffc5..aa512f6e2600 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -2469,11 +2469,11 @@ select max(x_dict) from value_dict group by x_dict % 2 order by max(x_dict); query T select arrow_typeof(x_dict) from value_dict group by x_dict; ---- -Int32 -Int32 -Int32 -Int32 -Int32 +Dictionary(Int64, Int32) +Dictionary(Int64, Int32) +Dictionary(Int64, Int32) +Dictionary(Int64, Int32) +Dictionary(Int64, Int32) statement ok drop table value diff --git a/datafusion/sqllogictest/test_files/dictionary.slt b/datafusion/sqllogictest/test_files/dictionary.slt new file mode 100644 index 000000000000..002aade2528e --- /dev/null +++ b/datafusion/sqllogictest/test_files/dictionary.slt @@ -0,0 +1,282 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Tests for querying on dictionary encoded data + +# Note: These tables model data as is common for timeseries, such as in InfluxDB IOx +# There are three types of columns: +# 1. tag columns, which are string dictionaries, often with low cardinality +# 2. field columns, which are typed, +# 3. a `time` columns, which is a nanosecond timestamp + +# It is common to group and filter on the "tag" columns (and thus on dictionary +# encoded values) + +# Table m1 with a tag column `tag_id` 4 fields `f1` - `f4`, and `time` + +statement ok +CREATE VIEW m1 AS +SELECT + arrow_cast(column1, 'Dictionary(Int32, Utf8)') as tag_id, + arrow_cast(column2, 'Float64') as f1, + arrow_cast(column3, 'Utf8') as f2, + arrow_cast(column4, 'Utf8') as f3, + arrow_cast(column5, 'Float64') as f4, + arrow_cast(column6, 'Timestamp(Nanosecond, None)') as time +FROM ( + VALUES + -- equivalent to the following line protocol data + -- m1,tag_id=1000 f1=32,f2="foo",f3="True",f4=1.0 1703030400000000000 + -- m1,tag_id=1000 f1=32,f2="foo",f3="True",f4=2.0 1703031000000000000 + -- m1,tag_id=1000 f1=32,f2="foo",f3="True",f4=3.0 1703031600000000000 + -- m1,tag_id=1000 f1=32,f2="foo",f3="True",f4=4.0 1703032200000000000 + -- m1,tag_id=1000 f1=32,f2="foo",f3="True",f4=5.0 1703032800000000000 + -- m1,tag_id=1000 f1=32,f2="foo",f3="True",f4=6.0 1703033400000000000 + -- m1,tag_id=1000 f1=32,f2="foo",f3="True",f4=7.0 1703034000000000000 + -- m1,tag_id=1000 f1=32,f2="foo",f3="True",f4=8.0 1703034600000000000 + -- m1,tag_id=1000 f1=32,f2="foo",f3="True",f4=9.0 1703035200000000000 + -- m1,tag_id=1000 f1=32,f2="foo",f3="True",f4=10.0 1703035800000000000 + ('1000', 32, 'foo', 'True', 1.0, 1703030400000000000), + ('1000', 32, 'foo', 'True', 2.0, 1703031000000000000), + ('1000', 32, 'foo', 'True', 3.0, 1703031600000000000), + ('1000', 32, 'foo', 'True', 4.0, 1703032200000000000), + ('1000', 32, 'foo', 'True', 5.0, 1703032800000000000), + ('1000', 32, 'foo', 'True', 6.0, 1703033400000000000), + ('1000', 32, 'foo', 'True', 7.0, 1703034000000000000), + ('1000', 32, 'foo', 'True', 8.0, 1703034600000000000), + ('1000', 32, 'foo', 'True', 9.0, 1703035200000000000), + ('1000', 32, 'foo', 'True', 10.0, 1703035800000000000) +); + +query ?RTTRP +SELECT * FROM m1; +---- +1000 32 foo True 1 2023-12-20T00:00:00 +1000 32 foo True 2 2023-12-20T00:10:00 +1000 32 foo True 3 2023-12-20T00:20:00 +1000 32 foo True 4 2023-12-20T00:30:00 +1000 32 foo True 5 2023-12-20T00:40:00 +1000 32 foo True 6 2023-12-20T00:50:00 +1000 32 foo True 7 2023-12-20T01:00:00 +1000 32 foo True 8 2023-12-20T01:10:00 +1000 32 foo True 9 2023-12-20T01:20:00 +1000 32 foo True 10 2023-12-20T01:30:00 + +# Note that te type of the tag column is `Dictionary(Int32, Utf8)` +query TTT +DESCRIBE m1; +---- +tag_id Dictionary(Int32, Utf8) YES +f1 Float64 YES +f2 Utf8 YES +f3 Utf8 YES +f4 Float64 YES +time Timestamp(Nanosecond, None) YES + + +# Table m2 with a tag columns `tag_id` and `type`, a field column `f5`, and `time` +statement ok +CREATE VIEW m2 AS +SELECT + arrow_cast(column1, 'Dictionary(Int32, Utf8)') as type, + arrow_cast(column2, 'Dictionary(Int32, Utf8)') as tag_id, + arrow_cast(column3, 'Float64') as f5, + arrow_cast(column4, 'Timestamp(Nanosecond, None)') as time +FROM ( + VALUES + -- equivalent to the following line protocol data + -- m2,type=active,tag_id=1000 f5=100 1701648000000000000 + -- m2,type=active,tag_id=1000 f5=200 1701648600000000000 + -- m2,type=active,tag_id=1000 f5=300 1701649200000000000 + -- m2,type=active,tag_id=1000 f5=400 1701649800000000000 + -- m2,type=active,tag_id=1000 f5=500 1701650400000000000 + -- m2,type=active,tag_id=1000 f5=600 1701651000000000000 + -- m2,type=passive,tag_id=2000 f5=700 1701651600000000000 + -- m2,type=passive,tag_id=1000 f5=800 1701652200000000000 + -- m2,type=passive,tag_id=1000 f5=900 1701652800000000000 + -- m2,type=passive,tag_id=1000 f5=1000 1701653400000000000 + ('active', '1000', 100, 1701648000000000000), + ('active', '1000', 200, 1701648600000000000), + ('active', '1000', 300, 1701649200000000000), + ('active', '1000', 400, 1701649800000000000), + ('active', '1000', 500, 1701650400000000000), + ('active', '1000', 600, 1701651000000000000), + ('passive', '1000', 700, 1701651600000000000), + ('passive', '1000', 800, 1701652200000000000), + ('passive', '1000', 900, 1701652800000000000), + ('passive', '1000', 1000, 1701653400000000000) +); + +query ??RP +SELECT * FROM m2; +---- +active 1000 100 2023-12-04T00:00:00 +active 1000 200 2023-12-04T00:10:00 +active 1000 300 2023-12-04T00:20:00 +active 1000 400 2023-12-04T00:30:00 +active 1000 500 2023-12-04T00:40:00 +active 1000 600 2023-12-04T00:50:00 +passive 1000 700 2023-12-04T01:00:00 +passive 1000 800 2023-12-04T01:10:00 +passive 1000 900 2023-12-04T01:20:00 +passive 1000 1000 2023-12-04T01:30:00 + +query TTT +DESCRIBE m2; +---- +type Dictionary(Int32, Utf8) YES +tag_id Dictionary(Int32, Utf8) YES +f5 Float64 YES +time Timestamp(Nanosecond, None) YES + +query I +select count(*) from m1 where tag_id = '1000' and time < '2024-01-03T14:46:35+01:00'; +---- +10 + +query RRR rowsort +select min(f5), max(f5), avg(f5) from m2 where tag_id = '1000' and time < '2024-01-03T14:46:35+01:00' group by type; +---- +100 600 350 +700 1000 850 + +query IRRRP +select count(*), min(f5), max(f5), avg(f5), date_bin('30 minutes', time) as "time" +from m2 where tag_id = '1000' and time < '2024-01-03T14:46:35+01:00' +group by date_bin('30 minutes', time) +order by date_bin('30 minutes', time) DESC +---- +1 1000 1000 1000 2023-12-04T01:30:00 +3 700 900 800 2023-12-04T01:00:00 +3 400 600 500 2023-12-04T00:30:00 +3 100 300 200 2023-12-04T00:00:00 + + + +# Reproducer for https://github.com/apache/arrow-datafusion/issues/8738 +# This query should work correctly +query P?TT rowsort +SELECT + "data"."timestamp" as "time", + "data"."tag_id", + "data"."field", + "data"."value" +FROM ( + ( + SELECT "m2"."time" as "timestamp", "m2"."tag_id", 'active_power' as "field", "m2"."f5" as "value" + FROM "m2" + WHERE "m2"."time" >= '2023-12-05T14:46:35+01:00' AND "m2"."time" < '2024-01-03T14:46:35+01:00' + AND "m2"."f5" IS NOT NULL + AND "m2"."type" IN ('active') + AND "m2"."tag_id" IN ('1000') + ) UNION ( + SELECT "m1"."time" as "timestamp", "m1"."tag_id", 'f1' as "field", "m1"."f1" as "value" + FROM "m1" + WHERE "m1"."time" >= '2023-12-05T14:46:35+01:00' AND "m1"."time" < '2024-01-03T14:46:35+01:00' + AND "m1"."f1" IS NOT NULL + AND "m1"."tag_id" IN ('1000') + ) UNION ( + SELECT "m1"."time" as "timestamp", "m1"."tag_id", 'f2' as "field", "m1"."f2" as "value" + FROM "m1" + WHERE "m1"."time" >= '2023-12-05T14:46:35+01:00' AND "m1"."time" < '2024-01-03T14:46:35+01:00' + AND "m1"."f2" IS NOT NULL + AND "m1"."tag_id" IN ('1000') + ) +) as "data" +ORDER BY + "time", + "data"."tag_id" +; +---- +2023-12-20T00:00:00 1000 f1 32.0 +2023-12-20T00:00:00 1000 f2 foo +2023-12-20T00:10:00 1000 f1 32.0 +2023-12-20T00:10:00 1000 f2 foo +2023-12-20T00:20:00 1000 f1 32.0 +2023-12-20T00:20:00 1000 f2 foo +2023-12-20T00:30:00 1000 f1 32.0 +2023-12-20T00:30:00 1000 f2 foo +2023-12-20T00:40:00 1000 f1 32.0 +2023-12-20T00:40:00 1000 f2 foo +2023-12-20T00:50:00 1000 f1 32.0 +2023-12-20T00:50:00 1000 f2 foo +2023-12-20T01:00:00 1000 f1 32.0 +2023-12-20T01:00:00 1000 f2 foo +2023-12-20T01:10:00 1000 f1 32.0 +2023-12-20T01:10:00 1000 f2 foo +2023-12-20T01:20:00 1000 f1 32.0 +2023-12-20T01:20:00 1000 f2 foo +2023-12-20T01:30:00 1000 f1 32.0 +2023-12-20T01:30:00 1000 f2 foo + + +# deterministic sort (so we can avoid rowsort) +query P?TT +SELECT + "data"."timestamp" as "time", + "data"."tag_id", + "data"."field", + "data"."value" +FROM ( + ( + SELECT "m2"."time" as "timestamp", "m2"."tag_id", 'active_power' as "field", "m2"."f5" as "value" + FROM "m2" + WHERE "m2"."time" >= '2023-12-05T14:46:35+01:00' AND "m2"."time" < '2024-01-03T14:46:35+01:00' + AND "m2"."f5" IS NOT NULL + AND "m2"."type" IN ('active') + AND "m2"."tag_id" IN ('1000') + ) UNION ( + SELECT "m1"."time" as "timestamp", "m1"."tag_id", 'f1' as "field", "m1"."f1" as "value" + FROM "m1" + WHERE "m1"."time" >= '2023-12-05T14:46:35+01:00' AND "m1"."time" < '2024-01-03T14:46:35+01:00' + AND "m1"."f1" IS NOT NULL + AND "m1"."tag_id" IN ('1000') + ) UNION ( + SELECT "m1"."time" as "timestamp", "m1"."tag_id", 'f2' as "field", "m1"."f2" as "value" + FROM "m1" + WHERE "m1"."time" >= '2023-12-05T14:46:35+01:00' AND "m1"."time" < '2024-01-03T14:46:35+01:00' + AND "m1"."f2" IS NOT NULL + AND "m1"."tag_id" IN ('1000') + ) +) as "data" +ORDER BY + "time", + "data"."tag_id", + "data"."field", + "data"."value" +; +---- +2023-12-20T00:00:00 1000 f1 32.0 +2023-12-20T00:00:00 1000 f2 foo +2023-12-20T00:10:00 1000 f1 32.0 +2023-12-20T00:10:00 1000 f2 foo +2023-12-20T00:20:00 1000 f1 32.0 +2023-12-20T00:20:00 1000 f2 foo +2023-12-20T00:30:00 1000 f1 32.0 +2023-12-20T00:30:00 1000 f2 foo +2023-12-20T00:40:00 1000 f1 32.0 +2023-12-20T00:40:00 1000 f2 foo +2023-12-20T00:50:00 1000 f1 32.0 +2023-12-20T00:50:00 1000 f2 foo +2023-12-20T01:00:00 1000 f1 32.0 +2023-12-20T01:00:00 1000 f2 foo +2023-12-20T01:10:00 1000 f1 32.0 +2023-12-20T01:10:00 1000 f2 foo +2023-12-20T01:20:00 1000 f1 32.0 +2023-12-20T01:20:00 1000 f2 foo +2023-12-20T01:30:00 1000 f1 32.0 +2023-12-20T01:30:00 1000 f2 foo