From cedf8d842a8d551d8c2023541dcee04f1096d0ff Mon Sep 17 00:00:00 2001
From: Laksh Singla
Date: Fri, 2 Aug 2024 10:19:00 +0530
Subject: [PATCH 1/3] init tests

---
 .../org/apache/druid/query/DruidMetrics.java  |   1 +
 .../server/ClientQuerySegmentWalker.java      |  68 ++++++++--
 .../server/ClientQuerySegmentWalkerTest.java  | 121 +++++++++++++++++-
 .../apache/druid/server/QueryStackTests.java  |  12 +-
 .../SpecificSegmentsQuerySegmentWalker.java   |   7 +-
 5 files changed, 186 insertions(+), 23 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
index f85dd0016005..1f478809333b 100644
--- a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
@@ -32,6 +32,7 @@ public class DruidMetrics
   public static final String TYPE = "type";
   public static final String INTERVAL = "interval";
   public static final String ID = "id";
+  public static final String SUBQUERY_ID = "subQueryId";
   public static final String TASK_ID = "taskId";
   public static final String GROUP_ID = "groupId";
   public static final String STATUS = "status";
diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
index d49ce3909f71..298270d514e5 100644
--- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
@@ -37,7 +37,9 @@
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.DataSource;
+import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.FluentQueryRunner;
 import org.apache.druid.query.FrameBasedInlineDataSource;
 import org.apache.druid.query.FrameSignaturePair;
@@ -94,6 +96,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
   private static final Logger log = new Logger(ClientQuerySegmentWalker.class);
 
   private static final int FRAME_SIZE = 8_000_000;
+  public static final String ROWS_COUNT_METRIC = "subquery/rows";
+  public static final String BYTES_COUNT_METRIC = "subquery/bytes";
 
   private final ServiceEmitter emitter;
   private final QuerySegmentWalker clusterClient;
@@ -447,7 +451,9 @@ private DataSource inlineIfNecessary(
               maxSubqueryRows,
               maxSubqueryMemory,
               useNestedForUnknownTypeInSubquery,
-              subqueryStatsProvider
+              subqueryStatsProvider,
+              !dryRun,
+              emitter
           );
         } else {
           // Cannot inline subquery. Attempt to inline one level deeper, and then try again.
@@ -553,8 +559,8 @@ private <T> QueryRunner<T> decorateClusterRunner(Query<T> query, QueryRunner<T>
    * It also plumbs parent query's id and sql id in case the subqueries don't have it set by default
    *
    * @param rootDataSource   Datasource whose subqueries need to be populated
-   * @param parentQueryId    Parent Query's ID, can be null if do not need to update this in the subqueries
-   * @param parentSqlQueryId Parent Query's SQL Query ID, can be null if do not need to update this in the subqueries
+   * @param parentQueryId    Parent Query's ID, can be null if it does not need to be updated in the subqueries
+   * @param parentSqlQueryId Parent Query's SQL Query ID, can be null if it does not need to be updated in the subqueries
    * @return DataSource populated with the subqueries
    */
   private DataSource generateSubqueryIds(
@@ -662,8 +668,10 @@ private static <T, QueryType extends Query<T>> DataSource toInlineDataSource(
       final AtomicBoolean cannotMaterializeToFrames,
       final int limit,
       long memoryLimit,
-      boolean useNestedForUnknownTypeInSubquery,
-      SubqueryCountStatsProvider subqueryStatsProvider
+      final boolean useNestedForUnknownTypeInSubquery,
+      final SubqueryCountStatsProvider subqueryStatsProvider,
+      final boolean emitMetrics,
+      final ServiceEmitter emitter
   )
   {
     final int rowLimitToUse = limit < 0 ? Integer.MAX_VALUE : limit;
@@ -683,7 +691,9 @@ private static <T, QueryType extends Query<T>> DataSource toInlineDataSource(
             toolChest,
             limitAccumulator,
             limit,
-            subqueryStatsProvider
+            subqueryStatsProvider,
+            emitMetrics,
+            emitter
         );
         break;
       case MEMORY_LIMIT:
@@ -699,7 +709,9 @@ private static <T, QueryType extends Query<T>> DataSource toInlineDataSource(
             memoryLimitAccumulator,
             memoryLimit,
             useNestedForUnknownTypeInSubquery,
-            subqueryStatsProvider
+            subqueryStatsProvider,
+            emitMetrics,
+            emitter
         );
         if (!maybeDataSource.isPresent()) {
           cannotMaterializeToFrames.set(true);
@@ -716,7 +728,9 @@ private static <T, QueryType extends Query<T>> DataSource toInlineDataSource(
               toolChest,
               limitAccumulator,
               limit,
-              subqueryStatsProvider
+              subqueryStatsProvider,
+              emitMetrics,
+              emitter
           );
         } else {
           subqueryStatsProvider.incrementSubqueriesWithByteLimit();
@@ -739,9 +753,11 @@ private static <T, QueryType extends Query<T>> Optional<DataSource> materializeR
       final QueryToolChest<T, QueryType> toolChest,
       final AtomicInteger limitAccumulator,
       final AtomicLong memoryLimitAccumulator,
-      long memoryLimit,
-      boolean useNestedForUnknownTypeInSubquery,
-      final SubqueryCountStatsProvider subqueryStatsProvider
+      final long memoryLimit,
+      final boolean useNestedForUnknownTypeInSubquery,
+      final SubqueryCountStatsProvider subqueryStatsProvider,
+      final boolean emitMetrics,
+      final ServiceEmitter emitter
   )
   {
     Optional<Sequence<FrameSignaturePair>> framesOptional;
@@ -764,6 +780,8 @@ private static <T, QueryType extends Query<T>> Optional<DataSource> materializeR
 
       startedAccumulating = true;
 
+      final int initialSubqueryRows = limitAccumulator.get();
+      final long initialSubqueryBytes = memoryLimitAccumulator.get();
       frames.forEach(
           frame -> {
             limitAccumulator.addAndGet(frame.getFrame().numRows());
@@ -775,6 +793,21 @@ private static <T, QueryType extends Query<T>> Optional<DataSource> materializeR
             frameSignaturePairs.add(frame);
           }
       );
+      if (emitMetrics) {
+        emitter.emit(
+            ServiceMetricEvent.builder()
+                              .setDimension(DruidMetrics.ID, query.getId())
+                              .setDimension(DruidMetrics.SUBQUERY_ID, query.getSubQueryId())
+                              .setMetric(ROWS_COUNT_METRIC, limitAccumulator.get() - initialSubqueryRows)
+        );
+
+        emitter.emit(
+            ServiceMetricEvent.builder()
+                              .setDimension(DruidMetrics.ID, query.getId())
+                              .setDimension(DruidMetrics.SUBQUERY_ID, query.getSubQueryId())
+                              .setMetric(BYTES_COUNT_METRIC, memoryLimitAccumulator.get() - initialSubqueryBytes)
+        );
+      }
       return Optional.of(new FrameBasedInlineDataSource(frameSignaturePairs, toolChest.resultArraySignature(query)));
     }
     catch (UnsupportedColumnTypeException e) {
@@ -811,7 +844,9 @@ private static <T, QueryType extends Query<T>> DataSource materializeResultsAsAr
       final QueryToolChest<T, QueryType> toolChest,
       final AtomicInteger limitAccumulator,
       final int limit,
-      final SubqueryCountStatsProvider subqueryStatsProvider
+      final SubqueryCountStatsProvider subqueryStatsProvider,
+      final boolean emitMetrics,
+      final ServiceEmitter emitter
   )
   {
     final int rowLimitToUse = limit < 0 ? Integer.MAX_VALUE : limit;
@@ -819,6 +854,7 @@ private static <T, QueryType extends Query<T>> DataSource materializeResultsAsAr
 
     final ArrayList<Object[]> resultList = new ArrayList<>();
 
+    final int initialSubqueryRows = limitAccumulator.get();
     toolChest.resultsAsArrays(query, results).accumulate(
         resultList,
         (acc, in) -> {
@@ -830,6 +866,14 @@ private static <T, QueryType extends Query<T>> DataSource materializeResultsAsAr
           return acc;
         }
     );
+    if (emitMetrics) {
+      emitter.emit(
+          ServiceMetricEvent.builder()
+                            .setDimension(DruidMetrics.ID, query.getId())
+                            .setDimension(DruidMetrics.SUBQUERY_ID, query.getSubQueryId())
+                            .setMetric(ROWS_COUNT_METRIC, limitAccumulator.get() - initialSubqueryRows)
+      );
+    }
 
     return InlineDataSource.fromIterable(resultList, signature);
   }
diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
index 038fbce7d455..467f375f9f7b 100644
--- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
+++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
@@ -32,8 +32,12 @@
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.core.EventMap;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.DataSource;
+import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.FrameBasedInlineDataSource;
 import org.apache.druid.query.GlobalTableDataSource;
@@ -214,6 +218,7 @@ public class ClientQuerySegmentWalkerTest
 
   private Closer closer;
   private QueryRunnerFactoryConglomerate conglomerate;
+  private final StubServiceEmitter emitter = new StubServiceEmitter();
 
   // Queries that are issued; checked by "testQuery" against its "expectedQueries" parameter.
   private final List<Query<?>> issuedQueries = new ArrayList<>();
@@ -228,6 +233,7 @@ public class ClientQuerySegmentWalkerTest
   public void setUp()
   {
     closer = Closer.create();
+    emitter.flush();
     conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(closer);
     scheduler = new ObservableQueryScheduler(
         8,
@@ -941,6 +947,113 @@ public void testTimeseriesOnGroupByOnTableErrorTooLarge()
     testQuery(query, ImmutableList.of(), ImmutableList.of());
   }
 
+  @Test
+  public void testMetricsWithMaxSubqueryRowsEnabled()
+  {
+    final GroupByQuery subquery =
+        GroupByQuery.builder()
+                    .setDataSource(FOO)
+                    .setGranularity(Granularities.ALL)
+                    .setInterval(Collections.singletonList(INTERVAL))
+                    .setDimensions(DefaultDimensionSpec.of("s"))
+                    .build();
+
+    final TimeseriesQuery query =
+        (TimeseriesQuery) Druids.newTimeseriesQueryBuilder()
+                                .dataSource(new QueryDataSource(subquery))
+                                .granularity(Granularities.ALL)
+                                .intervals(Intervals.ONLY_ETERNITY)
+                                .aggregators(new CountAggregatorFactory("cnt"))
+                                .build()
+                                .withId(DUMMY_QUERY_ID);
+
+    testQuery(
+        query,
+        new ArrayList<>(ImmutableList.of(
+            ExpectedQuery.cluster(subquery.withId(DUMMY_QUERY_ID).withSubQueryId("1.1")),
+            ExpectedQuery.local(
+                query.withDataSource(
+                    InlineDataSource.fromIterable(
+                        ImmutableList.of(new Object[]{"x"}, new Object[]{"y"}, new Object[]{"z"}),
+                        RowSignature.builder().add("s", ColumnType.STRING).build()
+                    )
+                )
+            )
+        )),
+        ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L})
+    );
+
+    List<Event> events = emitter.getEvents();
+
+    for (Event event : events) {
+      EventMap map = event.toMap();
+      if (ClientQuerySegmentWalker.ROWS_COUNT_METRIC.equals(map.get("metric"))) {
+        Assert.assertTrue(map.containsKey("host"));
+        Assert.assertTrue(map.containsKey("service"));
+        Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID));
+        Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID));
+        Assert.assertEquals(3, map.get("value"));
+      }
+    }
+  }
+
+  @Test
+  public void testMetricsWithMaxSubqueryBytesEnabled()
+  {
+    final GroupByQuery subquery =
+        GroupByQuery.builder()
+                    .setDataSource(FOO)
+                    .setGranularity(Granularities.ALL)
+                    .setInterval(Collections.singletonList(INTERVAL))
+                    .setDimensions(DefaultDimensionSpec.of("s"))
+                    .build();
+
+    final TimeseriesQuery query =
+        (TimeseriesQuery) Druids.newTimeseriesQueryBuilder()
+                                .dataSource(new QueryDataSource(subquery))
+                                .granularity(Granularities.ALL)
+                                .intervals(Intervals.ONLY_ETERNITY)
+                                .aggregators(new CountAggregatorFactory("cnt"))
+                                .context(ImmutableMap.of(QueryContexts.MAX_SUBQUERY_BYTES_KEY, "10000"))
+                                .build()
+                                .withId(DUMMY_QUERY_ID);
+
+    testQuery(
+        query,
+        new ArrayList<>(ImmutableList.of(
+            ExpectedQuery.cluster(subquery.withId(DUMMY_QUERY_ID).withSubQueryId("1.1")),
+            ExpectedQuery.local(
+                query.withDataSource(
+                    InlineDataSource.fromIterable(
+                        ImmutableList.of(new Object[]{"x"}, new Object[]{"y"}, new Object[]{"z"}),
+                        RowSignature.builder().add("s", ColumnType.STRING).build()
+                    )
+                )
+            )
+        )),
+        ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L})
+    );
+
+    List<Event> events = emitter.getEvents();
+
+    for (Event event : events) {
+      EventMap map = event.toMap();
+      if (ClientQuerySegmentWalker.ROWS_COUNT_METRIC.equals(map.get("metric"))) {
+        Assert.assertTrue(map.containsKey("host"));
+        Assert.assertTrue(map.containsKey("service"));
+        Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID));
+        Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID));
+        Assert.assertEquals(3, map.get("value"));
+      } else if (ClientQuerySegmentWalker.BYTES_COUNT_METRIC.equals(map.get("metric"))) {
+        Assert.assertTrue(map.containsKey("host"));
+        Assert.assertTrue(map.containsKey("service"));
+        Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID));
+        Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID));
+        Assert.assertEquals(43L, map.get("value"));
+      }
+    }
+  }
+
   @Test
   public void testGroupByOnArraysDoubles()
   {
@@ -1545,13 +1658,15 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<Se

From: Laksh Singla
Date: Fri, 2 Aug 2024 10:32:26 +0530
Subject: [PATCH 2/3] docs

---
 docs/operations/metrics.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index b5be94ba4ba7..44998f963cd7 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -77,6 +77,8 @@ Most metric values reset each emission period, as specified in `druid.monitoring
 |`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch datasource schema.|||
 |`serverview/sync/healthy`|Sync status of the Broker with a segment-loading server such as a Historical or Peon. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled. This metric can be used in conjunction with `serverview/sync/unstableTime` to debug slow startup of Brokers.|`server`, `tier`|1 for fully synced servers, 0 otherwise|
 |`serverview/sync/unstableTime`|Time in milliseconds for which the Broker has been failing to sync with a segment-loading server. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled.|`server`, `tier`|Not emitted for synced servers.|
+|`subquery/rows`|Number of rows materialized as the subquery's results.|`id`, `subqueryId`|Varies|
+|`subquery/bytes`|Number of bytes materialized as the subquery's results. This metric is only emitted when the query uses [byte-based subquery guardrails](https://druid.apache.org/docs/latest/configuration/#guardrails-for-materialization-of-subqueries).|`id`, `subqueryId`|Varies|
 |`subquery/rowLimit/count`|Number of subqueries whose results are materialized as rows (Java objects on heap).|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
 |`subquery/byteLimit/count`|Number of subqueries whose results are materialized as frames (Druid's internal byte representation of rows).|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
 |`subquery/fallback/count`|Number of subqueries which cannot be materialized as frames|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |

From 1cca99e237f690d92c2f4b0fa18cbe86d83f1603 Mon Sep 17 00:00:00 2001
From: Laksh Singla
Date: Mon, 5 Aug 2024 10:03:49 +0530
Subject: [PATCH 3/3] add metrics

---
 .../apache/druid/server/ClientQuerySegmentWalker.java | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
index 298270d514e5..990878eda6e3 100644
--- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
@@ -657,6 +657,13 @@ private DataSource insertSubqueryIds(
    *                         particular master query
    * @param limit            user-configured limit. If negative, will be treated as {@link Integer#MAX_VALUE}.
    *                         If zero, this method will throw an error immediately.
+   * @param memoryLimit                       User-configured byte limit.
+   * @param useNestedForUnknownTypeInSubquery Whether to use nested JSON for unknown types when materializing subquery results
+   * @param subqueryStatsProvider             Statistics about the subquery materialization
+   * @param emitMetrics                       Whether to emit metrics while materializing. Metrics are not emitted when
+   *                                          performing a dry run of the query, to avoid reporting the same metric twice
+   * @param emitter                           Metrics emitter
+   * @return Inline datasource containing the materialized results of the subquery
    * @throws ResourceLimitExceededException if the limit is exceeded
    */
   private static <T, QueryType extends Query<T>> DataSource toInlineDataSource(
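
For reference, a minimal, self-contained sketch of the emission pattern these patches introduce: snapshot the shared row and byte accumulators before one subquery is materialized, then emit only that subquery's delta, tagged with the parent query ID and subquery ID. The emitMaterializationDeltas helper and the standalone main below are illustrative only and are not part of the patch; the emitter classes, metric names, and dimension keys are the ones used in the diffs above.

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;

public class SubqueryMetricsSketch
{
  // Illustrative helper (not in the patch): emit one subquery's contribution to the
  // shared accumulators as "subquery/rows" and "subquery/bytes", tagged with the
  // parent query ID and subquery ID, mirroring materializeResultsAsFrames above.
  static void emitMaterializationDeltas(
      ServiceEmitter emitter,
      String queryId,
      String subqueryId,
      AtomicInteger limitAccumulator,
      AtomicLong memoryLimitAccumulator,
      int initialSubqueryRows,
      long initialSubqueryBytes
  )
  {
    emitter.emit(
        ServiceMetricEvent.builder()
                          .setDimension("id", queryId)
                          .setDimension("subQueryId", subqueryId)
                          .setMetric("subquery/rows", limitAccumulator.get() - initialSubqueryRows)
    );
    emitter.emit(
        ServiceMetricEvent.builder()
                          .setDimension("id", queryId)
                          .setDimension("subQueryId", subqueryId)
                          .setMetric("subquery/bytes", memoryLimitAccumulator.get() - initialSubqueryBytes)
    );
  }

  public static void main(String[] args)
  {
    // StubServiceEmitter collects events in memory, as in the tests above.
    final StubServiceEmitter emitter = new StubServiceEmitter("druid/broker", "localhost");
    final AtomicInteger limitAccumulator = new AtomicInteger();
    final AtomicLong memoryLimitAccumulator = new AtomicLong();

    // Snapshot the accumulators before materializing one subquery...
    final int initialSubqueryRows = limitAccumulator.get();
    final long initialSubqueryBytes = memoryLimitAccumulator.get();

    // ...pretend the subquery materialized 3 rows and 43 bytes of frames...
    limitAccumulator.addAndGet(3);
    memoryLimitAccumulator.addAndGet(43L);

    // ...and emit only the delta attributable to this subquery.
    emitMaterializationDeltas(
        emitter, "dummy", "1.1",
        limitAccumulator, memoryLimitAccumulator,
        initialSubqueryRows, initialSubqueryBytes
    );

    for (Event event : emitter.getEvents()) {
      System.out.println(event.toMap()); // each map includes metric, value, id, subQueryId
    }
  }
}

Emitting the delta rather than the running accumulator keeps the per-subquery values meaningful even though all subqueries of one master query share the same accumulators.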