Skip to content

Commit

Permalink
JS for Threat intel feeds - changed extension (#675)
Browse files Browse the repository at this point in the history
* merge conflicts

Signed-off-by: Joanne Wang <[email protected]>

* fixed java wildcards and changed update key name

Signed-off-by: Joanne Wang <[email protected]>

* integ test failing

Signed-off-by: Joanne Wang <[email protected]>

* fix job scheduler params

Signed-off-by: Joanne Wang <[email protected]>

* changed extension and has debug messages

Signed-off-by: Joanne Wang <[email protected]>

* clean up

Signed-off-by: Joanne Wang <[email protected]>

* fixed job scheduler plugin spi jar resolution

* cleaned up TODOs and changed job scheduler name

Signed-off-by: Joanne Wang <[email protected]>

---------

Signed-off-by: Joanne Wang <[email protected]>
Co-authored-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
jowg-amazon and eirsep authored Oct 19, 2023
1 parent 0bdd58b commit 0e89286
Show file tree
Hide file tree
Showing 19 changed files with 370 additions and 231 deletions.
22 changes: 8 additions & 14 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ opensearchplugin {
name 'opensearch-security-analytics'
description 'OpenSearch Security Analytics plugin'
classname 'org.opensearch.securityanalytics.SecurityAnalyticsPlugin'
// extendedPlugins = ['opensearch-job-scheduler']
extendedPlugins = ['opensearch-job-scheduler']
}

javaRestTest {
Expand Down Expand Up @@ -143,12 +143,6 @@ repositories {
sourceSets.main.java.srcDirs = ['src/main/generated','src/main/java']
configurations {
zipArchive

all {
resolutionStrategy {
force "com.google.guava:guava:32.0.1-jre"
}
}
}

dependencies {
Expand All @@ -159,20 +153,14 @@ dependencies {
api "org.opensearch:common-utils:${common_utils_version}@jar"
api "org.opensearch.client:opensearch-rest-client:${opensearch_version}"
implementation "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
implementation "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"
implementation "org.apache.commons:commons-csv:1.10.0"

// Needed for integ tests
zipArchive group: 'org.opensearch.plugin', name:'alerting', version: "${opensearch_build}"
zipArchive group: 'org.opensearch.plugin', name:'opensearch-notifications-core', version: "${opensearch_build}"
zipArchive group: 'org.opensearch.plugin', name:'notifications', version: "${opensearch_build}"
zipArchive group: 'org.opensearch.plugin', name:'opensearch-job-scheduler', version: "${opensearch_build}"

//spotless
implementation('com.google.googlejavaformat:google-java-format:1.17.0') {
exclude group: 'com.google.guava'
}
implementation 'com.google.guava:guava:32.0.1-jre'
}

// RPM & Debian build
Expand Down Expand Up @@ -303,6 +291,12 @@ testClusters.integTest {
}
}
}))
nodes.each { node ->
def plugins = node.plugins
def firstPlugin = plugins.get(0)
plugins.remove(0)
plugins.add(firstPlugin)
}
}

run {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
*/
package org.opensearch.securityanalytics;

import java.util.*;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.core.action.ActionListener;
Expand All @@ -32,6 +36,9 @@
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.mapper.Mapper;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.jobscheduler.spi.JobSchedulerExtension;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.plugins.*;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
Expand All @@ -54,6 +61,7 @@
import org.opensearch.securityanalytics.threatIntel.action.*;
import org.opensearch.securityanalytics.threatIntel.common.TIFLockService;
import org.opensearch.securityanalytics.threatIntel.feedMetadata.BuiltInTIFMetadataLoader;
import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobParameter;
import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobParameterService;
import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobRunner;
import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobUpdateService;
Expand All @@ -68,13 +76,12 @@
import org.opensearch.securityanalytics.util.DetectorIndices;
import org.opensearch.securityanalytics.util.RuleIndices;
import org.opensearch.securityanalytics.util.RuleTopicIndices;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import static org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobParameter.THREAT_INTEL_DATA_INDEX_NAME_PREFIX;

public class SecurityAnalyticsPlugin extends Plugin implements ActionPlugin, MapperPlugin, SearchPlugin, EnginePlugin, ClusterPlugin, SystemIndexPlugin {
public class SecurityAnalyticsPlugin extends Plugin implements ActionPlugin, MapperPlugin, SearchPlugin, EnginePlugin, ClusterPlugin, SystemIndexPlugin, JobSchedulerExtension {

private static final Logger log = LogManager.getLogger(SecurityAnalyticsPlugin.class);

Expand All @@ -90,6 +97,8 @@ public class SecurityAnalyticsPlugin extends Plugin implements ActionPlugin, Map
public static final String CORRELATION_RULES_BASE_URI = PLUGINS_BASE_URI + "/correlation/rules";

public static final String CUSTOM_LOG_TYPE_URI = PLUGINS_BASE_URI + "/logtype";
public static final String JOB_INDEX_NAME = ".opensearch-sap-threatintel-job";
public static final Map<String, Object> TIF_JOB_INDEX_SETTING = Map.of("index.number_of_shards", 1, "index.auto_expand_replicas", "0-all", "index.hidden", true);

private CorrelationRuleIndices correlationRuleIndices;

Expand Down Expand Up @@ -117,6 +126,8 @@ public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings sett
return List.of(new SystemIndexDescriptor(THREAT_INTEL_DATA_INDEX_NAME_PREFIX, "System index used for threat intel data"));
}



@Override
public Collection<Object> createComponents(Client client,
ClusterService clusterService,
Expand Down Expand Up @@ -147,7 +158,7 @@ public Collection<Object> createComponents(Client client,
TIFJobUpdateService tifJobUpdateService = new TIFJobUpdateService(clusterService, tifJobParameterService, threatIntelFeedDataService, builtInTIFMetadataLoader);
TIFLockService threatIntelLockService = new TIFLockService(clusterService, client);

TIFJobRunner.getJobRunnerInstance().initialize(clusterService,tifJobUpdateService, tifJobParameterService, threatIntelLockService, threadPool, detectorThreatIntelService);
TIFJobRunner.getJobRunnerInstance().initialize(clusterService, tifJobUpdateService, tifJobParameterService, threatIntelLockService, threadPool, detectorThreatIntelService);

return List.of(
detectorIndices, correlationIndices, correlationRuleIndices, ruleTopicIndices, customLogTypeIndices, ruleIndices,
Expand Down Expand Up @@ -192,10 +203,31 @@ public List<RestHandler> getRestHandlers(Settings settings,
new RestSearchCorrelationRuleAction(),
new RestIndexCustomLogTypeAction(),
new RestSearchCustomLogTypeAction(),
new RestDeleteCustomLogTypeAction()
new RestDeleteCustomLogTypeAction(),
new RestPutTIFJobHandler(clusterSettings)
);
}

@Override
public String getJobType() {
return "opensearch_sap_threatintel_job";
}

@Override
public String getJobIndex() {
return JOB_INDEX_NAME;
}

@Override
public ScheduledJobRunner getJobRunner() {
return TIFJobRunner.getJobRunnerInstance();
}

@Override
public ScheduledJobParser getJobParser() {
return (parser, id, jobDocVersion) -> TIFJobParameter.PARSER.parse(parser, null);
}

@Override
public List<NamedXContentRegistry.Entry> getNamedXContent() {
return List.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@ public class SecurityAnalyticsSettings {
// threat intel settings
public static final Setting<TimeValue> TIF_UPDATE_INTERVAL = Setting.timeSetting(
"plugins.security_analytics.threatintel.tifjob.update_interval",
TimeValue.timeValueHours(24),
TimeValue.timeValueHours(1),
TimeValue.timeValueMinutes(1),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobParameter.THREAT_INTEL_DATA_INDEX_NAME_PREFIX;
Expand Down Expand Up @@ -103,7 +105,7 @@ public void getThreatIntelFeedData(
) {
try {
//if index not exists
if(IndexUtils.getNewIndexByCreationDate(
if (IndexUtils.getNewIndexByCreationDate(
this.clusterService.state(),
this.indexNameExpressionResolver,
".opensearch-sap-threatintel*"
Expand All @@ -129,7 +131,7 @@ public void getThreatIntelFeedData(
listener.onFailure(e);
}
}

private void createThreatIntelFeedData() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
client.execute(PutTIFJobAction.INSTANCE, new PutTIFJobRequest("feed_updater", clusterSettings.get(SecurityAnalyticsSettings.TIF_UPDATE_INTERVAL))).actionGet();
Expand All @@ -138,7 +140,7 @@ private void createThreatIntelFeedData() throws InterruptedException {

/**
* Create an index for a threat intel feed
*
* <p>
* Index setting start with single shard, zero replica, no refresh interval, and hidden.
* Once the threat intel feed is indexed, do refresh and force merge.
* Then, change the index setting to expand replica to all nodes, and read only allow delete.
Expand Down Expand Up @@ -174,7 +176,7 @@ private String getIndexMapping() {
* Puts threat intel feed from CSVRecord iterator into a given index in bulk
*
* @param indexName Index name to save the threat intel feed
* @param iterator TIF data to insert
* @param iterator TIF data to insert
* @param renewLock Runnable to renew lock
*/
public void parseAndSaveThreatIntelFeedDataCSV(
Expand All @@ -197,6 +199,10 @@ public void parseAndSaveThreatIntelFeedDataCSV(
String iocType = tifMetadata.getIocType(); //todo make generic in upcoming versions
Integer colNum = tifMetadata.getIocCol();
String iocValue = record.values()[colNum].split(" ")[0];
if (iocType.equals("ip") && !isValidIp(iocValue)) {
log.info("Invalid IP address, skipping this ioc record.");
continue;
}
String feedId = tifMetadata.getFeedId();
Instant timestamp = Instant.now();
ThreatIntelFeedData threatIntelFeedData = new ThreatIntelFeedData(iocType, iocValue, feedId, timestamp);
Expand All @@ -218,8 +224,14 @@ public void parseAndSaveThreatIntelFeedDataCSV(
freezeIndex(indexName);
}

public void saveTifds(BulkRequest bulkRequest, TimeValue timeout) {
public static boolean isValidIp(String ip) {
String ipPattern = "^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$";
Pattern pattern = Pattern.compile(ipPattern);
Matcher matcher = pattern.matcher(ip);
return matcher.matches();
}

public void saveTifds(BulkRequest bulkRequest, TimeValue timeout) {
try {
BulkResponse response = StashedThreadContext.run(client, () -> {
return client.bulk(bulkRequest).actionGet(timeout);
Expand Down Expand Up @@ -252,10 +264,6 @@ private void freezeIndex(final String indexName) {
});
}

public void deleteThreatIntelDataIndex(final String index) {
deleteThreatIntelDataIndex(Arrays.asList(index));
}

public void deleteThreatIntelDataIndex(final List<String> indices) {
if (indices == null || indices.isEmpty()) {
return;
Expand Down Expand Up @@ -286,10 +294,4 @@ public void deleteThreatIntelDataIndex(final List<String> indices) {
throw new OpenSearchException("failed to delete data[{}]", String.join(",", indices));
}
}
public static class ThreatIntelFeedUpdateHandler implements Runnable {

@Override
public void run() {

}
}}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ public TimeValue getUpdateInterval() {
return this.updateInterval;
}

public void setUpdateInterval(TimeValue timeValue) {
this.updateInterval = timeValue;
}

/**
* Default constructor
* @param name name of a tif job
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.securityanalytics.threatIntel.action;

import org.opensearch.client.node.NodeClient;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.opensearch.rest.RestRequest.Method.GET;
import static org.opensearch.rest.RestRequest.Method.PUT;

/**
* Rest handler for threat intel TIFjob creation
*
* This handler handles a request of
* PUT /_plugins/security_analytics/threatintel/tifjob/{id}
* {
* "id": {id},
* "name": {name},
* "update_interval_in_days": 1
* }
*
* When request is received, it will create a TIFjob
* After the creation of TIFjob is completed, it will schedule the next update task after update_interval_in_days.
*
*/
public class RestPutTIFJobHandler extends BaseRestHandler {
private static final String ACTION_NAME = "threatintel_tifjob_put";
private final ClusterSettings clusterSettings;

public RestPutTIFJobHandler(final ClusterSettings clusterSettings) {
this.clusterSettings = clusterSettings;
}

@Override
public String getName() {
return ACTION_NAME;
}

@Override
protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
final PutTIFJobRequest putTIFJobRequest = new PutTIFJobRequest("jobname",
new TimeValue(1, TimeUnit.MINUTES));

return channel -> client.executeLocally(PutTIFJobAction.INSTANCE, putTIFJobRequest, new RestToXContentListener<>(channel));
}

@Override
public List<Route> routes() {
String path = "/_p/_s";
return List.of(new Route(GET, path));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ protected void doExecute(final Task task, final DeleteTIFJobRequest request, fin
return;
}
try {
// TODO: makes every sub-methods as async call to avoid using a thread in generic pool
threadPool.generic().submit(() -> {
try {
deleteTIFJob(request.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

package org.opensearch.securityanalytics.threatIntel.common;

import static org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobExtension.JOB_INDEX_NAME;
import static org.opensearch.securityanalytics.SecurityAnalyticsPlugin.JOB_INDEX_NAME;


import java.time.Instant;
import java.util.Optional;
Expand Down
Loading

0 comments on commit 0e89286

Please sign in to comment.