Skip to content

Commit

Permalink
Make SizeClasses as composition with PoolArena instead of inheritance…
Browse files Browse the repository at this point in the history
…, to save memory
  • Loading branch information
laosijikaichele committed Dec 6, 2023
1 parent 8b20d6b commit b7325f5
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 103 deletions.
107 changes: 70 additions & 37 deletions buffer/src/main/java/io/netty/buffer/PoolArena.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import static io.netty.buffer.PoolChunk.isSubpage;
import static java.lang.Math.max;

abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric {
abstract class PoolArena<T> implements PoolArenaMetric {
private static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();

enum SizeClass {
Expand All @@ -40,8 +40,6 @@ enum SizeClass {

final PooledByteBufAllocator parent;

final int numSmallSubpagePools;
final int directMemoryCacheAlignment;
final PoolSubpage<T>[] smallSubpagePools;

private final PoolChunkList<T> q050;
Expand Down Expand Up @@ -74,24 +72,23 @@ enum SizeClass {

private final ReentrantLock lock = new ReentrantLock();

protected PoolArena(PooledByteBufAllocator parent, int pageSize,
int pageShifts, int chunkSize, int cacheAlignment) {
super(pageSize, pageShifts, chunkSize, cacheAlignment);
this.parent = parent;
directMemoryCacheAlignment = cacheAlignment;
final SizeClasses sizeClass;

numSmallSubpagePools = nSubpages;
smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
protected PoolArena(PooledByteBufAllocator parent, SizeClasses sizeClass) {
assert null != sizeClass;
this.parent = parent;
this.sizeClass = sizeClass;
smallSubpagePools = newSubpagePoolArray(sizeClass.nSubpages);
for (int i = 0; i < smallSubpagePools.length; i ++) {
smallSubpagePools[i] = newSubpagePoolHead(i);
}

q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);
q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, sizeClass.chunkSize);
q075 = new PoolChunkList<T>(this, q100, 75, 100, sizeClass.chunkSize);
q050 = new PoolChunkList<T>(this, q075, 50, 100, sizeClass.chunkSize);
q025 = new PoolChunkList<T>(this, q050, 25, 75, sizeClass.chunkSize);
q000 = new PoolChunkList<T>(this, q025, 1, 50, sizeClass.chunkSize);
qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, sizeClass.chunkSize);

q100.prevList(q075);
q075.prevList(q050);
Expand Down Expand Up @@ -131,15 +128,15 @@ PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacit
}

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int sizeIdx = size2SizeIdx(reqCapacity);
final int sizeIdx = sizeClass.size2SizeIdx(reqCapacity);

if (sizeIdx <= smallMaxSizeIdx) {
if (sizeIdx <= sizeClass.smallMaxSizeIdx) {
tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx);
} else if (sizeIdx < nSizes) {
} else if (sizeIdx < sizeClass.nSizes) {
tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx);
} else {
int normCapacity = directMemoryCacheAlignment > 0
? normalizeSize(reqCapacity) : reqCapacity;
int normCapacity = sizeClass.directMemoryCacheAlignment > 0
? sizeClass.normalizeSize(reqCapacity) : reqCapacity;
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, normCapacity);
}
Expand All @@ -164,7 +161,7 @@ private void tcacheAllocateSmall(PoolThreadCache cache, PooledByteBuf<T> buf, fi
final PoolSubpage<T> s = head.next;
needsNormalAllocation = s == head;
if (!needsNormalAllocation) {
assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx) : "doNotDestroy=" +
assert s.doNotDestroy && s.elemSize == sizeClass.sizeIdx2size(sizeIdx) : "doNotDestroy=" +
s.doNotDestroy + ", elemSize=" + s.elemSize + ", sizeIdx=" + sizeIdx;
long handle = s.allocate();
assert handle >= 0;
Expand Down Expand Up @@ -212,7 +209,7 @@ private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx,
}

// Add a new chunk.
PoolChunk<T> c = newChunk(pageSize, nPSizes, pageShifts, chunkSize);
PoolChunk<T> c = newChunk(sizeClass.pageSize, sizeClass.nPSizes, sizeClass.pageShifts, sizeClass.chunkSize);
boolean success = c.allocate(buf, reqCapacity, sizeIdx, threadCache);
assert success;
qInit.add(c);
Expand Down Expand Up @@ -625,10 +622,8 @@ private void destroyPoolChunkLists(PoolChunkList<T>... chunkLists) {

static final class HeapArena extends PoolArena<byte[]> {

HeapArena(PooledByteBufAllocator parent, int pageSize, int pageShifts,
int chunkSize) {
super(parent, pageSize, pageShifts, chunkSize,
0);
HeapArena(PooledByteBufAllocator parent, SizeClasses sizeClass) {
super(parent, sizeClass);
}

private static byte[] newByteArray(int size) {
Expand Down Expand Up @@ -674,10 +669,8 @@ protected void memoryCopy(byte[] src, int srcOffset, PooledByteBuf<byte[]> dst,

static final class DirectArena extends PoolArena<ByteBuffer> {

DirectArena(PooledByteBufAllocator parent, int pageSize, int pageShifts,
int chunkSize, int directMemoryCacheAlignment) {
super(parent, pageSize, pageShifts, chunkSize,
directMemoryCacheAlignment);
DirectArena(PooledByteBufAllocator parent, SizeClasses sizeClass) {
super(parent, sizeClass);
}

@Override
Expand All @@ -688,27 +681,27 @@ boolean isDirect() {
@Override
protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxPageIdx,
int pageShifts, int chunkSize) {
if (directMemoryCacheAlignment == 0) {
if (sizeClass.directMemoryCacheAlignment == 0) {
ByteBuffer memory = allocateDirect(chunkSize);
return new PoolChunk<ByteBuffer>(this, memory, memory, pageSize, pageShifts,
chunkSize, maxPageIdx);
}

final ByteBuffer base = allocateDirect(chunkSize + directMemoryCacheAlignment);
final ByteBuffer memory = PlatformDependent.alignDirectBuffer(base, directMemoryCacheAlignment);
final ByteBuffer base = allocateDirect(chunkSize + sizeClass.directMemoryCacheAlignment);
final ByteBuffer memory = PlatformDependent.alignDirectBuffer(base, sizeClass.directMemoryCacheAlignment);
return new PoolChunk<ByteBuffer>(this, base, memory, pageSize,
pageShifts, chunkSize, maxPageIdx);
}

@Override
protected PoolChunk<ByteBuffer> newUnpooledChunk(int capacity) {
if (directMemoryCacheAlignment == 0) {
if (sizeClass.directMemoryCacheAlignment == 0) {
ByteBuffer memory = allocateDirect(capacity);
return new PoolChunk<ByteBuffer>(this, memory, memory, capacity);
}

final ByteBuffer base = allocateDirect(capacity + directMemoryCacheAlignment);
final ByteBuffer memory = PlatformDependent.alignDirectBuffer(base, directMemoryCacheAlignment);
final ByteBuffer base = allocateDirect(capacity + sizeClass.directMemoryCacheAlignment);
final ByteBuffer memory = PlatformDependent.alignDirectBuffer(base, sizeClass.directMemoryCacheAlignment);
return new PoolChunk<ByteBuffer>(this, base, memory, capacity);
}

Expand Down Expand Up @@ -763,4 +756,44 @@ void lock() {
void unlock() {
lock.unlock();
}

@Override
public int sizeIdx2size(int sizeIdx) {
return sizeClass.sizeIdx2size(sizeIdx);
}

@Override
public int sizeIdx2sizeCompute(int sizeIdx) {
return sizeClass.sizeIdx2sizeCompute(sizeIdx);
}

@Override
public long pageIdx2size(int pageIdx) {
return sizeClass.pageIdx2size(pageIdx);
}

@Override
public long pageIdx2sizeCompute(int pageIdx) {
return sizeClass.pageIdx2sizeCompute(pageIdx);
}

@Override
public int size2SizeIdx(int size) {
return sizeClass.size2SizeIdx(size);
}

@Override
public int pages2pageIdx(int pages) {
return sizeClass.pages2pageIdx(pages);
}

@Override
public int pages2pageIdxFloor(int pages) {
return sizeClass.pages2pageIdxFloor(pages);
}

@Override
public int normalizeSize(int size) {
return sizeClass.normalizeSize(size);
}
}
25 changes: 13 additions & 12 deletions buffer/src/main/java/io/netty/buffer/PoolChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
* return a (long) handle that encodes this offset information, (this memory segment is then
* marked as reserved so it is always used by exactly one ByteBuf and no more)
*
* For simplicity all sizes are normalized according to {@link PoolArena#size2SizeIdx(int)} method.
* For simplicity all sizes are normalized according to {@link PoolArena#sizeClass#size2SizeIdx(int)} method.
* This ensures that when we request for memory segments of size > pageSize the normalizedCapacity
* equals the next nearest size in {@link SizeClasses}.
*
Expand Down Expand Up @@ -240,7 +240,7 @@ private static IntPriorityQueue[] newRunsAvailqueueArray(int size) {
}

private void insertAvailRun(int runOffset, int pages, long handle) {
int pageIdxFloor = arena.pages2pageIdxFloor(pages);
int pageIdxFloor = arena.sizeClass.pages2pageIdxFloor(pages);
IntPriorityQueue queue = runsAvail[pageIdxFloor];
assert isRun(handle);
queue.offer((int) (handle >> BITMAP_IDX_BIT_LENGTH));
Expand All @@ -259,7 +259,7 @@ private void insertAvailRun0(int runOffset, long handle) {
}

private void removeAvailRun(long handle) {
int pageIdxFloor = arena.pages2pageIdxFloor(runPages(handle));
int pageIdxFloor = arena.sizeClass.pages2pageIdxFloor(runPages(handle));
runsAvail[pageIdxFloor].remove((int) (handle >> BITMAP_IDX_BIT_LENGTH));
removeAvailRun0(handle);
}
Expand Down Expand Up @@ -313,7 +313,7 @@ private int usage(int freeBytes) {

boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache cache) {
final long handle;
if (sizeIdx <= arena.smallMaxSizeIdx) {
if (sizeIdx <= arena.sizeClass.smallMaxSizeIdx) {
final PoolSubpage<T> nextSub;
// small
// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
Expand All @@ -323,8 +323,9 @@ boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadC
try {
nextSub = head.next;
if (nextSub != head) {
assert nextSub.doNotDestroy && nextSub.elemSize == arena.sizeIdx2size(sizeIdx) : "doNotDestroy=" +
nextSub.doNotDestroy + ", elemSize=" + nextSub.elemSize + ", sizeIdx=" + sizeIdx;
assert nextSub.doNotDestroy && nextSub.elemSize == arena.sizeClass.sizeIdx2size(sizeIdx) :
"doNotDestroy=" + nextSub.doNotDestroy + ", elemSize=" + nextSub.elemSize + ", sizeIdx=" +
sizeIdx;
handle = nextSub.allocate();
assert handle >= 0;
assert isSubpage(handle);
Expand All @@ -342,7 +343,7 @@ boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadC
} else {
// normal
// runSize must be multiple of pageSize
int runSize = arena.sizeIdx2size(sizeIdx);
int runSize = arena.sizeClass.sizeIdx2size(sizeIdx);
handle = allocateRun(runSize);
if (handle < 0) {
return false;
Expand All @@ -357,7 +358,7 @@ boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadC

private long allocateRun(int runSize) {
int pages = runSize >> pageShifts;
int pageIdx = arena.pages2pageIdx(pages);
int pageIdx = arena.sizeClass.pages2pageIdx(pages);

runsAvailLock.lock();
try {
Expand Down Expand Up @@ -391,7 +392,7 @@ private int calculateRunSize(int sizeIdx) {
int runSize = 0;
int nElements;

final int elemSize = arena.sizeIdx2size(sizeIdx);
final int elemSize = arena.sizeClass.sizeIdx2size(sizeIdx);

//find lowest common multiple of pageSize and elemSize
do {
Expand All @@ -413,9 +414,9 @@ private int calculateRunSize(int sizeIdx) {

private int runFirstBestFit(int pageIdx) {
if (freeBytes == chunkSize) {
return arena.nPSizes - 1;
return arena.sizeClass.nPSizes - 1;
}
for (int i = pageIdx; i < arena.nPSizes; i++) {
for (int i = pageIdx; i < arena.sizeClass.nPSizes; i++) {
IntPriorityQueue queue = runsAvail[i];
if (queue != null && !queue.isEmpty()) {
return i;
Expand Down Expand Up @@ -469,7 +470,7 @@ private long allocateSubpage(int sizeIdx, PoolSubpage<T> head) {

int runOffset = runOffset(runHandle);
assert subpages[runOffset] == null;
int elemSize = arena.sizeIdx2size(sizeIdx);
int elemSize = arena.sizeClass.sizeIdx2size(sizeIdx);

PoolSubpage<T> subpage = new PoolSubpage<T>(head, this, pageShifts, runOffset,
runSize(pageShifts, runHandle), elemSize);
Expand Down
2 changes: 1 addition & 1 deletion buffer/src/main/java/io/netty/buffer/PoolChunkList.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ void prevList(PoolChunkList<T> prevList) {
}

boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) {
int normCapacity = arena.sizeIdx2size(sizeIdx);
int normCapacity = arena.sizeClass.sizeIdx2size(sizeIdx);
if (normCapacity > maxCapacity) {
// Either this PoolChunkList is empty or the requested capacity is larger then the capacity which can
// be handled by the PoolChunks that are contained in this PoolChunkList.
Expand Down
27 changes: 10 additions & 17 deletions buffer/src/main/java/io/netty/buffer/PoolThreadCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,8 @@ final class PoolThreadCache {
this.heapArena = heapArena;
this.directArena = directArena;
if (directArena != null) {
smallSubPageDirectCaches = createSubPageCaches(
smallCacheSize, directArena.numSmallSubpagePools);

normalDirectCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, directArena);

smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.sizeClass.nSubpages);
normalDirectCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, directArena);
directArena.numThreadCaches.getAndIncrement();
} else {
// No directArea is configured so just null out all caches
Expand All @@ -88,12 +84,8 @@ final class PoolThreadCache {
}
if (heapArena != null) {
// Create the caches for the heap allocations
smallSubPageHeapCaches = createSubPageCaches(
smallCacheSize, heapArena.numSmallSubpagePools);

normalHeapCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, heapArena);

smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.sizeClass.nSubpages);
normalHeapCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, heapArena);
heapArena.numThreadCaches.getAndIncrement();
} else {
// No heapArea is configured so just null out all caches
Expand Down Expand Up @@ -130,11 +122,12 @@ private static <T> MemoryRegionCache<T>[] createSubPageCaches(
private static <T> MemoryRegionCache<T>[] createNormalCaches(
int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
int max = Math.min(area.sizeClass.chunkSize, maxCachedBufferCapacity);
// Create as many normal caches as we support based on how many sizeIdx we have and what the upper
// bound is that we want to cache in general.
List<MemoryRegionCache<T>> cache = new ArrayList<MemoryRegionCache<T>>() ;
for (int idx = area.numSmallSubpagePools; idx < area.nSizes && area.sizeIdx2size(idx) <= max ; idx++) {
for (int idx = area.sizeClass.nSubpages; idx < area.sizeClass.nSizes &&
area.sizeClass.sizeIdx2size(idx) <= max; idx++) {
cache.add(new NormalMemoryRegionCache<T>(cacheSize));
}
return cache.toArray(new MemoryRegionCache[0]);
Expand Down Expand Up @@ -183,7 +176,7 @@ private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqC
@SuppressWarnings({ "unchecked", "rawtypes" })
boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
long handle, int normCapacity, SizeClass sizeClass) {
int sizeIdx = area.size2SizeIdx(normCapacity);
int sizeIdx = area.sizeClass.size2SizeIdx(normCapacity);
MemoryRegionCache<?> cache = cache(area, sizeIdx, sizeClass);
if (cache == null) {
return false;
Expand Down Expand Up @@ -297,8 +290,8 @@ private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int sizeIdx) {
}

private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int sizeIdx) {
// We need to substract area.numSmallSubpagePools as sizeIdx is the overall index for all sizes.
int idx = sizeIdx - area.numSmallSubpagePools;
// We need to subtract area.sizeClass.nSubpages as sizeIdx is the overall index for all sizes.
int idx = sizeIdx - area.sizeClass.nSubpages;
if (area.isDirect()) {
return cache(normalDirectCaches, idx);
}
Expand Down
3 changes: 2 additions & 1 deletion buffer/src/main/java/io/netty/buffer/PooledByteBuf.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ private void init0(PoolChunk<T> chunk, ByteBuffer nioBuffer,
long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
assert handle >= 0;
assert chunk != null;
assert !PoolChunk.isSubpage(handle) || chunk.arena.size2SizeIdx(maxLength) <= chunk.arena.smallMaxSizeIdx:
assert !PoolChunk.isSubpage(handle) ||
chunk.arena.sizeClass.size2SizeIdx(maxLength) <= chunk.arena.sizeClass.smallMaxSizeIdx:
"Allocated small sub-page handle for a buffer size that isn't \"small.\"";

chunk.incrementPinnedMemory(maxLength);
Expand Down
Loading

0 comments on commit b7325f5

Please sign in to comment.