-
Notifications
You must be signed in to change notification settings - Fork 36
Adds initialization progress to profile API #164
Changes from 4 commits
670d2a9
575c983
c0cf04f
c56889d
4d6b588
9285cd7
66d772f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -56,7 +56,8 @@ | |
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultRequest; | ||
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultResponse; | ||
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultTransportAction; | ||
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyResultHandler; | ||
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyIndexHandler; | ||
import com.amazon.opendistroforelasticsearch.ad.transport.handler.DetectorStateHandler; | ||
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; | ||
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobExecutionContext; | ||
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.LockModel; | ||
|
@@ -77,8 +78,9 @@ public class AnomalyDetectorJobRunner implements ScheduledJobRunner { | |
private Client client; | ||
private ClientUtil clientUtil; | ||
private ThreadPool threadPool; | ||
private AnomalyResultHandler anomalyResultHandler; | ||
private AnomalyIndexHandler<AnomalyResult> anomalyResultHandler; | ||
private ConcurrentHashMap<String, Integer> detectorEndRunExceptionCount; | ||
private DetectorStateHandler detectorStateHandler; | ||
|
||
public static AnomalyDetectorJobRunner getJobRunnerInstance() { | ||
if (INSTANCE != null) { | ||
|
@@ -110,7 +112,7 @@ public void setThreadPool(ThreadPool threadPool) { | |
this.threadPool = threadPool; | ||
} | ||
|
||
public void setAnomalyResultHandler(AnomalyResultHandler anomalyResultHandler) { | ||
public void setAnomalyResultHandler(AnomalyIndexHandler<AnomalyResult> anomalyResultHandler) { | ||
this.anomalyResultHandler = anomalyResultHandler; | ||
} | ||
|
||
|
@@ -119,6 +121,10 @@ public void setSettings(Settings settings) { | |
this.maxRetryForEndRunException = AnomalyDetectorSettings.MAX_RETRY_FOR_END_RUN_EXCEPTION.get(settings); | ||
} | ||
|
||
public void setDetectorStateHandler(DetectorStateHandler detectorStateHandler) { | ||
this.detectorStateHandler = detectorStateHandler; | ||
} | ||
|
||
@Override | ||
public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) { | ||
String detectorId = jobParameter.getName(); | ||
|
@@ -436,7 +442,8 @@ private void indexAnomalyResult( | |
Instant.now(), | ||
response.getError() | ||
); | ||
anomalyResultHandler.indexAnomalyResult(anomalyResult); | ||
anomalyResultHandler.index(anomalyResult, detectorId); | ||
detectorStateHandler.saveError(response.getError(), detectorId); | ||
kaituo marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are there any reasons not inverse the dependency, i.e., letting detector state handler be a dependency within state manager? there are for two benefits
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. saveError method needs to be called by a source that runs regularly. job runner is the source. Both state manager and detector state handler reacts to job runner. About the two benefits:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
hi, Lai, good questions, before decoupling, we have no job runner and we handle all business logic and exceptions in transport action as it stands on top of other components. After decoupling, job runner is the top component, split the exception handling to job runner to avoid the exception handling logic scattered into multiple components(transport actions, job runners, etc). So job runner will catch all exceptions including its own exceptions, then decide the following error handling such as stop detector if no enough resource, and write error in AD result index. Now Kaituo add Of course, we may have other options to do this, like create a new error handler, pass it to every component to handle exception and persist in AD result individually. Both options have pros and cons, need to do some tradeoff. How about we keep current design for this PR? We can discuss different options and solve the error handling in another PR if we have better option. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is not a blocker, as i have approved the pr. it's good to use interface to decouple business logic from work execution dependencies. in another word, consider if the job scheduler code (a work execution dependency) change/migrate to some other dependency for execution, how much needs to re-developed, copy and pasted, re-tested? can/should the work be further reduced? in the ideal case, the interface should serve to protect the business logic. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the suggestion. Agree to "use interface to decouple business logic from work execution dependencies". I think we can restructure when do AD workbench as it will change a lot of how to schedule detection task/job but not overdesign. Let's discuss more when do AD workbench. |
||
} catch (Exception e) { | ||
log.error("Failed to index anomaly result for " + detectorId, e); | ||
} finally { | ||
|
@@ -490,7 +497,8 @@ private void indexAnomalyResultException( | |
Instant.now(), | ||
errorMessage | ||
); | ||
anomalyResultHandler.indexAnomalyResult(anomalyResult); | ||
anomalyResultHandler.index(anomalyResult, detectorId); | ||
detectorStateHandler.saveError(errorMessage, detectorId); | ||
} catch (Exception e) { | ||
log.error("Failed to index anomaly result for " + detectorId, e); | ||
} finally { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor, should we rename
setDetectorStateHandler
assetDetectionStateHandler
? Same question for other...detectorState...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed