Skip to content

Commit

Permalink
Implemented draft of IocService.
Browse files Browse the repository at this point in the history
Signed-off-by: AWSHurneyt <[email protected]>
  • Loading branch information
AWSHurneyt committed May 29, 2024
1 parent cf8c5cd commit 2b9e04a
Show file tree
Hide file tree
Showing 2 changed files with 303 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.securityanalytics.services;

import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.securityanalytics.action.FetchIocsActionResponse;
import org.opensearch.securityanalytics.model.IocDao;
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;
import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.opensearch.securityanalytics.SecurityAnalyticsPlugin.IOC_INDEX_NAME_BASE;

/**
* IOC Service implements operations that interact with retrieving IOCs from data sources,
* parsing them into threat intel data models (i.e., [IocDao]), and ingesting them to system indexes.
*/
public class IocService {
private final Logger log = LogManager.getLogger(IocService.class);
private Client client;
private ClusterService clusterService;

public IocService(Client client, ClusterService clusterService) {
this.client = client;
this.clusterService = clusterService;
}

/**
* Checks whether the [IOC_INDEX_NAME_BASE]-related index exists.
* @param index The index to evaluate.
* @return TRUE if the index is an IOC-related system index, and exists; else returns FALSE.
*/
public boolean hasIocSystemIndex(String index) {
return index.startsWith(IOC_INDEX_NAME_BASE) && this.clusterService.state().routingTable().hasIndex(index);
}

public void initSystemIndexes(String index, ActionListener<FetchIocsActionResponse> listener) {
if (!hasIocSystemIndex(index)) {
var indexRequest = new CreateIndexRequest(index)
// TODO hurneyt finalize mappings once IOC data model PR is merged
// .mapping(iocMappings())
.settings(Settings.builder().put("index.hidden", true).build());
((AdminClient) client).indices().create(indexRequest, new ActionListener<>() {
@Override
public void onResponse(CreateIndexResponse createIndexResponse) {
// TODO should this be info, or debug level?
log.info("Created system index {}", index);
}

@Override
public void onFailure(Exception e) {
log.error("Failed to create system index {}", index);
listener.onFailure(e);
}
});
}
}

public void indexIocs(List<IocDao> allIocs, ActionListener<FetchIocsActionResponse> listener) throws IOException {
// TODO hurneyt this is using TIF batch size setting. Consider adding IOC-specific setting
Integer batchSize = this.clusterService.getClusterSettings().get(SecurityAnalyticsSettings.BATCH_SIZE);

List<BulkRequest> bulkRequestList = new ArrayList<>();
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

for (IocDao ioc : allIocs) {
initSystemIndexes(ioc.getType().getSystemIndexName(), listener);

IndexRequest indexRequest = new IndexRequest(ioc.getType().getSystemIndexName())
.opType(DocWriteRequest.OpType.INDEX)
.source(ioc.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS));
bulkRequest.add(indexRequest);

if (bulkRequest.requests().size() == batchSize) {
bulkRequestList.add(bulkRequest);
bulkRequest = new BulkRequest();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}
}
bulkRequestList.add(bulkRequest);

GroupedActionListener<BulkResponse> bulkResponseListener = new GroupedActionListener<>(ActionListener.wrap(bulkResponses -> {
int idx = 0;
for (BulkResponse response : bulkResponses) {
BulkRequest request = bulkRequestList.get(idx);
if (response.hasFailures()) {
throw new OpenSearchException(
"Error occurred while ingesting IOCs in {} with an error {}",
StringUtils.join(request.getIndices()),
response.buildFailureMessage()
);
}
}
listener.onResponse(new FetchIocsActionResponse(allIocs));
}, listener::onFailure), bulkRequestList.size());

for (BulkRequest req : bulkRequestList) {
try {
StashedThreadContext.run(client, () -> client.bulk(req, bulkResponseListener));
} catch (OpenSearchException e) {
log.error("Failed to save IOCs.", e);
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.securityanalytics.services;

import org.junit.After;
import org.junit.Before;
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.securityanalytics.TestHelpers;
import org.opensearch.securityanalytics.action.FetchIocsActionResponse;
import org.opensearch.securityanalytics.model.IocDao;
import org.opensearch.securityanalytics.model.IocDaoTests;
import org.opensearch.securityanalytics.model.IocDto;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.opensearch.securityanalytics.SecurityAnalyticsPlugin.IOC_ALL_INDEX_PATTERN;
import static org.opensearch.securityanalytics.SecurityAnalyticsPlugin.IOC_IP_INDEX_NAME;

public class IocServiceIT extends OpenSearchIntegTestCase {
private IocService service;
private String testIndex;

@Before
private void beforeTest() {
service = new IocService(client(), clusterService());
testIndex = null;
}

@After
private void afterTest() throws ExecutionException, InterruptedException {
if (testIndex != null && !testIndex.isBlank()) {
client().admin().indices().delete(new DeleteIndexRequest(testIndex)).get();
}
}

public void test_hasIocSystemIndex_returnsFalse_whenIndexNotCreated() throws ExecutionException, InterruptedException {
// Confirm index doesn't exist before running test case
testIndex = IOC_IP_INDEX_NAME;
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().health(new ClusterHealthRequest()).get();
assertFalse(clusterHealthResponse.getIndices().containsKey(testIndex));

// Run test case
assertFalse(service.hasIocSystemIndex(testIndex));
}

public void test_hasIocSystemIndex_returnsFalse_withInvalidIndex() throws ExecutionException, InterruptedException {
// Create test index
testIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
client().admin().indices().create(new CreateIndexRequest(testIndex)).get();

// Run test case
assertFalse(service.hasIocSystemIndex(testIndex));
}

public void test_hasIocSystemIndex_returnsTrue_whenIndexExists() throws ExecutionException, InterruptedException {
// Create test index
testIndex = IOC_IP_INDEX_NAME;
client().admin().indices().create(new CreateIndexRequest(testIndex)).get();

// Run test case
assertTrue(service.hasIocSystemIndex(testIndex));
}

public void test_initSystemIndexes_createsIndexes() {
// Confirm index doesn't exist
testIndex = IOC_IP_INDEX_NAME;
assertFalse(service.hasIocSystemIndex(testIndex));

// Run test case
service.initSystemIndexes(testIndex, new ActionListener<>() {
@Override
public void onResponse(FetchIocsActionResponse fetchIocsActionResponse) {}

@Override
public void onFailure(Exception e) {
fail(String.format("Creation of %s should not fail: %s", testIndex, e));
}
});
assertTrue(service.hasIocSystemIndex(testIndex));
}

public void test_indexIocs_ingestsIocsCorrectly() throws IOException {
// Prepare test IOCs
List<IocDao> iocs = IntStream.range(0, randomInt())
.mapToObj(i -> TestHelpers.randomIocDao())
.collect(Collectors.toList());

// Run test case
service.indexIocs(iocs, new ActionListener<>() {
@Override
public void onResponse(FetchIocsActionResponse fetchIocsActionResponse) {
// Confirm expected number of IOCs in response
assertEquals(iocs.size(), fetchIocsActionResponse.getIocs().size());

try {
// Search system indexes directly
SearchRequest searchRequest = new SearchRequest()
.indices(IOC_ALL_INDEX_PATTERN)
.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));
SearchResponse searchResponse = client().search(searchRequest).get();

// Confirm expected number of hits
assertEquals(iocs.size(), searchResponse.getHits().getHits().length);

// Parse hits to IOCs
List<IocDao> iocHits = Collections.emptyList();
for (SearchHit ioc : searchResponse.getHits()) {
try {
iocHits.add(IocDao.parse(TestHelpers.parser(ioc.getSourceAsString()), null));
} catch (IOException e) {
fail(String.format("Failed to parse IOC hit: %s", e));
}
}

// Confirm expected number of IOCs
assertEquals(iocs.size(), iocHits.size());

// Sort IOCs for comparison
iocs.sort(Comparator.comparing(IocDao::getId));
fetchIocsActionResponse.getIocs().sort(Comparator.comparing(IocDto::getId));
iocHits.sort(Comparator.comparing(IocDao::getId));

// Confirm IOCs are equal
for (int i = 0; i < iocs.size(); i++) {
assertEqualIocs(iocs.get(i), fetchIocsActionResponse.getIocs().get(i));
IocDaoTests.assertEqualIocDaos(iocs.get(i), iocHits.get(i));
}
} catch (InterruptedException | ExecutionException e) {
fail(String.format("IOC_ALL_INDEX_PATTERN search failed: %s", e));
}
}

@Override
public void onFailure(Exception e) {
fail(String.format("Ingestion of IOCs should not fail: %s", e));
}
});
}

private void assertEqualIocs(IocDao iocDao, IocDto iocDto) {
assertEquals(iocDao.getId(), iocDto.getId());
assertEquals(iocDao.getName(), iocDto.getName());
assertEquals(iocDao.getValue(), iocDto.getValue());
assertEquals(iocDao.getSeverity(), iocDto.getSeverity());
assertEquals(iocDao.getSpecVersion(), iocDto.getSpecVersion());
assertEquals(iocDao.getCreated(), iocDto.getCreated());
assertEquals(iocDao.getModified(), iocDto.getModified());
assertEquals(iocDao.getDescription(), iocDto.getDescription());
assertEquals(iocDao.getLabels(), iocDto.getLabels());
assertEquals(iocDao.getFeedId(), iocDto.getFeedId());
}
}

0 comments on commit 2b9e04a

Please sign in to comment.