Skip to content

Commit

Permalink
Merge pull request #14 from vintmd/initbufferfix
Browse files Browse the repository at this point in the history
fix init too many buffer in pool which may create too many useless buffer
  • Loading branch information
yuyang733 authored Jun 22, 2020
2 parents 8129d6c + 4ca8b98 commit a5f1b1b
Showing 1 changed file with 46 additions and 19 deletions.
65 changes: 46 additions & 19 deletions src/main/java/org/apache/hadoop/fs/CosFsDataOutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,29 +72,17 @@ public CosFsDataOutputStream(
this.executorService =
MoreExecutors.listeningDecorator(executorService);

try {
this.currentBlockBuffer =
BufferPool.getInstance().getBuffer((int) this.blockSize);
} catch (InterruptedException e) {
String exceptionMsg = String.format("Getting a buffer size:[%d] " +
"from the buffer pool occurs an exception.",
this.blockSize);
throw new IOException(exceptionMsg);
}
try {
this.digest = MessageDigest.getInstance("MD5");
this.currentBlockOutputStream = new DigestOutputStream(
new BufferOutputStream(this.currentBlockBuffer),
this.digest);
} catch (NoSuchAlgorithmException e) {
this.digest = null;
this.currentBlockOutputStream =
new BufferOutputStream(this.currentBlockBuffer);
}
// malloc when trigger the write operations
this.currentBlockBuffer = null;
this.currentBlockOutputStream = null;
}

@Override
public void flush() throws IOException {
// create but not call write before
if (this.currentBlockOutputStream == null) {
initCurrentBlock();
}
this.currentBlockOutputStream.flush();
}

Expand All @@ -103,6 +91,11 @@ public synchronized void close() throws IOException {
if (this.closed) {
return;
}

if (this.currentBlockOutputStream == null) {
initCurrentBlock();
}

try {
this.currentBlockOutputStream.flush();
this.currentBlockOutputStream.close();
Expand Down Expand Up @@ -180,6 +173,8 @@ public synchronized void close() throws IOException {
this.blockWritten = 0;
this.closed = true;
this.writeConsistencyChecker = null;
this.currentBlockBuffer = null;
this.currentBlockOutputStream = null;
}
}

Expand Down Expand Up @@ -264,6 +259,10 @@ public void write(byte[] b, int off, int len) throws IOException {
throw new IOException("block stream has been closed.");
}

if (this.currentBlockOutputStream == null) {
initCurrentBlock();
}

while (len > 0) {
long writeBytes = 0;
if (this.blockWritten + len > this.blockSize) {
Expand Down Expand Up @@ -297,6 +296,10 @@ public void write(int b) throws IOException {
throw new IOException("block stream has been closed.");
}

if (this.currentBlockOutputStream == null) {
initCurrentBlock();
}

byte[] singleBytes = new byte[1];
singleBytes[0] = (byte) b;
this.currentBlockOutputStream.write(singleBytes, 0, 1);
Expand All @@ -309,4 +312,28 @@ public void write(int b) throws IOException {
this.blockWritten = 0;
}
}

// init current block buffer and output stream when occur the write operation.
private void initCurrentBlock() throws IOException {
// get the buffer
try {
this.currentBlockBuffer = BufferPool.getInstance().getBuffer((int) this.blockSize);
} catch (InterruptedException e) {
String exceptionMsg = String.format("Getting a buffer size:[%d] " +
"from the buffer pool occurs an exception.",
this.blockSize);
throw new IOException(exceptionMsg);
}
// init the stream
try {
this.digest = MessageDigest.getInstance("MD5");
this.currentBlockOutputStream = new DigestOutputStream(
new BufferOutputStream(this.currentBlockBuffer),
this.digest);
} catch (NoSuchAlgorithmException e) {
this.digest = null;
this.currentBlockOutputStream =
new BufferOutputStream(this.currentBlockBuffer);
}
}
}

0 comments on commit a5f1b1b

Please sign in to comment.