diff --git a/hugegraph-pd/hg-pd-client/pom.xml b/hugegraph-pd/hg-pd-client/pom.xml
new file mode 100644
index 0000000000..a64756fe94
--- /dev/null
+++ b/hugegraph-pd/hg-pd-client/pom.xml
@@ -0,0 +1,73 @@
+
+
+
+
+
+ 4.0.0
+
+
+ org.apache.hugegraph
+ hugegraph-pd
+ ${revision}
+ ../pom.xml
+
+ hg-pd-client
+
+
+
+ org.projectlombok
+ lombok
+ 1.18.20
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+ 2.17.0
+
+
+ org.apache.hugegraph
+ hg-pd-grpc
+ ${revision}
+
+
+ org.apache.hugegraph
+ hg-pd-common
+ ${revision}
+ compile
+
+
+ junit
+ junit
+ 4.13.2
+ test
+
+
+ commons-io
+ commons-io
+ 2.8.0
+
+
+ org.yaml
+ snakeyaml
+ 1.28
+ test
+
+
+
diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/AbstractClient.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/AbstractClient.java
new file mode 100644
index 0000000000..874ef6f67c
--- /dev/null
+++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/AbstractClient.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.client;
+
+import java.io.Closeable;
+import java.util.LinkedList;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+import org.apache.hugegraph.pd.common.KVPair;
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.grpc.Metapb;
+import org.apache.hugegraph.pd.grpc.PDGrpc;
+import org.apache.hugegraph.pd.grpc.PDGrpc.PDBlockingStub;
+import org.apache.hugegraph.pd.grpc.Pdpb;
+import org.apache.hugegraph.pd.grpc.Pdpb.GetMembersRequest;
+import org.apache.hugegraph.pd.grpc.Pdpb.GetMembersResponse;
+
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.MethodDescriptor;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.AbstractBlockingStub;
+import io.grpc.stub.AbstractStub;
+import io.grpc.stub.ClientCalls;
+import io.grpc.stub.StreamObserver;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public abstract class AbstractClient implements Closeable {
+
+ private static final ConcurrentHashMap chs = new ConcurrentHashMap<>();
+ public static Pdpb.ResponseHeader okHeader = Pdpb.ResponseHeader.newBuilder().setError(
+ Pdpb.Error.newBuilder().setType(Pdpb.ErrorType.OK)).build();
+ protected final Pdpb.RequestHeader header;
+ protected final AbstractClientStubProxy stubProxy;
+ protected final PDConfig config;
+ protected ManagedChannel channel = null;
+ protected volatile ConcurrentMap stubs = null;
+
+ protected AbstractClient(PDConfig config) {
+ String[] hosts = config.getServerHost().split(",");
+ this.stubProxy = new AbstractClientStubProxy(hosts);
+ this.header = Pdpb.RequestHeader.getDefaultInstance();
+ this.config = config;
+ }
+
+ public static Pdpb.ResponseHeader newErrorHeader(int errorCode, String errorMsg) {
+ Pdpb.ResponseHeader header = Pdpb.ResponseHeader.newBuilder().setError(
+ Pdpb.Error.newBuilder().setTypeValue(errorCode).setMessage(errorMsg)).build();
+ return header;
+ }
+
+ protected static void handleErrors(Pdpb.ResponseHeader header) throws PDException {
+ if (header.hasError() && header.getError().getType() != Pdpb.ErrorType.OK) {
+ throw new PDException(header.getError().getTypeValue(),
+ String.format("PD request error, error code = %d, msg = %s",
+ header.getError().getTypeValue(),
+ header.getError().getMessage()));
+ }
+ }
+
+ protected AbstractBlockingStub getBlockingStub() throws PDException {
+ if (stubProxy.getBlockingStub() == null) {
+ synchronized (this) {
+ if (stubProxy.getBlockingStub() == null) {
+ String host = resetStub();
+ if (host.isEmpty()) {
+ throw new PDException(Pdpb.ErrorType.PD_UNREACHABLE_VALUE,
+ "PD unreachable, pd.peers=" +
+ config.getServerHost());
+ }
+ }
+ }
+ }
+ return (AbstractBlockingStub) stubProxy.getBlockingStub()
+ .withDeadlineAfter(config.getGrpcTimeOut(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ protected AbstractStub getStub() throws PDException {
+ if (stubProxy.getStub() == null) {
+ synchronized (this) {
+ if (stubProxy.getStub() == null) {
+ String host = resetStub();
+ if (host.isEmpty()) {
+ throw new PDException(Pdpb.ErrorType.PD_UNREACHABLE_VALUE,
+ "PD unreachable, pd.peers=" +
+ config.getServerHost());
+ }
+ }
+ }
+ }
+ return stubProxy.getStub();
+ }
+
+ protected abstract AbstractStub createStub();
+
+ protected abstract AbstractBlockingStub createBlockingStub();
+
+ private String resetStub() {
+ String leaderHost = "";
+ for (int i = 0; i < stubProxy.getHostCount(); i++) {
+ String host = stubProxy.nextHost();
+ channel = ManagedChannelBuilder.forTarget(host).usePlaintext().build();
+ PDBlockingStub blockingStub = PDGrpc.newBlockingStub(channel)
+ .withDeadlineAfter(config.getGrpcTimeOut(),
+ TimeUnit.MILLISECONDS);
+ try {
+ GetMembersRequest request = Pdpb.GetMembersRequest.newBuilder()
+ .setHeader(header).build();
+ GetMembersResponse members = blockingStub.getMembers(request);
+ Metapb.Member leader = members.getLeader();
+ leaderHost = leader.getGrpcUrl();
+ close();
+ channel = ManagedChannelBuilder.forTarget(leaderHost).usePlaintext().build();
+ stubProxy.setBlockingStub(createBlockingStub());
+ stubProxy.setStub(createStub());
+ log.info("PDClient connect to host = {} success", leaderHost);
+ break;
+ } catch (Exception e) {
+ log.error("PDClient connect to {} exception {}, {}", host, e.getMessage(),
+ e.getCause() != null ? e.getCause().getMessage() : "");
+ }
+ }
+ return leaderHost;
+ }
+
+ protected > RespT blockingUnaryCall(
+ MethodDescriptor method, ReqT req) throws PDException {
+ return blockingUnaryCall(method, req, 5);
+ }
+
+ protected > RespT blockingUnaryCall(
+ MethodDescriptor method, ReqT req, int retry) throws PDException {
+ AbstractBlockingStub stub = getBlockingStub();
+ try {
+ RespT resp =
+ ClientCalls.blockingUnaryCall(stub.getChannel(), method, stub.getCallOptions(),
+ req);
+ return resp;
+ } catch (Exception e) {
+ log.error(method.getFullMethodName() + " exception, {}", e.getMessage());
+ if (e instanceof StatusRuntimeException) {
+ if (retry < stubProxy.getHostCount()) {
+ // 网络不通,关掉之前连接,换host重新连接
+ synchronized (this) {
+ stubProxy.setBlockingStub(null);
+ }
+ return blockingUnaryCall(method, req, ++retry);
+ }
+ }
+ }
+ return null;
+ }
+
+ // this.stubs = new ConcurrentHashMap(hosts.length);
+ private AbstractBlockingStub getConcurrentBlockingStub(String address) {
+ AbstractBlockingStub stub = stubs.get(address);
+ if (stub != null) {
+ return stub;
+ }
+ Channel ch = ManagedChannelBuilder.forTarget(address).usePlaintext().build();
+ PDBlockingStub blockingStub =
+ PDGrpc.newBlockingStub(ch).withDeadlineAfter(config.getGrpcTimeOut(),
+ TimeUnit.MILLISECONDS);
+ stubs.put(address, blockingStub);
+ return blockingStub;
+
+ }
+
+ protected KVPair concurrentBlockingUnaryCall(
+ MethodDescriptor method, ReqT req, Predicate predicate) {
+ LinkedList hostList = this.stubProxy.getHostList();
+ if (this.stubs == null) {
+ synchronized (this) {
+ if (this.stubs == null) {
+ this.stubs = new ConcurrentHashMap<>(hostList.size());
+ }
+ }
+ }
+ Stream respTStream = hostList.parallelStream().map((address) -> {
+ AbstractBlockingStub stub = getConcurrentBlockingStub(address);
+ RespT resp = ClientCalls.blockingUnaryCall(stub.getChannel(),
+ method, stub.getCallOptions(), req);
+ return resp;
+ });
+ KVPair pair;
+ AtomicReference response = new AtomicReference<>();
+ boolean result = respTStream.anyMatch((r) -> {
+ response.set(r);
+ return predicate.test(r);
+ });
+ if (result) {
+ pair = new KVPair<>(true, null);
+ } else {
+ pair = new KVPair<>(false, response.get());
+ }
+ return pair;
+ }
+
+ protected void streamingCall(MethodDescriptor method, ReqT request,
+ StreamObserver responseObserver,
+ int retry) throws PDException {
+ AbstractStub stub = getStub();
+ try {
+ ClientCall call = stub.getChannel().newCall(method, stub.getCallOptions());
+ ClientCalls.asyncServerStreamingCall(call, request, responseObserver);
+ } catch (Exception e) {
+ if (e instanceof StatusRuntimeException) {
+ if (retry < stubProxy.getHostCount()) {
+ synchronized (this) {
+ stubProxy.setStub(null);
+ }
+ streamingCall(method, request, responseObserver, ++retry);
+ return;
+ }
+ }
+ log.error("rpc call with exception, {}", e.getMessage());
+ }
+ }
+
+ @Override
+ public void close() {
+ closeChannel(channel);
+ if (stubs != null) {
+ for (AbstractBlockingStub stub : stubs.values()) {
+ closeChannel((ManagedChannel) stub.getChannel());
+ }
+ }
+
+ }
+
+ private void closeChannel(ManagedChannel channel) {
+ try {
+ while (channel != null &&
+ !channel.shutdownNow().awaitTermination(100, TimeUnit.MILLISECONDS)) {
+ continue;
+ }
+ } catch (Exception e) {
+ log.info("Close channel with error : ", e);
+ }
+ }
+}
diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/AbstractClientStubProxy.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/AbstractClientStubProxy.java
new file mode 100644
index 0000000000..6ee3fcb625
--- /dev/null
+++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/AbstractClientStubProxy.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.client;
+
+import java.util.LinkedList;
+
+import io.grpc.stub.AbstractBlockingStub;
+import io.grpc.stub.AbstractStub;
+
+public class AbstractClientStubProxy {
+
+ private final LinkedList hostList = new LinkedList<>();
+ private AbstractBlockingStub blockingStub;
+ private AbstractStub stub;
+
+ public AbstractClientStubProxy(String[] hosts) {
+ for (String host : hosts) {
+ if (!host.isEmpty()) {
+ hostList.offer(host);
+ }
+ }
+ }
+
+ public LinkedList getHostList() {
+ return hostList;
+ }
+
+ public String nextHost() {
+ String host = hostList.poll();
+ hostList.offer(host); //移到尾部
+ return host;
+ }
+
+ public AbstractBlockingStub getBlockingStub() {
+ return this.blockingStub;
+ }
+
+ public void setBlockingStub(AbstractBlockingStub stub) {
+ this.blockingStub = stub;
+ }
+
+ public String getHost() {
+ return hostList.peek();
+ }
+
+ public int getHostCount() {
+ return hostList.size();
+ }
+
+ public AbstractStub getStub() {
+ return stub;
+ }
+
+ public void setStub(AbstractStub stub) {
+ this.stub = stub;
+ }
+}
diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/Channels.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/Channels.java
new file mode 100644
index 0000000000..34616e6374
--- /dev/null
+++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/Channels.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.client;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+
+public class Channels {
+
+ private static final ConcurrentHashMap chs = new ConcurrentHashMap<>();
+
+ public static ManagedChannel getChannel(String target) {
+
+ ManagedChannel channel;
+ if ((channel = chs.get(target)) == null || channel.isShutdown() || channel.isTerminated()) {
+ synchronized (chs) {
+ if ((channel = chs.get(target)) == null || channel.isShutdown() ||
+ channel.isTerminated()) {
+ channel = ManagedChannelBuilder.forTarget(target).usePlaintext().build();
+ chs.put(target, channel);
+ }
+ }
+ }
+
+ return channel;
+ }
+}
diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/ClientCache.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/ClientCache.java
new file mode 100644
index 0000000000..d4fd50ffe9
--- /dev/null
+++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/ClientCache.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.client;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hugegraph.pd.common.GraphCache;
+import org.apache.hugegraph.pd.common.KVPair;
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.common.PartitionUtils;
+import org.apache.hugegraph.pd.grpc.Metapb;
+import org.apache.hugegraph.pd.grpc.Metapb.Partition;
+import org.apache.hugegraph.pd.grpc.Metapb.Shard;
+import org.apache.hugegraph.pd.grpc.Metapb.ShardGroup;
+import org.apache.hugegraph.pd.grpc.Pdpb.CachePartitionResponse;
+import org.apache.hugegraph.pd.grpc.Pdpb.CacheResponse;
+
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class ClientCache {
+
+ private final AtomicBoolean initialized = new AtomicBoolean(false);
+ private final org.apache.hugegraph.pd.client.PDClient client;
+ private volatile Map> groups;
+ private volatile Map stores;
+ private volatile Map caches = new ConcurrentHashMap<>();
+
+ public ClientCache(org.apache.hugegraph.pd.client.PDClient pdClient) {
+ groups = new ConcurrentHashMap<>();
+ stores = new ConcurrentHashMap<>();
+ client = pdClient;
+ }
+
+ private GraphCache getGraphCache(String graphName) {
+ GraphCache graph;
+ if ((graph = caches.get(graphName)) == null) {
+ synchronized (caches) {
+ if ((graph = caches.get(graphName)) == null) {
+ graph = new GraphCache();
+ caches.put(graphName, graph);
+ }
+ }
+ }
+ return graph;
+ }
+
+ public KVPair getPartitionById(String graphName, int partId) {
+ try {
+ GraphCache graph = initGraph(graphName);
+ Partition partition = graph.getPartition(partId);
+ Shard shard = groups.get(partId).getValue();
+ if (partition == null || shard == null) {
+ return null;
+ }
+ return new KVPair<>(partition, shard);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private KVPair getPair(int partId, GraphCache graph) {
+ Partition p = graph.getPartition(partId);
+ KVPair pair = groups.get(partId);
+ if (p != null && pair != null) {
+ Shard s = pair.getValue();
+ if (s == null) {
+ pair.setValue(getLeader(partId));
+ return new KVPair<>(p, pair.getValue());
+ } else {
+ return new KVPair<>(p, s);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * 根据key的hashcode返回分区信息
+ *
+ * @param graphName
+ * @param code
+ * @return
+ */
+ public KVPair getPartitionByCode(String graphName, long code) {
+ try {
+ GraphCache graph = initGraph(graphName);
+ RangeMap range = graph.getRange();
+ Integer pId = range.get(code);
+ if (pId != null) {
+ return getPair(pId, graph);
+ }
+ return null;
+ } catch (PDException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private GraphCache initGraph(String graphName) throws PDException {
+ initCache();
+ GraphCache graph = getGraphCache(graphName);
+ if (!graph.getInitialized().get()) {
+ synchronized (graph) {
+ if (!graph.getInitialized().get()) {
+ CachePartitionResponse pc = client.getPartitionCache(graphName);
+ RangeMap range = graph.getRange();
+ List ps = pc.getPartitionsList();
+ HashMap gps = new HashMap<>(ps.size(), 1);
+ for (Partition p : ps) {
+ gps.put(p.getId(), p);
+ range.put(Range.closedOpen(p.getStartKey(), p.getEndKey()), p.getId());
+ }
+ graph.setPartitions(gps);
+ graph.getInitialized().set(true);
+ }
+ }
+ }
+ return graph;
+ }
+
+ private void initCache() throws PDException {
+ if (!initialized.get()) {
+ synchronized (this) {
+ if (!initialized.get()) {
+ CacheResponse cache = client.getClientCache();
+ List shardGroups = cache.getShardsList();
+ for (ShardGroup s : shardGroups) {
+ this.groups.put(s.getId(), new KVPair<>(s, getLeader(s.getId())));
+ }
+ List stores = cache.getStoresList();
+ for (Metapb.Store store : stores) {
+ this.stores.put(store.getId(), store);
+ }
+ List graphs = cache.getGraphsList();
+ for (Metapb.Graph g : graphs) {
+ GraphCache c = new GraphCache(g);
+ caches.put(g.getGraphName(), c);
+ }
+ initialized.set(true);
+ }
+ }
+ }
+ }
+
+ /**
+ * 返回key所在的分区信息
+ *
+ * @param key
+ * @return
+ */
+ public KVPair getPartitionByKey(String graphName, byte[] key) {
+ int code = PartitionUtils.calcHashcode(key);
+ return getPartitionByCode(graphName, code);
+ }
+
+ public boolean update(String graphName, int partId, Partition partition) {
+ GraphCache graph = getGraphCache(graphName);
+ try {
+ Partition p = graph.getPartition(partId);
+ if (p != null && p.equals(partition)) {
+ return false;
+ }
+ RangeMap range = graph.getRange();
+ graph.addPartition(partId, partition);
+ if (p != null) {
+ // old [1-3) 被 [2-3)覆盖了。当 [1-3) 变成[1-2) 不应该删除原先的[1-3)
+ // 当确认老的 start, end 都是自己的时候,才可以删除老的. (即还没覆盖)
+ if (Objects.equals(partition.getId(), range.get(partition.getStartKey())) &&
+ Objects.equals(partition.getId(), range.get(partition.getEndKey() - 1))) {
+ range.remove(range.getEntry(partition.getStartKey()).getKey());
+ }
+ }
+ range.put(Range.closedOpen(partition.getStartKey(), partition.getEndKey()), partId);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return true;
+ }
+
+ public void removePartition(String graphName, int partId) {
+ GraphCache graph = getGraphCache(graphName);
+ Partition p = graph.removePartition(partId);
+ if (p != null) {
+ RangeMap range = graph.getRange();
+ if (Objects.equals(p.getId(), range.get(p.getStartKey())) &&
+ Objects.equals(p.getId(), range.get(p.getEndKey() - 1))) {
+ range.remove(range.getEntry(p.getStartKey()).getKey());
+ }
+ }
+ }
+
+ /**
+ * remove all partitions
+ */
+ public void removePartitions() {
+ for (Entry entry : caches.entrySet()) {
+ removePartitions(entry.getValue());
+ }
+ }
+
+ private void removePartitions(GraphCache graph) {
+ graph.getState().clear();
+ graph.getRange().clear();
+ }
+
+ /**
+ * remove partition cache of graphName
+ *
+ * @param graphName
+ */
+ public void removeAll(String graphName) {
+ GraphCache graph = caches.get(graphName);
+ if (graph != null) {
+ removePartitions(graph);
+ }
+ }
+
+ public boolean updateShardGroup(ShardGroup shardGroup) {
+ KVPair old = groups.get(shardGroup.getId());
+ Shard leader = getLeader(shardGroup);
+ if (old != null) {
+ old.setKey(shardGroup);
+ old.setValue(leader);
+ return false;
+ }
+ groups.put(shardGroup.getId(), new KVPair<>(shardGroup, leader));
+ return true;
+ }
+
+ public void deleteShardGroup(int shardGroupId) {
+ groups.remove(shardGroupId);
+ }
+
+ public ShardGroup getShardGroup(int groupId) {
+ KVPair pair = groups.get(groupId);
+ if (pair != null) {
+ return pair.getKey();
+ }
+ return null;
+ }
+
+ public boolean addStore(Long storeId, Metapb.Store store) {
+ Metapb.Store oldStore = stores.get(storeId);
+ if (oldStore != null && oldStore.equals(store)) {
+ return false;
+ }
+ stores.put(storeId, store);
+ return true;
+ }
+
+ public Metapb.Store getStoreById(Long storeId) {
+ return stores.get(storeId);
+ }
+
+ public void removeStore(Long storeId) {
+ stores.remove(storeId);
+ }
+
+ public void reset() {
+ groups = new ConcurrentHashMap<>();
+ stores = new ConcurrentHashMap<>();
+ caches = new ConcurrentHashMap<>();
+ }
+
+ public Shard getLeader(int partitionId) {
+ KVPair pair = groups.get(partitionId);
+ if (pair != null) {
+ if (pair.getValue() != null) {
+ return pair.getValue();
+ }
+ for (Shard shard : pair.getKey().getShardsList()) {
+ if (shard.getRole() == Metapb.ShardRole.Leader) {
+ pair.setValue(shard);
+ return shard;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ public Shard getLeader(ShardGroup shardGroup) {
+ if (shardGroup != null) {
+ for (Shard shard : shardGroup.getShardsList()) {
+ if (shard.getRole() == Metapb.ShardRole.Leader) {
+ return shard;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ public void updateLeader(int partitionId, Shard leader) {
+ KVPair pair = groups.get(partitionId);
+ if (pair != null && leader != null) {
+ Shard l = getLeader(partitionId);
+ if (l == null || leader.getStoreId() != l.getStoreId()) {
+ ShardGroup shardGroup = pair.getKey();
+ ShardGroup.Builder builder = ShardGroup.newBuilder(shardGroup).clearShards();
+ for (var shard : shardGroup.getShardsList()) {
+ builder.addShards(
+ Shard.newBuilder()
+ .setStoreId(shard.getStoreId())
+ .setRole(shard.getStoreId() == leader.getStoreId() ?
+ Metapb.ShardRole.Leader : Metapb.ShardRole.Follower)
+ .build()
+ );
+ }
+ pair.setKey(builder.build());
+ pair.setValue(leader);
+ }
+ }
+ }
+}
diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/Discoverable.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/Discoverable.java
new file mode 100644
index 0000000000..abdcac414c
--- /dev/null
+++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/Discoverable.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.client;
+
+import org.apache.hugegraph.pd.grpc.discovery.NodeInfos;
+import org.apache.hugegraph.pd.grpc.discovery.Query;
+
+public interface Discoverable {
+
+ NodeInfos getNodeInfos(Query query);
+
+ void scheduleTask();
+
+ void cancelTask();
+}
diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/DiscoveryClient.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/DiscoveryClient.java
new file mode 100644
index 0000000000..7a9f28c013
--- /dev/null
+++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/DiscoveryClient.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.client;
+
+import java.io.Closeable;
+import java.util.LinkedList;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.grpc.discovery.DiscoveryServiceGrpc;
+import org.apache.hugegraph.pd.grpc.discovery.NodeInfo;
+import org.apache.hugegraph.pd.grpc.discovery.NodeInfos;
+import org.apache.hugegraph.pd.grpc.discovery.Query;
+import org.apache.hugegraph.pd.grpc.discovery.RegisterInfo;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public abstract class DiscoveryClient implements Closeable, Discoverable {
+
+ private final Timer timer = new Timer("serverHeartbeat", true);
+ private final AtomicBoolean requireResetStub = new AtomicBoolean(false);
+ protected int period; //心跳周期
+ LinkedList pdAddresses = new LinkedList<>();
+ ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ private volatile int currentIndex; // 当前在用pd地址位置
+ private int maxTime = 6;
+ private ManagedChannel channel = null;
+ private DiscoveryServiceGrpc.DiscoveryServiceBlockingStub registerStub;
+ private DiscoveryServiceGrpc.DiscoveryServiceBlockingStub blockingStub;
+
+ public DiscoveryClient(String centerAddress, int delay) {
+ String[] addresses = centerAddress.split(",");
+ for (int i = 0; i < addresses.length; i++) {
+ String singleAddress = addresses[i];
+ if (singleAddress == null || singleAddress.length() <= 0) {
+ continue;
+ }
+ pdAddresses.add(addresses[i]);
+ }
+ this.period = delay;
+ if (maxTime < addresses.length) {
+ maxTime = addresses.length;
+ }
+ }
+
+ private R tryWithTimes(Function function, V v) {
+ R r;
+ Exception ex = null;
+ for (int i = 0; i < maxTime; i++) {
+ try {
+ r = function.apply(v);
+ return r;
+ } catch (Exception e) {
+ requireResetStub.set(true);
+ resetStub();
+ ex = e;
+ }
+ }
+ if (ex != null) {
+ log.error("Try discovery method with error: {}", ex.getMessage());
+ }
+ return null;
+ }
+
+ /***
+ * 按照pd列表重置stub
+ */
+ private void resetStub() {
+ String errLog = null;
+ for (int i = currentIndex + 1; i <= pdAddresses.size() + currentIndex; i++) {
+ currentIndex = i % pdAddresses.size();
+ String singleAddress = pdAddresses.get(currentIndex);
+ try {
+ if (requireResetStub.get()) {
+ resetChannel(singleAddress);
+ }
+ errLog = null;
+ break;
+ } catch (Exception e) {
+ requireResetStub.set(true);
+ if (errLog == null) {
+ errLog = e.getMessage();
+ }
+ continue;
+ }
+ }
+ if (errLog != null) {
+ log.error(errLog);
+ }
+ }
+
+ /***
+ * 按照某个pd的地址重置channel和stub
+ * @param singleAddress
+ * @throws PDException
+ */
+ private void resetChannel(String singleAddress) throws PDException {
+
+ readWriteLock.writeLock().lock();
+ try {
+ if (requireResetStub.get()) {
+ while (channel != null && !channel.shutdownNow().awaitTermination(
+ 100, TimeUnit.MILLISECONDS)) {
+ continue;
+ }
+ channel = ManagedChannelBuilder.forTarget(
+ singleAddress).usePlaintext().build();
+ this.registerStub = DiscoveryServiceGrpc.newBlockingStub(
+ channel);
+ this.blockingStub = DiscoveryServiceGrpc.newBlockingStub(
+ channel);
+ requireResetStub.set(false);
+ }
+ } catch (Exception e) {
+ throw new PDException(-1, String.format(
+ "Reset channel with error : %s.", e.getMessage()));
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+ }
+
+ /***
+ * 获取注册节点信息
+ * @param query
+ * @return
+ */
+ @Override
+ public NodeInfos getNodeInfos(Query query) {
+ return tryWithTimes((q) -> {
+ this.readWriteLock.readLock().lock();
+ NodeInfos nodes;
+ try {
+ nodes = this.blockingStub.getNodes(q);
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ this.readWriteLock.readLock().unlock();
+ }
+ return nodes;
+ }, query);
+ }
+
+ /***
+ * 启动心跳任务
+ */
+ @Override
+ public void scheduleTask() {
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ NodeInfo nodeInfo = getRegisterNode();
+ tryWithTimes((t) -> {
+ RegisterInfo register;
+ readWriteLock.readLock().lock();
+ try {
+ register = registerStub.register(t);
+ log.debug("Discovery Client work done.");
+ Consumer consumer = getRegisterConsumer();
+ if (consumer != null) {
+ consumer.accept(register);
+ }
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ readWriteLock.readLock().unlock();
+ }
+ return register;
+ }, nodeInfo);
+ }
+ }, 0, period);
+ }
+
+ abstract NodeInfo getRegisterNode();
+
+ abstract Consumer getRegisterConsumer();
+
+ @Override
+ public void cancelTask() {
+ this.timer.cancel();
+ }
+
+ @Override
+ public void close() {
+ this.timer.cancel();
+ readWriteLock.writeLock().lock();
+ try {
+ while (channel != null && !channel.shutdownNow().awaitTermination(
+ 100, TimeUnit.MILLISECONDS)) {
+ continue;
+ }
+ } catch (Exception e) {
+ log.info("Close channel with error : {}.", e);
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+ }
+}
diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/DiscoveryClientImpl.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/DiscoveryClientImpl.java
new file mode 100644
index 0000000000..0ded328c17
--- /dev/null
+++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/DiscoveryClientImpl.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.client;
+
+import java.util.Map;
+import java.util.function.Consumer;
+
+import org.apache.hugegraph.pd.grpc.discovery.NodeInfo;
+import org.apache.hugegraph.pd.grpc.discovery.RegisterType;
+
+public class DiscoveryClientImpl extends DiscoveryClient {
+
+ private final String id;
+ private final RegisterType type; // 心跳类型,备用
+ private final String version;
+ private final String appName;
+ private final int times; // 心跳过期次数,备用
+ private final String address;
+ private final Map labels;
+ private final Consumer registerConsumer;
+
+ private DiscoveryClientImpl(Builder builder) {
+ super(builder.centerAddress, builder.delay);
+ period = builder.delay;
+ id = builder.id;
+ type = builder.type;
+ version = builder.version;
+ appName = builder.appName;
+ times = builder.times;
+ address = builder.address;
+ labels = builder.labels;
+ registerConsumer = builder.registerConsumer;
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ @Override
+ NodeInfo getRegisterNode() {
+ return NodeInfo.newBuilder().setAddress(this.address)
+ .setVersion(this.version)
+ .setAppName(this.appName).setInterval(this.period)
+ .setId(this.id).putAllLabels(labels).build();
+ }
+
+ @Override
+ Consumer getRegisterConsumer() {
+ return registerConsumer;
+ }
+
+ public static final class Builder {
+
+ private int delay;
+ private String centerAddress;
+ private String id;
+ private RegisterType type;
+ private String address;
+ private Map labels;
+ private String version;
+ private String appName;
+ private int times;
+ private Consumer registerConsumer;
+
+ private Builder() {
+ }
+
+ public Builder setDelay(int val) {
+ delay = val;
+ return this;
+ }
+
+ public Builder setCenterAddress(String val) {
+ centerAddress = val;
+ return this;
+ }
+
+ public Builder setId(String val) {
+ id = val;
+ return this;
+ }
+
+ public Builder setType(RegisterType val) {
+ type = val;
+ return this;
+ }
+
+ public Builder setAddress(String val) {
+ address = val;
+ return this;
+ }
+
+ public Builder setLabels(Map val) {
+ labels = val;
+ return this;
+ }
+
+ public Builder setVersion(String val) {
+ version = val;
+ return this;
+ }
+
+ public Builder setAppName(String val) {
+ appName = val;
+ return this;
+ }
+
+ public Builder setTimes(int val) {
+ times = val;
+ return this;
+ }
+
+ public Builder setRegisterConsumer(Consumer registerConsumer) {
+ this.registerConsumer = registerConsumer;
+ return this;
+ }
+
+ public DiscoveryClientImpl build() {
+ return new DiscoveryClientImpl(this);
+ }
+ }
+}
diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/KvClient.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/KvClient.java
new file mode 100644
index 0000000000..7e0795b2e4
--- /dev/null
+++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/KvClient.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.client;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.grpc.kv.K;
+import org.apache.hugegraph.pd.grpc.kv.KResponse;
+import org.apache.hugegraph.pd.grpc.kv.Kv;
+import org.apache.hugegraph.pd.grpc.kv.KvResponse;
+import org.apache.hugegraph.pd.grpc.kv.KvServiceGrpc;
+import org.apache.hugegraph.pd.grpc.kv.LockRequest;
+import org.apache.hugegraph.pd.grpc.kv.LockResponse;
+import org.apache.hugegraph.pd.grpc.kv.ScanPrefixResponse;
+import org.apache.hugegraph.pd.grpc.kv.TTLRequest;
+import org.apache.hugegraph.pd.grpc.kv.TTLResponse;
+import org.apache.hugegraph.pd.grpc.kv.WatchEvent;
+import org.apache.hugegraph.pd.grpc.kv.WatchKv;
+import org.apache.hugegraph.pd.grpc.kv.WatchRequest;
+import org.apache.hugegraph.pd.grpc.kv.WatchResponse;
+import org.apache.hugegraph.pd.grpc.kv.WatchType;
+
+import io.grpc.stub.AbstractBlockingStub;
+import io.grpc.stub.AbstractStub;
+import io.grpc.stub.StreamObserver;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class KvClient extends AbstractClient implements Closeable {
+
+ private final AtomicLong clientId = new AtomicLong(0);
+ private final Semaphore semaphore = new Semaphore(1);
+ private final ConcurrentHashMap observers = new ConcurrentHashMap<>();
+
+ public KvClient(PDConfig pdConfig) {
+ super(pdConfig);
+ }
+
+ @Override
+ protected AbstractStub createStub() {
+ return KvServiceGrpc.newStub(channel);
+ }
+
+ @Override
+ protected AbstractBlockingStub createBlockingStub() {
+ return KvServiceGrpc.newBlockingStub(channel);
+ }
+
+ public KvResponse put(String key, String value) throws PDException {
+ Kv kv = Kv.newBuilder().setKey(key).setValue(value).build();
+ KvResponse response = blockingUnaryCall(KvServiceGrpc.getPutMethod(), kv);
+ handleErrors(response.getHeader());
+ return response;
+ }
+
+ public KResponse get(String key) throws PDException {
+ K k = K.newBuilder().setKey(key).build();
+ KResponse response = blockingUnaryCall(KvServiceGrpc.getGetMethod(), k);
+ handleErrors(response.getHeader());
+ return response;
+ }
+
+ public KvResponse delete(String key) throws PDException {
+ K k = K.newBuilder().setKey(key).build();
+ KvResponse response = blockingUnaryCall(KvServiceGrpc.getDeleteMethod(), k);
+ handleErrors(response.getHeader());
+ return response;
+ }
+
+ public KvResponse deletePrefix(String prefix) throws PDException {
+ K k = K.newBuilder().setKey(prefix).build();
+ KvResponse response = blockingUnaryCall(KvServiceGrpc.getDeletePrefixMethod(), k);
+ handleErrors(response.getHeader());
+ return response;
+ }
+
+ public ScanPrefixResponse scanPrefix(String prefix) throws PDException {
+ K k = K.newBuilder().setKey(prefix).build();
+ ScanPrefixResponse response = blockingUnaryCall(KvServiceGrpc.getScanPrefixMethod(), k);
+ handleErrors(response.getHeader());
+ return response;
+ }
+
+ public TTLResponse keepTTLAlive(String key) throws PDException {
+ TTLRequest request = TTLRequest.newBuilder().setKey(key).build();
+ TTLResponse response = blockingUnaryCall(KvServiceGrpc.getKeepTTLAliveMethod(), request);
+ handleErrors(response.getHeader());
+ return response;
+ }
+
+ public TTLResponse putTTL(String key, String value, long ttl) throws PDException {
+ TTLRequest request =
+ TTLRequest.newBuilder().setKey(key).setValue(value).setTtl(ttl).build();
+ TTLResponse response = blockingUnaryCall(KvServiceGrpc.getPutTTLMethod(), request);
+ handleErrors(response.getHeader());
+ return response;
+ }
+
+ private void onEvent(WatchResponse value, Consumer consumer) {
+ log.info("receive message for {},event Count:{}", value, value.getEventsCount());
+ clientId.compareAndSet(0L, value.getClientId());
+ if (value.getEventsCount() != 0) {
+ consumer.accept((T) value);
+ }
+ }
+
+ private StreamObserver getObserver(String key, Consumer consumer,
+ BiConsumer listenWrapper,
+ long client) {
+ StreamObserver observer;
+ if ((observer = observers.get(client)) == null) {
+ synchronized (this) {
+ if ((observer = observers.get(client)) == null) {
+ observer = getObserver(key, consumer, listenWrapper);
+ observers.put(client, observer);
+ }
+ }
+ }
+ return observer;
+ }
+
+ private StreamObserver getObserver(String key, Consumer consumer,
+ BiConsumer listenWrapper) {
+ return new StreamObserver() {
+ @Override
+ public void onNext(WatchResponse value) {
+ switch (value.getState()) {
+ case Starting:
+ boolean b = clientId.compareAndSet(0, value.getClientId());
+ if (b) {
+ observers.put(value.getClientId(), this);
+ log.info("set watch client id to :{}", value.getClientId());
+ }
+ semaphore.release();
+ break;
+ case Started:
+ onEvent(value, consumer);
+ break;
+ case Leader_Changed:
+ listenWrapper.accept(key, consumer);
+ break;
+ case Alive:
+ // only for check client is alive, do nothing
+ break;
+ default:
+ break;
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ listenWrapper.accept(key, consumer);
+ }
+
+ @Override
+ public void onCompleted() {
+
+ }
+ };
+ }
+
+ public void listen(String key, Consumer consumer) throws PDException {
+ long value = clientId.get();
+ StreamObserver observer = getObserver(key, consumer, listenWrapper, value);
+ acquire();
+ WatchRequest k = WatchRequest.newBuilder().setClientId(value).setKey(key).build();
+ streamingCall(KvServiceGrpc.getWatchMethod(), k, observer, 1);
+ }
+
+ public void listenPrefix(String prefix, Consumer consumer) throws PDException {
+ long value = clientId.get();
+ StreamObserver observer =
+ getObserver(prefix, consumer, prefixListenWrapper, value);
+ acquire();
+ WatchRequest k =
+ WatchRequest.newBuilder().setClientId(clientId.get()).setKey(prefix).build();
+ streamingCall(KvServiceGrpc.getWatchPrefixMethod(), k, observer, 1);
+ }
+
+ private void acquire() {
+ if (clientId.get() == 0L) {
+ try {
+ semaphore.acquire();
+ if (clientId.get() != 0L) {
+ semaphore.release();
+ }
+ } catch (InterruptedException e) {
+ log.error("get semaphore with error:", e);
+ }
+ }
+ }
+
+ public List getWatchList(T response) {
+ List values = new LinkedList<>();
+ List eventsList = response.getEventsList();
+ for (WatchEvent event : eventsList) {
+ if (event.getType() != WatchType.Put) {
+ return null;
+ }
+ String value = event.getCurrent().getValue();
+ values.add(value);
+ }
+ return values;
+ }
+
+ public Map getWatchMap(T response) {
+ Map values = new HashMap<>();
+ List eventsList = response.getEventsList();
+ for (WatchEvent event : eventsList) {
+ if (event.getType() != WatchType.Put) {
+ return null;
+ }
+ WatchKv current = event.getCurrent();
+ String key = current.getKey();
+ String value = current.getValue();
+ values.put(key, value);
+ }
+ return values;
+ }
+
+ public LockResponse lock(String key, long ttl) throws PDException {
+ acquire();
+ LockResponse response;
+ try {
+ LockRequest k =
+ LockRequest.newBuilder().setKey(key).setClientId(clientId.get()).setTtl(ttl)
+ .build();
+ response = blockingUnaryCall(KvServiceGrpc.getLockMethod(), k);
+ handleErrors(response.getHeader());
+ if (clientId.compareAndSet(0L, response.getClientId())) {
+ semaphore.release();
+ }
+ } catch (Exception e) {
+ if (clientId.get() == 0L) {
+ semaphore.release();
+ }
+ throw e;
+ }
+ return response;
+ }
+
+ public LockResponse lockWithoutReentrant(String key, long ttl) throws PDException {
+ acquire();
+ LockResponse response;
+ try {
+ LockRequest k =
+ LockRequest.newBuilder().setKey(key).setClientId(clientId.get()).setTtl(ttl)
+ .build();
+ response = blockingUnaryCall(KvServiceGrpc.getLockWithoutReentrantMethod(), k);
+ handleErrors(response.getHeader());
+ if (clientId.compareAndSet(0L, response.getClientId())) {
+ semaphore.release();
+ }
+ } catch (Exception e) {
+ if (clientId.get() == 0L) {
+ semaphore.release();
+ }
+ throw e;
+ }
+ return response;
+ }
+
+ public LockResponse isLocked(String key) throws PDException {
+ LockRequest k = LockRequest.newBuilder().setKey(key).setClientId(clientId.get()).build();
+ LockResponse response = blockingUnaryCall(KvServiceGrpc.getIsLockedMethod(), k);
+ handleErrors(response.getHeader());
+ return response;
+ }
+
+ public LockResponse unlock(String key) throws PDException {
+ assert clientId.get() != 0;
+ LockRequest k = LockRequest.newBuilder().setKey(key).setClientId(clientId.get()).build();
+ LockResponse response = blockingUnaryCall(KvServiceGrpc.getUnlockMethod(), k);
+ handleErrors(response.getHeader());
+ clientId.compareAndSet(0L, response.getClientId());
+ assert clientId.get() == response.getClientId();
+ return response;
+ }
+
+ public LockResponse keepAlive(String key) throws PDException {
+ assert clientId.get() != 0;
+ LockRequest k = LockRequest.newBuilder().setKey(key).setClientId(clientId.get()).build();
+ LockResponse response = blockingUnaryCall(KvServiceGrpc.getKeepAliveMethod(), k);
+ handleErrors(response.getHeader());
+ clientId.compareAndSet(0L, response.getClientId());
+ assert clientId.get() == response.getClientId();
+ return response;
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ }
+
+ BiConsumer listenWrapper = (key, consumer) -> {
+ try {
+ listen(key, consumer);
+ } catch (PDException e) {
+ try {
+ log.warn("start listen with warning:", e);
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ }
+ }
+ };
+
+ BiConsumer prefixListenWrapper = (key, consumer) -> {
+ try {
+ listenPrefix(key, consumer);
+ } catch (PDException e) {
+ try {
+ log.warn("start listenPrefix with warning:", e);
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ }
+ }
+ };
+}
diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/LicenseClient.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/LicenseClient.java
new file mode 100644
index 0000000000..a96185e5af
--- /dev/null
+++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/LicenseClient.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.client;
+
+import org.apache.hugegraph.pd.common.KVPair;
+import org.apache.hugegraph.pd.grpc.PDGrpc;
+import org.apache.hugegraph.pd.grpc.Pdpb;
+
+import com.google.protobuf.ByteString;
+
+import io.grpc.stub.AbstractBlockingStub;
+import io.grpc.stub.AbstractStub;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class LicenseClient extends AbstractClient {
+
+ public LicenseClient(PDConfig config) {
+ super(config);
+ }
+
+ @Override
+ protected AbstractStub createStub() {
+ return PDGrpc.newStub(channel);
+ }
+
+ @Override
+ protected AbstractBlockingStub createBlockingStub() {
+ return PDGrpc.newBlockingStub(channel);
+ }
+
+ public Pdpb.PutLicenseResponse putLicense(byte[] content) {
+ Pdpb.PutLicenseRequest request = Pdpb.PutLicenseRequest.newBuilder()
+ .setContent(
+ ByteString.copyFrom(content))
+ .build();
+ try {
+ KVPair pair = concurrentBlockingUnaryCall(
+ PDGrpc.getPutLicenseMethod(), request,
+ (rs) -> rs.getHeader().getError().getType().equals(Pdpb.ErrorType.OK));
+ if (pair.getKey()) {
+ Pdpb.PutLicenseResponse.Builder builder = Pdpb.PutLicenseResponse.newBuilder();
+ builder.setHeader(okHeader);
+ return builder.build();
+ } else {
+ return pair.getValue();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ log.debug("put license with error:{} ", e);
+ Pdpb.ResponseHeader rh =
+ newErrorHeader(Pdpb.ErrorType.LICENSE_ERROR_VALUE, e.getMessage());
+ return Pdpb.PutLicenseResponse.newBuilder().setHeader(rh).build();
+ }
+ }
+}
diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDClient.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDClient.java
new file mode 100644
index 0000000000..6c3eae4251
--- /dev/null
+++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDClient.java
@@ -0,0 +1,1347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.client;
+
+import static org.apache.hugegraph.pd.watch.NodeEvent.EventType.NODE_PD_LEADER_CHANGE;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hugegraph.pd.common.KVPair;
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.common.PartitionUtils;
+import org.apache.hugegraph.pd.grpc.MetaTask;
+import org.apache.hugegraph.pd.grpc.Metapb;
+import org.apache.hugegraph.pd.grpc.Metapb.ShardGroup;
+import org.apache.hugegraph.pd.grpc.PDGrpc;
+import org.apache.hugegraph.pd.grpc.Pdpb;
+import org.apache.hugegraph.pd.grpc.Pdpb.CachePartitionResponse;
+import org.apache.hugegraph.pd.grpc.Pdpb.CacheResponse;
+import org.apache.hugegraph.pd.grpc.Pdpb.GetGraphRequest;
+import org.apache.hugegraph.pd.grpc.Pdpb.GetPartitionByCodeRequest;
+import org.apache.hugegraph.pd.grpc.Pdpb.GetPartitionRequest;
+import org.apache.hugegraph.pd.grpc.Pdpb.GetPartitionResponse;
+import org.apache.hugegraph.pd.grpc.watch.WatchResponse;
+import org.apache.hugegraph.pd.watch.NodeEvent;
+import org.apache.hugegraph.pd.watch.PartitionEvent;
+
+import com.google.protobuf.ByteString;
+
+import io.grpc.ManagedChannel;
+import io.grpc.MethodDescriptor;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.AbstractBlockingStub;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * PD客户端实现类
+ */
+@Slf4j
+public class PDClient {
+
+ private final PDConfig config;
+ private final Pdpb.RequestHeader header;
+ private final ClientCache cache;
+ private final StubProxy stubProxy;
+ private final List eventListeners;
+ private PDWatch.Watcher partitionWatcher;
+ private PDWatch.Watcher storeWatcher;
+ private PDWatch.Watcher graphWatcher;
+ private PDWatch.Watcher shardGroupWatcher;
+ private PDWatch pdWatch;
+
+ private PDClient(PDConfig config) {
+ this.config = config;
+ this.header = Pdpb.RequestHeader.getDefaultInstance();
+ this.stubProxy = new StubProxy(config.getServerHost().split(","));
+ this.eventListeners = new CopyOnWriteArrayList<>();
+ this.cache = new ClientCache(this);
+ }
+
+ /**
+ * 创建PDClient对象,并初始化stub
+ *
+ * @param config
+ * @return
+ */
+ public static PDClient create(PDConfig config) {
+ return new PDClient(config);
+ }
+
+ private synchronized void newBlockingStub() throws PDException {
+ if (stubProxy.get() != null) {
+ return;
+ }
+
+ String host = newLeaderStub();
+ if (host.isEmpty()) {
+ throw new PDException(Pdpb.ErrorType.PD_UNREACHABLE_VALUE,
+ "PD unreachable, pd.peers=" + config.getServerHost());
+ }
+
+ log.info("PDClient enable cache, init PDWatch object");
+ connectPdWatch(host);
+ }
+
+ public void connectPdWatch(String leader) {
+
+ if (pdWatch != null && Objects.equals(pdWatch.getCurrentHost(), leader) &&
+ pdWatch.checkChannel()) {
+ return;
+ }
+
+ log.info("PDWatch client connect host:{}", leader);
+ pdWatch = new PDWatchImpl(leader);
+
+ partitionWatcher = pdWatch.watchPartition(new PDWatch.Listener<>() {
+ @Override
+ public void onNext(PartitionEvent response) {
+ // log.info("PDClient receive partition event {}-{} {}",
+ // response.getGraph(), response.getPartitionId(), response.getChangeType());
+ invalidPartitionCache(response.getGraph(), response.getPartitionId());
+
+ if (response.getChangeType() == PartitionEvent.ChangeType.DEL) {
+ cache.removeAll(response.getGraph());
+ }
+
+ eventListeners.forEach(listener -> {
+ listener.onPartitionChanged(response);
+ });
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ log.error("watchPartition exception {}", throwable.getMessage());
+ closeStub(false);
+ }
+ });
+
+ storeWatcher = pdWatch.watchNode(new PDWatch.Listener<>() {
+ @Override
+ public void onNext(NodeEvent response) {
+ log.info("PDClient receive store event {} {}",
+ response.getEventType(), Long.toHexString(response.getNodeId()));
+
+ if (response.getEventType() == NODE_PD_LEADER_CHANGE) {
+ // pd raft change
+ var leaderIp = response.getGraph();
+ log.info("watchNode: pd leader changed to {}, current watch:{}",
+ leaderIp, pdWatch.getCurrentHost());
+ closeStub(!Objects.equals(pdWatch.getCurrentHost(), leaderIp));
+ connectPdWatch(leaderIp);
+ }
+
+ invalidStoreCache(response.getNodeId());
+ eventListeners.forEach(listener -> {
+ listener.onStoreChanged(response);
+ });
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ log.error("watchNode exception {}", throwable.getMessage());
+ closeStub(false);
+ }
+
+ });
+
+ graphWatcher = pdWatch.watchGraph(new PDWatch.Listener<>() {
+ @Override
+ public void onNext(WatchResponse response) {
+ eventListeners.forEach(listener -> {
+ listener.onGraphChanged(response);
+ });
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ log.warn("graphWatcher exception {}", throwable.getMessage());
+ }
+ });
+
+ shardGroupWatcher = pdWatch.watchShardGroup(new PDWatch.Listener<>() {
+ @Override
+ public void onNext(WatchResponse response) {
+ var shardResponse = response.getShardGroupResponse();
+ // log.info("PDClient receive shard group event: raft {}-{}", shardResponse
+ // .getShardGroupId(),
+ // shardResponse.getType());
+ if (config.isEnableCache()) {
+ switch (shardResponse.getType()) {
+ case WATCH_CHANGE_TYPE_DEL:
+ cache.deleteShardGroup(shardResponse.getShardGroupId());
+ break;
+ case WATCH_CHANGE_TYPE_ALTER:
+ cache.updateShardGroup(
+ response.getShardGroupResponse().getShardGroup());
+ break;
+ default:
+ break;
+ }
+ }
+ eventListeners.forEach(listener -> listener.onShardGroupChanged(response));
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ log.warn("shardGroupWatcher exception {}", throwable.getMessage());
+ }
+ });
+
+ }
+
+ private synchronized void closeStub(boolean closeWatcher) {
+ // TODO ManagedChannel 没有正常关闭
+ stubProxy.set(null);
+ cache.reset();
+
+ if (closeWatcher) {
+ if (partitionWatcher != null) {
+ partitionWatcher.close();
+ partitionWatcher = null;
+ }
+ if (storeWatcher != null) {
+ storeWatcher.close();
+ storeWatcher = null;
+ }
+ if (graphWatcher != null) {
+ graphWatcher.close();
+ graphWatcher = null;
+ }
+
+ if (shardGroupWatcher != null) {
+ shardGroupWatcher.close();
+ shardGroupWatcher = null;
+ }
+
+ pdWatch = null;
+ }
+ }
+
+ private PDGrpc.PDBlockingStub getStub() throws PDException {
+ if (stubProxy.get() == null) {
+ newBlockingStub();
+ }
+ return stubProxy.get().withDeadlineAfter(config.getGrpcTimeOut(), TimeUnit.MILLISECONDS);
+ }
+
+ private PDGrpc.PDBlockingStub newStub() throws PDException {
+ if (stubProxy.get() == null) {
+ newBlockingStub();
+ }
+ return PDGrpc.newBlockingStub(stubProxy.get().getChannel())
+ .withDeadlineAfter(config.getGrpcTimeOut(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ private String newLeaderStub() {
+ String leaderHost = "";
+ for (int i = 0; i < stubProxy.getHostCount(); i++) {
+ String host = stubProxy.nextHost();
+ ManagedChannel channel = Channels.getChannel(host);
+
+ PDGrpc.PDBlockingStub stub = PDGrpc.newBlockingStub(channel)
+ .withDeadlineAfter(config.getGrpcTimeOut(),
+ TimeUnit.MILLISECONDS);
+ try {
+ var leaderIp = getLeaderIp(stub);
+ if (!leaderIp.equalsIgnoreCase(host)) {
+ leaderHost = leaderIp;
+ stubProxy.set(PDGrpc.newBlockingStub(channel)
+ .withDeadlineAfter(config.getGrpcTimeOut(),
+ TimeUnit.MILLISECONDS));
+ } else {
+ stubProxy.set(stub);
+ leaderHost = host;
+ }
+ stubProxy.setLeader(leaderIp);
+
+ log.info("PDClient connect to host = {} success", leaderHost);
+ break;
+ } catch (Exception e) {
+ log.error("PDClient connect to {} exception {}, {}", host, e.getMessage(),
+ e.getCause() != null ? e.getCause().getMessage() : "");
+ }
+ }
+ return leaderHost;
+ }
+
+ public String getLeaderIp() {
+
+ return getLeaderIp(stubProxy.get());
+ }
+
+ private String getLeaderIp(PDGrpc.PDBlockingStub stub) {
+ if (stub == null) {
+ try {
+ getStub();
+ return stubProxy.getLeader();
+ } catch (PDException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ Pdpb.GetMembersRequest request = Pdpb.GetMembersRequest.newBuilder()
+ .setHeader(header)
+ .build();
+ Metapb.Member leader = stub.getMembers(request).getLeader();
+ return leader.getGrpcUrl();
+ }
+
+ /**
+ * Store注册,返回storeID,初次注册会返回新ID
+ *
+ * @param store
+ * @return
+ */
+ public long registerStore(Metapb.Store store) throws PDException {
+ Pdpb.RegisterStoreRequest request = Pdpb.RegisterStoreRequest.newBuilder()
+ .setHeader(header)
+ .setStore(store).build();
+
+ Pdpb.RegisterStoreResponse response =
+ blockingUnaryCall(PDGrpc.getRegisterStoreMethod(), request);
+ handleResponseError(response.getHeader());
+ return response.getStoreId();
+ }
+
+ /**
+ * 根据storeId返回Store对象
+ *
+ * @param storeId
+ * @return
+ * @throws PDException
+ */
+ public Metapb.Store getStore(long storeId) throws PDException {
+ Metapb.Store store = cache.getStoreById(storeId);
+ if (store == null) {
+ Pdpb.GetStoreRequest request = Pdpb.GetStoreRequest.newBuilder()
+ .setHeader(header)
+ .setStoreId(storeId).build();
+ Pdpb.GetStoreResponse response = getStub().getStore(request);
+ handleResponseError(response.getHeader());
+ store = response.getStore();
+ if (config.isEnableCache()) {
+ cache.addStore(storeId, store);
+ }
+ }
+ return store;
+ }
+
+ /**
+ * 更新Store信息,包括上下线等
+ *
+ * @param store
+ * @return
+ */
+ public Metapb.Store updateStore(Metapb.Store store) throws PDException {
+ Pdpb.SetStoreRequest request = Pdpb.SetStoreRequest.newBuilder()
+ .setHeader(header)
+ .setStore(store).build();
+
+ Pdpb.SetStoreResponse response = getStub().setStore(request);
+ handleResponseError(response.getHeader());
+ store = response.getStore();
+ if (config.isEnableCache()) {
+ cache.addStore(store.getId(), store);
+ }
+ return store;
+ }
+
+ /**
+ * 返回活跃的Store
+ *
+ * @param graphName
+ * @return
+ */
+ public List getActiveStores(String graphName) throws PDException {
+ List stores = new ArrayList<>();
+ KVPair ptShard = this.getPartitionByCode(graphName, 0);
+ while (ptShard != null) {
+ stores.add(this.getStore(ptShard.getValue().getStoreId()));
+ if (ptShard.getKey().getEndKey() < PartitionUtils.MAX_VALUE) {
+ ptShard = this.getPartitionByCode(graphName, ptShard.getKey().getEndKey());
+ } else {
+ ptShard = null;
+ }
+ }
+ return stores;
+ }
+
+ public List getActiveStores() throws PDException {
+ Pdpb.GetAllStoresRequest request = Pdpb.GetAllStoresRequest.newBuilder()
+ .setHeader(header)
+ .setGraphName("")
+ .setExcludeOfflineStores(true)
+ .build();
+ Pdpb.GetAllStoresResponse response = getStub().getAllStores(request);
+ handleResponseError(response.getHeader());
+ return response.getStoresList();
+
+ }
+
+ /**
+ * 返回活跃的Store
+ *
+ * @param graphName
+ * @return
+ */
+ public List getAllStores(String graphName) throws PDException {
+ Pdpb.GetAllStoresRequest request = Pdpb.GetAllStoresRequest.newBuilder()
+ .setHeader(header)
+ .setGraphName(graphName)
+ .setExcludeOfflineStores(false)
+ .build();
+ Pdpb.GetAllStoresResponse response = getStub().getAllStores(request);
+ handleResponseError(response.getHeader());
+ return response.getStoresList();
+
+ }
+
+ /**
+ * Store心跳,定期调用,保持在线状态
+ *
+ * @param stats
+ * @throws PDException
+ */
+ public Metapb.ClusterStats storeHeartbeat(Metapb.StoreStats stats) throws PDException {
+ Pdpb.StoreHeartbeatRequest request = Pdpb.StoreHeartbeatRequest.newBuilder()
+ .setHeader(header)
+ .setStats(stats).build();
+ Pdpb.StoreHeartbeatResponse response = getStub().storeHeartbeat(request);
+ handleResponseError(response.getHeader());
+ return response.getClusterStats();
+ }
+
+ private KVPair getKvPair(String graphName, byte[] key,
+ KVPair partShard) throws
+ PDException {
+ if (partShard == null) {
+ GetPartitionRequest request = GetPartitionRequest.newBuilder()
+ .setHeader(header)
+ .setGraphName(graphName)
+ .setKey(ByteString.copyFrom(key))
+ .build();
+ GetPartitionResponse response =
+ blockingUnaryCall(PDGrpc.getGetPartitionMethod(), request);
+ handleResponseError(response.getHeader());
+ partShard = new KVPair<>(response.getPartition(), response.getLeader());
+ cache.update(graphName, partShard.getKey().getId(), partShard.getKey());
+ }
+ return partShard;
+ }
+
+ /**
+ * 查询Key所属分区信息
+ *
+ * @param graphName
+ * @param key
+ * @return
+ * @throws PDException
+ */
+ public KVPair getPartition(String graphName, byte[] key) throws
+ PDException {
+ // 先查cache,cache没有命中,在调用PD
+ KVPair partShard = cache.getPartitionByKey(graphName, key);
+ partShard = getKvPair(graphName, key, partShard);
+ return partShard;
+ }
+
+ public KVPair getPartition(String graphName, byte[] key,
+ int code) throws
+ PDException {
+ KVPair partShard =
+ cache.getPartitionByCode(graphName, code);
+ partShard = getKvPair(graphName, key, partShard);
+ return partShard;
+ }
+
+ /**
+ * 根据hashcode查询所属分区信息
+ *
+ * @param graphName
+ * @param hashCode
+ * @return
+ * @throws PDException
+ */
+ public KVPair getPartitionByCode(String graphName,
+ long hashCode)
+ throws PDException {
+ // 先查cache,cache没有命中,在调用PD
+ KVPair partShard =
+ cache.getPartitionByCode(graphName, hashCode);
+ if (partShard == null) {
+ GetPartitionByCodeRequest request = GetPartitionByCodeRequest.newBuilder()
+ .setHeader(header)
+ .setGraphName(graphName)
+ .setCode(hashCode).build();
+ GetPartitionResponse response =
+ blockingUnaryCall(PDGrpc.getGetPartitionByCodeMethod(), request);
+ handleResponseError(response.getHeader());
+ partShard = new KVPair<>(response.getPartition(), response.getLeader());
+ cache.update(graphName, partShard.getKey().getId(), partShard.getKey());
+ cache.updateShardGroup(getShardGroup(partShard.getKey().getId()));
+ }
+
+ if (partShard.getValue() == null) {
+ ShardGroup shardGroup = getShardGroup(partShard.getKey().getId());
+ if (shardGroup != null) {
+ for (var shard : shardGroup.getShardsList()) {
+ if (shard.getRole() == Metapb.ShardRole.Leader) {
+ partShard.setValue(shard);
+ }
+ }
+ } else {
+ log.error("getPartitionByCode: get shard group failed, {}",
+ partShard.getKey().getId());
+ }
+ }
+ return partShard;
+ }
+
+ /**
+ * 获取Key的哈希值
+ */
+ public int keyToCode(String graphName, byte[] key) {
+ return PartitionUtils.calcHashcode(key);
+ }
+
+ /**
+ * 根据分区id返回分区信息, RPC请求
+ *
+ * @param graphName
+ * @param partId
+ * @return
+ * @throws PDException
+ */
+ public KVPair getPartitionById(String graphName,
+ int partId) throws PDException {
+ KVPair partShard =
+ cache.getPartitionById(graphName, partId);
+ if (partShard == null) {
+ Pdpb.GetPartitionByIDRequest request = Pdpb.GetPartitionByIDRequest.newBuilder()
+ .setHeader(header)
+ .setGraphName(
+ graphName)
+ .setPartitionId(
+ partId)
+ .build();
+ GetPartitionResponse response =
+ blockingUnaryCall(PDGrpc.getGetPartitionByIDMethod(), request);
+ handleResponseError(response.getHeader());
+ partShard = new KVPair<>(response.getPartition(), response.getLeader());
+ if (config.isEnableCache()) {
+ cache.update(graphName, partShard.getKey().getId(), partShard.getKey());
+ cache.updateShardGroup(getShardGroup(partShard.getKey().getId()));
+ }
+ }
+ if (partShard.getValue() == null) {
+ var shardGroup = getShardGroup(partShard.getKey().getId());
+ if (shardGroup != null) {
+ for (var shard : shardGroup.getShardsList()) {
+ if (shard.getRole() == Metapb.ShardRole.Leader) {
+ partShard.setValue(shard);
+ }
+ }
+ } else {
+ log.error("getPartitionById: get shard group failed, {}",
+ partShard.getKey().getId());
+ }
+ }
+ return partShard;
+ }
+
+ public ShardGroup getShardGroup(int partId) throws PDException {
+ ShardGroup group = cache.getShardGroup(partId);
+ if (group == null) {
+ Pdpb.GetShardGroupRequest request = Pdpb.GetShardGroupRequest.newBuilder()
+ .setHeader(header)
+ .setGroupId(partId)
+ .build();
+ Pdpb.GetShardGroupResponse response =
+ blockingUnaryCall(PDGrpc.getGetShardGroupMethod(), request);
+ handleResponseError(response.getHeader());
+ group = response.getShardGroup();
+ if (config.isEnableCache()) {
+ cache.updateShardGroup(group);
+ }
+ }
+ return group;
+ }
+
+ public void updateShardGroup(ShardGroup shardGroup) throws PDException {
+ Pdpb.UpdateShardGroupRequest request = Pdpb.UpdateShardGroupRequest.newBuilder()
+ .setHeader(header)
+ .setShardGroup(
+ shardGroup)
+ .build();
+ Pdpb.UpdateShardGroupResponse response =
+ blockingUnaryCall(PDGrpc.getUpdateShardGroupMethod(), request);
+ handleResponseError(response.getHeader());
+
+ if (config.isEnableCache()) {
+ cache.updateShardGroup(shardGroup);
+ }
+ }
+
+ /**
+ * 返回startKey和endKey跨越的所有分区信息
+ *
+ * @param graphName
+ * @param startKey
+ * @param endKey
+ * @return
+ * @throws PDException
+ */
+ public List> scanPartitions(String graphName,
+ byte[] startKey,
+ byte[] endKey) throws
+ PDException {
+ List> partitions = new ArrayList<>();
+ KVPair startPartShard = getPartition(graphName, startKey);
+ KVPair endPartShard = getPartition(graphName, endKey);
+ if (startPartShard == null || endPartShard == null) {
+ return null;
+ }
+
+ partitions.add(startPartShard);
+ while (startPartShard.getKey().getEndKey() < endPartShard.getKey().getEndKey()
+ && startPartShard.getKey().getEndKey() <
+ PartitionUtils.MAX_VALUE /*排除最后一个分区*/) {
+ startPartShard = getPartitionByCode(graphName, startPartShard.getKey().getEndKey());
+ partitions.add(startPartShard);
+ }
+ return partitions;
+ }
+
+ /**
+ * 根据条件查询分区信息
+ *
+ * @return
+ * @throws PDException
+ */
+ public List getPartitionsByStore(long storeId) throws PDException {
+
+ Metapb.PartitionQuery query = Metapb.PartitionQuery.newBuilder()
+ .setStoreId(storeId)
+ .build();
+ Pdpb.QueryPartitionsRequest request = Pdpb.QueryPartitionsRequest.newBuilder()
+ .setQuery(query).build();
+ Pdpb.QueryPartitionsResponse response =
+ blockingUnaryCall(PDGrpc.getQueryPartitionsMethod(), request);
+
+ handleResponseError(response.getHeader());
+ return response.getPartitionsList();
+ }
+
+ /**
+ * 查找指定store上的指定partitionId
+ *
+ * @return
+ * @throws PDException
+ */
+ public List queryPartitions(long storeId, int partitionId) throws
+ PDException {
+
+ Metapb.PartitionQuery query = Metapb.PartitionQuery.newBuilder()
+ .setStoreId(storeId)
+ .setPartitionId(partitionId)
+ .build();
+ Pdpb.QueryPartitionsRequest request = Pdpb.QueryPartitionsRequest.newBuilder()
+ .setQuery(query).build();
+ Pdpb.QueryPartitionsResponse response =
+ blockingUnaryCall(PDGrpc.getQueryPartitionsMethod(), request);
+
+ handleResponseError(response.getHeader());
+ return response.getPartitionsList();
+ }
+
+ public List getPartitions(long storeId, String graphName) throws PDException {
+
+ Metapb.PartitionQuery query = Metapb.PartitionQuery.newBuilder()
+ .setStoreId(storeId)
+ .setGraphName(graphName).build();
+ Pdpb.QueryPartitionsRequest request = Pdpb.QueryPartitionsRequest.newBuilder()
+ .setQuery(query).build();
+ Pdpb.QueryPartitionsResponse response =
+ blockingUnaryCall(PDGrpc.getQueryPartitionsMethod(), request);
+
+ handleResponseError(response.getHeader());
+ return response.getPartitionsList();
+
+ }
+
+ public Metapb.Graph setGraph(Metapb.Graph graph) throws PDException {
+ Pdpb.SetGraphRequest request = Pdpb.SetGraphRequest.newBuilder()
+ .setGraph(graph)
+ .build();
+ Pdpb.SetGraphResponse response =
+ blockingUnaryCall(PDGrpc.getSetGraphMethod(), request);
+
+ handleResponseError(response.getHeader());
+ return response.getGraph();
+ }
+
+ public Metapb.Graph getGraph(String graphName) throws PDException {
+ GetGraphRequest request = GetGraphRequest.newBuilder()
+ .setGraphName(graphName)
+ .build();
+ Pdpb.GetGraphResponse response =
+ blockingUnaryCall(PDGrpc.getGetGraphMethod(), request);
+
+ handleResponseError(response.getHeader());
+ return response.getGraph();
+ }
+
+ public Metapb.Graph getGraphWithOutException(String graphName) throws
+ PDException {
+ GetGraphRequest request = GetGraphRequest.newBuilder()
+ .setGraphName(
+ graphName)
+ .build();
+ Pdpb.GetGraphResponse response = blockingUnaryCall(
+ PDGrpc.getGetGraphMethod(), request);
+ return response.getGraph();
+ }
+
+ public Metapb.Graph delGraph(String graphName) throws PDException {
+ Pdpb.DelGraphRequest request = Pdpb.DelGraphRequest.newBuilder()
+ .setGraphName(graphName)
+ .build();
+ Pdpb.DelGraphResponse response =
+ blockingUnaryCall(PDGrpc.getDelGraphMethod(), request);
+
+ handleResponseError(response.getHeader());
+ return response.getGraph();
+ }
+
+ public List updatePartition(List partitions) throws
+ PDException {
+
+ Pdpb.UpdatePartitionRequest request = Pdpb.UpdatePartitionRequest.newBuilder()
+ .addAllPartition(
+ partitions)
+ .build();
+ Pdpb.UpdatePartitionResponse response =
+ blockingUnaryCall(PDGrpc.getUpdatePartitionMethod(), request);
+ handleResponseError(response.getHeader());
+ invalidPartitionCache();
+
+ return response.getPartitionList();
+ }
+
+ public Metapb.Partition delPartition(String graphName, int partitionId) throws PDException {
+
+ Pdpb.DelPartitionRequest request = Pdpb.DelPartitionRequest.newBuilder()
+ .setGraphName(graphName)
+ .setPartitionId(partitionId)
+ .build();
+ Pdpb.DelPartitionResponse response =
+ blockingUnaryCall(PDGrpc.getDelPartitionMethod(), request);
+
+ handleResponseError(response.getHeader());
+ invalidPartitionCache(graphName, partitionId);
+ return response.getPartition();
+ }
+
+ /**
+ * 删除分区缓存
+ */
+ public void invalidPartitionCache(String graphName, int partitionId) {
+ // 检查是否存在缓存
+ if (null != cache.getPartitionById(graphName, partitionId)) {
+ cache.removePartition(graphName, partitionId);
+ }
+ }
+
+ /**
+ * 删除分区缓存
+ */
+ public void invalidPartitionCache() {
+ // 检查是否存在缓存
+ cache.removePartitions();
+ }
+
+ /**
+ * 删除分区缓存
+ */
+ public void invalidStoreCache(long storeId) {
+ cache.removeStore(storeId);
+ }
+
+ /**
+ * Hugegraph server 调用,Leader发生改变,更新缓存
+ */
+ public void updatePartitionLeader(String graphName, int partId, long leaderStoreId) {
+ KVPair partShard = null;
+ try {
+ partShard = this.getPartitionById(graphName, partId);
+
+ if (partShard != null && partShard.getValue().getStoreId() != leaderStoreId) {
+ var shardGroup = this.getShardGroup(partId);
+ Metapb.Shard shard = null;
+ List shards = new ArrayList<>();
+
+ for (Metapb.Shard s : shardGroup.getShardsList()) {
+ if (s.getStoreId() == leaderStoreId) {
+ shard = s;
+ shards.add(Metapb.Shard.newBuilder(s)
+ .setStoreId(s.getStoreId())
+ .setRole(Metapb.ShardRole.Leader).build());
+ } else {
+ shards.add(Metapb.Shard.newBuilder(s)
+ .setStoreId(s.getStoreId())
+ .setRole(Metapb.ShardRole.Follower).build());
+ }
+ }
+
+ if (config.isEnableCache()) {
+ if (shard == null) {
+ // 分区的shard中未找到leader,说明分区发生了迁移
+ cache.removePartition(graphName, partId);
+ }
+ }
+ }
+ } catch (PDException e) {
+ log.error("getPartitionException: {}", e.getMessage());
+ }
+ }
+
+ /**
+ * Hugegraph-store调用,更新缓存
+ *
+ * @param partition
+ */
+ public void updatePartitionCache(Metapb.Partition partition, Metapb.Shard leader) {
+ if (config.isEnableCache()) {
+ cache.update(partition.getGraphName(), partition.getId(), partition);
+ cache.updateLeader(partition.getId(), leader);
+ }
+ }
+
+ public Pdpb.GetIdResponse getIdByKey(String key, int delta) throws PDException {
+ Pdpb.GetIdRequest request = Pdpb.GetIdRequest.newBuilder()
+ .setHeader(header)
+ .setKey(key)
+ .setDelta(delta)
+ .build();
+ Pdpb.GetIdResponse response = blockingUnaryCall(PDGrpc.getGetIdMethod(), request);
+ handleResponseError(response.getHeader());
+ return response;
+ }
+
+ public Pdpb.ResetIdResponse resetIdByKey(String key) throws PDException {
+ Pdpb.ResetIdRequest request = Pdpb.ResetIdRequest.newBuilder()
+ .setHeader(header)
+ .setKey(key)
+ .build();
+ Pdpb.ResetIdResponse response = blockingUnaryCall(PDGrpc.getResetIdMethod(), request);
+ handleResponseError(response.getHeader());
+ return response;
+ }
+
+ public Metapb.Member getLeader() throws PDException {
+ Pdpb.GetMembersRequest request = Pdpb.GetMembersRequest.newBuilder()
+ .setHeader(header)
+ .build();
+ Pdpb.GetMembersResponse response = blockingUnaryCall(PDGrpc.getGetMembersMethod(), request);
+ handleResponseError(response.getHeader());
+ return response.getLeader();
+ }
+
+ public Pdpb.GetMembersResponse getMembers() throws PDException {
+ Pdpb.GetMembersRequest request = Pdpb.GetMembersRequest.newBuilder()
+ .setHeader(header)
+ .build();
+ Pdpb.GetMembersResponse response = blockingUnaryCall(PDGrpc.getGetMembersMethod(), request);
+ handleResponseError(response.getHeader());
+ return response;
+ }
+
+ public Metapb.ClusterStats getClusterStats() throws PDException {
+ Pdpb.GetClusterStatsRequest request = Pdpb.GetClusterStatsRequest.newBuilder()
+ .setHeader(header)
+ .build();
+ Pdpb.GetClusterStatsResponse response =
+ blockingUnaryCall(PDGrpc.getGetClusterStatsMethod(), request);
+ handleResponseError(response.getHeader());
+ return response.getCluster();
+ }
+
+ private > RespT
+ blockingUnaryCall(MethodDescriptor method, ReqT req) throws PDException {
+ return blockingUnaryCall(method, req, 1);
+ }
+
+ private > RespT
+ blockingUnaryCall(MethodDescriptor method, ReqT req, int retry) throws
+ PDException {
+ io.grpc.stub.AbstractBlockingStub stub = (AbstractBlockingStub) getStub();
+ try {
+ RespT resp = io.grpc.stub.ClientCalls.blockingUnaryCall(stub.getChannel(), method,
+ stub.getCallOptions(), req);
+ return resp;
+ } catch (Exception e) {
+ log.error(method.getFullMethodName() + " exception, {}", e.getMessage());
+ if (e instanceof StatusRuntimeException) {
+ StatusRuntimeException se = (StatusRuntimeException) e;
+ //se.getStatus() == Status.UNAVAILABLE &&
+ if (retry < stubProxy.getHostCount()) {
+ // 网络不通,关掉之前连接,换host重新连接
+ closeStub(true);
+ return blockingUnaryCall(method, req, ++retry);
+ }
+ }
+ }
+ return null;
+ }
+
+ private void handleResponseError(Pdpb.ResponseHeader header) throws
+ PDException {
+ var errorType = header.getError().getType();
+ if (header.hasError() && errorType != Pdpb.ErrorType.OK) {
+
+ throw new PDException(header.getError().getTypeValue(),
+ String.format(
+ "PD request error, error code = %d, msg = %s",
+ header.getError().getTypeValue(),
+ header.getError().getMessage()));
+ }
+ }
+
+ public void addEventListener(PDEventListener listener) {
+ eventListeners.add(listener);
+ }
+
+ public PDWatch getWatchClient() {
+ return new PDWatchImpl(stubProxy.getHost());
+ }
+
+ /**
+ * 返回Store状态信息
+ */
+ public List getStoreStatus(boolean offlineExcluded) throws PDException {
+ Pdpb.GetAllStoresRequest request = Pdpb.GetAllStoresRequest.newBuilder()
+ .setHeader(header)
+ .setExcludeOfflineStores(
+ offlineExcluded)
+ .build();
+ Pdpb.GetAllStoresResponse response = getStub().getStoreStatus(request);
+ handleResponseError(response.getHeader());
+ List stores = response.getStoresList();
+ return stores;
+ }
+
+ public void setGraphSpace(String graphSpaceName, long storageLimit) throws PDException {
+ Metapb.GraphSpace graphSpace = Metapb.GraphSpace.newBuilder().setName(graphSpaceName)
+ .setStorageLimit(storageLimit)
+ .setTimestamp(System.currentTimeMillis())
+ .build();
+ Pdpb.SetGraphSpaceRequest request = Pdpb.SetGraphSpaceRequest.newBuilder()
+ .setHeader(header)
+ .setGraphSpace(graphSpace)
+ .build();
+ Pdpb.SetGraphSpaceResponse response = getStub().setGraphSpace(request);
+ handleResponseError(response.getHeader());
+ }
+
+ public List getGraphSpace(String graphSpaceName) throws
+ PDException {
+ Pdpb.GetGraphSpaceRequest.Builder builder = Pdpb.GetGraphSpaceRequest.newBuilder();
+ Pdpb.GetGraphSpaceRequest request;
+ builder.setHeader(header);
+ if (graphSpaceName != null && graphSpaceName.length() > 0) {
+ builder.setGraphSpaceName(graphSpaceName);
+ }
+ request = builder.build();
+ Pdpb.GetGraphSpaceResponse response = getStub().getGraphSpace(request);
+ List graphSpaceList = response.getGraphSpaceList();
+ handleResponseError(response.getHeader());
+ return graphSpaceList;
+ }
+
+ public void setPDConfig(int partitionCount, String peerList, int shardCount,
+ long version) throws PDException {
+ Metapb.PDConfig pdConfig = Metapb.PDConfig.newBuilder().setPartitionCount(partitionCount)
+ .setPeersList(peerList).setShardCount(shardCount)
+ .setVersion(version)
+ .setTimestamp(System.currentTimeMillis())
+ .build();
+ Pdpb.SetPDConfigRequest request = Pdpb.SetPDConfigRequest.newBuilder()
+ .setHeader(header)
+ .setPdConfig(pdConfig)
+ .build();
+ Pdpb.SetPDConfigResponse response = getStub().setPDConfig(request);
+ handleResponseError(response.getHeader());
+ }
+
+ public Metapb.PDConfig getPDConfig() throws PDException {
+ Pdpb.GetPDConfigRequest request = Pdpb.GetPDConfigRequest.newBuilder()
+ .setHeader(header)
+ .build();
+ Pdpb.GetPDConfigResponse response = getStub().getPDConfig(request);
+ handleResponseError(response.getHeader());
+ return response.getPdConfig();
+ }
+
+ public void setPDConfig(Metapb.PDConfig pdConfig) throws PDException {
+ Pdpb.SetPDConfigRequest request = Pdpb.SetPDConfigRequest.newBuilder()
+ .setHeader(header)
+ .setPdConfig(pdConfig)
+ .build();
+ Pdpb.SetPDConfigResponse response = getStub().setPDConfig(request);
+ handleResponseError(response.getHeader());
+ }
+
+ public Metapb.PDConfig getPDConfig(long version) throws PDException {
+ Pdpb.GetPDConfigRequest request = Pdpb.GetPDConfigRequest.newBuilder().setHeader(
+ header).setVersion(version).build();
+ Pdpb.GetPDConfigResponse response = getStub().getPDConfig(request);
+ handleResponseError(response.getHeader());
+ return response.getPdConfig();
+ }
+
+ public void changePeerList(String peerList) throws PDException {
+ Pdpb.ChangePeerListRequest request = Pdpb.ChangePeerListRequest.newBuilder()
+ .setPeerList(peerList)
+ .setHeader(header).build();
+ Pdpb.getChangePeerListResponse response =
+ blockingUnaryCall(PDGrpc.getChangePeerListMethod(), request);
+ handleResponseError(response.getHeader());
+ }
+
+ /**
+ * 工作模式
+ * Auto:自动分裂,每个Store上分区数达到最大值
+ *
+ * @throws PDException
+ */
+ public void splitData() throws PDException {
+ Pdpb.SplitDataRequest request = Pdpb.SplitDataRequest.newBuilder()
+ .setHeader(header)
+ .setMode(Pdpb.OperationMode.Auto)
+ .build();
+ Pdpb.SplitDataResponse response = getStub().splitData(request);
+ handleResponseError(response.getHeader());
+ }
+
+ /**
+ * 工作模式
+ * Auto:自动分裂,每个Store上分区数达到最大值
+ * Expert:专家模式,需要指定splitParams
+ *
+ * @param mode
+ * @param params
+ * @throws PDException
+ */
+ public void splitData(Pdpb.OperationMode mode, List params) throws
+ PDException {
+ Pdpb.SplitDataRequest request = Pdpb.SplitDataRequest.newBuilder()
+ .setHeader(header)
+ .setMode(mode)
+ .addAllParam(params).build();
+ Pdpb.SplitDataResponse response = getStub().splitData(request);
+ handleResponseError(response.getHeader());
+ }
+
+ public void splitGraphData(String graphName, int toCount) throws PDException {
+ Pdpb.SplitGraphDataRequest request = Pdpb.SplitGraphDataRequest.newBuilder()
+ .setHeader(header)
+ .setGraphName(graphName)
+ .setToCount(toCount)
+ .build();
+ Pdpb.SplitDataResponse response = getStub().splitGraphData(request);
+ handleResponseError(response.getHeader());
+ }
+
+ /**
+ * 自动转移,达到每个Store上分区数量相同
+ *
+ * @throws PDException
+ */
+ public void balancePartition() throws PDException {
+ Pdpb.MovePartitionRequest request = Pdpb.MovePartitionRequest.newBuilder()
+ .setHeader(header)
+ .setMode(
+ Pdpb.OperationMode.Auto)
+ .build();
+ Pdpb.MovePartitionResponse response = getStub().movePartition(request);
+ handleResponseError(response.getHeader());
+ }
+
+ /**
+ * //工作模式
+ * // Auto:自动转移,达到每个Store上分区数量相同
+ * // Expert:专家模式,需要指定transferParams
+ *
+ * @param mode
+ * @param params
+ * @throws PDException
+ */
+ public void movePartition(Pdpb.OperationMode mode, List params) throws
+ PDException {
+ Pdpb.MovePartitionRequest request = Pdpb.MovePartitionRequest.newBuilder()
+ .setHeader(header)
+ .setMode(mode)
+ .addAllParam(params).build();
+ Pdpb.MovePartitionResponse response = getStub().movePartition(request);
+ handleResponseError(response.getHeader());
+ }
+
+ public void reportTask(MetaTask.Task task) throws PDException {
+ Pdpb.ReportTaskRequest request = Pdpb.ReportTaskRequest.newBuilder()
+ .setHeader(header)
+ .setTask(task).build();
+ Pdpb.ReportTaskResponse response = blockingUnaryCall(PDGrpc.getReportTaskMethod(), request);
+ handleResponseError(response.getHeader());
+ }
+
+ public Metapb.PartitionStats getPartitionsStats(String graph, int partId) throws PDException {
+ Pdpb.GetPartitionStatsRequest request = Pdpb.GetPartitionStatsRequest.newBuilder()
+ .setHeader(header)
+ .setGraphName(graph)
+ .setPartitionId(partId)
+ .build();
+ Pdpb.GetPartitionStatsResponse response = getStub().getPartitionStats(request);
+ handleResponseError(response.getHeader());
+ return response.getPartitionStats();
+ }
+
+ /**
+ * 平衡不同store中leader的数量
+ */
+ public void balanceLeaders() throws PDException {
+ Pdpb.BalanceLeadersRequest request = Pdpb.BalanceLeadersRequest.newBuilder()
+ .setHeader(header)
+ .build();
+ Pdpb.BalanceLeadersResponse response = getStub().balanceLeaders(request);
+ handleResponseError(response.getHeader());
+ }
+
+ /**
+ * 从pd中删除store
+ */
+ public Metapb.Store delStore(long storeId) throws PDException {
+ Pdpb.DetStoreRequest request = Pdpb.DetStoreRequest.newBuilder()
+ .setHeader(header)
+ .setStoreId(storeId)
+ .build();
+ Pdpb.DetStoreResponse response = getStub().delStore(request);
+ handleResponseError(response.getHeader());
+ return response.getStore();
+ }
+
+ /**
+ * 对rocksdb整体进行compaction
+ *
+ * @throws PDException
+ */
+ public void dbCompaction() throws PDException {
+ Pdpb.DbCompactionRequest request = Pdpb.DbCompactionRequest
+ .newBuilder()
+ .setHeader(header)
+ .build();
+ Pdpb.DbCompactionResponse response = getStub().dbCompaction(request);
+ handleResponseError(response.getHeader());
+ }
+
+ /**
+ * 对rocksdb指定表进行compaction
+ *
+ * @param tableName
+ * @throws PDException
+ */
+ public void dbCompaction(String tableName) throws PDException {
+ Pdpb.DbCompactionRequest request = Pdpb.DbCompactionRequest
+ .newBuilder()
+ .setHeader(header)
+ .setTableName(tableName)
+ .build();
+ Pdpb.DbCompactionResponse response = getStub().dbCompaction(request);
+ handleResponseError(response.getHeader());
+ }
+
+ /**
+ * 分区合并,把当前的分区缩容至toCount个
+ *
+ * @param toCount 缩容到分区的个数
+ * @throws PDException
+ */
+ public void combineCluster(int toCount) throws PDException {
+ Pdpb.CombineClusterRequest request = Pdpb.CombineClusterRequest
+ .newBuilder()
+ .setHeader(header)
+ .setToCount(toCount)
+ .build();
+ Pdpb.CombineClusterResponse response = getStub().combineCluster(request);
+ handleResponseError(response.getHeader());
+ }
+
+ /**
+ * 将单图缩容到 toCount个
+ *
+ * @param graphName graph name
+ * @param toCount target count
+ * @throws PDException
+ */
+ public void combineGraph(String graphName, int toCount) throws PDException {
+ Pdpb.CombineGraphRequest request = Pdpb.CombineGraphRequest
+ .newBuilder()
+ .setHeader(header)
+ .setGraphName(graphName)
+ .setToCount(toCount)
+ .build();
+ Pdpb.CombineGraphResponse response = getStub().combineGraph(request);
+ handleResponseError(response.getHeader());
+ }
+
+ public void deleteShardGroup(int groupId) throws PDException {
+ Pdpb.DeleteShardGroupRequest request = Pdpb.DeleteShardGroupRequest
+ .newBuilder()
+ .setHeader(header)
+ .setGroupId(groupId)
+ .build();
+ Pdpb.DeleteShardGroupResponse response =
+ blockingUnaryCall(PDGrpc.getDeleteShardGroupMethod(), request);
+
+ handleResponseError(response.getHeader());
+ }
+
+ /**
+ * 用于 store的 shard list重建
+ *
+ * @param groupId shard group id
+ * @param shards shard list,delete when shards size is 0
+ */
+ public void updateShardGroupOp(int groupId, List shards) throws PDException {
+ Pdpb.ChangeShardRequest request = Pdpb.ChangeShardRequest.newBuilder()
+ .setHeader(header)
+ .setGroupId(groupId)
+ .addAllShards(shards)
+ .build();
+ Pdpb.ChangeShardResponse response = getStub().updateShardGroupOp(request);
+ handleResponseError(response.getHeader());
+ }
+
+ /**
+ * invoke fireChangeShard command
+ *
+ * @param groupId shard group id
+ * @param shards shard list
+ */
+ public void changeShard(int groupId, List shards) throws PDException {
+ Pdpb.ChangeShardRequest request = Pdpb.ChangeShardRequest.newBuilder()
+ .setHeader(header)
+ .setGroupId(groupId)
+ .addAllShards(shards)
+ .build();
+ Pdpb.ChangeShardResponse response = getStub().changeShard(request);
+ handleResponseError(response.getHeader());
+ }
+
+ public ClientCache getCache() {
+ return cache;
+ }
+
+ public CacheResponse getClientCache() throws PDException {
+ GetGraphRequest request = GetGraphRequest.newBuilder().setHeader(header).build();
+ CacheResponse cache = getStub().getCache(request);
+ handleResponseError(cache.getHeader());
+ return cache;
+ }
+
+ public CachePartitionResponse getPartitionCache(String graph) throws PDException {
+ GetGraphRequest request =
+ GetGraphRequest.newBuilder().setHeader(header).setGraphName(graph).build();
+ CachePartitionResponse ps = getStub().getPartitions(request);
+ handleResponseError(ps.getHeader());
+ return ps;
+ }
+
+ public void updatePdRaft(String raftConfig) throws PDException {
+ Pdpb.UpdatePdRaftRequest request = Pdpb.UpdatePdRaftRequest.newBuilder()
+ .setHeader(header)
+ .setConfig(raftConfig)
+ .build();
+ Pdpb.UpdatePdRaftResponse response = getStub().updatePdRaft(request);
+ handleResponseError(response.getHeader());
+ }
+
+ public interface PDEventListener {
+
+ void onStoreChanged(NodeEvent event);
+
+ void onPartitionChanged(PartitionEvent event);
+
+ void onGraphChanged(WatchResponse event);
+
+ default void onShardGroupChanged(WatchResponse event) {
+ }
+
+ }
+
+ static class StubProxy {
+
+ private final LinkedList hostList = new LinkedList<>();
+ private volatile PDGrpc.PDBlockingStub stub;
+ private String leader;
+
+ public StubProxy(String[] hosts) {
+ for (String host : hosts) {
+ if (!host.isEmpty()) {
+ hostList.offer(host);
+ }
+ }
+ }
+
+ public String nextHost() {
+ String host = hostList.poll();
+ hostList.offer(host); //移到尾部
+ return host;
+ }
+
+ public void set(PDGrpc.PDBlockingStub stub) {
+ this.stub = stub;
+ }
+
+ public PDGrpc.PDBlockingStub get() {
+ return this.stub;
+ }
+
+ public String getHost() {
+ return hostList.peek();
+ }
+
+ public int getHostCount() {
+ return hostList.size();
+ }
+
+ public String getLeader() {
+ return leader;
+ }
+
+ public void setLeader(String leader) {
+ this.leader = leader;
+ }
+ }
+}
diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDConfig.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDConfig.java
new file mode 100644
index 0000000000..a1c72a2bcf
--- /dev/null
+++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDConfig.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.client;
+
+public final class PDConfig {
+
+ //TODO multi-server
+ private String serverHost = "localhost:9000";
+ private long grpcTimeOut = 60000; // grpc调用超时时间 10秒
+
+ // 是否接收PD异步通知
+ private boolean enablePDNotify = false;
+
+ private boolean enableCache = false;
+
+ private PDConfig() {
+ }
+
+ public static PDConfig of() {
+ return new PDConfig();
+ }
+
+ public static PDConfig of(String serverHost) {
+ PDConfig config = new PDConfig();
+ config.serverHost = serverHost;
+ return config;
+ }
+
+ public static PDConfig of(String serverHost, long timeOut) {
+ PDConfig config = new PDConfig();
+ config.serverHost = serverHost;
+ config.grpcTimeOut = timeOut;
+ return config;
+ }
+
+ public String getServerHost() {
+ return serverHost;
+ }
+
+ public long getGrpcTimeOut() {
+ return grpcTimeOut;
+ }
+
+ @Deprecated
+ public PDConfig setEnablePDNotify(boolean enablePDNotify) {
+ this.enablePDNotify = enablePDNotify;
+
+ // TODO 临时代码,hugegraph修改完后删除
+ this.enableCache = enablePDNotify;
+ return this;
+ }
+
+ public boolean isEnableCache() {
+ return enableCache;
+ }
+
+ public PDConfig setEnableCache(boolean enableCache) {
+ this.enableCache = enableCache;
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "PDConfig{" +
+ "serverHost='" + serverHost + '\'' +
+ '}';
+ }
+}
diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDPulse.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDPulse.java
new file mode 100644
index 0000000000..485417b917
--- /dev/null
+++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDPulse.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.client;
+
+import java.io.Closeable;
+import java.util.function.Consumer;
+
+import org.apache.hugegraph.pd.grpc.pulse.PartitionHeartbeatRequest;
+import org.apache.hugegraph.pd.grpc.pulse.PulseResponse;
+import org.apache.hugegraph.pd.pulse.PulseServerNotice;
+
+/**
+ * Bidirectional communication interface of pd-client and pd-server
+ */
+public interface PDPulse {
+
+ /*** inner static methods ***/
+ static Listener listener(Consumer onNext) {
+ return listener(onNext, t -> {
+ }, () -> {
+ });
+ }
+
+ static Listener listener(Consumer onNext, Consumer onError) {
+ return listener(onNext, onError, () -> {
+ });
+ }
+
+ static Listener listener(Consumer onNext, Runnable onCompleted) {
+ return listener(onNext, t -> {
+ }, onCompleted);
+ }
+
+ static Listener listener(Consumer onNext, Consumer onError,
+ Runnable onCompleted) {
+ return new Listener<>() {
+ @Override
+ public void onNext(T response) {
+ onNext.accept(response);
+ }
+
+ @Override
+ public void onNotice(PulseServerNotice notice) {
+
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ onError.accept(throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ onCompleted.run();
+ }
+ };
+ }
+
+ /**
+ * @param listener
+ * @return
+ */
+ Notifier connectPartition(Listener listener);
+
+ /**
+ * 切换成新的host。做 channel/host的检查,如果需要关闭,notifier调用close方法。
+ *
+ * @param host new host
+ * @param notifier notifier
+ * @return true if create new stub, otherwise false
+ */
+ boolean resetStub(String host, Notifier notifier);
+
+ /**
+ * Interface of pulse.
+ */
+ interface Listener {
+
+ /**
+ * Invoked on new events.
+ *
+ * @param response the response.
+ */
+ @Deprecated
+ default void onNext(T response) {
+ }
+
+ /**
+ * Invoked on new events.
+ *
+ * @param notice a wrapper of response
+ */
+ default void onNotice(PulseServerNotice notice) {
+ notice.ack();
+ }
+
+ /**
+ * Invoked on errors.
+ *
+ * @param throwable the error.
+ */
+ void onError(Throwable throwable);
+
+ /**
+ * Invoked on completion.
+ */
+ void onCompleted();
+
+ }
+
+ /**
+ * Interface of notifier that can send notice to server.
+ *
+ * @param
+ */
+ interface Notifier extends Closeable {
+
+ /**
+ * closes this watcher and all its resources.
+ */
+ @Override
+ void close();
+
+ /**
+ * Send notice to pd-server.
+ *
+ * @return
+ */
+ void notifyServer(T t);
+
+ /**
+ * Send an error report to pd-server.
+ *
+ * @param error
+ */
+ void crash(String error);
+
+ }
+}
diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDPulseImpl.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDPulseImpl.java
new file mode 100644
index 0000000000..0afc10c831
--- /dev/null
+++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDPulseImpl.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.client;
+
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.hugegraph.pd.grpc.pulse.HgPdPulseGrpc;
+import org.apache.hugegraph.pd.grpc.pulse.PartitionHeartbeatRequest;
+import org.apache.hugegraph.pd.grpc.pulse.PulseAckRequest;
+import org.apache.hugegraph.pd.grpc.pulse.PulseCreateRequest;
+import org.apache.hugegraph.pd.grpc.pulse.PulseNoticeRequest;
+import org.apache.hugegraph.pd.grpc.pulse.PulseRequest;
+import org.apache.hugegraph.pd.grpc.pulse.PulseResponse;
+import org.apache.hugegraph.pd.grpc.pulse.PulseType;
+import org.apache.hugegraph.pd.pulse.PartitionNotice;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public final class PDPulseImpl implements PDPulse {
+
+ private static final ConcurrentHashMap chs = new ConcurrentHashMap<>();
+ private final ExecutorService threadPool;
+ private HgPdPulseGrpc.HgPdPulseStub stub;
+ private String pdServerAddress;
+
+ // TODO: support several servers.
+ public PDPulseImpl(String pdServerAddress) {
+ this.pdServerAddress = pdServerAddress;
+ this.stub = HgPdPulseGrpc.newStub(Channels.getChannel(pdServerAddress));
+ var namedThreadFactory =
+ new ThreadFactoryBuilder().setNameFormat("ack-notice-pool-%d").build();
+ threadPool = Executors.newSingleThreadExecutor(namedThreadFactory);
+ }
+
+ private String getCurrentHost() {
+ return this.pdServerAddress;
+ }
+
+ private boolean checkChannel() {
+ return stub != null && !((ManagedChannel) stub.getChannel()).isShutdown();
+ }
+
+ /* TODO: handle this override problem */
+ @Override
+ public Notifier connectPartition(Listener
+ listener) {
+ return new PartitionHeartbeat(listener);
+ }
+
+ @Override
+ public boolean resetStub(String host, Notifier notifier) {
+ log.info("reset stub: current, {}, new: {}, channel state:{}", getCurrentHost(), host,
+ checkChannel());
+ if (Objects.equals(host, getCurrentHost()) && checkChannel()) {
+ return false;
+ }
+
+ if (notifier != null) {
+ notifier.close();
+ }
+
+ this.stub = HgPdPulseGrpc.newStub(Channels.getChannel(host));
+ log.info("pd pulse connect to {}", host);
+ this.pdServerAddress = host;
+ return true;
+ }
+
+ /*** PartitionHeartbeat's implement ***/
+ private class PartitionHeartbeat extends
+ AbstractConnector {
+
+ private long observerId = -1;
+
+ PartitionHeartbeat(Listener listener) {
+ super(listener, PulseType.PULSE_TYPE_PARTITION_HEARTBEAT);
+ }
+
+ private void setObserverId(long observerId) {
+ if (this.observerId == -1) {
+ this.observerId = observerId;
+ }
+ }
+
+ @Override
+ public void notifyServer(PartitionHeartbeatRequest.Builder requestBuilder) {
+ this.reqStream.onNext(PulseRequest.newBuilder()
+ .setNoticeRequest(
+ PulseNoticeRequest.newBuilder()
+ .setPartitionHeartbeatRequest(
+ requestBuilder.build()
+ ).build()
+ ).build()
+ );
+ }
+
+ @Override
+ public void onNext(PulseResponse pulseResponse) {
+ this.setObserverId(pulseResponse.getObserverId());
+ long noticeId = pulseResponse.getNoticeId();
+ this.listener.onNext(pulseResponse);
+ this.listener.onNotice(new PartitionNotice(noticeId,
+ e -> super.ackNotice(e, observerId),
+ pulseResponse));
+ }
+
+ }
+
+ private abstract class AbstractConnector implements Notifier,
+ StreamObserver {
+
+ Listener listener;
+ StreamObserver reqStream;
+ PulseType pulseType;
+ PulseRequest.Builder reqBuilder = PulseRequest.newBuilder();
+ PulseAckRequest.Builder ackBuilder = PulseAckRequest.newBuilder();
+
+ private AbstractConnector(Listener listener, PulseType pulseType) {
+ this.listener = listener;
+ this.pulseType = pulseType;
+ this.init();
+ }
+
+ void init() {
+ PulseCreateRequest.Builder builder = PulseCreateRequest.newBuilder()
+ .setPulseType(this.pulseType);
+
+ this.reqStream = PDPulseImpl.this.stub.pulse(this);
+ this.reqStream.onNext(reqBuilder.clear().setCreateRequest(builder).build());
+ }
+
+ /*** notifier ***/
+ @Override
+ public void close() {
+ this.reqStream.onCompleted();
+ }
+
+ @Override
+ public abstract void notifyServer(N t);
+
+ @Override
+ public void crash(String error) {
+ this.reqStream.onError(new Throwable(error));
+ }
+
+ /*** listener ***/
+ @Override
+ public abstract void onNext(PulseResponse pulseResponse);
+
+ @Override
+ public void onError(Throwable throwable) {
+ this.listener.onError(throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ this.listener.onCompleted();
+ }
+
+ protected void ackNotice(long noticeId, long observerId) {
+ threadPool.execute(() -> {
+ // log.info("send ack: {}, ts: {}", noticeId, System.currentTimeMillis());
+ this.reqStream.onNext(reqBuilder.clear()
+ .setAckRequest(
+ this.ackBuilder.clear()
+ .setNoticeId(noticeId)
+ .setObserverId(observerId)
+ .build()
+ ).build()
+ );
+ });
+ }
+ }
+}
diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDWatch.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDWatch.java
new file mode 100644
index 0000000000..c6c46d03d1
--- /dev/null
+++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDWatch.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.client;
+
+import java.io.Closeable;
+import java.util.function.Consumer;
+
+import org.apache.hugegraph.pd.grpc.watch.WatchResponse;
+import org.apache.hugegraph.pd.watch.NodeEvent;
+import org.apache.hugegraph.pd.watch.PartitionEvent;
+
+public interface PDWatch {
+
+ /**
+ * Watch the events of all store-nodes registered in the remote PD-Server.
+ *
+ * @param listener
+ * @return
+ */
+ //PDWatcher watchNode(Listener listener);
+
+ /*** inner static methods ***/
+ static Listener listener(Consumer onNext) {
+ return listener(onNext, t -> {
+ }, () -> {
+ });
+ }
+
+ static Listener listener(Consumer onNext, Consumer onError) {
+ return listener(onNext, onError, () -> {
+ });
+ }
+
+ static Listener listener(Consumer onNext, Runnable onCompleted) {
+ return listener(onNext, t -> {
+ }, onCompleted);
+ }
+
+ static Listener listener(Consumer onNext, Consumer onError,
+ Runnable onCompleted) {
+ return new Listener() {
+ @Override
+ public void onNext(T response) {
+ onNext.accept(response);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ onError.accept(throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ onCompleted.run();
+ }
+ };
+ }
+
+ /**
+ * Watch the events of the store-nodes assigned to a specified graph.
+ *
+ * @param graph the graph name which you want to watch
+ * @param listener
+ * @return
+ */
+ //PDWatcher watchNode(String graph, Listener listener);
+
+ String getCurrentHost();
+
+ boolean checkChannel();
+
+ /**
+ * @param listener
+ * @return
+ */
+ Watcher watchPartition(Listener listener);
+
+ Watcher watchNode(Listener listener);
+
+ Watcher watchGraph(Listener listener);
+
+ Watcher watchShardGroup(Listener listener);
+
+ /**
+ * Interface of Watcher.
+ */
+ interface Listener {
+
+ /**
+ * Invoked on new events.
+ *
+ * @param response the response.
+ */
+ void onNext(T response);
+
+ /**
+ * Invoked on errors.
+ *
+ * @param throwable the error.
+ */
+ void onError(Throwable throwable);
+
+ /**
+ * Invoked on completion.
+ */
+ default void onCompleted() {
+ }
+
+ }
+
+ interface Watcher extends Closeable {
+
+ /**
+ * closes this watcher and all its resources.
+ */
+ @Override
+ void close();
+
+ /**
+ * Requests the latest revision processed and propagates it to listeners
+ */
+ // TODO: what's it for?
+ //void requestProgress();
+ }
+}
diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDWatchImpl.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDWatchImpl.java
new file mode 100644
index 0000000000..9b136bb26a
--- /dev/null
+++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDWatchImpl.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.client;
+
+import java.util.function.Supplier;
+
+import org.apache.hugegraph.pd.grpc.watch.HgPdWatchGrpc;
+import org.apache.hugegraph.pd.grpc.watch.WatchCreateRequest;
+import org.apache.hugegraph.pd.grpc.watch.WatchNodeResponse;
+import org.apache.hugegraph.pd.grpc.watch.WatchPartitionResponse;
+import org.apache.hugegraph.pd.grpc.watch.WatchRequest;
+import org.apache.hugegraph.pd.grpc.watch.WatchResponse;
+import org.apache.hugegraph.pd.grpc.watch.WatchType;
+import org.apache.hugegraph.pd.watch.NodeEvent;
+import org.apache.hugegraph.pd.watch.PartitionEvent;
+
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+
+final class PDWatchImpl implements PDWatch {
+
+ private final HgPdWatchGrpc.HgPdWatchStub stub;
+
+ private final String pdServerAddress;
+
+ // TODO: support several servers.
+ PDWatchImpl(String pdServerAddress) {
+ this.pdServerAddress = pdServerAddress;
+ this.stub = HgPdWatchGrpc.newStub(Channels.getChannel(pdServerAddress));
+ }
+
+ @Override
+ public String getCurrentHost() {
+ return this.pdServerAddress;
+ }
+
+ @Override
+ public boolean checkChannel() {
+ return stub != null && !((ManagedChannel) stub.getChannel()).isShutdown();
+ }
+
+ /**
+ * Get Partition change watcher.
+ *
+ * @param listener
+ * @return
+ */
+ @Override
+ public Watcher watchPartition(Listener listener) {
+ return new PartitionWatcher(listener);
+ }
+
+ /**
+ * Get Store-Node change watcher.
+ *
+ * @param listener
+ * @return
+ */
+ @Override
+ public Watcher watchNode(Listener listener) {
+ return new NodeWatcher(listener);
+ }
+
+ @Override
+ public Watcher watchGraph(Listener listener) {
+ return new GraphWatcher(listener);
+ }
+
+ @Override
+ public Watcher watchShardGroup(Listener listener) {
+ return new ShardGroupWatcher(listener);
+ }
+
+ private class GraphWatcher extends AbstractWatcher {
+
+ private GraphWatcher(Listener listener) {
+ super(listener,
+ () -> WatchCreateRequest
+ .newBuilder()
+ .setWatchType(WatchType.WATCH_TYPE_GRAPH_CHANGE)
+ .build()
+ );
+ }
+
+ @Override
+ public void onNext(WatchResponse watchResponse) {
+ this.listener.onNext(watchResponse);
+ }
+ }
+
+ private class ShardGroupWatcher extends AbstractWatcher {
+
+ private ShardGroupWatcher(Listener listener) {
+ super(listener,
+ () -> WatchCreateRequest
+ .newBuilder()
+ .setWatchType(WatchType.WATCH_TYPE_SHARD_GROUP_CHANGE)
+ .build()
+ );
+ }
+
+ @Override
+ public void onNext(WatchResponse watchResponse) {
+ this.listener.onNext(watchResponse);
+ }
+ }
+
+ private class PartitionWatcher extends AbstractWatcher {
+
+ private PartitionWatcher(Listener listener) {
+ super(listener,
+ () -> WatchCreateRequest
+ .newBuilder()
+ .setWatchType(WatchType.WATCH_TYPE_PARTITION_CHANGE)
+ .build()
+ );
+ }
+
+ @Override
+ public void onNext(WatchResponse watchResponse) {
+ WatchPartitionResponse res = watchResponse.getPartitionResponse();
+ PartitionEvent event = new PartitionEvent(res.getGraph(), res.getPartitionId(),
+ PartitionEvent.ChangeType.grpcTypeOf(
+ res.getChangeType()));
+ this.listener.onNext(event);
+ }
+ }
+
+ private class NodeWatcher extends AbstractWatcher {
+
+ private NodeWatcher(Listener listener) {
+ super(listener,
+ () -> WatchCreateRequest
+ .newBuilder()
+ .setWatchType(WatchType.WATCH_TYPE_STORE_NODE_CHANGE)
+ .build()
+ );
+ }
+
+ @Override
+ public void onNext(WatchResponse watchResponse) {
+ WatchNodeResponse res = watchResponse.getNodeResponse();
+ NodeEvent event = new NodeEvent(res.getGraph(), res.getNodeId(),
+ NodeEvent.EventType.grpcTypeOf(res.getNodeEventType()));
+ this.listener.onNext(event);
+ }
+ }
+
+ private abstract class AbstractWatcher implements Watcher, StreamObserver {
+
+ Listener listener;
+ StreamObserver reqStream;
+ Supplier requestSupplier;
+
+ private AbstractWatcher(Listener listener,
+ Supplier requestSupplier) {
+ this.listener = listener;
+ this.requestSupplier = requestSupplier;
+ this.init();
+ }
+
+ void init() {
+ this.reqStream = PDWatchImpl.this.stub.watch(this);
+ this.reqStream.onNext(WatchRequest.newBuilder().setCreateRequest(
+ this.requestSupplier.get()
+ ).build());
+ }
+
+ @Override
+ public void close() {
+ this.reqStream.onCompleted();
+ }
+
+ @Override
+ public abstract void onNext(WatchResponse watchResponse);
+
+ @Override
+ public void onError(Throwable throwable) {
+
+ this.listener.onError(throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ this.listener.onCompleted();
+ }
+ }
+
+}
diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/pulse/PartitionNotice.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/pulse/PartitionNotice.java
new file mode 100644
index 0000000000..80aa8951b7
--- /dev/null
+++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/pulse/PartitionNotice.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.pulse;
+
+import java.util.function.Consumer;
+
+import org.apache.hugegraph.pd.grpc.pulse.PulseResponse;
+
+public class PartitionNotice implements PulseServerNotice {
+
+ private final long noticeId;
+ private final Consumer ackConsumer;
+ private final PulseResponse content;
+
+ public PartitionNotice(long noticeId, Consumer ackConsumer, PulseResponse content) {
+ this.noticeId = noticeId;
+ this.ackConsumer = ackConsumer;
+ this.content = content;
+ }
+
+ @Override
+ public void ack() {
+ this.ackConsumer.accept(this.noticeId);
+ }
+
+ @Override
+ public long getNoticeId() {
+ return this.noticeId;
+ }
+
+ @Override
+ public PulseResponse getContent() {
+ return this.content;
+ }
+}
diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/pulse/PulseServerNotice.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/pulse/PulseServerNotice.java
new file mode 100644
index 0000000000..9a30e2679a
--- /dev/null
+++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/pulse/PulseServerNotice.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.pulse;
+
+public interface PulseServerNotice {
+
+ /**
+ * @throws RuntimeException when failed to send ack-message to pd-server
+ */
+ void ack();
+
+ long getNoticeId();
+
+ /**
+ * Return a response object of gRPC stream.
+ *
+ * @return
+ */
+ T getContent();
+
+}
diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/watch/NodeEvent.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/watch/NodeEvent.java
new file mode 100644
index 0000000000..bb68383b83
--- /dev/null
+++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/watch/NodeEvent.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.watch;
+
+import java.util.Objects;
+
+import org.apache.hugegraph.pd.grpc.watch.NodeEventType;
+
+public class NodeEvent {
+
+ private final String graph;
+ private final long nodeId;
+ private final EventType eventType;
+
+ public NodeEvent(String graph, long nodeId, EventType eventType) {
+ this.graph = graph;
+ this.nodeId = nodeId;
+ this.eventType = eventType;
+ }
+
+ public String getGraph() {
+ return graph;
+ }
+
+ public long getNodeId() {
+ return nodeId;
+ }
+
+ public EventType getEventType() {
+ return eventType;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ NodeEvent nodeEvent = (NodeEvent) o;
+ return nodeId == nodeEvent.nodeId && Objects.equals(graph,
+ nodeEvent.graph) &&
+ eventType == nodeEvent.eventType;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(graph, nodeId, eventType);
+ }
+
+ @Override
+ public String toString() {
+ return "NodeEvent{" +
+ "graph='" + graph + '\'' +
+ ", nodeId=" + nodeId +
+ ", eventType=" + eventType +
+ '}';
+ }
+
+ public enum EventType {
+ UNKNOWN,
+ NODE_ONLINE,
+ NODE_OFFLINE,
+ NODE_RAFT_CHANGE,
+ NODE_PD_LEADER_CHANGE;
+
+ public static EventType grpcTypeOf(NodeEventType grpcType) {
+ switch (grpcType) {
+ case NODE_EVENT_TYPE_NODE_ONLINE:
+ return NODE_ONLINE;
+ case NODE_EVENT_TYPE_NODE_OFFLINE:
+ return NODE_OFFLINE;
+ case NODE_EVENT_TYPE_NODE_RAFT_CHANGE:
+ return NODE_RAFT_CHANGE;
+ case NODE_EVENT_TYPE_PD_LEADER_CHANGE:
+ return NODE_PD_LEADER_CHANGE;
+ default:
+ return UNKNOWN;
+ }
+
+ }
+
+ }
+}
diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/watch/PDWatcher.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/watch/PDWatcher.java
new file mode 100644
index 0000000000..d663f34a3c
--- /dev/null
+++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/watch/PDWatcher.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.watch;
+
+public class PDWatcher {
+
+}
diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/watch/PartitionEvent.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/watch/PartitionEvent.java
new file mode 100644
index 0000000000..e5be1b3484
--- /dev/null
+++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/watch/PartitionEvent.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.watch;
+
+import java.util.Objects;
+
+import org.apache.hugegraph.pd.grpc.watch.WatchChangeType;
+
+public class PartitionEvent {
+
+ private final String graph;
+ private final int partitionId;
+ private final ChangeType changeType;
+
+ public PartitionEvent(String graph, int partitionId, ChangeType changeType) {
+ this.graph = graph;
+ this.partitionId = partitionId;
+ this.changeType = changeType;
+ }
+
+ public String getGraph() {
+ return this.graph;
+ }
+
+ public int getPartitionId() {
+ return this.partitionId;
+ }
+
+ public ChangeType getChangeType() {
+ return this.changeType;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PartitionEvent that = (PartitionEvent) o;
+ return partitionId == that.partitionId && Objects.equals(graph, that.graph) &&
+ changeType == that.changeType;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(graph, partitionId, changeType);
+ }
+
+ @Override
+ public String toString() {
+ return "PartitionEvent{" +
+ "graph='" + graph + '\'' +
+ ", partitionId=" + partitionId +
+ ", changeType=" + changeType +
+ '}';
+ }
+
+ public enum ChangeType {
+ UNKNOWN,
+ ADD,
+ ALTER,
+ DEL;
+
+ public static ChangeType grpcTypeOf(WatchChangeType grpcType) {
+ switch (grpcType) {
+ case WATCH_CHANGE_TYPE_ADD:
+ return ADD;
+ case WATCH_CHANGE_TYPE_ALTER:
+ return ALTER;
+ case WATCH_CHANGE_TYPE_DEL:
+ return DEL;
+ default:
+ return UNKNOWN;
+ }
+ }
+ }
+}
diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/watch/WatchType.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/watch/WatchType.java
new file mode 100644
index 0000000000..e537701936
--- /dev/null
+++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/watch/WatchType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.watch;
+
+enum WatchType {
+
+ PARTITION_CHANGE(10);
+
+ private final int value;
+
+ WatchType(int value) {
+ this.value = value;
+ }
+
+}