From 47d93d2e76544c35b26410745f3957bbf78582b2 Mon Sep 17 00:00:00 2001 From: Huaixinww <141887897+Huaixinww@users.noreply.github.com> Date: Thu, 7 Dec 2023 15:27:58 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96shard=E7=BA=A7=E5=88=AB?= =?UTF-8?q?=E5=8F=98=E6=9B=B4=E6=97=B6=E6=9B=B4=E6=96=B0qrs=E7=9A=84?= =?UTF-8?q?=E5=88=A4=E6=96=AD=E9=80=BB=E8=BE=91=20(#349)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit fix:https://github.com/alibaba/havenask-federation/issues/346 --- .../org/havenask/engine/MetaDataSyncer.java | 67 +++++++++---------- 1 file changed, 33 insertions(+), 34 deletions(-) diff --git a/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/MetaDataSyncer.java b/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/MetaDataSyncer.java index ff874a8e..3f12cea7 100644 --- a/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/MetaDataSyncer.java +++ b/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/MetaDataSyncer.java @@ -24,7 +24,6 @@ import java.nio.file.StandardCopyOption; import java.nio.file.StandardOpenOption; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -40,6 +39,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.havenask.cluster.ClusterChangedEvent; @@ -47,6 +47,7 @@ import org.havenask.cluster.ClusterStateApplier; import org.havenask.cluster.metadata.IndexMetadata; import org.havenask.cluster.node.DiscoveryNode; +import org.havenask.cluster.routing.IndexRoutingTable; import org.havenask.cluster.routing.RoutingNode; import org.havenask.cluster.routing.ShardRouting; import org.havenask.cluster.service.ClusterService; @@ -66,6 +67,7 @@ import org.havenask.engine.rpc.UpdateHeartbeatTargetRequest; import org.havenask.engine.util.RangeUtil; import org.havenask.engine.util.Utils; +import org.havenask.index.Index; import org.havenask.threadpool.ThreadPool; public class MetaDataSyncer extends AbstractLifecycleComponent implements ClusterStateApplier { @@ -317,20 +319,24 @@ protected String getThreadPool() { @Override public void applyClusterState(ClusterChangedEvent event) { - if (isIngestNode && shouldUpdateQrs(event.previousState(), event.state())) { - // update qrs target - setQrsPendingSync(); + try { + if (isIngestNode && shouldUpdateQrs(event)) { + // update qrs target + setQrsPendingSync(); + } + } catch (Throwable e) { + LOGGER.error("error when update qrs target: ", e); } } - private boolean shouldUpdateQrs(ClusterState prevClusterState, ClusterState curClusterState) { + private boolean shouldUpdateQrs(ClusterChangedEvent event) { // check 是否有索引级别的增删 - if (isHavenaskIndexChanged(prevClusterState, curClusterState)) { + if (isHavenaskIndexChanged(event)) { return true; } - // TODO: check 分片的搬迁是否要更新qrs - if (isHavenaskShardChanged(prevClusterState, curClusterState)) { + // check shard级别的变更 + if (isHavenaskShardChanged(event.previousState(), event.state())) { return true; } @@ -611,45 +617,38 @@ private synchronized void generateDefaultBizConfig(List indexList) throw ); } - private boolean isHavenaskIndexChanged(ClusterState prevClusterState, ClusterState curClusterState) { - Set prevIndexNamesSet = new HashSet<>(Arrays.asList(prevClusterState.metadata().indices().keys().toArray(String.class))); - Set currentIndexNamesSet = new HashSet<>(Arrays.asList(curClusterState.metadata().indices().keys().toArray(String.class))); - Set prevDiff = new HashSet<>(prevIndexNamesSet); - Set curDiff = new HashSet<>(currentIndexNamesSet); - prevDiff.removeAll(currentIndexNamesSet); - curDiff.removeAll(prevIndexNamesSet); - - for (String indexName : prevDiff) { - IndexMetadata indexMetadata = prevClusterState.metadata().index(indexName); + private boolean isHavenaskIndexChanged(ClusterChangedEvent event) { + List indicesDeleted = event.indicesDeleted(); + List indicesCreated = event.indicesCreated(); + for (Index index : indicesDeleted) { + IndexMetadata indexMetadata = event.previousState().getMetadata().index(index); if (EngineSettings.isHavenaskEngine(indexMetadata.getSettings())) { return true; } } - for (String indexName : curDiff) { - IndexMetadata indexMetadata = curClusterState.metadata().index(indexName); + for (String index : indicesCreated) { + IndexMetadata indexMetadata = event.state().metadata().index(index); if (EngineSettings.isHavenaskEngine(indexMetadata.getSettings())) { return true; } } - return false; } private boolean isHavenaskShardChanged(ClusterState prevClusterState, ClusterState curClusterState) { - // TODO : 识别shard搬迁的case - for (RoutingNode routingNode : prevClusterState.getRoutingNodes()) { - for (ShardRouting shardRouting : routingNode) { - IndexMetadata indexMetadata = prevClusterState.metadata().index(shardRouting.index().getName()); - if (false == EngineSettings.isHavenaskEngine(indexMetadata.getSettings())) { - continue; - } - ShardRouting curShardRouting = curClusterState.getRoutingNodes() - .node(routingNode.nodeId()) - .getByShardId(shardRouting.shardId()); - if (curShardRouting == null || curShardRouting.getTargetRelocatingShard() != null) { - return true; - } + for (ObjectCursor indexNameCursor : prevClusterState.routingTable().indicesRouting().keys()) { + String indexName = indexNameCursor.value; + IndexMetadata indexMetadata = prevClusterState.metadata().index(indexName); + if (false == EngineSettings.isHavenaskEngine(indexMetadata.getSettings())) { + continue; + } + IndexRoutingTable prevIndexRoutingTable = prevClusterState.routingTable().indicesRouting().get(indexName); + IndexRoutingTable curIndexRoutingTable = curClusterState.routingTable().indicesRouting().get(indexName); + + // TODO: shard级别的判断变更逻辑,目前使用IndexRoutingTable的equals方法,比较index以及shards是否相等,考虑后续优化 + if (false == prevIndexRoutingTable.equals(curIndexRoutingTable)) { + return true; } } return false;