Add state and error to profile API #84
We want to make it easy for customers and oncalls to identify a detector’s state and error if any. This PR adds such information to our new profile API.
We expect three kinds of states:
- Disabled: if get ad job api says the job is disabled;
- Init: if anomaly score after the last update time of the detector is larger than 0
- Running: if neither of the above applies and no exceptions.
Error is populated if error of the latest anomaly result is not empty.
Testing done:
- manual testing during a detector’s life cycle: not created, created but not started, started, during initialization, after initialization, stopped, restarted
- added unit tests to cover above scenario
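The state rules above can be sketched as a small decision function. This is only an illustration, not the PR's code: the class and method names are hypothetical, and it assumes (following the review discussion of non-zero anomaly scores) that a result with a positive anomaly score after the cutoff time means initialization has finished, so its absence means Init.

```java
import java.util.Optional;

// Hypothetical names; only the three state names come from the PR description.
enum ProfileStateSketch { DISABLED, INIT, RUNNING }

final class StateResolverSketch {
    private StateResolverSketch() {}

    // jobEnabled: what the get AD job API reports.
    // sawPositiveScore: ASSUMPTION - true when some anomaly result newer than the
    // cutoff time has an anomaly score > 0, read here as "initialization finished".
    static ProfileStateSketch resolveState(boolean jobEnabled, boolean sawPositiveScore) {
        if (!jobEnabled) {
            return ProfileStateSketch.DISABLED;
        }
        return sawPositiveScore ? ProfileStateSketch.RUNNING : ProfileStateSketch.INIT;
    }

    // Error is populated only when the latest anomaly result carries a non-empty error.
    static Optional<String> resolveError(String latestResultError) {
        if (latestResultError == null || latestResultError.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(latestResultError);
    }
}
```

Keeping the decision in a pure function like this makes each life-cycle case (disabled, initializing, running) easy to cover with a unit test.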
    private SearchRequest createInittedEverRequest(String detectorId, long lastUpdateTimeEpochMs) {
        BoolQueryBuilder filterQuery = new BoolQueryBuilder();
        filterQuery.filter(QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, detectorId));
        filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(lastUpdateTimeEpochMs));
Use the job's enabled_time here. Consider the case where the detector's last update time has not changed, but the job has been disabled and restarted several times. We may then find AD results with a non-zero anomaly score that were generated before the latest job enabled time, even though the latest AD job is still initializing.
good point. Done.
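The reviewer's scenario can be reproduced with plain Java, without Elasticsearch. The sketch below uses a hypothetical `Result` holder and made-up timestamps; it only shows why filtering from the job's enabled time, rather than the detector's last update time, avoids counting results from an earlier run.

```java
import java.util.Arrays;
import java.util.List;

final class InitCutoffSketch {
    // Hypothetical stand-in for a stored anomaly result.
    static final class Result {
        final long executionEndTimeMs;
        final double anomalyScore;

        Result(long executionEndTimeMs, double anomalyScore) {
            this.executionEndTimeMs = executionEndTimeMs;
            this.anomalyScore = anomalyScore;
        }
    }

    // True when at least one result at or after the cutoff has a positive score.
    static boolean hasPositiveScoreSince(List<Result> results, long cutoffEpochMs) {
        for (Result r : results) {
            if (r.executionEndTimeMs >= cutoffEpochMs && r.anomalyScore > 0) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        long lastUpdateTimeMs = 1_000L; // detector definition unchanged since then
        long enabledTimeMs = 5_000L;    // job was stopped and restarted later
        List<Result> results = Arrays.asList(new Result(2_000L, 0.8)); // from the earlier run

        System.out.println(hasPositiveScoreSince(results, lastUpdateTimeMs)); // true: stale result counted
        System.out.println(hasPositiveScoreSince(results, enabledTimeMs));    // false: still initializing
    }
}
```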
    private SearchRequest createLatestAnomalyResultRequest(String detectorId, long lastUpdateTimeEpochMs) {
        BoolQueryBuilder filterQuery = new BoolQueryBuilder();
        filterQuery.filter(QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, detectorId));
        filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(lastUpdateTimeEpochMs));
Similar to line 260: we should use the AD job's "enabled_time".
done
        }
    } catch (IOException | XContentParseException e) {
        String error = "Fail to parse detector with id: " + detectorId;
        logger.error(error);
Log the exception stack trace to make operations easier? Similar to other places.
The catch block changed after addressing other comments. The new code logs the stack trace.
    private final ActionListener<T> delegate;
    private final AtomicInteger collectedResponseCount;
    private final int expectedResponseCount;
    private final List<T> savedResponses;
Does "saved" mean the responses come from results saved in ES indices, or that we cache these responses?
the latter. Added a comment.
    controller
        .registerHandler(
            RestRequest.Method.GET,
            String.format(Locale.ROOT, "%s/{%s}/%s/{%s}", AnomalyDetectorPlugin.AD_BASE_DETECTORS_URI, DETECTOR_ID, PROFILE, TYPE),
How about we add some comments about what TYPE means and the supported value?
done
        return new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, errorMsg);
    }

    private Set<String> getProfilesToCollect(String typesStr) {
How about we validate the type here and return Set<ProfileName>?
done
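One way to read the suggestion is to parse the raw type string into an enum set and keep only the supported values. The sketch below is an assumption-laden illustration: `ProfileTypeSketch` is a hypothetical enum (only `state` and `error` are mentioned in this PR), and a comma is assumed as the separator, which the thread does not confirm.

```java
import java.util.EnumSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical enum; the PR's real type is called ProfileName and may differ.
enum ProfileTypeSketch { STATE, ERROR }

final class ProfileTypeParser {
    private ProfileTypeParser() {}

    // Returns the intersection of the supported types and the requested ones.
    // ASSUMPTION: types arrive as a comma-separated string such as "state,error".
    static Set<ProfileTypeSketch> parse(String typesStr) {
        Set<ProfileTypeSketch> result = EnumSet.noneOf(ProfileTypeSketch.class);
        if (typesStr == null) {
            return result;
        }
        for (String token : typesStr.split(",")) {
            String wanted = token.trim().toLowerCase(Locale.ROOT);
            for (ProfileTypeSketch type : ProfileTypeSketch.values()) {
                if (type.name().toLowerCase(Locale.ROOT).equals(wanted)) {
                    result.add(type);
                }
            }
        }
        return result;
    }
}
```

With this shape, an empty result means that none of the requested types is supported, which is the case the "Unsupported profile types." error further down in the review is meant to report.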
    import com.amazon.opendistroforelasticsearch.ad.model.Mergeable;

    public class DelegateActionListener<T extends Mergeable> implements ActionListener<T> {
It seems you designed this general delegate listener for more than just the profile API. Can you add more comments? I suggest a more specific name, like MultiResponsesDelegateActionListener.
done
    try {
        this.exceptions.add(e.getMessage());
    } finally {
        if (collectedResponseCount.incrementAndGet() == expectedResponseCount) {
If expectedResponseCount == 0, collectedResponseCount.incrementAndGet() will always be greater than expectedResponseCount. Please add some validation for expectedResponseCount, or change the check to collectedResponseCount.incrementAndGet() >= expectedResponseCount.
good point. Used the latter.
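The difference between the two checks is easy to demonstrate in isolation. The helper names below are hypothetical; the sketch simply replays a number of callbacks against each condition and reports whether the finishing branch was ever reached.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class CountCheckSketch {
    private CountCheckSketch() {}

    // Replays `callbacks` increments and reports whether `== expected` ever held.
    static boolean finishesWithEquals(int expected, int callbacks) {
        AtomicInteger collected = new AtomicInteger();
        boolean finished = false;
        for (int i = 0; i < callbacks; i++) {
            if (collected.incrementAndGet() == expected) {
                finished = true;
            }
        }
        return finished;
    }

    // Same replay, but with the `>= expected` check adopted in the thread.
    static boolean finishesWithAtLeast(int expected, int callbacks) {
        AtomicInteger collected = new AtomicInteger();
        boolean finished = false;
        for (int i = 0; i < callbacks; i++) {
            if (collected.incrementAndGet() >= expected) {
                finished = true;
            }
        }
        return finished;
    }
}
```

With an expected count of 0, `finishesWithEquals(0, 3)` is false because the counter starts at 1 after the first increment, while `finishesWithAtLeast(0, 3)` is true. One trade-off worth noting: with `>=`, every callback beyond the threshold also satisfies the condition, so the finishing step may run more than once unless it is guarded.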
            this.delegate.onFailure(new RuntimeException(String.format("Unexpected exceptions")));
        } else {
            T response0 = savedResponses.get(0);
            LOG.info(response0);
Why log response0 here? Similar for line 84.
It was used for debugging. Removed.
    DelegateActionListener<DetectorProfile> delegateListener = new DelegateActionListener<DetectorProfile>(
        listener,
        profiles.size(),
        "Fail to fetch profile for " + detectorId
Here, finalErrorMsg is "Fail to fetch profile for " + detectorId. At line 89 of class DelegateActionListener, this.delegate.onFailure(new RuntimeException(String.format(Locale.ROOT, finalErrorMsg, exceptions))); the String.format(...) call will not include exceptions. Is this by design?
String.format(...) would include the exception message. Could you explain your question?
fixed as we discussed offline.
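The reviewer's point rests on how `String.format` treats arguments that have no matching format specifier: they are ignored. The thread does not show the eventual fix, so the second method below is only one possible remedy, with hypothetical method names.

```java
import java.util.List;
import java.util.Locale;

final class FormatSketch {
    private FormatSketch() {}

    // The message has no %s, so the extra argument is silently dropped.
    static String withoutPlaceholder(String finalErrorMsg, List<String> exceptions) {
        return String.format(Locale.ROOT, finalErrorMsg, exceptions);
    }

    // One possible remedy (an assumption, not the PR's fix): a fixed format
    // string with explicit placeholders for both the message and the causes.
    static String withPlaceholder(String finalErrorMsg, List<String> exceptions) {
        return String.format(Locale.ROOT, "%s: %s", finalErrorMsg, exceptions);
    }
}
```

For the message "Fail to fetch profile for d1" and a single collected cause "timeout", the first method returns the message unchanged and the second returns "Fail to fetch profile for d1: [timeout]". Using a fixed format string also avoids treating a detector id that happens to contain `%` as a format specifier.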
    );

    if (profiles.isEmpty()) {
        listener.onFailure(new RuntimeException("Unsupported profile types."));
How about we change to "Must set at least one profile type" to avoid confusion between empty profile types and wrong profile types which we don't support?
RestGetAnomalyDetectorAction.getProfilesToCollect returns the intersection of the valid types and the provided types. If the result is empty, all of the types the user supplied are unsupported. So the error is not that the customer failed to set at least one profile type; it is that all of the profile types are invalid.
    } catch (IOException | XContentParseException | NullPointerException e) {
        logger.error(e);
        listener.failImmediately(new RuntimeException(FAIL_TO_FIND_DETECTOR_MSG + detectorId, e));
minor: can use this method: listener.failImmediately(String errMsg, Exception e)
Good catch. Fixed.
            profile.setState(DetectorState.DISABLED);
            listener.onResponse(profile);
        }
    } catch (IOException | XContentParseException e) {
If an uncaught exception is thrown, listener.onFailure will not be executed and collectedResponseCount will not increase, so finish will never run. I suggest catching Exception here to guard against uncaught exceptions. Similar to line 236.
If there is an uncaught exception, control flow is redirected to the exception branch and listener.onFailure is called. Please see the implementation of ActionListener.
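The behavior described in this reply can be illustrated with a small stand-in for a wrap-style listener factory. This is not Elasticsearch's class; the interfaces and the `wrap` helper below are hypothetical and only mimic the pattern in which an exception thrown by the response handler is routed to the failure handler.

```java
import java.util.function.Consumer;

// Hypothetical stand-ins, not Elasticsearch types.
interface SimpleListener<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

interface ThrowingConsumer<T> {
    void accept(T value) throws Exception;
}

final class ListenerSketch {
    private ListenerSketch() {}

    // Builds a listener whose onResponse forwards any thrown exception to onFailure.
    static <T> SimpleListener<T> wrap(ThrowingConsumer<T> responseHandler, Consumer<Exception> failureHandler) {
        return new SimpleListener<T>() {
            @Override
            public void onResponse(T response) {
                try {
                    responseHandler.accept(response);
                } catch (Exception e) {
                    // Anything the response handler throws ends up in the failure branch.
                    onFailure(e);
                }
            }

            @Override
            public void onFailure(Exception e) {
                failureHandler.accept(e);
            }
        };
    }
}
```

Under this pattern a response handler that throws still results in exactly one failure callback, so a downstream counter keyed on callbacks would still advance.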
            listener.onResponse(profile);
        }
    }, exception -> {
        logger.warn(exception);
Can we add a custom error message here?
This line is removed after addressing other comments.
        profile.setState(DetectorState.INIT);
        listener.onResponse(profile);
    } else {
        logger.error("Fail to find latest anomaly result of id: {}", detectorId);
minor: make the error message more accurate, like "Fail to find latest anomaly result with anomalyScore>0 from XXX for detector XXX".
done
    SearchHits hits = searchResponse.getHits();
    if (hits.getTotalHits().value == 0L) {
        logger.error("We should not get empty result: {}", detectorId);
        listener.onFailure(new RuntimeException("Unexpected error while looking for detector state: " + detectorId));
Why throw an exception if we can't find an AD result? If there is no AD result, the AD job is initializing and there is no error. But per DelegateActionListener line 89, if any exception occurs, this.delegate.onFailure(...) is executed rather than returning the Init state and a null error.
good catch. Fixed.
…ndistro-for-elasticsearch#82) * Added URL for jb_scheduler-plugin_zip instead of local file path * Fixed windows path by adding additional /
* Use callbacks and bug fix
This PR includes the following changes:
1. remove classes that are not needed in jacocoExclusions since we have enough coverage for those classes.
2. Use ClientUtil instead of Elasticsearch’s client in AD job runner
3. Use one function to get the number of partitioned forests. Previously, we have redundant code in both ModelManager and ADStateManager.
4. Change ADStateManager.getAnomalyDetector to use callback.
5. Change AnomalyResultTransportAction to use callback to get features.
6. Add in AnomalyResultTransportAction to handle the case where all features have been disabled, and users' index does not exist.
7. Change get RCF and threshold result methods to use callback and add exception handling of IndexNotFoundException due to the change. Previously, getting RCF and threshold result methods won’t throw IndexNotFoundException.
8. Remove unused fields in StopDetectorTransportAction and AnomalyResultTransportAction
9. Unwrap EsRejectedExecutionException as it can be nested inside RemoteTransportException. Previously, we would not recognize EsRejectedExecutionException and thus miss anomaly results write retrying.
10. Add error in anomaly result schema.
11. Fix broken tests due to my changes.
Testing done:
1. unit/integration tests pass
2. do end-to-end testing and make sure my fix achieves the purpose
* timeout issue is gone
* when all features have been disabled or index does not exist, we will retry a few more times and disable AD jobs.
        "Fail to fetch profile for " + detectorId
    );

    if (profiles.isEmpty()) {
Not a blocker: you can move this isEmpty() check to the start of this method (line 67). Then you can skip the check on line 78.
good catch. Fixed.
        case "error":
            return ERROR;
        default:
            throw new IllegalArgumentException("Unsupported prof");
Unsupported profile
thanks for the catch. Fixed.
    @Override
    public void onFailure(Exception e) {
        LOG.info(e);
LOG.error()
fixed.
        try {
            this.exceptions.add(e.getMessage());
        } finally {
            if (collectedResponseCount.incrementAndGet() >= expectedResponseCount) {
                finish();
            }
        }
This looks like a duplicate of line 60 above. Can we remove the finally here? Also, the only potential scenario where an exception can be thrown is when Exception e is null, but I don't think that is possible.
The purpose of this class is to collect the results of async requests: whether a request fails or succeeds, we increment the count. If the count is equal to or larger than expected, we send a final success or failure response. We need the finally here to increment the count when there is a failure. A call to onFailure does not mean e is null; it means an async request failed.
After another look, I guess an exception can be thrown if the thread is interrupted.
yes, any exception can be thrown by an asynchronous request.
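The collector pattern discussed in this thread can be sketched without Elasticsearch. The interfaces below are hypothetical stand-ins for ActionListener and Mergeable (whose real signatures are not shown in this PR), and the error text format is an assumption; the sketch only shows how counting in a finally block, on both success and failure, lets the final callback fire once every request has reported.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for Elasticsearch's ActionListener and the plugin's Mergeable.
interface SketchListener<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

interface SketchMergeable<T> {
    void merge(T other);
}

final class MultiResponseCollector<T extends SketchMergeable<T>> implements SketchListener<T> {
    private final SketchListener<T> delegate;
    private final int maxResponseCount;
    private final String finalErrorMsg;
    private final AtomicInteger collectedResponseCount = new AtomicInteger();
    // Responses cached in memory until every callback has reported.
    private final List<T> savedResponses = Collections.synchronizedList(new ArrayList<>());
    private final List<String> exceptions = Collections.synchronizedList(new ArrayList<>());

    MultiResponseCollector(SketchListener<T> delegate, int maxResponseCount, String finalErrorMsg) {
        this.delegate = delegate;
        this.maxResponseCount = maxResponseCount;
        this.finalErrorMsg = finalErrorMsg;
    }

    @Override
    public void onResponse(T response) {
        try {
            savedResponses.add(response);
        } finally {
            countAndMaybeFinish();
        }
    }

    @Override
    public void onFailure(Exception e) {
        try {
            exceptions.add(e.getMessage());
        } finally {
            // Runs even if recording the failure itself throws, so the count never stalls.
            countAndMaybeFinish();
        }
    }

    private void countAndMaybeFinish() {
        if (collectedResponseCount.incrementAndGet() >= maxResponseCount) {
            finish();
        }
    }

    private void finish() {
        if (!exceptions.isEmpty()) {
            delegate.onFailure(new RuntimeException(finalErrorMsg + ": " + exceptions));
        } else if (savedResponses.isEmpty()) {
            delegate.onFailure(new RuntimeException("No response collected"));
        } else {
            // Fold every saved response into the first one.
            T merged = savedResponses.get(0);
            for (int i = 1; i < savedResponses.size(); i++) {
                merged.merge(savedResponses.get(i));
            }
            delegate.onResponse(merged);
        }
    }
}

// Hypothetical mergeable payload used only to exercise the collector.
final class ProfilePart implements SketchMergeable<ProfilePart> {
    String state;
    String error;

    ProfilePart(String state, String error) {
        this.state = state;
        this.error = error;
    }

    @Override
    public void merge(ProfilePart other) {
        if (other.state != null) state = other.state;
        if (other.error != null) error = other.error;
    }
}
```

A caller would create one collector per profile request, sized to the number of sub-requests, and pass it as the listener for each of them; the delegate then receives either a single merged profile or a single failure that lists the collected messages.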
    } catch (Exception e) {
        onFailure(e);
    } finally {
        if (collectedResponseCount.incrementAndGet() >= expectedResponseCount) {
"expected" implies that the total collected count must be more than expectedResponseCount, otherwise it is a failure. Based on my understanding of how this class is used, I guess maxResponseCount might be a better name.
done
    private void finish() {
        if (this.exceptions.size() == 0) {
            if (savedResponses.size() == 0) {
                this.delegate.onFailure(new RuntimeException(String.format("Unexpected exceptions")));
String.format may not be needed if only a static string is there. Also, in the case of empty exceptions and empty savedResponses, I think it may be better to throw an exception with a message like "No response collected", which makes more sense to me.
good catch. Removed String.format and changed to "No response collected".
            client.get(getDetectorRequest, onGetDetectorResponse(listener, detectorId, profiles));
        }
    }, exception -> {
        if (exception instanceof IndexNotFoundException) {
can you log exception here as well?
done
a few minor comments. please feel free to check in after addressing them.
Author: Kaituo Li
Date: Wed Apr 15 15:45:13 2020 -0700
Add state and error to profile API (opendistro-for-elasticsearch#84)
* Add state and error to profile API
We want to make it easy for customers and oncalls to identify a detector’s state and error if any. This PR adds such information to our new profile API.
We expect three kinds of states:
- Disabled: if get ad job api says the job is disabled;
- Init: if anomaly score after the last update time of the detector is larger than 0
- Running: if neither of the above applies and no exceptions.
Error is populated if error of the latest anomaly result is not empty.
Testing done:
- manual testing during a detector’s life cycle: not created, created but not started, started, during initialization, after initialization, stopped, restarted
- added unit tests to cover above scenario

commit 0c33050
Author: Kaituo Li
Date: Tue Apr 14 11:52:20 2020 -0700
Use callbacks and bug fix (opendistro-for-elasticsearch#83)
* Use callbacks and bug fix
This PR includes the following changes:
1. remove classes that are not needed in jacocoExclusions since we have enough coverage for those classes.
2. Use ClientUtil instead of Elasticsearch’s client in AD job runner
3. Use one function to get the number of partitioned forests. Previously, we have redundant code in both ModelManager and ADStateManager.
4. Change ADStateManager.getAnomalyDetector to use callback.
5. Change AnomalyResultTransportAction to use callback to get features.
6. Add in AnomalyResultTransportAction to handle the case where all features have been disabled, and users' index does not exist.
7. Change get RCF and threshold result methods to use callback and add exception handling of IndexNotFoundException due to the change. Previously, getting RCF and threshold result methods won’t throw IndexNotFoundException.
8. Remove unused fields in StopDetectorTransportAction and AnomalyResultTransportAction
9. Unwrap EsRejectedExecutionException as it can be nested inside RemoteTransportException. Previously, we would not recognize EsRejectedExecutionException and thus miss anomaly results write retrying.
10. Add error in anomaly result schema.
11. Fix broken tests due to my changes.
Testing done:
1. unit/integration tests pass
2. do end-to-end testing and make sure my fix achieves the purpose
* timeout issue is gone
* when all features have been disabled or index does not exist, we will retry a few more times and disable AD jobs.
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.