Skip to content

Commit

Permalink
优化shard级别变更时更新qrs的判断逻辑 (#349)
Browse files Browse the repository at this point in the history
fix:#346
  • Loading branch information
Huaixinww authored Dec 7, 2023
1 parent ec313bc commit 47d93d2
Showing 1 changed file with 33 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -40,13 +39,15 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.havenask.cluster.ClusterChangedEvent;
import org.havenask.cluster.ClusterState;
import org.havenask.cluster.ClusterStateApplier;
import org.havenask.cluster.metadata.IndexMetadata;
import org.havenask.cluster.node.DiscoveryNode;
import org.havenask.cluster.routing.IndexRoutingTable;
import org.havenask.cluster.routing.RoutingNode;
import org.havenask.cluster.routing.ShardRouting;
import org.havenask.cluster.service.ClusterService;
Expand All @@ -66,6 +67,7 @@
import org.havenask.engine.rpc.UpdateHeartbeatTargetRequest;
import org.havenask.engine.util.RangeUtil;
import org.havenask.engine.util.Utils;
import org.havenask.index.Index;
import org.havenask.threadpool.ThreadPool;

public class MetaDataSyncer extends AbstractLifecycleComponent implements ClusterStateApplier {
Expand Down Expand Up @@ -317,20 +319,24 @@ protected String getThreadPool() {

@Override
public void applyClusterState(ClusterChangedEvent event) {
if (isIngestNode && shouldUpdateQrs(event.previousState(), event.state())) {
// update qrs target
setQrsPendingSync();
try {
if (isIngestNode && shouldUpdateQrs(event)) {
// update qrs target
setQrsPendingSync();
}
} catch (Throwable e) {
LOGGER.error("error when update qrs target: ", e);
}
}

private boolean shouldUpdateQrs(ClusterState prevClusterState, ClusterState curClusterState) {
private boolean shouldUpdateQrs(ClusterChangedEvent event) {
// check 是否有索引级别的增删
if (isHavenaskIndexChanged(prevClusterState, curClusterState)) {
if (isHavenaskIndexChanged(event)) {
return true;
}

// TODO: check 分片的搬迁是否要更新qrs
if (isHavenaskShardChanged(prevClusterState, curClusterState)) {
// check shard级别的变更
if (isHavenaskShardChanged(event.previousState(), event.state())) {
return true;
}

Expand Down Expand Up @@ -611,45 +617,38 @@ private synchronized void generateDefaultBizConfig(List<String> indexList) throw
);
}

private boolean isHavenaskIndexChanged(ClusterState prevClusterState, ClusterState curClusterState) {
Set<String> prevIndexNamesSet = new HashSet<>(Arrays.asList(prevClusterState.metadata().indices().keys().toArray(String.class)));
Set<String> currentIndexNamesSet = new HashSet<>(Arrays.asList(curClusterState.metadata().indices().keys().toArray(String.class)));
Set<String> prevDiff = new HashSet<>(prevIndexNamesSet);
Set<String> curDiff = new HashSet<>(currentIndexNamesSet);
prevDiff.removeAll(currentIndexNamesSet);
curDiff.removeAll(prevIndexNamesSet);

for (String indexName : prevDiff) {
IndexMetadata indexMetadata = prevClusterState.metadata().index(indexName);
private boolean isHavenaskIndexChanged(ClusterChangedEvent event) {
List<Index> indicesDeleted = event.indicesDeleted();
List<String> indicesCreated = event.indicesCreated();
for (Index index : indicesDeleted) {
IndexMetadata indexMetadata = event.previousState().getMetadata().index(index);
if (EngineSettings.isHavenaskEngine(indexMetadata.getSettings())) {
return true;
}
}

for (String indexName : curDiff) {
IndexMetadata indexMetadata = curClusterState.metadata().index(indexName);
for (String index : indicesCreated) {
IndexMetadata indexMetadata = event.state().metadata().index(index);
if (EngineSettings.isHavenaskEngine(indexMetadata.getSettings())) {
return true;
}
}

return false;
}

private boolean isHavenaskShardChanged(ClusterState prevClusterState, ClusterState curClusterState) {
// TODO : 识别shard搬迁的case
for (RoutingNode routingNode : prevClusterState.getRoutingNodes()) {
for (ShardRouting shardRouting : routingNode) {
IndexMetadata indexMetadata = prevClusterState.metadata().index(shardRouting.index().getName());
if (false == EngineSettings.isHavenaskEngine(indexMetadata.getSettings())) {
continue;
}
ShardRouting curShardRouting = curClusterState.getRoutingNodes()
.node(routingNode.nodeId())
.getByShardId(shardRouting.shardId());
if (curShardRouting == null || curShardRouting.getTargetRelocatingShard() != null) {
return true;
}
for (ObjectCursor<String> indexNameCursor : prevClusterState.routingTable().indicesRouting().keys()) {
String indexName = indexNameCursor.value;
IndexMetadata indexMetadata = prevClusterState.metadata().index(indexName);
if (false == EngineSettings.isHavenaskEngine(indexMetadata.getSettings())) {
continue;
}
IndexRoutingTable prevIndexRoutingTable = prevClusterState.routingTable().indicesRouting().get(indexName);
IndexRoutingTable curIndexRoutingTable = curClusterState.routingTable().indicesRouting().get(indexName);

// TODO: shard级别的判断变更逻辑,目前使用IndexRoutingTable的equals方法,比较index以及shards是否相等,考虑后续优化
if (false == prevIndexRoutingTable.equals(curIndexRoutingTable)) {
return true;
}
}
return false;
Expand Down

0 comments on commit 47d93d2

Please sign in to comment.