Fix flaky test and log level/msgs and enable auto-expand replication (#202)

* Fix flaky test and log level/messages and enable auto-expand replicas for the AD job index

This PR (hopefully, as I cannot reproduce the failure locally) fixes the flaky tests in MultiEntityResultTests. The tests are likely flaky because we expect exactly two pages from our pagination, but a race condition can create more than two. Please read the comments in MultiEntityResultTests for details.
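As a minimal, hypothetical sketch of the flakiness described above (the class and method names are illustrative, not taken from MultiEntityResultTests): a test that asserts an exact page count fails spuriously when a race produces an extra page, while a lower-bound assertion stays stable.

```java
import java.util.List;

public class PageCountSketch {
    // Brittle check: assumes pagination yields exactly two pages.
    static boolean expectsExactlyTwoPages(List<String> pages) {
        return pages.size() == 2;
    }

    // Robust check: tolerates extra pages created by a race condition.
    static boolean expectsAtLeastTwoPages(List<String> pages) {
        return pages.size() >= 2;
    }

    public static void main(String[] args) {
        // A race created a third page; the exact-count check now fails spuriously.
        List<String> pages = List.of("page-1", "page-2", "page-3");
        System.out.println(expectsExactlyTwoPages(pages));  // prints false
        System.out.println(expectsAtLeastTwoPages(pages));  // prints true
    }
}
```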

This PR also changes the log level of the update-latest-realtime-task log message from info to debug. We don't need info, as OpenSearch prints the log repeatedly in each interval. Also, I changed the log message in ADTaskManager to match what the relevant code does.

This PR also enables auto-expand replicas for the AD job index. The job scheduler puts both primary and replica shards in the hash ring. Auto-expand grows the number of replicas with the number of data nodes (up to 20) in the cluster so that each node can become a coordinating node. This is useful when customers scale out their cluster, as we can scale adaptively along with them. Also, this PR changes the number of primary shards of the AD job index to 1, as the AD job index is small.
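As a rough illustration of the setting change described above (the rendered diff for AnomalyDetectionIndices is not shown below, so the exact keys and the "0-20" bound are assumptions based on standard OpenSearch index settings and the "up to 20" in this message), the job index settings would look approximately like:

```json
{
  "index": {
    "number_of_shards": 1,
    "auto_expand_replicas": "0-20"
  }
}
```

With `auto_expand_replicas` set to a range, the cluster adjusts the replica count automatically as data nodes join or leave, capped at the range's upper bound.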

Testing done:
1. Checked that the AD job index setting change is effective and won't negatively impact normal e2e workflow.
kaituo authored and ohltyler committed Sep 23, 2021
1 parent 6093263 commit fa932a4
Showing 15 changed files with 486 additions and 77 deletions.
3 changes: 0 additions & 3 deletions build.gradle
@@ -526,9 +526,6 @@ List<String> jacocoExclusions = [
'org.opensearch.ad.common.exception.ResourceNotFoundException',
'org.opensearch.ad.task.ADTaskSlotLimit',
'org.opensearch.ad.task.ADTaskCacheManager',

// TODO: to be fixed by kaituo
'org.opensearch.ad.indices.AnomalyDetectionIndices'
]

jacocoTestCoverageVerification {
1 change: 0 additions & 1 deletion docs/ml-rfc.md
@@ -16,7 +16,6 @@ In the meanwhile, we observe more and more machine learning features required to
* **Machine Learning in SIEM**: SIEM(Security Information and Event Management) is another domain in OpenSearch. Machine learning is also very useful in SIEM to help facilitate security analytics, and it can reduce the effort on sophisticated tasks, enable real time threat analysis and uncover anomalies.

## Solution
![](./images/ml-arch.png)
The solution is to introduce a new Machine Learning Framework inside the OpenSearch cluster, and all ML jobs only run and are managed in it. Existing functionalities: security, node communication, node management, can be leveraged. The major functionalities in this solution include:

* **Unified Client Interfaces:** clients can use common interfaces for training and inference tasks, and then follow the algorithm interface to give right input parameters, such as input data, hyperparameters. A client library will be built for easy use.
5 changes: 3 additions & 2 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java
@@ -230,7 +230,7 @@ protected void runAdJob(
);
return;
}
indexUtil.updateMappingIfNecessary();
indexUtil.update();
/*
* We need to handle 3 cases:
* 1. Detectors created by older versions and never updated. These detectors wont have User details in the
@@ -643,10 +643,11 @@ private void updateLatestRealtimeTask(
Long detectorIntervalInMinutes,
String error
) {
// Don't need info as this will be printed repeatedly in each interval
adTaskManager
.updateLatestRealtimeTask(detectorId, taskState, rcfTotalUpdates, detectorIntervalInMinutes, error, ActionListener.wrap(r -> {
log
.info(
.debug(
"Updated latest realtime task successfully for detector {}, taskState: {},"
+ " RCF total update: {}, detectorIntervalInMinutes: {}",
detectorId,
9 changes: 8 additions & 1 deletion src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
@@ -346,7 +346,14 @@ public Collection<Object> createComponents(
this.clientUtil = new ClientUtil(settings, client, throttler, threadPool);
this.indexUtils = new IndexUtils(client, clientUtil, clusterService, indexNameExpressionResolver);
this.nodeFilter = new DiscoveryNodeFilterer(clusterService);
this.anomalyDetectionIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings, nodeFilter);
this.anomalyDetectionIndices = new AnomalyDetectionIndices(
client,
clusterService,
threadPool,
settings,
nodeFilter,
AnomalyDetectorSettings.MAX_UPDATE_RETRY_TIMES
);
this.clusterService = clusterService;

SingleFeatureLinearUniformInterpolator singleFeatureLinearUniformInterpolator =
@@ -191,7 +191,8 @@ private void processResponse(SearchResponse response, Runnable retry, ActionList

try {
Page page = analyzePage(response);
if (totalResults < maxEntities && afterKey != null) {
// we can process at most maxEntities entities
if (totalResults <= maxEntities && afterKey != null) {
updateCompositeAfterKey(response, source);
listener.onResponse(page);
} else {
249 changes: 219 additions & 30 deletions src/main/java/org/opensearch/ad/indices/AnomalyDetectionIndices.java

Large diffs are not rendered by default.

@@ -36,6 +36,8 @@
import org.opensearch.ad.NodeStateManager;
import org.opensearch.ad.breaker.ADCircuitBreakerService;
import org.opensearch.ad.caching.CacheProvider;
import org.opensearch.ad.common.exception.EndRunException;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.AnomalyDetectionIndices;
@@ -168,6 +170,10 @@ protected ActionListener<MultiGetResponse> getResponseListener(List<EntityFeatur
// lazy init since we don't expect retryable requests to happen often
Set<String> retryableRequests = null;
Set<String> notFoundModels = null;
boolean printedUnexpectedFailure = false;
// contain requests that we will set the detector's exception to
// EndRunException (stop now = false)
Map<String, Exception> stopDetectorRequests = null;
for (MultiGetItemResponse itemResponse : itemResponses) {
String modelId = itemResponse.getId();
if (itemResponse.isFailed()) {
@@ -189,7 +195,17 @@ protected ActionListener<MultiGetResponse> getResponseListener(List<EntityFeatur
LOG.error("too many get AD model checkpoint requests or shard not available");
setCoolDownStart();
} else {
LOG.error("Unexpected failure", failure);
// some unexpected bug occurred or cluster is unstable (e.g., ClusterBlockException) or index is red (e.g.
// NoShardAvailableActionException) while fetching a checkpoint. As this might happen for a large amount
// of entities, we don't want to flood logs with such exception trace. Only print it once.
if (!printedUnexpectedFailure) {
LOG.error("Unexpected failure", failure);
printedUnexpectedFailure = true;
}
if (stopDetectorRequests == null) {
stopDetectorRequests = new HashMap<>();
}
stopDetectorRequests.put(modelId, failure);
}
} else if (!itemResponse.getResponse().isExists()) {
// lazy init as we don't expect retrying happens often
@@ -213,6 +229,22 @@ protected ActionListener<MultiGetResponse> getResponseListener(List<EntityFeatur
}
}

// deal with failures that we will retry for a limited amount of times
// before stopping the detector
if (stopDetectorRequests != null) {
for (EntityRequest origRequest : toProcess) {
Optional<String> modelId = origRequest.getModelId();
if (modelId.isPresent() && stopDetectorRequests.containsKey(modelId.get())) {
String adID = origRequest.detectorId;
nodeStateManager
.setException(
adID,
new EndRunException(adID, CommonErrorMessages.BUG_RESPONSE, stopDetectorRequests.get(modelId.get()), false)
);
}
}
}

if (successfulRequests.isEmpty() && (retryableRequests == null || retryableRequests.isEmpty())) {
// don't need to proceed further since no checkpoint is available
return;
@@ -788,4 +788,9 @@ private AnomalyDetectorSettings() {}
// such as "there are at least 10000 entities", the default is set to 10,000. That is, requests will count the
// total entities up to 10,000.
public static final int MAX_TOTAL_ENTITIES_TO_TRACK = 10_000;

// ======================================
// AD Index setting
// ======================================
public static int MAX_UPDATE_RETRY_TIMES = 10_000;
}
@@ -103,6 +103,7 @@
import org.opensearch.common.io.stream.NotSerializableExceptionWrapper;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.NetworkExceptionHelper;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.index.IndexNotFoundException;
@@ -793,13 +794,16 @@ private void findException(Throwable cause, String adID, AtomicReference<Excepti
}
} else if (causeException instanceof IndexNotFoundException
&& causeException.getMessage().contains(CommonName.CHECKPOINT_INDEX_NAME)) {
// checkpoint index does not exist
// ResourceNotFoundException will trigger cold start later
failure.set(new ResourceNotFoundException(adID, causeException.getMessage()));
} else if (causeException instanceof OpenSearchTimeoutException) {
// we can have OpenSearchTimeoutException when a node tries to load RCF or
// threshold model
failure.set(new InternalFailure(adID, causeException));
} else {
// some unexpected bugs occur while predicting anomaly
// some unexpected bug occurred or cluster is unstable (e.g., ClusterBlockException) or index is red (e.g.
// NoShardAvailableActionException) while predicting anomaly
failure.set(new EndRunException(adID, CommonErrorMessages.BUG_RESPONSE, causeException, false));
}
}
@@ -1099,6 +1103,7 @@ private boolean hasConnectionIssue(Throwable e) {
|| e instanceof ReceiveTimeoutTransportException
|| e instanceof NodeNotConnectedException
|| e instanceof ConnectException
|| NetworkExceptionHelper.isCloseConnectionException(e)
|| e instanceof ActionNotFoundTransportException;
}

@@ -149,7 +149,7 @@ protected void adExecute(
ThreadContext.StoredContext storedContext,
ActionListener<IndexAnomalyDetectorResponse> listener
) {
anomalyDetectionIndices.updateMappingIfNecessary();
anomalyDetectionIndices.update();
String detectorId = request.getDetectorID();
long seqNo = request.getSeqNo();
long primaryTerm = request.getPrimaryTerm();
@@ -37,6 +37,7 @@
import org.opensearch.ad.TestHelpers;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.RestHandlerUtils;
import org.opensearch.common.settings.Settings;
@@ -72,7 +73,14 @@ public void setup() {

nodeFilter = new DiscoveryNodeFilterer(clusterService());

indices = new AnomalyDetectionIndices(client(), clusterService(), client().threadPool(), settings, nodeFilter);
indices = new AnomalyDetectionIndices(
client(),
clusterService(),
client().threadPool(),
settings,
nodeFilter,
AnomalyDetectorSettings.MAX_UPDATE_RETRY_TIMES
);
}

public void testAnomalyDetectorIndexNotExists() {
@@ -113,7 +113,14 @@ public void setUp() throws Exception {
clusterState = ClusterState.builder(clusterName).metadata(Metadata.builder().build()).build();
when(clusterService.state()).thenReturn(clusterState);

adIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings, nodeFilter);
adIndices = new AnomalyDetectionIndices(
client,
clusterService,
threadPool,
settings,
nodeFilter,
AnomalyDetectorSettings.MAX_UPDATE_RETRY_TIMES
);
}

@SuppressWarnings("unchecked")
@@ -175,7 +182,11 @@ private void adaptivePrimaryShardsIndexCreationTemplate(String index) throws IOE
}

Settings settings = request.settings();
assertThat(settings.get("index.number_of_shards"), equalTo(Integer.toString(numberOfHotNodes)));
if (index.equals(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX)) {
assertThat(settings.get("index.number_of_shards"), equalTo(Integer.toString(1)));
} else {
assertThat(settings.get("index.number_of_shards"), equalTo(Integer.toString(numberOfHotNodes)));
}

ActionListener<CreateIndexResponse> listener = (ActionListener<CreateIndexResponse>) invocation.getArgument(1);

9 changes: 8 additions & 1 deletion src/test/java/org/opensearch/ad/indices/RolloverTests.java
@@ -111,7 +111,14 @@ public void setUp() throws Exception {
numberOfNodes = 2;
when(nodeFilter.getNumberOfEligibleDataNodes()).thenReturn(numberOfNodes);

adIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings, nodeFilter);
adIndices = new AnomalyDetectionIndices(
client,
clusterService,
threadPool,
settings,
nodeFilter,
AnomalyDetectorSettings.MAX_UPDATE_RETRY_TIMES
);

clusterAdminClient = mock(ClusterAdminClient.class);
when(adminClient.cluster()).thenReturn(clusterAdminClient);