Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alerts in correlations [Experminental] #1040

Merged
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.action.ActionRequest;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -53,6 +54,8 @@
import org.opensearch.script.ScriptService;
import org.opensearch.securityanalytics.action.*;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove * imports.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

import org.opensearch.securityanalytics.correlation.index.codec.CorrelationCodecService;
import org.opensearch.securityanalytics.correlation.alert.CorrelationAlertService;
import org.opensearch.securityanalytics.correlation.alert.notifications.NotificationService;
import org.opensearch.securityanalytics.correlation.index.mapper.CorrelationVectorFieldMapper;
import org.opensearch.securityanalytics.correlation.index.query.CorrelationQueryBuilder;
import org.opensearch.securityanalytics.indexmanagment.DetectorIndexManagementService;
Expand Down Expand Up @@ -165,13 +168,14 @@ public Collection<Object> createComponents(Client client,
TIFJobParameterService tifJobParameterService = new TIFJobParameterService(client, clusterService);
TIFJobUpdateService tifJobUpdateService = new TIFJobUpdateService(clusterService, tifJobParameterService, threatIntelFeedDataService, builtInTIFMetadataLoader);
TIFLockService threatIntelLockService = new TIFLockService(clusterService, client);

CorrelationAlertService correlationAlertService = new CorrelationAlertService(client, xContentRegistry);
NotificationService notificationServiceService = new NotificationService((NodeClient)client, scriptService);
TIFJobRunner.getJobRunnerInstance().initialize(clusterService, tifJobUpdateService, tifJobParameterService, threatIntelLockService, threadPool, detectorThreatIntelService);

return List.of(
detectorIndices, correlationIndices, correlationRuleIndices, ruleTopicIndices, customLogTypeIndices, ruleIndices,
mapperService, indexTemplateManager, builtinLogTypeLoader, builtInTIFMetadataLoader, threatIntelFeedDataService, detectorThreatIntelService,
tifJobUpdateService, tifJobParameterService, threatIntelLockService);
tifJobUpdateService, tifJobParameterService, threatIntelLockService, correlationAlertService, notificationServiceService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.securityanalytics.config.monitors.DetectorMonitorConfig;
import org.opensearch.securityanalytics.correlation.alert.CorrelationAlertService;
import org.opensearch.securityanalytics.correlation.alert.CorrelationRuleScheduler;
import org.opensearch.securityanalytics.correlation.alert.notifications.NotificationService;
import org.opensearch.securityanalytics.logtype.LogTypeService;
import org.opensearch.securityanalytics.model.CorrelationQuery;
import org.opensearch.securityanalytics.model.CorrelationRule;
import org.opensearch.securityanalytics.model.CorrelationRuleTrigger;
import org.opensearch.securityanalytics.model.Detector;
import org.opensearch.securityanalytics.transport.TransportCorrelateFindingAction;
import org.opensearch.securityanalytics.util.AutoCorrelationsRepo;
Expand Down Expand Up @@ -68,18 +72,27 @@ public class JoinEngine {

private final LogTypeService logTypeService;

private final CorrelationAlertService correlationAlertService;

private final NotificationService notificationService;

private volatile TimeValue indexTimeout;

private static final Logger log = LogManager.getLogger(JoinEngine.class);

public JoinEngine(Client client, PublishFindingsRequest request, NamedXContentRegistry xContentRegistry,
long corrTimeWindow, TransportCorrelateFindingAction.AsyncCorrelateFindingAction correlateFindingAction,
LogTypeService logTypeService, boolean enableAutoCorrelations) {
long corrTimeWindow, TimeValue indexTimeout, TransportCorrelateFindingAction.AsyncCorrelateFindingAction correlateFindingAction,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is index timeout being set?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In JoinEngine

        this.clusterService.getClusterSettings().addSettingsUpdateConsumer(SecurityAnalyticsSettings.INDEX_TIMEOUT, it -> indexTimeout = it);

LogTypeService logTypeService, boolean enableAutoCorrelations, CorrelationAlertService correlationAlertService, NotificationService notificationService) {
this.client = client;
this.request = request;
this.xContentRegistry = xContentRegistry;
this.corrTimeWindow = corrTimeWindow;
this.indexTimeout = indexTimeout;
this.correlateFindingAction = correlateFindingAction;
this.logTypeService = logTypeService;
this.enableAutoCorrelations = enableAutoCorrelations;
this.correlationAlertService = correlationAlertService;
this.notificationService = notificationService;
}

public void onSearchDetectorResponse(Detector detector, Finding finding) {
Expand Down Expand Up @@ -349,7 +362,7 @@ private void getValidDocuments(String detectorType, List<String> indices, List<C
}
}
searchFindingsByTimestamp(detectorType, categoryToQueriesMap, categoryToTimeWindowMap,
filteredCorrelationRules.stream().map(it -> it.correlationRule).map(CorrelationRule::getId).collect(Collectors.toList()),
filteredCorrelationRules.stream().map(it -> it.correlationRule).collect(Collectors.toList()),
autoCorrelations
);
}, this::onFailure));
Expand All @@ -362,7 +375,7 @@ private void getValidDocuments(String detectorType, List<String> indices, List<C
* this method searches for parent findings given the log category & correlation time window & collects all related docs
* for them.
*/
private void searchFindingsByTimestamp(String detectorType, Map<String, List<CorrelationQuery>> categoryToQueriesMap, Map<String, Long> categoryToTimeWindowMap, List<String> correlationRules, Map<String, List<String>> autoCorrelations) {
private void searchFindingsByTimestamp(String detectorType, Map<String, List<CorrelationQuery>> categoryToQueriesMap, Map<String, Long> categoryToTimeWindowMap, List<CorrelationRule> correlationRules, Map<String, List<String>> autoCorrelations) {
long findingTimestamp = request.getFinding().getTimestamp().toEpochMilli();
MultiSearchRequest mSearchRequest = new MultiSearchRequest();
List<Pair<String, List<CorrelationQuery>>> categoryToQueriesPairs = new ArrayList<>();
Expand Down Expand Up @@ -418,14 +431,14 @@ private void searchFindingsByTimestamp(String detectorType, Map<String, List<Cor
searchDocsWithFilterKeys(detectorType, relatedDocsMap, categoryToTimeWindowMap, correlationRules, autoCorrelations);
}, this::onFailure));
} else {
getTimestampFeature(detectorType, correlationRules, autoCorrelations);
getTimestampFeature(detectorType, correlationRules.stream().map(CorrelationRule::getId).collect(Collectors.toList()) , autoCorrelations);
}
}

/**
* Given the related docs from parent findings, this method filters only those related docs which match parent join criteria.
*/
private void searchDocsWithFilterKeys(String detectorType, Map<String, DocSearchCriteria> relatedDocsMap, Map<String, Long> categoryToTimeWindowMap, List<String> correlationRules, Map<String, List<String>> autoCorrelations) {
private void searchDocsWithFilterKeys(String detectorType, Map<String, DocSearchCriteria> relatedDocsMap, Map<String, Long> categoryToTimeWindowMap, List<CorrelationRule> correlationRules, Map<String, List<String>> autoCorrelations) {
MultiSearchRequest mSearchRequest = new MultiSearchRequest();
List<String> categories = new ArrayList<>();

Expand Down Expand Up @@ -476,15 +489,15 @@ private void searchDocsWithFilterKeys(String detectorType, Map<String, DocSearch
getCorrelatedFindings(detectorType, filteredRelatedDocIds, categoryToTimeWindowMap, correlationRules, autoCorrelations);
}, this::onFailure));
} else {
getTimestampFeature(detectorType, correlationRules, autoCorrelations);
getTimestampFeature(detectorType, correlationRules.stream().map(CorrelationRule::getId).collect(Collectors.toList()), autoCorrelations);
}
}

/**
* Given the filtered related docs of the parent findings, this method gets the actual filtered parent findings for
* the finding to be correlated.
*/
private void getCorrelatedFindings(String detectorType, Map<String, List<String>> filteredRelatedDocIds, Map<String, Long> categoryToTimeWindowMap, List<String> correlationRules, Map<String, List<String>> autoCorrelations) {
private void getCorrelatedFindings(String detectorType, Map<String, List<String>> filteredRelatedDocIds, Map<String, Long> categoryToTimeWindowMap, List<CorrelationRule> correlationRules, Map<String, List<String>> autoCorrelations) {
long findingTimestamp = request.getFinding().getTimestamp().toEpochMilli();
MultiSearchRequest mSearchRequest = new MultiSearchRequest();
List<String> categories = new ArrayList<>();
Expand Down Expand Up @@ -540,6 +553,12 @@ private void getCorrelatedFindings(String detectorType, Map<String, List<String>
++idx;
}

if (!correlatedFindings.isEmpty()) {
CorrelationRuleScheduler correlationRuleScheduler = new CorrelationRuleScheduler(client, correlationAlertService, notificationService);
correlationRuleScheduler.schedule(correlationRules, correlatedFindings, request.getFinding().getId(), indexTimeout);
correlationRuleScheduler.shutdown();
}

for (Map.Entry<String, List<String>> autoCorrelation: autoCorrelations.entrySet()) {
if (correlatedFindings.containsKey(autoCorrelation.getKey())) {
Set<String> alreadyCorrelatedFindings = new HashSet<>(correlatedFindings.get(autoCorrelation.getKey()));
Expand All @@ -549,10 +568,10 @@ private void getCorrelatedFindings(String detectorType, Map<String, List<String>
correlatedFindings.put(autoCorrelation.getKey(), autoCorrelation.getValue());
}
}
correlateFindingAction.initCorrelationIndex(detectorType, correlatedFindings, correlationRules);
correlateFindingAction.initCorrelationIndex(detectorType, correlatedFindings, correlationRules.stream().map(CorrelationRule::getId).collect(Collectors.toList()));
}, this::onFailure));
} else {
getTimestampFeature(detectorType, correlationRules, autoCorrelations);
getTimestampFeature(detectorType, correlationRules.stream().map(CorrelationRule::getId).collect(Collectors.toList()), autoCorrelations);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.securityanalytics.correlation.alert;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.commons.alerting.model.CorrelationAlert;
import org.opensearch.securityanalytics.util.CorrelationIndices;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.ArrayList;
import java.util.Collections;

public class CorrelationAlertService {
private static final Logger log = LogManager.getLogger(CorrelationAlertService.class);

private final NamedXContentRegistry xContentRegistry;
private final Client client;

public CorrelationAlertService(Client client, NamedXContentRegistry xContentRegistry) {
this.client = client;
this.xContentRegistry = xContentRegistry;
}

/**
* Searches for active Alerts in the correlation alerts index within a specified time range.
*
* @param ruleId The correlation rule ID to filter the alerts
* @param currentTime The current time of the search range
* @return The search response containing active alerts
*/
public void getActiveAlerts(String ruleId, long currentTime, ActionListener<CorrelationAlertsList> listener) {
Instant currentTimeDate = Instant.ofEpochMilli(currentTime);
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("correlation_rule_id", ruleId))
.must(QueryBuilders.rangeQuery("start_time").lte(currentTimeDate))
.must(QueryBuilders.rangeQuery("end_time").gte(currentTimeDate))
.must(QueryBuilders.termQuery("state", "ACTIVE"));

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.seqNoAndPrimaryTerm(true)
.version(true)
.size(10000) // set the size to 10,000
.query(queryBuilder);

SearchRequest searchRequest = new SearchRequest(CorrelationIndices.CORRELATION_ALERT_INDEX)
.source(searchSourceBuilder);

client.search(searchRequest, ActionListener.wrap(
searchResponse -> {
if (searchResponse.getHits().getTotalHits().equals(0)) {
listener.onResponse(new CorrelationAlertsList(Collections.emptyList(), 0));
} else {
listener.onResponse(new CorrelationAlertsList(
Collections.emptyList(),
searchResponse.getHits() != null && searchResponse.getHits().getTotalHits() != null ?
(int) searchResponse.getHits().getTotalHits().value : 0)
);
}
},
e -> {
log.error("Search request to fetch correlation alerts failed", e);
listener.onFailure(e);
}
));
}

public void indexCorrelationAlert(CorrelationAlert correlationAlert, TimeValue indexTimeout, ActionListener<IndexResponse> listener) {
// Convert CorrelationAlert to a map
try {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.field("correlated_finding_ids", correlationAlert.getCorrelatedFindingIds());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should use the final variables declared above

builder.field("correlation_rule_id", correlationAlert.getCorrelationRuleId());
builder.field("correlation_rule_name", correlationAlert.getCorrelationRuleName());
builder.field("id", correlationAlert.getId());
builder.field("user", correlationAlert.getUser()); // Convert User object to map
builder.field("schema_version", correlationAlert.getSchemaVersion());
builder.field("severity", correlationAlert.getSeverity());
builder.field("state", correlationAlert.getState());
builder.field("trigger_name", correlationAlert.getTriggerName());
builder.field("version", correlationAlert.getVersion());
builder.field("start_time", correlationAlert.getStartTime());
builder.field("end_time", correlationAlert.getEndTime());
builder.field("action_execution_results", correlationAlert.getActionExecutionResults());
builder.field("error_message", correlationAlert.getErrorMessage());
builder.field("acknowledged_time", correlationAlert.getAcknowledgedTime());
builder.endObject();
IndexRequest indexRequest = new IndexRequest(CorrelationIndices.CORRELATION_ALERT_INDEX)
.id(correlationAlert.getId())
.source(builder)
.timeout(indexTimeout);

client.index(indexRequest, listener);
} catch (IOException ex) {
log.error("Exception while adding alerts in .opensearch-sap-correlation-alerts index", ex);
}
}

public List<CorrelationAlert> parseCorrelationAlerts(final SearchResponse response) throws IOException {
List<CorrelationAlert> alerts = new ArrayList<>();
for (SearchHit hit : response.getHits()) {
XContentParser xcp = XContentType.JSON.xContent().createParser(
xContentRegistry,
LoggingDeprecationHandler.INSTANCE,
hit.getSourceAsString()
);
xcp.nextToken();
CorrelationAlert correlationAlert = CorrelationAlert.parse(xcp, hit.getId(), hit.getVersion());
alerts.add(correlationAlert);
}
return alerts;
}
}



Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.securityanalytics.correlation.alert;

import org.opensearch.commons.alerting.model.CorrelationAlert;

import java.util.List;

/**
* Wrapper class that holds list of correlation alerts and total number of alerts available.
* Useful for pagination.
*/
public class CorrelationAlertsList {

private final List<CorrelationAlert> correlationAlertList;
private final Integer totalAlerts;

public CorrelationAlertsList(List<CorrelationAlert> correlationAlertList, Integer totalAlerts) {
this.correlationAlertList = correlationAlertList;
this.totalAlerts = totalAlerts;
}

public List<CorrelationAlert> getCorrelationAlertList() {
return correlationAlertList;
}

public Integer getTotalAlerts() {
return totalAlerts;
}

}
Loading
Loading