From fc7e50ca11fc2e7b7abc047d357bb6d6464558b6 Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Fri, 12 Feb 2021 03:03:18 +0800 Subject: [PATCH 01/11] support: invalid cache through rpc Change-Id: I3fe18cd8c893a84a56fb66df651b78c5e0fe5f4b --- .../api/traversers/FusiformSimilarityAPI.java | 1 - .../hugegraph/auth/ConfigAuthenticator.java | 4 +- .../hugegraph/auth/HugeGraphAuthProxy.java | 29 ++-- .../hugegraph/auth/StandardAuthenticator.java | 4 +- .../baidu/hugegraph/config/ServerOptions.java | 13 +- .../baidu/hugegraph/core/GraphManager.java | 31 +++- .../hugegraph/rpc/RpcClientProvider.java | 23 +-- .../hugegraph/rpc/RpcConsumerConfig.java | 75 ++++++-- .../hugegraph/rpc/RpcProviderConfig.java | 34 +++- .../{SofaRpcServer.java => RpcServer.java} | 18 +- .../java/com/baidu/hugegraph/HugeGraph.java | 5 + .../baidu/hugegraph/StandardHugeGraph.java | 162 ++++++++++++++++++ .../backend/cache/AbstractCache.java | 3 - .../baidu/hugegraph/backend/cache/Cache.java | 5 + .../backend/cache/CachedGraphTransaction.java | 53 ++++-- .../cache/CachedSchemaTransaction.java | 36 ++-- .../backend/store/raft/RaftSharedContext.java | 7 +- .../backend/store/raft/StoreStateMachine.java | 6 +- .../src/assembly/static/conf/log4j2.xml | 3 + hugegraph-dist/src/main/resources/log4j2.xml | 4 + .../src/main/resources/log4j2.xml | 16 +- hugegraph-test/src/main/resources/log4j2.xml | 13 ++ 22 files changed, 440 insertions(+), 105 deletions(-) rename hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/{SofaRpcServer.java => RpcServer.java} (86%) diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/traversers/FusiformSimilarityAPI.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/traversers/FusiformSimilarityAPI.java index dda9845fab..160ace6567 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/traversers/FusiformSimilarityAPI.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/traversers/FusiformSimilarityAPI.java @@ -42,7 +42,6 @@ import com.baidu.hugegraph.api.API; import com.baidu.hugegraph.backend.query.QueryResults; import com.baidu.hugegraph.core.GraphManager; -import com.baidu.hugegraph.schema.EdgeLabel; import com.baidu.hugegraph.server.RestServer; import com.baidu.hugegraph.traversal.algorithm.FusiformSimilarityTraverser; import com.baidu.hugegraph.traversal.algorithm.FusiformSimilarityTraverser.SimilarsMap; diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/ConfigAuthenticator.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/ConfigAuthenticator.java index ce5b1bed47..791774c084 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/ConfigAuthenticator.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/ConfigAuthenticator.java @@ -45,8 +45,8 @@ public ConfigAuthenticator() { @Override public void setup(HugeConfig config) { - this.tokens.put(USER_ADMIN, config.get(ServerOptions.ADMIN_TOKEN)); - this.tokens.putAll(config.getMap(ServerOptions.USER_TOKENS)); + this.tokens.put(USER_ADMIN, config.get(ServerOptions.AUTH_ADMIN_TOKEN)); + this.tokens.putAll(config.getMap(ServerOptions.AUTH_USER_TOKENS)); } /** diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java index d06ebceb18..e4caff6275 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java @@ -67,6 +67,8 @@ import com.baidu.hugegraph.config.HugeConfig; import com.baidu.hugegraph.exception.NotSupportException; import com.baidu.hugegraph.iterator.FilterIterator; +import com.baidu.hugegraph.rpc.RpcServiceConfig4Client; +import com.baidu.hugegraph.rpc.RpcServiceConfig4Server; import com.baidu.hugegraph.schema.EdgeLabel; import com.baidu.hugegraph.schema.IndexLabel; import com.baidu.hugegraph.schema.PropertyKey; @@ -113,7 +115,7 @@ public HugeGraphAuthProxy(HugeGraph hugegraph) { @Override public HugeGraph hugegraph() { - verifyAdminPermission(); + this.verifyAdminPermission(); return this.hugegraph; } @@ -502,7 +504,7 @@ public Transaction tx() { @Override public void close() throws Exception { - verifyAdminPermission(); + this.verifyAdminPermission(); this.hugegraph.close(); } @@ -575,7 +577,7 @@ public String backendVersion() { @Override public BackendStoreSystemInfo backendStoreSystemInfo() { - verifyAdminPermission(); + this.verifyAdminPermission(); return this.hugegraph.backendStoreSystemInfo(); } @@ -617,19 +619,19 @@ public void waitStarted() { @Override public void serverStarted(Id serverId, NodeRole serverRole) { - verifyAdminPermission(); + this.verifyAdminPermission(); this.hugegraph.serverStarted(serverId, serverRole); } @Override public boolean started() { - verifyAdminPermission(); + this.verifyAdminPermission(); return this.hugegraph.started(); } @Override public boolean closed() { - verifyAdminPermission(); + this.verifyAdminPermission(); return this.hugegraph.closed(); } @@ -664,21 +666,28 @@ public RaftGroupManager raftGroupManager(String group) { return this.hugegraph.raftGroupManager(group); } + @Override + public void registerRpcServices(RpcServiceConfig4Server serverConfig, + RpcServiceConfig4Client clientConfig) { + this.verifyAdminPermission(); + this.hugegraph.registerRpcServices(serverConfig, clientConfig); + } + @Override public void initBackend() { - verifyAdminPermission(); + this.verifyAdminPermission(); this.hugegraph.initBackend(); } @Override public void clearBackend() { - verifyAdminPermission(); + this.verifyAdminPermission(); this.hugegraph.clearBackend(); } @Override public void truncateBackend() { - verifyAdminPermission(); + this.verifyAdminPermission(); HugeUser admin = this.hugegraph.userManager() .findUser(HugeAuthenticator.USER_ADMIN); try { @@ -1354,7 +1363,7 @@ public RolePermission loginUser(String username, String password) { private void switchUserManager(UserManager userManager) { this.userManager = userManager; - hugegraph.switchUserManager(userManager); + HugeGraphAuthProxy.this.hugegraph.switchUserManager(userManager); } } diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/StandardAuthenticator.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/StandardAuthenticator.java index 08c8a98ed6..2dd8a5371d 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/StandardAuthenticator.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/StandardAuthenticator.java @@ -83,8 +83,8 @@ public void setup(HugeConfig config) { String remoteUrl = config.get(ServerOptions.AUTH_REMOTE_URL); if (StringUtils.isNotEmpty(remoteUrl)) { - RpcClientProvider provider = new RpcClientProvider(config); - this.graph.switchUserManager(provider.userManager()); + RpcClientProvider clientProvider = new RpcClientProvider(config); + this.graph.switchUserManager(clientProvider.userManager()); } } diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java index dcce7c7493..2d869189f1 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java @@ -204,7 +204,7 @@ public static synchronized ServerOptions instance() { "hugegraph" ); - public static final ConfigOption ADMIN_TOKEN = + public static final ConfigOption AUTH_ADMIN_TOKEN = new ConfigOption<>( "auth.admin_token", "Token for administrator operations, " + @@ -213,7 +213,7 @@ public static synchronized ServerOptions instance() { "162f7848-0b6d-4faf-b557-3a0797869c55" ); - public static final ConfigListOption USER_TOKENS = + public static final ConfigListOption AUTH_USER_TOKENS = new ConfigListOption<>( "auth.user_tokens", "The map of user tokens with name and password, " + @@ -258,6 +258,15 @@ public static synchronized ServerOptions instance() { 30 ); + public static final ConfigOption RPC_REMOTE_URL = + new ConfigOption<>( + "rpc.remote_url", + "The remote urls of rpc peers, it can be set to " + + "multiple addresses, which are concat by ','.", + disallowEmpty(), + "127.0.0.1:8099" + ); + public static final ConfigOption RPC_CLIENT_CONNECT_TIMEOUT = new ConfigOption<>( "rpc.client_connect_timeout", diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java index 727f6ea034..cd1cd088c0 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java @@ -51,8 +51,10 @@ import com.baidu.hugegraph.license.LicenseVerifier; import com.baidu.hugegraph.metrics.MetricsUtil; import com.baidu.hugegraph.metrics.ServerReporter; +import com.baidu.hugegraph.rpc.RpcClientProvider; +import com.baidu.hugegraph.rpc.RpcConsumerConfig; import com.baidu.hugegraph.rpc.RpcProviderConfig; -import com.baidu.hugegraph.rpc.SofaRpcServer; +import com.baidu.hugegraph.rpc.RpcServer; import com.baidu.hugegraph.serializer.JsonSerializer; import com.baidu.hugegraph.serializer.Serializer; import com.baidu.hugegraph.server.RestServer; @@ -67,12 +69,14 @@ public final class GraphManager { private final Map graphs; private final HugeAuthenticator authenticator; - private final SofaRpcServer rpcServer; + private final RpcServer rpcServer; + private final RpcClientProvider rpcClient; public GraphManager(HugeConfig conf) { this.graphs = new ConcurrentHashMap<>(); this.authenticator = HugeAuthenticator.loadAuthenticator(conf); - this.rpcServer = new SofaRpcServer(conf); + this.rpcServer = new RpcServer(conf); + this.rpcClient = new RpcClientProvider(conf); this.loadGraphs(conf.getMap(ServerOptions.GRAPHS)); // this.installLicense(conf, ""); @@ -169,12 +173,25 @@ public void close() { } private void startRpcServer() { - RpcProviderConfig config = this.rpcServer.config(); + RpcProviderConfig serverConfig = this.rpcServer.config(); + RpcConsumerConfig clientConfig = this.rpcClient.config(); + if (this.authenticator != null) { - config.addService(UserManager.class, - this.authenticator.userManager()); + serverConfig.addService(UserManager.class, + this.authenticator.userManager()); + } + + for (Graph graph : this.graphs.values()) { + HugeGraph hugegraph = (HugeGraph) graph; + hugegraph.registerRpcServices(serverConfig, clientConfig); + } + + try { + this.rpcServer.exportAll(); + } catch (Throwable e) { + this.rpcServer.destroy(); + throw e; } - this.rpcServer.exportAll(); } private void destoryRpcServer() { diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcClientProvider.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcClientProvider.java index d028ce8104..b0d84a5989 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcClientProvider.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcClientProvider.java @@ -19,26 +19,29 @@ package com.baidu.hugegraph.rpc; -import com.alipay.sofa.rpc.config.ConsumerConfig; import com.baidu.hugegraph.auth.UserManager; import com.baidu.hugegraph.config.HugeConfig; +import com.baidu.hugegraph.config.ServerOptions; public class RpcClientProvider { - public final RpcConsumerConfig rpcConsumerConfig; + public final RpcConsumerConfig consumerConfig; + public final RpcConsumerConfig authConsumerConfig; public RpcClientProvider(HugeConfig conf) { - RpcCommonConfig.initRpcConfigs(conf); - this.rpcConsumerConfig = new RpcConsumerConfig(); - this.rpcConsumerConfig.addConsumerConfig(UserManager.class, conf); + // TODO: fetch from registry server + String rpcUrl = conf.get(ServerOptions.RPC_REMOTE_URL); + this.consumerConfig = new RpcConsumerConfig(conf, rpcUrl); + + String authUrl = conf.get(ServerOptions.AUTH_REMOTE_URL); + this.authConsumerConfig = new RpcConsumerConfig(conf, authUrl); } - public UserManager userManager() { - return (UserManager) this.serviceProxy(UserManager.class.getName()); + public RpcConsumerConfig config() { + return this.consumerConfig; } - public Object serviceProxy(String serviceName) { - ConsumerConfig config = this.rpcConsumerConfig.consumerConfig(serviceName); - return config.refer(); + public UserManager userManager() { + return this.authConsumerConfig.serviceProxy(UserManager.class); } } diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcConsumerConfig.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcConsumerConfig.java index ec92cc259b..75871e1765 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcConsumerConfig.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcConsumerConfig.java @@ -26,13 +26,52 @@ import com.baidu.hugegraph.config.ServerOptions; import com.google.common.collect.Maps; -public class RpcConsumerConfig { +public class RpcConsumerConfig implements RpcServiceConfig4Client { - private final Map configs = Maps.newHashMap(); + private final HugeConfig conf; + private final String remoteUrl; + private final Map> configs; - public void addConsumerConfig(Class clazz, HugeConfig conf) { + public RpcConsumerConfig(HugeConfig conf, String remoteUrl) { + RpcCommonConfig.initRpcConfigs(conf); + this.conf = conf; + this.remoteUrl = remoteUrl; + this.configs = Maps.newHashMap(); + } + + @Override + public T serviceProxy(String graph, String interfaceId) { + ConsumerConfig config = this.consumerConfig(graph, interfaceId); + return config.refer(); + } + + @Override + public T serviceProxy(String interfaceId) { + ConsumerConfig config = this.consumerConfig(null, interfaceId); + return config.refer(); + } + + private ConsumerConfig consumerConfig(String graph, + String interfaceId) { + String serviceId; + if (graph != null) { + serviceId = interfaceId + ":" + graph; + } else { + serviceId = interfaceId; + } + + @SuppressWarnings("unchecked") + ConsumerConfig consumerConfig = (ConsumerConfig) + this.configs.get(serviceId); + if (consumerConfig != null) { + return consumerConfig; + } + + assert consumerConfig == null; + consumerConfig = new ConsumerConfig<>(); + + HugeConfig conf = this.conf; String protocol = conf.get(ServerOptions.RPC_PROTOCOL); - String directUrl = conf.get(ServerOptions.AUTH_REMOTE_URL); int timeout = conf.get(ServerOptions.RPC_CLIENT_READ_TIMEOUT) * 1000; int connectTimeout = conf.get(ServerOptions .RPC_CLIENT_CONNECT_TIMEOUT) * 1000; @@ -40,22 +79,20 @@ public void addConsumerConfig(Class clazz, HugeConfig conf) { .RPC_CLIENT_RECONNECT_PERIOD) * 1000; int retries = conf.get(ServerOptions.RPC_CLIENT_RETRIES); String loadBalancer = conf.get(ServerOptions.RPC_CLIENT_LOAD_BALANCER); - ConsumerConfig consumerConfig = new ConsumerConfig() - .setInterfaceId(clazz.getName()) - .setProtocol(protocol) - .setDirectUrl(directUrl) - .setTimeout(timeout) - .setConnectTimeout(connectTimeout) - .setReconnectPeriod(reconnectPeriod) - .setRetries(retries) - .setLoadBalancer(loadBalancer); - this.configs.put(clazz.getName(), consumerConfig); - } - public ConsumerConfig consumerConfig(String serverName) { - if (!this.configs.containsKey(serverName)) { - throw new RpcException("Invalid server name '%s'", serverName); + if (graph != null) { + consumerConfig.setId(serviceId).setUniqueId(graph); } - return this.configs.get(serverName); + consumerConfig.setInterfaceId(interfaceId) + .setProtocol(protocol) + .setDirectUrl(this.remoteUrl) + .setTimeout(timeout) + .setConnectTimeout(connectTimeout) + .setReconnectPeriod(reconnectPeriod) + .setRetries(retries) + .setLoadBalancer(loadBalancer); + + this.configs.put(serviceId, consumerConfig); + return consumerConfig; } } diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcProviderConfig.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcProviderConfig.java index 096d339ca4..7522bff507 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcProviderConfig.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcProviderConfig.java @@ -24,18 +24,38 @@ import com.alipay.sofa.rpc.config.ProviderConfig; import com.google.common.collect.Maps; -public class RpcProviderConfig { +public class RpcProviderConfig implements RpcServiceConfig4Server { - private final Map configs = Maps.newHashMap(); + private final Map> configs = Maps.newHashMap(); + @Override public void addService(Class clazz, E serviceImpl) { - ProviderConfig providerConfig = new ProviderConfig() - .setInterfaceId(clazz.getName()) - .setRef(serviceImpl); - this.configs.put(clazz.getName(), providerConfig); + this.addService(null, clazz.getName(), serviceImpl); } - public Map configs() { + @Override + public void addService(String graph, Class clazz, + E serviceImpl) { + this.addService(graph, clazz.getName(), serviceImpl); + } + + private void addService(String graph, + String interfaceId, + E serviceImpl) { + ProviderConfig providerConfig = new ProviderConfig<>(); + String serviceId; + if (graph != null) { + serviceId = interfaceId + ":" + graph; + providerConfig.setId(serviceId).setUniqueId(graph); + } else { + serviceId = interfaceId; + } + providerConfig.setInterfaceId(interfaceId) + .setRef(serviceImpl); + this.configs.put(serviceId, providerConfig); + } + + public Map> configs() { return this.configs; } } diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/SofaRpcServer.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcServer.java similarity index 86% rename from hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/SofaRpcServer.java rename to hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcServer.java index a92401b518..9cbddcfe26 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/SofaRpcServer.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcServer.java @@ -33,9 +33,9 @@ import com.baidu.hugegraph.config.ServerOptions; import com.baidu.hugegraph.util.Log; -public class SofaRpcServer { +public class RpcServer { - private static final Logger LOG = Log.logger(SofaRpcServer.class); + private static final Logger LOG = Log.logger(RpcServer.class); private final HugeConfig conf; private final RpcProviderConfig configs; @@ -49,7 +49,7 @@ public class SofaRpcServer { } } - public SofaRpcServer(HugeConfig conf) { + public RpcServer(HugeConfig conf) { RpcCommonConfig.initRpcConfigs(conf); this.conf = conf; this.serverConfig = new ServerConfig() @@ -66,22 +66,22 @@ public RpcProviderConfig config() { public void exportAll() { LOG.debug("RpcServer starting on port {}", this.port()); - Map configs = this.configs.configs(); + Map> configs = this.configs.configs(); if (MapUtils.isEmpty(configs)) { LOG.info("RpcServer config is empty, skip starting RpcServer"); return; } int timeout = this.conf.get(ServerOptions.RPC_SERVER_TIMEOUT) * 1000; - for (ProviderConfig providerConfig : configs.values()) { - providerConfig.setServer(this.serverConfig); - providerConfig.setTimeout(timeout); - providerConfig.export(); + for (ProviderConfig providerConfig : configs.values()) { + providerConfig.setServer(this.serverConfig) + .setTimeout(timeout) + .export(); } LOG.info("RpcServer started success on port {}", this.port()); } public void unExport(String serviceName) { - Map configs = this.configs.configs(); + Map> configs = this.configs.configs(); if (!configs.containsKey(serviceName)) { throw new RpcException("The service name '%s' doesn't exist", serviceName); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java index 82ea83a729..1b93b801be 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java @@ -38,6 +38,8 @@ import com.baidu.hugegraph.backend.store.BackendStoreSystemInfo; import com.baidu.hugegraph.backend.store.raft.RaftGroupManager; import com.baidu.hugegraph.config.ConfigOption; +import com.baidu.hugegraph.rpc.RpcServiceConfig4Client; +import com.baidu.hugegraph.rpc.RpcServiceConfig4Server; import com.baidu.hugegraph.schema.EdgeLabel; import com.baidu.hugegraph.schema.IndexLabel; import com.baidu.hugegraph.schema.PropertyKey; @@ -167,6 +169,9 @@ public interface HugeGraph extends Graph { public V option(ConfigOption option); + public void registerRpcServices(RpcServiceConfig4Server serverConfig, + RpcServiceConfig4Client clientConfig); + public default List mapPkId2Name(Collection ids) { List names = new ArrayList<>(ids.size()); for (Id id : ids) { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java index 38a8d57eb7..c4572acfb4 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java @@ -43,6 +43,10 @@ import com.baidu.hugegraph.auth.StandardUserManager; import com.baidu.hugegraph.auth.UserManager; import com.baidu.hugegraph.backend.BackendException; +import com.baidu.hugegraph.backend.cache.Cache; +import com.baidu.hugegraph.backend.cache.CacheNotifier; +import com.baidu.hugegraph.backend.cache.CacheNotifier.GraphCacheNotifier; +import com.baidu.hugegraph.backend.cache.CacheNotifier.SchemaCacheNotifier; import com.baidu.hugegraph.backend.cache.CachedGraphTransaction; import com.baidu.hugegraph.backend.cache.CachedSchemaTransaction; import com.baidu.hugegraph.backend.id.Id; @@ -64,8 +68,11 @@ import com.baidu.hugegraph.config.CoreOptions; import com.baidu.hugegraph.config.HugeConfig; import com.baidu.hugegraph.event.EventHub; +import com.baidu.hugegraph.event.EventListener; import com.baidu.hugegraph.exception.NotAllowException; import com.baidu.hugegraph.io.HugeGraphIoRegistry; +import com.baidu.hugegraph.rpc.RpcServiceConfig4Client; +import com.baidu.hugegraph.rpc.RpcServiceConfig4Server; import com.baidu.hugegraph.schema.EdgeLabel; import com.baidu.hugegraph.schema.IndexLabel; import com.baidu.hugegraph.schema.PropertyKey; @@ -87,6 +94,7 @@ import com.baidu.hugegraph.type.define.NodeRole; import com.baidu.hugegraph.util.DateUtil; import com.baidu.hugegraph.util.E; +import com.baidu.hugegraph.util.Events; import com.baidu.hugegraph.util.LockUtil; import com.baidu.hugegraph.util.Log; import com.baidu.hugegraph.variables.HugeVariables; @@ -933,6 +941,21 @@ public V option(ConfigOption option) { return config.get(option); } + @Override + public void registerRpcServices(RpcServiceConfig4Server serverConfig, + RpcServiceConfig4Client clientConfig) { + Class clazz1 = GraphCacheNotifier.class; + // The proxy is sometimes unavailable (issue #664) + CacheNotifier proxy = clientConfig.serviceProxy(this.name, clazz1); + serverConfig.addService(this.name, clazz1, new HugeGraphCacheNotifier( + this.graphEventHub, proxy)); + + Class clazz2 = SchemaCacheNotifier.class; + proxy = clientConfig.serviceProxy(this.name, clazz2); + serverConfig.addService(this.name, clazz2, new HugeSchemaCacheNotifier( + this.schemaEventHub, proxy)); + } + private void closeTx() { try { if (this.tx.isOpen()) { @@ -1332,4 +1355,143 @@ public SysTransaction(HugeGraphParams graph, BackendStore store) { this.autoCommit(true); } } + +// private final class HugeGraphCacheNotifier implements GraphCacheNotifier { +// +// private final EventHub hub = StandardHugeGraph.this.graphEventHub; +// private final EventListener cacheEventListener; +// +// public HugeGraphCacheNotifier(CacheNotifier proxy) { +// this.cacheEventListener = event -> { +// Object[] args = event.args(); +// E.checkArgument(args.length > 0 && args[0] instanceof String, +// "Expect event action argument"); +// if (Cache.ACTION_INVALIDED.equals(args[0])) { +// event.checkArgs(String.class, HugeType.class, Id[].class); +// HugeType type = (HugeType) args[1]; +// Id[] ids = (Id[]) args[2]; +// // argument type mismatch: proxy.invalid2(type, Id[] ids) +// proxy.invalid2(type, ids); +// return true; +// } else if (Cache.ACTION_CLEARED.equals(args[0])) { +// proxy.clear(); +// return true; +// } +// return false; +// }; +// this.hub.listen(Events.CACHE, this.cacheEventListener); +// } +// +// public void close() { +// this.hub.unlisten(Events.CACHE, this.cacheEventListener); +// } +// +// @Override +// public void invalid(HugeType type, Id id) { +// this.hub.notify(Events.CACHE, Cache.ACTION_INVALID, type, id); +// } +// +// @Override +// public void invalid2(HugeType type, Object[] ids) { +// for (Object id : ids) { +// E.checkArgument(id instanceof Id, +// "Expect instance of Id , but got '%s'", +// id.getClass()); +// this.invalid(type, (Id) id); +// } +// } +// +// @Override +// public void clear() { +// this.hub.notify(Events.CACHE, Cache.ACTION_CLEAR, null, null); +// } +// +// @Override +// public void reload() { +// // pass +// } +// } + + private static class AbstractCacheNotifier implements CacheNotifier { + + private final EventHub hub; + private final EventListener cacheEventListener; + + public AbstractCacheNotifier(EventHub hub, CacheNotifier proxy) { + this.hub = hub; + this.cacheEventListener = event -> { + Object[] args = event.args(); + E.checkArgument(args.length > 0 && args[0] instanceof String, + "Expect event action argument"); + if (Cache.ACTION_INVALIDED.equals(args[0])) { + event.checkArgs(String.class, HugeType.class, Object.class); + HugeType type = (HugeType) args[1]; + Object ids = args[2]; + if (ids instanceof Id[]) { + // argument type mismatch: proxy.invalid2(type,Id[]ids) + proxy.invalid2(type, (Id[]) ids); + } else if (ids instanceof Id) { + proxy.invalid(type, (Id) ids); + } else { + E.checkArgument(false, "Unexpected argument: %s", ids); + } + return true; + } else if (Cache.ACTION_CLEARED.equals(args[0])) { + event.checkArgs(String.class, HugeType.class); + HugeType type = (HugeType) args[1]; + proxy.clear(type); + return true; + } + return false; + }; + this.hub.listen(Events.CACHE, this.cacheEventListener); + } + + public void close() { + this.hub.unlisten(Events.CACHE, this.cacheEventListener); + } + + @Override + public void invalid(HugeType type, Id id) { + this.hub.notify(Events.CACHE, Cache.ACTION_INVALID, type, id); + } + + @Override + public void invalid2(HugeType type, Object[] ids) { + for (Object id : ids) { + E.checkArgument(id instanceof Id, + "Expect instance of Id , but got '%s'", + id.getClass()); + this.invalid(type, (Id) id); + } + } + + @Override + public void clear(HugeType type) { + this.hub.notify(Events.CACHE, Cache.ACTION_CLEAR, type, null); + } + + @Override + public void reload() { + // pass + } + } + + private static class HugeSchemaCacheNotifier + extends AbstractCacheNotifier + implements SchemaCacheNotifier { + + public HugeSchemaCacheNotifier(EventHub hub, CacheNotifier proxy) { + super(hub, proxy); + } + } + + private static class HugeGraphCacheNotifier + extends AbstractCacheNotifier + implements GraphCacheNotifier { + + public HugeGraphCacheNotifier(EventHub hub, CacheNotifier proxy) { + super(hub, proxy); + } + } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/AbstractCache.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/AbstractCache.java index 14d251e515..8243b0360b 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/AbstractCache.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/AbstractCache.java @@ -34,9 +34,6 @@ public abstract class AbstractCache implements Cache { public static final int DEFAULT_SIZE = 1 * MB; public static final int MAX_INIT_CAP = 100 * MB; - public static final String ACTION_INVALID = "invalid"; - public static final String ACTION_CLEAR = "clear"; - protected static final Logger LOG = Log.logger(Cache.class); private volatile long hits = 0L; diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/Cache.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/Cache.java index c8294af949..5347b3af7a 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/Cache.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/Cache.java @@ -24,6 +24,11 @@ public interface Cache { + public static final String ACTION_INVALID = "invalid"; + public static final String ACTION_CLEAR = "clear"; + public static final String ACTION_INVALIDED = "invalided"; + public static final String ACTION_CLEARED = "cleared"; + public Object get(K id); public Object getOrFetch(K id, Function fetcher); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedGraphTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedGraphTransaction.java index 1d1a5bcdc3..3c6b750322 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedGraphTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedGraphTransaction.java @@ -19,9 +19,6 @@ package com.baidu.hugegraph.backend.cache; -import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_CLEAR; -import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_INVALID; - import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -50,6 +47,7 @@ import com.baidu.hugegraph.structure.HugeEdge; import com.baidu.hugegraph.structure.HugeVertex; import com.baidu.hugegraph.type.HugeType; +import com.baidu.hugegraph.util.E; import com.baidu.hugegraph.util.Events; import com.google.common.collect.ImmutableSet; @@ -127,8 +125,7 @@ private void listenChanges() { if (storeEvents.contains(event.name())) { LOG.debug("Graph {} clear graph cache on event '{}'", this.graph(), event.name()); - this.verticesCache.clear(); - this.edgesCache.clear(); + this.clearCache(null, true); return true; } return false; @@ -139,9 +136,11 @@ private void listenChanges() { this.cacheEventListener = event -> { LOG.debug("Graph {} received graph cache event: {}", this.graph(), event); - event.checkArgs(String.class, HugeType.class, Id.class); Object[] args = event.args(); - if (ACTION_INVALID.equals(args[0])) { + E.checkArgument(args.length > 0 && args[0] instanceof String, + "Expect event action argument"); + if (Cache.ACTION_INVALID.equals(args[0])) { + event.checkArgs(String.class, HugeType.class, Id.class); HugeType type = (HugeType) args[1]; Id id = (Id) args[2]; if (type.isVertex()) { @@ -156,9 +155,10 @@ private void listenChanges() { this.edgesCache.clear(); } return true; - } else if (ACTION_CLEAR.equals(args[0])) { - this.verticesCache.clear(); - this.edgesCache.clear(); + } else if (Cache.ACTION_CLEAR.equals(args[0])) { + event.checkArgs(String.class, HugeType.class); + HugeType type = (HugeType) args[1]; + this.clearCache(type, false); return true; } return false; @@ -178,6 +178,24 @@ private void unlistenChanges() { graphEventHub.unlisten(Events.CACHE, this.cacheEventListener); } + private void notifyChanges(String action, HugeType type, Id[] ids) { + EventHub graphEventHub = this.params().graphEventHub(); + graphEventHub.notify(Events.CACHE, action, type, ids); + } + + private void clearCache(HugeType type, boolean notify) { + if (type == null || type == HugeType.VERTEX) { + this.verticesCache.clear(); + } + if (type == null || type == HugeType.EDGE) { + this.edgesCache.clear(); + } + + if (notify) { + this.notifyChanges(Cache.ACTION_CLEARED, null, null); + } + } + @Override protected final Iterator queryVerticesFromBackend(Query query) { if (!query.ids().isEmpty() && query.conditions().isEmpty()) { @@ -284,14 +302,18 @@ protected final Iterator queryEdgesFromBackend(Query query) { @Override protected final void commitMutation2Backend(BackendMutation... mutations) { // Collect changes before commit - Collection changes = this.verticesInTxUpdated(); + Collection updates = this.verticesInTxUpdated(); Collection deletions = this.verticesInTxRemoved(); + Id[] vertexIds = new Id[updates.size() + deletions.size()]; + int vertexOffset = 0; + int edgesInTxSize = this.edgesInTxSize(); try { super.commitMutation2Backend(mutations); // Update vertex cache - for (HugeVertex vertex : changes) { + for (HugeVertex vertex : updates) { + vertexIds[vertexOffset++] = vertex.id(); if (vertex.sizeOfSubProperties() > MAX_CACHE_PROPS_PER_VERTEX) { // Skip large vertex this.verticesCache.invalidate(vertex.id()); @@ -302,13 +324,19 @@ protected final void commitMutation2Backend(BackendMutation... mutations) { } finally { // Update removed vertex in cache whatever success or fail for (HugeVertex vertex : deletions) { + vertexIds[vertexOffset++] = vertex.id(); this.verticesCache.invalidate(vertex.id()); } + if (vertexOffset > 0) { + this.notifyChanges(Cache.ACTION_INVALIDED, + HugeType.VERTEX, vertexIds); + } // Update edge cache if any edges change if (edgesInTxSize > 0) { // TODO: Use a more precise strategy to update the edge cache this.edgesCache.clear(); + this.notifyChanges(Cache.ACTION_CLEARED, HugeType.EDGE, null); } } } @@ -322,6 +350,7 @@ public final void removeIndex(IndexLabel indexLabel) { if (indexLabel.baseType() == HugeType.EDGE_LABEL) { // TODO: Use a more precise strategy to update the edge cache this.edgesCache.clear(); + this.notifyChanges(Cache.ACTION_CLEARED, HugeType.EDGE, null); } } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java index 1b3bbeccc6..fce35be4f4 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java @@ -19,9 +19,6 @@ package com.baidu.hugegraph.backend.cache; -import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_CLEAR; -import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_INVALID; - import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -38,6 +35,7 @@ import com.baidu.hugegraph.event.EventListener; import com.baidu.hugegraph.schema.SchemaElement; import com.baidu.hugegraph.type.HugeType; +import com.baidu.hugegraph.util.E; import com.baidu.hugegraph.util.Events; import com.google.common.collect.ImmutableSet; @@ -74,7 +72,7 @@ public void close() { try { super.close(); } finally { - this.clearCache(); + this.clearCache(false); this.unlistenChanges(); } } @@ -94,7 +92,7 @@ private void listenChanges() { if (storeEvents.contains(event.name())) { LOG.debug("Graph {} clear schema cache on event '{}'", this.graph(), event.name()); - this.clearCache(); + this.clearCache(true); return true; } return false; @@ -103,11 +101,13 @@ private void listenChanges() { // Listen cache event: "cache"(invalid cache item) this.cacheEventListener = event -> { - LOG.debug("Graph {} received schema cache event: {}", + LOG.info("Graph {} received schema cache event: {}", this.graph(), event); - event.checkArgs(String.class, HugeType.class, Id.class); Object[] args = event.args(); - if (ACTION_INVALID.equals(args[0])) { + E.checkArgument(args.length > 0 && args[0] instanceof String, + "Expect event action argument"); + if (Cache.ACTION_INVALID.equals(args[0])) { + event.checkArgs(String.class, HugeType.class, Id.class); HugeType type = (HugeType) args[1]; Id id = (Id) args[2]; this.arrayCaches.remove(type, id); @@ -126,8 +126,9 @@ private void listenChanges() { } this.resetCachedAll(type); return true; - } else if (ACTION_CLEAR.equals(args[0])) { - this.clearCache(); + } else if (Cache.ACTION_CLEAR.equals(args[0])) { + event.checkArgs(String.class, HugeType.class); + this.clearCache(false); return true; } return false; @@ -143,10 +144,14 @@ private final void resetCachedAll(HugeType type) { this.cachedTypes().put(type, false); } - private void clearCache() { + private void clearCache(boolean notify) { this.idCache.clear(); this.nameCache.clear(); this.arrayCaches.clear(); + + if (notify) { + this.notifyChanges(Cache.ACTION_CLEARED, null, null); + } } private void unlistenChanges() { @@ -158,6 +163,11 @@ private void unlistenChanges() { schemaEventHub.unlisten(Events.CACHE, this.cacheEventListener); } + private void notifyChanges(String action, HugeType type, Id id) { + EventHub graphEventHub = this.params().schemaEventHub(); + graphEventHub.notify(Events.CACHE, action, type, id); + } + private final void resetCachedAllIfReachedCapacity() { if (this.idCache.size() >= this.idCache.capacity()) { LOG.warn("Schema cache reached capacity({}): {}", @@ -196,6 +206,8 @@ protected void addSchema(SchemaElement schema) { // update optimized array cache this.arrayCaches.updateIfNeeded(schema); + + this.notifyChanges(Cache.ACTION_INVALIDED, schema.type(), schema.id()); } @Override @@ -267,6 +279,8 @@ protected void removeSchema(SchemaElement schema) { // remove from optimized array cache this.arrayCaches.remove(schema.type(), schema.id()); + + this.notifyChanges(Cache.ACTION_INVALIDED, schema.type(), schema.id()); } @Override diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java index d9493df35f..0b801928d0 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java @@ -19,8 +19,6 @@ package com.baidu.hugegraph.backend.store.raft; -import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_CLEAR; - import java.io.File; import java.io.IOException; import java.nio.file.Paths; @@ -44,6 +42,7 @@ import com.alipay.sofa.jraft.util.ThreadPoolUtil; import com.baidu.hugegraph.HugeException; import com.baidu.hugegraph.HugeGraphParams; +import com.baidu.hugegraph.backend.cache.Cache; import com.baidu.hugegraph.backend.id.Id; import com.baidu.hugegraph.backend.store.BackendStore; import com.baidu.hugegraph.backend.store.raft.rpc.ListPeersProcessor; @@ -257,8 +256,8 @@ public NodeOptions nodeOptions() throws IOException { public void clearCache() { // Just choose two representatives used to represent schema and graph - this.notifyCache(ACTION_CLEAR, HugeType.VERTEX_LABEL, null); - this.notifyCache(ACTION_CLEAR, HugeType.VERTEX, null); + this.notifyCache(Cache.ACTION_CLEAR, HugeType.VERTEX_LABEL, null); + this.notifyCache(Cache.ACTION_CLEAR, HugeType.VERTEX, null); } public void notifyCache(String action, HugeType type, Id id) { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java index 42cfefb37b..323f9d7f68 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java @@ -19,8 +19,6 @@ package com.baidu.hugegraph.backend.store.raft; -import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_INVALID; - import java.util.List; import org.slf4j.Logger; @@ -36,6 +34,7 @@ import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; import com.baidu.hugegraph.backend.BackendException; +import com.baidu.hugegraph.backend.cache.Cache; import com.baidu.hugegraph.backend.serializer.BytesBuffer; import com.baidu.hugegraph.backend.store.BackendAction; import com.baidu.hugegraph.backend.store.BackendEntry; @@ -90,7 +89,8 @@ private void updateCacheIfNeeded(BackendMutation mutation, for (java.util.Iterator it = mutation.mutation(type); it.hasNext();) { BackendEntry entry = it.next().entry(); - this.context.notifyCache(ACTION_INVALID, type, entry.originId()); + this.context.notifyCache(Cache.ACTION_INVALID, type, + entry.originId()); } } } diff --git a/hugegraph-dist/src/assembly/static/conf/log4j2.xml b/hugegraph-dist/src/assembly/static/conf/log4j2.xml index 12c0fb84ea..07621ed182 100644 --- a/hugegraph-dist/src/assembly/static/conf/log4j2.xml +++ b/hugegraph-dist/src/assembly/static/conf/log4j2.xml @@ -30,6 +30,9 @@ + + + diff --git a/hugegraph-dist/src/main/resources/log4j2.xml b/hugegraph-dist/src/main/resources/log4j2.xml index d2aac25589..10f10c7662 100644 --- a/hugegraph-dist/src/main/resources/log4j2.xml +++ b/hugegraph-dist/src/main/resources/log4j2.xml @@ -35,6 +35,10 @@ + + + + diff --git a/hugegraph-example/src/main/resources/log4j2.xml b/hugegraph-example/src/main/resources/log4j2.xml index 934cef3440..5a705332ff 100644 --- a/hugegraph-example/src/main/resources/log4j2.xml +++ b/hugegraph-example/src/main/resources/log4j2.xml @@ -20,21 +20,31 @@ + + + + + + + - + + - + + - + + diff --git a/hugegraph-test/src/main/resources/log4j2.xml b/hugegraph-test/src/main/resources/log4j2.xml index dcfec0cf2a..5881345a7e 100644 --- a/hugegraph-test/src/main/resources/log4j2.xml +++ b/hugegraph-test/src/main/resources/log4j2.xml @@ -20,18 +20,31 @@ + + + + + + + + + + + + + From 4a7ee707cbe042706dfa8fee77ff4ce50d66f68a Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Fri, 12 Feb 2021 03:10:53 +0800 Subject: [PATCH 02/11] fix test Change-Id: I65b2da4e8dd3b89440e7c4d715a87ea95d6c14ac --- .../baidu/hugegraph/StandardHugeGraph.java | 56 ------------------- .../cache/CachedSchemaTransaction.java | 2 +- .../baidu/hugegraph/core/EdgeCoreTest.java | 6 +- .../hugegraph/core/EdgeLabelCoreTest.java | 2 +- .../hugegraph/core/VertexLabelCoreTest.java | 2 +- .../cache/CachedGraphTransactionTest.java | 3 +- .../cache/CachedSchemaTransactionTest.java | 3 +- 7 files changed, 7 insertions(+), 67 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java index c4572acfb4..b317139857 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java @@ -1356,62 +1356,6 @@ public SysTransaction(HugeGraphParams graph, BackendStore store) { } } -// private final class HugeGraphCacheNotifier implements GraphCacheNotifier { -// -// private final EventHub hub = StandardHugeGraph.this.graphEventHub; -// private final EventListener cacheEventListener; -// -// public HugeGraphCacheNotifier(CacheNotifier proxy) { -// this.cacheEventListener = event -> { -// Object[] args = event.args(); -// E.checkArgument(args.length > 0 && args[0] instanceof String, -// "Expect event action argument"); -// if (Cache.ACTION_INVALIDED.equals(args[0])) { -// event.checkArgs(String.class, HugeType.class, Id[].class); -// HugeType type = (HugeType) args[1]; -// Id[] ids = (Id[]) args[2]; -// // argument type mismatch: proxy.invalid2(type, Id[] ids) -// proxy.invalid2(type, ids); -// return true; -// } else if (Cache.ACTION_CLEARED.equals(args[0])) { -// proxy.clear(); -// return true; -// } -// return false; -// }; -// this.hub.listen(Events.CACHE, this.cacheEventListener); -// } -// -// public void close() { -// this.hub.unlisten(Events.CACHE, this.cacheEventListener); -// } -// -// @Override -// public void invalid(HugeType type, Id id) { -// this.hub.notify(Events.CACHE, Cache.ACTION_INVALID, type, id); -// } -// -// @Override -// public void invalid2(HugeType type, Object[] ids) { -// for (Object id : ids) { -// E.checkArgument(id instanceof Id, -// "Expect instance of Id , but got '%s'", -// id.getClass()); -// this.invalid(type, (Id) id); -// } -// } -// -// @Override -// public void clear() { -// this.hub.notify(Events.CACHE, Cache.ACTION_CLEAR, null, null); -// } -// -// @Override -// public void reload() { -// // pass -// } -// } - private static class AbstractCacheNotifier implements CacheNotifier { private final EventHub hub; diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java index fce35be4f4..7adc1bf77a 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java @@ -101,7 +101,7 @@ private void listenChanges() { // Listen cache event: "cache"(invalid cache item) this.cacheEventListener = event -> { - LOG.info("Graph {} received schema cache event: {}", + LOG.debug("Graph {} received schema cache event: {}", this.graph(), event); Object[] args = event.args(); E.checkArgument(args.length > 0 && args[0] instanceof String, diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeCoreTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeCoreTest.java index d1741dc870..7c070da341 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeCoreTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeCoreTest.java @@ -3139,8 +3139,7 @@ public void testQueryAdjacentVerticesOfEdgesWithoutVertex() Assert.assertTrue(adjacent.schemaLabel().undefined()); Assert.assertEquals("~undefined", adjacent.label()); - params().graphEventHub().notify(Events.CACHE, "clear", - null, null).get(); + params().graphEventHub().notify(Events.CACHE, "clear", null).get(); vertices = graph.traversal().V(james.id()).outE().otherV().toList(); Assert.assertEquals(1, vertices.size()); adjacent = (HugeVertex) vertices.get(0); @@ -3285,8 +3284,7 @@ public void testQueryAdjacentVerticesOfEdgesWithInvalidVertexLabel() Whitebox.setInternalState(params().graphTransaction(), "checkCustomVertexExist", false); - params().graphEventHub().notify(Events.CACHE, "clear", - null, null).get(); + params().graphEventHub().notify(Events.CACHE, "clear", null).get(); try { // override vertex designer-456 wirh programmer-456 graph.addVertex(T.label, "programmer", T.id, "456", diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeLabelCoreTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeLabelCoreTest.java index e96a4ee392..4a7080b284 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeLabelCoreTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeLabelCoreTest.java @@ -1215,7 +1215,7 @@ public void testListEdgeLabels() { Assert.assertTrue(edgeLabels.contains(write)); // clear cache - params().schemaEventHub().call(Events.CACHE, "clear", null, null); + params().schemaEventHub().call(Events.CACHE, "clear", null); Assert.assertEquals(look, schema.getEdgeLabel("look")); diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/VertexLabelCoreTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/VertexLabelCoreTest.java index 30425d7807..79525d8cbe 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/VertexLabelCoreTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/VertexLabelCoreTest.java @@ -1097,7 +1097,7 @@ public void testListVertexLabels() { Assert.assertTrue(vertexLabels.contains(book)); // clear cache - params().schemaEventHub().call(Events.CACHE, "clear", null, null); + params().schemaEventHub().call(Events.CACHE, "clear", null); Assert.assertEquals(person, schema.getVertexLabel("person")); diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/cache/CachedGraphTransactionTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/cache/CachedGraphTransactionTest.java index c0c4d1ba7a..6583a0730c 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/cache/CachedGraphTransactionTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/cache/CachedGraphTransactionTest.java @@ -86,8 +86,7 @@ public void testEventClear() throws Exception { Assert.assertEquals(2L, Whitebox.invoke(cache, "verticesCache", "size")); - this.params.graphEventHub().notify(Events.CACHE, "clear", - null, null).get(); + this.params.graphEventHub().notify(Events.CACHE, "clear", null).get(); Assert.assertEquals(0L, Whitebox.invoke(cache, "verticesCache", "size")); diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/cache/CachedSchemaTransactionTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/cache/CachedSchemaTransactionTest.java index 82884b2cbc..271e5267ba 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/cache/CachedSchemaTransactionTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/cache/CachedSchemaTransactionTest.java @@ -83,8 +83,7 @@ public void testEventClear() throws Exception { Assert.assertEquals(IdGenerator.of(2), cache.getPropertyKey("fake-pk-2").id()); - this.params.schemaEventHub().notify(Events.CACHE, "clear", - null, null).get(); + this.params.schemaEventHub().notify(Events.CACHE, "clear", null).get(); Assert.assertEquals(0L, Whitebox.invoke(cache, "idCache", "size")); Assert.assertEquals(0L, Whitebox.invoke(cache, "nameCache", "size")); From f3f8dd23591a71448a0dac4e799f5eb293619c77 Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Sat, 13 Feb 2021 00:47:48 +0800 Subject: [PATCH 03/11] add RpcServiceConfig* interfaces Change-Id: I2f8edb2c70b1ad185f66da527af0e34557c6c889 --- .../backend/cache/CacheNotifier.java | 38 +++++++++++++++++++ .../rpc/RpcServiceConfig4Client.java | 35 +++++++++++++++++ .../rpc/RpcServiceConfig4Server.java | 28 ++++++++++++++ 3 files changed, 101 insertions(+) create mode 100644 hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CacheNotifier.java create mode 100644 hugegraph-core/src/main/java/com/baidu/hugegraph/rpc/RpcServiceConfig4Client.java create mode 100644 hugegraph-core/src/main/java/com/baidu/hugegraph/rpc/RpcServiceConfig4Server.java diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CacheNotifier.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CacheNotifier.java new file mode 100644 index 0000000000..39154807a0 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CacheNotifier.java @@ -0,0 +1,38 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.backend.cache; + +import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.type.HugeType; + +public interface CacheNotifier extends AutoCloseable { + + public void invalid(HugeType type, Id id); + + public void invalid2(HugeType type, Object[] ids); + + public void clear(HugeType type); + + public void reload(); + + public interface GraphCacheNotifier extends CacheNotifier {} + + public interface SchemaCacheNotifier extends CacheNotifier {} +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/rpc/RpcServiceConfig4Client.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/rpc/RpcServiceConfig4Client.java new file mode 100644 index 0000000000..9c42ce6ee9 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/rpc/RpcServiceConfig4Client.java @@ -0,0 +1,35 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.rpc; + +public interface RpcServiceConfig4Client { + + public T serviceProxy(String interfaceId); + + public T serviceProxy(String graph, String interfaceId); + + public default T serviceProxy(Class clazz) { + return this.serviceProxy(clazz.getName()); + } + + public default T serviceProxy(String graph, Class clazz) { + return this.serviceProxy(graph, clazz.getName()); + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/rpc/RpcServiceConfig4Server.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/rpc/RpcServiceConfig4Server.java new file mode 100644 index 0000000000..53ce5a3aaa --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/rpc/RpcServiceConfig4Server.java @@ -0,0 +1,28 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.rpc; + +public interface RpcServiceConfig4Server { + + public void addService(Class clazz, E serviceImpl); + + public void addService(String graph, Class clazz, + E serviceImpl); +} From b44c873c283367bd6d77d4c30c0814437dc8f9b3 Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Sat, 13 Feb 2021 00:01:53 +0800 Subject: [PATCH 04/11] add FanoutCluster to broadcast reuqest Change-Id: I4e4abf3b71a6182e8f16af1cd17f60c2690c0767 --- .../baidu/hugegraph/config/ServerOptions.java | 6 +- .../baidu/hugegraph/core/GraphManager.java | 11 ++- .../hugegraph/rpc/RpcClientProvider.java | 32 +++++- .../hugegraph/rpc/RpcConsumerConfig.java | 98 ++++++++++++++++++- .../com/baidu/hugegraph/rpc/RpcServer.java | 31 +++--- .../baidu/hugegraph/StandardHugeGraph.java | 1 + .../static/conf/rest-server.properties | 4 +- 7 files changed, 152 insertions(+), 31 deletions(-) diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java index 2d869189f1..8d5144686d 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java @@ -228,7 +228,7 @@ public static synchronized ServerOptions instance() { "If the address is empty, it provide auth service, " + "otherwise it is auth client and also provide auth service " + "through rpc forwarding. The remote url can be set to " + - "multiple addresses, which are linked by ','.", + "multiple addresses, which are concat by ','.", null, "" ); @@ -238,7 +238,7 @@ public static synchronized ServerOptions instance() { "rpc.server_port", "The port bound by rpc server to provide services.", rangeInt(1, Integer.MAX_VALUE), - 8099 + 8090 ); public static final ConfigOption RPC_SERVER_HOST = @@ -264,7 +264,7 @@ public static synchronized ServerOptions instance() { "The remote urls of rpc peers, it can be set to " + "multiple addresses, which are concat by ','.", disallowEmpty(), - "127.0.0.1:8099" + "127.0.0.1:8090" ); public static final ConfigOption RPC_CLIENT_CONNECT_TIMEOUT = diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java index cd1cd088c0..6bc61d8db2 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java @@ -174,16 +174,19 @@ public void close() { private void startRpcServer() { RpcProviderConfig serverConfig = this.rpcServer.config(); - RpcConsumerConfig clientConfig = this.rpcClient.config(); if (this.authenticator != null) { serverConfig.addService(UserManager.class, this.authenticator.userManager()); } - for (Graph graph : this.graphs.values()) { - HugeGraph hugegraph = (HugeGraph) graph; - hugegraph.registerRpcServices(serverConfig, clientConfig); + if (this.rpcClient.enabled()) { + RpcConsumerConfig clientConfig = this.rpcClient.config(); + + for (Graph graph : this.graphs.values()) { + HugeGraph hugegraph = (HugeGraph) graph; + hugegraph.registerRpcServices(serverConfig, clientConfig); + } } try { diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcClientProvider.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcClientProvider.java index b0d84a5989..3e2d9146f1 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcClientProvider.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcClientProvider.java @@ -19,9 +19,15 @@ package com.baidu.hugegraph.rpc; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.Set; + +import com.alipay.sofa.rpc.common.utils.StringUtils; import com.baidu.hugegraph.auth.UserManager; import com.baidu.hugegraph.config.HugeConfig; import com.baidu.hugegraph.config.ServerOptions; +import com.baidu.hugegraph.util.E; public class RpcClientProvider { @@ -31,17 +37,39 @@ public class RpcClientProvider { public RpcClientProvider(HugeConfig conf) { // TODO: fetch from registry server String rpcUrl = conf.get(ServerOptions.RPC_REMOTE_URL); - this.consumerConfig = new RpcConsumerConfig(conf, rpcUrl); + String selfUrl = conf.get(ServerOptions.RPC_SERVER_HOST) + ":" + + conf.get(ServerOptions.RPC_SERVER_PORT); + rpcUrl = execludeSelfUrl(rpcUrl, selfUrl); + this.consumerConfig = StringUtils.isNotBlank(rpcUrl) ? + new RpcConsumerConfig(conf, rpcUrl) : null; String authUrl = conf.get(ServerOptions.AUTH_REMOTE_URL); - this.authConsumerConfig = new RpcConsumerConfig(conf, authUrl); + this.authConsumerConfig = StringUtils.isNotBlank(authUrl) ? + new RpcConsumerConfig(conf, authUrl) : null; + } + + public boolean enabled() { + return this.consumerConfig != null; } public RpcConsumerConfig config() { + E.checkArgument(this.consumerConfig != null, + "RpcClient is not enabled, please config option '%s'", + ServerOptions.RPC_REMOTE_URL.name()); return this.consumerConfig; } public UserManager userManager() { + E.checkArgument(this.authConsumerConfig != null, + "RpcClient is not enabled, please config option '%s'", + ServerOptions.AUTH_REMOTE_URL.name()); return this.authConsumerConfig.serviceProxy(UserManager.class); } + + private static String execludeSelfUrl(String rpcUrl, String selfUrl) { + String[] urls = StringUtils.splitWithCommaOrSemicolon(rpcUrl); + Set urlsSet = new LinkedHashSet<>(Arrays.asList(urls)); + urlsSet.remove(selfUrl); + return String.join(",", urlsSet); + } } diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcConsumerConfig.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcConsumerConfig.java index 75871e1765..72ba8f7ed2 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcConsumerConfig.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcConsumerConfig.java @@ -19,23 +19,43 @@ package com.baidu.hugegraph.rpc; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import org.slf4j.Logger; + +import com.alipay.sofa.rpc.bootstrap.ConsumerBootstrap; +import com.alipay.sofa.rpc.client.AbstractCluster; +import com.alipay.sofa.rpc.client.Cluster; +import com.alipay.sofa.rpc.client.ProviderInfo; import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.core.exception.RpcErrorType; +import com.alipay.sofa.rpc.core.exception.SofaRpcException; +import com.alipay.sofa.rpc.core.request.SofaRequest; +import com.alipay.sofa.rpc.core.response.SofaResponse; +import com.alipay.sofa.rpc.ext.Extension; +import com.alipay.sofa.rpc.ext.ExtensionLoaderFactory; import com.baidu.hugegraph.config.HugeConfig; import com.baidu.hugegraph.config.ServerOptions; +import com.baidu.hugegraph.util.Log; import com.google.common.collect.Maps; public class RpcConsumerConfig implements RpcServiceConfig4Client { private final HugeConfig conf; - private final String remoteUrl; + private final String remoteUrls; private final Map> configs; - public RpcConsumerConfig(HugeConfig conf, String remoteUrl) { + static { + ExtensionLoaderFactory.getExtensionLoader(Cluster.class) + .loadExtension(FanoutCluster.class); + } + + public RpcConsumerConfig(HugeConfig conf, String remoteUrls) { RpcCommonConfig.initRpcConfigs(conf); this.conf = conf; - this.remoteUrl = remoteUrl; + this.remoteUrls = remoteUrls; this.configs = Maps.newHashMap(); } @@ -82,10 +102,12 @@ private ConsumerConfig consumerConfig(String graph, if (graph != null) { consumerConfig.setId(serviceId).setUniqueId(graph); + // Default is FailoverCluster, set to FanoutCluster to broadcast + consumerConfig.setCluster("fanout"); } consumerConfig.setInterfaceId(interfaceId) .setProtocol(protocol) - .setDirectUrl(this.remoteUrl) + .setDirectUrl(this.remoteUrls) .setTimeout(timeout) .setConnectTimeout(connectTimeout) .setReconnectPeriod(reconnectPeriod) @@ -95,4 +117,72 @@ private ConsumerConfig consumerConfig(String graph, this.configs.put(serviceId, consumerConfig); return consumerConfig; } + + @Extension("fanout") + private static class FanoutCluster extends AbstractCluster { + + private static final Logger LOG = Log.logger(FanoutCluster.class); + + public FanoutCluster(ConsumerBootstrap consumerBootstrap) { + super(consumerBootstrap); + } + + @Override + protected SofaResponse doInvoke(SofaRequest request) + throws SofaRpcException { + List providers = this.getRouterChain() + .route(request, null); + List responses = new ArrayList<>(providers.size()); + List excepts = new ArrayList<>(providers.size()); + + for (ProviderInfo provider : providers) { + try { + SofaResponse response = this.doInvoke(request, provider); + responses.add(response); + } catch (SofaRpcException e) { + excepts.add(e); + LOG.warn("{}.(error {})", e.getMessage(), e.getErrorType()); + } + } + + if (responses.size() > 0) { + // Just choose one + return responses.get(0); + } else if (excepts.size() > 0) { + throw excepts.get(0); + } else { + assert providers.isEmpty(); + String method = methodName(request); + throw new SofaRpcException(RpcErrorType.CLIENT_ROUTER, + "No service provider for " + method); + } + } + + private SofaResponse doInvoke(SofaRequest request, + ProviderInfo providerInfo) { + try { + SofaResponse response = this.filterChain(providerInfo, request); + if (response != null) { + return response; + } + String method = methodName(request); + throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, + "Failed to call " + method + " on remote server " + + providerInfo + ", return null response"); + } catch (SofaRpcException e) { + throw e; + } catch (Exception e) { + String method = methodName(request); + throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, + "Failed to call " + method + " on remote server " + + providerInfo + ", cause by exception: " + e); + } + } + + private static String methodName(SofaRequest request) { + String method = request.getInterfaceName() + "." + + request.getMethodName() + "()"; + return method; + } + } } diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcServer.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcServer.java index 9cbddcfe26..24cd029fcd 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcServer.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcServer.java @@ -24,11 +24,8 @@ import org.apache.commons.collections.MapUtils; import org.slf4j.Logger; -import com.alipay.sofa.rpc.common.RpcConfigs; -import com.alipay.sofa.rpc.common.RpcOptions; import com.alipay.sofa.rpc.config.ProviderConfig; import com.alipay.sofa.rpc.config.ServerConfig; -import com.alipay.sofa.rpc.context.RpcRuntimeContext; import com.baidu.hugegraph.config.HugeConfig; import com.baidu.hugegraph.config.ServerOptions; import com.baidu.hugegraph.util.Log; @@ -41,22 +38,14 @@ public class RpcServer { private final RpcProviderConfig configs; private final ServerConfig serverConfig; - static { - if (RpcConfigs.getOrDefaultValue(RpcOptions.JVM_SHUTDOWN_HOOK, true)) { - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - RpcRuntimeContext.destroy(); - }, "SOFA-RPC-ShutdownHook")); - } - } - public RpcServer(HugeConfig conf) { RpcCommonConfig.initRpcConfigs(conf); this.conf = conf; - this.serverConfig = new ServerConfig() - .setProtocol(conf.get(ServerOptions.RPC_PROTOCOL)) - .setPort(conf.get(ServerOptions.RPC_SERVER_PORT)) - .setHost(conf.get(ServerOptions.RPC_SERVER_HOST)) - .setDaemon(false); + this.serverConfig = new ServerConfig(); + this.serverConfig.setProtocol(conf.get(ServerOptions.RPC_PROTOCOL)) + .setPort(conf.get(ServerOptions.RPC_SERVER_PORT)) + .setHost(conf.get(ServerOptions.RPC_SERVER_HOST)) + .setDaemon(false); this.configs = new RpcProviderConfig(); } @@ -95,6 +84,16 @@ public int port() { public void destroy() { LOG.info("RpcServer stop on port {}", this.port()); + for (ProviderConfig config : this.configs.configs().values()) { + Object service = config.getRef(); + if (service instanceof AutoCloseable) { + try { + ((AutoCloseable) service).close(); + } catch (Exception e) { + LOG.warn("Failed to close service {}", service, e); + } + } + } this.serverConfig.destroy(); } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java index b317139857..9ce4413ff6 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java @@ -1391,6 +1391,7 @@ public AbstractCacheNotifier(EventHub hub, CacheNotifier proxy) { this.hub.listen(Events.CACHE, this.cacheEventListener); } + @Override public void close() { this.hub.unlisten(Events.CACHE, this.cacheEventListener); } diff --git a/hugegraph-dist/src/assembly/static/conf/rest-server.properties b/hugegraph-dist/src/assembly/static/conf/rest-server.properties index 4287e13566..30bc288287 100644 --- a/hugegraph-dist/src/assembly/static/conf/rest-server.properties +++ b/hugegraph-dist/src/assembly/static/conf/rest-server.properties @@ -16,11 +16,11 @@ server.id=server-1 server.role=master rpc.server_host=127.0.0.1 -rpc.server_port=8893 +rpc.server_port=8090 rpc.server_timeout=30 +rpc.remote_url=127.0.0.1:8090 rpc.client_connect_timeout=20 rpc.client_reconnect_period=10 rpc.client_read_timeout=40 rpc.client_retries=3 rpc.client_load_balancer=consistentHash -rpc.protocol=bolt From 3d571568ea3f41a2c3ca735fe1a90cb49bed0a64 Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Sat, 13 Feb 2021 00:41:42 +0800 Subject: [PATCH 05/11] fix GraphManager.close() is not called by HttpServer.DESTROY_FINISHED Change-Id: I8723f401ee430519bc488554c31da16e4fdffdb4 --- .../baidu/hugegraph/server/RestServer.java | 42 ++++++++++++++++++- .../java/com/baidu/hugegraph/HugeFactory.java | 2 +- .../baidu/hugegraph/dist/HugeGraphServer.java | 2 +- 3 files changed, 43 insertions(+), 3 deletions(-) diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/server/RestServer.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/server/RestServer.java index 2895083039..f7b1c9be80 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/server/RestServer.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/server/RestServer.java @@ -23,10 +23,13 @@ import java.net.URI; import java.util.Arrays; import java.util.Collection; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import javax.ws.rs.core.UriBuilder; +import org.glassfish.grizzly.CompletionHandler; +import org.glassfish.grizzly.GrizzlyFuture; import org.glassfish.grizzly.http.server.HttpServer; import org.glassfish.grizzly.http.server.NetworkListener; import org.glassfish.grizzly.ssl.SSLContextConfigurator; @@ -117,7 +120,44 @@ private HttpServer configHttpServer(URI uri, ResourceConfig rc) { public Future shutdown() { E.checkNotNull(this.httpServer, "http server"); - return this.httpServer.shutdown(); + /* + * Since 2.3.x shutdown() won't call shutdownNow(), so the event + * ApplicationEvent.Type.DESTROY_FINISHED also won't be triggered, + * which is listened by ApplicationConfig.GraphManagerFactory, we + * manually call shutdownNow() here when the future is completed. + * See shutdown() change: + * https://github.com/javaee/grizzly/commit/182d8bcb4e45de5609ab92f6f1d5980f95d79b04 + * #diff-f6c130f38a1ec11bdf9d3cb7e0a81084c8788c79a00befe65e40a13bc989b098R388 + */ + CompletableFuture future = new CompletableFuture<>(); + future.whenComplete((server, exception) -> { + this.httpServer.shutdownNow(); + }); + + GrizzlyFuture grizzlyFuture = this.httpServer.shutdown(); + grizzlyFuture.addCompletionHandler(new CompletionHandler() { + @Override + public void cancelled() { + future.cancel(true); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(throwable); + } + + @Override + public void completed(HttpServer result) { + future.complete(result); + } + + @Override + public void updated(HttpServer result) { + // pass + } + }); + + return future; } public void shutdownNow() { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeFactory.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeFactory.java index c4cd67a470..9bc3227388 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeFactory.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeFactory.java @@ -50,7 +50,7 @@ public class HugeFactory { Runtime.getRuntime().addShutdownHook(new Thread(() -> { LOG.info("HugeGraph is shutting down"); HugeFactory.shutdown(30L); - })); + }, "hugegraph-shutdown")); } private static final String NAME_REGEX = "^[A-Za-z][A-Za-z0-9_]{0,47}$"; diff --git a/hugegraph-dist/src/main/java/com/baidu/hugegraph/dist/HugeGraphServer.java b/hugegraph-dist/src/main/java/com/baidu/hugegraph/dist/HugeGraphServer.java index 76603526e2..7b0def5ab6 100644 --- a/hugegraph-dist/src/main/java/com/baidu/hugegraph/dist/HugeGraphServer.java +++ b/hugegraph-dist/src/main/java/com/baidu/hugegraph/dist/HugeGraphServer.java @@ -102,6 +102,6 @@ public static void main(String[] args) throws Exception { LOG.info("HugeGraphServer stopping"); server.stop(); LOG.info("HugeGraphServer stopped"); - })); + }, "hugegraph-server-shutdown")); } } From 883b7f35073f81b181b711b40c3a6b0385012b00 Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Sat, 13 Feb 2021 01:36:19 +0800 Subject: [PATCH 06/11] notify cache with Id[] arg to avoid too many events Change-Id: Iefde073a4f07b5bad4cc08f065a3ee0f034114d8 --- .../baidu/hugegraph/StandardHugeGraph.java | 7 +----- .../backend/cache/CachedGraphTransaction.java | 23 ++++++++++++++++--- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java index 9ce4413ff6..d324199439 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java @@ -1403,12 +1403,7 @@ public void invalid(HugeType type, Id id) { @Override public void invalid2(HugeType type, Object[] ids) { - for (Object id : ids) { - E.checkArgument(id instanceof Id, - "Expect instance of Id , but got '%s'", - id.getClass()); - this.invalid(type, (Id) id); - } + this.hub.notify(Events.CACHE, Cache.ACTION_INVALID, type, ids); } @Override diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedGraphTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedGraphTransaction.java index 3c6b750322..a24d54df3d 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedGraphTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedGraphTransaction.java @@ -19,6 +19,7 @@ package com.baidu.hugegraph.backend.cache; +import java.lang.reflect.Array; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -140,12 +141,28 @@ private void listenChanges() { E.checkArgument(args.length > 0 && args[0] instanceof String, "Expect event action argument"); if (Cache.ACTION_INVALID.equals(args[0])) { - event.checkArgs(String.class, HugeType.class, Id.class); + event.checkArgs(String.class, HugeType.class, Object.class); HugeType type = (HugeType) args[1]; - Id id = (Id) args[2]; if (type.isVertex()) { // Invalidate vertex cache - this.verticesCache.invalidate(id); + Object arg2 = args[2]; + if (arg2 instanceof Id) { + Id id = (Id) arg2; + this.verticesCache.invalidate(id); + } else if (arg2 != null && arg2.getClass().isArray()) { + int size = Array.getLength(arg2); + for (int i = 0; i < size; i++) { + Object id = Array.get(arg2, i); + E.checkArgument(id instanceof Id, + "Expect instance of Id in array, " + + "but got '%s'", id.getClass()); + this.verticesCache.invalidate((Id) id); + } + } else { + E.checkArgument(false, + "Expect Id or Id[], but got: %s", + arg2); + } } else if (type.isEdge()) { /* * Invalidate edge cache via clear instead of invalidate From cbb45f7609d52d2f1b667b12e67fe3c9414c6d0b Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Sat, 13 Feb 2021 02:06:31 +0800 Subject: [PATCH 07/11] improve perf of raft notify-cache Change-Id: I9c6c5f5e069d2e6f4aad9ec7adbb3dba26de2460 --- .../backend/store/raft/RaftSharedContext.java | 3 +-- .../backend/store/raft/StoreStateMachine.java | 26 ++++++++++++++----- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java index 0b801928d0..5b2815fd08 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java @@ -43,7 +43,6 @@ import com.baidu.hugegraph.HugeException; import com.baidu.hugegraph.HugeGraphParams; import com.baidu.hugegraph.backend.cache.Cache; -import com.baidu.hugegraph.backend.id.Id; import com.baidu.hugegraph.backend.store.BackendStore; import com.baidu.hugegraph.backend.store.raft.rpc.ListPeersProcessor; import com.baidu.hugegraph.backend.store.raft.rpc.RaftRequests.StoreType; @@ -260,7 +259,7 @@ public void clearCache() { this.notifyCache(Cache.ACTION_CLEAR, HugeType.VERTEX, null); } - public void notifyCache(String action, HugeType type, Id id) { + protected void notifyCache(String action, HugeType type, Object id) { EventHub eventHub; if (type.isGraph()) { eventHub = this.params.graphEventHub(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java index 323f9d7f68..a5697209a6 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java @@ -19,6 +19,7 @@ package com.baidu.hugegraph.backend.store.raft; +import java.util.ArrayList; import java.util.List; import org.slf4j.Logger; @@ -35,6 +36,8 @@ import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; import com.baidu.hugegraph.backend.BackendException; import com.baidu.hugegraph.backend.cache.Cache; +import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.backend.query.Query; import com.baidu.hugegraph.backend.serializer.BytesBuffer; import com.baidu.hugegraph.backend.store.BackendAction; import com.baidu.hugegraph.backend.store.BackendEntry; @@ -83,14 +86,23 @@ private void updateCacheIfNeeded(BackendMutation mutation, return; } for (HugeType type : mutation.types()) { - if (!type.isGraph() && !type.isSchema()) { - continue; - } - for (java.util.Iterator it = mutation.mutation(type); - it.hasNext();) { - BackendEntry entry = it.next().entry(); + if (type.isSchema()) { + java.util.Iterator it = mutation.mutation(type); + while (it.hasNext()) { + BackendEntry entry = it.next().entry(); + this.context.notifyCache(Cache.ACTION_INVALID, type, + entry.originId()); + } + } else if (type.isGraph()) { + List ids = new ArrayList<>((int) Query.COMMIT_BATCH); + java.util.Iterator it = mutation.mutation(type); + while (it.hasNext()) { + ids.add(it.next().entry().originId()); + } this.context.notifyCache(Cache.ACTION_INVALID, type, - entry.originId()); + ids.toArray()); + } else { + // Ignore other types due to not cached } } } From 578a16c0878117b97f876428750dd5f602d5bdcd Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Sat, 13 Feb 2021 02:18:57 +0800 Subject: [PATCH 08/11] only enable cache-rpc service for shared storage Change-Id: Id45b11844ed5c0a7e6cfb672057c42b455119ba3 --- .../main/java/com/baidu/hugegraph/StandardHugeGraph.java | 8 ++++++++ .../baidu/hugegraph/backend/store/BackendFeatures.java | 4 ++++ .../hugegraph/backend/store/memory/InMemoryDBStore.java | 5 +++++ .../hugegraph/backend/store/rocksdb/RocksDBFeatures.java | 5 +++++ 4 files changed, 22 insertions(+) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java index d324199439..69315a7022 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java @@ -944,6 +944,14 @@ public V option(ConfigOption option) { @Override public void registerRpcServices(RpcServiceConfig4Server serverConfig, RpcServiceConfig4Client clientConfig) { + /* + * Skip register cache-rpc service if it's non-shared storage, + * because we assume cache of non-shared storage is updated by raft. + */ + if (!this.backendStoreFeatures().supportsSharedStorage()) { + return; + } + Class clazz1 = GraphCacheNotifier.class; // The proxy is sometimes unavailable (issue #664) CacheNotifier proxy = clientConfig.serviceProxy(this.name, clazz1); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendFeatures.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendFeatures.java index 8330147d0e..49b83a3e36 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendFeatures.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendFeatures.java @@ -25,6 +25,10 @@ public default boolean supportsPersistence() { return true; } + public default boolean supportsSharedStorage() { + return true; + } + public boolean supportsScanToken(); public boolean supportsScanKeyPrefix(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/memory/InMemoryDBStore.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/memory/InMemoryDBStore.java index c7d9db8e1b..7b1e12bea3 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/memory/InMemoryDBStore.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/memory/InMemoryDBStore.java @@ -363,6 +363,11 @@ public boolean supportsPersistence() { return false; } + @Override + public boolean supportsSharedStorage() { + return false; + } + @Override public boolean supportsScanToken() { return false; diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBFeatures.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBFeatures.java index 0820b639eb..cdbb00a8aa 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBFeatures.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBFeatures.java @@ -23,6 +23,11 @@ public class RocksDBFeatures implements BackendFeatures { + @Override + public boolean supportsSharedStorage() { + return false; + } + @Override public boolean supportsScanToken() { return false; From cad6dc4f00d799a5cb9b3aff98cdd046cb3f19f9 Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Thu, 25 Feb 2021 14:10:21 +0800 Subject: [PATCH 09/11] tiny improve Change-Id: I23878e7991774dbb3dcc7723137bac314502cc68 --- .../com/baidu/hugegraph/auth/ConfigAuthenticator.java | 3 ++- .../java/com/baidu/hugegraph/core/GraphManager.java | 4 ++-- .../com/baidu/hugegraph/rpc/RpcClientProvider.java | 4 ++-- .../com/baidu/hugegraph/rpc/RpcConsumerConfig.java | 10 ++++++---- .../main/java/com/baidu/hugegraph/rpc/RpcServer.java | 2 +- 5 files changed, 13 insertions(+), 10 deletions(-) diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/ConfigAuthenticator.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/ConfigAuthenticator.java index 791774c084..4b585f47b7 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/ConfigAuthenticator.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/ConfigAuthenticator.java @@ -45,8 +45,9 @@ public ConfigAuthenticator() { @Override public void setup(HugeConfig config) { - this.tokens.put(USER_ADMIN, config.get(ServerOptions.AUTH_ADMIN_TOKEN)); this.tokens.putAll(config.getMap(ServerOptions.AUTH_USER_TOKENS)); + assert !this.tokens.containsKey(USER_ADMIN); + this.tokens.put(USER_ADMIN, config.get(ServerOptions.AUTH_ADMIN_TOKEN)); } /** diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java index 6bc61d8db2..c5ece268e0 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/core/GraphManager.java @@ -169,7 +169,7 @@ public UserManager userManager() { } public void close() { - this.destoryRpcServer(); + this.destroyRpcServer(); } private void startRpcServer() { @@ -197,7 +197,7 @@ private void startRpcServer() { } } - private void destoryRpcServer() { + private void destroyRpcServer() { this.rpcServer.destroy(); } diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcClientProvider.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcClientProvider.java index 3e2d9146f1..9d297835a4 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcClientProvider.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcClientProvider.java @@ -39,7 +39,7 @@ public RpcClientProvider(HugeConfig conf) { String rpcUrl = conf.get(ServerOptions.RPC_REMOTE_URL); String selfUrl = conf.get(ServerOptions.RPC_SERVER_HOST) + ":" + conf.get(ServerOptions.RPC_SERVER_PORT); - rpcUrl = execludeSelfUrl(rpcUrl, selfUrl); + rpcUrl = excludeSelfUrl(rpcUrl, selfUrl); this.consumerConfig = StringUtils.isNotBlank(rpcUrl) ? new RpcConsumerConfig(conf, rpcUrl) : null; @@ -66,7 +66,7 @@ public UserManager userManager() { return this.authConsumerConfig.serviceProxy(UserManager.class); } - private static String execludeSelfUrl(String rpcUrl, String selfUrl) { + private static String excludeSelfUrl(String rpcUrl, String selfUrl) { String[] urls = StringUtils.splitWithCommaOrSemicolon(rpcUrl); Set urlsSet = new LinkedHashSet<>(Arrays.asList(urls)); urlsSet.remove(selfUrl); diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcConsumerConfig.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcConsumerConfig.java index 72ba8f7ed2..698a53269d 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcConsumerConfig.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcConsumerConfig.java @@ -146,7 +146,10 @@ protected SofaResponse doInvoke(SofaRequest request) } if (responses.size() > 0) { - // Just choose one + /* + * Just choose the first one as result to return, ignore others + * TODO: maybe more strategies should be provided + */ return responses.get(0); } else if (excepts.size() > 0) { throw excepts.get(0); @@ -180,9 +183,8 @@ private SofaResponse doInvoke(SofaRequest request, } private static String methodName(SofaRequest request) { - String method = request.getInterfaceName() + "." + - request.getMethodName() + "()"; - return method; + return request.getInterfaceName() + "." + + request.getMethodName() + "()"; } } } diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcServer.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcServer.java index 24cd029fcd..d2ee7f17e2 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcServer.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcServer.java @@ -43,8 +43,8 @@ public RpcServer(HugeConfig conf) { this.conf = conf; this.serverConfig = new ServerConfig(); this.serverConfig.setProtocol(conf.get(ServerOptions.RPC_PROTOCOL)) - .setPort(conf.get(ServerOptions.RPC_SERVER_PORT)) .setHost(conf.get(ServerOptions.RPC_SERVER_HOST)) + .setPort(conf.get(ServerOptions.RPC_SERVER_PORT)) .setDaemon(false); this.configs = new RpcProviderConfig(); } From ace11adc465bd67001f7f2039a611b89e120733f Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Thu, 25 Feb 2021 15:08:21 +0800 Subject: [PATCH 10/11] check raft mode can only be enabled if non-shared store Change-Id: Ia671437ae51f88f8eebb3fcdac20321af65d10a9 --- .../com/baidu/hugegraph/StandardHugeGraph.java | 18 ++++++++++++------ .../store/raft/RaftBackendStoreProvider.java | 9 +++++++++ 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java index 69315a7022..4ec1ea9091 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java @@ -195,20 +195,26 @@ public StandardHugeGraph(HugeConfig config) { try { this.storeProvider = this.loadStoreProvider(); - } catch (BackendException e) { + } catch (Exception e) { LockUtil.destroy(this.name); String message = "Failed to load backend store provider"; LOG.error("{}: {}", message, e.getMessage()); throw new HugeException(message); } - this.tx = new TinkerPopTransaction(this); + try { + this.tx = new TinkerPopTransaction(this); - SnowflakeIdGenerator.init(this.params); + SnowflakeIdGenerator.init(this.params); - this.taskManager.addScheduler(this.params); - this.userManager = new StandardUserManager(this.params); - this.variables = null; + this.taskManager.addScheduler(this.params); + this.userManager = new StandardUserManager(this.params); + this.variables = null; + } catch (Exception e) { + this.storeProvider.close(); + LockUtil.destroy(this.name); + throw e; + } } @Override diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java index aeb40eae56..0e465da9ec 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStoreProvider.java @@ -74,6 +74,12 @@ private void checkOpened() { "The RaftBackendStoreProvider has not been opened"); } + private void checkNonSharedStore(BackendStore store) { + E.checkArgument(!store.features().supportsSharedStorage(), + "Can't enable raft mode with %s backend", + this.type()); + } + @Override public String type() { return this.provider.type(); @@ -94,6 +100,7 @@ public synchronized BackendStore loadSchemaStore(final String name) { if (this.schemaStore == null) { LOG.info("Init raft backend schema store"); BackendStore store = this.provider.loadSchemaStore(name); + this.checkNonSharedStore(store); this.schemaStore = new RaftBackendStore(store, this.context); this.context.addStore(StoreType.SCHEMA, this.schemaStore); } @@ -105,6 +112,7 @@ public synchronized BackendStore loadGraphStore(String name) { if (this.graphStore == null) { LOG.info("Init raft backend graph store"); BackendStore store = this.provider.loadGraphStore(name); + this.checkNonSharedStore(store); this.graphStore = new RaftBackendStore(store, this.context); this.context.addStore(StoreType.GRAPH, this.graphStore); } @@ -116,6 +124,7 @@ public synchronized BackendStore loadSystemStore(String name) { if (this.systemStore == null) { LOG.info("Init raft backend system store"); BackendStore store = this.provider.loadSystemStore(name); + this.checkNonSharedStore(store); this.systemStore = new RaftBackendStore(store, this.context); this.context.addStore(StoreType.SYSTEM, this.systemStore); } From 8da4366497181f3f936c7058c05d6974398acb35 Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Thu, 4 Mar 2021 13:03:25 +0800 Subject: [PATCH 11/11] tiny improve Change-Id: I8a8266d7d618410dcff2e882dcbc71c9d71f66e5 --- .../java/com/baidu/hugegraph/rpc/RpcClientProvider.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcClientProvider.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcClientProvider.java index 0ac1b987f8..93deb941e1 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcClientProvider.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/rpc/RpcClientProvider.java @@ -68,8 +68,9 @@ public AuthManager authManager() { private static String excludeSelfUrl(String rpcUrl, String selfUrl) { String[] urls = StringUtils.splitWithCommaOrSemicolon(rpcUrl); - Set urlsSet = new LinkedHashSet<>(Arrays.asList(urls)); - urlsSet.remove(selfUrl); - return String.join(",", urlsSet); + // Keep urls order via LinkedHashSet + Set urlSet = new LinkedHashSet<>(Arrays.asList(urls)); + urlSet.remove(selfUrl); + return String.join(",", urlSet); } }