Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added OutputStream support to ParquetWriter #5

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 9 additions & 32 deletions src/main/java/blue/strategic/parquet/ParquetWriter.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package blue.strategic.parquet;

import blue.strategic.parquet.io.FileParquetOutput;
import blue.strategic.parquet.io.StreamParquetOutput;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.DelegatingPositionOutputStream;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.LogicalTypeAnnotation;
Expand All @@ -15,46 +15,23 @@

import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;

public final class ParquetWriter<T> implements Closeable {

private final org.apache.parquet.hadoop.ParquetWriter<T> writer;

public static <T> ParquetWriter<T> writeOutputStream(MessageType schema, OutputStream out, Dehydrator<T> dehydrator) throws IOException {
return writeOutput(schema, new StreamParquetOutput(out), dehydrator);
}

public static <T> ParquetWriter<T> writeFile(MessageType schema, File out, Dehydrator<T> dehydrator) throws IOException {
OutputFile f = new OutputFile() {
@Override
public PositionOutputStream create(long blockSizeHint) throws IOException {
return createOrOverwrite(blockSizeHint);
}

@Override
public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
FileOutputStream fos = new FileOutputStream(out);
return new DelegatingPositionOutputStream(fos) {
@Override
public long getPos() throws IOException {
return fos.getChannel().position();
}
};
}

@Override
public boolean supportsBlockSize() {
return false;
}

@Override
public long defaultBlockSize() {
return 1024L;
}
};
return writeOutputFile(schema, f, dehydrator);
return writeOutput(schema, new FileParquetOutput(out), dehydrator);
}

private static <T> ParquetWriter<T> writeOutputFile(MessageType schema, OutputFile file, Dehydrator<T> dehydrator) throws IOException {
public static <T> ParquetWriter<T> writeOutput(MessageType schema, OutputFile file, Dehydrator<T> dehydrator) throws IOException {
return new ParquetWriter<>(file, schema, dehydrator);
}

Expand Down
38 changes: 38 additions & 0 deletions src/main/java/blue/strategic/parquet/io/AbstractParquetOutput.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package blue.strategic.parquet.io;

import java.io.IOException;
import java.io.OutputStream;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;

/**
* Base class for parquet output
*
* @author mrmx
*/
public abstract class AbstractParquetOutput implements OutputFile{


@Override
public PositionOutputStream create(long blockSizeHint) throws IOException {
return createOrOverwrite(blockSizeHint);
}

@Override
public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
return createPositionOutputStream();
}

@Override
public boolean supportsBlockSize() {
return false;
}

@Override
public long defaultBlockSize() {
return 1024L;
}

protected abstract PositionOutputStream createPositionOutputStream() throws IOException;

}
47 changes: 47 additions & 0 deletions src/main/java/blue/strategic/parquet/io/FileParquetOutput.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package blue.strategic.parquet.io;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.parquet.io.DelegatingPositionOutputStream;
import org.apache.parquet.io.PositionOutputStream;

/**
* File based parquet output
*
* @author mrmx
*/
public class FileParquetOutput extends AbstractParquetOutput {
private String path;
private FileOutputStream stream;

public FileParquetOutput(File file) throws FileNotFoundException {
this(file.getPath(),new FileOutputStream(file));
}

public FileParquetOutput(FileOutputStream stream) {
this(null,stream);
}

public FileParquetOutput(String path, FileOutputStream stream) {
this.path = path;
this.stream = stream;
}

@Override
protected PositionOutputStream createPositionOutputStream() throws IOException {
return new DelegatingPositionOutputStream(stream) {
@Override
public long getPos() throws IOException {
return stream.getChannel().position();
}
};
}

@Override
public String getPath() {
return path;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package blue.strategic.parquet.io;

import java.io.IOException;
import java.io.OutputStream;
import org.apache.parquet.io.DelegatingPositionOutputStream;


/**
* Default implementation of a <code>PositionOutputStream</code>
*
* @author mrmx
*/
public class PositionAwareOutputStream extends DelegatingPositionOutputStream {
private final OutputStream outputStream;
private long position;

public PositionAwareOutputStream(OutputStream outputStream) {
super(outputStream);
this.outputStream = outputStream;
this.position = 0;
}

@Override
public void write(int b) throws IOException {
outputStream.write(b);
position++;
}

@Override
public void write(byte[] b) throws IOException {
outputStream.write(b);
position += b.length;
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
outputStream.write(b, off, len);
position += len;
}

@Override
public long getPos() throws IOException {
return position;
}

}
24 changes: 24 additions & 0 deletions src/main/java/blue/strategic/parquet/io/StreamParquetOutput.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package blue.strategic.parquet.io;

import java.io.IOException;
import java.io.OutputStream;
import org.apache.parquet.io.PositionOutputStream;

/**
* <code>OutputStream</code> based parquet output
*
* @author mrmx
*/
public class StreamParquetOutput extends AbstractParquetOutput {
private final OutputStream outputStream;

public StreamParquetOutput(OutputStream outputStream) {
this.outputStream = outputStream;
}

@Override
protected PositionOutputStream createPositionOutputStream() throws IOException {
return new PositionAwareOutputStream(outputStream);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -62,6 +63,11 @@ public Map<String, Object> finish(Map<String, Object> target) {
writer.write(new Object[]{1L, "hello1"});
writer.write(new Object[]{2L, "hello2"});
}

try(ParquetWriter<Object[]> writer = ParquetWriter.writeOutputStream(schema, new FileOutputStream(parquet), dehydrator)) {
writer.write(new Object[]{1L, "hello1"});
writer.write(new Object[]{2L, "hello2"});
}

try (Stream<Map<String, Object>> s = ParquetReader.streamContent(parquet, HydratorSupplier.constantly(hydrator))) {
List<Map<String, Object>> result = s.collect(Collectors.toList());
Expand Down