Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alerts in correlations [Experminental] #1040

Merged
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.action.ActionRequest;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -53,6 +54,8 @@
import org.opensearch.script.ScriptService;
import org.opensearch.securityanalytics.action.*;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove * imports.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

import org.opensearch.securityanalytics.correlation.index.codec.CorrelationCodecService;
import org.opensearch.securityanalytics.correlation.alert.CorrelationAlertService;
import org.opensearch.securityanalytics.correlation.alert.notifications.NotificationService;
import org.opensearch.securityanalytics.correlation.index.mapper.CorrelationVectorFieldMapper;
import org.opensearch.securityanalytics.correlation.index.query.CorrelationQueryBuilder;
import org.opensearch.securityanalytics.indexmanagment.DetectorIndexManagementService;
Expand Down Expand Up @@ -165,13 +168,14 @@ public Collection<Object> createComponents(Client client,
TIFJobParameterService tifJobParameterService = new TIFJobParameterService(client, clusterService);
TIFJobUpdateService tifJobUpdateService = new TIFJobUpdateService(clusterService, tifJobParameterService, threatIntelFeedDataService, builtInTIFMetadataLoader);
TIFLockService threatIntelLockService = new TIFLockService(clusterService, client);

CorrelationAlertService correlationAlertService = new CorrelationAlertService(client, xContentRegistry);
NotificationService notificationServiceService = new NotificationService((NodeClient)client, scriptService);
TIFJobRunner.getJobRunnerInstance().initialize(clusterService, tifJobUpdateService, tifJobParameterService, threatIntelLockService, threadPool, detectorThreatIntelService);

return List.of(
detectorIndices, correlationIndices, correlationRuleIndices, ruleTopicIndices, customLogTypeIndices, ruleIndices,
mapperService, indexTemplateManager, builtinLogTypeLoader, builtInTIFMetadataLoader, threatIntelFeedDataService, detectorThreatIntelService,
tifJobUpdateService, tifJobParameterService, threatIntelLockService);
tifJobUpdateService, tifJobParameterService, threatIntelLockService, correlationAlertService, notificationServiceService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.commons.alerting.action.PublishFindingsRequest;
import org.opensearch.commons.alerting.model.Finding;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.query.BoolQueryBuilder;
Expand All @@ -32,9 +33,13 @@
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.securityanalytics.config.monitors.DetectorMonitorConfig;
import org.opensearch.securityanalytics.correlation.alert.CorrelationAlertService;
import org.opensearch.securityanalytics.correlation.alert.CorrelationRuleScheduler;
import org.opensearch.securityanalytics.correlation.alert.notifications.NotificationService;
import org.opensearch.securityanalytics.logtype.LogTypeService;
import org.opensearch.securityanalytics.model.CorrelationQuery;
import org.opensearch.securityanalytics.model.CorrelationRule;
import org.opensearch.securityanalytics.model.CorrelationRuleTrigger;
import org.opensearch.securityanalytics.model.Detector;
import org.opensearch.securityanalytics.transport.TransportCorrelateFindingAction;
import org.opensearch.securityanalytics.util.AutoCorrelationsRepo;
Expand Down Expand Up @@ -68,18 +73,30 @@ public class JoinEngine {

private final LogTypeService logTypeService;

private final CorrelationAlertService correlationAlertService;

private final NotificationService notificationService;

private volatile TimeValue indexTimeout;

private static final Logger log = LogManager.getLogger(JoinEngine.class);

private final User user;

public JoinEngine(Client client, PublishFindingsRequest request, NamedXContentRegistry xContentRegistry,
long corrTimeWindow, TransportCorrelateFindingAction.AsyncCorrelateFindingAction correlateFindingAction,
LogTypeService logTypeService, boolean enableAutoCorrelations) {
long corrTimeWindow, TimeValue indexTimeout, TransportCorrelateFindingAction.AsyncCorrelateFindingAction correlateFindingAction,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is index timeout being set?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In JoinEngine

        this.clusterService.getClusterSettings().addSettingsUpdateConsumer(SecurityAnalyticsSettings.INDEX_TIMEOUT, it -> indexTimeout = it);

LogTypeService logTypeService, boolean enableAutoCorrelations, CorrelationAlertService correlationAlertService, NotificationService notificationService, User user) {
this.client = client;
this.request = request;
this.xContentRegistry = xContentRegistry;
this.corrTimeWindow = corrTimeWindow;
this.indexTimeout = indexTimeout;
this.correlateFindingAction = correlateFindingAction;
this.logTypeService = logTypeService;
this.enableAutoCorrelations = enableAutoCorrelations;
this.correlationAlertService = correlationAlertService;
this.notificationService = notificationService;
this.user = user;
}

public void onSearchDetectorResponse(Detector detector, Finding finding) {
Expand Down Expand Up @@ -349,7 +366,7 @@ private void getValidDocuments(String detectorType, List<String> indices, List<C
}
}
searchFindingsByTimestamp(detectorType, categoryToQueriesMap, categoryToTimeWindowMap,
filteredCorrelationRules.stream().map(it -> it.correlationRule).map(CorrelationRule::getId).collect(Collectors.toList()),
filteredCorrelationRules.stream().map(it -> it.correlationRule).collect(Collectors.toList()),
autoCorrelations
);
}, this::onFailure));
Expand All @@ -362,7 +379,7 @@ private void getValidDocuments(String detectorType, List<String> indices, List<C
* this method searches for parent findings given the log category & correlation time window & collects all related docs
* for them.
*/
private void searchFindingsByTimestamp(String detectorType, Map<String, List<CorrelationQuery>> categoryToQueriesMap, Map<String, Long> categoryToTimeWindowMap, List<String> correlationRules, Map<String, List<String>> autoCorrelations) {
private void searchFindingsByTimestamp(String detectorType, Map<String, List<CorrelationQuery>> categoryToQueriesMap, Map<String, Long> categoryToTimeWindowMap, List<CorrelationRule> correlationRules, Map<String, List<String>> autoCorrelations) {
long findingTimestamp = request.getFinding().getTimestamp().toEpochMilli();
MultiSearchRequest mSearchRequest = new MultiSearchRequest();
List<Pair<String, List<CorrelationQuery>>> categoryToQueriesPairs = new ArrayList<>();
Expand Down Expand Up @@ -418,14 +435,14 @@ private void searchFindingsByTimestamp(String detectorType, Map<String, List<Cor
searchDocsWithFilterKeys(detectorType, relatedDocsMap, categoryToTimeWindowMap, correlationRules, autoCorrelations);
}, this::onFailure));
} else {
getTimestampFeature(detectorType, correlationRules, autoCorrelations);
getTimestampFeature(detectorType, correlationRules.stream().map(CorrelationRule::getId).collect(Collectors.toList()) , autoCorrelations);
}
}

/**
* Given the related docs from parent findings, this method filters only those related docs which match parent join criteria.
*/
private void searchDocsWithFilterKeys(String detectorType, Map<String, DocSearchCriteria> relatedDocsMap, Map<String, Long> categoryToTimeWindowMap, List<String> correlationRules, Map<String, List<String>> autoCorrelations) {
private void searchDocsWithFilterKeys(String detectorType, Map<String, DocSearchCriteria> relatedDocsMap, Map<String, Long> categoryToTimeWindowMap, List<CorrelationRule> correlationRules, Map<String, List<String>> autoCorrelations) {
MultiSearchRequest mSearchRequest = new MultiSearchRequest();
List<String> categories = new ArrayList<>();

Expand Down Expand Up @@ -476,15 +493,15 @@ private void searchDocsWithFilterKeys(String detectorType, Map<String, DocSearch
getCorrelatedFindings(detectorType, filteredRelatedDocIds, categoryToTimeWindowMap, correlationRules, autoCorrelations);
}, this::onFailure));
} else {
getTimestampFeature(detectorType, correlationRules, autoCorrelations);
getTimestampFeature(detectorType, correlationRules.stream().map(CorrelationRule::getId).collect(Collectors.toList()), autoCorrelations);
}
}

/**
* Given the filtered related docs of the parent findings, this method gets the actual filtered parent findings for
* the finding to be correlated.
*/
private void getCorrelatedFindings(String detectorType, Map<String, List<String>> filteredRelatedDocIds, Map<String, Long> categoryToTimeWindowMap, List<String> correlationRules, Map<String, List<String>> autoCorrelations) {
private void getCorrelatedFindings(String detectorType, Map<String, List<String>> filteredRelatedDocIds, Map<String, Long> categoryToTimeWindowMap, List<CorrelationRule> correlationRules, Map<String, List<String>> autoCorrelations) {
long findingTimestamp = request.getFinding().getTimestamp().toEpochMilli();
MultiSearchRequest mSearchRequest = new MultiSearchRequest();
List<String> categories = new ArrayList<>();
Expand Down Expand Up @@ -540,6 +557,12 @@ private void getCorrelatedFindings(String detectorType, Map<String, List<String>
++idx;
}

if (!correlatedFindings.isEmpty()) {
CorrelationRuleScheduler correlationRuleScheduler = new CorrelationRuleScheduler(client, correlationAlertService, notificationService);
correlationRuleScheduler.schedule(correlationRules, correlatedFindings, request.getFinding().getId(), indexTimeout, user);
correlationRuleScheduler.shutdown();
}

for (Map.Entry<String, List<String>> autoCorrelation: autoCorrelations.entrySet()) {
if (correlatedFindings.containsKey(autoCorrelation.getKey())) {
Set<String> alreadyCorrelatedFindings = new HashSet<>(correlatedFindings.get(autoCorrelation.getKey()));
Expand All @@ -549,10 +572,10 @@ private void getCorrelatedFindings(String detectorType, Map<String, List<String>
correlatedFindings.put(autoCorrelation.getKey(), autoCorrelation.getValue());
}
}
correlateFindingAction.initCorrelationIndex(detectorType, correlatedFindings, correlationRules);
correlateFindingAction.initCorrelationIndex(detectorType, correlatedFindings, correlationRules.stream().map(CorrelationRule::getId).collect(Collectors.toList()));
}, this::onFailure));
} else {
getTimestampFeature(detectorType, correlationRules, autoCorrelations);
getTimestampFeature(detectorType, correlationRules.stream().map(CorrelationRule::getId).collect(Collectors.toList()), autoCorrelations);
}
}

Expand Down
Loading
Loading