diff --git a/client/src/main/java/com/vesoft/nebula/client/meta/MetaClient.java b/client/src/main/java/com/vesoft/nebula/client/meta/MetaClient.java index e18e901ff..d5097ad65 100644 --- a/client/src/main/java/com/vesoft/nebula/client/meta/MetaClient.java +++ b/client/src/main/java/com/vesoft/nebula/client/meta/MetaClient.java @@ -66,16 +66,16 @@ public class MetaClient extends AbstractMetaClient { public static final int LATEST_SCHEMA_VERSION = -1; - private static final int DEFAULT_TIMEOUT_MS = 1000; + private static final int DEFAULT_TIMEOUT_MS = 1000; private static final int DEFAULT_CONNECTION_RETRY_SIZE = 3; - private static final int DEFAULT_EXECUTION_RETRY_SIZE = 3; - private static final int RETRY_TIMES = 1; + private static final int DEFAULT_EXECUTION_RETRY_SIZE = 3; + private static final int RETRY_TIMES = 1; - private boolean enableSSL = false; - private SSLParam sslParam = null; + private boolean enableSSL = false; + private SSLParam sslParam = null; - private MetaService.Client client; - private final List addresses; + private MetaService.Client client; + private final List addresses; public MetaClient(String host, int port) throws UnknownHostException { this(new HostAddress(host, port)); @@ -122,9 +122,9 @@ public void connect() */ private void doConnect() throws TTransportException, ClientServerIncompatibleException { - Random random = new Random(System.currentTimeMillis()); - int position = random.nextInt(addresses.size()); - HostAddress address = addresses.get(position); + Random random = new Random(System.currentTimeMillis()); + int position = random.nextInt(addresses.size()); + HostAddress address = addresses.get(position); getClient(address.getHost(), address.getPort()); } @@ -192,8 +192,8 @@ public void close() { * @return */ public synchronized List getSpaces() throws TException, ExecuteFailedException { - int retry = RETRY_TIMES; - ListSpacesReq request = new ListSpacesReq(); + int retry = RETRY_TIMES; + ListSpacesReq request = new ListSpacesReq(); ListSpacesResp response = null; try { while (retry-- >= 0) { @@ -225,7 +225,7 @@ public synchronized List getSpaces() throws TException, ExecuteFailedExc */ public synchronized SpaceItem getSpace(String spaceName) throws TException, ExecuteFailedException { - int retry = RETRY_TIMES; + int retry = RETRY_TIMES; GetSpaceReq request = new GetSpaceReq(); request.setSpace_name(spaceName.getBytes()); GetSpaceResp response = null; @@ -261,8 +261,8 @@ public synchronized List getTags(String spaceName) throws TException, ExecuteFailedException { int retry = RETRY_TIMES; - int spaceID = getSpace(spaceName).space_id; - ListTagsReq request = new ListTagsReq(spaceID); + int spaceID = getSpace(spaceName).space_id; + ListTagsReq request = new ListTagsReq(spaceID); ListTagsResp response = null; try { while (retry-- >= 0) { @@ -296,9 +296,9 @@ public synchronized List getTags(String spaceName) */ public synchronized Schema getTag(String spaceName, String tagName) throws TException, ExecuteFailedException { - int retry = RETRY_TIMES; + int retry = RETRY_TIMES; GetTagReq request = new GetTagReq(); - int spaceID = getSpace(spaceName).getSpace_id(); + int spaceID = getSpace(spaceName).getSpace_id(); request.setSpace_id(spaceID); request.setTag_name(tagName.getBytes()); request.setVersion(LATEST_SCHEMA_VERSION); @@ -335,9 +335,9 @@ public synchronized Schema getTag(String spaceName, String tagName) */ public synchronized List getEdges(String spaceName) throws TException, ExecuteFailedException { - int retry = RETRY_TIMES; - int spaceID = getSpace(spaceName).getSpace_id(); - ListEdgesReq request = new ListEdgesReq(spaceID); + int retry = RETRY_TIMES; + int spaceID = getSpace(spaceName).getSpace_id(); + ListEdgesReq request = new ListEdgesReq(spaceID); ListEdgesResp response = null; try { while (retry-- >= 0) { @@ -370,9 +370,9 @@ public synchronized List getEdges(String spaceName) */ public synchronized Schema getEdge(String spaceName, String edgeName) throws TException, ExecuteFailedException { - int retry = RETRY_TIMES; + int retry = RETRY_TIMES; GetEdgeReq request = new GetEdgeReq(); - int spaceID = getSpace(spaceName).getSpace_id(); + int spaceID = getSpace(spaceName).getSpace_id(); request.setSpace_id(spaceID); request.setEdge_name(edgeName.getBytes()); request.setVersion(LATEST_SCHEMA_VERSION); @@ -410,9 +410,9 @@ public synchronized Schema getEdge(String spaceName, String edgeName) */ public synchronized Map> getPartsAlloc(String spaceName) throws ExecuteFailedException, TException { - int retry = RETRY_TIMES; + int retry = RETRY_TIMES; GetPartsAllocReq request = new GetPartsAllocReq(); - int spaceID = getSpace(spaceName).getSpace_id(); + int spaceID = getSpace(spaceName).getSpace_id(); request.setSpace_id(spaceID); GetPartsAllocResp response = null; @@ -442,7 +442,7 @@ public synchronized Map> getPartsAlloc(String spaceName) * get all Storaged servers */ public synchronized Set listHosts() { - int retry = RETRY_TIMES; + int retry = RETRY_TIMES; ListHostsReq request = new ListHostsReq(); request.setType(ListHostType.STORAGE); ListHostsResp resp = null; @@ -471,4 +471,38 @@ public synchronized Set listHosts() { } return hostAddrs; } + + /** + * get the leader parts for all storaged address + */ + public synchronized Set getHostItems() { + int retry = RETRY_TIMES; + ListHostsReq request = new ListHostsReq(); + request.setType(ListHostType.ALLOC); + ListHostsResp resp = null; + try { + while (retry-- >= 0) { + resp = client.listHosts(request); + if (resp.getCode() == ErrorCode.E_LEADER_CHANGED) { + freshClient(resp.getLeader()); + } else { + break; + } + } + } catch (TException e) { + LOGGER.error("listHosts error", e); + return null; + } + if (resp.getCode() != ErrorCode.SUCCEEDED) { + LOGGER.error("listHosts execute failed, errorCode: " + resp.getCode()); + return null; + } + Set hostItems = new HashSet<>(); + for (HostItem hostItem : resp.hosts) { + if (hostItem.getStatus().getValue() == HostStatus.ONLINE.getValue()) { + hostItems.add(hostItem); + } + } + return hostItems; + } } diff --git a/client/src/main/java/com/vesoft/nebula/client/meta/MetaManager.java b/client/src/main/java/com/vesoft/nebula/client/meta/MetaManager.java index ec560417b..5e2da04b2 100644 --- a/client/src/main/java/com/vesoft/nebula/client/meta/MetaManager.java +++ b/client/src/main/java/com/vesoft/nebula/client/meta/MetaManager.java @@ -13,6 +13,7 @@ import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException; import com.vesoft.nebula.client.meta.exception.ExecuteFailedException; import com.vesoft.nebula.meta.EdgeItem; +import com.vesoft.nebula.meta.HostItem; import com.vesoft.nebula.meta.IdName; import com.vesoft.nebula.meta.SpaceItem; import com.vesoft.nebula.meta.TagItem; @@ -28,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; +import org.apache.commons.codec.Charsets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,21 +155,16 @@ private void fillMetaInfo() { if (partLeaders == null) { partLeaders = new HashMap<>(); } - for (String spaceName : spacesInfo.keySet()) { - if (!partLeaders.containsKey(spaceName)) { - partLeaders.put(spaceName, Maps.newConcurrentMap()); - for (int partId : spacesInfo.get(spaceName).partsAlloc.keySet()) { - if (spacesInfo.get(spaceName).partsAlloc.get(partId).size() < 1) { - LOGGER.error("space {} part {} has not allocation host.", - spaceName, partId); - } else { - partLeaders.get(spaceName).put(partId, - spacesInfo - .get(spaceName) - .partsAlloc - .get(partId).get(0)); - } - + for (HostItem hostItem : metaClient.getHostItems()) { + HostAddr leader = hostItem.getHostAddr(); + for (Map.Entry> spaceParts + : hostItem.getLeader_parts().entrySet()) { + String space = new String(spaceParts.getKey(), Charsets.UTF_8); + if (!partLeaders.containsKey(space)) { + partLeaders.put(space, Maps.newConcurrentMap()); + } + for (int part : spaceParts.getValue()) { + partLeaders.get(space).put(part, leader); } } } diff --git a/client/src/main/java/com/vesoft/nebula/client/storage/StorageClient.java b/client/src/main/java/com/vesoft/nebula/client/storage/StorageClient.java index 6519bcef7..eca68ccf2 100644 --- a/client/src/main/java/com/vesoft/nebula/client/storage/StorageClient.java +++ b/client/src/main/java/com/vesoft/nebula/client/storage/StorageClient.java @@ -656,6 +656,7 @@ private ScanVertexResultIterator doScanVertex(String spaceName, .withPartSuccess(allowPartSuccess) .withUser(user) .withPassword(password) + .withStorageAddressMapping(storageAddressMapping) .build(); } @@ -1175,5 +1176,5 @@ private long getEdgeId(String spaceName, String edgeName) { private static final long DEFAULT_START_TIME = 0; private static final long DEFAULT_END_TIME = Long.MAX_VALUE; private static final boolean DEFAULT_ALLOW_PART_SUCCESS = false; - private static final boolean DEFAULT_ALLOW_READ_FOLLOWER = true; + private static final boolean DEFAULT_ALLOW_READ_FOLLOWER = false; } diff --git a/client/src/main/java/com/vesoft/nebula/client/storage/scan/ScanEdgeResultIterator.java b/client/src/main/java/com/vesoft/nebula/client/storage/scan/ScanEdgeResultIterator.java index b82e644ce..d70f8f8a7 100644 --- a/client/src/main/java/com/vesoft/nebula/client/storage/scan/ScanEdgeResultIterator.java +++ b/client/src/main/java/com/vesoft/nebula/client/storage/scan/ScanEdgeResultIterator.java @@ -5,9 +5,10 @@ package com.vesoft.nebula.client.storage.scan; -import com.facebook.thrift.TException; import com.google.common.base.Charsets; import com.vesoft.nebula.DataSet; +import com.vesoft.nebula.ErrorCode; +import com.vesoft.nebula.HostAddr; import com.vesoft.nebula.client.graph.data.HostAddress; import com.vesoft.nebula.client.meta.MetaManager; import com.vesoft.nebula.client.storage.GraphStorageConnection; @@ -33,7 +34,7 @@ public class ScanEdgeResultIterator extends ScanResultIterator { private static final Logger LOGGER = LoggerFactory.getLogger(ScanEdgeResultIterator.class); private final ScanEdgeRequest request; - private ExecutorService threadPool = null; + private ExecutorService threadPool = null; private ScanEdgeResultIterator(MetaManager metaManager, StorageConnPool pool, @@ -44,9 +45,10 @@ private ScanEdgeResultIterator(MetaManager metaManager, String labelName, boolean partSuccess, String user, - String password) { + String password, + Map storageAddressMapping) { super(metaManager, pool, new PartScanQueue(partScanInfoList), addresses, spaceName, - labelName, partSuccess, user, password); + labelName, partSuccess, user, password, storageAddressMapping); this.request = request; } @@ -69,13 +71,14 @@ public ScanEdgeResult next() throws Exception { List exceptions = Collections.synchronizedList(new ArrayList<>(addresses.size())); CountDownLatch countDownLatch = new CountDownLatch(addresses.size()); - AtomicInteger existSuccess = new AtomicInteger(0); + AtomicInteger existSuccess = new AtomicInteger(0); threadPool = Executors.newFixedThreadPool(addresses.size()); for (HostAddress addr : addresses) { threadPool.submit(() -> { + HostAddress leader = addr; ScanResponse response; - PartScanInfo partInfo = partScanQueue.getPart(addr); + PartScanInfo partInfo = partScanQueue.getPart(leader); // no part need to scan if (partInfo == null) { countDownLatch.countDown(); @@ -85,8 +88,7 @@ public ScanEdgeResult next() throws Exception { GraphStorageConnection connection; try { - connection = pool.getStorageConnection(new HostAddress(addr.getHost(), - addr.getPort())); + connection = pool.getStorageConnection(leader); } catch (Exception e) { LOGGER.error("get storage client error, ", e); exceptions.add(e); @@ -105,12 +107,25 @@ public ScanEdgeResult next() throws Exception { partRequest.setNeed_authenticate(true); try { response = connection.scanEdge(partRequest); - } catch (TException e) { + if (!response.getResult().failed_parts.isEmpty() + && response.getResult().failed_parts.get(0).code + == ErrorCode.E_LEADER_CHANGED) { + pool.release(leader, connection); + HostAddr newLeader = response.getResult().failed_parts.get(0).leader; + HostAddr availableLeader = storageAddressMapping + .getOrDefault(newLeader, newLeader); + leader = new HostAddress(availableLeader.host, availableLeader.getPort()); + connection = pool.getStorageConnection(leader); + response = connection.scanEdge(partRequest); + } + } catch (Exception e) { LOGGER.error(String.format("Scan edgeRow failed for %s", e.getMessage()), e); exceptions.add(e); partScanQueue.dropPart(partInfo); countDownLatch.countDown(); return; + } finally { + pool.release(leader, connection); } if (response == null) { @@ -158,7 +173,7 @@ public ScanEdgeResult next() throws Exception { if (!exceptions.isEmpty()) { throwExceptions(exceptions); } - boolean success = (existSuccess.get() == addresses.size()); + boolean success = (existSuccess.get() == addresses.size()); List finalResults = success ? results : null; return new ScanEdgeResult(finalResults, ScanStatus.ALL_SUCCESS); } @@ -170,16 +185,17 @@ public ScanEdgeResult next() throws Exception { */ public static class ScanEdgeResultBuilder { - MetaManager metaManager; - StorageConnPool pool; - Set partScanInfoList; - List addresses; - ScanEdgeRequest request; - String spaceName; - String edgeName; - boolean partSuccess = false; - String user = null; - String password = null; + MetaManager metaManager; + StorageConnPool pool; + Set partScanInfoList; + List addresses; + ScanEdgeRequest request; + String spaceName; + String edgeName; + boolean partSuccess = false; + String user = null; + String password = null; + Map storageAddressMapping = null; public ScanEdgeResultBuilder withMetaClient(MetaManager metaManager) { this.metaManager = metaManager; @@ -231,6 +247,12 @@ public ScanEdgeResultBuilder withPassword(String password) { return this; } + public ScanEdgeResultBuilder withStorageAddressMapping( + Map storageAddressMapping) { + this.storageAddressMapping = storageAddressMapping; + return this; + } + public ScanEdgeResultIterator build() { return new ScanEdgeResultIterator( metaManager, @@ -242,7 +264,8 @@ public ScanEdgeResultIterator build() { edgeName, partSuccess, user, - password); + password, + storageAddressMapping); } } } diff --git a/client/src/main/java/com/vesoft/nebula/client/storage/scan/ScanResultIterator.java b/client/src/main/java/com/vesoft/nebula/client/storage/scan/ScanResultIterator.java index 78b0a298a..754023ffc 100644 --- a/client/src/main/java/com/vesoft/nebula/client/storage/scan/ScanResultIterator.java +++ b/client/src/main/java/com/vesoft/nebula/client/storage/scan/ScanResultIterator.java @@ -13,12 +13,12 @@ import com.vesoft.nebula.client.storage.StorageConnPool; import com.vesoft.nebula.storage.PartitionResult; import com.vesoft.nebula.storage.ScanResponse; +import com.vesoft.nebula.util.NetUtil; import java.io.Serializable; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,21 +30,29 @@ public class ScanResultIterator implements Serializable { protected final Map partCursor; - protected final MetaManager metaManager; - protected final StorageConnPool pool; - protected final PartScanQueue partScanQueue; + protected final MetaManager metaManager; + protected final StorageConnPool pool; + protected final PartScanQueue partScanQueue; protected final List addresses; - protected final String spaceName; - protected final String labelName; - protected final boolean partSuccess; + protected final String spaceName; + protected final String labelName; + protected final boolean partSuccess; protected final String user; protected final String password; - protected ScanResultIterator(MetaManager metaManager, StorageConnPool pool, - PartScanQueue partScanQueue, List addresses, - String spaceName, String labelName, boolean partSuccess, - String user, String password) { + protected final Map storageAddressMapping = new ConcurrentHashMap<>(); + + protected ScanResultIterator(MetaManager metaManager, + StorageConnPool pool, + PartScanQueue partScanQueue, + List addresses, + String spaceName, + String labelName, + boolean partSuccess, + String user, + String password, + Map storageAddrMapping) { this.metaManager = metaManager; this.pool = pool; this.partScanQueue = partScanQueue; @@ -55,6 +63,12 @@ protected ScanResultIterator(MetaManager metaManager, StorageConnPool pool, this.partCursor = new HashMap<>(partScanQueue.size()); this.user = user; this.password = password; + if (storageAddrMapping != null && !storageAddrMapping.isEmpty()) { + for (Map.Entry et : storageAddrMapping.entrySet()) { + storageAddressMapping.put(NetUtil.parseHostAddr(et.getKey()), + NetUtil.parseHostAddr(et.getValue())); + } + } } diff --git a/client/src/main/java/com/vesoft/nebula/client/storage/scan/ScanVertexResultIterator.java b/client/src/main/java/com/vesoft/nebula/client/storage/scan/ScanVertexResultIterator.java index 4b825c1f6..b30c910e7 100644 --- a/client/src/main/java/com/vesoft/nebula/client/storage/scan/ScanVertexResultIterator.java +++ b/client/src/main/java/com/vesoft/nebula/client/storage/scan/ScanVertexResultIterator.java @@ -8,6 +8,8 @@ import com.facebook.thrift.TException; import com.google.common.base.Charsets; import com.vesoft.nebula.DataSet; +import com.vesoft.nebula.ErrorCode; +import com.vesoft.nebula.HostAddr; import com.vesoft.nebula.client.graph.data.HostAddress; import com.vesoft.nebula.client.meta.MetaManager; import com.vesoft.nebula.client.storage.GraphStorageConnection; @@ -36,7 +38,7 @@ public class ScanVertexResultIterator extends ScanResultIterator { private static final Logger LOGGER = LoggerFactory.getLogger(ScanVertexResultIterator.class); private final ScanVertexRequest request; - private ExecutorService threadPool = null; + private ExecutorService threadPool = null; private ScanVertexResultIterator(MetaManager metaManager, StorageConnPool pool, @@ -47,9 +49,10 @@ private ScanVertexResultIterator(MetaManager metaManager, String labelName, boolean partSuccess, String user, - String password) { + String password, + Map storageAddressMapping) { super(metaManager, pool, new PartScanQueue(partScanInfoList), addresses, spaceName, - labelName, partSuccess, user, password); + labelName, partSuccess, user, password, storageAddressMapping); this.request = request; } @@ -71,14 +74,15 @@ public ScanVertexResult next() throws Exception { List exceptions = Collections.synchronizedList(new ArrayList<>(addresses.size())); CountDownLatch countDownLatch = new CountDownLatch(addresses.size()); - AtomicInteger existSuccess = new AtomicInteger(0); + AtomicInteger existSuccess = new AtomicInteger(0); threadPool = Executors.newFixedThreadPool(addresses.size()); for (HostAddress addr : addresses) { threadPool.submit(() -> { + HostAddress leader = addr; ScanResponse response; - PartScanInfo partInfo = partScanQueue.getPart(addr); + PartScanInfo partInfo = partScanQueue.getPart(leader); // no part need to scan if (partInfo == null) { countDownLatch.countDown(); @@ -88,7 +92,7 @@ public ScanVertexResult next() throws Exception { GraphStorageConnection connection; try { - connection = pool.getStorageConnection(addr); + connection = pool.getStorageConnection(leader); } catch (Exception e) { LOGGER.error("get storage client error, ", e); exceptions.add(e); @@ -107,12 +111,25 @@ public ScanVertexResult next() throws Exception { partRequest.setNeed_authenticate(true); try { response = connection.scanVertex(partRequest); - } catch (TException e) { + if (!response.getResult().failed_parts.isEmpty() + && response.getResult().failed_parts.get(0).code + == ErrorCode.E_LEADER_CHANGED) { + pool.release(leader, connection); + HostAddr newLeader = response.getResult().failed_parts.get(0).leader; + HostAddr availableLeader = storageAddressMapping + .getOrDefault(newLeader, newLeader); + leader = new HostAddress(availableLeader.host, availableLeader.getPort()); + connection = pool.getStorageConnection(leader); + response = connection.scanVertex(partRequest); + } + } catch (Exception e) { LOGGER.error(String.format("Scan vertex failed for %s", e.getMessage()), e); exceptions.add(e); partScanQueue.dropPart(partInfo); countDownLatch.countDown(); return; + } finally { + pool.release(leader, connection); } if (response == null) { @@ -131,7 +148,7 @@ public ScanVertexResult next() throws Exception { } else { handleNullResult(partInfo, exceptions); } - pool.release(addr, connection); + countDownLatch.countDown(); }); } @@ -159,7 +176,7 @@ public ScanVertexResult next() throws Exception { if (!exceptions.isEmpty()) { throwExceptions(exceptions); } - boolean success = (existSuccess.get() == addresses.size()); + boolean success = (existSuccess.get() == addresses.size()); List finalResults = success ? results : null; return new ScanVertexResult(finalResults, ScanStatus.ALL_SUCCESS); } @@ -171,18 +188,20 @@ public ScanVertexResult next() throws Exception { */ public static class ScanVertexResultBuilder { - MetaManager metaManager; - StorageConnPool pool; + MetaManager metaManager; + StorageConnPool pool; Set partScanInfoList; List addresses; ScanVertexRequest request; - String spaceName; - String tagName; - boolean partSuccess = false; + String spaceName; + String tagName; + boolean partSuccess = false; - String user = null; + String user = null; String password = null; + Map storageAddressMapping = null; + public ScanVertexResultBuilder withMetaClient(MetaManager metaManager) { this.metaManager = metaManager; return this; @@ -233,6 +252,12 @@ public ScanVertexResultBuilder withPassword(String password) { return this; } + public ScanVertexResultBuilder withStorageAddressMapping( + Map storageAddressMapping) { + this.storageAddressMapping = storageAddressMapping; + return this; + } + public ScanVertexResultIterator build() { return new ScanVertexResultIterator( metaManager, @@ -244,7 +269,8 @@ public ScanVertexResultIterator build() { tagName, partSuccess, user, - password); + password, + storageAddressMapping); } } }