-
Notifications
You must be signed in to change notification settings - Fork 76
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
prelim framework for jobscheduler and datasource
- Loading branch information
1 parent
52190ab
commit 994739c
Showing
9 changed files
with
436 additions
and
142 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
42 changes: 42 additions & 0 deletions
42
src/main/java/org/opensearch/securityanalytics/threatintel/common/StashedThreadContext.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.securityanalytics.threatintel.common; | ||
|
||
import java.util.function.Supplier; | ||
|
||
import org.opensearch.client.Client; | ||
import org.opensearch.common.util.concurrent.ThreadContext; | ||
|
||
/** | ||
* Helper class to run code with stashed thread context | ||
* | ||
* Code need to be run with stashed thread context if it interacts with system index | ||
* when security plugin is enabled. | ||
*/ | ||
public class StashedThreadContext { | ||
/** | ||
* Set the thread context to default, this is needed to allow actions on model system index | ||
* when security plugin is enabled | ||
* @param function runnable that needs to be executed after thread context has been stashed, accepts and returns nothing | ||
*/ | ||
public static void run(final Client client, final Runnable function) { | ||
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { | ||
function.run(); | ||
} | ||
} | ||
|
||
/** | ||
* Set the thread context to default, this is needed to allow actions on model system index | ||
* when security plugin is enabled | ||
* @param function supplier function that needs to be executed after thread context has been stashed, return object | ||
*/ | ||
public static <T> T run(final Client client, final Supplier<T> function) { | ||
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { | ||
return function.get(); | ||
} | ||
} | ||
} | ||
|
45 changes: 45 additions & 0 deletions
45
src/main/java/org/opensearch/securityanalytics/threatintel/common/ThreatIntelExecutor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.securityanalytics.threatintel.common; | ||
|
||
import java.util.concurrent.ExecutorService; | ||
|
||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.threadpool.ExecutorBuilder; | ||
import org.opensearch.threadpool.FixedExecutorBuilder; | ||
import org.opensearch.threadpool.ThreadPool; | ||
|
||
/** | ||
* Provide a list of static methods related with executors for threat intel | ||
*/ | ||
public class ThreatIntelExecutor { | ||
private static final String THREAD_POOL_NAME = "_plugin_securityanalytics_threatintel_datasource_update"; | ||
private final ThreadPool threadPool; | ||
|
||
public ThreatIntelExecutor(final ThreadPool threadPool) { | ||
this.threadPool = threadPool; | ||
} | ||
|
||
/** | ||
* We use fixed thread count of 1 for updating datasource as updating datasource is running background | ||
* once a day at most and no need to expedite the task. | ||
* | ||
* @param settings the settings | ||
* @return the executor builder | ||
*/ | ||
public static ExecutorBuilder executorBuilder(final Settings settings) { | ||
return new FixedExecutorBuilder(settings, THREAD_POOL_NAME, 1, 1000, THREAD_POOL_NAME, false); | ||
} | ||
|
||
/** | ||
* Return an executor service for datasource update task | ||
* | ||
* @return the executor service | ||
*/ | ||
public ExecutorService forDatasourceUpdate() { | ||
return threadPool.executor(THREAD_POOL_NAME); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
139 changes: 139 additions & 0 deletions
139
src/main/java/org/opensearch/securityanalytics/threatintel/dao/DatasourceDao.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,143 @@ | ||
package org.opensearch.securityanalytics.threatintel.dao; | ||
|
||
import org.opensearch.action.DocWriteRequest; | ||
import org.opensearch.action.get.GetRequest; | ||
import org.opensearch.action.get.GetResponse; | ||
import org.opensearch.action.index.IndexResponse; | ||
import org.opensearch.action.support.WriteRequest; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.cluster.service.ClusterService; | ||
import org.opensearch.common.settings.ClusterSettings; | ||
import org.opensearch.common.xcontent.LoggingDeprecationHandler; | ||
import org.opensearch.common.xcontent.XContentFactory; | ||
import org.opensearch.common.xcontent.XContentHelper; | ||
|
||
import org.opensearch.core.xcontent.NamedXContentRegistry; | ||
import org.opensearch.core.xcontent.ToXContent; | ||
import org.opensearch.core.xcontent.XContentParser; | ||
import org.opensearch.index.IndexNotFoundException; | ||
import org.opensearch.securityanalytics.model.DetectorTrigger; | ||
import org.opensearch.securityanalytics.threatintel.common.StashedThreadContext; | ||
import org.opensearch.securityanalytics.threatintel.common.ThreatIntelSettings; | ||
import org.opensearch.securityanalytics.threatintel.jobscheduler.DatasourceExtension; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.securityanalytics.threatintel.jobscheduler.Datasource; | ||
|
||
import java.io.BufferedReader; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.io.InputStreamReader; | ||
import java.nio.charset.StandardCharsets; | ||
import java.time.Instant; | ||
import java.util.stream.Collectors; | ||
|
||
public class DatasourceDao { | ||
private static final Logger log = LogManager.getLogger(DetectorTrigger.class); | ||
|
||
private static final Integer MAX_SIZE = 1000; | ||
private final Client client; | ||
private final ClusterService clusterService; | ||
private final ClusterSettings clusterSettings; | ||
|
||
public DatasourceDao(final Client client, final ClusterService clusterService) { | ||
this.client = client; | ||
this.clusterService = clusterService; | ||
this.clusterSettings = clusterService.getClusterSettings(); | ||
} | ||
|
||
// /** | ||
// * Create datasource index | ||
// * | ||
// * @param stepListener setup listener | ||
// */ | ||
// public void createIndexIfNotExists(final StepListener<Void> stepListener) { | ||
// if (clusterService.state().metadata().hasIndex(DatasourceExtension.JOB_INDEX_NAME) == true) { | ||
// stepListener.onResponse(null); | ||
// return; | ||
// } | ||
// final CreateIndexRequest createIndexRequest = new CreateIndexRequest(DatasourceExtension.JOB_INDEX_NAME).mapping(getIndexMapping()) | ||
// .settings(DatasourceExtension.INDEX_SETTING); | ||
// | ||
// StashedThreadContext.run(client, () -> client.admin().indices().create(createIndexRequest, new ActionListener<>() { | ||
// @Override | ||
// public void onResponse(final CreateIndexResponse createIndexResponse) { | ||
// stepListener.onResponse(null); | ||
// } | ||
// | ||
// @Override | ||
// public void onFailure(final Exception e) { | ||
// if (e instanceof ResourceAlreadyExistsException) { | ||
// log.info("index[{}] already exist", DatasourceExtension.JOB_INDEX_NAME); | ||
// stepListener.onResponse(null); | ||
// return; | ||
// } | ||
// stepListener.onFailure(e); | ||
// } | ||
// })); | ||
// } | ||
|
||
private String getIndexMapping() { | ||
try { | ||
try (InputStream is = DatasourceDao.class.getResourceAsStream("/mappings/threatintel_datasource.json")) { | ||
try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { | ||
return reader.lines().map(String::trim).collect(Collectors.joining()); | ||
} | ||
} | ||
} catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
/** | ||
* Get datasource from an index {@code DatasourceExtension.JOB_INDEX_NAME} | ||
* @param name the name of a datasource | ||
* @return datasource | ||
* @throws IOException exception | ||
*/ | ||
public Datasource getDatasource(final String name) throws IOException { | ||
GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name); | ||
GetResponse response; | ||
try { | ||
response = StashedThreadContext.run(client, () -> client.get(request).actionGet(clusterSettings.get(ThreatIntelSettings.TIMEOUT))); | ||
if (response.isExists() == false) { | ||
log.error("Datasource[{}] does not exist in an index[{}]", name, DatasourceExtension.JOB_INDEX_NAME); | ||
return null; | ||
} | ||
} catch (IndexNotFoundException e) { | ||
log.error("Index[{}] is not found", DatasourceExtension.JOB_INDEX_NAME); | ||
return null; | ||
} | ||
|
||
XContentParser parser = XContentHelper.createParser( | ||
NamedXContentRegistry.EMPTY, | ||
LoggingDeprecationHandler.INSTANCE, | ||
response.getSourceAsBytesRef() | ||
); | ||
return Datasource.PARSER.parse(parser, null); | ||
} | ||
|
||
/** | ||
* Update datasource in an index {@code DatasourceExtension.JOB_INDEX_NAME} | ||
* @param datasource the datasource | ||
* @return index response | ||
*/ | ||
public IndexResponse updateDatasource(final Datasource datasource) { | ||
datasource.setLastUpdateTime(Instant.now()); | ||
return StashedThreadContext.run(client, () -> { | ||
try { | ||
return client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME) | ||
.setId(datasource.getName()) | ||
.setOpType(DocWriteRequest.OpType.INDEX) | ||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) | ||
.setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) | ||
.execute() | ||
.actionGet(clusterSettings.get(ThreatIntelSettings.TIMEOUT)); | ||
} catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
}); | ||
} | ||
|
||
} |
Oops, something went wrong.