Adding negative cache to throttle extra request https://github.com/opendistro-for-elasticsearch/anomaly-detection/issues/33 #40
@@ -43,6 +43,8 @@ public HourlyCron(ClusterService clusterService, Client client) {
public void run() {
    DiscoveryNode[] dataNodes = clusterService.state().nodes().getDataNodes().values().toArray(DiscoveryNode.class);

    // we also add the cancel query function here based on query text from the negative cache.
Question. According to #33, a query cannot be aborted, but this comment indicates the opposite. Isn't that confusing?
Good catch. That's true. Currently anomaly-detection cannot abort a running query in Elasticsearch. If a query runs longer than expected, anomaly-detection stops waiting for it even though the query is still running. To solve this issue, we will 1) stop accepting new queries when this happens, which is #33, and 2) have a daily cron clean up running queries using the ES task management API (https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html).
 * @return unprocessed features and processed features for the current data point
 */
@Deprecated
public SinglePointFeatures getCurrentFeatures(AnomalyDetector detector, long startTime, long endTime) {
public SinglePointFeatures getCurrentFeatures(AnomalyDetector detector, long startTime, long endTime, ADStateManager stateManager) {
The added parameter should be an injected dependency rather than an argument passed all the way down the stack.
Agree. We should not pass state manager around.
 * @throws IllegalStateException when unexpected failures happen
 * @return features from search results, empty when no data found
 */
public Optional<double[]> getFeaturesForPeriod(AnomalyDetector detector, long startTime, long endTime) {
public Optional<double[]> getFeaturesForPeriod(AnomalyDetector detector, long startTime, long endTime, ADStateManager stateManager) {
The needed dependency can be injected in this class.
    return Optional.empty();
}
if (negativeCache.containsKey(detector.getDetectorId())) {
    return Optional.of(negativeCache.get(detector.getDetectorId()));
Minor. This method can be simplified to this line.
@@ -119,6 +124,47 @@ public int getPartitionNumber(String adID) throws InterruptedException {
    return partitionNum;
}

/**
 * Get negative cache value(QueryBuilder, Instant) for given detector
 * If detectorId is null, return Optional.empty()
Minor. This can be assumed to be unlikely. If this is a real concern that must be addressed, the proper behavior to expect is to throw an exception.
if (detector.getDetectorId() == null) {
    return;
}
negativeCache.putIfAbsent(detector.getDetectorId(), new SimpleEntry<>(searchRequest, clock.instant()));
Question. Wouldn't the client expect put here? With putIfAbsent, the insert call can return while the existing entry is still not updated. If that's by design, the documentation should make it clear.
Yes, I think put is better than putIfAbsent here. Will update.
if (detector.getDetectorId() == null) {
    return;
}
negativeCache.keySet().removeIf(key -> key.equals(detector.getDetectorId()));
Minor. Map::remove should work.
Suggestion. Throttling can be separated into its own module as a throttler, which can be injected into state manager and client util.
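As a rough illustration of this suggestion, the sketch below shares one throttler instance between two collaborators through constructor injection. All class and method names here (`SimpleThrottler`, `StateManagerSketch`, `ClientUtilSketch`, `tryAcquire`, `release`) are hypothetical stand-ins, not the plugin's actual classes, and the throttler only tracks detector IDs rather than the request and timestamp.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical throttler: remembers which detector IDs have a query in flight.
class SimpleThrottler {
    private final Set<String> inFlight = ConcurrentHashMap.newKeySet();

    // Returns true only for the caller that actually registered the ID.
    boolean tryAcquire(String detectorId) {
        return inFlight.add(detectorId);
    }

    boolean isThrottled(String detectorId) {
        return inFlight.contains(detectorId);
    }

    void release(String detectorId) {
        inFlight.remove(detectorId);
    }
}

// Hypothetical stand-in for the state manager: it only reads throttling state.
class StateManagerSketch {
    private final SimpleThrottler throttler;

    StateManagerSketch(SimpleThrottler throttler) {
        this.throttler = throttler;
    }

    boolean hasRunningQuery(String detectorId) {
        return throttler.isThrottled(detectorId);
    }
}

// Hypothetical stand-in for the client util: it registers and clears queries.
class ClientUtilSketch {
    private final SimpleThrottler throttler;

    ClientUtilSketch(SimpleThrottler throttler) {
        this.throttler = throttler;
    }

    boolean startQuery(String detectorId) {
        return throttler.tryAcquire(detectorId);
    }

    void finishQuery(String detectorId) {
        throttler.release(detectorId);
    }
}
```

With this shape, neither collaborator needs the throttler passed through method parameters; both receive the same instance once, at construction time.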
Optional<Entry<SearchRequest, Instant>> queryEntry = stateManager.getFilteredQuery(anomalyDetector);
if (queryEntry.isPresent()) {
    LOG.info("There is one query running for detectorId: {}", anomalyDetector.getDetectorId());
    listener.onResponse(new AnomalyResultResponse(Double.NaN, Double.NaN, new ArrayList<FeatureData>()));
The client can't distinguish this case from "No data in current detection window" (line 295), as both return an empty result. How about we throw an exception, as on line 243?
Good catch! Yes, we should throw an exception in this case. Will update in the next revision.
@@ -249,6 +251,12 @@ protected void doExecute(Task task, ActionRequest actionRequest, ActionListener<
    return;
}
AnomalyDetector anomalyDetector = detector.get();
Optional<Entry<SearchRequest, Instant>> queryEntry = stateManager.getFilteredQuery(anomalyDetector);
if (queryEntry.isPresent()) {
    LOG.info("There is one query running for detectorId: {}", anomalyDetector.getDetectorId());
If a query is stuck for a long time, that will impact AD realtime detection. How about we cancel the stuck/running query and run the newly arriving query?
You are correct. Canceling the stuck query whenever a new query comes in is one way. However, it can be expensive, since right now we can only use string comparison (query.getDescription()) to find which query to kill. Also, killing a query through the task management API does not take effect instantly; it only happens when the search moves to the next segment.
Mostly stylistic issues. Consider race conditions.
/**
 * Check if there is running query on given detector
 * @param detector Anomaly Detector
 * @return boolean
Minor. The doc is incomplete.
will update
@@ -114,7 +114,9 @@ public SearchFeatureDao(
}

/**
 * Gets features for the given time period.
 * Gets features for the given time period. This function also add given detector to negative cache before sending es request.
Minor. Adds.
Also, use descriptive language instead of prescriptive. See java doc.
will update, thanks for the suggestion
    AnomalyDetector detector
) {
    try {
        throttler.insertFilteredQuery(detector, request);
What should happen if the detector is already in cache?
It won't happen here: if a detector is already in the cache, we throw an exception in AnomalyResultTransportAction.
This can be a race condition when concurrent threads have finished the check at the higher level and try to execute queries. The method needs to be synchronized.
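One way to close this check-then-act gap without a `synchronized` block is to make the insert itself the check. The sketch below assumes the cache is a `ConcurrentHashMap` (as the `Throttler` imports suggest) and uses `String` as a stand-in for the real request type; the class name `AtomicNegativeCache` is hypothetical.

```java
import java.time.Instant;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified negative cache keyed by detector ID.
class AtomicNegativeCache {
    private final ConcurrentHashMap<String, Map.Entry<String, Instant>> negativeCache =
        new ConcurrentHashMap<>();

    // putIfAbsent is atomic on ConcurrentHashMap: when several threads race
    // on the same detector ID, exactly one sees null (and wins); the others
    // get the existing entry back and are rejected.
    boolean insertFilteredQuery(String detectorId, String request) {
        return negativeCache.putIfAbsent(detectorId, new SimpleEntry<>(request, Instant.now())) == null;
    }
}
```

A caller can then reject the request when the method returns `false`, instead of relying on an earlier, separate lookup that another thread may have invalidated in the meantime.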
 * @return boolean
 */
public boolean hasRunningQuery(AnomalyDetector detector) {
    Optional<Map.Entry<ActionRequest, Instant>> queryEntry = throttler.getFilteredQuery(detector);
Minor. The method can be simplified to one line. return throttler.getFilteredQuery(detector).isPresent()
will update
import java.util.concurrent.ConcurrentHashMap;
import org.elasticsearch.action.ActionRequest;

public class Throttler {
Minor. The class java doc is missing.
will update
if (negativeCache.containsKey(detector.getDetectorId())) {
    return Optional.of(negativeCache.get(detector.getDetectorId()));
Minor. The whole method can be simplified to this line: return Optional.of(negativeCache.get(detector.getDetectorId()));
will update
Should be Optional.ofNullable
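The correction matters because `Optional.of` throws a `NullPointerException` when given `null`, which is exactly what `Map.get` returns for a missing key. A minimal sketch of the single-lookup form, with `String` values standing in for the real cache entries and a hypothetical class name:

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified lookup over a negative cache.
class NegativeCacheLookup {
    private final Map<String, String> negativeCache = new ConcurrentHashMap<>();

    void put(String detectorId, String query) {
        negativeCache.put(detectorId, query);
    }

    // One lookup instead of containsKey followed by get. ofNullable maps a
    // missing key (null) to Optional.empty() rather than throwing.
    Optional<String> getFilteredQuery(String detectorId) {
        return Optional.ofNullable(negativeCache.get(detectorId));
    }
}
```

The single lookup also avoids the window in which another thread removes the entry between `containsKey` and `get`. Note that `ConcurrentHashMap.get` itself rejects a `null` key, so a null detector ID would still fail in this sketch.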

/**
 * Insert the negative cache entry for given detector
 * If detectorId is null, do nothing
Minor. The java doc is outdated.
will update
    return Optional.of(negativeCache.get(detector.getDetectorId()));
}
return Optional.empty();
return Optional.of(negativeCache.get(detector.getDetectorId()));
Optional.ofNullable
will update
This PR has some conflicts with the current branch that need to be resolved.
@@ -115,6 +115,9 @@ public SearchFeatureDao(

/**
 * Gets features for the given time period.
 * This function also adds given detector to negative cache before sending es request.
 * Once we get response/exception within timeout, we treat this request as complete and clear the negative cache.
Minor. We is prescriptive (giving orders to code), not descriptive (stating what code does).

/**
 * Clear the negative cache for given detector.
 * If detectorId is null, do nothing
Minor. Outdated doc.
 * @param detector AnomalyDetector
 */
public void clearFilteredQuery(AnomalyDetector detector) {
    negativeCache.keySet().removeIf(key -> key.equals(detector.getDetectorId()));
Minor. Why not use Map::remove?
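For comparison, `keySet().removeIf(...)` walks every key and tests the predicate, while `Map.remove(key)` goes straight to the one entry. A small sketch with a hypothetical class name and `String` values in place of the real entries:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified cache showing the direct removal.
class NegativeCacheClear {
    private final Map<String, String> negativeCache = new ConcurrentHashMap<>();

    void insert(String detectorId, String query) {
        negativeCache.put(detectorId, query);
    }

    // Removes the single entry for this detector; a no-op if it is absent.
    void clearFilteredQuery(String detectorId) {
        negativeCache.remove(detectorId);
    }

    int size() {
        return negativeCache.size();
    }
}
```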
2) Adding check when inserting new entry to negative cache.
// if key already exist, reject the request and throws exception
if (!throttler.insertFilteredQuery(detector.getDetectorId(), request)) {
    LOG.error("There is one query running for detectorId: {}", detector.getDetectorId());
    throw new EndRunException(detector.getDetectorId(), "There is one query running on AnomalyDetector", true);
Minor. This exception should be documented for the client.
added.
    return Optional.ofNullable(respReference.get());
} catch (InterruptedException e1) {
    LOG.error(CommonErrorMessages.WAIT_ERR_MSG);
    throw new IllegalStateException(e1);
If an exception happens, the negative cache for this detector should be cleared.
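A common way to guarantee this kind of cleanup is a `try`/`finally` around the call. The sketch below is a simplified, synchronous illustration with hypothetical names (`ThrottledRunner`, `run`); it does not model the timeout path, where the PR's design appears to keep the entry in place because the query may still be running.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical runner that registers a detector before the query and always
// unregisters it once the query returns or throws.
class ThrottledRunner {
    private final Set<String> negativeCache = ConcurrentHashMap.newKeySet();

    <T> T run(String detectorId, Supplier<T> query) {
        if (!negativeCache.add(detectorId)) {
            throw new IllegalStateException("There is one query running for detectorId: " + detectorId);
        }
        try {
            return query.get();
        } finally {
            // Runs on both normal return and exception, so a failed query
            // cannot leave the detector throttled forever.
            negativeCache.remove(detectorId);
        }
    }

    boolean hasRunningQuery(String detectorId) {
        return negativeCache.contains(detectorId);
    }
}
```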
LGTM. Thanks for the change.
Description:
Adding negative cache to throttle extra request #33
Updated on Feb 05:
Test: