Add reasoning for choosing shardSpec to the MSQ report #16175

Merged · 7 commits · Apr 9, 2024
docs/api-reference/sql-ingestion-api.md (8 additions, 1 deletion)
@@ -299,7 +299,7 @@ The response shows an example report for a query.
},
"pendingTasks": 0,
"runningTasks": 2,
- "segmentLoadStatus": {
+ "segmentLoadWaiterStatus": {
"state": "SUCCESS",
"dataSource": "kttm_simple",
"startTime": "2022-09-14T23:12:09.266Z",
@@ -310,6 +310,10 @@ The response shows an example report for a query.
"onDemandSegments": 0,
"pendingSegments": 0,
"unknownSegments": 0
},
+ "segmentReport": {
+ "shardSpec": "NumberedShardSpec",
+ "details": "Cannot use RangeShardSpec; RangeShardSpec only supports string CLUSTER BY keys. Using NumberedShardSpec instead."
+ }
},
"stages": [
@@ -631,6 +635,9 @@ The following table describes the response fields when you retrieve a report for
| `multiStageQuery.payload.status.segmentLoadStatus.onDemandSegments` | The number of segments which are not loaded on any historical, as per the load rules. |
| `multiStageQuery.payload.status.segmentLoadStatus.pendingSegments` | The number of segments remaining to be loaded. |
| `multiStageQuery.payload.status.segmentLoadStatus.unknownSegments` | The number of segments whose status is unknown. |
+ | `multiStageQuery.payload.status.segmentReport` | Segment report. Only present if the query is an ingestion. |
+ | `multiStageQuery.payload.status.segmentReport.shardSpec` | The shard spec chosen for the generated segments. |
+ | `multiStageQuery.payload.status.segmentReport.details` | The reasoning behind the shard spec choice. |
| `multiStageQuery.payload.status.errorReport` | Error object. Only present if there was an error. |
| `multiStageQuery.payload.status.errorReport.taskId` | The task that reported the error, if known. May be a controller task or a worker task. |
| `multiStageQuery.payload.status.errorReport.host` | The hostname and port of the task that reported the error, if known. |
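For context, here is one way a client could read the new field out of a completed ingestion's report. This is a minimal sketch, not part of the PR: it assumes an Overlord at `http://localhost:8081` and a hypothetical controller task ID, and uses the task-reports endpoint (`GET /druid/indexer/v1/task/{taskId}/reports`) described in this document.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SegmentReportExample
{
  public static void main(String[] args) throws Exception
  {
    final String taskId = "query-abc123"; // hypothetical controller task ID
    final HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8081/druid/indexer/v1/task/" + taskId + "/reports"))
        .GET()
        .build();
    final HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());

    // Navigate to multiStageQuery.payload.status.segmentReport.
    final JsonNode segmentReport = new ObjectMapper().readTree(response.body())
        .path("multiStageQuery").path("payload").path("status").path("segmentReport");

    // The field is only present for ingestion queries, so check before reading.
    if (!segmentReport.isMissingNode()) {
      System.out.println("shardSpec: " + segmentReport.path("shardSpec").asText());
      System.out.println("details: " + segmentReport.path("details").asText());
    }
  }
}
```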
ControllerImpl.java
@@ -130,6 +130,7 @@
import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault;
import org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessorFactory;
import org.apache.druid.msq.indexing.report.MSQResultsReport;
+ import org.apache.druid.msq.indexing.report.MSQSegmentReport;
import org.apache.druid.msq.indexing.report.MSQStagesReport;
import org.apache.druid.msq.indexing.report.MSQStatusReport;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
@@ -313,6 +314,8 @@ public class ControllerImpl implements Controller
private final boolean isFaultToleranceEnabled;
private final boolean isFailOnEmptyInsertEnabled;
private volatile SegmentLoadStatusFetcher segmentLoadWaiter;
+ @Nullable
+ private MSQSegmentReport segmentReport;

public ControllerImpl(
final MSQControllerTask task,
@@ -565,7 +568,8 @@ public TaskStatus runTask(final Closer closer)
queryStartTime,
new Interval(queryStartTime, DateTimes.nowUtc()).toDurationMillis(),
workerTaskLauncher,
- segmentLoadWaiter
+ segmentLoadWaiter,
+ segmentReport
),
stagesReport,
countersSnapshot,
@@ -935,7 +939,8 @@ public TaskReport.ReportMap liveReports()
queryStartTime,
queryStartTime == null ? -1L : new Interval(queryStartTime, DateTimes.nowUtc()).toDurationMillis(),
workerTaskLauncher,
- segmentLoadWaiter
+ segmentLoadWaiter,
+ segmentReport
),
makeStageReport(
queryDef,
@@ -1015,6 +1020,11 @@ private List<SegmentIdWithShardSpec> generateSegmentIdsWithShardSpecsForAppend(

String previousSegmentId = null;

+ segmentReport = new MSQSegmentReport(
+ NumberedShardSpec.class.getSimpleName(),
+ "Using NumberedShardSpec to generate segments since the query is inserting rows."
+ );

for (ClusterByPartition partitionBoundary : partitionBoundaries) {
final DateTime timestamp = getBucketDateTime(partitionBoundary, segmentGranularity, keyReader);
final SegmentIdWithShardSpec allocation;
@@ -1099,12 +1109,16 @@ private List<SegmentIdWithShardSpec> generateSegmentIdsWithShardSpecsForReplace(
final SegmentIdWithShardSpec[] retVal = new SegmentIdWithShardSpec[partitionBoundaries.size()];
final Granularity segmentGranularity = destination.getSegmentGranularity();
final List<String> shardColumns;

- if (mayHaveMultiValuedClusterByFields) {
- // DimensionRangeShardSpec cannot handle multi-valued fields.
- shardColumns = Collections.emptyList();
+ final Pair<List<String>, String> shardReasonPair;
+
+ shardReasonPair = computeShardColumns(signature, clusterBy, task.getQuerySpec().getColumnMappings(), mayHaveMultiValuedClusterByFields);
+ shardColumns = shardReasonPair.lhs;
+ String reason = shardReasonPair.rhs;
+ log.info(StringUtils.format("ShardSpec chosen: %s", reason));
+ if (shardColumns.isEmpty()) {
+ segmentReport = new MSQSegmentReport(NumberedShardSpec.class.getSimpleName(), reason);
} else {
- shardColumns = computeShardColumns(signature, clusterBy, task.getQuerySpec().getColumnMappings());
+ segmentReport = new MSQSegmentReport(DimensionRangeShardSpec.class.getSimpleName(), reason);
}

// Group partition ranges by bucket (time chunk), so we can generate shardSpecs for each bucket independently.
@@ -2039,19 +2053,24 @@ private static boolean isTimeBucketedIngestion(final MSQSpec querySpec)
* Compute shard columns for {@link DimensionRangeShardSpec}. Returns an empty list if range-based sharding
* is not applicable.
*/
- private static List<String> computeShardColumns(
+ private static Pair<List<String>, String> computeShardColumns(
final RowSignature signature,
final ClusterBy clusterBy,
- final ColumnMappings columnMappings
+ final ColumnMappings columnMappings,
+ boolean mayHaveMultiValuedClusterByFields
)
{
+ if (mayHaveMultiValuedClusterByFields) {
+ // DimensionRangeShardSpec cannot handle multi-valued fields.
+ return Pair.of(Collections.emptyList(), "Cannot use RangeShardSpec; the CLUSTERED BY clause contains multi-valued fields. Using NumberedShardSpec instead.");
+ }
final List<KeyColumn> clusterByColumns = clusterBy.getColumns();
final List<String> shardColumns = new ArrayList<>();
final boolean boosted = isClusterByBoosted(clusterBy);
final int numShardColumns = clusterByColumns.size() - clusterBy.getBucketByCount() - (boosted ? 1 : 0);

if (numShardColumns == 0) {
- return Collections.emptyList();
+ return Pair.of(Collections.emptyList(), "Using NumberedShardSpec as no columns are supplied in the 'CLUSTERED BY' clause.");
}

for (int i = clusterBy.getBucketByCount(); i < clusterBy.getBucketByCount() + numShardColumns; i++) {
@@ -2060,25 +2079,25 @@ private static List<String> computeShardColumns(

// DimensionRangeShardSpec only handles ascending order.
if (column.order() != KeyOrder.ASCENDING) {
- return Collections.emptyList();
+ return Pair.of(Collections.emptyList(), "Cannot use RangeShardSpec; RangeShardSpec only supports ascending CLUSTER BY keys. Using NumberedShardSpec instead.");
}

ColumnType columnType = signature.getColumnType(column.columnName()).orElse(null);

// DimensionRangeShardSpec only handles strings.
if (!(ColumnType.STRING.equals(columnType))) {
- return Collections.emptyList();
+ return Pair.of(Collections.emptyList(), "Cannot use RangeShardSpec; RangeShardSpec only supports string CLUSTER BY keys. Using NumberedShardSpec instead.");
}

// DimensionRangeShardSpec only handles columns that appear as-is in the output.
if (outputColumns.isEmpty()) {
- return Collections.emptyList();
+ return Pair.of(Collections.emptyList(), StringUtils.format("Cannot use RangeShardSpec; could not find output column name for column [%s]. Using NumberedShardSpec instead.", column.columnName()));
}

shardColumns.add(columnMappings.getOutputColumnName(outputColumns.getInt(0)));
}

- return shardColumns;
+ return Pair.of(shardColumns, "Using RangeShardSpec to generate segments.");
}

/**
@@ -2354,7 +2373,8 @@ private static MSQStatusReport makeStatusReport(
@Nullable final DateTime queryStartTime,
final long queryDuration,
MSQWorkerTaskLauncher taskLauncher,
- final SegmentLoadStatusFetcher segmentLoadWaiter
+ final SegmentLoadStatusFetcher segmentLoadWaiter,
+ @Nullable MSQSegmentReport msqSegmentReport
)
{
int pendingTasks = -1;
@@ -2379,7 +2399,8 @@
workerStatsMap,
pendingTasks,
runningTasks,
- status
+ status,
+ msqSegmentReport
);
}

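Stepping back from the diff: `computeShardColumns` now returns a `Pair` of shard columns and a human-readable reason, and the caller records that reason in an `MSQSegmentReport`, choosing `DimensionRangeShardSpec` when shard columns exist and `NumberedShardSpec` otherwise. The eligibility rules can be restated compactly. The sketch below is not the actual Druid code — `ClusterColumn` is a hypothetical stand-in for a `KeyColumn` plus its resolved type and output mapping — and it omits the boost-column and bucket-column bookkeeping the real method does.

```java
import java.util.List;

// Hypothetical stand-in for a CLUSTERED BY key plus the metadata the real method
// resolves from the row signature and column mappings.
record ClusterColumn(String name, boolean ascending, boolean isString, boolean mappedToOutput) {}

final class ShardSpecRules
{
  /**
   * Returns the reason range-based sharding cannot be used, or null if it can.
   * Mirrors the checks in ControllerImpl.computeShardColumns().
   */
  static String whyNotRange(List<ClusterColumn> clusterByColumns, boolean mayHaveMultiValuedClusterByFields)
  {
    if (mayHaveMultiValuedClusterByFields) {
      return "the CLUSTERED BY clause may contain multi-valued fields";
    }
    if (clusterByColumns.isEmpty()) {
      return "no columns are supplied in the 'CLUSTERED BY' clause";
    }
    for (ClusterColumn column : clusterByColumns) {
      if (!column.ascending()) {
        return "RangeShardSpec only supports ascending CLUSTER BY keys";
      }
      if (!column.isString()) {
        return "RangeShardSpec only supports string CLUSTER BY keys";
      }
      if (!column.mappedToOutput()) {
        return "could not find output column name for column [" + column.name() + "]";
      }
    }
    return null; // all checks passed: range sharding applies
  }
}
```

If `whyNotRange` returns non-null, the controller falls back to `NumberedShardSpec` and surfaces that reason in the report.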
MSQSegmentReport.java (new file)
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.indexing.report;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;

public class MSQSegmentReport
{
private final String shardSpec;
private final String details;

@JsonCreator
public MSQSegmentReport(@JsonProperty("shardSpec") String shardSpec, @JsonProperty("details") String reason)
{
this.shardSpec = shardSpec;
this.details = reason;
}

@JsonProperty
public String getShardSpec()
{
return shardSpec;
}

@JsonProperty
public String getDetails()
{
return details;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MSQSegmentReport that = (MSQSegmentReport) o;
return Objects.equals(shardSpec, that.shardSpec) && Objects.equals(details, that.details);
}

@Override
public int hashCode()
{
return Objects.hash(shardSpec, details);
}

@Override
public String toString()
{
return "MSQSegmentReport{" +
"shardSpec='" + shardSpec + '\'' +
", details='" + details + '\'' +
'}';
}
}
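Since this is a plain Jackson bean, a quick round-trip shows the wire format that lands under `multiStageQuery.payload.status.segmentReport`. A minimal sketch assuming a default `ObjectMapper`:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.msq.indexing.report.MSQSegmentReport;

public class MSQSegmentReportJsonExample
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    final MSQSegmentReport report = new MSQSegmentReport(
        "NumberedShardSpec",
        "Using NumberedShardSpec to generate segments since the query is inserting rows."
    );

    // Serialize: should yield {"shardSpec":"NumberedShardSpec","details":"Using ..."}
    final String json = mapper.writeValueAsString(report);
    System.out.println(json);

    // Deserialize through the @JsonCreator constructor and compare via equals().
    final MSQSegmentReport roundTripped = mapper.readValue(json, MSQSegmentReport.class);
    System.out.println(report.equals(roundTripped)); // true
  }
}
```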
MSQStatusReport.java
@@ -59,6 +59,9 @@ public class MSQStatusReport
@Nullable
private final SegmentLoadStatusFetcher.SegmentLoadWaiterStatus segmentLoadWaiterStatus;

+ @Nullable
+ private final MSQSegmentReport segmentReport;

@JsonCreator
public MSQStatusReport(
@JsonProperty("status") TaskState status,
@@ -69,7 +72,8 @@ public MSQStatusReport(
@JsonProperty("workers") Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>> workerStats,
@JsonProperty("pendingTasks") int pendingTasks,
@JsonProperty("runningTasks") int runningTasks,
- @JsonProperty("segmentLoadWaiterStatus") @Nullable SegmentLoadStatusFetcher.SegmentLoadWaiterStatus segmentLoadWaiterStatus
+ @JsonProperty("segmentLoadWaiterStatus") @Nullable SegmentLoadStatusFetcher.SegmentLoadWaiterStatus segmentLoadWaiterStatus,
+ @JsonProperty("segmentReport") @Nullable MSQSegmentReport segmentReport
)
{
this.status = Preconditions.checkNotNull(status, "status");
@@ -81,6 +85,7 @@
this.pendingTasks = pendingTasks;
this.runningTasks = runningTasks;
this.segmentLoadWaiterStatus = segmentLoadWaiterStatus;
+ this.segmentReport = segmentReport;
}

@JsonProperty
@@ -144,6 +149,14 @@ public SegmentLoadStatusFetcher.SegmentLoadWaiterStatus getSegmentLoadWaiterStat
return segmentLoadWaiterStatus;
}

+ @JsonProperty("segmentReport")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public MSQSegmentReport getSegmentReport()
+ {
+ return segmentReport;
+ }

@Override
public boolean equals(Object o)
{
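Note that the getter added above is annotated `@JsonInclude(JsonInclude.Include.NON_NULL)`, so for queries that produce no segment report (for example, SELECT queries) the key is dropped from the serialized status rather than appearing as `"segmentReport": null`, which keeps existing report consumers unaffected. A standalone illustration of that Jackson behavior, assuming a default `ObjectMapper`:

```java
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NonNullIncludeDemo
{
  // Toy stand-in for MSQStatusReport's segmentReport property.
  public static class Status
  {
    @JsonProperty("segmentReport")
    @JsonInclude(JsonInclude.Include.NON_NULL)
    public String segmentReport;
  }

  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();

    final Status without = new Status();
    System.out.println(mapper.writeValueAsString(without)); // {}

    final Status with = new Status();
    with.segmentReport = "NumberedShardSpec";
    System.out.println(mapper.writeValueAsString(with)); // {"segmentReport":"NumberedShardSpec"}
  }
}
```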
MSQInsertTest.java
@@ -35,6 +35,7 @@
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.indexing.error.ColumnNameRestrictedFault;
import org.apache.druid.msq.indexing.error.RowTooLargeFault;
+ import org.apache.druid.msq.indexing.report.MSQSegmentReport;
import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
import org.apache.druid.msq.test.CounterSnapshotMatcher;
import org.apache.druid.msq.test.MSQTestBase;
@@ -46,6 +47,7 @@
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.timeline.SegmentId;
+ import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.hamcrest.CoreMatchers;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.jupiter.params.ParameterizedTest;
@@ -110,6 +112,12 @@ public void testInsertOnFoo1(String contextName, Map<String, Object> context)
.setExpectedRowSignature(rowSignature)
.setExpectedSegment(expectedFooSegments())
.setExpectedResultRows(expectedRows)
+ .setExpectedMSQSegmentReport(
+ new MSQSegmentReport(
+ NumberedShardSpec.class.getSimpleName(),
+ "Using NumberedShardSpec to generate segments since the query is inserting rows."
+ )
+ )
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
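`setExpectedMSQSegmentReport(...)` is a builder helper on the test framework in `MSQTestBase`, which this excerpt doesn't show. A sketch of what such an assertion plausibly does — the class and field names here are hypothetical — relying on the `equals()` defined on `MSQSegmentReport`:

```java
import org.apache.druid.msq.indexing.report.MSQSegmentReport;

import static org.junit.jupiter.api.Assertions.assertEquals;

// Hypothetical slice of a test builder; the real helper lives in MSQTestBase.
final class SegmentReportAssertion
{
  private MSQSegmentReport expectedMSQSegmentReport;

  SegmentReportAssertion setExpectedMSQSegmentReport(MSQSegmentReport expected)
  {
    this.expectedMSQSegmentReport = expected;
    return this; // builder-style chaining, as used in the tests above
  }

  void verify(MSQSegmentReport actual)
  {
    if (expectedMSQSegmentReport != null) {
      // MSQSegmentReport.equals() compares both shardSpec and details.
      assertEquals(expectedMSQSegmentReport, actual);
    }
  }
}
```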
MSQReplaceTest.java
@@ -29,6 +29,7 @@
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
+ import org.apache.druid.msq.indexing.report.MSQSegmentReport;
import org.apache.druid.msq.test.CounterSnapshotMatcher;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestTaskActionClient;
@@ -37,6 +38,7 @@
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
+ import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.jupiter.params.ParameterizedTest;
@@ -45,7 +47,6 @@
import org.mockito.Mockito;

import javax.annotation.Nonnull;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -238,6 +239,12 @@ public void testReplaceOnFoo1WithAllExtern(String contextName, Map<String, Objec
.setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
+ .setExpectedMSQSegmentReport(
+ new MSQSegmentReport(
+ NumberedShardSpec.class.getSimpleName(),
+ "Cannot use RangeShardSpec; RangeShardSpec only supports string CLUSTER BY keys. Using NumberedShardSpec instead."
+ )
+ )
.setExpectedSegment(ImmutableSet.of(
SegmentId.of(
"foo1",
@@ -872,6 +879,12 @@ public void testReplaceOnFoo1Range(String contextName, Map<String, Object> conte
.setExpectedDataSource("foo1")
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedShardSpec(DimensionRangeShardSpec.class)
+ .setExpectedMSQSegmentReport(
+ new MSQSegmentReport(
+ DimensionRangeShardSpec.class.getSimpleName(),
+ "Using RangeShardSpec to generate segments."
+ )
+ )
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
.setExpectedSegment(expectedFooSegments())
@@ -993,6 +1006,12 @@ public void testReplaceUnnestSegmentEntireTable(String contextName, Map<String,
"test",
0
)))
+ .setExpectedMSQSegmentReport(
+ new MSQSegmentReport(
+ NumberedShardSpec.class.getSimpleName(),
+ "Using NumberedShardSpec as no columns are supplied in the 'CLUSTERED BY' clause."
+ )
+ )
.setExpectedResultRows(
ImmutableList.of(
new Object[]{946684800000L, "a"},