Skip to content

Commit

Permalink
Refactoring NodeStateManager etc. to support forecasting functionality
Browse files Browse the repository at this point in the history
This commit extends the codebase to support both Anomaly Detection (AD) and forecasting. It contains a mixture of refactoring, renaming, removal of unused code, and package moving tasks. Here are the details:

Refactoring:
- `NodeStateManager.getAnomalyDetector` is now `getConfig`, with added functionality to fetch a Forecaster. The method comments are updated for clarity.
- Existing methods (`getFeatureSamplesForPeriods`, `getColdStartSamplesForPeriods`, `createPreviewSearchRequest`, `getMinDataTime`) have been added in `SearchFeatureDao` to handle forecasting logic.
- Adjusted `SecurityClientUtil` and `ParseUtils` to handle forecasting logic.
- Cleaned up `NodeState` to differentiate state for AD and forecasting.

Renaming:
- `AnomalyDetectorJob` is renamed to `Job` to facilitate reuse for forecasting.
- `NodeStateManager.getAnomalyDetectorJob` is renamed to `getJob`.
- Certain settings in `AnomalyDetectorSettings` are renamed to reflect they are meant for the AD setting. They have been marked as deprecated and new settings are used in `TimeSeriesSettings` instead.
- `IndexAnomalyDetectorJobActionHandler.getAnomalyDetectorJobForWrite` is renamed to `getJobForWrite`.
- `ADSafeSecurityInjector` is renamed to `TimeSeriesSafeSecurityInjector`.

Removing unused code:
- Synchronous code in `ClientUtil`, `IndexUtils`, and `CheckpointDao` is removed.
- The unused class `Throttler` is deleted.
- Mapping file names are changed, and the code referencing these files is adjusted.

Package moving:
- Several classes (`ClientUtil`, `MultiResponsesDelegateActionListener`, `SafeSecurityInjector`, `SecurityUtil`, `ExceptionUtil`, `SearchFeatureDao`, `CleanState`, `ExpiringState`, `MaintenanceState`, `NodeState`, `SingleStreamModelIdMapper`, `BackPressureRouting`) are moved to the respective `org.opensearch.timeseries` packages.

Miscellaneous:
- Fixed compiler failures caused by changes in opensearch-project/OpenSearch#8730 by replacing `DoubleArrayList` with `java.util.ArrayList`.

Testing:
- Executed a `gradle build`.
- Added new tests for `ClientUtil` and `NodeStateManager`.

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Jul 20, 2023
1 parent f0ed43b commit 6b618e6
Show file tree
Hide file tree
Showing 164 changed files with 1,691 additions and 1,646 deletions.
2 changes: 0 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -672,8 +672,6 @@ List<String> jacocoExclusions = [
'org.opensearch.timeseries.settings.TimeSeriesSettings',
'org.opensearch.forecast.settings.ForecastSettings',

'org.opensearch.ad.util.ClientUtil',

'org.opensearch.ad.transport.CronRequest',
'org.opensearch.ad.AnomalyDetectorRunner',

Expand Down
36 changes: 19 additions & 17 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,12 @@
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.model.ADTaskState;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.ad.transport.AnomalyResultAction;
import org.opensearch.ad.transport.AnomalyResultRequest;
import org.opensearch.ad.transport.AnomalyResultResponse;
import org.opensearch.ad.transport.AnomalyResultTransportAction;
import org.opensearch.ad.util.SecurityUtil;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -58,11 +56,15 @@
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.jobscheduler.spi.utils.LockService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.common.exception.EndRunException;
import org.opensearch.timeseries.common.exception.InternalFailure;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.function.ExecutorFunction;
import org.opensearch.timeseries.model.Job;
import org.opensearch.timeseries.util.SecurityUtil;

import com.google.common.base.Throwables;

Expand Down Expand Up @@ -134,12 +136,12 @@ public void runJob(ScheduledJobParameter scheduledJobParameter, JobExecutionCont
String detectorId = scheduledJobParameter.getName();
log.info("Start to run AD job {}", detectorId);
adTaskManager.refreshRealtimeJobRunTime(detectorId);
if (!(scheduledJobParameter instanceof AnomalyDetectorJob)) {
if (!(scheduledJobParameter instanceof Job)) {
throw new IllegalArgumentException(
"Job parameter is not instance of AnomalyDetectorJob, type: " + scheduledJobParameter.getClass().getCanonicalName()
);
}
AnomalyDetectorJob jobParameter = (AnomalyDetectorJob) scheduledJobParameter;
Job jobParameter = (Job) scheduledJobParameter;
Instant executionStartTime = Instant.now();
IntervalSchedule schedule = (IntervalSchedule) jobParameter.getSchedule();
Instant detectionStartTime = executionStartTime.minus(schedule.getInterval(), schedule.getUnit());
Expand All @@ -148,12 +150,12 @@ public void runJob(ScheduledJobParameter scheduledJobParameter, JobExecutionCont

Runnable runnable = () -> {
try {
nodeStateManager.getAnomalyDetector(detectorId, ActionListener.wrap(detectorOptional -> {
nodeStateManager.getConfig(detectorId, AnalysisType.AD, ActionListener.wrap(detectorOptional -> {
if (!detectorOptional.isPresent()) {
log.error(new ParameterizedMessage("fail to get detector [{}]", detectorId));
return;
}
AnomalyDetector detector = detectorOptional.get();
AnomalyDetector detector = (AnomalyDetector) detectorOptional.get();

if (jobParameter.getLockDurationSeconds() != null) {
lockService
Expand Down Expand Up @@ -216,7 +218,7 @@ public void runJob(ScheduledJobParameter scheduledJobParameter, JobExecutionCont
* @param detector associated detector accessor
*/
protected void runAdJob(
AnomalyDetectorJob jobParameter,
Job jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand Down Expand Up @@ -284,7 +286,7 @@ protected void runAdJob(
}

private void runAnomalyDetectionJob(
AnomalyDetectorJob jobParameter,
Job jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand Down Expand Up @@ -393,7 +395,7 @@ private void runAnomalyDetectionJob(
* @param detector associated detector accessor
*/
protected void handleAdException(
AnomalyDetectorJob jobParameter,
Job jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand Down Expand Up @@ -482,7 +484,7 @@ protected void handleAdException(
}

private void stopAdJobForEndRunException(
AnomalyDetectorJob jobParameter,
Job jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand Down Expand Up @@ -524,9 +526,9 @@ private void stopAdJob(String detectorId, ExecutorFunction function) {
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
Job job = Job.parse(parser);
if (job.isEnabled()) {
AnomalyDetectorJob newJob = new AnomalyDetectorJob(
Job newJob = new Job(
job.getName(),
job.getSchedule(),
job.getWindowDelay(),
Expand Down Expand Up @@ -566,7 +568,7 @@ private void stopAdJob(String detectorId, ExecutorFunction function) {
}

private void indexAnomalyResult(
AnomalyDetectorJob jobParameter,
Job jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand All @@ -590,7 +592,7 @@ private void indexAnomalyResult(
}

private void indexAnomalyResultException(
AnomalyDetectorJob jobParameter,
Job jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand Down Expand Up @@ -621,7 +623,7 @@ private void indexAnomalyResultException(
}

private void indexAnomalyResultException(
AnomalyDetectorJob jobParameter,
Job jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand All @@ -646,7 +648,7 @@ private void indexAnomalyResultException(
}

private void indexAnomalyResultException(
AnomalyDetectorJob jobParameter,
Job jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand All @@ -666,7 +668,7 @@ private void indexAnomalyResultException(
}
}

private void releaseLock(AnomalyDetectorJob jobParameter, LockService lockService, LockModel lock) {
private void releaseLock(Job jobParameter, LockService lockService, LockModel lock) {
lockService
.release(
lock,
Expand Down
19 changes: 11 additions & 8 deletions src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.DetectorProfile;
import org.opensearch.ad.model.DetectorProfileName;
import org.opensearch.ad.model.DetectorState;
Expand All @@ -49,9 +48,6 @@
import org.opensearch.ad.transport.RCFPollingAction;
import org.opensearch.ad.transport.RCFPollingRequest;
import org.opensearch.ad.transport.RCFPollingResponse;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -68,11 +64,16 @@
import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder;
import org.opensearch.search.aggregations.metrics.InternalCardinality;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.common.exception.NotSerializedExceptionName;
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.model.Job;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
import org.opensearch.timeseries.util.ExceptionUtil;
import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener;
import org.opensearch.timeseries.util.SecurityClientUtil;
import org.opensearch.transport.TransportService;

public class AnomalyDetectorProfileRunner extends AbstractProfileRunner {
Expand Down Expand Up @@ -159,7 +160,7 @@ private void prepareProfile(
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
Job job = Job.parse(parser);
long enabledTimeMs = job.getEnabledTime().toEpochMilli();

boolean isMultiEntityDetector = detector.isHighCardinality();
Expand Down Expand Up @@ -315,6 +316,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
client::search,
detector.getId(),
client,
AnalysisType.AD,
searchResponseListener
);
} else {
Expand Down Expand Up @@ -368,6 +370,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
client::search,
detector.getId(),
client,
AnalysisType.AD,
searchResponseListener
);
}
Expand Down Expand Up @@ -418,7 +421,7 @@ private void profileStateRelated(
private void profileModels(
AnomalyDetector detector,
Set<DetectorProfileName> profiles,
AnomalyDetectorJob job,
Job job,
boolean forMultiEntityDetector,
MultiResponsesDelegateActionListener<DetectorProfile> listener
) {
Expand All @@ -430,7 +433,7 @@ private void profileModels(
private ActionListener<ProfileResponse> onModelResponse(
AnomalyDetector detector,
Set<DetectorProfileName> profilesToCollect,
AnomalyDetectorJob job,
Job job,
MultiResponsesDelegateActionListener<DetectorProfile> listener
) {
boolean isMultientityDetector = detector.isHighCardinality();
Expand Down Expand Up @@ -464,7 +467,7 @@ private ActionListener<ProfileResponse> onModelResponse(
}

private void profileMultiEntityDetectorStateRelated(
AnomalyDetectorJob job,
Job job,
Set<DetectorProfileName> profilesToCollect,
ProfileResponse profileResponse,
DetectorProfile.Builder profileBuilder,
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.EntityAnomalyResult;
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.Feature;
import org.opensearch.timeseries.model.FeatureData;
import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener;

/**
* Runner to trigger an anomaly detector.
Expand Down
12 changes: 7 additions & 5 deletions src/main/java/org/opensearch/ad/EntityProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.opensearch.ad.constant.ADCommonMessages;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.EntityProfile;
import org.opensearch.ad.model.EntityProfileName;
Expand All @@ -38,8 +37,6 @@
import org.opensearch.ad.transport.EntityProfileAction;
import org.opensearch.ad.transport.EntityProfileRequest;
import org.opensearch.ad.transport.EntityProfileResponse;
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -53,11 +50,15 @@
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.constant.CommonMessages;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.model.Job;
import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener;
import org.opensearch.timeseries.util.ParseUtils;
import org.opensearch.timeseries.util.SecurityClientUtil;

public class EntityProfileRunner extends AbstractProfileRunner {
private final Logger logger = LogManager.getLogger(EntityProfileRunner.class);
Expand Down Expand Up @@ -188,6 +189,7 @@ private void validateEntity(
client::search,
detector.getId(),
client,
AnalysisType.AD,
searchResponseListener
);

Expand Down Expand Up @@ -228,7 +230,7 @@ private void getJob(
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
Job job = Job.parse(parser);

int totalResponsesToWait = 0;
if (profilesToCollect.contains(EntityProfileName.INIT_PROGRESS)
Expand Down Expand Up @@ -331,7 +333,7 @@ private void profileStateRelated(
Entity entityValue,
Set<EntityProfileName> profilesToCollect,
AnomalyDetector detector,
AnomalyDetectorJob job,
Job job,
MultiResponsesDelegateActionListener<EntityProfile> delegateListener
) {
if (totalUpdates == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,22 @@
import org.opensearch.ad.transport.RCFPollingAction;
import org.opensearch.ad.transport.RCFPollingRequest;
import org.opensearch.ad.transport.handler.AnomalyIndexHandler;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.commons.authuser.User;
import org.opensearch.search.SearchHits;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin;
import org.opensearch.timeseries.common.exception.EndRunException;
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.model.FeatureData;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
import org.opensearch.timeseries.util.ExceptionUtil;

public class ExecuteADResultResponseRecorder {
private static final Logger log = LogManager.getLogger(ExecuteADResultResponseRecorder.class);
Expand Down Expand Up @@ -337,20 +339,20 @@ private void confirmTotalRCFUpdatesFound(
String error,
ActionListener<Long> listener
) {
nodeStateManager.getAnomalyDetector(detectorId, ActionListener.wrap(detectorOptional -> {
nodeStateManager.getConfig(detectorId, AnalysisType.AD, ActionListener.wrap(detectorOptional -> {
if (!detectorOptional.isPresent()) {
listener.onFailure(new TimeSeriesException(detectorId, "fail to get detector"));
return;
}
nodeStateManager.getAnomalyDetectorJob(detectorId, ActionListener.wrap(jobOptional -> {
nodeStateManager.getJob(detectorId, ActionListener.wrap(jobOptional -> {
if (!jobOptional.isPresent()) {
listener.onFailure(new TimeSeriesException(detectorId, "fail to get job"));
return;
}

ProfileUtil
.confirmDetectorRealtimeInitStatus(
detectorOptional.get(),
(AnomalyDetector) detectorOptional.get(),
jobOptional.get().getEnabledTime().toEpochMilli(),
client,
ActionListener.wrap(searchResponse -> {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/caching/CacheBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ad.ExpiringState;
import org.opensearch.ad.MemoryTracker;
import org.opensearch.ad.MemoryTracker.Origin;
import org.opensearch.ad.ml.EntityModel;
Expand All @@ -36,6 +35,7 @@
import org.opensearch.ad.ratelimit.CheckpointWriteWorker;
import org.opensearch.ad.ratelimit.RequestPriority;
import org.opensearch.ad.util.DateUtils;
import org.opensearch.timeseries.ExpiringState;

/**
* We use a layered cache to manage active entities’ states. We have a two-level
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/opensearch/ad/caching/DoorKeeper.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ad.ExpiringState;
import org.opensearch.ad.MaintenanceState;
import org.opensearch.timeseries.ExpiringState;
import org.opensearch.timeseries.MaintenanceState;

import com.google.common.base.Charsets;
import com.google.common.hash.BloomFilter;
Expand Down
Loading

0 comments on commit 6b618e6

Please sign in to comment.