Skip to content

Commit

Permalink
Fix flaky test and log level/msgs and enable auto-expand replication (o…
Browse files Browse the repository at this point in the history
…pensearch-project#202)

* Fix flaky test and log level/messages and enable auto-expand replica of AD job index

This PR (hopefully, as I cannot reproduce the failure locally) fixed flaky tests in MultiEntityResultTests. The tests are flaky, maybe because we expect two pages in our pagination, but we may create more than two pages due to a race condition. Please read comments in MultiEntityResultTests for detail.

This PR also changes the log level of the updating real-time task log from info to debug. We don't need info as Opensearch prints the log repeatedly in each interval. Also, I changed the log message in ADTaskManager to match what the relevant code does.

This PR also enables auto-expand replication for AD job indexes. The job scheduler puts both primary and replica shards in the hash ring. Enabling auto-expand the number of replicas based on the number of data nodes (up to 20) in the cluster so that each node can become a coordinating node. Enabling auto-expanding is useful when customers scale out their cluster so that we can do adaptive scaling accordingly. Also, this PR changed the primary number of shards of the AD job index to 1 as the AD job index is small.

Testing done:
1. Checked that the AD job index setting change is effective and won't negatively impact normal e2e workflow.
  • Loading branch information
kaituo authored and ohltyler committed Sep 23, 2021
1 parent c2d9a51 commit d3ad040
Show file tree
Hide file tree
Showing 16 changed files with 487 additions and 78 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
with:
repository: 'opensearch-project/OpenSearch'
path: OpenSearch
ref: '1.x'
ref: '1.1'
- name: Build OpenSearch
working-directory: ./OpenSearch
run: ./gradlew publishToMavenLocal
Expand Down
3 changes: 0 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -526,9 +526,6 @@ List<String> jacocoExclusions = [
'org.opensearch.ad.common.exception.ResourceNotFoundException',
'org.opensearch.ad.task.ADTaskSlotLimit',
'org.opensearch.ad.task.ADTaskCacheManager',

// TODO: to be fixed by kaituo
'org.opensearch.ad.indices.AnomalyDetectionIndices'
]

jacocoTestCoverageVerification {
Expand Down
1 change: 0 additions & 1 deletion docs/ml-rfc.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ In the meanwhile, we observe more and more machine learning features required to
* **Machine Learning in SIEM**: SIEM(Security Information and Event Management) is another domain in OpenSearch. Machine learning is also very useful in SIEM to help facilitate security analytics, and it can reduce the effort on sophisticated tasks, enable real time threat analysis and uncover anomalies.

## Solution
![](./images/ml-arch.png)
The solution is to introduce a new Machine Learning Framework inside the OpenSearch cluster, and all ML jobs only run and are managed in it. Existing functionalities: security, node communication, node management, can be leveraged. The major functionalities in this solution include:

* **Unified Client Interfaces:** clients can use common interfaces for training and inference tasks, and then follow the algorithm interface to give right input parameters, such as input data, hyperparameters. A client library will be built for easy use.
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ protected void runAdJob(
);
return;
}
indexUtil.updateMappingIfNecessary();
indexUtil.update();
/*
* We need to handle 3 cases:
* 1. Detectors created by older versions and never updated. These detectors wont have User details in the
Expand Down Expand Up @@ -643,10 +643,11 @@ private void updateLatestRealtimeTask(
Long detectorIntervalInMinutes,
String error
) {
// Don't need info as this will be printed repeatedly in each interval
adTaskManager
.updateLatestRealtimeTask(detectorId, taskState, rcfTotalUpdates, detectorIntervalInMinutes, error, ActionListener.wrap(r -> {
log
.info(
.debug(
"Updated latest realtime task successfully for detector {}, taskState: {},"
+ " RCF total update: {}, detectorIntervalInMinutes: {}",
detectorId,
Expand Down
9 changes: 8 additions & 1 deletion src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,14 @@ public Collection<Object> createComponents(
this.clientUtil = new ClientUtil(settings, client, throttler, threadPool);
this.indexUtils = new IndexUtils(client, clientUtil, clusterService, indexNameExpressionResolver);
this.nodeFilter = new DiscoveryNodeFilterer(clusterService);
this.anomalyDetectionIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings, nodeFilter);
this.anomalyDetectionIndices = new AnomalyDetectionIndices(
client,
clusterService,
threadPool,
settings,
nodeFilter,
AnomalyDetectorSettings.MAX_UPDATE_RETRY_TIMES
);
this.clusterService = clusterService;

SingleFeatureLinearUniformInterpolator singleFeatureLinearUniformInterpolator =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ private void processResponse(SearchResponse response, Runnable retry, ActionList

try {
Page page = analyzePage(response);
if (totalResults < maxEntities && afterKey != null) {
// we can process at most maxEntities entities
if (totalResults <= maxEntities && afterKey != null) {
updateCompositeAfterKey(response, source);
listener.onResponse(page);
} else {
Expand Down
Loading

0 comments on commit d3ad040

Please sign in to comment.