Support overlapping segment intervals in auto compaction (#12062)
* add impl

* add impl

* fix more bugs

* add tests

* fix checkstyle

* address comments

* address comments

* fix test
maytasm authored Jan 4, 2022
1 parent fe71fc4 commit b53e7f4
Showing 5 changed files with 348 additions and 11 deletions.
@@ -25,6 +25,7 @@
 import org.apache.commons.io.IOUtils;
 import org.apache.druid.data.input.MaxSizeSplitHintSpec;
 import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -775,6 +776,59 @@ public void testAutoCompactionDutyWithFilter() throws Exception
     }
   }
 
+  @Test
+  public void testAutoCompactionDutyWithOverlappingInterval() throws Exception
+  {
+    final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles"));
+    Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.WEEK, Granularities.NONE, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
+    // Create a WEEK segment covering 2013-08-26 to 2013-09-02
+    loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
+    specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.MONTH, Granularities.NONE, false, ImmutableList.of(new Interval("2013-09-01/2013-09-02", chrono))));
+    // Create a MONTH segment covering 2013-09-01 to 2013-10-01
+    loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
+
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      verifySegmentsCount(2);
+
+      // Result is not rolled up.
+      // For dim "page", the result has values "Gypsy Danger" and "Striker Eureka".
+      Map<String, Object> expectedResult = ImmutableMap.of(
+          "%%EXPECTED_COUNT_RESULT%%",
+          2,
+          "%%EXPECTED_SCAN_RESULT%%",
+          ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+
+      submitCompactionConfig(
+          MAX_ROWS_PER_SEGMENT_COMPACTED,
+          NO_SKIP_OFFSET,
+          null,
+          null,
+          null,
+          false
+      );
+      // Compact the MONTH segment
+      forceTriggerAutoCompaction(2);
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+
+      // Compact the WEEK segment
+      forceTriggerAutoCompaction(2);
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+
+      // Verify that all compaction tasks succeeded
+      List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      for (TaskResponseObject taskResponseObject : compactTasksBefore) {
+        Assert.assertEquals(TaskState.SUCCESS, taskResponseObject.getStatus());
+      }
+
+      // Verify that compacted segments do not get compacted again
+      forceTriggerAutoCompaction(2);
+      List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      Assert.assertEquals(compactTasksBefore.size(), compactTasksAfter.size());
+    }
+  }
+
   private void loadData(String indexTask) throws Exception
   {
     loadData(indexTask, ImmutableMap.of());
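The two input segments overlap because each batch ingestion aligns its segment to the configured segmentGranularity bucket: the WEEK bucket around 2013-08-31 and the MONTH bucket around 2013-09-01 intersect on 2013-09-01/2013-09-02. A minimal sketch of that bucketing with Druid's Granularity API (a standalone illustration, not part of the commit; the intervals shown assume the default UTC chronology, while the test itself uses America/Los_Angeles):

    import org.apache.druid.java.util.common.DateTimes;
    import org.apache.druid.java.util.common.granularity.Granularities;
    import org.joda.time.Interval;

    public class BucketOverlapSketch
    {
      public static void main(String[] args)
      {
        // WEEK buckets start on Monday, so the bucket holding 2013-08-31 is 2013-08-26/2013-09-02.
        Interval week = Granularities.WEEK.bucket(DateTimes.of("2013-08-31"));
        // The MONTH bucket holding 2013-09-01 is 2013-09-01/2013-10-01.
        Interval month = Granularities.MONTH.bucket(DateTimes.of("2013-09-01"));
        // The buckets share 2013-09-01/2013-09-02, which is why the two segments overlap.
        System.out.println(week.overlaps(month)); // true
      }
    }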
@@ -23,7 +23,6 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.JodaUtils;
-import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.timeline.DataSegment;
 import org.joda.time.Interval;
 
@@ -51,7 +50,7 @@ public static ClientCompactionIntervalSpec fromSegments(List<DataSegment> segments)
         JodaUtils.umbrellaInterval(
             segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
         ),
-        SegmentUtils.hashIds(segments)
+        null
     );
   }
 
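Dropping the hash here appears to be what allows overlapping intervals to compact: pinning the task to the SHA-256 of the sorted segment IDs can spuriously fail when other segments overlap the umbrella interval, whereas null lets the compaction task resolve whatever segments currently fall inside it. A small sketch of the umbrella interval for the two segments from the integration test (illustration only, using Druid's JodaUtils as in the diff):

    import java.util.Arrays;
    import org.apache.druid.java.util.common.Intervals;
    import org.apache.druid.java.util.common.JodaUtils;
    import org.joda.time.Interval;

    public class UmbrellaIntervalSketch
    {
      public static void main(String[] args)
      {
        // Umbrella (covering) interval of the overlapping WEEK and MONTH segments.
        Interval umbrella = JodaUtils.umbrellaInterval(Arrays.asList(
            Intervals.of("2013-08-26/2013-09-02"), // WEEK segment
            Intervals.of("2013-09-01/2013-10-01")  // MONTH segment
        ));
        System.out.println(umbrella); // 2013-08-26T00:00:00.000Z/2013-10-01T00:00:00.000Z
      }
    }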
@@ -31,8 +31,10 @@
 import org.apache.druid.client.indexing.TaskPayloadResponse;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.GranularityType;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
 import org.apache.druid.server.coordinator.CompactionStatistics;
@@ -343,18 +345,37 @@ private CoordinatorStats doRun(
       snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size());
 
       final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
 
       // Create granularitySpec to send to compaction task
       ClientCompactionTaskGranularitySpec granularitySpec;
-      if (config.getGranularitySpec() != null) {
-        granularitySpec = new ClientCompactionTaskGranularitySpec(
-            config.getGranularitySpec().getSegmentGranularity(),
-            config.getGranularitySpec().getQueryGranularity(),
-            config.getGranularitySpec().isRollup()
-        );
+      Granularity segmentGranularityToUse = null;
+      if (config.getGranularitySpec() == null || config.getGranularitySpec().getSegmentGranularity() == null) {
+        // Determine segmentGranularity from segmentsToCompact: when segmentGranularity is not set in the
+        // compaction config, each batch of segmentsToCompact from CompactionSegmentIterator covers the same interval.
+        Interval interval = segmentsToCompact.get(0).getInterval();
+        if (segmentsToCompact.stream().allMatch(segment -> interval.overlaps(segment.getInterval()))) {
+          try {
+            segmentGranularityToUse = GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity();
+          }
+          catch (IAE iae) {
+            // This can happen if the existing segment interval maps to a complicated period.
+            // Fall back to setting segmentGranularity as null.
+            LOG.warn("Cannot determine segmentGranularity from interval [%s]", interval);
+          }
+        } else {
+          LOG.warn("segmentsToCompact do not share the same interval. Falling back to not setting segmentGranularity for the auto compaction task");
+        }
       } else {
-        granularitySpec = null;
+        segmentGranularityToUse = config.getGranularitySpec().getSegmentGranularity();
       }
+      granularitySpec = new ClientCompactionTaskGranularitySpec(
+          segmentGranularityToUse,
+          config.getGranularitySpec() != null ? config.getGranularitySpec().getQueryGranularity() : null,
+          config.getGranularitySpec() != null ? config.getGranularitySpec().isRollup() : null
+      );
 
       // Create dimensionsSpec to send to compaction task
       ClientCompactionTaskDimensionsSpec dimensionsSpec;
       if (config.getDimensionsSpec() != null) {
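The inference step above maps a batch's interval to a standard granularity via GranularityType.fromPeriod, falling back to null when the period is irregular. A minimal sketch of both outcomes, using the same calls as the diff (a standalone illustration, not part of the commit):

    import org.apache.druid.java.util.common.IAE;
    import org.apache.druid.java.util.common.Intervals;
    import org.apache.druid.java.util.common.granularity.GranularityType;
    import org.joda.time.Interval;

    public class InferGranularitySketch
    {
      public static void main(String[] args)
      {
        // A calendar month yields period P1M, which maps to MONTH granularity.
        Interval month = Intervals.of("2013-09-01/2013-10-01");
        System.out.println(GranularityType.fromPeriod(month.toPeriod()).getDefaultGranularity());

        // An irregular interval yields P2D, which matches no GranularityType; the duty
        // catches the IAE and falls back to segmentGranularity = null.
        Interval odd = Intervals.of("2013-08-31/2013-09-02");
        try {
          GranularityType.fromPeriod(odd.toPeriod());
        }
        catch (IAE expected) {
          System.out.println("No granularity for period " + odd.toPeriod()); // P2D
        }
      }
    }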
@@ -24,8 +24,13 @@
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.discovery.DruidLeaderClient;
 import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -48,6 +53,7 @@ public class HttpIndexingServiceClientTest
   private HttpIndexingServiceClient httpIndexingServiceClient;
   private ObjectMapper jsonMapper;
   private DruidLeaderClient druidLeaderClient;
+  private ObjectMapper mockMapper;
 
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
@@ -57,6 +63,8 @@ public void setup()
   {
     jsonMapper = new DefaultObjectMapper();
     druidLeaderClient = EasyMock.createMock(DruidLeaderClient.class);
+    mockMapper = EasyMock.createMock(ObjectMapper.class);
+
     httpIndexingServiceClient = new HttpIndexingServiceClient(
         jsonMapper,
         druidLeaderClient
@@ -268,5 +276,70 @@ public void testGetTaskReportEmpty() throws Exception
     EasyMock.verify(druidLeaderClient, response);
   }
 
+  @Test
+  public void testCompact() throws Exception
+  {
+    DataSegment segment = new DataSegment(
+        "test",
+        Intervals.of("2015-04-12/2015-04-13"),
+        "1",
+        ImmutableMap.of("bucket", "bucket", "path", "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip"),
+        null,
+        null,
+        NoneShardSpec.instance(),
+        0,
+        1
+    );
+    Capture captureTask = EasyMock.newCapture();
+    HttpResponse response = EasyMock.createMock(HttpResponse.class);
+    EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
+    EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0));
+    EasyMock.replay(response);
+
+    StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+        response,
+        StandardCharsets.UTF_8
+    ).addChunk(jsonMapper.writeValueAsString(ImmutableMap.of("task", "aaa")));
+
+    EasyMock.expect(druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/task"))
+            .andReturn(new Request(HttpMethod.POST, new URL("http://localhost:8090/druid/indexer/v1/task")))
+            .anyTimes();
+    EasyMock.expect(druidLeaderClient.go(EasyMock.anyObject(Request.class)))
+            .andReturn(responseHolder)
+            .anyTimes();
+    EasyMock.expect(mockMapper.writeValueAsBytes(EasyMock.capture(captureTask)))
+            .andReturn(new byte[]{1, 2, 3})
+            .anyTimes();
+    EasyMock.expect(mockMapper.readValue(EasyMock.anyString(), EasyMock.eq(JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT)))
+            .andReturn(ImmutableMap.of())
+            .anyTimes();
+    EasyMock.replay(druidLeaderClient, mockMapper);
+
+    HttpIndexingServiceClient httpIndexingServiceClient = new HttpIndexingServiceClient(
+        mockMapper,
+        druidLeaderClient
+    );
+
+    try {
+      httpIndexingServiceClient.compactSegments(
+          "test-compact",
+          ImmutableList.of(segment),
+          50,
+          null,
+          null,
+          null,
+          null,
+          null,
+          null
+      );
+    }
+    catch (Exception e) {
+      // The taskId is generated internally, so the returned task id fails the client's check;
+      // an IllegalStateException here is expected and can be ignored.
+      Assert.assertEquals(IllegalStateException.class.getName(), e.getCause().getClass().getName());
+    }
+
+    ClientCompactionTaskQuery taskQuery = (ClientCompactionTaskQuery) captureTask.getValue();
+    Assert.assertNull(taskQuery.getIoConfig().getInputSpec().getSha256OfSortedSegmentIds());
+  }
 }
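The final assertion works because the mocked ObjectMapper records, via EasyMock's Capture, the ClientCompactionTaskQuery that compactSegments serializes. A generic sketch of that capture pattern (SomeClient and send are hypothetical names for illustration, standing in for the mocked mapper):

    import org.easymock.Capture;
    import org.easymock.EasyMock;

    public class CaptureSketch
    {
      // Hypothetical collaborator interface, standing in for ObjectMapper in the real test.
      interface SomeClient
      {
        String send(String body);
      }

      public static void main(String[] args)
      {
        // Record the argument passed to the mocked collaborator for later inspection.
        Capture<String> captured = EasyMock.newCapture();
        SomeClient client = EasyMock.createMock(SomeClient.class);
        EasyMock.expect(client.send(EasyMock.capture(captured))).andReturn("ok").anyTimes();
        EasyMock.replay(client);

        client.send("payload");
        EasyMock.verify(client);
        System.out.println(captured.getValue()); // payload
      }
    }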
