diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java index 484e6f6e..7f003a68 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java @@ -73,6 +73,7 @@ import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyResultHandler; import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; import com.amazon.opendistroforelasticsearch.ad.util.ColdStartRunner; +import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer; import com.amazon.opendistroforelasticsearch.ad.util.IndexUtils; import com.amazon.opendistroforelasticsearch.ad.util.Throttler; import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobSchedulerExtension; @@ -147,6 +148,7 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip private ADStats adStats; private NamedXContentRegistry xContentRegistry; private ClientUtil clientUtil; + private DiscoveryNodeFilterer nodeFilter; static { SpecialPermission.check(); @@ -183,34 +185,27 @@ public List getRestHandlers( jobRunner.setSettings(settings); AnomalyDetectorProfileRunner profileRunner = new AnomalyDetectorProfileRunner(client, this.xContentRegistry); - RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction( - restController, - profileRunner, - ProfileName.getNames() - ); + RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction(profileRunner, ProfileName.getNames()); RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction( settings, - restController, clusterService, anomalyDetectionIndices ); - RestSearchAnomalyDetectorAction searchAnomalyDetectorAction = new RestSearchAnomalyDetectorAction(settings, restController); - RestSearchAnomalyResultAction searchAnomalyResultAction = new RestSearchAnomalyResultAction(settings, restController); - RestDeleteAnomalyDetectorAction deleteAnomalyDetectorAction = new RestDeleteAnomalyDetectorAction(restController, clusterService); + RestSearchAnomalyDetectorAction searchAnomalyDetectorAction = new RestSearchAnomalyDetectorAction(); + RestSearchAnomalyResultAction searchAnomalyResultAction = new RestSearchAnomalyResultAction(); + RestDeleteAnomalyDetectorAction deleteAnomalyDetectorAction = new RestDeleteAnomalyDetectorAction(clusterService); RestExecuteAnomalyDetectorAction executeAnomalyDetectorAction = new RestExecuteAnomalyDetectorAction( settings, - restController, clusterService, anomalyDetectorRunner ); RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction( - restController, - clusterService, - adStats + adStats, + this.nodeFilter, + this.clusterService ); RestAnomalyDetectorJobAction anomalyDetectorJobAction = new RestAnomalyDetectorJobAction( settings, - restController, clusterService, anomalyDetectionIndices ); @@ -253,7 +248,7 @@ public Collection createComponents( Throttler throttler = new Throttler(clock); this.clientUtil = new ClientUtil(settings, client, throttler, threadPool); IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService); - anomalyDetectionIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings, clientUtil); + anomalyDetectionIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings); this.clusterService = clusterService; this.xContentRegistry = xContentRegistry; @@ -266,8 +261,10 @@ public Collection createComponents( RandomCutForestSerDe rcfSerde = new RandomCutForestSerDe(); CheckpointDao checkpoint = new CheckpointDao(client, clientUtil, CommonName.CHECKPOINT_INDEX_NAME); + this.nodeFilter = new DiscoveryNodeFilterer(this.clusterService); + ModelManager modelManager = new ModelManager( - clusterService, + nodeFilter, jvmService, rcfSerde, checkpoint, @@ -291,7 +288,7 @@ public Collection createComponents( AnomalyDetectorSettings.SHINGLE_SIZE ); - HashRing hashRing = new HashRing(clusterService, clock, settings); + HashRing hashRing = new HashRing(nodeFilter, clock, settings); ADStateManager stateManager = new ADStateManager( client, xContentRegistry, @@ -357,11 +354,12 @@ public Collection createComponents( clock, stateManager, runner, - new ADClusterEventListener(clusterService, hashRing, modelManager), + new ADClusterEventListener(clusterService, hashRing, modelManager, nodeFilter), deleteUtil, adCircuitBreakerService, adStats, - new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil) + new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil, nodeFilter), + nodeFilter ); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/ADClusterEventListener.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/ADClusterEventListener.java index 1d648ecb..4ff2d5b7 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/ADClusterEventListener.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/ADClusterEventListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -20,6 +20,8 @@ import java.util.concurrent.Semaphore; import com.amazon.opendistroforelasticsearch.ad.ml.ModelManager; +import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -32,7 +34,7 @@ public class ADClusterEventListener implements ClusterStateListener { private static final Logger LOG = LogManager.getLogger(ADClusterEventListener.class); - static final String MASTER_NOT_APPLIED_MSG = "AD does not use master nodes"; + static final String NODE_NOT_APPLIED_MSG = "AD does not use master or ultrawarm nodes"; static final String NOT_RECOVERED_MSG = "CLuster is not recovered yet."; static final String IN_PROGRESS_MSG = "Cluster state change in progress, return."; static final String REMOVE_MODEL_MSG = "Remove model"; @@ -43,21 +45,28 @@ public class ADClusterEventListener implements ClusterStateListener { private HashRing hashRing; private ModelManager modelManager; private final ClusterService clusterService; + private final DiscoveryNodeFilterer nodeFilter; @Inject - public ADClusterEventListener(ClusterService clusterService, HashRing hashRing, ModelManager modelManager) { + public ADClusterEventListener( + ClusterService clusterService, + HashRing hashRing, + ModelManager modelManager, + DiscoveryNodeFilterer nodeFilter + ) { this.clusterService = clusterService; this.clusterService.addListener(this); this.hashRing = hashRing; this.modelManager = modelManager; this.inProgress = new Semaphore(1); + this.nodeFilter = nodeFilter; } @Override public void clusterChanged(ClusterChangedEvent event) { - if (!event.state().nodes().getLocalNode().isDataNode()) { - LOG.debug(MASTER_NOT_APPLIED_MSG); + if (!nodeFilter.isEligibleNode(event.state().nodes().getLocalNode())) { + LOG.debug(NODE_NOT_APPLIED_MSG); return; } @@ -79,7 +88,7 @@ public void clusterChanged(ClusterChangedEvent event) { // Check whether it was a data node that was removed boolean dataNodeRemoved = false; for (DiscoveryNode removedNode : delta.removedNodes()) { - if (removedNode.isDataNode()) { + if (nodeFilter.isEligibleNode(removedNode)) { LOG.info(NODE_REMOVED_MSG + " {}", removedNode.getId()); dataNodeRemoved = true; break; @@ -89,7 +98,7 @@ public void clusterChanged(ClusterChangedEvent event) { // Check whether it was a data node that was added boolean dataNodeAdded = false; for (DiscoveryNode addedNode : delta.addedNodes()) { - if (addedNode.isDataNode()) { + if (nodeFilter.isEligibleNode(addedNode)) { LOG.info(NODE_ADDED_MSG + " {}", addedNode.getId()); dataNodeAdded = true; break; diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HashRing.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HashRing.java index a7fe1b7d..b82ababe 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HashRing.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HashRing.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -28,11 +28,10 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.Murmur3HashFunction; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import com.carrotsearch.hppc.cursors.ObjectCursor; +import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer; public class HashRing { private static final Logger LOG = LogManager.getLogger(HashRing.class); @@ -43,7 +42,7 @@ public class HashRing { static final String COOLDOWN_MSG = "Hash ring doesn't respond to cluster state change within the cooldown period."; private final int VIRTUAL_NODE_COUNT = 100; - private final ClusterService clusterService; + private final DiscoveryNodeFilterer nodeFilter; private TreeMap circle; private Semaphore inProgress; // the UTC epoch milliseconds of the most recent successful update @@ -52,9 +51,9 @@ public class HashRing { private final Clock clock; private AtomicBoolean membershipChangeRequied; - public HashRing(ClusterService clusterService, Clock clock, Settings settings) { + public HashRing(DiscoveryNodeFilterer nodeFilter, Clock clock, Settings settings) { this.circle = new TreeMap(); - this.clusterService = clusterService; + this.nodeFilter = nodeFilter; this.inProgress = new Semaphore(1); this.clock = clock; this.coolDownPeriod = COOLDOWN_MINUTES.get(settings); @@ -93,8 +92,7 @@ public boolean build() { TreeMap newCircle = new TreeMap<>(); try { - for (ObjectCursor cursor : clusterService.state().nodes().getDataNodes().values()) { - DiscoveryNode curNode = cursor.value; + for (DiscoveryNode curNode : nodeFilter.getEligibleDataNodes()) { for (int i = 0; i < VIRTUAL_NODE_COUNT; i++) { newCircle.put(Murmur3HashFunction.hash(curNode.getId() + i), curNode); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCron.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCron.java index 16055aeb..eb958fdf 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCron.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCron.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -21,27 +21,27 @@ import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; import com.amazon.opendistroforelasticsearch.ad.transport.CronAction; import com.amazon.opendistroforelasticsearch.ad.transport.CronRequest; +import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer; public class HourlyCron implements Runnable { private static final Logger LOG = LogManager.getLogger(HourlyCron.class); static final String SUCCEEDS_LOG_MSG = "Hourly maintenance succeeds"; static final String NODE_EXCEPTION_LOG_MSG = "Hourly maintenance of node has exception"; static final String EXCEPTION_LOG_MSG = "Hourly maintenance has exception."; - private ClusterService clusterService; + private DiscoveryNodeFilterer nodeFilter; private Client client; - public HourlyCron(ClusterService clusterService, Client client) { - this.clusterService = clusterService; + public HourlyCron(Client client, DiscoveryNodeFilterer nodeFilter) { + this.nodeFilter = nodeFilter; this.client = client; } @Override public void run() { - DiscoveryNode[] dataNodes = clusterService.state().nodes().getDataNodes().values().toArray(DiscoveryNode.class); + DiscoveryNode[] dataNodes = nodeFilter.getEligibleDataNodes(); // we also add the cancel query function here based on query text from the negative cache. diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java index a3735579..c8e6bedb 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings; import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; +import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer; import org.elasticsearch.threadpool.ThreadPool; @@ -39,6 +40,7 @@ public class MasterEventListener implements LocalNodeMasterListener { private Client client; private Clock clock; private ClientUtil clientUtil; + private DiscoveryNodeFilterer nodeFilter; public MasterEventListener( ClusterService clusterService, @@ -46,7 +48,8 @@ public MasterEventListener( DeleteDetector deleteUtil, Client client, Clock clock, - ClientUtil clientUtil + ClientUtil clientUtil, + DiscoveryNodeFilterer nodeFilter ) { this.clusterService = clusterService; this.threadPool = threadPool; @@ -55,13 +58,13 @@ public MasterEventListener( this.clusterService.addLocalNodeMasterListener(this); this.clock = clock; this.clientUtil = clientUtil; + this.nodeFilter = nodeFilter; } @Override public void onMaster() { if (hourlyCron == null) { - hourlyCron = threadPool - .scheduleWithFixedDelay(new HourlyCron(clusterService, client), TimeValue.timeValueHours(1), executorName()); + hourlyCron = threadPool.scheduleWithFixedDelay(new HourlyCron(client, nodeFilter), TimeValue.timeValueHours(1), executorName()); clusterService.addLifecycleListener(new LifecycleListener() { @Override public void beforeStop() { diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonName.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonName.java index b03d4364..c378dc1d 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonName.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonName.java @@ -31,4 +31,17 @@ public class CommonName { // Anomaly Detector name for X-Opaque-Id header // ====================================== public static final String ANOMALY_DETECTOR = "[Anomaly Detector]"; + + // ====================================== + // Ultrawarm node attributes + // ====================================== + + // hot node + public static String HOT_BOX_TYPE = "hot"; + + // warm node + public static String WARM_BOX_TYPE = "warm"; + + // box type + public static final String BOX_TYPE_KEY = "box_type"; } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndices.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndices.java index 7cdedf1d..049eb9ca 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndices.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndices.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -17,7 +17,6 @@ import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob; -import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult; import com.google.common.base.Charsets; import com.google.common.io.Resources; @@ -31,8 +30,6 @@ import org.elasticsearch.action.admin.indices.rollover.RolloverResponse; import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; @@ -55,7 +52,7 @@ /** * This class manages creation of anomaly detector index. */ -public class AnomalyDetectionIndices implements LocalNodeMasterListener, ClusterStateListener { +public class AnomalyDetectionIndices implements LocalNodeMasterListener { // The alias of the index in which to write AD result history public static final String AD_RESULT_HISTORY_WRITE_INDEX_ALIAS = AnomalyResult.ANOMALY_RESULT_INDEX; @@ -71,7 +68,6 @@ public class AnomalyDetectionIndices implements LocalNodeMasterListener, Cluster private ClusterService clusterService; private final AdminClient adminClient; - private final Client client; private final ThreadPool threadPool; private volatile TimeValue requestTimeout; @@ -82,8 +78,6 @@ public class AnomalyDetectionIndices implements LocalNodeMasterListener, Cluster private Scheduler.Cancellable scheduledRollover = null; private static final Logger logger = LogManager.getLogger(AnomalyDetectionIndices.class); - private TimeValue lastRolloverTime = null; - private ClientUtil requestUtil; /** * Constructor function @@ -92,20 +86,11 @@ public class AnomalyDetectionIndices implements LocalNodeMasterListener, Cluster * @param clusterService ES cluster service * @param threadPool ES thread pool * @param settings ES cluster setting - * @param requestUtil wrapper to send a non-blocking timed request */ - public AnomalyDetectionIndices( - Client client, - ClusterService clusterService, - ThreadPool threadPool, - Settings settings, - ClientUtil requestUtil - ) { - this.client = client; + public AnomalyDetectionIndices(Client client, ClusterService clusterService, ThreadPool threadPool, Settings settings) { this.adminClient = client.admin(); this.clusterService = clusterService; this.threadPool = threadPool; - this.clusterService.addListener(this); this.clusterService.addLocalNodeMasterListener(this); this.requestTimeout = REQUEST_TIMEOUT.get(settings); this.historyMaxAge = AD_RESULT_HISTORY_INDEX_MAX_AGE.get(settings); @@ -118,7 +103,6 @@ public AnomalyDetectionIndices( rescheduleRollover(); }); clusterService.getClusterSettings().addSettingsUpdateConsumer(REQUEST_TIMEOUT, it -> requestTimeout = it); - this.requestUtil = requestUtil; } /** @@ -269,11 +253,6 @@ public String executorName() { return ThreadPool.Names.MANAGEMENT; } - @Override - public void clusterChanged(ClusterChangedEvent event) { - boolean hasAdResultAlias = event.state().metaData().hasAlias(AD_RESULT_HISTORY_WRITE_INDEX_ALIAS); - } - private void rescheduleRollover() { if (clusterService.state().getNodes().isLocalNodeElectedMaster()) { if (scheduledRollover != null) { @@ -303,8 +282,6 @@ private boolean rolloverHistoryIndex() { RolloverResponse response = adminClient.indices().rolloversIndex(request).actionGet(requestTimeout); if (!response.isRolledOver()) { logger.warn("{} not rolled over. Conditions were: {}", AD_RESULT_HISTORY_WRITE_INDEX_ALIAS, response.getConditionStatus()); - } else { - lastRolloverTime = TimeValue.timeValueMillis(threadPool.absoluteTimeInMillis()); } return response.isRolledOver(); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/ModelManager.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/ModelManager.java index 30494502..da297dd0 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/ModelManager.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/ModelManager.java @@ -43,7 +43,6 @@ import com.google.gson.Gson; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.monitor.jvm.JvmService; import com.amazon.opendistroforelasticsearch.ad.common.exception.LimitExceededException; @@ -51,6 +50,7 @@ import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages; import com.amazon.opendistroforelasticsearch.ad.ml.rcf.CombinedRcfResult; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer; import com.amazon.randomcutforest.RandomCutForest; import com.amazon.randomcutforest.serialize.RandomCutForestSerDe; @@ -104,7 +104,7 @@ public String getName() { private final Duration checkpointInterval; // dependencies - private final ClusterService clusterService; + private final DiscoveryNodeFilterer nodeFilter; private final JvmService jvmService; private final RandomCutForestSerDe rcfSerde; private final CheckpointDao checkpointDao; @@ -120,7 +120,7 @@ public String getName() { /** * Constructor. * - * @param clusterService cluster info + * @param nodeFilter utility class to select nodes * @param jvmService jvm info * @param rcfSerde RCF model serialization * @param checkpointDao model checkpoint storage @@ -144,7 +144,7 @@ public String getName() { * @param shingleSize required shingle size before RCF emitting anomaly scores */ public ModelManager( - ClusterService clusterService, + DiscoveryNodeFilterer nodeFilter, JvmService jvmService, RandomCutForestSerDe rcfSerde, CheckpointDao checkpointDao, @@ -168,7 +168,7 @@ public ModelManager( int shingleSize ) { - this.clusterService = clusterService; + this.nodeFilter = nodeFilter; this.jvmService = jvmService; this.rcfSerde = rcfSerde; this.checkpointDao = checkpointDao; @@ -261,7 +261,7 @@ public Entry getPartitionedForestSizes(RandomCutForest forest, int numPartitions = (int) Math.ceil((double) totalSize / (double) partitionSize); int forestSize = (int) Math.ceil((double) forest.getNumberOfTrees() / (double) numPartitions); - int numNodes = clusterService.state().nodes().getDataNodes().size(); + int numNodes = nodeFilter.getEligibleDataNodes().length; if (numPartitions > numNodes) { // partition by cluster size partitionSize = (long) Math.ceil((double) totalSize / (double) numNodes); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/AbstractSearchAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/AbstractSearchAction.java index 5b6f4764..d193b4c8 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/AbstractSearchAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/AbstractSearchAction.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -32,7 +32,6 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestChannel; -import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; @@ -59,7 +58,7 @@ public abstract class AbstractSearchAction extends B private final Logger logger = LogManager.getLogger(AbstractSearchAction.class); - public AbstractSearchAction(RestController controller, String urlPath, String index, Class clazz) { + public AbstractSearchAction(String urlPath, String index, Class clazz) { this.index = index; this.clazz = clazz; this.urlPath = urlPath; diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestAnomalyDetectorJobAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestAnomalyDetectorJobAction.java index 2f63b9c8..5d068572 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestAnomalyDetectorJobAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestAnomalyDetectorJobAction.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -27,7 +27,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.rest.BaseRestHandler; -import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import java.io.IOException; @@ -50,16 +49,9 @@ public class RestAnomalyDetectorJobAction extends BaseRestHandler { public static final String AD_JOB_ACTION = "anomaly_detector_job_action"; private volatile TimeValue requestTimeout; private final AnomalyDetectionIndices anomalyDetectionIndices; - private final Settings settings; private final ClusterService clusterService; - public RestAnomalyDetectorJobAction( - Settings settings, - RestController controller, - ClusterService clusterService, - AnomalyDetectionIndices anomalyDetectionIndices - ) { - this.settings = settings; + public RestAnomalyDetectorJobAction(Settings settings, ClusterService clusterService, AnomalyDetectionIndices anomalyDetectionIndices) { this.anomalyDetectionIndices = anomalyDetectionIndices; this.requestTimeout = REQUEST_TIMEOUT.get(settings); this.clusterService = clusterService; diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestDeleteAnomalyDetectorAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestDeleteAnomalyDetectorAction.java index caa41229..ffd23f28 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestDeleteAnomalyDetectorAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestDeleteAnomalyDetectorAction.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -32,7 +32,6 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestChannel; -import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.RestStatusToXContentListener; @@ -53,7 +52,7 @@ public class RestDeleteAnomalyDetectorAction extends BaseRestHandler { private final ClusterService clusterService; private final AnomalyDetectorActionHandler handler = new AnomalyDetectorActionHandler(); - public RestDeleteAnomalyDetectorAction(RestController controller, ClusterService clusterService) { + public RestDeleteAnomalyDetectorAction(ClusterService clusterService) { this.clusterService = clusterService; } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestExecuteAnomalyDetectorAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestExecuteAnomalyDetectorAction.java index fd99a6d8..a62b51a9 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestExecuteAnomalyDetectorAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestExecuteAnomalyDetectorAction.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -41,7 +41,6 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestChannel; -import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.RestActionListener; @@ -73,12 +72,7 @@ public class RestExecuteAnomalyDetectorAction extends BaseRestHandler { private final Logger logger = LogManager.getLogger(RestExecuteAnomalyDetectorAction.class); - public RestExecuteAnomalyDetectorAction( - Settings settings, - RestController controller, - ClusterService clusterService, - AnomalyDetectorRunner anomalyDetectorRunner - ) { + public RestExecuteAnomalyDetectorAction(Settings settings, ClusterService clusterService, AnomalyDetectorRunner anomalyDetectorRunner) { this.anomalyDetectorRunner = anomalyDetectorRunner; this.requestTimeout = REQUEST_TIMEOUT.get(settings); maxAnomalyFeatures = MAX_ANOMALY_FEATURES.get(settings); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestGetAnomalyDetectorAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestGetAnomalyDetectorAction.java index 8f8a48e4..ad0e6742 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestGetAnomalyDetectorAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestGetAnomalyDetectorAction.java @@ -39,7 +39,6 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestChannel; -import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; @@ -72,11 +71,7 @@ public class RestGetAnomalyDetectorAction extends BaseRestHandler { private final Set allProfileTypeStrs; private final Set allProfileTypes; - public RestGetAnomalyDetectorAction( - RestController controller, - AnomalyDetectorProfileRunner profileRunner, - Set allProfileTypeStrs - ) { + public RestGetAnomalyDetectorAction(AnomalyDetectorProfileRunner profileRunner, Set allProfileTypeStrs) { this.profileRunner = profileRunner; this.allProfileTypes = new HashSet(Arrays.asList(ProfileName.values())); this.allProfileTypeStrs = ProfileName.getNames(); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestIndexAnomalyDetectorAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestIndexAnomalyDetectorAction.java index b88812ea..b9360523 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestIndexAnomalyDetectorAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestIndexAnomalyDetectorAction.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -30,7 +30,6 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.rest.BaseRestHandler; -import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import java.io.IOException; @@ -63,7 +62,6 @@ public class RestIndexAnomalyDetectorAction extends BaseRestHandler { public RestIndexAnomalyDetectorAction( Settings settings, - RestController controller, ClusterService clusterService, AnomalyDetectionIndices anomalyDetectionIndices ) { diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestSearchAnomalyDetectorAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestSearchAnomalyDetectorAction.java index cee39085..b7a84d25 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestSearchAnomalyDetectorAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestSearchAnomalyDetectorAction.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -17,8 +17,6 @@ import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.rest.RestController; import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX; @@ -30,8 +28,8 @@ public class RestSearchAnomalyDetectorAction extends AbstractSearchAction validStats = adStats.getStats().keySet(); + ADStatsRequest adStatsRequest = null; if (!Strings.isEmpty(nodesIdsStr)) { - nodeIdsArr = nodesIdsStr.split(","); + String[] nodeIdsArr = nodesIdsStr.split(","); + adStatsRequest = new ADStatsRequest(nodeIdsArr); + } else { + DiscoveryNode[] dataNodes = nodeFilter.getEligibleDataNodes(); + adStatsRequest = new ADStatsRequest(dataNodes); } - ADStatsRequest adStatsRequest = new ADStatsRequest(nodeIdsArr); adStatsRequest.timeout(request.param("timeout")); // parse the stats the user wants to see diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsNodeRequest.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsNodeRequest.java index 7036ca4c..d8b85a81 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsNodeRequest.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsNodeRequest.java @@ -57,11 +57,6 @@ public ADStatsRequest getADStatsRequest() { return request; } - public void readFrom(StreamInput in) throws IOException { - request = new ADStatsRequest(); - request.readFrom(in); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsRequest.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsRequest.java index d7de18dc..d21422c8 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsRequest.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsRequest.java @@ -16,6 +16,7 @@ package com.amazon.opendistroforelasticsearch.ad.transport; import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -50,6 +51,16 @@ public ADStatsRequest(String... nodeIds) { statsToBeRetrieved = new HashSet<>(); } + /** + * Constructor + * + * @param nodes nodes of nodes' stats to be retrieved + */ + public ADStatsRequest(DiscoveryNode... nodes) { + super(nodes); + statsToBeRetrieved = new HashSet<>(); + } + /** * Adds a stat to the set of stats to be retrieved * diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StopDetectorTransportAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StopDetectorTransportAction.java index d434837e..be542961 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StopDetectorTransportAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StopDetectorTransportAction.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package com.amazon.opendistroforelasticsearch.ad.transport; import com.amazon.opendistroforelasticsearch.ad.common.exception.InternalFailure; +import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -27,7 +29,6 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; @@ -37,18 +38,18 @@ public class StopDetectorTransportAction extends HandledTransportAction { if (response.hasFailures()) { diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/DiscoveryNodeFilterer.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/DiscoveryNodeFilterer.java new file mode 100644 index 00000000..93054c13 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/DiscoveryNodeFilterer.java @@ -0,0 +1,82 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.util; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Predicate; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; + +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; + +/** + * Util class to filter unwanted node types + * + */ +public class DiscoveryNodeFilterer { + private static final Logger LOG = LogManager.getLogger(DiscoveryNodeFilterer.class); + private final ClusterService clusterService; + private final HotDataNodePredicate eligibleNodeFilter; + + public DiscoveryNodeFilterer(ClusterService clusterService) { + this.clusterService = clusterService; + eligibleNodeFilter = new HotDataNodePredicate(); + } + + /** + * Find nodes that are elibile to be used by us. For example, Ultrawarm + * introduces warm nodes into the ES cluster. Currently, we distribute + * model partitions to all data nodes in the cluster randomly, which + * could cause a model performance downgrade issue once warm nodes + * are throttled due to resource limitations. The PR excludes warm nodes + * to place model partitions. + * @return an array of eligible data nodes + */ + public DiscoveryNode[] getEligibleDataNodes() { + ClusterState state = this.clusterService.state(); + final List eligibleNodes = new ArrayList<>(); + for (DiscoveryNode node : state.nodes()) { + if (eligibleNodeFilter.test(node)) { + eligibleNodes.add(node); + } + } + return eligibleNodes.toArray(new DiscoveryNode[0]); + } + + /** + * @param node a discovery node + * @return whether we should use this node for AD + */ + public boolean isEligibleNode(DiscoveryNode node) { + return eligibleNodeFilter.test(node); + } + + static class HotDataNodePredicate implements Predicate { + @Override + public boolean test(DiscoveryNode discoveryNode) { + return discoveryNode.isDataNode() + && discoveryNode + .getAttributes() + .getOrDefault(CommonName.BOX_TYPE_KEY, CommonName.HOT_BOX_TYPE) + .equals(CommonName.HOT_BOX_TYPE); + } + } +} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java index 3af20295..4ee06535 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java @@ -36,6 +36,7 @@ import org.apache.lucene.search.TotalHits; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.SearchResponse; @@ -373,7 +374,13 @@ public static ActionListener createActionListener( } public static void waitForIndexCreationToComplete(Client client, final String indexName) { - client.admin().cluster().prepareHealth(indexName).setWaitForEvents(Priority.URGENT).get(); + ClusterHealthResponse clusterHealthResponse = client + .admin() + .cluster() + .prepareHealth(indexName) + .setWaitForEvents(Priority.URGENT) + .get(); + logger.info("Status of " + indexName + ": " + clusterHealthResponse.getStatus()); } public static ClusterService createClusterService(ThreadPool threadPool, ClusterSettings clusterSettings) { diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/ADClusterEventListenerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/ADClusterEventListenerTests.java index ff8c54b9..8305b10d 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/ADClusterEventListenerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/ADClusterEventListenerTests.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Matchers.any; +import java.util.HashMap; import java.util.HashSet; import java.util.Optional; import java.util.Set; @@ -32,7 +33,9 @@ import static java.util.Collections.emptySet; import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; import com.amazon.opendistroforelasticsearch.ad.ml.ModelManager; +import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; @@ -61,6 +64,7 @@ public class ADClusterEventListenerTests extends AbstractADTest { private ClusterState newClusterState; private DiscoveryNode masterNode; private DiscoveryNode dataNode1; + private DiscoveryNodeFilterer nodeFilter; @BeforeClass public static void setUpBeforeClass() { @@ -81,6 +85,8 @@ public void setUp() throws Exception { hashRing = mock(HashRing.class); when(hashRing.build()).thenReturn(true); modelManager = mock(ModelManager.class); + + nodeFilter = new DiscoveryNodeFilterer(clusterService); masterNode = new DiscoveryNode(masterNodeId, buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); dataNode1 = new DiscoveryNode(dataNode1Id, buildNewFakeTransportAddress(), emptyMap(), BUILT_IN_ROLES, Version.CURRENT); oldClusterState = ClusterState @@ -92,7 +98,7 @@ public void setUp() throws Exception { .nodes(new DiscoveryNodes.Builder().masterNodeId(masterNodeId).localNodeId(dataNode1Id).add(masterNode).add(dataNode1)) .build(); - listener = new ADClusterEventListener(clusterService, hashRing, modelManager); + listener = new ADClusterEventListener(clusterService, hashRing, modelManager, nodeFilter); } @Override @@ -109,7 +115,21 @@ public void tearDown() throws Exception { public void testIsMasterNode() { listener.clusterChanged(new ClusterChangedEvent("foo", oldClusterState, oldClusterState)); - assertTrue(testAppender.containsMessage(ADClusterEventListener.MASTER_NOT_APPLIED_MSG)); + assertTrue(testAppender.containsMessage(ADClusterEventListener.NODE_NOT_APPLIED_MSG)); + } + + public void testIsWarmNode() { + HashMap attributesForNode1 = new HashMap<>(); + attributesForNode1.put(CommonName.BOX_TYPE_KEY, CommonName.WARM_BOX_TYPE); + dataNode1 = new DiscoveryNode(dataNode1Id, buildNewFakeTransportAddress(), attributesForNode1, BUILT_IN_ROLES, Version.CURRENT); + + ClusterState warmNodeClusterState = ClusterState + .builder(new ClusterName(clusterName)) + .nodes(new DiscoveryNodes.Builder().masterNodeId(masterNodeId).localNodeId(dataNode1Id).add(masterNode).add(dataNode1)) + .blocks(ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) + .build(); + listener.clusterChanged(new ClusterChangedEvent("foo", warmNodeClusterState, oldClusterState)); + assertTrue(testAppender.containsMessage(ADClusterEventListener.NODE_NOT_APPLIED_MSG)); } public void testNotRecovered() { diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/HashRingTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/HashRingTests.java index d50b2f1d..571f6d13 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/HashRingTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/HashRingTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -24,11 +24,16 @@ import static org.mockito.Mockito.when; import java.util.List; +import java.util.Map; import java.util.Optional; import java.time.Clock; import java.util.ArrayList; +import java.util.HashMap; import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; +import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer; + import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -44,11 +49,38 @@ public class HashRingTests extends AbstractADTest { private ClusterService clusterService; + private DiscoveryNodeFilterer nodeFilter; private Settings settings; private Clock clock; - private DiscoveryNode createNode(String nodeId) { - return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), emptyMap(), BUILT_IN_ROLES, Version.CURRENT); + private DiscoveryNode createNode(String nodeId, Map attributes) { + return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), attributes, BUILT_IN_ROLES, Version.CURRENT); + } + + private void setNodeState() { + setNodeState(emptyMap()); + } + + private void setNodeState(Map attributesForNode1) { + DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); + List discoveryNodes = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + DiscoveryNode node = null; + if (i != 1) { + node = createNode(Integer.toString(i), emptyMap()); + } else { + node = createNode(Integer.toString(i), attributesForNode1); + } + + discoBuilder = discoBuilder.add(node); + discoveryNodes.add(node); + } + discoBuilder.localNodeId("1"); + discoBuilder.masterNodeId("0"); + ClusterState.Builder stateBuilder = ClusterState.builder(clusterService.getClusterName()); + stateBuilder.nodes(discoBuilder); + ClusterState clusterState = stateBuilder.build(); + setState(clusterService.getClusterApplierService(), clusterState); } @BeforeClass @@ -67,19 +99,9 @@ public void setUp() throws Exception { super.setUp(); super.setUpLog4jForJUnit(HashRing.class); clusterService = createClusterService(threadPool); - DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); - List discoveryNodes = new ArrayList<>(); - for (int i = 0; i < 2; i++) { - final DiscoveryNode node = createNode(Integer.toString(i)); - discoBuilder = discoBuilder.add(node); - discoveryNodes.add(node); - } - discoBuilder.localNodeId(randomFrom(discoveryNodes).getId()); - discoBuilder.masterNodeId(randomFrom(discoveryNodes).getId()); - ClusterState.Builder stateBuilder = ClusterState.builder(clusterService.getClusterName()); - stateBuilder.nodes(discoBuilder); - ClusterState clusterState = stateBuilder.build(); - setState(clusterService.getClusterApplierService(), clusterState); + HashMap ignoredAttributes = new HashMap(); + ignoredAttributes.put(CommonName.BOX_TYPE_KEY, CommonName.WARM_BOX_TYPE); + nodeFilter = new DiscoveryNodeFilterer(clusterService); settings = Settings .builder() @@ -98,7 +120,9 @@ public void tearDown() throws Exception { } public void testGetOwningNode() { - HashRing ring = new HashRing(clusterService, clock, settings); + setNodeState(); + + HashRing ring = new HashRing(nodeFilter, clock, settings); Optional node = ring.getOwningNode("http-latency-rcf-1"); assertTrue(node.isPresent()); String id = node.get().getId(); @@ -110,4 +134,16 @@ public void testGetOwningNode() { assertEquals(node, node2); assertTrue(testAppender.containsMessage(HashRing.COOLDOWN_MSG)); } + + public void testWarmNodeExcluded() { + HashMap attributesForNode1 = new HashMap<>(); + attributesForNode1.put(CommonName.BOX_TYPE_KEY, CommonName.WARM_BOX_TYPE); + setNodeState(attributesForNode1); + + HashRing ring = new HashRing(nodeFilter, clock, settings); + Optional node = ring.getOwningNode("http-latency-rcf-1"); + assertTrue(node.isPresent()); + String id = node.get().getId(); + assertTrue(id.equals("2")); + } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCronTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCronTests.java index 0b979df4..9060845d 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCronTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCronTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -24,11 +24,15 @@ import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; import com.amazon.opendistroforelasticsearch.ad.transport.CronAction; import com.amazon.opendistroforelasticsearch.ad.transport.CronNodeResponse; import com.amazon.opendistroforelasticsearch.ad.transport.CronResponse; +import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; @@ -59,6 +63,9 @@ public void templateHourlyCron(HourlyCronTestExecutionMode mode) { ClusterService clusterService = mock(ClusterService.class); ClusterState state = ClusterCreation.state(1); when(clusterService.state()).thenReturn(state); + HashMap ignoredAttributes = new HashMap(); + ignoredAttributes.put(CommonName.BOX_TYPE_KEY, CommonName.WARM_BOX_TYPE); + DiscoveryNodeFilterer nodeFilter = new DiscoveryNodeFilterer(clusterService); Client client = mock(Client.class); doAnswer(invocation -> { @@ -104,7 +111,7 @@ public void templateHourlyCron(HourlyCronTestExecutionMode mode) { return null; }).when(client).execute(eq(CronAction.INSTANCE), any(), any()); - HourlyCron cron = new HourlyCron(clusterService, client); + HourlyCron cron = new HourlyCron(client, nodeFilter); cron.run(); Logger LOG = LogManager.getLogger(HourlyCron.class); @@ -128,7 +135,7 @@ public void testAllFail() { templateHourlyCron(HourlyCronTestExecutionMode.ALL_FAIL); } - public void testNodeFail() { + public void testNodeFail() throws Exception { templateHourlyCron(HourlyCronTestExecutionMode.NODE_FAIL); } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java index 0a0a4a17..435c5e58 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -25,9 +25,13 @@ import java.time.Clock; import java.util.Arrays; +import java.util.HashMap; import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; +import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer; + import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.LifecycleListener; @@ -46,6 +50,7 @@ public class MasterEventListenerTests extends AbstractADTest { private Cancellable dailyCancellable; private MasterEventListener masterService; private ClientUtil clientUtil; + private DiscoveryNodeFilterer nodeFilter; @Override @Before @@ -62,7 +67,11 @@ public void setUp() throws Exception { client = mock(Client.class); clock = mock(Clock.class); clientUtil = mock(ClientUtil.class); - masterService = new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil); + HashMap ignoredAttributes = new HashMap(); + ignoredAttributes.put(CommonName.BOX_TYPE_KEY, CommonName.WARM_BOX_TYPE); + nodeFilter = new DiscoveryNodeFilterer(clusterService); + + masterService = new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil, nodeFilter); } public void testOnOffMaster() { diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndicesTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndicesTests.java index 629e13ab..a0b1a5ff 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndicesTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndicesTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -15,19 +15,14 @@ package com.amazon.opendistroforelasticsearch.ad.indices; -import static org.mockito.Mockito.mock; - import com.amazon.opendistroforelasticsearch.ad.TestHelpers; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings; -import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult; import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils; -import com.amazon.opendistroforelasticsearch.ad.util.Throttler; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; @@ -37,11 +32,9 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.threadpool.ThreadPool; import org.junit.Before; import java.io.IOException; -import java.time.Clock; import java.util.HashSet; import java.util.Set; @@ -49,11 +42,8 @@ public class AnomalyDetectionIndicesTests extends ESIntegTestCase { private AnomalyDetectionIndices indices; private ClusterSettings clusterSetting; - private ClientUtil requestUtil; private Settings settings; private ClusterService clusterService; - private Client client; - private ThreadPool context; @Before public void setup() { @@ -73,12 +63,7 @@ public void setup() { clusterSettings.add(AnomalyDetectorSettings.REQUEST_TIMEOUT); clusterSetting = new ClusterSettings(settings, clusterSettings); clusterService = TestHelpers.createClusterService(client().threadPool(), clusterSetting); - context = TestHelpers.createThreadPool(); - client = mock(Client.class); - Clock clock = Clock.systemUTC(); - Throttler throttler = new Throttler(clock); - requestUtil = new ClientUtil(settings, client, throttler, context); - indices = new AnomalyDetectionIndices(client(), clusterService, client().threadPool(), settings, requestUtil); + indices = new AnomalyDetectionIndices(client(), clusterService, client().threadPool(), settings); } public void testAnomalyDetectorIndexNotExists() { @@ -138,7 +123,7 @@ public void testAnomalyResultIndexExistsAndNotRecreate() throws IOException { .initAnomalyResultIndexIfAbsent( TestHelpers .createActionListener( - response -> response.isAcknowledged(), + response -> logger.info("Acknowledged: " + response.isAcknowledged()), failure -> { throw new RuntimeException("should not recreate index"); } ) ); @@ -149,7 +134,9 @@ public void testAnomalyResultIndexExistsAndNotRecreate() throws IOException { TestHelpers .createActionListener( response -> { throw new RuntimeException("should not recreate index " + AnomalyResult.ANOMALY_RESULT_INDEX); }, - failure -> { throw new RuntimeException("should not recreate index " + AnomalyResult.ANOMALY_RESULT_INDEX); } + failure -> { + throw new RuntimeException("should not recreate index " + AnomalyResult.ANOMALY_RESULT_INDEX, failure); + } ) ); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/ml/ModelManagerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/ml/ModelManagerTests.java index cbce5fae..a591394b 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/ml/ModelManagerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/ml/ModelManagerTests.java @@ -32,6 +32,7 @@ import com.amazon.opendistroforelasticsearch.ad.common.exception.ResourceNotFoundException; import com.amazon.opendistroforelasticsearch.ad.ml.rcf.CombinedRcfResult; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer; import com.google.gson.Gson; import junitparams.JUnitParamsRunner; @@ -39,7 +40,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.monitor.jvm.JvmService; @@ -92,7 +92,7 @@ public class ModelManagerTests { private AnomalyDetector anomalyDetector; @Mock(answer = Answers.RETURNS_DEEP_STUBS) - private ClusterService clusterService; + private DiscoveryNodeFilterer nodeFilter; @Mock(answer = Answers.RETURNS_DEEP_STUBS) private JvmService jvmService; @@ -167,7 +167,7 @@ public void setup() { modelManager = spy( new ModelManager( - clusterService, + nodeFilter, jvmService, rcfSerde, checkpointDao, @@ -283,7 +283,7 @@ public void getPartitionedForestSizes_returnExpected( when(modelManager.estimateModelSize(rcf)).thenReturn(totalModelSize); when(jvmService.info().getMem().getHeapMax().getBytes()).thenReturn(heapSize); - when(clusterService.state().nodes().getDataNodes()).thenReturn(dataNodes); + when(nodeFilter.getEligibleDataNodes()).thenReturn(dataNodes.values().toArray(DiscoveryNode.class)); assertEquals(expected, modelManager.getPartitionedForestSizes(rcf, "id")); } @@ -304,7 +304,7 @@ public void getPartitionedForestSizes_throwLimitExceeded( ) { when(modelManager.estimateModelSize(rcf)).thenReturn(totalModelSize); when(jvmService.info().getMem().getHeapMax().getBytes()).thenReturn(heapSize); - when(clusterService.state().nodes().getDataNodes()).thenReturn(dataNodes); + when(nodeFilter.getEligibleDataNodes()).thenReturn(dataNodes.values().toArray(DiscoveryNode.class)); modelManager.getPartitionedForestSizes(rcf, "id"); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsIT.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsIT.java index 405d5e6b..52496683 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsIT.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsIT.java @@ -37,7 +37,7 @@ protected Collection> transportClientPlugins() { } public void testNormalADStats() throws ExecutionException, InterruptedException { - ADStatsRequest adStatsRequest = new ADStatsRequest(); + ADStatsRequest adStatsRequest = new ADStatsRequest(new String[0]); ADStatsNodesResponse response = client().execute(ADStatsNodesAction.INSTANCE, adStatsRequest).get(); assertTrue("getting stats failed", !response.hasFailures()); diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsTests.java index a5c38baf..a119ee7b 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsTests.java @@ -50,6 +50,7 @@ public class ADStatsTests extends ESTestCase { Map clusterStats; DiscoveryNode discoveryNode1; + @Override @Before public void setUp() throws Exception { super.setUp(); @@ -72,7 +73,7 @@ public void testADStatsNodeRequest() throws IOException { ADStatsNodeRequest adStatsNodeRequest1 = new ADStatsNodeRequest(); assertNull("ADStatsNodeRequest default constructor failed", adStatsNodeRequest1.getADStatsRequest()); - ADStatsRequest adStatsRequest = new ADStatsRequest(); + ADStatsRequest adStatsRequest = new ADStatsRequest(new String[0]); ADStatsNodeRequest adStatsNodeRequest2 = new ADStatsNodeRequest(adStatsRequest); assertEquals("ADStatsNodeRequest has the wrong ADStatsRequest", adStatsNodeRequest2.getADStatsRequest(), adStatsRequest); @@ -80,7 +81,7 @@ public void testADStatsNodeRequest() throws IOException { BytesStreamOutput output = new BytesStreamOutput(); adStatsNodeRequest2.writeTo(output); StreamInput streamInput = output.bytes().streamInput(); - adStatsNodeRequest1.readFrom(streamInput); + adStatsNodeRequest1 = new ADStatsNodeRequest(streamInput); assertEquals( "readStats failed", adStatsNodeRequest2.getADStatsRequest().getStatsToBeRetrieved(), @@ -117,7 +118,7 @@ public void testADStatsNodeResponse() throws IOException, JsonPathNotFoundExcept @Test public void testADStatsRequest() throws IOException { List allStats = Arrays.stream(StatNames.values()).map(StatNames::getName).collect(Collectors.toList()); - ADStatsRequest adStatsRequest = new ADStatsRequest(); + ADStatsRequest adStatsRequest = new ADStatsRequest(new String[0]); // Test clear() adStatsRequest.clear();