diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index 7588848cf5be..cfc2d9c8deb2 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -811,7 +811,11 @@ public Optional> resultsAsFrames( boolean useNestedForUnknownTypes ) { - RowSignature rowSignature = resultArraySignature(query); + RowSignature rowSignature = query.getResultRowSignature( + query.context().isFinalize(true) + ? RowSignature.Finalization.YES + : RowSignature.Finalization.NO + ); RowSignature modifiedRowSignature = useNestedForUnknownTypes ? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature) : rowSignature; diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQuery.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQuery.java index 52fddccdf7b5..d255a4e4cf49 100644 --- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQuery.java +++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQuery.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import org.apache.commons.lang.StringUtils; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.query.BaseQuery; import org.apache.druid.query.DataSource; @@ -39,6 +40,8 @@ import org.apache.druid.query.groupby.orderby.DefaultLimitSpec.LimitJsonIncludeFilter; import org.apache.druid.query.spec.QuerySegmentSpec; import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; import java.util.ArrayList; @@ -167,6 +170,19 @@ public boolean isSkipEmptyBuckets() return context().getBoolean(SKIP_EMPTY_BUCKETS, false); } + public RowSignature getResultSignature(final RowSignature.Finalization finalization) + { + final RowSignature.Builder builder = RowSignature.builder(); + builder.addTimeColumn(); + String timestampResultField = getTimestampResultField(); + if (StringUtils.isNotEmpty(timestampResultField)) { + builder.add(timestampResultField, ColumnType.LONG); + } + builder.addAggregators(aggregatorSpecs, finalization); + builder.addPostAggregators(postAggregatorSpecs); + return builder.build(); + } + @Nullable @Override public Set getRequiredColumns() diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 8527d551cf5c..efd99ae9d9bc 100644 --- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -61,7 +61,6 @@ import org.apache.druid.segment.Cursor; import org.apache.druid.segment.RowAdapters; import org.apache.druid.segment.RowBasedColumnSelectorFactory; -import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.joda.time.DateTime; @@ -427,14 +426,7 @@ public Function, Result> ma @Override public RowSignature resultArraySignature(TimeseriesQuery query) { - RowSignature.Builder rowSignatureBuilder = RowSignature.builder(); - rowSignatureBuilder.addTimeColumn(); - if (StringUtils.isNotEmpty(query.getTimestampResultField())) { - rowSignatureBuilder.add(query.getTimestampResultField(), ColumnType.LONG); - } - rowSignatureBuilder.addAggregators(query.getAggregatorSpecs(), RowSignature.Finalization.UNKNOWN); - rowSignatureBuilder.addPostAggregators(query.getPostAggregatorSpecs()); - return rowSignatureBuilder.build(); + return query.getResultSignature(RowSignature.Finalization.UNKNOWN); } @Override @@ -474,7 +466,10 @@ public Optional> resultsAsFrames( boolean useNestedForUnknownTypes ) { - final RowSignature rowSignature = resultArraySignature(query); + final RowSignature rowSignature = + query.getResultSignature( + query.context().isFinalize(true) ? RowSignature.Finalization.YES : RowSignature.Finalization.NO + ); final Pair cursorAndCloseable = IterableRowsCursorHelper.getCursorFromSequence( resultsAsArrays(query, resultSequence), rowSignature diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQuery.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQuery.java index 349e5a02d163..21729022d616 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQuery.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQuery.java @@ -37,6 +37,7 @@ import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.spec.QuerySegmentSpec; import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; import java.util.ArrayList; @@ -185,6 +186,16 @@ public void initTopNAlgorithmSelector(TopNAlgorithmSelector selector) topNMetricSpec.initTopNAlgorithmSelector(selector); } + public RowSignature getResultSignature(final RowSignature.Finalization finalization) + { + return RowSignature.builder() + .addTimeColumn() + .addDimensions(Collections.singletonList(getDimensionSpec())) + .addAggregators(getAggregatorSpecs(), finalization) + .addPostAggregators(getPostAggregatorSpecs()) + .build(); + } + @Override public TopNQuery withQuerySegmentSpec(QuerySegmentSpec querySegmentSpec) { diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java index 02d07e255709..bdad6d4575e4 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java @@ -66,7 +66,6 @@ import java.io.Closeable; import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -507,12 +506,7 @@ public DimensionAndMetricValueExtractor apply( @Override public RowSignature resultArraySignature(TopNQuery query) { - return RowSignature.builder() - .addTimeColumn() - .addDimensions(Collections.singletonList(query.getDimensionSpec())) - .addAggregators(query.getAggregatorSpecs(), RowSignature.Finalization.UNKNOWN) - .addPostAggregators(query.getPostAggregatorSpecs()) - .build(); + return query.getResultSignature(RowSignature.Finalization.UNKNOWN); } @Override @@ -558,7 +552,10 @@ public Optional> resultsAsFrames( boolean useNestedForUnknownTypes ) { - final RowSignature rowSignature = resultArraySignature(query); + final RowSignature rowSignature = query.getResultSignature( + query.context().isFinalize(true) ? RowSignature.Finalization.YES : RowSignature.Finalization.NO + ); + final Pair cursorAndCloseable = IterableRowsCursorHelper.getCursorFromSequence( resultsAsArrays(query, resultSequence), rowSignature diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java index 6269e2a5c8cc..8f119ea3063a 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java @@ -51,10 +51,12 @@ import org.apache.druid.query.aggregation.LongMinAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.aggregation.SingleValueAggregatorFactory; +import org.apache.druid.query.aggregation.firstlast.first.StringFirstAggregatorFactory; import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator; import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.ExtractionDimensionSpec; +import org.apache.druid.query.expression.TestExprMacroTable; import org.apache.druid.query.extraction.SubstringDimExtractionFn; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; @@ -1388,6 +1390,220 @@ public void testSingleValueEmptyInnerAgg(String testName, Map qu ); } + @MethodSource("constructorFeeder") + @ParameterizedTest(name = "{0}") + public void testGroupBySubqueryWithEarliestAggregator(String testName, Map queryContext) + { + cannotVectorize(); + + // Note: EARLIEST aggregator is used because the intermediate type "serializablePair" is different from the finalized type + final List expectedResults; + if (NullHandling.replaceWithDefault()) { + expectedResults = ImmutableList.of( + new Object[]{"1", "", "a", "1"}, + new Object[]{"10.1", "b", "", "10.1"}, + new Object[]{"10.1", "c", "", "10.1"}, + new Object[]{"2", "d", "", "2"}, + new Object[]{"abc", "", "", "abc"}, + new Object[]{"def", "", "abc", "def"} + ); + } else { + expectedResults = ImmutableList.of( + new Object[]{"", "a", "a", ""}, + new Object[]{"", "b", "a", ""}, + new Object[]{"1", "", "a", "1"}, + new Object[]{"10.1", "b", null, "10.1"}, + new Object[]{"10.1", "c", null, "10.1"}, + new Object[]{"2", "d", "", "2"}, + new Object[]{"abc", null, null, "abc"}, + new Object[]{"def", null, "abc", "def"} + ); + } + + testQuery( + "SELECT a.dim1, a.dim3, a.e_dim2, b.dim1 " + + "FROM (" + + " SELECT dim1, dim3, EARLIEST(dim2) AS e_dim2 " + + " FROM foo GROUP BY 1, 2 LIMIT 100" + + ") a " + + "INNER JOIN foo b ON a.dim1 = b.dim1", + queryContext, + ImmutableList.of( + newScanQueryBuilder() + .dataSource( + JoinDataSource.create( + new QueryDataSource( + GroupByQuery.builder() + .setDataSource("foo") + .setInterval(querySegmentSpec(Intervals.ETERNITY)) + .setGranularity(Granularities.ALL) + .setDimensions( + new DefaultDimensionSpec("dim1", "d0", ColumnType.STRING), + new DefaultDimensionSpec("dim3", "d1", ColumnType.STRING) + ) + .addAggregator(new StringFirstAggregatorFactory("a0", "dim2", "__time", 1024)) + .setLimitSpec(new DefaultLimitSpec(Collections.emptyList(), 100)) + .build() + ), + new QueryDataSource( + newScanQueryBuilder() + .dataSource("foo") + .intervals(querySegmentSpec(Intervals.ETERNITY)) + .columns("dim1") + .build() + ), + "j0.", + "(\"d0\" == \"j0.dim1\")", + JoinType.INNER, + null, + TestExprMacroTable.INSTANCE, + null + ) + ) + .intervals(querySegmentSpec(Intervals.ETERNITY)) + .columns("a0", "d0", "d1", "j0.dim1") + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + expectedResults + ); + } + + @MethodSource("constructorFeeder") + @ParameterizedTest(name = "{0}") + public void testTopNSubqueryWithEarliestAggregator(String testName, Map queryContext) + { + final List expectedResults; + if (NullHandling.replaceWithDefault()) { + expectedResults = ImmutableList.of( + new Object[]{"1", "a", "1"}, + new Object[]{"10.1", "", "10.1"}, + new Object[]{"2", "", "2"}, + new Object[]{"abc", "", "abc"}, + new Object[]{"def", "abc", "def"} + ); + } else { + expectedResults = ImmutableList.of( + new Object[]{"", "a", ""}, + new Object[]{"1", "a", "1"}, + new Object[]{"10.1", null, "10.1"}, + new Object[]{"2", "", "2"}, + new Object[]{"abc", null, "abc"}, + new Object[]{"def", "abc", "def"} + ); + } + + testQuery( + "SELECT a.dim1, a.e_dim2, b.dim1 " + + "FROM (" + + " SELECT dim1, EARLIEST(dim2) AS e_dim2 " + + " FROM foo " + + " GROUP BY 1 " + + " LIMIT 100" + + ") a " + + "INNER JOIN foo b ON a.dim1 = b.dim1", + queryContext, + ImmutableList.of( + newScanQueryBuilder() + .dataSource( + JoinDataSource.create( + new QueryDataSource( + new TopNQueryBuilder() + .dataSource("foo") + .dimension(new DefaultDimensionSpec("dim1", "d0", ColumnType.STRING)) + .metric(new DimensionTopNMetricSpec(null, StringComparators.LEXICOGRAPHIC)) + .threshold(100) + .intervals(querySegmentSpec(Intervals.ETERNITY)) + .granularity(Granularities.ALL) + .aggregators( + new StringFirstAggregatorFactory("a0", "dim2", "__time", 1024) + ) + .build() + ), + new QueryDataSource( + newScanQueryBuilder() + .dataSource("foo") + .intervals(querySegmentSpec(Intervals.ETERNITY)) + .columns("dim1") + .build() + ), + "j0.", + "(\"d0\" == \"j0.dim1\")", + JoinType.INNER, + null, + TestExprMacroTable.INSTANCE, + null + ) + ) + .intervals(querySegmentSpec(Intervals.ETERNITY)) + .columns("a0", "d0", "j0.dim1") + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + expectedResults + ); + } + + @MethodSource("constructorFeeder") + @ParameterizedTest(name = "{0}") + public void testTimeseriesSubqueryWithEarliestAggregator(String testName, Map queryContext) + { + testQuery( + "SELECT a.__time, a.e_dim2, b.__time " + + "FROM (" + + " SELECT TIME_FLOOR(\"__time\", 'PT24H') as __time, EARLIEST(dim2) AS e_dim2 " + + " FROM foo " + + " GROUP BY 1 " + + ") a " + + "INNER JOIN foo b ON a.__time = b.__time", + queryContext, + ImmutableList.of( + newScanQueryBuilder() + .dataSource( + JoinDataSource.create( + new QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource("foo") + .intervals(querySegmentSpec(Intervals.ETERNITY)) + .granularity(new PeriodGranularity( + new Period("PT24H"), + null, + DateTimeZone.UTC + )) + .aggregators(new StringFirstAggregatorFactory("a0", "dim2", "__time", 1024)) + .build() + ), + new QueryDataSource( + newScanQueryBuilder() + .dataSource("foo") + .intervals(querySegmentSpec(Intervals.ETERNITY)) + .columns("__time") + .build() + ), + "j0.", + "(\"d0\" == \"j0.__time\")", + JoinType.INNER, + null, + TestExprMacroTable.INSTANCE, + null + ) + ) + .intervals(querySegmentSpec(Intervals.ETERNITY)) + .columns("a0", "d0", "j0.__time") + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{946684800000L, "a", 946684800000L}, + new Object[]{946771200000L, NullHandling.defaultStringValue(), 946771200000L}, + new Object[]{946857600000L, "", 946857600000L}, + new Object[]{978307200000L, "a", 978307200000L}, + new Object[]{978393600000L, "abc", 978393600000L}, + new Object[]{978480000000L, NullHandling.defaultStringValue(), 978480000000L} + ) + ); + } + public static class SubqueryComponentSupplier extends SqlTestFramework.StandardComponentSupplier {