Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(commons): handle sofa-rpc desc type & mark TODO #2666

Merged
merged 7 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hugegraph-commons/hugegraph-dist/release-docs/NOTICE
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Apache HugeGraph(incubating)
Copyright 2022-2023 The Apache Software Foundation
Copyright 2022-2024 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
Expand Down
1 change: 1 addition & 0 deletions hugegraph-commons/hugegraph-rpc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>sofa-rpc-all</artifactId>
<!-- FIXME/TODO: upgrade to a version > 5.12 to avoid a potential security risk -->
<version>5.7.6</version>
<exclusions>
<exclusion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ public static synchronized RpcOptions instance() {
public static final ConfigOption<Integer> RPC_CLIENT_READ_TIMEOUT =
new ConfigOption<>(
"rpc.client_read_timeout",
"The timeout(in seconds) of rpc client read from rpc " +
"server.",
"The timeout(in seconds) of rpc client read from RPC server.",
rangeInt(1, Integer.MAX_VALUE),
40
);
Expand All @@ -121,7 +120,7 @@ public static synchronized RpcOptions instance() {
public static final ConfigOption<String> RPC_CLIENT_LOAD_BALANCER =
new ConfigOption<>(
"rpc.client_load_balancer",
"The rpc client uses a load-balancing algorithm to " +
"The RPC client uses a load-balancing algorithm to " +
"access multiple rpc servers in one cluster. Default " +
"value is 'consistentHash', means forwarding by request " +
"parameters.",
Expand All @@ -133,16 +132,26 @@ public static synchronized RpcOptions instance() {
public static final ConfigOption<String> RPC_PROTOCOL =
new ConfigOption<>(
"rpc.protocol",
"Rpc communication protocol, client and server need to " +
"RPC communication protocol, client and server need to " +
"be specified the same value.",
allowValues("bolt", "rest", "dubbo", "h2c", "http"),
"bolt"
);

public static final ConfigOption<String> RPC_SERIALIZATION =
new ConfigOption<>(
"rpc.serialization",
"RPC serialization type, client and server must set the same value." +
"Note: If you choose 'protobuf', you need to add the relative IDL file. " +
"(Could refer PD/Store *.proto)",
allowValues("hessian2", "protobuf"),
"hessian2"
);

public static final ConfigOption<Integer> RPC_CONFIG_ORDER =
new ConfigOption<>(
"rpc.config_order",
"Sofa rpc configuration file loading order, the larger " +
"Sofa-RPC configuration file loading order, the larger " +
"the more later loading.",
rangeInt(1, Integer.MAX_VALUE),
999
Expand All @@ -151,7 +160,7 @@ public static synchronized RpcOptions instance() {
public static final ConfigOption<String> RPC_LOGGER_IMPL =
new ConfigOption<>(
"rpc.logger_impl",
"Sofa rpc log implementation class.",
"Sofa-RPC log implementation class.",
disallowEmpty(),
"com.alipay.sofa.rpc.log.SLF4JLoggerImpl"
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import java.util.Map;
import java.util.Set;

import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.config.RpcOptions;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

import com.alipay.sofa.rpc.bootstrap.Bootstraps;
Expand All @@ -36,9 +39,6 @@
import com.alipay.sofa.rpc.core.response.SofaResponse;
import com.alipay.sofa.rpc.ext.Extension;
import com.alipay.sofa.rpc.ext.ExtensionLoaderFactory;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.config.RpcOptions;
import org.apache.hugegraph.util.Log;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -135,6 +135,7 @@ private <T> ConsumerConfig<T> consumerConfig(String graph,
.setConnectTimeout(connectTimeout)
.setReconnectPeriod(reconnectPeriod)
.setRetries(retries)
.setSerialization(conf.get(RpcOptions.RPC_SERIALIZATION))
.setLoadBalancer(loadBalancer);

this.configs.put(serviceId, consumerConfig);
Expand Down Expand Up @@ -168,13 +169,13 @@ protected SofaResponse doInvoke(SofaRequest request)
}
}

if (responses.size() > 0) {
if (!responses.isEmpty()) {
/*
* Just choose the first one as result to return, ignore others
* TODO: maybe more strategies should be provided
*/
return responses.get(0);
} else if (excepts.size() > 0) {
} else if (!excepts.isEmpty()) {
throw excepts.get(0);
} else {
assert providers.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@

import java.util.Map;

import org.apache.commons.collections.MapUtils;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.config.RpcOptions;
import org.apache.hugegraph.testutil.Whitebox;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

import com.alipay.remoting.RemotingServer;
Expand All @@ -27,12 +33,6 @@
import com.alipay.sofa.rpc.config.ServerConfig;
import com.alipay.sofa.rpc.server.Server;
import com.alipay.sofa.rpc.server.bolt.BoltServer;
import org.apache.hugegraph.config.RpcOptions;
import org.apache.commons.collections.MapUtils;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.testutil.Whitebox;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;

public class RpcServer {

Expand All @@ -55,6 +55,7 @@ public RpcServer(HugeConfig config) {
this.serverConfig.setProtocol(config.get(RpcOptions.RPC_PROTOCOL))
.setHost(host).setPort(port)
.setAdaptivePort(adaptivePort)
.setSerialization(config.get(RpcOptions.RPC_SERIALIZATION))
.setDaemon(false);
} else {
this.serverConfig = null;
Expand Down Expand Up @@ -87,8 +88,7 @@ public int port() {
* TODO: remove this code after adding Server.port() interface:
* https://github.com/sofastack/sofa-rpc/issues/1022
*/
RemotingServer rs = Whitebox.getInternalState(server,
"remotingServer");
RemotingServer rs = Whitebox.getInternalState(server, "remotingServer");
return rs.port();
}
// When using random port 0, the returned port is not the actual port
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,6 @@
import java.net.InetAddress;
import java.net.ServerSocket;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import com.alipay.sofa.rpc.common.RpcOptions;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.rpc.RpcClientProvider;
import org.apache.hugegraph.rpc.RpcCommonConfig;
Expand All @@ -38,6 +30,14 @@
import org.apache.hugegraph.testutil.Assert;
import org.apache.hugegraph.testutil.Whitebox;
import org.apache.hugegraph.util.E;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import com.alipay.sofa.rpc.common.RpcOptions;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException;
import com.google.common.collect.ImmutableMap;

public class ServerClientTest extends BaseUnitTest {
Expand Down Expand Up @@ -169,7 +169,7 @@ public void testStartServerWithAdaptivePort() throws IOException {
RpcProviderConfig serverConfig = rpcServerAdaptive.config();
serverConfig.addService(HelloService.class, new HelloServiceImpl());

// Start other server bound the port
// Start another server bound to the same port
int usedPort = rpcServerAdaptive.port();
InetAddress ip = InetAddress.getByName(rpcServerAdaptive.host());
ServerSocket inUse = new ServerSocket(usedPort,50, ip);
Expand Down
2 changes: 2 additions & 0 deletions hugegraph-server/hugegraph-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<!-- Keep consistent with grpc dependency version (pd/store) -->
<!-- <version>1.39.0</version>-->
<version>1.47.0</version>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public final class GraphManager {
private final RpcClientProvider rpcClient;

private RoleElectionStateMachine roleStateMachine;
private GlobalMasterInfo globalNodeRoleInfo;
private final GlobalMasterInfo globalNodeRoleInfo;

private final HugeConfig conf;
private final EventHub eventHub;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.hugegraph.backend.store.ram.RamTable;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.backend.tx.ISchemaTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.event.EventHub;
import org.apache.hugegraph.job.EphemeralJob;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public BinaryBackendEntry(HugeType type, byte[] bytes, boolean enablePartition)
}

// FIXME: `enablePartition` is unused here
public BinaryBackendEntry(HugeType type, byte[] bytes, boolean enablePartition, boolean isOlap) {
public BinaryBackendEntry(HugeType type, byte[] bytes, boolean enablePartition,
boolean isOlap) {
this(type, BytesBuffer.wrap(bytes).parseOlapId(type, isOlap));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.hugegraph.backend.serializer;

import static org.apache.hugegraph.schema.SchemaElement.UNDEF;

import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
Expand Down Expand Up @@ -74,8 +76,6 @@
import org.apache.hugegraph.util.StringEncoding;
import org.apache.tinkerpop.gremlin.structure.Edge;

import static org.apache.hugegraph.schema.SchemaElement.UNDEF;

public class BinarySerializer extends AbstractSerializer {

/*
Expand Down Expand Up @@ -932,10 +932,10 @@ public BackendEntry parse(BackendEntry originEntry) {
BytesBuffer buffer = BytesBuffer.allocate(BytesBuffer.BUF_EDGE_ID);
buffer.write(parsedEntry.id().asBytes());
buffer.write(bytes);
parsedEntry = new BinaryBackendEntry(originEntry.type(), new BinaryId(buffer.bytes(),
BytesBuffer.wrap(
buffer.bytes())
.readEdgeId()));
parsedEntry = new BinaryBackendEntry(originEntry.type(),
new BinaryId(buffer.bytes(),
BytesBuffer.wrap(buffer.bytes())
.readEdgeId()));

for (BackendColumn col : originEntry.columns()) {
parsedEntry.column(buffer.bytes(), col.value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.common.collect.Iterators;

import org.apache.commons.collections.CollectionUtils;
import org.apache.hugegraph.HugeException;
import org.apache.hugegraph.HugeGraph;
Expand Down Expand Up @@ -103,6 +101,7 @@
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;

import jakarta.ws.rs.ForbiddenException;

Expand Down Expand Up @@ -1053,8 +1052,10 @@ protected Iterator<HugeEdge> queryEdgesFromBackend(Query query) {
if (query instanceof ConditionQuery && !query.paging()) {
// TODO: support: paging + parent label
boolean supportIn = this.storeFeatures().supportsQueryWithInCondition();
// consider multi labels + properties, see org.apache.hugegraph.core.EdgeCoreTest.testQueryInEdgesOfVertexByLabels
Stream<ConditionQuery> flattenedQueries = ConditionQueryFlatten.flatten((ConditionQuery) query, supportIn).stream();
// consider multi labels + properties,
// see org.apache.hugegraph.core.EdgeCoreTest.testQueryInEdgesOfVertexByLabels
Stream<ConditionQuery> flattenedQueries =
ConditionQueryFlatten.flatten((ConditionQuery) query, supportIn).stream();

Stream<Iterator<HugeEdge>> edgeIterators = flattenedQueries.map(cq -> {
Id label = cq.condition(HugeKeys.LABEL);
Expand All @@ -1073,7 +1074,8 @@ protected Iterator<HugeEdge> queryEdgesFromBackend(Query query) {
}
});

return edgeIterators.reduce(ExtendableIterator::concat).orElse(Collections.emptyIterator());
return edgeIterators.reduce(ExtendableIterator::concat)
.orElse(Collections.emptyIterator());
}

return queryEdgesFromBackendInternal(query);
Expand Down Expand Up @@ -1640,7 +1642,8 @@ private Query optimizeQuery(ConditionQuery query) {
*/
boolean byLabel = (label != null && query.conditionsSize() == 1);
if (!byLabel || this.store().features().supportsQueryByLabel()) {
if (this.storeFeatures().supportsFatherAndSubEdgeLabel() && byLabel && query.resultType().isEdge()) {
if (this.storeFeatures().supportsFatherAndSubEdgeLabel() && byLabel &&
query.resultType().isEdge()) {
// for memory backend
EdgeLabel edgeLabel = graph().edgeLabel(label);
if (edgeLabel.hasFather()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,8 @@ public Id createOlapPk(PropertyKey propertyKey) {
return asyncRun(this.graph(), propertyKey, job);
}

// -- store related methods, divided into two categories: 1. olap table related 2. ID generation strategy
// -- store related methods, divided into two categories:
// 1. olap table related 2. ID generation strategy
// - 1. olap table related
public void createOlapPk(Id id) {
this.graphParams().loadGraphStore().createOlapTable(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.job.UserJob;
import org.apache.hugegraph.job.algorithm.BfsTraverser;
import org.apache.hugegraph.schema.SchemaLabel;
import org.apache.hugegraph.structure.HugeVertex;
import org.apache.hugegraph.traversal.algorithm.HugeTraverser;
import org.apache.hugegraph.type.define.Directions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.backend.tx.ISchemaTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
import org.apache.hugegraph.schema.EdgeLabel;
import org.apache.hugegraph.type.define.SchemaStatus;
import org.apache.hugegraph.util.LockUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.backend.tx.ISchemaTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
import org.apache.hugegraph.schema.EdgeLabel;
import org.apache.hugegraph.schema.IndexLabel;
import org.apache.hugegraph.schema.SchemaElement;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.backend.tx.ISchemaTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
import org.apache.hugegraph.schema.IndexLabel;
import org.apache.hugegraph.type.define.SchemaStatus;
import org.apache.hugegraph.util.LockUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.backend.tx.ISchemaTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
import org.apache.hugegraph.schema.IndexLabel;
import org.apache.hugegraph.type.define.SchemaStatus;
import org.apache.hugegraph.util.LockUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.hugegraph.job.schema;

import org.apache.hugegraph.backend.tx.ISchemaTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
import org.apache.hugegraph.schema.PropertyKey;

public class OlapPropertyKeyCreateJob extends SchemaJob {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.tx.ISchemaTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
import org.apache.hugegraph.schema.PropertyKey;

public class OlapPropertyKeyRemoveJob extends OlapPropertyKeyClearJob {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.backend.tx.ISchemaTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
import org.apache.hugegraph.schema.EdgeLabel;
import org.apache.hugegraph.schema.VertexLabel;
import org.apache.hugegraph.type.define.SchemaStatus;
Expand Down
Loading
Loading