reuse decompressor to reduce memory consumption #1

Open · wants to merge 7 commits into master
src/java/com/hadoop/compression/lzo/LzopCodec.java (13 changes: 10 additions & 3 deletions)
@@ -23,6 +23,7 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
@@ -47,13 +48,17 @@ public class LzopCodec extends LzoCodec {
 
   @Override
   public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
-    return createOutputStream(out, createCompressor());
+    //get a compressor which will be returned to the pool when the output stream
+    //is closed.
+    return createOutputStream(out, CodecPool.getCompressor(this, getConf()));
   }
 
   public CompressionOutputStream createIndexedOutputStream(OutputStream out,
       DataOutputStream indexOut)
       throws IOException {
-    return createIndexedOutputStream(out, indexOut, createCompressor());
+    //get a compressor which will be returned to the pool when the output stream
+    //is closed.
+    return createIndexedOutputStream(out, indexOut, CodecPool.getCompressor(this, getConf()));
   }
 
   @Override
@@ -86,7 +91,9 @@ public CompressionInputStream createInputStream(InputStream in,
 
   @Override
   public CompressionInputStream createInputStream(InputStream in) throws IOException {
-    return createInputStream(in, createDecompressor());
+    // get a decompressor from a pool which will be returned to the pool
+    // when LzoInputStream is closed
+    return createInputStream(in, CodecPool.getDecompressor(this));
   }
 
   @Override
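To make the effect of these LzopCodec changes concrete, here is a minimal, hypothetical caller (not part of this pull request; the class name, file name, and use of try-with-resources are illustrative, and the native LZO libraries are assumed to be loadable). Each single-argument createOutputStream()/createInputStream() call now borrows its compressor or decompressor from CodecPool, and closing the stream hands it back, so repeated open/close cycles reuse the same native buffers instead of allocating new ones.

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;

import com.hadoop.compression.lzo.LzopCodec;

public class PooledLzopExample {
  public static void main(String[] args) throws IOException {
    LzopCodec codec = new LzopCodec();
    codec.setConf(new Configuration());

    // Write: the compressor is borrowed from CodecPool inside createOutputStream()
    // and returned to the pool when close() runs.
    try (CompressionOutputStream out =
             codec.createOutputStream(new FileOutputStream("example.lzo"))) {
      out.write("hello lzo".getBytes("UTF-8"));
    }

    // Read: the decompressor is borrowed the same way and returned on close().
    byte[] buf = new byte[4096];
    try (CompressionInputStream in =
             codec.createInputStream(new FileInputStream("example.lzo"))) {
      int n;
      while ((n = in.read(buf)) != -1) {
        System.out.write(buf, 0, n);
      }
    }
    System.out.flush();
  }
}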
src/java/com/hadoop/compression/lzo/LzopInputStream.java (7 changes: 5 additions & 2 deletions)
@@ -31,6 +31,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.compress.BlockDecompressorStream;
+import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.Decompressor;
 
 public class LzopInputStream extends BlockDecompressorStream {
@@ -54,7 +55,6 @@ public LzopInputStream(InputStream in, Decompressor decompressor,
     readHeader(in);
   }
 
-
   /**
    * Reads len bytes in a loop.
    *
@@ -83,7 +83,7 @@ private static void readFully( InputStream in, byte buf[],
    * Read len bytes into buf, st LSB of int returned is the last byte of the
    * first word read.
    */
-  private static int readInt(InputStream in, byte[] buf, int len)
+  private static int readInt(InputStream in, byte[] buf, int len)
     throws IOException {
     readFully(in, buf, 0, len);
     int ret = (0xFF & buf[0]) << 24;
@@ -337,6 +337,9 @@ public void close() throws IOException {
       // LZO requires that each file ends with 4 trailing zeroes. If we are here,
       // the file didn't. It's not critical, though, so log and eat it in this case.
       LOG.warn("Incorrect LZO file format: file did not end with four trailing zeroes.", e);
+    } finally{
+      //return the decompressor to the pool, the function itself handles null.
+      CodecPool.returnDecompressor(decompressor);
     }
   }
 }
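The close() change above is what keeps the pool populated. A small, hypothetical sketch of the effect (not part of the pull request; the class name is illustrative, a default Configuration is assumed, and the native LZO libraries must be loadable): once a decompressor has been returned, the next request for the same codec should hand back the very same instance rather than constructing a new one.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.Decompressor;

import com.hadoop.compression.lzo.LzopCodec;

public class DecompressorReuseSketch {
  public static void main(String[] args) {
    LzopCodec codec = new LzopCodec();
    codec.setConf(new Configuration());

    // First borrow: the pool is empty, so a new LZO decompressor is created.
    Decompressor first = CodecPool.getDecompressor(codec);
    // Returning it is what LzopInputStream.close() now does in its finally block.
    CodecPool.returnDecompressor(first);

    // Second borrow: the pooled instance should be handed out again.
    Decompressor second = CodecPool.getDecompressor(codec);
    System.out.println("same instance reused: " + (first == second));
  }
}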
src/java/com/hadoop/compression/lzo/LzopOutputStream.java (4 changes: 4 additions & 0 deletions)
@@ -25,6 +25,7 @@
 import java.util.zip.Adler32;
 
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressorStream;
 import org.apache.hadoop.io.compress.Compressor;
 
@@ -110,6 +111,9 @@ public void close() throws IOException {
         indexOut.close();
       }
       closed = true;
+      //return the compressor to the pool for later reuse;
+      //the returnCompressor handles nulls.
+      CodecPool.returnCompressor(compressor);
     }
   }
 
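For the compressor side, the memory saving shows up when many short-lived output streams are opened in sequence, which is the scenario the pull request title points at. A minimal, hypothetical sketch (not part of the pull request; the class name, file names, and loop count are illustrative, and native LZO support is assumed): because each stream borrows its compressor from CodecPool in createOutputStream() and returns it in close(), the loop below keeps recycling one compressor and its buffers instead of allocating a fresh one per file.

import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionOutputStream;

import com.hadoop.compression.lzo.LzopCodec;

public class ManyLzopFilesSketch {
  public static void main(String[] args) throws IOException {
    LzopCodec codec = new LzopCodec();
    codec.setConf(new Configuration());

    for (int i = 0; i < 100; i++) {
      try (CompressionOutputStream out =
               codec.createOutputStream(new FileOutputStream("part-" + i + ".lzo"))) {
        out.write(("record " + i + "\n").getBytes("UTF-8"));
      } // close() returns the pooled compressor, ready for the next iteration
    }
  }
}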