-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add InputSource and InputFormat interfaces #8823
Changes from 10 commits
095bb32
f52f967
93ab23f
e0b80cb
b4f041e
d349db5
f308f13
d451582
b7c8b87
e942a21
08d7872
c70af75
546d957
7bb5d5f
6dba81a
ea2c8f9
1ea7758
218b392
7098056
7381277
2a3b114
355777c
e466ea9
87b83fa
c1c3fb9
169ab49
a9d167a
42a6965
230803b
540759b
ce88049
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.druid.data.input; | ||
|
||
import com.google.common.base.Preconditions; | ||
import org.apache.druid.data.input.impl.DimensionsSpec; | ||
import org.apache.druid.data.input.impl.FirehoseToInputSourceReaderAdaptor; | ||
import org.apache.druid.data.input.impl.InputFormat; | ||
import org.apache.druid.data.input.impl.InputRowParser; | ||
import org.apache.druid.data.input.impl.SplittableInputSource; | ||
import org.apache.druid.data.input.impl.TimestampSpec; | ||
|
||
import javax.annotation.Nullable; | ||
import java.io.File; | ||
import java.io.IOException; | ||
import java.util.stream.Stream; | ||
|
||
public class FirehoseFactoryToInputSourceAdaptor implements SplittableInputSource | ||
{ | ||
private final FiniteFirehoseFactory firehoseFactory; | ||
private final InputRowParser inputRowParser; | ||
|
||
public FirehoseFactoryToInputSourceAdaptor(FiniteFirehoseFactory firehoseFactory, InputRowParser inputRowParser) | ||
{ | ||
this.firehoseFactory = firehoseFactory; | ||
this.inputRowParser = Preconditions.checkNotNull(inputRowParser, "inputRowParser"); | ||
} | ||
|
||
public FiniteFirehoseFactory getFirehoseFactory() | ||
{ | ||
return firehoseFactory; | ||
} | ||
|
||
@Override | ||
public boolean isSplittable() | ||
{ | ||
return firehoseFactory.isSplittable(); | ||
} | ||
|
||
@Override | ||
public Stream<InputSplit> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) | ||
throws IOException | ||
{ | ||
if (firehoseFactory.isSplittable()) { | ||
return firehoseFactory.getSplits(splitHintSpec); | ||
} else { | ||
throw new UnsupportedOperationException(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is supporting unsplittable There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, only splittable firehose can create splits. |
||
} | ||
} | ||
|
||
@Override | ||
public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException | ||
{ | ||
if (firehoseFactory.isSplittable()) { | ||
return firehoseFactory.getNumSplits(splitHintSpec); | ||
} else { | ||
throw new UnsupportedOperationException(); | ||
} | ||
} | ||
|
||
@Override | ||
public SplittableInputSource withSplit(InputSplit split) | ||
{ | ||
if (firehoseFactory.isSplittable()) { | ||
return new FirehoseFactoryToInputSourceAdaptor( | ||
firehoseFactory.withSplit(split), | ||
inputRowParser | ||
); | ||
} else { | ||
throw new UnsupportedOperationException(); | ||
} | ||
} | ||
|
||
@Override | ||
public InputSourceReader reader( | ||
TimestampSpec timestampSpec, | ||
DimensionsSpec dimensionsSpec, | ||
@Nullable InputFormat inputFormat, // inputFormat will be ignored | ||
@Nullable File temporaryDirectory | ||
) | ||
{ | ||
return new FirehoseToInputSourceReaderAdaptor(firehoseFactory, inputRowParser, temporaryDirectory); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.druid.data.input; | ||
|
||
import com.fasterxml.jackson.annotation.JsonSubTypes; | ||
import com.fasterxml.jackson.annotation.JsonSubTypes.Type; | ||
import com.fasterxml.jackson.annotation.JsonTypeInfo; | ||
import org.apache.druid.data.input.impl.DimensionsSpec; | ||
import org.apache.druid.data.input.impl.HttpInputSource; | ||
import org.apache.druid.data.input.impl.InputFormat; | ||
import org.apache.druid.data.input.impl.LocalInputSource; | ||
import org.apache.druid.data.input.impl.TimestampSpec; | ||
import org.apache.druid.guice.annotations.ExtensionPoint; | ||
|
||
import javax.annotation.Nullable; | ||
import java.io.File; | ||
|
||
/** | ||
* InputSource abstracts the storage system where input data is stored. | ||
* It creates an {@link InputSourceReader} to read data from the given input source. | ||
* The most common use case would be: | ||
* | ||
* <pre>{@code | ||
* InputSourceReader reader = inputSource.reader(timestampSpec, dimensionsSpec, inputFormat, temporaryDirectory); | ||
* try (CloseableIterator<InputRow> iterator = reader.read()) { | ||
* while (iterator.hasNext()) { | ||
* InputRow row = iterator.next(); | ||
* processRow(row); | ||
* } | ||
* } | ||
* }</pre> | ||
*/ | ||
@ExtensionPoint | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. same comment re |
||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") | ||
@JsonSubTypes({ | ||
    @Type(name = "local", value = LocalInputSource.class), | ||
    @Type(name = "http", value = HttpInputSource.class) | ||
}) | ||
public interface InputSource | ||
{ | ||
  /** | ||
   * Tells whether this inputSource can be processed in parallel using ParallelIndexSupervisorTask. | ||
   * The default implementation returns false. | ||
   */ | ||
  default boolean isSplittable() | ||
  { | ||
    return false; | ||
  } | ||
|
||
  /** | ||
   * Creates an {@link InputSourceReader} for this input source. | ||
   * | ||
   * @param timestampSpec      timestamp spec passed to the reader | ||
   * @param dimensionsSpec     dimensions spec passed to the reader | ||
   * @param inputFormat        input format passed to the reader | ||
   * @param temporaryDirectory directory passed to the reader; may be null | ||
   */ | ||
  InputSourceReader reader( | ||
      TimestampSpec timestampSpec, | ||
      DimensionsSpec dimensionsSpec, | ||
      InputFormat inputFormat, | ||
      @Nullable File temporaryDirectory | ||
  ); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.druid.data.input; | ||
|
||
import org.apache.druid.guice.annotations.ExtensionPoint; | ||
import org.apache.druid.java.util.common.parsers.CloseableIterator; | ||
|
||
import java.io.IOException; | ||
|
||
/** | ||
* InputSourceReader reads data from {@link InputSource} and returns a {@link CloseableIterator} of | ||
* {@link InputRow}. See {@link InputSource} for an example usage. | ||
* | ||
* Implementations of this interface can use {@link SplitSource} and {@link SplitReader}. | ||
* | ||
* See {@link org.apache.druid.data.input.impl.SplitIteratingReader} as an example. | ||
*/ | ||
@ExtensionPoint | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. same comment re |
||
public interface InputSourceReader | ||
{ | ||
  /** | ||
   * Returns a closeable iterator of {@link InputRow}. | ||
   * | ||
   * @throws IOException declared for implementations that fail while opening or reading the input | ||
   */ | ||
  CloseableIterator<InputRow> read() throws IOException; | ||
|
||
  /** | ||
   * Returns a closeable iterator of {@link InputRowPlusRaw}. | ||
   * | ||
   * @throws IOException declared for implementations that fail while opening or reading the input | ||
   */ | ||
  CloseableIterator<InputRowPlusRaw> sample() throws IOException; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To handle potential 1:many mapping of raw input records to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.druid.data.input; | ||
|
||
import org.apache.druid.guice.annotations.ExtensionPoint; | ||
import org.apache.druid.java.util.common.parsers.CloseableIterator; | ||
|
||
import java.io.File; | ||
import java.io.IOException; | ||
|
||
/** | ||
 * A SplitReader parses data into {@link InputRow}s. | ||
 * Implementations are <i>stateful</i>, so create a new SplitReader for each {@link InputSplit}. | ||
 * | ||
 * @see TextReader for text format readers | ||
 */ | ||
@ExtensionPoint | ||
public interface SplitReader | ||
{ | ||
  /** | ||
   * Returns a closeable iterator of {@link InputRow} for the given source. | ||
   * | ||
   * @param source             the {@link SplitSource} passed to this reader | ||
   * @param temporaryDirectory directory passed to this reader | ||
   */ | ||
  CloseableIterator<InputRow> read(SplitSource source, File temporaryDirectory) throws IOException; | ||
|
||
  /** | ||
   * Returns a closeable iterator of {@link InputRowPlusRaw} for the given source. | ||
   * | ||
   * @param source             the {@link SplitSource} passed to this reader | ||
   * @param temporaryDirectory directory passed to this reader | ||
   */ | ||
  CloseableIterator<InputRowPlusRaw> sample(SplitSource source, File temporaryDirectory) throws IOException; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest adding a link to InputSource to show what's replacing this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 Added.