Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Forward porting ultrawarm integration (#125)
Browse files Browse the repository at this point in the history
* Integration with Ultrawarm (#95)

Ultrawarm introduces warm nodes into the ES cluster. Currently, we distribute model partitions to all data nodes in the cluster randomly, which could cause a model performance downgrade issue once warm nodes are throttled due to resource limitations. The PR excludes warm nodes to place model partitions.

Since index shards are hosted on hot nodes, AD's coordinating nodes are in hot nodes as well. We don't need to send HourlyCron job and stats requests to warm nodes anymore. This PR implements those changes.

Testing done:
1. Verified AD runs only in hot nodes.
2. stats API and HourlyCron still works.

* Integration with Ultrawarm - Follow up (#97)

This is a follow up PR to address comments.

Testing done:
1. gradle build passes
2. Verified AD runs only in hot nodes.
3. stats API and HourlyCron still works.
  • Loading branch information
kaituo authored May 19, 2020
1 parent 4c5a2b7 commit 9b14cd6
Show file tree
Hide file tree
Showing 30 changed files with 325 additions and 193 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyResultHandler;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.ad.util.ColdStartRunner;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;
import com.amazon.opendistroforelasticsearch.ad.util.IndexUtils;
import com.amazon.opendistroforelasticsearch.ad.util.Throttler;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobSchedulerExtension;
Expand Down Expand Up @@ -147,6 +148,7 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip
private ADStats adStats;
private NamedXContentRegistry xContentRegistry;
private ClientUtil clientUtil;
private DiscoveryNodeFilterer nodeFilter;

static {
SpecialPermission.check();
Expand Down Expand Up @@ -183,34 +185,27 @@ public List<RestHandler> getRestHandlers(
jobRunner.setSettings(settings);

AnomalyDetectorProfileRunner profileRunner = new AnomalyDetectorProfileRunner(client, this.xContentRegistry);
RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction(
restController,
profileRunner,
ProfileName.getNames()
);
RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction(profileRunner, ProfileName.getNames());
RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(
settings,
restController,
clusterService,
anomalyDetectionIndices
);
RestSearchAnomalyDetectorAction searchAnomalyDetectorAction = new RestSearchAnomalyDetectorAction(settings, restController);
RestSearchAnomalyResultAction searchAnomalyResultAction = new RestSearchAnomalyResultAction(settings, restController);
RestDeleteAnomalyDetectorAction deleteAnomalyDetectorAction = new RestDeleteAnomalyDetectorAction(restController, clusterService);
RestSearchAnomalyDetectorAction searchAnomalyDetectorAction = new RestSearchAnomalyDetectorAction();
RestSearchAnomalyResultAction searchAnomalyResultAction = new RestSearchAnomalyResultAction();
RestDeleteAnomalyDetectorAction deleteAnomalyDetectorAction = new RestDeleteAnomalyDetectorAction(clusterService);
RestExecuteAnomalyDetectorAction executeAnomalyDetectorAction = new RestExecuteAnomalyDetectorAction(
settings,
restController,
clusterService,
anomalyDetectorRunner
);
RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction(
restController,
clusterService,
adStats
adStats,
this.nodeFilter,
this.clusterService
);
RestAnomalyDetectorJobAction anomalyDetectorJobAction = new RestAnomalyDetectorJobAction(
settings,
restController,
clusterService,
anomalyDetectionIndices
);
Expand Down Expand Up @@ -253,7 +248,7 @@ public Collection<Object> createComponents(
Throttler throttler = new Throttler(clock);
this.clientUtil = new ClientUtil(settings, client, throttler, threadPool);
IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService);
anomalyDetectionIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings, clientUtil);
anomalyDetectionIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings);
this.clusterService = clusterService;
this.xContentRegistry = xContentRegistry;

Expand All @@ -266,8 +261,10 @@ public Collection<Object> createComponents(
RandomCutForestSerDe rcfSerde = new RandomCutForestSerDe();
CheckpointDao checkpoint = new CheckpointDao(client, clientUtil, CommonName.CHECKPOINT_INDEX_NAME);

this.nodeFilter = new DiscoveryNodeFilterer(this.clusterService);

ModelManager modelManager = new ModelManager(
clusterService,
nodeFilter,
jvmService,
rcfSerde,
checkpoint,
Expand All @@ -291,7 +288,7 @@ public Collection<Object> createComponents(
AnomalyDetectorSettings.SHINGLE_SIZE
);

HashRing hashRing = new HashRing(clusterService, clock, settings);
HashRing hashRing = new HashRing(nodeFilter, clock, settings);
ADStateManager stateManager = new ADStateManager(
client,
xContentRegistry,
Expand Down Expand Up @@ -357,11 +354,12 @@ public Collection<Object> createComponents(
clock,
stateManager,
runner,
new ADClusterEventListener(clusterService, hashRing, modelManager),
new ADClusterEventListener(clusterService, hashRing, modelManager, nodeFilter),
deleteUtil,
adCircuitBreakerService,
adStats,
new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil)
new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil, nodeFilter),
nodeFilter
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,8 @@
import java.util.concurrent.Semaphore;

import com.amazon.opendistroforelasticsearch.ad.ml.ModelManager;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterChangedEvent;
Expand All @@ -32,7 +34,7 @@

public class ADClusterEventListener implements ClusterStateListener {
private static final Logger LOG = LogManager.getLogger(ADClusterEventListener.class);
static final String MASTER_NOT_APPLIED_MSG = "AD does not use master nodes";
static final String NODE_NOT_APPLIED_MSG = "AD does not use master or ultrawarm nodes";
static final String NOT_RECOVERED_MSG = "CLuster is not recovered yet.";
static final String IN_PROGRESS_MSG = "Cluster state change in progress, return.";
static final String REMOVE_MODEL_MSG = "Remove model";
Expand All @@ -43,21 +45,28 @@ public class ADClusterEventListener implements ClusterStateListener {
private HashRing hashRing;
private ModelManager modelManager;
private final ClusterService clusterService;
private final DiscoveryNodeFilterer nodeFilter;

@Inject
public ADClusterEventListener(ClusterService clusterService, HashRing hashRing, ModelManager modelManager) {
public ADClusterEventListener(
ClusterService clusterService,
HashRing hashRing,
ModelManager modelManager,
DiscoveryNodeFilterer nodeFilter
) {
this.clusterService = clusterService;
this.clusterService.addListener(this);
this.hashRing = hashRing;
this.modelManager = modelManager;
this.inProgress = new Semaphore(1);
this.nodeFilter = nodeFilter;
}

@Override
public void clusterChanged(ClusterChangedEvent event) {

if (!event.state().nodes().getLocalNode().isDataNode()) {
LOG.debug(MASTER_NOT_APPLIED_MSG);
if (!nodeFilter.isEligibleNode(event.state().nodes().getLocalNode())) {
LOG.debug(NODE_NOT_APPLIED_MSG);
return;
}

Expand All @@ -79,7 +88,7 @@ public void clusterChanged(ClusterChangedEvent event) {
// Check whether it was a data node that was removed
boolean dataNodeRemoved = false;
for (DiscoveryNode removedNode : delta.removedNodes()) {
if (removedNode.isDataNode()) {
if (nodeFilter.isEligibleNode(removedNode)) {
LOG.info(NODE_REMOVED_MSG + " {}", removedNode.getId());
dataNodeRemoved = true;
break;
Expand All @@ -89,7 +98,7 @@ public void clusterChanged(ClusterChangedEvent event) {
// Check whether it was a data node that was added
boolean dataNodeAdded = false;
for (DiscoveryNode addedNode : delta.addedNodes()) {
if (addedNode.isDataNode()) {
if (nodeFilter.isEligibleNode(addedNode)) {
LOG.info(NODE_ADDED_MSG + " {}", addedNode.getId());
dataNodeAdded = true;
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,11 +28,10 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;

public class HashRing {
private static final Logger LOG = LogManager.getLogger(HashRing.class);
Expand All @@ -43,7 +42,7 @@ public class HashRing {
static final String COOLDOWN_MSG = "Hash ring doesn't respond to cluster state change within the cooldown period.";

private final int VIRTUAL_NODE_COUNT = 100;
private final ClusterService clusterService;
private final DiscoveryNodeFilterer nodeFilter;
private TreeMap<Integer, DiscoveryNode> circle;
private Semaphore inProgress;
// the UTC epoch milliseconds of the most recent successful update
Expand All @@ -52,9 +51,9 @@ public class HashRing {
private final Clock clock;
private AtomicBoolean membershipChangeRequied;

public HashRing(ClusterService clusterService, Clock clock, Settings settings) {
public HashRing(DiscoveryNodeFilterer nodeFilter, Clock clock, Settings settings) {
this.circle = new TreeMap<Integer, DiscoveryNode>();
this.clusterService = clusterService;
this.nodeFilter = nodeFilter;
this.inProgress = new Semaphore(1);
this.clock = clock;
this.coolDownPeriod = COOLDOWN_MINUTES.get(settings);
Expand Down Expand Up @@ -93,8 +92,7 @@ public boolean build() {
TreeMap<Integer, DiscoveryNode> newCircle = new TreeMap<>();

try {
for (ObjectCursor<DiscoveryNode> cursor : clusterService.state().nodes().getDataNodes().values()) {
DiscoveryNode curNode = cursor.value;
for (DiscoveryNode curNode : nodeFilter.getEligibleDataNodes()) {
for (int i = 0; i < VIRTUAL_NODE_COUNT; i++) {
newCircle.put(Murmur3HashFunction.hash(curNode.getId() + i), curNode);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -21,27 +21,27 @@
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;

import com.amazon.opendistroforelasticsearch.ad.transport.CronAction;
import com.amazon.opendistroforelasticsearch.ad.transport.CronRequest;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;

public class HourlyCron implements Runnable {
private static final Logger LOG = LogManager.getLogger(HourlyCron.class);
static final String SUCCEEDS_LOG_MSG = "Hourly maintenance succeeds";
static final String NODE_EXCEPTION_LOG_MSG = "Hourly maintenance of node has exception";
static final String EXCEPTION_LOG_MSG = "Hourly maintenance has exception.";
private ClusterService clusterService;
private DiscoveryNodeFilterer nodeFilter;
private Client client;

public HourlyCron(ClusterService clusterService, Client client) {
this.clusterService = clusterService;
public HourlyCron(Client client, DiscoveryNodeFilterer nodeFilter) {
this.nodeFilter = nodeFilter;
this.client = client;
}

@Override
public void run() {
DiscoveryNode[] dataNodes = clusterService.state().nodes().getDataNodes().values().toArray(DiscoveryNode.class);
DiscoveryNode[] dataNodes = nodeFilter.getEligibleDataNodes();

// we also add the cancel query function here based on query text from the negative cache.

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@

import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;

import org.elasticsearch.threadpool.ThreadPool;

Expand All @@ -39,14 +40,16 @@ public class MasterEventListener implements LocalNodeMasterListener {
private Client client;
private Clock clock;
private ClientUtil clientUtil;
private DiscoveryNodeFilterer nodeFilter;

public MasterEventListener(
ClusterService clusterService,
ThreadPool threadPool,
DeleteDetector deleteUtil,
Client client,
Clock clock,
ClientUtil clientUtil
ClientUtil clientUtil,
DiscoveryNodeFilterer nodeFilter
) {
this.clusterService = clusterService;
this.threadPool = threadPool;
Expand All @@ -55,13 +58,13 @@ public MasterEventListener(
this.clusterService.addLocalNodeMasterListener(this);
this.clock = clock;
this.clientUtil = clientUtil;
this.nodeFilter = nodeFilter;
}

@Override
public void onMaster() {
if (hourlyCron == null) {
hourlyCron = threadPool
.scheduleWithFixedDelay(new HourlyCron(clusterService, client), TimeValue.timeValueHours(1), executorName());
hourlyCron = threadPool.scheduleWithFixedDelay(new HourlyCron(client, nodeFilter), TimeValue.timeValueHours(1), executorName());
clusterService.addLifecycleListener(new LifecycleListener() {
@Override
public void beforeStop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,17 @@ public class CommonName {
// Anomaly Detector name for X-Opaque-Id header
// ======================================
public static final String ANOMALY_DETECTOR = "[Anomaly Detector]";

// ======================================
// Ultrawarm node attributes
// ======================================

// hot node
public static String HOT_BOX_TYPE = "hot";

// warm node
public static String WARM_BOX_TYPE = "warm";

// box type
public static final String BOX_TYPE_KEY = "box_type";
}
Loading

0 comments on commit 9b14cd6

Please sign in to comment.