
Refactor parallel indexing perfect rollup partitioning #8852

Merged: 18 commits merged into apache:master on Nov 21, 2019

Conversation

@ccaominh (Contributor) commented Nov 11, 2019:

Description

Refactoring to make it easier to later add range partitioning for perfect rollup parallel indexing. This is accomplished by adding several new base classes (e.g., PerfectRollupWorkerTask) and new classes for encapsulating logic that needs to be changed for different partitioning strategies (e.g., IndexTaskInputRowIteratorBuilder).

The code is functionally equivalent to before, except for the following small behavior changes:

  1. PartialSegmentMergeTask: Previously, this task had a priority of DEFAULT_TASK_PRIORITY. It now has a priority of DEFAULT_BATCH_INDEX_TASK_PRIORITY (via the new PerfectRollupWorkerTask base class), since it is a batch index task.

  2. ParallelIndexPhaseRunner: A decorator was added to subTaskSpecIterator to ensure the subtasks are generated with unique ids. Previously, only tests (i.e., MultiPhaseParallelIndexingTest) would have this decorator, but this behavior is desired for non-test code as well.

This PR has:

  • been self-reviewed.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths.

@suneet-s (Contributor) commented Nov 11, 2019:

@ccaominh Any recommendations on where to start reviewing this code for someone who doesn't have a deep understanding of the ingest system? Also, is there a design doc that talks about the different components involved?

@ccaominh (Contributor, Author) commented:

@suneet-amp The entry point for parallel native batch indexing is ParallelIndexSupervisorTask#runTask(), and the subsequent code path relevant to this PR is ParallelIndexSupervisorTask#runMultiPhaseParallel().

There is a "Proposal" issue label that'll list a bunch of design docs: https://github.com/apache/incubator-druid/labels/Proposal

In particular, #5543 and #8061 are related to the code paths that are refactored in this PR.

The relevant external documentation is: https://druid.apache.org/docs/latest/ingestion/native-batch.html#parallel-task

@suneet-s (Contributor) commented:

Thanks!

* the partitionSpec is compatible.
*/
@JsonIgnore
String getForceGuaranteedRollupIncompatiblityReason();

Contributor:

note to self: there are 3 ways of saying whether something is incompatible or not. Is there a better way to do this?

@suneet-s (Contributor) left a comment:

Finished reviewing stuff under core. More reviews coming today. Sorry it took me almost a week to get around to this.

private static final String MAX_PARTITION_SIZE = "maxPartitionSize";
private static final String FORCE_GUARANTEED_ROLLUP_COMPATIBLE = "";

Contributor:

nit: unused

Contributor (Author):

It's used in my follow-up PR that adds the implementation of range partitioning.

public String getForceGuaranteedRollupIncompatiblityReason()
{
  return NAME + " partitions unsupported";
}

Contributor:

Should isForceGuaranteedRollupCompatibleType also be implemented to return false?

Contributor (Author):

Perhaps it could be implemented in a clearer way, but currently the default implementation of isForceGuaranteedRollupCompatibleType in the PartitionsSpec interface returns true if this method returns an empty string.
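
For reference, the arrangement described amounts to something like this (a simplified sketch, not the PR's exact code; names follow the excerpts in this thread):

import com.fasterxml.jackson.annotation.JsonIgnore;

public interface PartitionsSpec
{
  // Returns an empty string when compatible; otherwise a human-readable reason.
  @JsonIgnore
  String getForceGuaranteedRollupIncompatiblityReason();

  // Default: compatible iff no incompatibility reason is given.
  @JsonIgnore
  default boolean isForceGuaranteedRollupCompatibleType()
  {
    return getForceGuaranteedRollupIncompatiblityReason().isEmpty();
  }
}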

@suneet-s (Contributor) left a comment:

Second round of comments - I haven't reviewed any of the tests in indexing-service, but I have read through everything else. Overall the refactoring looks good to me 👍

{
return allocateSpec;
}

Contributor:

Not your code - but getSequenceName below uses String#format. If this is called a bunch of times, I'd recommend switching to string concatenation or a StringBuilder. Here are some results from a micro-benchmark I ran comparing the three approaches over a million invocations: String.format takes 1.7 s vs roughly 300 ms for the other two methods. This is because String#format internally has to compile the format on each invocation.

I don't have enough context to know if this is important or not, but figured I'd flag it. Totally OK to skip this since it has nothing to do with your refactoring.

/**
     * String#format 1M times -
     *     INIT    Total: 1384.5 ms, average: 0.0 ms, stdev: 0.0 ms, median: 0.0 ms
     *     RUN     Total: 1720.3 ms, average: 0.0 ms, stdev: 0.0 ms, median: 0.0 ms
     * String plus 1M times -
     *     INIT    Total: 1323.1 ms, average: 0.0 ms, stdev: 0.0 ms, median: 0.0 ms
     *     RUN     Total: 235.5 ms, average: 0.0 ms, stdev: 0.0 ms, median: 0.0 ms
     * StringBuilder 1M times -
     *     INIT    Total: 1408.0 ms, average: 0.0 ms, stdev: 0.0 ms, median: 0.0 ms
     *     RUN     Total: 337.3 ms, average: 0.0 ms, stdev: 0.0 ms, median: 0.0 ms
     */
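
As an illustration of the suggestion (a hypothetical sketch; the real getSequenceName signature and format differ):

import org.joda.time.Interval;

// Plain concatenation skips re-parsing the format string on every call,
// which accounts for the 1.7 s vs ~300 ms gap above. (Method shape assumed.)
static String getSequenceName(String taskId, Interval interval, String version)
{
  // Before (slower): StringUtils.format("%s_%s_%s", taskId, interval, version)
  return taskId + "_" + interval + "_" + version;
}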

@jihoonson (Contributor) commented Nov 18, 2019:

Good point. StringUtils.format() should never be used in a performance-sensitive code path. I'm pretty sure getSequenceName() is not a performance bottleneck in indexing for now, but I guess this is good to have.

Contributor (Author):

I think we can create a general ticket to profile parallel indexing and then make additional optimizations as needed (this potentially being one of them).

Contributor:

I'm ok with not doing it in this PR. @suneet-amp are you interested in filing this issue?

Contributor:

@jihoonson Happy to file an issue! Just to clarify - this should be an issue to profile parallel indexing and look for optimizations, correct?

Contributor:

Uh, I think there could be two issues here. One is optimizing the performance of indexing tasks in general, and the other is not using StringUtils.format() in getSequenceName(). The latter is pretty obvious and good to have regardless of any profiling results.

Contributor:

Filed #8904 to replace StringUtils#format

Contributor:

Great, thanks!

);
}
return intervalToSegmentIds;
}

Contributor:

nit: Missing tests for creating the delegate using getIntervalToSegmentIds?

The logic appears non-trivial - it might be worth adding a very simple test just to ensure the delegate is created correctly.

Contributor (Author):

Added a unit test

.granularitySpec(granularitySpec)
.nullRowRunnable(buildSegmentsMeters::incrementThrownAway)
.absentBucketIntervalConsumer(inputRow -> buildSegmentsMeters.incrementThrownAway())
.build();

Contributor:

Very nice - I like how this reads a lot!

continue;
}

Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
@SuppressWarnings("OptionalGetWithoutIsPresent") // always present via IndexTaskInputRowIteratorBuilder

Contributor:

I don't think I understand this. Both implementations of GranularitySpec can return absent

How is the granularitySpec that's passed to inputRowIteratorBuilder guaranteed to return a present Interval?

Contributor (Author):

DefaultIndexTaskInputRowIteratorBuilder has an absent bucket interval handler, which adds a HandlingInputRowIterator.InputRowHandler that returns true if the row has a timestamp that doesn't match the intervals in the granularitySpec. That means the row gets skipped by this loop, as the iterator will return null (i.e., the row was handled already).

I'll improve the comment.
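
Roughly, the handler described above behaves like this (a sketch assembled from the excerpts in this thread; the consumer wiring is assumed):

// Rows whose timestamp falls outside the granularitySpec's bucket intervals
// are "handled" (consumed), so HandlingInputRowIterator yields null for that
// iteration and downstream loops skip the row.
HandlingInputRowIterator.InputRowHandler absentBucketIntervalHandler = inputRow -> {
  if (!granularitySpec.bucketInterval(inputRow.getTimestamp()).isPresent()) {
    absentBucketIntervalConsumer.accept(inputRow);  // e.g., buildSegmentsMeters::incrementThrownAway
    return true;   // handled: iterator yields null instead of this row
  }
  return false;    // not handled: row is yielded normally
};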

}

abstract SubTaskSpec<T> createSubTaskSpec(

Contributor:

nit: javadoc please

abstract int getPartitionId();

abstract T getSecondaryPartition();

Contributor:

super nit: javadocs - these seem like obvious things that someone more familiar with the codebase will know just from the name. Feel free to ignore.

}

@Override
public final boolean isPerfectRollup()

Contributor:

nit: It looks like the intention of this is to indicate that we want to use the timeChunk for rollups instead of segment? Should we rename it to something like useTimeChunkForRollups? It's unclear to me why perfect rollup means using the timeChunk instead of segment.

Also, it looks like this is only used in AbstractBatchIndexTask; can we make this method protected instead?

Contributor:

I'm not sure what you mean by "using timeChunk for rollups", but perfect rollup is pre-aggregating rows across the entire input data. See https://druid.apache.org/docs/latest/ingestion/index.html#rollup for more details.

Contributor:

@jihoonson In AbstractBatchIndexTask, lines 268-271 say

if (isPerfectRollup()) {
    log.info("Using timeChunk lock for perfect rollup");
    ...
}

Contributor:

Ah, it's the timeChunk lock rather than the timeChunk. Tasks must use the timeChunk lock instead of the segment lock for perfect rollup, because the rollup happens across the entire input data. See #7491 for timeChunk lock vs segment lock.

/**
* @param inputRowHandler Optionally, append this input row handler to the required ones.
*/
DefaultIndexTaskInputRowIteratorBuilder appendInputRowHandler(HandlingInputRowIterator.InputRowHandler inputRowHandler)

Contributor:

nit: @VisibleForTesting

Or indicate that we can add additional row handlers in the top level javadoc

Contributor (Author):

Added to javadoc.

* If any of the handlers invoke their respective callback, the {@link HandlingInputRowIterator} will yield
* a null {@link InputRow} next; otherwise, the next {@link InputRow} is yielded.
* </pre>
*/

Contributor:

I like this abstraction a lot 😍

};

Consumer<InputRow> NOOP_CONSUMER = inputRow -> {
};

Contributor:

nit: Both of the NOOP variables are only used in tests. Do you plan to use them in a future PR? Otherwise, maybe move them to the tests folder.

Contributor (Author):

They're going to be used in the production code for my follow-up PR that adds the implementation for range partitioning.

.delegate(inputRowIterator)
.granularitySpec(granularitySpec)
.nullRowRunnable(buildSegmentsMeters::incrementThrownAway)
.absentBucketIntervalConsumer(inputRow -> buildSegmentsMeters.incrementThrownAway())

Contributor:

semi-related: I don't think this should be done in this PR, but I think it would be better to add a MetricsCollectingInputSourceReader or something, since this decoration will be the same across all task types.

Contributor:

Also semi-related: another place where I think refactoring is needed is having a consistent and systematic way to throw ParseException. In the current code, ParseException can be thrown anywhere, and callers must be careful not to miss it (it's a RuntimeException). I think we need some ParseErrorHandlingRunner or something to handle this in a single place.

Contributor (Author):

Good suggestions, but I won't address them in this PR.

*/
class PartialSegmentGenerateParallelIndexTaskRunner
extends ParallelIndexPhaseRunner<PartialSegmentGenerateTask, GeneratedPartitionsReport>
abstract class FirehoseSplitParallelIndexTaskRunner<T extends Task, R extends SubTaskReport>

Contributor:

InputSourceSplit?

Contributor (Author):

Renamed

return super.next();
}

private void ensureUniqueSubtaskId()

Contributor:

This was missing in production code since it rarely happens in practice. I think #8612 is a better way to fix this.

.travis.yml Outdated
@@ -277,6 +277,9 @@ jobs:
echo $v dmesg ======================== ;
docker exec -it druid-$v sh -c 'dmesg | tail -3' ;
done
- for v in ~/shared/tasklogs/*.log ; do
echo $v logtail ======================== ; tail -100 $v ;

Contributor:

This could cause a Travis failure, since it would fail once the number of output rows reaches the limit.

Contributor (Author):

I'll revert this for now

@Override
public final int getPriority()
{
  return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);

Contributor:

Thanks for adding this! Wondering if we can move this to AbstractBatchIndexTask.

Contributor (Author):

I've moved it to AbstractBatchIndexTask

/**
* Decorated {@link CloseableIterator<InputRow>} that can process rows with {@link InputRowHandler}s.
*/
public class HandlingInputRowIterator implements Iterator<InputRow>

Contributor:

Why not implement CloseableIterator?

Contributor (Author):

I can make that addition. When I originally implemented the class, it was providing an iterator interface for a firehose.

InputRow inputRow = delegate.next();

for (InputRowHandler inputRowHandler : inputRowHandlers) {
  if (inputRowHandler.handle(inputRow)) {

@jihoonson (Contributor) commented Nov 19, 2019:

This loop will make virtual calls per inputRow, which could be slow with large inputs. I don't think this will lead to any performance degradation for now, because segment merge is the most prominent bottleneck. However, I would recommend adding a note about it here.

Contributor (Author):

I'll add comments warning about the overhead and potential future work.
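
For context, next() with such a note would look roughly like this (body assumed, not verbatim from the PR):

@Override
public InputRow next()
{
  InputRow inputRow = delegate.next();
  // NOTE: one virtual call per handler per row. Not a bottleneck today
  // (segment merge dominates), but worth revisiting for very large inputs.
  for (InputRowHandler inputRowHandler : inputRowHandlers) {
    if (inputRowHandler.handle(inputRow)) {
      return null;  // a handler consumed the row; callers treat null as "skip"
    }
  }
  return inputRow;
}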

import java.util.List;
import java.util.Map;

class Factory

Contributor:

I like this class!

Contributor:

I would suggest renaming the class to something more intuitive, or adding javadoc so that other people can also use this class. Maybe both would be best.

Contributor (Author):

I've renamed it to something more descriptive and added a javadoc.

*/
-public class CachingLocalSegmentAllocator implements IndexTaskSegmentAllocator
+class CachingLocalSegmentAllocator implements IndexTaskSegmentAllocator

Contributor:

What would be the non-default version of CachingLocalSegmentAllocator?

Contributor (Author):

Preview of RangePartitionCachingLocalSegmentAllocator: https://github.com/ccaominh/incubator-druid/blob/backup-superbatch-range-partitioning/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RangePartitionCachingLocalSegmentAllocator.java

"Default" may not be the best name for the class, so I'm open to better suggestions.

Contributor:

Thanks. Maybe HashPartitioningCachingLocalSegmentAllocator?

Contributor (Author):

Renamed

{
private final String host;
private final int port;
private final boolean useHttps;
private final String subTaskId;
private final Interval interval;
private final int partitionId;
private final T secondaryPartition;

Contributor:

What would be secondaryPartition for range partitioning?

@ccaominh (Contributor, Author) commented Nov 19, 2019:

It uses the ShardSpec, so that it's generic enough for any kind of partitioning. Here's a preview of GenericPartitionLocation: https://github.com/ccaominh/incubator-druid/blob/backup-superbatch-range-partitioning/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocation.java

For hash partitions, the existing implementation (HashPartitionLocation) is preferable in that it has a smaller payload (since the partition dimensions do not need to be repeated for each partition).

Contributor:

Hmm, does it need the entire shardSpec for range partitioning? I thought a triple of (startKey, endKey, partitionId) would be enough.

Contributor (Author):

The discussion of what exactly is used is probably best saved for the follow-up PR that adds the implementation of range partitioning.

Contributor:

Sounds right.

@ccaominh (Contributor, Author) commented:

Travis "other modules" test failure is due to flaky EmitterTest

@ccaominh (Contributor, Author) commented:

Travis failure for "other integration test" is due to flaky ItBasicAuthConfigurationTest as described in #7021

static final String HOST = "host";
static final int PORT = 1;
static final String SUBTASK_ID = "subtask-id";
private static final ObjectMapper NESTED_OBJECT_MAPPER = TestHelper.makeJsonMapper();

Contributor:

Hmm, maybe TestUtils.getTestObjectMapper() is more useful. It's lame that we have a couple of helper classes to make an ObjectMapper though.

Contributor (Author):

What's the advantage of using the object mapper from TestUtils versus TestHelper? Both are probably more than what's actually needed for the current usages of NESTED_OBJECT_MAPPER.

@ccaominh (Contributor, Author) commented:

I've had to merge master a few times now to resolve merge conflicts, so getting this through code review soon would be greatly appreciated!


static ObjectMapper createObjectMapper()
{
  InjectableValues injectableValues = new InjectableValues.Std()

Contributor:

Hmm, sorry, I left a comment on the wrong line. This seems to duplicate TestUtils.getTestObjectMapper() except for the HttpClient. Probably better to reuse it.

Contributor (Author):

I've made changes to TestUtils and changed this class to use TestUtils


Map<String, Object> parser = NESTED_OBJECT_MAPPER.convertValue(
    new StringInputRowParser(
        new JSONParseSpec(

Contributor:

It can use the new InputFormat API.

Contributor (Author):

Done

}

static ParallelIndexIngestionSpec createIngestionSpec(
InlineFirehoseFactory inlineFirehoseFactory,

Contributor:

It can use the new InputSource API.

Contributor (Author):

Done

@Override
public void close()
{
  throw new UnsupportedOperationException();

Contributor:

What is this class for?

Contributor (Author):

It is for the convenience of not having to override close() when creating each of the iterators that are used for the tests.

Contributor:

Hmm, I think it would be clearer not to extend CloseableIterator in that case. If it has to extend CloseableIterator, it makes more sense to me to call close() properly.

Contributor (Author):

Changed this to use CloseableIterators.withEmptyBaggage()
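
That is, something like the following (test iterator contents assumed for illustration):

// Wrap a plain Iterator in a CloseableIterator whose close() is a no-op,
// instead of overriding close() to throw.
CloseableIterator<InputRow> rowIterator =
    CloseableIterators.withEmptyBaggage(rows.iterator());  // rows: a List<InputRow>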

import java.util.List;
import java.util.function.Supplier;

class Factory

Contributor:

Same here. I would suggest renaming the class to something more intuitive, or adding javadoc so that other people can also use this class. Maybe both would be best.

Contributor (Author):

I've renamed it to something more descriptive and added a javadoc.

@jihoonson (Contributor) left a comment:

+1 after CI.

> I've had to merge master a few times now to resolve merge conflicts, so getting this through code review soon would be greatly appreciated!

@ccaominh Sorry for those conflicts, and thanks for your patience! Please understand that merge conflicts sometimes happen, especially when different people work on related parts of the code.

@gianm gianm added this to the 0.17.0 milestone Nov 21, 2019
@gianm gianm merged commit ff62173 into apache:master Nov 21, 2019
@ccaominh ccaominh deleted the superbatch-partition-refactor branch November 21, 2019 01:25
jon-wei pushed a commit to jon-wei/druid that referenced this pull request Nov 26, 2019
* Refactor parallel indexing perfect rollup partitioning

Refactoring to make it easier to later add range partitioning for
perfect rollup parallel indexing. This is accomplished by adding several
new base classes (e.g., PerfectRollupWorkerTask) and new classes for
encapsulating logic that needs to be changed for different partitioning
strategies (e.g., IndexTaskInputRowIteratorBuilder).

The code is functionally equivalent to before except for the following
small behavior changes:

1) PartialSegmentMergeTask: Previously, this task had a priority of
   DEFAULT_TASK_PRIORITY. It now has a priority of
   DEFAULT_BATCH_INDEX_TASK_PRIORITY (via the new PerfectRollupWorkerTask
   base class), since it is a batch index task.

2) ParallelIndexPhaseRunner: A decorator was added to
   subTaskSpecIterator to ensure the subtasks are generated with unique
   ids. Previously, only tests (i.e., MultiPhaseParallelIndexingTest)
   would have this decorator, but this behavior is desired for non-test
   code as well.

* Fix forbidden apis and pmd warnings

* Fix analyze dependencies warnings

* Fix IndexTask json and add IT diags

* Fix parallel index supervisor<->worker serde

* Fix TeamCity inspection errors/warnings

* Fix TeamCity inspection errors/warnings again

* Integrate changes with those from apache#8823

* Address review comments

* Address more review comments

* Fix forbidden apis

* Address more review comments