Skip to content

Commit

Permalink
[ML] Add _cat/ml/data_frame/analytics API (#52260)
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek authored Feb 13, 2020
1 parent 1aeda20 commit 30d8fa4
Show file tree
Hide file tree
Showing 7 changed files with 386 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@
import org.elasticsearch.xpack.ml.rest.calendar.RestPostCalendarEventAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestPutCalendarAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestPutCalendarJobAction;
import org.elasticsearch.xpack.ml.rest.cat.RestCatDataFrameAnalyticsAction;
import org.elasticsearch.xpack.ml.rest.cat.RestCatDatafeedsAction;
import org.elasticsearch.xpack.ml.rest.cat.RestCatJobsAction;
import org.elasticsearch.xpack.ml.rest.cat.RestCatTrainedModelsAction;
Expand Down Expand Up @@ -770,7 +771,8 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
// CAT Handlers
new RestCatJobsAction(),
new RestCatTrainedModelsAction(),
new RestCatDatafeedsAction()
new RestCatDatafeedsAction(),
new RestCatDataFrameAnalyticsAction()
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.rest.cat;

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Table;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.RestActionListener;
import org.elasticsearch.rest.action.RestResponseListener;
import org.elasticsearch.rest.action.cat.AbstractCatAction;
import org.elasticsearch.rest.action.cat.RestTable;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction.Response.Stats;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;

import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableList;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toMap;
import static org.elasticsearch.rest.RestRequest.Method.GET;

public class RestCatDataFrameAnalyticsAction extends AbstractCatAction {

@Override
public List<Route> routes() {
return unmodifiableList(asList(
new Route(GET, "_cat/ml/data_frame/analytics/{" + DataFrameAnalyticsConfig.ID.getPreferredName() + "}"),
new Route(GET, "_cat/ml/data_frame/analytics")));
}

@Override
public String getName() {
return "cat_ml_get_data_frame_analytics_action";
}

@Override
protected RestChannelConsumer doCatRequest(RestRequest restRequest, NodeClient client) {
String dataFrameAnalyticsId = restRequest.param(DataFrameAnalyticsConfig.ID.getPreferredName());
if (Strings.isNullOrEmpty(dataFrameAnalyticsId)) {
dataFrameAnalyticsId = MetaData.ALL;
}

GetDataFrameAnalyticsAction.Request getRequest = new GetDataFrameAnalyticsAction.Request(dataFrameAnalyticsId);
getRequest.setAllowNoResources(
restRequest.paramAsBoolean(
GetDataFrameAnalyticsAction.Request.ALLOW_NO_MATCH.getPreferredName(), getRequest.isAllowNoResources()));

GetDataFrameAnalyticsStatsAction.Request getStatsRequest = new GetDataFrameAnalyticsStatsAction.Request(dataFrameAnalyticsId);
getStatsRequest.setAllowNoMatch(true);

return channel -> client.execute(GetDataFrameAnalyticsAction.INSTANCE, getRequest, new RestActionListener<>(channel) {
@Override
public void processResponse(GetDataFrameAnalyticsAction.Response getResponse) {
client.execute(GetDataFrameAnalyticsStatsAction.INSTANCE, getStatsRequest, new RestResponseListener<>(channel) {
@Override
public RestResponse buildResponse(GetDataFrameAnalyticsStatsAction.Response getStatsResponse) throws Exception {
return RestTable.buildResponse(buildTable(getResponse, getStatsResponse), channel);
}
});
}
});
}

@Override
protected void documentation(StringBuilder sb) {
sb.append("/_cat/ml/data_frame/analytics\n");
sb.append("/_cat/ml/data_frame/analytics/{").append(DataFrameAnalyticsConfig.ID.getPreferredName()).append("}\n");
}

@Override
protected Table getTableWithHeader(RestRequest unused) {
return getTableWithHeader();
}

private static Table getTableWithHeader() {
return new Table()
.startHeaders()
// DFA config info
.addCell("id", TableColumnAttributeBuilder.builder("the id").build())
.addCell("type",
TableColumnAttributeBuilder.builder("analysis type")
.setAliases("t")
.build())
.addCell("create_time",
TableColumnAttributeBuilder.builder("job creation time")
.setAliases("ct", "createTime")
.build())
.addCell("version",
TableColumnAttributeBuilder.builder("the version of Elasticsearch when the analytics was created", false)
.setAliases("v")
.build())
.addCell("source_index",
TableColumnAttributeBuilder.builder("source index", false)
.setAliases("si", "sourceIndex")
.build())
.addCell("dest_index",
TableColumnAttributeBuilder.builder("destination index", false)
.setAliases("di", "destIndex")
.build())
.addCell("description",
TableColumnAttributeBuilder.builder("description", false)
.setAliases("d")
.build())
.addCell("model_memory_limit",
TableColumnAttributeBuilder.builder("model memory limit", false)
.setAliases("mml", "modelMemoryLimit")
.build())
// DFA stats info
.addCell("state",
TableColumnAttributeBuilder.builder("job state")
.setAliases("s")
.setTextAlignment(TableColumnAttributeBuilder.TextAlign.RIGHT)
.build())
.addCell("failure_reason",
TableColumnAttributeBuilder.builder("failure reason", false)
.setAliases("fr", "failureReason")
.build())
.addCell("progress",
TableColumnAttributeBuilder.builder("progress", false)
.setAliases("p")
.build())
.addCell("assignment_explanation",
TableColumnAttributeBuilder.builder("why the job is or is not assigned to a node", false)
.setAliases("ae", "assignmentExplanation")
.build())
// Node info
.addCell("node.id",
TableColumnAttributeBuilder.builder("id of the assigned node", false)
.setAliases("ni", "nodeId")
.build())
.addCell("node.name",
TableColumnAttributeBuilder.builder("name of the assigned node", false)
.setAliases("nn", "nodeName")
.build())
.addCell("node.ephemeral_id",
TableColumnAttributeBuilder.builder("ephemeral id of the assigned node", false)
.setAliases("ne", "nodeEphemeralId")
.build())
.addCell("node.address",
TableColumnAttributeBuilder.builder("network address of the assigned node", false)
.setAliases("na", "nodeAddress")
.build())
.endHeaders();
}

private static Table buildTable(GetDataFrameAnalyticsAction.Response getResponse,
GetDataFrameAnalyticsStatsAction.Response getStatsResponse) {
Map<String, Stats> statsById = getStatsResponse.getResponse().results().stream().collect(toMap(Stats::getId, Function.identity()));
Table table = getTableWithHeader();
for (DataFrameAnalyticsConfig config : getResponse.getResources().results()) {
Stats stats = statsById.get(config.getId());
DiscoveryNode node = stats == null ? null : stats.getNode();
table
.startRow()
.addCell(config.getId())
.addCell(config.getAnalysis().getWriteableName())
.addCell(config.getCreateTime())
.addCell(config.getVersion())
.addCell(String.join(",", config.getSource().getIndex()))
.addCell(config.getDest().getIndex())
.addCell(config.getDescription())
.addCell(config.getModelMemoryLimit())
.addCell(stats == null ? null : stats.getState())
.addCell(stats == null ? null : stats.getFailureReason())
.addCell(stats == null ? null : progressToString(stats.getProgress()))
.addCell(stats == null ? null : stats.getAssignmentExplanation())
.addCell(node == null ? null : node.getId())
.addCell(node == null ? null : node.getName())
.addCell(node == null ? null : node.getEphemeralId())
.addCell(node == null ? null : node.getAddress().toString())
.endRow();
}
return table;
}

private static String progressToString(List<PhaseProgress> phases) {
return phases.stream().map(p -> p.getPhase() + ":" + p.getProgressPercent()).collect(joining(","));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ protected Table getTableWithHeader(RestRequest request) {
table.startHeaders();

// Datafeed Info
table.addCell("id", TableColumnAttributeBuilder.builder().setDescription("the datafeed_id").build());
table.addCell("state", TableColumnAttributeBuilder.builder()
.setDescription("the datafeed state")
.setAliases("s")
.setTextAlignment(TableColumnAttributeBuilder.TextAlign.RIGHT)
.build());
table.addCell("id", TableColumnAttributeBuilder.builder("the datafeed_id").build());
table.addCell("state",
TableColumnAttributeBuilder.builder("the datafeed state")
.setAliases("s")
.setTextAlignment(TableColumnAttributeBuilder.TextAlign.RIGHT)
.build());
table.addCell("assignment_explanation",
TableColumnAttributeBuilder.builder("why the datafeed is or is not assigned to a node", false)
.setAliases("ae")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,15 @@ protected Table getTableWithHeader(RestRequest request) {
table.startHeaders();

// Job Info
table.addCell("id", TableColumnAttributeBuilder.builder().setDescription("the job_id").build());
table.addCell("state", TableColumnAttributeBuilder.builder()
.setDescription("the job state")
.setAliases("s")
.setTextAlignment(TableColumnAttributeBuilder.TextAlign.RIGHT)
.build());
table.addCell("id", TableColumnAttributeBuilder.builder("the job_id").build());
table.addCell("state",
TableColumnAttributeBuilder.builder("the job state")
.setAliases("s")
.setTextAlignment(TableColumnAttributeBuilder.TextAlign.RIGHT)
.build());
table.addCell("opened_time",
TableColumnAttributeBuilder.builder()
.setDescription("the amount of time the job has been opened")
TableColumnAttributeBuilder.builder("the amount of time the job has been opened", false)
.setAliases("ot")
.setDisplayByDefault(false)
.build());
table.addCell("assignment_explanation",
TableColumnAttributeBuilder.builder("why the job is or is not assigned to a node", false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,15 @@ protected Table getTableWithHeader(RestRequest request) {
table.startHeaders();

// Trained Model Info
table.addCell("id", TableColumnAttributeBuilder.builder().setDescription("the trained model id").build());
table.addCell("id", TableColumnAttributeBuilder.builder("the trained model id").build());
table.addCell("created_by", TableColumnAttributeBuilder.builder("who created the model", false)
.setAliases("c", "createdBy")
.setTextAlignment(TableColumnAttributeBuilder.TextAlign.RIGHT)
.build());
table.addCell("heap_size", TableColumnAttributeBuilder.builder()
.setDescription("the estimated heap size to keep the model in memory")
table.addCell("heap_size", TableColumnAttributeBuilder.builder("the estimated heap size to keep the model in memory")
.setAliases("hs","modelHeapSize")
.build());
table.addCell("operations", TableColumnAttributeBuilder.builder()
.setDescription("the estimated number of operations to use the model")
table.addCell("operations", TableColumnAttributeBuilder.builder("the estimated number of operations to use the model")
.setAliases("o", "modelOperations")
.build());
table.addCell("license", TableColumnAttributeBuilder.builder("The license level of the model", false)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
{
"cat.ml_data_frame_analytics":{
"documentation":{
"url":"http://www.elastic.co/guide/en/elasticsearch/reference/current/get-dfanalytics-stats.html"
},
"stability":"stable",
"url":{
"paths":[
{
"path":"/_cat/ml/data_frame/analytics",
"methods":[
"GET"
]
},
{
"path":"/_cat/ml/data_frame/analytics/{id}",
"methods":[
"GET"
],
"parts":{
"id":{
"type":"string",
"description":"The ID of the data frame analytics to fetch"
}
}
}
]
},
"params":{
"allow_no_match":{
"type":"boolean",
"required":false,
"description":"Whether to ignore if a wildcard expression matches no configs. (This includes `_all` string or when no configs have been specified)"
},
"bytes":{
"type":"enum",
"description":"The unit in which to display byte values",
"options":[
"b",
"k",
"kb",
"m",
"mb",
"g",
"gb",
"t",
"tb",
"p",
"pb"
]
},
"format":{
"type":"string",
"description":"a short version of the Accept header, e.g. json, yaml"
},
"h":{
"type":"list",
"description":"Comma-separated list of column names to display"
},
"help":{
"type":"boolean",
"description":"Return help information",
"default":false
},
"s":{
"type":"list",
"description":"Comma-separated list of column names or column aliases to sort by"
},
"time":{
"type":"enum",
"description":"The unit in which to display time values",
"options":[
"d (Days)",
"h (Hours)",
"m (Minutes)",
"s (Seconds)",
"ms (Milliseconds)",
"micros (Microseconds)",
"nanos (Nanoseconds)"
]
},
"v":{
"type":"boolean",
"description":"Verbose mode. Display column headers",
"default":false
}
}
}
}
Loading

0 comments on commit 30d8fa4

Please sign in to comment.