From ae084e71656054ffbc73d39c6c819ff3bc857245 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Sun, 22 Oct 2023 02:02:44 -0700 Subject: [PATCH] refactoring code to address review comments Signed-off-by: Surya Sashank Nistala --- .../SecurityAnalyticsPlugin.java | 15 ++++++-- .../securityanalytics/model/LogType.java | 13 +++---- .../model/ThreatIntelFeedData.java | 2 +- .../DetectorThreatIntelService.java | 22 +++++++---- .../ThreatIntelFeedDataService.java | 37 ++++++++++--------- .../action/TransportPutTIFJobAction.java | 1 + .../integTests/ThreatIntelJobRunnerIT.java | 14 ++++--- 7 files changed, 60 insertions(+), 44 deletions(-) diff --git a/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java b/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java index 6e67c5798..81fc4be38 100644 --- a/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java +++ b/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java @@ -40,7 +40,13 @@ import org.opensearch.jobscheduler.spi.JobSchedulerExtension; import org.opensearch.jobscheduler.spi.ScheduledJobParser; import org.opensearch.jobscheduler.spi.ScheduledJobRunner; -import org.opensearch.plugins.*; +import org.opensearch.plugins.ActionPlugin; +import org.opensearch.plugins.ClusterPlugin; +import org.opensearch.plugins.EnginePlugin; +import org.opensearch.plugins.MapperPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.SearchPlugin; +import org.opensearch.plugins.SystemIndexPlugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.rest.RestController; import org.opensearch.rest.RestHandler; @@ -59,7 +65,8 @@ import org.opensearch.securityanalytics.resthandler.*; import org.opensearch.securityanalytics.threatIntel.DetectorThreatIntelService; import org.opensearch.securityanalytics.threatIntel.ThreatIntelFeedDataService; -import org.opensearch.securityanalytics.threatIntel.action.*; +import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobAction; +import org.opensearch.securityanalytics.threatIntel.action.TransportPutTIFJobAction; import org.opensearch.securityanalytics.threatIntel.common.TIFLockService; import org.opensearch.securityanalytics.threatIntel.feedMetadata.BuiltInTIFMetadataLoader; import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobParameter; @@ -98,7 +105,7 @@ public class SecurityAnalyticsPlugin extends Plugin implements ActionPlugin, Map public static final String CORRELATION_RULES_BASE_URI = PLUGINS_BASE_URI + "/correlation/rules"; public static final String CUSTOM_LOG_TYPE_URI = PLUGINS_BASE_URI + "/logtype"; - public static final String JOB_INDEX_NAME = ".opensearch-sap-threat-intel-job"; + public static final String JOB_INDEX_NAME = ".opensearch-sap--job"; public static final Map TIF_JOB_INDEX_SETTING = Map.of(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1, IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-all", IndexMetadata.SETTING_INDEX_HIDDEN, true); private CorrelationRuleIndices correlationRuleIndices; @@ -210,7 +217,7 @@ public List getRestHandlers(Settings settings, @Override public String getJobType() { - return "opensearch_sap_threat_intel_job"; + return "opensearch_sap_job"; } @Override diff --git a/src/main/java/org/opensearch/securityanalytics/model/LogType.java b/src/main/java/org/opensearch/securityanalytics/model/LogType.java index 8cee7ab23..9bdb96d1a 100644 --- a/src/main/java/org/opensearch/securityanalytics/model/LogType.java +++ b/src/main/java/org/opensearch/securityanalytics/model/LogType.java @@ -67,14 +67,11 @@ public LogType(Map logTypeAsMap) { new Mapping(e.get(RAW_FIELD), e.get(ECS), e.get(OCSF)) ).collect(Collectors.toList()); } - if(logTypeAsMap.containsKey(IOC_FIELDS)) { + if (logTypeAsMap.containsKey(IOC_FIELDS)) { List> iocFieldsList = (List>) logTypeAsMap.get(IOC_FIELDS); - if (iocFieldsList.size() > 0) { - this.iocFieldsList = new ArrayList<>(mappings.size()); - this.iocFieldsList = iocFieldsList.stream().map(e -> - new IocFields(e.get(IOC).toString(), (List) e.get(FIELDS)) - ).collect(Collectors.toList()); - } + this.iocFieldsList = iocFieldsList.stream().map(e -> + new IocFields(e.get(IOC).toString(), (List) e.get(FIELDS)) + ).collect(Collectors.toList()); } else { iocFieldsList = Collections.emptyList(); } @@ -159,8 +156,8 @@ public static Mapping readFrom(StreamInput sin) throws IOException { * stores information of list of field names that contain information for given IoC (Indicator of Compromise). */ public static class IocFields implements Writeable { - private final String ioc; + private final String ioc; private final List fields; public IocFields(String ioc, List fields) { diff --git a/src/main/java/org/opensearch/securityanalytics/model/ThreatIntelFeedData.java b/src/main/java/org/opensearch/securityanalytics/model/ThreatIntelFeedData.java index 7696b331e..9f9f5d855 100644 --- a/src/main/java/org/opensearch/securityanalytics/model/ThreatIntelFeedData.java +++ b/src/main/java/org/opensearch/securityanalytics/model/ThreatIntelFeedData.java @@ -138,8 +138,8 @@ private XContentBuilder createXContentBuilder(XContentBuilder builder, ToXConten if (params.paramAsBoolean("with_type", false)) { builder.startObject(type); } - builder.field(TYPE_FIELD, type); builder + .field(TYPE_FIELD, type) .field(IOC_TYPE_FIELD, iocType) .field(IOC_VALUE_FIELD, iocValue) .field(FEED_ID_FIELD, feedId) diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/DetectorThreatIntelService.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/DetectorThreatIntelService.java index 589f24703..5dde43788 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/DetectorThreatIntelService.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/DetectorThreatIntelService.java @@ -39,7 +39,9 @@ import static org.opensearch.securityanalytics.model.Detector.DETECTORS_INDEX; import static org.opensearch.securityanalytics.util.DetectorUtils.getDetectors; - +/** + * Service that populates detectors with queries generated from threat intelligence data. + */ public class DetectorThreatIntelService { private static final Logger log = LogManager.getLogger(DetectorThreatIntelService.class); @@ -77,7 +79,7 @@ public List createDocLevelQueriesFromThreatIntelList( List fields = iocFieldList.stream().filter(t -> entry.getKey().matches(t.getIoc())).findFirst().get().getFields(); // create doc - for (String field : fields) { //todo increase max clause count from 1024 + for (String field : fields) { queries.add(new DocLevelQuery( constructId(detector, entry.getKey()), tifdList.get(0).getFeedId(), Collections.emptyList(), @@ -105,6 +107,9 @@ private String buildQueryStringQueryWithIocList(Set iocs) { return sb.toString(); } + /** + * Fetches threat intel data and creates doc level queries from threat intel data + */ public void createDocLevelQueryFromThreatIntel(List iocFieldList, Detector detector, ActionListener> listener) { try { if (false == detector.getThreatIntelEnabled() || iocFieldList.isEmpty()) { @@ -146,6 +151,7 @@ private static String constructId(Detector detector, String iocType) { return detector.getName() + "_threat_intel_" + iocType + "_" + UUID.randomUUID(); } + /** Updates all detectors having threat intel detection enabled with the latest threat intel feed data*/ public void updateDetectorsWithLatestThreatIntelRules() { try { QueryBuilder queryBuilder = @@ -159,8 +165,8 @@ public void updateDetectorsWithLatestThreatIntelRules() { ssb.size(9999); CountDownLatch countDownLatch = new CountDownLatch(1); client.execute(SearchDetectorAction.INSTANCE, new SearchDetectorRequest(searchRequest), - ActionListener.wrap(r -> { - List detectors = getDetectors(r, xContentRegistry); + ActionListener.wrap(searchResponse -> { + List detectors = getDetectors(searchResponse, xContentRegistry); detectors.forEach(detector -> { assert detector.getThreatIntelEnabled(); client.execute(IndexDetectorAction.INSTANCE, new IndexDetectorRequest( @@ -168,8 +174,8 @@ public void updateDetectorsWithLatestThreatIntelRules() { RestRequest.Method.PUT, detector), ActionListener.wrap( - res -> { - log.debug("updated {} with latest threat intel info", res.getDetector().getId()); + indexDetectorResponse -> { + log.debug("updated {} with latest threat intel info", indexDetectorResponse.getDetector().getId()); countDownLatch.countDown(); }, e -> { @@ -182,9 +188,9 @@ public void updateDetectorsWithLatestThreatIntelRules() { log.error("Failed to fetch detectors to update with threat intel queries.", e); countDownLatch.countDown(); })); - countDownLatch.await(); + countDownLatch.await(5, TimeUnit.MINUTES); } catch (InterruptedException e) { - throw new RuntimeException(e); + log.error(""); } diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java index 6e22a6b8a..40bc7bc53 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java @@ -62,9 +62,9 @@ * Service to handle CRUD operations on Threat Intel Feed Data */ public class ThreatIntelFeedDataService { + private static final Logger log = LogManager.getLogger(ThreatIntelFeedDataService.class); - private final Client client; - private final IndexNameExpressionResolver indexNameExpressionResolver; + public static final String SETTING_INDEX_REFRESH_INTERVAL = "index.refresh_interval"; private static final Map INDEX_SETTING_TO_CREATE = Map.of( IndexMetadata.SETTING_NUMBER_OF_SHARDS, @@ -76,9 +76,12 @@ public class ThreatIntelFeedDataService { IndexMetadata.SETTING_INDEX_HIDDEN, true ); + private final ClusterService clusterService; private final ClusterSettings clusterSettings; private final NamedXContentRegistry xContentRegistry; + private final Client client; + private final IndexNameExpressionResolver indexNameExpressionResolver; public ThreatIntelFeedDataService( ClusterService clusterService, @@ -96,26 +99,18 @@ public void getThreatIntelFeedData( ActionListener> listener ) { try { - //if index not exists - if (IndexUtils.getNewIndexByCreationDate( - this.clusterService.state(), - this.indexNameExpressionResolver, - ".opensearch-sap-threat-intel*" - ) == null) { + + String tifdIndex = getLatestIndexByCreationDate(); + if (tifdIndex == null) { createThreatIntelFeedData(); + tifdIndex = getLatestIndexByCreationDate(); } - //if index exists - String tifdIndex = IndexUtils.getNewIndexByCreationDate( - this.clusterService.state(), - this.indexNameExpressionResolver, - ".opensearch-sap-threat-intel*" - ); - SearchRequest searchRequest = new SearchRequest(tifdIndex); searchRequest.source().size(9999); //TODO: convert to scroll + String finalTifdIndex = tifdIndex; client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> { log.error(String.format( - "Failed to fetch threat intel feed data from system index %s", tifdIndex), e); + "Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e); listener.onFailure(e); })); } catch (InterruptedException e) { @@ -124,6 +119,14 @@ public void getThreatIntelFeedData( } } + private String getLatestIndexByCreationDate() { + return IndexUtils.getNewIndexByCreationDate( + this.clusterService.state(), + this.indexNameExpressionResolver, + THREAT_INTEL_DATA_INDEX_NAME_PREFIX + "*" + ); + } + /** * Create an index for a threat intel feed *

@@ -169,7 +172,7 @@ public void parseAndSaveThreatIntelFeedDataCSV( List tifdList = new ArrayList<>(); while (iterator.hasNext()) { CSVRecord record = iterator.next(); - String iocType = tifMetadata.getIocType(); //todo make generic in upcoming versions + String iocType = tifMetadata.getIocType(); Integer colNum = tifMetadata.getIocCol(); String iocValue = record.values()[colNum].split(" ")[0]; if (iocType.equals("ip") && !isValidIp(iocValue)) { diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java index 00437c94a..1346da40c 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java @@ -37,6 +37,7 @@ * Transport action to create job to fetch threat intel feed data and save IoCs */ public class TransportPutTIFJobAction extends HandledTransportAction { + // TODO refactor this into a service class that creates feed updation job. This is not necessary to be a transport action private static final Logger log = LogManager.getLogger(TransportPutTIFJobAction.class); private final ThreadPool threadPool; diff --git a/src/test/java/org/opensearch/securityanalytics/threatIntel/integTests/ThreatIntelJobRunnerIT.java b/src/test/java/org/opensearch/securityanalytics/threatIntel/integTests/ThreatIntelJobRunnerIT.java index d8c955f6a..8b2055ed3 100644 --- a/src/test/java/org/opensearch/securityanalytics/threatIntel/integTests/ThreatIntelJobRunnerIT.java +++ b/src/test/java/org/opensearch/securityanalytics/threatIntel/integTests/ThreatIntelJobRunnerIT.java @@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.opensearch.securityanalytics.SecurityAnalyticsPlugin.JOB_INDEX_NAME; import static org.opensearch.securityanalytics.TestHelpers.*; import static org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE; import static org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings.TIF_UPDATE_INTERVAL; @@ -51,8 +52,8 @@ public class ThreatIntelJobRunnerIT extends SecurityAnalyticsRestTestCase { public void testCreateDetector_threatIntelEnabled_testJobRunner() throws IOException, InterruptedException { // update job runner to run every minute - updateClusterSetting(TIF_UPDATE_INTERVAL.getKey(),"1m"); - + updateClusterSetting(TIF_UPDATE_INTERVAL.getKey(), "1m"); + // Create a detector updateClusterSetting(ENABLE_WORKFLOW_USAGE.getKey(), "true"); String index = createTestIndex(randomIndex(), windowsIndexMapping()); @@ -115,7 +116,7 @@ public void testCreateDetector_threatIntelEnabled_testJobRunner() throws IOExcep // Verify workflow verifyWorkflow(detectorMap, monitorIds, 1); List iocs = getThreatIntelFeedIocs(3); - assertEquals(iocs.size(),3); + assertEquals(iocs.size(), 3); // get job runner index and verify parameters exist List jobMetaDataList = getJobSchedulerParameter(); @@ -151,7 +152,7 @@ public void testCreateDetector_threatIntelEnabled_testJobRunner() throws IOExcep // verify new threat intel feed timestamp is different List newFeedTimestamp = getThreatIntelFeedsTime(); - for (int i =0; i< newFeedTimestamp.size(); i++) { + for (int i = 0; i < newFeedTimestamp.size(); i++) { assertNotEquals(newFeedTimestamp.get(i), originalFeedTimestamp.get(i)); } @@ -171,7 +172,7 @@ protected boolean verifyJobRan(Instant firstUpdatedTime) throws IOException { TIFJobParameter newJobMetaData = newJobMetaDataList.get(0); Instant newUpdatedTime = newJobMetaData.getLastUpdateTime(); - if (!firstUpdatedTime.toString().equals(newUpdatedTime.toString())){ + if (!firstUpdatedTime.toString().equals(newUpdatedTime.toString())) { return true; } return false; @@ -197,9 +198,10 @@ private List getThreatIntelFeedsTime() throws IOException { private List getJobSchedulerParameter() throws IOException { String request = getMatchAllSearchRequestString(); - SearchResponse res = executeSearchAndGetResponse(".opensearch-sap-threat-intel-job*", request, false); + SearchResponse res = executeSearchAndGetResponse(JOB_INDEX_NAME + "*", request, false); return getTIFJobParameterList(res, xContentRegistry()).stream().collect(Collectors.toList()); } + public static List getTIFJobParameterList(SearchResponse searchResponse, NamedXContentRegistry xContentRegistry) { List list = new ArrayList<>(); if (searchResponse.getHits().getHits().length != 0) {