Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add InputSource and InputFormat interfaces #8823

Merged
merged 31 commits into from
Nov 15, 2019

Conversation

jihoonson
Copy link
Contributor

@jihoonson jihoonson commented Nov 5, 2019

Description

This is the First PR for #8812 which includes the new interfaces proposed in #8812. A couple of implementations are also included such as LocalInputSource and HttpInputSource for InputSource, and CsvInputFormat and JsonInputFormat for InputFormat. Their formats are:

"inputSource": {
  "type" : "local",
  "baseDir" : "/path/to/dir",
  "filter" : "your filter"
}
"inputSource": {
  "type" : "http",
  "uris" : ["http://example.com/uri1", "http://example.com/uri2"],
  "httpAuthenticationUsername": "username",
  "httpAuthenticationPassword": "password provider"
}
"inputFormat": {
  "type": "csv",
  "columns": [ "col1", "col2", "col3" ],
  "listDelimiter": "|",
  "findColumnsFromHeader" : true,
  "skipHeaderRows" : 3
}
"inputFormat": {
  "type": "json",
  "flattenSpec": {
    // your flatten spec
  }
}

Note that both inputSource and inputFormat are in ioConfig as below:

"ioConfig": {
  "type" : "index" or "index_parallel",
  "inputSource" : {
    // your input source
  },
  "inputFormat": {
    // your input format
  }
}

These are supported only by native batch indexing tasks yet. Sampler doesn't support them yet.

The old firehose and parser parameters should still work, but you cannot mix them. Only the combinations of firehose + parser or inputSource + inputFormat are allowed.

Documents will be added after more inputSources and inputFormats are implemented in follow-up PRs.


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths.
  • added integration tests.
  • been tested in a test Druid cluster.

This change is Reviewable

@vogievetsky
Copy link
Contributor

This is amazing.

@lgtm-com
Copy link

lgtm-com bot commented Nov 6, 2019

This pull request fixes 1 alert when merging d451582 into 3b602da - view on LGTM.com

fixed alerts:

  • 1 for Dereferenced variable may be null

@fjy
Copy link
Contributor

fjy commented Nov 6, 2019

One of the most exciting PRs on Druid ingestion in awhile. Glad we got it out.

@lgtm-com
Copy link

lgtm-com bot commented Nov 6, 2019

This pull request fixes 1 alert when merging b7c8b87 into 5c0fc0a - view on LGTM.com

fixed alerts:

  • 1 for Dereferenced variable may be null

@lgtm-com
Copy link

lgtm-com bot commented Nov 6, 2019

This pull request fixes 1 alert when merging e942a21 into 517c146 - view on LGTM.com

fixed alerts:

  • 1 for Dereferenced variable may be null

if (firehoseFactory.isSplittable()) {
return ((FiniteFirehoseFactory) firehoseFactory).getSplits(splitHintSpec);
} else {
throw new UnsupportedOperationException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is supporting unsplittable Firehoses future work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, only splittable firehose can create splits.

}
}

private static class TestCsvParseSpec extends CSVParseSpec
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: Rename the class to something like UnimplementedInputFormatCsvParseSpec. Currently, looking at just the body of testUnimplementedInputFormat, it's not apparent where the unimplemented input format is coming from.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 renamed.

@@ -57,10 +58,10 @@

@JsonCreator
public TimestampSpec(
@JsonProperty("column") String timestampColumn,
@JsonProperty("format") String format,
@JsonProperty("column") @Nullable String timestampColumn,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding!

Preconditions.checkNotNull(schema.getDataSchema().getParser().getParseSpec(), "parseSpec");
Preconditions.checkNotNull(schema.getDataSchema().getParser().getParseSpec().getTimestampSpec(), "timestampSpec");
Preconditions.checkNotNull(schema.getDataSchema().getNonNullTimestampSpec(), "timestampSpec");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking this one for null seems redundant

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

@@ -294,7 +294,7 @@ public IndexMergerV9 getIndexMergerV9()
return indexMergerV9;
}

public File getFirehoseTemporaryDir()
public File getIndexingTmpDir()
{
return new File(taskWorkDir, "firehose");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps rename the temporary directory as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to indexing-tmp.

ImmutableList.of(new Property<>("firehose", firehoseFactory), new Property<>("inputSource", inputSource))
);
if (firehoseFactory != null && inputFormat != null) {
throw new IAE("Cannot use firehose and inputFormat together. Try use inputSource instead of firehose.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: Try use inputFormat -> Try using inputSource

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

);
if (dataSchema.getParserMap() != null && ioConfig.getInputSource() != null) {
if (!(ioConfig.getInputSource() instanceof FirehoseFactoryToInputSourceAdaptor)) {
throw new IAE("Cannot use parser and inputSource together. Try use inputFormat instead of parser.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: Try use inputFormat -> Try using inputFormat

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

Comment on lines 139 to 142
new Object[]{LockGranularity.TIME_CHUNK, false},
new Object[]{LockGranularity.TIME_CHUNK, true},
new Object[]{LockGranularity.SEGMENT, false},
new Object[]{LockGranularity.SEGMENT, true}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a relatively slow test (~15 seconds per parameterized run), so all the permutations may be overkill. Perhaps remove (SEGMENT, false), which will still give coverage of both lock granularities and both with/without the input format API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

new Object[]{LockGranularity.SEGMENT}
new Object[]{LockGranularity.TIME_CHUNK, false},
new Object[]{LockGranularity.TIME_CHUNK, true},
new Object[]{LockGranularity.SEGMENT, false},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment to IndexingTest about skipping this permutation

new Object[]{LockGranularity.SEGMENT}
new Object[]{LockGranularity.TIME_CHUNK, false},
new Object[]{LockGranularity.TIME_CHUNK, true},
new Object[]{LockGranularity.SEGMENT, false},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment to IndexingTest about skipping this permutation

@lgtm-com
Copy link

lgtm-com bot commented Nov 9, 2019

This pull request fixes 1 alert when merging 546d957 into 0e8c3f7 - view on LGTM.com

fixed alerts:

  • 1 for Dereferenced variable may be null

@lgtm-com
Copy link

lgtm-com bot commented Nov 9, 2019

This pull request fixes 1 alert when merging ea2c8f9 into 75ea0d5 - view on LGTM.com

fixed alerts:

  • 1 for Dereferenced variable may be null

@lgtm-com
Copy link

lgtm-com bot commented Nov 11, 2019

This pull request fixes 1 alert when merging 218b392 into e9e1625 - view on LGTM.com

fixed alerts:

  • 1 for Dereferenced variable may be null

Copy link
Member

@clintropolis clintropolis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall lgtm 🤘

*/
default CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer) throws IOException
{
final File tempFile = File.createTempFile("druid-object-source", ".tmp", temporaryDirectory);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should druid-object-source be druid-input-entity since this class had it's name changed? (same with the LOG.debug message just below this line, as well as some of the javadoc)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

File file();
}

T getObject();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getEntity?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this and added getUri() instead.

{
final Map<String, Object> zipped = parseLine(line);
return Collections.singletonList(
MapInputRowParser.parse(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a version of MapInputRowParser.parse that takes an InputRowSchema since providing timestamp and dimension specs from it seems like it will be common

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

if (transforms.isEmpty()) {
transformedRow = row;
} else {
transformedRow = InputRowListPlusJson.of(new TransformedInputRow(row.getInputRow(), transforms), row.getRaw());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be transforming the list of InputRow to TransformedInputRow?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 fixed.

import java.lang.annotation.Target;

/**
* Signifies that the annotated entity is an unstable API for extension authors. Unstable APIs may change at any time
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should also maybe indicate that someday it will likely become either an @ExtensionPoint or @PublicApi?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 added.

Copy link
Member

@clintropolis clintropolis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm 👍

I'm slightly hesitant since I feel that this will be a moderately disruptive change that further fractures the state of indexing with regards to differences between specs, but I think these new interfaces are nicer going forward, so worth the pain of migrating stuff to this model and fully replacing firehoses.

@lgtm-com
Copy link

lgtm-com bot commented Nov 14, 2019

This pull request fixes 1 alert when merging ce88049 into ce4ee42 - view on LGTM.com

fixed alerts:

  • 1 for Dereferenced variable may be null

@@ -74,13 +74,13 @@
*
* @return an InputRowPlusRaw which may contain any of: an InputRow, the raw data, or a ParseException
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadoc for @return needs to be updated

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is only for sampler and will be removed in the follow-up pr.

{
return new InputRowPlusRaw(null, raw, parseException);
return (inputRows == null || inputRows.isEmpty()) && raw == null && rawJson == null && parseException == null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this also check if rawJson.isEmpty()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is also used only by sampler and will be cleaned up in the follow-up pr.

Copy link
Contributor

@ccaominh ccaominh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM 👍

Copy link
Contributor

@jon-wei jon-wei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@jihoonson jihoonson merged commit 1611792 into apache:master Nov 15, 2019
@jihoonson
Copy link
Contributor Author

@ccaominh @clintropolis @jon-wei thanks for the review!

ccaominh added a commit to ccaominh/druid that referenced this pull request Nov 15, 2019
gianm pushed a commit that referenced this pull request Nov 21, 2019
* Refactor parallel indexing perfect rollup partitioning

Refactoring to make it easier to later add range partitioning for
perfect rollup parallel indexing. This is accomplished by adding several
new base classes (e.g., PerfectRollupWorkerTask) and new classes for
encapsulating logic that needs to be changed for different partitioning
strategies (e.g., IndexTaskInputRowIteratorBuilder).

The code is functionally equivalent to before except for the following
small behavior changes:

1) PartialSegmentMergeTask: Previously, this task had a priority of
   DEFAULT_TASK_PRIORITY. It now has a priority of
   DEFAULT_BATCH_INDEX_TASK_PRIORITY (via the new PerfectRollupWorkerTask
   base class), since it is a batch index task.

2) ParallelIndexPhaseRunner: A decorator was added to
   subTaskSpecIterator to ensure the subtasks are generated with unique
   ids. Previously, only tests (i.e., MultiPhaseParallelIndexingTest)
   would have this decorator, but this behavior is desired for non-test
   code as well.

* Fix forbidden apis and pmd warnings

* Fix analyze dependencies warnings

* Fix IndexTask json and add IT diags

* Fix parallel index supervisor<->worker serde

* Fix TeamCity inspection errors/warnings

* Fix TeamCity inspection errors/warnings again

* Integrate changes with those from #8823

* Address review comments

* Address more review comments

* Fix forbidden apis

* Address more review comments
jon-wei pushed a commit to jon-wei/druid that referenced this pull request Nov 26, 2019
* Refactor parallel indexing perfect rollup partitioning

Refactoring to make it easier to later add range partitioning for
perfect rollup parallel indexing. This is accomplished by adding several
new base classes (e.g., PerfectRollupWorkerTask) and new classes for
encapsulating logic that needs to be changed for different partitioning
strategies (e.g., IndexTaskInputRowIteratorBuilder).

The code is functionally equivalent to before except for the following
small behavior changes:

1) PartialSegmentMergeTask: Previously, this task had a priority of
   DEFAULT_TASK_PRIORITY. It now has a priority of
   DEFAULT_BATCH_INDEX_TASK_PRIORITY (via the new PerfectRollupWorkerTask
   base class), since it is a batch index task.

2) ParallelIndexPhaseRunner: A decorator was added to
   subTaskSpecIterator to ensure the subtasks are generated with unique
   ids. Previously, only tests (i.e., MultiPhaseParallelIndexingTest)
   would have this decorator, but this behavior is desired for non-test
   code as well.

* Fix forbidden apis and pmd warnings

* Fix analyze dependencies warnings

* Fix IndexTask json and add IT diags

* Fix parallel index supervisor<->worker serde

* Fix TeamCity inspection errors/warnings

* Fix TeamCity inspection errors/warnings again

* Integrate changes with those from apache#8823

* Address review comments

* Address more review comments

* Fix forbidden apis

* Address more review comments
@jon-wei jon-wei added this to the 0.17.0 milestone Dec 17, 2019
abhishekagarwal87 pushed a commit that referenced this pull request Mar 2, 2023
The FiniteFirehoseFactory and InputRowParser classes were deprecated in 0.17.0 (#8823) in favor of InputSource & InputFormat. This PR removes the FiniteFirehoseFactory and all its implementations along with classes solely used by them like Fetcher (Used by PrefetchableTextFilesFirehoseFactory). Refactors classes including tests using FiniteFirehoseFactory to use InputSource instead.
Removing InputRowParser may not be as trivial as many classes that aren't deprecated depends on it (with no alternatives), like EventReceiverFirehoseFactory. Hence FirehoseFactory, EventReceiverFirehoseFactory, and Firehose are marked deprecated.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants