Skip to content

Commit

Permalink
Add storage address mapping transform (#589)
Browse files Browse the repository at this point in the history
* fix conflict

* Add storage address mapping transform

* update

---------

Co-authored-by: Anqi <[email protected]>
Co-authored-by: jiangyiwang-jk <[email protected]>
  • Loading branch information
3 people authored Jul 5, 2024
1 parent 6b25e64 commit cd003d5
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.facebook.thrift.transport.THeaderTransport;
import com.facebook.thrift.transport.TSocket;
import com.facebook.thrift.transport.TTransportException;
import com.google.common.base.Charsets;
import com.vesoft.nebula.ErrorCode;
import com.vesoft.nebula.HostAddr;
import com.vesoft.nebula.client.graph.data.CASignedSSLParam;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.vesoft.nebula.meta.IdName;
import com.vesoft.nebula.meta.SpaceItem;
import com.vesoft.nebula.meta.TagItem;
import com.vesoft.nebula.util.NetUtil;
import java.io.Serializable;
import java.net.UnknownHostException;
import java.util.ArrayList;
Expand All @@ -24,7 +25,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -44,6 +47,8 @@ private class SpaceInfo {
private Map<String, MetaManager.SpaceInfo> spacesInfo = new HashMap<>();
private Map<String, Map<Integer, HostAddr>> partLeaders = null;

private Map<HostAddr, HostAddr> storageAddressMapping = new ConcurrentHashMap<>();

private static final Logger LOGGER = LoggerFactory.getLogger(MetaManager.class);

private MetaClient metaClient;
Expand Down Expand Up @@ -75,6 +80,33 @@ public MetaManager(List<HostAddress> address, int timeout, int connectionRetry,
fillMetaInfo();
}

/**
* Add address mapping for storage.Used for change address of storage read from meta server.
*
* @param sourceAddr ip:port
* @param targetAddr ip:port
*/
public void addStorageAddrMapping(String sourceAddr, String targetAddr) {
if (sourceAddr != null && targetAddr != null) {
storageAddressMapping.put(NetUtil.parseHostAddr(sourceAddr),
NetUtil.parseHostAddr(targetAddr));
}
}

/**
* Add address mapping for storage.Used for change address of storage read from meta server.
*
* @param addressMap sourceAddr(ip:port) => targetAddr(ip:port)
*/
public void addStorageAddrMapping(Map<String, String> addressMap) {
if (addressMap != null && !addressMap.isEmpty()) {
for (Map.Entry<String, String> et : addressMap.entrySet()) {
storageAddressMapping.put(NetUtil.parseHostAddr(et.getKey()),
NetUtil.parseHostAddr(et.getValue()));
}
}
}


/**
* close meta client
Expand Down Expand Up @@ -280,7 +312,8 @@ public HostAddr getLeader(String spaceName, int part) throws IllegalArgumentExce
if (!partLeaders.get(spaceName).containsKey(part)) {
throw new IllegalArgumentException("PartId:" + part + " does not exist.");
}
return partLeaders.get(spaceName).get(part);
HostAddr hostAddr = partLeaders.get(spaceName).get(part);
return storageAddressMapping.getOrDefault(hostAddr, hostAddr);
} finally {
lock.readLock().unlock();
}
Expand Down Expand Up @@ -313,7 +346,17 @@ public Map<Integer, List<HostAddr>> getPartsAlloc(String spaceName)
if (!spacesInfo.containsKey(spaceName)) {
throw new IllegalArgumentException("Space:" + spaceName + " does not exist.");
}
return spacesInfo.get(spaceName).partsAlloc;
Map<Integer, List<HostAddr>> partsAlloc = spacesInfo.get(spaceName).partsAlloc;
if (!storageAddressMapping.isEmpty()) {
// transform real address to special address by mapping
partsAlloc.keySet().forEach(partId -> {
partsAlloc.computeIfPresent(partId, (k, addressList) -> addressList
.stream()
.map(hostAddr -> storageAddressMapping.getOrDefault(hostAddr, hostAddr))
.collect(Collectors.toList()));
});
}
return partsAlloc;
} finally {
lock.readLock().unlock();
}
Expand Down Expand Up @@ -355,6 +398,12 @@ public Set<HostAddr> listHosts() {
if (hosts == null) {
return new HashSet<>();
}
if (!storageAddressMapping.isEmpty()) {
hosts = hosts
.stream()
.map(hostAddr -> storageAddressMapping.getOrDefault(hostAddr, hostAddr))
.collect(Collectors.toSet());
}
return hosts;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,8 +42,7 @@ public class StorageClient implements Serializable {
private boolean enableSSL = false;
private SSLParam sslParam = null;

private String user = null;
private String password = null;
private Map<String, String> storageAddressMapping;

/**
* Get a Nebula Storage client that executes the scan query to get NebulaGraph's data with
Expand Down Expand Up @@ -95,6 +95,23 @@ public StorageClient(List<HostAddress> addresses, int timeout, int connectionRet
}
}

/**
* The storage address translation relationship is set to convert the storage address
* that cannot be obtained by requesting the meta service
*
* @param storageAddressMapping sourceAddressFromMeta -> targetAddress,Format ip:port.
* eg: 127.0.0.1:9559 -> 10.1.1.2:9559,
* Translates the storage 127.0.0.1:9559 address obtained from the
* meta server to 10.1.1.2:9559. It will use 10.1.1.2:9559 to
* request storage. Instead of 27.0.0.1:9559
*/
public void setStorageAddressMapping(Map<String, String> storageAddressMapping) {
this.storageAddressMapping = storageAddressMapping;
if (this.metaManager != null) {
this.metaManager.addStorageAddrMapping(storageAddressMapping);
}
}

/**
* Connect to Nebula Storage server.
*
Expand All @@ -108,19 +125,10 @@ public boolean connect() throws Exception {
pool = new StorageConnPool(config);
metaManager = new MetaManager(addresses, timeout, connectionRetry, executionRetry,
enableSSL, sslParam);
metaManager.addStorageAddrMapping(storageAddressMapping);
return true;
}

public StorageClient setUser(String user) {
this.user = user;
return this;
}

public StorageClient setPassword(String password) {
this.password = password;
return this;
}


/**
* scan vertex of all parts with specific return cols, if returnCols is an empty list, then
Expand Down Expand Up @@ -632,8 +640,6 @@ private ScanVertexResultIterator doScanVertex(String spaceName,
.withSpaceName(spaceName)
.withTagName(tagName)
.withPartSuccess(allowPartSuccess)
.withUser(user)
.withPassword(password)
.build();
}

Expand Down Expand Up @@ -1096,8 +1102,6 @@ private ScanEdgeResultIterator doScanEdge(String spaceName,
.withSpaceName(spaceName)
.withEdgeName(edgeName)
.withPartSuccess(allowPartSuccess)
.withUser(user)
.withPassword(password)
.build();
}

Expand Down
25 changes: 25 additions & 0 deletions client/src/main/java/com/vesoft/nebula/util/NetUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.vesoft.nebula.util;

import com.vesoft.nebula.HostAddr;
import com.vesoft.nebula.client.graph.data.HostAddress;

/**
* The util of network
*
* @Author jiangyiwang-jk
* @Date 2024/2/1 15:36
*/
public class NetUtil {

private NetUtil() {
;
}

public static HostAddr parseHostAddr(String hostAddress) {
assert hostAddress != null : "Host address should not be null";
String[] hostPort = hostAddress.split(":");
assert hostPort.length == 2 : String.format("Invalid host address %s", hostAddress);
return new HostAddr(hostPort[0].trim(), Integer.parseInt(hostPort[1].trim()));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.examples;

import com.vesoft.nebula.client.storage.StorageClient;
import com.vesoft.nebula.client.storage.data.EdgeTableRow;
import com.vesoft.nebula.client.storage.data.VertexRow;
import com.vesoft.nebula.client.storage.data.VertexTableRow;
import com.vesoft.nebula.client.storage.scan.ScanEdgeResult;
import com.vesoft.nebula.client.storage.scan.ScanEdgeResultIterator;
import com.vesoft.nebula.client.storage.scan.ScanVertexResult;
import com.vesoft.nebula.client.storage.scan.ScanVertexResultIterator;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpecialAddressStorageClientExample {
private static final Logger LOGGER =
LoggerFactory.getLogger(SpecialAddressStorageClientExample.class);

public static void main(String[] args) {
Map<String,String> storageAddressMapping = new HashMap<>();
storageAddressMapping.put("127.0.0.1:9559","10.xx.xx.xx:9559");
// input params are the metad's ip and port
StorageClient client = new StorageClient("127.0.0.1", 9559);
// set storage address mapping
client.setStorageAddressMapping(storageAddressMapping);
try {
client.connect();
} catch (Exception e) {
LOGGER.error("storage client connect error, ", e);
client.close();
System.exit(1);
}
scanVertex(client);
scanEdge(client);

client.close();
}

/**
* Vertex Person's property in Nebula Graph:
* first_name, last_name, gender, birthday
* Tom Li 男 2010
*/
public static void scanVertex(StorageClient client) {
ScanVertexResultIterator iterator = client.scanVertex(
"test",
"person",
Arrays.asList("name", "age"));

while (iterator.hasNext()) {
ScanVertexResult result = null;
try {
result = iterator.next();
} catch (Exception e) {
LOGGER.error("scan error, ", e);
client.close();
System.exit(1);
}
if (result.isEmpty()) {
continue;
}

List<VertexRow> vertexRows = result.getVertices();
for (VertexRow row : vertexRows) {
if (result.getVertex(row.getVid()) != null) {
System.out.println(result.getVertex(row.getVid()));
}
}

System.out.println("\nresult vertex table view:");
System.out.println(result.getPropNames());
List<VertexTableRow> vertexTableRows = result.getVertexTableRows();
for (VertexTableRow vertex : vertexTableRows) {
System.out.println(vertex.getValues());
}
System.out.println("\n");
}
}

/**
* Edge Friend's property in Nebula Graph:
* degree
* 1.0
*/
public static void scanEdge(StorageClient client) {
ScanEdgeResultIterator iterator = client.scanEdge(
"test",
"like",
Arrays.asList("likeness"));

while (iterator.hasNext()) {
ScanEdgeResult result = null;
try {
result = iterator.next();
} catch (Exception e) {
LOGGER.error("scan error, ", e);
client.close();
System.exit(1);
}
if (result.isEmpty()) {
continue;
}

System.out.println(result.getEdges());

System.out.println("\nresult edge table view:");
System.out.println(result.getPropNames());
List<EdgeTableRow> edgeTableRows = result.getEdgeTableRows();
for (EdgeTableRow edge : edgeTableRows) {
System.out.println(edge.getValues());
}
System.out.println("\n");
}
}
}

0 comments on commit cd003d5

Please sign in to comment.