Skip to content

Commit

Permalink
Add Support for Handling Missing Data in Anomaly Detection
Browse files Browse the repository at this point in the history
This PR introduces enhanced handling of missing data, giving customers the flexibility to choose how to address gaps in their data. Options include ignoring missing data (default behavior), filling with fixed values (customer-specified), zeros, or previous values. These options can improve recall in anomaly detection scenarios. For example, in this forum discussion https://forum.opensearch.org/t/do-missing-buckets-ruin-anomaly-detection/16535, customers can now opt to fill missing values with zeros to maintain detection accuracy.

Key Changes:
1. Enhanced Missing Data Handling:

Changed to ThresholdedRandomCutForest.process(double[] inputPoint, long timestamp, int[] missingValues) to support missing data in both real-time and historical analyses. The preview mode remains unchanged for efficiency, utilizing existing linear imputation techniques. (See classes: ADColdStart, ModelColdStart, ModelManager, ADBatchTaskRunner).

2. Refactoring Imputation & Processing:

Refactored the imputation process, failure handling, statistics collection, and result saving in Inferencer.

3. Improved Imputed Value Reconstruction:

Reconstructed imputed values using existing mean and standard deviation, ensuring they are accurately stored in AnomalyResult. Added a featureImputed boolean tag to flag imputed values. (See class: AnomalyResult).

4. Broadcast Support for HC Detectors:

Added a broadcast mechanism for HC detectors to identify entity models that haven’t received data in a given interval. This ensures models in memory process all relevant data before imputation begins. Single stream detectors handle this within existing transport messages. (See classes: ADHCImputeTransportAction, ADResultProcessor, ResultProcessor).

5. Introduction of ActionListenerExecutor:

Added ActionListenerExecutor to wrap response and failure handlers in an ActionListener, executing them asynchronously using the provided ExecutorService. This allows us to handle responses in the AD thread pool.

Testing:
Comprehensive testing was conducted, including both integration and unit tests. Of the 7135 lines added and 1683 lines removed, 4926 additions and 749 deletions are in tests, ensuring robust coverage.

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Aug 8, 2024
1 parent 63dacaa commit f56a69a
Show file tree
Hide file tree
Showing 139 changed files with 7,139 additions and 1,683 deletions.
15 changes: 13 additions & 2 deletions .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
strategy:
matrix:
# each test scenario (rule, hc, single_stream) is treated as a separate job.
test: [rule, hc, single_stream]
test: [rule, hc, single_stream,missing]
fail-fast: false
concurrency:
# The concurrency setting is used to limit the concurrency of each test scenario group to ensure they do not run concurrently on the same machine.
Expand Down Expand Up @@ -48,11 +48,16 @@ jobs:
chown -R 1000:1000 `pwd`
case ${{ matrix.test }} in
rule)
su `id -un 1000` -c "./gradlew integTest --tests 'org.opensearch.ad.e2e.RuleModelPerfIT' \
su `id -un 1000` -c "./gradlew integTest --tests 'org.opensearch.ad.e2e.RealTimeRuleModelPerfIT' \
-Dtests.seed=B4BA12CCF1D9E825 -Dtests.security.manager=false \
-Dtests.jvm.argline='-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m' \
-Dtests.locale=ar-JO -Dtests.timezone=Asia/Samarkand -Dmodel-benchmark=true \
-Dtests.timeoutSuite=3600000! -Dtest.logs=true"
su `id -un 1000` -c "./gradlew integTest --tests 'org.opensearch.ad.e2e.HistoricalRuleModelPerfIT' \
-Dtests.seed=B4BA12CCF1D9E825 -Dtests.security.manager=false \
-Dtests.jvm.argline='-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m' \
-Dtests.locale=ar-JO -Dtests.timezone=Asia/Samarkand -Dmodel-benchmark=true \
-Dtests.timeoutSuite=3600000! -Dtest.logs=true"
;;
hc)
su `id -un 1000` -c "./gradlew ':test' --tests 'org.opensearch.ad.ml.HCADModelPerfTests' \
Expand All @@ -66,4 +71,10 @@ jobs:
-Dtests.locale=kab-DZ -Dtests.timezone=Asia/Hebron -Dtest.logs=true \
-Dtests.timeoutSuite=3600000! -Dmodel-benchmark=true"
;;
missing)
su `id -un 1000` -c "./gradlew integTest --tests 'org.opensearch.ad.e2e.RealTimeMissingSingleFeatureModelPerfIT' \
-Dtests.seed=60CDDB34427ACD0C -Dtests.security.manager=false \
-Dtests.locale=kab-DZ -Dtests.timezone=Asia/Hebron -Dtest.logs=true \
-Dtests.timeoutSuite=3600000! -Dmodel-benchmark=true"
;;
esac
64 changes: 44 additions & 20 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,12 @@ dependencies {
implementation group: 'com.yahoo.datasketches', name: 'memory', version: '0.12.2'
implementation group: 'commons-lang', name: 'commons-lang', version: '2.6'
implementation group: 'org.apache.commons', name: 'commons-pool2', version: '2.12.0'
implementation 'software.amazon.randomcutforest:randomcutforest-serialization:4.0.0'
implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:4.0.0'
implementation 'software.amazon.randomcutforest:randomcutforest-core:4.0.0'
// implementation 'software.amazon.randomcutforest:randomcutforest-serialization:4.0.0'
// implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:4.0.0'
// implementation 'software.amazon.randomcutforest:randomcutforest-core:4.0.0'
implementation files('lib/randomcutforest-core-4.1.0.jar')
implementation files('lib/randomcutforest-parkservices-4.1.0.jar')
implementation files('lib/randomcutforest-serialization-4.1.0.jar')

// we inherit jackson-core from opensearch core
implementation "com.fasterxml.jackson.core:jackson-databind:2.16.1"
Expand Down Expand Up @@ -356,8 +359,7 @@ integTest {

if (System.getProperty("model-benchmark") == null || System.getProperty("model-benchmark") == "false") {
filter {
excludeTestsMatching "org.opensearch.ad.e2e.SingleStreamModelPerfIT"
excludeTestsMatching "org.opensearch.ad.e2e.RuleModelPerfIT"
excludeTestsMatching "org.opensearch.ad.e2e.*ModelPerfIT"
}
}

Expand Down Expand Up @@ -676,34 +678,56 @@ List<String> jacocoExclusions = [
// rest layer is tested in integration testing mostly, difficult to mock all of it
'org.opensearch.ad.rest.*',

'org.opensearch.ad.model.ModelProfileOnNode',
'org.opensearch.ad.model.InitProgressProfile',
'org.opensearch.ad.rest.*',
'org.opensearch.ad.AnomalyDetectorJobRunner',

// Class containing just constants. Don't need to test
'org.opensearch.ad.constant.*',
'org.opensearch.forecast.constant.*',
'org.opensearch.timeseries.constant.*',
'org.opensearch.timeseries.settings.TimeSeriesSettings',
'org.opensearch.forecast.settings.ForecastSettings',

'org.opensearch.ad.transport.CronRequest',
'org.opensearch.ad.AnomalyDetectorRunner',

// related to transport actions added for security
'org.opensearch.ad.transport.DeleteAnomalyDetectorTransportAction.1',

// TODO: unified flow caused coverage drop
'org.opensearch.ad.transport.DeleteAnomalyResultsTransportAction',
// TODO: fix unstable code coverage caused by null NodeClient issue
// https://github.com/opensearch-project/anomaly-detection/issues/241
'org.opensearch.ad.task.ADBatchTaskRunner',
'org.opensearch.ad.task.ADTaskManager',
// TODO: add forecast test coverage before release

// TODO: add test coverage (kaituo)
'org.opensearch.forecast.*',
'org.opensearch.timeseries.*',
'org.opensearch.ad.*',
'org.opensearch.ad.transport.GetAnomalyDetectorTransportAction',
'org.opensearch.ad.ml.ADColdStart',
'org.opensearch.ad.transport.ADHCImputeNodesResponse',
'org.opensearch.timeseries.transport.BooleanNodeResponse',
'org.opensearch.timeseries.ml.TimeSeriesSingleStreamCheckpointDao',
'org.opensearch.timeseries.transport.JobRequest',
'org.opensearch.timeseries.transport.handler.ResultBulkIndexingHandler',
'org.opensearch.timeseries.ml.Inferencer',
'org.opensearch.timeseries.transport.SingleStreamResultRequest',
'org.opensearch.timeseries.transport.BooleanResponse',
'org.opensearch.timeseries.rest.handler.IndexJobActionHandler.1',
'org.opensearch.timeseries.transport.SuggestConfigParamResponse',
'org.opensearch.timeseries.transport.SuggestConfigParamRequest',
'org.opensearch.timeseries.ml.MemoryAwareConcurrentHashmap',
'org.opensearch.timeseries.transport.ResultBulkTransportAction',
'org.opensearch.timeseries.transport.handler.IndexMemoryPressureAwareResultHandler',
'org.opensearch.timeseries.transport.handler.ResultIndexingHandler',
'org.opensearch.ad.transport.ADHCImputeNodeResponse',
'org.opensearch.timeseries.ml.Sample',
'org.opensearch.timeseries.ratelimit.FeatureRequest',
'org.opensearch.ad.transport.ADHCImputeNodeRequest',
'org.opensearch.timeseries.model.ModelProfileOnNode',
'org.opensearch.timeseries.transport.ValidateConfigRequest',
'org.opensearch.timeseries.transport.ResultProcessor.PageListener.1',
'org.opensearch.ad.transport.ADHCImputeRequest',
'org.opensearch.timeseries.transport.BaseDeleteConfigTransportAction.1',
'org.opensearch.timeseries.transport.BaseSuggestConfigParamTransportAction',
'org.opensearch.timeseries.rest.AbstractSearchAction.1',
'org.opensearch.ad.transport.ADSingleStreamResultTransportAction',
'org.opensearch.timeseries.ratelimit.RateLimitedRequestWorker.RequestQueue',
'org.opensearch.timeseries.rest.RestStatsAction',
'org.opensearch.ad.ml.ADCheckpointDao',
'org.opensearch.timeseries.transport.CronRequest',
'org.opensearch.ad.task.ADBatchTaskCache',
'org.opensearch.timeseries.ratelimit.RateLimitedRequestWorker',
]


Expand Down
Binary file added lib/randomcutforest-core-4.1.0.jar
Binary file not shown.
Binary file added lib/randomcutforest-parkservices-4.1.0.jar
Binary file not shown.
Binary file added lib/randomcutforest-serialization-4.1.0.jar
Binary file not shown.
6 changes: 2 additions & 4 deletions src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ public void executeDetector(
startTime.toEpochMilli(),
endTime.toEpochMilli(),
ActionListener.wrap(features -> {
List<ThresholdingResult> entityResults = modelManager
.getPreviewResults(features, detector.getShingleSize(), detector.getTimeDecay());
List<ThresholdingResult> entityResults = modelManager.getPreviewResults(features, detector);
List<AnomalyResult> sampledEntityResults = sample(
parsePreviewResult(detector, features, entityResults, entity),
maxPreviewResults
Expand All @@ -116,8 +115,7 @@ public void executeDetector(
} else {
featureManager.getPreviewFeatures(detector, startTime.toEpochMilli(), endTime.toEpochMilli(), ActionListener.wrap(features -> {
try {
List<ThresholdingResult> results = modelManager
.getPreviewResults(features, detector.getShingleSize(), detector.getTimeDecay());
List<ThresholdingResult> results = modelManager.getPreviewResults(features, detector);
listener.onResponse(sample(parsePreviewResult(detector, features, results, null), maxPreviewResults));
} catch (Exception e) {
onFailure(e, listener, detector.getId());
Expand Down
46 changes: 27 additions & 19 deletions src/main/java/org/opensearch/ad/ml/ADColdStart.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -171,7 +172,7 @@ protected List<Sample> trainModelFromDataSegments(

double[] firstPoint = pointSamples.get(0).getValueList();
if (firstPoint == null || firstPoint.length == 0) {
logger.info("Return early since data points must not be empty.");
logger.info("Return early since the first data point must not be empty.");
return null;
}

Expand Down Expand Up @@ -216,6 +217,31 @@ protected List<Sample> trainModelFromDataSegments(
}

AnomalyDetector detector = (AnomalyDetector) config;
applyRule(rcfBuilder, detector);

// use build instead of new TRCF(Builder) because build method did extra validation and initialization
ThresholdedRandomCutForest trcf = rcfBuilder.build();

List<Sample> imputed = new ArrayList<>();
for (int i = 0; i < pointSamples.size(); i++) {
Sample dataSample = pointSamples.get(i);
double[] dataValue = dataSample.getValueList();
// We don't keep missing values during cold start as the actual data may not be reconstructed during the early stage.
trcf.process(dataValue, dataSample.getDataEndTime().getEpochSecond());
imputed.add(new Sample(dataValue, dataSample.getDataStartTime(), dataSample.getDataEndTime()));
}

entityState.setModel(trcf);

entityState.setLastUsedTime(clock.instant());

// save to checkpoint
checkpointWriteWorker.write(entityState, true, RequestPriority.MEDIUM);

return pointSamples;
}

public static void applyRule(ThresholdedRandomCutForest.Builder rcfBuilder, AnomalyDetector detector) {
ThresholdArrays thresholdArrays = IgnoreSimilarExtractor.processDetectorRules(detector);

if (thresholdArrays != null) {
Expand All @@ -235,23 +261,5 @@ protected List<Sample> trainModelFromDataSegments(
rcfBuilder.ignoreNearExpectedFromBelowByRatio(thresholdArrays.ignoreSimilarFromBelowByRatio);
}
}

// use build instead of new TRCF(Builder) because build method did extra validation and initialization
ThresholdedRandomCutForest trcf = rcfBuilder.build();

for (int i = 0; i < pointSamples.size(); i++) {
Sample dataSample = pointSamples.get(i);
double[] dataValue = dataSample.getValueList();
trcf.process(dataValue, dataSample.getDataEndTime().getEpochSecond());
}

entityState.setModel(trcf);

entityState.setLastUsedTime(clock.instant());

// save to checkpoint
checkpointWriteWorker.write(entityState, true, RequestPriority.MEDIUM);

return pointSamples;
}
}
50 changes: 50 additions & 0 deletions src/main/java/org/opensearch/ad/ml/ADInferencer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ad.ml;

import static org.opensearch.timeseries.TimeSeriesAnalyticsPlugin.AD_THREAD_POOL_NAME;

import org.opensearch.ad.caching.ADCacheProvider;
import org.opensearch.ad.caching.ADPriorityCache;
import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.ratelimit.ADCheckpointWriteWorker;
import org.opensearch.ad.ratelimit.ADColdStartWorker;
import org.opensearch.ad.ratelimit.ADSaveResultStrategy;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.ml.Inferencer;
import org.opensearch.timeseries.stats.StatNames;
import org.opensearch.timeseries.stats.Stats;

import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

public class ADInferencer extends
Inferencer<ThresholdedRandomCutForest, AnomalyResult, ThresholdingResult, ADIndex, ADIndexManagement, ADCheckpointDao, ADCheckpointWriteWorker, ADColdStart, ADModelManager, ADSaveResultStrategy, ADPriorityCache, ADColdStartWorker> {

public ADInferencer(
ADModelManager modelManager,
Stats stats,
ADCheckpointDao checkpointDao,
ADColdStartWorker coldStartWorker,
ADSaveResultStrategy resultWriteWorker,
ADCacheProvider cache,
ThreadPool threadPool
) {
super(
modelManager,
stats,
StatNames.AD_MODEL_CORRUTPION_COUNT.getName(),
checkpointDao,
coldStartWorker,
resultWriteWorker,
cache,
threadPool,
AD_THREAD_POOL_NAME
);
}

}
Loading

0 comments on commit f56a69a

Please sign in to comment.