Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multi-node auth information sharing function #1350

Merged
merged 20 commits into from
Feb 10, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions hugegraph-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,52 @@
<artifactId>grizzly-http-servlet</artifactId>
<version>2.4.4</version>
</dependency>

<!-- sofa rpc -->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.7</version>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>sofa-rpc-all</artifactId>
<version>5.7.6</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.resteasy</groupId>
<artifactId>resteasy-client</artifactId>
</exclusion>
<exclusion>
<groupId>com.alipay.sofa</groupId>
<artifactId>bolt</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
</exclusion>
<exclusion>
<groupId>com.alipay.sofa.common</groupId>
<artifactId>sofa-common-tools</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
</exclusion>
<exclusion>
<groupId>com.alipay.sofa</groupId>
<artifactId>hessian</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ public final class HugeGraphAuthProxy implements HugeGraph {
private static final Logger LOG = Log.logger(HugeGraphAuthProxy.class);

private final HugeGraph hugegraph;
private final TaskScheduler taskScheduler;
private final UserManager userManager;
private final TaskSchedulerProxy taskScheduler;
private final UserManagerProxy userManager;

public HugeGraphAuthProxy(HugeGraph hugegraph) {
LOG.info("Wrap graph '{}' with HugeGraphAuthProxy", hugegraph.name());
Expand Down Expand Up @@ -652,6 +652,12 @@ public UserManager userManager() {
return this.userManager;
}

@Override
public void swichUserManager(UserManager userManager) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

switch -> switch

this.verifyAdminPermission();
this.userManager.swichUserManager(userManager);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

swich -> switch

}

@Override
public RaftGroupManager raftGroupManager(String group) {
this.verifyAdminPermission();
Expand Down Expand Up @@ -1044,7 +1050,7 @@ private boolean hasTaskPermission(HugeTask<?> task) {

class UserManagerProxy implements UserManager {

private final UserManager userManager;
private UserManager userManager;

public UserManagerProxy(UserManager origin) {
this.userManager = origin;
Expand Down Expand Up @@ -1345,6 +1351,11 @@ public RolePermission loginUser(String username, String password) {
setContext(context);
}
}

private void swichUserManager(UserManager userManager) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

swich -> switch

this.userManager = userManager;
hugegraph.swichUserManager(userManager);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

}
}

class VariablesProxy implements Variables {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import java.util.Scanner;

import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang.StringUtils;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;

import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.config.ServerOptions;
import com.baidu.hugegraph.rpc.RpcClientProvider;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.StringEncoding;

Expand Down Expand Up @@ -78,6 +80,12 @@ public void setup(HugeConfig config) {
E.checkArgument(graphPath != null,
"Invalid graph name '%s'", graphName);
this.graph = (HugeGraph) GraphFactory.open(graphPath);

String remoteUrl = config.get(ServerOptions.AUTH_REMOTE_URL);
if (StringUtils.isNotEmpty(remoteUrl)) {
RpcClientProvider provider = new RpcClientProvider(config);
this.graph.swichUserManager(provider.userManager());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,115 @@ public static synchronized ServerOptions instance() {
"hugegraph:9fd95c9c-711b-415b-b85f-d4df46ba5c31"
);

public static final ConfigOption<String> AUTH_REMOTE_URL =
new ConfigOption<>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

define as ListConfigOption for multi auth servers

"auth.remote_url",
"If the address is empty, it provide auth service, " +
"otherwise it is auth client and also provide auth service " +
"through rpc forwarding. The remote url can be set to " +
"multiple addresses, which are linked by ','.",
null,
""
);

public static final ConfigOption<Integer> RPC_SERVER_PORT =
new ConfigOption<>(
"rpc.server_port",
"The port bound by rpc server to provide services.",
rangeInt(1, Integer.MAX_VALUE),
8099
);

public static final ConfigOption<String> RPC_SERVER_HOST =
new ConfigOption<>(
"rpc.server_host",
"The hosts/ips bound by rpc server to provide " +
"services.",
disallowEmpty(),
"127.0.0.1"
);
javeme marked this conversation as resolved.
Show resolved Hide resolved

public static final ConfigOption<Integer> RPC_SERVER_TIMEOUT =
new ConfigOption<>(
"rpc.server_timeout",
"The timeout(in seconds) of rpc server execution.",
rangeInt(1, Integer.MAX_VALUE),
30
);

public static final ConfigOption<Integer> RPC_CLIENT_CONNECT_TIMEOUT =
new ConfigOption<>(
"rpc.client_connect_timeout",
"The timeout(in seconds) of rpc client connect to rpc " +
"server.",
rangeInt(1, Integer.MAX_VALUE),
20
);

public static final ConfigOption<Integer> RPC_CLIENT_RECONNECT_PERIOD =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also update rpc.client_reconnect_timeout

new ConfigOption<>(
"rpc.client_reconnect_period",
"The period(in seconds) of rpc client reconnect to rpc " +
"server.",
rangeInt(1, Integer.MAX_VALUE),
10
);
javeme marked this conversation as resolved.
Show resolved Hide resolved

public static final ConfigOption<Integer> RPC_CLIENT_READ_TIMEOUT =
new ConfigOption<>(
"rpc.client_read_timeout",
"The timeout(in seconds) of rpc client read from rpc " +
"server.",
rangeInt(1, Integer.MAX_VALUE),
40
);

public static final ConfigOption<Integer> RPC_CLIENT_RETRIES =
new ConfigOption<>(
"rpc.client_retries",
"Failed retry number of rpc client calls to rpc server.",
rangeInt(0, Integer.MAX_VALUE),
3
);

public static final ConfigOption<String> RPC_CLIENT_LOAD_BALANCER =
new ConfigOption<>(
"rpc.client_load_balancer",
"The rpc client uses a load-balancing algorithm to " +
"access multiple rpc servers in one cluster. Default " +
"value is 'consistentHash', means forwording by request " +
"parameters.",
allowValues("random", "localPref", "roundRobin",
"consistentHash", "weightRoundRobin"),
"consistentHash"
);

public static final ConfigOption<String> RPC_PROTOCOL =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we rename to RPC_SERVER_PROTOCOL?
and move to place of server options

new ConfigOption<>(
"rpc.protocol",
"Rpc communication protocol, client and server need to " +
"be specified the same value.",
allowValues("bolt", "rest", "dubbo", "h2c", "http"),
"bolt"
);

public static final ConfigOption<Integer> RPC_CONFIG_ORDER =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the option required

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this option is not set, the sofa-rpc log may be lost.

new ConfigOption<>(
"rpc.config_order",
"Sofa rpc configuration file loading order, the larger " +
"the more later loading.",
rangeInt(1, Integer.MAX_VALUE),
999
);

public static final ConfigOption<String> RPC_LOGGER_IMPL =
new ConfigOption<>(
"rpc.logger_impl",
"Sofa rpc log implementation class.",
disallowEmpty(),
"com.alipay.sofa.rpc.log.SLF4JLoggerImpl"
);

public static final ConfigOption<String> SSL_KEYSTORE_FILE =
new ConfigOption<>(
"ssl.keystore_file",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import com.baidu.hugegraph.serializer.JsonSerializer;
import com.baidu.hugegraph.serializer.Serializer;
import com.baidu.hugegraph.server.RestServer;
import com.baidu.hugegraph.rpc.RpcServerProvider;
import com.baidu.hugegraph.task.TaskManager;
import com.baidu.hugegraph.type.define.NodeRole;
import com.baidu.hugegraph.util.E;
Expand All @@ -65,6 +66,7 @@ public final class GraphManager {

private final Map<String, Graph> graphs;
private final HugeAuthenticator authenticator;
private final RpcServerProvider rpcServerProvider;

public GraphManager(HugeConfig conf) {
this.graphs = new ConcurrentHashMap<>();
Expand All @@ -75,6 +77,7 @@ public GraphManager(HugeConfig conf) {
// Raft will load snapshot firstly then launch election and replay log
this.waitGraphsStarted();
this.checkBackendVersionOrExit();
this.rpcServerProvider = this.startRpcServer(conf);
this.serverStarted(conf);
this.addMetrics(conf);
}
Expand Down Expand Up @@ -159,6 +162,23 @@ public UserManager userManager() {
return this.authenticator().userManager();
}

public void close() {
this.destoryRpcServer();
}

private RpcServerProvider startRpcServer(HugeConfig conf) {
if (this.authenticator != null) {
return new RpcServerProvider(conf, this.authenticator.userManager());
}
return null;
}

private void destoryRpcServer() {
if (this.rpcServerProvider != null) {
this.rpcServerProvider.destroy();
}
}
javeme marked this conversation as resolved.
Show resolved Hide resolved

private HugeAuthenticator authenticator() {
E.checkState(this.authenticator != null, "Unconfigured authenticator");
return this.authenticator;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.rpc;

import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.baidu.hugegraph.auth.UserManager;
import com.baidu.hugegraph.config.HugeConfig;

public class RpcClientProvider {

public final RpcConsumerConfig rpcConsumerConfig;

public RpcClientProvider(HugeConfig conf) {
RpcCommonConfig.initRpcConfigs(conf);
this.rpcConsumerConfig = new RpcConsumerConfig();
this.rpcConsumerConfig.addConsumerConfig(UserManager.class, conf);
}

public UserManager userManager() {
return (UserManager) this.serviceProxy(UserManager.class.getName());
}

public Object serviceProxy(String serviceName) {
ConsumerConfig config = this.rpcConsumerConfig.consumerConfig(serviceName);
return config.refer();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.rpc;

import java.util.Map;

import com.alipay.sofa.rpc.common.RpcConfigs;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.config.ServerOptions;

public class RpcCommonConfig {

public static void initRpcConfigs(HugeConfig conf) {
RpcConfigs.putValue("rpc.config.order",
conf.get(ServerOptions.RPC_CONFIG_ORDER));
RpcConfigs.putValue("logger.impl",
conf.get(ServerOptions.RPC_LOGGER_IMPL));
}

public static void initRpcConfigs(String key, Object value) {
RpcConfigs.putValue(key, value);
}

public static void initRpcConfigs(Map<String, Object> conf) {
for(Map.Entry<String, Object> entry : conf.entrySet()) {
RpcConfigs.putValue(entry.getKey(), entry.getValue());
}
}
}
Loading