Skip to content

Commit

Permalink
refactor(commons): handle sofa-rpc desc type & mark TODO (#2666)
Browse files Browse the repository at this point in the history
* chore: upgrade sofa-rpc to latest version (2024.8)

* fix conflicts

* downgrade sofa-rpc to 5.12.

* fix: replace the wrong dep usage for sofa-rpc

* feat: support multi RPC desc type for users

* chore: disable sofa-rpc server by default
  • Loading branch information
imbajin authored Oct 25, 2024
1 parent 7b16f4c commit 21855c6
Show file tree
Hide file tree
Showing 30 changed files with 97 additions and 96 deletions.
2 changes: 1 addition & 1 deletion hugegraph-commons/hugegraph-dist/release-docs/NOTICE
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Apache HugeGraph(incubating)
Copyright 2022-2023 The Apache Software Foundation
Copyright 2022-2024 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
Expand Down
1 change: 1 addition & 0 deletions hugegraph-commons/hugegraph-rpc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>sofa-rpc-all</artifactId>
<!-- Fixme/TODO: upgrade to version > 5.12 to avoid potential SEC risk -->
<version>5.7.6</version>
<exclusions>
<exclusion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ public static synchronized RpcOptions instance() {
public static final ConfigOption<Integer> RPC_CLIENT_READ_TIMEOUT =
new ConfigOption<>(
"rpc.client_read_timeout",
"The timeout(in seconds) of rpc client read from rpc " +
"server.",
"The timeout(in seconds) of rpc client read from RPC server.",
rangeInt(1, Integer.MAX_VALUE),
40
);
Expand All @@ -121,7 +120,7 @@ public static synchronized RpcOptions instance() {
public static final ConfigOption<String> RPC_CLIENT_LOAD_BALANCER =
new ConfigOption<>(
"rpc.client_load_balancer",
"The rpc client uses a load-balancing algorithm to " +
"The RPC client uses a load-balancing algorithm to " +
"access multiple rpc servers in one cluster. Default " +
"value is 'consistentHash', means forwarding by request " +
"parameters.",
Expand All @@ -133,16 +132,26 @@ public static synchronized RpcOptions instance() {
public static final ConfigOption<String> RPC_PROTOCOL =
new ConfigOption<>(
"rpc.protocol",
"Rpc communication protocol, client and server need to " +
"RPC communication protocol, client and server need to " +
"be specified the same value.",
allowValues("bolt", "rest", "dubbo", "h2c", "http"),
"bolt"
);

public static final ConfigOption<String> RPC_SERIALIZATION =
new ConfigOption<>(
"rpc.serialization",
"RPC serialization type, client and server must set the same value." +
"Note: If you choose 'protobuf', you need to add the relative IDL file. " +
"(Could refer PD/Store *.proto)",
allowValues("hessian2", "protobuf"),
"hessian2"
);

public static final ConfigOption<Integer> RPC_CONFIG_ORDER =
new ConfigOption<>(
"rpc.config_order",
"Sofa rpc configuration file loading order, the larger " +
"Sofa-RPC configuration file loading order, the larger " +
"the more later loading.",
rangeInt(1, Integer.MAX_VALUE),
999
Expand All @@ -151,7 +160,7 @@ public static synchronized RpcOptions instance() {
public static final ConfigOption<String> RPC_LOGGER_IMPL =
new ConfigOption<>(
"rpc.logger_impl",
"Sofa rpc log implementation class.",
"Sofa-RPC log implementation class.",
disallowEmpty(),
"com.alipay.sofa.rpc.log.SLF4JLoggerImpl"
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import java.util.Map;
import java.util.Set;

import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.config.RpcOptions;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

import com.alipay.sofa.rpc.bootstrap.Bootstraps;
Expand All @@ -36,9 +39,6 @@
import com.alipay.sofa.rpc.core.response.SofaResponse;
import com.alipay.sofa.rpc.ext.Extension;
import com.alipay.sofa.rpc.ext.ExtensionLoaderFactory;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.config.RpcOptions;
import org.apache.hugegraph.util.Log;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -135,6 +135,7 @@ private <T> ConsumerConfig<T> consumerConfig(String graph,
.setConnectTimeout(connectTimeout)
.setReconnectPeriod(reconnectPeriod)
.setRetries(retries)
.setSerialization(conf.get(RpcOptions.RPC_SERIALIZATION))
.setLoadBalancer(loadBalancer);

this.configs.put(serviceId, consumerConfig);
Expand Down Expand Up @@ -168,13 +169,13 @@ protected SofaResponse doInvoke(SofaRequest request)
}
}

if (responses.size() > 0) {
if (!responses.isEmpty()) {
/*
* Just choose the first one as result to return, ignore others
* TODO: maybe more strategies should be provided
*/
return responses.get(0);
} else if (excepts.size() > 0) {
} else if (!excepts.isEmpty()) {
throw excepts.get(0);
} else {
assert providers.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@

import java.util.Map;

import org.apache.commons.collections.MapUtils;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.config.RpcOptions;
import org.apache.hugegraph.testutil.Whitebox;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

import com.alipay.remoting.RemotingServer;
Expand All @@ -27,12 +33,6 @@
import com.alipay.sofa.rpc.config.ServerConfig;
import com.alipay.sofa.rpc.server.Server;
import com.alipay.sofa.rpc.server.bolt.BoltServer;
import org.apache.hugegraph.config.RpcOptions;
import org.apache.commons.collections.MapUtils;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.testutil.Whitebox;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;

public class RpcServer {

Expand All @@ -55,6 +55,7 @@ public RpcServer(HugeConfig config) {
this.serverConfig.setProtocol(config.get(RpcOptions.RPC_PROTOCOL))
.setHost(host).setPort(port)
.setAdaptivePort(adaptivePort)
.setSerialization(config.get(RpcOptions.RPC_SERIALIZATION))
.setDaemon(false);
} else {
this.serverConfig = null;
Expand Down Expand Up @@ -87,8 +88,7 @@ public int port() {
* TODO: remove this code after adding Server.port() interface:
* https://github.com/sofastack/sofa-rpc/issues/1022
*/
RemotingServer rs = Whitebox.getInternalState(server,
"remotingServer");
RemotingServer rs = Whitebox.getInternalState(server, "remotingServer");
return rs.port();
}
// When using random port 0, the returned port is not the actual port
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,6 @@
import java.net.InetAddress;
import java.net.ServerSocket;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import com.alipay.sofa.rpc.common.RpcOptions;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.rpc.RpcClientProvider;
import org.apache.hugegraph.rpc.RpcCommonConfig;
Expand All @@ -38,6 +30,14 @@
import org.apache.hugegraph.testutil.Assert;
import org.apache.hugegraph.testutil.Whitebox;
import org.apache.hugegraph.util.E;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import com.alipay.sofa.rpc.common.RpcOptions;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException;
import com.google.common.collect.ImmutableMap;

public class ServerClientTest extends BaseUnitTest {
Expand Down Expand Up @@ -169,7 +169,7 @@ public void testStartServerWithAdaptivePort() throws IOException {
RpcProviderConfig serverConfig = rpcServerAdaptive.config();
serverConfig.addService(HelloService.class, new HelloServiceImpl());

// Start other server bound the port
// Start another server bound the port
int usedPort = rpcServerAdaptive.port();
InetAddress ip = InetAddress.getByName(rpcServerAdaptive.host());
ServerSocket inUse = new ServerSocket(usedPort,50, ip);
Expand Down
2 changes: 2 additions & 0 deletions hugegraph-server/hugegraph-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<!-- Keep consistent with grpc dependency version (pd/store) -->
<!-- <version>1.39.0</version>-->
<version>1.47.0</version>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public final class GraphManager {
private final RpcClientProvider rpcClient;

private RoleElectionStateMachine roleStateMachine;
private GlobalMasterInfo globalNodeRoleInfo;
private final GlobalMasterInfo globalNodeRoleInfo;

private final HugeConfig conf;
private final EventHub eventHub;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.hugegraph.backend.store.ram.RamTable;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.backend.tx.ISchemaTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.event.EventHub;
import org.apache.hugegraph.job.EphemeralJob;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public BinaryBackendEntry(HugeType type, byte[] bytes, boolean enablePartition)
}

// FIXME: `enablePartition` is unused here
public BinaryBackendEntry(HugeType type, byte[] bytes, boolean enablePartition, boolean isOlap) {
public BinaryBackendEntry(HugeType type, byte[] bytes, boolean enablePartition,
boolean isOlap) {
this(type, BytesBuffer.wrap(bytes).parseOlapId(type, isOlap));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.hugegraph.backend.serializer;

import static org.apache.hugegraph.schema.SchemaElement.UNDEF;

import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
Expand Down Expand Up @@ -74,8 +76,6 @@
import org.apache.hugegraph.util.StringEncoding;
import org.apache.tinkerpop.gremlin.structure.Edge;

import static org.apache.hugegraph.schema.SchemaElement.UNDEF;

public class BinarySerializer extends AbstractSerializer {

/*
Expand Down Expand Up @@ -932,10 +932,10 @@ public BackendEntry parse(BackendEntry originEntry) {
BytesBuffer buffer = BytesBuffer.allocate(BytesBuffer.BUF_EDGE_ID);
buffer.write(parsedEntry.id().asBytes());
buffer.write(bytes);
parsedEntry = new BinaryBackendEntry(originEntry.type(), new BinaryId(buffer.bytes(),
BytesBuffer.wrap(
buffer.bytes())
.readEdgeId()));
parsedEntry = new BinaryBackendEntry(originEntry.type(),
new BinaryId(buffer.bytes(),
BytesBuffer.wrap(buffer.bytes())
.readEdgeId()));

for (BackendColumn col : originEntry.columns()) {
parsedEntry.column(buffer.bytes(), col.value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.common.collect.Iterators;

import org.apache.commons.collections.CollectionUtils;
import org.apache.hugegraph.HugeException;
import org.apache.hugegraph.HugeGraph;
Expand Down Expand Up @@ -103,6 +101,7 @@
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;

import jakarta.ws.rs.ForbiddenException;

Expand Down Expand Up @@ -1053,8 +1052,10 @@ protected Iterator<HugeEdge> queryEdgesFromBackend(Query query) {
if (query instanceof ConditionQuery && !query.paging()) {
// TODO: support: paging + parent label
boolean supportIn = this.storeFeatures().supportsQueryWithInCondition();
// consider multi labels + properties, see org.apache.hugegraph.core.EdgeCoreTest.testQueryInEdgesOfVertexByLabels
Stream<ConditionQuery> flattenedQueries = ConditionQueryFlatten.flatten((ConditionQuery) query, supportIn).stream();
// consider multi labels + properties,
// see org.apache.hugegraph.core.EdgeCoreTest.testQueryInEdgesOfVertexByLabels
Stream<ConditionQuery> flattenedQueries =
ConditionQueryFlatten.flatten((ConditionQuery) query, supportIn).stream();

Stream<Iterator<HugeEdge>> edgeIterators = flattenedQueries.map(cq -> {
Id label = cq.condition(HugeKeys.LABEL);
Expand All @@ -1073,7 +1074,8 @@ protected Iterator<HugeEdge> queryEdgesFromBackend(Query query) {
}
});

return edgeIterators.reduce(ExtendableIterator::concat).orElse(Collections.emptyIterator());
return edgeIterators.reduce(ExtendableIterator::concat)
.orElse(Collections.emptyIterator());
}

return queryEdgesFromBackendInternal(query);
Expand Down Expand Up @@ -1640,7 +1642,8 @@ private Query optimizeQuery(ConditionQuery query) {
*/
boolean byLabel = (label != null && query.conditionsSize() == 1);
if (!byLabel || this.store().features().supportsQueryByLabel()) {
if (this.storeFeatures().supportsFatherAndSubEdgeLabel() && byLabel && query.resultType().isEdge()) {
if (this.storeFeatures().supportsFatherAndSubEdgeLabel() && byLabel &&
query.resultType().isEdge()) {
// for memory backend
EdgeLabel edgeLabel = graph().edgeLabel(label);
if (edgeLabel.hasFather()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,8 @@ public Id createOlapPk(PropertyKey propertyKey) {
return asyncRun(this.graph(), propertyKey, job);
}

// -- store related methods, divided into two categories: 1. olap table related 2. ID generation strategy
// -- store related methods, divided into two categories:
// 1. olap table related 2. ID generation strategy
// - 1. olap table related
public void createOlapPk(Id id) {
this.graphParams().loadGraphStore().createOlapTable(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.job.UserJob;
import org.apache.hugegraph.job.algorithm.BfsTraverser;
import org.apache.hugegraph.schema.SchemaLabel;
import org.apache.hugegraph.structure.HugeVertex;
import org.apache.hugegraph.traversal.algorithm.HugeTraverser;
import org.apache.hugegraph.type.define.Directions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.backend.tx.ISchemaTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
import org.apache.hugegraph.schema.EdgeLabel;
import org.apache.hugegraph.type.define.SchemaStatus;
import org.apache.hugegraph.util.LockUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.backend.tx.ISchemaTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
import org.apache.hugegraph.schema.EdgeLabel;
import org.apache.hugegraph.schema.IndexLabel;
import org.apache.hugegraph.schema.SchemaElement;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.backend.tx.ISchemaTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
import org.apache.hugegraph.schema.IndexLabel;
import org.apache.hugegraph.type.define.SchemaStatus;
import org.apache.hugegraph.util.LockUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.backend.tx.ISchemaTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
import org.apache.hugegraph.schema.IndexLabel;
import org.apache.hugegraph.type.define.SchemaStatus;
import org.apache.hugegraph.util.LockUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.hugegraph.job.schema;

import org.apache.hugegraph.backend.tx.ISchemaTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
import org.apache.hugegraph.schema.PropertyKey;

public class OlapPropertyKeyCreateJob extends SchemaJob {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.tx.ISchemaTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
import org.apache.hugegraph.schema.PropertyKey;

public class OlapPropertyKeyRemoveJob extends OlapPropertyKeyClearJob {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.backend.tx.ISchemaTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
import org.apache.hugegraph.schema.EdgeLabel;
import org.apache.hugegraph.schema.VertexLabel;
import org.apache.hugegraph.type.define.SchemaStatus;
Expand Down
Loading

0 comments on commit 21855c6

Please sign in to comment.