Add metrics for number of rows/bytes materialized while running subqueries #16835

Merged · 3 commits · Aug 5, 2024

Changes from all commits
2 changes: 2 additions & 0 deletions docs/operations/metrics.md
@@ -77,6 +77,8 @@ Most metric values reset each emission period, as specified in `druid.monitoring.emissionPeriod`.
|`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch datasource schema.|||
|`serverview/sync/healthy`|Sync status of the Broker with a segment-loading server such as a Historical or Peon. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled. This metric can be used in conjunction with `serverview/sync/unstableTime` to debug slow startup of Brokers.|`server`, `tier`|1 for fully synced servers, 0 otherwise|
|`serverview/sync/unstableTime`|Time in milliseconds for which the Broker has been failing to sync with a segment-loading server. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled.|`server`, `tier`|Not emitted for synced servers.|
|`subquery/rows`|Number of rows materialized from the subquery's results.|`id`, `subqueryId`|Varies|
|`subquery/bytes`|Number of bytes materialized from the subquery's results. This metric is emitted only if the query uses [byte-based subquery guardrails](https://druid.apache.org/docs/latest/configuration/#guardrails-for-materialization-of-subqueries).|`id`, `subqueryId`|Varies|
|`subquery/rowLimit/count`|Number of subqueries whose results are materialized as rows (Java objects on heap). Only available if the `SubqueryCountStatsMonitor` module is included.|||
|`subquery/byteLimit/count`|Number of subqueries whose results are materialized as frames (Druid's internal byte representation of rows). Only available if the `SubqueryCountStatsMonitor` module is included.|||
|`subquery/fallback/count`|Number of subqueries whose results cannot be materialized as frames. Only available if the `SubqueryCountStatsMonitor` module is included.|||
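
For reference, a minimal sketch of how these two metrics are emitted, mirroring the `ClientQuerySegmentWalker` changes below. The standalone helper class and method here are illustrative only and are not part of this patch:

```java
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;

// Illustrative helper (not part of this PR): emits the new subquery metrics
// with the same dimensions the patch uses.
public class SubqueryMetricsSketch
{
  public static void emitSubqueryMetrics(
      final ServiceEmitter emitter,
      final String queryId,        // parent query id -> "id" dimension
      final String subQueryId,     // e.g. "1.1" -> "subQueryId" dimension
      final long rowsMaterialized,
      final long bytesMaterialized
  )
  {
    emitter.emit(
        ServiceMetricEvent.builder()
                          .setDimension(DruidMetrics.ID, queryId)
                          .setDimension(DruidMetrics.SUBQUERY_ID, subQueryId)
                          .setMetric("subquery/rows", rowsMaterialized)
    );
    // subquery/bytes is emitted only when byte-based subquery guardrails are in use.
    emitter.emit(
        ServiceMetricEvent.builder()
                          .setDimension(DruidMetrics.ID, queryId)
                          .setDimension(DruidMetrics.SUBQUERY_ID, subQueryId)
                          .setMetric("subquery/bytes", bytesMaterialized)
    );
  }
}
```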
DruidMetrics.java
@@ -32,6 +32,7 @@ public class DruidMetrics
public static final String TYPE = "type";
public static final String INTERVAL = "interval";
public static final String ID = "id";
public static final String SUBQUERY_ID = "subQueryId";
public static final String TASK_ID = "taskId";
public static final String GROUP_ID = "groupId";
public static final String STATUS = "status";
ClientQuerySegmentWalker.java
@@ -37,7 +37,9 @@
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.FluentQueryRunner;
import org.apache.druid.query.FrameBasedInlineDataSource;
import org.apache.druid.query.FrameSignaturePair;
@@ -94,6 +96,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker

private static final Logger log = new Logger(ClientQuerySegmentWalker.class);
private static final int FRAME_SIZE = 8_000_000;
public static final String ROWS_COUNT_METRIC = "subquery/rows";
public static final String BYTES_COUNT_METRIC = "subquery/bytes";

private final ServiceEmitter emitter;
private final QuerySegmentWalker clusterClient;
@@ -447,7 +451,9 @@ private DataSource inlineIfNecessary(
maxSubqueryRows,
maxSubqueryMemory,
useNestedForUnknownTypeInSubquery,
- subqueryStatsProvider
+ subqueryStatsProvider,
+ !dryRun,
+ emitter
);
} else {
// Cannot inline subquery. Attempt to inline one level deeper, and then try again.
@@ -553,8 +559,8 @@ private <T> QueryRunner<T> decorateClusterRunner(Query<T> query, QueryRunner<T>
* It also plumbs the parent query's ID and SQL query ID into subqueries that don't have them set by default
*
* @param rootDataSource Datasource whose subqueries need to be populated
- * @param parentQueryId Parent Query's ID, can be null if do not need to update this in the subqueries
- * @param parentSqlQueryId Parent Query's SQL Query ID, can be null if do not need to update this in the subqueries
+ * @param parentQueryId Parent Query's ID; can be null if it need not be set on the subqueries
+ * @param parentSqlQueryId Parent Query's SQL Query ID; can be null if it need not be set on the subqueries
* @return DataSource populated with the subqueries
*/
private DataSource generateSubqueryIds(
@@ -642,6 +648,9 @@ private DataSource insertSubqueryIds(
}

/**
* Convert the results of a particular query into a materialized (List-based) InlineDataSource.
*
* @param query the query
@@ -651,6 +660,13 @@ private DataSource insertSubqueryIds(
* particular master query
* @param limit user-configured limit. If negative, will be treated as {@link Integer#MAX_VALUE}.
* If zero, this method will throw an error immediately.
* @param memoryLimit User-configured byte limit.
* @param useNestedForUnknownTypeInSubquery Whether to use nested JSON for unknown types when materializing subquery results
* @param subqueryStatsProvider Statistics about the subquery materialization
* @param emitMetrics Whether to emit metrics while materializing. Metrics are not emitted while performing
*                    a dry run of the query, to avoid reporting the same metric twice
* @param emitter Metrics emitter
* @return Inlined datasource represented by the provided results
* @throws ResourceLimitExceededException if the limit is exceeded
*/
private static <T, QueryType extends Query<T>> DataSource toInlineDataSource(
@@ -662,8 +678,10 @@ private static <T, QueryType extends Query<T>> DataSource toInlineDataSource(
final AtomicBoolean cannotMaterializeToFrames,
final int limit,
long memoryLimit,
- boolean useNestedForUnknownTypeInSubquery,
- SubqueryCountStatsProvider subqueryStatsProvider
+ final boolean useNestedForUnknownTypeInSubquery,
+ final SubqueryCountStatsProvider subqueryStatsProvider,
+ final boolean emitMetrics,
Review comment (Contributor): Could you please add javadoc explaining why this dryRun flag is needed.
+ final ServiceEmitter emitter
)
{
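// A negative user limit is treated as unlimited (Integer.MAX_VALUE); a zero limit
// makes this method throw immediately, per the javadoc above.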
final int rowLimitToUse = limit < 0 ? Integer.MAX_VALUE : limit;
@@ -683,7 +701,9 @@ private static <T, QueryType extends Query<T>> DataSource toInlineDataSource(
toolChest,
limitAccumulator,
limit,
- subqueryStatsProvider
+ subqueryStatsProvider,
+ emitMetrics,
+ emitter
);
break;
case MEMORY_LIMIT:
@@ -699,7 +719,9 @@ private static <T, QueryType extends Query<T>> DataSource toInlineDataSource(
memoryLimitAccumulator,
memoryLimit,
useNestedForUnknownTypeInSubquery,
- subqueryStatsProvider
+ subqueryStatsProvider,
+ emitMetrics,
+ emitter
);
if (!maybeDataSource.isPresent()) {
cannotMaterializeToFrames.set(true);
@@ -716,7 +738,9 @@ private static <T, QueryType extends Query<T>> DataSource toInlineDataSource(
toolChest,
limitAccumulator,
limit,
- subqueryStatsProvider
+ subqueryStatsProvider,
+ emitMetrics,
+ emitter
);
} else {
subqueryStatsProvider.incrementSubqueriesWithByteLimit();
@@ -739,9 +763,11 @@ private static <T, QueryType extends Query<T>> Optional<DataSource> materializeResultsAsFrames(
final QueryToolChest<T, QueryType> toolChest,
final AtomicInteger limitAccumulator,
final AtomicLong memoryLimitAccumulator,
- long memoryLimit,
- boolean useNestedForUnknownTypeInSubquery,
- final SubqueryCountStatsProvider subqueryStatsProvider
+ final long memoryLimit,
+ final boolean useNestedForUnknownTypeInSubquery,
+ final SubqueryCountStatsProvider subqueryStatsProvider,
+ final boolean emitMetrics,
+ final ServiceEmitter emitter
)
{
Optional<Sequence<FrameSignaturePair>> framesOptional;
@@ -764,6 +790,8 @@ private static <T, QueryType extends Query<T>> Optional<DataSource> materializeResultsAsFrames(

startedAccumulating = true;

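// Snapshot the shared accumulators before accumulating; the growth observed below is
// this subquery's own contribution.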
final int initialSubqueryRows = limitAccumulator.get();
final long initialSubqueryBytes = memoryLimitAccumulator.get();
frames.forEach(
frame -> {
limitAccumulator.addAndGet(frame.getFrame().numRows());
@@ -775,6 +803,21 @@ private static <T, QueryType extends Query<T>> Optional<DataSource> materializeResultsAsFrames(
frameSignaturePairs.add(frame);
}
);
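// Emit per-subquery deltas rather than the accumulators' running totals, since the
// accumulators are shared by all subqueries of the same parent query.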
if (emitMetrics) {
emitter.emit(
ServiceMetricEvent.builder()
.setDimension(DruidMetrics.ID, query.getId())
.setDimension(DruidMetrics.SUBQUERY_ID, query.getSubQueryId())
.setMetric(ROWS_COUNT_METRIC, limitAccumulator.get() - initialSubqueryRows)
);

emitter.emit(
ServiceMetricEvent.builder()
.setDimension(DruidMetrics.ID, query.getId())
.setDimension(DruidMetrics.SUBQUERY_ID, query.getSubQueryId())
.setMetric(BYTES_COUNT_METRIC, memoryLimitAccumulator.get() - initialSubqueryBytes)
);
}
return Optional.of(new FrameBasedInlineDataSource(frameSignaturePairs, toolChest.resultArraySignature(query)));
}
catch (UnsupportedColumnTypeException e) {
@@ -811,14 +854,17 @@ private static <T, QueryType extends Query<T>> DataSource materializeResultsAsArrays(
final QueryToolChest<T, QueryType> toolChest,
final AtomicInteger limitAccumulator,
final int limit,
- final SubqueryCountStatsProvider subqueryStatsProvider
+ final SubqueryCountStatsProvider subqueryStatsProvider,
+ boolean emitMetrics,
+ final ServiceEmitter emitter
)
{
final int rowLimitToUse = limit < 0 ? Integer.MAX_VALUE : limit;
final RowSignature signature = toolChest.resultArraySignature(query);

final ArrayList<Object[]> resultList = new ArrayList<>();

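// As in the frame-based path, snapshot the shared row accumulator so that only this
// subquery's delta is emitted.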
final int initialSubqueryRows = limitAccumulator.get();
toolChest.resultsAsArrays(query, results).accumulate(
resultList,
(acc, in) -> {
@@ -830,6 +876,14 @@ private static <T, QueryType extends Query<T>> DataSource materializeResultsAsArrays(
return acc;
}
);
if (emitMetrics) {
emitter.emit(
ServiceMetricEvent.builder()
.setDimension(DruidMetrics.ID, query.getId())
.setDimension(DruidMetrics.SUBQUERY_ID, query.getSubQueryId())
.setMetric(ROWS_COUNT_METRIC, limitAccumulator.get() - initialSubqueryRows)
);
}
return InlineDataSource.fromIterable(resultList, signature);
}

ClientQuerySegmentWalkerTest.java
@@ -32,8 +32,12 @@
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.core.EventMap;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.Druids;
import org.apache.druid.query.FrameBasedInlineDataSource;
import org.apache.druid.query.GlobalTableDataSource;
@@ -214,6 +218,7 @@ public class ClientQuerySegmentWalkerTest

private Closer closer;
private QueryRunnerFactoryConglomerate conglomerate;
private final StubServiceEmitter emitter = new StubServiceEmitter();

// Queries that are issued; checked by "testQuery" against its "expectedQueries" parameter.
private final List<ExpectedQuery> issuedQueries = new ArrayList<>();
@@ -228,6 +233,7 @@ public class ClientQuerySegmentWalkerTest
public void setUp()
{
closer = Closer.create();
emitter.flush();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(closer);
scheduler = new ObservableQueryScheduler(
8,
@@ -941,6 +947,113 @@ public void testTimeseriesOnGroupByOnTableErrorTooLarge()
testQuery(query, ImmutableList.of(), ImmutableList.of());
}

@Test
public void testMetricsWithMaxSubqueryRowsEnabled()
{
final GroupByQuery subquery =
GroupByQuery.builder()
.setDataSource(FOO)
.setGranularity(Granularities.ALL)
.setInterval(Collections.singletonList(INTERVAL))
.setDimensions(DefaultDimensionSpec.of("s"))
.build();

final TimeseriesQuery query =
(TimeseriesQuery) Druids.newTimeseriesQueryBuilder()
.dataSource(new QueryDataSource(subquery))
.granularity(Granularities.ALL)
.intervals(Intervals.ONLY_ETERNITY)
.aggregators(new CountAggregatorFactory("cnt"))
.build()
.withId(DUMMY_QUERY_ID);

testQuery(
query,
new ArrayList<>(ImmutableList.of(
ExpectedQuery.cluster(subquery.withId(DUMMY_QUERY_ID).withSubQueryId("1.1")),
ExpectedQuery.local(
query.withDataSource(
InlineDataSource.fromIterable(
ImmutableList.of(new Object[]{"x"}, new Object[]{"y"}, new Object[]{"z"}),
RowSignature.builder().add("s", ColumnType.STRING).build()
)
)
)
)),
ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L})
);

List<Event> events = emitter.getEvents();

for (Event event : events) {
EventMap map = event.toMap();
if (ClientQuerySegmentWalker.ROWS_COUNT_METRIC.equals(map.get("metric"))) {
Assert.assertTrue(map.containsKey("host"));
Assert.assertTrue(map.containsKey("service"));
Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID));
Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID));
Assert.assertEquals(3, map.get("value"));
}
}
}

@Test
public void testMetricsWithMaxSubqueryBytesEnabled()
{
final GroupByQuery subquery =
GroupByQuery.builder()
.setDataSource(FOO)
.setGranularity(Granularities.ALL)
.setInterval(Collections.singletonList(INTERVAL))
.setDimensions(DefaultDimensionSpec.of("s"))
.build();

final TimeseriesQuery query =
(TimeseriesQuery) Druids.newTimeseriesQueryBuilder()
.dataSource(new QueryDataSource(subquery))
.granularity(Granularities.ALL)
.intervals(Intervals.ONLY_ETERNITY)
.aggregators(new CountAggregatorFactory("cnt"))
.context(ImmutableMap.of(QueryContexts.MAX_SUBQUERY_BYTES_KEY, "10000"))
.build()
.withId(DUMMY_QUERY_ID);

testQuery(
query,
new ArrayList<>(ImmutableList.of(
ExpectedQuery.cluster(subquery.withId(DUMMY_QUERY_ID).withSubQueryId("1.1")),
ExpectedQuery.local(
query.withDataSource(
InlineDataSource.fromIterable(
ImmutableList.of(new Object[]{"x"}, new Object[]{"y"}, new Object[]{"z"}),
RowSignature.builder().add("s", ColumnType.STRING).build()
)
)
)
)),
ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L})
);

List<Event> events = emitter.getEvents();

for (Event event : events) {
EventMap map = event.toMap();
if (ClientQuerySegmentWalker.ROWS_COUNT_METRIC.equals(map.get("metric"))) {
Assert.assertTrue(map.containsKey("host"));
Assert.assertTrue(map.containsKey("service"));
Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID));
Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID));
Assert.assertEquals(3, map.get("value"));
} else if (ClientQuerySegmentWalker.BYTES_COUNT_METRIC.equals(map.get("metric"))) {
Assert.assertTrue(map.containsKey("host"));
Assert.assertTrue(map.containsKey("service"));
Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID));
Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID));
Assert.assertEquals(43L, map.get("value"));
}
}
}
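(Note on the asserted values: the `subquery/rows` value of 3 matches the three rows of the inlined datasource, while the `subquery/bytes` value of 43 appears to be the size of the frames built for those rows, and so depends on Druid's internal byte representation rather than on the raw input size.)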

@Test
public void testGroupByOnArraysDoubles()
{
@@ -1545,13 +1658,15 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
conglomerate,
segmentWrangler,
joinableFactoryWrapper,
- schedulerForTest
- ),
+ schedulerForTest,
+ emitter
+ ),
ClusterOrLocal.LOCAL
),
conglomerate,
joinableFactory,
- serverConfig
+ serverConfig,
+ emitter
);
}
