Skip to content

Commit

Permalink
complete adoption for all id
Browse files Browse the repository at this point in the history
  • Loading branch information
Pengzna committed Oct 25, 2024
1 parent 871015e commit 8344443
Show file tree
Hide file tree
Showing 16 changed files with 1,169 additions and 31 deletions.
5 changes: 5 additions & 0 deletions hugegraph-server/hugegraph-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@
<groupId>org.apache.tinkerpop</groupId>
<artifactId>gremlin-driver</artifactId>
</dependency>
<dependency>
<groupId>org.apache.fury</groupId>
<artifactId>fury-core</artifactId>
<version>0.9.0-SNAPSHOT</version>
</dependency>

<!-- jraft -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,16 +215,21 @@ public Number queryNumber(Query query) {
/**
* Query as an Id for cache
*/
static class QueryId implements Id {
public static class QueryId implements Id {

private String query;
private int hashCode;
protected String query;
protected int hashCode;

public QueryId(Query q) {
this.query = q.toString();
this.hashCode = q.hashCode();
}

public QueryId(String query, int hashCode) {
this.query = query;
this.hashCode = hashCode;
}

@Override
public IdType type() {
return IdType.UNKNOWN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
* > sortKeys > target-vertex-id }
* NOTE:
* <p>1. for edges with edgeLabelType = NORMAL: edgeLabelId = parentEdgeLabelId = subEdgeLabelId;
* for edges with edgeLabelType = PARENT: edgeLabelId = subEdgeLabelId, parentEdgeLabelId =
* edgeLabelId.fatherId
* for edges with edgeLabelType = PARENT: edgeLabelId = subEdgeLabelId, parentEdgeLabelId =
* edgeLabelId.fatherId
* <p>2.if we use `entry.type()` which is IN or OUT as a part of id,
* an edge's id will be different due to different directions (belongs
* to 2 owner vertex)
Expand All @@ -49,15 +49,14 @@ public class EdgeId implements Id {
HugeKeys.OTHER_VERTEX
};

private final Id ownerVertexId;
private final Directions direction;
private final Id edgeLabelId;
private final Id subLabelId;
private final String sortValues;
private final Id otherVertexId;

private final boolean directed;
private String cache;
protected final Id ownerVertexId;
protected final Id edgeLabelId;
protected final Id subLabelId;
protected final Id otherVertexId;
protected final Directions direction;
protected final boolean directed;
protected String sortValues;
protected String cache;

public EdgeId(HugeVertex ownerVertex, Directions direction,
Id edgeLabelId, Id subLabelId, String sortValues, HugeVertex otherVertex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,15 @@ public static IdType idType(Id id) {
return IdType.UNKNOWN;
}

private static int compareType(Id id1, Id id2) {
public static int compareType(Id id1, Id id2) {
return idType(id1).ordinal() - idType(id2).ordinal();
}

/****************************** id defines ******************************/

public static final class StringId implements Id {
public static class StringId implements Id {

private final String id;
protected String id;

public StringId(String id) {
E.checkArgument(!id.isEmpty(), "The id can't be empty");
Expand Down Expand Up @@ -196,11 +196,11 @@ public String toString() {
}
}

public static final class LongId extends Number implements Id {
public static class LongId extends Number implements Id {

private static final long serialVersionUID = -7732461469037400190L;

private final long id;
protected Long id;

public LongId(long id) {
this.id = id;
Expand Down Expand Up @@ -270,7 +270,7 @@ public String toString() {

@Override
public int intValue() {
return (int) this.id;
return this.id.intValue();
}

@Override
Expand All @@ -289,9 +289,9 @@ public double doubleValue() {
}
}

public static final class UuidId implements Id {
public static class UuidId implements Id {

private final UUID uuid;
protected UUID uuid;

public UuidId(String string) {
this(StringEncoding.uuid(string));
Expand Down Expand Up @@ -379,9 +379,9 @@ public String toString() {
/**
* This class is just used by backend store for wrapper object as Id
*/
public static final class ObjectId implements Id {
public static class ObjectId implements Id {

private final Object object;
protected Object object;

public ObjectId(Object object) {
E.checkNotNull(object, "object");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public BinaryBackendEntry(HugeType type, byte[] bytes, boolean enablePartition)
}

// FIXME: `enablePartition` is unused here
public BinaryBackendEntry(HugeType type, byte[] bytes, boolean enablePartition, boolean isOlap) {
public BinaryBackendEntry(HugeType type, byte[] bytes, boolean enablePartition,
boolean isOlap) {
this(type, BytesBuffer.wrap(bytes).parseOlapId(type, isOlap));
}

Expand Down Expand Up @@ -207,10 +208,10 @@ public int hashCode() {
return this.id().hashCode() ^ this.columns.size();
}

public static final class BinaryId implements Id {
public static class BinaryId implements Id {

private final byte[] bytes;
private final Id id;
protected byte[] bytes;
protected Id id;

public BinaryId(byte[] bytes, Id id) {
this.bytes = bytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public OnHeapMemoryStrategy(MemoryManager memoryManager) {
}

@Override
public Object tryToAllocate(long size) {
public byte[] tryToAllocate(long size) {
if (memoryManager.getCurrentOnHeapAllocatedMemory().get() +
memoryManager.getCurrentOffHeapAllocatedMemory().get() + size <
MemoryManager.MAX_MEMORY_CAPACITY_IN_BYTES) {
Expand All @@ -41,7 +41,7 @@ public Object tryToAllocate(long size) {
}

@Override
public Object forceAllocate(long size) {
public byte[] forceAllocate(long size) {
int sizeOfByte = (int) (size / Bytes.BASE);
memoryManager.getCurrentOnHeapAllocatedMemory().addAndGet(sizeOfByte);
return new byte[sizeOfByte];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,31 @@

package org.apache.hugegraph.memory.consumer;

// TODO(pjz): integrated it with HG objects such as edges and vertex.
import org.apache.hugegraph.memory.pool.MemoryPool;

/**
* This interface is used by immutable, memory-heavy objects which will be stored in off heap.
*/
public interface MemoryConsumer {

/**
* This method will read Off heap ByteBuf storing binary data of self.
*
* @return self value
*/
Object zeroCopyReadFromByteBuf();

/**
* Serialize to DataOutputStream in stack first, then request an off heap ByteBuf from
* OperatorMemoryPool based on size of DataOutputStream. Finally, serializing it to ByteBuf.
*/
void serializeSelfToByteBuf();

/**
* Called after serializingSelfToByteBuf, pointing all self's on heap vars to null, in order
* to let GC release all its on heap memory.
*/
void releaseOriginalOnHeapVars();

MemoryPool getOperatorMemoryPool();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hugegraph.memory.consumer.impl.id;

import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.serializer.BinaryBackendEntry;
import org.apache.hugegraph.memory.consumer.MemoryConsumer;
import org.apache.hugegraph.memory.pool.MemoryPool;
import org.apache.hugegraph.util.Bytes;
import org.apache.hugegraph.util.E;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;

public class BinaryIdOffHeap extends BinaryBackendEntry.BinaryId implements MemoryConsumer {

private final MemoryPool memoryPool;
private final MemoryConsumer originId;
private ByteBuf bytesOffHeap;

public BinaryIdOffHeap(byte[] bytes, Id id, MemoryPool memoryPool, MemoryConsumer originId) {
super(bytes, id);
this.memoryPool = memoryPool;
this.originId = originId;
serializeSelfToByteBuf();
releaseOriginalOnHeapVars();
}

@Override
public void serializeSelfToByteBuf() {
this.bytesOffHeap = (ByteBuf) memoryPool.requireMemory(bytes.length);
this.bytesOffHeap.markReaderIndex();
this.bytesOffHeap.writeBytes(bytes);
}

@Override
public BinaryBackendEntry.BinaryId zeroCopyReadFromByteBuf() {
return new BinaryBackendEntry.BinaryId(ByteBufUtil.getBytes(bytesOffHeap),
(Id) originId.zeroCopyReadFromByteBuf());
}

@Override
public MemoryPool getOperatorMemoryPool() {
return this.memoryPool;
}

@Override
public void releaseOriginalOnHeapVars() {
this.bytes = null;
this.id = null;
}

@Override
public Object asObject() {
return bytesOffHeap.nioBuffer();
}

@Override
public String toString() {
return "0x" + Bytes.toHex(asBytes());
}

@Override
public boolean equals(Object other) {
if (!(other instanceof BinaryIdOffHeap)) {
return false;
}
return bytesOffHeap.equals(((BinaryIdOffHeap) other).bytesOffHeap);
}

@Override
public int hashCode() {
return bytesOffHeap.hashCode();
}

@Override
public int length() {
return bytesOffHeap.readableBytes();
}

@Override
public byte[] asBytes(int offset) {
E.checkArgument(offset < this.bytesOffHeap.readableBytes(),
"Invalid offset %s, must be < length %s",
offset, this.bytesOffHeap.readableBytes());
try {
// zero-copy read
byte[] tmpBytes = new byte[offset];
this.bytesOffHeap.readBytes(tmpBytes);
return tmpBytes;
} finally {
this.bytesOffHeap.resetReaderIndex();
}
}

@Override
public byte[] asBytes() {
try {
// zero-copy read
byte[] tmpBytes = new byte[bytesOffHeap.readableBytes()];
this.bytesOffHeap.readBytes(tmpBytes);
return tmpBytes;
} finally {
this.bytesOffHeap.resetReaderIndex();
}
}

@Override
public int compareTo(Id other) {
return bytesOffHeap.compareTo(((BinaryIdOffHeap) other).bytesOffHeap);
}

@Override
public Id origin() {
return (Id) originId.zeroCopyReadFromByteBuf();
}
}
Loading

0 comments on commit 8344443

Please sign in to comment.