Backport PR 238 to 1.x (#240)
* backport changes on ThresholdedRandomCutForest integration

* pass the correct shingleSize to ThresholdedRandomCutForest

Previously, I used shingleSize 1 for the externally shingled ThresholdedRandomCutForest because RCF multiplied by the shingle size twice. RCF has since fixed that issue. This commit adds the new RCF libraries from aws/random-cut-forest-by-aws#278 and passes the correct shingleSize to ThresholdedRandomCutForest.
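For context, here is a minimal sketch of what the fix means at the RCF API level. This is my illustration, not code from this commit; the builder methods and AnomalyDescriptor accessors follow the randomcutforest-parkservices 2.0 API as I understand it, and the class name and all concrete values are made up:

```java
import com.amazon.randomcutforest.parkservices.AnomalyDescriptor;
import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

public class ShingleSizeSketch {
    public static void main(String[] args) {
        int baseDimensions = 1;
        int shingleSize = 8;

        ThresholdedRandomCutForest trcf = ThresholdedRandomCutForest
            .builder()
            .dimensions(baseDimensions * shingleSize) // width of an externally shingled point
            .shingleSize(shingleSize)                 // the fix: pass the real shingle size, not 1
            .internalShinglingEnabled(false)          // the caller supplies already-shingled points
            .numberOfTrees(30)
            .sampleSize(256)
            .build();

        // Feed one (zero-valued) externally shingled point and read the result.
        double[] shingledPoint = new double[baseDimensions * shingleSize];
        AnomalyDescriptor result = trcf.process(shingledPoint, 0L);
        System.out.println(result.getRCFScore() + " " + result.getAnomalyGrade());
    }
}
```

Before aws/random-cut-forest-by-aws#278, passing the real shingleSize together with externally shingled points made RCF account for the shingle size twice, hence the old shingleSize-1 workaround.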
kaituo authored Sep 27, 2021
1 parent 59ae734 commit d3c79f7
Showing 70 changed files with 2,232 additions and 2,318 deletions.
3 changes: 2 additions & 1 deletion build.gradle
@@ -560,11 +560,12 @@ dependencies {
// implementation scope puts the dependency on both the compile and runtime classpath, but
// does not leak it to clients (OpenSearch). Here we force the jackson version to whatever
// opensearch uses.
-compile 'software.amazon.randomcutforest:randomcutforest-core:2.0.1'
implementation 'software.amazon.randomcutforest:randomcutforest-serialization:2.0.1'
implementation "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
implementation "com.fasterxml.jackson.core:jackson-databind:${versions.jackson}"
implementation "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
+compile files('lib/randomcutforest-parkservices-2.0.1.jar')
+compile files('lib/randomcutforest-core-2.0.1.jar')

// used for serializing/deserializing rcf models.
compile group: 'io.protostuff', name: 'protostuff-core', version: '1.7.4'
Binary file added lib/randomcutforest-core-2.0.1.jar
Binary file added lib/randomcutforest-parkservices-2.0.1.jar
36 changes: 10 additions & 26 deletions src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
@@ -69,7 +69,6 @@
import org.opensearch.ad.ml.EntityColdStarter;
import org.opensearch.ad.ml.HybridThresholdingModel;
import org.opensearch.ad.ml.ModelManager;
-import org.opensearch.ad.ml.ModelPartitioner;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.AnomalyResult;
@@ -205,9 +204,10 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

+import com.amazon.randomcutforest.parkservices.state.ThresholdedRandomCutForestMapper;
+import com.amazon.randomcutforest.parkservices.state.ThresholdedRandomCutForestState;
import com.amazon.randomcutforest.serialize.json.v1.V1JsonToV2StateConverter;
import com.amazon.randomcutforest.state.RandomCutForestMapper;
-import com.amazon.randomcutforest.state.RandomCutForestState;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
@@ -374,8 +374,6 @@ public Collection<Object> createComponents(
mapper.setSaveExecutorContextEnabled(true);
mapper.setSaveTreeStateEnabled(true);
mapper.setPartialTreeStateEnabled(true);
-Schema<RandomCutForestState> schema = AccessController
-.doPrivileged((PrivilegedAction<Schema<RandomCutForestState>>) () -> RuntimeSchema.getSchema(RandomCutForestState.class));
V1JsonToV2StateConverter converter = new V1JsonToV2StateConverter();

double modelMaxSizePercent = AnomalyDetectorSettings.MODEL_MAX_SIZE_PERCENTAGE.get(settings);
@@ -390,21 +388,13 @@
adCircuitBreakerService
);

-ModelPartitioner modelPartitioner = new ModelPartitioner(
-AnomalyDetectorSettings.NUM_SAMPLES_PER_TREE,
-AnomalyDetectorSettings.NUM_TREES,
-nodeFilter,
-memoryTracker
-);

NodeStateManager stateManager = new NodeStateManager(
client,
xContentRegistry,
settings,
clientUtil,
getClock(),
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
-modelPartitioner,
clusterService
);

@@ -455,13 +445,19 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
CommonName.CHECKPOINT_INDEX_NAME,
gson,
mapper,
-schema,
converter,
+new ThresholdedRandomCutForestMapper(),
+AccessController
+.doPrivileged(
+(PrivilegedAction<Schema<ThresholdedRandomCutForestState>>) () -> RuntimeSchema
+.getSchema(ThresholdedRandomCutForestState.class)
+),
HybridThresholdingModel.class,
anomalyDetectionIndices,
AnomalyDetectorSettings.MAX_CHECKPOINT_BYTES,
serializeRCFBufferPool,
-AnomalyDetectorSettings.SERIALIZATION_BUFFER_BYTES
+AnomalyDetectorSettings.SERIALIZATION_BUFFER_BYTES,
+1 - AnomalyDetectorSettings.THRESHOLD_MIN_PVALUE
);

Random random = new Random(42);
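Aside from the wiring above, the interesting part of this hunk is the serialization pair: the new ThresholdedRandomCutForestMapper converts a live model to a ThresholdedRandomCutForestState, and the protostuff runtime schema (obtained under doPrivileged above) turns that state into bytes. A self-contained sketch of the round trip, assuming the mapper exposes the usual toState/toModel pair of the RCF state mappers; the class name and parameter values are mine:

```java
import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;
import com.amazon.randomcutforest.parkservices.state.ThresholdedRandomCutForestMapper;
import com.amazon.randomcutforest.parkservices.state.ThresholdedRandomCutForestState;

import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;

public class TrcfCheckpointSketch {
    public static void main(String[] args) {
        ThresholdedRandomCutForest trcf = ThresholdedRandomCutForest
            .builder()
            .dimensions(8)
            .shingleSize(8)
            .sampleSize(256)
            .build();

        ThresholdedRandomCutForestMapper trcfMapper = new ThresholdedRandomCutForestMapper();
        Schema<ThresholdedRandomCutForestState> schema =
            RuntimeSchema.getSchema(ThresholdedRandomCutForestState.class);

        // Write path: model -> state -> bytes (what a checkpoint save does).
        ThresholdedRandomCutForestState state = trcfMapper.toState(trcf);
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        byte[] bytes;
        try {
            bytes = ProtostuffIOUtil.toByteArray(state, schema, buffer);
        } finally {
            buffer.clear();
        }

        // Read path: bytes -> state -> model (what a checkpoint load does).
        ThresholdedRandomCutForestState restored = schema.newMessage();
        ProtostuffIOUtil.mergeFrom(bytes, restored, schema);
        ThresholdedRandomCutForest roundTripped = trcfMapper.toModel(restored);
        System.out.println("checkpoint bytes: " + bytes.length + ", trees: "
            + roundTripped.getForest().getNumberOfTrees());
    }
}
```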
@@ -518,11 +514,6 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
interpolator,
searchFeatureDao,
AnomalyDetectorSettings.THRESHOLD_MIN_PVALUE,
-AnomalyDetectorSettings.THRESHOLD_MAX_RANK_ERROR,
-AnomalyDetectorSettings.THRESHOLD_MAX_SCORE,
-AnomalyDetectorSettings.THRESHOLD_NUM_LOGNORMAL_QUANTILES,
-AnomalyDetectorSettings.THRESHOLD_DOWNSAMPLES,
-AnomalyDetectorSettings.THRESHOLD_MAX_SAMPLES,
featureManager,
settings,
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
@@ -557,16 +548,10 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
AnomalyDetectorSettings.TIME_DECAY,
AnomalyDetectorSettings.NUM_MIN_SAMPLES,
AnomalyDetectorSettings.THRESHOLD_MIN_PVALUE,
-AnomalyDetectorSettings.THRESHOLD_MAX_RANK_ERROR,
-AnomalyDetectorSettings.THRESHOLD_MAX_SCORE,
-AnomalyDetectorSettings.THRESHOLD_NUM_LOGNORMAL_QUANTILES,
-AnomalyDetectorSettings.THRESHOLD_DOWNSAMPLES,
-AnomalyDetectorSettings.THRESHOLD_MAX_SAMPLES,
AnomalyDetectorSettings.MIN_PREVIEW_SIZE,
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
entityColdStarter,
-modelPartitioner,
featureManager,
memoryTracker
);
@@ -757,7 +742,6 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
nodeFilter,
multiEntityResultHandler,
checkpoint,
-modelPartitioner,
cacheProvider,
adTaskManager,
adBatchTaskRunner,
156 changes: 91 additions & 65 deletions src/main/java/org/opensearch/ad/MemoryTracker.java
@@ -36,12 +36,11 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.ad.breaker.ADCircuitBreakerService;
import org.opensearch.ad.common.exception.LimitExceededException;
-import org.opensearch.ad.model.AnomalyDetector;
-import org.opensearch.ad.util.MathUtil;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.monitor.jvm.JvmService;

import com.amazon.randomcutforest.RandomCutForest;
+import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

/**
* Class to track AD memory usage.
@@ -103,11 +102,11 @@ public MemoryTracker(
* This function derives from the old code: https://tinyurl.com/2eaabja6
*
* @param detectorId Detector Id
-* @param rcf Random cut forest model
+* @param trcf Thresholded random cut forest model
* @return true if there is enough memory; otherwise throw LimitExceededException.
*/
-public synchronized boolean isHostingAllowed(String detectorId, RandomCutForest rcf) {
-long requiredBytes = estimateTotalModelSize(rcf);
+public synchronized boolean isHostingAllowed(String detectorId, ThresholdedRandomCutForest trcf) {
+long requiredBytes = estimateTRCFModelSize(trcf);
if (canAllocateReserved(requiredBytes)) {
return true;
} else {
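Per the javadoc above, this contract never returns false when memory is short: it either returns true or throws LimitExceededException. A hypothetical call site (the tryHost helper and its handling are mine, not part of this diff) looks like this:

```java
import org.opensearch.ad.MemoryTracker;
import org.opensearch.ad.common.exception.LimitExceededException;

import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

public class HostingCheckSketch {
    // isHostingAllowed either returns true or throws LimitExceededException;
    // a caller that prefers a boolean has to translate the exception itself.
    static boolean tryHost(MemoryTracker tracker, String detectorId, ThresholdedRandomCutForest trcf) {
        try {
            return tracker.isHostingAllowed(detectorId, trcf);
        } catch (LimitExceededException e) {
            // e.g. log and schedule a retry instead of hosting the model now
            return false;
        }
    }
}
```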
@@ -175,46 +174,29 @@ private void adjustOriginMemoryRelease(long memoryToConsume, Origin origin, Map<
/**
* Gets the estimated size of an entity's model.
*
-* @param forest RCF forest object
+* @param trcf ThresholdedRandomCutForest object
* @return estimated model size in bytes
*/
-public long estimateTotalModelSize(RandomCutForest forest) {
-return estimateRCFModelSize(forest.getDimensions(), forest.getNumberOfTrees(), forest.getBoundingBoxCacheFraction())
-+ thresholdModelBytes;
-}
-
-/**
-* Gets the estimated size of a RCF model.
-*
-* @param forest RCF forest object
-* @return estimated model size in bytes
-*/
-public long estimateRCFModelSize(RandomCutForest forest) {
-return estimateRCFModelSize(forest.getDimensions(), forest.getNumberOfTrees(), forest.getBoundingBoxCacheFraction());
-}
-
-/**
-* Gets the estimated size of an entity's model according to
-* the detector configuration.
-*
-* @param detector detector config object
-* @param numberOfTrees the number of trees in a RCF forest
-* @param boundingBoxCacheFraction Bounding box cache ratio in RCF
-* @return estimated model size in bytes
-*/
-public long estimateTotalModelSize(AnomalyDetector detector, int numberOfTrees, double boundingBoxCacheFraction) {
-return estimateRCFModelSize(
-detector.getEnabledFeatureIds().size() * detector.getShingleSize(),
-numberOfTrees,
-boundingBoxCacheFraction
-) + thresholdModelBytes;
+public long estimateTRCFModelSize(ThresholdedRandomCutForest trcf) {
+RandomCutForest forest = trcf.getForest();
+return estimateTRCFModelSize(
+forest.getDimensions(),
+forest.getNumberOfTrees(),
+forest.getBoundingBoxCacheFraction(),
+forest.getShingleSize(),
+forest.isInternalShinglingEnabled()
+);
}

/**
* Gets the estimated size of an entity's model.
*
* RCF size:
-* Assume the sample size is 256. A compact RCF forest consists of:
+* Assume the sample size is 256. I measured the memory size of a ThresholdedRandomCutForest
+* using a heap dump. A ThresholdedRandomCutForest comprises a compact RCF model and
+* a threshold model.
+*
+* A compact RCF forest consists of:
* - Random number generator: 56 bytes
* - PointStoreCoordinator: 24 bytes
* - SequentialForestUpdateExecutor: 24 bytes
@@ -224,8 +206,27 @@ public long estimateTotalModelSize(AnomalyDetector detector, int numberOfTrees,
* - int array for free indexes: 256 * numberOfTrees * 4, where 4 is the size of an integer
* - two int array for locationList and refCount: 256 * numberOfTrees * 4 bytes * 2
* - a float array for data store: 256 * trees * dimension * 4 bytes: due to various
-* optimization like shingleSize(dimensions), we don't use all of the array. The actual
-* usage percentage is
+* optimizations like shingleSize(dimensions), we don't use all of the array. The average
+* usage percentage depends on the shingle size and on whether internal shingling is enabled.
+* I ran experiments with power-of-two shingle sizes and internal shingling on/off,
+* running ThresholdedRandomCutForest over a million points.
+* The experiments show that
+* * if internal shingling is off, the data store is filled to full capacity;
+* * otherwise, data store usage depends on the shingle size:
+*
+* Shingle Size   usage
+*  1             1
+*  2             0.53
+*  4             0.27
+*  8             0.27
+* 16             0.13
+* 32             0.07
+* 64             0.07
+*
+* The formula reflects the data and fits the point store usage to the closest power-of-two case.
+* For example, if the shingle size is 17, we use the usage 0.13 since 17 is closer to 16.
+*
* {@code IF(dimensions>=32, 1/(LOG(dimensions+1, 2)+LOG(dimensions+1, 10)), 1/LOG(dimensions+1, 2))}
* where LOG gets the logarithm of a number and the syntax of LOG is {@code LOG (number, [base])}.
* We derive the formula by observing the point store usage ratio is a decreasing function of dimensions
@@ -238,42 +239,67 @@ public long estimateTotalModelSize(AnomalyDetector detector, int numberOfTrees,
* - SmallNodeStore (small node store since our sample size is 256, less than the max of short): 6120
* + BoxCacheFloat
* - other: 104
-* - BoundingBoxFloat: (1040 + 255* (dimension * 4 * 2 + 64)) * adjusted bounding box cache usage,
-* where if full we have 255 inner nodes and each box has 80 bytes.
-* Plus metadata, we can have in total 21544 bytes.
-* {@code adjusted bounding box cache usage = (bounding box cache fraction >= 0.3? 1: bounding box cache fraction)}
-* {@code >= 0.3} we will still initialize the bounding box cache array of the max size,
-* but exclude entries using the cache ratio. It is not guaranteed we will only
-* use the cache ratio in the array. For example, with cache ratio 0.5, we used 150
-* out of 255 elements. So we can have two float arrays whose size is the number of
-* dimensions; other constants are the metadata size.
+* - BoundingBoxFloat: (1040 + 255* ((dimension * 4 + 16) * 2 + 32)) * actual bounding box cache usage,
+* {@code actual bounding box cache usage = (bounding box cache fraction >= 0.3? 1: bounding box cache fraction)}
+* {@code >= 0.3} we will still initialize the bounding box cache array of the max size.
+* 1040 is the size of BoundingBoxFloat's fields unrelated to tree size (255 nodes in our formula)
* In total, RCF size is
-* 56 + # trees * (2248 + 152 + 6120 + 104 + (1040 + 255* (dimension * 4 * 2 + 64)) * adjusted bounding box cache ratio) +
-* (256 * # trees * 2 + 256 * # trees * dimension) * 4 bytes * 0.5 + 1064 + 24 + 24 + 16
-* = 56 + # trees * (8624 + (1040 + 255 * (dimension * 8 + 64)) * adjusted bounding box cache ratio) + 256 * # trees *
-* (3 + dimension) * 4 * 0.5 + 1128
+* 56 + # trees * (2248 + 152 + 6120 + 104 + (1040 + 255 * ((dimension * 4 + 16) * 2 + 32)) * actual bounding box cache usage) +
+* (256 * # trees * 2 + 256 * # trees * dimension) * 4 bytes * point store ratio + 30744 * 2 + 15432 + 208 + 24 + 24 + 16
+* = 56 + # trees * (8624 + (1040 + 255 * (dimension * 8 + 64)) * actual bounding box cache usage) + 256 * # trees *
+* dimension * 4 * point store ratio + 77192
*
+* Thresholder size
+* + Preprocessor:
+*   - lastShingledInput and lastShingledPoint: 2*(dimension*8 + 16) (2 due to 2 double arrays, 16 is the array object size)
+*   - previousTimeStamps: shingle*8
+*   - other: 248
+* - BasicThresholder: 256
+* + lastAnomalyAttribution:
+*   - high and low: 2*(dimension*8 + 16) (2 due to 2 double arrays, 16 is the array object size)
+*   - other: 24
+* - lastAnomalyPoint and lastExpectedPoint: 2*(dimension*8 + 16)
+* - other, like the ThresholdedRandomCutForest object size: 96
+* In total, thresholder size is:
+* 6*(dimension*8 + 16) + shingle*8 + 248 + 256 + 24 + 96
+* = 6*(dimension*8 + 16) + shingle*8 + 624
*
* @param dimension The number of feature dimensions in RCF
* @param numberOfTrees The number of trees in RCF
-* @param boundingBoxCacheFraction Bounding box cache ratio in RCF
-* @return estimated RCF model size
+* @param boundingBoxCacheFraction Bounding box cache usage in RCF
+* @param shingleSize shingle size
+* @param internalShingling whether internal shingling is enabled or not
+* @return estimated TRCF model size
+*
+* @throws IllegalArgumentException when the input shingle size is out of range [1, 64]
*/
-public long estimateRCFModelSize(int dimension, int numberOfTrees, double boundingBoxCacheFraction) {
+public long estimateTRCFModelSize(
+int dimension,
+int numberOfTrees,
+double boundingBoxCacheFraction,
+int shingleSize,
+boolean internalShingling
+) {
double averagePointStoreUsage = 0;
-int logNumber = dimension + 1;
-if (dimension >= 32) {
-averagePointStoreUsage = 1.0d / (MathUtil.log2(logNumber) + Math.log10(logNumber));
+if (!internalShingling || shingleSize == 1) {
+averagePointStoreUsage = 1;
+} else if (shingleSize <= 3) {
+averagePointStoreUsage = 0.53;
+} else if (shingleSize <= 12) {
+averagePointStoreUsage = 0.27;
+} else if (shingleSize <= 24) {
+averagePointStoreUsage = 0.13;
+} else if (shingleSize <= 64) {
+averagePointStoreUsage = 0.07;
} else {
-averagePointStoreUsage = 1.0d / MathUtil.log2(logNumber);
+throw new IllegalArgumentException("out of range shingle size " + shingleSize);
}

double actualBoundingBoxUsage = boundingBoxCacheFraction >= 0.3 ? 1d : boundingBoxCacheFraction;
long compactRcfSize = (long) (56 + numberOfTrees * (8624 + (1040 + 255 * (dimension * 8 + 64)) * actualBoundingBoxUsage) + 256
-* numberOfTrees * (3 + dimension) * 4 * averagePointStoreUsage + 1128);
-return compactRcfSize;
-}

-public long estimateTotalModelSize(int dimension, int numberOfTrees, double boundingBoxCacheFraction) {
-return estimateRCFModelSize(dimension, numberOfTrees, boundingBoxCacheFraction) + thresholdModelBytes;
+* numberOfTrees * dimension * 4 * averagePointStoreUsage + 77192);
+long thresholdSize = 6 * (dimension * 8 + 16) + shingleSize * 8 + 624;
+return compactRcfSize + thresholdSize;
}
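To make the constants above concrete, here is a standalone re-implementation of the estimate for one configuration (one feature with shingle size 8, thirty trees, full bounding-box cache). The constants are copied from the method above; the class name and scenario values are mine:

```java
public class TrcfSizeEstimateSketch {
    // Mirrors estimateTRCFModelSize(dimension, trees, cacheFraction, shingleSize, internalShingling).
    static long estimate(int dimension, int numberOfTrees, double boundingBoxCacheFraction,
            int shingleSize, boolean internalShingling) {
        double pointStoreUsage;
        if (!internalShingling || shingleSize == 1) {
            pointStoreUsage = 1;        // external shingling fills the point store completely
        } else if (shingleSize <= 3) {
            pointStoreUsage = 0.53;
        } else if (shingleSize <= 12) {
            pointStoreUsage = 0.27;
        } else if (shingleSize <= 24) {
            pointStoreUsage = 0.13;
        } else if (shingleSize <= 64) {
            pointStoreUsage = 0.07;
        } else {
            throw new IllegalArgumentException("out of range shingle size " + shingleSize);
        }
        double boxUsage = boundingBoxCacheFraction >= 0.3 ? 1d : boundingBoxCacheFraction;
        long rcfBytes = (long) (56 + numberOfTrees * (8624 + (1040 + 255 * (dimension * 8 + 64)) * boxUsage)
            + 256 * numberOfTrees * dimension * 4 * pointStoreUsage + 77192);
        long thresholderBytes = 6 * (dimension * 8 + 16) + shingleSize * 8 + 624;
        return rcfBytes + thresholderBytes;
    }

    public static void main(String[] args) {
        // 1 feature x shingle 8 = 8 dimensions; internal shingling on -> usage bucket 0.27.
        System.out.println(estimate(8, 30, 1.0, 8, true)); // prints roughly 1.4 million bytes
    }
}
```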

/**