Skip to content

Commit

Permalink
refactoring code to address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Oct 22, 2023
1 parent 31eebcb commit ae084e7
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,13 @@
import org.opensearch.jobscheduler.spi.JobSchedulerExtension;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.plugins.*;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.ClusterPlugin;
import org.opensearch.plugins.EnginePlugin;
import org.opensearch.plugins.MapperPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.plugins.SystemIndexPlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
Expand All @@ -59,7 +65,8 @@
import org.opensearch.securityanalytics.resthandler.*;
import org.opensearch.securityanalytics.threatIntel.DetectorThreatIntelService;
import org.opensearch.securityanalytics.threatIntel.ThreatIntelFeedDataService;
import org.opensearch.securityanalytics.threatIntel.action.*;
import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobAction;
import org.opensearch.securityanalytics.threatIntel.action.TransportPutTIFJobAction;
import org.opensearch.securityanalytics.threatIntel.common.TIFLockService;
import org.opensearch.securityanalytics.threatIntel.feedMetadata.BuiltInTIFMetadataLoader;
import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobParameter;
Expand Down Expand Up @@ -98,7 +105,7 @@ public class SecurityAnalyticsPlugin extends Plugin implements ActionPlugin, Map
public static final String CORRELATION_RULES_BASE_URI = PLUGINS_BASE_URI + "/correlation/rules";

public static final String CUSTOM_LOG_TYPE_URI = PLUGINS_BASE_URI + "/logtype";
public static final String JOB_INDEX_NAME = ".opensearch-sap-threat-intel-job";
public static final String JOB_INDEX_NAME = ".opensearch-sap--job";
public static final Map<String, Object> TIF_JOB_INDEX_SETTING = Map.of(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1, IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-all", IndexMetadata.SETTING_INDEX_HIDDEN, true);

private CorrelationRuleIndices correlationRuleIndices;
Expand Down Expand Up @@ -210,7 +217,7 @@ public List<RestHandler> getRestHandlers(Settings settings,

@Override
public String getJobType() {
return "opensearch_sap_threat_intel_job";
return "opensearch_sap_job";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,11 @@ public LogType(Map<String, Object> logTypeAsMap) {
new Mapping(e.get(RAW_FIELD), e.get(ECS), e.get(OCSF))
).collect(Collectors.toList());
}
if(logTypeAsMap.containsKey(IOC_FIELDS)) {
if (logTypeAsMap.containsKey(IOC_FIELDS)) {
List<Map<String, Object>> iocFieldsList = (List<Map<String, Object>>) logTypeAsMap.get(IOC_FIELDS);
if (iocFieldsList.size() > 0) {
this.iocFieldsList = new ArrayList<>(mappings.size());
this.iocFieldsList = iocFieldsList.stream().map(e ->
new IocFields(e.get(IOC).toString(), (List<String>) e.get(FIELDS))
).collect(Collectors.toList());
}
this.iocFieldsList = iocFieldsList.stream().map(e ->
new IocFields(e.get(IOC).toString(), (List<String>) e.get(FIELDS))
).collect(Collectors.toList());
} else {
iocFieldsList = Collections.emptyList();
}
Expand Down Expand Up @@ -159,8 +156,8 @@ public static Mapping readFrom(StreamInput sin) throws IOException {
* stores information of list of field names that contain information for given IoC (Indicator of Compromise).
*/
public static class IocFields implements Writeable {
private final String ioc;

private final String ioc;
private final List<String> fields;

public IocFields(String ioc, List<String> fields) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ private XContentBuilder createXContentBuilder(XContentBuilder builder, ToXConten
if (params.paramAsBoolean("with_type", false)) {
builder.startObject(type);
}
builder.field(TYPE_FIELD, type);
builder
.field(TYPE_FIELD, type)
.field(IOC_TYPE_FIELD, iocType)
.field(IOC_VALUE_FIELD, iocValue)
.field(FEED_ID_FIELD, feedId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
import static org.opensearch.securityanalytics.model.Detector.DETECTORS_INDEX;
import static org.opensearch.securityanalytics.util.DetectorUtils.getDetectors;


/**
* Service that populates detectors with queries generated from threat intelligence data.
*/
public class DetectorThreatIntelService {

private static final Logger log = LogManager.getLogger(DetectorThreatIntelService.class);
Expand Down Expand Up @@ -77,7 +79,7 @@ public List<DocLevelQuery> createDocLevelQueriesFromThreatIntelList(
List<String> fields = iocFieldList.stream().filter(t -> entry.getKey().matches(t.getIoc())).findFirst().get().getFields();

// create doc
for (String field : fields) { //todo increase max clause count from 1024
for (String field : fields) {
queries.add(new DocLevelQuery(
constructId(detector, entry.getKey()), tifdList.get(0).getFeedId(),
Collections.emptyList(),
Expand Down Expand Up @@ -105,6 +107,9 @@ private String buildQueryStringQueryWithIocList(Set<String> iocs) {
return sb.toString();
}

/**
* Fetches threat intel data and creates doc level queries from threat intel data
*/
public void createDocLevelQueryFromThreatIntel(List<LogType.IocFields> iocFieldList, Detector detector, ActionListener<List<DocLevelQuery>> listener) {
try {
if (false == detector.getThreatIntelEnabled() || iocFieldList.isEmpty()) {
Expand Down Expand Up @@ -146,6 +151,7 @@ private static String constructId(Detector detector, String iocType) {
return detector.getName() + "_threat_intel_" + iocType + "_" + UUID.randomUUID();
}

/** Updates all detectors having threat intel detection enabled with the latest threat intel feed data*/
public void updateDetectorsWithLatestThreatIntelRules() {
try {
QueryBuilder queryBuilder =
Expand All @@ -159,17 +165,17 @@ public void updateDetectorsWithLatestThreatIntelRules() {
ssb.size(9999);
CountDownLatch countDownLatch = new CountDownLatch(1);
client.execute(SearchDetectorAction.INSTANCE, new SearchDetectorRequest(searchRequest),
ActionListener.wrap(r -> {
List<Detector> detectors = getDetectors(r, xContentRegistry);
ActionListener.wrap(searchResponse -> {
List<Detector> detectors = getDetectors(searchResponse, xContentRegistry);
detectors.forEach(detector -> {
assert detector.getThreatIntelEnabled();
client.execute(IndexDetectorAction.INSTANCE, new IndexDetectorRequest(
detector.getId(), WriteRequest.RefreshPolicy.IMMEDIATE,
RestRequest.Method.PUT,
detector),
ActionListener.wrap(
res -> {
log.debug("updated {} with latest threat intel info", res.getDetector().getId());
indexDetectorResponse -> {
log.debug("updated {} with latest threat intel info", indexDetectorResponse.getDetector().getId());
countDownLatch.countDown();
},
e -> {
Expand All @@ -182,9 +188,9 @@ public void updateDetectorsWithLatestThreatIntelRules() {
log.error("Failed to fetch detectors to update with threat intel queries.", e);
countDownLatch.countDown();
}));
countDownLatch.await();
countDownLatch.await(5, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
log.error("");
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@
* Service to handle CRUD operations on Threat Intel Feed Data
*/
public class ThreatIntelFeedDataService {

private static final Logger log = LogManager.getLogger(ThreatIntelFeedDataService.class);
private final Client client;
private final IndexNameExpressionResolver indexNameExpressionResolver;

public static final String SETTING_INDEX_REFRESH_INTERVAL = "index.refresh_interval";
private static final Map<String, Object> INDEX_SETTING_TO_CREATE = Map.of(
IndexMetadata.SETTING_NUMBER_OF_SHARDS,
Expand All @@ -76,9 +76,12 @@ public class ThreatIntelFeedDataService {
IndexMetadata.SETTING_INDEX_HIDDEN,
true
);

private final ClusterService clusterService;
private final ClusterSettings clusterSettings;
private final NamedXContentRegistry xContentRegistry;
private final Client client;
private final IndexNameExpressionResolver indexNameExpressionResolver;

public ThreatIntelFeedDataService(
ClusterService clusterService,
Expand All @@ -96,26 +99,18 @@ public void getThreatIntelFeedData(
ActionListener<List<ThreatIntelFeedData>> listener
) {
try {
//if index not exists
if (IndexUtils.getNewIndexByCreationDate(
this.clusterService.state(),
this.indexNameExpressionResolver,
".opensearch-sap-threat-intel*"
) == null) {

String tifdIndex = getLatestIndexByCreationDate();
if (tifdIndex == null) {
createThreatIntelFeedData();
tifdIndex = getLatestIndexByCreationDate();
}
//if index exists
String tifdIndex = IndexUtils.getNewIndexByCreationDate(
this.clusterService.state(),
this.indexNameExpressionResolver,
".opensearch-sap-threat-intel*"
);

SearchRequest searchRequest = new SearchRequest(tifdIndex);
searchRequest.source().size(9999); //TODO: convert to scroll
String finalTifdIndex = tifdIndex;
client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> {
log.error(String.format(
"Failed to fetch threat intel feed data from system index %s", tifdIndex), e);
"Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e);
listener.onFailure(e);
}));
} catch (InterruptedException e) {
Expand All @@ -124,6 +119,14 @@ public void getThreatIntelFeedData(
}
}

private String getLatestIndexByCreationDate() {
return IndexUtils.getNewIndexByCreationDate(
this.clusterService.state(),
this.indexNameExpressionResolver,
THREAT_INTEL_DATA_INDEX_NAME_PREFIX + "*"
);
}

/**
* Create an index for a threat intel feed
* <p>
Expand Down Expand Up @@ -169,7 +172,7 @@ public void parseAndSaveThreatIntelFeedDataCSV(
List<ThreatIntelFeedData> tifdList = new ArrayList<>();
while (iterator.hasNext()) {
CSVRecord record = iterator.next();
String iocType = tifMetadata.getIocType(); //todo make generic in upcoming versions
String iocType = tifMetadata.getIocType();
Integer colNum = tifMetadata.getIocCol();
String iocValue = record.values()[colNum].split(" ")[0];
if (iocType.equals("ip") && !isValidIp(iocValue)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
* Transport action to create job to fetch threat intel feed data and save IoCs
*/
public class TransportPutTIFJobAction extends HandledTransportAction<PutTIFJobRequest, AcknowledgedResponse> {
// TODO refactor this into a service class that creates feed updation job. This is not necessary to be a transport action
private static final Logger log = LogManager.getLogger(TransportPutTIFJobAction.class);

private final ThreadPool threadPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.securityanalytics.SecurityAnalyticsPlugin.JOB_INDEX_NAME;
import static org.opensearch.securityanalytics.TestHelpers.*;
import static org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE;
import static org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings.TIF_UPDATE_INTERVAL;
Expand All @@ -51,8 +52,8 @@ public class ThreatIntelJobRunnerIT extends SecurityAnalyticsRestTestCase {
public void testCreateDetector_threatIntelEnabled_testJobRunner() throws IOException, InterruptedException {

// update job runner to run every minute
updateClusterSetting(TIF_UPDATE_INTERVAL.getKey(),"1m");
updateClusterSetting(TIF_UPDATE_INTERVAL.getKey(), "1m");

// Create a detector
updateClusterSetting(ENABLE_WORKFLOW_USAGE.getKey(), "true");
String index = createTestIndex(randomIndex(), windowsIndexMapping());
Expand Down Expand Up @@ -115,7 +116,7 @@ public void testCreateDetector_threatIntelEnabled_testJobRunner() throws IOExcep
// Verify workflow
verifyWorkflow(detectorMap, monitorIds, 1);
List<String> iocs = getThreatIntelFeedIocs(3);
assertEquals(iocs.size(),3);
assertEquals(iocs.size(), 3);

// get job runner index and verify parameters exist
List<TIFJobParameter> jobMetaDataList = getJobSchedulerParameter();
Expand Down Expand Up @@ -151,7 +152,7 @@ public void testCreateDetector_threatIntelEnabled_testJobRunner() throws IOExcep

// verify new threat intel feed timestamp is different
List<Instant> newFeedTimestamp = getThreatIntelFeedsTime();
for (int i =0; i< newFeedTimestamp.size(); i++) {
for (int i = 0; i < newFeedTimestamp.size(); i++) {
assertNotEquals(newFeedTimestamp.get(i), originalFeedTimestamp.get(i));
}

Expand All @@ -171,7 +172,7 @@ protected boolean verifyJobRan(Instant firstUpdatedTime) throws IOException {

TIFJobParameter newJobMetaData = newJobMetaDataList.get(0);
Instant newUpdatedTime = newJobMetaData.getLastUpdateTime();
if (!firstUpdatedTime.toString().equals(newUpdatedTime.toString())){
if (!firstUpdatedTime.toString().equals(newUpdatedTime.toString())) {
return true;
}
return false;
Expand All @@ -197,9 +198,10 @@ private List<Instant> getThreatIntelFeedsTime() throws IOException {

private List<TIFJobParameter> getJobSchedulerParameter() throws IOException {
String request = getMatchAllSearchRequestString();
SearchResponse res = executeSearchAndGetResponse(".opensearch-sap-threat-intel-job*", request, false);
SearchResponse res = executeSearchAndGetResponse(JOB_INDEX_NAME + "*", request, false);
return getTIFJobParameterList(res, xContentRegistry()).stream().collect(Collectors.toList());
}

public static List<TIFJobParameter> getTIFJobParameterList(SearchResponse searchResponse, NamedXContentRegistry xContentRegistry) {
List<TIFJobParameter> list = new ArrayList<>();
if (searchResponse.getHits().getHits().length != 0) {
Expand Down

0 comments on commit ae084e7

Please sign in to comment.