diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java index 0bceb15..267e0f4 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/CommandManagerPlugin.java @@ -35,13 +35,18 @@ import java.util.List; import java.util.function.Supplier; - +/** + * The Command Manager plugin exposes an HTTP API with a single endpoint to + * receive raw commands from the Wazuh Server. These commands are processed, + * indexed and sent back to the Server for its delivery to, in most cases, the + * Agents. + */ public class CommandManagerPlugin extends Plugin implements ActionPlugin { public static final String COMMAND_MANAGER_BASE_URI = "/_plugins/_commandmanager"; - public static final String COMMAND_MANAGER_INDEX_NAME = "command-manager"; + public static final String COMMAND_MANAGER_INDEX_NAME = ".commands"; + public static final String COMMAND_MANAGER_INDEX_TEMPLATE_NAME = "index-template-commands"; private CommandIndex commandIndex; - private ThreadPool threadPool; @Override public Collection createComponents( @@ -57,8 +62,7 @@ public Collection createComponents( IndexNameExpressionResolver indexNameExpressionResolver, Supplier repositoriesServiceSupplier ) { - this.commandIndex = new CommandIndex(client); - this.threadPool = threadPool; + this.commandIndex = new CommandIndex(client, clusterService, threadPool); return Collections.emptyList(); } @@ -71,6 +75,6 @@ public List getRestHandlers( IndexNameExpressionResolver indexNameExpressionResolver, Supplier nodesInCluster ) { - return Collections.singletonList(new RestPostCommandAction(this.commandIndex, this.threadPool)); + return Collections.singletonList(new RestPostCommandAction(this.commandIndex)); } } diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/index/CommandIndex.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/index/CommandIndex.java index 483457f..92ba69f 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/index/CommandIndex.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/index/CommandIndex.java @@ -9,22 +9,26 @@ import com.wazuh.commandmanager.CommandManagerPlugin; import com.wazuh.commandmanager.model.Command; +import com.wazuh.commandmanager.utils.IndexTemplateUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.template.put.PutIndexTemplateRequest; import org.opensearch.action.index.IndexRequest; -import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexTemplateMetadata; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.XContentFactory; -import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.index.shard.IndexingOperationListener; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; public class CommandIndex implements IndexingOperationListener { @@ -32,80 +36,106 @@ public class CommandIndex implements IndexingOperationListener { private static final Logger logger = LogManager.getLogger(CommandIndex.class); private final Client client; + private final ClusterService clusterService; + private final ThreadPool threadPool; /** - * @param client + * Default constructor + * + * @param client OpenSearch client. + * @param clusterService OpenSearch cluster service. + * @param threadPool An OpenSearch ThreadPool. */ - public CommandIndex(Client client) { + public CommandIndex(Client client, ClusterService clusterService, ThreadPool threadPool) { this.client = client; + this.clusterService = clusterService; + this.threadPool = threadPool; } /** - * @param command a Command class command - * @return Indexing operation RestStatus response - * @throws ExecutionException + * @param command: A Command model object + * @return A CompletableFuture with the RestStatus response from the operation */ - public RestStatus create(Command command) throws ExecutionException, InterruptedException { - CompletableFuture inProgressFuture = new CompletableFuture<>(); + public CompletableFuture asyncCreate(Command command) { + CompletableFuture future = new CompletableFuture<>(); + ExecutorService executor = this.threadPool.executor(ThreadPool.Names.WRITE); + + // Create index template if it does not exist. + if (!indexTemplateExists(CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME)) { + putIndexTemplate(CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME); + } else { + logger.info( + "Index template {} already exists. Skipping creation.", + CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME + ); + } + + logger.debug("Indexing command {}", command); try { - logger.info("Creating request for command: {}", command.getId()); IndexRequest request = new IndexRequest() .index(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME) .source(command.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) .id(command.getId()) .create(true); - - client.index( - request, - new ActionListener<>() { - @Override - public void onResponse(IndexResponse indexResponse) { - inProgressFuture.complete(indexResponse); - } - - @Override - public void onFailure(Exception e) { - logger.info("Could not process command: {}", command.getId(), e); - inProgressFuture.completeExceptionally(e); + executor.submit( + () -> { + try (ThreadContext.StoredContext ignored = this.threadPool.getThreadContext().stashContext()) { + RestStatus restStatus = client.index(request).actionGet().status(); + future.complete(restStatus); + } catch (Exception e) { + future.completeExceptionally(e); } } ); } catch (IOException e) { - logger.error("IOException occurred creating command details", e); + logger.error( + "Failed to index command with ID {}: {}", command.getId(), e); } - return inProgressFuture.get().status(); + return future; } /** - * - * @param command: A Command model object - * @param threadPool: An OpenSearch ThreadPool as passed to the createComponents() method - * @return A CompletableFuture with the RestStatus response from the operation + * @return */ + public boolean indexTemplateExists(String template_name) { + Map templates = this.clusterService + .state() + .metadata() + .templates(); + logger.debug("Existing index templates: {} ", templates); - public CompletableFuture asyncCreate(Command command, ThreadPool threadPool) { - CompletableFuture future = new CompletableFuture<>(); - ExecutorService executor = threadPool.executor(ThreadPool.Names.WRITE); + return templates.containsKey(template_name); + } + + /** + * Inserts an index template + * + * @param templateName : The name if the index template to load + */ + public void putIndexTemplate(String templateName) { + ExecutorService executor = this.threadPool.executor(ThreadPool.Names.WRITE); try { - IndexRequest request = new IndexRequest() - .index(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME) - .source(command.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) - .id(command.getId()) - .create(true); - executor.submit( - () -> { - try (ThreadContext.StoredContext ignored = threadPool.getThreadContext().stashContext()) { - RestStatus restStatus = client.index(request).actionGet().status(); - future.complete(restStatus); - } catch (Exception e) { - future.completeExceptionally(e); - } + // @throws IOException + Map template = IndexTemplateUtils.fromFile(templateName + ".json"); + + PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest() + .mapping(IndexTemplateUtils.get(template, "mappings")) + .settings(IndexTemplateUtils.get(template, "settings")) + .name(templateName) + .patterns((List) template.get("index_patterns")); + + executor.submit(() -> { + AcknowledgedResponse acknowledgedResponse = this.client.admin().indices().putTemplate(putIndexTemplateRequest).actionGet(); + if (acknowledgedResponse.isAcknowledged()) { + logger.info( + "Index template created successfully: {}", + templateName + ); } - ); - } catch (Exception e) { - logger.error(e); + }); + + } catch (IOException e) { + logger.error("Error reading index template from filesystem {}", templateName); } - return future; } - } diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/model/Action.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/model/Action.java index 7e53aeb..97c1d86 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/model/Action.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/model/Action.java @@ -108,4 +108,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(VERSION, this.version); return builder.endObject(); } + + @Override + public String toString() { + return "Action{" + + "type='" + type + '\'' + + ", args=" + args + + ", version='" + version + '\'' + + '}'; + } } diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/model/Command.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/model/Command.java index f4ddcd9..be11d7d 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/model/Command.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/model/Command.java @@ -7,6 +7,8 @@ */ package com.wazuh.commandmanager.model; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.common.UUIDs; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.ToXContentObject; @@ -19,7 +21,7 @@ import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; public class Command implements ToXContentObject { - + public static final String NAME = "command"; public static final String ORDER_ID = "order_id"; public static final String REQUEST_ID = "request_id"; public static final String SOURCE = "source"; @@ -85,10 +87,12 @@ public static Command parse(XContentParser parser) throws IOException { String user = null; Action action = null; - // @TODO check if this call is necessary as ensureExpectedToken is invoked previously - ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + // skips JSON's root level "command" + ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser); + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); while (parser.nextToken() != XContentParser.Token.END_OBJECT) { String fieldName = parser.currentName(); + parser.nextToken(); switch (fieldName) { case SOURCE: @@ -132,6 +136,7 @@ public static Command parse(XContentParser parser) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); + builder.startObject(NAME); builder.field(SOURCE, this.source); builder.field(USER, this.user); builder.field(TARGET, this.target); @@ -141,6 +146,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(STATUS, this.status); builder.field(ORDER_ID, this.orderId); builder.field(REQUEST_ID, this.requestId); + builder.endObject(); return builder.endObject(); } diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/rest/action/RestPostCommandAction.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/rest/action/RestPostCommandAction.java index 218a305..d42a948 100644 --- a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/rest/action/RestPostCommandAction.java +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/rest/action/RestPostCommandAction.java @@ -19,7 +19,6 @@ import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestRequest; -import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Collections; @@ -39,17 +38,14 @@ public class RestPostCommandAction extends BaseRestHandler { public static final String POST_COMMAND_ACTION_REQUEST_DETAILS = "post_command_action_request_details"; private static final Logger logger = LogManager.getLogger(RestPostCommandAction.class); private final CommandIndex commandIndex; - private final ThreadPool threadPool; /** * Default constructor * * @param commandIndex persistence layer - * @param threadPool */ - public RestPostCommandAction(CommandIndex commandIndex, ThreadPool threadPool) { + public RestPostCommandAction(CommandIndex commandIndex) { this.commandIndex = commandIndex; - this.threadPool = threadPool; } @@ -60,21 +56,21 @@ public String getName() { @Override public List routes() { return Collections.singletonList( - new Route( - POST, - String.format( - Locale.ROOT, - "%s", - CommandManagerPlugin.COMMAND_MANAGER_BASE_URI + new Route( + POST, + String.format( + Locale.ROOT, + "%s", + CommandManagerPlugin.COMMAND_MANAGER_BASE_URI + ) ) - ) ); } @Override protected RestChannelConsumer prepareRequest( - final RestRequest restRequest, - final NodeClient client + final RestRequest restRequest, + final NodeClient client ) throws IOException { // Get request details XContentParser parser = restRequest.contentParser(); @@ -84,7 +80,7 @@ protected RestChannelConsumer prepareRequest( // Send response return channel -> { - commandIndex.asyncCreate(command, this.threadPool) + this.commandIndex.asyncCreate(command) .thenAccept(restStatus -> { try (XContentBuilder builder = channel.newBuilder()) { builder.startObject(); @@ -94,7 +90,7 @@ protected RestChannelConsumer prepareRequest( builder.endObject(); channel.sendResponse(new BytesRestResponse(restStatus, builder)); } catch (Exception e) { - logger.error("Error indexing command: ",e); + logger.error("Error indexing command: ", e); } }).exceptionally(e -> { channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); diff --git a/plugins/command-manager/src/main/java/com/wazuh/commandmanager/utils/IndexTemplateUtils.java b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/utils/IndexTemplateUtils.java new file mode 100644 index 0000000..bed3434 --- /dev/null +++ b/plugins/command-manager/src/main/java/com/wazuh/commandmanager/utils/IndexTemplateUtils.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package com.wazuh.commandmanager.utils; + +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.xcontent.DeprecationHandler; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentParser; +import reactor.util.annotation.NonNull; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; + +/** + * Util functions to parse and manage index templates files. + */ +public class IndexTemplateUtils { + + /** + * Default constructor + */ + public IndexTemplateUtils() { + } + + /** + * Read index template file from the resources folder and returns its JSON + * content as a map. + * + * @param filename name of the index template to read from the resources folder + * @return the JSON index template as a map + * @throws IOException file not found or could not be read + */ + + public static Map fromFile(@NonNull String filename) throws IOException { + InputStream is = IndexTemplateUtils.class.getClassLoader().getResourceAsStream(filename); + return IndexTemplateUtils.toMap(is); + } + + /** + * Convert from a JSON InputStream into a String, Object map. + *

+ * Used to convert the JSON index templates to the required format. + *

+ * + * @param is: the JSON formatted InputStream + * @return a map with the json string contents. + * @throws IOException thrown by {@link JsonXContent#createParser(NamedXContentRegistry, DeprecationHandler, InputStream)} + */ + public static Map toMap(InputStream is) throws IOException { + XContentParser parser = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + is); + parser.nextToken(); + return parser.map(); + } + + /** + * Cast map's element to a String, Object map. + *

+ * Used to retrieve the settings and mappings from the index templates, + * which are a JSON object themselves. + *

+ * + * @param map the index template as a map. + * @param key the element's key to retrieve and cast. + * @return a String, Object map + */ + public static Map get(Map map, String key) { + return (Map) map.get(key); + } + +} + diff --git a/plugins/command-manager/src/main/resources/index-template-commands.json b/plugins/command-manager/src/main/resources/index-template-commands.json new file mode 100644 index 0000000..5170315 --- /dev/null +++ b/plugins/command-manager/src/main/resources/index-template-commands.json @@ -0,0 +1,91 @@ +{ + "index_patterns": [ + ".commands*" + ], + "mappings": { + "date_detection": false, + "properties": { + "command": { + "properties": { + "action": { + "properties": { + "args": { + "ignore_above": 1024, + "type": "keyword" + }, + "type": { + "ignore_above": 1024, + "type": "keyword" + }, + "version": { + "ignore_above": 1024, + "type": "keyword" + } + } + }, + "order_id": { + "ignore_above": 1024, + "type": "keyword" + }, + "request_id": { + "ignore_above": 1024, + "type": "keyword" + }, + "result": { + "properties": { + "code": { + "type": "short" + }, + "data": { + "ignore_above": 1024, + "type": "keyword" + }, + "message": { + "ignore_above": 1024, + "type": "keyword" + } + } + }, + "source": { + "ignore_above": 1024, + "type": "keyword" + }, + "status": { + "ignore_above": 1024, + "type": "keyword" + }, + "target": { + "ignore_above": 1024, + "type": "keyword" + }, + "timeout": { + "type": "short" + }, + "type": { + "ignore_above": 1024, + "type": "keyword" + }, + "user": { + "ignore_above": 1024, + "type": "keyword" + } + } + } + } + }, + "order": 1, + "settings": { + "index": { + "hidden": true, + "number_of_replicas": "0", + "number_of_shards": "1", + "query.default_field": [ + "command.source", + "command.target", + "command.status", + "command.type" + ], + "refresh_interval": "5s" + } + } +} \ No newline at end of file diff --git a/plugins/command-manager/src/yamlRestTest/resources/rest-api-spec/test/20_create.yml b/plugins/command-manager/src/yamlRestTest/resources/rest-api-spec/test/20_create.yml index 4a7489d..c667872 100644 --- a/plugins/command-manager/src/yamlRestTest/resources/rest-api-spec/test/20_create.yml +++ b/plugins/command-manager/src/yamlRestTest/resources/rest-api-spec/test/20_create.yml @@ -3,33 +3,34 @@ - do: _plugins._commandmanager: body: - source: "Users/Services" - user: "user13" - target: "WazuhServerCluster5" - type: "agent_group" - action: { - type: "Server cluster", - args: [ "/path/to/executable/arg8" ], - version: "v4" - } - timeout: 100 + command: + source: "Users/Services" + user: "user13" + target: "WazuhServerCluster5" + type: "agent_group" + action: { + type: "Server cluster", + args: [ "/path/to/executable/arg8" ], + version: "v4" + } + timeout: 100 - set: { _id: document_id } - - match: { _index: command-manager } + - match: { _index: .commands } - do: get: - index: command-manager + index: .commands id: $document_id - - match: { _source.source: "Users/Services" } - - match: { _source.user: "user13" } - - match: { _source.target: "WazuhServerCluster5" } - - match: { _source.type: "agent_group" } - - match: { _source.action: + - match: { _source.command.source: "Users/Services" } + - match: { _source.command.user: "user13" } + - match: { _source.command.target: "WazuhServerCluster5" } + - match: { _source.command.type: "agent_group" } + - match: { _source.command.action: { type: "Server cluster", args: [ "/path/to/executable/arg8" ], version: "v4" } } - - match: { _source.timeout: 100 } + - match: { _source.command.timeout: 100 }