Skip to content

Commit

Permalink
feat: 支持预读buffer固定总容量,避免无限制申请内存导致OOM (#137)
Browse files Browse the repository at this point in the history
  • Loading branch information
suninsky authored Mar 29, 2024
1 parent dde0f95 commit 9a094cf
Show file tree
Hide file tree
Showing 9 changed files with 540 additions and 28 deletions.
19 changes: 19 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,25 @@
<version>3.0.8</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<!-- Pinned to a fixed version: the RELEASE metaversion resolves to whatever is newest at build time, which makes builds non-reproducible. -->
<version>5.9.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>4.11.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>4.11.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<distributionManagement>
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -214,4 +214,10 @@ public class CosNConfigKeys extends CommonConfigurationKeys {
// When a file with the same name exists and its length is 0, the directory takes precedence (off by default).
public static final String COSN_FILESTATUS_DIR_FIRST_ENABLED = "fs.cosn.filestatus.dir.first.enabled";
public static final boolean DEFAULT_FILESTATUS_DIR_FIRST_ENABLED = false;

// Capacity value handed to ReadBufferHolder.initialize(...) when the file system is initialized.
// NOTE(review): the default of -1 presumably means "no fixed cap" — confirm against ReadBufferHolder.
public static final String COSN_READ_BUFFER_POOL_CAPACITY = "fs.cosn.read.buffer.pool.capacity";
public static final long DEFAULT_READ_BUFFER_POOL_CAPACITY = -1;

// Time, in seconds, a read task waits for its read buffer to be allocated.
// Negative values are rejected with IllegalArgumentException during file system initialization.
public static final String COSN_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS = "fs.cosn.read.buffer.allocate.timeout.seconds";
public static final long DEFAULT_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS = 5;
}
96 changes: 72 additions & 24 deletions src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package org.apache.hadoop.fs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.cosn.MemoryAllocator;
import org.apache.hadoop.fs.cosn.CosNOutOfMemoryException;
import org.apache.hadoop.fs.cosn.ReadBufferHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -9,6 +12,8 @@
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -25,8 +30,7 @@ public static class ReadBuffer {

private final Lock lock = new ReentrantLock();
private Condition readyCondition = lock.newCondition();

private byte[] buffer;
private MemoryAllocator.Memory memory;
private int status;
private long start;
private long end;
Expand All @@ -35,7 +39,6 @@ public static class ReadBuffer {
// Creates a read buffer describing the inclusive byte range [start, end].
// The status starts at INIT and no exception is recorded.
public ReadBuffer(long start, long end) {
this.start = start;
this.end = end;
// Diff view: this eager byte[] allocation is the pre-change line that the commit removes.
this.buffer = new byte[(int) (this.end - this.start) + 1];
this.status = INIT;
this.exception = null;
}
Expand All @@ -59,7 +62,11 @@ public void signalAll() {
}

// Returns the byte array backing this read buffer.
// Diff view: the first return is the pre-change line, the second is its replacement.
// NOTE(review): memory is only assigned in allocate(...) and is reset to null in free(),
// so the new form presumably must not be called outside that window — confirm callers.
public byte[] getBuffer() {
return this.buffer;
return this.memory.array();
}

/**
 * Size of the inclusive byte range [start, end] covered by this buffer.
 *
 * @return {@code end - start + 1}, narrowed to {@code int}
 */
public int length() {
    final long span = end - start + 1;
    return (int) span;
}

public int getStatus() {
Expand All @@ -85,6 +92,18 @@ public long getStart() {
public long getEnd() {
return end;
}

/**
 * Requests {@link #length()} bytes from the allocator exposed by ReadBufferHolder
 * and keeps the result as this buffer's backing memory.
 *
 * @param timeout timeout value forwarded to the allocator
 * @param unit unit of {@code timeout}
 * @throws CosNOutOfMemoryException propagated from the allocator
 * @throws InterruptedException propagated from the allocator
 */
public void allocate(long timeout, TimeUnit unit) throws CosNOutOfMemoryException, InterruptedException {
    final int size = this.length();
    this.memory = ReadBufferHolder.getBufferAllocator().allocate(size, timeout, unit);
}

/**
 * Releases the backing memory, if any, and drops the reference to it.
 * Calling this when nothing is held does nothing.
 */
public void free() {
    if (this.memory == null) {
        return;
    }
    this.memory.free();
    this.memory = null;
}
}

private FileSystem.Statistics statistics;
Expand All @@ -100,8 +119,8 @@ public long getEnd() {
private long bufferEnd;
private final long preReadPartSize;
private final int maxReadPartNumber;
private byte[] buffer;
private boolean closed = false;
private ReadBuffer currentReadBuffer;
private final AtomicBoolean closed;
private final int socketErrMaxRetryTimes;

private final ExecutorService readAheadExecutorService;
Expand Down Expand Up @@ -149,7 +168,28 @@ public CosNFSInputStream(
this.readAheadExecutorService = readAheadExecutorService;
this.readBufferQueue =
new ArrayDeque<>(this.maxReadPartNumber);
this.closed = false;
this.closed = new AtomicBoolean();
}

/**
 * Frees the given buffer unless the stream still refers to it.
 * A buffer is kept when it is null, is the previous or current read buffer,
 * or is the element at the head of the read-ahead queue.
 *
 * @param readBuffer candidate buffer to release; may be null
 */
private void tryFreeBuffer(ReadBuffer readBuffer) {
    if (readBuffer == null) {
        return;
    }
    final boolean heldByStream =
            readBuffer == previousReadBuffer || readBuffer == currentReadBuffer;
    if (heldByStream) {
        return;
    }
    final boolean atQueueHead =
            !readBufferQueue.isEmpty() && readBufferQueue.peek() == readBuffer;
    if (atQueueHead) {
        return;
    }
    readBuffer.free();
}

/**
 * Replaces the current read buffer and offers the replaced one to tryFreeBuffer.
 *
 * @param readBuffer the new current buffer; may be null
 */
private void setCurrentReadBuffer(ReadBuffer readBuffer) {
    final ReadBuffer replaced = this.currentReadBuffer;
    this.currentReadBuffer = readBuffer;
    // Release only after the field is updated: tryFreeBuffer skips whatever is still the current buffer.
    tryFreeBuffer(replaced);
}

/**
 * Replaces the previous read buffer and offers the replaced one to tryFreeBuffer.
 *
 * @param readBuffer the new previous buffer; may be null
 */
public void setPreviousReadBuffer(ReadBuffer readBuffer) {
    final ReadBuffer replaced = this.previousReadBuffer;
    this.previousReadBuffer = readBuffer;
    // Release only after the field is updated: tryFreeBuffer skips whatever is still the previous buffer.
    tryFreeBuffer(replaced);
}

private synchronized void reopen(long pos) throws IOException {
Expand All @@ -159,7 +199,7 @@ private synchronized void reopen(long pos) throws IOException {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
}

this.buffer = null;
setCurrentReadBuffer(null);
this.bufferStart = -1;
this.bufferEnd = -1;

Expand All @@ -171,7 +211,7 @@ private synchronized void reopen(long pos) throws IOException {
// 如果不是,那么随机读只可能是发生了超出前一块范围的回溯随机读,或者是在预读队列范围或者是超出预读队列范围。
// 如果是在预读队列范围内,那么依赖在预读队列中查找直接定位到要读的块,如果是超出预读队列范围,那么队列会被排空,然后重新定位到要读的块和位置
if (null != this.previousReadBuffer && pos >= this.previousReadBuffer.getStart() && pos <= this.previousReadBuffer.getEnd()) {
this.buffer = this.previousReadBuffer.getBuffer();
setCurrentReadBuffer(previousReadBuffer);
this.bufferStart = this.previousReadBuffer.getStart();
this.bufferEnd = this.previousReadBuffer.getEnd();
this.position = pos;
Expand All @@ -184,7 +224,7 @@ private synchronized void reopen(long pos) throws IOException {
while (!this.readBufferQueue.isEmpty()) {
if (pos < this.readBufferQueue.getFirst().getStart() || pos > this.readBufferQueue.getFirst().getEnd()) {
// 定位到要读的块,同时保存淘汰出来的队头元素,供小范围的随机读回溯
this.previousReadBuffer = this.readBufferQueue.poll();
setPreviousReadBuffer(this.readBufferQueue.poll());
} else {
break;
}
Expand Down Expand Up @@ -215,12 +255,12 @@ private synchronized void reopen(long pos) throws IOException {
}

ReadBuffer readBuffer = new ReadBuffer(byteStart, byteEnd);
if (readBuffer.getBuffer().length == 0) {
if (readBuffer.length() == 0) {
readBuffer.setStatus(ReadBuffer.SUCCESS);
} else {
this.readAheadExecutorService.execute(
new CosNFileReadTask(this.conf, this.key, this.store,
readBuffer, this.socketErrMaxRetryTimes));
readBuffer, this.socketErrMaxRetryTimes, closed));
}

this.readBufferQueue.add(readBuffer);
Expand All @@ -236,11 +276,14 @@ private synchronized void reopen(long pos) throws IOException {
readBuffer.await(ReadBuffer.INIT);
if (readBuffer.getStatus() == ReadBuffer.ERROR) {
innerException = readBuffer.getException();
this.buffer = null;
setCurrentReadBuffer(null);
this.bufferStart = -1;
this.bufferEnd = -1;
if (readBuffer.getException().getCause() instanceof CosNOutOfMemoryException) {
throw readBuffer.getException();
}
} else {
this.buffer = readBuffer.getBuffer();
setCurrentReadBuffer(readBuffer);
this.bufferStart = readBuffer.getStart();
this.bufferEnd = readBuffer.getEnd();
}
Expand All @@ -250,7 +293,7 @@ private synchronized void reopen(long pos) throws IOException {
readBuffer.unLock();
}

if (null == this.buffer) {
if (null == this.currentReadBuffer) {
LOG.error(String.format("Null IO stream key:%s", this.key), innerException);
throw new IOException("Null IO stream.", innerException);
}
Expand Down Expand Up @@ -314,8 +357,8 @@ public int read() throws IOException {

int byteRead = -1;
if (this.partRemaining != 0) {
byteRead =
this.buffer[(int) (this.buffer.length - this.partRemaining)] & 0xff;
byte[] buffer = currentReadBuffer.getBuffer();
byteRead = buffer[(int) (buffer.length - this.partRemaining)] & 0xff;
}
if (byteRead >= 0) {
this.position++;
Expand Down Expand Up @@ -347,9 +390,10 @@ public int read(byte[] b, int off, int len) throws IOException {
}

int bytes = 0;
for (int i = this.buffer.length - (int) partRemaining;
i < this.buffer.length; i++) {
b[off + bytesRead] = this.buffer[i];
byte[] buffer = currentReadBuffer.getBuffer();
for (int i = buffer.length - (int) partRemaining;
i < buffer.length; i++) {
b[off + bytesRead] = buffer[i];
bytes++;
bytesRead++;
if (off + bytesRead >= len) {
Expand Down Expand Up @@ -385,16 +429,20 @@ public int available() throws IOException {
}
// Closes the stream: marks it closed, frees every buffer still in the read-ahead queue,
// then clears the current and previous read buffers through their setters.
// A call made after the stream is already closed returns immediately.
// Diff view: `if (this.closed)`, `this.closed = true;` and `this.buffer = null;` are the pre-change lines.
// NOTE(review): get() followed by set(true) is a separate check and write, so two threads calling
// close() at the same time could both pass the guard; closed.compareAndSet(false, true) would make it one step.
// NOTE(review): queued buffers are freed here without taking the buffer's lock, while a read task holds
// that lock for its whole run — confirm a task that is mid-read cannot have its memory freed underneath it.
@Override
public void close() throws IOException {
if (this.closed) {
if (this.closed.get()) {
return;
}

this.closed = true;
this.buffer = null;
this.closed.set(true);
while (!readBufferQueue.isEmpty()) {
readBufferQueue.poll().free();
}
setCurrentReadBuffer(null);
setPreviousReadBuffer(null);
}

// Throws an IOException carrying FSExceptionMessages.STREAM_IS_CLOSED when the stream has been closed.
// Diff view: the first `if` is the pre-change boolean check, the second is its AtomicBoolean replacement.
private void checkOpened() throws IOException {
if(this.closed) {
if(this.closed.get()) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
}
Expand Down
24 changes: 20 additions & 4 deletions src/main/java/org/apache/hadoop/fs/CosNFileReadTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@

import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CosNFileReadTask implements Runnable {
static final Logger LOG = LoggerFactory.getLogger(CosNFileReadTask.class);
Expand All @@ -18,6 +19,7 @@ public class CosNFileReadTask implements Runnable {
private final NativeFileSystemStore store;
private final CosNFSInputStream.ReadBuffer readBuffer;
private final int socketErrMaxRetryTimes;
private final AtomicBoolean closed;

/**
* cos file read task
Expand All @@ -29,12 +31,13 @@ public class CosNFileReadTask implements Runnable {
public CosNFileReadTask(Configuration conf, String key,
NativeFileSystemStore store,
CosNFSInputStream.ReadBuffer readBuffer,
// Diff view: the next two lines are the old and new endings of the parameter list; the new one adds `closed`.
int socketErrMaxRetryTimes) {
int socketErrMaxRetryTimes, AtomicBoolean closed) {
this.conf = conf;
this.key = key;
this.store = store;
this.readBuffer = readBuffer;
this.socketErrMaxRetryTimes = socketErrMaxRetryTimes;
// Flag supplied by the owning input stream; run() checks it before allocating the read buffer.
this.closed = closed;
}

@Override
Expand All @@ -53,9 +56,20 @@ public void run() {
LOG.debug("flush task, current classLoader: {}, context ClassLoader: {}",
this.getClass().getClassLoader(), currentThread.getContextClassLoader());
currentThread.setContextClassLoader(this.getClass().getClassLoader());

try {
this.readBuffer.lock();
if (closed.get()) {
this.setFailResult("the input stream has been canceled.", new IOException("the input stream has been canceled."));
return;
}
try {
this.readBuffer.allocate(
conf.getInt(CosNConfigKeys.COSN_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS, 5),
TimeUnit.SECONDS);
} catch (Exception e) {
this.setFailResult("allocate read buffer failed.", new IOException(e));
return;
}
int retryIndex = 1;
boolean needRetry = false;
while (true) {
Expand Down Expand Up @@ -85,14 +99,16 @@ public void run() {
this.setFailResult(errMsg, ioException);
break;
}
} catch (Throwable throwable) {
this.setFailResult("retrieve block failed", new IOException(throwable));
}

if (!needRetry) {
break;
}
} // end of retry
this.readBuffer.signalAll();
} finally {
this.readBuffer.signalAll();
this.readBuffer.unLock();
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/org/apache/hadoop/fs/CosNFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.hadoop.fs.cosn.CRC32CCheckSum;
import org.apache.hadoop.fs.cosn.CRC64Checksum;
import org.apache.hadoop.fs.cosn.LocalRandomAccessMappedBufferPool;
import org.apache.hadoop.fs.cosn.ReadBufferHolder;
import org.apache.hadoop.fs.cosn.Unit;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.AccessControlException;
Expand Down Expand Up @@ -235,6 +236,11 @@ public void rejectedExecution(Runnable r,
CosNConfigKeys.COSN_SYMBOLIC_SIZE_THRESHOLD, CosNConfigKeys.DEFAULT_COSN_SYMBOLIC_SIZE_THRESHOLD);
this.directoryFirstEnabled = this.getConf().getBoolean(CosNConfigKeys.COSN_FILESTATUS_DIR_FIRST_ENABLED,
CosNConfigKeys.DEFAULT_FILESTATUS_DIR_FIRST_ENABLED);
ReadBufferHolder.initialize(this.getConf().getLong(CosNConfigKeys.COSN_READ_BUFFER_POOL_CAPACITY,
CosNConfigKeys.DEFAULT_READ_BUFFER_POOL_CAPACITY));
if (this.getConf().getInt(CosNConfigKeys.COSN_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS, 5) < 0) {
throw new IllegalArgumentException("fs.cosn.read.buffer.allocate.timeout.seconds cannot be negative.");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.apache.hadoop.fs.cosn;

/**
 * Checked exception declared by the read-buffer allocation path.
 *
 * <p>NOTE(review): presumably raised when the read buffer pool cannot satisfy a
 * request within the allocation timeout — confirm against the allocator.
 */
public class CosNOutOfMemoryException extends Exception {

    // Exception is Serializable; a fixed id keeps the serialized form stable across recompiles.
    private static final long serialVersionUID = 1L;

    /**
     * Creates the exception with a detail message and no cause.
     *
     * @param message description of the failure
     */
    public CosNOutOfMemoryException(String message) {
        super(message);
    }

    /**
     * Creates the exception with a detail message and the underlying cause.
     *
     * @param message description of the failure
     * @param cause the throwable that led to this failure; may be null
     */
    public CosNOutOfMemoryException(String message, Throwable cause) {
        super(message, cause);
    }

}
Loading

0 comments on commit 9a094cf

Please sign in to comment.