Skip to content

Commit

Permalink
fix review
Browse files Browse the repository at this point in the history
  • Loading branch information
Pengzna committed Oct 28, 2024
1 parent 52ca7af commit ee8e125
Show file tree
Hide file tree
Showing 22 changed files with 160 additions and 216 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@

import org.apache.hugegraph.memory.MemoryManager;

public class OnHeapMemoryStrategy implements MemoryAllocator {
public class OnHeapMemoryAllocator implements MemoryAllocator {

private final MemoryManager memoryManager;

public OnHeapMemoryStrategy(MemoryManager memoryManager) {
public OnHeapMemoryAllocator(MemoryManager memoryManager) {
this.memoryManager = memoryManager;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
/**
* This interface is used by immutable, memory-heavy objects which will be stored in off heap.
*/
public interface MemoryConsumer {
public interface OffHeapObject {

/**
* This method will read Off heap ByteBuf storing binary data of self.
* This method will read from off-heap ByteBuf storing binary data of self.
*
* @return self value
*/
Expand All @@ -39,15 +39,18 @@ public interface MemoryConsumer {
* Serialize to DataOutputStream in stack first, then request an off heap ByteBuf from
* OperatorMemoryPool based on size of DataOutputStream. Finally, serializing it to ByteBuf.
*/
void serializeSelfToByteBuf();
void serializeSelfToByteBuf(MemoryPool memoryPool);

/**
* Called after serializingSelfToByteBuf, pointing all self's on heap vars to null, in order
* to let GC release all its on heap memory.
*/
void releaseOriginalOnHeapVars();

MemoryPool getOperatorMemoryPool();
void releaseOriginalVarsOnHeap();

/**
* Called by memoryPool to release all its holding memory block when memoryPool release self.
*
* @return all holding memory block allocated by memoryPool
*/
List<ByteBuf> getAllMemoryBlock();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,28 @@

import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.serializer.BinaryBackendEntry;
import org.apache.hugegraph.memory.consumer.MemoryConsumer;
import org.apache.hugegraph.memory.consumer.OffHeapObject;
import org.apache.hugegraph.memory.pool.MemoryPool;
import org.apache.hugegraph.util.Bytes;
import org.apache.hugegraph.util.E;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;

public class BinaryIdOffHeap extends BinaryBackendEntry.BinaryId implements MemoryConsumer {
public class BinaryIdOffHeap extends BinaryBackendEntry.BinaryId implements OffHeapObject {

private final MemoryPool memoryPool;
private final MemoryConsumer originId;
private final OffHeapObject originId;
private ByteBuf bytesOffHeap;

public BinaryIdOffHeap(byte[] bytes, Id id, MemoryPool memoryPool, MemoryConsumer originId) {
public BinaryIdOffHeap(byte[] bytes, Id id, MemoryPool memoryPool, OffHeapObject originId) {
super(bytes, id);
this.memoryPool = memoryPool;
this.originId = originId;
serializeSelfToByteBuf();
releaseOriginalOnHeapVars();
serializeSelfToByteBuf(memoryPool);
releaseOriginalVarsOnHeap();
}

@Override
public void serializeSelfToByteBuf() {
public void serializeSelfToByteBuf(MemoryPool memoryPool) {
this.bytesOffHeap = (ByteBuf) memoryPool.requireMemory(bytes.length);
this.bytesOffHeap.markReaderIndex();
this.bytesOffHeap.writeBytes(bytes);
Expand All @@ -57,18 +55,13 @@ public BinaryBackendEntry.BinaryId zeroCopyReadFromByteBuf() {
(Id) originId.zeroCopyReadFromByteBuf());
}

@Override
public MemoryPool getOperatorMemoryPool() {
return this.memoryPool;
}

@Override
public List<ByteBuf> getAllMemoryBlock() {
return Collections.singletonList(bytesOffHeap);
}

@Override
public void releaseOriginalOnHeapVars() {
public void releaseOriginalVarsOnHeap() {
this.bytes = null;
this.id = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.id.IdUtil;
import org.apache.hugegraph.backend.id.SplicingIdGenerator;
import org.apache.hugegraph.memory.consumer.MemoryConsumer;
import org.apache.hugegraph.memory.consumer.OffHeapObject;
import org.apache.hugegraph.memory.pool.MemoryPool;
import org.apache.hugegraph.structure.HugeVertex;
import org.apache.hugegraph.type.define.Directions;
Expand All @@ -35,13 +35,13 @@
import io.netty.buffer.ByteBuf;

// TODO: rewrite static method in EdgeId
public class EdgeIdOffHeap extends EdgeId implements MemoryConsumer {
public class EdgeIdOffHeap extends EdgeId implements OffHeapObject {

private final MemoryPool memoryPool;
private final MemoryConsumer ownerVertexIdOffHeap;
private final MemoryConsumer edgeLabelIdOffHeap;
private final MemoryConsumer subLabelIdOffHeap;
private final MemoryConsumer otherVertexIdOffHeap;
private final OffHeapObject ownerVertexIdOffHeap;
private final OffHeapObject edgeLabelIdOffHeap;
private final OffHeapObject subLabelIdOffHeap;
private final OffHeapObject otherVertexIdOffHeap;
private ByteBuf sortValuesOffHeap;
private ByteBuf cacheOffHeap;

Expand All @@ -52,18 +52,18 @@ public EdgeIdOffHeap(HugeVertex ownerVertex,
String sortValues,
HugeVertex otherVertex,
MemoryPool memoryPool,
MemoryConsumer ownerVertexIdOffHeap,
MemoryConsumer edgeLabelIdOffHeap,
MemoryConsumer subLabelIdOffHeap,
MemoryConsumer otherVertexIdOffHeap) {
OffHeapObject ownerVertexIdOffHeap,
OffHeapObject edgeLabelIdOffHeap,
OffHeapObject subLabelIdOffHeap,
OffHeapObject otherVertexIdOffHeap) {
super(ownerVertex, direction, edgeLabelId, subLabelId, sortValues, otherVertex);
this.memoryPool = memoryPool;
this.ownerVertexIdOffHeap = ownerVertexIdOffHeap;
this.edgeLabelIdOffHeap = edgeLabelIdOffHeap;
this.subLabelIdOffHeap = subLabelIdOffHeap;
this.otherVertexIdOffHeap = otherVertexIdOffHeap;
serializeSelfToByteBuf();
releaseOriginalOnHeapVars();
serializeSelfToByteBuf(memoryPool);
releaseOriginalVarsOnHeap();
}

public EdgeIdOffHeap(Id ownerVertexId,
Expand All @@ -73,19 +73,19 @@ public EdgeIdOffHeap(Id ownerVertexId,
String sortValues,
Id otherVertexId,
MemoryPool memoryPool,
MemoryConsumer ownerVertexIdOffHeap,
MemoryConsumer edgeLabelIdOffHeap,
MemoryConsumer subLabelIdOffHeap,
MemoryConsumer otherVertexIdOffHeap) {
OffHeapObject ownerVertexIdOffHeap,
OffHeapObject edgeLabelIdOffHeap,
OffHeapObject subLabelIdOffHeap,
OffHeapObject otherVertexIdOffHeap) {
super(ownerVertexId, direction, edgeLabelId, subLabelId,
sortValues, otherVertexId, false);
this.memoryPool = memoryPool;
this.ownerVertexIdOffHeap = ownerVertexIdOffHeap;
this.edgeLabelIdOffHeap = edgeLabelIdOffHeap;
this.subLabelIdOffHeap = subLabelIdOffHeap;
this.otherVertexIdOffHeap = otherVertexIdOffHeap;
serializeSelfToByteBuf();
releaseOriginalOnHeapVars();
serializeSelfToByteBuf(memoryPool);
releaseOriginalVarsOnHeap();
}

public EdgeIdOffHeap(Id ownerVertexId,
Expand All @@ -96,19 +96,19 @@ public EdgeIdOffHeap(Id ownerVertexId,
Id otherVertexId,
boolean directed,
MemoryPool memoryPool,
MemoryConsumer ownerVertexIdOffHeap,
MemoryConsumer edgeLabelIdOffHeap,
MemoryConsumer subLabelIdOffHeap,
MemoryConsumer otherVertexIdOffHeap) {
OffHeapObject ownerVertexIdOffHeap,
OffHeapObject edgeLabelIdOffHeap,
OffHeapObject subLabelIdOffHeap,
OffHeapObject otherVertexIdOffHeap) {
super(ownerVertexId, direction, edgeLabelId, subLabelId, sortValues, otherVertexId,
directed);
this.memoryPool = memoryPool;
this.ownerVertexIdOffHeap = ownerVertexIdOffHeap;
this.edgeLabelIdOffHeap = edgeLabelIdOffHeap;
this.subLabelIdOffHeap = subLabelIdOffHeap;
this.otherVertexIdOffHeap = otherVertexIdOffHeap;
serializeSelfToByteBuf();
releaseOriginalOnHeapVars();
serializeSelfToByteBuf(memoryPool);
releaseOriginalVarsOnHeap();
}

@Override
Expand All @@ -126,23 +126,18 @@ public Object zeroCopyReadFromByteBuf() {
}

@Override
public void serializeSelfToByteBuf() {
public void serializeSelfToByteBuf(MemoryPool memoryPool) {
byte[] stringBytes = sortValues.getBytes((StandardCharsets.UTF_8));
this.sortValuesOffHeap = (ByteBuf) memoryPool.requireMemory(stringBytes.length);
this.sortValuesOffHeap = (ByteBuf) this.memoryPool.requireMemory(stringBytes.length);
this.sortValuesOffHeap.markReaderIndex();
this.sortValuesOffHeap.writeBytes(stringBytes);
}

@Override
public void releaseOriginalOnHeapVars() {
public void releaseOriginalVarsOnHeap() {
this.sortValues = null;
}

@Override
public MemoryPool getOperatorMemoryPool() {
return memoryPool;
}

@Override
public List<ByteBuf> getAllMemoryBlock() {
return Lists.newArrayList(this.sortValuesOffHeap, this.cacheOffHeap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,26 @@

import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.id.IdGenerator;
import org.apache.hugegraph.memory.consumer.MemoryConsumer;
import org.apache.hugegraph.memory.consumer.OffHeapObject;
import org.apache.hugegraph.memory.pool.MemoryPool;
import org.apache.hugegraph.util.NumericUtil;

import io.netty.buffer.ByteBuf;

public class LongIdOffHeap extends IdGenerator.LongId implements MemoryConsumer {
public class LongIdOffHeap extends IdGenerator.LongId implements OffHeapObject {

private final MemoryPool memoryPool;
private ByteBuf idOffHeap;

public LongIdOffHeap(MemoryPool memoryPool, long id) {
super(id);
this.memoryPool = memoryPool;
serializeSelfToByteBuf();
releaseOriginalOnHeapVars();
serializeSelfToByteBuf(memoryPool);
releaseOriginalVarsOnHeap();
}

public LongIdOffHeap(MemoryPool memoryPool, byte[] bytes) {
super(bytes);
this.memoryPool = memoryPool;
serializeSelfToByteBuf();
releaseOriginalOnHeapVars();
serializeSelfToByteBuf(memoryPool);
releaseOriginalVarsOnHeap();
}

@Override
Expand All @@ -57,26 +54,20 @@ public Object zeroCopyReadFromByteBuf() {
} finally {
idOffHeap.resetReaderIndex();
}

}

@Override
public void serializeSelfToByteBuf() {
public void serializeSelfToByteBuf(MemoryPool memoryPool) {
this.idOffHeap = (ByteBuf) memoryPool.requireMemory(Long.BYTES);
this.idOffHeap.markReaderIndex();
this.idOffHeap.writeLong(id);
}

@Override
public void releaseOriginalOnHeapVars() {
public void releaseOriginalVarsOnHeap() {
this.id = null;
}

@Override
public MemoryPool getOperatorMemoryPool() {
return memoryPool;
}

@Override
public List<ByteBuf> getAllMemoryBlock() {
return Collections.singletonList(idOffHeap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,57 +22,50 @@
import java.util.Objects;

import org.apache.hugegraph.backend.id.IdGenerator;
import org.apache.hugegraph.memory.consumer.MemoryConsumer;
import org.apache.hugegraph.memory.consumer.OffHeapObject;
import org.apache.hugegraph.memory.pool.MemoryPool;
import org.apache.hugegraph.memory.util.FurySerializationUtils;
import org.apache.hugegraph.memory.util.FurySerializationUtil;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;

public class ObjectIdOffHeap extends IdGenerator.ObjectId implements MemoryConsumer {
public class ObjectIdOffHeap extends IdGenerator.ObjectId implements OffHeapObject {

private final MemoryPool memoryPool;
private ByteBuf objectOffHeap;

public ObjectIdOffHeap(Object object, MemoryPool memoryPool) {
super(object);
this.memoryPool = memoryPool;
serializeSelfToByteBuf();
releaseOriginalOnHeapVars();
serializeSelfToByteBuf(memoryPool);
releaseOriginalVarsOnHeap();
}

@Override
public Object zeroCopyReadFromByteBuf() {
return new IdGenerator.ObjectId(FurySerializationUtils.FURY.deserialize(
return new IdGenerator.ObjectId(FurySerializationUtil.FURY.deserialize(
ByteBufUtil.getBytes(this.objectOffHeap)));
}

@Override
public void serializeSelfToByteBuf() {
byte[] bytes = FurySerializationUtils.FURY.serialize(object);
public void serializeSelfToByteBuf(MemoryPool memoryPool) {
byte[] bytes = FurySerializationUtil.FURY.serialize(object);
this.objectOffHeap = (ByteBuf) memoryPool.requireMemory(bytes.length);
this.objectOffHeap.markReaderIndex();
this.objectOffHeap.writeBytes(bytes);
}

@Override
public void releaseOriginalOnHeapVars() {
public void releaseOriginalVarsOnHeap() {
this.object = null;
}

@Override
public MemoryPool getOperatorMemoryPool() {
return memoryPool;
}

@Override
public List<ByteBuf> getAllMemoryBlock() {
return Collections.singletonList(objectOffHeap);
}

@Override
public Object asObject() {
return FurySerializationUtils.FURY.deserialize(ByteBufUtil.getBytes(objectOffHeap));
return FurySerializationUtil.FURY.deserialize(ByteBufUtil.getBytes(objectOffHeap));
}

@Override
Expand All @@ -93,6 +86,3 @@ public String toString() {
return super.toString();
}
}



Loading

0 comments on commit ee8e125

Please sign in to comment.