Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

retry for scan for leader change #605

Merged
merged 5 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 59 additions & 25 deletions client/src/main/java/com/vesoft/nebula/client/meta/MetaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,16 @@ public class MetaClient extends AbstractMetaClient {

public static final int LATEST_SCHEMA_VERSION = -1;

private static final int DEFAULT_TIMEOUT_MS = 1000;
private static final int DEFAULT_TIMEOUT_MS = 1000;
private static final int DEFAULT_CONNECTION_RETRY_SIZE = 3;
private static final int DEFAULT_EXECUTION_RETRY_SIZE = 3;
private static final int RETRY_TIMES = 1;
private static final int DEFAULT_EXECUTION_RETRY_SIZE = 3;
private static final int RETRY_TIMES = 1;

private boolean enableSSL = false;
private SSLParam sslParam = null;
private boolean enableSSL = false;
private SSLParam sslParam = null;

private MetaService.Client client;
private final List<HostAddress> addresses;
private MetaService.Client client;
private final List<HostAddress> addresses;

public MetaClient(String host, int port) throws UnknownHostException {
this(new HostAddress(host, port));
Expand Down Expand Up @@ -122,9 +122,9 @@ public void connect()
*/
private void doConnect()
throws TTransportException, ClientServerIncompatibleException {
Random random = new Random(System.currentTimeMillis());
int position = random.nextInt(addresses.size());
HostAddress address = addresses.get(position);
Random random = new Random(System.currentTimeMillis());
int position = random.nextInt(addresses.size());
HostAddress address = addresses.get(position);
getClient(address.getHost(), address.getPort());
}

Expand Down Expand Up @@ -192,8 +192,8 @@ public void close() {
* @return
*/
public synchronized List<IdName> getSpaces() throws TException, ExecuteFailedException {
int retry = RETRY_TIMES;
ListSpacesReq request = new ListSpacesReq();
int retry = RETRY_TIMES;
ListSpacesReq request = new ListSpacesReq();
ListSpacesResp response = null;
try {
while (retry-- >= 0) {
Expand Down Expand Up @@ -225,7 +225,7 @@ public synchronized List<IdName> getSpaces() throws TException, ExecuteFailedExc
*/
public synchronized SpaceItem getSpace(String spaceName) throws TException,
ExecuteFailedException {
int retry = RETRY_TIMES;
int retry = RETRY_TIMES;
GetSpaceReq request = new GetSpaceReq();
request.setSpace_name(spaceName.getBytes());
GetSpaceResp response = null;
Expand Down Expand Up @@ -261,8 +261,8 @@ public synchronized List<TagItem> getTags(String spaceName)
throws TException, ExecuteFailedException {
int retry = RETRY_TIMES;

int spaceID = getSpace(spaceName).space_id;
ListTagsReq request = new ListTagsReq(spaceID);
int spaceID = getSpace(spaceName).space_id;
ListTagsReq request = new ListTagsReq(spaceID);
ListTagsResp response = null;
try {
while (retry-- >= 0) {
Expand Down Expand Up @@ -296,9 +296,9 @@ public synchronized List<TagItem> getTags(String spaceName)
*/
public synchronized Schema getTag(String spaceName, String tagName)
throws TException, ExecuteFailedException {
int retry = RETRY_TIMES;
int retry = RETRY_TIMES;
GetTagReq request = new GetTagReq();
int spaceID = getSpace(spaceName).getSpace_id();
int spaceID = getSpace(spaceName).getSpace_id();
request.setSpace_id(spaceID);
request.setTag_name(tagName.getBytes());
request.setVersion(LATEST_SCHEMA_VERSION);
Expand Down Expand Up @@ -335,9 +335,9 @@ public synchronized Schema getTag(String spaceName, String tagName)
*/
public synchronized List<EdgeItem> getEdges(String spaceName)
throws TException, ExecuteFailedException {
int retry = RETRY_TIMES;
int spaceID = getSpace(spaceName).getSpace_id();
ListEdgesReq request = new ListEdgesReq(spaceID);
int retry = RETRY_TIMES;
int spaceID = getSpace(spaceName).getSpace_id();
ListEdgesReq request = new ListEdgesReq(spaceID);
ListEdgesResp response = null;
try {
while (retry-- >= 0) {
Expand Down Expand Up @@ -370,9 +370,9 @@ public synchronized List<EdgeItem> getEdges(String spaceName)
*/
public synchronized Schema getEdge(String spaceName, String edgeName)
throws TException, ExecuteFailedException {
int retry = RETRY_TIMES;
int retry = RETRY_TIMES;
GetEdgeReq request = new GetEdgeReq();
int spaceID = getSpace(spaceName).getSpace_id();
int spaceID = getSpace(spaceName).getSpace_id();
request.setSpace_id(spaceID);
request.setEdge_name(edgeName.getBytes());
request.setVersion(LATEST_SCHEMA_VERSION);
Expand Down Expand Up @@ -410,9 +410,9 @@ public synchronized Schema getEdge(String spaceName, String edgeName)
*/
public synchronized Map<Integer, List<HostAddr>> getPartsAlloc(String spaceName)
throws ExecuteFailedException, TException {
int retry = RETRY_TIMES;
int retry = RETRY_TIMES;
GetPartsAllocReq request = new GetPartsAllocReq();
int spaceID = getSpace(spaceName).getSpace_id();
int spaceID = getSpace(spaceName).getSpace_id();
request.setSpace_id(spaceID);

GetPartsAllocResp response = null;
Expand Down Expand Up @@ -442,7 +442,7 @@ public synchronized Map<Integer, List<HostAddr>> getPartsAlloc(String spaceName)
* get all Storaged servers
*/
public synchronized Set<HostAddr> listHosts() {
int retry = RETRY_TIMES;
int retry = RETRY_TIMES;
ListHostsReq request = new ListHostsReq();
request.setType(ListHostType.STORAGE);
ListHostsResp resp = null;
Expand Down Expand Up @@ -471,4 +471,38 @@ public synchronized Set<HostAddr> listHosts() {
}
return hostAddrs;
}

/**
* get the leader parts for all storaged address
*/
public synchronized Set<HostItem> getHostItems() {
int retry = RETRY_TIMES;
ListHostsReq request = new ListHostsReq();
request.setType(ListHostType.ALLOC);
ListHostsResp resp = null;
try {
while (retry-- >= 0) {
resp = client.listHosts(request);
if (resp.getCode() == ErrorCode.E_LEADER_CHANGED) {
freshClient(resp.getLeader());
} else {
break;
}
}
} catch (TException e) {
LOGGER.error("listHosts error", e);
return null;
}
if (resp.getCode() != ErrorCode.SUCCEEDED) {
LOGGER.error("listHosts execute failed, errorCode: " + resp.getCode());
return null;
}
Set<HostItem> hostItems = new HashSet<>();
for (HostItem hostItem : resp.hosts) {
if (hostItem.getStatus().getValue() == HostStatus.ONLINE.getValue()) {
hostItems.add(hostItem);
}
}
return hostItems;
}
}
27 changes: 12 additions & 15 deletions client/src/main/java/com/vesoft/nebula/client/meta/MetaManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
import com.vesoft.nebula.client.meta.exception.ExecuteFailedException;
import com.vesoft.nebula.meta.EdgeItem;
import com.vesoft.nebula.meta.HostItem;
import com.vesoft.nebula.meta.IdName;
import com.vesoft.nebula.meta.SpaceItem;
import com.vesoft.nebula.meta.TagItem;
Expand All @@ -28,6 +29,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.commons.codec.Charsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -153,21 +155,16 @@ private void fillMetaInfo() {
if (partLeaders == null) {
partLeaders = new HashMap<>();
}
for (String spaceName : spacesInfo.keySet()) {
if (!partLeaders.containsKey(spaceName)) {
partLeaders.put(spaceName, Maps.newConcurrentMap());
for (int partId : spacesInfo.get(spaceName).partsAlloc.keySet()) {
if (spacesInfo.get(spaceName).partsAlloc.get(partId).size() < 1) {
LOGGER.error("space {} part {} has not allocation host.",
spaceName, partId);
} else {
partLeaders.get(spaceName).put(partId,
spacesInfo
.get(spaceName)
.partsAlloc
.get(partId).get(0));
}

for (HostItem hostItem : metaClient.getHostItems()) {
HostAddr leader = hostItem.getHostAddr();
for (Map.Entry<byte[], List<Integer>> spaceParts
: hostItem.getLeader_parts().entrySet()) {
String space = new String(spaceParts.getKey(), Charsets.UTF_8);
if (!partLeaders.containsKey(space)) {
partLeaders.put(space, Maps.newConcurrentMap());
}
for (int part : spaceParts.getValue()) {
partLeaders.get(space).put(part, leader);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,7 @@ private ScanVertexResultIterator doScanVertex(String spaceName,
.withPartSuccess(allowPartSuccess)
.withUser(user)
.withPassword(password)
.withStorageAddressMapping(storageAddressMapping)
.build();
}

Expand Down Expand Up @@ -1175,5 +1176,5 @@ private long getEdgeId(String spaceName, String edgeName) {
private static final long DEFAULT_START_TIME = 0;
private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
private static final boolean DEFAULT_ALLOW_PART_SUCCESS = false;
private static final boolean DEFAULT_ALLOW_READ_FOLLOWER = true;
private static final boolean DEFAULT_ALLOW_READ_FOLLOWER = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

package com.vesoft.nebula.client.storage.scan;

import com.facebook.thrift.TException;
import com.google.common.base.Charsets;
import com.vesoft.nebula.DataSet;
import com.vesoft.nebula.ErrorCode;
import com.vesoft.nebula.HostAddr;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.meta.MetaManager;
import com.vesoft.nebula.client.storage.GraphStorageConnection;
Expand All @@ -33,7 +34,7 @@ public class ScanEdgeResultIterator extends ScanResultIterator {
private static final Logger LOGGER = LoggerFactory.getLogger(ScanEdgeResultIterator.class);

private final ScanEdgeRequest request;
private ExecutorService threadPool = null;
private ExecutorService threadPool = null;

private ScanEdgeResultIterator(MetaManager metaManager,
StorageConnPool pool,
Expand All @@ -44,9 +45,10 @@ private ScanEdgeResultIterator(MetaManager metaManager,
String labelName,
boolean partSuccess,
String user,
String password) {
String password,
Map<String, String> storageAddressMapping) {
super(metaManager, pool, new PartScanQueue(partScanInfoList), addresses, spaceName,
labelName, partSuccess, user, password);
labelName, partSuccess, user, password, storageAddressMapping);
this.request = request;
}

Expand All @@ -69,13 +71,14 @@ public ScanEdgeResult next() throws Exception {
List<Exception> exceptions =
Collections.synchronizedList(new ArrayList<>(addresses.size()));
CountDownLatch countDownLatch = new CountDownLatch(addresses.size());
AtomicInteger existSuccess = new AtomicInteger(0);
AtomicInteger existSuccess = new AtomicInteger(0);

threadPool = Executors.newFixedThreadPool(addresses.size());
for (HostAddress addr : addresses) {
threadPool.submit(() -> {
HostAddress leader = addr;
ScanResponse response;
PartScanInfo partInfo = partScanQueue.getPart(addr);
PartScanInfo partInfo = partScanQueue.getPart(leader);
// no part need to scan
if (partInfo == null) {
countDownLatch.countDown();
Expand All @@ -85,8 +88,7 @@ public ScanEdgeResult next() throws Exception {

GraphStorageConnection connection;
try {
connection = pool.getStorageConnection(new HostAddress(addr.getHost(),
addr.getPort()));
connection = pool.getStorageConnection(leader);
} catch (Exception e) {
LOGGER.error("get storage client error, ", e);
exceptions.add(e);
Expand All @@ -105,12 +107,25 @@ public ScanEdgeResult next() throws Exception {
partRequest.setNeed_authenticate(true);
try {
response = connection.scanEdge(partRequest);
} catch (TException e) {
if (!response.getResult().failed_parts.isEmpty()
&& response.getResult().failed_parts.get(0).code
== ErrorCode.E_LEADER_CHANGED) {
pool.release(leader, connection);
HostAddr newLeader = response.getResult().failed_parts.get(0).leader;
HostAddr availableLeader = storageAddressMapping
.getOrDefault(newLeader, newLeader);
leader = new HostAddress(availableLeader.host, availableLeader.getPort());
connection = pool.getStorageConnection(leader);
response = connection.scanEdge(partRequest);
}
} catch (Exception e) {
LOGGER.error(String.format("Scan edgeRow failed for %s", e.getMessage()), e);
exceptions.add(e);
partScanQueue.dropPart(partInfo);
countDownLatch.countDown();
return;
} finally {
pool.release(leader, connection);
}

if (response == null) {
Expand Down Expand Up @@ -158,7 +173,7 @@ public ScanEdgeResult next() throws Exception {
if (!exceptions.isEmpty()) {
throwExceptions(exceptions);
}
boolean success = (existSuccess.get() == addresses.size());
boolean success = (existSuccess.get() == addresses.size());
List<DataSet> finalResults = success ? results : null;
return new ScanEdgeResult(finalResults, ScanStatus.ALL_SUCCESS);
}
Expand All @@ -170,16 +185,17 @@ public ScanEdgeResult next() throws Exception {
*/
public static class ScanEdgeResultBuilder {

MetaManager metaManager;
StorageConnPool pool;
Set<PartScanInfo> partScanInfoList;
List<HostAddress> addresses;
ScanEdgeRequest request;
String spaceName;
String edgeName;
boolean partSuccess = false;
String user = null;
String password = null;
MetaManager metaManager;
StorageConnPool pool;
Set<PartScanInfo> partScanInfoList;
List<HostAddress> addresses;
ScanEdgeRequest request;
String spaceName;
String edgeName;
boolean partSuccess = false;
String user = null;
String password = null;
Map<String, String> storageAddressMapping = null;

public ScanEdgeResultBuilder withMetaClient(MetaManager metaManager) {
this.metaManager = metaManager;
Expand Down Expand Up @@ -231,6 +247,12 @@ public ScanEdgeResultBuilder withPassword(String password) {
return this;
}

public ScanEdgeResultBuilder withStorageAddressMapping(
Map<String, String> storageAddressMapping) {
this.storageAddressMapping = storageAddressMapping;
return this;
}

public ScanEdgeResultIterator build() {
return new ScanEdgeResultIterator(
metaManager,
Expand All @@ -242,7 +264,8 @@ public ScanEdgeResultIterator build() {
edgeName,
partSuccess,
user,
password);
password,
storageAddressMapping);
}
}
}
Loading
Loading