diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactory.java index d33ba9a6be87..dec2ab045bbe 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactory.java @@ -22,19 +22,21 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.Firehose; -import org.apache.druid.data.input.FirehoseFactory; +import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.impl.StringInputRowParser; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.Objects; +import java.util.stream.Stream; /** * Creates firehose that produces data inlined in its own spec */ -public class InlineFirehoseFactory implements FirehoseFactory +public class InlineFirehoseFactory implements FiniteFirehoseFactory { private final String data; @@ -74,4 +76,28 @@ public int hashCode() { return Objects.hash(data); } + + @Override + public boolean isSplittable() + { + return false; + } + + @Override + public Stream> getSplits() + { + return Stream.of(new InputSplit<>(data)); + } + + @Override + public int getNumSplits() + { + return 1; + } + + @Override + public FiniteFirehoseFactory withSplit(InputSplit split) + { + return new InlineFirehoseFactory(split.get()); + } } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactoryTest.java index fe9d797d1362..6bc9a0e12136 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactoryTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactoryTest.java @@ -20,8 +20,10 @@ package org.apache.druid.segment.realtime.firehose; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.impl.CSVParseSpec; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.StringInputRowParser; @@ -37,6 +39,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Optional; @SuppressWarnings({"NullableProblems", "ConstantConditions"}) public class InlineFirehoseFactoryTest @@ -77,6 +80,14 @@ public void setUp() target = new InlineFirehoseFactory(DATA); } + @Test + public void testInterfaceImplementation() + { + Assert.assertTrue(target instanceof FiniteFirehoseFactory); + Assert.assertFalse(target.isSplittable()); + Assert.assertEquals(1, target.getNumSplits()); + } + @Test(expected = NullPointerException.class) public void testContstructorDataRequired() { @@ -101,6 +112,16 @@ public void testConnect() throws IOException Assert.assertEquals(VALUE, values.get(0)); } + @Test + public void testForcedSplitAndClone() + { + Optional> inputSplitOptional = target.getSplits().findFirst(); + Assert.assertTrue(inputSplitOptional.isPresent()); + FiniteFirehoseFactory cloneWithSplit = target.withSplit(inputSplitOptional.get()); + Assert.assertTrue(cloneWithSplit instanceof InlineFirehoseFactory); + Assert.assertEquals(DATA, ((InlineFirehoseFactory) cloneWithSplit).getData()); + } + @Test public void testSerde() throws IOException {