Improve the fallback strategy when the broker is unable to materialize the subquery's results as frames for estimating the bytes #16679

Merged: 8 commits, Jul 12, 2024

Changes from all commits
@@ -24,15 +24,19 @@
import com.google.inject.Injector;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.SketchQueryContext;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule;
@@ -43,6 +47,7 @@
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToRankPostAggregator;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToStringPostAggregator;
import org.apache.druid.query.aggregation.datasketches.quantiles.sql.DoublesSketchSqlAggregatorTest.DoublesSketchComponentSupplier;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
@@ -53,6 +58,7 @@
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@@ -263,6 +269,143 @@ public void testQuantileOnComplexColumn()
);
}

@Test
public void testSubqueryWithNestedGroupBy()
{
final List<Object[]> expectedResults = ImmutableList.of(
new Object[]{946684800000L, "", 1L, "val1"},
new Object[]{946684800000L, "1", 1L, "val1"},
new Object[]{946684800000L, "10.1", 1L, "val1"},
new Object[]{946684800000L, "2", 1L, "val1"},
new Object[]{946684800000L, "abc", 1L, "val1"},
new Object[]{946684800000L, "def", 1L, "val1"}
);

testQuery(
"SELECT\n"
+ " MILLIS_TO_TIMESTAMP(946684800000) AS __time,\n"
+ " alias.\"user\",\n"
+ " alias.days,\n"
+ " (CASE WHEN alias.days < quantiles.first_quartile THEN 'val2' \n"
+ " WHEN alias.days >= quantiles.first_quartile AND alias.days < quantiles.third_quartile THEN 'val3' \n"
+ " WHEN alias.days >= quantiles.third_quartile THEN 'val1' END) AS val4\n"
+ "FROM (\n"
+ " SELECT\n"
+ " APPROX_QUANTILE_DS(alias.days, 0.25) AS first_quartile,\n"
+ " APPROX_QUANTILE_DS(alias.days, 0.75) AS third_quartile\n"
+ " FROM (\n"
+ " SELECT\n"
+ " dim1 \"user\",\n"
+ " COUNT(DISTINCT __time) AS days\n"
+ " FROM \"foo\"\n"
+ " GROUP BY 1\n"
+ " ) AS alias\n"
+ ") AS quantiles, (\n"
+ " SELECT\n"
+ " dim1 \"user\",\n"
+ " COUNT(DISTINCT __time) AS days\n"
+ " FROM \"foo\"\n"
+ " GROUP BY 1\n"
+ ") AS alias\n",
ImmutableMap.<String, Object>builder()
.putAll(QUERY_CONTEXT_DEFAULT)
.put(QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000")
// Disallows the fallback to row-based limiting
.put(QueryContexts.MAX_SUBQUERY_ROWS_KEY, "10")
Review comment (Contributor): +1 for the test case. Ideally we would want a similar test in the processing module. Is that possible? Maybe use a test aggregator?

.build(),
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
JoinDataSource.create(
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(
new QueryDataSource(
GroupByQuery.builder()
.setDataSource("foo")
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
.setGranularity(Granularities.ALL)
.addDimension(new DefaultDimensionSpec(
"dim1",
"d0",
ColumnType.STRING
))
.addAggregator(new CardinalityAggregatorFactory(
Review comment (Member): super nit: doesn't it feel kind of strange to be mixing non-datasketches approx distinct count with datasketches extensions tests?

"a0:a",
null,
Collections.singletonList(new DefaultDimensionSpec(
"__time",
"__time",
ColumnType.LONG
)),
false,
true
))
.setPostAggregatorSpecs(new HyperUniqueFinalizingPostAggregator(
"a0",
"a0:a"
))
.build()
)
)
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
.setGranularity(Granularities.ALL)
.addAggregator(new DoublesSketchAggregatorFactory("_a0:agg", "a0", 128))
.setPostAggregatorSpecs(
new DoublesSketchToQuantilePostAggregator(
"_a0",
new FieldAccessPostAggregator("_a0:agg", "_a0:agg"),
0.25
),
new DoublesSketchToQuantilePostAggregator(
"_a1",
new FieldAccessPostAggregator("_a0:agg", "_a0:agg"),
0.75
)
)
.build()

),
new QueryDataSource(
GroupByQuery.builder()
.setDataSource("foo")
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
.setGranularity(Granularities.ALL)
.addDimension(new DefaultDimensionSpec("dim1", "d0", ColumnType.STRING))
.addAggregator(new CardinalityAggregatorFactory(
"a0",
null,
Collections.singletonList(new DefaultDimensionSpec(
"__time",
"__time",
ColumnType.LONG
)),
false,
true
))
.build()
),
"j0.",
"1",
JoinType.INNER,
null,
TestExprMacroTable.INSTANCE,
null
)
)
.intervals(querySegmentSpec(Intervals.ETERNITY))
.virtualColumns(
new ExpressionVirtualColumn("v0", "946684800000", ColumnType.LONG, TestExprMacroTable.INSTANCE),
new ExpressionVirtualColumn("v1", "case_searched((\"j0.a0\" < \"_a0\"),'val2',((\"j0.a0\" >= \"_a0\") && (\"j0.a0\" < \"_a1\")),'val3',(\"j0.a0\" >= \"_a1\"),'val1',null)", ColumnType.STRING, TestExprMacroTable.INSTANCE)
)
.columns("j0.a0", "j0.d0", "v0", "v1")
.build()
),
expectedResults
);
}


@Test
public void testQuantileOnCastedString()
{
@@ -23,6 +23,7 @@
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.write.FrameWriter;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.frame.write.UnsupportedColumnTypeException;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
@@ -32,6 +33,7 @@
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.filter.BoundFilter;
import org.apache.druid.segment.filter.Filters;
import org.joda.time.Interval;
@@ -100,13 +102,18 @@ public static Filter buildFilter(@Nullable Filter filter, Interval interval)
/**
* Writes a {@link Cursor} to a sequence of {@link Frame}. This method iterates over the rows of the cursor,
* and writes the columns to the frames. The iterable is lazy, and traverses only the required portion of the
* cursor as needed.
* <p>
* If a column type is missing from the signature, the method throws an exception without advancing, modifying,
* or closing the cursor.
*/
public static Iterable<Frame> cursorToFramesIterable(
final Cursor cursor,
final FrameWriterFactory frameWriterFactory
)
{
throwIfColumnsHaveUnknownType(frameWriterFactory.signature());

return () -> new Iterator<Frame>()
{
@Override
@@ -158,7 +165,19 @@ public static Sequence<Frame> cursorToFramesSequence(
final FrameWriterFactory frameWriterFactory
)
{

return Sequences.simple(cursorToFramesIterable(cursor, frameWriterFactory));
}

/**
* Throws {@link UnsupportedColumnTypeException} if the row signature has columns with unknown types. This is used
* to pre-determine whether the results can be materialized as frames, without touching the resource that generates
* the frames.
*/
public static void throwIfColumnsHaveUnknownType(final RowSignature rowSignature)
{
for (int i = 0; i < rowSignature.size(); ++i) {
if (!rowSignature.getColumnType(i).isPresent()) {
throw new UnsupportedColumnTypeException(rowSignature.getColumnName(i), null);
}
}
}
}
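The new helper makes the unsupported-type failure mode cheap to probe. A minimal usage sketch follows; the enclosing class and the signature are hypothetical, and the import for FrameCursorUtils is omitted because its package is not shown in this diff. A caller can validate a signature before any cursor or frame memory is touched:

import org.apache.druid.frame.write.UnsupportedColumnTypeException;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;

public class SignaturePrecheckExample
{
  public static boolean canMaterializeAsFrames()
  {
    // A signature where one column's type could not be inferred.
    final RowSignature signature = RowSignature.builder()
                                               .add("known", ColumnType.LONG)
                                               .add("unknown", null)
                                               .build();
    try {
      FrameCursorUtils.throwIfColumnsHaveUnknownType(signature);
      return true;
    }
    catch (UnsupportedColumnTypeException e) {
      // Safe to fall back to row-based materialization: nothing has been opened or advanced yet.
      return false;
    }
  }
}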
@@ -816,6 +816,8 @@ public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(
? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
: rowSignature;

FrameCursorUtils.throwIfColumnsHaveUnknownType(modifiedRowSignature);

FrameWriterFactory frameWriterFactory = FrameWriters.makeColumnBasedFrameWriterFactory(
memoryAllocatorFactory,
modifiedRowSignature,
@@ -485,6 +485,8 @@ public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(
RowSignature modifiedRowSignature = useNestedForUnknownTypes
? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
: rowSignature;
FrameCursorUtils.throwIfColumnsHaveUnknownType(modifiedRowSignature);

FrameWriterFactory frameWriterFactory = FrameWriters.makeColumnBasedFrameWriterFactory(
memoryAllocatorFactory,
modifiedRowSignature,
@@ -569,6 +569,8 @@ public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(
RowSignature modifiedRowSignature = useNestedForUnknownTypes
? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
: rowSignature;
FrameCursorUtils.throwIfColumnsHaveUnknownType(modifiedRowSignature);

FrameWriterFactory frameWriterFactory = FrameWriters.makeColumnBasedFrameWriterFactory(
memoryAllocatorFactory,
rowSignature,
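The three hunks above apply the same prelude in each query toolchest's resultsAsFrames(). A condensed sketch of the shared pattern (the helper method itself is hypothetical; the PR keeps this code inline in each toolchest):

private static RowSignature prepareSignatureForFrames(
    final RowSignature rowSignature,
    final boolean useNestedForUnknownTypes
)
{
  final RowSignature modifiedRowSignature = useNestedForUnknownTypes
      ? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
      : rowSignature;
  // Fail fast: throw before the FrameWriterFactory (and, later, any cursor) is created,
  // so the caller can still fall back to row-based limiting.
  FrameCursorUtils.throwIfColumnsHaveUnknownType(modifiedRowSignature);
  return modifiedRowSignature;
}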
@@ -746,6 +746,7 @@ private static <T, QueryType extends Query<T>> Optional<DataSource> materializeR
{
Optional<Sequence<FrameSignaturePair>> framesOptional;

boolean startedAccumulating = false;
try {
framesOptional = toolChest.resultsAsFrames(
query,
@@ -760,6 +761,9 @@

Sequence<FrameSignaturePair> frames = framesOptional.get();
List<FrameSignaturePair> frameSignaturePairs = new ArrayList<>();

startedAccumulating = true;

frames.forEach(
frame -> {
limitAccumulator.addAndGet(frame.getFrame().numRows());
@@ -772,21 +776,29 @@
}
);
return Optional.of(new FrameBasedInlineDataSource(frameSignaturePairs, toolChest.resultArraySignature(query)));

}
catch (ResourceLimitExceededException e) {
throw e;
}
catch (UnsupportedColumnTypeException e) {
subqueryStatsProvider.incrementSubqueriesFallingBackDueToUnsufficientTypeInfo();
log.debug(e, "Type info in signature insufficient to materialize rows as frames.");
return Optional.empty();
}
catch (Exception e) {
subqueryStatsProvider.incrementSubqueriesFallingBackDueToUnknownReason();
log.debug(e, "Unable to materialize the results as frames due to an unhandleable exception "
+ "during conversion. Defaulting to materializing the results as rows");
if (startedAccumulating) {
// If we have already begun consuming the resultSequence, we can't fall back safely, since the sequence may
// hold resources that are released only when this exception propagates; we throw so that the user can
// disable the 'maxSubqueryBytes' configuration
throw DruidException.defensive()
.build(
e,
"Unable to materialize the results as frames for estimating the byte footprint. "
+ "Please disable 'maxSubqueryBytes' by setting it to 'disabled' in the query context, or by removing it "
+ "altogether from the query context and/or the server config."
);
} else {
return Optional.empty();
}
}
}
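Taken together, the catch blocks give the broker a three-way outcome: rethrow resource-limit violations, fall back to row-based limiting when the failure happens before any results are consumed, and surface an error when the failure happens mid-stream. A condensed, self-contained sketch of that control flow (hypothetical names and simplified types; the real method accumulates a Sequence of FrameSignaturePair):

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

final class FallbackSketch
{
  static Optional<List<String>> materialize(final Iterable<String> frames, final Runnable typePrecheck)
  {
    boolean startedAccumulating = false;
    try {
      // Stand-in for toolChest.resultsAsFrames(): performs the signature pre-check and may
      // throw before any results are consumed.
      typePrecheck.run();
      final List<String> out = new ArrayList<>();
      startedAccumulating = true; // from here on, the source sequence may hold resources
      for (final String frame : frames) {
        out.add(frame);
      }
      return Optional.of(out);
    }
    catch (UnsupportedOperationException e) {
      // Stand-in for UnsupportedColumnTypeException: with the new pre-check this fires
      // before anything is consumed, so falling back to rows is safe.
      return Optional.empty();
    }
    catch (RuntimeException e) {
      if (startedAccumulating) {
        // Mid-stream failure: the partially consumed sequence may hold resources,
        // so surface the error instead of silently falling back.
        throw new IllegalStateException("Cannot materialize as frames; disable 'maxSubqueryBytes'", e);
      }
      return Optional.empty();
    }
  }
}

Because the type check runs before startedAccumulating flips to true, unknown-type signatures still take the cheap row-based fallback, while genuine mid-stream failures abort loudly.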
