Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Add storage address mapping transform" #603

Merged
merged 1 commit into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.facebook.thrift.transport.THeaderTransport;
import com.facebook.thrift.transport.TSocket;
import com.facebook.thrift.transport.TTransportException;
import com.google.common.base.Charsets;
import com.vesoft.nebula.ErrorCode;
import com.vesoft.nebula.HostAddr;
import com.vesoft.nebula.client.graph.data.CASignedSSLParam;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.vesoft.nebula.meta.IdName;
import com.vesoft.nebula.meta.SpaceItem;
import com.vesoft.nebula.meta.TagItem;
import com.vesoft.nebula.util.NetUtil;
import java.io.Serializable;
import java.net.UnknownHostException;
import java.util.ArrayList;
Expand All @@ -25,9 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,8 +44,6 @@ private class SpaceInfo {
private Map<String, MetaManager.SpaceInfo> spacesInfo = new HashMap<>();
private Map<String, Map<Integer, HostAddr>> partLeaders = null;

private Map<HostAddr, HostAddr> storageAddressMapping = new ConcurrentHashMap<>();

private static final Logger LOGGER = LoggerFactory.getLogger(MetaManager.class);

private MetaClient metaClient;
Expand Down Expand Up @@ -80,33 +75,6 @@ public MetaManager(List<HostAddress> address, int timeout, int connectionRetry,
fillMetaInfo();
}

/**
* Add address mapping for storage.Used for change address of storage read from meta server.
*
* @param sourceAddr ip:port
* @param targetAddr ip:port
*/
public void addStorageAddrMapping(String sourceAddr, String targetAddr) {
if (sourceAddr != null && targetAddr != null) {
storageAddressMapping.put(NetUtil.parseHostAddr(sourceAddr),
NetUtil.parseHostAddr(targetAddr));
}
}

/**
* Add address mapping for storage.Used for change address of storage read from meta server.
*
* @param addressMap sourceAddr(ip:port) => targetAddr(ip:port)
*/
public void addStorageAddrMapping(Map<String, String> addressMap) {
if (addressMap != null && !addressMap.isEmpty()) {
for (Map.Entry<String, String> et : addressMap.entrySet()) {
storageAddressMapping.put(NetUtil.parseHostAddr(et.getKey()),
NetUtil.parseHostAddr(et.getValue()));
}
}
}


/**
* close meta client
Expand Down Expand Up @@ -312,8 +280,7 @@ public HostAddr getLeader(String spaceName, int part) throws IllegalArgumentExce
if (!partLeaders.get(spaceName).containsKey(part)) {
throw new IllegalArgumentException("PartId:" + part + " does not exist.");
}
HostAddr hostAddr = partLeaders.get(spaceName).get(part);
return storageAddressMapping.getOrDefault(hostAddr, hostAddr);
return partLeaders.get(spaceName).get(part);
} finally {
lock.readLock().unlock();
}
Expand Down Expand Up @@ -346,17 +313,7 @@ public Map<Integer, List<HostAddr>> getPartsAlloc(String spaceName)
if (!spacesInfo.containsKey(spaceName)) {
throw new IllegalArgumentException("Space:" + spaceName + " does not exist.");
}
Map<Integer, List<HostAddr>> partsAlloc = spacesInfo.get(spaceName).partsAlloc;
if (!storageAddressMapping.isEmpty()) {
// transform real address to special address by mapping
partsAlloc.keySet().forEach(partId -> {
partsAlloc.computeIfPresent(partId, (k, addressList) -> addressList
.stream()
.map(hostAddr -> storageAddressMapping.getOrDefault(hostAddr, hostAddr))
.collect(Collectors.toList()));
});
}
return partsAlloc;
return spacesInfo.get(spaceName).partsAlloc;
} finally {
lock.readLock().unlock();
}
Expand Down Expand Up @@ -398,12 +355,6 @@ public Set<HostAddr> listHosts() {
if (hosts == null) {
return new HashSet<>();
}
if (!storageAddressMapping.isEmpty()) {
hosts = hosts
.stream()
.map(hostAddr -> storageAddressMapping.getOrDefault(hostAddr, hostAddr))
.collect(Collectors.toSet());
}
return hosts;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -42,7 +41,8 @@ public class StorageClient implements Serializable {
private boolean enableSSL = false;
private SSLParam sslParam = null;

private Map<String, String> storageAddressMapping;
private String user = null;
private String password = null;

/**
* Get a Nebula Storage client that executes the scan query to get NebulaGraph's data with
Expand Down Expand Up @@ -95,23 +95,6 @@ public StorageClient(List<HostAddress> addresses, int timeout, int connectionRet
}
}

/**
* The storage address translation relationship is set to convert the storage address
* that cannot be obtained by requesting the meta service
*
* @param storageAddressMapping sourceAddressFromMeta -> targetAddress,Format ip:port.
* eg: 127.0.0.1:9559 -> 10.1.1.2:9559,
* Translates the storage 127.0.0.1:9559 address obtained from the
* meta server to 10.1.1.2:9559. It will use 10.1.1.2:9559 to
* request storage. Instead of 27.0.0.1:9559
*/
public void setStorageAddressMapping(Map<String, String> storageAddressMapping) {
this.storageAddressMapping = storageAddressMapping;
if (this.metaManager != null) {
this.metaManager.addStorageAddrMapping(storageAddressMapping);
}
}

/**
* Connect to Nebula Storage server.
*
Expand All @@ -125,10 +108,19 @@ public boolean connect() throws Exception {
pool = new StorageConnPool(config);
metaManager = new MetaManager(addresses, timeout, connectionRetry, executionRetry,
enableSSL, sslParam);
metaManager.addStorageAddrMapping(storageAddressMapping);
return true;
}

public StorageClient setUser(String user) {
this.user = user;
return this;
}

public StorageClient setPassword(String password) {
this.password = password;
return this;
}


/**
* scan vertex of all parts with specific return cols, if returnCols is an empty list, then
Expand Down Expand Up @@ -640,6 +632,8 @@ private ScanVertexResultIterator doScanVertex(String spaceName,
.withSpaceName(spaceName)
.withTagName(tagName)
.withPartSuccess(allowPartSuccess)
.withUser(user)
.withPassword(password)
.build();
}

Expand Down Expand Up @@ -1102,6 +1096,8 @@ private ScanEdgeResultIterator doScanEdge(String spaceName,
.withSpaceName(spaceName)
.withEdgeName(edgeName)
.withPartSuccess(allowPartSuccess)
.withUser(user)
.withPassword(password)
.build();
}

Expand Down
25 changes: 0 additions & 25 deletions client/src/main/java/com/vesoft/nebula/util/NetUtil.java

This file was deleted.

This file was deleted.

Loading