Skip to content

Commit

Permalink
Improve some action for install snapshot and add peer (#1527)
Browse files Browse the repository at this point in the history
1. Fix bug that relative path need normalize when load snapshot
2. Use zip decompresser since decompress tar file may lead dead loop
3. Move raft.group_peers from graph-config into rest-server-config
4. Pass raft.endpoint and raft.group_peers from server to core
5. Let RestServer start before GremlinServer
6. Use read-index to ensure the raft log caught up for new node
7. Let add_peer and remove_peer API work in async job
8. Use shared rpc server and delete raft.endpoint

Change-Id: I7de3d98cb51c3b7c32a4f19a0d99ed622f78d331
Co-authored-by: Zhangmei Li <[email protected]>
  • Loading branch information
Linary and javeme authored May 16, 2022
1 parent bb88c36 commit 8609968
Show file tree
Hide file tree
Showing 38 changed files with 560 additions and 325 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package com.baidu.hugegraph.api.raft;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -40,8 +41,14 @@
import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.api.API;
import com.baidu.hugegraph.api.filter.StatusFilter.Status;
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.store.raft.RaftAddPeerJob;
import com.baidu.hugegraph.backend.store.raft.RaftGroupManager;
import com.baidu.hugegraph.backend.store.raft.RaftRemovePeerJob;
import com.baidu.hugegraph.core.GraphManager;
import com.baidu.hugegraph.job.JobBuilder;
import com.baidu.hugegraph.util.DateUtil;
import com.baidu.hugegraph.util.JsonUtil;
import com.baidu.hugegraph.util.Log;
import com.codahale.metrics.annotation.Timed;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -144,19 +151,26 @@ public Map<String, String> setLeader(@Context GraphManager manager,
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON_WITH_CHARSET)
@RolesAllowed({"admin"})
public Map<String, String> addPeer(@Context GraphManager manager,
@PathParam("graph") String graph,
@QueryParam("group")
@DefaultValue("default")
String group,
@QueryParam("endpoint")
String endpoint) {
public Map<String, Id> addPeer(@Context GraphManager manager,
@PathParam("graph") String graph,
@QueryParam("group") @DefaultValue("default")
String group,
@QueryParam("endpoint") String endpoint) {
LOG.debug("Graph [{}] prepare to add peer: {}", graph, endpoint);

HugeGraph g = graph(manager, graph);
RaftGroupManager raftManager = raftGroupManager(g, group, "add_peer");
String peerId = raftManager.addPeer(endpoint);
return ImmutableMap.of(raftManager.group(), peerId);

JobBuilder<String> builder = JobBuilder.of(g);
String name = String.format("raft-group-[%s]-add-peer-[%s]-at-[%s]",
raftManager.group(), endpoint,
DateUtil.now());
Map<String, String> inputs = new HashMap<>();
inputs.put("endpoint", endpoint);
builder.name(name)
.input(JsonUtil.toJson(inputs))
.job(new RaftAddPeerJob());
return ImmutableMap.of("task_id", builder.schedule().id());
}

@POST
Expand All @@ -166,26 +180,32 @@ public Map<String, String> addPeer(@Context GraphManager manager,
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON_WITH_CHARSET)
@RolesAllowed({"admin"})
public Map<String, String> removePeer(@Context GraphManager manager,
@PathParam("graph") String graph,
@QueryParam("group")
@DefaultValue("default")
String group,
@QueryParam("endpoint")
String endpoint) {
public Map<String, Id> removePeer(@Context GraphManager manager,
@PathParam("graph") String graph,
@QueryParam("group")
@DefaultValue("default") String group,
@QueryParam("endpoint") String endpoint) {
LOG.debug("Graph [{}] prepare to remove peer: {}", graph, endpoint);

HugeGraph g = graph(manager, graph);
RaftGroupManager raftManager = raftGroupManager(g, group,
"remove_peer");
String peerId = raftManager.removePeer(endpoint);
return ImmutableMap.of(raftManager.group(), peerId);
JobBuilder<String> builder = JobBuilder.of(g);
String name = String.format("raft-group-[%s]-remove-peer-[%s]-at-[%s]",
raftManager.group(), endpoint,
DateUtil.now());
Map<String, String> inputs = new HashMap<>();
inputs.put("endpoint", endpoint);
builder.name(name)
.input(JsonUtil.toJson(inputs))
.job(new RaftRemovePeerJob());
return ImmutableMap.of("task_id", builder.schedule().id());
}

private static RaftGroupManager raftGroupManager(HugeGraph graph,
String group,
String operation) {
RaftGroupManager raftManager = graph.raftGroupManager(group);
RaftGroupManager raftManager = graph.raftGroupManager();
if (raftManager == null) {
throw new HugeException("Allowed %s operation only when " +
"working on raft mode", operation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.tinkerpop.gremlin.structure.io.Io;
import org.slf4j.Logger;

import com.alipay.remoting.rpc.RpcServer;
import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.auth.HugeAuthenticator.RolePerm;
import com.baidu.hugegraph.auth.HugeAuthenticator.User;
Expand Down Expand Up @@ -645,9 +646,9 @@ public void readMode(GraphReadMode readMode) {
}

@Override
public void waitStarted() {
public void waitReady(RpcServer rpcServer) {
this.verifyAnyPermission();
this.hugegraph.waitStarted();
this.hugegraph.waitReady(rpcServer);
}

@Override
Expand Down Expand Up @@ -694,9 +695,9 @@ public void switchAuthManager(AuthManager authManager) {
}

@Override
public RaftGroupManager raftGroupManager(String group) {
public RaftGroupManager raftGroupManager() {
this.verifyAdminPermission();
return this.hugegraph.raftGroupManager(group);
return this.hugegraph.raftGroupManager();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ public void setup(HugeConfig config) {
// Forced set RAFT_MODE to false when initializing backend
graphConfig.setProperty(CoreOptions.RAFT_MODE.name(), "false");
}

// Transfer `raft.group_peers` from server config to graph config
String raftGroupPeers = config.get(ServerOptions.RAFT_GROUP_PEERS);
graphConfig.addProperty(ServerOptions.RAFT_GROUP_PEERS.name(),
raftGroupPeers);

this.graph = (HugeGraph) GraphFactory.open(graphConfig);

String remoteUrl = config.get(ServerOptions.AUTH_REMOTE_URL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,14 @@ public static synchronized ServerOptions instance() {
nonNegativeInt(),
0);

public static final ConfigOption<String> RAFT_GROUP_PEERS =
new ConfigOption<>(
"raft.group_peers",
"The rpc address of raft group initial peers.",
disallowEmpty(),
"127.0.0.1:8090"
);

public static final ConfigOption<Boolean> ALLOW_TRACE =
new ConfigOption<>(
"exception.allow_trace",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
import org.slf4j.Logger;

import com.alipay.sofa.rpc.config.ServerConfig;
import com.baidu.hugegraph.HugeFactory;
import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.auth.AuthManager;
Expand Down Expand Up @@ -66,6 +67,7 @@
import com.baidu.hugegraph.serializer.Serializer;
import com.baidu.hugegraph.server.RestServer;
import com.baidu.hugegraph.task.TaskManager;
import com.baidu.hugegraph.testutil.Whitebox;
import com.baidu.hugegraph.type.define.NodeRole;
import com.baidu.hugegraph.util.ConfigUtil;
import com.baidu.hugegraph.util.E;
Expand Down Expand Up @@ -96,37 +98,39 @@ public GraphManager(HugeConfig conf, EventHub hub) {
this.rpcClient = new RpcClientProvider(conf);
this.eventHub = hub;
this.conf = conf;

this.listenChanges();

this.loadGraphs(ConfigUtil.scanGraphsDir(this.graphsDir));

// this.installLicense(conf, "");

// Start RPC-Server for raft-rpc/auth-rpc/cache-notify-rpc...
this.startRpcServer();

// Raft will load snapshot firstly then launch election and replay log
this.waitGraphsStarted();
this.waitGraphsReady();

this.checkBackendVersionOrExit(conf);
this.startRpcServer();
this.serverStarted(conf);

this.addMetrics(conf);
}

public void loadGraphs(final Map<String, String> graphConfs) {
public void loadGraphs(Map<String, String> graphConfs) {
for (Map.Entry<String, String> conf : graphConfs.entrySet()) {
String name = conf.getKey();
String path = conf.getValue();
String graphConfPath = conf.getValue();
HugeFactory.checkGraphName(name, "rest-server.properties");
try {
this.loadGraph(name, path);
this.loadGraph(name, graphConfPath);
} catch (RuntimeException e) {
LOG.error("Graph '{}' can't be loaded: '{}'", name, path, e);
LOG.error("Graph '{}' can't be loaded: '{}'",
name, graphConfPath, e);
}
}
}

public void waitGraphsStarted() {
this.graphs.keySet().forEach(name -> {
HugeGraph graph = this.graph(name);
graph.waitStarted();
});
}

public HugeGraph cloneGraph(String name, String newName,
String configText) {
/*
Expand Down Expand Up @@ -289,6 +293,13 @@ private void startRpcServer() {
}
}

private com.alipay.remoting.rpc.RpcServer remotingRpcServer() {
ServerConfig serverConfig = Whitebox.getInternalState(this.rpcServer,
"serverConfig");
return Whitebox.getInternalState(serverConfig.getServer(),
"remotingServer");
}

private void destroyRpcServer() {
try {
this.rpcClient.destroy();
Expand Down Expand Up @@ -331,21 +342,41 @@ private void closeTx(final Set<String> graphSourceNamesToCloseTxOn,
});
}

private void loadGraph(String name, String path) {
final Graph graph = GraphFactory.open(path);
private void loadGraph(String name, String graphConfPath) {
HugeConfig config = new HugeConfig(graphConfPath);

// Transfer `raft.group_peers` from server config to graph config
String raftGroupPeers = this.conf.get(ServerOptions.RAFT_GROUP_PEERS);
config.addProperty(ServerOptions.RAFT_GROUP_PEERS.name(),
raftGroupPeers);

Graph graph = GraphFactory.open(config);
this.graphs.put(name, graph);
HugeConfig config = (HugeConfig) graph.configuration();
config.file(path);
LOG.info("Graph '{}' was successfully configured via '{}'", name, path);

HugeConfig graphConfig = (HugeConfig) graph.configuration();
assert graphConfPath.equals(graphConfig.file().getPath());

LOG.info("Graph '{}' was successfully configured via '{}'",
name, graphConfPath);

if (this.requireAuthentication() &&
!(graph instanceof HugeGraphAuthProxy)) {
LOG.warn("You may need to support access control for '{}' with {}",
path, HugeFactoryAuthProxy.GRAPH_FACTORY);
graphConfPath, HugeFactoryAuthProxy.GRAPH_FACTORY);
}
}

private void waitGraphsReady() {
com.alipay.remoting.rpc.RpcServer remotingRpcServer =
this.remotingRpcServer();
this.graphs.keySet().forEach(name -> {
HugeGraph graph = this.graph(name);
graph.waitReady(remotingRpcServer);
});
}

private void checkBackendVersionOrExit(HugeConfig config) {
LOG.info("Check backend version");
for (String graph : this.graphs()) {
// TODO: close tx from main thread
HugeGraph hugegraph = this.graph(graph);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;

import com.alipay.remoting.rpc.RpcServer;
import com.baidu.hugegraph.auth.AuthManager;
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.query.Query;
Expand Down Expand Up @@ -192,7 +193,7 @@ public interface HugeGraph extends Graph {

void readMode(GraphReadMode readMode);

void waitStarted();
void waitReady(RpcServer rpcServer);

void serverStarted(Id serverId, NodeRole serverRole);

Expand Down Expand Up @@ -227,7 +228,7 @@ public interface HugeGraph extends Graph {

TaskScheduler taskScheduler();

RaftGroupManager raftGroupManager(String group);
RaftGroupManager raftGroupManager();

void proxy(HugeGraph graph);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.slf4j.Logger;

import com.alipay.remoting.rpc.RpcServer;
import com.baidu.hugegraph.analyzer.Analyzer;
import com.baidu.hugegraph.analyzer.AnalyzerFactory;
import com.baidu.hugegraph.auth.AuthManager;
Expand Down Expand Up @@ -206,7 +207,7 @@ public StandardHugeGraph(HugeConfig config) {
LockUtil.destroy(this.name);
String message = "Failed to load backend store provider";
LOG.error("{}: {}", message, e.getMessage());
throw new HugeException(message);
throw new HugeException(message, e);
}

try {
Expand Down Expand Up @@ -310,10 +311,10 @@ public void readMode(GraphReadMode readMode) {
}

@Override
public void waitStarted() {
public void waitReady(RpcServer rpcServer) {
// Just for trigger Tx.getOrNewTransaction, then load 3 stores
this.schemaTransaction();
this.storeProvider.waitStoreStarted();
this.storeProvider.waitReady(rpcServer);
}

@Override
Expand Down Expand Up @@ -1004,13 +1005,13 @@ public void switchAuthManager(AuthManager authManager) {
}

@Override
public RaftGroupManager raftGroupManager(String group) {
public RaftGroupManager raftGroupManager() {
if (!(this.storeProvider instanceof RaftBackendStoreProvider)) {
return null;
}
RaftBackendStoreProvider provider =
((RaftBackendStoreProvider) this.storeProvider);
return provider.raftNodeManager(group);
return provider.raftNodeManager();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.slf4j.Logger;

import com.alipay.remoting.rpc.RpcServer;
import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.backend.BackendException;
import com.baidu.hugegraph.backend.store.raft.StoreSnapshotFile;
Expand Down Expand Up @@ -94,8 +95,8 @@ public void open(String graph) {
}

@Override
public void waitStoreStarted() {
// pass
public void waitReady(RpcServer rpcServer) {
// passs
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public static BackendStoreProvider open(HugeGraphParams params) {
if (raftMode) {
LOG.info("Opening backend store '{}' in raft mode for graph '{}'",
backend, graph);
provider = new RaftBackendStoreProvider(provider, params);
provider = new RaftBackendStoreProvider(params, provider);
}
provider.open(graph);
return provider;
Expand Down
Loading

0 comments on commit 8609968

Please sign in to comment.