implements Serializable {
+
+ /**
+ * Key of this Pair.
+ */
+ private K key;
+ /**
+ * Value of this Pair.
+ */
+ private V value;
+
+ /**
+ * Creates a new pair
+ *
+ * @param key The key for this pair
+ * @param value The value to use for this pair
+ */
+ public KVPair(K key, V value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ /**
+ * Gets the key for this pair.
+ *
+ * @return key for this pair
+ */
+ public K getKey() {
+ return key;
+ }
+
+ public void setKey(K key) {
+ this.key = key;
+ }
+
+ /**
+ * Gets the value for this pair.
+ *
+ * @return value for this pair
+ */
+ public V getValue() {
+ return value;
+ }
+
+ public void setValue(V value) {
+ this.value = value;
+ }
+
+ /**
+ * String representation of this Pair.
+ *
+ * The default name/value delimiter '=' is always used.
+ *
+ * @return String representation of this Pair
+ */
+ @Override
+ public String toString() {
+ return key + "=" + value;
+ }
+
+ /**
+ * Generate a hash code for this Pair.
+ *
+ * The hash code is calculated using both the name and
+ * the value of the Pair.
+ *
+ * @return hash code for this Pair
+ */
+ @Override
+ public int hashCode() {
+ // name's hashCode is multiplied by an arbitrary prime number (13)
+ // in order to make sure there is a difference in the hashCode between
+ // these two parameters:
+ // name: a value: aa
+ // name: aa value: a
+ return key.hashCode() * 13 + (value == null ? 0 : value.hashCode());
+ }
+
+ /**
+ * Test this Pair for equality with another Object.
+ *
+ * If the Object to be tested is not a Pair or is null,
+ * then this method returns false.
+ *
+ * Two Pairs are considered equal if and only if
+ * both the names and values are equal.
+ *
+ * @param o the Object to test for equality with this Pair
+ * @return true if the given Object is equal to this Pair, else false
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o instanceof KVPair) {
+ KVPair<?, ?> pair = (KVPair<?, ?>) o;
+ if (!Objects.equals(key, pair.key)) {
+ return false;
+ }
+ return Objects.equals(value, pair.value);
+ }
+ return false;
+ }
+}
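
For reference, a minimal usage sketch of the KVPair above (not part of this patch; the demo class name is invented). Value-based equals/hashCode let pairs deduplicate in hash-based collections:

import java.util.HashSet;
import java.util.Set;

import org.apache.hugegraph.pd.common.KVPair;

public class KVPairDemo {
    public static void main(String[] args) {
        Set<KVPair<String, Integer>> set = new HashSet<>();
        set.add(new KVPair<>("a", 1));
        // Same key and value => same hashCode and equals, so the duplicate is rejected
        System.out.println(set.add(new KVPair<>("a", 1)));  // false
        // Different key => distinct element
        System.out.println(set.add(new KVPair<>("aa", 1))); // true
    }
}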
diff --git a/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/PDException.java b/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/PDException.java
new file mode 100644
index 0000000000..b398137e82
--- /dev/null
+++ b/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/PDException.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.common;
+
+public class PDException extends Exception {
+
+ private final int errorCode;
+
+ public PDException(int error) {
+ super(String.format("Error code = %d", error));
+ this.errorCode = error;
+ }
+
+ public PDException(int error, String msg) {
+ super(msg);
+ this.errorCode = error;
+ }
+
+ public PDException(int error, Throwable e) {
+ super(e);
+ this.errorCode = error;
+ }
+
+ public PDException(int error, String msg, Throwable e) {
+ super(msg, e);
+ this.errorCode = error;
+ }
+
+ public int getErrorCode() {
+ return errorCode;
+ }
+}
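
A short caller-side sketch (assumed code, not part of this patch) of how the numeric error code travels with the exception; the code 1000 mirrors ETCD_READ_ERROR from pd_common.proto below, but any int works:

import org.apache.hugegraph.pd.common.PDException;

public class PDExceptionDemo {
    static void loadMeta() throws PDException {
        throw new PDException(1000, "failed to read meta store");
    }

    public static void main(String[] args) {
        try {
            loadMeta();
        } catch (PDException e) {
            // The code is preserved alongside the message
            System.out.println(e.getErrorCode() + ": " + e.getMessage());
        }
    }
}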
diff --git a/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/PDRuntimeException.java b/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/PDRuntimeException.java
new file mode 100644
index 0000000000..0bd90241df
--- /dev/null
+++ b/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/PDRuntimeException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.common;
+
+public class PDRuntimeException extends RuntimeException {
+
+ // public static final int LICENSE_ERROR = -11;
+
+ private int errorCode = 0;
+
+ public PDRuntimeException(int error) {
+ super(String.format("Error code = %d", error));
+ this.errorCode = error;
+ }
+
+ public PDRuntimeException(int error, String msg) {
+ super(msg);
+ this.errorCode = error;
+ }
+
+ public PDRuntimeException(int error, Throwable e) {
+ super(e);
+ this.errorCode = error;
+ }
+
+ public PDRuntimeException(int error, String msg, Throwable e) {
+ super(msg, e);
+ this.errorCode = error;
+ }
+
+ public int getErrorCode() {
+ return errorCode;
+ }
+}
diff --git a/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/PartitionCache.java b/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/PartitionCache.java
new file mode 100644
index 0000000000..9bd233fd21
--- /dev/null
+++ b/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/PartitionCache.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.common;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hugegraph.pd.grpc.Metapb;
+
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
+import com.google.common.collect.TreeRangeMap;
+
+/**
+ * The copy-on-write approach was abandoned:
+ * 1. When graph * partition counts are very large, efficiency degrades severely, so it is unusable.
+ */
+public class PartitionCache {
+
+ // Read-write lock
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ private final Map<String, AtomicBoolean> locks = new HashMap<>();
+ Lock writeLock = readWriteLock.writeLock();
+ // One cache per graph
+ private volatile Map<String, RangeMap<Long, Integer>> keyToPartIdCache;
+ // The key is composed of graphName + partitionId
+ private volatile Map<String, Map<Integer, Metapb.Partition>> partitionCache;
+ private volatile Map<Integer, Metapb.ShardGroup> shardGroupCache;
+ private volatile Map<Long, Metapb.Store> storeCache;
+ private volatile Map<String, Metapb.Graph> graphCache;
+
+ public PartitionCache() {
+ keyToPartIdCache = new HashMap<>();
+ partitionCache = new HashMap<>();
+ shardGroupCache = new ConcurrentHashMap<>();
+ storeCache = new ConcurrentHashMap<>();
+ graphCache = new ConcurrentHashMap<>();
+ }
+
+ private AtomicBoolean getOrCreateGraphLock(String graphName) {
+ var lock = this.locks.get(graphName);
+ if (lock == null) {
+ try {
+ writeLock.lock();
+ if ((lock = this.locks.get(graphName)) == null) {
+ lock = new AtomicBoolean();
+ locks.put(graphName, lock);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+ return lock;
+ }
+
+ public void waitGraphLock(String graphName) {
+ var lock = getOrCreateGraphLock(graphName);
+ while (lock.get()) {
+ Thread.onSpinWait();
+ }
+ }
+
+ public void lockGraph(String graphName) {
+ var lock = getOrCreateGraphLock(graphName);
+ // Spin until the CAS succeeds, i.e. until this thread acquires the lock
+ while (!lock.compareAndSet(false, true)) {
+ Thread.onSpinWait();
+ }
+ }
+
+ public void unlockGraph(String graphName) {
+ var lock = getOrCreateGraphLock(graphName);
+ lock.set(false);
+ }
+
+ /**
+ * Returns the partition info by partitionId.
+ *
+ * @param graphName the graph name
+ * @param partId the partition id
+ * @return pair of (partition, leader shard), or null if absent
+ */
+ public KVPair<Metapb.Partition, Metapb.Shard> getPartitionById(String graphName, int partId) {
+ waitGraphLock(graphName);
+ var graphs = partitionCache.get(graphName);
+ if (graphs != null) {
+ var partition = graphs.get(partId);
+ if (partition != null) {
+ return new KVPair<>(partition, getLeaderShard(partId));
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Returns the partition info that the key belongs to.
+ *
+ * @param graphName the graph name
+ * @param key the raw key
+ * @return pair of (partition, leader shard), or null if absent
+ */
+ public KVPair<Metapb.Partition, Metapb.Shard> getPartitionByKey(String graphName, byte[] key) {
+ int code = PartitionUtils.calcHashcode(key);
+ return getPartitionByCode(graphName, code);
+ }
+
+ /**
+ * Returns the partition info by the hash code of the key.
+ *
+ * @param graphName the graph name
+ * @param code the hash code of the key
+ * @return pair of (partition, leader shard), or null if absent
+ */
+ public KVPair<Metapb.Partition, Metapb.Shard> getPartitionByCode(String graphName, long code) {
+ waitGraphLock(graphName);
+ RangeMap<Long, Integer> rangeMap = keyToPartIdCache.get(graphName);
+ if (rangeMap != null) {
+ Integer partId = rangeMap.get(code);
+ if (partId != null) {
+ return getPartitionById(graphName, partId);
+ }
+ }
+ return null;
+ }
+
+ public List<Metapb.Partition> getPartitions(String graphName) {
+ waitGraphLock(graphName);
+
+ List<Metapb.Partition> partitions = new ArrayList<>();
+ if (!partitionCache.containsKey(graphName)) {
+ return partitions;
+ }
+ partitionCache.get(graphName).forEach((k, v) -> {
+ partitions.add(v);
+ });
+
+ return partitions;
+ }
+
+ public boolean addPartition(String graphName, int partId, Metapb.Partition partition) {
+ waitGraphLock(graphName);
+ Metapb.Partition old = null;
+
+ if (partitionCache.containsKey(graphName)) {
+ old = partitionCache.get(graphName).get(partId);
+ }
+
+ if (old != null && old.equals(partition)) {
+ return false;
+ }
+ try {
+
+ lockGraph(graphName);
+
+ partitionCache.computeIfAbsent(graphName, k -> new HashMap<>()).put(partId, partition);
+
+ if (old != null) {
+ // old [1-3) was covered by [2-3); when [1-3) shrinks to [1-2), the original [1-3) must not be removed
+ // Only remove the old range when both its start and end still map to this partition (i.e. it has not been covered yet)
+ var graphRange = keyToPartIdCache.get(graphName);
+ if (Objects.equals(partition.getId(), graphRange.get(partition.getStartKey())) &&
+ Objects.equals(partition.getId(), graphRange.get(partition.getEndKey() - 1))) {
+ graphRange.remove(graphRange.getEntry(partition.getStartKey()).getKey());
+ }
+ }
+
+ keyToPartIdCache.computeIfAbsent(graphName, k -> TreeRangeMap.create())
+ .put(Range.closedOpen(partition.getStartKey(),
+ partition.getEndKey()), partId);
+ } finally {
+ unlockGraph(graphName);
+ }
+ return true;
+ }
+
+ public void updatePartition(String graphName, int partId, Metapb.Partition partition) {
+ try {
+ lockGraph(graphName);
+ Metapb.Partition old = null;
+ var graphs = partitionCache.get(graphName);
+ if (graphs != null) {
+ old = graphs.get(partId);
+ }
+
+ if (old != null) {
+ var graphRange = keyToPartIdCache.get(graphName);
+ if (Objects.equals(partition.getId(), graphRange.get(partition.getStartKey())) &&
+ Objects.equals(partition.getId(), graphRange.get(partition.getEndKey() - 1))) {
+ graphRange.remove(graphRange.getEntry(partition.getStartKey()).getKey());
+ }
+ }
+
+ partitionCache.computeIfAbsent(graphName, k -> new HashMap<>()).put(partId, partition);
+ keyToPartIdCache.computeIfAbsent(graphName, k -> TreeRangeMap.create())
+ .put(Range.closedOpen(partition.getStartKey(), partition.getEndKey()),
+ partId);
+ } finally {
+ unlockGraph(graphName);
+ }
+ }
+
+ public boolean updatePartition(Metapb.Partition partition) {
+
+ var graphName = partition.getGraphName();
+ var partitionId = partition.getId();
+
+ var old = getPartitionById(graphName, partitionId);
+ if (old != null && Objects.equals(partition, old.getKey())) {
+ return false;
+ }
+
+ updatePartition(graphName, partitionId, partition);
+ return true;
+ }
+
+ public void removePartition(String graphName, int partId) {
+ try {
+ lockGraph(graphName);
+ var graphs = partitionCache.get(graphName);
+ var partition = graphs == null ? null : graphs.remove(partId);
+ if (partition != null) {
+ var graphRange = keyToPartIdCache.get(graphName);
+
+ if (Objects.equals(partition.getId(), graphRange.get(partition.getStartKey())) &&
+ Objects.equals(partition.getId(), graphRange.get(partition.getEndKey() - 1))) {
+ graphRange.remove(graphRange.getEntry(partition.getStartKey()).getKey());
+ }
+ }
+ } finally {
+ unlockGraph(graphName);
+ }
+ }
+
+ /**
+ * Removes the partition with the given id from the graph.
+ *
+ * @param graphName the graph name
+ * @param id the partition id
+ */
+ public void remove(String graphName, int id) {
+ removePartition(graphName, id);
+ }
+
+ /**
+ * Removes all partitions.
+ */
+ public void removePartitions() {
+ writeLock.lock();
+ try {
+ partitionCache = new HashMap<>();
+ keyToPartIdCache = new HashMap<>();
+ locks.clear();
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Removes all partition caches of the graph.
+ *
+ * @param graphName the graph name
+ */
+ public void removeAll(String graphName) {
+ try {
+ lockGraph(graphName);
+ partitionCache.remove(graphName);
+ keyToPartIdCache.remove(graphName);
+ locks.remove(graphName);
+ } finally {
+ unlockGraph(graphName);
+ }
+ }
+
+ private String makePartitionKey(String graphName, int partId) {
+ return graphName + "/" + partId;
+ }
+
+ public boolean updateShardGroup(Metapb.ShardGroup shardGroup) {
+ Metapb.ShardGroup oldShardGroup = shardGroupCache.get(shardGroup.getId());
+ if (oldShardGroup != null && oldShardGroup.equals(shardGroup)) {
+ return false;
+ }
+ shardGroupCache.put(shardGroup.getId(), shardGroup);
+ return true;
+ }
+
+ public void deleteShardGroup(int shardGroupId) {
+ shardGroupCache.remove(shardGroupId);
+ }
+
+ public Metapb.ShardGroup getShardGroup(int groupId) {
+ return shardGroupCache.get(groupId);
+ }
+
+ public boolean addStore(Long storeId, Metapb.Store store) {
+ Metapb.Store oldStore = storeCache.get(storeId);
+ if (oldStore != null && oldStore.equals(store)) {
+ return false;
+ }
+ storeCache.put(storeId, store);
+ return true;
+ }
+
+ public Metapb.Store getStoreById(Long storeId) {
+ return storeCache.get(storeId);
+ }
+
+ public void removeStore(Long storeId) {
+ storeCache.remove(storeId);
+ }
+
+ public boolean hasGraph(String graphName) {
+ return getPartitions(graphName).size() > 0;
+ }
+
+ public void updateGraph(Metapb.Graph graph) {
+ if (Objects.equals(graph, getGraph(graph.getGraphName()))) {
+ return;
+ }
+ graphCache.put(graph.getGraphName(), graph);
+ }
+
+ public Metapb.Graph getGraph(String graphName) {
+ return graphCache.get(graphName);
+ }
+
+ public List<Metapb.Graph> getGraphs() {
+ List<Metapb.Graph> graphs = new ArrayList<>();
+ graphCache.forEach((k, v) -> {
+ graphs.add(v);
+ });
+ return graphs;
+ }
+
+ public void reset() {
+ writeLock.lock();
+ try {
+ partitionCache = new HashMap<>();
+ keyToPartIdCache = new HashMap<>();
+ shardGroupCache = new ConcurrentHashMap<>();
+ storeCache = new ConcurrentHashMap<>();
+ graphCache = new ConcurrentHashMap<>();
+ locks.clear();
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public void clear() {
+ reset();
+ }
+
+ public String debugCacheByGraphName(String graphName) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("Graph:").append(graphName).append(", cache info: range info: {");
+ var rangeMap = keyToPartIdCache.get(graphName);
+ builder.append(rangeMap == null ? "" : rangeMap).append("}");
+
+ if (rangeMap != null) {
+ builder.append(", partition info : {");
+ rangeMap.asMapOfRanges().forEach((k, v) -> {
+ var partition = partitionCache.get(graphName).get(v);
+ builder.append("[part_id:").append(v);
+ if (partition != null) {
+ builder.append(", start_key:").append(partition.getStartKey())
+ .append(", end_key:").append(partition.getEndKey())
+ .append(", state:").append(partition.getState().name());
+ }
+ builder.append("], ");
+ });
+ builder.append("}");
+ }
+
+ builder.append(", graph info:{");
+ var graph = graphCache.get(graphName);
+ if (graph != null) {
+ builder.append("partition_count:").append(graph.getPartitionCount())
+ .append(", state:").append(graph.getState().name());
+ }
+ builder.append("}]");
+ return builder.toString();
+ }
+
+ public Metapb.Shard getLeaderShard(int partitionId) {
+ var shardGroup = shardGroupCache.get(partitionId);
+ if (shardGroup != null) {
+ for (Metapb.Shard shard : shardGroup.getShardsList()) {
+ if (shard.getRole() == Metapb.ShardRole.Leader) {
+ return shard;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ public void updateShardGroupLeader(int partitionId, Metapb.Shard leader) {
+ if (shardGroupCache.containsKey(partitionId) && leader != null) {
+ if (!Objects.equals(getLeaderShard(partitionId), leader)) {
+ var shardGroup = shardGroupCache.get(partitionId);
+ var builder = Metapb.ShardGroup.newBuilder(shardGroup).clearShards();
+ for (var shard : shardGroup.getShardsList()) {
+ builder.addShards(
+ Metapb.Shard.newBuilder()
+ .setStoreId(shard.getStoreId())
+ .setRole(shard.getStoreId() == leader.getStoreId() ?
+ Metapb.ShardRole.Leader :
+ Metapb.ShardRole.Follower)
+ .build()
+ );
+ }
+ shardGroupCache.put(partitionId, builder.build());
+ }
+ }
+ }
+
+ public String debugShardGroup() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("shard group cache:{");
+ shardGroupCache.forEach((partitionId, shardGroup) -> {
+ builder.append(partitionId).append("::{")
+ .append("version:").append(shardGroup.getVersion())
+ .append(", conf_version:").append(shardGroup.getConfVer())
+ .append(", state:").append(shardGroup.getState().name())
+ .append(", shards:[");
+
+ for (var shard : shardGroup.getShardsList()) {
+ builder.append("{store_id:").append(shard.getStoreId())
+ .append(", role:").append(shard.getRole().name())
+ .append("},");
+ }
+ builder.append("], ");
+ });
+ builder.append("}");
+ return builder.toString();
+ }
+}
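
A usage sketch of the cache above, under assumptions: the Metapb classes are the ones generated from metapb.proto later in this patch, and all field values are invented for illustration. With no shard group registered, the value side of the returned pair (the leader shard) is null:

import org.apache.hugegraph.pd.common.PartitionCache;
import org.apache.hugegraph.pd.common.PartitionUtils;
import org.apache.hugegraph.pd.grpc.Metapb;

public class PartitionCacheDemo {
    public static void main(String[] args) {
        PartitionCache cache = new PartitionCache();
        // One partition covering the whole hash space [0, 0xffff)
        Metapb.Partition p = Metapb.Partition.newBuilder()
                .setId(0)
                .setGraphName("g1")
                .setStartKey(0)
                .setEndKey(PartitionUtils.MAX_VALUE)
                .build();
        cache.addPartition("g1", 0, p);
        // The key is hashed into [0, 0xffff) and resolved via the range map
        var pair = cache.getPartitionByKey("g1", "vertex:1".getBytes());
        System.out.println(pair.getKey().getId()); // 0
    }
}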
diff --git a/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/PartitionUtils.java b/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/PartitionUtils.java
new file mode 100644
index 0000000000..0e35cc555e
--- /dev/null
+++ b/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/PartitionUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.common;
+
+public class PartitionUtils {
+
+ public static final int MAX_VALUE = 0xffff;
+
+ /**
+ * Computes the hash code of the key (FNV-1a style, folded into [0, MAX_VALUE)).
+ *
+ * @param key the key bytes
+ * @return hashcode
+ */
+ public static int calcHashcode(byte[] key) {
+ final int p = 16777619;
+ int hash = (int) 2166136261L;
+ for (byte element : key) {
+ hash = (hash ^ element) * p;
+ }
+ hash += hash << 13;
+ hash ^= hash >> 7;
+ hash += hash << 3;
+ hash ^= hash >> 17;
+ hash += hash << 5;
+ hash = hash & PartitionUtils.MAX_VALUE;
+ if (hash == PartitionUtils.MAX_VALUE) {
+ hash = PartitionUtils.MAX_VALUE - 1;
+ }
+ return hash;
+ }
+}
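
A quick illustrative check (not part of this patch): calcHashcode folds any byte array into [0, MAX_VALUE), which is what lets partition [start_key, end_key) ranges over that space cover every key:

import org.apache.hugegraph.pd.common.PartitionUtils;

public class HashcodeDemo {
    public static void main(String[] args) {
        int code = PartitionUtils.calcHashcode("some-key".getBytes());
        // The final mask and clamp guarantee 0 <= code < MAX_VALUE
        System.out.println(code >= 0 && code < PartitionUtils.MAX_VALUE); // true
        System.out.println(code);
    }
}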
diff --git a/hugegraph-pd/hg-pd-grpc/pom.xml b/hugegraph-pd/hg-pd-grpc/pom.xml
new file mode 100644
index 0000000000..cef49e957d
--- /dev/null
+++ b/hugegraph-pd/hg-pd-grpc/pom.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.hugegraph</groupId>
+        <artifactId>hugegraph-pd</artifactId>
+        <version>${revision}</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>hg-pd-grpc</artifactId>
+
+    <properties>
+        <os.plugin.version>1.6.0</os.plugin.version>
+        <grpc.version>1.39.0</grpc.version>
+        <protoc.version>3.17.2</protoc.version>
+        <protobuf.plugin.version>0.6.1</protobuf.plugin.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-netty-shaded</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-protobuf</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-stub</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.annotation</groupId>
+            <artifactId>javax.annotation-api</artifactId>
+            <version>1.3.2</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>${basedir}/src/main/java</sourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+            <resource>
+                <directory>src/main/proto</directory>
+            </resource>
+        </resources>
+        <extensions>
+            <extension>
+                <groupId>kr.motd.maven</groupId>
+                <artifactId>os-maven-plugin</artifactId>
+                <version>${os.plugin.version}</version>
+            </extension>
+        </extensions>
+        <plugins>
+            <plugin>
+                <groupId>org.xolstice.maven.plugins</groupId>
+                <artifactId>protobuf-maven-plugin</artifactId>
+                <version>${protobuf.plugin.version}</version>
+                <extensions>true</extensions>
+                <configuration>
+                    <protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
+                    <pluginId>grpc-java</pluginId>
+                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
+                    <protoSourceRoot>${project.basedir}/src/main/proto</protoSourceRoot>
+                    <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
+                    <clearOutputDirectory>false</clearOutputDirectory>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>compile-custom</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-clean-plugin</artifactId>
+                <configuration>
+                    <filesets>
+                        <fileset>
+                            <directory>src/main/java</directory>
+                        </fileset>
+                    </filesets>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>initialize</phase>
+                        <goals>
+                            <goal>clean</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/hugegraph-pd/hg-pd-grpc/src/main/proto/discovery.proto b/hugegraph-pd/hg-pd-grpc/src/main/proto/discovery.proto
new file mode 100644
index 0000000000..b434ab0e86
--- /dev/null
+++ b/hugegraph-pd/hg-pd-grpc/src/main/proto/discovery.proto
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+package discovery;
+import "pdpb.proto";
+
+option java_package = "org.apache.hugegraph.pd.grpc.discovery";
+option java_multiple_files = true;
+
+
+service DiscoveryService {
+ rpc register(NodeInfo) returns (RegisterInfo);
+ rpc getNodes(Query) returns (NodeInfos);
+ // rpc getNodesByLabel(Conditions) returns (NodeInfos);
+}
+
+/* requests */
+message NodeInfo {
+ string id = 1;
+ string appName = 2;
+ string version = 3;
+ string address = 4;
+ int64 interval = 5;
+ map<string, string> labels = 6;
+}
+message Query {
+ string appName = 1;
+ string version = 2;
+ map<string, string> labels = 3;
+}
+message LeaseInfo {
+ int64 registrationTs = 1;
+ int64 lastHeartbeatTs = 2;
+ int64 serverUpTs = 3;
+}
+message RegisterInfo {
+ NodeInfo nodeInfo = 1;
+ LeaseInfo leaseInfo = 2 ;
+ RegisterType type = 3 ;
+ pdpb.ResponseHeader header = 4;
+}
+enum RegisterType {
+ Register = 0;
+ Heartbeat = 1;
+ Dislodge = 2;
+}
+//message Condition{
+// string label = 1;
+//}
+//message Conditions{
+// string label = 1;
+// string value = 2;
+//}
+message NodeInfos{
+ repeated NodeInfo info = 1;
+}
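
A hedged client sketch for the service above: the DiscoveryServiceGrpc stub and message classes follow standard protoc-gen-grpc-java naming for this file, while the address, port and field values are invented:

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

import org.apache.hugegraph.pd.grpc.discovery.DiscoveryServiceGrpc;
import org.apache.hugegraph.pd.grpc.discovery.NodeInfo;
import org.apache.hugegraph.pd.grpc.discovery.RegisterInfo;

public class DiscoveryDemo {
    public static void main(String[] args) {
        ManagedChannel channel = ManagedChannelBuilder
                .forAddress("127.0.0.1", 8686).usePlaintext().build();
        NodeInfo node = NodeInfo.newBuilder()
                .setAppName("hugegraph")
                .setVersion("1.0")
                .setAddress("127.0.0.1:8081")
                .setInterval(10_000)
                .build();
        // RegisterType in the reply distinguishes Register from Heartbeat
        RegisterInfo info = DiscoveryServiceGrpc.newBlockingStub(channel).register(node);
        System.out.println(info.getType());
        channel.shutdown();
    }
}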
diff --git a/hugegraph-pd/hg-pd-grpc/src/main/proto/kv.proto b/hugegraph-pd/hg-pd-grpc/src/main/proto/kv.proto
new file mode 100644
index 0000000000..22007cda31
--- /dev/null
+++ b/hugegraph-pd/hg-pd-grpc/src/main/proto/kv.proto
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+package kv;
+import "pdpb.proto";
+import "metapb.proto";
+
+option java_package = "org.apache.hugegraph.pd.grpc.kv";
+option java_multiple_files = true;
+
+
+service KvService {
+ rpc put(Kv) returns (KvResponse);
+ rpc get(K) returns (KResponse);
+ rpc delete(K) returns (KvResponse);
+ rpc deletePrefix(K) returns (KvResponse);
+ rpc scanPrefix(K) returns (ScanPrefixResponse);
+ rpc watch(WatchRequest) returns (stream WatchResponse);
+ rpc watchPrefix(WatchRequest) returns (stream WatchResponse);
+ rpc lock(LockRequest) returns (LockResponse);
+ rpc lockWithoutReentrant(LockRequest) returns (LockResponse);
+ rpc unlock(LockRequest) returns (LockResponse);
+ rpc keepAlive(LockRequest) returns (LockResponse);
+ rpc isLocked(LockRequest) returns (LockResponse);
+ rpc putTTL(TTLRequest) returns (TTLResponse);
+ rpc keepTTLAlive(TTLRequest) returns (TTLResponse);
+}
+
+/* requests */
+message Kv {
+ pdpb.RequestHeader header = 1;
+ string key = 2;
+ string value = 3;
+}
+message KvResponse {
+ pdpb.ResponseHeader header = 1;
+}
+
+message K{
+ pdpb.RequestHeader header = 1;
+ string key = 2;
+}
+
+message KResponse{
+ pdpb.ResponseHeader header = 1;
+ string value = 2;
+}
+
+message ScanPrefixResponse {
+ pdpb.ResponseHeader header = 1;
+ map<string, string> kvs = 2;
+}
+
+message LockRequest{
+ pdpb.RequestHeader header = 1;
+ string key = 2;
+ int64 ttl = 3;
+ int64 clientId = 4;
+}
+message LockResponse{
+ pdpb.ResponseHeader header = 1;
+ string key = 2;
+ int64 ttl = 3;
+ int64 clientId = 4;
+ bool succeed = 5;
+}
+
+message LockAliveResponse{
+ pdpb.ResponseHeader header = 1;
+ int64 clientId = 2;
+}
+
+
+message WatchKv {
+ string key = 1;
+ string value = 2;
+}
+
+enum WatchType {
+ Put = 0;
+ Delete = 1;
+ Unrecognized = 2;
+}
+
+message WatchEvent {
+ WatchKv current = 1;
+ WatchKv prev = 2;
+ WatchType type = 3;
+}
+
+message WatchResponse {
+ pdpb.ResponseHeader header = 1;
+ repeated WatchEvent events = 2;
+ int64 clientId = 3;
+ WatchState state = 4;
+}
+
+enum WatchState {
+ Starting = 0;
+ Started = 1;
+ Leader_Changed = 2;
+ Alive = 3;
+}
+
+message WatchRequest {
+ pdpb.RequestHeader header = 1;
+ WatchState state = 2;
+ string key = 3;
+ int64 clientId = 4;
+}
+
+message V{
+ string value = 1;
+ int64 ttl = 2;
+ int64 st = 3;
+}
+
+message TTLRequest{
+ pdpb.RequestHeader header = 1;
+ string key = 2;
+ string value = 3;
+ int64 ttl = 4;
+}
+
+message TTLResponse{
+ pdpb.ResponseHeader header = 1;
+ bool succeed = 2;
+}
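
A sketch of the lock round-trip defined above, via the generated blocking stub (standard protoc-gen-grpc-java naming assumed; the key, ttl and clientId values are invented):

import org.apache.hugegraph.pd.grpc.kv.KvServiceGrpc;
import org.apache.hugegraph.pd.grpc.kv.LockRequest;
import org.apache.hugegraph.pd.grpc.kv.LockResponse;

public class KvLockDemo {
    static void withLock(KvServiceGrpc.KvServiceBlockingStub stub) {
        LockRequest req = LockRequest.newBuilder()
                .setKey("task/compact")
                .setTtl(30_000)
                .setClientId(1L)
                .build();
        LockResponse res = stub.lock(req);
        if (res.getSucceed()) {
            try {
                // do the guarded work; call keepAlive(req) periodically
                // if it may outlive the ttl
            } finally {
                stub.unlock(req);
            }
        }
    }
}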
diff --git a/hugegraph-pd/hg-pd-grpc/src/main/proto/metaTask.proto b/hugegraph-pd/hg-pd-grpc/src/main/proto/metaTask.proto
new file mode 100644
index 0000000000..c4bb8bde10
--- /dev/null
+++ b/hugegraph-pd/hg-pd-grpc/src/main/proto/metaTask.proto
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+package metaTask;
+import "metapb.proto";
+import "pd_pulse.proto";
+option java_package = "org.apache.hugegraph.pd.grpc";
+
+enum TaskType {
+ Unknown = 0;
+ Split_Partition = 1;
+ Change_Shard = 2;
+ Move_Partition = 3;
+ Clean_Partition = 4;
+ Change_KeyRange = 5;
+}
+
+// A single task record
+message Task {
+ uint64 id = 1;
+ TaskType type = 2;
+ TaskState state = 3;
+ int64 start_timestamp = 4;
+ metapb.Partition partition = 5;
+ string message = 6;
+ // Task execution state on each shard
+ repeated ShardTaskState shardState = 7;
+ ChangeShard changeShard = 9;
+ SplitPartition splitPartition = 10;
+ MovePartition movePartition = 11;
+ CleanPartition cleanPartition = 12;
+ PartitionKeyRange partitionKeyRange = 13;
+}
+
+enum TaskState{
+ Task_Unknown = 0;
+ Task_Ready = 1; // task is ready
+ Task_Doing = 2; // in progress
+ Task_Done = 3; // finished
+ Task_Exit = 4; // exited
+ Task_Stop = 10;
+ Task_Success = 11;
+ Task_Failure = 12;
+}
+
+message ShardTaskState{
+ uint64 store_id = 1;
+ TaskState state = 2;
+}
diff --git a/hugegraph-pd/hg-pd-grpc/src/main/proto/metapb.proto b/hugegraph-pd/hg-pd-grpc/src/main/proto/metapb.proto
new file mode 100644
index 0000000000..a8a695be04
--- /dev/null
+++ b/hugegraph-pd/hg-pd-grpc/src/main/proto/metapb.proto
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+package metapb;
+option java_package = "org.apache.hugegraph.pd.grpc";
+import "google/protobuf/any.proto";
+
+enum ClusterState{
+ // Cluster is healthy
+ Cluster_OK = 0;
+ // Partition warning: some nodes have failed; reads and writes are unaffected in the short term
+ Cluster_Warn = 2;
+ // Partitions offline: readable but not writable
+ Cluster_Offline = 10;
+ // Partition fault: cannot read or write; failed nodes need to be repaired as soon as possible
+ Cluster_Fault = 11;
+ Cluster_Not_Ready = -1;
+}
+// Cluster status
+message ClusterStats{
+ ClusterState state = 1;
+ string message = 2;
+ uint64 timestamp = 16;
+}
+
+enum StoreState {
+ Unknown = 0;
+ // Not activated
+ Pending = 4;
+ // Online
+ Up = 1;
+ // Offline
+ Offline = 2;
+ // Going offline
+ Exiting = 5;
+ // Already offline
+ Tombstone = 3;
+}
+
+// Store label for Storage grouping.
+message StoreLabel {
+ string key = 1;
+ string value = 2;
+}
+
+message Store {
+ uint64 id = 1;
+ // Address to handle client requests
+ string address = 2;
+ string raft_address = 3;
+ repeated StoreLabel labels = 4;
+ // Store software version
+ string version = 5;
+ StoreState state = 6;
+ // The start timestamp of the current store
+ int64 start_timestamp = 7;
+ string deploy_path = 8;
+ // The last heartbeat timestamp of the store.
+ int64 last_heartbeat = 9;
+ StoreStats stats = 10;
+ // Data format version
+ int32 data_version = 11;
+ int32 cores = 12;
+ string data_path = 13;
+}
+
+enum ShardRole {
+ None = 0;
+ Leader = 1;
+ Follower = 2;
+ // Learner/None -> Learner
+ Learner = 3;
+}
+
+message Shard {
+ uint64 store_id = 2;
+ ShardRole role = 3;
+}
+
+message ShardGroup{
+ uint32 id = 1;
+ uint64 version = 2;
+ uint64 conf_ver = 3;
+ repeated Shard shards = 6;
+ PartitionState state = 10;
+ string message = 11;
+}
+
+message Graph {
+ string graph_name = 2;
+ // Number of partitions; 0 means invalid; must not exceed the total number of raft groups
+ int32 partition_count = 3;
+ // Current working state
+ PartitionState state = 10;
+ string message = 11;
+ GraphState graph_state = 12;
+}
+// Partition working state
+enum PartitionState{
+ PState_None = 0;
+ // Normal
+ PState_Normal = 1;
+ // Partition warning: some nodes have failed; reads and writes are unaffected in the short term
+ PState_Warn = 2;
+ // Partitions offline: readable but not writable
+ PState_Offline = 10;
+ // Partition fault: cannot read or write; failed nodes need to be repaired as soon as possible
+ PState_Fault = 11;
+}
+
+message PartitionV36 {
+ uint32 id = 1;
+ string graph_name = 3;
+ // Partition range [start_key, end_key).
+ uint64 start_key = 4;
+ uint64 end_key = 5;
+ repeated Shard shards = 6;
+ // Leader term, incremented on each leader switch
+ uint64 version = 7;
+ // Shards version, incremented on each change
+ uint64 conf_ver = 8;
+ // Current working state
+ PartitionState state = 10;
+ string message = 11;
+}
+
+message Partition {
+ uint32 id = 1;
+ string graph_name = 3;
+ // Partition range [start_key, end_key).
+ uint64 start_key = 4;
+ uint64 end_key = 5;
+ // The Partition object no longer stores the shard list (look it up via the
+ // corresponding shard group); version and conf version no longer carry real meaning
+ // repeated Shard shards = 6;
+ // Incremented on each key range change
+ uint64 version = 7;
+ // Shards version, incremented on each change
+ // uint64 conf_ver = 8;
+ // Current working state
+ PartitionState state = 10;
+ string message = 11;
+}
+
+message PartitionShard {
+ metapb.Partition partition = 1;
+ metapb.Shard leader = 2;
+ // Offline shards
+ repeated metapb.Shard offline_shards = 3;
+}
+// Records the storage location of a partition
+message PartitionStore {
+ uint32 partition_id = 1;
+ string graph_name = 3;
+ // Storage location
+ string store_location = 4;
+}
+
+message PartitionRaft {
+ uint32 partition_id = 1;
+ string graph_name = 3;
+ // Storage location
+ string raft_location = 4;
+}
+
+message ShardStats{
+ uint64 store_id = 2;
+ ShardRole role = 3;
+ ShardState state = 4;
+ // Progress of snapshot installation
+ uint32 progress = 5;
+}
+message PartitionStats{
+ uint32 id = 1;
+ // Term of the raft group.
+ uint64 leader_term = 2;
+ repeated string graph_name = 3;
+ metapb.Shard leader = 4;
+ // Offline shards
+ repeated metapb.Shard shard = 5;
+ repeated metapb.Shard learner = 6;
+ uint64 conf_ver = 7;
+ // Partition state
+ PartitionState state = 8;
+ repeated ShardStats shardStats = 9;
+ // Approximate size of the partition
+ uint64 approximate_size = 10;
+ // Approximate number of keys in the partition
+ uint64 approximate_keys = 13;
+ // heartbeat timestamp
+ int64 timestamp = 16;
+}
+
+message GraphStats{
+ // Graph name
+ string graph_name = 1;
+ // Approximate size of the partition
+ uint64 approximate_size = 2;
+ // Approximate number of keys in the partition
+ uint64 approximate_keys = 3;
+ // // committed index
+ // uint64 committed_index = 4;
+ uint32 partition_id = 5;
+ ShardRole role = 6;
+ // Current working state
+ PartitionState work_state = 8;
+}
+
+message RaftStats {
+ // partition id
+ uint32 partition_id = 1;
+ // committed index
+ uint64 committed_index = 2;
+}
+
+message TimeInterval {
+ // The unix timestamp in seconds of the start of this period.
+ uint64 start_timestamp = 1;
+ // The unix timestamp in seconds of the end of this period.
+ uint64 end_timestamp = 2;
+}
+
+message RecordPair {
+ string key = 1;
+ uint64 value = 2;
+}
+
+
+message QueryStats {
+ uint64 GC = 1;
+ uint64 Get = 2;
+ uint64 Scan = 3;
+ uint64 Coprocessor = 4;
+ uint64 Delete = 5;
+ uint64 DeleteRange = 6;
+ uint64 Put = 7;
+}
+
+enum ShardState{
+ SState_None = 0;
+ // Normal
+ SState_Normal = 1;
+ // Installing snapshot
+ SState_Snapshot = 2;
+ // Offline
+ SState_Offline = 10;
+}
+
+
+message StoreStats {
+ uint64 store_id = 1;
+ // Capacity for the store.
+ uint64 capacity = 2;
+ // Available size for the store.
+ uint64 available = 3;
+ // Total partition count in this store.
+ uint32 partition_count = 4;
+ // Current sending snapshot count.
+ uint32 sending_snap_count = 5;
+ // Current receiving snapshot count.
+ uint32 receiving_snap_count = 6;
+ // When the store is started (unix timestamp in seconds).
+ uint32 start_time = 7;
+ // How many partition is applying snapshot.
+ uint32 applying_snap_count = 8;
+ // If the store is busy
+ bool is_busy = 9;
+ // Actually used space by db
+ uint64 used_size = 10;
+ // Bytes written for the store during this period.
+ uint64 bytes_written = 11;
+ // Keys written for the store during this period.
+ uint64 keys_written = 12;
+ // Bytes read for the store during this period.
+ uint64 bytes_read = 13;
+ // Keys read for the store during this period.
+ uint64 keys_read = 14;
+ // Actually reported time interval
+ TimeInterval interval = 15;
+ // Threads' CPU usages in the store
+ repeated RecordPair cpu_usages = 16;
+ // Threads' read disk I/O rates in the store
+ repeated RecordPair read_io_rates = 17;
+ // Threads' write disk I/O rates in the store
+ repeated RecordPair write_io_rates = 18;
+ // Operations' latencies in the store
+ repeated RecordPair op_latencies = 19;
+ // Store query stats
+ QueryStats query_stats = 21;
+ // graph stats
+ repeated GraphStats graph_stats = 22;
+ // raft stats
+ repeated RaftStats raft_stats = 23;
+ int32 cores = 24;
+ // system metrics
+ repeated RecordPair system_metrics = 25;
+}
+
+// Partition query conditions
+message PartitionQuery{
+ optional uint64 store_id = 1; // 0 means the query does not filter by store_id
+ optional string graph_name = 2;
+ optional uint32 partition_id = 4;
+}
+
+// PD node info
+message Member {
+ uint64 cluster_id = 1;
+ string raft_url = 3;
+ string grpc_url = 4;
+ string rest_url = 5;
+ string data_path = 6;
+ StoreState state = 7;
+ ShardRole role = 8;
+ string replicator_state = 9;
+}
+
+// Graph space configuration
+message GraphSpace{
+ string name = 1;
+ // Maximum storage usage
+ uint64 storage_limit = 2;
+ // Used space
+ uint64 used_size = 3;
+ // Modification time
+ uint64 timestamp = 10;
+}
+
+// PD configuration
+message PDConfig{
+ uint64 version = 1;
+ // Number of partitions; computed dynamically from the store count at initialization, modified after splits
+ int32 partition_count = 2;
+ // Number of replicas per partition
+ int32 shard_count = 3;
+ // List of pd cluster peers
+ string peers_list = 4;
+ // Minimum number of stores in the cluster
+ int32 min_store_count = 6;
+ // Maximum number of shards per store
+ int32 max_Shards_Per_Store = 7;
+ // Modification time
+ uint64 timestamp = 10;
+}
+
+
+
+// Message persistence
+message QueueItem{
+ string item_id = 1;
+ string item_class = 2;
+ bytes item_content = 3;
+ int64 timestamp = 10;
+}
+
+message LogRecord{
+ string action = 1;
+ int64 timestamp = 2;
+ map<string, string> labels = 3;
+ google.protobuf.Any object = 4;
+ string message = 5;
+}
+
+message GraphState{
+ GraphMode mode = 1;
+ GraphModeReason reason = 2;
+}
+
+enum GraphMode{
+ ReadWrite = 0;
+ ReadOnly = 1;
+ WriteOnly = 2;
+}
+
+enum GraphModeReason{
+ Empty = 0; // empty
+ Initiative = 1; // state set proactively
+ Quota = 2; // quota limit reached
+}
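
One note on the proto3 `optional` fields in the PartitionQuery message above: the generated Java exposes presence checks, so "no store_id filter" is representable. A tiny sketch (field values invented):

import org.apache.hugegraph.pd.grpc.Metapb;

public class PartitionQueryDemo {
    public static void main(String[] args) {
        Metapb.PartitionQuery query = Metapb.PartitionQuery.newBuilder()
                .setGraphName("g1") // filter by graph only
                .build();
        System.out.println(query.hasStoreId()); // false: store_id is not part of the filter
    }
}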
diff --git a/hugegraph-pd/hg-pd-grpc/src/main/proto/pd_common.proto b/hugegraph-pd/hg-pd-grpc/src/main/proto/pd_common.proto
new file mode 100644
index 0000000000..c9eec81494
--- /dev/null
+++ b/hugegraph-pd/hg-pd-grpc/src/main/proto/pd_common.proto
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "org.apache.hugegraph.pd.grpc.common";
+option java_outer_classname = "HgPdCommonProto";
+
+message RequestHeader {
+ // Cluster ID.
+ uint64 cluster_id = 1;
+ // Sender ID.
+ uint64 sender_id = 2;
+}
+
+message ResponseHeader {
+ // cluster_id is the ID of the cluster which sent the response.
+ uint64 cluster_id = 1;
+ Error error = 2;
+}
+
+enum ErrorType {
+ OK = 0;
+ UNKNOWN = 1;
+ STORE_NON_EXIST = 101;
+ STORE_TOMBSTONE = 103;
+ ALREADY_BOOTSTRAPPED = 4;
+ INCOMPATIBLE_VERSION = 5;
+ PARTITION_NOT_FOUND = 6;
+
+ ETCD_READ_ERROR = 1000;
+ ETCD_WRITE_ERROR = 1001;
+}
+
+message Error {
+ ErrorType type = 1;
+ string message = 2;
+}
diff --git a/hugegraph-pd/hg-pd-grpc/src/main/proto/pd_pulse.proto b/hugegraph-pd/hg-pd-grpc/src/main/proto/pd_pulse.proto
new file mode 100644
index 0000000000..fb8940df6c
--- /dev/null
+++ b/hugegraph-pd/hg-pd-grpc/src/main/proto/pd_pulse.proto
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+import "metapb.proto";
+import "pd_common.proto";
+
+option java_multiple_files = true;
+option java_package = "org.apache.hugegraph.pd.grpc.pulse";
+option java_outer_classname = "HgPdPulseProto";
+
+service HgPdPulse {
+ rpc Pulse(stream PulseRequest) returns (stream PulseResponse);
+}
+
+/* requests */
+message PulseRequest {
+ PulseCreateRequest create_request = 1;
+ PulseCancelRequest cancel_request = 2;
+ PulseNoticeRequest notice_request = 3;
+ PulseAckRequest ack_request = 4;
+}
+
+message PulseCreateRequest {
+ PulseType pulse_type = 1;
+}
+
+message PulseCancelRequest {
+ int64 observer_id = 1;
+}
+
+message PulseNoticeRequest {
+ int64 observer_id = 1;
+ oneof request_union {
+ PartitionHeartbeatRequest partition_heartbeat_request = 10;
+ }
+}
+
+message PulseAckRequest {
+ int64 observer_id = 1;
+ int64 notice_id = 2;
+}
+
+// Partition heartbeat: when events such as partition peer add/remove or leader change occur,
+// the leader sends the heartbeat. Meanwhile PD delivers shard add/remove commands for the
+// partition back to the leader via the Response.
+message PartitionHeartbeatRequest {
+ RequestHeader header = 1;
+ // Leader Peer sending the heartbeat
+ metapb.PartitionStats states = 4;
+}
+
+/* responses */
+message PulseResponse {
+ PulseType pulse_type = 1;
+ int64 observer_id = 2;
+ int32 status = 3; //0=ok,1=fail
+ int64 notice_id = 4;
+ oneof response_union {
+ PartitionHeartbeatResponse partition_heartbeat_response = 10;
+ PdInstructionResponse instruction_response = 11;
+ }
+}
+
+message PartitionHeartbeatResponse {
+ ResponseHeader header = 1;
+ uint64 id = 3;
+ metapb.Partition partition = 2;
+ ChangeShard change_shard = 4;
+
+ TransferLeader transfer_leader = 5;
+ // Split into multiple partitions: the first SplitPartition is the original
+ // partition, new partitions start from the second
+ SplitPartition split_partition = 6;
+ // Table targeted by the rocksdb compaction; null means all tables
+ DbCompaction db_compaction = 7;
+ // Migrate the partition's data to the target
+ MovePartition move_partition = 8;
+ // Clean up the data of the partition's graph
+ CleanPartition clean_partition = 9;
+ // Partition key range change
+ PartitionKeyRange key_range = 10;
+}
+
+/* Data model */
+message ChangeShard {
+ repeated metapb.Shard shard = 1;
+ ConfChangeType change_type = 2;
+}
+
+message TransferLeader {
+ metapb.Shard shard = 1;
+}
+
+message SplitPartition {
+ repeated metapb.Partition new_partition = 1;
+}
+
+message DbCompaction {
+ string table_name = 3;
+}
+
+message MovePartition{
+ // The target partition's key range is the new range after migration
+ metapb.Partition target_partition = 1;
+ // All data within the partition's key_start and key_end
+ // will be migrated to the target partition
+ uint64 key_start = 2;
+ uint64 key_end = 3;
+}
+
+message CleanPartition {
+ uint64 key_start = 1;
+ uint64 key_end = 2;
+ CleanType clean_type = 3;
+ bool delete_partition = 4; // whether to delete the partition
+}
+
+message PartitionKeyRange{
+ uint32 partition_id = 1;
+ uint64 key_start = 2;
+ uint64 key_end = 3;
+}
+
+message PdInstructionResponse {
+ PdInstructionType instruction_type = 1;
+ string leader_ip = 2;
+}
+
+/* enums */
+enum PulseType {
+ PULSE_TYPE_UNKNOWN = 0;
+ PULSE_TYPE_PARTITION_HEARTBEAT = 1;
+ PULSE_TYPE_PD_INSTRUCTION = 2;
+}
+
+enum PulseChangeType {
+ PULSE_CHANGE_TYPE_UNKNOWN = 0;
+ PULSE_CHANGE_TYPE_ADD = 1;
+ PULSE_CHANGE_TYPE_ALTER = 2;
+ PULSE_CHANGE_TYPE_DEL = 3;
+}
+
+enum ConfChangeType {
+ CONF_CHANGE_TYPE_UNKNOWN = 0;
+ CONF_CHANGE_TYPE_ADD_NODE = 1;
+ CONF_CHANGE_TYPE_REMOVE_NODE = 2;
+ CONF_CHANGE_TYPE_ADD_LEARNER_NODE = 3;
+ CONF_CHANGE_TYPE_ADJUST = 4; // Adjust shards; the leader adds/removes them dynamically per the new configuration.
+}
+
+enum CleanType {
+ CLEAN_TYPE_KEEP_RANGE = 0; // keep only this range
+ CLEAN_TYPE_EXCLUDE_RANGE = 1; // delete this range
+}
+
+enum PdInstructionType {
+ CHANGE_TO_FOLLOWER = 0;
+}
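
A streaming sketch for the Pulse RPC above (async stub per standard protoc-gen-grpc-java conventions; channel setup omitted, handler bodies illustrative only):

import io.grpc.stub.StreamObserver;

import org.apache.hugegraph.pd.grpc.pulse.HgPdPulseGrpc;
import org.apache.hugegraph.pd.grpc.pulse.PulseCreateRequest;
import org.apache.hugegraph.pd.grpc.pulse.PulseRequest;
import org.apache.hugegraph.pd.grpc.pulse.PulseResponse;
import org.apache.hugegraph.pd.grpc.pulse.PulseType;

public class PulseDemo {
    static void subscribe(HgPdPulseGrpc.HgPdPulseStub stub) {
        StreamObserver<PulseRequest> requests = stub.pulse(new StreamObserver<>() {
            @Override
            public void onNext(PulseResponse r) {
                // a PulseAckRequest carrying r.getNoticeId() would be sent back here
                System.out.println("notice " + r.getNoticeId());
            }

            @Override
            public void onError(Throwable t) { }

            @Override
            public void onCompleted() { }
        });
        // Open the stream by declaring which pulse type we observe
        requests.onNext(PulseRequest.newBuilder()
                .setCreateRequest(PulseCreateRequest.newBuilder()
                        .setPulseType(PulseType.PULSE_TYPE_PARTITION_HEARTBEAT))
                .build());
    }
}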
diff --git a/hugegraph-pd/hg-pd-grpc/src/main/proto/pd_watch.proto b/hugegraph-pd/hg-pd-grpc/src/main/proto/pd_watch.proto
new file mode 100644
index 0000000000..febc41f522
--- /dev/null
+++ b/hugegraph-pd/hg-pd-grpc/src/main/proto/pd_watch.proto
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+import "metapb.proto";
+
+option java_multiple_files = true;
+option java_package = "org.apache.hugegraph.pd.grpc.watch";
+option java_outer_classname = "HgPdWatchProto";
+
+service HgPdWatch {
+ rpc Watch(stream WatchRequest) returns (stream WatchResponse);
+}
+
+message WatchRequest {
+ WatchCreateRequest create_request = 1;
+ WatchCancelRequest cancel_request = 2;
+}
+
+message WatchCreateRequest {
+ WatchType watch_type = 1;
+}
+
+message WatchCancelRequest {
+ int64 watcher_id = 1;
+}
+
+message WatchResponse {
+ WatchType watch_type = 1;
+ int64 watcher_id = 2;
+ int32 status = 3; //0=ok,1=fail
+ int64 notice_id = 4;
+ string msg = 5;
+ oneof response_union {
+ WatchPartitionResponse partition_response = 10;
+ WatchNodeResponse node_response = 11;
+ WatchGraphResponse graph_response = 12;
+ WatchShardGroupResponse shard_group_response = 13;
+ }
+}
+
+message WatchPartitionResponse {
+ string graph = 1;
+ int32 partition_id = 2;
+ WatchChangeType change_type = 3;
+}
+
+message WatchNodeResponse {
+ string graph = 1;
+ uint64 node_id = 2;
+ NodeEventType node_event_type = 3;
+}
+
+message WatchGraphResponse {
+ metapb.Graph graph = 1;
+ WatchType type = 2;
+}
+
+message WatchShardGroupResponse {
+ metapb.ShardGroup shard_group = 1;
+ WatchChangeType type = 2;
+ int32 shard_group_id = 3;
+}
+
+enum WatchType {
+ WATCH_TYPE_UNKNOWN = 0;
+ WATCH_TYPE_PARTITION_CHANGE = 1;
+ WATCH_TYPE_STORE_NODE_CHANGE = 2;
+ WATCH_TYPE_GRAPH_CHANGE = 3;
+ WATCH_TYPE_SHARD_GROUP_CHANGE = 4;
+}
+
+enum WatchChangeType {
+ WATCH_CHANGE_TYPE_UNKNOWN = 0;
+ WATCH_CHANGE_TYPE_ADD = 1;
+ WATCH_CHANGE_TYPE_ALTER = 2;
+ WATCH_CHANGE_TYPE_DEL = 3;
+ WATCH_CHANGE_TYPE_SPECIAL1 = 4;
+}
+
+enum NodeEventType {
+ NODE_EVENT_TYPE_UNKNOWN = 0;
+ NODE_EVENT_TYPE_NODE_ONLINE = 1;
+ NODE_EVENT_TYPE_NODE_OFFLINE = 2;
+ NODE_EVENT_TYPE_NODE_RAFT_CHANGE = 3;
+ // pd leader changed
+ NODE_EVENT_TYPE_PD_LEADER_CHANGE = 4;
+}
diff --git a/hugegraph-pd/hg-pd-grpc/src/main/proto/pdpb.proto b/hugegraph-pd/hg-pd-grpc/src/main/proto/pdpb.proto
new file mode 100644
index 0000000000..4e293ca08e
--- /dev/null
+++ b/hugegraph-pd/hg-pd-grpc/src/main/proto/pdpb.proto
@@ -0,0 +1,607 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+package pdpb;
+
+import "metapb.proto";
+import "metaTask.proto";
+
+option java_package = "org.apache.hugegraph.pd.grpc";
+
+service PD {
+ // Register a store; the first registration generates a new store_id, the store's unique identifier
+ rpc RegisterStore(RegisterStoreRequest) returns (RegisterStoreResponse) {}
+ rpc GetStore(GetStoreRequest) returns (GetStoreResponse) {}
+ // Modify store state and other info.
+ rpc SetStore(SetStoreRequest) returns (SetStoreResponse) {}
+ rpc DelStore(DetStoreRequest) returns (DetStoreResponse) {}
+ rpc GetAllStores(GetAllStoresRequest) returns (GetAllStoresResponse) {}
+ rpc StoreHeartbeat(StoreHeartbeatRequest) returns (StoreHeartbeatResponse) {}
+
+ // Find the owning partition by key
+ rpc GetPartition(GetPartitionRequest) returns (GetPartitionResponse) {}
+
+ // Find the owning partition by hash code
+ rpc GetPartitionByCode(GetPartitionByCodeRequest) returns (GetPartitionResponse) {}
+ // Return the partition by partition ID
+ rpc GetPartitionByID(GetPartitionByIDRequest) returns (GetPartitionResponse) {}
+ rpc ScanPartitions(ScanPartitionsRequest) returns (ScanPartitionsResponse) {}
+ // Update partition info, mainly the partition key range; call with care, otherwise data loss may result.
+ rpc UpdatePartition(UpdatePartitionRequest) returns (UpdatePartitionResponse) {}
+ rpc DelPartition(DelPartitionRequest) returns (DelPartitionResponse) {}
+ // Query partition info by conditions such as Store, Graph, etc.
+ rpc QueryPartitions(QueryPartitionsRequest) returns (QueryPartitionsResponse){}
+ // Read graph info
+ rpc GetGraph(GetGraphRequest) returns (GetGraphResponse){}
+ // Modify graph info
+ rpc SetGraph(SetGraphRequest) returns (SetGraphResponse){}
+ rpc DelGraph(DelGraphRequest) returns (DelGraphResponse){}
+ // Globally unique auto-increment ID
+ rpc GetId(GetIdRequest) returns (GetIdResponse){}
+ rpc ResetId(ResetIdRequest) returns (ResetIdResponse){}
+ // List of PD cluster members
+ rpc GetMembers(GetMembersRequest) returns (GetMembersResponse) {}
+ rpc GetStoreStatus(GetAllStoresRequest) returns (GetAllStoresResponse) {}
+ rpc GetPDConfig(GetPDConfigRequest) returns (GetPDConfigResponse){}
+ rpc SetPDConfig(SetPDConfigRequest) returns (SetPDConfigResponse){}
+ rpc GetGraphSpace(GetGraphSpaceRequest) returns (GetGraphSpaceResponse){}
+ rpc SetGraphSpace(SetGraphSpaceRequest) returns (SetGraphSpaceResponse){}
+ // Get cluster health status
+ rpc GetClusterStats(GetClusterStatsRequest) returns (GetClusterStatsResponse){}
+ // Replace PD cluster nodes
+ rpc ChangePeerList(ChangePeerListRequest) returns (getChangePeerListResponse) {}
+ // Data split
+ rpc SplitData(SplitDataRequest) returns (SplitDataResponse){}
+
+ rpc SplitGraphData(SplitGraphDataRequest) returns (SplitDataResponse) {}
+ // Data migration
+ rpc MovePartition(MovePartitionRequest) returns (MovePartitionResponse){}
+ // Report execution results of tasks such as partition splits
+ rpc ReportTask(ReportTaskRequest) returns (ReportTaskResponse){}
+
+ rpc GetPartitionStats(GetPartitionStatsRequest) returns (GetPartitionStatsResponse){}
+ // Balance the number of partition leaders across stores
+ rpc BalanceLeaders(BalanceLeadersRequest) returns (BalanceLeadersResponse){}
+
+ // Replace the license file
+ rpc PutLicense(PutLicenseRequest) returns (PutLicenseResponse){}
+
+ // Notify rocksdb to run compaction
+ rpc DbCompaction(DbCompactionRequest) returns (DbCompactionResponse){}
+
+ // Merge partitions
+ rpc CombineCluster(CombineClusterRequest) returns (CombineClusterResponse){}
+ // Shrink a single graph
+ rpc CombineGraph(CombineGraphRequest) returns (CombineGraphResponse) {}
+
+ // shard group
+ rpc GetShardGroup(GetShardGroupRequest) returns (GetShardGroupResponse){}
+ rpc UpdateShardGroup(UpdateShardGroupRequest) returns (UpdateShardGroupResponse){}
+ // Delete the shard group
+ rpc DeleteShardGroup(DeleteShardGroupRequest) returns (DeleteShardGroupResponse) {}
+ // Shard group maintenance operations
+ rpc UpdateShardGroupOp(ChangeShardRequest) returns (ChangeShardResponse){}
+ // change shard
+ rpc ChangeShard(ChangeShardRequest) returns (ChangeShardResponse) {}
+ // Update pd raft
+ rpc updatePdRaft(UpdatePdRaftRequest) returns (UpdatePdRaftResponse) {}
+
+ rpc getCache(GetGraphRequest) returns (CacheResponse) {}
+ rpc getPartitions(GetGraphRequest) returns (CachePartitionResponse) {}
+}
+
+message RequestHeader {
+ // Cluster ID.
+ uint64 cluster_id = 1;
+ // Sender ID.
+ uint64 sender_id = 2;
+}
+
+message ResponseHeader {
+ // cluster_id is the ID of the cluster which sent the response.
+ uint64 cluster_id = 1;
+ Error error = 2;
+}
+
+enum ErrorType {
+ OK = 0;
+ UNKNOWN = 1;
+
+ NOT_LEADER = 100;
+ STORE_ID_NOT_EXIST = 101;
+ NO_ACTIVE_STORE = 102;
+ NOT_FOUND = 103;
+ PD_UNREACHABLE = 104;
+ LESS_ACTIVE_STORE = 105;
+ STORE_HAS_BEEN_REMOVED = 106;
+ STORE_PROHIBIT_DELETION = 111;
+ SET_CONFIG_SHARD_COUNT_ERROR = 112;
+ UPDATE_STORE_STATE_ERROR = 113;
+ STORE_PROHIBIT_DUPLICATE = 114;
+ ROCKSDB_READ_ERROR = 1002;
+ ROCKSDB_WRITE_ERROR = 1003;
+ ROCKSDB_DEL_ERROR = 1004;
+ ROCKSDB_SAVE_SNAPSHOT_ERROR = 1005;
+ ROCKSDB_LOAD_SNAPSHOT_ERROR = 1006;
+
+ // The current cluster state forbids splitting
+ Cluster_State_Forbid_Splitting = 1007;
+ // Split already in progress
+ Split_Partition_Doing = 1008;
+ // The partition count on the store exceeds the upper limit
+ Too_Many_Partitions_Per_Store = 1009;
+ // license error
+ LICENSE_ERROR = 107;
+ // license verification error
+ LICENSE_VERIFY_ERROR = 108;
+
+ // Store tombstone (going offline) in progress
+ Store_Tombstone_Doing = 1010;
+
+ // Invalid number of split partitions
+ Invalid_Split_Partition_Count = 1011;
+}
+
+message Error {
+ ErrorType type = 1;
+ string message = 2;
+}
+message GetStoreRequest {
+ RequestHeader header = 1;
+ uint64 store_id = 2;
+}
+
+message GetStoreResponse {
+ ResponseHeader header = 1;
+
+ metapb.Store store = 2;
+ metapb.StoreStats stats = 3;
+}
+
+message DetStoreRequest {
+ RequestHeader header = 1;
+ uint64 store_id = 2;
+}
+
+message DetStoreResponse {
+ ResponseHeader header = 1;
+ metapb.Store store = 2;
+}
+
+message RegisterStoreRequest {
+ RequestHeader header = 1;
+ metapb.Store store = 2;
+}
+
+
+message RegisterStoreResponse {
+ ResponseHeader header = 1;
+ // On first registration, returns the new store_id
+ uint64 store_id = 2;
+}
+
+message SetStoreRequest {
+ RequestHeader header = 1;
+ metapb.Store store = 2;
+}
+
+message SetStoreResponse {
+ ResponseHeader header = 1;
+ // Returns the modified store
+ metapb.Store store = 2;
+}
+
+
+// Returns all stores hosting graph_name; if graph_name is empty, returns all stores in the system
+message GetAllStoresRequest {
+ RequestHeader header = 1;
+ string graph_name = 2;
+ // Whether to exclude offline stores
+ bool exclude_offline_stores = 3;
+}
+
+message GetAllStoresResponse {
+ ResponseHeader header = 1;
+
+ repeated metapb.Store stores = 2;
+}
+
+
+message StoreHeartbeatRequest {
+ RequestHeader header = 1;
+
+ metapb.StoreStats stats = 2;
+}
+
+message StoreHeartbeatResponse {
+ ResponseHeader header = 1;
+ string cluster_version = 3;
+ metapb.ClusterStats clusterStats = 4;
+}
+
+message GetPartitionRequest {
+ RequestHeader header = 1;
+ string graph_name = 2;
+ bytes key = 3;
+}
+
+
+message GetPartitionByCodeRequest {
+ RequestHeader header = 1;
+ string graph_name = 2;
+ uint64 code = 3;
+}
+
+
+message GetPartitionResponse {
+ ResponseHeader header = 1;
+ metapb.Partition partition = 2;
+ metapb.Shard leader = 3;
+ // Offline shards
+ repeated metapb.Shard offline_shards = 4;
+}
+
+message GetPartitionByIDRequest {
+ RequestHeader header = 1;
+ string graph_name = 2;
+ uint32 partition_id = 3;
+}
+
+message DelPartitionRequest {
+ RequestHeader header = 1;
+ string graph_name = 2;
+ uint32 partition_id = 3;
+}
+message DelPartitionResponse {
+ ResponseHeader header = 1;
+ metapb.Partition partition = 2;
+}
+
+message UpdatePartitionRequest{
+ RequestHeader header = 1;
+ repeated metapb.Partition partition = 2;
+}
+
+message UpdatePartitionResponse{
+ ResponseHeader header = 1;
+ repeated metapb.Partition partition = 2;
+}
+// Use GetPartitionResponse as the response of GetPartitionByIDRequest.
+
+message ScanPartitionsRequest {
+ RequestHeader header = 1;
+ string graph_name = 2;
+ bytes start_key = 3;
+ bytes end_key = 4; // end_key is +inf when it is empty.
+}
+
+
+
+message ScanPartitionsResponse {
+ ResponseHeader header = 1;
+ repeated metapb.PartitionShard partitions = 4;
+}
+
+
+
+message QueryPartitionsRequest{
+ RequestHeader header = 1;
+ metapb.PartitionQuery query = 2;
+}
+
+message QueryPartitionsResponse {
+ ResponseHeader header = 1;
+ repeated metapb.Partition partitions = 4;
+}
+
+
+
+message GetGraphRequest{
+ RequestHeader header = 1;
+ string graph_name = 2;
+}
+
+message GetGraphResponse{
+ ResponseHeader header = 1;
+ metapb.Graph graph = 2;
+}
+
+message SetGraphRequest{
+ RequestHeader header = 1;
+ metapb.Graph graph = 2;
+}
+
+message SetGraphResponse{
+ ResponseHeader header = 1;
+ metapb.Graph graph = 2;
+}
+
+message DelGraphRequest{
+ RequestHeader header = 1;
+ string graph_name = 2;
+}
+
+message DelGraphResponse{
+ ResponseHeader header = 1;
+ metapb.Graph graph = 2;
+}
+
+message GetIdRequest{
+ RequestHeader header = 1;
+ string key = 2;
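+ // number of ids to allocate for this key (inferred: the response echoes delta back)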
+ int32 delta = 3;
+}
+
+message GetIdResponse{
+ ResponseHeader header = 1;
+ int64 id = 2;
+ int32 delta = 3;
+}
+
+message ResetIdRequest{
+ RequestHeader header = 1;
+ string key = 2;
+}
+
+message ResetIdResponse{
+ ResponseHeader header = 1;
+ int32 result = 2;
+}
+
+message GetMembersRequest{
+ RequestHeader header = 1;
+}
+
+message GetMembersResponse{
+ ResponseHeader header = 1;
+ repeated metapb.Member members = 2;
+ metapb.Member leader = 3;
+}
+
+message GetPDConfigRequest{
+ RequestHeader header = 1;
+ uint64 version = 2;
+}
+
+message GetPDConfigResponse{
+ ResponseHeader header = 1;
+ metapb.PDConfig pd_config = 2;
+}
+
+message SetPDConfigRequest{
+ RequestHeader header = 1;
+ metapb.PDConfig pd_config = 2;
+}
+
+message SetPDConfigResponse{
+ ResponseHeader header = 1;
+}
+
+
+message GetGraphSpaceRequest{
+ RequestHeader header = 1;
+ string graph_space_name = 2;
+}
+
+message GetGraphSpaceResponse{
+ ResponseHeader header = 1;
+ repeated metapb.GraphSpace graph_space = 2;
+}
+
+message SetGraphSpaceRequest{
+ RequestHeader header = 1;
+ metapb.GraphSpace graph_space = 2;
+}
+
+message SetGraphSpaceResponse{
+ ResponseHeader header = 1;
+}
+
+message GetClusterStatsRequest{
+ RequestHeader header = 1;
+}
+
+message GetClusterStatsResponse{
+ ResponseHeader header = 1;
+ metapb.ClusterStats cluster = 2;
+}
+message ChangePeerListRequest{
+ RequestHeader header = 1;
+ string peer_list = 2;
+}
+message getChangePeerListResponse{
+ ResponseHeader header = 1;
+}
+
+enum OperationMode {
+ Auto = 0;
+ Expert = 1;
+}
+
+message SplitDataParam{
+ // ID of the source partition to be split
+ uint32 partition_id = 1;
+ // number of target partitions
+ uint32 count = 2;
+}
+
+message SplitDataRequest{
+ RequestHeader header = 1;
+ // operation mode
+ // Auto: split automatically until every store reaches its partition limit
+ // Expert: expert mode, splitParams must be specified
+ OperationMode mode = 2;
+ repeated SplitDataParam param = 3;
+}
+
+message SplitGraphDataRequest{
+ RequestHeader header = 1;
+ // name of the graph to split
+ string graph_name = 2;
+ uint32 to_count = 3;
+}
+
+message SplitDataResponse{
+ ResponseHeader header = 1;
+}
+
+message MovePartitionParam{
+ uint32 partition_id = 1;
+ uint64 src_store_id = 2;
+ uint64 dst_store_id = 3;
+}
+
+message MovePartitionRequest{
+ RequestHeader header = 1;
+ // operation mode
+ // Auto: move automatically until every store holds the same number of partitions
+ // Expert: expert mode, transferParams must be specified
+ OperationMode mode = 2;
+ repeated MovePartitionParam param = 3;
+}
+
+message MovePartitionResponse{
+ ResponseHeader header = 1;
+}
+
+message ReportTaskRequest{
+ RequestHeader header = 1;
+ metaTask.Task task = 2;
+}
+
+message ReportTaskResponse{
+ ResponseHeader header = 1;
+}
+
+message GetPartitionStatsRequest{
+ RequestHeader header = 1;
+ uint32 partition_id = 2;
+ // if empty, returns the stats of this partition ID for all graphs
+ string graph_name = 4;
+}
+
+message GetPartitionStatsResponse{
+ ResponseHeader header = 1;
+ metapb.PartitionStats partition_stats = 2;
+}
+
+message BalanceLeadersRequest{
+ RequestHeader header = 1;
+}
+
+message BalanceLeadersResponse{
+ ResponseHeader header = 1;
+}
+
+message PutLicenseRequest{
+ RequestHeader header = 1;
+ bytes content = 2;
+}
+
+message PutLicenseResponse{
+ ResponseHeader header = 1;
+}
+
+message DbCompactionRequest{
+ RequestHeader header = 1;
+ string tableName = 2;
+}
+
+message DbCompactionResponse{
+ ResponseHeader header = 1;
+}
+
+message CombineClusterRequest {
+ RequestHeader header = 1;
+ uint32 toCount = 2;
+}
+
+message CombineClusterResponse {
+ ResponseHeader header = 1;
+}
+
+message CombineGraphRequest {
+ RequestHeader header = 1;
+ string graphName = 2;
+ uint32 toCount = 3;
+}
+
+message CombineGraphResponse {
+ ResponseHeader header = 1;
+}
+
+message DeleteShardGroupRequest {
+ RequestHeader header = 1;
+ uint32 groupId = 2;
+}
+
+message DeleteShardGroupResponse {
+ ResponseHeader header = 1;
+}
+
+message GetShardGroupRequest{
+ RequestHeader header = 1;
+ uint32 group_id = 2;
+}
+
+message GetShardGroupResponse{
+ ResponseHeader header = 1;
+ metapb.ShardGroup shardGroup = 2;
+}
+
+message UpdateShardGroupRequest{
+ RequestHeader header = 1;
+ metapb.ShardGroup shardGroup = 2;
+}
+
+message UpdateShardGroupResponse{
+ ResponseHeader header = 1;
+}
+
+message ChangeShardRequest{
+ RequestHeader header = 1;
+ uint32 groupId = 2;
+ repeated metapb.Shard shards = 3;
+}
+
+message ChangeShardResponse {
+ ResponseHeader header = 1;
+}
+
+message UpdatePdRaftRequest{
+ RequestHeader header = 1;
+ string config = 3;
+}
+
+message UpdatePdRaftResponse{
+ ResponseHeader header = 1;
+ string message = 2;
+}
+message CacheResponse {
+ ResponseHeader header = 1;
+ // cached stores, shard groups and graphs
+ repeated metapb.Store stores = 2;
+ repeated metapb.ShardGroup shards = 3;
+ repeated metapb.Graph graphs = 4;
+}
+message CachePartitionResponse {
+ ResponseHeader header = 1;
+ repeated metapb.Partition partitions = 2;
+}
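
The Auto and Expert comments above map directly onto the generated protobuf builders. A minimal sketch of an Expert-mode split request, assuming the classes generated from this file land in an outer class Pdpb under org.apache.hugegraph.pd.grpc (the tests below import the sibling Metapb from that package):

    Pdpb.SplitDataRequest request = Pdpb.SplitDataRequest.newBuilder()
            .setMode(Pdpb.OperationMode.Expert)
            .addParam(Pdpb.SplitDataParam.newBuilder()
                              .setPartitionId(1) // source partition to split
                              .setCount(4)       // number of target partitions
                              .build())
            .build();

In Auto mode the request carries no SplitDataParam entries; PD keeps splitting until every store reaches its partition limit.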
diff --git a/hugegraph-pd/hg-pd-test/pom.xml b/hugegraph-pd/hg-pd-test/pom.xml
new file mode 100644
index 0000000000..31c0fd889d
--- /dev/null
+++ b/hugegraph-pd/hg-pd-test/pom.xml
@@ -0,0 +1,259 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.hugegraph</groupId>
+        <artifactId>hugegraph-pd</artifactId>
+        <version>${revision}</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hg-pd-test</artifactId>
+
+    <properties>
+        <maven.deploy.skip>true</maven.deploy.skip>
+        <powermock.version>2.0.0-RC.3</powermock.version>
+    </properties>
+
+    <profiles>
+        <profile>
+            <id>jacoco</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.jacoco</groupId>
+                        <artifactId>jacoco-maven-plugin</artifactId>
+                        <version>0.8.4</version>
+                        <configuration>
+                            <excludes>
+                                <exclude>**/grpc/**.*</exclude>
+                                <exclude>**/config/**.*</exclude>
+                            </excludes>
+                        </configuration>
+                        <executions>
+                            <execution>
+                                <goals>
+                                    <goal>prepare-agent</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.13.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.24</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-context-support</artifactId>
+            <version>5.3.20</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-test</artifactId>
+            <version>5.3.20</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>${log4j2.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hugegraph</groupId>
+            <artifactId>hg-pd-common</artifactId>
+            <version>${revision}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>2.8.9</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>2.7</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.13.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>2.13.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <version>2.13.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <version>2.5.14</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.springframework.boot</groupId>
+                    <artifactId>spring-boot-starter-logging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-classloading-xstream</artifactId>
+            <version>${powermock.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4-rule</artifactId>
+            <version>${powermock.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-support</artifactId>
+            <version>${powermock.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <version>${powermock.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito2</artifactId>
+            <version>${powermock.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.14.0</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.20</version>
+                <executions>
+                    <execution>
+                        <id>pd-common-test</id>
+                        <configuration>
+                            <testSourceDirectory>${basedir}/src/main/java/</testSourceDirectory>
+                            <testClassesDirectory>${basedir}/target/classes/</testClassesDirectory>
+                            <includes>
+                                <include>**/CommonSuiteTest.java</include>
+                            </includes>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>pd-client-test</id>
+                        <configuration>
+                            <testSourceDirectory>${basedir}/src/main/java/</testSourceDirectory>
+                            <testClassesDirectory>${basedir}/target/classes/</testClassesDirectory>
+                            <includes>
+                                <include>**/PDClientSuiteTest.java</include>
+                            </includes>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.jacoco</groupId>
+                <artifactId>jacoco-maven-plugin</artifactId>
+                <version>0.8.4</version>
+                <executions>
+                    <execution>
+                        <id>pre-test</id>
+                        <goals>
+                            <goal>prepare-agent</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>post-test</id>
+                        <phase>test</phase>
+                        <goals>
+                            <goal>report-aggregate</goal>
+                        </goals>
+                        <configuration>
+                            <outputDirectory>${basedir}/target/site/jacoco</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+                <configuration>
+                    <excludes>
+                        <exclude>org/apache/hugegraph/pd/rest/*.class</exclude>
+                        <exclude>org/apache/hugegraph/pd/service/*.class</exclude>
+                        <exclude>org/apache/hugegraph/pd/model/*.class</exclude>
+                        <exclude>org/apache/hugegraph/pd/watch/*.class</exclude>
+                        <exclude>org/apache/hugegraph/pd/pulse/*.class</exclude>
+                        <exclude>org/apache/hugegraph/pd/license/*.class</exclude>
+                        <exclude>org/apache/hugegraph/pd/notice/*.class</exclude>
+                        <exclude>org/apache/hugegraph/pd/util/*.class</exclude>
+                        <exclude>org/apache/hugegraph/pd/metrics/*.class</exclude>
+                        <exclude>org/apache/hugegraph/pd/util/grpc/*.class</exclude>
+                        <exclude>org/apache/hugegraph/pd/boot/*.class</exclude>
+                        <exclude>org/apache/hugegraph/pd/grpc/**/*.class</exclude>
+                        <exclude>org/apache/hugegraph/pd/raft/*.class</exclude>
+                        <exclude>**/RaftKVStore.class</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+        <testResources>
+            <testResource>
+                <directory>src/main/resources/</directory>
+                <filtering>true</filtering>
+            </testResource>
+        </testResources>
+    </build>
+</project>
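
Note that coverage is opt-in for this module: the jacoco profile above is inactive by default, so a run along the lines of mvn test -P jacoco (exact invocation assumed, not part of this PR) is what attaches the prepare-agent and writes the aggregate report to target/site/jacoco.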
diff --git a/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/common/BaseCommonTest.java b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/common/BaseCommonTest.java
new file mode 100644
index 0000000000..fb4478e3d6
--- /dev/null
+++ b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/common/BaseCommonTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.common;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+
+public class BaseCommonTest {
+
+ @BeforeClass
+ public static void init() {
+
+ }
+
+ @After
+ public void teardown() {
+ // pass
+ }
+}
diff --git a/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/common/CommonSuiteTest.java b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/common/CommonSuiteTest.java
new file mode 100644
index 0000000000..02a5dfca64
--- /dev/null
+++ b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/common/CommonSuiteTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.common;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+import lombok.extern.slf4j.Slf4j;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ PartitionUtilsTest.class,
+ PartitionCacheTest.class,
+ HgAssertTest.class,
+ KVPairTest.class,
+})
+
+@Slf4j
+public class CommonSuiteTest {
+
+}
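
This suite is what the pd-common-test surefire execution in hg-pd-test/pom.xml includes (**/CommonSuiteTest.java), with testSourceDirectory pointed at src/main/java, since the tests deliberately live under main rather than test. Because the pd-common-test profile in hugegraph-pd/pom.xml is active by default, a plain mvn test in the hugegraph-pd module should run exactly this class and, through it, the four suite members.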
diff --git a/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/common/HgAssertTest.java b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/common/HgAssertTest.java
new file mode 100644
index 0000000000..3e61dd0a94
--- /dev/null
+++ b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/common/HgAssertTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.common;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.junit.Test;
+
+public class HgAssertTest {
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testIsTrue() {
+ HgAssert.isTrue(false, "");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testIsTrue2() {
+ HgAssert.isTrue(true, null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testIsFalse() {
+ HgAssert.isFalse(true, "");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testIsFalse2() {
+ HgAssert.isFalse(false, null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void isArgumentValid() {
+ HgAssert.isArgumentValid(new byte[0], "");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void isArgumentValidStr() {
+ HgAssert.isArgumentValid("", "");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testIsArgumentNotNull() {
+ HgAssert.isArgumentNotNull(null, "");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testIstValid() {
+ HgAssert.istValid(new byte[0], "");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testIstValidStr() {
+ HgAssert.isValid("", "");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testIsNotNull() {
+ HgAssert.isNotNull(null, "");
+ }
+
+ @Test
+ public void testIsInvalid() {
+ assertFalse(HgAssert.isInvalid("abc", "test"));
+ assertTrue(HgAssert.isInvalid("", null));
+ }
+
+ @Test
+ public void testIsInvalidByte() {
+ assertTrue(HgAssert.isInvalid(new byte[0]));
+ assertFalse(HgAssert.isInvalid(new byte[1]));
+ }
+
+ @Test
+ public void testIsInvalidMap() {
+ assertTrue(HgAssert.isInvalid(new HashMap()));
+ assertFalse(HgAssert.isInvalid(new HashMap() {{
+ put(1, 1);
+ }}));
+ }
+
+ @Test
+ public void testIsInvalidCollection() {
+ assertTrue(HgAssert.isInvalid(new ArrayList()));
+ assertFalse(HgAssert.isInvalid(new ArrayList() {{
+ add(1);
+ }}));
+ }
+
+ @Test
+ public void testIsContains() {
+ assertTrue(HgAssert.isContains(new Object[]{Integer.valueOf(1), Long.valueOf(2)},
+ Long.valueOf(2)));
+ assertFalse(HgAssert.isContains(new Object[]{Integer.valueOf(1), Long.valueOf(2)},
+ Long.valueOf(3)));
+ }
+
+ @Test
+ public void testIsContainsT() {
+ assertTrue(HgAssert.isContains(new ArrayList<>() {{
+ add(1);
+ }}, 1));
+ assertFalse(HgAssert.isContains(new ArrayList<>() {{
+ add(1);
+ }}, 2));
+ }
+
+ @Test
+ public void testIsNull() {
+ assertTrue(HgAssert.isNull(null));
+ assertFalse(HgAssert.isNull("abc", "cdf"));
+ }
+
+}
diff --git a/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/common/KVPairTest.java b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/common/KVPairTest.java
new file mode 100644
index 0000000000..9fb676d392
--- /dev/null
+++ b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/common/KVPairTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.common;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KVPairTest {
+
+ KVPair<String, Integer> pair;
+
+ @Before
+ public void init() {
+ this.pair = new KVPair<>("key", 1);
+ }
+
+ @Test
+ public void testGetKey() {
+ assertEquals(this.pair.getKey(), "key");
+ }
+
+ @Test
+ public void testSetKey() {
+ this.pair.setKey("key2");
+ assertEquals(this.pair.getKey(), "key2");
+ }
+
+ @Test
+ public void testGetValue() {
+ assertEquals(1, this.pair.getValue());
+ }
+
+ @Test
+ public void testSetValue() {
+ this.pair.setValue(2);
+ assertEquals(2, this.pair.getValue());
+ }
+
+ @Test
+ public void testToString() {
+ assertEquals("key=1", this.pair.toString());
+ }
+
+ @Test
+ public void testHashCode() {
+ assertEquals("key".hashCode() * 13 + Integer.hashCode(1), this.pair.hashCode());
+ }
+
+ @Test
+ public void testEquals() {
+ var pair2 = new KVPair<>("key", 1);
+ Assert.assertEquals(pair2, this.pair);
+ }
+}
diff --git a/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/common/PartitionCacheTest.java b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/common/PartitionCacheTest.java
new file mode 100644
index 0000000000..21e757ffa9
--- /dev/null
+++ b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/common/PartitionCacheTest.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.common;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hugegraph.pd.grpc.Metapb;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PartitionCacheTest {
+
+ private PartitionCache cache;
+
+ private static Metapb.Partition createPartition(int pid, String graphName, long start,
+ long end) {
+ return Metapb.Partition.newBuilder()
+ .setId(pid)
+ .setGraphName(graphName)
+ .setStartKey(start)
+ .setEndKey(end)
+ .setState(Metapb.PartitionState.PState_Normal)
+ .setVersion(1)
+ .build();
+ }
+
+ private static Metapb.ShardGroup createShardGroup(int pid) {
+ return Metapb.ShardGroup.newBuilder()
+ .addShards(
+ Metapb.Shard.newBuilder().setStoreId(0)
+ .setRole(Metapb.ShardRole.Leader).build()
+ )
+ .setId(pid)
+ .setVersion(0)
+ .setConfVer(0)
+ .setState(Metapb.PartitionState.PState_Normal)
+ .build();
+ }
+
+ private static Metapb.Shard createShard() {
+ return Metapb.Shard.newBuilder()
+ .setStoreId(0)
+ .setRole(Metapb.ShardRole.Leader)
+ .build();
+ }
+
+ private static Metapb.Store createStore(long storeId) {
+ return Metapb.Store.newBuilder()
+ .setId(storeId)
+ .setAddress("127.0.0.1")
+ .setCores(4)
+ .setVersion("1")
+ .setDataPath("/tmp/junit")
+ .setDataVersion(1)
+ .setLastHeartbeat(System.currentTimeMillis())
+ .setStartTimestamp(System.currentTimeMillis())
+ .setState(Metapb.StoreState.Up)
+ .setDeployPath("/tmp/junit")
+ .build();
+ }
+
+ private static Metapb.Graph createGraph(String graphName, int partitionCount) {
+ return Metapb.Graph.newBuilder()
+ .setGraphName(graphName)
+ .setPartitionCount(partitionCount)
+ .setState(Metapb.PartitionState.PState_Normal)
+ .build();
+ }
+
+ private static Metapb.ShardGroup createShardGroup() {
+ List<Metapb.Shard> shards = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ shards.add(Metapb.Shard.newBuilder()
+ .setStoreId(i)
+ .setRole(i == 0 ? Metapb.ShardRole.Leader :
+ Metapb.ShardRole.Follower)
+ .build()
+ );
+ }
+
+ return Metapb.ShardGroup.newBuilder()
+ .setId(1)
+ .setVersion(1)
+ .setConfVer(1)
+ .setState(Metapb.PartitionState.PState_Normal)
+ .addAllShards(shards)
+ .build();
+ }
+
+ @Before
+ public void setup() {
+ this.cache = new PartitionCache();
+ }
+
+ @Test
+ public void testGetPartitionById() {
+ var partition = createPartition(0, "graph0", 0, 65535);
+ this.cache.updateShardGroup(createShardGroup(0));
+ this.cache.updatePartition(partition);
+ var ret = this.cache.getPartitionById("graph0", 0);
+ assertNotNull(ret);
+ assertEquals(ret.getKey(), partition);
+ }
+
+ @Test
+ public void testGetPartitionByKey() throws UnsupportedEncodingException {
+ var partition = createPartition(0, "graph0", 0, 65535);
+ this.cache.updateShardGroup(createShardGroup(0));
+ this.cache.updatePartition(partition);
+ var ret = this.cache.getPartitionByKey("graph0", "0".getBytes(StandardCharsets.UTF_8));
+ assertNotNull(ret);
+ assertEquals(ret.getKey(), partition);
+ }
+
+ @Test
+ public void getPartitionByCode() {
+ var partition = createPartition(0, "graph0", 0, 1024);
+ this.cache.updateShardGroup(createShardGroup(0));
+ this.cache.updatePartition(partition);
+ var ret = this.cache.getPartitionByCode("graph0", 10);
+ assertNotNull(ret);
+ assertEquals(ret.getKey(), partition);
+ assertNull(this.cache.getPartitionByCode("graph0", 2000));
+ }
+
+ @Test
+ public void testGetPartitions() {
+ var partition1 = createPartition(0, "graph0", 0, 1024);
+ this.cache.updateShardGroup(createShardGroup(0));
+ this.cache.updatePartition(partition1);
+ assertEquals(this.cache.getPartitions("graph0").size(), 1);
+ var partition2 = createPartition(1, "graph0", 1024, 2048);
+ this.cache.updateShardGroup(createShardGroup(1));
+ this.cache.updatePartition(partition2);
+ assertEquals(this.cache.getPartitions("graph0").size(), 2);
+ System.out.println(this.cache.debugCacheByGraphName("graph0"));
+ }
+
+ @Test
+ public void testAddPartition() {
+ var partition = createPartition(0, "graph0", 0, 65535);
+ this.cache.addPartition("graph0", 0, partition);
+ var ret = this.cache.getPartitionById("graph0", 0);
+ assertNotNull(ret);
+ assertEquals(ret.getKey(), partition);
+ assertNotNull(this.cache.getPartitionByCode("graph0", 2000));
+ System.out.println(this.cache.debugCacheByGraphName("graph0"));
+ var partition2 = createPartition(0, "graph0", 0, 1024);
+ this.cache.addPartition("graph0", 0, partition2);
+ ret = this.cache.getPartitionById("graph0", 0);
+ assertNotNull(ret);
+ assertEquals(ret.getKey(), partition2);
+ assertNull(this.cache.getPartitionByCode("graph0", 2000));
+ System.out.println(this.cache.debugCacheByGraphName("graph0"));
+ }
+
+ @Test
+ public void testUpdatePartition() {
+ var partition = createPartition(0, "graph0", 0, 65535);
+ this.cache.updateShardGroup(createShardGroup(0));
+ this.cache.addPartition("graph0", 0, partition);
+ var partition2 = createPartition(0, "graph0", 0, 1024);
+ this.cache.updatePartition("graph0", 0, partition2);
+ var ret = this.cache.getPartitionById("graph0", 0);
+ assertNotNull(ret);
+ assertEquals(ret.getKey(), partition2);
+ assertNull(this.cache.getPartitionByCode("graph0", 2000));
+ }
+
+ @Test
+ public void testUpdatePartition2() {
+ var partition = createPartition(0, "graph0", 0, 1024);
+ this.cache.updateShardGroup(createShardGroup(0));
+ assertTrue(this.cache.updatePartition(partition));
+ assertFalse(this.cache.updatePartition(partition));
+ var ret = this.cache.getPartitionById("graph0", 0);
+ assertNotNull(ret);
+ assertEquals(ret.getKey(), partition);
+ assertNull(this.cache.getPartitionByCode("graph0", 2000));
+ }
+
+ @Test
+ public void testRemovePartition() {
+ var partition = createPartition(0, "graph0", 0, 1024);
+ this.cache.updateShardGroup(createShardGroup(0));
+ this.cache.updatePartition(partition);
+ assertNotNull(this.cache.getPartitionById("graph0", 0));
+ this.cache.removePartition("graph0", 0);
+ assertNull(this.cache.getPartitionById("graph0", 0));
+ System.out.println(this.cache.debugCacheByGraphName("graph0"));
+ }
+
+ @Test
+ public void testRange() {
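+ // Overlapping range updates: partition 1 is narrowed and re-expanded below.
+ // There are no assertions; the layout is inspected via the debug output.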
+ var partition1 = createPartition(1, "graph0", 0, 3);
+ var partition2 = createPartition(2, "graph0", 3, 6);
+ this.cache.updatePartition(partition1);
+ this.cache.updatePartition(partition2);
+
+ var partition3 = createPartition(3, "graph0", 1, 2);
+ var partition4 = createPartition(4, "graph0", 2, 3);
+ this.cache.updatePartition(partition3);
+ this.cache.updatePartition(partition4);
+ System.out.println(this.cache.debugCacheByGraphName("graph0"));
+
+ var partition6 = createPartition(1, "graph0", 0, 1);
+ this.cache.updatePartition(partition6);
+ System.out.println(this.cache.debugCacheByGraphName("graph0"));
+
+ var partition5 = createPartition(1, "graph0", 0, 3);
+ this.cache.updatePartition(partition5);
+ System.out.println(this.cache.debugCacheByGraphName("graph0"));
+ }
+
+ @Test
+ public void testRange2() {
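+ // Narrowing partition 1 to [2,3) leaves [0,2) uncovered, then the final
+ // update restores full coverage; again verified via the debug output.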
+ var partition1 = createPartition(1, "graph0", 0, 3);
+ var partition2 = createPartition(2, "graph0", 3, 6);
+ this.cache.updatePartition(partition1);
+ this.cache.updatePartition(partition2);
+ System.out.println(this.cache.debugCacheByGraphName("graph0"));
+
+ // leave a gap in the middle of the range
+ var partition3 = createPartition(1, "graph0", 2, 3);
+ this.cache.updatePartition(partition3);
+ System.out.println(this.cache.debugCacheByGraphName("graph0"));
+
+ var partition5 = createPartition(1, "graph0", 0, 3);
+ this.cache.updatePartition(partition5);
+ System.out.println(this.cache.debugCacheByGraphName("graph0"));
+ }
+
+ @Test
+ public void testRemovePartitions() {
+ var partition1 = createPartition(0, "graph0", 0, 1024);
+ var partition2 = createPartition(1, "graph0", 1024, 2048);
+ this.cache.updateShardGroup(createShardGroup(0));
+ this.cache.updatePartition(partition1);
+ this.cache.updateShardGroup(createShardGroup(1));
+ this.cache.updatePartition(partition2);
+ assertEquals(this.cache.getPartitions("graph0").size(), 2);
+ this.cache.removePartitions();
+ assertEquals(this.cache.getPartitions("graph0").size(), 0);
+ }
+
+ @Test
+ public void testRemoveAll() {
+ var partition1 = createPartition(0, "graph0", 0, 1024);
+ var partition2 = createPartition(1, "graph0", 1024, 2048);
+ var partition3 = createPartition(0, "graph1", 0, 2048);
+ this.cache.updateShardGroup(createShardGroup(0));
+ this.cache.updateShardGroup(createShardGroup(1));
+ this.cache.updatePartition(partition1);
+ this.cache.updatePartition(partition2);
+ this.cache.updatePartition(partition3);
+
+ assertEquals(this.cache.getPartitions("graph0").size(), 2);
+ assertEquals(this.cache.getPartitions("graph1").size(), 1);
+ this.cache.removeAll("graph0");
+ assertEquals(this.cache.getPartitions("graph0").size(), 0);
+ assertEquals(this.cache.getPartitions("graph1").size(), 1);
+ }
+
+ @Test
+ public void testUpdateShardGroup() {
+ var shardGroup = createShardGroup();
+ this.cache.updateShardGroup(shardGroup);
+ assertNotNull(this.cache.getShardGroup(shardGroup.getId()));
+ }
+
+ @Test
+ public void testGetShardGroup() {
+ var shardGroup = createShardGroup();
+ this.cache.updateShardGroup(shardGroup);
+ assertEquals(this.cache.getShardGroup(shardGroup.getId()), shardGroup);
+ }
+
+ @Test
+ public void testAddStore() {
+ var store = createStore(1);
+ this.cache.addStore(1L, store);
+ assertEquals(this.cache.getStoreById(1L), store);
+ }
+
+ @Test
+ public void testGetStoreById() {
+ var store = createStore(1);
+ this.cache.addStore(1L, store);
+ assertEquals(this.cache.getStoreById(1L), store);
+ }
+
+ @Test
+ public void testRemoveStore() {
+ var store = createStore(1);
+ this.cache.addStore(1L, store);
+ assertEquals(this.cache.getStoreById(1L), store);
+
+ this.cache.removeStore(1L);
+ assertNull(this.cache.getStoreById(1L));
+ }
+
+ @Test
+ public void testHasGraph() {
+ var partition = createPartition(0, "graph0", 0, 65535);
+ this.cache.updateShardGroup(createShardGroup(0));
+ this.cache.updatePartition(partition);
+ assertTrue(this.cache.hasGraph("graph0"));
+ assertFalse(this.cache.hasGraph("graph1"));
+ }
+
+ @Test
+ public void testUpdateGraph() {
+ var graph = createGraph("graph0", 10);
+ this.cache.updateGraph(graph);
+ assertEquals(this.cache.getGraph("graph0"), graph);
+ graph = createGraph("graph0", 12);
+ this.cache.updateGraph(graph);
+ assertEquals(this.cache.getGraph("graph0"), graph);
+ }
+
+ @Test
+ public void testGetGraph() {
+ var graph = createGraph("graph0", 12);
+ this.cache.updateGraph(graph);
+ assertEquals(this.cache.getGraph("graph0"), graph);
+ }
+
+ @Test
+ public void testGetGraphs() {
+ var graph1 = createGraph("graph0", 12);
+ var graph2 = createGraph("graph1", 12);
+ var graph3 = createGraph("graph2", 12);
+ this.cache.updateGraph(graph1);
+ this.cache.updateGraph(graph2);
+ this.cache.updateGraph(graph3);
+ assertEquals(this.cache.getGraphs().size(), 3);
+ }
+
+ @Test
+ public void testReset() {
+ var graph1 = createGraph("graph0", 12);
+ var graph2 = createGraph("graph1", 12);
+ var graph3 = createGraph("graph2", 12);
+ this.cache.updateGraph(graph1);
+ this.cache.updateGraph(graph2);
+ this.cache.updateGraph(graph3);
+ assertEquals(this.cache.getGraphs().size(), 3);
+ this.cache.reset();
+ assertEquals(this.cache.getGraphs().size(), 0);
+ }
+
+ @Test
+ public void testUpdateShardGroupLeader() {
+ var shardGroup = createShardGroup();
+ this.cache.updateShardGroup(shardGroup);
+
+ var leader =
+ Metapb.Shard.newBuilder().setStoreId(2).setRole(Metapb.ShardRole.Leader).build();
+ this.cache.updateShardGroupLeader(shardGroup.getId(), leader);
+
+ assertEquals(this.cache.getLeaderShard(shardGroup.getId()), leader);
+ }
+
+}
diff --git a/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/common/PartitionUtilsTest.java b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/common/PartitionUtilsTest.java
new file mode 100644
index 0000000000..e0742a4838
--- /dev/null
+++ b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/common/PartitionUtilsTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.common;
+
+import java.nio.charset.StandardCharsets;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class PartitionUtilsTest extends BaseCommonTest {
+
+ @Test
+ public void testCalcHashcode() {
+ byte[] key = new byte[5];
+ long code = PartitionUtils.calcHashcode(key);
+ Assert.assertEquals(code, 31912L);
+ }
+
+ // @Test
+ public void testHashCode() {
+ int partCount = 10;
+ int partSize = PartitionUtils.MAX_VALUE / partCount + 1;
+ int[] counter = new int[partCount];
+ for (int i = 0; i < 10000; i++) {
+ String s = String.format("BATCH-GET-UNIT-%02d", i);
+ int c = PartitionUtils.calcHashcode(s.getBytes(StandardCharsets.UTF_8));
+
+ counter[c / partSize]++;
+
+ }
+
+ for (int i = 0; i < counter.length; i++) {
+ System.out.println(i + " " + counter[i]);
+ }
+ }
+}
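
The disabled testHashCode above doubles as documentation for how a key maps to a partition. A self-contained sketch of that mapping, using only PartitionUtils from hg-pd-common (the key literal is illustrative):

    byte[] key = "vertex:1".getBytes(StandardCharsets.UTF_8);
    int partCount = 10;
    int partSize = PartitionUtils.MAX_VALUE / partCount + 1;
    // calcHashcode stays within [0, MAX_VALUE], so the index falls in [0, partCount)
    int partitionIndex = PartitionUtils.calcHashcode(key) / partSize;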
diff --git a/hugegraph-pd/hg-pd-test/src/main/resources/log4j2.xml b/hugegraph-pd/hg-pd-test/src/main/resources/log4j2.xml
new file mode 100644
index 0000000000..e462bf16e9
--- /dev/null
+++ b/hugegraph-pd/hg-pd-test/src/main/resources/log4j2.xml
@@ -0,0 +1,139 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration status="error">
+    <properties>
+        <property name="LOG_PATH">logs</property>
+        <property name="FILE_NAME">hg-pd-test</property>
+    </properties>
+    <!-- appender and logger definitions for the PD test module -->
+</configuration>
diff --git a/hugegraph-pd/pom.xml b/hugegraph-pd/pom.xml
new file mode 100644
index 0000000000..6253cfd443
--- /dev/null
+++ b/hugegraph-pd/pom.xml
@@ -0,0 +1,184 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>hugegraph-pd</artifactId>
+    <version>${revision}</version>
+    <packaging>pom</packaging>
+
+    <parent>
+        <groupId>org.apache.hugegraph</groupId>
+        <artifactId>hugegraph</artifactId>
+        <version>${revision}</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <modules>
+        <module>hg-pd-grpc</module>
+        <module>hg-pd-common</module>
+        <module>hg-pd-client</module>
+        <module>hg-pd-test</module>
+    </modules>
+
+    <properties>
+        <maven.compiler.source>11</maven.compiler.source>
+        <maven.compiler.target>11</maven.compiler.target>
+        <log4j2.version>2.17.0</log4j2.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>2.17.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hugegraph</groupId>
+            <artifactId>hg-pd-grpc</artifactId>
+            <version>${revision}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hugegraph</groupId>
+            <artifactId>hg-pd-common</artifactId>
+            <version>${revision}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.13.2</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.jacoco</groupId>
+                <artifactId>jacoco-maven-plugin</artifactId>
+                <version>0.8.4</version>
+                <configuration>
+                    <excludes>
+                        <exclude>**/grpc/**.*</exclude>
+                        <exclude>**/config/**.*</exclude>
+                    </excludes>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>prepare-agent</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>flatten-maven-plugin</artifactId>
+                <version>1.2.7</version>
+                <configuration>
+                    <updatePomFile>true</updatePomFile>
+                    <flattenMode>resolveCiFriendliesOnly</flattenMode>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>flatten</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>flatten</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>flatten.clean</id>
+                        <phase>clean</phase>
+                        <goals>
+                            <goal>clean</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>remove-flattened-pom</id>
+                        <phase>install</phase>
+                        <goals>
+                            <goal>clean</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-clean-plugin</artifactId>
+                <configuration>
+                    <filesets>
+                        <fileset>
+                            <directory>${project.basedir}/</directory>
+                            <includes>
+                                <include>*.tar</include>
+                                <include>*.tar.gz</include>
+                                <include>.flattened-pom.xml</include>
+                                <include>dist/**</include>
+                            </includes>
+                            <followSymlinks>false</followSymlinks>
+                        </fileset>
+                    </filesets>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>pd-common-test</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-surefire-plugin</artifactId>
+                        <version>2.20</version>
+                        <executions>
+                            <execution>
+                                <id>pd-common-test</id>
+                                <phase>test</phase>
+                                <goals>
+                                    <goal>test</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+</project>
diff --git a/pom.xml b/pom.xml
index ed2b53af33..ff448ffc29 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,8 +91,8 @@
         <module>hugegraph-server</module>
-        <!--<module>hugegraph-pd</module>-->
-        <!--<module>hugegraph-store</module>-->
+        <module>hugegraph-pd</module>
+        <!--<module>hugegraph-store</module>-->
@@ -177,6 +177,8 @@
                         <exclude>**/hbase-*/**</exclude>
                         <exclude>**/apache-cassandra-*/**</exclude>
                         <exclude>**/pid</exclude>
+                        <!-- generated PD gRPC code -->
+                        <exclude>**/src/main/java/org/apache/hugegraph/pd/grpc/**</exclude>
                     <consoleOutput>true</consoleOutput>
@@ -197,7 +199,7 @@
                     <requireJavaVersion>
-                        <version>[1.8,12)</version>
+                        <version>[11,)</version>
                     </requireJavaVersion>
                     <requireMavenVersion>
                         <version>[3.5.0,)</version>
@@ -287,7 +289,7 @@
-
+
            <id>stage</id>
@@ -297,5 +299,18 @@
         </profile>
+
+        <profile>
+            <id>arm-mac</id>
+            <activation>
+                <os>
+                    <family>mac</family>
+                    <arch>aarch64</arch>
+                </os>
+            </activation>
+            <properties>
+                <!-- use x86 artifacts (e.g. protoc) on Apple Silicon -->
+                <os.detected.classifier>osx-x86_64</os.detected.classifier>
+            </properties>
+        </profile>
     </profiles>