Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report zero values instead of unknown for empty ingest queries #15674

Merged
merged 9 commits into from
Jan 17, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class SqlStatementResourceHelper
Expand Down Expand Up @@ -147,7 +149,9 @@ public static SqlStatementState getSqlStatementState(TaskStatusPlus taskStatusPl
* <ul>
* <li>For each partition from 0 to N</li>
* <li>For each worker from 0 to M</li>
* <li>If num rows for that partition,worker combination is 0, create a page</li>
* <li>If a worker's counter snapshot is empty, create a page for that worker with {@code numRows = 0}
* and {@code sizeInBytes = 0}</li>
* <li>If {@code numRows != 0} for a (partition, worker) combination, create a page</li>
* so that we maintain the record ordering.
* </ul>
* </ol>
Expand All @@ -172,31 +176,22 @@ public static Optional<List<PageInformation>> populatePageList(
for (CounterSnapshots counterSnapshots : workerCounters.values()) {
QueryCounterSnapshot queryCounterSnapshot = counterSnapshots.getMap()
.getOrDefault("segmentGenerationProgress", null);
if (queryCounterSnapshot != null && queryCounterSnapshot instanceof SegmentGenerationProgressCounter.Snapshot) {
if (queryCounterSnapshot instanceof SegmentGenerationProgressCounter.Snapshot) {
rows += ((SegmentGenerationProgressCounter.Snapshot) queryCounterSnapshot).getRowsPushed();
}
}
if (rows != 0L) {
return Optional.of(ImmutableList.of(new PageInformation(0, rows, null)));
} else {
return Optional.empty();
}
return Optional.of(ImmutableList.of(new PageInformation(0, rows, null)));
} else if (msqDestination instanceof TaskReportMSQDestination) {
long rows = 0L;
long size = 0L;
for (CounterSnapshots counterSnapshots : workerCounters.values()) {
QueryCounterSnapshot queryCounterSnapshot = counterSnapshots.getMap().getOrDefault("output", null);
if (queryCounterSnapshot != null && queryCounterSnapshot instanceof ChannelCounters.Snapshot) {
if (queryCounterSnapshot instanceof ChannelCounters.Snapshot) {
rows += Arrays.stream(((ChannelCounters.Snapshot) queryCounterSnapshot).getRows()).sum();
size += Arrays.stream(((ChannelCounters.Snapshot) queryCounterSnapshot).getBytes()).sum();
}
}
if (rows != 0L) {
return Optional.of(ImmutableList.of(new PageInformation(0, rows, size)));
} else {
return Optional.empty();
}

return Optional.of(ImmutableList.of(new PageInformation(0, rows, size)));
} else if (msqDestination instanceof DurableStorageMSQDestination) {

return populatePagesForDurableStorageDestination(finalStage, workerCounters);
Expand All @@ -221,16 +216,24 @@ private static Optional<List<PageInformation>> populatePagesForDurableStorageDes
throw DruidException.defensive("Expected worker count to be set for stage[%d]", finalStage);
}


List<PageInformation> pages = new ArrayList<>();

abhishekrb19 marked this conversation as resolved.
Show resolved Hide resolved

Set<Integer> emptyWorkerCounters = new HashSet<>();

for (int partitionNumber = 0; partitionNumber < totalPartitions; partitionNumber++) {
for (int workerNumber = 0; workerNumber < totalWorkerCount; workerNumber++) {
CounterSnapshots workerCounter = workerCounters.get(workerNumber);
if ((workerCounter == null || workerCounter.isEmpty()) && !emptyWorkerCounters.contains(workerNumber)) {
pages.add(new PageInformation(pages.size(), 0L, 0L, workerNumber, null));
emptyWorkerCounters.add(workerNumber);
abhishekrb19 marked this conversation as resolved.
Show resolved Hide resolved
continue;
}

if (workerCounter != null && workerCounter.getMap() != null) {
QueryCounterSnapshot channelCounters = workerCounter.getMap().get("output");

if (channelCounters != null && channelCounters instanceof ChannelCounters.Snapshot) {
if (channelCounters instanceof ChannelCounters.Snapshot) {
long rows = 0L;
long size = 0L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,65 @@ public void nonSupportedModes()
);
}

@Test
public void emptyInsert()
{
Response response = resource.doPost(new SqlQuery(
"insert into foo1 select __time, dim1 , count(*) as cnt from foo where dim1 is not null and __time < TIMESTAMP '1971-01-01 00:00:00' group by 1, 2 PARTITIONED by day clustered by dim1",
null,
false,
false,
false,
ImmutableMap.<String, Object>builder()
.putAll(defaultAsyncContext())
.build(),
null
), SqlStatementResourceTest.makeOkRequest());
Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

SqlStatementResult actual = (SqlStatementResult) response.getEntity();

SqlStatementResult expected = new SqlStatementResult(
actual.getQueryId(),
SqlStatementState.SUCCESS,
MSQTestOverlordServiceClient.CREATED_TIME,
null,
MSQTestOverlordServiceClient.DURATION,
new ResultSetInformation(0L, 0L, null, "foo1", null, null),
null
);
Assert.assertEquals(expected, actual);
}

@Test
public void emptyReplace()
{
Response response = resource.doPost(new SqlQuery(
"replace into foo1 overwrite all select __time, dim1 , count(*) as cnt from foo where dim1 is not null and __time < TIMESTAMP '1971-01-01 00:00:00' group by 1, 2 PARTITIONED by day clustered by dim1",
null,
false,
false,
false,
ImmutableMap.<String, Object>builder()
.putAll(defaultAsyncContext())
.build(),
null
), SqlStatementResourceTest.makeOkRequest());
Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

SqlStatementResult actual = (SqlStatementResult) response.getEntity();

SqlStatementResult expected = new SqlStatementResult(
actual.getQueryId(),
SqlStatementState.SUCCESS,
MSQTestOverlordServiceClient.CREATED_TIME,
null,
MSQTestOverlordServiceClient.DURATION,
new ResultSetInformation(0L, 0L, null, "foo1", null, null),
null
);
Assert.assertEquals(expected, actual);
}

@Test
public void insertCannotBeEmptyFaultTest()
Expand Down Expand Up @@ -433,7 +492,6 @@ public void testMultipleWorkersWithPageSizeLimiting() throws IOException
rows.add(ImmutableList.of(1466985600000L, "GiftBot"));
rows.add(ImmutableList.of(1466985600000L, "GiftBot"));


Assert.assertEquals(rows, SqlStatementResourceTest.getResultRowsFromResponse(resource.doGetResults(
sqlStatementResult.getQueryId(),
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1404,8 +1404,8 @@ public Pair<MSQSpec, Pair<List<MSQResultsReport.ColumnAndType>, List<Object[]>>>
);
rows.addAll(new FrameChannelSequence(inputChannelFactory.openChannel(
finalStage.getId(),
pageInformation.getWorker(),
pageInformation.getPartition()
pageInformation.getWorker() == null ? 0 : pageInformation.getWorker(),
pageInformation.getPartition() == null ? 0 : pageInformation.getPartition()
)).flatMap(frame -> SqlStatementResourceHelper.getResultSequence(
msqControllerTask,
finalStage,
Expand Down
Loading
Loading