From 76643823476351b24564a9733adbee79da9eb69c Mon Sep 17 00:00:00 2001 From: Kaituo Li Date: Tue, 28 Apr 2020 15:25:52 -0700 Subject: [PATCH 1/2] Integration with Ultrawarm Ultrawarm introduces warm nodes into the ES cluster. Currently, we distribute model partitions to all data nodes in the cluster randomly, which could cause a model performance downgrade issue once warm nodes are throttled due to resource limitations. The PR excludes warm nodes to place model partitions. Since index shards are hosted on hot nodes, AD's coordinating nodes are in hot nodes as well. We don't need to send HourlyCron job and stats requests to warm nodes anymore. This PR implements those changes. Testing done: 1. Verified AD runs only in hot nodes. 2. stats API and HourlyCron still works. --- .../ad/AnomalyDetectorPlugin.java | 17 +++-- .../ad/cluster/ADClusterEventListener.java | 23 +++++-- .../ad/cluster/HashRing.java | 12 ++-- .../ad/cluster/HourlyCron.java | 12 ++-- .../ad/cluster/MasterEventListener.java | 10 ++- .../ad/constant/CommonName.java | 10 +++ .../ad/ml/ModelManager.java | 13 ++-- .../rest/RestStatsAnomalyDetectorAction.java | 17 +++-- .../ad/transport/ADStatsNodeRequest.java | 5 -- .../ad/transport/ADStatsRequest.java | 11 +++ .../StopDetectorTransportAction.java | 11 +-- .../ad/util/ClusterStateUtils.java | 68 +++++++++++++++++++ .../cluster/ADClusterEventListenerTests.java | 23 ++++++- .../ad/cluster/HashRingTests.java | 68 ++++++++++++++----- .../ad/cluster/HourlyCronTests.java | 5 +- .../ad/cluster/MasterEventListenerTests.java | 7 +- .../ad/ml/ModelManagerTests.java | 10 +-- .../ad/transport/ADStatsIT.java | 2 +- .../ad/transport/ADStatsTests.java | 7 +- 19 files changed, 254 insertions(+), 77 deletions(-) create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClusterStateUtils.java diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java index 285ab608..124c6779 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java @@ -72,6 +72,7 @@ import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultTransportAction; import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyResultHandler; import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; +import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils; import com.amazon.opendistroforelasticsearch.ad.util.ColdStartRunner; import com.amazon.opendistroforelasticsearch.ad.util.IndexUtils; import com.amazon.opendistroforelasticsearch.ad.util.Throttler; @@ -145,6 +146,7 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip private ADStats adStats; private NamedXContentRegistry xContentRegistry; private ClientUtil clientUtil; + private ClusterStateUtils clusterStateUtils; static { SpecialPermission.check(); @@ -201,7 +203,11 @@ public List getRestHandlers( clusterService, anomalyDetectorRunner ); - RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction(restController, adStats); + RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction( + restController, + adStats, + this.clusterStateUtils + ); RestAnomalyDetectorJobAction anomalyDetectorJobAction = new RestAnomalyDetectorJobAction( settings, restController, @@ -259,8 +265,9 @@ public Collection createComponents( RandomCutForestSerDe rcfSerde = new RandomCutForestSerDe(); CheckpointDao checkpoint = new CheckpointDao(client, clientUtil, CommonName.CHECKPOINT_INDEX_NAME); + this.clusterStateUtils = new ClusterStateUtils(clusterService); ModelManager modelManager = new ModelManager( - clusterService, + clusterStateUtils, jvmService, rcfSerde, checkpoint, @@ -284,7 +291,7 @@ public Collection createComponents( AnomalyDetectorSettings.SHINGLE_SIZE ); - HashRing hashRing = new HashRing(clusterService, clock, settings); + HashRing hashRing = new HashRing(clusterStateUtils, clock, settings); ADStateManager stateManager = new ADStateManager( client, xContentRegistry, @@ -353,11 +360,11 @@ public Collection createComponents( clock, stateManager, runner, - new ADClusterEventListener(clusterService, hashRing, modelManager), + new ADClusterEventListener(clusterService, hashRing, modelManager, clusterStateUtils), deleteUtil, adCircuitBreakerService, adStats, - new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil) + new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil, clusterStateUtils) ); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/ADClusterEventListener.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/ADClusterEventListener.java index 1d648ecb..8abb831d 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/ADClusterEventListener.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/ADClusterEventListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -20,6 +20,8 @@ import java.util.concurrent.Semaphore; import com.amazon.opendistroforelasticsearch.ad.ml.ModelManager; +import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -32,7 +34,7 @@ public class ADClusterEventListener implements ClusterStateListener { private static final Logger LOG = LogManager.getLogger(ADClusterEventListener.class); - static final String MASTER_NOT_APPLIED_MSG = "AD does not use master nodes"; + static final String NODE_NOT_APPLIED_MSG = "AD does not use master or ultrawarm nodes"; static final String NOT_RECOVERED_MSG = "CLuster is not recovered yet."; static final String IN_PROGRESS_MSG = "Cluster state change in progress, return."; static final String REMOVE_MODEL_MSG = "Remove model"; @@ -43,21 +45,28 @@ public class ADClusterEventListener implements ClusterStateListener { private HashRing hashRing; private ModelManager modelManager; private final ClusterService clusterService; + private final ClusterStateUtils clusterStateUtils; @Inject - public ADClusterEventListener(ClusterService clusterService, HashRing hashRing, ModelManager modelManager) { + public ADClusterEventListener( + ClusterService clusterService, + HashRing hashRing, + ModelManager modelManager, + ClusterStateUtils clusterStateUtils + ) { this.clusterService = clusterService; this.clusterService.addListener(this); this.hashRing = hashRing; this.modelManager = modelManager; this.inProgress = new Semaphore(1); + this.clusterStateUtils = clusterStateUtils; } @Override public void clusterChanged(ClusterChangedEvent event) { - if (!event.state().nodes().getLocalNode().isDataNode()) { - LOG.debug(MASTER_NOT_APPLIED_MSG); + if (clusterStateUtils.isIgnoredNode(event.state().nodes().getLocalNode())) { + LOG.debug(NODE_NOT_APPLIED_MSG); return; } @@ -79,7 +88,7 @@ public void clusterChanged(ClusterChangedEvent event) { // Check whether it was a data node that was removed boolean dataNodeRemoved = false; for (DiscoveryNode removedNode : delta.removedNodes()) { - if (removedNode.isDataNode()) { + if (!clusterStateUtils.isIgnoredNode(removedNode)) { LOG.info(NODE_REMOVED_MSG + " {}", removedNode.getId()); dataNodeRemoved = true; break; @@ -89,7 +98,7 @@ public void clusterChanged(ClusterChangedEvent event) { // Check whether it was a data node that was added boolean dataNodeAdded = false; for (DiscoveryNode addedNode : delta.addedNodes()) { - if (addedNode.isDataNode()) { + if (!clusterStateUtils.isIgnoredNode(addedNode)) { LOG.info(NODE_ADDED_MSG + " {}", addedNode.getId()); dataNodeAdded = true; break; diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HashRing.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HashRing.java index a7fe1b7d..4c51dc7f 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HashRing.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HashRing.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -28,10 +28,10 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.Murmur3HashFunction; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils; import com.carrotsearch.hppc.cursors.ObjectCursor; public class HashRing { @@ -43,7 +43,7 @@ public class HashRing { static final String COOLDOWN_MSG = "Hash ring doesn't respond to cluster state change within the cooldown period."; private final int VIRTUAL_NODE_COUNT = 100; - private final ClusterService clusterService; + private final ClusterStateUtils clusterStateUtils; private TreeMap circle; private Semaphore inProgress; // the UTC epoch milliseconds of the most recent successful update @@ -52,9 +52,9 @@ public class HashRing { private final Clock clock; private AtomicBoolean membershipChangeRequied; - public HashRing(ClusterService clusterService, Clock clock, Settings settings) { + public HashRing(ClusterStateUtils clusterStateUtils, Clock clock, Settings settings) { this.circle = new TreeMap(); - this.clusterService = clusterService; + this.clusterStateUtils = clusterStateUtils; this.inProgress = new Semaphore(1); this.clock = clock; this.coolDownPeriod = COOLDOWN_MINUTES.get(settings); @@ -93,7 +93,7 @@ public boolean build() { TreeMap newCircle = new TreeMap<>(); try { - for (ObjectCursor cursor : clusterService.state().nodes().getDataNodes().values()) { + for (ObjectCursor cursor : clusterStateUtils.getEligibleDataNodes().values()) { DiscoveryNode curNode = cursor.value; for (int i = 0; i < VIRTUAL_NODE_COUNT; i++) { newCircle.put(Murmur3HashFunction.hash(curNode.getId() + i), curNode); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCron.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCron.java index 16055aeb..156db12b 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCron.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCron.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -21,27 +21,27 @@ import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; import com.amazon.opendistroforelasticsearch.ad.transport.CronAction; import com.amazon.opendistroforelasticsearch.ad.transport.CronRequest; +import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils; public class HourlyCron implements Runnable { private static final Logger LOG = LogManager.getLogger(HourlyCron.class); static final String SUCCEEDS_LOG_MSG = "Hourly maintenance succeeds"; static final String NODE_EXCEPTION_LOG_MSG = "Hourly maintenance of node has exception"; static final String EXCEPTION_LOG_MSG = "Hourly maintenance has exception."; - private ClusterService clusterService; + private ClusterStateUtils clientStateUtils; private Client client; - public HourlyCron(ClusterService clusterService, Client client) { - this.clusterService = clusterService; + public HourlyCron(Client client, ClusterStateUtils clientStateUtils) { + this.clientStateUtils = clientStateUtils; this.client = client; } @Override public void run() { - DiscoveryNode[] dataNodes = clusterService.state().nodes().getDataNodes().values().toArray(DiscoveryNode.class); + DiscoveryNode[] dataNodes = clientStateUtils.getEligibleDataNodes().values().toArray(DiscoveryNode.class); // we also add the cancel query function here based on query text from the negative cache. diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java index a3735579..2ba4852f 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings; import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; +import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils; import org.elasticsearch.threadpool.ThreadPool; @@ -39,6 +40,7 @@ public class MasterEventListener implements LocalNodeMasterListener { private Client client; private Clock clock; private ClientUtil clientUtil; + private ClusterStateUtils clusterStateUtils; public MasterEventListener( ClusterService clusterService, @@ -46,7 +48,8 @@ public MasterEventListener( DeleteDetector deleteUtil, Client client, Clock clock, - ClientUtil clientUtil + ClientUtil clientUtil, + ClusterStateUtils clusterStateUtils ) { this.clusterService = clusterService; this.threadPool = threadPool; @@ -55,13 +58,14 @@ public MasterEventListener( this.clusterService.addLocalNodeMasterListener(this); this.clock = clock; this.clientUtil = clientUtil; + this.clusterStateUtils = clusterStateUtils; } @Override public void onMaster() { if (hourlyCron == null) { hourlyCron = threadPool - .scheduleWithFixedDelay(new HourlyCron(clusterService, client), TimeValue.timeValueHours(1), executorName()); + .scheduleWithFixedDelay(new HourlyCron(client, clusterStateUtils), TimeValue.timeValueHours(1), executorName()); clusterService.addLifecycleListener(new LifecycleListener() { @Override public void beforeStop() { diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonName.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonName.java index b03d4364..d3572dca 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonName.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonName.java @@ -31,4 +31,14 @@ public class CommonName { // Anomaly Detector name for X-Opaque-Id header // ====================================== public static final String ANOMALY_DETECTOR = "[Anomaly Detector]"; + + // ====================================== + // Ultrawarm node attributes + // ====================================== + + // warm node + public static String WARM_BOX_TYPE = "warm"; + + // box type + public static final String BOX_TYPE_KEY = "box_type"; } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/ModelManager.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/ModelManager.java index e334645e..ef938476 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/ModelManager.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/ModelManager.java @@ -42,7 +42,6 @@ import com.google.gson.Gson; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.monitor.jvm.JvmService; import com.amazon.opendistroforelasticsearch.ad.common.exception.LimitExceededException; @@ -50,6 +49,7 @@ import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages; import com.amazon.opendistroforelasticsearch.ad.ml.rcf.CombinedRcfResult; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils; import com.amazon.randomcutforest.RandomCutForest; import com.amazon.randomcutforest.serialize.RandomCutForestSerDe; @@ -103,7 +103,7 @@ public String getName() { private final Duration checkpointInterval; // dependencies - private final ClusterService clusterService; + private final ClusterStateUtils clusterStateUtils; private final JvmService jvmService; private final RandomCutForestSerDe rcfSerde; private final CheckpointDao checkpointDao; @@ -119,7 +119,7 @@ public String getName() { /** * Constructor. * - * @param clusterService cluster info + * @param clusterStateUtils cluster info * @param jvmService jvm info * @param rcfSerde RCF model serialization * @param checkpointDao model checkpoint storage @@ -140,9 +140,10 @@ public String getName() { * @param minPreviewSize minimum number of data points for preview * @param modelTtl time to live for hosted models * @param checkpointInterval interval between checkpoints + * @param shingleSize size of a shingle */ public ModelManager( - ClusterService clusterService, + ClusterStateUtils clusterStateUtils, JvmService jvmService, RandomCutForestSerDe rcfSerde, CheckpointDao checkpointDao, @@ -166,7 +167,7 @@ public ModelManager( int shingleSize ) { - this.clusterService = clusterService; + this.clusterStateUtils = clusterStateUtils; this.jvmService = jvmService; this.rcfSerde = rcfSerde; this.checkpointDao = checkpointDao; @@ -259,7 +260,7 @@ public Entry getPartitionedForestSizes(RandomCutForest forest, int numPartitions = (int) Math.ceil((double) totalSize / (double) partitionSize); int forestSize = (int) Math.ceil((double) forest.getNumberOfTrees() / (double) numPartitions); - int numNodes = clusterService.state().nodes().getDataNodes().size(); + int numNodes = clusterStateUtils.getEligibleDataNodes().size(); if (numPartitions > numNodes) { // partition by cluster size partitionSize = (long) Math.ceil((double) totalSize / (double) numNodes); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestStatsAnomalyDetectorAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestStatsAnomalyDetectorAction.java index 408f0b3b..e8cd7b23 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestStatsAnomalyDetectorAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestStatsAnomalyDetectorAction.java @@ -18,7 +18,10 @@ import com.amazon.opendistroforelasticsearch.ad.stats.ADStats; import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsAction; import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsRequest; +import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils; + import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; import org.elasticsearch.rest.action.RestActions; import org.elasticsearch.rest.BaseRestHandler; @@ -39,19 +42,22 @@ public class RestStatsAnomalyDetectorAction extends BaseRestHandler { private static final String STATS_ANOMALY_DETECTOR_ACTION = "stats_anomaly_detector"; private ADStats adStats; + private ClusterStateUtils clusterStateUtils; /** * Constructor * * @param controller Rest Controller * @param adStats ADStats object + * @param clusterStateUtils util to get eligible data nodes */ - public RestStatsAnomalyDetectorAction(RestController controller, ADStats adStats) { + public RestStatsAnomalyDetectorAction(RestController controller, ADStats adStats, ClusterStateUtils clusterStateUtils) { controller.registerHandler(RestRequest.Method.GET, AD_BASE_URI + "/{nodeId}/stats/", this); controller.registerHandler(RestRequest.Method.GET, AD_BASE_URI + "/{nodeId}/stats/{stat}", this); controller.registerHandler(RestRequest.Method.GET, AD_BASE_URI + "/stats/", this); controller.registerHandler(RestRequest.Method.GET, AD_BASE_URI + "/stats/{stat}", this); this.adStats = adStats; + this.clusterStateUtils = clusterStateUtils; } @Override @@ -73,15 +79,18 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli */ private ADStatsRequest getRequest(RestRequest request) { // parse the nodes the user wants to query the stats for - String[] nodeIdsArr = null; String nodesIdsStr = request.param("nodeId"); Set validStats = adStats.getStats().keySet(); + ADStatsRequest adStatsRequest = null; if (!Strings.isEmpty(nodesIdsStr)) { - nodeIdsArr = nodesIdsStr.split(","); + String[] nodeIdsArr = nodesIdsStr.split(","); + adStatsRequest = new ADStatsRequest(nodeIdsArr); + } else { + DiscoveryNode[] dataNodes = clusterStateUtils.getEligibleDataNodes().values().toArray(DiscoveryNode.class); + adStatsRequest = new ADStatsRequest(dataNodes); } - ADStatsRequest adStatsRequest = new ADStatsRequest(nodeIdsArr); adStatsRequest.timeout(request.param("timeout")); // parse the stats the user wants to see diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsNodeRequest.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsNodeRequest.java index 7036ca4c..d8b85a81 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsNodeRequest.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsNodeRequest.java @@ -57,11 +57,6 @@ public ADStatsRequest getADStatsRequest() { return request; } - public void readFrom(StreamInput in) throws IOException { - request = new ADStatsRequest(); - request.readFrom(in); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsRequest.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsRequest.java index d7de18dc..d21422c8 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsRequest.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsRequest.java @@ -16,6 +16,7 @@ package com.amazon.opendistroforelasticsearch.ad.transport; import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -50,6 +51,16 @@ public ADStatsRequest(String... nodeIds) { statsToBeRetrieved = new HashSet<>(); } + /** + * Constructor + * + * @param nodes nodes of nodes' stats to be retrieved + */ + public ADStatsRequest(DiscoveryNode... nodes) { + super(nodes); + statsToBeRetrieved = new HashSet<>(); + } + /** * Adds a stat to the set of stats to be retrieved * diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StopDetectorTransportAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StopDetectorTransportAction.java index d434837e..c368b279 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StopDetectorTransportAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StopDetectorTransportAction.java @@ -16,6 +16,8 @@ package com.amazon.opendistroforelasticsearch.ad.transport; import com.amazon.opendistroforelasticsearch.ad.common.exception.InternalFailure; +import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -27,7 +29,6 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; @@ -37,18 +38,18 @@ public class StopDetectorTransportAction extends HandledTransportAction { if (response.hasFailures()) { diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClusterStateUtils.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClusterStateUtils.java new file mode 100644 index 00000000..2640387f --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClusterStateUtils.java @@ -0,0 +1,68 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.util; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.inject.Inject; + +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; + +public class ClusterStateUtils { + private static final Logger LOG = LogManager.getLogger(ClusterStateUtils.class); + private final ClusterService clusterService; + private final Map ignoredAttributes = new HashMap(); + + @Inject + public ClusterStateUtils(ClusterService clusterService) { + this.clusterService = clusterService; + ignoredAttributes.put(CommonName.BOX_TYPE_KEY, CommonName.WARM_BOX_TYPE); + } + + public ImmutableOpenMap getEligibleDataNodes() { + ImmutableOpenMap dataNodes = clusterService.state().nodes().getDataNodes(); + ImmutableOpenMap.Builder modelNodes = ImmutableOpenMap.builder(); + + for (Iterator> it = dataNodes.iterator(); it.hasNext();) { + ObjectObjectCursor cursor = it.next(); + if (!isIgnoredNode(cursor.value)) { + modelNodes.put(cursor.key, cursor.value); + } + } + return modelNodes.build(); + } + + public boolean isIgnoredNode(DiscoveryNode node) { + if (!node.isDataNode()) { + return true; + } + for (Map.Entry entry : ignoredAttributes.entrySet()) { + String attribute = node.getAttributes().get(entry.getKey()); + if (attribute != null && attribute.equals(entry.getValue())) { + return true; + } + } + return false; + } +} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/ADClusterEventListenerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/ADClusterEventListenerTests.java index ff8c54b9..9b20e09e 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/ADClusterEventListenerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/ADClusterEventListenerTests.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Matchers.any; +import java.util.HashMap; import java.util.HashSet; import java.util.Optional; import java.util.Set; @@ -32,7 +33,9 @@ import static java.util.Collections.emptySet; import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; import com.amazon.opendistroforelasticsearch.ad.ml.ModelManager; +import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; @@ -61,6 +64,7 @@ public class ADClusterEventListenerTests extends AbstractADTest { private ClusterState newClusterState; private DiscoveryNode masterNode; private DiscoveryNode dataNode1; + private ClusterStateUtils clusterStateUtils; @BeforeClass public static void setUpBeforeClass() { @@ -81,6 +85,7 @@ public void setUp() throws Exception { hashRing = mock(HashRing.class); when(hashRing.build()).thenReturn(true); modelManager = mock(ModelManager.class); + clusterStateUtils = new ClusterStateUtils(clusterService); masterNode = new DiscoveryNode(masterNodeId, buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); dataNode1 = new DiscoveryNode(dataNode1Id, buildNewFakeTransportAddress(), emptyMap(), BUILT_IN_ROLES, Version.CURRENT); oldClusterState = ClusterState @@ -92,7 +97,7 @@ public void setUp() throws Exception { .nodes(new DiscoveryNodes.Builder().masterNodeId(masterNodeId).localNodeId(dataNode1Id).add(masterNode).add(dataNode1)) .build(); - listener = new ADClusterEventListener(clusterService, hashRing, modelManager); + listener = new ADClusterEventListener(clusterService, hashRing, modelManager, clusterStateUtils); } @Override @@ -109,7 +114,21 @@ public void tearDown() throws Exception { public void testIsMasterNode() { listener.clusterChanged(new ClusterChangedEvent("foo", oldClusterState, oldClusterState)); - assertTrue(testAppender.containsMessage(ADClusterEventListener.MASTER_NOT_APPLIED_MSG)); + assertTrue(testAppender.containsMessage(ADClusterEventListener.NODE_NOT_APPLIED_MSG)); + } + + public void testIsWarmNode() { + HashMap attributesForNode1 = new HashMap<>(); + attributesForNode1.put(CommonName.BOX_TYPE_KEY, CommonName.WARM_BOX_TYPE); + dataNode1 = new DiscoveryNode(dataNode1Id, buildNewFakeTransportAddress(), attributesForNode1, BUILT_IN_ROLES, Version.CURRENT); + + ClusterState warmNodeClusterState = ClusterState + .builder(new ClusterName(clusterName)) + .nodes(new DiscoveryNodes.Builder().masterNodeId(masterNodeId).localNodeId(dataNode1Id).add(masterNode).add(dataNode1)) + .blocks(ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) + .build(); + listener.clusterChanged(new ClusterChangedEvent("foo", warmNodeClusterState, oldClusterState)); + assertTrue(testAppender.containsMessage(ADClusterEventListener.NODE_NOT_APPLIED_MSG)); } public void testNotRecovered() { diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/HashRingTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/HashRingTests.java index d50b2f1d..b3acdb7b 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/HashRingTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/HashRingTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -24,11 +24,16 @@ import static org.mockito.Mockito.when; import java.util.List; +import java.util.Map; import java.util.Optional; import java.time.Clock; import java.util.ArrayList; +import java.util.HashMap; import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; +import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils; + import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -44,11 +49,38 @@ public class HashRingTests extends AbstractADTest { private ClusterService clusterService; + private ClusterStateUtils clusterStateUtils; private Settings settings; private Clock clock; - private DiscoveryNode createNode(String nodeId) { - return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), emptyMap(), BUILT_IN_ROLES, Version.CURRENT); + private DiscoveryNode createNode(String nodeId, Map attributes) { + return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), attributes, BUILT_IN_ROLES, Version.CURRENT); + } + + private void setNodeState() { + setNodeState(emptyMap()); + } + + private void setNodeState(Map attributesForNode1) { + DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); + List discoveryNodes = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + DiscoveryNode node = null; + if (i != 1) { + node = createNode(Integer.toString(i), emptyMap()); + } else { + node = createNode(Integer.toString(i), attributesForNode1); + } + + discoBuilder = discoBuilder.add(node); + discoveryNodes.add(node); + } + discoBuilder.localNodeId("1"); + discoBuilder.masterNodeId("0"); + ClusterState.Builder stateBuilder = ClusterState.builder(clusterService.getClusterName()); + stateBuilder.nodes(discoBuilder); + ClusterState clusterState = stateBuilder.build(); + setState(clusterService.getClusterApplierService(), clusterState); } @BeforeClass @@ -67,19 +99,7 @@ public void setUp() throws Exception { super.setUp(); super.setUpLog4jForJUnit(HashRing.class); clusterService = createClusterService(threadPool); - DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); - List discoveryNodes = new ArrayList<>(); - for (int i = 0; i < 2; i++) { - final DiscoveryNode node = createNode(Integer.toString(i)); - discoBuilder = discoBuilder.add(node); - discoveryNodes.add(node); - } - discoBuilder.localNodeId(randomFrom(discoveryNodes).getId()); - discoBuilder.masterNodeId(randomFrom(discoveryNodes).getId()); - ClusterState.Builder stateBuilder = ClusterState.builder(clusterService.getClusterName()); - stateBuilder.nodes(discoBuilder); - ClusterState clusterState = stateBuilder.build(); - setState(clusterService.getClusterApplierService(), clusterState); + clusterStateUtils = new ClusterStateUtils(clusterService); settings = Settings .builder() @@ -98,7 +118,9 @@ public void tearDown() throws Exception { } public void testGetOwningNode() { - HashRing ring = new HashRing(clusterService, clock, settings); + setNodeState(); + + HashRing ring = new HashRing(clusterStateUtils, clock, settings); Optional node = ring.getOwningNode("http-latency-rcf-1"); assertTrue(node.isPresent()); String id = node.get().getId(); @@ -110,4 +132,16 @@ public void testGetOwningNode() { assertEquals(node, node2); assertTrue(testAppender.containsMessage(HashRing.COOLDOWN_MSG)); } + + public void testWarmNodeExcluded() { + HashMap attributesForNode1 = new HashMap<>(); + attributesForNode1.put(CommonName.BOX_TYPE_KEY, CommonName.WARM_BOX_TYPE); + setNodeState(attributesForNode1); + + HashRing ring = new HashRing(clusterStateUtils, clock, settings); + Optional node = ring.getOwningNode("http-latency-rcf-1"); + assertTrue(node.isPresent()); + String id = node.get().getId(); + assertTrue(id.equals("2")); + } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCronTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCronTests.java index 0b979df4..b0af9aeb 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCronTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCronTests.java @@ -29,6 +29,8 @@ import com.amazon.opendistroforelasticsearch.ad.transport.CronAction; import com.amazon.opendistroforelasticsearch.ad.transport.CronNodeResponse; import com.amazon.opendistroforelasticsearch.ad.transport.CronResponse; +import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; @@ -59,6 +61,7 @@ public void templateHourlyCron(HourlyCronTestExecutionMode mode) { ClusterService clusterService = mock(ClusterService.class); ClusterState state = ClusterCreation.state(1); when(clusterService.state()).thenReturn(state); + ClusterStateUtils stateUtils = new ClusterStateUtils(clusterService); Client client = mock(Client.class); doAnswer(invocation -> { @@ -104,7 +107,7 @@ public void templateHourlyCron(HourlyCronTestExecutionMode mode) { return null; }).when(client).execute(eq(CronAction.INSTANCE), any(), any()); - HourlyCron cron = new HourlyCron(clusterService, client); + HourlyCron cron = new HourlyCron(client, stateUtils); cron.run(); Logger LOG = LogManager.getLogger(HourlyCron.class); diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java index 0a0a4a17..e904062b 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java @@ -28,6 +28,8 @@ import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; +import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils; + import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.LifecycleListener; @@ -46,6 +48,7 @@ public class MasterEventListenerTests extends AbstractADTest { private Cancellable dailyCancellable; private MasterEventListener masterService; private ClientUtil clientUtil; + private ClusterStateUtils clusterStateUtils; @Override @Before @@ -62,7 +65,9 @@ public void setUp() throws Exception { client = mock(Client.class); clock = mock(Clock.class); clientUtil = mock(ClientUtil.class); - masterService = new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil); + clusterStateUtils = new ClusterStateUtils(clusterService); + + masterService = new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil, clusterStateUtils); } public void testOnOffMaster() { diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/ml/ModelManagerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/ml/ModelManagerTests.java index 81242a49..3d3ee070 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/ml/ModelManagerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/ml/ModelManagerTests.java @@ -32,6 +32,7 @@ import com.amazon.opendistroforelasticsearch.ad.common.exception.ResourceNotFoundException; import com.amazon.opendistroforelasticsearch.ad.ml.rcf.CombinedRcfResult; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils; import com.google.gson.Gson; import junitparams.JUnitParamsRunner; @@ -39,7 +40,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.monitor.jvm.JvmService; @@ -92,7 +92,7 @@ public class ModelManagerTests { private AnomalyDetector anomalyDetector; @Mock(answer = Answers.RETURNS_DEEP_STUBS) - private ClusterService clusterService; + private ClusterStateUtils clusterStateUtils; @Mock(answer = Answers.RETURNS_DEEP_STUBS) private JvmService jvmService; @@ -167,7 +167,7 @@ public void setup() { modelManager = spy( new ModelManager( - clusterService, + clusterStateUtils, jvmService, rcfSerde, checkpointDao, @@ -283,7 +283,7 @@ public void getPartitionedForestSizes_returnExpected( when(modelManager.estimateModelSize(rcf)).thenReturn(totalModelSize); when(jvmService.info().getMem().getHeapMax().getBytes()).thenReturn(heapSize); - when(clusterService.state().nodes().getDataNodes()).thenReturn(dataNodes); + when(clusterStateUtils.getEligibleDataNodes()).thenReturn(dataNodes); assertEquals(expected, modelManager.getPartitionedForestSizes(rcf, "id")); } @@ -304,7 +304,7 @@ public void getPartitionedForestSizes_throwLimitExceeded( ) { when(modelManager.estimateModelSize(rcf)).thenReturn(totalModelSize); when(jvmService.info().getMem().getHeapMax().getBytes()).thenReturn(heapSize); - when(clusterService.state().nodes().getDataNodes()).thenReturn(dataNodes); + when(clusterStateUtils.getEligibleDataNodes()).thenReturn(dataNodes); modelManager.getPartitionedForestSizes(rcf, "id"); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsIT.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsIT.java index 903922bd..6659900e 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsIT.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsIT.java @@ -37,7 +37,7 @@ protected Collection> transportClientPlugins() { } public void testNormalADStats() throws ExecutionException, InterruptedException { - ADStatsRequest adStatsRequest = new ADStatsRequest(); + ADStatsRequest adStatsRequest = new ADStatsRequest(new String[0]); ADStatsResponse response = client().execute(ADStatsAction.INSTANCE, adStatsRequest).get(); assertTrue("getting stats failed", !response.hasFailures()); diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsTests.java index 55dca10f..5b97d2f5 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsTests.java @@ -50,6 +50,7 @@ public class ADStatsTests extends ESTestCase { Map clusterStats; DiscoveryNode discoveryNode1; + @Override @Before public void setUp() throws Exception { super.setUp(); @@ -72,7 +73,7 @@ public void testADStatsNodeRequest() throws IOException { ADStatsNodeRequest adStatsNodeRequest1 = new ADStatsNodeRequest(); assertNull("ADStatsNodeRequest default constructor failed", adStatsNodeRequest1.getADStatsRequest()); - ADStatsRequest adStatsRequest = new ADStatsRequest(); + ADStatsRequest adStatsRequest = new ADStatsRequest(new String[0]); ADStatsNodeRequest adStatsNodeRequest2 = new ADStatsNodeRequest(adStatsRequest); assertEquals("ADStatsNodeRequest has the wrong ADStatsRequest", adStatsNodeRequest2.getADStatsRequest(), adStatsRequest); @@ -80,7 +81,7 @@ public void testADStatsNodeRequest() throws IOException { BytesStreamOutput output = new BytesStreamOutput(); adStatsNodeRequest2.writeTo(output); StreamInput streamInput = output.bytes().streamInput(); - adStatsNodeRequest1.readFrom(streamInput); + adStatsNodeRequest1 = new ADStatsNodeRequest(streamInput); assertEquals( "readStats failed", adStatsNodeRequest2.getADStatsRequest().getStatsToBeRetrieved(), @@ -117,7 +118,7 @@ public void testADStatsNodeResponse() throws IOException, JsonPathNotFoundExcept @Test public void testADStatsRequest() throws IOException { List allStats = Arrays.stream(StatNames.values()).map(StatNames::getName).collect(Collectors.toList()); - ADStatsRequest adStatsRequest = new ADStatsRequest(); + ADStatsRequest adStatsRequest = new ADStatsRequest(new String[0]); // Test clear() adStatsRequest.clear(); From 4e74ce382d1bb794c8ca0fd4ec5d5487619a5f12 Mon Sep 17 00:00:00 2001 From: Kaituo Li Date: Wed, 29 Apr 2020 13:53:54 -0700 Subject: [PATCH 2/2] Address Lai's comments --- .../ad/AnomalyDetectorPlugin.java | 9 ++++-- .../ad/cluster/HourlyCron.java | 8 ++--- .../ad/util/ClusterStateUtils.java | 30 +++++++++++++++---- .../cluster/ADClusterEventListenerTests.java | 5 +++- .../ad/cluster/HashRingTests.java | 4 ++- .../ad/cluster/HourlyCronTests.java | 6 +++- .../ad/cluster/MasterEventListenerTests.java | 7 ++++- 7 files changed, 54 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java index 124c6779..7f5da0e1 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java @@ -92,6 +92,7 @@ import org.elasticsearch.cluster.metadata.MetaData.Custom; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -122,6 +123,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.Supplier; @@ -265,7 +267,9 @@ public Collection createComponents( RandomCutForestSerDe rcfSerde = new RandomCutForestSerDe(); CheckpointDao checkpoint = new CheckpointDao(client, clientUtil, CommonName.CHECKPOINT_INDEX_NAME); - this.clusterStateUtils = new ClusterStateUtils(clusterService); + HashMap ignoredAttributes = new HashMap<>(); + ignoredAttributes.put(CommonName.BOX_TYPE_KEY, CommonName.WARM_BOX_TYPE); + this.clusterStateUtils = new ClusterStateUtils(clusterService, ignoredAttributes); ModelManager modelManager = new ModelManager( clusterStateUtils, jvmService, @@ -364,7 +368,8 @@ public Collection createComponents( deleteUtil, adCircuitBreakerService, adStats, - new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil, clusterStateUtils) + new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil, clusterStateUtils), + clusterStateUtils ); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCron.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCron.java index 156db12b..4a47c7e0 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCron.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCron.java @@ -31,17 +31,17 @@ public class HourlyCron implements Runnable { static final String SUCCEEDS_LOG_MSG = "Hourly maintenance succeeds"; static final String NODE_EXCEPTION_LOG_MSG = "Hourly maintenance of node has exception"; static final String EXCEPTION_LOG_MSG = "Hourly maintenance has exception."; - private ClusterStateUtils clientStateUtils; + private ClusterStateUtils clusterStateUtils; private Client client; - public HourlyCron(Client client, ClusterStateUtils clientStateUtils) { - this.clientStateUtils = clientStateUtils; + public HourlyCron(Client client, ClusterStateUtils clusterStateUtils) { + this.clusterStateUtils = clusterStateUtils; this.client = client; } @Override public void run() { - DiscoveryNode[] dataNodes = clientStateUtils.getEligibleDataNodes().values().toArray(DiscoveryNode.class); + DiscoveryNode[] dataNodes = clusterStateUtils.getEligibleDataNodes().values().toArray(DiscoveryNode.class); // we also add the cancel query function here based on query text from the negative cache. diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClusterStateUtils.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClusterStateUtils.java index 2640387f..6c94eb75 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClusterStateUtils.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClusterStateUtils.java @@ -26,20 +26,36 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.Inject; -import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +/** + * Util methods for cluster state + * + */ public class ClusterStateUtils { private static final Logger LOG = LogManager.getLogger(ClusterStateUtils.class); private final ClusterService clusterService; - private final Map ignoredAttributes = new HashMap(); + private final HashMap ignoredAttributes; + // We need @Inject because StopDetectorTransportAction needs this class. + // Transport action constructor uses Guice to find injected dependencies. + // Dependency classes must have either one (and only one) constructor + // annotated with @Inject or a zero-argument constructor. Otherwise, ES cannot start. @Inject - public ClusterStateUtils(ClusterService clusterService) { + public ClusterStateUtils(ClusterService clusterService, HashMap ignoredAttributes) { this.clusterService = clusterService; - ignoredAttributes.put(CommonName.BOX_TYPE_KEY, CommonName.WARM_BOX_TYPE); + this.ignoredAttributes = ignoredAttributes; } + /** + * Find nodes that are elibile to be used by us. For example, Ultrawarm + * introduces warm nodes into the ES cluster. Currently, we distribute + * model partitions to all data nodes in the cluster randomly, which + * could cause a model performance downgrade issue once warm nodes + * are throttled due to resource limitations. The PR excludes warm node + * s to place model partitions. + * @return a immutable map of eligible data nodes + */ public ImmutableOpenMap getEligibleDataNodes() { ImmutableOpenMap dataNodes = clusterService.state().nodes().getDataNodes(); ImmutableOpenMap.Builder modelNodes = ImmutableOpenMap.builder(); @@ -53,13 +69,17 @@ public ImmutableOpenMap getEligibleDataNodes() { return modelNodes.build(); } + /** + * @param node a discovery node + * @return whether we should ignore this node or not for AD + */ public boolean isIgnoredNode(DiscoveryNode node) { if (!node.isDataNode()) { return true; } for (Map.Entry entry : ignoredAttributes.entrySet()) { String attribute = node.getAttributes().get(entry.getKey()); - if (attribute != null && attribute.equals(entry.getValue())) { + if (entry.getValue().equals(attribute)) { return true; } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/ADClusterEventListenerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/ADClusterEventListenerTests.java index 9b20e09e..8931b21f 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/ADClusterEventListenerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/ADClusterEventListenerTests.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -85,7 +86,9 @@ public void setUp() throws Exception { hashRing = mock(HashRing.class); when(hashRing.build()).thenReturn(true); modelManager = mock(ModelManager.class); - clusterStateUtils = new ClusterStateUtils(clusterService); + HashMap ignoredAttributes = new HashMap(); + ignoredAttributes.put(CommonName.BOX_TYPE_KEY, CommonName.WARM_BOX_TYPE); + clusterStateUtils = new ClusterStateUtils(clusterService, ignoredAttributes); masterNode = new DiscoveryNode(masterNodeId, buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); dataNode1 = new DiscoveryNode(dataNode1Id, buildNewFakeTransportAddress(), emptyMap(), BUILT_IN_ROLES, Version.CURRENT); oldClusterState = ClusterState diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/HashRingTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/HashRingTests.java index b3acdb7b..61075e84 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/HashRingTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/HashRingTests.java @@ -99,7 +99,9 @@ public void setUp() throws Exception { super.setUp(); super.setUpLog4jForJUnit(HashRing.class); clusterService = createClusterService(threadPool); - clusterStateUtils = new ClusterStateUtils(clusterService); + HashMap ignoredAttributes = new HashMap(); + ignoredAttributes.put(CommonName.BOX_TYPE_KEY, CommonName.WARM_BOX_TYPE); + clusterStateUtils = new ClusterStateUtils(clusterService, ignoredAttributes); settings = Settings .builder() diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCronTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCronTests.java index b0af9aeb..41a673d6 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCronTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCronTests.java @@ -24,8 +24,10 @@ import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; import com.amazon.opendistroforelasticsearch.ad.transport.CronAction; import com.amazon.opendistroforelasticsearch.ad.transport.CronNodeResponse; import com.amazon.opendistroforelasticsearch.ad.transport.CronResponse; @@ -61,7 +63,9 @@ public void templateHourlyCron(HourlyCronTestExecutionMode mode) { ClusterService clusterService = mock(ClusterService.class); ClusterState state = ClusterCreation.state(1); when(clusterService.state()).thenReturn(state); - ClusterStateUtils stateUtils = new ClusterStateUtils(clusterService); + HashMap ignoredAttributes = new HashMap(); + ignoredAttributes.put(CommonName.BOX_TYPE_KEY, CommonName.WARM_BOX_TYPE); + ClusterStateUtils stateUtils = new ClusterStateUtils(clusterService, ignoredAttributes); Client client = mock(Client.class); doAnswer(invocation -> { diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java index e904062b..fd574d0a 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java @@ -25,8 +25,11 @@ import java.time.Clock; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils; @@ -65,7 +68,9 @@ public void setUp() throws Exception { client = mock(Client.class); clock = mock(Clock.class); clientUtil = mock(ClientUtil.class); - clusterStateUtils = new ClusterStateUtils(clusterService); + HashMap ignoredAttributes = new HashMap(); + ignoredAttributes.put(CommonName.BOX_TYPE_KEY, CommonName.WARM_BOX_TYPE); + clusterStateUtils = new ClusterStateUtils(clusterService, ignoredAttributes); masterService = new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil, clusterStateUtils); }