Fix compaction tasks reports getting overwritten #15981

52 changes: 52 additions & 0 deletions docs/ingestion/tasks.md
@@ -93,6 +93,58 @@ An example output is shown below:
}
```

A compaction task may generate multiple sets of segment output reports, one for each split of its input interval. The overall report therefore maps each split to its own report; the numeric suffix on each report key (for example `_0`, `_1`) identifies the split that produced it. An example report:

```json
{
"ingestionStatsAndErrors_0": {
"taskId": "compact_twitter_2018-09-24T18:24:23.920Z",
"payload": {
"ingestionState": "COMPLETED",
"unparseableEvents": {},
"rowStats": {
"buildSegments": {
"processed": 5390324,
"processedBytes": 5109573212,
"processedWithError": 0,
"thrownAway": 0,
"unparseable": 0
}
},
"segmentAvailabilityConfirmed": false,
"segmentAvailabilityWaitTimeMs": 0,
"recordsProcessed": null,
"errorMsg": null
},
"type": "ingestionStatsAndErrors"
},
"ingestionStatsAndErrors_1": {
"taskId": "compact_twitter_2018-09-25T18:24:23.920Z",
"payload": {
"ingestionState": "COMPLETED",
"unparseableEvents": {},
"rowStats": {
"buildSegments": {
"processed": 12345,
"processedBytes": 132456789,
"processedWithError": 0,
"thrownAway": 0,
"unparseable": 0
}
},
"segmentAvailabilityConfirmed": false,
"segmentAvailabilityWaitTimeMs": 0,
"recordsProcessed": null,
"errorMsg": null
},
"type": "ingestionStatsAndErrors"
}
}
```



#### Segment Availability Fields

For some task types, the indexing task can wait for the newly ingested segments to become available for queries after ingestion completes. The fields below describe the duration and outcome of that availability wait. For batch ingestion task types, refer to the `tuningConfig` docs to see whether the task supports an availability waiting period.
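
Not part of this diff, but as a rough illustration of how the wait is enabled: the sketch below assumes the parallel native batch tuning config and the `awaitSegmentAvailabilityTimeoutMillis` option referenced in the code changes further down; consult the `tuningConfig` docs for your task type for the exact field placement.

```json
{
  "tuningConfig": {
    "type": "index_parallel",
    "awaitSegmentAvailabilityTimeoutMillis": 600000
  }
}
```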
@@ -51,6 +51,7 @@
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -96,6 +97,7 @@
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Duration;
import org.joda.time.Interval;

@@ -113,6 +115,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Supplier;
Expand Down Expand Up @@ -499,6 +502,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
log.info("Generated [%d] compaction task specs", totalNumSpecs);

int failCnt = 0;
Map<String, TaskReport> completionReports = new HashMap<>();
Contributor:

If the compaction is being run on several intervals (not very likely, but still a possibility), can holding all the task reports in memory potentially cause an OOM exception? Currently, most of the task reports contain only ingestionStatsAndErrors, but they may contain other things in the future.

In the future, we should consider writing out the sub-reports in a streaming fashion, along with the required changes to the TaskReportFileWriter API. For now, we should add a guardrail here so that we don't try to hold too many reports in memory and fail with an OOM.

Contributor Author:

What do you think is a good size to hold, then?
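
To make the suggestion above concrete, here is a minimal sketch of such a guardrail. It is not part of this PR; the helper name, the cap of 100, and the log message are assumptions, and the appropriate limit is exactly the open question in this thread.

```java
// Hypothetical guardrail (not in this change): cap how many sub-task reports are
// accumulated in memory before the merged report file is written.
private static final int MAX_HELD_REPORTS = 100; // assumed placeholder value

private void addSubTaskReports(Map<String, TaskReport> completionReports, Map<String, TaskReport> subTaskReports)
{
  if (completionReports.size() + subTaskReports.size() > MAX_HELD_REPORTS) {
    // Drop the extra reports instead of risking an OOM; streaming sub-reports through
    // the TaskReportFileWriter API would remove this limitation.
    log.warn("Skipping sub-task reports; already holding [%d] reports in memory.", completionReports.size());
    return;
  }
  completionReports.putAll(subTaskReports);
}
```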

for (ParallelIndexSupervisorTask eachSpec : indexTaskSpecs) {
final String json = toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
if (!currentSubTaskHolder.setTask(eachSpec)) {
Expand All @@ -514,6 +518,9 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
failCnt++;
log.warn("Failed to run indexSpec: [%s].\nTrying the next indexSpec.", json);
}
Optional.ofNullable(eachSpec.getCompletionReports())
.ifPresent(reports -> completionReports.putAll(
CollectionUtils.mapKeys(reports, key -> getReportkey(eachSpec.getBaseSubtaskSpecName(), key))));
} else {
failCnt++;
log.warn("indexSpec is not ready: [%s].\nTrying the next indexSpec.", json);
Expand All @@ -528,6 +535,8 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
String msg = StringUtils.format("Ran [%d] specs, [%d] succeeded, [%d] failed",
totalNumSpecs, totalNumSpecs - failCnt, failCnt
);

toolbox.getTaskReportFileWriter().write(getId(), completionReports);
log.info(msg);
return failCnt == 0 ? TaskStatus.success(getId()) : TaskStatus.failure(getId(), msg);
}
@@ -542,7 +551,8 @@ ParallelIndexSupervisorTask newTask(String baseSequenceName, ParallelIndexIngest
getTaskResource(),
ingestionSpec,
baseSequenceName,
createContextForSubtask()
createContextForSubtask(),
true
);
}

@@ -562,6 +572,11 @@ private String createIndexTaskSpecId(int i)
return StringUtils.format("%s_%d", getId(), i);
}

// Appends the sub-task spec index (the suffix of the base sequence name, e.g. "_0") to the
// report key so that reports from different interval splits do not overwrite one another.
private String getReportkey(String baseSequenceName, String currentKey)
{
return StringUtils.format("%s_%s", currentKey, baseSequenceName.substring(baseSequenceName.lastIndexOf('_') + 1));
}

/**
* Generate {@link ParallelIndexIngestionSpec} from input segments.
*
@@ -169,7 +169,9 @@ private static String makeGroupId(String dataSource, IngestionMode ingestionMode

private IngestionState ingestionState;

private boolean shouldCleanup;
// Specifies whether IndexTask.run() is being run as part of another task;
// writing reports and cleanup are skipped when this is not a standalone task.
private boolean isStandAloneTask;

@MonotonicNonNull
private ParseExceptionHandler determinePartitionsParseExceptionHandler;
@@ -189,6 +191,8 @@ private static String makeGroupId(String dataSource, IngestionMode ingestionMode
@Nullable
private String errorMsg;

private Map<String, TaskReport> completionReports;

@JsonCreator
public IndexTask(
@JsonProperty("id") final String id,
@@ -222,7 +226,7 @@ public IndexTask(
IndexIngestionSpec ingestionSchema,
Map<String, Object> context,
int maxAllowedLockCount,
boolean shouldCleanup
boolean isStandAloneTask
)
{
super(
@@ -237,7 +241,7 @@ public IndexTask(
this.baseSequenceName = baseSequenceName == null ? getId() : baseSequenceName;
this.ingestionSchema = ingestionSchema;
this.ingestionState = IngestionState.NOT_STARTED;
this.shouldCleanup = shouldCleanup;
this.isStandAloneTask = isStandAloneTask;
}

@Override
@@ -314,6 +318,13 @@ public Set<ResourceAction> getInputSourceResources()
ImmutableSet.of();
}

@Nullable
@JsonIgnore
public Map<String, TaskReport> getCompletionReports()
{
return completionReports;
}

@GET
@Path("/unparseableEvents")
@Produces(MediaType.APPLICATION_JSON)
@@ -556,7 +567,8 @@ public TaskStatus runTask(final TaskToolbox toolbox)
catch (Exception e) {
log.error(e, "Encountered exception in %s.", ingestionState);
errorMsg = Throwables.getStackTraceAsString(e);
toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
completionReports = getTaskCompletionReports();
writeCompletionReports(toolbox);
return TaskStatus.failure(
getId(),
errorMsg
Expand All @@ -568,6 +580,13 @@ public TaskStatus runTask(final TaskToolbox toolbox)
}
}

private void writeCompletionReports(TaskToolbox toolbox)
{
if (isStandAloneTask) {
toolbox.getTaskReportFileWriter().write(getId(), completionReports);
}
}

private Map<String, TaskReport> getTaskCompletionReports()
{
return TaskReport.buildTaskReports(
@@ -1024,7 +1043,8 @@ private TaskStatus generateAndPublishSegments(
if (published == null) {
log.error("Failed to publish segments, aborting!");
errorMsg = "Failed to publish segments.";
toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
completionReports = getTaskCompletionReports();
writeCompletionReports(toolbox);
return TaskStatus.failure(
getId(),
errorMsg
@@ -1047,7 +1067,8 @@

log.debugSegments(published.getSegments(), "Published segments");

toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
completionReports = getTaskCompletionReports();
writeCompletionReports(toolbox);
return TaskStatus.success(getId());
}
}
@@ -1089,7 +1110,7 @@ private static InputFormat getInputFormat(IndexIngestionSpec ingestionSchema)
@Override
public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus) throws Exception
{
if (shouldCleanup) {
if (isStandAloneTask) {
super.cleanUp(toolbox, taskStatus);
}
}
@@ -202,6 +202,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
private volatile Pair<Map<String, Object>, Map<String, Object>> indexGenerateRowStats;

private IngestionState ingestionState;
private Map<String, TaskReport> completionReports;
private final Boolean isCompactionTask;


@JsonCreator
@@ -213,7 +215,7 @@ public ParallelIndexSupervisorTask(
@JsonProperty("context") Map<String, Object> context
)
{
this(id, groupId, taskResource, ingestionSchema, null, context);
this(id, groupId, taskResource, ingestionSchema, null, context, false);
}

public ParallelIndexSupervisorTask(
@@ -222,7 +224,8 @@ public ParallelIndexSupervisorTask(
TaskResource taskResource,
ParallelIndexIngestionSpec ingestionSchema,
@Nullable String baseSubtaskSpecName,
Map<String, Object> context
Map<String, Object> context,
Boolean isCompactionTask
)
{
super(
@@ -259,6 +262,7 @@ public ParallelIndexSupervisorTask(

awaitSegmentAvailabilityTimeoutMillis = ingestionSchema.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis();
this.ingestionState = IngestionState.NOT_STARTED;
this.isCompactionTask = isCompactionTask;
}

private static void checkPartitionsSpecForForceGuaranteedRollup(PartitionsSpec partitionsSpec)
@@ -292,6 +296,20 @@ public Set<ResourceAction> getInputSourceResources()
ImmutableSet.of();
}

@Nullable
@JsonIgnore
public Map<String, TaskReport> getCompletionReports()
{
return completionReports;
}

@Nullable
@JsonIgnore
public String getBaseSubtaskSpecName()
{
return baseSubtaskSpecName;
}

@JsonProperty("spec")
public ParallelIndexIngestionSpec getIngestionSchema()
{
@@ -651,10 +669,8 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception
}
taskStatus = TaskStatus.failure(getId(), errorMessage);
}
toolbox.getTaskReportFileWriter().write(
getId(),
getTaskCompletionReports(taskStatus, segmentAvailabilityConfirmationCompleted)
);
completionReports = getTaskCompletionReports(taskStatus, segmentAvailabilityConfirmationCompleted);
writeCompletionReports(toolbox);
return taskStatus;
}

@@ -821,10 +837,8 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Except
taskStatus = TaskStatus.failure(getId(), errMsg);
}

toolbox.getTaskReportFileWriter().write(
getId(),
getTaskCompletionReports(taskStatus, segmentAvailabilityConfirmationCompleted)
);
completionReports = getTaskCompletionReports(taskStatus, segmentAvailabilityConfirmationCompleted);
writeCompletionReports(toolbox);
return taskStatus;
}

@@ -921,10 +935,8 @@ TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Excep
taskStatus = TaskStatus.failure(getId(), errMsg);
}

toolbox.getTaskReportFileWriter().write(
getId(),
getTaskCompletionReports(taskStatus, segmentAvailabilityConfirmationCompleted)
);
completionReports = getTaskCompletionReports(taskStatus, segmentAvailabilityConfirmationCompleted);
writeCompletionReports(toolbox);
return taskStatus;
}

@@ -1211,7 +1223,9 @@ private TaskStatus runSequential(TaskToolbox toolbox) throws Exception

if (currentSubTaskHolder.setTask(sequentialIndexTask)
&& sequentialIndexTask.isReady(toolbox.getTaskActionClient())) {
return sequentialIndexTask.run(toolbox);
TaskStatus status = sequentialIndexTask.run(toolbox);
completionReports = sequentialIndexTask.getCompletionReports();
return status;
} else {
String msg = "Task was asked to stop. Finish as failed";
LOG.info(msg);
@@ -1247,6 +1261,13 @@ private Map<String, TaskReport> getTaskCompletionReports(TaskStatus taskStatus,
);
}

private void writeCompletionReports(TaskToolbox toolbox)
{
if (!isCompactionTask) {
toolbox.getTaskReportFileWriter().write(getId(), completionReports);
}
}

private static IndexTuningConfig convertToIndexTuningConfig(ParallelIndexTuningConfig tuningConfig)
{
return new IndexTuningConfig(
@@ -39,6 +39,7 @@
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
@@ -246,6 +247,9 @@ public void testRunParallelWithHashPartitioningMatchCompactionState() throws Exc
);
Assert.assertEquals("Compaction state for " + segment.getId(), expectedState, segment.getLastCompactionState());
}

List<IngestionStatsAndErrorsTaskReportData> reports = getIngestionReports();
Assert.assertEquals(3, reports.size()); // three index tasks are run by a single compaction task
}

@Test