Skip to content

Commit

Permalink
Alerts in correlations [Experminental] (opensearch-project#1040)
Browse files Browse the repository at this point in the history
* notification for alerting in correlation

* correlation alerts mapping change

* working code

Signed-off-by: Riya Saxena <[email protected]>

* alertsInCorrelation without notifciations

Signed-off-by: Riya Saxena <[email protected]>

* alertsInCorrelation without notifciations

Signed-off-by: Riya Saxena <[email protected]>

* alertsInCorrelation without notifciations

Signed-off-by: Riya Saxena <[email protected]>

* alerts in correlations notification service added

Signed-off-by: Riya Saxena <[email protected]>

* addressing the comments

Signed-off-by: Riya Saxena <[email protected]>

* addressing the comments

Signed-off-by: Riya Saxena <[email protected]>

* address the design changes discussed

Signed-off-by: Riya Saxena <[email protected]>

* address the design changes discussed

Signed-off-by: Riya Saxena <[email protected]>

* fixed tests

Signed-off-by: Riya Saxena <[email protected]>

---------

Signed-off-by: Riya <[email protected]>
Signed-off-by: Riya Saxena <[email protected]>
  • Loading branch information
riysaxen-amzn authored and jowg-amazon committed Jul 2, 2024
1 parent 41873a9 commit d8846bc
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.commons.alerting.action.PublishFindingsRequest;
import org.opensearch.commons.alerting.model.Finding;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.query.BoolQueryBuilder;
Expand All @@ -32,9 +33,11 @@
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.securityanalytics.config.monitors.DetectorMonitorConfig;
import org.opensearch.securityanalytics.correlation.alert.notifications.NotificationService;
import org.opensearch.securityanalytics.logtype.LogTypeService;
import org.opensearch.securityanalytics.model.CorrelationQuery;
import org.opensearch.securityanalytics.model.CorrelationRule;
import org.opensearch.securityanalytics.model.CorrelationRuleTrigger;
import org.opensearch.securityanalytics.model.Detector;
import org.opensearch.securityanalytics.transport.TransportCorrelateFindingAction;
import org.opensearch.securityanalytics.util.AutoCorrelationsRepo;
Expand Down Expand Up @@ -68,18 +71,27 @@ public class JoinEngine {

private final LogTypeService logTypeService;

private final NotificationService notificationService;

private volatile TimeValue indexTimeout;

private static final Logger log = LogManager.getLogger(JoinEngine.class);

private final User user;

public JoinEngine(Client client, PublishFindingsRequest request, NamedXContentRegistry xContentRegistry,
long corrTimeWindow, TransportCorrelateFindingAction.AsyncCorrelateFindingAction correlateFindingAction,
LogTypeService logTypeService, boolean enableAutoCorrelations) {
long corrTimeWindow, TimeValue indexTimeout, TransportCorrelateFindingAction.AsyncCorrelateFindingAction correlateFindingAction,
LogTypeService logTypeService, boolean enableAutoCorrelations, NotificationService notificationService, User user) {
this.client = client;
this.request = request;
this.xContentRegistry = xContentRegistry;
this.corrTimeWindow = corrTimeWindow;
this.indexTimeout = indexTimeout;
this.correlateFindingAction = correlateFindingAction;
this.logTypeService = logTypeService;
this.enableAutoCorrelations = enableAutoCorrelations;
this.notificationService = notificationService;
this.user = user;
}

public void onSearchDetectorResponse(Detector detector, Finding finding) {
Expand Down Expand Up @@ -349,7 +361,7 @@ private void getValidDocuments(String detectorType, List<String> indices, List<C
}
}
searchFindingsByTimestamp(detectorType, categoryToQueriesMap, categoryToTimeWindowMap,
filteredCorrelationRules.stream().map(it -> it.correlationRule).map(CorrelationRule::getId).collect(Collectors.toList()),
filteredCorrelationRules.stream().map(it -> it.correlationRule).collect(Collectors.toList()),
autoCorrelations
);
}, this::onFailure));
Expand All @@ -362,7 +374,7 @@ private void getValidDocuments(String detectorType, List<String> indices, List<C
* this method searches for parent findings given the log category & correlation time window & collects all related docs
* for them.
*/
private void searchFindingsByTimestamp(String detectorType, Map<String, List<CorrelationQuery>> categoryToQueriesMap, Map<String, Long> categoryToTimeWindowMap, List<String> correlationRules, Map<String, List<String>> autoCorrelations) {
private void searchFindingsByTimestamp(String detectorType, Map<String, List<CorrelationQuery>> categoryToQueriesMap, Map<String, Long> categoryToTimeWindowMap, List<CorrelationRule> correlationRules, Map<String, List<String>> autoCorrelations) {
long findingTimestamp = request.getFinding().getTimestamp().toEpochMilli();
MultiSearchRequest mSearchRequest = new MultiSearchRequest();
List<Pair<String, List<CorrelationQuery>>> categoryToQueriesPairs = new ArrayList<>();
Expand Down Expand Up @@ -418,14 +430,14 @@ private void searchFindingsByTimestamp(String detectorType, Map<String, List<Cor
searchDocsWithFilterKeys(detectorType, relatedDocsMap, categoryToTimeWindowMap, correlationRules, autoCorrelations);
}, this::onFailure));
} else {
getTimestampFeature(detectorType, correlationRules, autoCorrelations);
getTimestampFeature(detectorType, correlationRules.stream().map(CorrelationRule::getId).collect(Collectors.toList()) , autoCorrelations);
}
}

/**
* Given the related docs from parent findings, this method filters only those related docs which match parent join criteria.
*/
private void searchDocsWithFilterKeys(String detectorType, Map<String, DocSearchCriteria> relatedDocsMap, Map<String, Long> categoryToTimeWindowMap, List<String> correlationRules, Map<String, List<String>> autoCorrelations) {
private void searchDocsWithFilterKeys(String detectorType, Map<String, DocSearchCriteria> relatedDocsMap, Map<String, Long> categoryToTimeWindowMap, List<CorrelationRule> correlationRules, Map<String, List<String>> autoCorrelations) {
MultiSearchRequest mSearchRequest = new MultiSearchRequest();
List<String> categories = new ArrayList<>();

Expand Down Expand Up @@ -476,15 +488,15 @@ private void searchDocsWithFilterKeys(String detectorType, Map<String, DocSearch
getCorrelatedFindings(detectorType, filteredRelatedDocIds, categoryToTimeWindowMap, correlationRules, autoCorrelations);
}, this::onFailure));
} else {
getTimestampFeature(detectorType, correlationRules, autoCorrelations);
getTimestampFeature(detectorType, correlationRules.stream().map(CorrelationRule::getId).collect(Collectors.toList()), autoCorrelations);
}
}

/**
* Given the filtered related docs of the parent findings, this method gets the actual filtered parent findings for
* the finding to be correlated.
*/
private void getCorrelatedFindings(String detectorType, Map<String, List<String>> filteredRelatedDocIds, Map<String, Long> categoryToTimeWindowMap, List<String> correlationRules, Map<String, List<String>> autoCorrelations) {
private void getCorrelatedFindings(String detectorType, Map<String, List<String>> filteredRelatedDocIds, Map<String, Long> categoryToTimeWindowMap, List<CorrelationRule> correlationRules, Map<String, List<String>> autoCorrelations) {
long findingTimestamp = request.getFinding().getTimestamp().toEpochMilli();
MultiSearchRequest mSearchRequest = new MultiSearchRequest();
List<String> categories = new ArrayList<>();
Expand Down Expand Up @@ -549,10 +561,10 @@ private void getCorrelatedFindings(String detectorType, Map<String, List<String>
correlatedFindings.put(autoCorrelation.getKey(), autoCorrelation.getValue());
}
}
correlateFindingAction.initCorrelationIndex(detectorType, correlatedFindings, correlationRules);
correlateFindingAction.initCorrelationIndex(detectorType, correlatedFindings, correlationRules.stream().map(CorrelationRule::getId).collect(Collectors.toList()));
}, this::onFailure));
} else {
getTimestampFeature(detectorType, correlationRules, autoCorrelations);
getTimestampFeature(detectorType, correlationRules.stream().map(CorrelationRule::getId).collect(Collectors.toList()), autoCorrelations);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.commons.alerting.action.PublishFindingsRequest;
import org.opensearch.commons.alerting.action.SubscribeFindingsResponse;
import org.opensearch.commons.alerting.action.AlertingActions;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
import org.opensearch.core.common.io.stream.OutputStreamStreamOutput;
import org.opensearch.core.xcontent.NamedXContentRegistry;
Expand All @@ -49,6 +50,7 @@
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.securityanalytics.correlation.JoinEngine;
import org.opensearch.securityanalytics.correlation.VectorEmbeddingsEngine;
import org.opensearch.securityanalytics.correlation.alert.notifications.NotificationService;
import org.opensearch.securityanalytics.logtype.LogTypeService;
import org.opensearch.securityanalytics.model.CustomLogType;
import org.opensearch.securityanalytics.model.Detector;
Expand Down Expand Up @@ -99,6 +101,8 @@ public class TransportCorrelateFindingAction extends HandledTransportAction<Acti

private volatile boolean enableAutoCorrelation;

private final NotificationService notificationService;

@Inject
public TransportCorrelateFindingAction(TransportService transportService,
Client client,
Expand All @@ -108,7 +112,7 @@ public TransportCorrelateFindingAction(TransportService transportService,
LogTypeService logTypeService,
ClusterService clusterService,
Settings settings,
ActionFilters actionFilters) {
ActionFilters actionFilters, NotificationService notificationService) {
super(AlertingActions.SUBSCRIBE_FINDINGS_ACTION_NAME, transportService, actionFilters, PublishFindingsRequest::new);
this.client = client;
this.xContentRegistry = xContentRegistry;
Expand All @@ -117,6 +121,7 @@ public TransportCorrelateFindingAction(TransportService transportService,
this.logTypeService = logTypeService;
this.clusterService = clusterService;
this.settings = settings;
this.notificationService = notificationService;
this.threadPool = this.detectorIndices.getThreadPool();

this.indexTimeout = SecurityAnalyticsSettings.INDEX_TIMEOUT.get(this.settings);
Expand All @@ -132,7 +137,7 @@ public TransportCorrelateFindingAction(TransportService transportService,
protected void doExecute(Task task, ActionRequest request, ActionListener<SubscribeFindingsResponse> actionListener) {
try {
PublishFindingsRequest transformedRequest = transformRequest(request);
AsyncCorrelateFindingAction correlateFindingAction = new AsyncCorrelateFindingAction(task, transformedRequest, actionListener);
AsyncCorrelateFindingAction correlateFindingAction = new AsyncCorrelateFindingAction(task, transformedRequest, readUserFromThreadContext(this.threadPool), actionListener);

if (!this.correlationIndices.correlationIndexExists()) {
try {
Expand All @@ -146,7 +151,6 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<Subscr
);
}


if (!correlationIndices.correlationMetadataIndexExists()) {
try {
correlationIndices.initCorrelationMetadataIndex(ActionListener.wrap(createIndexResponse -> {
Expand All @@ -168,6 +172,19 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<Subscr
correlateFindingAction.onFailures(ex);
}
}
if (!correlationIndices.correlationAlertIndexExists()) {
try {
correlationIndices.initCorrelationAlertIndex(ActionListener.wrap(createIndexResponse -> {
if (createIndexResponse.isAcknowledged()) {
IndexUtils.correlationAlertIndexUpdated();
} else {
correlateFindingAction.onFailures(new OpenSearchStatusException("Failed to create correlation metadata Index", RestStatus.INTERNAL_SERVER_ERROR));
}
}, correlateFindingAction::onFailures));
} catch (Exception ex) {
correlateFindingAction.onFailures(ex);
}
}
} else {
correlateFindingAction.onFailures(new OpenSearchStatusException("Failed to create correlation Index", RestStatus.INTERNAL_SERVER_ERROR));
}
Expand All @@ -193,14 +210,12 @@ public class AsyncCorrelateFindingAction {
private final AtomicBoolean counter = new AtomicBoolean();
private final Task task;

AsyncCorrelateFindingAction(Task task, PublishFindingsRequest request, ActionListener<SubscribeFindingsResponse> listener) {
AsyncCorrelateFindingAction(Task task, PublishFindingsRequest request, User user, ActionListener<SubscribeFindingsResponse> listener) {
this.task = task;
this.request = request;
this.listener = listener;

this.response =new AtomicReference<>();

this.joinEngine = new JoinEngine(client, request, xContentRegistry, corrTimeWindow, this, logTypeService, enableAutoCorrelation);
this.joinEngine = new JoinEngine(client, request, xContentRegistry, corrTimeWindow, indexTimeout, this, logTypeService, enableAutoCorrelation, notificationService, user);
this.vectorEmbeddingsEngine = new VectorEmbeddingsEngine(client, indexTimeout, corrTimeWindow, this);
}

Expand Down

0 comments on commit d8846bc

Please sign in to comment.