Skip to content

Commit

Permalink
[ML] Return statistics about forecasts as part of the jobsstats and u…
Browse files Browse the repository at this point in the history
…sage API (#31647)

This change adds stats about forecasts, to the jobstats api as well as xpack/_usage. The following 
information is collected:

_xpack/ml/anomaly_detectors/{jobid|_all}/_stats:

 -  total number of forecasts
 -  memory statistics (mean/min/max)
 -  runtime statistics
 -  record statistics
 -  counts by status

_xpack/usage

 -  collected by job status as well as overall (_all):
     -  total number of forecasts
     -  number of jobs that have at least 1 forecast
     -  memory, runtime, record statistics
     -  counts by status

Fixes #31395
  • Loading branch information
Hendrik Muhs committed Jul 4, 2018
1 parent 1ff7066 commit 85177b6
Show file tree
Hide file tree
Showing 18 changed files with 1,071 additions and 160 deletions.
31 changes: 31 additions & 0 deletions x-pack/docs/en/rest-api/ml/jobcounts.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ progress of a job.
(object) An object that provides information about the size and contents of the model.
See <<ml-modelsizestats,model size stats objects>>

`forecasts_stats`::
(object) An object that provides statistical information about forecasts
of this job. See <<ml-forecastsstats, forecasts stats objects>>

`node`::
(object) For open jobs only, contains information about the node where the
job runs. See <<ml-stats-node,node object>>.
Expand Down Expand Up @@ -177,6 +181,33 @@ NOTE: The `over` field values are counted separately for each detector and parti
`timestamp`::
(date) The timestamp of the `model_size_stats` according to the timestamp of the data.

[float]
[[ml-forecastsstats]]
==== Forecasts Stats Objects

The `forecasts_stats` object shows statistics about forecasts. It has the following properties:

`total`::
(long) The number of forecasts currently available for this model.

`forecasted_jobs`::
(long) The number of jobs that have at least one forecast.

`memory_bytes`::
(object) Statistics about the memory usage: minimum, maximum, average and total.

`records`::
(object) Statistics about the number of forecast records: minimum, maximum, average and total.

`processing_time_ms`::
(object) Statistics about the forecast runtime in milliseconds: minimum, maximum, average and total.

`status`::
(object) Counts per forecast status, for example: {"finished" : 2}.

NOTE: `memory_bytes`, `records`, `processing_time_ms` and `status` require at least 1 forecast, otherwise
these fields are ommitted.

[float]
[[ml-stats-node]]
==== Node Objects
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class MachineLearningFeatureSetUsage extends XPackFeatureSet.Usage {
public static final String DATAFEEDS_FIELD = "datafeeds";
public static final String COUNT = "count";
public static final String DETECTORS = "detectors";
public static final String FORECASTS = "forecasts";
public static final String MODEL_SIZE = "model_size";

private final Map<String, Object> jobsUsage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;

import java.io.IOException;
Expand All @@ -46,6 +47,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo

private static final String DATA_COUNTS = "data_counts";
private static final String MODEL_SIZE_STATS = "model_size_stats";
private static final String FORECASTS_STATS = "forecasts_stats";
private static final String STATE = "state";
private static final String NODE = "node";

Expand Down Expand Up @@ -159,18 +161,22 @@ public static class JobStats implements ToXContentObject, Writeable {
@Nullable
private ModelSizeStats modelSizeStats;
@Nullable
private ForecastStats forecastStats;
@Nullable
private TimeValue openTime;
private JobState state;
@Nullable
private DiscoveryNode node;
@Nullable
private String assignmentExplanation;

public JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats, JobState state,
@Nullable DiscoveryNode node, @Nullable String assignmentExplanation, @Nullable TimeValue opentime) {
public JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats,
@Nullable ForecastStats forecastStats, JobState state, @Nullable DiscoveryNode node,
@Nullable String assignmentExplanation, @Nullable TimeValue opentime) {
this.jobId = Objects.requireNonNull(jobId);
this.dataCounts = Objects.requireNonNull(dataCounts);
this.modelSizeStats = modelSizeStats;
this.forecastStats = forecastStats;
this.state = Objects.requireNonNull(state);
this.node = node;
this.assignmentExplanation = assignmentExplanation;
Expand All @@ -185,6 +191,9 @@ public JobStats(StreamInput in) throws IOException {
node = in.readOptionalWriteable(DiscoveryNode::new);
assignmentExplanation = in.readOptionalString();
openTime = in.readOptionalTimeValue();
if (in.getVersion().onOrAfter(Version.V_6_4_0)) {
forecastStats = in.readOptionalWriteable(ForecastStats::new);
}
}

public String getJobId() {
Expand All @@ -198,6 +207,10 @@ public DataCounts getDataCounts() {
public ModelSizeStats getModelSizeStats() {
return modelSizeStats;
}

public ForecastStats getForecastStats() {
return forecastStats;
}

public JobState getState() {
return state;
Expand Down Expand Up @@ -231,6 +244,10 @@ public XContentBuilder toUnwrappedXContent(XContentBuilder builder) throws IOExc
if (modelSizeStats != null) {
builder.field(MODEL_SIZE_STATS, modelSizeStats);
}
if (forecastStats != null) {
builder.field(FORECASTS_STATS, forecastStats);
}

builder.field(STATE, state.toString());
if (node != null) {
builder.startObject(NODE);
Expand Down Expand Up @@ -264,11 +281,14 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(node);
out.writeOptionalString(assignmentExplanation);
out.writeOptionalTimeValue(openTime);
if (out.getVersion().onOrAfter(Version.V_6_4_0)) {
out.writeOptionalWriteable(forecastStats);
}
}

@Override
public int hashCode() {
return Objects.hash(jobId, dataCounts, modelSizeStats, state, node, assignmentExplanation, openTime);
return Objects.hash(jobId, dataCounts, modelSizeStats, forecastStats, state, node, assignmentExplanation, openTime);
}

@Override
Expand All @@ -283,6 +303,7 @@ public boolean equals(Object obj) {
return Objects.equals(jobId, other.jobId)
&& Objects.equals(this.dataCounts, other.dataCounts)
&& Objects.equals(this.modelSizeStats, other.modelSizeStats)
&& Objects.equals(this.forecastStats, other.forecastStats)
&& Objects.equals(this.state, other.state)
&& Objects.equals(this.node, other.node)
&& Objects.equals(this.assignmentExplanation, other.assignmentExplanation)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.ml.stats;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* An accumulator for simple counts where statistical measures
* are not of interest.
*/
public class CountAccumulator implements Writeable {

private Map<String, Long> counts;

public CountAccumulator() {
this.counts = new HashMap<String, Long>();
}

private CountAccumulator(Map<String, Long> counts) {
this.counts = counts;
}

public CountAccumulator(StreamInput in) throws IOException {
this.counts = in.readMap(StreamInput::readString, StreamInput::readLong);
}

public void merge(CountAccumulator other) {
counts = Stream.of(counts, other.counts).flatMap(m -> m.entrySet().stream())
.collect(Collectors.toMap(Entry::getKey, Entry::getValue, (x, y) -> x + y));
}

public void add(String key, Long count) {
counts.put(key, counts.getOrDefault(key, 0L) + count);
}

public Map<String, Long> asMap() {
return counts;
}

public static CountAccumulator fromTermsAggregation(StringTerms termsAggregation) {
return new CountAccumulator(termsAggregation.getBuckets().stream()
.collect(Collectors.toMap(bucket -> bucket.getKeyAsString(), bucket -> bucket.getDocCount())));
}

public void writeTo(StreamOutput out) throws IOException {
out.writeMap(counts, StreamOutput::writeString, StreamOutput::writeLong);
}

@Override
public int hashCode() {
return Objects.hash(counts);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}

if (getClass() != obj.getClass()) {
return false;
}

CountAccumulator other = (CountAccumulator) obj;
return Objects.equals(counts, other.counts);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.ml.stats;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
* A class to hold statistics about forecasts.
*/
public class ForecastStats implements ToXContentObject, Writeable {

public static class Fields {
public static final String TOTAL = "total";
public static final String FORECASTED_JOBS = "forecasted_jobs";
public static final String MEMORY = "memory_bytes";
public static final String RUNTIME = "processing_time_ms";
public static final String RECORDS = "records";
public static final String STATUSES = "status";
}

private long total;
private long forecastedJobs;
private StatsAccumulator memoryStats;
private StatsAccumulator recordStats;
private StatsAccumulator runtimeStats;
private CountAccumulator statusCounts;

public ForecastStats() {
this.total = 0;
this.forecastedJobs = 0;
this.memoryStats = new StatsAccumulator();
this.recordStats = new StatsAccumulator();
this.runtimeStats = new StatsAccumulator();
this.statusCounts = new CountAccumulator();
}

/*
* Construct ForecastStats for 1 job. Additional statistics can be added by merging other ForecastStats into it.
*/
public ForecastStats(long total, StatsAccumulator memoryStats, StatsAccumulator recordStats, StatsAccumulator runtimeStats,
CountAccumulator statusCounts) {
this.total = total;
this.forecastedJobs = total > 0 ? 1 : 0;
this.memoryStats = Objects.requireNonNull(memoryStats);
this.recordStats = Objects.requireNonNull(recordStats);
this.runtimeStats = Objects.requireNonNull(runtimeStats);
this.statusCounts = Objects.requireNonNull(statusCounts);
}

public ForecastStats(StreamInput in) throws IOException {
this.total = in.readLong();
this.forecastedJobs = in.readLong();
this.memoryStats = new StatsAccumulator(in);
this.recordStats = new StatsAccumulator(in);
this.runtimeStats = new StatsAccumulator(in);
this.statusCounts = new CountAccumulator(in);
}

public ForecastStats merge(ForecastStats other) {
if (other == null) {
return this;
}
total += other.total;
forecastedJobs += other.forecastedJobs;
memoryStats.merge(other.memoryStats);
recordStats.merge(other.recordStats);
runtimeStats.merge(other.runtimeStats);
statusCounts.merge(other.statusCounts);

return this;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
doXContentBody(builder, params);
return builder.endObject();
}

public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.TOTAL, total);
builder.field(Fields.FORECASTED_JOBS, forecastedJobs);

if (total > 0) {
builder.field(Fields.MEMORY, memoryStats.asMap());
builder.field(Fields.RECORDS, recordStats.asMap());
builder.field(Fields.RUNTIME, runtimeStats.asMap());
builder.field(Fields.STATUSES, statusCounts.asMap());
}

return builder;
}

public Map<String, Object> asMap() {
Map<String, Object> map = new HashMap<>();
map.put(Fields.TOTAL, total);
map.put(Fields.FORECASTED_JOBS, forecastedJobs);

if (total > 0) {
map.put(Fields.MEMORY, memoryStats.asMap());
map.put(Fields.RECORDS, recordStats.asMap());
map.put(Fields.RUNTIME, runtimeStats.asMap());
map.put(Fields.STATUSES, statusCounts.asMap());
}

return map;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(total);
out.writeLong(forecastedJobs);
memoryStats.writeTo(out);
recordStats.writeTo(out);
runtimeStats.writeTo(out);
statusCounts.writeTo(out);
}

@Override
public int hashCode() {
return Objects.hash(total, forecastedJobs, memoryStats, recordStats, runtimeStats, statusCounts);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}

if (getClass() != obj.getClass()) {
return false;
}

ForecastStats other = (ForecastStats) obj;
return Objects.equals(total, other.total) && Objects.equals(forecastedJobs, other.forecastedJobs)
&& Objects.equals(memoryStats, other.memoryStats) && Objects.equals(recordStats, other.recordStats)
&& Objects.equals(runtimeStats, other.runtimeStats) && Objects.equals(statusCounts, other.statusCounts);
}
}
Loading

0 comments on commit 85177b6

Please sign in to comment.