Stateful auto compaction #8573
Conversation
@@ -178,12 +178,11 @@ private CoordinatorStats doRun(
       final List<DataSegment> segmentsToCompact = iterator.next();
       final String dataSourceName = segmentsToCompact.get(0).getDataSource();

-      if (segmentsToCompact.size() > 1) {
+      if (!segmentsToCompact.isEmpty()) {
Line #179 should be moved into this if block.
Thank you for finding this! Will fix it.
Fixed.
i.e. the user can explicitly run
For now, yes. And I'm planning to add it in the near future.
@Inject(optional = true) @PruneLoadSpec boolean pruneLoadSpec = false;
@Inject(optional = true) @PrunePartitionsSpec boolean prunePartitionsSpec = false;
Do you want to add this to the comment on line 65?
Added
@@ -68,17 +66,19 @@
  * github.com/google/guice/wiki/FrequentlyAskedQuestions#how-can-i-inject-optional-parameters-into-a-constructor
  */
 @VisibleForTesting
-public static class PruneLoadSpecHolder
+public static class PruneSpecs
Perhaps keep the Holder suffix, as it seems to be the naming convention for the optional constructor parameter injection pattern?
Added.
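For context, a minimal sketch of the Guice FAQ holder pattern the Holder suffix refers to; the class and field names below are illustrative, not the actual Druid code:

import com.google.inject.Inject;

public class SegmentLoaderExample
{
  // The holder's fields are injected only if a matching binding exists;
  // otherwise the defaults survive. The real code distinguishes the two
  // booleans with binding annotations (@PruneLoadSpec, @PrunePartitionsSpec).
  public static class PruneSpecsHolder
  {
    @Inject(optional = true) boolean pruneLoadSpec = false;
    @Inject(optional = true) boolean prunePartitionsSpec = false;
  }

  private final boolean pruneLoadSpec;
  private final boolean prunePartitionsSpec;

  // Constructor parameters cannot be optional in Guice, so the holder is
  // injected instead and the values are copied out of it.
  @Inject
  public SegmentLoaderExample(PruneSpecsHolder holder)
  {
    this.pruneLoadSpec = holder.pruneLoadSpec;
    this.prunePartitionsSpec = holder.prunePartitionsSpec;
  }
}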
@@ -457,6 +488,7 @@ public Builder(DataSegment segment)
       this.dimensions = segment.getDimensions();
       this.metrics = segment.getMetrics();
       this.shardSpec = segment.getShardSpec();
+      this.compactionPartitionsSpec = segment.compactionPartitionsSpec;
Why not use the getter?
Fixed.
@Test
public void testBucketMonthComparator()
{
  DataSegment[] sortedOrder = {
      makeDataSegment("test1", "2011-01-01/2011-01-02", "a"),
      makeDataSegment("test1", "2011-01-02/2011-01-03", "a"),
      makeDataSegment("test1", "2011-01-02/2011-01-03", "b"),
      makeDataSegment("test2", "2011-01-01/2011-01-02", "a"),
      makeDataSegment("test2", "2011-01-02/2011-01-03", "a"),
      makeDataSegment("test1", "2011-02-01/2011-02-02", "a"),
      makeDataSegment("test1", "2011-02-02/2011-02-03", "a"),
      makeDataSegment("test1", "2011-02-02/2011-02-03", "b"),
      makeDataSegment("test2", "2011-02-01/2011-02-02", "a"),
      makeDataSegment("test2", "2011-02-02/2011-02-03", "a"),
  };

  List<DataSegment> shuffled = new ArrayList<>(Arrays.asList(sortedOrder));
  Collections.shuffle(shuffled);

  Set<DataSegment> theSet = new TreeSet<>(DataSegment.bucketMonthComparator());
  theSet.addAll(shuffled);

  int index = 0;
  for (DataSegment dataSegment : theSet) {
    Assert.assertEquals(sortedOrder[index], dataSegment);
    ++index;
  }
}
Why is this test no longer needed?
bucketMonthComparator() is not used anywhere.
    DataSegmentPusher segmentPusher
)
{
  return appenderatorsManager.createOfflineAppenderatorForTask(
      taskId,
      dataSchema,
      appenderatorConfig.withBasePersistDirectory(toolbox.getPersistDir()),
      firehoseFactory instanceof IngestSegmentFirehoseFactory,
An alternative to using instanceof is to add another method to FirehoseFactory (i.e., polymorphism).
I know this is ugly, but I don't have a better idea. What makes the most sense to me is adding a new method getTaskType(), but that's a pretty big refactoring which isn't necessary in this PR.
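For illustration, a sketch of the polymorphism alternative under discussion; the method name readsExistingSegments and the simplified interface are assumptions, not Druid's actual API:

interface FirehoseFactory
{
  // Most factories read external data, so the default is false.
  default boolean readsExistingSegments()
  {
    return false;
  }
}

class IngestSegmentFirehoseFactory implements FirehoseFactory
{
  @Override
  public boolean readsExistingSegments()
  {
    return true; // this factory re-reads segments that are already ingested
  }
}

// The call site would then be firehoseFactory.readsExistingSegments()
// instead of firehoseFactory instanceof IngestSegmentFirehoseFactory.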
return newTuningConfig.withPartitionsSpec(
    new DynamicPartitionsSpec(
        dynamicPartitionsSpec.getMaxRowsPerSegment(),
        dynamicPartitionsSpec.getMaxTotalRowsOr(Long.MAX_VALUE)
Alternative that allocates one fewer TuningConfig for the DynamicPartitionsSpec case:

PartitionsSpec partitionsSpec = newTuningConfig.getGivenOrDefaultPartitionsSpec();
if (partitionsSpec instanceof DynamicPartitionsSpec) {
  final DynamicPartitionsSpec dynamicPartitionsSpec = (DynamicPartitionsSpec) partitionsSpec;
  partitionsSpec = new DynamicPartitionsSpec(
      dynamicPartitionsSpec.getMaxRowsPerSegment(),
      dynamicPartitionsSpec.getMaxTotalRowsOr(Long.MAX_VALUE)
  );
}
return newTuningConfig.withPartitionsSpec(partitionsSpec);
Changed.
@@ -191,6 +192,10 @@ public void testRun() throws Exception
           Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
           segments.get(i).getInterval()
       );
+      Assert.assertEquals(
+          new DynamicPartitionsSpec(5000000, Long.MAX_VALUE),
Perhaps save this as a named constant since it's used a lot?
Added.
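For reference, the suggestion amounts to something like the following (the constant name is an assumption):

private static final DynamicPartitionsSpec DEFAULT_PARTITIONS_SPEC =
    new DynamicPartitionsSpec(5000000, Long.MAX_VALUE);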
@@ -201,7 +208,7 @@ private void updateQueue(String dataSourceName, DataSourceCompactionConfig config)
       .filter(holder -> {
         final List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holder.getObject().iterator());
         final long partitionBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
-        return chunks.size() > 1
+        return chunks.size() > 0
Prefer !chunks.isEmpty() (similar to the change you made on line 185).
Changed.
    candidates.segments.get(0).getDataSource(),
    candidates.segments.get(0).getInterval()
);
if (candidates.getNumSegments() > 0) {
Prefer !candidates.isEmpty()
Changed.
@@ -229,15 +236,58 @@ public boolean hasNext()
   }
 }

+private static boolean needsCompaction(DataSourceCompactionConfig config, SegmentsToCompact candidates)
Which tests cover this new logic?
DruidCoordinatorSegmentCompactorTest verifies the behavior of DruidCoordinatorSegmentCompactor. I also changed
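For readers following along, a hedged sketch of what a check like this plausibly does, based on the PR's stated semantics of comparing each candidate segment's last compaction state against the current config; the accessor names below are assumptions, not the actual implementation:

private static boolean needsCompaction(DataSourceCompactionConfig config, SegmentsToCompact candidates)
{
  // The spec the current auto compaction config would produce (assumed accessors).
  final PartitionsSpec configuredSpec =
      new DynamicPartitionsSpec(config.getMaxRowsPerSegment(), Long.MAX_VALUE);
  for (DataSegment segment : candidates.getSegments()) {
    final CompactionState lastState = segment.getLastCompactionState();
    if (lastState == null || !configuredSpec.equals(lastState.getPartitionsSpec())) {
      // Never compacted, or compacted with a different spec: needs compaction.
      return true;
    }
  }
  return false;
}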
This pull request introduces 1 alert when merging 47dca61 into 6c60929 - view on LGTM.com
import java.util.Map;
import java.util.Objects;

public class CompactionState
Can you please add javadoc for this class describing what this is, why it has the fields it does (I know there is some discussion in the proposal, but it would be very non-obvious for someone reading the code), and what guarantees it provides ... e.g. something like: if a CompactionTask is run with parameters matching those stored here, then the row distribution in the segments created would be exactly the same.
Added javadoc. Please take a look and see if it's enough.
LGTM, thanks.
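For illustration, javadoc along these lines would capture the requested guarantee (a sketch, not the exact text added in the PR):

/**
 * CompactionState stores the configuration a segment was last compacted with,
 * such as its PartitionsSpec and index spec.
 *
 * Guarantee: if a CompactionTask is run with parameters matching the values
 * stored here, the row distribution in the segments it creates would be
 * exactly the same, so auto compaction can safely skip such segments.
 */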
public class CompactionState
{
  private final PartitionsSpec partitionsSpec;
  // org.apache.druid.segment.IndexSpec cannot be used here to avoid the dependency cycle
I couldn't understand what the cycle is ... do you mean IndexSpec can contain a CompactionState, so JSON serde would fail?
The thing is, IndexSpec is in the processing module while this class is in the core module. Since processing has a dependency on core, I cannot add a new dependency from core to processing since it would introduce a cycle. I updated this comment to be more understandable.
Got it, thanks.
Maybe in the next round of module merges: merge core into processing if there is no use case of anyone depending on druid-core directly :)
@@ -786,8 +786,7 @@ A description of the compaction config is:
 |`dataSource`|dataSource name to be compacted.|yes|
 |`taskPriority`|[Priority](../ingestion/tasks.html#priority) of compaction task.|no (default = 25)|
 |`inputSegmentSizeBytes`|Maximum number of total segment bytes processed per compaction task. Since a time chunk must be processed in its entirety, if the segments for a particular time chunk have a total size in bytes greater than this parameter, compaction will not run for that time chunk. Because each compaction task runs with a single thread, setting this value too far above 1–2GB will result in compaction tasks taking an excessive amount of time.|no (default = 419430400)|
-|`targetCompactionSizeBytes`|The target segment size, for each segment, after compaction. The actual sizes of compacted segments might be slightly larger or smaller than this value. Each compaction task may generate more than one output segment, and it will try to keep each output segment close to this configured size. This configuration cannot be used together with `maxRowsPerSegment`.|no (default = 419430400)|
We should probably add a blurb in the release notes for this, just in case some people set this property and expect something to happen.
  isReingest = dataSchema.getDataSource().equals(((IngestSegmentFirehoseFactory) firehoseFactory).getDataSource());
} else {
  isReingest = false;
}
Is it possible to drive this from the auto compaction code in the Druid coordinator instead, as IngestSegmentFirehoseFactory could be used outside of auto compaction as well? For example, as a user, knowing my data flow, I can set up a re-index task to run every day for the previous day's data .. sort of a manual compaction. But in that case, the CompactionState doesn't need to be preserved.
Hmm, I thought lastCompactionState could be useful for manual compaction as well. How about adding a parameter to the task context to store lastCompactionState, so that other users can also use it if they want?
Yeah, that would work and would let the user (in this case, the auto compaction code) explicitly say whether CompactionState should be saved or not.
Thanks, added a new task context configuration. I'm still not sure whether this should be documented though.
For now, I think this config is best left undocumented and for auto compaction internal usage only.
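As a sketch of what consuming such a context flag could look like; the key name "storeCompactionState" and the helper below are assumptions for illustration, not confirmed by this thread:

import java.util.Map;

class TaskContexts
{
  // Decides whether the task should attach its CompactionState to the
  // segments it publishes; defaults to false when the flag is absent.
  static boolean shouldStoreCompactionState(Map<String, Object> taskContext)
  {
    final Object flag = taskContext.get("storeCompactionState");
    return flag != null && Boolean.parseBoolean(flag.toString());
  }
}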
Some docs should be further updated, but I will do it once this PR and #8570 are merged.
Thanks for the review @ccaominh and @himanshug!
Fixes #8489.
Description
In addition to #8489, `targetCompactionSizeBytes` is dropped for the compaction task and auto compaction. `targetCompactionSizeBytes` was added for easy configuration, but it could be misleading in suggesting that segment optimization should be done in terms of size rather than number of rows. Dropping `targetCompactionSizeBytes` also makes things simpler, so that all tasks can share the same partitionsSpec, since `targetCompactionSizeBytes` makes sense only for the compaction task. `maxRowsPerSegment` is now a mandatory configuration for auto compaction. For the compaction task, any partitionsSpec can be used.

Also fixed a bug where auto compaction couldn't compact an interval if there is only one segment. Note that compaction can split a segment into smaller ones.