Skip to content

Commit

Permalink
Refactoring NodeStateManager etc. to support forecasting functionality (
Browse files Browse the repository at this point in the history
#965)

* Refactoring NodeStateManager etc. to support forecasting functionality

This commit extends the codebase to support both Anomaly Detection (AD) and forecasting. It contains a mixture of refactoring, renaming, removal of unused code, and package moving tasks. Here are the details:

Refactoring:
- `NodeStateManager.getAnomalyDetector` is now `getConfig`, with added functionality to fetch a Forecaster. The method comments are updated for clarity.
- Existing methods (`getFeatureSamplesForPeriods`, `getColdStartSamplesForPeriods`, `createPreviewSearchRequest`, `getMinDataTime`) have been added in `SearchFeatureDao` to handle forecasting logic.
- Adjusted `SecurityClientUtil` and `ParseUtils` to handle forecasting logic.
- Cleaned up `NodeState` to differentiate state for AD and forecasting.

Renaming:
- `AnomalyDetectorJob` is renamed to `Job` to facilitate reuse for forecasting.
- `NodeStateManager.getAnomalyDetectorJob` is renamed to `getJob`.
- Certain settings in `AnomalyDetectorSettings` are renamed to reflect they are meant for the AD setting. They have been marked as deprecated and new settings are used in `TimeSeriesSettings` instead.
- `IndexAnomalyDetectorJobActionHandler.getAnomalyDetectorJobForWrite` is renamed to `getJobForWrite`.
- `ADSafeSecurityInjector` is renamed to `TimeSeriesSafeSecurityInjector`.

Removing unused code:
- Synchronous code in `ClientUtil`, `IndexUtils`, and `CheckpointDao` is removed.
- The unused class `Throttler` is deleted.
- Mapping file names are changed, and the code referencing these files is adjusted.

Package moving:
- Several classes (`ClientUtil`, `MultiResponsesDelegateActionListener`, `SafeSecurityInjector`, `SecurityUtil`, `ExceptionUtil`, `SearchFeatureDao`, `CleanState`, `ExpiringState`, `MaintenanceState`, `NodeState`, `SingleStreamModelIdMapper`, `BackPressureRouting`) are moved to the respective `org.opensearch.timeseries` packages.

Miscellaneous:
- Fixed compiler failures caused by changes in opensearch-project/OpenSearch#8730 by replacing `DoubleArrayList` with `java.util.ArrayList`.
- Updates the Backwards Compatibility (bwc) version to align with the core's incremented bwc version as per [OpenSearch PR #8670](opensearch-project/OpenSearch#8670). This change prevents the issue described in [OpenSearch Issue #5076](opensearch-project/OpenSearch#5076).

Testing:
- Executed a `gradle build`.
- Added new tests for `ClientUtil` and `NodeStateManager`.

Signed-off-by: Kaituo Li <[email protected]>

* improve comment

Signed-off-by: Kaituo Li <[email protected]>

* fix compiler error and comments

Signed-off-by: Kaituo Li <[email protected]>

---------

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo authored Jul 27, 2023
1 parent f0ed43b commit 1130a1b
Show file tree
Hide file tree
Showing 165 changed files with 1,708 additions and 1,655 deletions.
4 changes: 1 addition & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ buildscript {
js_resource_folder = "src/test/resources/job-scheduler"
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build)
bwcVersionShort = "2.9.0"
bwcVersionShort = "2.10.0"
bwcVersion = bwcVersionShort + ".0"
bwcOpenSearchADDownload = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + bwcVersionShort + '/latest/linux/x64/tar/builds/' +
'opensearch/plugins/opensearch-anomaly-detection-' + bwcVersion + '.zip'
Expand Down Expand Up @@ -672,8 +672,6 @@ List<String> jacocoExclusions = [
'org.opensearch.timeseries.settings.TimeSeriesSettings',
'org.opensearch.forecast.settings.ForecastSettings',

'org.opensearch.ad.util.ClientUtil',

'org.opensearch.ad.transport.CronRequest',
'org.opensearch.ad.AnomalyDetectorRunner',

Expand Down
38 changes: 20 additions & 18 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,12 @@
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.model.ADTaskState;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.ad.transport.AnomalyResultAction;
import org.opensearch.ad.transport.AnomalyResultRequest;
import org.opensearch.ad.transport.AnomalyResultResponse;
import org.opensearch.ad.transport.AnomalyResultTransportAction;
import org.opensearch.ad.util.SecurityUtil;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -58,11 +56,15 @@
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.jobscheduler.spi.utils.LockService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.common.exception.EndRunException;
import org.opensearch.timeseries.common.exception.InternalFailure;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.function.ExecutorFunction;
import org.opensearch.timeseries.model.Job;
import org.opensearch.timeseries.util.SecurityUtil;

import com.google.common.base.Throwables;

Expand Down Expand Up @@ -134,12 +136,12 @@ public void runJob(ScheduledJobParameter scheduledJobParameter, JobExecutionCont
String detectorId = scheduledJobParameter.getName();
log.info("Start to run AD job {}", detectorId);
adTaskManager.refreshRealtimeJobRunTime(detectorId);
if (!(scheduledJobParameter instanceof AnomalyDetectorJob)) {
if (!(scheduledJobParameter instanceof Job)) {
throw new IllegalArgumentException(
"Job parameter is not instance of AnomalyDetectorJob, type: " + scheduledJobParameter.getClass().getCanonicalName()
"Job parameter is not instance of Job, type: " + scheduledJobParameter.getClass().getCanonicalName()
);
}
AnomalyDetectorJob jobParameter = (AnomalyDetectorJob) scheduledJobParameter;
Job jobParameter = (Job) scheduledJobParameter;
Instant executionStartTime = Instant.now();
IntervalSchedule schedule = (IntervalSchedule) jobParameter.getSchedule();
Instant detectionStartTime = executionStartTime.minus(schedule.getInterval(), schedule.getUnit());
Expand All @@ -148,12 +150,12 @@ public void runJob(ScheduledJobParameter scheduledJobParameter, JobExecutionCont

Runnable runnable = () -> {
try {
nodeStateManager.getAnomalyDetector(detectorId, ActionListener.wrap(detectorOptional -> {
nodeStateManager.getConfig(detectorId, AnalysisType.AD, ActionListener.wrap(detectorOptional -> {
if (!detectorOptional.isPresent()) {
log.error(new ParameterizedMessage("fail to get detector [{}]", detectorId));
return;
}
AnomalyDetector detector = detectorOptional.get();
AnomalyDetector detector = (AnomalyDetector) detectorOptional.get();

if (jobParameter.getLockDurationSeconds() != null) {
lockService
Expand Down Expand Up @@ -216,7 +218,7 @@ public void runJob(ScheduledJobParameter scheduledJobParameter, JobExecutionCont
* @param detector associated detector accessor
*/
protected void runAdJob(
AnomalyDetectorJob jobParameter,
Job jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand Down Expand Up @@ -284,7 +286,7 @@ protected void runAdJob(
}

private void runAnomalyDetectionJob(
AnomalyDetectorJob jobParameter,
Job jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand Down Expand Up @@ -393,7 +395,7 @@ private void runAnomalyDetectionJob(
* @param detector associated detector accessor
*/
protected void handleAdException(
AnomalyDetectorJob jobParameter,
Job jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand Down Expand Up @@ -482,7 +484,7 @@ protected void handleAdException(
}

private void stopAdJobForEndRunException(
AnomalyDetectorJob jobParameter,
Job jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand Down Expand Up @@ -524,9 +526,9 @@ private void stopAdJob(String detectorId, ExecutorFunction function) {
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
Job job = Job.parse(parser);
if (job.isEnabled()) {
AnomalyDetectorJob newJob = new AnomalyDetectorJob(
Job newJob = new Job(
job.getName(),
job.getSchedule(),
job.getWindowDelay(),
Expand Down Expand Up @@ -566,7 +568,7 @@ private void stopAdJob(String detectorId, ExecutorFunction function) {
}

private void indexAnomalyResult(
AnomalyDetectorJob jobParameter,
Job jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand All @@ -590,7 +592,7 @@ private void indexAnomalyResult(
}

private void indexAnomalyResultException(
AnomalyDetectorJob jobParameter,
Job jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand Down Expand Up @@ -621,7 +623,7 @@ private void indexAnomalyResultException(
}

private void indexAnomalyResultException(
AnomalyDetectorJob jobParameter,
Job jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand All @@ -646,7 +648,7 @@ private void indexAnomalyResultException(
}

private void indexAnomalyResultException(
AnomalyDetectorJob jobParameter,
Job jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand All @@ -666,7 +668,7 @@ private void indexAnomalyResultException(
}
}

private void releaseLock(AnomalyDetectorJob jobParameter, LockService lockService, LockModel lock) {
private void releaseLock(Job jobParameter, LockService lockService, LockModel lock) {
lockService
.release(
lock,
Expand Down
19 changes: 11 additions & 8 deletions src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.DetectorProfile;
import org.opensearch.ad.model.DetectorProfileName;
import org.opensearch.ad.model.DetectorState;
Expand All @@ -49,9 +48,6 @@
import org.opensearch.ad.transport.RCFPollingAction;
import org.opensearch.ad.transport.RCFPollingRequest;
import org.opensearch.ad.transport.RCFPollingResponse;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -68,11 +64,16 @@
import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder;
import org.opensearch.search.aggregations.metrics.InternalCardinality;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.common.exception.NotSerializedExceptionName;
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.model.Job;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
import org.opensearch.timeseries.util.ExceptionUtil;
import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener;
import org.opensearch.timeseries.util.SecurityClientUtil;
import org.opensearch.transport.TransportService;

public class AnomalyDetectorProfileRunner extends AbstractProfileRunner {
Expand Down Expand Up @@ -159,7 +160,7 @@ private void prepareProfile(
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
Job job = Job.parse(parser);
long enabledTimeMs = job.getEnabledTime().toEpochMilli();

boolean isMultiEntityDetector = detector.isHighCardinality();
Expand Down Expand Up @@ -315,6 +316,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
client::search,
detector.getId(),
client,
AnalysisType.AD,
searchResponseListener
);
} else {
Expand Down Expand Up @@ -368,6 +370,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
client::search,
detector.getId(),
client,
AnalysisType.AD,
searchResponseListener
);
}
Expand Down Expand Up @@ -418,7 +421,7 @@ private void profileStateRelated(
private void profileModels(
AnomalyDetector detector,
Set<DetectorProfileName> profiles,
AnomalyDetectorJob job,
Job job,
boolean forMultiEntityDetector,
MultiResponsesDelegateActionListener<DetectorProfile> listener
) {
Expand All @@ -430,7 +433,7 @@ private void profileModels(
private ActionListener<ProfileResponse> onModelResponse(
AnomalyDetector detector,
Set<DetectorProfileName> profilesToCollect,
AnomalyDetectorJob job,
Job job,
MultiResponsesDelegateActionListener<DetectorProfile> listener
) {
boolean isMultientityDetector = detector.isHighCardinality();
Expand Down Expand Up @@ -464,7 +467,7 @@ private ActionListener<ProfileResponse> onModelResponse(
}

private void profileMultiEntityDetectorStateRelated(
AnomalyDetectorJob job,
Job job,
Set<DetectorProfileName> profilesToCollect,
ProfileResponse profileResponse,
DetectorProfile.Builder profileBuilder,
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.EntityAnomalyResult;
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.Feature;
import org.opensearch.timeseries.model.FeatureData;
import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener;

/**
* Runner to trigger an anomaly detector.
Expand Down
12 changes: 7 additions & 5 deletions src/main/java/org/opensearch/ad/EntityProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.opensearch.ad.constant.ADCommonMessages;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.EntityProfile;
import org.opensearch.ad.model.EntityProfileName;
Expand All @@ -38,8 +37,6 @@
import org.opensearch.ad.transport.EntityProfileAction;
import org.opensearch.ad.transport.EntityProfileRequest;
import org.opensearch.ad.transport.EntityProfileResponse;
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -53,11 +50,15 @@
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.constant.CommonMessages;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.model.Job;
import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener;
import org.opensearch.timeseries.util.ParseUtils;
import org.opensearch.timeseries.util.SecurityClientUtil;

public class EntityProfileRunner extends AbstractProfileRunner {
private final Logger logger = LogManager.getLogger(EntityProfileRunner.class);
Expand Down Expand Up @@ -188,6 +189,7 @@ private void validateEntity(
client::search,
detector.getId(),
client,
AnalysisType.AD,
searchResponseListener
);

Expand Down Expand Up @@ -228,7 +230,7 @@ private void getJob(
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
Job job = Job.parse(parser);

int totalResponsesToWait = 0;
if (profilesToCollect.contains(EntityProfileName.INIT_PROGRESS)
Expand Down Expand Up @@ -331,7 +333,7 @@ private void profileStateRelated(
Entity entityValue,
Set<EntityProfileName> profilesToCollect,
AnomalyDetector detector,
AnomalyDetectorJob job,
Job job,
MultiResponsesDelegateActionListener<EntityProfile> delegateListener
) {
if (totalUpdates == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,22 @@
import org.opensearch.ad.transport.RCFPollingAction;
import org.opensearch.ad.transport.RCFPollingRequest;
import org.opensearch.ad.transport.handler.AnomalyIndexHandler;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.commons.authuser.User;
import org.opensearch.search.SearchHits;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin;
import org.opensearch.timeseries.common.exception.EndRunException;
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.model.FeatureData;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
import org.opensearch.timeseries.util.ExceptionUtil;

public class ExecuteADResultResponseRecorder {
private static final Logger log = LogManager.getLogger(ExecuteADResultResponseRecorder.class);
Expand Down Expand Up @@ -337,20 +339,20 @@ private void confirmTotalRCFUpdatesFound(
String error,
ActionListener<Long> listener
) {
nodeStateManager.getAnomalyDetector(detectorId, ActionListener.wrap(detectorOptional -> {
nodeStateManager.getConfig(detectorId, AnalysisType.AD, ActionListener.wrap(detectorOptional -> {
if (!detectorOptional.isPresent()) {
listener.onFailure(new TimeSeriesException(detectorId, "fail to get detector"));
return;
}
nodeStateManager.getAnomalyDetectorJob(detectorId, ActionListener.wrap(jobOptional -> {
nodeStateManager.getJob(detectorId, ActionListener.wrap(jobOptional -> {
if (!jobOptional.isPresent()) {
listener.onFailure(new TimeSeriesException(detectorId, "fail to get job"));
return;
}

ProfileUtil
.confirmDetectorRealtimeInitStatus(
detectorOptional.get(),
(AnomalyDetector) detectorOptional.get(),
jobOptional.get().getEnabledTime().toEpochMilli(),
client,
ActionListener.wrap(searchResponse -> {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/caching/CacheBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ad.ExpiringState;
import org.opensearch.ad.MemoryTracker;
import org.opensearch.ad.MemoryTracker.Origin;
import org.opensearch.ad.ml.EntityModel;
Expand All @@ -36,6 +35,7 @@
import org.opensearch.ad.ratelimit.CheckpointWriteWorker;
import org.opensearch.ad.ratelimit.RequestPriority;
import org.opensearch.ad.util.DateUtils;
import org.opensearch.timeseries.ExpiringState;

/**
* We use a layered cache to manage active entities’ states. We have a two-level
Expand Down
Loading

0 comments on commit 1130a1b

Please sign in to comment.