1. for edges with edgeLabelType = NORMAL: edgeLabelId = parentEdgeLabelId = subEdgeLabelId; - * for edges with edgeLabelType = PARENT: edgeLabelId = subEdgeLabelId, parentEdgeLabelId = - * edgeLabelId.fatherId + * for edges with edgeLabelType = PARENT: edgeLabelId = subEdgeLabelId, parentEdgeLabelId = + * edgeLabelId.fatherId *
2.if we use `entry.type()` which is IN or OUT as a part of id, * an edge's id will be different due to different directions (belongs * to 2 owner vertex) @@ -49,15 +49,14 @@ public class EdgeId implements Id { HugeKeys.OTHER_VERTEX }; - private final Id ownerVertexId; - private final Directions direction; - private final Id edgeLabelId; - private final Id subLabelId; - private final String sortValues; - private final Id otherVertexId; - - private final boolean directed; - private String cache; + protected final Id ownerVertexId; + protected final Id edgeLabelId; + protected final Id subLabelId; + protected final Id otherVertexId; + protected final Directions direction; + protected final boolean directed; + protected String sortValues; + protected String cache; public EdgeId(HugeVertex ownerVertex, Directions direction, Id edgeLabelId, Id subLabelId, String sortValues, HugeVertex otherVertex) { diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/id/IdGenerator.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/id/IdGenerator.java index 9261d31fe8..17cc11684c 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/id/IdGenerator.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/id/IdGenerator.java @@ -119,15 +119,15 @@ public static IdType idType(Id id) { return IdType.UNKNOWN; } - private static int compareType(Id id1, Id id2) { + public static int compareType(Id id1, Id id2) { return idType(id1).ordinal() - idType(id2).ordinal(); } /****************************** id defines ******************************/ - public static final class StringId implements Id { + public static class StringId implements Id { - private final String id; + protected String id; public StringId(String id) { E.checkArgument(!id.isEmpty(), "The id can't be empty"); @@ -196,11 +196,11 @@ public String toString() { } } - public static final class LongId extends Number implements Id { + public static class LongId extends Number implements Id { private static final long serialVersionUID = -7732461469037400190L; - private final long id; + protected Long id; public LongId(long id) { this.id = id; @@ -270,7 +270,7 @@ public String toString() { @Override public int intValue() { - return (int) this.id; + return this.id.intValue(); } @Override @@ -289,9 +289,9 @@ public double doubleValue() { } } - public static final class UuidId implements Id { + public static class UuidId implements Id { - private final UUID uuid; + protected UUID uuid; public UuidId(String string) { this(StringEncoding.uuid(string)); @@ -379,9 +379,9 @@ public String toString() { /** * This class is just used by backend store for wrapper object as Id */ - public static final class ObjectId implements Id { + public static class ObjectId implements Id { - private final Object object; + protected Object object; public ObjectId(Object object) { E.checkNotNull(object, "object"); diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/serializer/BinaryBackendEntry.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/serializer/BinaryBackendEntry.java index ca605f0b65..9dfb8b4e5c 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/serializer/BinaryBackendEntry.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/serializer/BinaryBackendEntry.java @@ -49,7 +49,8 @@ public BinaryBackendEntry(HugeType type, byte[] bytes, boolean enablePartition) } // FIXME: `enablePartition` is unused here - public BinaryBackendEntry(HugeType type, byte[] bytes, boolean enablePartition, boolean isOlap) { + public BinaryBackendEntry(HugeType type, byte[] bytes, boolean enablePartition, + boolean isOlap) { this(type, BytesBuffer.wrap(bytes).parseOlapId(type, isOlap)); } @@ -207,10 +208,10 @@ public int hashCode() { return this.id().hashCode() ^ this.columns.size(); } - public static final class BinaryId implements Id { + public static class BinaryId implements Id { - private final byte[] bytes; - private final Id id; + protected byte[] bytes; + protected Id id; public BinaryId(byte[] bytes, Id id) { this.bytes = bytes; diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/allocator/OnHeapMemoryStrategy.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/allocator/OnHeapMemoryStrategy.java index 7651bd8cbd..401283254f 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/allocator/OnHeapMemoryStrategy.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/allocator/OnHeapMemoryStrategy.java @@ -29,7 +29,7 @@ public OnHeapMemoryStrategy(MemoryManager memoryManager) { } @Override - public Object tryToAllocate(long size) { + public byte[] tryToAllocate(long size) { if (memoryManager.getCurrentOnHeapAllocatedMemory().get() + memoryManager.getCurrentOffHeapAllocatedMemory().get() + size < MemoryManager.MAX_MEMORY_CAPACITY_IN_BYTES) { @@ -41,7 +41,7 @@ public Object tryToAllocate(long size) { } @Override - public Object forceAllocate(long size) { + public byte[] forceAllocate(long size) { int sizeOfByte = (int) (size / Bytes.BASE); memoryManager.getCurrentOnHeapAllocatedMemory().addAndGet(sizeOfByte); return new byte[sizeOfByte]; diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/MemoryConsumer.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/MemoryConsumer.java index ca936c7754..8eb4cc0489 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/MemoryConsumer.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/MemoryConsumer.java @@ -17,7 +17,31 @@ package org.apache.hugegraph.memory.consumer; -// TODO(pjz): integrated it with HG objects such as edges and vertex. +import org.apache.hugegraph.memory.pool.MemoryPool; + +/** + * This interface is used by immutable, memory-heavy objects which will be stored in off heap. + */ public interface MemoryConsumer { + /** + * This method will read Off heap ByteBuf storing binary data of self. + * + * @return self value + */ + Object zeroCopyReadFromByteBuf(); + + /** + * Serialize to DataOutputStream in stack first, then request an off heap ByteBuf from + * OperatorMemoryPool based on size of DataOutputStream. Finally, serializing it to ByteBuf. + */ + void serializeSelfToByteBuf(); + + /** + * Called after serializingSelfToByteBuf, pointing all self's on heap vars to null, in order + * to let GC release all its on heap memory. + */ + void releaseOriginalOnHeapVars(); + + MemoryPool getOperatorMemoryPool(); } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/BinaryIdOffHeap.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/BinaryIdOffHeap.java new file mode 100644 index 0000000000..8215f98749 --- /dev/null +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/BinaryIdOffHeap.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.memory.consumer.impl.id; + +import org.apache.hugegraph.backend.id.Id; +import org.apache.hugegraph.backend.serializer.BinaryBackendEntry; +import org.apache.hugegraph.memory.consumer.MemoryConsumer; +import org.apache.hugegraph.memory.pool.MemoryPool; +import org.apache.hugegraph.util.Bytes; +import org.apache.hugegraph.util.E; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; + +public class BinaryIdOffHeap extends BinaryBackendEntry.BinaryId implements MemoryConsumer { + + private final MemoryPool memoryPool; + private final MemoryConsumer originId; + private ByteBuf bytesOffHeap; + + public BinaryIdOffHeap(byte[] bytes, Id id, MemoryPool memoryPool, MemoryConsumer originId) { + super(bytes, id); + this.memoryPool = memoryPool; + this.originId = originId; + serializeSelfToByteBuf(); + releaseOriginalOnHeapVars(); + } + + @Override + public void serializeSelfToByteBuf() { + this.bytesOffHeap = (ByteBuf) memoryPool.requireMemory(bytes.length); + this.bytesOffHeap.markReaderIndex(); + this.bytesOffHeap.writeBytes(bytes); + } + + @Override + public BinaryBackendEntry.BinaryId zeroCopyReadFromByteBuf() { + return new BinaryBackendEntry.BinaryId(ByteBufUtil.getBytes(bytesOffHeap), + (Id) originId.zeroCopyReadFromByteBuf()); + } + + @Override + public MemoryPool getOperatorMemoryPool() { + return this.memoryPool; + } + + @Override + public void releaseOriginalOnHeapVars() { + this.bytes = null; + this.id = null; + } + + @Override + public Object asObject() { + return bytesOffHeap.nioBuffer(); + } + + @Override + public String toString() { + return "0x" + Bytes.toHex(asBytes()); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof BinaryIdOffHeap)) { + return false; + } + return bytesOffHeap.equals(((BinaryIdOffHeap) other).bytesOffHeap); + } + + @Override + public int hashCode() { + return bytesOffHeap.hashCode(); + } + + @Override + public int length() { + return bytesOffHeap.readableBytes(); + } + + @Override + public byte[] asBytes(int offset) { + E.checkArgument(offset < this.bytesOffHeap.readableBytes(), + "Invalid offset %s, must be < length %s", + offset, this.bytesOffHeap.readableBytes()); + try { + // zero-copy read + byte[] tmpBytes = new byte[offset]; + this.bytesOffHeap.readBytes(tmpBytes); + return tmpBytes; + } finally { + this.bytesOffHeap.resetReaderIndex(); + } + } + + @Override + public byte[] asBytes() { + try { + // zero-copy read + byte[] tmpBytes = new byte[bytesOffHeap.readableBytes()]; + this.bytesOffHeap.readBytes(tmpBytes); + return tmpBytes; + } finally { + this.bytesOffHeap.resetReaderIndex(); + } + } + + @Override + public int compareTo(Id other) { + return bytesOffHeap.compareTo(((BinaryIdOffHeap) other).bytesOffHeap); + } + + @Override + public Id origin() { + return (Id) originId.zeroCopyReadFromByteBuf(); + } +} diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/EdgeIdOffHeap.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/EdgeIdOffHeap.java new file mode 100644 index 0000000000..98d30f3dd6 --- /dev/null +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/EdgeIdOffHeap.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.memory.consumer.impl.id; + +import java.nio.charset.StandardCharsets; +import java.util.Objects; + +import org.apache.hugegraph.backend.id.EdgeId; +import org.apache.hugegraph.backend.id.Id; +import org.apache.hugegraph.backend.id.IdUtil; +import org.apache.hugegraph.backend.id.SplicingIdGenerator; +import org.apache.hugegraph.memory.consumer.MemoryConsumer; +import org.apache.hugegraph.memory.pool.MemoryPool; +import org.apache.hugegraph.structure.HugeVertex; +import org.apache.hugegraph.type.define.Directions; + +import io.netty.buffer.ByteBuf; + +// TODO: rewrite static method in EdgeId +public class EdgeIdOffHeap extends EdgeId implements MemoryConsumer { + + private final MemoryPool memoryPool; + private final MemoryConsumer ownerVertexIdOffHeap; + private final MemoryConsumer edgeLabelIdOffHeap; + private final MemoryConsumer subLabelIdOffHeap; + private final MemoryConsumer otherVertexIdOffHeap; + private ByteBuf sortValuesOffHeap; + private ByteBuf cacheOffHeap; + + public EdgeIdOffHeap(HugeVertex ownerVertex, + Directions direction, + Id edgeLabelId, + Id subLabelId, + String sortValues, + HugeVertex otherVertex, + MemoryPool memoryPool, + MemoryConsumer ownerVertexIdOffHeap, + MemoryConsumer edgeLabelIdOffHeap, + MemoryConsumer subLabelIdOffHeap, + MemoryConsumer otherVertexIdOffHeap) { + super(ownerVertex, direction, edgeLabelId, subLabelId, sortValues, otherVertex); + this.memoryPool = memoryPool; + this.ownerVertexIdOffHeap = ownerVertexIdOffHeap; + this.edgeLabelIdOffHeap = edgeLabelIdOffHeap; + this.subLabelIdOffHeap = subLabelIdOffHeap; + this.otherVertexIdOffHeap = otherVertexIdOffHeap; + serializeSelfToByteBuf(); + releaseOriginalOnHeapVars(); + } + + public EdgeIdOffHeap(Id ownerVertexId, + Directions direction, + Id edgeLabelId, + Id subLabelId, + String sortValues, + Id otherVertexId, + MemoryPool memoryPool, + MemoryConsumer ownerVertexIdOffHeap, + MemoryConsumer edgeLabelIdOffHeap, + MemoryConsumer subLabelIdOffHeap, + MemoryConsumer otherVertexIdOffHeap) { + super(ownerVertexId, direction, edgeLabelId, subLabelId, + sortValues, otherVertexId, false); + this.memoryPool = memoryPool; + this.ownerVertexIdOffHeap = ownerVertexIdOffHeap; + this.edgeLabelIdOffHeap = edgeLabelIdOffHeap; + this.subLabelIdOffHeap = subLabelIdOffHeap; + this.otherVertexIdOffHeap = otherVertexIdOffHeap; + serializeSelfToByteBuf(); + releaseOriginalOnHeapVars(); + } + + public EdgeIdOffHeap(Id ownerVertexId, + Directions direction, + Id edgeLabelId, + Id subLabelId, + String sortValues, + Id otherVertexId, + boolean directed, + MemoryPool memoryPool, + MemoryConsumer ownerVertexIdOffHeap, + MemoryConsumer edgeLabelIdOffHeap, + MemoryConsumer subLabelIdOffHeap, + MemoryConsumer otherVertexIdOffHeap) { + super(ownerVertexId, direction, edgeLabelId, subLabelId, sortValues, otherVertexId, + directed); + this.memoryPool = memoryPool; + this.ownerVertexIdOffHeap = ownerVertexIdOffHeap; + this.edgeLabelIdOffHeap = edgeLabelIdOffHeap; + this.subLabelIdOffHeap = subLabelIdOffHeap; + this.otherVertexIdOffHeap = otherVertexIdOffHeap; + serializeSelfToByteBuf(); + releaseOriginalOnHeapVars(); + } + + @Override + public Object zeroCopyReadFromByteBuf() { + try { + return new EdgeId((HugeVertex) this.ownerVertexIdOffHeap.zeroCopyReadFromByteBuf(), + this.direction, + (Id) this.edgeLabelIdOffHeap.zeroCopyReadFromByteBuf(), + (Id) this.subLabelIdOffHeap.zeroCopyReadFromByteBuf(), + this.sortValuesOffHeap.toString(StandardCharsets.UTF_8), + (HugeVertex) this.otherVertexIdOffHeap.zeroCopyReadFromByteBuf()); + } finally { + this.sortValuesOffHeap.resetReaderIndex(); + } + } + + @Override + public void serializeSelfToByteBuf() { + byte[] stringBytes = sortValues.getBytes((StandardCharsets.UTF_8)); + this.sortValuesOffHeap = (ByteBuf) memoryPool.requireMemory(stringBytes.length); + this.sortValuesOffHeap.markReaderIndex(); + this.sortValuesOffHeap.writeBytes(stringBytes); + } + + @Override + public void releaseOriginalOnHeapVars() { + this.sortValues = null; + } + + @Override + public MemoryPool getOperatorMemoryPool() { + return memoryPool; + } + + @Override + public EdgeId switchDirection() { + Directions newDirection = this.direction.opposite(); + return new EdgeIdOffHeap(this.otherVertexId, + newDirection, + this.edgeLabelId, + this.subLabelId, + this.sortValues, + this.ownerVertexId, + this.memoryPool, + this.ownerVertexIdOffHeap, + this.edgeLabelIdOffHeap, + this.subLabelIdOffHeap, + this.otherVertexIdOffHeap); + } + + @Override + public EdgeId directed(boolean directed) { + return new EdgeIdOffHeap(this.otherVertexId, + this.direction, + this.edgeLabelId, + this.subLabelId, + this.sortValues, + this.ownerVertexId, + directed, + this.memoryPool, + this.ownerVertexIdOffHeap, + this.edgeLabelIdOffHeap, + this.subLabelIdOffHeap, + this.otherVertexIdOffHeap); + } + + @Override + public Id ownerVertexId() { + return (Id) this.ownerVertexIdOffHeap.zeroCopyReadFromByteBuf(); + } + + @Override + public Id edgeLabelId() { + return (Id) this.edgeLabelIdOffHeap.zeroCopyReadFromByteBuf(); + } + + @Override + public Id subLabelId() { + return (Id) this.subLabelIdOffHeap.zeroCopyReadFromByteBuf(); + } + + @Override + public String sortValues() { + try { + return this.sortValuesOffHeap.toString(StandardCharsets.UTF_8); + } finally { + this.sortValuesOffHeap.resetReaderIndex(); + } + } + + @Override + public Id otherVertexId() { + return (Id) this.otherVertexIdOffHeap.zeroCopyReadFromByteBuf(); + } + + @Override + public String asString() { + if (this.cacheOffHeap != null) { + try { + return this.cacheOffHeap.toString(StandardCharsets.UTF_8); + } finally { + this.cacheOffHeap.resetReaderIndex(); + } + } + String tmpCache; + if (this.directed) { + tmpCache = SplicingIdGenerator.concat( + IdUtil.writeString((Id) this.ownerVertexIdOffHeap.zeroCopyReadFromByteBuf()), + this.direction.type().string(), + IdUtil.writeLong((Id) this.edgeLabelIdOffHeap.zeroCopyReadFromByteBuf()), + IdUtil.writeLong((Id) this.subLabelIdOffHeap.zeroCopyReadFromByteBuf()), + this.sortValues(), + IdUtil.writeString((Id) this.otherVertexIdOffHeap.zeroCopyReadFromByteBuf())); + } else { + tmpCache = SplicingIdGenerator.concat( + IdUtil.writeString((Id) this.ownerVertexIdOffHeap.zeroCopyReadFromByteBuf()), + IdUtil.writeLong((Id) this.edgeLabelIdOffHeap.zeroCopyReadFromByteBuf()), + IdUtil.writeLong((Id) this.subLabelIdOffHeap.zeroCopyReadFromByteBuf()), + this.sortValues(), + IdUtil.writeString((Id) this.otherVertexIdOffHeap.zeroCopyReadFromByteBuf())); + } + byte[] tmpCacheBytes = tmpCache.getBytes(StandardCharsets.UTF_8); + this.cacheOffHeap = (ByteBuf) memoryPool.requireMemory(tmpCacheBytes.length); + this.cacheOffHeap.markReaderIndex(); + this.cacheOffHeap.writeBytes(tmpCacheBytes); + return tmpCache; + } + + @Override + public int hashCode() { + if (this.directed) { + return Objects.hash(this.ownerVertexIdOffHeap, + this.direction, + this.edgeLabelIdOffHeap, + this.subLabelIdOffHeap, + this.sortValuesOffHeap, + this.otherVertexIdOffHeap); + } else { + return Objects.hash(this.otherVertexIdOffHeap, + this.edgeLabelIdOffHeap, + this.subLabelIdOffHeap, + this.sortValuesOffHeap, + this.ownerVertexIdOffHeap); + } + } + + @Override + public boolean equals(Object object) { + if (!(object instanceof EdgeIdOffHeap)) { + return false; + } + EdgeIdOffHeap other = (EdgeIdOffHeap) object; + if (this.directed) { + return this.ownerVertexIdOffHeap.equals(other.ownerVertexIdOffHeap) && + this.direction == other.direction && + this.edgeLabelIdOffHeap.equals(other.edgeLabelIdOffHeap) && + this.subLabelIdOffHeap.equals(other.subLabelIdOffHeap) && + this.sortValuesOffHeap.equals(other.sortValuesOffHeap) && + this.otherVertexIdOffHeap.equals(other.otherVertexIdOffHeap); + } else { + return this.otherVertexIdOffHeap.equals(other.otherVertexIdOffHeap) && + this.edgeLabelIdOffHeap.equals(other.edgeLabelIdOffHeap) && + this.subLabelIdOffHeap.equals(other.subLabelIdOffHeap) && + this.sortValuesOffHeap.equals(other.sortValuesOffHeap) && + this.ownerVertexIdOffHeap.equals(other.ownerVertexIdOffHeap); + } + } +} diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/LongIdOffHeap.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/LongIdOffHeap.java new file mode 100644 index 0000000000..6c7bf65d66 --- /dev/null +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/LongIdOffHeap.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.memory.consumer.impl.id; + +import static org.apache.hugegraph.backend.id.IdGenerator.compareType; + +import java.util.Objects; + +import org.apache.hugegraph.backend.id.Id; +import org.apache.hugegraph.backend.id.IdGenerator; +import org.apache.hugegraph.memory.consumer.MemoryConsumer; +import org.apache.hugegraph.memory.pool.MemoryPool; +import org.apache.hugegraph.util.NumericUtil; + +import io.netty.buffer.ByteBuf; + +public class LongIdOffHeap extends IdGenerator.LongId implements MemoryConsumer { + + private final MemoryPool memoryPool; + private ByteBuf idOffHeap; + + public LongIdOffHeap(MemoryPool memoryPool, long id) { + super(id); + this.memoryPool = memoryPool; + serializeSelfToByteBuf(); + releaseOriginalOnHeapVars(); + } + + public LongIdOffHeap(MemoryPool memoryPool, byte[] bytes) { + super(bytes); + this.memoryPool = memoryPool; + serializeSelfToByteBuf(); + releaseOriginalOnHeapVars(); + } + + @Override + public Object zeroCopyReadFromByteBuf() { + try { + return new IdGenerator.LongId(idOffHeap.readLong()); + } finally { + idOffHeap.resetReaderIndex(); + } + + } + + @Override + public void serializeSelfToByteBuf() { + this.idOffHeap = (ByteBuf) memoryPool.requireMemory(Long.BYTES); + this.idOffHeap.markReaderIndex(); + this.idOffHeap.writeLong(id); + } + + @Override + public void releaseOriginalOnHeapVars() { + this.id = null; + } + + @Override + public MemoryPool getOperatorMemoryPool() { + return memoryPool; + } + + @Override + public long asLong() { + try { + return idOffHeap.readLong(); + } finally { + idOffHeap.resetReaderIndex(); + } + } + + @Override + public Object asObject() { + return this.asLong(); + } + + @Override + public String asString() { + return Long.toString(this.asLong()); + } + + @Override + public byte[] asBytes() { + return NumericUtil.longToBytes(this.asLong()); + } + + @Override + public int compareTo(Id other) { + int cmp = compareType(this, other); + if (cmp != 0) { + return cmp; + } + return Long.compare(this.asLong(), other.asLong()); + } + + @Override + public int hashCode() { + return Objects.hash(idOffHeap); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof Number)) { + return false; + } + return this.asLong() == ((Number) other).longValue(); + } + + @Override + public String toString() { + return String.valueOf(this.asLong()); + } + + @Override + public int intValue() { + return (int) this.asLong(); + } + + @Override + public long longValue() { + return this.asLong(); + } + + @Override + public float floatValue() { + return this.asLong(); + } + + @Override + public double doubleValue() { + return this.asLong(); + } +} diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/ObjectIdOffHeap.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/ObjectIdOffHeap.java new file mode 100644 index 0000000000..5bf4a58093 --- /dev/null +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/ObjectIdOffHeap.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.memory.consumer.impl.id; + +import java.util.Objects; + +import org.apache.hugegraph.backend.id.IdGenerator; +import org.apache.hugegraph.memory.consumer.MemoryConsumer; +import org.apache.hugegraph.memory.pool.MemoryPool; +import org.apache.hugegraph.memory.util.FurySerializationUtils; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; + +public class ObjectIdOffHeap extends IdGenerator.ObjectId implements MemoryConsumer { + + private final MemoryPool memoryPool; + private ByteBuf objectOffHeap; + + public ObjectIdOffHeap(Object object, MemoryPool memoryPool) { + super(object); + this.memoryPool = memoryPool; + serializeSelfToByteBuf(); + releaseOriginalOnHeapVars(); + } + + @Override + public Object zeroCopyReadFromByteBuf() { + return new IdGenerator.ObjectId(FurySerializationUtils.FURY.deserialize( + ByteBufUtil.getBytes(this.objectOffHeap))); + } + + @Override + public void serializeSelfToByteBuf() { + byte[] bytes = FurySerializationUtils.FURY.serialize(object); + this.objectOffHeap = (ByteBuf) memoryPool.requireMemory(bytes.length); + this.objectOffHeap.markReaderIndex(); + this.objectOffHeap.writeBytes(bytes); + //try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + // ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream)) { + // // serialize attributes to outputStream + // outputStream.writeObject(this.object); + // objectOffHeap = (ByteBuf) memoryPool.requireMemory(byteArrayOutputStream.size()); + // objectOffHeap.writeBytes(byteArrayOutputStream.toByteArray()); + //} catch (IOException e) { + // LOG.error("Unexpected error occurs when serializing ObjectId.", e); + // throw new SerializationRuntimeException(e); + //} + } + + @Override + public void releaseOriginalOnHeapVars() { + this.object = null; + } + + @Override + public MemoryPool getOperatorMemoryPool() { + return memoryPool; + } + + @Override + public Object asObject() { + return FurySerializationUtils.FURY.deserialize(ByteBufUtil.getBytes(objectOffHeap)); + } + + @Override + public int hashCode() { + return Objects.hash(objectOffHeap); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof ObjectIdOffHeap)) { + return false; + } + return this.objectOffHeap.equals(((ObjectIdOffHeap) other).objectOffHeap); + } + + @Override + public String toString() { + return super.toString(); + } +} + + + diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/QueryIdOffHeap.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/QueryIdOffHeap.java new file mode 100644 index 0000000000..b2852992a9 --- /dev/null +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/QueryIdOffHeap.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.memory.consumer.impl.id; + +import java.nio.charset.StandardCharsets; + +import org.apache.hugegraph.backend.cache.CachedBackendStore; +import org.apache.hugegraph.backend.id.Id; +import org.apache.hugegraph.backend.query.Query; +import org.apache.hugegraph.memory.consumer.MemoryConsumer; +import org.apache.hugegraph.memory.pool.MemoryPool; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; + +public class QueryIdOffHeap extends CachedBackendStore.QueryId implements MemoryConsumer { + + private final MemoryPool memoryPool; + private ByteBuf queryOffHeap; + + public QueryIdOffHeap(MemoryPool memoryPool, Query q) { + super(q); + this.memoryPool = memoryPool; + serializeSelfToByteBuf(); + releaseOriginalOnHeapVars(); + } + + @Override + public Object zeroCopyReadFromByteBuf() { + try { + return new CachedBackendStore.QueryId( + this.queryOffHeap.toString(StandardCharsets.UTF_8), + this.hashCode); + } finally { + queryOffHeap.resetReaderIndex(); + } + } + + @Override + public void serializeSelfToByteBuf() { + byte[] stringBytes = query.getBytes((StandardCharsets.UTF_8)); + this.queryOffHeap = (ByteBuf) memoryPool.requireMemory(stringBytes.length); + this.queryOffHeap.markReaderIndex(); + this.queryOffHeap.writeBytes(stringBytes); + } + + @Override + public void releaseOriginalOnHeapVars() { + this.query = null; + } + + @Override + public MemoryPool getOperatorMemoryPool() { + return memoryPool; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof QueryIdOffHeap)) { + return false; + } + return this.queryOffHeap.equals(((QueryIdOffHeap) other).queryOffHeap); + } + + @Override + public int compareTo(Id o) { + return this.asString().compareTo(o.asString()); + } + + @Override + public Object asObject() { + return this.asString(); + } + + @Override + public String asString() { + try { + return this.queryOffHeap.toString(StandardCharsets.UTF_8); + } finally { + this.queryOffHeap.resetReaderIndex(); + } + } + + @Override + public byte[] asBytes() { + return ByteBufUtil.getBytes(this.queryOffHeap); + } + + @Override + public String toString() { + return this.asString(); + } + + @Override + public int length() { + return this.asString().length(); + } +} diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/StringIdOffHeap.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/StringIdOffHeap.java new file mode 100644 index 0000000000..e924a17a61 --- /dev/null +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/StringIdOffHeap.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.memory.consumer.impl.id; + +import static org.apache.hugegraph.backend.id.IdGenerator.compareType; + +import java.nio.charset.StandardCharsets; + +import org.apache.hugegraph.backend.id.Id; +import org.apache.hugegraph.backend.id.IdGenerator; +import org.apache.hugegraph.memory.consumer.MemoryConsumer; +import org.apache.hugegraph.memory.pool.MemoryPool; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; + +public class StringIdOffHeap extends IdGenerator.StringId implements MemoryConsumer { + + private final MemoryPool memoryPool; + private ByteBuf idOffHeap; + + public StringIdOffHeap(MemoryPool memoryPool, String id) { + super(id); + this.memoryPool = memoryPool; + serializeSelfToByteBuf(); + releaseOriginalOnHeapVars(); + } + + public StringIdOffHeap(MemoryPool memoryPool, byte[] bytes) { + super(bytes); + this.memoryPool = memoryPool; + serializeSelfToByteBuf(); + releaseOriginalOnHeapVars(); + } + + @Override + public Object zeroCopyReadFromByteBuf() { + try { + return new IdGenerator.StringId(idOffHeap.toString(StandardCharsets.UTF_8)); + } finally { + idOffHeap.resetReaderIndex(); + } + } + + @Override + public void serializeSelfToByteBuf() { + byte[] stringBytes = id.getBytes((StandardCharsets.UTF_8)); + this.idOffHeap = (ByteBuf) memoryPool.requireMemory(stringBytes.length); + this.idOffHeap.markReaderIndex(); + this.idOffHeap.writeBytes(stringBytes); + } + + @Override + public void releaseOriginalOnHeapVars() { + this.id = null; + } + + @Override + public MemoryPool getOperatorMemoryPool() { + return memoryPool; + } + + @Override + public Object asObject() { + return this.asString(); + } + + @Override + public String asString() { + try { + return this.idOffHeap.toString(StandardCharsets.UTF_8); + } finally { + this.idOffHeap.resetReaderIndex(); + } + } + + @Override + public long asLong() { + return Long.parseLong(this.asString()); + } + + @Override + public byte[] asBytes() { + return ByteBufUtil.getBytes(this.idOffHeap); + } + + @Override + public int length() { + return this.asString().length(); + } + + @Override + public int compareTo(Id other) { + int cmp = compareType(this, other); + if (cmp != 0) { + return cmp; + } + return this.asString().compareTo(other.asString()); + } + + @Override + public int hashCode() { + return this.idOffHeap.hashCode(); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof StringIdOffHeap)) { + return false; + } + return this.idOffHeap.equals(((StringIdOffHeap) other).idOffHeap); + } + + @Override + public String toString() { + return this.asString(); + } +} diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/UuidIdOffHeap.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/UuidIdOffHeap.java new file mode 100644 index 0000000000..b21b72ad63 --- /dev/null +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/UuidIdOffHeap.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.memory.consumer.impl.id; + +import static org.apache.hugegraph.backend.id.IdGenerator.compareType; + +import java.io.IOException; +import java.util.UUID; + +import org.apache.hugegraph.backend.id.Id; +import org.apache.hugegraph.backend.id.IdGenerator; +import org.apache.hugegraph.backend.serializer.BytesBuffer; +import org.apache.hugegraph.memory.consumer.MemoryConsumer; +import org.apache.hugegraph.memory.pool.MemoryPool; +import org.apache.hugegraph.memory.util.FurySerializationUtils; +import org.apache.hugegraph.memory.util.SerializationRuntimeException; +import org.apache.hugegraph.util.E; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; + +public class UuidIdOffHeap extends IdGenerator.UuidId implements MemoryConsumer { + + private static final Logger LOG = LoggerFactory.getLogger(UuidIdOffHeap.class); + private final MemoryPool memoryPool; + private ByteBuf idOffHeap; + + public UuidIdOffHeap(MemoryPool memoryPool, String string) { + super(string); + this.memoryPool = memoryPool; + serializeSelfToByteBuf(); + releaseOriginalOnHeapVars(); + } + + public UuidIdOffHeap(MemoryPool memoryPool, byte[] bytes) { + super(bytes); + this.memoryPool = memoryPool; + serializeSelfToByteBuf(); + releaseOriginalOnHeapVars(); + } + + public UuidIdOffHeap(MemoryPool memoryPool, UUID uuid) { + super(uuid); + this.memoryPool = memoryPool; + serializeSelfToByteBuf(); + releaseOriginalOnHeapVars(); + } + + @Override + public Object zeroCopyReadFromByteBuf() { + return new IdGenerator.UuidId((UUID) FurySerializationUtils.FURY.deserialize( + ByteBufUtil.getBytes(this.idOffHeap))); + } + + @Override + public void serializeSelfToByteBuf() { + byte[] bytes = FurySerializationUtils.FURY.serialize(uuid); + this.idOffHeap = (ByteBuf) memoryPool.requireMemory(bytes.length); + this.idOffHeap.markReaderIndex(); + this.idOffHeap.writeBytes(bytes); + } + + @Override + public void releaseOriginalOnHeapVars() { + this.uuid = null; + } + + @Override + public MemoryPool getOperatorMemoryPool() { + return memoryPool; + } + + @Override + public Object asObject() { + return FurySerializationUtils.FURY.deserialize( + ByteBufUtil.getBytes(this.idOffHeap)); + } + + @Override + public String asString() { + return this.asObject().toString(); + } + + @Override + public byte[] asBytes() { + try (BytesBuffer buffer = BytesBuffer.allocate(16)) { + UUID tmp = (UUID) this.asObject(); + buffer.writeLong(tmp.getMostSignificantBits()); + buffer.writeLong(tmp.getLeastSignificantBits()); + return buffer.bytes(); + } catch (IOException e) { + LOG.error("Unexpected error occurs when allocate bytesBuffer.", e); + throw new SerializationRuntimeException(e); + } + } + + @Override + public int compareTo(Id other) { + E.checkNotNull(other, "compare id"); + int cmp = compareType(this, other); + if (cmp != 0) { + return cmp; + } + return ((UUID) this.asObject()).compareTo((UUID) other.asObject()); + } + + @Override + public int hashCode() { + return this.idOffHeap.hashCode(); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof UuidIdOffHeap)) { + return false; + } + return this.asObject().equals(((UuidIdOffHeap) other).asObject()); + } + + @Override + public String toString() { + return this.asString(); + } +} diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/util/FurySerializationUtils.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/util/FurySerializationUtils.java new file mode 100644 index 0000000000..c8388747da --- /dev/null +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/util/FurySerializationUtils.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.memory.util; + +import java.util.UUID; + +import org.apache.fury.Fury; +import org.apache.fury.config.Language; + +public class FurySerializationUtils { + + public static final Fury FURY = Fury.builder().withLanguage(Language.JAVA) + // not mandatory to register all class + .requireClassRegistration(false) + .build(); + + static { + FURY.register(UUID.class); + } +} diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/util/SerializationRuntimeException.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/util/SerializationRuntimeException.java new file mode 100644 index 0000000000..83ce7c043f --- /dev/null +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/util/SerializationRuntimeException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.memory.util; + +public class SerializationRuntimeException extends RuntimeException { + + public SerializationRuntimeException(Throwable e) { + super("Unexpected error occurs in serialization", e); + } +}