Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Integration with Ultrawarm #95

Merged
merged 2 commits into from
Apr 29, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyResultHandler;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils;
import com.amazon.opendistroforelasticsearch.ad.util.ColdStartRunner;
import com.amazon.opendistroforelasticsearch.ad.util.IndexUtils;
import com.amazon.opendistroforelasticsearch.ad.util.Throttler;
Expand Down Expand Up @@ -145,6 +146,7 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip
private ADStats adStats;
private NamedXContentRegistry xContentRegistry;
private ClientUtil clientUtil;
private ClusterStateUtils clusterStateUtils;

static {
SpecialPermission.check();
Expand Down Expand Up @@ -201,7 +203,11 @@ public List<RestHandler> getRestHandlers(
clusterService,
anomalyDetectorRunner
);
RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction(restController, adStats);
RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction(
restController,
adStats,
this.clusterStateUtils
);
RestAnomalyDetectorJobAction anomalyDetectorJobAction = new RestAnomalyDetectorJobAction(
settings,
restController,
Expand Down Expand Up @@ -259,8 +265,9 @@ public Collection<Object> createComponents(
RandomCutForestSerDe rcfSerde = new RandomCutForestSerDe();
CheckpointDao checkpoint = new CheckpointDao(client, clientUtil, CommonName.CHECKPOINT_INDEX_NAME);

this.clusterStateUtils = new ClusterStateUtils(clusterService);
ModelManager modelManager = new ModelManager(
clusterService,
clusterStateUtils,
jvmService,
rcfSerde,
checkpoint,
Expand All @@ -284,7 +291,7 @@ public Collection<Object> createComponents(
AnomalyDetectorSettings.SHINGLE_SIZE
);

HashRing hashRing = new HashRing(clusterService, clock, settings);
HashRing hashRing = new HashRing(clusterStateUtils, clock, settings);
ADStateManager stateManager = new ADStateManager(
client,
xContentRegistry,
Expand Down Expand Up @@ -353,11 +360,11 @@ public Collection<Object> createComponents(
clock,
stateManager,
runner,
new ADClusterEventListener(clusterService, hashRing, modelManager),
new ADClusterEventListener(clusterService, hashRing, modelManager, clusterStateUtils),
deleteUtil,
adCircuitBreakerService,
adStats,
new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil)
new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil, clusterStateUtils)
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,8 @@
import java.util.concurrent.Semaphore;

import com.amazon.opendistroforelasticsearch.ad.ml.ModelManager;
import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterChangedEvent;
Expand All @@ -32,7 +34,7 @@

public class ADClusterEventListener implements ClusterStateListener {
private static final Logger LOG = LogManager.getLogger(ADClusterEventListener.class);
static final String MASTER_NOT_APPLIED_MSG = "AD does not use master nodes";
static final String NODE_NOT_APPLIED_MSG = "AD does not use master or ultrawarm nodes";
static final String NOT_RECOVERED_MSG = "CLuster is not recovered yet.";
static final String IN_PROGRESS_MSG = "Cluster state change in progress, return.";
static final String REMOVE_MODEL_MSG = "Remove model";
Expand All @@ -43,21 +45,28 @@ public class ADClusterEventListener implements ClusterStateListener {
private HashRing hashRing;
private ModelManager modelManager;
private final ClusterService clusterService;
private final ClusterStateUtils clusterStateUtils;

@Inject
public ADClusterEventListener(ClusterService clusterService, HashRing hashRing, ModelManager modelManager) {
public ADClusterEventListener(
ClusterService clusterService,
HashRing hashRing,
ModelManager modelManager,
ClusterStateUtils clusterStateUtils
) {
this.clusterService = clusterService;
this.clusterService.addListener(this);
this.hashRing = hashRing;
this.modelManager = modelManager;
this.inProgress = new Semaphore(1);
this.clusterStateUtils = clusterStateUtils;
}

@Override
public void clusterChanged(ClusterChangedEvent event) {

if (!event.state().nodes().getLocalNode().isDataNode()) {
LOG.debug(MASTER_NOT_APPLIED_MSG);
if (clusterStateUtils.isIgnoredNode(event.state().nodes().getLocalNode())) {
LOG.debug(NODE_NOT_APPLIED_MSG);
return;
}

Expand All @@ -79,7 +88,7 @@ public void clusterChanged(ClusterChangedEvent event) {
// Check whether any removed node is one AD uses (i.e. not ignored per clusterStateUtils.isIgnoredNode)
boolean dataNodeRemoved = false;
for (DiscoveryNode removedNode : delta.removedNodes()) {
if (removedNode.isDataNode()) {
if (!clusterStateUtils.isIgnoredNode(removedNode)) {
LOG.info(NODE_REMOVED_MSG + " {}", removedNode.getId());
dataNodeRemoved = true;
break;
Expand All @@ -89,7 +98,7 @@ public void clusterChanged(ClusterChangedEvent event) {
// Check whether any added node is one AD uses (i.e. not ignored per clusterStateUtils.isIgnoredNode)
boolean dataNodeAdded = false;
for (DiscoveryNode addedNode : delta.addedNodes()) {
if (addedNode.isDataNode()) {
if (!clusterStateUtils.isIgnoredNode(addedNode)) {
LOG.info(NODE_ADDED_MSG + " {}", addedNode.getId());
dataNodeAdded = true;
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,10 +28,10 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;

import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils;
import com.carrotsearch.hppc.cursors.ObjectCursor;

public class HashRing {
Expand All @@ -43,7 +43,7 @@ public class HashRing {
static final String COOLDOWN_MSG = "Hash ring doesn't respond to cluster state change within the cooldown period.";

private final int VIRTUAL_NODE_COUNT = 100;
private final ClusterService clusterService;
private final ClusterStateUtils clusterStateUtils;
private TreeMap<Integer, DiscoveryNode> circle;
private Semaphore inProgress;
// the UTC epoch milliseconds of the most recent successful update
Expand All @@ -52,9 +52,9 @@ public class HashRing {
private final Clock clock;
private AtomicBoolean membershipChangeRequied;

public HashRing(ClusterService clusterService, Clock clock, Settings settings) {
public HashRing(ClusterStateUtils clusterStateUtils, Clock clock, Settings settings) {
this.circle = new TreeMap<Integer, DiscoveryNode>();
this.clusterService = clusterService;
this.clusterStateUtils = clusterStateUtils;
this.inProgress = new Semaphore(1);
this.clock = clock;
this.coolDownPeriod = COOLDOWN_MINUTES.get(settings);
Expand Down Expand Up @@ -93,7 +93,7 @@ public boolean build() {
TreeMap<Integer, DiscoveryNode> newCircle = new TreeMap<>();

try {
for (ObjectCursor<DiscoveryNode> cursor : clusterService.state().nodes().getDataNodes().values()) {
for (ObjectCursor<DiscoveryNode> cursor : clusterStateUtils.getEligibleDataNodes().values()) {
DiscoveryNode curNode = cursor.value;
for (int i = 0; i < VIRTUAL_NODE_COUNT; i++) {
newCircle.put(Murmur3HashFunction.hash(curNode.getId() + i), curNode);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -21,27 +21,27 @@
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;

import com.amazon.opendistroforelasticsearch.ad.transport.CronAction;
import com.amazon.opendistroforelasticsearch.ad.transport.CronRequest;
import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils;

public class HourlyCron implements Runnable {
private static final Logger LOG = LogManager.getLogger(HourlyCron.class);
static final String SUCCEEDS_LOG_MSG = "Hourly maintenance succeeds";
static final String NODE_EXCEPTION_LOG_MSG = "Hourly maintenance of node has exception";
static final String EXCEPTION_LOG_MSG = "Hourly maintenance has exception.";
private ClusterService clusterService;
private ClusterStateUtils clientStateUtils;
private Client client;

public HourlyCron(ClusterService clusterService, Client client) {
this.clusterService = clusterService;
public HourlyCron(Client client, ClusterStateUtils clientStateUtils) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: this parameter (and the field it is assigned to) should be named clusterStateUtils, not clientStateUtils.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

this.clientStateUtils = clientStateUtils;
this.client = client;
}

@Override
public void run() {
DiscoveryNode[] dataNodes = clusterService.state().nodes().getDataNodes().values().toArray(DiscoveryNode.class);
DiscoveryNode[] dataNodes = clientStateUtils.getEligibleDataNodes().values().toArray(DiscoveryNode.class);

// we also add the cancel query function here based on query text from the negative cache.

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@

import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils;

import org.elasticsearch.threadpool.ThreadPool;

Expand All @@ -39,14 +40,16 @@ public class MasterEventListener implements LocalNodeMasterListener {
private Client client;
private Clock clock;
private ClientUtil clientUtil;
private ClusterStateUtils clusterStateUtils;

public MasterEventListener(
ClusterService clusterService,
ThreadPool threadPool,
DeleteDetector deleteUtil,
Client client,
Clock clock,
ClientUtil clientUtil
ClientUtil clientUtil,
ClusterStateUtils clusterStateUtils
) {
this.clusterService = clusterService;
this.threadPool = threadPool;
Expand All @@ -55,13 +58,14 @@ public MasterEventListener(
this.clusterService.addLocalNodeMasterListener(this);
this.clock = clock;
this.clientUtil = clientUtil;
this.clusterStateUtils = clusterStateUtils;
}

@Override
public void onMaster() {
if (hourlyCron == null) {
hourlyCron = threadPool
.scheduleWithFixedDelay(new HourlyCron(clusterService, client), TimeValue.timeValueHours(1), executorName());
.scheduleWithFixedDelay(new HourlyCron(client, clusterStateUtils), TimeValue.timeValueHours(1), executorName());
clusterService.addLifecycleListener(new LifecycleListener() {
@Override
public void beforeStop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,14 @@ public class CommonName {
// Anomaly Detector name for X-Opaque-Id header
// ======================================
public static final String ANOMALY_DETECTOR = "[Anomaly Detector]";

// ======================================
// Ultrawarm node attributes
// ======================================

// warm node: node-attribute value that marks an Ultrawarm node
// (presumably matched against the BOX_TYPE_KEY attribute; confirm in ClusterStateUtils).
// Declared final, like the other constants in this class, so it cannot be reassigned at runtime.
public static final String WARM_BOX_TYPE = "warm";

// box type
public static final String BOX_TYPE_KEY = "box_type";
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@
import com.google.gson.Gson;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.monitor.jvm.JvmService;

import com.amazon.opendistroforelasticsearch.ad.common.exception.LimitExceededException;
import com.amazon.opendistroforelasticsearch.ad.common.exception.ResourceNotFoundException;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages;
import com.amazon.opendistroforelasticsearch.ad.ml.rcf.CombinedRcfResult;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils;
import com.amazon.randomcutforest.RandomCutForest;
import com.amazon.randomcutforest.serialize.RandomCutForestSerDe;

Expand Down Expand Up @@ -103,7 +103,7 @@ public String getName() {
private final Duration checkpointInterval;

// dependencies
private final ClusterService clusterService;
private final ClusterStateUtils clusterStateUtils;
private final JvmService jvmService;
private final RandomCutForestSerDe rcfSerde;
private final CheckpointDao checkpointDao;
Expand All @@ -119,7 +119,7 @@ public String getName() {
/**
* Constructor.
*
* @param clusterService cluster info
* @param clusterStateUtils cluster state helper used to look up the eligible data nodes
* @param jvmService jvm info
* @param rcfSerde RCF model serialization
* @param checkpointDao model checkpoint storage
Expand All @@ -140,9 +140,10 @@ public String getName() {
* @param minPreviewSize minimum number of data points for preview
* @param modelTtl time to live for hosted models
* @param checkpointInterval interval between checkpoints
* @param shingleSize size of a shingle
*/
public ModelManager(
ClusterService clusterService,
ClusterStateUtils clusterStateUtils,
JvmService jvmService,
RandomCutForestSerDe rcfSerde,
CheckpointDao checkpointDao,
Expand All @@ -166,7 +167,7 @@ public ModelManager(
int shingleSize
) {

this.clusterService = clusterService;
this.clusterStateUtils = clusterStateUtils;
this.jvmService = jvmService;
this.rcfSerde = rcfSerde;
this.checkpointDao = checkpointDao;
Expand Down Expand Up @@ -259,7 +260,7 @@ public Entry<Integer, Integer> getPartitionedForestSizes(RandomCutForest forest,
int numPartitions = (int) Math.ceil((double) totalSize / (double) partitionSize);
int forestSize = (int) Math.ceil((double) forest.getNumberOfTrees() / (double) numPartitions);

int numNodes = clusterService.state().nodes().getDataNodes().size();
int numNodes = clusterStateUtils.getEligibleDataNodes().size();
if (numPartitions > numNodes) {
// partition by cluster size
partitionSize = (long) Math.ceil((double) totalSize / (double) numNodes);
Expand Down
Loading