Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TIF Job Runner Cleanup #676

Merged
merged 15 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.core.action.ActionListener;
import org.opensearch.action.ActionRequest;
import org.opensearch.core.action.ActionResponse;
Expand Down Expand Up @@ -97,8 +98,8 @@ public class SecurityAnalyticsPlugin extends Plugin implements ActionPlugin, Map
public static final String CORRELATION_RULES_BASE_URI = PLUGINS_BASE_URI + "/correlation/rules";

public static final String CUSTOM_LOG_TYPE_URI = PLUGINS_BASE_URI + "/logtype";
public static final String JOB_INDEX_NAME = ".opensearch-sap-threatintel-job";
public static final Map<String, Object> TIF_JOB_INDEX_SETTING = Map.of("index.number_of_shards", 1, "index.auto_expand_replicas", "0-all", "index.hidden", true);
public static final String JOB_INDEX_NAME = ".opensearch-sap-threat-intel-job";
public static final Map<String, Object> TIF_JOB_INDEX_SETTING = Map.of(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1, IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-all", IndexMetadata.SETTING_INDEX_HIDDEN, true);

private CorrelationRuleIndices correlationRuleIndices;

Expand All @@ -123,7 +124,7 @@ public class SecurityAnalyticsPlugin extends Plugin implements ActionPlugin, Map
private LogTypeService logTypeService;
@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings){
return List.of(new SystemIndexDescriptor(THREAT_INTEL_DATA_INDEX_NAME_PREFIX, "System index used for threat intel data"));
return Collections.singletonList(new SystemIndexDescriptor(THREAT_INTEL_DATA_INDEX_NAME_PREFIX, "System index used for threat intel data"));
}


Expand Down Expand Up @@ -210,7 +211,7 @@ public List<RestHandler> getRestHandlers(Settings settings,

@Override
public String getJobType() {
return "opensearch_sap_threatintel_job";
return "opensearch_sap_threat_intel_job";
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public class SecurityAnalyticsSettings {
// threat intel settings
public static final Setting<TimeValue> TIF_UPDATE_INTERVAL = Setting.timeSetting(
"plugins.security_analytics.threatintel.tifjob.update_interval",
TimeValue.timeValueMinutes(1),
TimeValue.timeValueMinutes(1440),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -49,7 +50,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Matcher;
Expand All @@ -63,28 +63,30 @@
*/
public class ThreatIntelFeedDataService {
private static final Logger log = LogManager.getLogger(ThreatIntelFeedDataService.class);

private final Client client;
private final IndexNameExpressionResolver indexNameExpressionResolver;
public static final String SETTING_INDEX_REFRESH_INTERVAL = "index.refresh_interval";
public static final String SETTING_INDEX_BLOCKS_WRITE = "index.blocks.write";

private static final Map<String, Object> INDEX_SETTING_TO_CREATE = Map.of(
"index.number_of_shards",
IndexMetadata.SETTING_NUMBER_OF_SHARDS,
1,
"index.number_of_replicas",
IndexMetadata.SETTING_NUMBER_OF_REPLICAS,
0,
"index.refresh_interval",
SETTING_INDEX_REFRESH_INTERVAL,
-1,
"index.hidden",
IndexMetadata.SETTING_INDEX_HIDDEN,
true
);
private static final Map<String, Object> INDEX_SETTING_TO_FREEZE = Map.of(
"index.auto_expand_replicas",
IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS,
"0-all",
"index.blocks.write",
SETTING_INDEX_BLOCKS_WRITE,
true
);
private final ClusterService clusterService;
private final ClusterSettings clusterSettings;
private final NamedXContentRegistry xContentRegistry;

public ThreatIntelFeedDataService(
ClusterService clusterService,
Expand All @@ -98,8 +100,6 @@ public ThreatIntelFeedDataService(
this.clusterSettings = clusterService.getClusterSettings();
}

private final NamedXContentRegistry xContentRegistry;

public void getThreatIntelFeedData(
ActionListener<List<ThreatIntelFeedData>> listener
) {
Expand All @@ -108,15 +108,15 @@ public void getThreatIntelFeedData(
if (IndexUtils.getNewIndexByCreationDate(
this.clusterService.state(),
this.indexNameExpressionResolver,
".opensearch-sap-threatintel*"
".opensearch-sap-threat-intel*"
) == null) {
createThreatIntelFeedData();
}
//if index exists
String tifdIndex = IndexUtils.getNewIndexByCreationDate(
this.clusterService.state(),
this.indexNameExpressionResolver,
".opensearch-sap-threatintel*"
".opensearch-sap-threat-intel*"
);

SearchRequest searchRequest = new SearchRequest(tifdIndex);
Expand All @@ -127,17 +127,11 @@ public void getThreatIntelFeedData(
listener.onFailure(e);
}));
} catch (InterruptedException e) {
log.error("failed to get threat intel feed data", e);
log.error("Failed to get threat intel feed data", e);
listener.onFailure(e);
}
}

private void createThreatIntelFeedData() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
client.execute(PutTIFJobAction.INSTANCE, new PutTIFJobRequest("feed_updater", clusterSettings.get(SecurityAnalyticsSettings.TIF_UPDATE_INTERVAL))).actionGet();
countDownLatch.await();
}

/**
* Create an index for a threat intel feed
* <p>
Expand All @@ -159,19 +153,6 @@ public void createIndexIfNotExists(final String indexName) {
);
}

private String getIndexMapping() {
try {
try (InputStream is = TIFJobParameterService.class.getResourceAsStream("/mappings/threat_intel_feed_mapping.json")) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
return reader.lines().map(String::trim).collect(Collectors.joining());
}
}
} catch (IOException e) {
log.error("Runtime exception when getting the threat intel index mapping", e);
throw new SecurityAnalyticsException("Runtime exception when getting the threat intel index mapping", RestStatus.INTERNAL_SERVER_ERROR, e);
}
}

/**
* Puts threat intel feed from CSVRecord iterator into a given index in bulk
*
Expand Down Expand Up @@ -221,7 +202,7 @@ public void parseAndSaveThreatIntelFeedDataCSV(
}
saveTifds(bulkRequest, timeout);
renewLock.run();
freezeIndex(indexName);
setIndexReadOnly(indexName);
}

public static boolean isValidIp(String ip) {
Expand Down Expand Up @@ -250,20 +231,6 @@ public void saveTifds(BulkRequest bulkRequest, TimeValue timeout) {

}

private void freezeIndex(final String indexName) {
TimeValue timeout = clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT);
StashedThreadContext.run(client, () -> {
client.admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).execute().actionGet(timeout);
client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout);
client.admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(INDEX_SETTING_TO_FREEZE)
.execute()
.actionGet(clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT));
});
}

public void deleteThreatIntelDataIndex(final List<String> indices) {
if (indices == null || indices.isEmpty()) {
return;
Expand Down Expand Up @@ -294,4 +261,42 @@ public void deleteThreatIntelDataIndex(final List<String> indices) {
throw new OpenSearchException("failed to delete data[{}]", String.join(",", indices));
}
}

private void createThreatIntelFeedData() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
client.execute(PutTIFJobAction.INSTANCE, new PutTIFJobRequest("feed_updater", clusterSettings.get(SecurityAnalyticsSettings.TIF_UPDATE_INTERVAL))).actionGet();
countDownLatch.await();
}

private String getIndexMapping() {
try {
try (InputStream is = TIFJobParameterService.class.getResourceAsStream("/mappings/threat_intel_feed_mapping.json")) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
return reader.lines().map(String::trim).collect(Collectors.joining());
}
}
} catch (IOException e) {
log.error("Runtime exception when getting the threat intel index mapping", e);
throw new SecurityAnalyticsException("Runtime exception when getting the threat intel index mapping", RestStatus.INTERNAL_SERVER_ERROR, e);
}
}

/**
* Sets the TIFData index as read only to prevent further writing to it
* When index needs to be updated, all TIFData indices will be deleted then repopulated
* @param indexName
*/
private void setIndexReadOnly(final String indexName) {
TimeValue timeout = clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT);
StashedThreadContext.run(client, () -> {
client.admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).execute().actionGet(timeout);
client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout);
client.admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(INDEX_SETTING_TO_FREEZE)
.execute()
.actionGet(clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,9 @@ public List<String> validateTIFJobName(final String tifJobName) {
String.format(Locale.ROOT, "threat intel feed job name must not contain the following characters %s", Strings.INVALID_FILENAME_CHARS)
);
}
if (tifJobName.contains("#")) {
if (tifJobName.contains("#") || tifJobName.contains(":") ) {
errorMsgs.add("threat intel feed job name must not contain '#'");
}
if (tifJobName.contains(":")) {
errorMsgs.add("threat intel feed job name must not contain ':'");
}
if (tifJobName.charAt(0) == '_' || tifJobName.charAt(0) == '-' || tifJobName.charAt(0) == '+') {
errorMsgs.add("threat intel feed job name must not start with '_', '-', or '+'");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,6 @@ public void writeTo(final StreamOutput out) throws IOException {
out.writeBoolean(hasHeader);
}

private TIFMetadata() {
}


@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,23 @@ public class TIFJobParameter implements Writeable, ScheduledJobParameter {
/**
* Prefix of indices having threatIntel data
*/
public static final String THREAT_INTEL_DATA_INDEX_NAME_PREFIX = ".opensearch-sap-threatintel";
public static final String THREAT_INTEL_DATA_INDEX_NAME_PREFIX = ".opensearch-sap-threat-intel";


/**
* String fields for job scheduling parameters used for ParseField
*/
private static final String name_field = "name";
private static final String enabled_field = "update_enabled";
private static final String last_update_time_field = "last_update_time";
private static final String last_update_time_field_readable = "last_update_time_field";
private static final String schedule_field = "schedule";
private static final String enabled_time_field = "enabled_time";
private static final String enabled_time_field_readable = "enabled_time_field";
private static final String state_field = "state";
private static final String indices_field = "indices";
private static final String update_stats_field = "update_stats";



/**
Expand Down Expand Up @@ -543,15 +559,15 @@ public void setLastFailedAt(Instant now) {
*/
public static class Builder {
public static TIFJobParameter build(final PutTIFJobRequest request) {
long minutes = request.getUpdateInterval().minutes();
String name = request.getName();
IntervalSchedule schedule = new IntervalSchedule(
Instant.now().truncatedTo(ChronoUnit.MILLIS),
(int) request.getUpdateInterval().minutes(),
(int) minutes,
ChronoUnit.MINUTES
);
return new TIFJobParameter(name, schedule);


}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,11 @@
*/
package org.opensearch.securityanalytics.util;

import java.util.Set;

import com.google.common.collect.ImmutableMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.core.action.ActionListener;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
Expand All @@ -28,26 +21,23 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.health.ClusterIndexHealth;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.reindex.BulkByScrollResponse;
import org.opensearch.index.reindex.DeleteByQueryAction;
import org.opensearch.index.reindex.DeleteByQueryRequestBuilder;
import org.opensearch.search.SearchHit;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.securityanalytics.logtype.LogTypeService;
import org.opensearch.securityanalytics.mapper.MapperUtils;
import org.opensearch.securityanalytics.model.Detector;
import org.opensearch.securityanalytics.model.Rule;
import org.opensearch.securityanalytics.rules.backend.OSQueryBackend;
import org.opensearch.securityanalytics.rules.backend.QueryBackend;
Expand All @@ -56,24 +46,14 @@
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.securityanalytics.model.Detector.NO_ID;
import static org.opensearch.securityanalytics.model.Detector.NO_VERSION;

public class RuleIndices {
Expand Down
Loading