Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade rcf to 4.0 #1173

Merged
merged 1 commit into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/backport.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ on:

jobs:
backport:
if: github.event.pull_request.merged == true
runs-on: ubuntu-latest
permissions:
contents: write
Expand All @@ -25,4 +26,5 @@ jobs:
uses: VachaShah/[email protected]
with:
github_token: ${{ steps.github_app_token.outputs.token }}
branch_name: backport/backport-${{ github.event.number }}
head_template: backport/backport-<%= number %>-to-<%= base %>
failure_labels: backport-failed
2 changes: 1 addition & 1 deletion .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:

# anomaly-detection
- name: Checkout AD
uses: actions/checkout@v4
uses: actions/checkout@v3

- name: Build and Run Tests
run: |
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/test_build_multi_platform.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ jobs:

- name: Build and Run Tests
run: |
./gradlew build
./gradlew build -x spotlessJava
- name: Publish to Maven Local
run: |
./gradlew publishToMavenLocal
Expand Down Expand Up @@ -85,13 +85,13 @@ jobs:
java-version: ${{ matrix.java }}

- name: Checkout AD
uses: actions/checkout@v4
uses: actions/checkout@v3

- name: Assemble / build / mavenlocal / integTest
run: |
chown -R 1000:1000 `pwd`
su `id -un 1000` -c "./gradlew assemble &&
./gradlew build &&
./gradlew build -x spotlessJava &&
./gradlew publishToMavenLocal &&
./gradlew integTest -PnumNodes=3"
- name: Upload Coverage Report
Expand Down Expand Up @@ -127,7 +127,7 @@ jobs:
./gradlew assemble
- name: Build and Run Tests
run: |
./gradlew build
./gradlew build -x spotlessJava
- name: Publish to Maven Local
run: |
./gradlew publishToMavenLocal
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_bwc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:

# anomaly-detection
- name: Checkout AD
uses: actions/checkout@v4
uses: actions/checkout@v3
amitgalitz marked this conversation as resolved.
Show resolved Hide resolved

- name: Assemble anomaly-detection
run: |
Expand Down
23 changes: 13 additions & 10 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ buildscript {
js_resource_folder = "src/test/resources/job-scheduler"
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build)
bwcVersionShort = "2.10.0"
bwcVersionShort = "2.14.0"
bwcVersion = bwcVersionShort + ".0"
bwcOpenSearchADDownload = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + bwcVersionShort + '/latest/linux/x64/tar/builds/' +
'opensearch/plugins/opensearch-anomaly-detection-' + bwcVersion + '.zip'
Expand Down Expand Up @@ -126,9 +126,9 @@ dependencies {
implementation group: 'com.yahoo.datasketches', name: 'memory', version: '0.12.2'
implementation group: 'commons-lang', name: 'commons-lang', version: '2.6'
implementation group: 'org.apache.commons', name: 'commons-pool2', version: '2.12.0'
implementation 'software.amazon.randomcutforest:randomcutforest-serialization:3.8.0'
implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:3.8.0'
implementation 'software.amazon.randomcutforest:randomcutforest-core:3.8.0'
implementation 'software.amazon.randomcutforest:randomcutforest-serialization:4.0.0'
implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:4.0.0'
implementation 'software.amazon.randomcutforest:randomcutforest-core:4.0.0'

// we inherit jackson-core from opensearch core
implementation "com.fasterxml.jackson.core:jackson-databind:2.16.1"
Expand All @@ -149,6 +149,9 @@ dependencies {
exclude group: 'org.ow2.asm', module: 'asm-tree'
}

// used for output encoding of config descriptions
implementation group: 'org.owasp.encoder' , name: 'encoder', version: '1.2.3'
amitgalitz marked this conversation as resolved.
Show resolved Hide resolved

testImplementation group: 'pl.pragmatists', name: 'JUnitParams', version: '1.1.1'
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.9.0'
testImplementation group: 'org.objenesis', name: 'objenesis', version: '3.3'
Expand Down Expand Up @@ -538,7 +541,7 @@ List<Provider<RegularFile>> plugins = [

// Creates 2 test clusters with 3 nodes of the old version.
2.times {i ->
task "${baseName}#oldVersionClusterTask$i"(type: StandaloneRestIntegTestTask) {
task "${baseName}#oldVersionClusterTask$i"(type: RestIntegTestTask) {
useCluster testClusters."${baseName}$i"
filter {
includeTestsMatching "org.opensearch.ad.bwc.*IT"
Expand All @@ -554,7 +557,7 @@ List<Provider<RegularFile>> plugins = [
// Upgrades one node of the old cluster to new OpenSearch version with upgraded plugin version
// This results in a mixed cluster with 2 nodes on the old version and 1 upgraded node.
// This is also used as a one third upgraded cluster for a rolling upgrade.
task "${baseName}#mixedClusterTask"(type: StandaloneRestIntegTestTask) {
task "${baseName}#mixedClusterTask"(type: RestIntegTestTask) {
useCluster testClusters."${baseName}0"
dependsOn "${baseName}#oldVersionClusterTask0"
doFirst {
Expand All @@ -573,7 +576,7 @@ task "${baseName}#mixedClusterTask"(type: StandaloneRestIntegTestTask) {
// Upgrades the second node to new OpenSearch version with upgraded plugin version after the first node is upgraded.
// This results in a mixed cluster with 1 node on the old version and 2 upgraded nodes.
// This is used for rolling upgrade.
task "${baseName}#twoThirdsUpgradedClusterTask"(type: StandaloneRestIntegTestTask) {
task "${baseName}#twoThirdsUpgradedClusterTask"(type: RestIntegTestTask) {
dependsOn "${baseName}#mixedClusterTask"
useCluster testClusters."${baseName}0"
doFirst {
Expand All @@ -592,7 +595,7 @@ task "${baseName}#twoThirdsUpgradedClusterTask"(type: StandaloneRestIntegTestTas
// Upgrades the third node to new OpenSearch version with upgraded plugin version after the second node is upgraded.
// This results in a fully upgraded cluster.
// This is used for rolling upgrade.
task "${baseName}#rollingUpgradeClusterTask"(type: StandaloneRestIntegTestTask) {
task "${baseName}#rollingUpgradeClusterTask"(type: RestIntegTestTask) {
dependsOn "${baseName}#twoThirdsUpgradedClusterTask"
useCluster testClusters."${baseName}0"
doFirst {
Expand All @@ -611,7 +614,7 @@ task "${baseName}#rollingUpgradeClusterTask"(type: StandaloneRestIntegTestTask)

// Upgrades all the nodes of the old cluster to new OpenSearch version with upgraded plugin version
// at the same time resulting in a fully upgraded cluster.
task "${baseName}#fullRestartClusterTask"(type: StandaloneRestIntegTestTask) {
task "${baseName}#fullRestartClusterTask"(type: RestIntegTestTask) {
dependsOn "${baseName}#oldVersionClusterTask1"
useCluster testClusters."${baseName}1"
doFirst {
Expand All @@ -627,7 +630,7 @@ task "${baseName}#fullRestartClusterTask"(type: StandaloneRestIntegTestTask) {
}

// A bwc test suite which runs all the bwc tasks combined.
task bwcTestSuite(type: StandaloneRestIntegTestTask) {
task bwcTestSuite(type: RestIntegTestTask) {
exclude '**/*Test*'
exclude '**/*IT*'
dependsOn tasks.named("${baseName}#mixedClusterTask")
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/opensearch/ad/ml/CheckpointDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,8 @@ private Optional<ThresholdedRandomCutForest> convertToTRCF(Optional<RandomCutFor
if (kllThreshold.isPresent()) {
scores = kllThreshold.get().extractScores();
}
return Optional.of(new ThresholdedRandomCutForest(rcf.get(), anomalyRate, scores));
// The last parameter is lastShingledInput. Since we don't know it, use an all-zero double array.
return Optional.of(new ThresholdedRandomCutForest(rcf.get(), anomalyRate, scores, new double[rcf.get().getDimensions()]));
}

/**
Expand Down
26 changes: 13 additions & 13 deletions src/main/java/org/opensearch/ad/task/ADTaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,7 @@
import static org.opensearch.ad.constant.ADCommonMessages.NO_ELIGIBLE_NODE_TO_RUN_DETECTOR;
import static org.opensearch.ad.constant.ADCommonName.DETECTION_STATE_INDEX;
import static org.opensearch.ad.indices.ADIndexManagement.ALL_AD_RESULTS_INDEX_PATTERN;
import static org.opensearch.ad.model.ADTask.COORDINATING_NODE_FIELD;
import static org.opensearch.ad.model.ADTask.DETECTOR_ID_FIELD;
import static org.opensearch.ad.model.ADTask.ERROR_FIELD;
import static org.opensearch.ad.model.ADTask.ESTIMATED_MINUTES_LEFT_FIELD;
import static org.opensearch.ad.model.ADTask.EXECUTION_END_TIME_FIELD;
import static org.opensearch.ad.model.ADTask.EXECUTION_START_TIME_FIELD;
import static org.opensearch.ad.model.ADTask.INIT_PROGRESS_FIELD;
import static org.opensearch.ad.model.ADTask.IS_LATEST_FIELD;
import static org.opensearch.ad.model.ADTask.LAST_UPDATE_TIME_FIELD;
import static org.opensearch.ad.model.ADTask.PARENT_TASK_ID_FIELD;
import static org.opensearch.ad.model.ADTask.STATE_FIELD;
import static org.opensearch.ad.model.ADTask.STOPPED_BY_FIELD;
import static org.opensearch.ad.model.ADTask.TASK_PROGRESS_FIELD;
import static org.opensearch.ad.model.ADTask.TASK_TYPE_FIELD;
import static org.opensearch.ad.model.ADTaskType.ALL_HISTORICAL_TASK_TYPES;
import static org.opensearch.ad.model.ADTaskType.HISTORICAL_DETECTOR_TASK_TYPES;
import static org.opensearch.ad.model.ADTaskType.REALTIME_TASK_TYPES;
Expand All @@ -52,6 +39,19 @@
import static org.opensearch.timeseries.constant.CommonName.TASK_ID_FIELD;
import static org.opensearch.timeseries.model.TaskState.NOT_ENDED_STATES;
import static org.opensearch.timeseries.model.TaskType.taskTypeToString;
import static org.opensearch.timeseries.model.TimeSeriesTask.COORDINATING_NODE_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.ERROR_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.ESTIMATED_MINUTES_LEFT_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.EXECUTION_END_TIME_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.EXECUTION_START_TIME_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.INIT_PROGRESS_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.IS_LATEST_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.LAST_UPDATE_TIME_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.PARENT_TASK_ID_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.STATE_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.STOPPED_BY_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.TASK_PROGRESS_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.TASK_TYPE_FIELD;
import static org.opensearch.timeseries.settings.TimeSeriesSettings.NUM_MIN_SAMPLES;
import static org.opensearch.timeseries.util.ExceptionUtil.getErrorMessage;
import static org.opensearch.timeseries.util.ExceptionUtil.getShardsFailure;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void testBackwardsCompatibility() throws Exception {
case MIXED:
// TODO: We have no way to specify whether send request to old node or new node now.
// Add more test later when it's possible to specify request node.
Assert.assertTrue(pluginNames.contains("opensearch-anomaly-detection"));
Assert.assertTrue(pluginNames.contains("opensearch-time-series-analytics"));
Assert.assertTrue(pluginNames.contains("opensearch-job-scheduler"));

// Create single entity detector and start realtime job
Expand Down
59 changes: 28 additions & 31 deletions src/test/java/org/opensearch/ad/ml/CheckpointDaoTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -1067,27 +1067,22 @@ public void testDeserializeTRCFModel() throws Exception {
coldStartData.add(sample4);
coldStartData.add(sample5);

// This scores were generated with the sample data but on RCF3.0-rc1 and we are comparing them
// to the scores generated by the imported RCF3.0-rc2.1
// These scores were generated with the sample data on RCF 4.0. RCF 4.0 changed its
// implementation, so we see different RCF scores between 4.0 and 3.8. This was verified by
// switching the RCF version between 3.8 and 4.0 while keeping all other AD code unchanged.
List<Double> scores = new ArrayList<>();
scores.add(4.814651669367903);
scores.add(5.566968073093689);
scores.add(5.919907610660049);
scores.add(5.770278090352401);
scores.add(5.319779117320102);

List<Double> grade = new ArrayList<>();
grade.add(1.0);
grade.add(0.0);
grade.add(0.0);
grade.add(0.0);
grade.add(0.0);
scores.add(5.052069275347555);
amitgalitz marked this conversation as resolved.
Show resolved Hide resolved
scores.add(6.117465704461799);
scores.add(6.6401649744661055);
scores.add(6.918514609476484);
scores.add(6.928318158276434);

// rcf 3.8 has a number of improvements on thresholder and predictor corrector.
// We don't expect the results have the same anomaly grade.
for (int i = 0; i < coldStartData.size(); i++) {
forest.process(coldStartData.get(i), 0);
AnomalyDescriptor descriptor = forest.process(coldStartData.get(i), 0);
assertEquals(descriptor.getRCFScore(), scores.get(i), 1e-9);
assertEquals(scores.get(i), descriptor.getRCFScore(), 1e-9);
}
}

Expand Down Expand Up @@ -1133,21 +1128,22 @@ public void testDeserialize_rcf3_rc3_single_stream_model() throws Exception {
coldStartData.add(sample4);
coldStartData.add(sample5);

// This scores were generated with the sample data but on RCF3.0-rc1 and we are comparing them
// to the scores generated by the imported RCF3.0-rc2.1
// These scores were generated with the sample data on RCF 4.0. RCF 4.0 changed its
// implementation, so we see different RCF scores between 4.0 and 3.8. This was verified by
// switching the RCF version between 3.8 and 4.0 while keeping all other AD code unchanged.
List<Double> scores = new ArrayList<>();
scores.add(3.3830441158587066);
scores.add(2.825961659490065);
scores.add(2.4685871670647384);
scores.add(2.3123460886413647);
scores.add(2.1401987653477135);
scores.add(3.678754481587072);
scores.add(3.6809634269790252);
scores.add(3.683659822587799);
scores.add(3.6852688612219646);
scores.add(3.6859330728661064);

// rcf 3.8 has a number of improvements on thresholder and predictor corrector.
// We don't expect the results have the same anomaly grade.
for (int i = 0; i < coldStartData.size(); i++) {
forest.process(coldStartData.get(i), 0);
AnomalyDescriptor descriptor = forest.process(coldStartData.get(i), 0);
assertEquals(descriptor.getRCFScore(), scores.get(i), 1e-9);
assertEquals(scores.get(i), descriptor.getRCFScore(), 1e-9);
}
}

Expand Down Expand Up @@ -1190,21 +1186,22 @@ public void testDeserialize_rcf3_rc3_hc_model() throws Exception {
coldStartData.add(sample4);
coldStartData.add(sample5);

// This scores were generated with the sample data but on RCF3.0-rc1 and we are comparing them
// to the scores generated by the imported RCF3.0-rc2.1
// These scores were generated with the sample data on RCF 4.0, whose changed implementation
// produces different RCF scores than 3.8. This was verified by switching the RCF version
// between 3.8 and 4.0 while keeping all other AD code unchanged.
List<Double> scores = new ArrayList<>();
scores.add(1.86645896573027);
scores.add(1.8760247712797833);
scores.add(1.6809181763279901);
scores.add(1.7126716645678555);
scores.add(1.323776514074674);
scores.add(2.119532552959117);
scores.add(2.7347456872746325);
scores.add(3.066704948143919);
scores.add(3.2965580521876725);
scores.add(3.1888920146607047);

// rcf 3.8 has a number of improvements on thresholder and predictor corrector.
// We don't expect the results have the same anomaly grade.
for (int i = 0; i < coldStartData.size(); i++) {
forest.process(coldStartData.get(i), 0);
AnomalyDescriptor descriptor = forest.process(coldStartData.get(i), 0);
assertEquals(descriptor.getRCFScore(), scores.get(i), 1e-9);
assertEquals(scores.get(i), descriptor.getRCFScore(), 1e-9);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ public void testAccuracyOneMinuteIntervalNoInterpolation() throws Exception {
clusterService
);

accuracyTemplate(1, 0.6f, 0.6f);
accuracyTemplate(1, 0.5f, 0.5f);
}

private ModelState<EntityModel> createStateForCacheRelease() {
Expand Down
25 changes: 19 additions & 6 deletions src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.ad.ADUnitTestCase;
import org.opensearch.ad.cluster.HashRing;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.mock.model.MockSimpleLog;
Expand All @@ -89,6 +88,7 @@
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.rest.handler.IndexAnomalyDetectorJobActionHandler;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.stats.InternalStatNames;
import org.opensearch.ad.transport.ADStatsNodeResponse;
import org.opensearch.ad.transport.ADStatsNodesResponse;
Expand Down Expand Up @@ -120,6 +120,7 @@
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AbstractTimeSeriesTest;
import org.opensearch.timeseries.TestHelpers;
import org.opensearch.timeseries.common.exception.DuplicateTaskException;
import org.opensearch.timeseries.constant.CommonName;
Expand All @@ -139,7 +140,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

public class ADTaskManagerTests extends ADUnitTestCase {
public class ADTaskManagerTests extends AbstractTimeSeriesTest {

private Settings settings;
private Client client;
Expand Down Expand Up @@ -1447,10 +1448,22 @@ public void testForwardRequestToLeadNodeWithNotExistingNode() throws IOException
@SuppressWarnings("unchecked")
public void testScaleTaskLaneOnCoordinatingNode() {
ADTask adTask = mock(ADTask.class);
when(adTask.getCoordinatingNode()).thenReturn(node1.getId());
when(nodeFilter.getEligibleDataNodes()).thenReturn(new DiscoveryNode[] { node1, node2 });
ActionListener<JobResponse> listener = mock(ActionListener.class);
adTaskManager.scaleTaskLaneOnCoordinatingNode(adTask, 2, transportService, listener);
try {
// bring up real transport service as mockito cannot mock final method
// and transportService.sendRequest is called. A lot of null pointer
// exception will be thrown if we use mocked transport service.
setUpThreadPool(ADTaskManagerTests.class.getSimpleName());
setupTestNodes(AnomalyDetectorSettings.AD_MAX_ENTITIES_PER_QUERY, AnomalyDetectorSettings.AD_PAGE_SIZE);
when(adTask.getCoordinatingNode()).thenReturn(testNodes[1].getNodeId());
when(nodeFilter.getEligibleDataNodes())
.thenReturn(new DiscoveryNode[] { testNodes[0].discoveryNode(), testNodes[1].discoveryNode() });
ActionListener<JobResponse> listener = mock(ActionListener.class);

adTaskManager.scaleTaskLaneOnCoordinatingNode(adTask, 2, testNodes[1].transportService, listener);
} finally {
tearDownTestNodes();
tearDownThreadPool();
}
}

@SuppressWarnings("unchecked")
Expand Down
Loading
Loading