diff --git a/client/src/main/java/com/vesoft/nebula/client/meta/MetaManager.java b/client/src/main/java/com/vesoft/nebula/client/meta/MetaManager.java index 004dbb6f8..ec560417b 100644 --- a/client/src/main/java/com/vesoft/nebula/client/meta/MetaManager.java +++ b/client/src/main/java/com/vesoft/nebula/client/meta/MetaManager.java @@ -16,6 +16,7 @@ import com.vesoft.nebula.meta.IdName; import com.vesoft.nebula.meta.SpaceItem; import com.vesoft.nebula.meta.TagItem; +import com.vesoft.nebula.util.NetUtil; import java.io.Serializable; import java.net.UnknownHostException; import java.util.ArrayList; @@ -24,7 +25,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,25 +36,26 @@ */ public class MetaManager implements MetaCache, Serializable { private class SpaceInfo { - private SpaceItem spaceItem = null; - private Map tagItems = new HashMap<>(); - private Map tagIdNames = new HashMap<>(); - private Map edgeItems = new HashMap<>(); - private Map edgeTypeNames = new HashMap<>(); - private Map> partsAlloc = new HashMap<>(); + private SpaceItem spaceItem = null; + private Map tagItems = new HashMap<>(); + private Map tagIdNames = new HashMap<>(); + private Map edgeItems = new HashMap<>(); + private Map edgeTypeNames = new HashMap<>(); + private Map> partsAlloc = new HashMap<>(); } - private Map spacesInfo = new HashMap<>(); - private Map> partLeaders = null; + private Map spacesInfo = new HashMap<>(); + private Map> partLeaders = null; + private Map storageAddressMapping = new ConcurrentHashMap<>(); private static final Logger LOGGER = LoggerFactory.getLogger(MetaManager.class); - private MetaClient metaClient; + private MetaClient metaClient; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private static final int DEFAULT_TIMEOUT_MS = 1000; + private static final int DEFAULT_TIMEOUT_MS = 1000; private static final int DEFAULT_CONNECTION_RETRY_SIZE = 3; - private static final int DEFAULT_EXECUTION_RETRY_SIZE = 3; + private static final int DEFAULT_EXECUTION_RETRY_SIZE = 3; /** * init the meta info cache @@ -70,11 +74,37 @@ public MetaManager(List address, int timeout, int connectionRetry, int executionRetry, boolean enableSSL, SSLParam sslParam) throws TException, ClientServerIncompatibleException, UnknownHostException { metaClient = new MetaClient(address, timeout, connectionRetry, executionRetry, enableSSL, - sslParam); + sslParam); metaClient.connect(); fillMetaInfo(); } + /** + * Add address mapping for storage.Used for change address of storage read from meta server. + * + * @param sourceAddr ip:port + * @param targetAddr ip:port + */ + public void addStorageAddrMapping(String sourceAddr, String targetAddr) { + if (sourceAddr != null && targetAddr != null) { + storageAddressMapping.put(NetUtil.parseHostAddr(sourceAddr), + NetUtil.parseHostAddr(targetAddr)); + } + } + + /** + * Add address mapping for storage.Used for change address of storage read from meta server. + * + * @param addressMap sourceAddr(ip:port) => targetAddr(ip:port) + */ + public void addStorageAddrMapping(Map addressMap) { + if (addressMap != null && !addressMap.isEmpty()) { + for (Map.Entry et : addressMap.entrySet()) { + storageAddressMapping.put(NetUtil.parseHostAddr(et.getKey()), + NetUtil.parseHostAddr(et.getValue())); + } + } + } /** * close meta client @@ -90,10 +120,10 @@ public void close() { private void fillMetaInfo() { try { Map tempSpacesInfo = new HashMap<>(); - List spaces = metaClient.getSpaces(); + List spaces = metaClient.getSpaces(); for (IdName space : spaces) { SpaceInfo spaceInfo = new SpaceInfo(); - String spaceName = new String(space.name); + String spaceName = new String(space.name); SpaceItem spaceItem = metaClient.getSpace(spaceName); spaceInfo.spaceItem = spaceItem; List tags = metaClient.getTags(spaceName); @@ -129,10 +159,13 @@ private void fillMetaInfo() { for (int partId : spacesInfo.get(spaceName).partsAlloc.keySet()) { if (spacesInfo.get(spaceName).partsAlloc.get(partId).size() < 1) { LOGGER.error("space {} part {} has not allocation host.", - spaceName, partId); + spaceName, partId); } else { partLeaders.get(spaceName).put(partId, - spacesInfo.get(spaceName).partsAlloc.get(partId).get(0)); + spacesInfo + .get(spaceName) + .partsAlloc + .get(partId).get(0)); } } @@ -280,7 +313,8 @@ public HostAddr getLeader(String spaceName, int part) throws IllegalArgumentExce if (!partLeaders.get(spaceName).containsKey(part)) { throw new IllegalArgumentException("PartId:" + part + " does not exist."); } - return partLeaders.get(spaceName).get(part); + HostAddr hostAddr = partLeaders.get(spaceName).get(part); + return storageAddressMapping.getOrDefault(hostAddr, hostAddr); } finally { lock.readLock().unlock(); } @@ -313,7 +347,17 @@ public Map> getPartsAlloc(String spaceName) if (!spacesInfo.containsKey(spaceName)) { throw new IllegalArgumentException("Space:" + spaceName + " does not exist."); } - return spacesInfo.get(spaceName).partsAlloc; + Map> partsAlloc = spacesInfo.get(spaceName).partsAlloc; + if (!storageAddressMapping.isEmpty()) { + // transform real address to special address by mapping + partsAlloc.keySet().forEach(partId -> { + partsAlloc.computeIfPresent(partId, (k, addressList) -> addressList + .stream() + .map(hostAddr -> storageAddressMapping.getOrDefault(hostAddr, hostAddr)) + .collect(Collectors.toList())); + }); + } + return partsAlloc; } finally { lock.readLock().unlock(); } @@ -355,6 +399,12 @@ public Set listHosts() { if (hosts == null) { return new HashSet<>(); } + if (!storageAddressMapping.isEmpty()) { + hosts = hosts + .stream() + .map(hostAddr -> storageAddressMapping.getOrDefault(hostAddr, hostAddr)) + .collect(Collectors.toSet()); + } return hosts; } diff --git a/client/src/main/java/com/vesoft/nebula/client/storage/StorageClient.java b/client/src/main/java/com/vesoft/nebula/client/storage/StorageClient.java index 551f3b558..6519bcef7 100644 --- a/client/src/main/java/com/vesoft/nebula/client/storage/StorageClient.java +++ b/client/src/main/java/com/vesoft/nebula/client/storage/StorageClient.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +45,8 @@ public class StorageClient implements Serializable { private String user = null; private String password = null; + private Map storageAddressMapping = null; + /** * Get a Nebula Storage client that executes the scan query to get NebulaGraph's data with * one server host. @@ -108,6 +111,7 @@ public boolean connect() throws Exception { pool = new StorageConnPool(config); metaManager = new MetaManager(addresses, timeout, connectionRetry, executionRetry, enableSSL, sslParam); + metaManager.addStorageAddrMapping(storageAddressMapping); return true; } @@ -122,6 +126,24 @@ public StorageClient setPassword(String password) { } + /** + * The storage address translation relationship is set to convert the storage address + * that cannot be obtained by requesting the meta service + * + * @param storageAddressMapping sourceAddressFromMeta -> targetAddress,Format ip:port. + * eg: 127.0.0.1:9559 -> 10.1.1.2:9559, + * Translates the storage 127.0.0.1:9559 address obtained from the + * meta server to 10.1.1.2:9559. It will use 10.1.1.2:9559 to + * request storage. Instead of 27.0.0.1:9559 + */ + public void setStorageAddressMapping(Map storageAddressMapping) { + this.storageAddressMapping = storageAddressMapping; + if (this.metaManager != null) { + this.metaManager.addStorageAddrMapping(storageAddressMapping); + } + } + + /** * scan vertex of all parts with specific return cols, if returnCols is an empty list, then * return all the columns of specific tagName. diff --git a/client/src/main/java/com/vesoft/nebula/util/NetUtil.java b/client/src/main/java/com/vesoft/nebula/util/NetUtil.java new file mode 100644 index 000000000..cb9f67231 --- /dev/null +++ b/client/src/main/java/com/vesoft/nebula/util/NetUtil.java @@ -0,0 +1,25 @@ +/* Copyright (c) 2024 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +package com.vesoft.nebula.util; + +import com.vesoft.nebula.HostAddr; + +/** + * The util of network + * + * @Author jiangyiwang-jk + * @Date 2024/2/1 15:36 + */ +public class NetUtil { + + public static HostAddr parseHostAddr(String hostAddress) { + assert hostAddress != null : "Host address should not be null"; + String[] hostPort = hostAddress.split(":"); + assert hostPort.length == 2 : String.format("Invalid host address %s", hostAddress); + return new HostAddr(hostPort[0].trim(), Integer.parseInt(hostPort[1].trim())); + } + +}