diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/traversers/FusiformSimilarityAPI.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/traversers/FusiformSimilarityAPI.java index dda9845fab..160ace6567 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/traversers/FusiformSimilarityAPI.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/traversers/FusiformSimilarityAPI.java @@ -42,7 +42,6 @@ import com.baidu.hugegraph.api.API; import com.baidu.hugegraph.backend.query.QueryResults; import com.baidu.hugegraph.core.GraphManager; -import com.baidu.hugegraph.schema.EdgeLabel; import com.baidu.hugegraph.server.RestServer; import com.baidu.hugegraph.traversal.algorithm.FusiformSimilarityTraverser; import com.baidu.hugegraph.traversal.algorithm.FusiformSimilarityTraverser.SimilarsMap; diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/ConfigAuthenticator.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/ConfigAuthenticator.java index 0f5be90e90..18529fa244 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/ConfigAuthenticator.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/ConfigAuthenticator.java @@ -45,8 +45,9 @@ public ConfigAuthenticator() { @Override public void setup(HugeConfig config) { - this.tokens.put(USER_ADMIN, config.get(ServerOptions.ADMIN_TOKEN)); - this.tokens.putAll(config.getMap(ServerOptions.USER_TOKENS)); + this.tokens.putAll(config.getMap(ServerOptions.AUTH_USER_TOKENS)); + assert !this.tokens.containsKey(USER_ADMIN); + this.tokens.put(USER_ADMIN, config.get(ServerOptions.AUTH_ADMIN_TOKEN)); } /** diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java index 880cf4a02b..36ffbe32d6 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java @@ -67,6 +67,8 @@ import com.baidu.hugegraph.config.HugeConfig; import com.baidu.hugegraph.exception.NotSupportException; import com.baidu.hugegraph.iterator.FilterIterator; +import com.baidu.hugegraph.rpc.RpcServiceConfig4Client; +import com.baidu.hugegraph.rpc.RpcServiceConfig4Server; import com.baidu.hugegraph.schema.EdgeLabel; import com.baidu.hugegraph.schema.IndexLabel; import com.baidu.hugegraph.schema.PropertyKey; @@ -113,7 +115,7 @@ public HugeGraphAuthProxy(HugeGraph hugegraph) { @Override public HugeGraph hugegraph() { - verifyAdminPermission(); + this.verifyAdminPermission(); return this.hugegraph; } @@ -502,7 +504,7 @@ public Transaction tx() { @Override public void close() throws Exception { - verifyAdminPermission(); + this.verifyAdminPermission(); this.hugegraph.close(); } @@ -575,7 +577,7 @@ public String backendVersion() { @Override public BackendStoreSystemInfo backendStoreSystemInfo() { - verifyAdminPermission(); + this.verifyAdminPermission(); return this.hugegraph.backendStoreSystemInfo(); } @@ -617,19 +619,19 @@ public void waitStarted() { @Override public void serverStarted(Id serverId, NodeRole serverRole) { - verifyAdminPermission(); + this.verifyAdminPermission(); this.hugegraph.serverStarted(serverId, serverRole); } @Override public boolean started() { - verifyAdminPermission(); + this.verifyAdminPermission(); return this.hugegraph.started(); } @Override public boolean closed() { - verifyAdminPermission(); + this.verifyAdminPermission(); return this.hugegraph.closed(); } @@ -664,21 +666,28 @@ public RaftGroupManager raftGroupManager(String group) { return this.hugegraph.raftGroupManager(group); } + @Override + public void registerRpcServices(RpcServiceConfig4Server serverConfig, + RpcServiceConfig4Client clientConfig) { + this.verifyAdminPermission(); + this.hugegraph.registerRpcServices(serverConfig, clientConfig); + } + @Override public void initBackend() { - verifyAdminPermission(); + this.verifyAdminPermission(); this.hugegraph.initBackend(); } @Override public void clearBackend() { - verifyAdminPermission(); + this.verifyAdminPermission(); this.hugegraph.clearBackend(); } @Override public void truncateBackend() { - verifyAdminPermission(); + this.verifyAdminPermission(); AuthManager userManager = this.hugegraph.authManager(); HugeUser admin = userManager.findUser(HugeAuthenticator.USER_ADMIN); try { diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/StandardAuthenticator.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/StandardAuthenticator.java index 838650e29b..6f2f8f7507 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/StandardAuthenticator.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/StandardAuthenticator.java @@ -94,8 +94,8 @@ public void setup(HugeConfig config) { String remoteUrl = config.get(ServerOptions.AUTH_REMOTE_URL); if (StringUtils.isNotEmpty(remoteUrl)) { - RpcClientProvider provider = new RpcClientProvider(config); - this.graph.switchAuthManager(provider.authManager()); + RpcClientProvider clientProvider = new RpcClientProvider(config); + this.graph.switchAuthManager(clientProvider.authManager()); } } diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java index 8155627b8b..8b41ea3345 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java @@ -204,7 +204,7 @@ public static synchronized ServerOptions instance() { "hugegraph" ); - public static final ConfigOption ADMIN_TOKEN = + public static final ConfigOption AUTH_ADMIN_TOKEN = new ConfigOption<>( "auth.admin_token", "Token for administrator operations, " + @@ -213,7 +213,7 @@ public static synchronized ServerOptions instance() { "162f7848-0b6d-4faf-b557-3a0797869c55" ); - public static final ConfigListOption USER_TOKENS = + public static final ConfigListOption AUTH_USER_TOKENS = new ConfigListOption<>( "auth.user_tokens", "The map of user tokens with name and password, " + @@ -228,7 +228,7 @@ public static synchronized ServerOptions instance() { "If the address is empty, it provide auth service, " + "otherwise it is auth client and also provide auth service " + "through rpc forwarding. The remote url can be set to " + - "multiple addresses, which are linked by ','.", + "multiple addresses, which are concat by ','.", null, "" ); @@ -238,7 +238,7 @@ public static synchronized ServerOptions instance() { "rpc.server_port", "The port bound by rpc server to provide services.", rangeInt(1, Integer.MAX_VALUE), - 8099 + 8090 ); public static final ConfigOption RPC_SERVER_HOST = @@ -258,6 +258,15 @@ public static synchronized ServerOptions instance() { 30 ); + public static final ConfigOption RPC_REMOTE_URL = + new ConfigOption<>( + "rpc.remote_url", + "The remote urls of rpc peers, it can be set to " + + "multiple addresses, which are concat by ','.", + disallowEmpty(), + "127.0.0.1:8090" + ); + public static final ConfigOption RPC_CLIENT_CONNECT_TIMEOUT = new ConfigOption<>( "rpc.client_connect_timeout", diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java index 14a6e307e6..b5075e4ae2 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java @@ -51,8 +51,10 @@ import com.baidu.hugegraph.license.LicenseVerifier; import com.baidu.hugegraph.metrics.MetricsUtil; import com.baidu.hugegraph.metrics.ServerReporter; +import com.baidu.hugegraph.rpc.RpcClientProvider; +import com.baidu.hugegraph.rpc.RpcConsumerConfig; import com.baidu.hugegraph.rpc.RpcProviderConfig; -import com.baidu.hugegraph.rpc.SofaRpcServer; +import com.baidu.hugegraph.rpc.RpcServer; import com.baidu.hugegraph.serializer.JsonSerializer; import com.baidu.hugegraph.serializer.Serializer; import com.baidu.hugegraph.server.RestServer; @@ -67,12 +69,14 @@ public final class GraphManager { private final Map graphs; private final HugeAuthenticator authenticator; - private final SofaRpcServer rpcServer; + private final RpcServer rpcServer; + private final RpcClientProvider rpcClient; public GraphManager(HugeConfig conf) { this.graphs = new ConcurrentHashMap<>(); this.authenticator = HugeAuthenticator.loadAuthenticator(conf); - this.rpcServer = new SofaRpcServer(conf); + this.rpcServer = new RpcServer(conf); + this.rpcClient = new RpcClientProvider(conf); this.loadGraphs(conf.getMap(ServerOptions.GRAPHS)); // this.installLicense(conf, ""); @@ -165,19 +169,35 @@ public AuthManager authManager() { } public void close() { - this.destoryRpcServer(); + this.destroyRpcServer(); } private void startRpcServer() { - RpcProviderConfig config = this.rpcServer.config(); + RpcProviderConfig serverConfig = this.rpcServer.config(); + if (this.authenticator != null) { - config.addService(AuthManager.class, - this.authenticator.authManager()); + serverConfig.addService(AuthManager.class, + this.authenticator.authManager()); + } + + if (this.rpcClient.enabled()) { + RpcConsumerConfig clientConfig = this.rpcClient.config(); + + for (Graph graph : this.graphs.values()) { + HugeGraph hugegraph = (HugeGraph) graph; + hugegraph.registerRpcServices(serverConfig, clientConfig); + } + } + + try { + this.rpcServer.exportAll(); + } catch (Throwable e) { + this.rpcServer.destroy(); + throw e; } - this.rpcServer.exportAll(); } - private void destoryRpcServer() { + private void destroyRpcServer() { this.rpcServer.destroy(); } diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcClientProvider.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcClientProvider.java index 428309c58f..93deb941e1 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcClientProvider.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcClientProvider.java @@ -19,26 +19,58 @@ package com.baidu.hugegraph.rpc; -import com.alipay.sofa.rpc.config.ConsumerConfig; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.Set; + +import com.alipay.sofa.rpc.common.utils.StringUtils; import com.baidu.hugegraph.auth.AuthManager; import com.baidu.hugegraph.config.HugeConfig; +import com.baidu.hugegraph.config.ServerOptions; +import com.baidu.hugegraph.util.E; public class RpcClientProvider { - public final RpcConsumerConfig rpcConsumerConfig; + public final RpcConsumerConfig consumerConfig; + public final RpcConsumerConfig authConsumerConfig; public RpcClientProvider(HugeConfig conf) { - RpcCommonConfig.initRpcConfigs(conf); - this.rpcConsumerConfig = new RpcConsumerConfig(); - this.rpcConsumerConfig.addConsumerConfig(AuthManager.class, conf); + // TODO: fetch from registry server + String rpcUrl = conf.get(ServerOptions.RPC_REMOTE_URL); + String selfUrl = conf.get(ServerOptions.RPC_SERVER_HOST) + ":" + + conf.get(ServerOptions.RPC_SERVER_PORT); + rpcUrl = excludeSelfUrl(rpcUrl, selfUrl); + this.consumerConfig = StringUtils.isNotBlank(rpcUrl) ? + new RpcConsumerConfig(conf, rpcUrl) : null; + + String authUrl = conf.get(ServerOptions.AUTH_REMOTE_URL); + this.authConsumerConfig = StringUtils.isNotBlank(authUrl) ? + new RpcConsumerConfig(conf, authUrl) : null; + } + + public boolean enabled() { + return this.consumerConfig != null; + } + + public RpcConsumerConfig config() { + E.checkArgument(this.consumerConfig != null, + "RpcClient is not enabled, please config option '%s'", + ServerOptions.RPC_REMOTE_URL.name()); + return this.consumerConfig; } public AuthManager authManager() { - return (AuthManager) this.serviceProxy(AuthManager.class.getName()); + E.checkArgument(this.authConsumerConfig != null, + "RpcClient is not enabled, please config option '%s'", + ServerOptions.AUTH_REMOTE_URL.name()); + return this.authConsumerConfig.serviceProxy(AuthManager.class); } - public Object serviceProxy(String serviceName) { - ConsumerConfig config = this.rpcConsumerConfig.consumerConfig(serviceName); - return config.refer(); + private static String excludeSelfUrl(String rpcUrl, String selfUrl) { + String[] urls = StringUtils.splitWithCommaOrSemicolon(rpcUrl); + // Keep urls order via LinkedHashSet + Set urlSet = new LinkedHashSet<>(Arrays.asList(urls)); + urlSet.remove(selfUrl); + return String.join(",", urlSet); } } diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcConsumerConfig.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcConsumerConfig.java index ec92cc259b..698a53269d 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcConsumerConfig.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcConsumerConfig.java @@ -19,20 +19,79 @@ package com.baidu.hugegraph.rpc; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import org.slf4j.Logger; + +import com.alipay.sofa.rpc.bootstrap.ConsumerBootstrap; +import com.alipay.sofa.rpc.client.AbstractCluster; +import com.alipay.sofa.rpc.client.Cluster; +import com.alipay.sofa.rpc.client.ProviderInfo; import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.core.exception.RpcErrorType; +import com.alipay.sofa.rpc.core.exception.SofaRpcException; +import com.alipay.sofa.rpc.core.request.SofaRequest; +import com.alipay.sofa.rpc.core.response.SofaResponse; +import com.alipay.sofa.rpc.ext.Extension; +import com.alipay.sofa.rpc.ext.ExtensionLoaderFactory; import com.baidu.hugegraph.config.HugeConfig; import com.baidu.hugegraph.config.ServerOptions; +import com.baidu.hugegraph.util.Log; import com.google.common.collect.Maps; -public class RpcConsumerConfig { +public class RpcConsumerConfig implements RpcServiceConfig4Client { + + private final HugeConfig conf; + private final String remoteUrls; + private final Map> configs; + + static { + ExtensionLoaderFactory.getExtensionLoader(Cluster.class) + .loadExtension(FanoutCluster.class); + } + + public RpcConsumerConfig(HugeConfig conf, String remoteUrls) { + RpcCommonConfig.initRpcConfigs(conf); + this.conf = conf; + this.remoteUrls = remoteUrls; + this.configs = Maps.newHashMap(); + } + + @Override + public T serviceProxy(String graph, String interfaceId) { + ConsumerConfig config = this.consumerConfig(graph, interfaceId); + return config.refer(); + } + + @Override + public T serviceProxy(String interfaceId) { + ConsumerConfig config = this.consumerConfig(null, interfaceId); + return config.refer(); + } + + private ConsumerConfig consumerConfig(String graph, + String interfaceId) { + String serviceId; + if (graph != null) { + serviceId = interfaceId + ":" + graph; + } else { + serviceId = interfaceId; + } + + @SuppressWarnings("unchecked") + ConsumerConfig consumerConfig = (ConsumerConfig) + this.configs.get(serviceId); + if (consumerConfig != null) { + return consumerConfig; + } - private final Map configs = Maps.newHashMap(); + assert consumerConfig == null; + consumerConfig = new ConsumerConfig<>(); - public void addConsumerConfig(Class clazz, HugeConfig conf) { + HugeConfig conf = this.conf; String protocol = conf.get(ServerOptions.RPC_PROTOCOL); - String directUrl = conf.get(ServerOptions.AUTH_REMOTE_URL); int timeout = conf.get(ServerOptions.RPC_CLIENT_READ_TIMEOUT) * 1000; int connectTimeout = conf.get(ServerOptions .RPC_CLIENT_CONNECT_TIMEOUT) * 1000; @@ -40,22 +99,92 @@ public void addConsumerConfig(Class clazz, HugeConfig conf) { .RPC_CLIENT_RECONNECT_PERIOD) * 1000; int retries = conf.get(ServerOptions.RPC_CLIENT_RETRIES); String loadBalancer = conf.get(ServerOptions.RPC_CLIENT_LOAD_BALANCER); - ConsumerConfig consumerConfig = new ConsumerConfig() - .setInterfaceId(clazz.getName()) - .setProtocol(protocol) - .setDirectUrl(directUrl) - .setTimeout(timeout) - .setConnectTimeout(connectTimeout) - .setReconnectPeriod(reconnectPeriod) - .setRetries(retries) - .setLoadBalancer(loadBalancer); - this.configs.put(clazz.getName(), consumerConfig); + + if (graph != null) { + consumerConfig.setId(serviceId).setUniqueId(graph); + // Default is FailoverCluster, set to FanoutCluster to broadcast + consumerConfig.setCluster("fanout"); + } + consumerConfig.setInterfaceId(interfaceId) + .setProtocol(protocol) + .setDirectUrl(this.remoteUrls) + .setTimeout(timeout) + .setConnectTimeout(connectTimeout) + .setReconnectPeriod(reconnectPeriod) + .setRetries(retries) + .setLoadBalancer(loadBalancer); + + this.configs.put(serviceId, consumerConfig); + return consumerConfig; } - public ConsumerConfig consumerConfig(String serverName) { - if (!this.configs.containsKey(serverName)) { - throw new RpcException("Invalid server name '%s'", serverName); + @Extension("fanout") + private static class FanoutCluster extends AbstractCluster { + + private static final Logger LOG = Log.logger(FanoutCluster.class); + + public FanoutCluster(ConsumerBootstrap consumerBootstrap) { + super(consumerBootstrap); + } + + @Override + protected SofaResponse doInvoke(SofaRequest request) + throws SofaRpcException { + List providers = this.getRouterChain() + .route(request, null); + List responses = new ArrayList<>(providers.size()); + List excepts = new ArrayList<>(providers.size()); + + for (ProviderInfo provider : providers) { + try { + SofaResponse response = this.doInvoke(request, provider); + responses.add(response); + } catch (SofaRpcException e) { + excepts.add(e); + LOG.warn("{}.(error {})", e.getMessage(), e.getErrorType()); + } + } + + if (responses.size() > 0) { + /* + * Just choose the first one as result to return, ignore others + * TODO: maybe more strategies should be provided + */ + return responses.get(0); + } else if (excepts.size() > 0) { + throw excepts.get(0); + } else { + assert providers.isEmpty(); + String method = methodName(request); + throw new SofaRpcException(RpcErrorType.CLIENT_ROUTER, + "No service provider for " + method); + } + } + + private SofaResponse doInvoke(SofaRequest request, + ProviderInfo providerInfo) { + try { + SofaResponse response = this.filterChain(providerInfo, request); + if (response != null) { + return response; + } + String method = methodName(request); + throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, + "Failed to call " + method + " on remote server " + + providerInfo + ", return null response"); + } catch (SofaRpcException e) { + throw e; + } catch (Exception e) { + String method = methodName(request); + throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, + "Failed to call " + method + " on remote server " + + providerInfo + ", cause by exception: " + e); + } + } + + private static String methodName(SofaRequest request) { + return request.getInterfaceName() + "." + + request.getMethodName() + "()"; } - return this.configs.get(serverName); } } diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcProviderConfig.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcProviderConfig.java index 096d339ca4..7522bff507 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcProviderConfig.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcProviderConfig.java @@ -24,18 +24,38 @@ import com.alipay.sofa.rpc.config.ProviderConfig; import com.google.common.collect.Maps; -public class RpcProviderConfig { +public class RpcProviderConfig implements RpcServiceConfig4Server { - private final Map configs = Maps.newHashMap(); + private final Map> configs = Maps.newHashMap(); + @Override public void addService(Class clazz, E serviceImpl) { - ProviderConfig providerConfig = new ProviderConfig() - .setInterfaceId(clazz.getName()) - .setRef(serviceImpl); - this.configs.put(clazz.getName(), providerConfig); + this.addService(null, clazz.getName(), serviceImpl); } - public Map configs() { + @Override + public void addService(String graph, Class clazz, + E serviceImpl) { + this.addService(graph, clazz.getName(), serviceImpl); + } + + private void addService(String graph, + String interfaceId, + E serviceImpl) { + ProviderConfig providerConfig = new ProviderConfig<>(); + String serviceId; + if (graph != null) { + serviceId = interfaceId + ":" + graph; + providerConfig.setId(serviceId).setUniqueId(graph); + } else { + serviceId = interfaceId; + } + providerConfig.setInterfaceId(interfaceId) + .setRef(serviceImpl); + this.configs.put(serviceId, providerConfig); + } + + public Map> configs() { return this.configs; } } diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/SofaRpcServer.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcServer.java similarity index 66% rename from hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/SofaRpcServer.java rename to hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcServer.java index a92401b518..d2ee7f17e2 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/SofaRpcServer.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcServer.java @@ -24,39 +24,28 @@ import org.apache.commons.collections.MapUtils; import org.slf4j.Logger; -import com.alipay.sofa.rpc.common.RpcConfigs; -import com.alipay.sofa.rpc.common.RpcOptions; import com.alipay.sofa.rpc.config.ProviderConfig; import com.alipay.sofa.rpc.config.ServerConfig; -import com.alipay.sofa.rpc.context.RpcRuntimeContext; import com.baidu.hugegraph.config.HugeConfig; import com.baidu.hugegraph.config.ServerOptions; import com.baidu.hugegraph.util.Log; -public class SofaRpcServer { +public class RpcServer { - private static final Logger LOG = Log.logger(SofaRpcServer.class); + private static final Logger LOG = Log.logger(RpcServer.class); private final HugeConfig conf; private final RpcProviderConfig configs; private final ServerConfig serverConfig; - static { - if (RpcConfigs.getOrDefaultValue(RpcOptions.JVM_SHUTDOWN_HOOK, true)) { - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - RpcRuntimeContext.destroy(); - }, "SOFA-RPC-ShutdownHook")); - } - } - - public SofaRpcServer(HugeConfig conf) { + public RpcServer(HugeConfig conf) { RpcCommonConfig.initRpcConfigs(conf); this.conf = conf; - this.serverConfig = new ServerConfig() - .setProtocol(conf.get(ServerOptions.RPC_PROTOCOL)) - .setPort(conf.get(ServerOptions.RPC_SERVER_PORT)) - .setHost(conf.get(ServerOptions.RPC_SERVER_HOST)) - .setDaemon(false); + this.serverConfig = new ServerConfig(); + this.serverConfig.setProtocol(conf.get(ServerOptions.RPC_PROTOCOL)) + .setHost(conf.get(ServerOptions.RPC_SERVER_HOST)) + .setPort(conf.get(ServerOptions.RPC_SERVER_PORT)) + .setDaemon(false); this.configs = new RpcProviderConfig(); } @@ -66,22 +55,22 @@ public RpcProviderConfig config() { public void exportAll() { LOG.debug("RpcServer starting on port {}", this.port()); - Map configs = this.configs.configs(); + Map> configs = this.configs.configs(); if (MapUtils.isEmpty(configs)) { LOG.info("RpcServer config is empty, skip starting RpcServer"); return; } int timeout = this.conf.get(ServerOptions.RPC_SERVER_TIMEOUT) * 1000; - for (ProviderConfig providerConfig : configs.values()) { - providerConfig.setServer(this.serverConfig); - providerConfig.setTimeout(timeout); - providerConfig.export(); + for (ProviderConfig providerConfig : configs.values()) { + providerConfig.setServer(this.serverConfig) + .setTimeout(timeout) + .export(); } LOG.info("RpcServer started success on port {}", this.port()); } public void unExport(String serviceName) { - Map configs = this.configs.configs(); + Map> configs = this.configs.configs(); if (!configs.containsKey(serviceName)) { throw new RpcException("The service name '%s' doesn't exist", serviceName); @@ -95,6 +84,16 @@ public int port() { public void destroy() { LOG.info("RpcServer stop on port {}", this.port()); + for (ProviderConfig config : this.configs.configs().values()) { + Object service = config.getRef(); + if (service instanceof AutoCloseable) { + try { + ((AutoCloseable) service).close(); + } catch (Exception e) { + LOG.warn("Failed to close service {}", service, e); + } + } + } this.serverConfig.destroy(); } } diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/server/RestServer.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/server/RestServer.java index 2895083039..f7b1c9be80 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/server/RestServer.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/server/RestServer.java @@ -23,10 +23,13 @@ import java.net.URI; import java.util.Arrays; import java.util.Collection; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import javax.ws.rs.core.UriBuilder; +import org.glassfish.grizzly.CompletionHandler; +import org.glassfish.grizzly.GrizzlyFuture; import org.glassfish.grizzly.http.server.HttpServer; import org.glassfish.grizzly.http.server.NetworkListener; import org.glassfish.grizzly.ssl.SSLContextConfigurator; @@ -117,7 +120,44 @@ private HttpServer configHttpServer(URI uri, ResourceConfig rc) { public Future shutdown() { E.checkNotNull(this.httpServer, "http server"); - return this.httpServer.shutdown(); + /* + * Since 2.3.x shutdown() won't call shutdownNow(), so the event + * ApplicationEvent.Type.DESTROY_FINISHED also won't be triggered, + * which is listened by ApplicationConfig.GraphManagerFactory, we + * manually call shutdownNow() here when the future is completed. + * See shutdown() change: + * https://github.com/javaee/grizzly/commit/182d8bcb4e45de5609ab92f6f1d5980f95d79b04 + * #diff-f6c130f38a1ec11bdf9d3cb7e0a81084c8788c79a00befe65e40a13bc989b098R388 + */ + CompletableFuture future = new CompletableFuture<>(); + future.whenComplete((server, exception) -> { + this.httpServer.shutdownNow(); + }); + + GrizzlyFuture grizzlyFuture = this.httpServer.shutdown(); + grizzlyFuture.addCompletionHandler(new CompletionHandler() { + @Override + public void cancelled() { + future.cancel(true); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(throwable); + } + + @Override + public void completed(HttpServer result) { + future.complete(result); + } + + @Override + public void updated(HttpServer result) { + // pass + } + }); + + return future; } public void shutdownNow() { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeFactory.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeFactory.java index c4cd67a470..9bc3227388 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeFactory.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeFactory.java @@ -50,7 +50,7 @@ public class HugeFactory { Runtime.getRuntime().addShutdownHook(new Thread(() -> { LOG.info("HugeGraph is shutting down"); HugeFactory.shutdown(30L); - })); + }, "hugegraph-shutdown")); } private static final String NAME_REGEX = "^[A-Za-z][A-Za-z0-9_]{0,47}$"; diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java index f9134895f7..f9dd690144 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java @@ -38,6 +38,8 @@ import com.baidu.hugegraph.backend.store.BackendStoreSystemInfo; import com.baidu.hugegraph.backend.store.raft.RaftGroupManager; import com.baidu.hugegraph.config.ConfigOption; +import com.baidu.hugegraph.rpc.RpcServiceConfig4Client; +import com.baidu.hugegraph.rpc.RpcServiceConfig4Server; import com.baidu.hugegraph.schema.EdgeLabel; import com.baidu.hugegraph.schema.IndexLabel; import com.baidu.hugegraph.schema.PropertyKey; @@ -167,6 +169,9 @@ public interface HugeGraph extends Graph { public V option(ConfigOption option); + public void registerRpcServices(RpcServiceConfig4Server serverConfig, + RpcServiceConfig4Client clientConfig); + public default List mapPkId2Name(Collection ids) { List names = new ArrayList<>(ids.size()); for (Id id : ids) { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java index 962e8b326d..b6d4821119 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java @@ -43,6 +43,10 @@ import com.baidu.hugegraph.auth.AuthManager; import com.baidu.hugegraph.auth.StandardAuthManager; import com.baidu.hugegraph.backend.BackendException; +import com.baidu.hugegraph.backend.cache.Cache; +import com.baidu.hugegraph.backend.cache.CacheNotifier; +import com.baidu.hugegraph.backend.cache.CacheNotifier.GraphCacheNotifier; +import com.baidu.hugegraph.backend.cache.CacheNotifier.SchemaCacheNotifier; import com.baidu.hugegraph.backend.cache.CachedGraphTransaction; import com.baidu.hugegraph.backend.cache.CachedSchemaTransaction; import com.baidu.hugegraph.backend.id.Id; @@ -64,8 +68,11 @@ import com.baidu.hugegraph.config.CoreOptions; import com.baidu.hugegraph.config.HugeConfig; import com.baidu.hugegraph.event.EventHub; +import com.baidu.hugegraph.event.EventListener; import com.baidu.hugegraph.exception.NotAllowException; import com.baidu.hugegraph.io.HugeGraphIoRegistry; +import com.baidu.hugegraph.rpc.RpcServiceConfig4Client; +import com.baidu.hugegraph.rpc.RpcServiceConfig4Server; import com.baidu.hugegraph.schema.EdgeLabel; import com.baidu.hugegraph.schema.IndexLabel; import com.baidu.hugegraph.schema.PropertyKey; @@ -87,6 +94,7 @@ import com.baidu.hugegraph.type.define.NodeRole; import com.baidu.hugegraph.util.DateUtil; import com.baidu.hugegraph.util.E; +import com.baidu.hugegraph.util.Events; import com.baidu.hugegraph.util.LockUtil; import com.baidu.hugegraph.util.Log; import com.baidu.hugegraph.variables.HugeVariables; @@ -187,20 +195,26 @@ public StandardHugeGraph(HugeConfig config) { try { this.storeProvider = this.loadStoreProvider(); - } catch (BackendException e) { + } catch (Exception e) { LockUtil.destroy(this.name); String message = "Failed to load backend store provider"; LOG.error("{}: {}", message, e.getMessage()); throw new HugeException(message); } - this.tx = new TinkerPopTransaction(this); + try { + this.tx = new TinkerPopTransaction(this); - SnowflakeIdGenerator.init(this.params); + SnowflakeIdGenerator.init(this.params); - this.taskManager.addScheduler(this.params); - this.authManager = new StandardAuthManager(this.params); - this.variables = null; + this.taskManager.addScheduler(this.params); + this.authManager = new StandardAuthManager(this.params); + this.variables = null; + } catch (Exception e) { + this.storeProvider.close(); + LockUtil.destroy(this.name); + throw e; + } } @Override @@ -935,6 +949,29 @@ public V option(ConfigOption option) { return config.get(option); } + @Override + public void registerRpcServices(RpcServiceConfig4Server serverConfig, + RpcServiceConfig4Client clientConfig) { + /* + * Skip register cache-rpc service if it's non-shared storage, + * because we assume cache of non-shared storage is updated by raft. + */ + if (!this.backendStoreFeatures().supportsSharedStorage()) { + return; + } + + Class clazz1 = GraphCacheNotifier.class; + // The proxy is sometimes unavailable (issue #664) + CacheNotifier proxy = clientConfig.serviceProxy(this.name, clazz1); + serverConfig.addService(this.name, clazz1, new HugeGraphCacheNotifier( + this.graphEventHub, proxy)); + + Class clazz2 = SchemaCacheNotifier.class; + proxy = clientConfig.serviceProxy(this.name, clazz2); + serverConfig.addService(this.name, clazz2, new HugeSchemaCacheNotifier( + this.schemaEventHub, proxy)); + } + private void closeTx() { try { if (this.tx.isOpen()) { @@ -1334,4 +1371,83 @@ public SysTransaction(HugeGraphParams graph, BackendStore store) { this.autoCommit(true); } } + + private static class AbstractCacheNotifier implements CacheNotifier { + + private final EventHub hub; + private final EventListener cacheEventListener; + + public AbstractCacheNotifier(EventHub hub, CacheNotifier proxy) { + this.hub = hub; + this.cacheEventListener = event -> { + Object[] args = event.args(); + E.checkArgument(args.length > 0 && args[0] instanceof String, + "Expect event action argument"); + if (Cache.ACTION_INVALIDED.equals(args[0])) { + event.checkArgs(String.class, HugeType.class, Object.class); + HugeType type = (HugeType) args[1]; + Object ids = args[2]; + if (ids instanceof Id[]) { + // argument type mismatch: proxy.invalid2(type,Id[]ids) + proxy.invalid2(type, (Id[]) ids); + } else if (ids instanceof Id) { + proxy.invalid(type, (Id) ids); + } else { + E.checkArgument(false, "Unexpected argument: %s", ids); + } + return true; + } else if (Cache.ACTION_CLEARED.equals(args[0])) { + event.checkArgs(String.class, HugeType.class); + HugeType type = (HugeType) args[1]; + proxy.clear(type); + return true; + } + return false; + }; + this.hub.listen(Events.CACHE, this.cacheEventListener); + } + + @Override + public void close() { + this.hub.unlisten(Events.CACHE, this.cacheEventListener); + } + + @Override + public void invalid(HugeType type, Id id) { + this.hub.notify(Events.CACHE, Cache.ACTION_INVALID, type, id); + } + + @Override + public void invalid2(HugeType type, Object[] ids) { + this.hub.notify(Events.CACHE, Cache.ACTION_INVALID, type, ids); + } + + @Override + public void clear(HugeType type) { + this.hub.notify(Events.CACHE, Cache.ACTION_CLEAR, type, null); + } + + @Override + public void reload() { + // pass + } + } + + private static class HugeSchemaCacheNotifier + extends AbstractCacheNotifier + implements SchemaCacheNotifier { + + public HugeSchemaCacheNotifier(EventHub hub, CacheNotifier proxy) { + super(hub, proxy); + } + } + + private static class HugeGraphCacheNotifier + extends AbstractCacheNotifier + implements GraphCacheNotifier { + + public HugeGraphCacheNotifier(EventHub hub, CacheNotifier proxy) { + super(hub, proxy); + } + } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/AbstractCache.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/AbstractCache.java index 14d251e515..8243b0360b 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/AbstractCache.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/AbstractCache.java @@ -34,9 +34,6 @@ public abstract class AbstractCache implements Cache { public static final int DEFAULT_SIZE = 1 * MB; public static final int MAX_INIT_CAP = 100 * MB; - public static final String ACTION_INVALID = "invalid"; - public static final String ACTION_CLEAR = "clear"; - protected static final Logger LOG = Log.logger(Cache.class); private volatile long hits = 0L; diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/Cache.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/Cache.java index c8294af949..5347b3af7a 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/Cache.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/Cache.java @@ -24,6 +24,11 @@ public interface Cache { + public static final String ACTION_INVALID = "invalid"; + public static final String ACTION_CLEAR = "clear"; + public static final String ACTION_INVALIDED = "invalided"; + public static final String ACTION_CLEARED = "cleared"; + public Object get(K id); public Object getOrFetch(K id, Function fetcher); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CacheNotifier.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CacheNotifier.java new file mode 100644 index 0000000000..39154807a0 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CacheNotifier.java @@ -0,0 +1,38 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.backend.cache; + +import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.type.HugeType; + +public interface CacheNotifier extends AutoCloseable { + + public void invalid(HugeType type, Id id); + + public void invalid2(HugeType type, Object[] ids); + + public void clear(HugeType type); + + public void reload(); + + public interface GraphCacheNotifier extends CacheNotifier {} + + public interface SchemaCacheNotifier extends CacheNotifier {} +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedGraphTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedGraphTransaction.java index 1d1a5bcdc3..a24d54df3d 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedGraphTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedGraphTransaction.java @@ -19,9 +19,7 @@ package com.baidu.hugegraph.backend.cache; -import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_CLEAR; -import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_INVALID; - +import java.lang.reflect.Array; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -50,6 +48,7 @@ import com.baidu.hugegraph.structure.HugeEdge; import com.baidu.hugegraph.structure.HugeVertex; import com.baidu.hugegraph.type.HugeType; +import com.baidu.hugegraph.util.E; import com.baidu.hugegraph.util.Events; import com.google.common.collect.ImmutableSet; @@ -127,8 +126,7 @@ private void listenChanges() { if (storeEvents.contains(event.name())) { LOG.debug("Graph {} clear graph cache on event '{}'", this.graph(), event.name()); - this.verticesCache.clear(); - this.edgesCache.clear(); + this.clearCache(null, true); return true; } return false; @@ -139,14 +137,32 @@ private void listenChanges() { this.cacheEventListener = event -> { LOG.debug("Graph {} received graph cache event: {}", this.graph(), event); - event.checkArgs(String.class, HugeType.class, Id.class); Object[] args = event.args(); - if (ACTION_INVALID.equals(args[0])) { + E.checkArgument(args.length > 0 && args[0] instanceof String, + "Expect event action argument"); + if (Cache.ACTION_INVALID.equals(args[0])) { + event.checkArgs(String.class, HugeType.class, Object.class); HugeType type = (HugeType) args[1]; - Id id = (Id) args[2]; if (type.isVertex()) { // Invalidate vertex cache - this.verticesCache.invalidate(id); + Object arg2 = args[2]; + if (arg2 instanceof Id) { + Id id = (Id) arg2; + this.verticesCache.invalidate(id); + } else if (arg2 != null && arg2.getClass().isArray()) { + int size = Array.getLength(arg2); + for (int i = 0; i < size; i++) { + Object id = Array.get(arg2, i); + E.checkArgument(id instanceof Id, + "Expect instance of Id in array, " + + "but got '%s'", id.getClass()); + this.verticesCache.invalidate((Id) id); + } + } else { + E.checkArgument(false, + "Expect Id or Id[], but got: %s", + arg2); + } } else if (type.isEdge()) { /* * Invalidate edge cache via clear instead of invalidate @@ -156,9 +172,10 @@ private void listenChanges() { this.edgesCache.clear(); } return true; - } else if (ACTION_CLEAR.equals(args[0])) { - this.verticesCache.clear(); - this.edgesCache.clear(); + } else if (Cache.ACTION_CLEAR.equals(args[0])) { + event.checkArgs(String.class, HugeType.class); + HugeType type = (HugeType) args[1]; + this.clearCache(type, false); return true; } return false; @@ -178,6 +195,24 @@ private void unlistenChanges() { graphEventHub.unlisten(Events.CACHE, this.cacheEventListener); } + private void notifyChanges(String action, HugeType type, Id[] ids) { + EventHub graphEventHub = this.params().graphEventHub(); + graphEventHub.notify(Events.CACHE, action, type, ids); + } + + private void clearCache(HugeType type, boolean notify) { + if (type == null || type == HugeType.VERTEX) { + this.verticesCache.clear(); + } + if (type == null || type == HugeType.EDGE) { + this.edgesCache.clear(); + } + + if (notify) { + this.notifyChanges(Cache.ACTION_CLEARED, null, null); + } + } + @Override protected final Iterator queryVerticesFromBackend(Query query) { if (!query.ids().isEmpty() && query.conditions().isEmpty()) { @@ -284,14 +319,18 @@ protected final Iterator queryEdgesFromBackend(Query query) { @Override protected final void commitMutation2Backend(BackendMutation... mutations) { // Collect changes before commit - Collection changes = this.verticesInTxUpdated(); + Collection updates = this.verticesInTxUpdated(); Collection deletions = this.verticesInTxRemoved(); + Id[] vertexIds = new Id[updates.size() + deletions.size()]; + int vertexOffset = 0; + int edgesInTxSize = this.edgesInTxSize(); try { super.commitMutation2Backend(mutations); // Update vertex cache - for (HugeVertex vertex : changes) { + for (HugeVertex vertex : updates) { + vertexIds[vertexOffset++] = vertex.id(); if (vertex.sizeOfSubProperties() > MAX_CACHE_PROPS_PER_VERTEX) { // Skip large vertex this.verticesCache.invalidate(vertex.id()); @@ -302,13 +341,19 @@ protected final void commitMutation2Backend(BackendMutation... mutations) { } finally { // Update removed vertex in cache whatever success or fail for (HugeVertex vertex : deletions) { + vertexIds[vertexOffset++] = vertex.id(); this.verticesCache.invalidate(vertex.id()); } + if (vertexOffset > 0) { + this.notifyChanges(Cache.ACTION_INVALIDED, + HugeType.VERTEX, vertexIds); + } // Update edge cache if any edges change if (edgesInTxSize > 0) { // TODO: Use a more precise strategy to update the edge cache this.edgesCache.clear(); + this.notifyChanges(Cache.ACTION_CLEARED, HugeType.EDGE, null); } } } @@ -322,6 +367,7 @@ public final void removeIndex(IndexLabel indexLabel) { if (indexLabel.baseType() == HugeType.EDGE_LABEL) { // TODO: Use a more precise strategy to update the edge cache this.edgesCache.clear(); + this.notifyChanges(Cache.ACTION_CLEARED, HugeType.EDGE, null); } } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java index 1b3bbeccc6..7adc1bf77a 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java @@ -19,9 +19,6 @@ package com.baidu.hugegraph.backend.cache; -import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_CLEAR; -import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_INVALID; - import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -38,6 +35,7 @@ import com.baidu.hugegraph.event.EventListener; import com.baidu.hugegraph.schema.SchemaElement; import com.baidu.hugegraph.type.HugeType; +import com.baidu.hugegraph.util.E; import com.baidu.hugegraph.util.Events; import com.google.common.collect.ImmutableSet; @@ -74,7 +72,7 @@ public void close() { try { super.close(); } finally { - this.clearCache(); + this.clearCache(false); this.unlistenChanges(); } } @@ -94,7 +92,7 @@ private void listenChanges() { if (storeEvents.contains(event.name())) { LOG.debug("Graph {} clear schema cache on event '{}'", this.graph(), event.name()); - this.clearCache(); + this.clearCache(true); return true; } return false; @@ -105,9 +103,11 @@ private void listenChanges() { this.cacheEventListener = event -> { LOG.debug("Graph {} received schema cache event: {}", this.graph(), event); - event.checkArgs(String.class, HugeType.class, Id.class); Object[] args = event.args(); - if (ACTION_INVALID.equals(args[0])) { + E.checkArgument(args.length > 0 && args[0] instanceof String, + "Expect event action argument"); + if (Cache.ACTION_INVALID.equals(args[0])) { + event.checkArgs(String.class, HugeType.class, Id.class); HugeType type = (HugeType) args[1]; Id id = (Id) args[2]; this.arrayCaches.remove(type, id); @@ -126,8 +126,9 @@ private void listenChanges() { } this.resetCachedAll(type); return true; - } else if (ACTION_CLEAR.equals(args[0])) { - this.clearCache(); + } else if (Cache.ACTION_CLEAR.equals(args[0])) { + event.checkArgs(String.class, HugeType.class); + this.clearCache(false); return true; } return false; @@ -143,10 +144,14 @@ private final void resetCachedAll(HugeType type) { this.cachedTypes().put(type, false); } - private void clearCache() { + private void clearCache(boolean notify) { this.idCache.clear(); this.nameCache.clear(); this.arrayCaches.clear(); + + if (notify) { + this.notifyChanges(Cache.ACTION_CLEARED, null, null); + } } private void unlistenChanges() { @@ -158,6 +163,11 @@ private void unlistenChanges() { schemaEventHub.unlisten(Events.CACHE, this.cacheEventListener); } + private void notifyChanges(String action, HugeType type, Id id) { + EventHub graphEventHub = this.params().schemaEventHub(); + graphEventHub.notify(Events.CACHE, action, type, id); + } + private final void resetCachedAllIfReachedCapacity() { if (this.idCache.size() >= this.idCache.capacity()) { LOG.warn("Schema cache reached capacity({}): {}", @@ -196,6 +206,8 @@ protected void addSchema(SchemaElement schema) { // update optimized array cache this.arrayCaches.updateIfNeeded(schema); + + this.notifyChanges(Cache.ACTION_INVALIDED, schema.type(), schema.id()); } @Override @@ -267,6 +279,8 @@ protected void removeSchema(SchemaElement schema) { // remove from optimized array cache this.arrayCaches.remove(schema.type(), schema.id()); + + this.notifyChanges(Cache.ACTION_INVALIDED, schema.type(), schema.id()); } @Override diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendFeatures.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendFeatures.java index 8330147d0e..49b83a3e36 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendFeatures.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendFeatures.java @@ -25,6 +25,10 @@ public default boolean supportsPersistence() { return true; } + public default boolean supportsSharedStorage() { + return true; + } + public boolean supportsScanToken(); public boolean supportsScanKeyPrefix(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/memory/InMemoryDBStore.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/memory/InMemoryDBStore.java index c7d9db8e1b..7b1e12bea3 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/memory/InMemoryDBStore.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/memory/InMemoryDBStore.java @@ -363,6 +363,11 @@ public boolean supportsPersistence() { return false; } + @Override + public boolean supportsSharedStorage() { + return false; + } + @Override public boolean supportsScanToken() { return false; diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java index aeb40eae56..0e465da9ec 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java @@ -74,6 +74,12 @@ private void checkOpened() { "The RaftBackendStoreProvider has not been opened"); } + private void checkNonSharedStore(BackendStore store) { + E.checkArgument(!store.features().supportsSharedStorage(), + "Can't enable raft mode with %s backend", + this.type()); + } + @Override public String type() { return this.provider.type(); @@ -94,6 +100,7 @@ public synchronized BackendStore loadSchemaStore(final String name) { if (this.schemaStore == null) { LOG.info("Init raft backend schema store"); BackendStore store = this.provider.loadSchemaStore(name); + this.checkNonSharedStore(store); this.schemaStore = new RaftBackendStore(store, this.context); this.context.addStore(StoreType.SCHEMA, this.schemaStore); } @@ -105,6 +112,7 @@ public synchronized BackendStore loadGraphStore(String name) { if (this.graphStore == null) { LOG.info("Init raft backend graph store"); BackendStore store = this.provider.loadGraphStore(name); + this.checkNonSharedStore(store); this.graphStore = new RaftBackendStore(store, this.context); this.context.addStore(StoreType.GRAPH, this.graphStore); } @@ -116,6 +124,7 @@ public synchronized BackendStore loadSystemStore(String name) { if (this.systemStore == null) { LOG.info("Init raft backend system store"); BackendStore store = this.provider.loadSystemStore(name); + this.checkNonSharedStore(store); this.systemStore = new RaftBackendStore(store, this.context); this.context.addStore(StoreType.SYSTEM, this.systemStore); } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java index d9493df35f..5b2815fd08 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java @@ -19,8 +19,6 @@ package com.baidu.hugegraph.backend.store.raft; -import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_CLEAR; - import java.io.File; import java.io.IOException; import java.nio.file.Paths; @@ -44,7 +42,7 @@ import com.alipay.sofa.jraft.util.ThreadPoolUtil; import com.baidu.hugegraph.HugeException; import com.baidu.hugegraph.HugeGraphParams; -import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.backend.cache.Cache; import com.baidu.hugegraph.backend.store.BackendStore; import com.baidu.hugegraph.backend.store.raft.rpc.ListPeersProcessor; import com.baidu.hugegraph.backend.store.raft.rpc.RaftRequests.StoreType; @@ -257,11 +255,11 @@ public NodeOptions nodeOptions() throws IOException { public void clearCache() { // Just choose two representatives used to represent schema and graph - this.notifyCache(ACTION_CLEAR, HugeType.VERTEX_LABEL, null); - this.notifyCache(ACTION_CLEAR, HugeType.VERTEX, null); + this.notifyCache(Cache.ACTION_CLEAR, HugeType.VERTEX_LABEL, null); + this.notifyCache(Cache.ACTION_CLEAR, HugeType.VERTEX, null); } - public void notifyCache(String action, HugeType type, Id id) { + protected void notifyCache(String action, HugeType type, Object id) { EventHub eventHub; if (type.isGraph()) { eventHub = this.params.graphEventHub(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java index 42cfefb37b..a5697209a6 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java @@ -19,8 +19,7 @@ package com.baidu.hugegraph.backend.store.raft; -import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_INVALID; - +import java.util.ArrayList; import java.util.List; import org.slf4j.Logger; @@ -36,6 +35,9 @@ import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; import com.baidu.hugegraph.backend.BackendException; +import com.baidu.hugegraph.backend.cache.Cache; +import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.backend.query.Query; import com.baidu.hugegraph.backend.serializer.BytesBuffer; import com.baidu.hugegraph.backend.store.BackendAction; import com.baidu.hugegraph.backend.store.BackendEntry; @@ -84,13 +86,23 @@ private void updateCacheIfNeeded(BackendMutation mutation, return; } for (HugeType type : mutation.types()) { - if (!type.isGraph() && !type.isSchema()) { - continue; - } - for (java.util.Iterator it = mutation.mutation(type); - it.hasNext();) { - BackendEntry entry = it.next().entry(); - this.context.notifyCache(ACTION_INVALID, type, entry.originId()); + if (type.isSchema()) { + java.util.Iterator it = mutation.mutation(type); + while (it.hasNext()) { + BackendEntry entry = it.next().entry(); + this.context.notifyCache(Cache.ACTION_INVALID, type, + entry.originId()); + } + } else if (type.isGraph()) { + List ids = new ArrayList<>((int) Query.COMMIT_BATCH); + java.util.Iterator it = mutation.mutation(type); + while (it.hasNext()) { + ids.add(it.next().entry().originId()); + } + this.context.notifyCache(Cache.ACTION_INVALID, type, + ids.toArray()); + } else { + // Ignore other types due to not cached } } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/rpc/RpcServiceConfig4Client.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/rpc/RpcServiceConfig4Client.java new file mode 100644 index 0000000000..9c42ce6ee9 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/rpc/RpcServiceConfig4Client.java @@ -0,0 +1,35 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.rpc; + +public interface RpcServiceConfig4Client { + + public T serviceProxy(String interfaceId); + + public T serviceProxy(String graph, String interfaceId); + + public default T serviceProxy(Class clazz) { + return this.serviceProxy(clazz.getName()); + } + + public default T serviceProxy(String graph, Class clazz) { + return this.serviceProxy(graph, clazz.getName()); + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/rpc/RpcServiceConfig4Server.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/rpc/RpcServiceConfig4Server.java new file mode 100644 index 0000000000..53ce5a3aaa --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/rpc/RpcServiceConfig4Server.java @@ -0,0 +1,28 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.rpc; + +public interface RpcServiceConfig4Server { + + public void addService(Class clazz, E serviceImpl); + + public void addService(String graph, Class clazz, + E serviceImpl); +} diff --git a/hugegraph-dist/src/assembly/static/conf/log4j2.xml b/hugegraph-dist/src/assembly/static/conf/log4j2.xml index 12c0fb84ea..07621ed182 100644 --- a/hugegraph-dist/src/assembly/static/conf/log4j2.xml +++ b/hugegraph-dist/src/assembly/static/conf/log4j2.xml @@ -30,6 +30,9 @@ + + + diff --git a/hugegraph-dist/src/assembly/static/conf/rest-server.properties b/hugegraph-dist/src/assembly/static/conf/rest-server.properties index 4287e13566..30bc288287 100644 --- a/hugegraph-dist/src/assembly/static/conf/rest-server.properties +++ b/hugegraph-dist/src/assembly/static/conf/rest-server.properties @@ -16,11 +16,11 @@ server.id=server-1 server.role=master rpc.server_host=127.0.0.1 -rpc.server_port=8893 +rpc.server_port=8090 rpc.server_timeout=30 +rpc.remote_url=127.0.0.1:8090 rpc.client_connect_timeout=20 rpc.client_reconnect_period=10 rpc.client_read_timeout=40 rpc.client_retries=3 rpc.client_load_balancer=consistentHash -rpc.protocol=bolt diff --git a/hugegraph-dist/src/main/java/com/baidu/hugegraph/dist/HugeGraphServer.java b/hugegraph-dist/src/main/java/com/baidu/hugegraph/dist/HugeGraphServer.java index 76603526e2..7b0def5ab6 100644 --- a/hugegraph-dist/src/main/java/com/baidu/hugegraph/dist/HugeGraphServer.java +++ b/hugegraph-dist/src/main/java/com/baidu/hugegraph/dist/HugeGraphServer.java @@ -102,6 +102,6 @@ public static void main(String[] args) throws Exception { LOG.info("HugeGraphServer stopping"); server.stop(); LOG.info("HugeGraphServer stopped"); - })); + }, "hugegraph-server-shutdown")); } } diff --git a/hugegraph-dist/src/main/resources/log4j2.xml b/hugegraph-dist/src/main/resources/log4j2.xml index d2aac25589..10f10c7662 100644 --- a/hugegraph-dist/src/main/resources/log4j2.xml +++ b/hugegraph-dist/src/main/resources/log4j2.xml @@ -35,6 +35,10 @@ + + + + diff --git a/hugegraph-example/src/main/resources/log4j2.xml b/hugegraph-example/src/main/resources/log4j2.xml index 934cef3440..5a705332ff 100644 --- a/hugegraph-example/src/main/resources/log4j2.xml +++ b/hugegraph-example/src/main/resources/log4j2.xml @@ -20,21 +20,31 @@ + + + + + + + - + + - + + - + + diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBFeatures.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBFeatures.java index 0820b639eb..cdbb00a8aa 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBFeatures.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBFeatures.java @@ -23,6 +23,11 @@ public class RocksDBFeatures implements BackendFeatures { + @Override + public boolean supportsSharedStorage() { + return false; + } + @Override public boolean supportsScanToken() { return false; diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeCoreTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeCoreTest.java index d1741dc870..7c070da341 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeCoreTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeCoreTest.java @@ -3139,8 +3139,7 @@ public void testQueryAdjacentVerticesOfEdgesWithoutVertex() Assert.assertTrue(adjacent.schemaLabel().undefined()); Assert.assertEquals("~undefined", adjacent.label()); - params().graphEventHub().notify(Events.CACHE, "clear", - null, null).get(); + params().graphEventHub().notify(Events.CACHE, "clear", null).get(); vertices = graph.traversal().V(james.id()).outE().otherV().toList(); Assert.assertEquals(1, vertices.size()); adjacent = (HugeVertex) vertices.get(0); @@ -3285,8 +3284,7 @@ public void testQueryAdjacentVerticesOfEdgesWithInvalidVertexLabel() Whitebox.setInternalState(params().graphTransaction(), "checkCustomVertexExist", false); - params().graphEventHub().notify(Events.CACHE, "clear", - null, null).get(); + params().graphEventHub().notify(Events.CACHE, "clear", null).get(); try { // override vertex designer-456 wirh programmer-456 graph.addVertex(T.label, "programmer", T.id, "456", diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeLabelCoreTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeLabelCoreTest.java index e96a4ee392..4a7080b284 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeLabelCoreTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeLabelCoreTest.java @@ -1215,7 +1215,7 @@ public void testListEdgeLabels() { Assert.assertTrue(edgeLabels.contains(write)); // clear cache - params().schemaEventHub().call(Events.CACHE, "clear", null, null); + params().schemaEventHub().call(Events.CACHE, "clear", null); Assert.assertEquals(look, schema.getEdgeLabel("look")); diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/VertexLabelCoreTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/VertexLabelCoreTest.java index 30425d7807..79525d8cbe 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/VertexLabelCoreTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/VertexLabelCoreTest.java @@ -1097,7 +1097,7 @@ public void testListVertexLabels() { Assert.assertTrue(vertexLabels.contains(book)); // clear cache - params().schemaEventHub().call(Events.CACHE, "clear", null, null); + params().schemaEventHub().call(Events.CACHE, "clear", null); Assert.assertEquals(person, schema.getVertexLabel("person")); diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/cache/CachedGraphTransactionTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/cache/CachedGraphTransactionTest.java index c0c4d1ba7a..6583a0730c 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/cache/CachedGraphTransactionTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/cache/CachedGraphTransactionTest.java @@ -86,8 +86,7 @@ public void testEventClear() throws Exception { Assert.assertEquals(2L, Whitebox.invoke(cache, "verticesCache", "size")); - this.params.graphEventHub().notify(Events.CACHE, "clear", - null, null).get(); + this.params.graphEventHub().notify(Events.CACHE, "clear", null).get(); Assert.assertEquals(0L, Whitebox.invoke(cache, "verticesCache", "size")); diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/cache/CachedSchemaTransactionTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/cache/CachedSchemaTransactionTest.java index 82884b2cbc..271e5267ba 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/cache/CachedSchemaTransactionTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/cache/CachedSchemaTransactionTest.java @@ -83,8 +83,7 @@ public void testEventClear() throws Exception { Assert.assertEquals(IdGenerator.of(2), cache.getPropertyKey("fake-pk-2").id()); - this.params.schemaEventHub().notify(Events.CACHE, "clear", - null, null).get(); + this.params.schemaEventHub().notify(Events.CACHE, "clear", null).get(); Assert.assertEquals(0L, Whitebox.invoke(cache, "idCache", "size")); Assert.assertEquals(0L, Whitebox.invoke(cache, "nameCache", "size")); diff --git a/hugegraph-test/src/main/resources/log4j2.xml b/hugegraph-test/src/main/resources/log4j2.xml index dcfec0cf2a..5881345a7e 100644 --- a/hugegraph-test/src/main/resources/log4j2.xml +++ b/hugegraph-test/src/main/resources/log4j2.xml @@ -20,18 +20,31 @@ + + + + + + + + + + + + +