diff --git a/client/src/main/java/com/vesoft/nebula/client/meta/MetaClient.java b/client/src/main/java/com/vesoft/nebula/client/meta/MetaClient.java index e18e901ff..1dff7d096 100644 --- a/client/src/main/java/com/vesoft/nebula/client/meta/MetaClient.java +++ b/client/src/main/java/com/vesoft/nebula/client/meta/MetaClient.java @@ -10,7 +10,6 @@ import com.facebook.thrift.transport.THeaderTransport; import com.facebook.thrift.transport.TSocket; import com.facebook.thrift.transport.TTransportException; -import com.google.common.base.Charsets; import com.vesoft.nebula.ErrorCode; import com.vesoft.nebula.HostAddr; import com.vesoft.nebula.client.graph.data.CASignedSSLParam; diff --git a/client/src/main/java/com/vesoft/nebula/client/meta/MetaManager.java b/client/src/main/java/com/vesoft/nebula/client/meta/MetaManager.java index 004dbb6f8..5d18f7aa3 100644 --- a/client/src/main/java/com/vesoft/nebula/client/meta/MetaManager.java +++ b/client/src/main/java/com/vesoft/nebula/client/meta/MetaManager.java @@ -16,6 +16,7 @@ import com.vesoft.nebula.meta.IdName; import com.vesoft.nebula.meta.SpaceItem; import com.vesoft.nebula.meta.TagItem; +import com.vesoft.nebula.util.NetUtil; import java.io.Serializable; import java.net.UnknownHostException; import java.util.ArrayList; @@ -24,7 +25,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +47,8 @@ private class SpaceInfo { private Map spacesInfo = new HashMap<>(); private Map> partLeaders = null; + private Map storageAddressMapping = new ConcurrentHashMap<>(); + private static final Logger LOGGER = LoggerFactory.getLogger(MetaManager.class); private MetaClient metaClient; @@ -75,6 +80,33 @@ public MetaManager(List address, int timeout, int connectionRetry, fillMetaInfo(); } + /** + * Add address mapping for storage.Used for change address of storage read from meta server. + * + * @param sourceAddr ip:port + * @param targetAddr ip:port + */ + public void addStorageAddrMapping(String sourceAddr, String targetAddr) { + if (sourceAddr != null && targetAddr != null) { + storageAddressMapping.put(NetUtil.parseHostAddr(sourceAddr), + NetUtil.parseHostAddr(targetAddr)); + } + } + + /** + * Add address mapping for storage.Used for change address of storage read from meta server. + * + * @param addressMap sourceAddr(ip:port) => targetAddr(ip:port) + */ + public void addStorageAddrMapping(Map addressMap) { + if (addressMap != null && !addressMap.isEmpty()) { + for (Map.Entry et : addressMap.entrySet()) { + storageAddressMapping.put(NetUtil.parseHostAddr(et.getKey()), + NetUtil.parseHostAddr(et.getValue())); + } + } + } + /** * close meta client @@ -280,7 +312,8 @@ public HostAddr getLeader(String spaceName, int part) throws IllegalArgumentExce if (!partLeaders.get(spaceName).containsKey(part)) { throw new IllegalArgumentException("PartId:" + part + " does not exist."); } - return partLeaders.get(spaceName).get(part); + HostAddr hostAddr = partLeaders.get(spaceName).get(part); + return storageAddressMapping.getOrDefault(hostAddr, hostAddr); } finally { lock.readLock().unlock(); } @@ -313,7 +346,17 @@ public Map> getPartsAlloc(String spaceName) if (!spacesInfo.containsKey(spaceName)) { throw new IllegalArgumentException("Space:" + spaceName + " does not exist."); } - return spacesInfo.get(spaceName).partsAlloc; + Map> partsAlloc = spacesInfo.get(spaceName).partsAlloc; + if (!storageAddressMapping.isEmpty()) { + // transform real address to special address by mapping + partsAlloc.keySet().forEach(partId -> { + partsAlloc.computeIfPresent(partId, (k, addressList) -> addressList + .stream() + .map(hostAddr -> storageAddressMapping.getOrDefault(hostAddr, hostAddr)) + .collect(Collectors.toList())); + }); + } + return partsAlloc; } finally { lock.readLock().unlock(); } @@ -355,6 +398,12 @@ public Set listHosts() { if (hosts == null) { return new HashSet<>(); } + if (!storageAddressMapping.isEmpty()) { + hosts = hosts + .stream() + .map(hostAddr -> storageAddressMapping.getOrDefault(hostAddr, hostAddr)) + .collect(Collectors.toSet()); + } return hosts; } diff --git a/client/src/main/java/com/vesoft/nebula/client/storage/StorageClient.java b/client/src/main/java/com/vesoft/nebula/client/storage/StorageClient.java index 551f3b558..02e1c2152 100644 --- a/client/src/main/java/com/vesoft/nebula/client/storage/StorageClient.java +++ b/client/src/main/java/com/vesoft/nebula/client/storage/StorageClient.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,8 +42,7 @@ public class StorageClient implements Serializable { private boolean enableSSL = false; private SSLParam sslParam = null; - private String user = null; - private String password = null; + private Map storageAddressMapping; /** * Get a Nebula Storage client that executes the scan query to get NebulaGraph's data with @@ -95,6 +95,23 @@ public StorageClient(List addresses, int timeout, int connectionRet } } + /** + * The storage address translation relationship is set to convert the storage address + * that cannot be obtained by requesting the meta service + * + * @param storageAddressMapping sourceAddressFromMeta -> targetAddress,Format ip:port. + * eg: 127.0.0.1:9559 -> 10.1.1.2:9559, + * Translates the storage 127.0.0.1:9559 address obtained from the + * meta server to 10.1.1.2:9559. It will use 10.1.1.2:9559 to + * request storage. Instead of 27.0.0.1:9559 + */ + public void setStorageAddressMapping(Map storageAddressMapping) { + this.storageAddressMapping = storageAddressMapping; + if (this.metaManager != null) { + this.metaManager.addStorageAddrMapping(storageAddressMapping); + } + } + /** * Connect to Nebula Storage server. * @@ -108,19 +125,10 @@ public boolean connect() throws Exception { pool = new StorageConnPool(config); metaManager = new MetaManager(addresses, timeout, connectionRetry, executionRetry, enableSSL, sslParam); + metaManager.addStorageAddrMapping(storageAddressMapping); return true; } - public StorageClient setUser(String user) { - this.user = user; - return this; - } - - public StorageClient setPassword(String password) { - this.password = password; - return this; - } - /** * scan vertex of all parts with specific return cols, if returnCols is an empty list, then @@ -632,8 +640,6 @@ private ScanVertexResultIterator doScanVertex(String spaceName, .withSpaceName(spaceName) .withTagName(tagName) .withPartSuccess(allowPartSuccess) - .withUser(user) - .withPassword(password) .build(); } @@ -1096,8 +1102,6 @@ private ScanEdgeResultIterator doScanEdge(String spaceName, .withSpaceName(spaceName) .withEdgeName(edgeName) .withPartSuccess(allowPartSuccess) - .withUser(user) - .withPassword(password) .build(); } diff --git a/client/src/main/java/com/vesoft/nebula/util/NetUtil.java b/client/src/main/java/com/vesoft/nebula/util/NetUtil.java new file mode 100644 index 000000000..15a99bb53 --- /dev/null +++ b/client/src/main/java/com/vesoft/nebula/util/NetUtil.java @@ -0,0 +1,25 @@ +package com.vesoft.nebula.util; + +import com.vesoft.nebula.HostAddr; +import com.vesoft.nebula.client.graph.data.HostAddress; + +/** + * The util of network + * + * @Author jiangyiwang-jk + * @Date 2024/2/1 15:36 + */ +public class NetUtil { + + private NetUtil() { + ; + } + + public static HostAddr parseHostAddr(String hostAddress) { + assert hostAddress != null : "Host address should not be null"; + String[] hostPort = hostAddress.split(":"); + assert hostPort.length == 2 : String.format("Invalid host address %s", hostAddress); + return new HostAddr(hostPort[0].trim(), Integer.parseInt(hostPort[1].trim())); + } + +} diff --git a/examples/src/main/java/com/vesoft/nebula/examples/SpecialAddressStorageClientExample.java b/examples/src/main/java/com/vesoft/nebula/examples/SpecialAddressStorageClientExample.java new file mode 100644 index 000000000..19249cc36 --- /dev/null +++ b/examples/src/main/java/com/vesoft/nebula/examples/SpecialAddressStorageClientExample.java @@ -0,0 +1,123 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +package com.vesoft.nebula.examples; + +import com.vesoft.nebula.client.storage.StorageClient; +import com.vesoft.nebula.client.storage.data.EdgeTableRow; +import com.vesoft.nebula.client.storage.data.VertexRow; +import com.vesoft.nebula.client.storage.data.VertexTableRow; +import com.vesoft.nebula.client.storage.scan.ScanEdgeResult; +import com.vesoft.nebula.client.storage.scan.ScanEdgeResultIterator; +import com.vesoft.nebula.client.storage.scan.ScanVertexResult; +import com.vesoft.nebula.client.storage.scan.ScanVertexResultIterator; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SpecialAddressStorageClientExample { + private static final Logger LOGGER = + LoggerFactory.getLogger(SpecialAddressStorageClientExample.class); + + public static void main(String[] args) { + Map storageAddressMapping = new HashMap<>(); + storageAddressMapping.put("127.0.0.1:9559","10.xx.xx.xx:9559"); + // input params are the metad's ip and port + StorageClient client = new StorageClient("127.0.0.1", 9559); + // set storage address mapping + client.setStorageAddressMapping(storageAddressMapping); + try { + client.connect(); + } catch (Exception e) { + LOGGER.error("storage client connect error, ", e); + client.close(); + System.exit(1); + } + scanVertex(client); + scanEdge(client); + + client.close(); + } + + /** + * Vertex Person's property in Nebula Graph: + * first_name, last_name, gender, birthday + * Tom Li 男 2010 + */ + public static void scanVertex(StorageClient client) { + ScanVertexResultIterator iterator = client.scanVertex( + "test", + "person", + Arrays.asList("name", "age")); + + while (iterator.hasNext()) { + ScanVertexResult result = null; + try { + result = iterator.next(); + } catch (Exception e) { + LOGGER.error("scan error, ", e); + client.close(); + System.exit(1); + } + if (result.isEmpty()) { + continue; + } + + List vertexRows = result.getVertices(); + for (VertexRow row : vertexRows) { + if (result.getVertex(row.getVid()) != null) { + System.out.println(result.getVertex(row.getVid())); + } + } + + System.out.println("\nresult vertex table view:"); + System.out.println(result.getPropNames()); + List vertexTableRows = result.getVertexTableRows(); + for (VertexTableRow vertex : vertexTableRows) { + System.out.println(vertex.getValues()); + } + System.out.println("\n"); + } + } + + /** + * Edge Friend's property in Nebula Graph: + * degree + * 1.0 + */ + public static void scanEdge(StorageClient client) { + ScanEdgeResultIterator iterator = client.scanEdge( + "test", + "like", + Arrays.asList("likeness")); + + while (iterator.hasNext()) { + ScanEdgeResult result = null; + try { + result = iterator.next(); + } catch (Exception e) { + LOGGER.error("scan error, ", e); + client.close(); + System.exit(1); + } + if (result.isEmpty()) { + continue; + } + + System.out.println(result.getEdges()); + + System.out.println("\nresult edge table view:"); + System.out.println(result.getPropNames()); + List edgeTableRows = result.getEdgeTableRows(); + for (EdgeTableRow edge : edgeTableRows) { + System.out.println(edge.getValues()); + } + System.out.println("\n"); + } + } +}