Skip to content

Commit

Permalink
Extract TimingStats-related functionality into TimingStatsReporter (e…
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek authored Jun 25, 2019
1 parent c594a95 commit b15e40f
Show file tree
Hide file tree
Showing 11 changed files with 448 additions and 364 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,7 @@ private static void addTimingStatsExceptBucketCountMapping(XContentBuilder build
.startObject(TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName())
.field(TYPE, DOUBLE)
.endObject()
.startObject(TimingStats.EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName())
.startObject(TimingStats.EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName())
.field(TYPE, DOUBLE)
.endObject();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class TimingStats implements ToXContentObject, Writeable {
public static final ParseField MIN_BUCKET_PROCESSING_TIME_MS = new ParseField("minimum_bucket_processing_time_ms");
public static final ParseField MAX_BUCKET_PROCESSING_TIME_MS = new ParseField("maximum_bucket_processing_time_ms");
public static final ParseField AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("average_bucket_processing_time_ms");
public static final ParseField EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS =
public static final ParseField EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS =
new ParseField("exponential_average_bucket_processing_time_ms");

public static final ParseField TYPE = new ParseField("timing_stats");
Expand All @@ -49,7 +49,7 @@ public class TimingStats implements ToXContentObject, Writeable {
PARSER.declareDouble(optionalConstructorArg(), MIN_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), MAX_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), AVG_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS);
PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS);
}

public static String documentId(String jobId) {
Expand Down Expand Up @@ -185,7 +185,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), avgBucketProcessingTimeMs);
}
if (exponentialAvgBucketProcessingTimeMs != null) {
builder.field(EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName(), exponentialAvgBucketProcessingTimeMs);
builder.field(EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), exponentialAvgBucketProcessingTimeMs);
}
builder.endObject();
return builder;
Expand Down Expand Up @@ -219,34 +219,4 @@ public int hashCode() {
public String toString() {
return Strings.toString(this);
}

/**
* Returns true if given stats objects differ from each other by more than 10% for at least one of the statistics.
*/
public static boolean differSignificantly(TimingStats stats1, TimingStats stats2) {
return differSignificantly(stats1.minBucketProcessingTimeMs, stats2.minBucketProcessingTimeMs)
|| differSignificantly(stats1.maxBucketProcessingTimeMs, stats2.maxBucketProcessingTimeMs)
|| differSignificantly(stats1.avgBucketProcessingTimeMs, stats2.avgBucketProcessingTimeMs)
|| differSignificantly(stats1.exponentialAvgBucketProcessingTimeMs, stats2.exponentialAvgBucketProcessingTimeMs);
}

/**
* Returns {@code true} if one of the ratios { value1 / value2, value2 / value1 } is smaller than MIN_VALID_RATIO.
* This can be interpreted as values { value1, value2 } differing significantly from each other.
* This method also returns:
* - {@code true} in case one value is {@code null} while the other is not.
* - {@code false} in case both values are {@code null}.
*/
static boolean differSignificantly(Double value1, Double value2) {
if (value1 != null && value2 != null) {
return (value2 / value1 < MIN_VALID_RATIO) || (value1 / value2 < MIN_VALID_RATIO);
}
return (value1 != null) || (value2 != null);
}

/**
* Minimum ratio of values that is interpreted as values being similar.
* If the values ratio is less than MIN_VALID_RATIO, the values are interpreted as significantly different.
*/
private static final double MIN_VALID_RATIO = 0.9;
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public final class ReservedFieldNames {
TimingStats.MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
TimingStats.MAX_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
TimingStats.EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
TimingStats.EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(),

GetResult._ID,
GetResult._INDEX,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
Expand Down Expand Up @@ -124,33 +123,6 @@ public void testDocumentId() {
assertThat(TimingStats.documentId("my-job-id"), equalTo("my-job-id_timing_stats"));
}

public void testTimingStatsDifferSignificantly() {
assertThat(
TimingStats.differSignificantly(
new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0)),
is(false));
assertThat(
TimingStats.differSignificantly(
new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 11.0, 1.0, 10.0)),
is(false));
assertThat(
TimingStats.differSignificantly(
new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 12.0, 1.0, 10.0)),
is(true));
}

public void testValuesDifferSignificantly() {
assertThat(TimingStats.differSignificantly((Double) null, (Double) null), is(false));
assertThat(TimingStats.differSignificantly(1.0, null), is(true));
assertThat(TimingStats.differSignificantly(null, 1.0), is(true));
assertThat(TimingStats.differSignificantly(0.9, 1.0), is(false));
assertThat(TimingStats.differSignificantly(1.0, 0.9), is(false));
assertThat(TimingStats.differSignificantly(0.9, 1.000001), is(true));
assertThat(TimingStats.differSignificantly(1.0, 0.899999), is(true));
assertThat(TimingStats.differSignificantly(0.0, 1.0), is(true));
assertThat(TimingStats.differSignificantly(1.0, 0.0), is(true));
}

/**
* Creates a matcher of {@link TimingStats}s that matches when an examined stats are equal
* to the specified <code>operand</code>, within a range of +/- <code>error</code>.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.persistence;

import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;

import java.util.Objects;

/**
* {@link TimingStatsReporter} class handles the logic of persisting {@link TimingStats} if they changed significantly since the last time
* they were persisted.
*
* This class is not thread-safe.
*/
public class TimingStatsReporter {

/** Persisted timing stats. May be stale. */
private TimingStats persistedTimingStats;
/** Current timing stats. */
private TimingStats currentTimingStats;
/** Object used to persist current timing stats. */
private JobResultsPersister.Builder bulkResultsPersister;

public TimingStatsReporter(TimingStats timingStats, JobResultsPersister.Builder jobResultsPersister) {
Objects.requireNonNull(timingStats);
this.persistedTimingStats = new TimingStats(timingStats);
this.currentTimingStats = new TimingStats(timingStats);
this.bulkResultsPersister = Objects.requireNonNull(jobResultsPersister);
}

public TimingStats getCurrentTimingStats() {
return new TimingStats(currentTimingStats);
}

public void reportBucketProcessingTime(long bucketProcessingTimeMs) {
currentTimingStats.updateStats(bucketProcessingTimeMs);
if (differSignificantly(currentTimingStats, persistedTimingStats)) {
flush();
}
}

public void flush() {
persistedTimingStats = new TimingStats(currentTimingStats);
bulkResultsPersister.persistTimingStats(persistedTimingStats);
}

/**
* Returns true if given stats objects differ from each other by more than 10% for at least one of the statistics.
*/
public static boolean differSignificantly(TimingStats stats1, TimingStats stats2) {
return differSignificantly(stats1.getMinBucketProcessingTimeMs(), stats2.getMinBucketProcessingTimeMs())
|| differSignificantly(stats1.getMaxBucketProcessingTimeMs(), stats2.getMaxBucketProcessingTimeMs())
|| differSignificantly(stats1.getAvgBucketProcessingTimeMs(), stats2.getAvgBucketProcessingTimeMs())
|| differSignificantly(stats1.getExponentialAvgBucketProcessingTimeMs(), stats2.getExponentialAvgBucketProcessingTimeMs());
}

/**
* Returns {@code true} if one of the ratios { value1 / value2, value2 / value1 } is smaller than MIN_VALID_RATIO.
* This can be interpreted as values { value1, value2 } differing significantly from each other.
* This method also returns:
* - {@code true} in case one value is {@code null} while the other is not.
* - {@code false} in case both values are {@code null}.
*/
static boolean differSignificantly(Double value1, Double value2) {
if (value1 != null && value2 != null) {
return (value2 / value1 < MIN_VALID_RATIO) || (value1 / value2 < MIN_VALID_RATIO);
}
return (value1 != null) || (value2 != null);
}

/**
* Minimum ratio of values that is interpreted as values being similar.
* If the values ratio is less than MIN_VALID_RATIO, the values are interpreted as significantly different.
*/
private static final double MIN_VALID_RATIO = 0.9;
}
Original file line number Diff line number Diff line change
Expand Up @@ -517,12 +517,13 @@ AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodet
jobId,
renormalizer,
jobResultsPersister,
process,
autodetectParams.modelSizeStats(),
autodetectParams.timingStats());
ExecutorService autodetectWorkerExecutor;
try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
autodetectWorkerExecutor = createAutodetectExecutorService(autodetectExecutorService);
autodetectExecutorService.submit(() -> processor.process(process));
autodetectExecutorService.submit(processor::process);
} catch (EsRejectedExecutionException e) {
// If submitting the operation to read the results from the process fails we need to close
// the process too, so that other submitted operations to threadpool are stopped.
Expand Down
Loading

0 comments on commit b15e40f

Please sign in to comment.