From fd8f4c4d0dfb927c7d7a408e3e90589549b99636 Mon Sep 17 00:00:00 2001
From: Manuel Polo
Date: Sun, 18 Feb 2024 17:54:35 +0100
Subject: [PATCH 1/2] feat: Added OutputStream support to ParquetWriter

---
 .../blue/strategic/parquet/ParquetWriter.java | 41 ++++------------
 .../parquet/io/AbstractParquetOutput.java     | 38 +++++++++++++++
 .../parquet/io/FileParquetOutput.java         | 47 +++++++++++++++++++
 .../parquet/io/PositionAwareOutputStream.java | 46 ++++++++++++++++++
 .../parquet/io/StreamParquetOutput.java       | 24 ++++++++++
 .../parquet/ParquetReadWriteTest.java         |  6 +++
 6 files changed, 170 insertions(+), 32 deletions(-)
 create mode 100644 src/main/java/blue/strategic/parquet/io/AbstractParquetOutput.java
 create mode 100644 src/main/java/blue/strategic/parquet/io/FileParquetOutput.java
 create mode 100644 src/main/java/blue/strategic/parquet/io/PositionAwareOutputStream.java
 create mode 100644 src/main/java/blue/strategic/parquet/io/StreamParquetOutput.java

diff --git a/src/main/java/blue/strategic/parquet/ParquetWriter.java b/src/main/java/blue/strategic/parquet/ParquetWriter.java
index 7d75b05..d9a5107 100644
--- a/src/main/java/blue/strategic/parquet/ParquetWriter.java
+++ b/src/main/java/blue/strategic/parquet/ParquetWriter.java
@@ -1,12 +1,12 @@
 package blue.strategic.parquet;
 
+import blue.strategic.parquet.io.FileParquetOutput;
+import blue.strategic.parquet.io.StreamParquetOutput;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-import org.apache.parquet.io.DelegatingPositionOutputStream;
 import org.apache.parquet.io.OutputFile;
-import org.apache.parquet.io.PositionOutputStream;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
@@ -15,46 +15,23 @@
 
 import java.io.Closeable;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Collections;
 
 public final class ParquetWriter<T> implements Closeable {
 
     private final org.apache.parquet.hadoop.ParquetWriter<T> writer;
+
+    public static <T> ParquetWriter<T> writeOutputStream(MessageType schema, OutputStream out, Dehydrator<T> dehydrator) throws IOException {
+        return writeOutput(schema, new StreamParquetOutput(out), dehydrator);
+    }
 
     public static <T> ParquetWriter<T> writeFile(MessageType schema, File out, Dehydrator<T> dehydrator) throws IOException {
-        OutputFile f = new OutputFile() {
-            @Override
-            public PositionOutputStream create(long blockSizeHint) throws IOException {
-                return createOrOverwrite(blockSizeHint);
-            }
-
-            @Override
-            public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
-                FileOutputStream fos = new FileOutputStream(out);
-                return new DelegatingPositionOutputStream(fos) {
-                    @Override
-                    public long getPos() throws IOException {
-                        return fos.getChannel().position();
-                    }
-                };
-            }
-
-            @Override
-            public boolean supportsBlockSize() {
-                return false;
-            }
-
-            @Override
-            public long defaultBlockSize() {
-                return 1024L;
-            }
-        };
-        return writeOutputFile(schema, f, dehydrator);
+        return writeOutput(schema, new FileParquetOutput(out), dehydrator);
     }
 
-    private static <T> ParquetWriter<T> writeOutputFile(MessageType schema, OutputFile file, Dehydrator<T> dehydrator) throws IOException {
+    private static <T> ParquetWriter<T> writeOutput(MessageType schema, OutputFile file, Dehydrator<T> dehydrator) throws IOException {
         return new ParquetWriter<>(file, schema, dehydrator);
     }
 
diff --git a/src/main/java/blue/strategic/parquet/io/AbstractParquetOutput.java b/src/main/java/blue/strategic/parquet/io/AbstractParquetOutput.java
new file mode 100644
index 0000000..2680311
--- /dev/null
+++ b/src/main/java/blue/strategic/parquet/io/AbstractParquetOutput.java
@@ -0,0 +1,38 @@
+package blue.strategic.parquet.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+
+/**
+ * Base class for parquet output
+ *
+ * @author mrmx
+ */
+public abstract class AbstractParquetOutput implements OutputFile{
+
+
+    @Override
+    public PositionOutputStream create(long blockSizeHint) throws IOException {
+        return createOrOverwrite(blockSizeHint);
+    }
+
+    @Override
+    public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
+        return createPositionOutputStream();
+    }
+
+    @Override
+    public boolean supportsBlockSize() {
+        return false;
+    }
+
+    @Override
+    public long defaultBlockSize() {
+        return 1024L;
+    }
+
+    protected abstract PositionOutputStream createPositionOutputStream() throws IOException;
+
+}
diff --git a/src/main/java/blue/strategic/parquet/io/FileParquetOutput.java b/src/main/java/blue/strategic/parquet/io/FileParquetOutput.java
new file mode 100644
index 0000000..4c66046
--- /dev/null
+++ b/src/main/java/blue/strategic/parquet/io/FileParquetOutput.java
@@ -0,0 +1,47 @@
+package blue.strategic.parquet.io;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import org.apache.parquet.io.DelegatingPositionOutputStream;
+import org.apache.parquet.io.PositionOutputStream;
+
+/**
+ * File based parquet output
+ *
+ * @author mrmx
+ */
+public class FileParquetOutput extends AbstractParquetOutput {
+    private String path;
+    private FileOutputStream stream;
+
+    public FileParquetOutput(File file) throws FileNotFoundException {
+        this(file.getPath(),new FileOutputStream(file));
+    }
+
+    public FileParquetOutput(FileOutputStream stream) {
+        this(null,stream);
+    }
+
+    public FileParquetOutput(String path, FileOutputStream stream) {
+        this.path = path;
+        this.stream = stream;
+    }
+
+    @Override
+    protected PositionOutputStream createPositionOutputStream() throws IOException {
+        return new DelegatingPositionOutputStream(stream) {
+            @Override
+            public long getPos() throws IOException {
+                return stream.getChannel().position();
+            }
+        };
+    }
+
+    @Override
+    public String getPath() {
+        return path;
+    }
+
+}
diff --git a/src/main/java/blue/strategic/parquet/io/PositionAwareOutputStream.java b/src/main/java/blue/strategic/parquet/io/PositionAwareOutputStream.java
new file mode 100644
index 0000000..adb1cac
--- /dev/null
+++ b/src/main/java/blue/strategic/parquet/io/PositionAwareOutputStream.java
@@ -0,0 +1,46 @@
+package blue.strategic.parquet.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import org.apache.parquet.io.DelegatingPositionOutputStream;
+
+
+/**
+ * Default implementation of a PositionOutputStream
+ *
+ * @author mrmx
+ */
+public class PositionAwareOutputStream extends DelegatingPositionOutputStream {
+    private final OutputStream outputStream;
+    private long position;
+
+    public PositionAwareOutputStream(OutputStream outputStream) {
+        super(outputStream);
+        this.outputStream = outputStream;
+        this.position = 0;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        outputStream.write(b);
+        position++;
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        outputStream.write(b);
+        position += b.length;
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        outputStream.write(b, off, len);
+        position += len;
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return position;
+    }
+
+}
diff --git a/src/main/java/blue/strategic/parquet/io/StreamParquetOutput.java b/src/main/java/blue/strategic/parquet/io/StreamParquetOutput.java
new file mode 100644
index 0000000..60dabf6
--- /dev/null
+++ b/src/main/java/blue/strategic/parquet/io/StreamParquetOutput.java
@@ -0,0 +1,24 @@
+package blue.strategic.parquet.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import org.apache.parquet.io.PositionOutputStream;
+
+/**
+ * OutputStream based parquet output
+ *
+ * @author mrmx
+ */
+public class StreamParquetOutput extends AbstractParquetOutput {
+    private final OutputStream outputStream;
+
+    public StreamParquetOutput(OutputStream outputStream) {
+        this.outputStream = outputStream;
+    }
+
+    @Override
+    protected PositionOutputStream createPositionOutputStream() throws IOException {
+        return new PositionAwareOutputStream(outputStream);
+    }
+
+}
diff --git a/src/test/java/blue/strategic/parquet/ParquetReadWriteTest.java b/src/test/java/blue/strategic/parquet/ParquetReadWriteTest.java
index 26db983..223b24b 100644
--- a/src/test/java/blue/strategic/parquet/ParquetReadWriteTest.java
+++ b/src/test/java/blue/strategic/parquet/ParquetReadWriteTest.java
@@ -8,6 +8,7 @@
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -62,6 +63,11 @@ public Map<String, Object> finish(Map<String, Object> target) {
             writer.write(new Object[]{1L, "hello1"});
             writer.write(new Object[]{2L, "hello2"});
         }
+
+        try(ParquetWriter<Object[]> writer = ParquetWriter.writeOutputStream(schema, new FileOutputStream(parquet), dehydrator)) {
+            writer.write(new Object[]{1L, "hello1"});
+            writer.write(new Object[]{2L, "hello2"});
+        }
 
         try (Stream<Map<String, Object>> s = ParquetReader.streamContent(parquet, HydratorSupplier.constantly(hydrator))) {
             List<Map<String, Object>> result = s.collect(Collectors.toList());

From f458120d8fd6aab5596ac9074b50fc760eb32f55 Mon Sep 17 00:00:00 2001
From: Manuel Polo
Date: Sun, 18 Feb 2024 20:19:06 +0100
Subject: [PATCH 2/2] feat: Make writeOutput public in ParquetWriter

---
 src/main/java/blue/strategic/parquet/ParquetWriter.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/main/java/blue/strategic/parquet/ParquetWriter.java b/src/main/java/blue/strategic/parquet/ParquetWriter.java
index d9a5107..0d8ab7b 100644
--- a/src/main/java/blue/strategic/parquet/ParquetWriter.java
+++ b/src/main/java/blue/strategic/parquet/ParquetWriter.java
@@ -31,7 +31,7 @@ public static <T> ParquetWriter<T> writeFile(MessageType schema, File out, Dehyd
         return writeOutput(schema, new FileParquetOutput(out), dehydrator);
     }
 
-    private static <T> ParquetWriter<T> writeOutput(MessageType schema, OutputFile file, Dehydrator<T> dehydrator) throws IOException {
+    public static <T> ParquetWriter<T> writeOutput(MessageType schema, OutputFile file, Dehydrator<T> dehydrator) throws IOException {
         return new ParquetWriter<>(file, schema, dehydrator);
     }
 