Preliminary framework for jobscheduler and datasource (#626)
Signed-off-by: Joanne Wang <[email protected]>
jowg-amazon authored Oct 5, 2023
1 parent c58d233 commit 18f4498
Showing 38 changed files with 4,187 additions and 6 deletions.
2 changes: 2 additions & 0 deletions build.gradle
@@ -158,6 +158,8 @@ dependencies {
api "org.opensearch:common-utils:${common_utils_version}@jar"
api "org.opensearch.client:opensearch-rest-client:${opensearch_version}"
implementation "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"
implementation "org.apache.commons:commons-csv:1.10.0"

// Needed for integ tests
zipArchive group: 'org.opensearch.plugin', name:'alerting', version: "${opensearch_build}"
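The job-scheduler-spi dependency is declared compileOnly because the Job Scheduler plugin supplies those classes at runtime. For context, a plugin usually hooks into it by implementing JobSchedulerExtension; the sketch below is illustrative only, and the job type, job index, runner, and parser names are assumptions rather than values taken from this commit.

// Illustrative sketch of wiring a plugin into job-scheduler-spi (not part of this commit).
import org.opensearch.jobscheduler.spi.JobSchedulerExtension;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.plugins.Plugin;

public class SecurityAnalyticsPlugin extends Plugin implements JobSchedulerExtension {

    @Override
    public String getJobType() {
        return "security_analytics_datasource_job"; // illustrative job type
    }

    @Override
    public String getJobIndex() {
        return ".opensearch-sap-datasources"; // illustrative job index name
    }

    @Override
    public ScheduledJobRunner getJobRunner() {
        return DatasourceRunner.getJobRunnerInstance(); // assumed singleton runner
    }

    @Override
    public ScheduledJobParser getJobParser() {
        // assumed parser that rebuilds the Datasource job parameter from its stored document
        return (xContentParser, id, jobDocVersion) -> Datasource.PARSER.parse(xContentParser, null);
    }
}

An extending plugin typically also lists opensearch-job-scheduler as an extended plugin and registers the extension through the SPI service file, neither of which appears in this hunk.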
@@ -137,7 +137,7 @@ public Collection<Object> createComponents(Client client,
mapperService = new MapperService(client, clusterService, indexNameExpressionResolver, indexTemplateManager, logTypeService);
ruleIndices = new RuleIndices(logTypeService, client, clusterService, threadPool);
correlationRuleIndices = new CorrelationRuleIndices(client, clusterService);
- ThreatIntelFeedDataService threatIntelFeedDataService = new ThreatIntelFeedDataService(clusterService.state(), client, indexNameExpressionResolver, xContentRegistry);
+ ThreatIntelFeedDataService threatIntelFeedDataService = new ThreatIntelFeedDataService(clusterService.state(), clusterService, client, indexNameExpressionResolver, xContentRegistry);
DetectorThreatIntelService detectorThreatIntelService = new DetectorThreatIntelService(threatIntelFeedDataService);
this.client = client;

@@ -0,0 +1,3 @@
grant {
permission java.net.SocketPermission "reputation.alienvault.com:443" "connect,resolve";
};
@@ -1,46 +1,115 @@
package org.opensearch.securityanalytics.threatIntel;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.SpecialPermission;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.securityanalytics.model.ThreatIntelFeedData;
import org.opensearch.securityanalytics.threatIntel.common.DatasourceManifest;
import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext;
import org.opensearch.securityanalytics.threatIntel.common.ThreatIntelSettings;
import org.opensearch.securityanalytics.threatIntel.dao.DatasourceDao;
import org.opensearch.securityanalytics.util.IndexUtils;
import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
import org.opensearch.securityanalytics.threatIntel.common.Constants;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.*;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import static org.opensearch.securityanalytics.threatIntel.jobscheduler.Datasource.THREAT_INTEL_DATA_INDEX_NAME_PREFIX;

/**
* Service to handle CRUD operations on Threat Intel Feed Data
*/
public class ThreatIntelFeedDataService {
private static final Logger log = LogManager.getLogger(ThreatIntelFeedDataService.class);
private static final String SCHEMA_VERSION = "schema_version";
private static final String IOC_TYPE = "ioc_type";
private static final String IOC_VALUE = "ioc_value";
private static final String FEED_ID = "feed_id";
private static final String TIMESTAMP = "timestamp";
private static final String TYPE = "type";
private static final String DATA_FIELD_NAME = "_data";

private final ClusterState state;
private final Client client;
private final IndexNameExpressionResolver indexNameExpressionResolver;

private static final Map<String, Object> INDEX_SETTING_TO_CREATE = Map.of(
"index.number_of_shards",
1,
"index.number_of_replicas",
0,
"index.refresh_interval",
-1,
"index.hidden",
true
);
private static final Map<String, Object> INDEX_SETTING_TO_FREEZE = Map.of(
"index.auto_expand_replicas",
"0-all",
"index.blocks.write",
true
);
private final ClusterService clusterService;
private final ClusterSettings clusterSettings;

public ThreatIntelFeedDataService(
ClusterState state,
ClusterService clusterService,
Client client,
IndexNameExpressionResolver indexNameExpressionResolver,
NamedXContentRegistry xContentRegistry) {
this.state = state;
this.client = client;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.xContentRegistry = xContentRegistry;
this.clusterService = clusterService;
this.clusterSettings = clusterService.getClusterSettings();
}

private final NamedXContentRegistry xContentRegistry;
@@ -52,7 +121,7 @@ public void getThreatIntelFeedData(
String tifdIndex = IndexUtils.getNewIndexByCreationDate(
this.state,
this.indexNameExpressionResolver,
- ".opendsearch-sap-threatintel*"
+ ".opensearch-sap-threatintel*" //name?
);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("ioc_type", iocType)));
@@ -87,4 +156,175 @@ private List<ThreatIntelFeedData> getTifdList(SearchResponse searchResponse) {
}
return list;
}

/**
* Create an index for a threat intel feed
*
* Index settings start with a single shard, zero replicas, no refresh interval, and hidden.
* Once the threat intel feed is indexed, refresh and force merge the index.
* Then change the index settings to expand replicas to all nodes and block writes.
*
* @param indexName index name
*/
public void createIndexIfNotExists(final String indexName) {
if (clusterService.state().metadata().hasIndex(indexName) == true) {
return;
}
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(INDEX_SETTING_TO_CREATE)
.mapping(getIndexMapping());
StashedThreadContext.run(
client,
() -> client.admin().indices().create(createIndexRequest).actionGet(clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT))
);
}

private void freezeIndex(final String indexName) {
TimeValue timeout = clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT);
StashedThreadContext.run(client, () -> {
client.admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).execute().actionGet(timeout);
client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout);
client.admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(INDEX_SETTING_TO_FREEZE)
.execute()
.actionGet(clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT));
});
}

private String getIndexMapping() {
try {
try (InputStream is = DatasourceDao.class.getResourceAsStream("/mappings/threat_intel_feed_mapping.json")) { // TODO: check Datasource dao and this mapping
try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
return reader.lines().map(String::trim).collect(Collectors.joining());
}
}
} catch (IOException e) {
log.error("Runtime exception when getting the threat intel index mapping", e);
throw new SecurityAnalyticsException("Runtime exception when getting the threat intel index mapping", RestStatus.INTERNAL_SERVER_ERROR, e);
}
}

/**
* Create CSVParser of a threat intel feed
*
* @param manifest Datasource manifest
* @return CSVParser for threat intel feed
*/
@SuppressForbidden(reason = "Need to connect to http endpoint to read threat intel feed database file")
public CSVParser getDatabaseReader(final DatasourceManifest manifest) {
SpecialPermission.check();
return AccessController.doPrivileged((PrivilegedAction<CSVParser>) () -> {
try {
URL url = new URL(manifest.getUrl());
return internalGetDatabaseReader(manifest, url.openConnection());
} catch (IOException e) {
log.error("Exception: failed to read threat intel feed data from {}",manifest.getUrl(), e);
throw new OpenSearchException("failed to read threat intel feed data from {}", manifest.getUrl(), e);
}
});
}

@SuppressForbidden(reason = "Need to connect to http endpoint to read threat intel feed database file") // TODO: update this function because no zip file...
protected CSVParser internalGetDatabaseReader(final DatasourceManifest manifest, final URLConnection connection) throws IOException {
connection.addRequestProperty(Constants.USER_AGENT_KEY, Constants.USER_AGENT_VALUE);
ZipInputStream zipIn = new ZipInputStream(connection.getInputStream());
ZipEntry zipEntry = zipIn.getNextEntry();
while (zipEntry != null) {
if (zipEntry.getName().equalsIgnoreCase(manifest.getDbName()) == false) {
zipEntry = zipIn.getNextEntry();
continue;
}
return new CSVParser(new BufferedReader(new InputStreamReader(zipIn)), CSVFormat.RFC4180);
}
throw new IllegalArgumentException(
String.format(Locale.ROOT, "database file [%s] does not exist in the zip file [%s]", manifest.getDbName(), manifest.getUrl())
);
}

/**
* Puts threat intel feed data from a CSVRecord iterator into a given index in bulk
*
* @param indexName Index name to put the TIF data into
* @param fields    Field names matching the data in each CSVRecord, in order
* @param iterator  TIF data to insert
* @param renewLock Runnable to renew the lock
*/
public void saveThreatIntelFeedData(
final String indexName,
final String[] fields,
final Iterator<CSVRecord> iterator,
final Runnable renewLock
// final ThreatIntelFeedData threatIntelFeedData
) throws IOException {
if (indexName == null || fields == null || iterator == null || renewLock == null){
throw new IllegalArgumentException("Parameters cannot be null");
}

TimeValue timeout = clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT);
Integer batchSize = clusterSettings.get(ThreatIntelSettings.BATCH_SIZE);
final BulkRequest bulkRequest = new BulkRequest();
Queue<DocWriteRequest> requests = new LinkedList<>();
for (int i = 0; i < batchSize; i++) {
requests.add(Requests.indexRequest(indexName));
}
while (iterator.hasNext()) {
CSVRecord record = iterator.next();
// XContentBuilder tifData = threatIntelFeedData.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS);
IndexRequest indexRequest = (IndexRequest) requests.poll();
// indexRequest.source(tifData);
indexRequest.id(record.get(0));
bulkRequest.add(indexRequest);
if (iterator.hasNext() == false || bulkRequest.requests().size() == batchSize) {
BulkResponse response = StashedThreadContext.run(client, () -> client.bulk(bulkRequest).actionGet(timeout));
if (response.hasFailures()) {
throw new OpenSearchException(
"error occurred while ingesting threat intel feed data in {} with an error {}",
indexName,
response.buildFailureMessage()
);
}
requests.addAll(bulkRequest.requests());
bulkRequest.requests().clear();
}
renewLock.run();
}
freezeIndex(indexName);
}

public void deleteThreatIntelDataIndex(final String index) {
deleteThreatIntelDataIndex(Arrays.asList(index));
}

public void deleteThreatIntelDataIndex(final List<String> indices) {
if (indices == null || indices.isEmpty()) {
return;
}

Optional<String> invalidIndex = indices.stream()
.filter(index -> index.startsWith(THREAT_INTEL_DATA_INDEX_NAME_PREFIX) == false)
.findAny();
if (invalidIndex.isPresent()) {
throw new OpenSearchException(
"the index[{}] is not threat intel data index which should start with {}",
invalidIndex.get(),
THREAT_INTEL_DATA_INDEX_NAME_PREFIX
);
}

AcknowledgedResponse response = StashedThreadContext.run(
client,
() -> client.admin()
.indices()
.prepareDelete(indices.toArray(new String[0]))
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN)
.execute()
.actionGet(clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT))
);

if (response.isAcknowledged() == false) {
throw new OpenSearchException("failed to delete data[{}] in datasource", String.join(",", indices));
}
}

}
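For orientation, a datasource update job would drive this service roughly in the order the Javadocs above describe: create the hidden index, stream the feed in, and let saveThreatIntelFeedData freeze it once the bulk ingest finishes. A minimal sketch, assuming the surrounding job-runner code supplies the feedDataService, a DatasourceManifest, and a lock-renewal Runnable (all names below are illustrative, not taken from this commit):

// Hypothetical driver sketch (inside a method that declares throws IOException);
// feedDataService, manifest, renewLock, and staleIndexName are assumed inputs.
String indexName = THREAT_INTEL_DATA_INDEX_NAME_PREFIX + "-alienvault-1"; // illustrative index name
feedDataService.createIndexIfNotExists(indexName); // hidden, single-shard, no-refresh index
try (CSVParser reader = feedDataService.getDatabaseReader(manifest)) {
    String[] fields = {"ip"}; // illustrative mapping of CSV columns to field names
    // bulk-indexes the records in batches, then force merges, refreshes, and blocks writes
    feedDataService.saveThreatIntelFeedData(indexName, fields, reader.iterator(), renewLock);
}
// once the new feed index is in use, older generations can be dropped
feedDataService.deleteThreatIntelDataIndex(List.of(staleIndexName));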
@@ -0,0 +1,9 @@
package org.opensearch.securityanalytics.threatIntel.common;

import org.opensearch.Version;

import java.util.Locale;
public class Constants {
public static final String USER_AGENT_KEY = "User-Agent";
public static final String USER_AGENT_VALUE = String.format(Locale.ROOT, "OpenSearch/%s vanilla", Version.CURRENT.toString());
}
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.securityanalytics.threatIntel.action;

import org.opensearch.action.ActionType;
import org.opensearch.action.support.master.AcknowledgedResponse;

/**
* Threat intel datasource delete action
*/
public class DeleteDatasourceAction extends ActionType<AcknowledgedResponse> {
/**
* Delete datasource action instance
*/
public static final DeleteDatasourceAction INSTANCE = new DeleteDatasourceAction();
/**
* Delete datasource action name
*/
public static final String NAME = "cluster:admin/security_analytics/datasource/delete";

private DeleteDatasourceAction() {
super(NAME, AcknowledgedResponse::new);
}
}
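For context, an ActionType like this is dispatched through the node client and resolved to whichever transport action is registered for it. A minimal caller sketch; the DeleteDatasourceRequest class and the datasource name are assumptions for illustration and are not shown in this hunk:

// Hypothetical caller sketch; DeleteDatasourceRequest and the datasource name are assumed.
client.execute(
    DeleteDatasourceAction.INSTANCE,
    new DeleteDatasourceRequest("alienvault-feed"),
    ActionListener.wrap(
        response -> log.info("datasource delete acknowledged: {}", response.isAcknowledged()),
        e -> log.error("failed to delete datasource", e)
    )
);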