Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support to convert internal storage address to available address for client #604

Merged
merged 1 commit into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 68 additions & 18 deletions client/src/main/java/com/vesoft/nebula/client/meta/MetaManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.vesoft.nebula.meta.IdName;
import com.vesoft.nebula.meta.SpaceItem;
import com.vesoft.nebula.meta.TagItem;
import com.vesoft.nebula.util.NetUtil;
import java.io.Serializable;
import java.net.UnknownHostException;
import java.util.ArrayList;
Expand All @@ -24,7 +25,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -33,25 +36,26 @@
*/
public class MetaManager implements MetaCache, Serializable {
private class SpaceInfo {
private SpaceItem spaceItem = null;
private Map<String, TagItem> tagItems = new HashMap<>();
private Map<Integer, String> tagIdNames = new HashMap<>();
private Map<String, EdgeItem> edgeItems = new HashMap<>();
private Map<Integer, String> edgeTypeNames = new HashMap<>();
private Map<Integer, List<HostAddr>> partsAlloc = new HashMap<>();
private SpaceItem spaceItem = null;
private Map<String, TagItem> tagItems = new HashMap<>();
private Map<Integer, String> tagIdNames = new HashMap<>();
private Map<String, EdgeItem> edgeItems = new HashMap<>();
private Map<Integer, String> edgeTypeNames = new HashMap<>();
private Map<Integer, List<HostAddr>> partsAlloc = new HashMap<>();
}

private Map<String, MetaManager.SpaceInfo> spacesInfo = new HashMap<>();
private Map<String, Map<Integer, HostAddr>> partLeaders = null;
private Map<String, MetaManager.SpaceInfo> spacesInfo = new HashMap<>();
private Map<String, Map<Integer, HostAddr>> partLeaders = null;
private Map<HostAddr, HostAddr> storageAddressMapping = new ConcurrentHashMap<>();

private static final Logger LOGGER = LoggerFactory.getLogger(MetaManager.class);

private MetaClient metaClient;
private MetaClient metaClient;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

private static final int DEFAULT_TIMEOUT_MS = 1000;
private static final int DEFAULT_TIMEOUT_MS = 1000;
private static final int DEFAULT_CONNECTION_RETRY_SIZE = 3;
private static final int DEFAULT_EXECUTION_RETRY_SIZE = 3;
private static final int DEFAULT_EXECUTION_RETRY_SIZE = 3;

/**
* init the meta info cache
Expand All @@ -70,11 +74,37 @@ public MetaManager(List<HostAddress> address, int timeout, int connectionRetry,
int executionRetry, boolean enableSSL, SSLParam sslParam)
throws TException, ClientServerIncompatibleException, UnknownHostException {
metaClient = new MetaClient(address, timeout, connectionRetry, executionRetry, enableSSL,
sslParam);
sslParam);
metaClient.connect();
fillMetaInfo();
}

/**
* Add address mapping for storage.Used for change address of storage read from meta server.
*
* @param sourceAddr ip:port
* @param targetAddr ip:port
*/
public void addStorageAddrMapping(String sourceAddr, String targetAddr) {
if (sourceAddr != null && targetAddr != null) {
storageAddressMapping.put(NetUtil.parseHostAddr(sourceAddr),
NetUtil.parseHostAddr(targetAddr));
}
}

/**
* Add address mapping for storage.Used for change address of storage read from meta server.
*
* @param addressMap sourceAddr(ip:port) => targetAddr(ip:port)
*/
public void addStorageAddrMapping(Map<String, String> addressMap) {
if (addressMap != null && !addressMap.isEmpty()) {
for (Map.Entry<String, String> et : addressMap.entrySet()) {
storageAddressMapping.put(NetUtil.parseHostAddr(et.getKey()),
NetUtil.parseHostAddr(et.getValue()));
}
}
}

/**
* close meta client
Expand All @@ -90,10 +120,10 @@ public void close() {
private void fillMetaInfo() {
try {
Map<String, MetaManager.SpaceInfo> tempSpacesInfo = new HashMap<>();
List<IdName> spaces = metaClient.getSpaces();
List<IdName> spaces = metaClient.getSpaces();
for (IdName space : spaces) {
SpaceInfo spaceInfo = new SpaceInfo();
String spaceName = new String(space.name);
String spaceName = new String(space.name);
SpaceItem spaceItem = metaClient.getSpace(spaceName);
spaceInfo.spaceItem = spaceItem;
List<TagItem> tags = metaClient.getTags(spaceName);
Expand Down Expand Up @@ -129,10 +159,13 @@ private void fillMetaInfo() {
for (int partId : spacesInfo.get(spaceName).partsAlloc.keySet()) {
if (spacesInfo.get(spaceName).partsAlloc.get(partId).size() < 1) {
LOGGER.error("space {} part {} has not allocation host.",
spaceName, partId);
spaceName, partId);
} else {
partLeaders.get(spaceName).put(partId,
spacesInfo.get(spaceName).partsAlloc.get(partId).get(0));
spacesInfo
.get(spaceName)
.partsAlloc
.get(partId).get(0));
}

}
Expand Down Expand Up @@ -280,7 +313,8 @@ public HostAddr getLeader(String spaceName, int part) throws IllegalArgumentExce
if (!partLeaders.get(spaceName).containsKey(part)) {
throw new IllegalArgumentException("PartId:" + part + " does not exist.");
}
return partLeaders.get(spaceName).get(part);
HostAddr hostAddr = partLeaders.get(spaceName).get(part);
return storageAddressMapping.getOrDefault(hostAddr, hostAddr);
} finally {
lock.readLock().unlock();
}
Expand Down Expand Up @@ -313,7 +347,17 @@ public Map<Integer, List<HostAddr>> getPartsAlloc(String spaceName)
if (!spacesInfo.containsKey(spaceName)) {
throw new IllegalArgumentException("Space:" + spaceName + " does not exist.");
}
return spacesInfo.get(spaceName).partsAlloc;
Map<Integer, List<HostAddr>> partsAlloc = spacesInfo.get(spaceName).partsAlloc;
if (!storageAddressMapping.isEmpty()) {
// transform real address to special address by mapping
partsAlloc.keySet().forEach(partId -> {
partsAlloc.computeIfPresent(partId, (k, addressList) -> addressList
.stream()
.map(hostAddr -> storageAddressMapping.getOrDefault(hostAddr, hostAddr))
.collect(Collectors.toList()));
});
}
return partsAlloc;
} finally {
lock.readLock().unlock();
}
Expand Down Expand Up @@ -355,6 +399,12 @@ public Set<HostAddr> listHosts() {
if (hosts == null) {
return new HashSet<>();
}
if (!storageAddressMapping.isEmpty()) {
hosts = hosts
.stream()
.map(hostAddr -> storageAddressMapping.getOrDefault(hostAddr, hostAddr))
.collect(Collectors.toSet());
}
return hosts;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -44,6 +45,8 @@ public class StorageClient implements Serializable {
private String user = null;
private String password = null;

private Map<String, String> storageAddressMapping = null;

/**
* Get a Nebula Storage client that executes the scan query to get NebulaGraph's data with
* one server host.
Expand Down Expand Up @@ -108,6 +111,7 @@ public boolean connect() throws Exception {
pool = new StorageConnPool(config);
metaManager = new MetaManager(addresses, timeout, connectionRetry, executionRetry,
enableSSL, sslParam);
metaManager.addStorageAddrMapping(storageAddressMapping);
return true;
}

Expand All @@ -122,6 +126,24 @@ public StorageClient setPassword(String password) {
}


/**
* The storage address translation relationship is set to convert the storage address
* that cannot be obtained by requesting the meta service
*
* @param storageAddressMapping sourceAddressFromMeta -> targetAddress,Format ip:port.
* eg: 127.0.0.1:9559 -> 10.1.1.2:9559,
* Translates the storage 127.0.0.1:9559 address obtained from the
* meta server to 10.1.1.2:9559. It will use 10.1.1.2:9559 to
* request storage. Instead of 27.0.0.1:9559
*/
public void setStorageAddressMapping(Map<String, String> storageAddressMapping) {
this.storageAddressMapping = storageAddressMapping;
if (this.metaManager != null) {
this.metaManager.addStorageAddrMapping(storageAddressMapping);
}
}


/**
* scan vertex of all parts with specific return cols, if returnCols is an empty list, then
* return all the columns of specific tagName.
Expand Down
25 changes: 25 additions & 0 deletions client/src/main/java/com/vesoft/nebula/util/NetUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/* Copyright (c) 2024 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.util;

import com.vesoft.nebula.HostAddr;

/**
* The util of network
*
* @Author jiangyiwang-jk
* @Date 2024/2/1 15:36
*/
public class NetUtil {

public static HostAddr parseHostAddr(String hostAddress) {
assert hostAddress != null : "Host address should not be null";
String[] hostPort = hostAddress.split(":");
assert hostPort.length == 2 : String.format("Invalid host address %s", hostAddress);
return new HostAddr(hostPort[0].trim(), Integer.parseInt(hostPort[1].trim()));
}

}
Loading