Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow detectors to be stopped if underlying workflow is deleted. Don't allow them to then be started/editted #810

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,10 @@
this.alertsHistoryIndexPattern = alertsHistoryIndexPattern;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

Check warning on line 587 in src/main/java/org/opensearch/securityanalytics/model/Detector.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/model/Detector.java#L586-L587

Added lines #L586 - L587 were not covered by tests

public void setEnabledTime(Instant enabledTime) {
this.enabledTime = enabledTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.StepListener;
Expand Down Expand Up @@ -43,9 +42,11 @@
import org.opensearch.securityanalytics.model.Detector;
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;
import org.opensearch.securityanalytics.util.DetectorIndices;
import org.opensearch.securityanalytics.util.ExceptionChecker;
import org.opensearch.securityanalytics.util.MonitorService;
import org.opensearch.securityanalytics.util.RuleTopicIndices;
import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
import org.opensearch.securityanalytics.util.ThrowableCheckingPredicates;
import org.opensearch.securityanalytics.util.WorkflowService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -62,6 +63,11 @@
public class TransportDeleteDetectorAction extends HandledTransportAction<DeleteDetectorRequest, DeleteDetectorResponse> {

private static final Logger log = LogManager.getLogger(TransportDeleteDetectorAction.class);
private static final List<ThrowableCheckingPredicates> ACCEPTABLE_ENTITY_MISSING_THROWABLE_MATCHERS = List.of(

Check warning on line 66 in src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java#L66

Added line #L66 was not covered by tests
ThrowableCheckingPredicates.MONITOR_NOT_FOUND,
ThrowableCheckingPredicates.WORKFLOW_NOT_FOUND,
ThrowableCheckingPredicates.ALERTING_CONFIG_INDEX_NOT_FOUND
);

private final Client client;

Expand All @@ -84,9 +90,13 @@

private final DetectorIndices detectorIndices;

private final ExceptionChecker exceptionChecker;

@Inject
public TransportDeleteDetectorAction(TransportService transportService, IndexTemplateManager indexTemplateManager, Client client, ActionFilters actionFilters, NamedXContentRegistry xContentRegistry, RuleTopicIndices ruleTopicIndices, DetectorIndices detectorIndices, ClusterService clusterService,
Settings settings) {
public TransportDeleteDetectorAction(TransportService transportService, IndexTemplateManager indexTemplateManager, Client client,
ActionFilters actionFilters, NamedXContentRegistry xContentRegistry, RuleTopicIndices ruleTopicIndices,
DetectorIndices detectorIndices, ClusterService clusterService, Settings settings,
ExceptionChecker exceptionChecker) {
super(DeleteDetectorAction.NAME, transportService, actionFilters, DeleteDetectorRequest::new);
this.client = client;
this.ruleTopicIndices = ruleTopicIndices;
Expand All @@ -101,6 +111,7 @@

this.enabledWorkflowUsage = SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE.get(this.settings);
this.clusterService.getClusterSettings().addSettingsUpdateConsumer(SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE, this::setEnabledWorkflowUsage);
this.exceptionChecker = exceptionChecker;

Check warning on line 114 in src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java#L114

Added line #L114 was not covered by tests
}

@Override
Expand Down Expand Up @@ -205,7 +216,8 @@

@Override
public void onFailure(Exception e) {
if (isOnlyWorkflowOrMonitorOrIndexMissingExceptionThrownByGroupedActionListener(e, detector.getId())) {
if (exceptionChecker.doesGroupedActionListenerExceptionMatch(e, ACCEPTABLE_ENTITY_MISSING_THROWABLE_MATCHERS)) {
logAcceptableEntityMissingException(e, detector.getId());

Check warning on line 220 in src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java#L220

Added line #L220 was not covered by tests
deleteDetectorFromConfig(detector.getId(), request.getRefreshPolicy());
} else {
log.error(String.format(Locale.ROOT, "Failed to delete detector %s", detector.getId()), e);
Expand Down Expand Up @@ -244,7 +256,8 @@

private void handleDeleteWorkflowFailure(final String detectorId, final Exception deleteWorkflowException,
final ActionListener<AcknowledgedResponse> actionListener) {
if (isOnlyWorkflowOrMonitorOrIndexMissingExceptionThrownByGroupedActionListener(deleteWorkflowException, detectorId)) {
if (exceptionChecker.doesGroupedActionListenerExceptionMatch(deleteWorkflowException, ACCEPTABLE_ENTITY_MISSING_THROWABLE_MATCHERS)) {
logAcceptableEntityMissingException(deleteWorkflowException, detectorId);

Check warning on line 260 in src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java#L260

Added line #L260 was not covered by tests
actionListener.onResponse(new AcknowledgedResponse(true));
} else {
actionListener.onFailure(deleteWorkflowException);
Expand Down Expand Up @@ -306,39 +319,12 @@
}
}));
}

private boolean isOnlyWorkflowOrMonitorOrIndexMissingExceptionThrownByGroupedActionListener(
Exception ex,
String detectorId
) {
// grouped action listener listens on mutliple listeners but throws only one exception. If multiple
// listeners fail the other exceptions are added as suppressed exceptions to the first failure.
int len = ex.getSuppressed().length;
for (int i = 0; i <= len; i++) {
Throwable e = i == len ? ex : ex.getSuppressed()[i];
if (isMonitorNotFoundException(e) || isWorkflowNotFoundException(e) || isAlertingConfigIndexNotFoundException(e)) {
log.error(
String.format(Locale.ROOT, "Workflow, monitor, or jobs index already deleted." +
" Proceeding with detector %s deletion", detectorId),
e);
} else {
return false;
}
}
return true;
}
}

private boolean isMonitorNotFoundException(final Throwable e) {
return e.getMessage().matches("(.*)Monitor(.*) is not found(.*)");
}

private boolean isWorkflowNotFoundException(final Throwable e) {
return e.getMessage().matches("(.*)Workflow(.*) not found(.*)");
}

private boolean isAlertingConfigIndexNotFoundException(final Throwable e) {
return e.getMessage().contains("Configured indices are not found: [.opendistro-alerting-config]");
private void logAcceptableEntityMissingException(final Exception e, final String detectorId) {
final String errorMsg = String.format(Locale.ROOT, "Workflow, monitor, or jobs index already deleted." +

Check warning on line 325 in src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java#L325

Added line #L325 was not covered by tests
" Proceeding with detector %s deletion", detectorId);
log.error(errorMsg, e);

Check warning on line 327 in src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java#L327

Added line #L327 was not covered by tests
}

private void setEnabledWorkflowUsage(boolean enabledWorkflowUsage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,13 @@
import org.opensearch.securityanalytics.threatIntel.DetectorThreatIntelService;
import org.opensearch.securityanalytics.util.DetectorIndices;
import org.opensearch.securityanalytics.util.DetectorUtils;
import org.opensearch.securityanalytics.util.ExceptionChecker;
import org.opensearch.securityanalytics.util.IndexUtils;
import org.opensearch.securityanalytics.util.MonitorService;
import org.opensearch.securityanalytics.util.RuleIndices;
import org.opensearch.securityanalytics.util.RuleTopicIndices;
import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
import org.opensearch.securityanalytics.util.ThrowableCheckingPredicates;
import org.opensearch.securityanalytics.util.WorkflowService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -163,6 +165,8 @@
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final DetectorThreatIntelService detectorThreatIntelService;

private final ExceptionChecker exceptionChecker;

private final TimeValue indexTimeout;
@Inject
public TransportIndexDetectorAction(TransportService transportService,
Expand All @@ -178,7 +182,8 @@
NamedWriteableRegistry namedWriteableRegistry,
LogTypeService logTypeService,
IndexNameExpressionResolver indexNameExpressionResolver,
DetectorThreatIntelService detectorThreatIntelService) {
DetectorThreatIntelService detectorThreatIntelService,
ExceptionChecker exceptionChecker) {
super(IndexDetectorAction.NAME, transportService, actionFilters, IndexDetectorRequest::new);
this.client = client;
this.xContentRegistry = xContentRegistry;
Expand All @@ -201,6 +206,7 @@

this.clusterService.getClusterSettings().addSettingsUpdateConsumer(SecurityAnalyticsSettings.FILTER_BY_BACKEND_ROLES, this::setFilterByEnabled);
this.clusterService.getClusterSettings().addSettingsUpdateConsumer(SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE, this::setEnabledWorkflowUsage);
this.exceptionChecker = exceptionChecker;

Check warning on line 209 in src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java#L209

Added line #L209 was not covered by tests
}

@Override
Expand Down Expand Up @@ -720,8 +726,7 @@
}
@Override
public void onFailure(Exception e) {
log.error("Failed to update the workflow");
listener.onFailure(e);
handleUpsertWorkflowFailure(e, listener, detector, monitorsToBeDeleted, refreshPolicy, updatedMonitors);

Check warning on line 729 in src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java#L729

Added line #L729 was not covered by tests
}
});
}
Expand Down Expand Up @@ -778,6 +783,25 @@
return new IndexMonitorRequest(monitorId, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, refreshPolicy, restMethod, monitor, null);
}

private void handleUpsertWorkflowFailure(final Exception e, final ActionListener<List<IndexMonitorResponse>> listener,
final Detector detector, final List<String> monitorsToBeDeleted,
final RefreshPolicy refreshPolicy, final List<IndexMonitorResponse> updatedMonitors) {
if (exceptionChecker.doesGroupedActionListenerExceptionMatch(e, List.of(ThrowableCheckingPredicates.WORKFLOW_NOT_FOUND))) {
if (detector.getEnabled()) {
final String errorMessage = String.format("Underlying workflow associated with detector %s not found. " +
"Delete and recreate the detector to restore functionality.", detector.getName());
log.error(errorMessage);
listener.onFailure(new SecurityAnalyticsException(errorMessage, RestStatus.BAD_REQUEST, e));
} else {
log.error("Underlying workflow associated with detector {} not found. Proceeding to disable detector.", detector.getName());
deleteMonitorStep(monitorsToBeDeleted, refreshPolicy, updatedMonitors, listener);

Check warning on line 797 in src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java#L791-L797

Added lines #L791 - L797 were not covered by tests
}
} else {
log.error("Failed to update the workflow");
listener.onFailure(e);

Check warning on line 801 in src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java#L800-L801

Added lines #L800 - L801 were not covered by tests
}
}

Check warning on line 803 in src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java#L803

Added line #L803 was not covered by tests

private void addThreatIntelBasedDocLevelQueries(Detector detector, ActionListener<List<DocLevelQuery>> listener) {
try {
if (detector.getThreatIntelEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.securityanalytics.util;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class ExceptionChecker {

public boolean doesGroupedActionListenerExceptionMatch(final Exception ex, final List<ThrowableCheckingPredicates> exceptionMatchers) {
// grouped action listener listens on multiple listeners but throws only one exception. If multiple
// listeners fail the other exceptions are added as suppressed exceptions to the first failure.
return Stream.concat(Arrays.stream(ex.getSuppressed()), Stream.of(ex))
.allMatch(throwable -> doesExceptionMatch(throwable, exceptionMatchers));
}

private boolean doesExceptionMatch(final Throwable throwable, final List<ThrowableCheckingPredicates> exceptionMatchers) {
return exceptionMatchers.stream()
.map(ThrowableCheckingPredicates::getMatcherPredicate)
.anyMatch(matcher -> matcher.test(throwable));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.securityanalytics.util;

import java.util.function.Predicate;

public enum ThrowableCheckingPredicates {
MONITOR_NOT_FOUND(ThrowableCheckingPredicates::isMonitorNotFoundException),
WORKFLOW_NOT_FOUND(ThrowableCheckingPredicates::isWorkflowNotFoundException),
ALERTING_CONFIG_INDEX_NOT_FOUND(ThrowableCheckingPredicates::isAlertingConfigIndexNotFoundException);

private final Predicate<Throwable> matcherPredicate;
ThrowableCheckingPredicates(final Predicate<Throwable> matcherPredicate) {
this.matcherPredicate = matcherPredicate;
}

public Predicate<Throwable> getMatcherPredicate() {
return this.matcherPredicate;
}

private static boolean isMonitorNotFoundException(final Throwable e) {
return e.getMessage().matches("(.*)Monitor(.*) is not found(.*)");
}

public static boolean isWorkflowNotFoundException(final Throwable e) {
return e.getMessage().matches("(.*)Workflow(.*) not found(.*)");
}

public static boolean isAlertingConfigIndexNotFoundException(final Throwable e) {
return e.getMessage().contains("Configured indices are not found: [.opendistro-alerting-config]");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,50 @@ public void testUpdateADetectorWithIndexNotExists() throws IOException {
}
}

public void testDisableEnableADetectorWithWorkflowNotExists() throws IOException {
final String index = createTestIndex(randomIndex(), windowsIndexMapping());

// Execute CreateMappingsAction to add alias mapping for index
final Request createMappingRequest = new Request("POST", SecurityAnalyticsPlugin.MAPPER_BASE_URI);
// both req params and req body are supported
createMappingRequest.setJsonEntity(
"{ \"index_name\":\"" + index + "\"," +
" \"rule_topic\":\"" + randomDetectorType() + "\", " +
" \"partial\":true" +
"}"
);

final Response createMappingResponse = client().performRequest(createMappingRequest);
assertEquals(HttpStatus.SC_OK, createMappingResponse.getStatusLine().getStatusCode());

final Detector detector = randomDetector(getRandomPrePackagedRules());
final Response createResponse = makeRequest(client(), "POST", SecurityAnalyticsPlugin.DETECTOR_BASE_URI, Collections.emptyMap(), toHttpEntity(detector));
Assert.assertEquals("Create detector failed", RestStatus.CREATED, restStatus(createResponse));

final Map<String, Object> createResponseAsMap = asMap(createResponse);
final String detectorId = createResponseAsMap.get("_id").toString();

final Map<String, Object> detectorSourceAsMap = getDetectorSourceAsMap(detectorId);
final String workflowId = ((List<String>) detectorSourceAsMap.get("workflow_ids")).get(0);

final Response deleteWorkflowResponse = deleteAlertingWorkflow(workflowId);
assertEquals(200, deleteWorkflowResponse.getStatusLine().getStatusCode());
entityAsMap(deleteWorkflowResponse);

detector.setEnabled(false);
Response updateResponse = makeRequest(client(), "PUT", SecurityAnalyticsPlugin.DETECTOR_BASE_URI + "/" + detectorId, Collections.emptyMap(), toHttpEntity(detector));
Assert.assertEquals(200, updateResponse.getStatusLine().getStatusCode());

try {
detector.setEnabled(true);
makeRequest(client(), "PUT", SecurityAnalyticsPlugin.DETECTOR_BASE_URI + "/" + detectorId, Collections.emptyMap(), toHttpEntity(detector));
} catch (ResponseException ex) {
Assert.assertEquals(400, ex.getResponse().getStatusLine().getStatusCode());
Assert.assertEquals(true, ex.getMessage().contains(String.format("Underlying workflow associated with detector %s not found. " +
"Delete and recreate the detector to restore functionality.", detector.getName())));
}
}

@SuppressWarnings("unchecked")
public void testDeletingADetector_single_ruleTopicIndex() throws IOException {
String index = createTestIndex(randomIndex(), windowsIndexMapping());
Expand Down
Loading
Loading