Skip to content

Commit

Permalink
Backport: Fix the profile API returns prematurely. (opendistro-for-el…
Browse files Browse the repository at this point in the history
…asticsearch#340)

Besides backporting, this PR also:
First, it fixes another premature return in AnomalyDetectorProfileRunner.onInittedEver.
Second, it replaces set-env with GITHUB_ENV so that backport PR can invoke CI. The `set-env` command is disabled.

Testing done:
1. Verfied manually the new early return bug is fixed..
2. Manual tests to run profile calls for single-stream and multi-entity detectors for different phases of the detector lifecycle (disabled, init, running). Verified profile results make sense.
  • Loading branch information
kaituo committed Dec 29, 2020
1 parent 5d30bb9 commit 2cd9cdd
Show file tree
Hide file tree
Showing 12 changed files with 634 additions and 237 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ jobs:
echo "RUN /usr/share/elasticsearch/bin/elasticsearch-plugin install --batch file:/tmp/opendistro-anomaly-detection-$plugin_version.zip" >> Dockerfile
docker build -t odfe-ad:test .
echo "::set-env name=imagePresent::true"
echo "imagePresent=true" >> $GITHUB_ENV
else
echo "::set-env name=imagePresent::false"
echo "imagePresent=false" >> $GITHUB_ENV
fi
- name: Run Docker Image
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ public void profile(String detectorId, ActionListener<DetectorProfile> listener,
listener.onFailure(new InvalidParameterException(CommonErrorMessages.EMPTY_PROFILES_COLLECT));
return;
}

calculateTotalResponsesToWait(detectorId, profilesToCollect, listener);
}

Expand All @@ -118,10 +117,38 @@ private void calculateTotalResponsesToWait(
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, xContentParser.nextToken(), xContentParser::getTokenLocation);
AnomalyDetector detector = AnomalyDetector.parse(xContentParser, detectorId);

prepareProfile(detector, listener, profilesToCollect);
} catch (Exception e) {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, e));
}
} else {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId));
}
}, exception -> listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, exception))));
}

private void prepareProfile(
AnomalyDetector detector,
ActionListener<DetectorProfile> listener,
Set<DetectorProfileName> profilesToCollect
) {
String detectorId = detector.getDetectorId();
GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId);
client.get(getRequest, ActionListener.wrap(getResponse -> {
if (getResponse != null && getResponse.isExists()) {
try (
XContentParser parser = XContentType.JSON
.xContent()
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
long enabledTimeMs = job.getEnabledTime().toEpochMilli();

boolean isMultiEntityDetector = detector.isMultientityDetector();

int totalResponsesToWait = 0;

if (profilesToCollect.contains(DetectorProfileName.ERROR)) {
totalResponsesToWait++;
}
Expand Down Expand Up @@ -158,50 +185,20 @@ private void calculateTotalResponsesToWait(
new MultiResponsesDelegateActionListener<DetectorProfile>(
listener,
totalResponsesToWait,
"Fail to fetch profile for " + detectorId,
CommonErrorMessages.FAIL_FETCH_ERR_MSG + detectorId,
false
);

prepareProfile(detector, delegateListener, profilesToCollect);
} catch (Exception e) {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, e));
}
} else {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId));
}
}, exception -> listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, exception))));
}

private void prepareProfile(
AnomalyDetector detector,
MultiResponsesDelegateActionListener<DetectorProfile> listener,
Set<DetectorProfileName> profilesToCollect
) {
String detectorId = detector.getDetectorId();
GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId);
client.get(getRequest, ActionListener.wrap(getResponse -> {
if (getResponse != null && getResponse.isExists()) {
try (
XContentParser parser = XContentType.JSON
.xContent()
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
long enabledTimeMs = job.getEnabledTime().toEpochMilli();

if (profilesToCollect.contains(DetectorProfileName.ERROR)) {
GetRequest getStateRequest = new GetRequest(DetectorInternalState.DETECTOR_STATE_INDEX, detectorId);
client.get(getStateRequest, onGetDetectorState(listener, detectorId, enabledTimeMs));
client.get(getStateRequest, onGetDetectorState(delegateListener, detectorId, enabledTimeMs));
}

boolean isMultiEntityDetector = detector.isMultientityDetector();

// total number of listeners we need to define. Needed by MultiResponsesDelegateActionListener to decide
// when to consolidate results and return to users
if (isMultiEntityDetector) {
if (profilesToCollect.contains(DetectorProfileName.TOTAL_ENTITIES)) {
profileEntityStats(listener, detector);
profileEntityStats(delegateListener, detector);
}
if (profilesToCollect.contains(DetectorProfileName.COORDINATING_NODE)
|| profilesToCollect.contains(DetectorProfileName.SHINGLE_SIZE)
Expand All @@ -210,24 +207,24 @@ private void prepareProfile(
|| profilesToCollect.contains(DetectorProfileName.ACTIVE_ENTITIES)
|| profilesToCollect.contains(DetectorProfileName.INIT_PROGRESS)
|| profilesToCollect.contains(DetectorProfileName.STATE)) {
profileModels(detector, profilesToCollect, job, true, listener);
profileModels(detector, profilesToCollect, job, true, delegateListener);
}
} else {
if (profilesToCollect.contains(DetectorProfileName.STATE)
|| profilesToCollect.contains(DetectorProfileName.INIT_PROGRESS)) {
profileStateRelated(detector, listener, job.isEnabled(), profilesToCollect);
profileStateRelated(detector, delegateListener, job.isEnabled(), profilesToCollect);
}
if (profilesToCollect.contains(DetectorProfileName.COORDINATING_NODE)
|| profilesToCollect.contains(DetectorProfileName.SHINGLE_SIZE)
|| profilesToCollect.contains(DetectorProfileName.TOTAL_SIZE_IN_BYTES)
|| profilesToCollect.contains(DetectorProfileName.MODELS)) {
profileModels(detector, profilesToCollect, job, false, listener);
profileModels(detector, profilesToCollect, job, false, delegateListener);
}
}

} catch (IOException | XContentParseException | NullPointerException e) {
logger.error(e);
listener.failImmediately(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e);
} catch (Exception e) {
logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e);
listener.onFailure(e);
}
} else {
onGetDetectorForPrepare(listener, profilesToCollect);
Expand Down Expand Up @@ -261,20 +258,19 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
DetectorProfile.Builder profileBuilder = new DetectorProfile.Builder();
DetectorProfile profile = profileBuilder.totalEntities(value).build();
listener.onResponse(profile);
}, searchException -> { listener.failImmediately(CommonErrorMessages.FAIL_TO_GET_TOTAL_ENTITIES + detector.getDetectorId()); })
);
}, searchException -> {
logger.warn(CommonErrorMessages.FAIL_TO_GET_TOTAL_ENTITIES + detector.getDetectorId());
listener.onFailure(searchException);
}));
}
}

private void onGetDetectorForPrepare(
MultiResponsesDelegateActionListener<DetectorProfile> listener,
Set<DetectorProfileName> profiles
) {
private void onGetDetectorForPrepare(ActionListener<DetectorProfile> listener, Set<DetectorProfileName> profiles) {
DetectorProfile.Builder profileBuilder = new DetectorProfile.Builder();
if (profiles.contains(DetectorProfileName.STATE)) {
profileBuilder.state(DetectorState.DISABLED);
}
listener.respondImmediately(profileBuilder.build());
listener.onResponse(profileBuilder.build());
}

/**
Expand Down Expand Up @@ -340,8 +336,8 @@ private ActionListener<GetResponse> onGetDetectorState(
listener.onResponse(profileBuilder.build());

} catch (IOException | XContentParseException | NullPointerException e) {
logger.error(e);
listener.failImmediately(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e);
logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e);
listener.onFailure(e);
}
} else {
// detector state for this detector does not exist
Expand Down Expand Up @@ -463,8 +459,8 @@ private ActionListener<SearchResponse> onInittedEver(
processInitResponse(detector, profilesToCollect, totalUpdates, false, profileBuilder, listener);
} else {
createRunningStateAndInitProgress(profilesToCollect, profileBuilder);
listener.onResponse(profileBuilder.build());
}
listener.onResponse(profileBuilder.build());
}, exception -> {
if (exception instanceof IndexNotFoundException) {
// anomaly result index is not created yet
Expand All @@ -475,7 +471,7 @@ private ActionListener<SearchResponse> onInittedEver(
"Fail to find any anomaly result with anomaly score larger than 0 after AD job enabled time for detector {}",
detector.getDetectorId()
);
listener.failImmediately(new RuntimeException("Fail to find detector state: " + detector.getDetectorId(), exception));
listener.onFailure(exception);
}
});
}
Expand Down Expand Up @@ -523,7 +519,7 @@ private ActionListener<RCFPollingResponse> onPollRCFUpdates(
new ParameterizedMessage("Fail to get init progress through messaging for {}", detector.getDetectorId()),
exception
);
listener.failImmediately(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG + detector.getDetectorId(), exception);
listener.onFailure(exception);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.CATEGORY_FIELD_LIMIT;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

import java.io.IOException;
import java.security.InvalidParameterException;
import java.util.List;
import java.util.Optional;
Expand All @@ -35,7 +34,6 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
Expand Down Expand Up @@ -113,25 +111,7 @@ public void profile(
new InvalidParameterException(CommonErrorMessages.CATEGORICAL_FIELD_NUMBER_SURPASSED + CATEGORY_FIELD_LIMIT)
);
} else {
int totalResponsesToWait = 0;
if (profilesToCollect.contains(EntityProfileName.INIT_PROGRESS)
|| profilesToCollect.contains(EntityProfileName.STATE)) {
totalResponsesToWait++;
}
if (profilesToCollect.contains(EntityProfileName.ENTITY_INFO)) {
totalResponsesToWait++;
}
if (profilesToCollect.contains(EntityProfileName.MODELS)) {
totalResponsesToWait++;
}
MultiResponsesDelegateActionListener<EntityProfile> delegateListener =
new MultiResponsesDelegateActionListener<EntityProfile>(
listener,
totalResponsesToWait,
"Fail to fetch profile for " + entityValue + " of detector " + detectorId,
false
);
prepareEntityProfile(delegateListener, detectorId, entityValue, profilesToCollect, detector, categoryField.get(0));
prepareEntityProfile(listener, detectorId, entityValue, profilesToCollect, detector, categoryField.get(0));
}
} catch (Exception t) {
listener.onFailure(t);
Expand All @@ -143,7 +123,7 @@ public void profile(
}

private void prepareEntityProfile(
MultiResponsesDelegateActionListener<EntityProfile> delegateListener,
ActionListener<EntityProfile> listener,
String detectorId,
String entityValue,
Set<EntityProfileName> profilesToCollect,
Expand All @@ -158,8 +138,8 @@ private void prepareEntityProfile(
request,
ActionListener
.wrap(
r -> getJob(detectorId, categoryField, entityValue, profilesToCollect, detector, r, delegateListener),
delegateListener::failImmediately
r -> getJob(detectorId, categoryField, entityValue, profilesToCollect, detector, r, listener),
listener::onFailure
)
);
}
Expand All @@ -171,7 +151,7 @@ private void getJob(
Set<EntityProfileName> profilesToCollect,
AnomalyDetector detector,
EntityProfileResponse entityProfileResponse,
MultiResponsesDelegateActionListener<EntityProfile> delegateListener
ActionListener<EntityProfile> listener
) {
GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId);
client.get(getRequest, ActionListener.wrap(getResponse -> {
Expand All @@ -184,6 +164,25 @@ private void getJob(
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);

int totalResponsesToWait = 0;
if (profilesToCollect.contains(EntityProfileName.INIT_PROGRESS)
|| profilesToCollect.contains(EntityProfileName.STATE)) {
totalResponsesToWait++;
}
if (profilesToCollect.contains(EntityProfileName.ENTITY_INFO)) {
totalResponsesToWait++;
}
if (profilesToCollect.contains(EntityProfileName.MODELS)) {
totalResponsesToWait++;
}
MultiResponsesDelegateActionListener<EntityProfile> delegateListener =
new MultiResponsesDelegateActionListener<EntityProfile>(
listener,
totalResponsesToWait,
CommonErrorMessages.FAIL_FETCH_ERR_MSG + entityValue + " of detector " + detectorId,
false
);

if (profilesToCollect.contains(EntityProfileName.MODELS)) {
EntityProfile.Builder builder = new EntityProfile.Builder(categoryField, entityValue);
if (false == job.isEnabled()) {
Expand Down Expand Up @@ -233,20 +232,20 @@ private void getJob(
delegateListener.onResponse(builder.build());
}));
}
} catch (IOException | XContentParseException | NullPointerException e) {
logger.error(e);
delegateListener.failImmediately(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e);
} catch (Exception e) {
logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e);
listener.onFailure(e);
}
} else {
sendUnknownState(profilesToCollect, categoryField, entityValue, true, delegateListener);
sendUnknownState(profilesToCollect, categoryField, entityValue, true, listener);
}
}, exception -> {
if (exception instanceof IndexNotFoundException) {
logger.info(exception.getMessage());
sendUnknownState(profilesToCollect, categoryField, entityValue, true, delegateListener);
sendUnknownState(profilesToCollect, categoryField, entityValue, true, listener);
} else {
logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG + detectorId, exception);
delegateListener.failImmediately(exception);
listener.onFailure(exception);
}
}));
}
Expand Down Expand Up @@ -285,14 +284,14 @@ private void sendUnknownState(
String categoryField,
String entityValue,
boolean immediate,
MultiResponsesDelegateActionListener<EntityProfile> delegateListener
ActionListener<EntityProfile> delegateListener
) {
EntityProfile.Builder builder = new EntityProfile.Builder(categoryField, entityValue);
if (profilesToCollect.contains(EntityProfileName.STATE)) {
builder.state(EntityState.UNKNOWN);
}
if (immediate) {
delegateListener.respondImmediately(builder.build());
delegateListener.onResponse(builder.build());
} else {
delegateListener.onResponse(builder.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ public class CommonErrorMessages {
public static String FAIL_TO_GET_TOTAL_ENTITIES = "Failed to get total entities for detector ";
public static String CATEGORICAL_FIELD_NUMBER_SURPASSED = "We don't support categorical fields more than ";
public static String EMPTY_PROFILES_COLLECT = "profiles to collect are missing or invalid";
public static String FAIL_FETCH_ERR_MSG = "Fail to fetch profile for ";
}
Loading

0 comments on commit 2cd9cdd

Please sign in to comment.