Skip to content

Commit

Permalink
add graph layer.
Browse files Browse the repository at this point in the history
  • Loading branch information
zekisong committed Jan 26, 2021
1 parent 015bc8f commit b3c03c8
Show file tree
Hide file tree
Showing 130 changed files with 2,313 additions and 1,803 deletions.
8 changes: 7 additions & 1 deletion assembly/assembly.iml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
<orderEntry type="library" name="Maven: io.grpc:grpc-netty-shaded:1.20.0" level="project" />
<orderEntry type="library" name="Maven: io.grpc:grpc-core:1.20.0" level="project" />
<orderEntry type="library" name="Maven: io.grpc:grpc-context:1.20.0" level="project" />
<orderEntry type="library" name="Maven: com.google.code.gson:gson:2.7" level="project" />
<orderEntry type="library" name="Maven: com.google.errorprone:error_prone_annotations:2.3.2" level="project" />
<orderEntry type="library" name="Maven: com.google.code.findbugs:jsr305:3.0.2" level="project" />
<orderEntry type="library" name="Maven: com.google.android:annotations:4.1.1.4" level="project" />
Expand Down Expand Up @@ -74,6 +73,13 @@
<orderEntry type="library" name="Maven: javax.xml.bind:jaxb-api:2.2.2" level="project" />
<orderEntry type="library" name="Maven: javax.xml.stream:stax-api:1.0-2" level="project" />
<orderEntry type="library" name="Maven: javax.activation:activation:1.1" level="project" />
<orderEntry type="module" module-name="serde" />
<orderEntry type="library" name="Maven: com.google.code.gson:gson:2.8.6" level="project" />
<orderEntry type="library" name="Maven: com.esotericsoftware:kryo:4.0.2" level="project" />
<orderEntry type="library" name="Maven: com.esotericsoftware:reflectasm:1.11.3" level="project" />
<orderEntry type="library" name="Maven: org.ow2.asm:asm:5.0.4" level="project" />
<orderEntry type="library" name="Maven: com.esotericsoftware:minlog:1.3.0" level="project" />
<orderEntry type="library" name="Maven: org.objenesis:objenesis:2.5.1" level="project" />
<orderEntry type="library" name="Maven: log4j:log4j:1.2.17" level="project" />
<orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.7" level="project" />
<orderEntry type="library" name="Maven: commons-logging:commons-logging:1.2" level="project" />
Expand Down
13 changes: 10 additions & 3 deletions assembly/pom.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>graphdb</artifactId>
<groupId>com.lightgraph.graph</groupId>
Expand Down Expand Up @@ -56,7 +56,8 @@
<goal>build-classpath</goal>
</goals>
<configuration>
<outputFile>${project.parent.basedir}/target/cached_classpath.txt</outputFile>
<outputFile>${project.parent.basedir}/target/cached_classpath.txt
</outputFile>
</configuration>
</execution>
</executions>
Expand Down Expand Up @@ -100,6 +101,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.lightgraph.graph</groupId>
<artifactId>serde</artifactId>
<version>${project.version}</version>
</dependency>

</dependencies>

</project>
79 changes: 8 additions & 71 deletions core/src/main/java/com/lightgraph/graph/cluster/Partition.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,51 +3,23 @@
import com.lightgraph.graph.constant.GraphConstant;
import com.lightgraph.graph.exception.GraphException;
import com.lightgraph.graph.modules.consensus.ConsensusGroup;
import com.lightgraph.graph.utils.ByteUtils;
import com.lightgraph.graph.writable.Sizeable;
import com.lightgraph.graph.writable.Writable;

import java.util.*;

public class Partition extends ConsensusGroup<Replication> implements Writable, Sizeable {
public class Partition extends ConsensusGroup<Replication> {

private String graph;
private Integer partitionIndex;

public Partition(String graph, Integer partitionIndex) {
if (graph == null || partitionIndex == null)
if (graph == null || partitionIndex == null) {
throw new GraphException("graph or partitionIndex must not be null!");
}
this.graph = graph;
this.partitionIndex = partitionIndex;
this.groupName = getName();
}

public Partition(byte[] bytes) {
int pos = ByteUtils.RESERVED_BYTE_SIZE_FOR_TX;
int groupLen = ByteUtils.getInt(bytes, pos);
pos = pos + ByteUtils.SIZE_INT;
int instanceCount = ByteUtils.getInt(bytes, pos);
pos = pos + ByteUtils.SIZE_INT;
List<Integer> instances = new ArrayList<>();
for (int i = 0; i < instanceCount; i++) {
int instanceLen = ByteUtils.getInt(bytes, pos);
pos = pos + ByteUtils.SIZE_INT;
instances.add(instanceLen);
}
int graphLen = ByteUtils.getInt(bytes, pos);
pos = pos + ByteUtils.SIZE_INT;
this.groupName = ByteUtils.getString(bytes, pos, groupLen);
pos = pos + groupLen;
for (int len : instances) {
byte[] replicationB = ByteUtils.getBytes(bytes, pos, len);
Replication replication = new Replication(replicationB);
this.instances.put(replication.getReplicationIndex(), replication);
pos = pos + len;
}
this.graph = ByteUtils.getString(bytes, pos, graphLen);
pos = pos + graphLen;
this.partitionIndex = ByteUtils.getInt(bytes, pos);
}

public String getGraph() {
return graph;
}
Expand Down Expand Up @@ -89,10 +61,12 @@ public static Partition valueOf(String name) {

@Override
public boolean equals(Object obj) {
if (this == obj)
if (this == obj) {
return true;
if (obj == null || getClass() != obj.getClass())
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
Partition other = (Partition) obj;
return Objects.equals(graph, other.graph)
&& Objects.equals(partitionIndex, other.partitionIndex)
Expand All @@ -103,41 +77,4 @@ public boolean equals(Object obj) {
public String toString() {
return graph + "_" + partitionIndex;
}

@Override
public int size() {
int replicationHeapSize = 0;
for (Replication replication : instances.values()) {
replicationHeapSize = replicationHeapSize + replication.size();
}
return ByteUtils.RESERVED_BYTE_SIZE_FOR_TX
+ ByteUtils.SIZE_INT //group size
+ ByteUtils.SIZE_INT //instance count
+ ByteUtils.SIZE_INT * instances.size() //replicationHeap size
+ ByteUtils.SIZE_INT //graph size
+ this.groupName.length()
+ replicationHeapSize
+ graph.length()
+ ByteUtils.SIZE_INT; //partition index
}

@Override
public byte[] getBytes() {
int size = size();
byte[] data = new byte[size];
int pos = ByteUtils.RESERVED_BYTE_SIZE_FOR_TX;
pos = ByteUtils.putInt(data, pos, this.groupName.length());
pos = ByteUtils.putInt(data, pos, this.instances.size());
for (Replication replication : instances.values()) {
pos = ByteUtils.putInt(data, pos, replication.size());
}
pos = ByteUtils.putInt(data, pos, this.graph.length());
pos = ByteUtils.putString(data, pos, this.groupName);
for (Replication replication : instances.values()) {
pos = ByteUtils.putBytes(data, pos, replication.getBytes());
}
pos = ByteUtils.putString(data, pos, this.graph);
ByteUtils.putInt(data, pos, partitionIndex);
return data;
}
}
39 changes: 14 additions & 25 deletions core/src/main/java/com/lightgraph/graph/cluster/Replication.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@
import com.lightgraph.graph.constant.GraphConstant;
import com.lightgraph.graph.modules.consensus.ConsensusInstance;
import com.lightgraph.graph.cluster.node.Node;
import com.lightgraph.graph.utils.ByteUtils;

import java.util.Base64;
import java.util.Objects;

public class Replication extends ConsensusInstance<Partition> {

private Integer replicationIndex;

public Replication() {

}

public Replication(Partition partition, Integer replicationIndex, Node location) {
this(partition, replicationIndex);
this.location = location;
Expand All @@ -20,12 +24,8 @@ public Replication(Partition partition, Integer replicationIndex) {
this.group = partition;
this.replicationIndex = replicationIndex;
this.instanceName = getName();
this.description = String.format("[graph:%s,partition:%d,replication:%d]", getGraphName(), getPartitionIndex(), getReplicationIndex());
}

public Replication(byte[] bytes) {
super(bytes);
this.replicationIndex = ByteUtils.getInt(bytes, bytes.length - ByteUtils.SIZE_INT);
this.description = String.format("[graph:%s,partition:%d,replication:%d]", getGraphName(), getPartitionIndex(),
getReplicationIndex());
}

public Node getLocation() {
Expand Down Expand Up @@ -56,7 +56,9 @@ public Integer getReplicationIndex() {

public String getName() {
String g = new String(Base64.getEncoder().encode(this.group.getGraph().getBytes()));
String uniqName = g + GraphConstant.SPLIT_ARRAY_TOKEN + this.group.getPartitionIndex() + GraphConstant.SPLIT_ARRAY_TOKEN + replicationIndex;
String uniqName =
g + GraphConstant.SPLIT_ARRAY_TOKEN + this.group.getPartitionIndex() + GraphConstant.SPLIT_ARRAY_TOKEN
+ replicationIndex;
return new String(Base64.getEncoder().encode((uniqName).getBytes()));
}

Expand All @@ -73,26 +75,13 @@ public static Replication valueOf(String name) {

@Override
public boolean equals(Object obj) {
if (this == obj)
if (this == obj) {
return true;
if (obj == null)
}
if (obj == null) {
return false;
}
ConsensusInstance other = (ConsensusInstance) obj;
return Objects.equals(this.instanceName, other.getInstanceName()) && this.getState() == other.getState();
}

@Override
public int size() {
return super.size() + ByteUtils.SIZE_INT;
}

@Override
public byte[] getBytes() {
int size = size();
byte[] data = new byte[size];
byte[] instance = super.getBytes();
System.arraycopy(instance, 0, data, 0, instance.length);
ByteUtils.putInt(data, data.length - 4, replicationIndex);
return data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,19 @@
import java.util.concurrent.atomic.AtomicLong;

public class ClusterContext {

private static Log LOG = LogFactory.getLog(ClusterContext.class);
private volatile Node leader;
private Map<String, Node> nodes = new ConcurrentHashMap<>();
private Map<String, Map<Integer, Partition>> routting = new ConcurrentHashMap<>();
private volatile AtomicLong version = new AtomicLong(0);

public boolean isEmpty() {
if (leader == null && nodes.size() == 0 && routting.size() == 0)
if (leader == null && nodes.size() == 0 && routting.size() == 0) {
return true;
else
} else {
return false;
}
}

public Node getLeader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import com.lightgraph.graph.graph.EdgeMetaInfo;
import com.lightgraph.graph.graph.VertexMetaInfo;
import com.lightgraph.graph.meta.EdgeMeta;
import com.lightgraph.graph.meta.LabelMeta;
import com.lightgraph.graph.meta.LabelType;
import com.lightgraph.graph.meta.MetaType;
import com.lightgraph.graph.meta.VertexMeta;
import com.lightgraph.graph.meta.cluster.GraphMeta;
import com.lightgraph.graph.modules.consensus.ConsensusIO;
Expand Down Expand Up @@ -69,6 +72,11 @@ public synchronized boolean updateReplication(Replication replication) {
throw new GraphException("should not reach here!");
}

@Override
public LabelMeta getLabelMetaById(Long id, LabelType type) {
throw new GraphException("should not reach here!");
}

@Override
public GraphMeta getGraphMeta(String graph) {
throw new GraphException("should not reach here!");
Expand All @@ -79,6 +87,11 @@ public List<GraphMeta> listGraphMeta() {
throw new GraphException("should not reach here!");
}

@Override
public List<LabelMeta> listLabelMeta(String graph, MetaType metaType) {
throw new GraphException("should not reach here!");
}

@Override
public boolean addVertexMeta(VertexMetaInfo vertexMetaInfo) {
throw new GraphException("should not reach here!");
Expand Down
Loading

0 comments on commit b3c03c8

Please sign in to comment.